Published: 2018-07-01 11:49:00
By ytwan
In Big Data .
tags: things
主要内容
01.Kafka和Spark Streaming
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
02.Kafka和Spring
03.Kafka的介绍
1.Kafka和Spark Streaming
01. pom . xml 引入 Maven
版本
spark - streaming_2 . 11
spark - streaming - kafka - 0 - 10 _2 . 11
kafka_2 . 11 0.11 . 0.1
02. 引入 -- 声明
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming. { Seconds , StreamingContext }
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer
03. 流程
val sparkConf = new SparkConf ()
. set ( "spark.streaming.kafka.maxRatePerPartition" , "20" ) // 设置该值需要知道 Kafka 有多少个分区
. set ( "spark.driver.allowMultipleContexts" , "true" )
val ssc = new StreamingContext ( sparkConf , Seconds ( 5 ))
val kafkaParams = Map [ String , Object ](
"bootstrap.servers" - "localhost:9092,anotherhost:9092" ,
"key.deserializer" - classOf [ StringDeserializer ],
"value.deserializer" - classOf [ StringDeserializer ],
"group.id" - "use_a_separate_group_id_for_each_stream" ,
"auto.offset.reset" - "latest" ,
"enable.auto.commit" - ( false : java . lang . Boolean )
)
val topics = Array ( "topicA" , "topicB" )
// KafkaUtils . createDirectStream 返回值是 ConsumerRecord [ K , V ] 类型
// topic , partition 和 offset
val stream = KafkaUtils . createDirectStream [ String , String ](
ssc ,
PreferConsistent ,
Subscribe [ String , String ]( topics , kafkaParams )
)
// 业务处理
// 计算完毕后
ssc . awaitTermination ();
说明:
00. 批处理间隔必须根据应用程序和可用群集资源的延迟要求进行设置
01. StreamingContext 对象是 Spark Streaming 所有流操作的主要入口。
一个 StreamingContext 对象可以用 SparkConf 对象创建。
一个 SparkContext 可以重复利用创建多个 StreamingContext ,
只要在创建下一个 StreamingContext 之前停止前一个 StreamingContext (而不停止 SparkContext )即可
Seconds ( 5 ) 每 10 秒消费一次 topic 1 和 topic2
02. KafkaUtils
订阅主题
三种主题订阅方式 -- subscribe
subscribe () -- 通常使用 ArrayList 来指定消费者主题
subscribe () -- 指定一个监听器
subscribe ()
kafka 设置:
val group = "spark-computing_stat"
val kafkaParams = Map [ String , Object ](
"bootstrap.servers" -> ( "kafka.brokers" ),
"key.deserializer" -> classOf [ StringDeserializer ],
"value.deserializer" -> classOf [ StringDeserializer ],
"group.id" -> group ,
"auto.offset.reset" -> "latest" ,
"enable.auto.commit" -> ( false : java . lang . Boolean )
)
val topics = Array ( " Info" , " chInfo" , " atchInfo" )
参数说明:
#连接kafka 的brokers
示例: #kafka.brokers=178.10.10.91:6667,178.10.10.92:6667,178.10.10.93:6667
auto . offset . reset 含义是当数据被消费完之后会,如果 spark streaming 的程序由于某种原因停止之后再启动, 下次不会重复消费之前消费过的数
其他 -- 驱动程序容错
消息消费:
At - most - once
At - least - once
Exactly - once
Kafka 和 spark streaming
01. Receiver - based Approach 第一种是利用 Kafka 消费者高级 API 在 Spark 的工作节点上创建消费者线程,订阅 Kafka 中的消息,
数据会传输到 Spark 工作节点的执行器中
Kafka 用 Write Ahead Logs ( WAL )来保证数据可靠性和一致性的数据保存方式
方法: kafkaUtils . createStream
02. Direct Approach ( No Receivers ) 如果需要保证数据安全可以通过 Spark 的 checkPoint
// 设置在 HDFS 上的 checkpoint 目录 -- ssc . checkpoint ( 'hdfs://spark/sparkstreaming/checkpoint' )
// 设置通过间隔时间 , 定时持久 checkpoint 到 hdfs 上 -- stream . checkpoint ( Seconds ( 50 ))
缺点是无法使用基于 zookeeper 的 kafka 监控工具
函数: kafkaUtils . createDirectStream
2.Kafka和Spring
和Spring整合
整合方式:方式一: The Spring for Apache Kafka (spring-kafka) project
spring-kafka https://spring.io/projects/spring-kafka
方式二: spring-integration-kafka
https://github.com/spring-projects/spring-integration-kafka
Spring Integration Kafka is now based on the Spring for Apache Kafka project
spring-integration-kafka module of Spring Integration
Spring Integration Kafka 是基于 Apache Kafka 和Spring Integration来集成Kafka,对开发配置提供了方便
创建生产者的相关操作有Spring容器来管理KafkaTemplate对象实例化
消费者: MessageListenerContainer 接口
3.Kafka的介绍
3.1脚本管理
服务管理:
启动kafka脚本 kafka-cluster-start.sh
kafka-cluster-stop.sh
配置管理
kafka-configs.sh
创建主题
kafka-topics.sh --create --zookeeper
-- replication-factor
--partitions
--topic
kafka-topics.sh --delete --zookeeper
--topic
kafka-topics.sh --list --zookeeper
kafka-topics.sh --describe--zookeeper
生产者
kafka-console-producer.sh --broker-list --topic --property
消费者:
01. 新版消费者 通过消费组协调器同一管理GroupCoordinator 去掉了对Zookeeper的依赖
消费组 group --指定消费者消费的主题
kafka-console-consumers.sh
--from-beginning
--offset
02.消费者列表 主题列表 消费组消费主题的分区偏移量--已消费的最大偏移量
消费单个主题,消费多个主题
消息的单播和多播
单播: 一条消息只能被某一个消费者消费的模式
多播: 能被多个消费者消费的模式
实现方式:消费者在不同的消费组
不存在广播:不能<所有消费者>
03.查看消费偏移量
kafka-consumer-offset-checker.sh
kafka-consumer-groups.sh
logSize 消息最大偏移量
Lag 消费者未消费的剩余量 或者已消费但是为提交而落后消费偏移量的的剩余量
提交偏移量的频率
offset.flush.interval.ms
保存偏移量的文件路径
offset.stoage.file.filename
主题订阅方式-- subscribe
subscribe() -- 通常使用ArrayList来指定消费者主题
subscribe() --指定一个监听器
subscribe()
分区:
分区迁移-- 生成分区分配方案 kafka-reassign-partitions.sh
增加分区-增加副本
连接器 standalone 和 distributed 两种工作模式
connect-standalone.sh
connect-distributed.sh
安全机制和数据备份
01.身份认证和权限控制
02.镜像操作 kafka-mirror-maker.sh
3.2 API接口
ZkUtils类
AdminUtils类
AdminUtils.createTopic() 创建主题
AdminUtils.addPartitions() 则增加分区
AdminUtils.deleteTopic()
ZkClient类
01.生产者 Producer API
创建Properties对象-设置生产者级别的配置
bootstrap.servers
key.serializer
value.serializer
根据Properties对象实例化一个KafkaProducer对象
send方法--需要实现org.apache.kafka.clients.producer.Callback()接口
返回 Future对象
关闭KafkaProducer ,释放连接的资源
多线程--KafkaProducerThread 线程类
02.消费者API
新版消费者(Java版本)和旧版消费者(Scala版本)--新版本的消费者不在强依赖Zookeeper,
偏移量保存在Kafka内部主题"__consumer_offsets"
Low-level 底层API
High-Level 高级API
KafkaConsumer
bootstrap.servers
key.serializer org.apache.kafka.common.serialization.StringDeserializer
value.serializer org.apache.kafka.common.serialization.StringDeserializer
enable.auto.commit
auto.commit.interval.ms
group.id
KafkaConsumer三种订阅主题的方式KafkaConsumer.subscribe()
subscribe( Collection(String) topics) 通常使用ArrayList
subscribe(Collection(String) topics, ConsumerRebalanceListener listener)
subscribe(Pattern pattern, ConsumerRebalanceListener listener)
偏移量
timestampsToSearch()
consumer.commitAsync(new OffsetCommitCallback())
消费速度控制
pause() 和resume()
序列化和反序列化 Avro依赖Schema来实现数据结构的定义,Schema主要有Json对象来表示
主题-分区-副本-Kafka集群和Zookeeper连接地址
主题
分区--Kafka并行处理的基本单元
副本-副本不能超过节点数
生产者
消费者
消费组 groupid
消费者消费位移确认
自动提交 enable.auto.commit
手动提交:
异步提交 commitAsync()
同步提交 commitSync()
-- 最好partiton数目是consumer数目的整数倍
enable.auto.commit=true
--自动提交的方式--自动提交是在kafka拉取到数据之后就直接提交
-- auto.offset.reset
[earliest | latest],表示将offset置到哪里
从最开始的地方消费:
automatically reset the offset to the earliest offset,自动将偏移量置为最早的
当各分区下有已提交的offset时,从提交的offset开始消费;
无提交的offset时,从头开始消费
从最新的数据消费
当各分区下有已提交的offset时,从提交的offset开始消费;
无提交的offset时,消费新产生的该分区下的数据
抛出异常-none
record 消息
3.3 Kafka核心组件和主要流程:
控制器 kafkaController
Leader
控制器选举机制和过程 --依赖于Zookeeper
协调器
消费者协调 ConsumerCoordinator
消费组协调器 GroupCoordinator
任务管理协调器 WorkCoodinator
网络通信服务
SocketServer 基于Java NIO实现的网络通信组件
KafkaRequestHandlerPool
日志管理子系统
一主题为基本单位进行组织的
4.参考:
Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
https://www.jianshu.com/p/688e1b751a85
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
https://www.tutorialspoint.com/apache_kafka/apache_kafka_integration_spark.htm