Spark的数据读入和写出
Spark的数据读入和写出
1.调用parallelize函数直接从集合中获取数据,并存入RDD中
2.从文本中读取数据到RDD中
文件类型:这个文本可以是纯文本文件、CSV、sequence文件,JSON、protocol buffer以及对象文件
文件系统:本地(file://)、HDFS(hdfs://)上,或者在S3上
3.读取数据库中的数据
HBase数据库
Java连接的数据库:Orcale、MySQL
4.其他模块-Spark SQL
文件系统读写
//第一步:创建SparkConf对象配置一些通用的属性,并且基于这个创建一个SparkContext对象setMaster("local[2]")
val SparkConf = new SparkConf().setMaster("local[2]").setAppName("CountingOrderTest").set("spark.executor.memory", "1g")
//使用SparkContext对象中的textFile函数将输入文件转换为一个RDD ,SparkContext对象代表对计算机集群的一个连接
val sc = new SparkContext(SparkConf)
//从不同的文件系统中读取数据
//路径前面加上file:// 表示从本地文件系统读;读取本地文本文件,创建一个字符串的RDD
val fluxRDD = sc.textFile("file:///C:\\Data\\my.json")
//在路径前面加上hdfs://表示从hdfs文件系统;读取HDFS中文件参数是一个path,这个path
// val fluxRDD = sc.textFile("hdfs://172.15.106.21:80/center/my*.json")
//在路径前面加上s3n://表示从Amazon S3 文件系统;读取HDFS中文件参数是一个path,这个path
// val fluxRDD = sc.textFile("s3n://bucket/center/my.txt)
从数据库中读写数据
0.数据表的准备:
Hbase表创建:
hbase shell
》 create 'SPARK_DATA',{NAME=>'it',COMPRESSION=>'SNAPPY'}
创建HBase表可以指定压缩格式
#查看Hbase中数据:
list
describe 'mytest'
scan 'mytest',{LIMIT=>5}
count 'mytest', {INTERVAL => 10000, CACHE => 50000}
MySQL创建表<用户名-数据库-表>
creat databae sparktest
use sparktest
create table test{
static_date DATE,
ball_name VARCHAR(50) NOT NULL,
ball_cost DOUBLE,
ball_people INT,
PRIMARY KEY(static_date,ball_name)
};
Orcale创建表
1.配置文件以及依赖
#Hbase连接配置
hbase.zookeeper.quorum=*,*.*.*.
hbase.zookeeper.znode.parent=/hbase-unsecure
hbase.zookeeper.property.clientPort=2181
hbase.master.port=
配置的内容查看的位置:Ambari的Hbase Service中-Configs--Advanced--Advanced hbase-site
#Oracle数据库配置
rdb.URL.ORCALE=jdbc:oracle:
rdb.USERNAME.ORCALE=
rdb.PASSWORD.ORCALE=
#MySQL数据库的配置,其中用户名是 ,数据库
##rdb.URL.MySQL=jdbc:mysql://localhost/test
rdb.URL.MySQL=jdbc:mysql://172.16.110.36:3306/testbd
rdb.USERNAME.MySQL=spark
rdb.PASSWORD.MySQL=**
#注意
依赖包的问题是通过Maven工程自动加载解决
Orcale数据库依赖问题,Maven没有得到授权,所以处理方式可参考Spark开发流程中的解决办法
2.读取配置文件
首先将配置文件放置并将配置文件关联。关联方式根据不同的开发工具自行确定
以下函数读取配置文件,包依赖
import java.io.File
import java.util.Properties
import org.apache.commons.lang.SystemUtils
/**
* 取得通用配置文件属性
*/
def getProperties(key : String) = {
val prop = new Properties()
val in = if (!SystemUtils.IS_OS_WINDOWS) {
this.getClass.getClassLoader.getResourceAsStream("config" + File.separator + "config.properties")
} else {
this.getClass.getClassLoader.getResourceAsStream("config" + File.separator + "config.properties")
}
prop.load(in)
prop.getProperty(key)
}
3.建立连接---操作<读入和写出>---关闭连接
/**
* 建立oracle数据库连接
*/
def createOracleConnection() = {
Class.forName("oracle.jdbc.OracleDriver").newInstance()
DriverManager.getConnection(getProperties("rdb.URL.ORCALE"),
getProperties("rdb.USERNAME.ORCALE"),
getProperties("rdb.PASSWORD.ORCALE"))
}
/**
* 建立mysql数据库连接
*/
def getMysqlConnection : Connection = {
Class.forName("com.mysql.jdbc.Driver")
DriverManager.getConnection( getProperties("rdb.URL.MySQL"),
getProperties("rdb.USERNAME.MySQL"),
getProperties("rdb.PASSWORD.MySQL"))
}
/**
* 建立Hbase数据库连接
*/
//读Hbase数据库
//Spark读取HBase,主要使用SparkContext 提供的newAPIHadoopRDDAPI将表的内容以 RDDs 的形式加载到 Spark 中。
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, getProperties("hbase.zookeeper.quorum"))
hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, getProperties("hbase.zookeeper.znode.parent"))
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, getProperties("hbase.zookeeper.property.clientPort"))
hbaseConf.set(HConstants.MASTER_PORT, getProperties("hbase.master.port"))
//设置查询的表名
hbaseConf.set(TableInputFormat.INPUT_TABLE, getProperties("hbase.unix.test"))
//通过Hadoop输入格式 访问HBase,输入数据类型是HBase,输入命令是调用SparkContext.newAPTHadoopRDD
//调用命令中的参数hbaseConf包含了Zookeeper设置以及Hbase设置,TableInputFormat中包含了多个可以用来优化对Hbase的读取设置项
//返回值类型是键值对,其中键的类型是ImmutableBytesWritable,值得类型是Result
val HBase_DATARDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
//脚手架语句,查看结果
println(HBase_DATARDD.count())
//遍历输出
//模式匹配--元组匹配
usersRDD.foreach{ case (_,result) =>
val key = Bytes.toInt(result.getRow)
val name = Bytes.toString(result.getValue("basic".getBytes,"name".getBytes))
val age = Bytes.toInt(result.getValue("basic".getBytes,"age".getBytes))
println("Row key:"+key+" Name:"+name+" Age:"+age)
}
//使用键值对
HBase_DATARDD.foreach{
o =>{
val(_,result)=o
println( Bytes.toString(result.getValue("it".getBytes,"name".getBytes)) )}
}
//使用下标式
HBase_DATARDD.foreach{
o => println( Bytes.toInt(o._2.getValue("it".getBytes,"name".getBytes)) )
}
//写数据到HBase数据库中
//方法一
//Step 1:我们需要先创建一个 JobConf。
//Step 2: RDD 到表模式的映射
//HBase 上的操作都需要先创建一个操作对象Put,Get,Delete等,然后调用Table上的相对应的方法
//Step 3: 读取RDD并转换
//Step 4: 使用saveAsHadoopDataset方法写入HBase
//方法二
//将 RDD[(uid:Int, name:String, age:Int)] 转换成 RDD[(ImmutableBytesWritable, Put)]
//<RDD的形式是Iterator[(String, Any)]>
//定义 HBase 的配置
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, getProperties("hbase.zookeeper.quorum"))
hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, getProperties("hbase.zookeeper.znode.parent"))
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, getProperties("hbase.zookeeper.property.clientPort"))
hbaseConf.set(HConstants.MASTER_PORT, getProperties("hbase.master.port"))
//新版 API 中加入了 Connection,HAdmin成了Admin,HTable成了Table,而Admin和Table只能通过Connection获得。
//Connection的创建是个重量级的操作,由于Connection是线程安全的,所以推荐使用单例,其工厂方法需要一个HBaseConfiguration。
//Connection 的创建是个重量级的工作,线程安全,是操作hbase的入口
//指定输出格式和输出表名
val tableName: String="mytest"
val conn = ConnectionFactory.createConnection(hbaseConf)
val table = conn.getTable(TableName.valueOf(tableName))
//不同的数据类型的写入不同,具体的看RDD中的元素的形式
ageRDD.foreach(o => {
if (o._1 != "") {
////准备插入一条数据
val put = new Put(Bytes.toBytes(o._1))
// 模式匹配--类型匹配
//在数据类型不给定的情况下,Scala使用类型推导确定变量的具体类型,进而针对不同的类型做出相应的处理
//数据类型--元组Tuple ,可以通过变量名._N来访问元组的内容,其中N表示元组中元素的索引号,本例中o._1 以及o._2所示
o._2 match {
//为put操作指定 column 和 value
case d: Double =>
put.addColumn(Bytes.toBytes("it"), Bytes.toBytes(columns), Bytes.toBytes(String.valueOf(d)))
case s: String =>
put.addColumn(Bytes.toBytes("it"), Bytes.toBytes(columns), Bytes.toBytes(s))
case _ =>
put.addColumn(Bytes.toBytes("it"), Bytes.toBytes(columns), Bytes.toBytes(String.valueOf(o._2)))
}
//提交
table.put(put)
}
table.close()
conn.close()
}
4.RDD类型转换
数据类型以及转换
获取数据后,就是对数据进行预处理以及分析的过程。在这个过程中,如何进行数据处理则成为了接下来要解决的问题了
由于我们处理的数据是RDD,所以,在这里主要使用RDD来进行程序处理。在2.0版本的Spark中,有DataSet类型的数据
在之后的学习和使用中,再做迁移。在这里,先解决如何使用RDD进行数据分析的问题。
首先理清RDD的类型以及对应RDD的操作,以及不同RDD之间的相互转化
<1>RDD类型
一般的RDD
数值型RDD
字符型RDD
pair RDD
控制RDD在不同节点的分布
模块RDD
Mlib模块中的RDD
<2>对应的操作:
Transformation
Action
<3>不同RDD之间的转换
从pair RDD中提取数据
从pair RDD中提取数据成为 数值型RDD 其中K,V
val ssqcodeRDD = HBase_DATARDD.map(
o => Bytes.toInt(o._2.getValue("it".getBytes,"age".getBytes))
).map(o => o.toDouble)
把数组和标号组成一个元组RDD,参见《Spark高级数据分析》P75 页
Spark SQL读取结构化数据
Spark SQL支持多种结构化数据源作为输入
读取Hive结构化数据
配置:前提可以连接Hive,即Hive 存在,并且在依赖中做配置
import org.apache.spl.hive.HiveContext
//创建HiveContext对象,Spark SQL的入口
val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
//使用HQL语言进行查询<Hive 查询语言>
val rows = hiveCtx.sql("SELECT time,amount from users")
读取JSON数据
import org.apache.spl.hive.HiveContext
val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
val allRows = hiveCtx.jsonFile("order.json")
//注册成表
allRows.registerTempTable(allRows)
val rows = hiveCtx.sql("SELECT time,amount from users")
脚手架语句-Scaffolding
脚手架语句,主要是用于在探索阶段,对程序的功能进行搭建和验证,程序完成后撤下。用于查看数据以及语句是否可行,起辅助和调试作用
1.一般脚手架语句
针对一般的RDD
HBase_DATARDD.first() //第一个数据
HBase_DATARDD.top(N) //N是要查看的数据量大小例如 top(10)
HBase_DATARDD.take() //部分数据,包含指定数量记录的数据
HBase_DATARDD.collect() //全部数据,返回包含RDD 所有对象的数组,数据量大的情况下,不要使用,会占满内存
HBase_DATARDD.count()
针对pair RDD
HBase_DATARDD.collectAsMap()
HBase_DATARDD.count()
println(HBase_DATARDD.count())
println(HBase_DATARDD.first())
//打印所有 tableRDD.foreach(println(_))
2.//脚手架程序,通过parallelize创造数据,来看是否可以走通
//数据转换一定要查看转换是否正确toString,还是toInt
val ageRDD = HBase_DATARDD.map(
o => Bytes.toInt(o._2.getValue("it".getBytes,"age".getBytes))
).map(o => o.toDouble)
ageRDD.foreach(println(_))
println(ageRDD.count())
val nameRDD = HBase_DATARDD.map(
//o => Bytes.toInt(o._2.getValue("it".getBytes,"age".getBytes))
o => Bytes.toStringo._2.getValue("it".getBytes,"name".getBytes))
).map(o => o.toDouble)
nameRDD.foreach(println(_))
println(nameRDD.count())
// 脚手架程序
// 用来调试,以及排除错误
// val arr = Array(8.58, 8.25, 8.420188658)
// val ageRDD = sc.parallelize(arr)
// val arr1 = Array(8.4201 , 8.420191268, 8.420191268)
// val nameRDD = sc.parallelize(arr1)
// nameRDD.foreach(println(_))
// println(nameRDD.count())
//相关性分析Correlations
//输入数据 :类型是连续型数据,two RDD[Double]s,两个RDD必须要有相同的长度.同一列的数据不能完全相同
//函数以及参数: calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method.
//输出是the output will be a Double or the correlation Matrix respectively
//出现NaN
val correlation: Double = Statistics.corr(ageRDD, nameRDD,"pearson")//计算不同数据的 皮尔逊相关系数
println(correlation)
参考
spark对hbase操作--http://www.cnblogs.com/hd-zg/p/5917751.htmlp/5917751.html
《Spark快速大数据分析》
《Spark高级数据分析》