Spark SQL读取数据

Spark SQL--处理半结构和结构数据

Spark SQL--将Hive的查询作为Spark的任务提交到Spark集群上
SchemaRDD,专门用于Spark SQL 查询的RDD,是存放Row对象的RDD。每个Row对象代表一行记录。在本次使用的版本中作为DataFrame。

操作过程

    Spark支持Hive---Spark编译期中设定,Spark2.0--SparkSession 统一了SQLContext HiveContext.
      前情--配置--依赖--引入--逻辑实现
    Spark不支持Hive  import org.apache.spark.sql.SQLContext //如果spark版本不支持Hive,则需要创建SQLContext
    SparkSession 1.6引入,
0.在使用中引入Spark SQL需要添加一些额外的依赖--支持hive的版本的依赖---
在集成环境中如果已经添加,则不需要引入,
集成环境地址/usr/hdp/2.4.0.0-169/spark/lib
spark-assembly-1.6.0.2.4.0.0-169-hadoop2.7.1.2.4.0.0-169.jar
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>2.1.0</version>
        </dependency>

1.输入:
    1.基于Hive
      import org.apache.spark.sql.hive.HiveContext  //导入Spark SQL

        val SparkConf = new SparkConf().set()
        val sc = new SparkContext(SparkConf)
        val hiveCtx = new import org.apache.spark.sql.hive.HiveContext(sc)

        val rows = hiveCtx.sql("SELECT key,value FROM users")//生成SchemaRDD
        val keys = rows.map(row => row.getInt(0))

    2基于Hive操作Parquet文件
     import org.apache.spark.sql.hive.HiveContext  //导入Spark SQL
            Parquet是基于列存储的数据结构.
             parquet是列式存储格式的一种文件类型,列式存储有以下的核心优势:
                a)可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。 
                b)压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,
                    可以使用更高效的压缩编码(例如Run Length EncodingDelta Encoding)进一步节约存储空间。 
                c)只读取需要的列,支持向量运算,能够获取更好的扫描性能
            (1)parquet文件加载
                sqlContext.read.parquet("people.parquet");(也可用load方法加载)
            (2)parquet文件输出
                 val people:RDD[Person], people.write.parquet("people.parquet")
            Hive区分大小写,Parquet不区分大小写
            hive允许所有的列为空,而Parquet不允许所有的列全为空
                当将Hive metastore Parquet表转换为Spark SQL Parquet表时,
                需要将Hive metastore schemaParquet schema进行一致化

        val SparkConf = new SparkConf().set()
        val sc = new SparkContext(SparkConf)
        val hiveCtx = new import org.apache.spark.sql.hive.HiveContext(sc)          
        //读取数据
        val tweets = hiveCtx.ParquetFile(ParquetFile)
        //Parquet文件注册为Spark SQL临时表,在表上运行查询语句
        tweets.registerTempTable("Atweets")
        val rows = hiveCtx.sql("SELECT key,value FROM users")//生成SchemaRDD
        val keys = rows.map(row => row.getInt(0))

    基于JSON文件--Spark SQL 结构信息推断
      import org.apache.spark.sql.hive.HiveContext

        val SparkConf = new SparkConf().set()
        val sc = new SparkContext(SparkConf)
        val hiveCtx = new import org.apache.spark.sql.hive.HiveContext(sc)

        val tweets = hiveCtx.jsonFile(jsonFile)
        tweets.registerTempTable("Atweets")

        val results  = hiveCtx.sql("SELECT userID,age FROM tweets")  //生成SchemaRDD
    基于case class RDD
        case class HappyPerson()
        val ha = sc.parallellize()
        ha.registerTempTable()
2.中间
    生成SchemaRDD---1.3后已经使用DataFrame代替了
3.输出
创建HiveContext
Spark SQLHive作为文件读入
Spark SQLhive作为一个表读入

sqlContext.read.parquet

Spark--处理非结构数据-半结构数据-结构数据

1.原生态的输入命令
    sc.textFile()-------------saveAsTextFile()
    sc.wholeTextFiles()-------saveAsTextFile()
    sc.sequenceFile()---------saveAsSequenceFile()
    sc.objectFile()------------RDD.saveAsObjectFile()

2.HadoopAPI--有新旧两套API接口
      val HBase_DATARDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], 
                            classOf[ImmutableBytesWritable], classOf[Result])
3.其他输入和输出命令

参考:

Spark SQL官方文档阅读--待完善    http://blog.csdn.net/dabokele/article/details/48707511
Spark SQL 之 Data Sources    http://www.cnblogs.com/BYRans/p/5005342.html
http://spark.apache.org/docs/1.6.0/sql-programming-guide.html#datasets
Intel李锐:Hive on Spark解析 http://www.csdn.net/article/2015-04-24/2824545

blogroll

social