1.Kafka版本说明:
01.Producer: Kafka 0.8.2, Producer被重新设计,
新版生产者:
org.apache.kafka.clients
org.apache.kafka.clients.producer.KafkaProducer;
旧版的生产者:
旧版本(0.8Scala版本) kafka.javaapi.producer.Producer;
02.Consumer
Kafka 0.9 则重新设计了Consumer接口。它不再区分high-level
consumer API和low-level consumer API,而是提供了一个统一的consumer API
Kafka 消费者总共有 3 种 API,新版 API、旧版高级 API、旧版低级 API,
新版 API 是在 kafka 0.9 版本后增加的,推荐使用新版 API,但由于旧版低级 API 可以对消息进行更加灵活的控制
新版:
Consumer org.apache.kafka.clients.consumer.KafkaConsumer;
旧版Consumer
High Level Consumer API 入口类: ConsumerConnector kafka.javaapi.consumer.ConsumerConnector;
Lower Level Consumer API入口类: SimpleConsumer kafka.javaapi.consumer.SimpleConsumer;
新版 Consumer
03.Connect -Kafka Connect
Kafka Connect是到0.9版本才提供的并极大的简化了其他系统与Kafka的集成
2.Kafka配置信息
Kafka 提供内置客户端以及二进制连接协议
1.使用场景评估:
01.每个消息都很重要,是否允许丢失一小部分消息
02.出现重复消息是否可以接受
03.是否有严格的延迟和吞吐量要求
2.依据场景进行配置
类型,时间,大小,次数
配置的评估
client.id
识别消息来源
acks 参数指定了必须要有多少个区分副本收到消息,生产者才会认为消息写入成功
消息丢失的可能性:
acks=0 acks=1 acks=all
buffer.memory
设置生产者内存缓冲区的大小
compression.type
snappy gzip
retries
决定了生产者重发消息的次数 默认间隔时间 100ms
retry.backoff.ms 参数来改变时间间隔
batch.size
同一个批次可以使用的内存大小,按照字节数计算,而不是消息个数
linger.ms
发送之前等待更多消息加入批次的时间
KafkaProducer 会根据批次填满 batch.size 或者批次等待时间时间达到上限 linger.ms 将批次发送出去
时间:
timeout.ms
request.timeout.ms
matadata.fetch.timeout.ms
阻塞时间
max.block.ms
大小:
max.request.size
receive.buffer.bytes
send.buffer.bytes
顺序保证:
max.in.flight.requests.per.connection
3.开发过程:
A: 创建生产者,生产者要把键值对象序列化成字节数组-三个必选属性
bootstrap.servers key.serializer value.serializer
B: 创建一个 ProducerRecord 对象,该对象包含目标主题和要发送的内容,还可以指定键或者分区
两种线程: 单线程和多线程
三种发送消息的方式:
同步发送 send() 发送消息,返回Future对象
异步发送 send() 方法,并指定一个回调函数
发送并忘记: fire and forget
C:消息成功写入Kafka会返回一个 RecordMetadata 对象,写入失败则会返回一个错误。
收到错误还会尝试,几次失败后,就返回错误信息。
说明: key.serializer 实现了 org.apache.kafka.common.serialization.Serializer接口的类
Kafka默认提供三种 StringSerializer IntegerSerializer ByteSerializer
01.pom.xml依赖
开发版本
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.2</version>
示例代码:
// 步骤1 新建一个 Properties对象-并配置三个必选属性
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 步骤2 创建新的生产者对象 KafkaProducer ,并把设置好的 Properties 对象传给它
Producer<String, String> producer= new KafkaProducer<String,String>(kafkaProps);
// 步骤3 开始发送消息,生产者将 ProducerRecord 对象作为参数发送,使用send() 方法发送
ProducerRecord<String, String> record = new ProducerRecord<String, String>("", "", "");
Future<RecordMetadata> kafkaFuture = producer.send(record);
// 同步发送消息调用get 方法等待Kafka相应,运行正常则返回一个 RecordMetadata 对象
// 一般会发生两类错误,一类是可重试错误,一类无法通过重试解决,比如消息太大等
try { kafkaFuture.get();
} catch (Exception e) {
e.printStackTrace;
}
// 异步发送--回调支持
producer.send(record, new DemoProducerCallback());
// private class DemoProducerCallback implements Callback{ onCpmpletion(RecordMetadata )}
扩展:
Producer端,需要自己创造多线程并发环境才能提高客户端的出口吞吐量
Flink向kafka发送数据
Kafka sink connector(FlinkKafkaProducer)
Flink里面支持Kafka 0.8、0.9、0.10、0.11.
pom.xml依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
执行:
// 添加数据源addSource
DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<>(
READ_TOPIC, //这个 kafka topic 需要和上面的工具类的 topic 一致
new SimpleStringSchema(),
props)).setParallelism(1);
// 创建kafka数据流
// Producer 传了三个参数:brokerList、topicId、serializationSchema(序列化)
student.addSink(new FlinkKafkaProducer011<>(
"localhost:9092",
"student-write",
new SimpleStringSchema()
)).name("flink-connectors-kafka")
.setParallelism(5);
Flink Kafka Consumer 的运行机制
JSON作为消息
JSON - (JavaScript Object Notation)
数据类型:
JSON只定义了两种数据结构,即数组[]和对象{}。对象是一组键值对
JSON的键: String
JSON的值: 1.数字(整数或浮点数) 2.字符串(在双引号中)
3.逻辑值(true 或 false) 4.null
5.数组(在方括号中) 6.对象(在花括号中)
符号: 花括弧,方括弧,冒号和逗号
JSON解析
fasterjson 是一个JSON解析器。它直接解析JSON文本,调用注册的事件函数,快速访问JSON中感兴趣的内容
Fastjson API入口类是com.alibaba.fastjson.JSON
eg: JSONObject jsObj = new JSONObject();
类型:
JSONObject JSONArray Object List String
方法:
parse parseObject parseArray
toJSON toJSONString
注意事项:
JSONObject 是一个json
JSONArray.toJSONString() 之后不是一个json,而是json中的一个数组
JSON.toJSONString() 不可多次使用,(最好转义一次) 因为每次调用JSON.toJSONString的时候,
在key和value前后都会加上"的转义符\'。多次使用toJSONString了之后,json的每个key和value有多个转义符
参考:
Kafka权威指南
Flink 写入数据到 Kafka https://www.jianshu.com/p/c94317de9692