Log in to the host
Username: root, password: ****
Install the database:
rpm -ivh MySQL-server-community.rpm
Log in:
mysql -h localhost -u root
While logged in, change the password:
mysql> SET PASSWORD FOR 'root'@'localhost' = PASSWORD('****');
mysql> FLUSH PRIVILEGES;
Log in again:
mysql -u root -p    (enter the password when prompted)
Once logged in, create the database and a table:
create database UNIX_DATEST;
use UNIX_DATEST;
CREATE TABLE Lasy(userID VARCHAR(20) PRIMARY KEY, sex VARCHAR(2), name VARCHAR(16));
show tables;
DESC Lasy;
select * from Lasy limit 3;
select count(*) from Lasy;
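The two queries above only return something once the table has data; a test row can be inserted like this (the column values are made up for illustration):
insert into Lasy(userID, sex, name) values ('9561', 'M', 'testuser');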
Delete rows matching a condition:
delete from Lasy where userID = '9561';
Drop a column:
alter table Lasy drop column userID;
Clear all rows from the table:
delete from Lasy;
Drop the table:
DROP TABLE IF EXISTS Lasy;
//Grant access privileges; the machine I connect from has the IP address ending in .39, so the grant is set up as follows
GRANT ALL PRIVILEGES ON *.* TO 'root'@'172.15.10.39' IDENTIFIED BY 'test';
Host IP: 172.15.10.30
MySQL port: 3306
Username: root
Password: test
Database: UNIX_DATEST
Table: Lasy
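With the grant in place, the connection can be verified from the remote machine; a sketch of the client invocation (enter the password test when prompted):
mysql -h 172.15.10.30 -P 3306 -u root -p UNIX_DATEST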
//val sql = "insert into Lasy(userID, sex, name) values (?, ?, ?)"
<2> Spark reads from HBase and writes the data to MySQL
How a MySQL JDBC URL is written: jdbc:mysql://host:port/database?parameter=value (the username and password are passed separately)
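For example, the URL for the database above would look like the following (the useUnicode and characterEncoding parameters are only illustrative additions):
jdbc:mysql://172.15.10.30:3306/UNIX_DATEST?useUnicode=true&characterEncoding=UTF-8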
Maven dependency
<!-- MySQL driver -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
    <scope>runtime</scope>
</dependency>
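The program below also needs the Spark and HBase client libraries; a sketch of those dependencies follows, with artifact versions that are assumptions and should be matched to the actual cluster:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.10</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.1.2</version>
</dependency>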
Configuration file
#HBase connection settings
hbase.zookeeper.quorum=172.15.10.191,172.15.10.192,172.15.10.193
hbase.zookeeper.znode.parent=/hbase-unsecure
hbase.zookeeper.property.clientPort=2181
hbase.master.port=60000
#HBase table-name mapping -- test table
hbase.UNIX.test=first_test
#MySQL database settings
URL.MySQL=jdbc:mysql://172.15.10.30:3306/UNIX_DATEST
USERNAME.MySQL=root
PASSWORD.MySQL=test
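The getProperties helper used throughout the code below is assumed to read this file from the classpath; a minimal sketch of what it could look like (the file name UNIX.properties is an assumption):
import java.util.Properties

trait UtilsTrait {
  //Load the configuration file once when the trait is initialized
  private val props = new Properties()
  props.load(getClass.getResourceAsStream("/UNIX.properties"))

  //Look up a single key, e.g. getProperties("URL.MySQL")
  def getProperties(key: String): String = props.getProperty(key)
}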
//Create the MySQL database connection, using the settings from the configuration file above
def getMysqlConnection: Connection = {
  Class.forName("com.mysql.jdbc.Driver")
  DriverManager.getConnection(getProperties("URL.MySQL"), getProperties("USERNAME.MySQL"), getProperties("PASSWORD.MySQL"))
}
//Quick sanity check of the connection
val conn = getMysqlConnection
val sql = "show tables"
val ps = conn.prepareStatement(sql)
ps.execute()
ps.close()
conn.close()
/**
 * Transfer data from the HBase database into the MySQL database
 */
package com.UNIX.analysis
import java.sql.{Connection, DriverManager, PreparedStatement}
import com.UNIX.util.UtilsTrait
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by first on 2016/11/23.
*/
object readHBase extends UtilsTrait {
def main(args: Array[String]) {
//Step 1: create a SparkConf object to configure common properties and build a SparkContext from it; setMaster("local[2]") runs locally with two threads
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("CountingTest").set("spark.executor.memory", "1g")
//The SparkContext object represents a connection to the compute cluster (its textFile function, for example, turns an input file into an RDD)
val sc = new SparkContext(sparkConf)
val hbaseConf = HBaseConfiguration.create()
//Set the ZooKeeper quorum address; this can also be done by putting hbase-site.xml on the classpath, but here it is set in code
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, getProperties("hbase.zookeeper.quorum"))
hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, getProperties("hbase.zookeeper.znode.parent"))
hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, getProperties("hbase.zookeeper.property.clientPort"))
hbaseConf.set(HConstants.MASTER_PORT, getProperties("hbase.master.port"))
//Table to scan: set the table name
hbaseConf.set(TableInputFormat.INPUT_TABLE, getProperties("hbase.UNIX.test"))
//Access HBase through a Hadoop input format by calling SparkContext.newAPIHadoopRDD
//The hbaseConf argument carries the ZooKeeper and HBase settings; TableInputFormat provides several options for tuning reads from HBase
//The return value is a pair RDD: the key type is ImmutableBytesWritable and the value type is Result
val HBase_DATARDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
//Scaffolding statements: inspect the result
println(HBase_DATARDD.count())
HBase_DATARDD.foreach(println(_))
//The RDD read from HBase, one (row key, Result) pair per row
// conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark_testdb", "root", "111111")
//"insert into Lasy(userID, sex) values (?, ?)"   i.e. INSERT INTO table_name (col1, col2, ...) VALUES (val1, val2, ...)
HBase_DATARDD.map(o => {
val strs = new Array[String](7)
try {
//strs(1) = new String(o._2.getRow)
strs(1) = new String(o._1.get())
try {
strs(2) = new String(o._2.getValue("it".getBytes(), "sex".getBytes()))
} catch {
case e: Exception => strs(2) = "0"
}
try {
strs(3) = new String(o._2.getValue("it".getBytes(), "dltcode".getBytes()))
} catch {
case e: Exception => strs(3) = "0"
}
} catch {
case e: Exception =>
e.printStackTrace()
}
strs
}).foreachPartition(partition => {
//Write each partition to MySQL over one connection; strs(1) is the row key (userID), strs(2) the sex column, strs(3) the dltcode column written into name
var conn: Connection = null
var ps: PreparedStatement = null
val sql = "insert into Lasy(userID, sex, name) values (?, ?, ?)"
println(sql)
try {
  conn = getMysqlConnection
  ps = conn.prepareStatement(sql)
  partition.foreach(strs => {
    ps.setString(1, strs(1))
    ps.setString(2, strs(2))
    ps.setString(3, strs(3))
    ps.addBatch()
  })
  ps.executeBatch()
} finally {
  if (ps != null) ps.close()
  if (conn != null) conn.close()
}
})
//Check every data-type conversion carefully: toString versus toInt/toDouble
val ageRDD = HBase_DATARDD.map(
o => Bytes.toString(o._2.getValue("it".getBytes, "sex".getBytes))
).map(o => o.toDouble)
ageRDD.foreach(println(_))
println(ageRDD.count())
val nameRDD = HBase_DATARDD.map(
o => Bytes.toString(o._2.getValue("it".getBytes, "name".getBytes))
).map(o => o.toDouble)
nameRDD.foreach(println(_))
println(nameRDD.count())
//Correlation analysis (Correlations)
//Input: continuous data, two RDD[Double]s of the same length; a column whose values are all identical yields NaN
//Function and parameters: calculate the correlation using Pearson's method; use "spearman" for Spearman's method
//Output: a Double, or the correlation Matrix respectively
//NaN may appear
val correlation: Double = Statistics.corr(ageRDD, nameRDD, "pearson") //Pearson correlation coefficient of the two columns
println(correlation)
sc.stop()
}
}