1.1概念介绍
Kafka 是 linkedin 用于日志处理的分布式消息队列,同时支持离线和在线日志处理。
kafka 对消息保存时
根据 Topic 进行归类,
发送消息者成为 Producer,
消息接受者成为 Consumer
此外 kafka 集群由多个 kafka 实例组成,
每个实例 (server) 称为 broker。
无论是 kafka 集群,还是 producer 和 consumer
都依赖于 zookeeper 来保证系统可用性,
为集群保存一些 meta 信息。
1.2 Kafka应用场景
1. 日志收集
日志收集方面,其实开源产品有很多,包括 Scribe、Apache Flume。
很多人使用 Kafka 代替日志聚合(log aggregation)。
日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置(文件服务器或 HDFS)进行处理。
然而 Kafka忽略掉文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。
这就让 Kafka 处理过程延迟更低,更容易支持多数据源和分布式数据处理。
比起以日志为中心的系统比如 Scribe 或者 Flume 来说,Kafka 提供同样高效的性能和因为复制导致的更高的耐用性保证,以及更低的端到端延迟。
2. 行为跟踪--流量跟踪
Kafka 的另一个应用场景是跟踪用户浏览页面、搜索及其他行为,以发布-订阅的模式实时记录到对应的 topic 里。
那么这些结果被订阅者拿到后,就可以做进一步的实时处理,或实时监控,或放到 Hadoop 离线数据仓库里处理。
3. 持久性日志(commit log)
Kafka 可以为一种外部的持久性日志的分布式系统提供服务。
这种日志可以在节点间备份数据,并为故障节点数据回复提供一种重新同步的机制。
Kafka 中日志压缩功能为这种用法提供了条件。在这种用法中,Kafka 类似于 Apache BookKeeper 项目。
1.3 Kafka基本概念
Topic:特指 Kafka 处理的消息源(feeds of messages)的不同分类。
Partition:Topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)。
Message:消息,是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息。
Producers:消息和数据生产者,向 Kafka 的一个 topic 发布消息的过程叫做 producers。
Consumers:消息和数据消费者,订阅 topics 并处理其发布的消息的过程叫做 consumers。
Broker:缓存代理,Kafka 集群中的一台或多台服务器统称为 broker。
设计原理
Kafka 的设计初衷是希望作为一个统一的信息收集平台,
能够实时的收集反馈信息,并需要能够支撑较大的数据量,且具备良好的容错能力。
2.1 Kafka 的 Topics/Log
一个Topic 可以认为是一类消息,每个 topic 将被分成多 partition (区),每个partition在存储层面是 append log 文件。
任何发布到此 partition 的消息都会被直接追加到 log 文件的尾部,每条消息在文件中的位置称 offset(偏移量),partition 是以文件的形式存储在文件系统中。
Logs 文件根据 broker 中的配置要求,保留一定时间后删除来释放磁盘空间。
Partition:
Topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
partition 中的每条消息都会被分配一个有序的 id(offset)。
2.2 Kafka的储存策略
kafka以 topic 来进行消息管理,每个 topic 包含多个 partition,每个 part 对应一个逻辑 log,有多个 segment 组成。
每个 segment 中存储多条消息(见下图),消息 id 由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。
broker 收到发布消息往对应 partition 的最后一个 segment 上添加该消息。
每个 part 在内存中对应一个index,记录每个 segment 中的第一条消息偏移。
发布者发到某个 topic 的 消息会被均匀的分布到多个 part 上(随机或根据用户指定的回调函数进行分布),
broker 收到发布消息往对应 part 的最后一个 segment 上添加 该消息,
当某个 segment 上的消息条数达到配置值或消息发布时间超过阈值时,segment 上的消息会被 flush 到磁盘,
只有 flush 到磁盘上的消息订阅者才能订阅到,segment 达到一定的大小后将不会再往该 segment 写数据,broker 会创建新的 segment。
2.3 Kafka的消息发送的流程
由于 kafka broker 会持久化数据,broker 没有内存压力,因此,consumer 非常适合采取 pull 的方式消费数据 Producer 向 Kafka(push)推数据,consumer 从 kafka 拉(pull)数据。
2.4 Kafka 的 Zookeeper 协调控制
管理 broker 与 consumer 的动态加入与离开。
触发负载均衡,当 broker 或 consumer 加入或离开时会触发负载均衡算法,使得一个 consumer group 内的多个 consumer 的订阅负载平衡。
维护消费关系及每个 partion 的消费信息
Zookeeper上的细节:
每个 broker 启动后会在 zookeeper 上注册一个临时的 broker registry,包含 broker 的 ip 地址和端口号,所存储的 topics 和 partitions 信息。
每个 consumer 启动后会在 zookeeper 上注册一个临时的 consumer registry:包含 consumer 所属的consumer group 以及订阅的 topics。
每个 consumer group 关 联一个临时的 owner registry 和一个持久的 offset registry。对于被订阅的每个 partition 包含一个 owner registry,
内容为订阅这个 partition 的 consumer id;同时包含一个 offset registry,内容为上一次订阅的 offset。
lead broker
和Flume比较
Kafka 是一个可持久化的分布式的消息队列,你可以有许多生产者和很多的消费者共享多个主题 Topics。
相比之下,Flume 是一个专用工具被设计为旨在往 HDFS,HBase 发送数据。它对 HDFS 有特殊的优化,并且集成了 Hadoop 的安全特性。
所以,Cloudera 建议如果数据被多个系统消费的话,使用 kafka;如果数据被设计给 Hadoop 使用,使用 Flume。
Flume 和 Kafka 可以很好地结合起来使用。如果你的设计需要从 Kafka 到 Hadoop 的流数据,
使用 Flume 代理并配置 Kafka 的 Source 读取数据也是可行的,没有必要实现自己的消费者。
可以直接利用 Flume 与 HDFS 及 HBase 的结合的所有好处。
你可以使用 Cloudera Manager 对消费者的监控,并且你甚至可以添加拦截器进行一些流处理。
Flume 和 Kafka 可以结合起来使用。通常会使用 Flume + Kafka 的方式。
其实如果为了利用 Flume 已有的写 HDFS 功能,也可以使用 Kafka + Flume 的方式。
Kafka的使用
1.使用Kafka步骤
任务:大数据开发,需要进行大量日志传输--数据传输的事务
如何使用 scala编写生产者消费者并实践操作
技术:
01.消息队列技术
点对点通讯--一对一、一对多、多对多、多对一等多种配置方式,支持树状、网状等多种拓扑结构
多点广播
发布/订阅 (Publish/Subscribe) 模式
消息按照特定的主题甚至内容进行分发,用户或应用程序可以根据主题或内容接收到所需要的消息
发送者和接收者之间的耦合关系变得更为松散。Kafka是一种分布式的发布/订阅消息系统
生产者负责产生消息,在中间加入持久化层——broker,生产者把数据存放在broker中,消费者从broker中取数据
02.数据来源(sources)和多个数据目的地
Kafka将消息以topic为单位进行归纳
Kafka提供了Java客户端,并且对多种语言都提供了支持
可靠-------有三种方式
At least once。至少一次。消息绝不会丢失,但有可能重新发送
At most once。最多一次。消息可能丢失,但永远不会重发。通过设置Producer异步提交可以实现
Exactly once。传递一次且仅一次。要求利用外部存储系统配合Kafka的offset来保证。
高可用----kafka引入replication和leader机制来实现
高吞吐量---依赖于OS文件系统的页缓存、sendfile技术和线性读写磁盘--磁盘顺序写---消息压缩
场景:
-再牛逼的语法解决不了傻逼的业务和逻辑需求
主要使用场景是处理网站活动日志
2.案例式--使用说明
前提条件--zookeeper 和kafka都启动了
start zookeeper
start kafka with default configuration
01.创建一个主题
create a topic---名为test_topic的topic,有10个分区,每个分区需分配3个副本
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 10 --topic test_topic
02.打包程序--打包程序
package this example
//先运行kafkaProducer ,再运行kafkaConsumer,即可得到生产者的数据
03.运行生产者
Run the Producer
java -cp kafka_example-0.1.0-SNAPSHOT.jar com.colobu.kafka.ScalaProducerExample 10000 test_topic localhost:9092
04.运行消费者
Run the Consumer
java -cp kafka_example-0.1.0-SNAPSHOT.jar com.colobu.kafka.ScalaConsumerExample localhost:9092 group1 test_topic 10 0
3.进一步深入
01.创建主题说明:
参数说明:
partitions分区数
replication-factor副本
过程说明:
后台逻辑会监听zookeeper下对应的目录节点,
一旦发起topic创建命令,该命令会创建新的数据节点从而触发后台的创建逻辑。
:1. 确定分区副本的分配方案(就是每个分区的副本都分配到哪些broker上);
2. 创建zookeeper节点,把这个方案写入/brokers/topics/<topic>节点下
Kafka的controller负责提供的
查看bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic
Topic。是划分Message的逻辑概念
Producer发送消息到达Broker后,消息按照Topic提交到Partition
02.生产者
数据输入:
三种流程
同步发送流程
异步发送流程
更新元信息流程
03.消费者--consumer
消费者可以使用相同的 group.id 加入群组
一个组的最大并行度是组中的消费者数量←不是分区。
Kafka将主题的分区分配给组中的使用者,以便每个分区仅由组中的一个使用者使用。
Kafka保证消息只能被组中的一个消费者读取。
消费者可以按照消息存储在日志中的顺序查看消息
producer将消息推送到broker,consumer从broker拉取消息
consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息。
customer拥有了offset的控制权,可以向后回滚去重新消费之前的消息
数据输出:
消费者拿到后,就可以做进一步的实时处理,或实时监控,或放到 Hadoop 离线数据仓库里处理
4.方案
1. Kafka -> Flume –> Hadoop Hdfs
2. Kafka -> Kafka-connect-hdfs -> Hadoop Hdfs
参考:
Kafka如何创建topic? http://www.cnblogs.com/huxi2b/p/5923252.html
Kafka学习整理八(topic管理) http://blog.csdn.net/LOUISLIAOXH/article/details/51597173
kafka生产者原理详解 http://kaimingwan.com/post/kafka/kafkasheng-chan-zhe-yuan-li-xiang-jie
Kafka学习(2)——理解Kafka http://www.2cto.com/database/201611/561486.html
Kafka快速上手教程 https://www.shiyanlou.com/courses/785/labs/2657/document
apache kafka系列之源码分析走读-kafka内部模块分析 http://blog.csdn.net/lizhitao/article/details/37911993