【尚硅谷Java版】Flink直接上手之批处置

原创
小哥 3年前 (2022-11-16) 阅读数 38 #大杂烩

【尚硅谷Java版】Flink 快速上手之批处理

一、开发环境

1、Java 8
2、IDEA 2021.03
3、Maven 3.6.1
4、Flink 1.13.0
5、Git 

二、搭建项目

创建一个Maven项目


选择本地Maven仓库

添加依赖


        8
        8
        1.13.0
        1.8
        2.12
        1.7.30
    

    
        
        
            org.apache.flink
            flink-java
            ${flink.version}
        
        
            org.apache.flink
            flink-streaming-java_${scala.binary.version}
            ${flink.version}
        
        
            org.apache.flink
            flink-clients_${scala.binary.version}
            ${flink.version}
        

        
        
        
            org.slf4j
            slf4j-api
            1.7.30
        
        
        
            org.apache.logging.log4j
            log4j-to-slf4j
            2.14.0
        
       

日志文件配置,创建 log4j.properties 文件,并且添加如下内容

log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.lo4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] % - 5p %c %x - %m%n

三、项目编写

创建input包,并且创建 words.txt 文件并在其中添加下面内容

hello world
hello flink
hello java

创建 BatchWordCount 类,添加如下内容

package com.atguigu.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author 
 * @date 2022/6/20 10:11
 */
public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        //1、创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2、从文件中读取数据
        DataSource stringDataSource = env.readTextFile("input/words.txt");
        //3、将每行数据进行分词,转换成二元组类型
        FlatMapOperator>  wordAndOneTuple = stringDataSource.flatMap((String line, Collector> out) ->
                        //将一行文本进行分词
                {
                    String[] words = line.split(" ");
                    //将每个单词转换成二元组输出
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                }
        ).returns(Types.TUPLE(Types.STRING, Types.LONG));
        //4、按照word进行分组
        UnsortedGrouping> wordAndOneGroup = wordAndOneTuple.groupBy(0);
        //5、分组内进行聚合统计
        AggregateOperator> sum = wordAndOneGroup.sum(1);
        //6、打印结果输出
        sum.print();
    }
}

运行结果,将 words.txt 文件中单词的个数进行的汇总输出,结果如下图所示

Flink 快速上手之使用 DataSet API 实现批处理就全部完成了。

版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除

热门