Higher-Order Functions at Spark's Different Data Levels

Row in Spark

public interface Row extends scala.Serializable

Represents one row of output from a relational operator. Allows both generic access by ordinal, which will incur boxing overhead for primitives, as well as native primitive access. It is invalid to use the native primitive interface to retrieve a value that is null; instead, a user must check isNullAt before attempting to retrieve a value that might be null. To create a new Row, use RowFactory.create() in Java or Row.apply() in Scala.

 // Row newRow  = new GenericRowWithSchema(data.toSeq(), schema);
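A minimal sketch of creating and reading a Row (the column names and values here are illustrative, not from the original):

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;

    // Column 0 = name (String), column 1 = age (Integer).
    Row row = RowFactory.create("Alice", 30);

    String name = row.getString(0);                  // generic access by ordinal
    // Check isNullAt before native primitive access, as required above.
    int age = row.isNullAt(1) ? -1 : row.getInt(1);
    // With an attached schema (GenericRowWithSchema, as in the commented line above),
    // access by field name also works: row.getAs("name")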

JavaRDD Examples

Class hierarchy: JavaRDD -> AbstractJavaRDDLike -> JavaRDDLike
 JavaRDD[T] wraps a plain RDD[T].
 JavaPairRDD holds key-value pairs; mapToPair converts a JavaRDD into a JavaPairRDD, as sketched below.
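A sketch of mapToPair, assuming javaRDD is the JavaRDD<Integer> built in example 01 below (the pairing logic is illustrative):

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    // Pair each value with a count of 1, e.g. as the first step of a count-by-value.
    JavaPairRDD<Integer, Integer> pairs = javaRDD.mapToPair(
        new PairFunction<Integer, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(Integer v) throws Exception {
                return new Tuple2<>(v, 1);
            }
        });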
01. foreach
    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
    JavaRDD<Integer> javaRDD = sparkContext.parallelize(data, 3);
    javaRDD.foreach(new VoidFunction<Integer>() {
        @Override
        public void call(Integer integer) throws Exception {
            System.out.println(integer);
        }
    });
02. map
    The anonymous class implements Function<T, R>:
    javaexampleRdd.map(new Function<Row, JSONObject>() {
        @Override
        public JSONObject call(Row row) throws Exception {
            // Placeholder body: build the JSON from whatever columns the schema has.
            JSONObject obj = new JSONObject();
            obj.put("name", row.getAs("Name"));
            return obj;
        }
    });

    def map[R](f: JFunction[T, R]): JavaRDD[R] =
      new JavaRDD(rdd.map(f)(fakeClassTag))(fakeClassTag)

    Here JFunction is an alias for the functional interface Function:
        public interface Function<T1, R> extends Serializable {
            R call(T1 v1) throws Exception;
        }
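Since Function declares a single abstract method, the same map can be written as a lambda on Java 8+; a sketch against the javaRDD from example 01:

    // The lambda is compiled to a Function<Integer, Integer>.
    JavaRDD<Integer> doubled = javaRDD.map(v -> v * 2);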

03. filter
    JavaRDD<T>.filter(new Function<T, Boolean>() { ... }) keeps the elements for which call returns true.
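A sketch against the javaRDD from example 01 (the predicate is illustrative):

    JavaRDD<Integer> filtered = javaRDD.filter(new Function<Integer, Boolean>() {
        @Override
        public Boolean call(Integer v) throws Exception {
            return v > 2;   // keep only values greater than 2
        }
    });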

04. flatMap
    def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
      def fn: (T) => Iterator[U] = (x: T) => f.call(x).asScala
      JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
    }
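On the Java side, the FlatMapFunction returns an Iterator (Spark 2.x and later); a sketch, assuming lines is a JavaRDD<String>:

    import java.util.Arrays;
    import java.util.Iterator;
    import org.apache.spark.api.java.function.FlatMapFunction;

    // Split each line into words; each input element maps to zero or more outputs.
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String line) throws Exception {
            return Arrays.asList(line.split(" ")).iterator();
        }
    });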

Dataset Examples

01. foreach with an anonymous class
    dataTemp1.foreach(new ForeachFunction<Row>() {
        @Override
        public void call(Row r) throws Exception {
            Integer Daya = r.getAs("Name");
            System.out.println(Daya.toString());
        }
    });

01. map with an anonymous class
    Dataset<Integer> dat = dataTemp1.map(
        new MapFunction<Row, Integer>() {
            @Override
            public Integer call(Row r) throws Exception {
                Integer Daya = r.getAs("Name");
                System.out.println(Daya.toString());
                return Daya;
            }
        }, Encoders.INT());

02. map with a named class
    dataTemp1.map(new DealMapFunction(), Encoders.INT());
    where
    public class DealMapFunction implements MapFunction<Row, Integer> {
        @Override
        public Integer call(Row r) throws Exception {
            Integer Daya = r.getAs("Name");
            System.out.println(Daya.toString());
            return Daya;
        }
    }


03. map with a lambda expression (Java 8+)
    Encoder<String> stringEncoder = Encoders.STRING();
    Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
        (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
        stringEncoder);
    The cast to MapFunction<Row, String> is needed to disambiguate between the Scala
    map(Function1) overload and the Java map(MapFunction, Encoder) overload.

Filter
    Dataset<T>.filter(new FilterFunction<T>() { ... })
    Overloads: typed function, Scala function, expression string, and Column:
    def filter(func: FilterFunction[T]): Dataset[T]
    def filter(func: (T) => Boolean): Dataset[T]
    def filter(conditionExpr: String): Dataset[T]
    def filter(condition: Column): Dataset[T]
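A sketch of the main forms, assuming ds is a Dataset<Row> with an integer age column:

    import static org.apache.spark.sql.functions.col;
    import org.apache.spark.api.java.function.FilterFunction;

    Dataset<Row> adults1 = ds.filter((FilterFunction<Row>) r -> r.<Integer>getAs("age") > 21); // typed function
    Dataset<Row> adults2 = ds.filter("age > 21");           // SQL expression string
    Dataset<Row> adults3 = ds.filter(col("age").gt(21));    // Column expression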

FlatMap
    def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
      val func: (T) => Iterator[U] = x => f.call(x).asScala
      flatMap(func)(encoder)
    }
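The Java call mirrors the typed map above: pass a FlatMapFunction plus an Encoder; a sketch, assuming column 0 of ds holds a sentence String:

    import java.util.Arrays;
    import org.apache.spark.sql.Encoders;

    // Split each row's first column into words, flattening the results into one Dataset.
    Dataset<String> words = ds.flatMap(
        (FlatMapFunction<Row, String>) row -> Arrays.asList(row.getString(0).split(" ")).iterator(),
        Encoders.STRING());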

References:

Spark source code
