【尚硅谷Java版】Flink中DataStreamAPI篇之读取信息源

原创
小哥 3年前 (2022-11-07) 阅读数 37 #大杂烩

【尚硅谷Java版】Flink中DataStream API阅读文章的数据源。

flink可以从各种来源获取数据,然后进行构建。DataStream对于转换处理,数据的输入通常称为数据源,读取数据的运算符称为 源算子 (代码中调用的那个API)
在代码读取数据源的方式有以下三种:

  1. 从文件读取数据
  2. 从集合读取数据
  3. 从元素读取数据
  4. 从kafka读取数据

在使用上述4读取数据源的方法之前先创建一个名称为Event的类

注意:
这里我们需要注意以下几点:
1,类必须是公共的
2,所有属性都是公共的
*3,所有属性类型都是可序列化的。

package com.atguigu.chapter05;

import java.sql.Timestamp;

/**
 * @author potential
 */
public class Event {
    /**
     * 这里我们需要注意以下几点:
     *      1,类必须是公共的
     *      2,所有属性都是公共的
     *      3,所有属性类型都是可序列化的。
     */
    public String user;
    public String url;
    public Long timestamp;

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{" +
                "user=" + user +  +
                ", url=" + url +  +
                ", timestamp=" + new Timestamp(timestamp) +
                };
    }
}

使用4读取数据源的方法

方法1:从文件读取数据

(1)创建 input 包,在其中创建 clicks.txt 文件

(2)并在 clicks.txt 将以下内容添加到文件中(可以随意添加,也可以直接复制我在这里直接使用):

 Mary,./home,1000
Alice,./cart,2000
Bob,./prod?id=100,3000
Bob,./cart,4000
Bob,./home,5000
Mary,./home,6000
Bob,./cart,7000
Bob,./home,8000
Bob,./prod?id=10, 9000

(3)编写测试类 SourceTest ,用于测试类 方法1:从文件读取数据源

   //(1)从文件读取数据 批量处理   常用
        DataStreamSource stream1 = env.readTextFile("input/clicks.txt");

方法2:从集合读取数据。

在测试类中使用 方法2:从集合读取数据。源

 ArrayList nums=new ArrayList<>();
        nums.add(2);
        nums.add(5);
        DataStreamSource numStream = env.fromCollection(nums);

        ArrayList events = new ArrayList<>();
        events.add(new Event("Mary","./home",1000L));
        events.add(new Event("Bob","./cart",2000L));
        DataStreamSource stream2 = env.fromCollection(events);

方法3:从元素读取数据

  DataStreamSource stream3 = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );

模式4:来自socket读入文本流

 //(4)、从socket读入文本流
        DataStreamSource stream4  = env.socketTextStream("hostname", 7777);

测试过程可参考: https://blog.csdn.net/junR_980218/article/details/125375722

测试

将以上三种方法同时写入测试类。 SourceTest 其中,如以下代码所示。测验

package com.atguigu.chapter05;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * @author potential
 */
public class SourceTest {
    public static void main(String[] args) throws Exception {
        //1,创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //易于调试,适用于全局并行度集。1
        env.setParallelism(1);
        /**
         * 2,从不同来源读取数据
         */
        //(1)从文件读取数据 批量处理   常用
        DataStreamSource stream1 = env.readTextFile("input/clicks.txt");

        //(2)从集合读取数据  常用于测试
        ArrayList nums=new ArrayList<>();
        nums.add(2);
        nums.add(5);
        DataStreamSource numStream = env.fromCollection(nums);

        ArrayList events = new ArrayList<>();
        events.add(new Event("Mary","./home",1000L));
        events.add(new Event("Bob","./cart",2000L));
        DataStreamSource stream2 = env.fromCollection(events);

        //(3),从元素读取数据  常用于测试
        DataStreamSource stream3 = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );
          //(4)、从socket读入文本流
        DataStreamSource stream4  = env.socketTextStream("hostname", 7777);

        stream1.print("1");
        numStream.print("nums");
        stream2.print("2");
        stream3.print("3");
        stream4.print("4");

        env.execute();

    }
}

测试结果

方法5:来自kafka读取数据

//(5)从kafka读取数据——即是消费数据 消费者模型
Properties properties = new Properties();
//"hadoop102:9092":指的是 虚拟机的名称和端口号
properties.setProperty("bootstrap.servers","hadoop102:9092");
properties.setProperty("group.id","consumer-group");
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset","latest");

DataStreamSource kafkaStream = env.addSource(new FlinkKafkaConsumer("clicks", new SimpleStringSchema(), properties));

kafkaStream.print();
env.execute();

测试,在虚拟机中打开。zookeeper、kafka,然后创建生产者,创建主题,然后输入数据进行测试。

版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除

热门