Spark的数据读入和写出JSON
依赖
0.pom.xml文件中加入依赖依赖:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.0.4</version>
</dependency>
读取JSON数据
1.首先查看JSON串
2.解析
//Fastjson的最主要的使用入口是com.alibaba.fastjson.JSON
//Object parse(String text); // 把JSON文本parse为JSONObject或者JSONArray
//JSONObject parseObject(String text); // 把JSON文本parse成JSONObject
//<T> T parseObject(String text, Class<T> clazz); // 把JSON文本parse为JavaBean
//JSONArray parseArray(String text); // 把JSON文本parse成JSONArray
//<T> List<T>parseArray(String text, Class<T> clazz); //把JSON文本parse成JavaBean集合
例子:
import com.alibaba.fastjson.JSON
val _o = JSON.parseObject(JSON.parseObject(o).get("A").toString)
//筛选出现特定年龄的数据
val DA_AGE = Array("18","28","38","48","58")
val orderRDD = fluxRDD.filter(o => {o.contains("test") && (DA_AGE.contains(JSON.parseObject(JSON.parseObject(o).get("A").toString).get("age").toString))} )
写成JSON格式
1>明确要保存的格式
2>将数据变化成结构化RDD--然后转为字符串RDD
3>将字符串RDD转换成JSON数据保存
String toJSONString(Object object); // 将JavaBean序列化为JSON文本
String toJSONString(Object object, boolean prettyFormat); // 将JavaBean序列化为带格式的JSON文本
Object toJSON(Object javaObject); //将JavaBean转换为JSONObject或者JSONArray。
方法一、将JavaBean序列化 的get 和set
方法二、使用JSONObject中的put方法
val k = 3
val centers = center.map(_.toArray)
val requestJson = new com.alibaba.fastjson.JSONObject()
requestJson.put("k", k)
requestJson.put("center", center)
println(requestJson.toJSONString)
设置zooKeeper集群地址
<1>也可以通过将hbase-site.xml导入classpath
<2>可以在程序中配置
在集群中配置环境变量将jar包和当前目录加入CLASSPATH
Java客户端使用的配置信息是被映射在一个HBaseConfiguration 实例中.
HBaseConfiguration有一个工厂方法, HBaseConfiguration.create();
运行这个方法的时候,会去CLASSPATH,下找Hbase-site.xml,读其发现的第一个配置文件的内容。
(这个方法还会去找hbase-default.xml ; hbase.X.X.X.jar里面也会有一个an hbase-default.xml).
本例在程序里设置
在程序内配置
连接HBase的方式有两种,
其一是直接在程序中明文配置,
其二是通过读取配置文件配置
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "***.***.***.***,***.***.***.***,***.***.***.***,***.***.***.***,***.***.***.***")
hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase-unsecure")
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
hbaseConf.set(HConstants.MASTER_PORT, "1****")
使用配置方式
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, getProperties("hbase.zookeeper.quorum"))
hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, getProperties("hbase.zookeeper.znode.parent"))
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, getProperties("hbase.zookeeper.property.clientPort"))
hbaseConf.set(HConstants.MASTER_PORT, getProperties("hbase.master.port"))
使用配置文件的话有两种方式
获取InputStream方法有两种
//配置内容从 Ambari中的内容查看和提取
/ //获取InputStream方法一 :通过当前类加载器的getResourceAsStream方法获取 //关于配置文件的路径问题:因为属性文件已经复制到src根目录下,可直接使用 InputStream propInStream = getClass().getClassLoader().getResourceAsStream("HbaseConfig.properties"); Properties prop = new Properties();/
//获取InputStream方法二 是从文件获取
//String filePath = "C:/My_Files/Config/HbaseConfig.properties";
String filePath = "HbaseConfig.properties";
InputStream propInStream = new FileInputStream(new File(filePath));
System.out.println(propInStream);
Properties prop = new Properties();
//load(InputStream inStream)方法从.properties属性文件对应的文件输入流中,加载属性列表到Properties类对象
try{
prop.load(propInStream);
}catch(IOException e){
System.err.println(e.getMessage());
}finally{
propInStream.close();
}
System.out.println(prop);
System.out.println(prop.getProperty("hbase.master.port"));
//Hbase读取配置文件中的内容
//prop.getProperty方法是分别是获取属性信息。
Configuration HBASE_CONFIG = new Configuration();
HBASE_CONFIG.set("hbase.zookeeper.property.clientPort",prop.getProperty("hbase.zookeeper.property.clientPort"));
HBASE_CONFIG.set("hbase.zookeeper.quorum",prop.getProperty("hbase.zookeeper.quorum"));
HBASE_CONFIG.set("hbase.master.port",prop.getProperty("hbase.master.port"));
HBASE_CONFIG.set("zookeeper.znode.parent",prop.getProperty("zookeeper.znode.parent"));
Configuration configuration = HBaseConfiguration.create(HBASE_CONFIG);
具体程序如下: 如下