【尚硅谷Java版】Flink1.13接入算子之存储在Mysql

原创
小哥 3年前 (2022-11-07) 阅读数 5 #大杂烩

【尚硅谷Java版】Flink1.13输出操作员输出Mysql

1,添加依赖项
添加 flink-connect-jdbc 的依赖和 mysql 的依赖

  
        org.apache.flink
        flink-connector-jdbc_${scala.binary.version}
        ${flink.version}
    

    
        mysql
        mysql-connector-java
        5.1.45
    

2,创建数据库和表
基于要保存到数据库的字段构建表。

3、测试
从元素读取数据写入。mysql在数据库中

package com.atguigu.chapter05;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author potential
 */
public class SinkToMySql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //从元素读取数据
        DataStreamSource stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Bob", "./home", 3500L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L)

        );

        stream.addSink(JdbcSink.sink(
                "insert into clicks (user,url) values(?,?)",
                ((statement,event)->{
                    statement.setString(1,event.user);
                    statement.setString(2,event.url);
                }),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/flinkTest?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("123456")
                        .build()
        ));
        env.execute();
    }
}

4,测试结果
到mysql查看相应数据库的表,发现数据已成功插入mysql的表中

至此,flink实时读取数据并写入mysql该过程已完成。

版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除