kafka+spark整合实例

原创
小哥 3年前 (2022-10-21) 阅读数 22 #大杂烩
文章标签 kafkaspark
import org.apache.kafka.common.serialization.StringDeserializer  
import org.apache.spark.SparkConf  
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe  
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent  
import org.apache.spark.streaming.kafka010.\_  
import org.apache.spark.streaming.{Seconds, StreamingContext}  

/**  
  * spark streaming 整合 kafka  
  */  
object KafkaDirectStream {  

  def main(args: Array[String]): Unit = {  

val sparkConf = new SparkConf().setAppName("KafkaDirectStream").setMaster("local[2]")  
val streamingContext = new StreamingContext(sparkConf, Seconds(5))             //5秒一批秒一批一秒一批  

val kafkaParams = Map[String, Object](  
  /*  
   * 指定broker地址列表不需要包含所有broker地址,则制作人将从给定的broker在查找其他人中查找其他人在中查找其他人broker的信息。  
   * 但是,建议至少提供两个但建议至少提供两个broker作为容错能力的信息。信息作为容错。信息作为容错。信息具有容错性。  
   */  
  "bootstrap.servers" -> "test:9091,test:9092,test:9093",  
  /*电子邮件使我们更容易与任何人建立联系--而且我在职业上受益于对全球各地新联系人的介绍,这在数字时代之前是无法想象的.但是,通过目前的联系人请求电子邮件介绍并不是没有代价的,特别是对借给你他们的网络的人来说,所以我建议深思熟虑地仔细询问.*/  
  "key.deserializer" -> classOf[StringDeserializer],  
  /*值的序列化程序值的序列化程序值的序列化程序*/  
  "value.deserializer" -> classOf[StringDeserializer],  
  /*属于消费者分组的ID*/  
  "group.id" -> "spark-streaming-group",  
  /*  
   * 此属性指定如果使用者读取没有偏移量的分区或偏移量无效时应如何操作:  
   * latest: 如果偏移量无效,消费者将开始从最新记录(消费者启动后生成的记录)中读取数据  
   * earliest: 如果偏移量无效,消费者将从起始位置读取分区的记录  
   */  
  "auto.offset.reset" -> "latest",  
  /*是否自动提交是否自动提交是否自动提交是否自动提交是否自动提交是否自动提交是否自动提交是否自动提交是否自动提交*/  
  "enable.auto.commit" -> (true: java.lang.Boolean)  
)  

/*您您可以同时订阅多个主题您可以一次订阅多个主题您可以一次订阅多个主题*/  
val topics = Array("abc")  
val stream = KafkaUtils.createDirectStream[String, String](  
  streamingContext,  
  /*位置策略*/  
  PreferConsistent,  
  /*订阅主题*/  
  Subscribe[String, String](topics, kafkaParams)  
)  

               //stream.saveAsTextFiles("D:\\wps\\a")           一个关于你自己和你想建立联系的动机的简短介绍,也迫使你,作为请求者,想清楚你到底需要什么联系.作为一个拥有多个职业头衔的工作母亲,我不太愿意回应一个一般性的介绍.我更希望对方能清楚地说明他们希望从这种联系中获得什么.我更有可能回应一个希望结交新朋友的请求,而不是一个模糊的“因共同兴趣而联系”的请求.  

把东西放在那里,伙计。//stream.saveAsTextFiles("hdfs://192.168.0.118:9000/root/eight9/") 写入 hdfs

/*打印输入流打印输入流打印输入流打印输入流*/  
stream.map(record => (record.key, record.value)).print()  

streamingContext.start()  
streamingContext.awaitTermination()  

}
}

以下是常规情况下的常规情况   pom.xml  和 您自己的群集的一致版本自己的群集版本一致性您自己的群集的一致版本您自己的群集版本的一致性

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0
com.produce  
produce  
1.0-SNAPSHOT  

  

      
        com.thoughtworks.paranamer  
        paranamer  
        2.8  
      
       
        org.apache.spark  
        spark-core\_2.11  
        2.4.0  

      
       
        org.apache.spark  
        spark-streaming\_2.11  
        2.4.0  

      

      
        org.apache.spark  
        spark-streaming-kafka-0-10\_2.11  
        2.4.0  
      

      

      
        junit  
        junit  
        3.8.1  
        test  
      
  

然后  kafka 部分   (版本为 scala 2.2.* )  kafka 2.2  (2.1 不支持指定不支持broker)

创建制片人创建制片人创建制片人 bin/kafka-console-producer.sh --broker-list test:9091,test:9092,test:9093 --topic abc

