[Atguigu Java Edition] Flink 1.13: Custom Source
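The previous article (https://blog.csdn.net/junR_980218/article/details/125798564) introduced five ways for Flink to read from a data source and the availability of those sources. This section describes user-defined (custom) data sources.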
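1. Basic environment
1. For setting up the project environment, see: https://blog.csdn.net/junR_980218/article/details/125366210
2. After the basic environment is in place, create the Event entity class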
package com.atguigu.chapter05;

import java.sql.Timestamp;

/**
 * @author potential
 */
public class Event {
    /**
     * Points to note here:
     * 1. The class must be public.
     * 2. The class has a public no-argument constructor.
     * 3. All fields are public.
     * 4. All field types are serializable.
     */
    public String user;
    public String url;
    public Long timestamp;

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        // Render the epoch-millisecond timestamp as a readable java.sql.Timestamp
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", timestamp=" + new Timestamp(timestamp) +
                '}';
    }
}
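The rules listed in the Event comment, together with the public no-argument constructor that Flink's POJO rules also require, can be checked mechanically. The following is a minimal sketch in plain Java, not Flink's own type analysis. `PojoRuleCheck`, `GoodEvent`, `BadEvent`, and `looksLikePojo` are hypothetical names introduced only for illustration, and the check is deliberately simplified: it ignores the getter/setter alternative for non-public fields and does not inspect whether field types are serializable.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

class PojoRuleCheck {

    /** Hypothetical sample type that follows the rules. */
    public static class GoodEvent {
        public String user;
        public String url;
        public Long timestamp;

        public GoodEvent() {
        }
    }

    /** Hypothetical sample type that breaks them: private final field, no no-arg constructor. */
    public static class BadEvent {
        private final String user;

        public BadEvent(String user) {
            this.user = user;
        }
    }

    /** Simplified check: public class, public no-arg constructor, public non-final instance fields. */
    static boolean looksLikePojo(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        try {
            type.getConstructor(); // only finds a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || field.isSynthetic()) {
                continue; // static and compiler-generated fields are not part of the record
            }
            if (!Modifier.isPublic(mods) || Modifier.isFinal(mods)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("GoodEvent: " + looksLikePojo(GoodEvent.class)); // GoodEvent: true
        System.out.println("BadEvent: " + looksLikePojo(BadEvent.class));   // BadEvent: false
    }
}
```

A type that fails these rules can usually still be used in a Flink job, but it is then handled as a generic type rather than as a POJO.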
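3. Custom data sources
(1) Custom non-parallel source: the simple case is to implement SourceFunction. It reads data serially, so its parallelism can only be 1 and its throughput is low.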
package com.atguigu.chapter05;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

/**
 * @author potential
 */
public class ClickSource implements SourceFunction<Event> {
    /**
     * Running flag; volatile because cancel() is called from a different thread than run()
     */
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        // Generate data randomly
        Random random = new Random();
        // Value sets that each field is picked from
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=100", "./prod?id=10"};
        // Keep generating data in a loop
        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            // Calendar.getInstance() gets the current time, getTimeInMillis() returns it in milliseconds
            Long timestamp = Calendar.getInstance().getTimeInMillis();
            sourceContext.collect(new Event(user, url, timestamp));
            // Emit one event per second
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
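The running flag is what connects cancel() to the loop in run(): the two methods are invoked from different threads, which is why the SourceFunction Javadoc suggests a volatile boolean for this pattern. The sketch below reproduces only that pattern in plain Java, without any Flink dependency. `CancelFlagDemo`, `LoopingSource`, and `runThenCancel` are hypothetical names for illustration, and the 10 ms sleep is an arbitrary choice to keep the demo short.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CancelFlagDemo {

    /** Hypothetical stand-in for a source: run() loops until cancel() flips the flag. */
    static class LoopingSource {
        // volatile so the write in cancel() is visible to the thread executing run()
        private volatile boolean running = true;
        final List<Long> emitted = new CopyOnWriteArrayList<>();

        void run() throws InterruptedException {
            while (running) {
                emitted.add(System.currentTimeMillis());
                Thread.sleep(10L);
            }
        }

        void cancel() {
            running = false;
        }
    }

    /** Runs the source on a worker thread, cancels it, and reports whether it emitted and then stopped. */
    static boolean runThenCancel(long runMillis) {
        LoopingSource source = new LoopingSource();
        Thread worker = new Thread(() -> {
            try {
                source.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            Thread.sleep(runMillis);
            source.cancel();     // called from the main thread, not the worker
            worker.join(1000L);  // wait for the loop to observe the flag and exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !worker.isAlive() && !source.emitted.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println("stopped after cancel: " + runThenCancel(100L));
    }
}
```

Without volatile, the thread running the loop is not guaranteed to see the updated flag, so the source might keep emitting after cancellation.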
Test:
package com.atguigu.chapter05;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author potential
 */
public class SourceCustomTest {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the global parallelism to 1
        env.setParallelism(1);
        // Call addSource, passing in the custom source
        DataStreamSource<Event> customStream = env.addSource(new ClickSource());
        customStream.print();
        env.execute();
    }
}
Result:

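(2) Custom parallel source
Define a parallel source and test it. Implementing ParallelSourceFunction allows a parallelism greater than 1: the parallelism can be set with the setParallelism method to increase throughput.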
package com.atguigu.chapter05;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.Random;

/**
 * @author potential
 */
public class SourceCustomTest {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the global parallelism to 1
        env.setParallelism(1);
        // Use the custom parallel source and raise its parallelism to 2
        DataStreamSource<Integer> customStream = env.addSource(new ParallelCustomSource()).setParallelism(2);
        customStream.print();
        env.execute();
    }

    public static class ParallelCustomSource implements ParallelSourceFunction<Integer> {
        // volatile because cancel() is called from a different thread than run()
        private volatile boolean running = true;
        private Random random = new Random();

        @Override
        public void run(SourceContext<Integer> sourceContext) throws Exception {
            // Emit random integers as fast as possible until cancelled
            while (running) {
                sourceContext.collect(random.nextInt());
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
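With setParallelism(2), the runtime works with two independent copies of the source, each executing its own run() loop with its own state. The sketch below imitates that idea in plain Java with a thread pool. It is an analogy under stated assumptions, not Flink's runtime: `ParallelCopiesDemo`, `BoundedRandomSource`, and `runCopies` are hypothetical names, and each copy emits a fixed number of records so that the demo terminates.

```java
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.function.IntConsumer;

class ParallelCopiesDemo {

    /** Hypothetical stand-in for one source copy: emits a fixed number of random ints. */
    static class BoundedRandomSource {
        private final Random random = new Random(); // each copy owns its own state

        void run(int records, IntConsumer collector) {
            for (int i = 0; i < records; i++) {
                collector.accept(random.nextInt());
            }
        }
    }

    /** Starts one source copy per "subtask" and returns how many records each copy emitted. */
    static int[] runCopies(int parallelism, int recordsPerCopy) {
        AtomicIntegerArray emitted = new AtomicIntegerArray(parallelism);
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        for (int i = 0; i < parallelism; i++) {
            final int subtask = i;
            BoundedRandomSource copy = new BoundedRandomSource();
            pool.execute(() -> copy.run(recordsPerCopy, value -> emitted.incrementAndGet(subtask)));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("copies did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        int[] counts = new int[parallelism];
        for (int i = 0; i < parallelism; i++) {
            counts[i] = emitted.get(i);
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = runCopies(2, 5);
        // prints: copy 0 emitted 5, copy 1 emitted 5
        System.out.println("copy 0 emitted " + counts[0] + ", copy 1 emitted " + counts[1]);
    }
}
```

Because every copy produces its own records, the total output grows with the number of copies, which is the throughput gain the parallel source is meant to provide.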
Test result:
