Published: 2019-11-24 19:18:00
By ytwan
In Big Data.
tags: Flink
Row in Spark
public interface Row extends scala.Serializable

Represents one row of output from a relational operator. Row allows both generic access by ordinal, which incurs boxing overhead for primitives, and native primitive access. It is invalid to use the native primitive interface to retrieve a value that is null; instead, check isNullAt before attempting to retrieve a value that might be null. To create a new Row, use RowFactory.create() in Java or Row.apply() in Scala.
// Row newRow = new GenericRowWithSchema(data.toSeq(), schema);
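A minimal sketch of creating and reading a Row, illustrating the three access patterns described above (the column values here are made up; RowFactory.create needs only the spark-sql jar, not a running SparkSession):

```java
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

public class RowDemo {
    public static void main(String[] args) {
        // RowFactory.create builds a generic Row from the given values
        Row row = RowFactory.create("Alice", 30, null);

        // generic access by ordinal (boxes primitives)
        String name = (String) row.get(0);

        // native primitive access: only safe when the value is not null
        int age = row.getInt(1);

        // guard nullable fields with isNullAt before primitive access
        if (!row.isNullAt(2)) {
            double score = row.getDouble(2);
            System.out.println(score);
        }
        System.out.println(name + " " + age);
    }
}
```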
JavaRDD examples
Class hierarchy: JavaRDD -> AbstractJavaRDDLike -> JavaRDDLike
A JavaRDD[T] is turned into a JavaPairRDD via mapToPair.
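The mapToPair step can be sketched with a word-count style example (the input words and app name are made up; `local[1]` runs Spark in-process):

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class MapToPairDemo {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[1]", "MapToPairDemo");
        List<String> words = Arrays.asList("a", "b", "a");
        JavaRDD<String> rdd = sc.parallelize(words);

        // mapToPair turns a JavaRDD<String> into a JavaPairRDD<String, Integer>
        JavaPairRDD<String, Integer> pairs = rdd.mapToPair(
                (PairFunction<String, String, Integer>) w -> new Tuple2<>(w, 1));

        // pair-specific operations such as reduceByKey are now available
        pairs.reduceByKey(Integer::sum).collect().forEach(System.out::println);
        sc.stop();
    }
}
```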
01. foreach

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Integer> javaRDD = sparkContext.parallelize(data, 3);
javaRDD.foreach(new VoidFunction<Integer>() {
    @Override
    public void call(Integer integer) throws Exception {
        System.out.println(integer);
    }
});
02. map

The function passed to map implements Function<T, R>:

javaexampleRdd.map(new Function<Row, JSONObject>() {
    @Override
    public JSONObject call(Row row) throws Exception {
        // build and return a JSONObject from the Row here
    }
});
def map[R](f: JFunction[T, R]): JavaRDD[R] =
  new JavaRDD(rdd.map(f)(fakeClassTag))(fakeClassTag)

Here JFunction is the functional interface Function:
public interface Function<T1, R> extends Serializable {
    R call(T1 v1) throws Exception;
}
03. filter

JavaRDD<Row>.filter(new Function<Row, Boolean>() { ... })
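A short runnable sketch of filter on a JavaRDD of integers (the data and the even-number predicate are made up for illustration):

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class FilterDemo {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[1]", "FilterDemo");
        List<Integer> data = Arrays.asList(5, 1, 4, 2);
        JavaRDD<Integer> rdd = sc.parallelize(data);

        // elements for which the Function returns true are kept
        JavaRDD<Integer> evens = rdd.filter(new Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer x) throws Exception {
                return x % 2 == 0;
            }
        });

        System.out.println(evens.collect()); // [4, 2]
        sc.stop();
    }
}
```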
04. flatMap

def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
  def fn: (T) => Iterator[U] = (x: T) => f.call(x).asScala
  JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
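The FlatMapFunction above returns an Iterator, which flatMap flattens into the output RDD. A typical Java-side call, splitting lines into words (the input lines are made up):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

public class FlatMapDemo {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[1]", "FlatMapDemo");
        List<String> lines = Arrays.asList("hello world", "hi spark");
        JavaRDD<String> rdd = sc.parallelize(lines);

        // each line yields an Iterator of words; flatMap concatenates them
        JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        System.out.println(words.collect()); // [hello, world, hi, spark]
        sc.stop();
    }
}
```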
Dataset examples
01. foreach, using an anonymous class

dataTemp1.foreach(new ForeachFunction<Row>() {
    @Override
    public void call(Row r) throws Exception {
        Integer name = r.getAs("Name");
        System.out.println(name.toString());
    }
});
02. map, using an anonymous class

Dataset<Integer> dat = dataTemp1.map(
    new MapFunction<Row, Integer>() {
        @Override
        public Integer call(Row r) throws Exception {
            Integer name = r.getAs("Name");
            System.out.println(name.toString());
            return name;
        }
    }, Encoders.INT());
03. map, using a named class

dataTemp1.map(new DealMapFunction(), Encoders.INT());

where

public class DealMapFunction implements MapFunction<Row, Integer> {
    @Override
    public Integer call(Row r) throws Exception {
        Integer name = r.getAs("Name");
        System.out.println(name.toString());
        return name;
    }
}
04. map, using a lambda expression (Java 8 and later)

Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    Encoders.STRING());
Filter

Dataset<Row>.filter(new FilterFunction<Row>() { ... })

def filter(func: FilterFunction[T]): Dataset[T]

filter is also available in expression-string, Column, and Scala-function forms:

def filter(func: (T) => Boolean): Dataset[T]
def filter(conditionExpr: String): Dataset[T]
def filter(condition: Column): Dataset[T]
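A sketch of the Java-friendly filter forms on a hypothetical DataFrame with an `age` column (the `people.json` path and column name are assumptions for illustration):

```java
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.col;

public class DatasetFilterDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]").appName("DatasetFilterDemo").getOrCreate();
        // hypothetical input with an "age" column
        Dataset<Row> people = spark.read().json("people.json");

        // 1) typed FilterFunction (the cast resolves the overload ambiguity)
        Dataset<Row> a = people.filter(
                (FilterFunction<Row>) r -> !r.isNullAt(r.fieldIndex("age")));

        // 2) SQL expression string
        Dataset<Row> b = people.filter("age > 18");

        // 3) Column expression
        Dataset<Row> c = people.filter(col("age").gt(18));

        spark.stop();
    }
}
```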
FlatMap

def flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] = {
  val func: (T) => Iterator[U] = x => f.call(x).asScala
  flatMap(func)(encoder)
}
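On the Java side, the Dataset flatMap above takes the FlatMapFunction together with an explicit Encoder for the result element type. A sketch with made-up input lines:

```java
import java.util.Arrays;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class DatasetFlatMapDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[1]").appName("DatasetFlatMapDemo").getOrCreate();
        Dataset<String> lines = spark.createDataset(
                Arrays.asList("hello world", "hi spark"), Encoders.STRING());

        // each line yields an Iterator of words; the Encoder describes the result type
        Dataset<String> words = lines.flatMap(
                (FlatMapFunction<String, String>) line ->
                        Arrays.asList(line.split(" ")).iterator(),
                Encoders.STRING());

        words.show();
        spark.stop();
    }
}
```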