【尚硅谷Java版】Flink1.13接入算子之存储在Mysql
原创【尚硅谷Java版】Flink1.13输出操作员输出Mysql
1,添加依赖项
添加 flink-connect-jdbc 的依赖和 mysql 的依赖
org.apache.flink
flink-connector-jdbc_${scala.binary.version}
${flink.version}
mysql
mysql-connector-java
5.1.45
2,创建数据库和表
基于要保存到数据库的字段构建表。

3、测试
从元素读取数据写入。mysql在数据库中
package com.atguigu.chapter05;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author potential
*/
public class SinkToMySql {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//从元素读取数据
DataStreamSource stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L),
new Event("Alice", "./prod?id=100", 3000L),
new Event("Bob", "./prod?id=1", 3300L),
new Event("Bob", "./home", 3500L),
new Event("Alice", "./prod?id=200", 3000L),
new Event("Bob", "./prod?id=2", 3800L),
new Event("Bob", "./prod?id=3", 4200L)
);
stream.addSink(JdbcSink.sink(
"insert into clicks (user,url) values(?,?)",
((statement,event)->{
statement.setString(1,event.user);
statement.setString(2,event.url);
}),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/flinkTest?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false")
.withDriverName("com.mysql.jdbc.Driver")
.withUsername("root")
.withPassword("123456")
.build()
));
env.execute();
}
}
4,测试结果
到mysql查看相应数据库的表,发现数据已成功插入mysql的表中

至此,flink实时读取数据并写入mysql该过程已完成。
版权声明
所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除
itfan123



