【尚硅谷Java版】Flink1.13之自动Source

原创
小哥 3年前 (2022-11-07) 阅读数 9 #C#
文章标签 .net

【尚硅谷Java版】Flink1.13之自定义Source

上一篇文章(地址: https://blog.csdn.net/junR_980218/article/details/125798564 )我们介绍 五种Flink读取数据源的方式以及这些数据源的可用性,我们将在本节中进行描述。 用户定义的数据源

1.基本环境

1,项目环境建设可参考: https://blog.csdn.net/junR_980218/article/details/125366210
2构建基本环境后,创建 Event实体类

package com.atguigu.chapter05;

import java.sql.Timestamp;

/**
 * @author potential
 */
public class Event {
    /**
     * 这里我们需要注意以下几点:
     *      1,类必须是公共的
     *      2,所有属性都是公共的
     *      3,所有属性类型都是可序列化的。
     */
    public String user;
    public String url;
    public Long timestamp;

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{" +
                "user=" + user +  +
                ", url=" + url +  +
                ", timestamp=" + new Timestamp(timestamp) +
                };
    }
}

3,自定义数据

(1自定义单行数据简单。sourceFounction,串行读取数据,只能并行。1,吞吐量小

package com.atguigu.chapter05;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

/**
 * @author potential
 */
public class ClickSource implements SourceFunction {
    /**
     * 声明标志位
     */
    private Boolean running=true;

    @Override
    public void run(SourceContext sourceContext) throws Exception {
        //随机生成的数据
        Random random = new Random();
        //定义字段所选的数据集。
        String[] users={"Mary","Alice","Bob","Cary"};
        String[] urls={"./home","./cart","./fav","./prod?id=100,/prod?id=10"};

        //循环不断生成数据。
        while(running){
            String user=users[random.nextInt(users.length)];
            String url=urls[random.nextInt(urls.length)];
            //获取当前时间Calendar.getInstance() 获取毫秒getTimeInMillis()
            Long timestamp= Calendar.getInstance().getTimeInMillis();

            sourceContext.collect(new Event(user,url,timestamp));
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running=false;
    }
}

测试:

package com.atguigu.chapter05;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.OptionalInt;
import java.util.Random;

/**
 * @author potential
 */
public class SourceCustomTest {
    public static void main(String[] args) throws Exception {
        //1,获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置全局并行性1
        env.setParallelism(1);

        //调用addSource方法  要传递的是习俗source方法
    DataStreamSource customStream = env.addSource(new ClickSource());

        customStream.print();
        env.execute();
    }

结果:

(2)自定义并行数据

定义并行数据和测试工具。ParallelSourceFunction,增加并行性,可以实现setParallelism方法设置并行以提高吞吐量。

package com.atguigu.chapter05;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.OptionalInt;
import java.util.Random;

/**
 * @author potential
 */
public class SourceCustomTest {
    public static void main(String[] args) throws Exception {
        //1,获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置全局并行性1
        env.setParallelism(1);

        //实现自定义并行SourceFounction
        DataStreamSource customStream = env.addSource(new ParallelCustomSource()).setParallelism(2);

        customStream.print();
        env.execute();

    }

    public static class ParallelCustomSource implements ParallelSourceFunction{

        private Boolean running=true;
        private Random random =new Random();

        @Override
        public void run(SourceContext sourceContext) throws Exception {
            while(running){
                sourceContext.collect(random.nextInt());
            }

        }

        @Override
        public void cancel() {
            running=false;

        }
    }
}

测试结果:

版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除