The flatMap Higher-Order Function in Flink and Spark

Data Structures in Java

01. List
   // note: this is java.util.Arrays$ArrayList, the private fixed-size inner class
   // returned by Arrays.asList(), not the general-purpose java.util.ArrayList
   private static class ArrayList<E> extends AbstractList<E> implements RandomAccess, java.io.Serializable
   public abstract class AbstractList<E> extends AbstractCollection<E> implements List<E> {}
   public abstract class AbstractCollection<E> implements Collection<E> {}
   public interface List<E> extends Collection<E> {}
02. The Array class and the Arrays class
   The Array class
       public final class Array extends Object
           All elements must be of the same type.
           An array cannot tell how many elements it actually holds; length only reports the array's capacity.
   The static utility class Arrays
     public class Arrays extends Object
           A class dedicated to operating on arrays, providing static methods for searching, sorting, copying, etc.
               equals() sort()
            Arrays.asList(array): wraps the given array as a List
            Arrays.asList()  public static <T> List<T> asList(T... a) { return new ArrayList<>(a); }
   Common point: neither Array nor Arrays can be instantiated; both simply expose static methods for direct use.
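
A quick sketch of the Arrays.asList() gotcha implied above: because it returns the fixed-size Arrays$ArrayList view shown in section 01 (not java.util.ArrayList), writes pass through to the backing array but resizing fails. Class name here is illustrative.

    import java.util.Arrays;
    import java.util.List;

    public class AsListDemo {
        public static void main(String[] args) {
            String[] words = {"a", "b", "c"};
            List<String> view = Arrays.asList(words); // fixed-size view backed by words
            view.set(0, "x");                         // OK: writes through to words[0]
            System.out.println(words[0]);             // prints "x"
            // view.add("d");                         // throws UnsupportedOperationException
        }
    }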
 03. Between Scala and Java
  scala.collection.Seq         => java.util.List
  scala.collection.mutable.Seq => java.util.List

Spark

1. Examples
   // both snippets assume precompiled patterns, e.g.
   // private static final Pattern SPACE  = Pattern.compile(" ");
   // private static final Pattern SPACES = Pattern.compile("\\s+");
   JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
   JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(s -> {
     String[] parts = SPACES.split(s);
     return new Tuple2<>(parts[0], parts[1]);
   }).distinct().groupByKey().cache();
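
For context, a minimal runnable version of the first snippet, assuming a local master and an input file name (input.txt) chosen here only for illustration:

    import java.util.Arrays;
    import java.util.regex.Pattern;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.SparkSession;
    import scala.Tuple2;

    public class FlatMapWordCount {
        private static final Pattern SPACE = Pattern.compile(" ");

        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                .appName("FlatMapWordCount").master("local[*]").getOrCreate();
            JavaRDD<String> lines = spark.read().textFile("input.txt").javaRDD();
            // flatMap: one line in, zero or more words out
            JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
            words.mapToPair(w -> new Tuple2<>(w, 1))
                 .reduceByKey(Integer::sum)
                 .collect()
                 .forEach(t -> System.out.println(t._1() + ": " + t._2()));
            spark.stop();
        }
    }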
 Example 02
// Generate running word count
Dataset<Row> wordCounts = lines.flatMap(
    (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
    Encoders.STRING()).groupBy("value").count();
2. Detailed explanation: the source code
 /**
  * Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
  */
 def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
   val cleanF = sc.clean(f)
   new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
 }
 def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
   private var cur: Iterator[B] = empty
   private def nextCur() { cur = f(self.next()).toIterator }
   def hasNext: Boolean = {
     // Equivalent to cur.hasNext || self.hasNext && { nextCur(); hasNext }
     // but slightly shorter bytecode (better JVM inlining!)
     while (!cur.hasNext) {
       if (!self.hasNext) return false
       nextCur()
     }
     true
   }
   def next(): B = (if (hasNext) cur else empty).next()
 }
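
The same pull-based pattern, sketched in Java for readers more at home there (class name is mine): one element is pulled from the source, expanded by f, and its sub-iterator drained before the next element is touched, so nothing is materialized up front.

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.function.Function;

    final class FlatMapIterator<A, B> implements Iterator<B> {
        private final Iterator<A> self;
        private final Function<A, Iterator<B>> f;
        private Iterator<B> cur = Collections.emptyIterator();

        FlatMapIterator(Iterator<A> self, Function<A, Iterator<B>> f) {
            this.self = self;
            this.f = f;
        }

        @Override
        public boolean hasNext() {
            while (!cur.hasNext()) {            // same loop as the Scala source above
                if (!self.hasNext()) return false;
                cur = f.apply(self.next());     // nextCur()
            }
            return true;
        }

        @Override
        public B next() {
            if (!hasNext()) throw new NoSuchElementException();
            return cur.next();
        }
    }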
 Other operations
  def collect(): JList[T] = rdd.collect().toSeq.asJava

 /** @return an iterator over the elements in this list in proper sequence */
   Iterator<E> iterator();  // calling iterator() asks the container to return an Iterator
 Using a named function or an anonymous class -- source code
   FlatMapFunction<LineWithTimestamp, Event> linesToEvents =
     new FlatMapFunction<LineWithTimestamp, Event>() {
       @Override
       public Iterator<Event> call(LineWithTimestamp lineWithTimestamp) throws Exception {
         ArrayList<Event> eventList = new ArrayList<Event>();
         for (String word : lineWithTimestamp.getLine().split(" ")) {
           eventList.add(new Event(word, lineWithTimestamp.getTimestamp()));
         }
         return eventList.iterator();
       }
     };

// Split the lines into words, treat words as sessionId of events
Dataset<Event> events = lines
    .withColumnRenamed("value", "line")
    .as(Encoders.bean(LineWithTimestamp.class))
    .flatMap(linesToEvents, Encoders.bean(Event.class));
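
The snippet above references two beans that are defined elsewhere in that example; their rough shape, inferred from the getters used above and offered only as a sketch, would be:

    import java.sql.Timestamp;

    // sketch only: field names inferred from getLine()/getTimestamp() above;
    // Encoders.bean(...) needs getters, setters, and a no-arg constructor
    public static class LineWithTimestamp implements java.io.Serializable {
        private String line;
        private Timestamp timestamp;
        public String getLine() { return line; }
        public void setLine(String line) { this.line = line; }
        public Timestamp getTimestamp() { return timestamp; }
        public void setTimestamp(Timestamp timestamp) { this.timestamp = timestamp; }
    }

    public static class Event implements java.io.Serializable {
        private String sessionId;
        private Timestamp timestamp;
        public Event() {}
        public Event(String sessionId, Timestamp timestamp) {
            this.sessionId = sessionId;
            this.timestamp = timestamp;
        }
        public String getSessionId() { return sessionId; }
        public void setSessionId(String sessionId) { this.sessionId = sessionId; }
        public Timestamp getTimestamp() { return timestamp; }
        public void setTimestamp(Timestamp timestamp) { this.timestamp = timestamp; }
    }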
The underlying interface - source code
 public interface FlatMapFunction<T, R> extends Serializable {
   Iterator<R> call(T t) throws Exception;
 }
 // a typical call() implementation builds a collection and ends with: return result.iterator();

Flink

// the flatMap contract, as declared in Flink's RichFlatMapFunction<IN, OUT>:
public abstract void flatMap(IN value, Collector<OUT> out) throws Exception;
Examples:
 01. // using a named class
 DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class).flatMap(new UndirectEdge());

public static final class UndirectEdge implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    // reusable output tuple, mutated and re-emitted on every call
    Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();

    @Override
    public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
        invertedEdge.f0 = edge.f1;
        invertedEdge.f1 = edge.f0;
        out.collect(edge);
        out.collect(invertedEdge);
    }
}
02. // using an anonymous class
 .flatMap(new FlatMapFunction<String, WordWithCount>() {
     @Override
     public void flatMap(String value, Collector<WordWithCount> out) {
         for (String word : value.split("\\s")) {
             out.collect(new WordWithCount(word, 1L));
         }
     }
 })
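
The WordWithCount type is assumed by this snippet; a plausible shape, sketched after the usual SocketWindowWordCount example, is:

    // sketch only: Flink treats this as a POJO (public fields, no-arg constructor)
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }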
03. // using a lambda expression: the Collector's element type must be declared,
//    because erasure removes the lambda's generic signature and Flink cannot
//    infer the output type on its own -- hence the explicit .returns(...) below
   input.flatMap((Integer number, Collector<String> out) -> {
       StringBuilder builder = new StringBuilder();
       for(int i = 0; i < number; i++) {
           builder.append("a");
           out.collect(builder.toString());
       }
   })
   // provide type information explicitly
   .returns(Types.STRING)
 Here is the Collector interface, from the Flink source:
 /**
 * Collects a record and forwards it. The collector is the "push" counterpart of the
 * {@link java.util.Iterator}, which "pulls" data in.
 */
@Public
public interface Collector<T> {

    /**
     * Emits a record.
     *
     * @param record The record to collect.
     */
    void collect(T record);

    /**
     * Closes the collector. If any data was buffered, that data will be flushed.
     */
    void close();
}
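
Because Collector is a plain interface, a FlatMapFunction can be exercised without a running job by collecting into a list; this helper is my own sketch, not a Flink API:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.util.Collector;

    public class BufferingCollector<T> implements Collector<T> {
        private final List<T> buffer = new ArrayList<>();

        @Override
        public void collect(T record) { buffer.add(record); }  // the "push" side

        @Override
        public void close() { /* nothing buffered beyond the list */ }

        public List<T> records() { return buffer; }
    }

    // usage sketch: calling new UndirectEdge().flatMap(edge, collector) leaves the
    // original edge plus its inversion in collector.records()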

Functional summary:

Java:
   the forEach loop
   Java 8's java.util.function package
    Java 8 added functional interfaces: any interface annotated with @FunctionalInterface is a functional interface
    Function<T, R>: takes a T object and returns an R object -- a value passed in is transformed by the function into another value
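
A minimal sketch of Function<T, R> in use (class and variable names are mine):

    import java.util.function.Function;

    public class FunctionDemo {
        public static void main(String[] args) {
            Function<String, Integer> length = String::length;   // takes T = String, returns R = Integer
            Function<Integer, Integer> doubled = n -> n * 2;
            System.out.println(length.apply("flatMap"));                  // 7
            System.out.println(length.andThen(doubled).apply("flatMap")); // 14
        }
    }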
Spark
    map:     Return a new RDD by applying a function to all elements of this RDD.
    flatMap: Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
    filter:  Return a new RDD containing only the elements that satisfy a predicate.
    foreach: Applies a function to all elements of this RDD (an action rather than a transformation).
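
The difference shows up directly in the element types; a small local sketch (class name and data are mine):

    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class MapVsFlatMap {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("MapVsFlatMap").setMaster("local[*]");
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                JavaRDD<String> lines = sc.parallelize(Arrays.asList("to be", "or not to be"));

                // map: exactly one output per input -> an RDD of String[]
                JavaRDD<String[]> arrays = lines.map(s -> s.split(" "));

                // flatMap: zero or more outputs per input, flattened -> an RDD of String
                JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")).iterator());

                // filter: keeps only elements satisfying the predicate
                JavaRDD<String> twoLetters = words.filter(w -> w.length() == 2);

                System.out.println(words.collect());      // [to, be, or, not, to, be]
                System.out.println(twoLetters.collect()); // [to, be, or, to, be]
            }
        }
    }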
Flink
    map:     The mapping method. Takes an element from the input data set and transforms it into exactly one element.
    flatMap: The flatMap method. Takes an element from the input data set and transforms it into zero, one, or more elements.
             Each FlatMapFunction call can return any number of elements, including none.
    filter:  Retains only those elements for which the function returns true; elements for which it returns false are dropped.

  Internally, each transformation wraps the user function in a stream operator:
  return transform("Map", outType, new StreamMap<>(clean(mapper)));
  return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
  return transform("Filter", getType(), new StreamFilter<>(clean(filter)));
  return transform("Process", outputType, operator);

References:

Flink 1.8 source code and Spark 2.3 source code
