Implementing batch and stream computation with Flink (Scala)

Flink's computation models

Offline (batch) computation -- scheduled jobs, triggered periodically to process data
Streaming jobs -- long-running, resident processes
Specifying Keys
    Many Flink operators require a key, e.g. groupBy, or the where clause of a join
    A key is not a physical field stored with the data; it is a function defined over the actual data that tells the grouping operator how to group elements
    Four ways to specify a key:
            One or more field positions (field position keys) -- only valid for Tuple DataSets, e.g. groupBy(0)
            Field expressions -- string names of (possibly nested) fields
            Key selector functions (Key Selector Functions)
            Case class fields -- convert the DataSet to a case class; works like the Tuple case, but with named fields
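Conceptually, a key selector is just a function from an element to its key. A minimal plain-Scala analogy (no Flink runtime needed; `Click` is a made-up example type):

```scala
// A case class standing in for a stream element (hypothetical example type).
case class Click(user: String, url: String)

val clicks = Seq(
  Click("alice", "/home"),
  Click("bob", "/cart"),
  Click("alice", "/cart")
)

// A key selector is element => key. Flink's keyBy / groupBy accept the same
// shape in the Scala API, e.g. stream.keyBy(_.user).
val byUser: Map[String, Seq[Click]] = clicks.groupBy(_.user)
```

Here `groupBy(_.user)` plays the role of a key selector function: the grouping operator never sees a stored "key field", only the function's result.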

API level one: the DataStream / DataSet API

Available in both Java and Scala: DataStream (streaming) and DataSet (batch)
Stream computation -- import org.apache.flink.streaming.api.scala._ (plus org.apache.flink.streaming.api.windowing.time.Time for the window below)
    //1. Environment setup -- StreamExecutionEnvironment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Fault tolerance: enable checkpointing; the argument is the checkpoint interval in milliseconds
    env.enableCheckpointing(5000)
    //2. Read data -- use one of the predefined stream data sources
    val text = env.socketTextStream("localhost", 9999)
    //3. Process the data
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
    //4. Emit the data
    counts.print()

    //5. Trigger the program execution by calling execute()
    env.execute("Window WordCount")
Batch computation -- import org.apache.flink.api.scala._
    //1. Environment setup -- ExecutionEnvironment
    val env = ExecutionEnvironment.getExecutionEnvironment
    //2. Read data -- use one of the predefined data sources
    val text = env.readTextFile("/path/to/file")
    //3. Process the data -- transformations
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)
    //4. Emit the data -- sink
    counts.writeAsCsv(outputPath, "\n", " ")
    //5. Trigger execution (sinks other than print() need an explicit execute())
    env.execute("Batch WordCount")
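The flatMap → map → group → sum pipeline used in both snippets can be sanity-checked on plain Scala collections, without a Flink runtime:

```scala
val lines = Seq("To be or not", "to be")

// Same tokenization as the Flink jobs: lowercase, split on non-word characters.
val words = lines.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))

// Collection equivalent of .map((_, 1)).groupBy(0).sum(1):
val counts: Map[String, Int] =
  words.map(w => (w, 1)).groupBy(_._1).map { case (w, ps) => (w, ps.map(_._2).sum) }
```

The Flink version performs the same logic, but lazily, in parallel, and (for the streaming job) per time window.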

API level two: the Table API and SQL

0. Maven setup
1. Code
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // register input tables and output tables
    tableEnv.registerTable("table1", ...)
    // create a Table from a Table API query
    val tapiResult = tableEnv.scan("table1").select(...)
    // create a Table from a SQL query
    val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")

    // emit a Table API result Table to a TableSink (same for a SQL result)
    // create a TableSink
    val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
    tapiResult.writeToSink(sink)
2. Batch computation
  val bEnv = ExecutionEnvironment.getExecutionEnvironment
  val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
3. Notes: the two API levels interoperate
    A DataStream or DataSet can be registered in a TableEnvironment as a Table
    Convert a DataStream or DataSet into a Table
    Convert a Table into a DataStream or DataSet
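A sketch of these three interactions, assuming the Flink 1.6 Table API (not runnable without a Flink cluster and the flink-table dependency; the table name "events" and the sample elements are illustrative):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(String, Long)] = env.fromElements(("a", 1L), ("b", 2L))

// 1. Register a DataStream in the TableEnvironment under a name.
tableEnv.registerDataStream("events", stream, 'word, 'cnt)

// 2. Convert a DataStream into a Table object directly.
val table = tableEnv.fromDataStream(stream, 'word, 'cnt)

// 3. Convert a Table back into a DataStream (append mode).
val back: DataStream[(String, Long)] = tableEnv.toAppendStream[(String, Long)](table)
```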

API level three: SQL

SQL Client -- Flink's Table & SQL API makes it possible to work with queries written in the SQL language
Two result modes are supported
     The client supports two modes for maintaining and visualizing results, switched with: SET execution.result-mode=
         table mode -- materializes results in memory as a browsable table
         changelog mode -- visualizes the result as a stream of insert and retract changes
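For example, inside an SQL Client session (as documented for Flink 1.6; MyTable is a placeholder table name):

```sql
-- Switch how results are maintained and displayed:
SET execution.result-mode=table;     -- materialized, browsable table
SET execution.result-mode=changelog; -- stream of insert/retract changes

SELECT word, COUNT(*) AS cnt FROM MyTable GROUP BY word;
```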

Common streaming-computation patterns

01. Multi-stream joins -- in stream processing, data arrives incrementally, and arrival times are uncertain and out of order,
   so processing must retain intermediate state and be able to recover it on failure
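The state-retention problem can be illustrated with a minimal in-memory sketch (pure Scala; `Order` and `Payment` are hypothetical event types): each side buffers unmatched records by key until the matching record from the other stream arrives.

```scala
import scala.collection.mutable

// Hypothetical events from two streams, joined on orderId.
case class Order(orderId: String, item: String)
case class Payment(orderId: String, amount: Double)

// Intermediate state: unmatched records buffered per key.
val pendingOrders   = mutable.Map.empty[String, Order]
val pendingPayments = mutable.Map.empty[String, Payment]
val joined          = mutable.ListBuffer.empty[(Order, Payment)]

def onOrder(o: Order): Unit =
  pendingPayments.remove(o.orderId) match {
    case Some(p) => joined += ((o, p))            // match found: emit result
    case None    => pendingOrders(o.orderId) = o  // buffer until payment arrives
  }

def onPayment(p: Payment): Unit =
  pendingOrders.remove(p.orderId) match {
    case Some(o) => joined += ((o, p))
    case None    => pendingPayments(p.orderId) = p
  }

// Out-of-order arrival: the payment arrives before its order, yet still joins.
onPayment(Payment("42", 9.99))
onOrder(Order("42", "book"))
```

In a real Flink job this buffered state lives in managed keyed state, so checkpoints (see enableCheckpointing above) can restore it after a failure.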
02. Using dimension tables
    Full load vs. incremental load
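The two loading strategies for a dimension cache can be sketched in pure Scala (`fetchAll` and `fetchChangedSince` are hypothetical stand-ins for a database or file scan):

```scala
// In-memory dimension cache: id -> attribute.
var dim: Map[Int, String] = Map.empty

// Hypothetical source reads (stand-ins for a DB query or file scan).
def fetchAll(): Map[Int, String] = Map(1 -> "US", 2 -> "DE")
def fetchChangedSince(version: Long): Map[Int, String] = Map(2 -> "FR")

// Full load: replace the whole cache on every refresh.
def fullLoad(): Unit = dim = fetchAll()

// Incremental load: merge only the rows changed since the last version.
def incrementalLoad(version: Long): Unit = dim = dim ++ fetchChangedSince(version)

fullLoad()
incrementalLoad(100L)
```

Full loads are simple but expensive for large tables; incremental loads are cheap per refresh but require the source to track changes.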
03. Job optimization

References:

https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.html
https://github.com/apache/flink/tree/master/flink-examples
