【Kafka】从kafka中读取最新统计
原创【Kafka】从kafka读取中的最新数据
前情提要:我只是在这里看书kafka生产者内部的数据已经配置好,并将自动监控数据库中的更改以推入。kafka因此,这里对生产者没有太多解释。
1.死循环无限拉力。kafka数据
1.1 总体框架分析
1、要想从Kafka要读入数据,必须首先 消费者信息已配置
//1,创建使用者配置信息
Properties properties = new Properties();
//2,为配置信息赋值
//2.1 kafka集群信息
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092");
//2.2 启用自动提交offset 每次提交后offset处于消费的最新位置
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
//2.3 自动提交offset延时 1每秒提交一次
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
//2.4 key value反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
//2.5 消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"base_db_app_210325");
2在消费者的基本配置信息完成之后, 创建消费者,订阅主题,供以后使用。
//创建消费者
KafkaConsumer consumer = new KafkaConsumer<>(properties);
//订阅主题
consumer.subscribe(Collections.singletonList("ticdc-paperfree-monitor"));
3,订阅主题,相当于已订阅kafka下一步是 消费
。而kafka使用消息的方式是poll拉,我们就在这里kafka数据在消费中,上面我们选择了自动提交。offset,然后每次offset这是最后一次消费完成后的最新位置,所以我们接下来每次消费得到的是最新的未消费数据!
while (true) {
//获取数据
ConsumerRecords poll = consumer.poll(100);
//分析和打印
for (ConsumerRecord stringStringConsumerRecord : poll) {
System.out.println("stringStringConsumerRecord.key() = " + stringStringConsumerRecord.key() + "------" + stringStringConsumerRecord.value() + "----" + stringStringConsumerRecord.topic());
}
}
1.2 测试
方法一:
1、创建MyConsumer1类,根据以上对整体结构的分析,添加以下代码,并进行测试。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
/**
* @author potential
*/
public class MyConsumer1 {
public static void main(String[] args) {
//1,创建使用者配置信息
Properties properties = new Properties();
//2,为配置信息赋值
//2.1 kafka集群信息
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092");
//2.2 启用自动提交offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
//2.3 自动提交offset延时 1每秒提交一次
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
//2.4 key value反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
//2.5 消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"base_db_app_210325");
//创建消费者
KafkaConsumer consumer = new KafkaConsumer<>(properties);
//订阅主题
consumer.subscribe(Collections.singletonList("ticdc-paperfree-monitor"));
while (true) {
//获取数据
ConsumerRecords poll = consumer.poll(100);
//分析和打印
for (ConsumerRecord stringStringConsumerRecord : poll) {
System.out.println("stringStringConsumerRecord.key() = " + stringStringConsumerRecord.key() + "------" + stringStringConsumerRecord.value() + "----" + stringStringConsumerRecord.topic());
}
}
// //关闭连接
// consumer.close();
}
}
方法二:
2、创建MyConsumer2类,根据以上对整体结构的分析,添加以下代码,并进行测试。
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
/**
* @author potential
*/
public class MyConsumer2 {
public static void main(String[] args) {
//配置必要的参数
//准备一个map集合放置参数
Map config = new HashMap();
//bootserverscon
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092");
//启用自动提交offset
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
//自动提交offset延时 1每秒提交一次
config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
//valuedeserilizer
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer.class);
//groupid
config.put(ConsumerConfig.GROUP_ID_CONFIG, "base_db_app_210325");
//如果找不到偏移,则设置earliest,然后从最新消费开始,也就是说,当消费者开始最近一次消费时
//一定要注意顺序,阅读的顺序会影响
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// //这里是将消费者的补偿重置为生产者的补偿。
// Map hashMaps = new HashMap();
// hashMaps.put(new TopicPartition("ticdc-paperfree-monitor", 0), new OffsetAndMetadata(129));
//消费者
KafkaConsumer consumer = new KafkaConsumer(config);
// //放置刚刚设置的偏移。
// consumer.commitSync(hashMaps);
//先订阅后消费
consumer.subscribe(Arrays.asList("ticdc-paperfree-monitor"));
// // 从主题的分区批量拉取消息。
// //final ConsumerRecords consumerRecords = consumer.poll(3_000);
// ConsumerRecords consumerRecords = consumer.poll(3000);
while (true) {
//获取数据
ConsumerRecords poll = consumer.poll(100);
//分析和打印
for (ConsumerRecord stringStringConsumerRecord : poll) {
System.out.println("stringStringConsumerRecord.key() = " + stringStringConsumerRecord.key() + "------" + stringStringConsumerRecord.value() + "----" + stringStringConsumerRecord.topic());
}
}
//
// //遍历从主题分区提取的此批处理消息 这里是取出整个分区中的所有数据。
// consumerRecords.forEach(new java.util.function.Consumer>() {
// @Override
// public void accept(ConsumerRecord consumerRecord) {
// System.out.println(
// consumerRecord.topic() +" "
// +consumerRecord.offset() + " "
// +consumerRecord.key() +" "
// +consumerRecord.value()+" "
// );
// }
// });
// consumer.close();
}
}
注意:
模式1。模式2只在写作上有所不同,整体结构相同,请选择一个写作。
至此,从kafka读取中的最新数据的流程就全部结束了。
二、@KafkaListener注解 实现监听kafka数据
1,导入依赖项
【我这里SpringBoot版本是2.2.13】
org.apache.kafka
kafka-clients
2.3.1
org.springframework.kafka
spring-kafka
2.3.7.RELEASE
注意:
1、springboot +2、kafka-clients +3、spring-kafka(如下图所示Sprig for Apache Kafka Version) 这三个 注意版本对应
。具体对应关系如下图所示:
2轮廓
在 application.yml
将以下内容添加到文件中:
spring:
kafka:
consumer:
bootstrap-servers: 192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092 #集群信息
producer: #生产者
retries: 0 #设置大于0值,客户端将发送失败的记录以重新发送。
batch-size: 16384 #批量大小
buffer-momory: 33554432 #生产侧缓冲区大小
acks: 1 #应答级别
#指定消息key以及消息体的解码方式。 序列化和反序列化
key- key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializ
consumer:
group-id: base_db_app_210325
enable-auto-comnit: true #是否自动提交offset
auto-offset-reset: latest #重置为分区中的最新版本offset(消费者分区中新生成的数据)
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
topic-name: ticdc-paperfree-monitor #主题
listener:
ack-mode: manual_immediate
3、创建MyConsumer类,添加以下内容:
@KafkaListener(id = "test1", topics = "ticdc-paperfree-monitor")//这里id这是随机的。我在这里叫它test1,我这里的主题直接写死了,拿ticdc-paperfree-monitor本主题下的数据,也可以${},动态获取主题名称,group_id
public void listen(ConsumerRecord record) {
//从Kafka读取的数据
System.out.println("topic:" + record.topic());
System.out.println("value:" + record.value());
}
4、测试
运行 主启动类
,将在程序运行时自动侦听并输出数据。
3.参考文献
https://blog.csdn.net/m0_67391270/article/details/126505944
https://blog.csdn.net/weixin_46271129/article/details/119800649
版权声明
所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除