【Kafka】从kafka中读取最新统计

原创
小哥 3年前 (2022-11-07) 阅读数 79 #C#
文章标签 .net

【Kafka】从kafka读取中的最新数据

前情提要:我只是在这里看书kafka生产者内部的数据已经配置好,并将自动监控数据库中的更改以推入。kafka因此,这里对生产者没有太多解释。

1.死循环无限拉力。kafka数据

1.1 总体框架分析

1、要想从Kafka要读入数据,必须首先 消费者信息已配置

//1,创建使用者配置信息
        Properties properties = new Properties();
        //2,为配置信息赋值
        //2.1 kafka集群信息
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092");
        //2.2 启用自动提交offset 每次提交后offset处于消费的最新位置
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        //2.3 自动提交offset延时 1每秒提交一次
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        //2.4 key value反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        //2.5 消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"base_db_app_210325");

2在消费者的基本配置信息完成之后, 创建消费者,订阅主题,供以后使用。

 //创建消费者
        KafkaConsumer consumer = new KafkaConsumer<>(properties);
        //订阅主题
        consumer.subscribe(Collections.singletonList("ticdc-paperfree-monitor"));

3,订阅主题,相当于已订阅kafka下一步是 消费 。而kafka使用消息的方式是poll拉,我们就在这里kafka数据在消费中,上面我们选择了自动提交。offset,然后每次offset这是最后一次消费完成后的最新位置,所以我们接下来每次消费得到的是最新的未消费数据!

while (true) {
           //获取数据
           ConsumerRecords poll = consumer.poll(100);

           //分析和打印
           for (ConsumerRecord stringStringConsumerRecord : poll) {
               System.out.println("stringStringConsumerRecord.key() = " + stringStringConsumerRecord.key() + "------" + stringStringConsumerRecord.value() + "----" + stringStringConsumerRecord.topic());

           }
       }

1.2 测试

方法一:

1、创建MyConsumer1类,根据以上对整体结构的分析,添加以下代码,并进行测试。

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/**
 * @author potential
 */
public class MyConsumer1 {

    public static void main(String[] args) {
        //1,创建使用者配置信息
        Properties properties = new Properties();
        //2,为配置信息赋值
        //2.1 kafka集群信息
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092");
        //2.2 启用自动提交offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        //2.3 自动提交offset延时 1每秒提交一次
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        //2.4 key value反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        //2.5 消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"base_db_app_210325");

        //创建消费者
        KafkaConsumer consumer = new KafkaConsumer<>(properties);
        //订阅主题
        consumer.subscribe(Collections.singletonList("ticdc-paperfree-monitor"));

       while (true) {
           //获取数据
           ConsumerRecords poll = consumer.poll(100);

           //分析和打印
           for (ConsumerRecord stringStringConsumerRecord : poll) {
               System.out.println("stringStringConsumerRecord.key() = " + stringStringConsumerRecord.key() + "------" + stringStringConsumerRecord.value() + "----" + stringStringConsumerRecord.topic());

           }
       }
//        //关闭连接
//        consumer.close();

    }
}

方法二:
2、创建MyConsumer2类,根据以上对整体结构的分析,添加以下代码,并进行测试。

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;

/**
 * @author potential
 */
public class MyConsumer2 {

    public static void main(String[] args) {
        //配置必要的参数
        //准备一个map集合放置参数
        Map config = new HashMap();
        //bootserverscon
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092");
        //启用自动提交offset
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        //自动提交offset延时 1每秒提交一次
        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

        //valuedeserilizer
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer.class);
        //groupid
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "base_db_app_210325");

        //如果找不到偏移,则设置earliest,然后从最新消费开始,也就是说,当消费者开始最近一次消费时
        //一定要注意顺序,阅读的顺序会影响
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

//        //这里是将消费者的补偿重置为生产者的补偿。
//        Map hashMaps = new HashMap();
//        hashMaps.put(new TopicPartition("ticdc-paperfree-monitor", 0), new OffsetAndMetadata(129));
        //消费者
        KafkaConsumer consumer = new KafkaConsumer(config);
//        //放置刚刚设置的偏移。
//        consumer.commitSync(hashMaps);
        //先订阅后消费
        consumer.subscribe(Arrays.asList("ticdc-paperfree-monitor"));

//        // 从主题的分区批量拉取消息。
//        //final ConsumerRecords consumerRecords = consumer.poll(3_000);
//        ConsumerRecords consumerRecords = consumer.poll(3000);

        while (true) {
            //获取数据
            ConsumerRecords poll = consumer.poll(100);

            //分析和打印
            for (ConsumerRecord stringStringConsumerRecord : poll) {
                System.out.println("stringStringConsumerRecord.key() = " + stringStringConsumerRecord.key() + "------" + stringStringConsumerRecord.value() + "----" + stringStringConsumerRecord.topic());

            }
        }

//
//        //遍历从主题分区提取的此批处理消息 这里是取出整个分区中的所有数据。
//        consumerRecords.forEach(new java.util.function.Consumer>() {
//            @Override
//            public void accept(ConsumerRecord consumerRecord) {
//                System.out.println(
//                        consumerRecord.topic() +" "
//                                +consumerRecord.offset() + "  "
//                                +consumerRecord.key() +"  "
//                                +consumerRecord.value()+" "
//                );
//            }
//        });
//        consumer.close();

    }
}

注意:
模式1。模式2只在写作上有所不同,整体结构相同,请选择一个写作。
至此,从kafka读取中的最新数据的流程就全部结束了。

二、@KafkaListener注解 实现监听kafka数据

1,导入依赖项

【我这里SpringBoot版本是2.2.13】


            org.apache.kafka
            kafka-clients
            2.3.1
        

        
            org.springframework.kafka
            spring-kafka
            2.3.7.RELEASE
        

注意:
1、springboot +2、kafka-clients +3、spring-kafka(如下图所示Sprig for Apache Kafka Version) 这三个 注意版本对应 。具体对应关系如下图所示:
2轮廓
application.yml 将以下内容添加到文件中:

spring:
  kafka:
    consumer:
      bootstrap-servers: 192.168.9.220:9092,192.168.9.221:9092,192.168.9.222:9092 #集群信息
      producer: #生产者
        retries: 0 #设置大于0值,客户端将发送失败的记录以重新发送。
        batch-size: 16384 #批量大小
        buffer-momory: 33554432 #生产侧缓冲区大小
        acks: 1 #应答级别
        #指定消息key以及消息体的解码方式。  序列化和反序列化
        key- key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializ
      consumer:
        group-id: base_db_app_210325
        enable-auto-comnit: true #是否自动提交offset
        auto-offset-reset: latest #重置为分区中的最新版本offset(消费者分区中新生成的数据)
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        topic-name: ticdc-paperfree-monitor #主题
      listener:
        ack-mode: manual_immediate

3、创建MyConsumer类,添加以下内容:

 @KafkaListener(id = "test1", topics = "ticdc-paperfree-monitor")//这里id这是随机的。我在这里叫它test1,我这里的主题直接写死了,拿ticdc-paperfree-monitor本主题下的数据,也可以${},动态获取主题名称,group_id
    public void listen(ConsumerRecord record) {
        //从Kafka读取的数据
        System.out.println("topic:" + record.topic());
        System.out.println("value:" + record.value());
        }

4、测试
运行 主启动类 ,将在程序运行时自动侦听并输出数据。

3.参考文献

https://blog.csdn.net/m0_67391270/article/details/126505944
https://blog.csdn.net/weixin_46271129/article/details/119800649

版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除

热门