【尚硅谷Java版】Flink直接上手之无界流管理
原创【尚硅谷Java版】Flink快速上手之无界流处理
一、项目环境搭建
项目环境搭建我这里不做过多的赘述,直接参考我之前的博客里面的环境搭建完成搭建就可以了。
地址: https://blog.csdn.net/junR_980218/article/details/125366210
二、项目编写
1、编写 StreamWordCount
类,并添加如下内容
package com.atguigu.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @author
* @date 2022/6/20 15:15
*/
public class StreamWordCount {
public static void main(String[] args) throws Exception {
//1、创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 等同于下面第二步 第二步是将主机名和端口号写在上面,这里是从参数中提取主机名和端口号
//ParameterTool 是Flink从当前main方法中提取参数的一个工具
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String hostname=parameterTool.get("host");
Integer port=parameterTool.getInt("port");
//2、读取文本流
DataStreamSource stringDataStreamSource = env.socketTextStream("DESKTOP-FIND8C3",9000);
//3、转换计算
SingleOutputStreamOperator> wordAndOneTuple = stringDataStreamSource.flatMap((String line, Collector> out) ->
{
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG));
//4、分组
KeyedStream, String> wordAndOneKeyStream = wordAndOneTuple.keyBy(data -> data.f0);
//5、求和
SingleOutputStreamOperator> sum = wordAndOneKeyStream.sum(1);
//6、打印输出
sum.print();
//7、自动执行
env.execute();
}
}
对于此段代码的第二个步骤
//2、读取文本流
,上面是直接将主机名和端口号固定了,我们也可以通过下面的操作不固定主机名和端口号
-
将上面
\2、读取文本流
全部代码换成下面的//ParameterTool 是Flink从当前main方法中提取参数的一个工具 ParameterTool parameterTool = ParameterTool.fromArgs(args); String hostname=parameterTool.get("host"); Integer port=parameterTool.getInt("port"); DataStreamSource
stringDataStreamSource = env.socketTextStream(hostname,port);
2.去配置一下配置项,才可以实现不将主机名和端口号固定的操作
三、测试
因为是无界流处理,所以需要来一条数据处理一条数据,我们通过netcat工具来实现这个交互,具体Windows安装下载netcat的过程可以参考: https://blog.csdn.net/junR_980218/article/details/125374526
然后输入数据来验证
至此,Flink快速上手之无界流处理就到此结束了~
版权声明
所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除