Spark实时计算--Spark Streaming 集成Kafka

主要内容
01.Kafka和Spark Streaming  
   http://spark.apache.org/docs/latest/streaming-kafka-integration.html
02.Kafka和Spring
03.Kafka的介绍

1.Kafka和Spark Streaming

01. pom.xml引入Maven
  版本
    spark-streaming_2.11
    spark-streaming-kafka-0-10_2.11 
    kafka_2.11              0.11.0.1
02.引入--声明
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer

03.流程
   val sparkConf = new SparkConf()
                            .set("spark.streaming.kafka.maxRatePerPartition", "20")//设置该值需要知道Kafka有多少个分区
                             .set("spark.driver.allowMultipleContexts", "true")

  val ssc = new StreamingContext(sparkConf, Seconds(5))

  val kafkaParams = Map[String, Object](
           "bootstrap.servers" -"localhost:9092,anotherhost:9092",
           "key.deserializer" -classOf[StringDeserializer],
            "value.deserializer" -classOf[StringDeserializer],
            "group.id" -"use_a_separate_group_id_for_each_stream",
             "auto.offset.reset" -"latest",
             "enable.auto.commit" -(false: java.lang.Boolean)
           )
val topics = Array("topicA", "topicB")
//KafkaUtils.createDirectStream 返回值是ConsumerRecord[K, V]类型
// topicpartitionoffset
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
//业务处理

// 计算完毕后
ssc.awaitTermination();  
说明:
00.批处理间隔必须根据应用程序和可用群集资源的延迟要求进行设置
01.StreamingContext对象是Spark Streaming所有流操作的主要入口。
    一个StreamingContext 对象可以用SparkConf对象创建。
    一个SparkContext可以重复利用创建多个StreamingContext
    只要在创建下一个StreamingContext之前停止前一个StreamingContext(而不停止SparkContext)即可
     Seconds(5)   10秒消费一次topic 1topic2
02.KafkaUtils
       订阅主题
    三种主题订阅方式-- subscribe
           subscribe()  -- 通常使用ArrayList来指定消费者主题
           subscribe()   --指定一个监听器
            subscribe()
  kafka设置:
  val group = "spark-computing_stat"
   val kafkaParams = Map[String, Object](
                 "bootstrap.servers" ->  ("kafka.brokers"),
                 "key.deserializer" -> classOf[StringDeserializer],
                 "value.deserializer" -> classOf[StringDeserializer],
                 "group.id" -> group,
                 "auto.offset.reset" -> "latest",
                 "enable.auto.commit" -> (false: java.lang.Boolean)
                 )
  val topics = Array(" Info", " chInfo", " atchInfo")
  参数说明:
#连接kafka 的brokers
          示例: #kafka.brokers=178.10.10.91:6667,178.10.10.92:6667,178.10.10.93:6667
auto.offset.reset含义是当数据被消费完之后会,如果spark streaming的程序由于某种原因停止之后再启动, 下次不会重复消费之前消费过的数
其他--驱动程序容错
消息消费:
   At-most-once
   At-least-once
   Exactly-once
  Kafka spark streaming
     01.Receiver-based Approach 第一种是利用 Kafka 消费者高级 API  Spark 的工作节点上创建消费者线程,订阅 Kafka 中的消息,
      数据会传输到 Spark 工作节点的执行器中
         Kafka Write Ahead LogsWAL)来保证数据可靠性和一致性的数据保存方式
         方法:kafkaUtils.createStream
    02. Direct Approach (No Receivers)如果需要保证数据安全可以通过SparkcheckPoint
                 //设置在HDFS上的checkpoint目录 --                 ssc.checkpoint('hdfs://spark/sparkstreaming/checkpoint')
                   //设置通过间隔时间,定时持久checkpointhdfs--      stream.checkpoint(Seconds(50))
                缺点是无法使用基于zookeeperkafka监控工具
       函数:kafkaUtils.createDirectStream

2.Kafka和Spring

和Spring整合
  整合方式:方式一: The Spring for Apache Kafka (spring-kafka) project
             spring-kafka   https://spring.io/projects/spring-kafka
            方式二: spring-integration-kafka
                https://github.com/spring-projects/spring-integration-kafka
                Spring Integration Kafka is now based on the Spring for Apache Kafka project
                    spring-integration-kafka module of Spring Integration
            Spring Integration Kafka 是基于 Apache Kafka 和Spring Integration来集成Kafka,对开发配置提供了方便
    创建生产者的相关操作有Spring容器来管理KafkaTemplate对象实例化
    消费者: MessageListenerContainer 接口

3.Kafka的介绍

3.1脚本管理

服务管理:
    启动kafka脚本  kafka-cluster-start.sh
               kafka-cluster-stop.sh
配置管理
     kafka-configs.sh
创建主题
    kafka-topics.sh  --create --zookeeper 
                        -- replication-factor
                        --partitions 
                        --topic
            kafka-topics.sh    --delete  --zookeeper 
                                                    --topic
            kafka-topics.sh  --list --zookeeper
            kafka-topics.sh  --describe--zookeeper
