Spark支持Hive---在Spark编译期中设定,Spark2.0--中SparkSession统一了SQLContext和HiveContext.前情--配置--依赖--引入--逻辑实现Spark不支持Hiveimportorg.apache.spark.sql.SQLContext//如果spark版本不支持Hive,则需要创建SQLContextSparkSession在1.6引入,0.在使用中引入SparkSQL需要添加一些额外的依赖--支持hive的版本的依赖---在集成环境中如果已经添加,则不需要引入,集成环境地址/usr/hdp/2.4.0.0-169/spark/lib中spark-assembly-1.6.0.2.4.0.0-169-hadoop2.7.1.2.4.0.0-169.jar<!--https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.10</artifactId><version>2.1.0</version></dependency>1.输入:1.基于Hive表importorg.apache.spark.sql.hive.HiveContext//导入SparkSQLvalSparkConf=newSparkConf().set()valsc=newSparkContext(SparkConf)valhiveCtx=newimportorg.apache.spark.sql.hive.HiveContext(sc)valrows=hiveCtx.sql("SELECT key,value FROM users")//生成SchemaRDDvalkeys=rows.map(row=>row.getInt(0))2基于Hive操作Parquet文件importorg.apache.spark.sql.hive.HiveContext//导入SparkSQLParquet是基于列存储的数据结构.parquet是列式存储格式的一种文件类型,列式存储有以下的核心优势:a)可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。b)压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如RunLengthEncoding和DeltaEncoding)进一步节约存储空间。c)只读取需要的列,支持向量运算,能够获取更好的扫描性能(1)parquet文件加载sqlContext.read.parquet("people.parquet");(也可用load方法加载)(2)parquet文件输出valpeople:RDD[Person],people.write.parquet("people.parquet")Hive区分大小写,Parquet不区分大小写hive允许所有的列为空,而Parquet不允许所有的列全为空当将HivemetastoreParquet表转换为SparkSQLParquet表时,需要将Hivemetastoreschema和Parquetschema进行一致化valSparkConf=newSparkConf().set()valsc=newSparkContext(SparkConf)valhiveCtx=newimportorg.apache.spark.sql.hive.HiveContext(sc)//读取数据valtweets=hiveCtx.ParquetFile(ParquetFile)//将Parquet文件注册为SparkSQL临时表,在表上运行查询语句tweets.registerTempTable("Atweets")valrows=hiveCtx.sql("SELECT key,value FROM users")//生成SchemaRDDvalkeys=rows.map(row=>row.getInt(0))基于JSON文件--SparkSQL结构信息推断importorg.apache.spark.sql.hive.HiveContextvalSparkConf=newSparkConf().set()valsc=newSparkContext(SparkConf)valhiveCtx=newimportorg.apache.spark.sql.hive.HiveContext(sc)valtweets=hiveCtx.jsonFile(jsonFile)tweets.registerTempTable("Atweets")valresults=hiveCtx.sql("SELECT userID,age FROM tweets")//生成SchemaRDD基于caseclassRDDcaseclassHappyPerson()valha=sc.parallellize()ha.registerTempTable()2.中间生成SchemaRDD---1.3后已经使用DataFrame代替了3.输出创建HiveContextSparkSQL将Hive作为文件读入SparkSQL将hive作为一个表读入sqlContext.read.parquet