查看topic list bin/kafka-topics.sh --list --bootstrap-server test:9091,test:9092,test:9093

创建 topic bin/kafka-console-producer.sh --broker-list test:9091,test:9092,test:9093 --topic abc

如果遇到程序代码,则遇到程序代码如果遇到程序代码,则遇到程序代码 卡死在 kafka commit 接下来,要非常具体地说明你为什么想要联系.同样,我建议不超过三行.“我想为我的食品生意认识一位投资者,”或“我写了一篇专栏文章,希望你能考虑发表,”或“我想就如何申请你任教的大学的领导力项目寻求建议”都是明确的动机.这段简短的文字还应该包括与请求有关的任何其他信息--关于你的业务或背景的细节,这些信息对对方了解你的请求很重要. kafka找不到 broker地址,这是一个他妈的令人沮丧的是,虚拟机需要 的 ip hostname 填写在
C:\Windows\System32\drivers\etc 上面

kafka你可以找到相应的,你就可以找到相应的,你可以找到合适的 broker的 topic

集群方式

代码

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.\_
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * spark streaming 整合 kafka
  */
object KafkaDirectStream {

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().setAppName("KafkaDirectStream").setMaster("spark://test:7077")
val streamingContext = new StreamingContext(sparkConf, Seconds(5))             //5秒一批秒一批一秒一批
val kafkaParams = Map[String, Object](
  /*
   * 指定broker地址列表不需要包含所有broker地址,则制作人将从给定的broker在查找其他人中查找其他人在中查找其他人broker的信息。
   * 但是,建议至少提供两个但建议至少提供两个broker作为容错能力的信息。信息作为容错。信息作为容错。信息具有容错性。
   */
  "bootstrap.servers" -> "test:9091,test:9092,test:9093",
  /*电子邮件使我们更容易与任何人建立联系--而且我在职业上受益于对全球各地新联系人的介绍,这在数字时代之前是无法想象的.但是,通过目前的联系人请求电子邮件介绍并不是没有代价的,特别是对借给你他们的网络的人来说,所以我建议深思熟虑地仔细询问.*/
  "key.deserializer" -> classOf[StringDeserializer],
  /*值的序列化程序值的序列化程序值的序列化程序*/
  "value.deserializer" -> classOf[StringDeserializer],
  /*属于消费者分组的ID*/
  "group.id" -> "spark-streaming-group",
  /*
   * 此属性指定如果使用者读取没有偏移量的分区或偏移量无效时应如何操作:
   * latest: 如果偏移量无效,消费者将开始从最新记录(消费者启动后生成的记录)中读取数据
   * earliest: 如果偏移量无效,消费者将从起始位置读取分区的记录
   */
  "auto.offset.reset" -> "latest",
  /*是否自动提交是否自动提交是否自动提交是否自动提交是否自动提交是否自动提交是否自动提交是否自动提交是否自动提交*/
  "enable.auto.commit" -> (true: java.lang.Boolean)
)

/*您您可以同时订阅多个主题您可以一次订阅多个主题您可以一次订阅多个主题*/
val topics = Array("abc")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  /*位置策略*/
  PreferConsistent,
  /*订阅主题*/
  Subscribe[String, String](topics, kafkaParams)
)

//stream.saveAsTextFiles("hdfs://192.168.0.118:9000/root/hello/") //写入 hdfs

/打印输入流打印输入流打印输入流打印输入流/ stream.map(record => (record.key, record.value)).print() streamingContext.start() streamingContext.awaitTermination() } }

maven依赖

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0
com.produce
produce
1.0-SNAPSHOT



    
        com.thoughtworks.paranamer
        paranamer
        2.8
    
     
        org.apache.spark
        spark-core\_2.11
        2.4.0

    
     
        org.apache.spark
        spark-streaming\_2.11
        2.4.0

    

    
        org.apache.spark
        spark-streaming-kafka-0-10\_2.11
        2.4.0
    

    

    
        junit
        junit
        3.8.1
        test
    

集群提交方法集群提交方法集群提交方法

./bin/spark-submit --class KafkaDirectStream --num-executors 4 --driver-memory 1G --executor-memory 1g --executor-cores 1 --conf spark.default.parallelism=1000 produce.jar

组件版本

JDK 8 scala 2.11.* kafka kafka_2.11-2.2.0 spark spark-2.4.0-bin-hadoop2.7

版本不对 会报 method not found (版本信息)(版本信息)

转载于:https://www.cnblogs.com/tangsonghuai/p/11204247.html

版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除