生产者
        kafka-console-producer.sh  --broker-list  --topic --property
消费者:
  01. 新版消费者 通过消费组协调器同一管理GroupCoordinator 去掉了对Zookeeper的依赖
      消费组  group   --指定消费者消费的主题
        kafka-console-consumers.sh   
               --from-beginning
               --offset
    02.消费者列表 主题列表  消费组消费主题的分区偏移量--已消费的最大偏移量
     消费单个主题,消费多个主题
      消息的单播和多播
       单播:  一条消息只能被某一个消费者消费的模式
       多播: 能被多个消费者消费的模式
       实现方式:消费者在不同的消费组
       不存在广播:不能<所有消费者>
     03.查看消费偏移量
        kafka-consumer-offset-checker.sh
        kafka-consumer-groups.sh
        logSize  消息最大偏移量
        Lag  消费者未消费的剩余量   或者已消费但是为提交而落后消费偏移量的的剩余量
    提交偏移量的频率
       offset.flush.interval.ms
     保存偏移量的文件路径
       offset.stoage.file.filename
      主题订阅方式-- subscribe
           subscribe()  -- 通常使用ArrayList来指定消费者主题
           subscribe()   --指定一个监听器
            subscribe()
分区:
      分区迁移-- 生成分区分配方案 kafka-reassign-partitions.sh
      增加分区-增加副本
连接器 standalone 和 distributed 两种工作模式
  connect-standalone.sh
  connect-distributed.sh
安全机制和数据备份
    01.身份认证和权限控制
    02.镜像操作 kafka-mirror-maker.sh

3.2 API接口

    ZkUtils类
    AdminUtils类  
                AdminUtils.createTopic() 创建主题
                AdminUtils.addPartitions() 则增加分区
                AdminUtils.deleteTopic()
    ZkClient类
01.生产者 Producer API
创建Properties对象-设置生产者级别的配置
     bootstrap.servers
      key.serializer
      value.serializer
    根据Properties对象实例化一个KafkaProducer对象
         send方法--需要实现org.apache.kafka.clients.producer.Callback()接口
         返回 Future对象
    关闭KafkaProducer ,释放连接的资源
    多线程--KafkaProducerThread 线程类
02.消费者API
新版消费者(Java版本)和旧版消费者(Scala版本)--新版本的消费者不在强依赖Zookeeper,
偏移量保存在Kafka内部主题"__consumer_offsets"
  Low-level 底层API
  High-Level 高级API
KafkaConsumer
     bootstrap.servers
      key.serializer          org.apache.kafka.common.serialization.StringDeserializer
      value.serializer        org.apache.kafka.common.serialization.StringDeserializer
      enable.auto.commit
      auto.commit.interval.ms
      group.id
KafkaConsumer三种订阅主题的方式KafkaConsumer.subscribe()
   subscribe( Collection(String) topics)  通常使用ArrayList
   subscribe(Collection(String) topics,  ConsumerRebalanceListener listener)
   subscribe(Pattern pattern,  ConsumerRebalanceListener listener)
偏移量
  timestampsToSearch()
  consumer.commitAsync(new OffsetCommitCallback())
消费速度控制
    pause()   和resume()
序列化和反序列化    Avro依赖Schema来实现数据结构的定义,Schema主要有Json对象来表示

主题-分区-副本-Kafka集群和Zookeeper连接地址
            主题
            分区--Kafka并行处理的基本单元
            副本-副本不能超过节点数
生产者
消费者 
消费组 groupid
消费者消费位移确认
    自动提交 enable.auto.commit
    手动提交:
        异步提交  commitAsync()
        同步提交 commitSync()
-- 最好partiton数目是consumer数目的整数倍
enable.auto.commit=true
--自动提交的方式--自动提交是在kafka拉取到数据之后就直接提交
-- auto.offset.reset
[earliest | latest],表示将offset置到哪里 
    从最开始的地方消费:
            automatically reset the offset to the earliest offset,自动将偏移量置为最早的       
            当各分区下有已提交的offset时,从提交的offset开始消费;
                无提交的offset时,从头开始消费 
    从最新的数据消费
       当各分区下有已提交的offset时,从提交的offset开始消费;
        无提交的offset时,消费新产生的该分区下的数据 
    抛出异常-none
    record  消息

3.3 Kafka核心组件和主要流程:

控制器 kafkaController
             Leader 
              控制器选举机制和过程 --依赖于Zookeeper
协调器
    消费者协调        ConsumerCoordinator
    消费组协调器    GroupCoordinator
    任务管理协调器 WorkCoodinator
网络通信服务
    SocketServer  基于Java NIO实现的网络通信组件
    KafkaRequestHandlerPool
日志管理子系统
一主题为基本单位进行组织的

4.参考:

Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
https://www.jianshu.com/p/688e1b751a85
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
https://www.tutorialspoint.com/apache_kafka/apache_kafka_integration_spark.htm

blogroll

social