【尚硅谷Java版】Flink1.13转换经刀之基本变换算子
原创【尚硅谷Java版】Flink1.13 转换运算符的基本转换运算符。
数据源读取数据后,我们可以使用各种转换运算符来传输一个或多个DataStream已转换为新DataStream。一个Flink程序的核心实际上是所有两点转换操作,它们决定了处理的业务逻辑。
1.基本转换运算符
1、映射map
map是一个大数据操作操作符,主要用于将数据流中的数据转换为新的数据流。
简单地说,这是一对一的对应关系,消耗一个元素产生一个元素。如下图所示:

我们只需要基地 DataStream调用map()方法 可以转换。方法需要传入的参数是接口。MapFunction实施;返回值类型或DataStream,但一般(流中元素的类型)可能会改变。
具体用途:
package com.atguigu.chapter05;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import sun.security.mscapi.CPublicKey;
/**
* @author potential
*/
public class TransformMapTest {
public static void main(String[] args) throws Exception {
//1,创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//平行度已设置1
env.setParallelism(1);
//从元素读取数据
DataStreamSource stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
//执行转换计算,提取user字段
/**
* 1,使用自定义类实现MapFunction接口
*/
SingleOutputStreamOperator map = stream.map(new MyMapper());
//2,使用匿名类实现MapFunction接口
SingleOutputStreamOperator map1 = stream.map(new MapFunction() {
@Override
public String map(Event event) throws Exception {
return event.user;
}
});
//3、传入Lambda表达式
SingleOutputStreamOperator map2 = stream.map(data -> data.user);
map.print();
map1.print();
map2.print();
env.execute();
}
/**
* 自定义类实现MapFunction接口 MapFunction
* MapFunction此通用,其中T指未转换的数据类型。 O指转换后需要输出的数据类型。
* 这里 我们转换之前的数据是Event这个对象 转换后为Event对象中的元素 所以应该是String
*/
public static class MyMapper implements MapFunction{
@Override
public String map(Event event) throws Exception {
return event.user;
}
}
}

2、过滤 filter
filter作为转换操作,顾名思义,对数据流执行筛选,使用布尔条件表达式设置筛选条件,并确定流中的每个元素,如果true则元件输出正常,如果false然后过滤掉元素。如下图所示:

进行filter转换后 新数据流的数据类型与元数据流相同。 ,filter转换需要传入参数。 实现FilterFunction接口 , 而FilterFunction要实现的目标filter方法,这等效于返回布尔类型的条件表达式。 。
具体使用:
package com.atguigu.chapter05;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author potential
*/
public class TransformFilterTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//从元素读取数据
DataStreamSource stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
//1,在实现中传递。FilterFunction类的对象。
SingleOutputStreamOperator filter = stream.filter(new MyFilter());
//2,传入匿名类实现。FilterFunction接口
SingleOutputStreamOperator filter1 = stream.filter(new FilterFunction() {
@Override
public boolean filter(Event event) throws Exception {
return event.user.equals("Mary");
}
});
//3、传入lambda表达式
stream.filter(data->data.user.equals("Mary")).print("Mary click");
filter.print();
filter1.print();
env.execute();
}
//实现自定义FilterFunction
public static class MyFilter implements FilterFunction{
@Override
public boolean filter(Event event) throws Exception {
return event.user.equals("Mary");
}
}
}
测试结果:

3,平面映射 flatMap
flatMap该操作称为平面映射,主要是 整个数据流。(通常,它是一种集合类型)拆分为单个 使用消耗元素可以产生0~多个元素。
flatMap可以考虑 扁平化+映射 一组两步操作,即根据某个规则拆分数据,然后进行拆分元素转换处理。

具体用途:
package com.atguigu.chapter05;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @author potential
*/
public class TransformFlatMapTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//从元素读取数据
DataStreamSource stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
//1、实现FlatMapFunction
stream.flatMap(new MyFlatMap()).print("1");
//2,传入lambda表达式
stream.flatMap((Event event,Collector out)->{
if(event.user.equals("Mary")) {
out.collect(event.url);
} else if(event.user.equals("Bob")) {
out.collect(event.user);
out.collect(event.url);
out.collect(event.timestamp.toString());
}}).returns(new TypeHint() {
}).print("2");
env.execute();
}
//实现自定义FlatMapFunction
public static class MyFlatMap implements FlatMapFunction{
@Override
public void flatMap(Event event, Collector collector) throws Exception {
collector.collect(event.user);
collector.collect(event.url);
collector.collect(event.timestamp.toString());
}
}
}
测试结果:

版权声明
所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除
itfan123


