Spark读写JSON和Hbase连接的配置

Spark的数据读入和写出JSON

依赖

0.pom.xml文件中加入依赖依赖:
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.0.4</version>
</dependency>

读取JSON数据

1.首先查看JSON
2.解析
//Fastjson的最主要的使用入口是com.alibaba.fastjson.JSON
//Object     parse(String text);                        // JSON文本parseJSONObject或者JSONArray
//JSONObject parseObject(String text)                  // JSON文本parseJSONObject
//<T> T      parseObject(String text, Class<T> clazz);  // JSON文本parseJavaBean
//JSONArray  parseArray(String text);                   // JSON文本parseJSONArray
//<T> List<T>parseArray(String text, Class<T> clazz);   //JSON文本parseJavaBean集合
例子:
 import com.alibaba.fastjson.JSON
 val _o = JSON.parseObject(JSON.parseObject(o).get("A").toString)
  //筛选出现特定年龄的数据
  val DA_AGE = Array("18","28","38","48","58")
  val orderRDD = fluxRDD.filter(o =>  {o.contains("test") && (DA_AGE.contains(JSON.parseObject(JSON.parseObject(o).get("A").toString).get("age").toString))} )

写成JSON格式

1>明确要保存的格式
2>将数据变化成结构化RDD--然后转为字符串RDD
3>将字符串RDD转换成JSON数据保存
 String toJSONString(Object object);                        // 将JavaBean序列化为JSON文本
 String toJSONString(Object object, boolean prettyFormat);  // 将JavaBean序列化为带格式的JSON文本
 Object toJSON(Object javaObject);                          //将JavaBean转换为JSONObject或者JSONArray。
    方法一、将JavaBean序列化 的get 和set
    方法二、使用JSONObject中的put方法
        val k = 3
        val centers = center.map(_.toArray)
        val requestJson = new com.alibaba.fastjson.JSONObject()
        requestJson.put("k", k)
        requestJson.put("center", center)
        println(requestJson.toJSONString)

设置zooKeeper集群地址

<1>也可以通过将hbase-site.xml导入classpath
<2>可以在程序中配置

在集群中配置环境变量将jar包和当前目录加入CLASSPATH

Java客户端使用的配置信息是被映射在一个HBaseConfiguration 实例中. 
HBaseConfiguration有一个工厂方法, HBaseConfiguration.create(); 
运行这个方法的时候,会去CLASSPATH,下找Hbase-site.xml,读其发现的第一个配置文件的内容。 
(这个方法还会去找hbase-default.xml ; hbase.X.X.X.jar里面也会有一个an hbase-default.xml).

本例在程序里设置

在程序内配置
连接HBase的方式有两种,
  其一是直接在程序中明文配置,
  其二是通过读取配置文件配置
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "***.***.***.***,***.***.***.***,***.***.***.***,***.***.***.***,***.***.***.***")
hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase-unsecure")
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
hbaseConf.set(HConstants.MASTER_PORT, "1****")

使用配置方式

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, getProperties("hbase.zookeeper.quorum"))
hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, getProperties("hbase.zookeeper.znode.parent"))
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, getProperties("hbase.zookeeper.property.clientPort"))
hbaseConf.set(HConstants.MASTER_PORT, getProperties("hbase.master.port"))

使用配置文件的话有两种方式

获取InputStream方法有两种
//配置内容从 Ambari中的内容查看和提取

/ //获取InputStream方法一 :通过当前类加载器的getResourceAsStream方法获取 //关于配置文件的路径问题:因为属性文件已经复制到src根目录下,可直接使用 InputStream propInStream = getClass().getClassLoader().getResourceAsStream("HbaseConfig.properties"); Properties prop = new Properties();/

    //获取InputStream方法二   是从文件获取
    //String filePath = "C:/My_Files/Config/HbaseConfig.properties";
    String filePath = "HbaseConfig.properties";
    InputStream propInStream = new FileInputStream(new File(filePath)); 
    System.out.println(propInStream);
    Properties prop = new Properties();       
    //load(InputStream inStream)方法从.properties属性文件对应的文件输入流中,加载属性列表到Properties类对象
    try{
        prop.load(propInStream);
    }catch(IOException e){
        System.err.println(e.getMessage());
    }finally{
        propInStream.close();
    }
    System.out.println(prop);
    System.out.println(prop.getProperty("hbase.master.port"));
    //Hbase读取配置文件中的内容
    //prop.getProperty方法是分别是获取属性信息。
    Configuration HBASE_CONFIG = new Configuration();
    HBASE_CONFIG.set("hbase.zookeeper.property.clientPort",prop.getProperty("hbase.zookeeper.property.clientPort")); 
    HBASE_CONFIG.set("hbase.zookeeper.quorum",prop.getProperty("hbase.zookeeper.quorum")); 
    HBASE_CONFIG.set("hbase.master.port",prop.getProperty("hbase.master.port"));
    HBASE_CONFIG.set("zookeeper.znode.parent",prop.getProperty("zookeeper.znode.parent"));
    Configuration configuration = HBaseConfiguration.create(HBASE_CONFIG);

具体程序如下: 如下

blogroll

social