【尚硅谷Java版】Flink1.13转换经刀之基本变换算子

原创
小哥 3年前 (2022-11-07) 阅读数 21 #大杂烩

【尚硅谷Java版】Flink1.13 转换运算符的基本转换运算符。

数据源读取数据后,我们可以使用各种转换运算符来传输一个或多个DataStream已转换为新DataStream。一个Flink程序的核心实际上是所有两点转换操作,它们决定了处理的业务逻辑。

1.基本转换运算符

1、映射map
map是一个大数据操作操作符,主要用于将数据流中的数据转换为新的数据流。
简单地说,这是一对一的对应关系,消耗一个元素产生一个元素。如下图所示:

我们只需要基地 DataStream调用map()方法 可以转换。方法需要传入的参数是接口。MapFunction实施;返回值类型或DataStream,但一般(流中元素的类型)可能会改变。

具体用途:

package com.atguigu.chapter05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import sun.security.mscapi.CPublicKey;

/**
 * @author potential
 */
public class TransformMapTest {
    public static void main(String[] args) throws Exception {
        //1,创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //平行度已设置1
        env.setParallelism(1);

        //从元素读取数据
        DataStreamSource stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );

        //执行转换计算,提取user字段
        /**
         * 1,使用自定义类实现MapFunction接口
         */
        SingleOutputStreamOperator map = stream.map(new MyMapper());

        //2,使用匿名类实现MapFunction接口
        SingleOutputStreamOperator map1 = stream.map(new MapFunction() {
            @Override
            public String map(Event event) throws Exception {
                return event.user;
            }
        });

        //3、传入Lambda表达式
        SingleOutputStreamOperator map2 = stream.map(data -> data.user);

       map.print();
       map1.print();
       map2.print();

        env.execute();
    }

    /**
     * 自定义类实现MapFunction接口  MapFunction
     * MapFunction此通用,其中T指未转换的数据类型。 O指转换后需要输出的数据类型。
     * 这里 我们转换之前的数据是Event这个对象  转换后为Event对象中的元素 所以应该是String
     */

        public static class MyMapper implements MapFunction{

            @Override
            public String map(Event event) throws Exception {
                return event.user;
            }
        }
}


2、过滤 filter
filter作为转换操作,顾名思义,对数据流执行筛选,使用布尔条件表达式设置筛选条件,并确定流中的每个元素,如果true则元件输出正常,如果false然后过滤掉元素。如下图所示:

进行filter转换后 新数据流的数据类型与元数据流相同。 ,filter转换需要传入参数。 实现FilterFunction接口而FilterFunction要实现的目标filter方法,这等效于返回布尔类型的条件表达式。

具体使用:

package com.atguigu.chapter05;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author potential
 */
public class TransformFilterTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //从元素读取数据
        DataStreamSource stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );

        //1,在实现中传递。FilterFunction类的对象。
        SingleOutputStreamOperator filter = stream.filter(new MyFilter());

        //2,传入匿名类实现。FilterFunction接口
        SingleOutputStreamOperator filter1 = stream.filter(new FilterFunction() {
            @Override
            public boolean filter(Event event) throws Exception {
                return event.user.equals("Mary");
            }
        });
        //3、传入lambda表达式
        stream.filter(data->data.user.equals("Mary")).print("Mary click");

        filter.print();
        filter1.print();
        env.execute();

    }

    //实现自定义FilterFunction
    public static class MyFilter implements FilterFunction{

        @Override
        public boolean filter(Event event) throws Exception {
            return event.user.equals("Mary");
        }
    }

}

测试结果:

3,平面映射 flatMap

flatMap该操作称为平面映射,主要是 整个数据流。(通常,它是一种集合类型)拆分为单个 使用消耗元素可以产生0~多个元素。
flatMap可以考虑 扁平化+映射 一组两步操作,即根据某个规则拆分数据,然后进行拆分元素转换处理。


具体用途:

package com.atguigu.chapter05;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author potential
 */
public class TransformFlatMapTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //从元素读取数据
        DataStreamSource stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );

        //1、实现FlatMapFunction
        stream.flatMap(new MyFlatMap()).print("1");

        //2,传入lambda表达式
        stream.flatMap((Event event,Collector out)->{
            if(event.user.equals("Mary")) {
                out.collect(event.url);
            } else if(event.user.equals("Bob")) {
                out.collect(event.user);
                out.collect(event.url);
                out.collect(event.timestamp.toString());

            }}).returns(new TypeHint() {
                }).print("2");

        env.execute();

    }
        //实现自定义FlatMapFunction
        public static class MyFlatMap implements FlatMapFunction{

            @Override
            public void flatMap(Event event, Collector collector) throws Exception {

                collector.collect(event.user);
                collector.collect(event.url);
                collector.collect(event.timestamp.toString());

            }
    }

}

测试结果:

版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除