Spark的数据读入和写出

Spark的数据读入和写出

Spark的数据读入和写出

1.调用parallelize函数直接从集合中获取数据,并存入RDD中
2.从文本中读取数据到RDD中
    文件类型:这个文本可以是纯文本文件、CSV、sequence文件,JSON、protocol buffer以及对象文件
    文件系统:本地(file://)、HDFS(hdfs://)上,或者在S3上
3.读取数据库中的数据
    HBase数据库
    Java连接的数据库:Orcale、MySQL
4.其他模块-Spark SQL

文件系统读写

    //第一步:创建SparkConf对象配置一些通用的属性,并且基于这个创建一个SparkContext对象setMaster("local[2]")
    val SparkConf = new SparkConf().setMaster("local[2]").setAppName("CountingOrderTest").set("spark.executor.memory", "1g")
    //使用SparkContext对象中的textFile函数将输入文件转换为一个RDD ,SparkContext对象代表对计算机集群的一个连接
    val sc = new SparkContext(SparkConf)
    //从不同的文件系统中读取数据
    //路径前面加上file:// 表示从本地文件系统读;读取本地文本文件,创建一个字符串的RDD
    val fluxRDD = sc.textFile("file:///C:\\Data\\my.json")
    //在路径前面加上hdfs://表示从hdfs文件系统;读取HDFS中文件参数是一个path,这个path
    // val fluxRDD = sc.textFile("hdfs://172.15.106.21:80/center/my*.json")
    //在路径前面加上s3n://表示从Amazon S3 文件系统;读取HDFS中文件参数是一个path,这个path
    // val fluxRDD = sc.textFile("s3n://bucket/center/my.txt)

从数据库中读写数据

0.数据表的准备:

Hbase表创建:
    hbase shell
    》   create 'SPARK_DATA',{NAME=>'it',COMPRESSION=>'SNAPPY'}
        创建HBase表可以指定压缩格式
#查看Hbase中数据:
        list
        describe 'mytest'
        scan 'mytest',{LIMIT=>5}
        count 'mytest', {INTERVAL => 10000, CACHE => 50000}
MySQL创建表<用户名-数据库-表>
    creat databae sparktest
    use sparktest
    create table  test{
        static_date     DATE,
        ball_name   VARCHAR(50) NOT NULL,       
        ball_cost   DOUBLE,
        ball_people INT,
        PRIMARY KEY(static_date,ball_name)
        };
 Orcale创建表

1.配置文件以及依赖

#Hbase连接配置
    hbase.zookeeper.quorum=*,*.*.*.
    hbase.zookeeper.znode.parent=/hbase-unsecure
    hbase.zookeeper.property.clientPort=2181
    hbase.master.port=
    配置的内容查看的位置:Ambari的Hbase Service中-Configs--Advanced--Advanced hbase-site
#Oracle数据库配置
    rdb.URL.ORCALE=jdbc:oracle:
    rdb.USERNAME.ORCALE=
    rdb.PASSWORD.ORCALE=
#MySQL数据库的配置,其中用户名是   ,数据库
    ##rdb.URL.MySQL=jdbc:mysql://localhost/test
    rdb.URL.MySQL=jdbc:mysql://172.16.110.36:3306/testbd
    rdb.USERNAME.MySQL=spark
    rdb.PASSWORD.MySQL=**
#注意
    依赖包的问题是通过Maven工程自动加载解决
    Orcale数据库依赖问题,Maven没有得到授权,所以处理方式可参考Spark开发流程中的解决办法

2.读取配置文件

首先将配置文件放置并将配置文件关联。关联方式根据不同的开发工具自行确定
以下函数读取配置文件,包依赖
    import java.io.File
    import java.util.Properties
    import org.apache.commons.lang.SystemUtils
/**
  * 取得通用配置文件属性
  */
def getProperties(key : String) = {
    val prop = new Properties()
    val in = if (!SystemUtils.IS_OS_WINDOWS) {
        this.getClass.getClassLoader.getResourceAsStream("config" + File.separator + "config.properties")
    } else {
        this.getClass.getClassLoader.getResourceAsStream("config" + File.separator + "config.properties")
    }
    prop.load(in)
    prop.getProperty(key)
}

3.建立连接---操作<读入和写出>---关闭连接

      /**
      * 建立oracle数据库连接
      */
    def createOracleConnection() = {
        Class.forName("oracle.jdbc.OracleDriver").newInstance()
        DriverManager.getConnection(getProperties("rdb.URL.ORCALE"),
            getProperties("rdb.USERNAME.ORCALE"),
            getProperties("rdb.PASSWORD.ORCALE"))
    }

          /**
            * 建立mysql数据库连接
            */
          def getMysqlConnection : Connection = {
            Class.forName("com.mysql.jdbc.Driver")
            DriverManager.getConnection( getProperties("rdb.URL.MySQL"),
              getProperties("rdb.USERNAME.MySQL"),
              getProperties("rdb.PASSWORD.MySQL"))
          }

     /**
     * 建立Hbase数据库连接
     */     
      //读Hbase数据库
        //Spark读取HBase,主要使用SparkContext 提供的newAPIHadoopRDDAPI将表的内容以 RDDs 的形式加载到 Spark 中。
     val hbaseConf = HBaseConfiguration.create()
     hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, getProperties("hbase.zookeeper.quorum"))
     hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, getProperties("hbase.zookeeper.znode.parent"))
     hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, getProperties("hbase.zookeeper.property.clientPort"))
     hbaseConf.set(HConstants.MASTER_PORT, getProperties("hbase.master.port"))

     //设置查询的表名
     hbaseConf.set(TableInputFormat.INPUT_TABLE, getProperties("hbase.unix.test"))
     //通过Hadoop输入格式 访问HBase,输入数据类型是HBase,输入命令是调用SparkContext.newAPTHadoopRDD
     //调用命令中的参数hbaseConf包含了Zookeeper设置以及Hbase设置,TableInputFormat中包含了多个可以用来优化对Hbase的读取设置项
     //返回值类型是键值对,其中键的类型是ImmutableBytesWritable,值得类型是Result
     val HBase_DATARDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
     //脚手架语句,查看结果
     println(HBase_DATARDD.count())
    //遍历输出
    //模式匹配--元组匹配
        usersRDD.foreach{ case (_,result) =>
         val key = Bytes.toInt(result.getRow)
         val name = Bytes.toString(result.getValue("basic".getBytes,"name".getBytes))
         val age = Bytes.toInt(result.getValue("basic".getBytes,"age".getBytes))
        println("Row key:"+key+" Name:"+name+" Age:"+age)
    }
    //使用键值对
             HBase_DATARDD.foreach{
                o =>{
            val(_,result)=o
            println( Bytes.toString(result.getValue("it".getBytes,"name".getBytes)) )}
            }

        //使用下标式
             HBase_DATARDD.foreach{
                o => println( Bytes.toInt(o._2.getValue("it".getBytes,"name".getBytes)) )
             }


    //写数据到HBase数据库中
      //方法一
            //Step 1:我们需要先创建一个 JobConf。
            //Step 2: RDD 到表模式的映射
                //HBase 上的操作都需要先创建一个操作对象Put,Get,Delete等,然后调用Table上的相对应的方法
            //Step 3: 读取RDD并转换
            //Step 4: 使用saveAsHadoopDataset方法写入HBase
      //方法二
    //将 RDD[(uid:Int, name:String, age:Int)] 转换成 RDD[(ImmutableBytesWritable, Put)]
    //<RDD的形式是Iterator[(String, Any)]>
        //定义 HBase 的配置
      val hbaseConf = HBaseConfiguration.create()
      hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, getProperties("hbase.zookeeper.quorum"))
      hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, getProperties("hbase.zookeeper.znode.parent"))
      hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, getProperties("hbase.zookeeper.property.clientPort"))
      hbaseConf.set(HConstants.MASTER_PORT, getProperties("hbase.master.port"))
     //新版 API 中加入了 Connection,HAdmin成了Admin,HTable成了Table,而Admin和Table只能通过Connection获得。
    //Connection的创建是个重量级的操作,由于Connection是线程安全的,所以推荐使用单例,其工厂方法需要一个HBaseConfiguration。
    //Connection 的创建是个重量级的工作,线程安全,是操作hbase的入口
      //指定输出格式和输出表名
      val tableName: String="mytest"
      val conn = ConnectionFactory.createConnection(hbaseConf)
      val table = conn.getTable(TableName.valueOf(tableName))
    //不同的数据类型的写入不同,具体的看RDD中的元素的形式
          ageRDD.foreach(o => {
              if (o._1 != "") {
                    ////准备插入一条数据
                  val put = new Put(Bytes.toBytes(o._1))
            //  模式匹配--类型匹配
            //在数据类型不给定的情况下,Scala使用类型推导确定变量的具体类型,进而针对不同的类型做出相应的处理
            //数据类型--元组Tuple  ,可以通过变量名._N来访问元组的内容,其中N表示元组中元素的索引号,本例中o._1 以及o._2所示
                  o._2 match {
                        //为put操作指定 column 和 value
                      case d: Double =>
                          put.addColumn(Bytes.toBytes("it"), Bytes.toBytes(columns), Bytes.toBytes(String.valueOf(d)))
                      case s: String =>
                          put.addColumn(Bytes.toBytes("it"), Bytes.toBytes(columns), Bytes.toBytes(s))
                      case _ =>
                          put.addColumn(Bytes.toBytes("it"), Bytes.toBytes(columns), Bytes.toBytes(String.valueOf(o._2)))
                  }
                //提交
                  table.put(put)
              }      
        table.close()
        conn.close()
     }

4.RDD类型转换

数据类型以及转换
获取数据后,就是对数据进行预处理以及分析的过程。在这个过程中,如何进行数据处理则成为了接下来要解决的问题了
由于我们处理的数据是RDD,所以,在这里主要使用RDD来进行程序处理。在2.0版本的Spark中,有DataSet类型的数据
在之后的学习和使用中,再做迁移。在这里,先解决如何使用RDD进行数据分析的问题。
首先理清RDD的类型以及对应RDD的操作,以及不同RDD之间的相互转化

<1>RDD类型
    一般的RDD
            数值型RDD
            字符型RDD  
    pair RDD

            控制RDD在不同节点的分布
    模块RDD
            Mlib模块中的RDD

<2>对应的操作:
        Transformation
        Action

<3>不同RDD之间的转换
从pair RDD中提取数据
  从pair RDD中提取数据成为 数值型RDD 其中K,V
    val ssqcodeRDD = HBase_DATARDD.map(
        o =>  Bytes.toInt(o._2.getValue("it".getBytes,"age".getBytes))
    ).map(o => o.toDouble)

把数组和标号组成一个元组RDD,参见《Spark高级数据分析》P75 页

Spark SQL读取结构化数据

Spark SQL支持多种结构化数据源作为输入

读取Hive结构化数据

 配置:前提可以连接Hive,即Hive 存在,并且在依赖中做配置

 import org.apache.spl.hive.HiveContext
 //创建HiveContext对象,Spark SQL的入口
 val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
 //使用HQL语言进行查询<Hive 查询语言>
 val rows = hiveCtx.sql("SELECT time,amount from users")

读取JSON数据

 import org.apache.spl.hive.HiveContext  
 val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
 val allRows = hiveCtx.jsonFile("order.json")
 //注册成表
 allRows.registerTempTable(allRows)  
 val rows = hiveCtx.sql("SELECT time,amount from users")

脚手架语句-Scaffolding

脚手架语句,主要是用于在探索阶段,对程序的功能进行搭建和验证,程序完成后撤下。用于查看数据以及语句是否可行,起辅助和调试作用
1.一般脚手架语句   
针对一般的RDD
HBase_DATARDD.first()       //第一个数据
HBase_DATARDD.top(N)        //N是要查看的数据量大小例如 top(10)
HBase_DATARDD.take()        //部分数据,包含指定数量记录的数据
HBase_DATARDD.collect()     //全部数据,返回包含RDD 所有对象的数组,数据量大的情况下,不要使用,会占满内存
HBase_DATARDD.count()

针对pair RDD
HBase_DATARDD.collectAsMap()    
HBase_DATARDD.count()

println(HBase_DATARDD.count())
println(HBase_DATARDD.first())
//打印所有 tableRDD.foreach(println(_))

2.//脚手架程序,通过parallelize创造数据,来看是否可以走通
     //数据转换一定要查看转换是否正确toString,还是toInt
     val ageRDD = HBase_DATARDD.map(
         o =>  Bytes.toInt(o._2.getValue("it".getBytes,"age".getBytes))
     ).map(o => o.toDouble)
    ageRDD.foreach(println(_))
    println(ageRDD.count())
    val nameRDD = HBase_DATARDD.map(
        //o =>  Bytes.toInt(o._2.getValue("it".getBytes,"age".getBytes))
        o => Bytes.toStringo._2.getValue("it".getBytes,"name".getBytes))
    ).map(o => o.toDouble)
    nameRDD.foreach(println(_))
    println(nameRDD.count())
//      脚手架程序
//      用来调试,以及排除错误
//        val arr = Array(8.58, 8.25, 8.420188658)
//        val ageRDD = sc.parallelize(arr)
//        val arr1 = Array(8.4201 , 8.420191268, 8.420191268)
//        val nameRDD = sc.parallelize(arr1)
//        nameRDD.foreach(println(_))
//        println(nameRDD.count())
    //相关性分析Correlations
    //输入数据 :类型是连续型数据,two RDD[Double]s,两个RDD必须要有相同的长度.同一列的数据不能完全相同
    //函数以及参数:    calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method.
    //输出是the output will be a Double or the correlation Matrix respectively
    //出现NaN
    val correlation: Double = Statistics.corr(ageRDD, nameRDD,"pearson")//计算不同数据的 皮尔逊相关系数
    println(correlation)

参考

spark对hbase操作--http://www.cnblogs.com/hd-zg/p/5917751.htmlp/5917751.html
《Spark快速大数据分析》
《Spark高级数据分析》

blogroll

social