Spark使用Scala连接HDFS
总共分为三个步骤
第一步:配置包依赖Maven
第二步:配置文件的参数的设置与读写.properties 在HDFS暂时没有使用
第三步:编写程序
//第一步:创建SparkConf对象配置一些通用的属性,并且基于这个创建一个SparkContext对象setMaster("local[2]")
val SparkConf = new SparkConf().setMaster("local[2]").setAppName("CountTest").set("spark.executor.memory", "1g")
//使用SparkContext对象中的textFile函数将输入文件转换为一个RDD ,SparkContext对象代表对计算机集群的一个连接
val sc = new SparkContext(SparkConf)
//路径前面加上file:// 表示从本地文件系统读;读取本地文本文件,创建一个字符串的RDD
val fluxRDD = sc.textFile("file:///C:\\Items.json")
//在路径前面加上hdfs://表示从hdfs文件系统;读取HDFS中文件参数是一个path,这个path
// val fluxRDD = sc.textFile("hdfs://localhost:8102/15*.json")
//hdfs://master:port/path
Spark存储对象到HDFS
保存RDD为对象
将RDD保存为对象saveAsObjectFile()
.objectFile()函数接收路径,返回对应的RDD
保存模型到RDD上为对象
//将上面的模型存储到hdfs---Spark环境下生成一个word2vec模型, 存储到hdfs.
ObjectInputStream与ObjectOutputStream类所读写的对象必须实现Serializable接口,只能将支持 java.io.Serializable 接口的对象写入流中。对象中的transient和static类型成员变量不会被读取和写入
可以使用ObjectOutputStream 将 Java 对象的基本数据类型和图形写入 OutputStream。
可以使用 ObjectInputStream 读取(重构)对象。通过在流中使用文件可以实现对象的持久存储
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
val fileSystem = FileSystem.get(hadoopConf)
val path = new Path("/user/hadoop/data/mllib/word2vec-object")
val oos = new ObjectOutputStream(new FSDataOutputStream(fileSystem.create(path)))
oos.writeObject(model)
oos.close
}
//这里示例另外一个程序直接从hdfs读取序列化对象使用模型
val ois = new ObjectInputStream(new FSDataInputStream(fileSystem.open(path)))
val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
/*
* //以将序列化文件从hdfs放到本地, scala程序使用模型
* import java.io._
* import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
* val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
* val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
* ois.close
*/
Hadoop现在同时提供了新旧的两套API接口
旧版API 放在org.apache.hadoop.mapred 包中,
而新版API 则放在org.apache.hadoop.mapreduce 包及其子包中
老式
hadoopFile
saveAsHadoopFile()
新式
hadoopDataset saveAsHadoopDataset
newAPIHadoopDataset saveAsNewAPIHadoopDataset
saveAsNewAPIHadoopFile
val HBase_DATARDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
//脚手架语句,查看结果
参考
Spark中将对象序列化存储到hdfs https://my.oschina.net/waterbear/blog/525347