spark streaming kafka 开发案例与环境配置

spark | 2020-03-06 16:25:47

前面已经多次总结过hadoop spark的集群环境配置linux hadoop spark环境搭建      hive on spark集群环境搭建    hive on spark环境搭建(官方源码编译方式) ,一直用的也是spark on yarn 模式,已经能完全运行spark程序,这次要研究spark streaming + kafka 的开发配置。


1.kafka集群环境搭建

参考:kafka集群环境配置安装搭建


3.java向kafka发送消息

参考:spring KafkaTemplate 实现 kafka 消息发送案例代码


4.spark streaming 接受处理kafka消息

我用的idea搭建的scala+maven 项目开发spark,参考: Scala IDEA Maven开发配置

下面就直接上 spark streaming 代码

4.1 maven 添加 streaming 依赖

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.1.0</version>
            <scope>provided</scope>
        </dependency>


4.2 spark streaming 入口主程序


import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
  * spark streaming 入口主程序
  */
object StreamingApp {
  /**
    * main 方法 按顺序接受命令行参数
    * @param args
    */
  def main(args: Array[String]): Unit = {
    //初始化sparkSession
    val sparkConfig = new SparkConf()
    val sparkSession: SparkSession = SparkSession.builder().appName("streamingApp")
      .config(sparkConfig)
      .enableHiveSupport()
      .getOrCreate()
    //
    val durationTime=1
    //初始化StreamingContext并配置1秒间隔读取数据
    val ssc: StreamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(durationTime))
    //一个group内的消费者只会消费一次同一消息
    var groupId = "testGroup"
    //指定的消费者只能消费指定的topic消息
    var topic = "testTopic"
    //kafka参数
    val kafkaParams = Predef.Map[String, Object](
      "bootstrap.servers" -> "slave1:9092",
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    //对消息进行监听
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParams)
    ).transform {
      //这里可以对RDD进行预处理,比如提取一些值到driver,或者修改rdd的值
      rdd =>{rdd}
    }.map(
      //map 本来就是transform动作,可以进行rdd转换
      rdd => {rdd.value().split("\\|")}
    )
    //遍历流中的rdd
    stream.foreachRDD(rdd => {
      var rows: Array[Array[String]] = rdd.collect()
      for (i <- 0 until rows.length) {
        try {
          //对获取到的kafka rdd数据进行业务处理
          rdd.show
        }
        catch {
          case e: Throwable => {
            Logger.getLogger(this.getClass).error(s"处理消息时报错了.message", e)
          }
        }
      }
    })
    //启动streaming上下文,开始接受数据
    ssc.start()
    //等待处理停止
    ssc.awaitTermination()
  }
}


5.在集群启动spark streaming程序

用maven把scala程序打成jar包,并运行

spark-submit --name StreamingApp --class net.itxw.StreamingApp  hdfs://hadoopMaster:9000/tmp/spark-0.0.1.jar 1>/dev/null 2>&1 &

不输出日志是为了防止输出流阻塞,后台运行是因为streaming 程序要长期接受kafka消息


然后你就可以在yarn 8088看到启动的spark streaming 应用。

1.jpg


然后你就可以用第3点开发出的java程序向spark streaming发送消息,或者启动kafka 生产者客户端来发送测试消息

./kafka-console-producer.sh --broker-list master:9092 --topic testTopic

登录后即可回复 登录 | 注册
    
关注编程学问公众号