Hbase数据通过Spark到MySQL

处理流程:

<1>MySQL 设置用户以及密码创建数据库以及待处理的数据表
<2>Spark从HBase中读取数据,写到MySQL中
<3>R/Python 读取MySQL中数据,进行数据探索并将得出的结论以及图标数据进行保存

<1>设置MySQL数据库,创建表

    登录主机
     用户名 root密码 ****    
    安装数据库:
    rpm -ivh MySQL-server-community.rpm 
    登录
    mysql  -h localhost -u root  .  
    登录状态修改密码
    mysql> SET PASSWORD FOR 'root'@'localhost' = PASSWORD('****');
    mysql> FLUSH PRIVILEGES
    重新登录
    mysql -u root -p 输入密码登录
    在登录状态创建数据库
        show tables;
        mysql>
        create database UNIX_DATEST;
        use  UNIX_DATEST
        CREATE TABLE Lasy(userID VARCHAR(20) primary key ,sex VARCHAR(2), name VARCHAR(16) )
        DESC Lasy
        select * from Lasy limit 3;
        select count(*) from Lasy;

     删除字段中内容
        delect from Lasy where userID = "9561";
     删除字段
        alter table Lasy drop column userID;
     清除表数据
        delect from Lasy;
     删除表
        DROP TABLE IF EXISTS Lasy;
     //授予访问权限,我登录的主机IP地址是39,所以设置如下
        GRANT ALL PRIVILEGES ON *.* TO 'root'@'172.15.10.39' IDENTIFIED BY 'test';

    主机IP:   172.15.10.30
    MySQL端口:    3306
    用户名:    root
    密码:     test
    数据库:    UNIX_DATEST
    数据表:    Lasy
        //val sql = "insert into Lasy(userID , sex,name ) values (?, ?,?)"

<2>Spark读取Hbase,并将数据存储到MySQL

 MySQL的JDBC URL编写方式:jdbc:mysql://主机名称:连接端口/数据库的名称?参数=值[用户名 密码]
Maven依赖
     <!-- mysql驱动包 -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
        <scope>runtime</scope>
    </dependency>

配置文件
    #hbase连接配置
    hbase.zookeeper.quorum=172.15.10.191,172.15.10.192,172.15.10.193
    hbase.zookeeper.znode.parent=/hbase-unsecure
    hbase.zookeeper.property.clientPort=2181
    hbase.master.port=60000

    #hbase表名映射--测试表
    hbase.UNIX.test=first_test

    #MySQL数据库配置
     URL.MySQL=jdbc:mysql://172.15.10.30:3306/UNIX_DATEST
     USERNAME.MySQL=root
     ASSWORD.MySQL=test

 //建立mysql数据库连接
 def getMysqlConnection : Connection = {
 Class.forName("com.mysql.jdbc.Driver")
 DriverManager.getConnection("url","username",  "password")
   }
    conn = getMysqlConnection
    sql ='show tables'
    ps = conn.prepareStatement(sql)
    ps.executeBatch()
    conn.commit()
    ps.close()
    conn.close()

      /**
    * Hbase数据库转换到MySQL数据库
    */

package com.UNIX.analysis   
import java.sql.{Connection, DriverManager, PreparedStatement}

import com.UNIX.util.UtilsTrait
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.{SparkConf, SparkContext}
/**
  * Created by first on 2016/11/23.
  */
object readHBase extends UtilsTrait {

    def main(args: Array[String]) {

        //第一步:创建SparkConf对象配置一些通用的属性,并且基于这个创建一个SparkContext对象setMaster("local[2]")
        val SparkConf = new SparkConf().setMaster("local[2]").setAppName("CountingTest").set("spark.executor.memory", "1g")
        //使用SparkContext对象中的textFile函数将输入文件转换为一个RDD ,SparkContext对象代表对计算机集群的一个连接
        val sc = new SparkContext(SparkConf)

        val hbaseConf = HBaseConfiguration.create()
        //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,本例在程序里设置
        hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, getProperties("hbase.zookeeper.quorum"))
        hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, getProperties("hbase.zookeeper.znode.parent"))
        hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, getProperties("hbase.zookeeper.property.clientPort"))
        hbaseConf.set(HConstants.MASTER_PORT, getProperties("hbase.master.port"))

        //扫描表,表名
        hbaseConf.set(TableInputFormat.INPUT_TABLE, getProperties("hbase.UNIX.test"))
        //通过Hadoop输入格式 访问HBase,输入数据类型是HBase,输入命令是调用SparkContext.newAPTHadoopRDD
        //调用命令中的参数hbaseConf包含了Zookeeper设置以及Hbase设置,TableInputFormat中包含了多个可以用来优化对Hbase的读取设置项
        //返回值类型是键值对,其中键的类型是ImmutableBytesWritable,值得类型是Result
        val HBase_DATARDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
        //脚手架语句,查看结果
        println(HBase_DATARDD.count())
        HBase_DATARDD.foreach(println(_))

        //读取的Hbase的RDD形式
        //  conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark_testdb", "root", "111111")
        //"insert into Lasy(userID, sex) values (?, ?)" INSERT INTO table_name (列1, 列2,...) VALUES (值1, 值2,....)


       HBase_DATARDD.map(o => {
        val strs = new Array[String](7)
        try {
            //strs(1) = new String(o._2.getRow)
            strs(1) = new String(o._1.get())
            try {
                strs(2) = new String(o._2.getValue("it".getBytes(), "sex".getBytes()))
            } catch {
                case e: Exception => strs(2) = "0"
            }
            try {
                strs(3) = new String(o._2.getValue("it".getBytes(), "dltcode".getBytes()))
            } catch {
                case e: Exception => strs(3) = "0"
            }

        } catch {
            case e: Exception =>
                System.err.println(e.getStackTrace)
        }
        strs

    }).foreachPartition(o => {
        var conn: Connection = null
        var ps: PreparedStatement = null
        val sql = "insert into Lasy(userID , sex,name ) values (?, ?,?)"
        println(sql)

        //数据转换一定要查看转换是否正确toString,还是toInt
        val ageRDD = HBase_DATARDD.map(
            o => Bytes.toString(o._2.getValue("it".getBytes, "sex".getBytes))
        ).map(o => o.toDouble)
        ageRDD.foreach(println(_))
        println(ageRDD.count())

        val nameRDD = HBase_DATARDD.map(
            o => Bytes.toString(o._2.getValue("it".getBytes, "name".getBytes))
        ).map(o => o.toDouble)
        nameRDD.foreach(println(_))
        println(nameRDD.count())

        //相关性分析Correlations
        //输入数据 :类型是连续型数据,two RDD[Double]s,两个RDD必须要有相同的长度.同一列的数据不能完全相同
        //函数以及参数:    calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method.
        //输出是the output will be a Double or the correlation Matrix respectively
        //出现NaN
        val correlation: Double = Statistics.corr(ageRDD, nameRDD, "pearson") //计算不同数据的 皮尔逊相关系数
        println(correlation)    
        sc.stop()
    }
}

