Flink和Kafka以及Kafka生产者

1.Kafka版本说明:

01.Producer: Kafka 0.8.2, Producer被重新设计,
   新版生产者:
      org.apache.kafka.clients
       org.apache.kafka.clients.producer.KafkaProducer;
   旧版的生产者:
      旧版本(0.8Scala版本)  kafka.javaapi.producer.Producer;
02.Consumer
   Kafka 0.9 则重新设计了Consumer接口。它不再区分high-level
   consumer API和low-level consumer API,而是提供了一个统一的consumer API
   Kafka 消费者总共有 3 种 API,新版 API、旧版高级 API、旧版低级 API,
     新版 API 是在 kafka 0.9 版本后增加的,推荐使用新版 API,但由于旧版低级 API 可以对消息进行更加灵活的控制
  新版:
    Consumer    org.apache.kafka.clients.consumer.KafkaConsumer;
  旧版Consumer
    High Level Consumer API 入口类: ConsumerConnector  kafka.javaapi.consumer.ConsumerConnector;    
    Lower Level Consumer API入口类: SimpleConsumer     kafka.javaapi.consumer.SimpleConsumer; 
  新版 Consumer
 03.Connect -Kafka Connect
   Kafka Connect是到0.9版本才提供的并极大的简化了其他系统与Kafka的集成

2.Kafka配置信息

Kafka 提供内置客户端以及二进制连接协议
1.使用场景评估:
     01.每个消息都很重要,是否允许丢失一小部分消息
     02.出现重复消息是否可以接受
     03.是否有严格的延迟和吞吐量要求
2.依据场景进行配置
    类型,时间,大小,次数
  配置的评估
   client.id
     识别消息来源
   acks 参数指定了必须要有多少个区分副本收到消息,生产者才会认为消息写入成功
    消息丢失的可能性:
     acks=0  acks=1  acks=all
   buffer.memory
     设置生产者内存缓冲区的大小
   compression.type
      snappy gzip
   retries
     决定了生产者重发消息的次数 默认间隔时间 100ms
     retry.backoff.ms 参数来改变时间间隔
   batch.size
     同一个批次可以使用的内存大小,按照字节数计算,而不是消息个数
   linger.ms
     发送之前等待更多消息加入批次的时间
     KafkaProducer 会根据批次填满  batch.size 或者批次等待时间时间达到上限 linger.ms 将批次发送出去

   时间:
     timeout.ms
     request.timeout.ms
     matadata.fetch.timeout.ms
    阻塞时间
     max.block.ms
    大小:
     max.request.size
     receive.buffer.bytes
     send.buffer.bytes
    顺序保证:
       max.in.flight.requests.per.connection

3.开发过程:

   A: 创建生产者,生产者要把键值对象序列化成字节数组-三个必选属性
       bootstrap.servers   key.serializer  value.serializer
   B: 创建一个 ProducerRecord  对象,该对象包含目标主题和要发送的内容,还可以指定键或者分区
       两种线程: 单线程和多线程
       三种发送消息的方式:
         同步发送    send() 发送消息,返回Future对象
         异步发送    send() 方法,并指定一个回调函数
         发送并忘记: fire and forget          
   C:消息成功写入Kafka会返回一个 RecordMetadata  对象,写入失败则会返回一个错误。
      收到错误还会尝试,几次失败后,就返回错误信息。
   说明: key.serializer 实现了 org.apache.kafka.common.serialization.Serializer接口的类
      Kafka默认提供三种 StringSerializer IntegerSerializer  ByteSerializer 
  01.pom.xml依赖
     开发版本
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka-clients</artifactId>
         <version>0.8.2.2</version>
  示例代码:
    // 步骤1 新建一个 Properties对象-并配置三个必选属性
     Properties kafkaProps = new Properties();
      kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
      kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
   // 步骤2 创建新的生产者对象 KafkaProducer ,并把设置好的 Properties 对象传给它
     Producer<String, String>  producer=  new KafkaProducer<String,String>(kafkaProps);
   // 步骤3 开始发送消息,生产者将 ProducerRecord 对象作为参数发送,使用send() 方法发送
     ProducerRecord<String, String> record = new ProducerRecord<String, String>("", "", "");
     Future<RecordMetadata> kafkaFuture = producer.send(record);
  // 同步发送消息调用get 方法等待Kafka相应,运行正常则返回一个 RecordMetadata 对象
   // 一般会发生两类错误,一类是可重试错误,一类无法通过重试解决,比如消息太大等
   try {   kafkaFuture.get();
    } catch (Exception e) {
       e.printStackTrace;
    }
    // 异步发送--回调支持
    producer.send(record, new DemoProducerCallback());
    // private class  DemoProducerCallback implements Callback{ onCpmpletion(RecordMetadata )}
扩展:
  Producer端,需要自己创造多线程并发环境才能提高客户端的出口吞吐量

Flink向kafka发送数据

Kafka sink connector(FlinkKafkaProducer)
  Flink里面支持Kafka 0.8、0.9、0.10、0.11.
pom.xml依赖
 <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
   <version>${flink.version}</version>
 </dependency>
执行:
 // 添加数据源addSource
  DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<>(
            READ_TOPIC,   //这个 kafka topic 需要和上面的工具类的 topic 一致
            new SimpleStringSchema(),
            props)).setParallelism(1);

  // 创建kafka数据流
 //  Producer 传了三个参数:brokerList、topicId、serializationSchema(序列化)
 student.addSink(new FlinkKafkaProducer011<>(
            "localhost:9092",
            "student-write",
            new SimpleStringSchema()
    )).name("flink-connectors-kafka")
            .setParallelism(5);
Flink Kafka Consumer 的运行机制

JSON作为消息

JSON - (JavaScript Object Notation)
数据类型:      
    JSON只定义了两种数据结构,即数组[]和对象{}。对象是一组键值对
    JSON的键: String
    JSON的值: 1.数字(整数或浮点数)    2.字符串(在双引号中)
         3.逻辑值(true  false 4.null
         5.数组(在方括号中)      6.对象(在花括号中)
    符号: 花括弧,方括弧,冒号和逗号
JSON解析
fasterjson 是一个JSON解析器。它直接解析JSON文本,调用注册的事件函数,快速访问JSON中感兴趣的内容
Fastjson API入口类是com.alibaba.fastjson.JSON
 eg:  JSONObject jsObj = new JSONObject();
类型:
   JSONObject  JSONArray   Object  List  String 
方法:
    parse parseObject parseArray
    toJSON  toJSONString
注意事项:
 JSONObject 是一个json
JSONArray.toJSONString() 之后不是一个json,而是json中的一个数组
JSON.toJSONString() 不可多次使用,(最好转义一次) 因为每次调用JSON.toJSONString的时候,
     keyvalue前后都会加上"的转义符\'。多次使用toJSONString了之后,json的每个keyvalue有多个转义符

参考:

Kafka权威指南
Flink 写入数据到 Kafka https://www.jianshu.com/p/c94317de9692

blogroll

social