【尚硅谷Java版】Flink中DataStreamAPI篇之读取信息源
原创【尚硅谷Java版】Flink中DataStream API阅读文章的数据源。
flink可以从各种来源获取数据,然后进行构建。DataStream对于转换处理,数据的输入通常称为数据源,读取数据的运算符称为 源算子
(代码中调用的那个API)
在代码读取数据源的方式有以下三种:
- 从文件读取数据
- 从集合读取数据
- 从元素读取数据
- 从kafka读取数据
在使用上述4读取数据源的方法之前先创建一个名称为Event的类
注意:
这里我们需要注意以下几点:
1,类必须是公共的
2,所有属性都是公共的
*3,所有属性类型都是可序列化的。
package com.atguigu.chapter05;
import java.sql.Timestamp;
/**
* @author potential
*/
public class Event {
/**
* 这里我们需要注意以下几点:
* 1,类必须是公共的
* 2,所有属性都是公共的
* 3,所有属性类型都是可序列化的。
*/
public String user;
public String url;
public Long timestamp;
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public Event() {
}
public Event(String user, String url, Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString() {
return "Event{" +
"user=" + user + +
", url=" + url + +
", timestamp=" + new Timestamp(timestamp) +
};
}
}
使用4读取数据源的方法
方法1:从文件读取数据
(1)创建 input
包,在其中创建 clicks.txt
文件
(2)并在 clicks.txt
将以下内容添加到文件中(可以随意添加,也可以直接复制我在这里直接使用):
Mary,./home,1000
Alice,./cart,2000
Bob,./prod?id=100,3000
Bob,./cart,4000
Bob,./home,5000
Mary,./home,6000
Bob,./cart,7000
Bob,./home,8000
Bob,./prod?id=10, 9000
(3)编写测试类 SourceTest
,用于测试类 方法1:从文件读取数据源
//(1)从文件读取数据 批量处理 常用
DataStreamSource stream1 = env.readTextFile("input/clicks.txt");
方法2:从集合读取数据。
在测试类中使用 方法2:从集合读取数据。源
ArrayList nums=new ArrayList<>();
nums.add(2);
nums.add(5);
DataStreamSource numStream = env.fromCollection(nums);
ArrayList events = new ArrayList<>();
events.add(new Event("Mary","./home",1000L));
events.add(new Event("Bob","./cart",2000L));
DataStreamSource stream2 = env.fromCollection(events);
方法3:从元素读取数据
DataStreamSource stream3 = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
模式4:来自socket读入文本流
//(4)、从socket读入文本流
DataStreamSource stream4 = env.socketTextStream("hostname", 7777);
测试过程可参考: https://blog.csdn.net/junR_980218/article/details/125375722
测试
将以上三种方法同时写入测试类。 SourceTest
其中,如以下代码所示。测验
package com.atguigu.chapter05;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
/**
* @author potential
*/
public class SourceTest {
public static void main(String[] args) throws Exception {
//1,创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//易于调试,适用于全局并行度集。1
env.setParallelism(1);
/**
* 2,从不同来源读取数据
*/
//(1)从文件读取数据 批量处理 常用
DataStreamSource stream1 = env.readTextFile("input/clicks.txt");
//(2)从集合读取数据 常用于测试
ArrayList nums=new ArrayList<>();
nums.add(2);
nums.add(5);
DataStreamSource numStream = env.fromCollection(nums);
ArrayList events = new ArrayList<>();
events.add(new Event("Mary","./home",1000L));
events.add(new Event("Bob","./cart",2000L));
DataStreamSource stream2 = env.fromCollection(events);
//(3),从元素读取数据 常用于测试
DataStreamSource stream3 = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
//(4)、从socket读入文本流
DataStreamSource stream4 = env.socketTextStream("hostname", 7777);
stream1.print("1");
numStream.print("nums");
stream2.print("2");
stream3.print("3");
stream4.print("4");
env.execute();
}
}
测试结果
方法5:来自kafka读取数据
//(5)从kafka读取数据——即是消费数据 消费者模型
Properties properties = new Properties();
//"hadoop102:9092":指的是 虚拟机的名称和端口号
properties.setProperty("bootstrap.servers","hadoop102:9092");
properties.setProperty("group.id","consumer-group");
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset","latest");
DataStreamSource kafkaStream = env.addSource(new FlinkKafkaConsumer("clicks", new SimpleStringSchema(), properties));
kafkaStream.print();
env.execute();
测试,在虚拟机中打开。zookeeper、kafka,然后创建生产者,创建主题,然后输入数据进行测试。
版权声明
所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除
上一篇:IDEA中Lombok浏览器的安装和操作 下一篇:Navicat15下载最新版