OneProxy :: 用Spark直接在MySQL和PostgreSQL集群上做数据分析和机器学习

Apache Spark是一个优秀的数据分析平台,可以通过统一的RDD或DataFrame接口访问外部的数据源,通常是将大量数据存放在Hadoop/Hive平台上,这样就需要将线上关系数据库(例如Oracle或MySQL)中的数据定期或实时同步到Hadoop/Hive中,实在是有些麻烦。

Spark也支持通过JDBC去访问关系数据库中的数据,只是目前的实现可以很方便地连接一个库中的一张表,限制了数据访问能力,Hadoop/Hive能支持大数据量分析的原因是因为数据是切片分布在不同的机器上的,只要关系数据库中的表也能切分成多片并分在不同的数据库机器上,本质上和Hadoop/Hive就没有太大的区别。基于MySQL/PostgreSQL协议的OneProxy可以很方便地通过灵活的分片机制将数据打散到不同的服务器中,Spark去连接OneProxy就可以带动整个MySQL/PostgreSQL集群了。

spark2_over_oneproxy

我们认为关系数据库的操作能力要比Hadoop/Hive强,很多计算或汇总操作可以通过精心编写的SQL下放到数据库层实现,而Spark层只做二次汇总,来提升数据分析的性能;另外数据库上可以有多唯度的索引来过滤数据,使得Spark可以更有效地从下层获取要分析的基础数据。在这个结构下,小查询和大量的事务操作仍通过OneProxy来操作,而复杂的分析任务则交给Spark来执行,但数据只有一份,因此省去了数据同步的操作,只需要关心数据库主备之间的数据同步就行。

Spark下发的SQL通过OneProxy后,一样可以走读写分离,或仅从备库查询。OneProxy具有比Spark更好的数据分片机制,SQL下放到OneProxy后会做分区过滤(Partition Prune)操作,以减少数据扫描的范围。通过平民软件研发的OneProxy Connector for Spark就可以在MySQL集群上直接进行大数据分析了,只需要将jar包(下载页面)放到Spark(目前仅支持2.0.0)的jars目录下(同时将MySQL和PostgreSQL的JDBC驱动包也放到此目录)就完成安装了。

下面是一个Spark SQL的例子,其中“my_date”是一张使用OneProxy分片(Sharding)的表,如下所示:

spark-sql> CREATE TEMPORARY VIEW  my_date
         > USING com.onexsoft.oneproxy
         > OPTIONS (
         >   url    'jdbc:mysql://192.168.20.20:3307/test',
         >   user   'test',
         >   password   '123',
         >   dbtable 'my_date');
Time taken: 2.729 seconds
spark-sql> select * from my_date;
2016-08-02 00:00:00    	100
2016-09-01 01:00:00    	100
2016-10-01 01:00:00    	100
Time taken: 0.499 seconds, Fetched 3 row(s)

接下来是使用Scala在Spark Shell中的例子,如下所示:

scala> val oneproxy = com.onexsoft.oneproxy.SparkConn.mysql(
     |    "192.168.20.20:3307","test","123")
oneproxy: .....OneProxyContext = ....OneProxyContext@c212536

scala> val mydf = oneproxy.openTable(spark, "my_date")
mydf: org.apache.spark.sql.DataFrame = [id: timestamp, col2: int]

scala> mydf.show()
+--------------------+----+
|                  id|col2|
+--------------------+----+
|2016-08-02 00:00:...| 100|
|2016-09-01 01:00:...| 100|
|2016-10-01 01:00:...| 100|
+--------------------+----+

针对大表,分片的信息全部交给OneProxy来管理,OneProxy Connector会从OneProxy中获取分片信息,并在Spark中使用按OneProxy分片一致的并行策略。在前面的例子中虽然在Spark上没有定义分片,但实际上是并行执行的,并行度取决于OneProxy里定义的分片的个数。不再需要在Spark中定义JDBCRDD的“numPartitions / partitionColumn / lowerBound / upperBound”属性,也不要求SQL中要有两个绑定变量。“SparkConn”类定义了方便的连接OneProxy for MySQL或OneProxy for PostgreSQL的方法,返回的“OneProxyContext”对象主要实现了以下方法,以便你轻松地获取数据。详细用法如下所示:

  1. 构造参数需要传三个参数,分别是连接OneProxy的JDBC url串、用户名、口令。
  2. desc(SparkSession, sql:String)方法,根据SQL语句获取StructType类型(即字段结构信息)。
  3. query(SparkSession, sql:String)方法,根据SQL语句获取JdbcRDD结果,请注意这个返回的是RDD格式。
  4. openTable(SparkSession, tablename:String)方法,根据表名获取DataFrame结果
  5. openQuery(SparkSession, sql:String, StructType)方法,根据自定议的SQL语句获取DataFrame结果,实际上是先query,再调用SparkSession.createDataFrame方法将RDD转换成DataFrame返回。

OneProxy的多种数据分片算法(Range/List/Hash/Composite)和关系数据库中的多索引支持,可以避免Hadoop/Hive单个RowKey带来的一些数据访问缺限。提供的接口很简单,目前并不地持在Spark中写入数据到MySQL集群,写入操作经由OneProxy直接操作就可以了。OneProxy可以布署多台,再使用JDBC端的Load Balance功能以防单点连接,或每个Spark节点布署一个OneProxy实例,每个Spark只连接本地的OneProxy节点,以避免网络单点和热点。