HDFS和Spark的交互

Spark使用Scala连接HDFS

总共分为三个步骤
第一步:配置包依赖Maven
第二步:配置文件的参数的设置与读写.properties 在HDFS暂时没有使用
第三步:编写程序

    //第一步:创建SparkConf对象配置一些通用的属性,并且基于这个创建一个SparkContext对象setMaster("local[2]")
    val SparkConf = new SparkConf().setMaster("local[2]").setAppName("CountTest").set("spark.executor.memory", "1g")
    //使用SparkContext对象中的textFile函数将输入文件转换为一个RDD ,SparkContext对象代表对计算机集群的一个连接
    val sc = new SparkContext(SparkConf)
    //路径前面加上file:// 表示从本地文件系统读;读取本地文本文件,创建一个字符串的RDD
    val fluxRDD = sc.textFile("file:///C:\\Items.json")
    //在路径前面加上hdfs://表示从hdfs文件系统;读取HDFS中文件参数是一个path,这个path
    // val fluxRDD = sc.textFile("hdfs://localhost:8102/15*.json")
    //hdfs://master:port/path

Spark存储对象到HDFS

保存RDD为对象

    将RDD保存为对象saveAsObjectFile()
   .objectFile()函数接收路径,返回对应的RDD

保存模型到RDD上为对象

//将上面的模型存储到hdfs---Spark环境下生成一个word2vec模型, 存储到hdfs.
ObjectInputStreamObjectOutputStream类所读写的对象必须实现Serializable接口,只能将支持 java.io.Serializable 接口的对象写入流中。对象中的transientstatic类型成员变量不会被读取和写入
可以使用ObjectOutputStream  Java 对象的基本数据类型和图形写入 OutputStream
可以使用 ObjectInputStream 读取(重构)对象。通过在流中使用文件可以实现对象的持久存储
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
val fileSystem = FileSystem.get(hadoopConf)
val path = new Path("/user/hadoop/data/mllib/word2vec-object")
val oos = new ObjectOutputStream(new FSDataOutputStream(fileSystem.create(path)))
oos.writeObject(model)
oos.close
}

//这里示例另外一个程序直接从hdfs读取序列化对象使用模型
val ois = new ObjectInputStream(new FSDataInputStream(fileSystem.open(path)))
val sample_model = ois.readObject.asInstanceOf[Word2VecModel]

  /*
    * //以将序列化文件从hdfs放到本地, scala程序使用模型
    * import java.io._
    * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
    * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
    * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
    * ois.close
    */

Hadoop现在同时提供了新旧的两套API接口

旧版API 放在org.apache.hadoop.mapred 包中,
而新版API 则放在org.apache.hadoop.mapreduce 包及其子包中

老式

hadoopFile
saveAsHadoopFile()

新式

 hadoopDataset    saveAsHadoopDataset     
 newAPIHadoopDataset saveAsNewAPIHadoopDataset
                    saveAsNewAPIHadoopFile

val HBase_DATARDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    //脚手架语句,查看结果

参考

Spark中将对象序列化存储到hdfs https://my.oschina.net/waterbear/blog/525347

blogroll

social