前面已经多次总结过hadoop spark的集群环境配置 ,一直用的也是spark on yarn 模式,已经能完全运行spark程序,这次要研究spark streaming + kafka 的开发配置。
1.kafka集群环境搭建
参考:
3.java向kafka发送消息
参考:
4.spark streaming 接受处理kafka消息
我用的idea搭建的scala+maven 项目开发spark,参考: Scala IDEA Maven开发配置
下面就直接上 spark streaming 代码
4.1 maven 添加 streaming 依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.1.0</version> <scope>provided</scope> </dependency>
4.2 spark streaming 入口主程序
import org.apache.kafka.common.serialization.StringDeserializer import org.apache.log4j.Logger import org.apache.spark.SparkConf import org.apache.spark.sql._ import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.{Seconds, StreamingContext} /** * spark streaming 入口主程序 */ object StreamingApp { /** * main 方法 按顺序接受命令行参数 * @param args */ def main(args: Array[String]): Unit = { //初始化sparkSession val sparkConfig = new SparkConf() val sparkSession: SparkSession = SparkSession.builder().appName("streamingApp") .config(sparkConfig) .enableHiveSupport() .getOrCreate() // val durationTime=1 //初始化StreamingContext并配置1秒间隔读取数据 val ssc: StreamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(durationTime)) //一个group内的消费者只会消费一次同一消息 var groupId = "testGroup" //指定的消费者只能消费指定的topic消息 var topic = "testTopic" //kafka参数 val kafkaParams = Predef.Map[String, Object]( "bootstrap.servers" -> "slave1:9092", "group.id" -> groupId, "auto.offset.reset" -> "latest", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "enable.auto.commit" -> (true: java.lang.Boolean) ) //对消息进行监听 val stream = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParams) ).transform { //这里可以对RDD进行预处理,比如提取一些值到driver,或者修改rdd的值 rdd =>{rdd} }.map( //map 本来就是transform动作,可以进行rdd转换 rdd => {rdd.value().split("\\|")} ) //遍历流中的rdd stream.foreachRDD(rdd => { var rows: Array[Array[String]] = rdd.collect() for (i <- 0 until rows.length) { try { //对获取到的kafka rdd数据进行业务处理 rdd.show } catch { case e: Throwable => { Logger.getLogger(this.getClass).error(s"处理消息时报错了.message", e) } } } }) //启动streaming上下文,开始接受数据 ssc.start() //等待处理停止 ssc.awaitTermination() } }
5.在集群启动spark streaming程序
用maven把scala程序打成jar包,并运行
spark-submit --name StreamingApp --class net.itxw.StreamingApp hdfs://hadoopMaster:9000/tmp/spark-0.0.1.jar 1>/dev/null 2>&1 &
不输出日志是为了防止输出流阻塞,后台运行是因为streaming 程序要长期接受kafka消息
然后你就可以在yarn 8088看到启动的spark streaming 应用。
然后你就可以用第3点开发出的java程序向spark streaming发送消息,或者启动kafka 生产者客户端来发送测试消息
./kafka-console-producer.sh --broker-list master:9092 --topic testTopic