<3> R/Python 读取MySQL中数据

进行数据探索并将得出的结论以及图标数据进行保存
    <01> R MySQL以及Orcale等关系型数据库中读取数据
        ##本机需要安装和配置好MySQL的客户端
        流程:
                安装并引入RMySQL
                使用dbConnect函数打开数据库的连接
                使用dbGetQuery函数发起一个select语句,并返回结果集
                读取结束后,使用dbDisconnect函数终止与数据库的连接3
         #install.packages("RMySQL")
        library(RMySQL)
        con <- dbConnect(MySQL(),user="root",password="****",host="172.15.10.***")
        sql <- " select  userID,sex,dltcode from UNIX_DATEST.Lasy"
        rows <- dbGetQuery(con,sql)
        str(rows)
        if(dbMoreRessylts(con)) dbNextResult(con)
        dbDisconnect(con)

    <02>PythonMySQLOrcale等关系型数据库中读取数据     
    安装MySQLdb
    使用流程:
        引入 API 模块。
        获取与数据库的连接。
        执行SQL语句和存储过程。
        关闭数据库连接。
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import MySQLdb
import pandas
# 打开数据库连接
#port:MySQL服务使用的TCP端口.默认是3306.
#charset:数据库编码 charset="utf8"
conn = MySQLdb.connect(host="172.15.10.30",user="root",passwd="test",db="UNIX_DATEST")
# 创建数据表SQL语句  占位符 %s
MySQLsql = "select * from Lasy"
# 使用cursor()方法获取操作游标 
# 执行sql语句
cursor = conn.cursor()
#cursor对象提供方法包括两大类:1.执行命令,2.接收返回值
#第一类方法:执行命令
#execute(self, query, args):执行单条sql语句,接收的参数为sql语句本身和使用的参数列表,返回值为受影响的行数 
#executemany(self, query, args):执行单条sql语句,但是重复执行参数列表里的参数,返回值为受影响的行数 
n = cursor.execute(MySQLsql)

##第二类方法 接收返回值
#使用 fetchone() 方法获取单条数据, 使用fetchall() 方法获取多条数据
#fetchmany(self, size=None):接收size条返回结果行.如果size的值大于返回的结果行的数量,则会返回
#使用 fetchone() 方法获取一条数据
data = cursor.fetchone()
print data
print  type(data)
##fetchall() 方法取余下的数据
for row in cursor.fetchall():
    print row
    for r in row:
        print r
#Pandas读取Mysql数据
df = pandas.read_sql(MySQLsql,con=conn)
print df
#需要分别的关闭指针对象和连接对象
cursor.close() 
# 关闭数据库连接
conn.close()
关于安装Python安装数据库
    #MySQLdb 是用于Python链接Mysql数据库的接口,它实现了 Python 数据库 API 规范
    #pip install MySQL-python  
    MySQL-Python 1.2.5 does not support Python 3.0+ yet

概念:

时间戳是指格林威治时间1970年01月01日00时00分00秒(北京时间1970年01月01日08时00分00秒)起至现在的总秒数
Unix时间戳(Unix timestamp),或称Unix时间(Unix time)、POSIX时间(POSIX time),是一种时间表示方式
时间戳:
    Unix时间戳记是从'1970-01-01 00:00:00'GMT开始的秒数     
    MySQL中“完整”TIMESTAMP格式是14位,
        TIMESTAMP值可以从1970的某时的开始一直到2037年,精度为一秒,其值作为数字显示。
        0000-00-00 00:00:00     
    Orcale 获得当前时间的,精确到毫秒,可以指定精确豪秒的位数

参考:

MySQLdb用户指南: http://mysql-python.sourceforge.net/MySQLdb.html 
MySQLdb文档: http://mysql-python.sourceforge.net/MySQLdb-1.2.2/public/MySQLdb-module.html
python下的MySQLdb使用 http://blog.csdn.net/vincent_czz/article/details/7697039/
十分钟搞定pandas http://pandas.pydata.org/pandas-docs/stable/dsintro.html#dsintro

blogroll

social