Spark SQL 论文
Spark SQL: Relational Data Processing in Spark

背景

Spark SQL 整合了关系型处理、函数编程API,可以用SQL表示复杂的查询,它增加了两个不同之处:

  • 将申明API DataFrame 和 关系处理紧密的整合在一起了
  • 提供了一个高度优化,可扩展的框架 Catalyst

MapReduce可以提供很强的能力,但是太底层了,于是一系列申明式的,带优化的系统出现了
Hive、Pig、Dremel、Shark等等
但这些系统的不足以处理很多大数据场景

  • 很多用户需要执行ETL,他们的各种数据源是结构、半结构的
  • 用户想要执行更复杂的分析如 机器学习、图处理

有些系统则将 关系查询 和复杂的程序算法组合到一起,但是结合的并不好
于是用户只能再这两种范式上 二选一了
本文提供了一个新的组件:Spark SQL,它构建于早期的Shark之上,但不会强制用户对关系模型、程序API之间二选一
而是混合了这两种模型的特性

  • 提供了DataFreame API,可以在外部数据源、Spark内建的RDD之上执行关系操作,类似于R,但它是延迟执行的,所以可以
  • 为支持各种数据源和算法,提供了Catalyst,可以增加数据源、优化规则、数据类型用于处理某个领域,如机器学习

DataFrame的各种操作会通过Catalyst实现各种优化

  • 用scala的模式匹配来表示的
  • 提供分析、计划、运行时代码生成
  • 可扩展新的数据源、包括结构、半结构
  • 用户定义的函数、用户定义的类型(机器学习)
  • 函数式的语言非常适合构建编译器
  • 目标是将更多的功能整合到Spark SQL中,机器学习、图、流处理等

尽管 Shark 可以实现关系型的优化,但仍然面临三个问题

  • 只能用于查询hive catalog中的外部数据源,对Spark中的关系查询没法使用
  • 只能通过SQL字符串的方式调用,在模块化的程序中并不方便
  • Hive优化是定制化的很难扩展新的特性,如机器学习中的数据类型,或新的数据源

为了在RDD中扩展支持关系处理,Spark SQL需要实现如下目标

  • 支持关系处理,使用Spark程序、使用友好的API扩展数据源
  • 提供高性能的管理DBMS技术
  • 支持新的数据源,如半结构化的,以及扩展的数据源,支持联邦查询
  • 使扩展能支持更高级的分析算法,如图处理、机器学习

编程接口

DataFrame 类似关系数据库中的表,包含了schema,可以做更多的操作和优化
每个 DataFrame 对象都表示为一个 逻辑集合,并且是延迟计算的,可以计算一个数据集
比如下面这段:

1
2
3
4
ctx = new HiveContext ()
users = ctx.table (" users ")
young = users.where(users (" age ") < 21)
println(young.count ())

user、young 都是DataFrame,他们类似一个AST,而不是scala中的函数,他们都是一个逻辑计划
叫 DataFrame 的原因是,它有点类似于 python、R 中的结构化数据依赖库

对于下面这个操作,计算 女士雇员的数量

1
2
3
4
5
employees
.join(dept , employees (" deptId ") === dept ("id "))
.where( employees (" gender ") === "female ")
.groupBy(dept ("id"), dept (" name "))
.agg(count (" name "))

employees 是一个 DataFrame, employees(“ieptId”) 是 deptID列的表达式
表达式对象有各种操作,可以转为其他表达式
这些操作都构建为一个AST,并由Catalyst优化

逻辑计划是立刻生成的,所以如果用户 输入的SQL有误,比如表不存在,字段类型不合法就会立刻报错

DataFrame反射得到对象的类型,下面的usersRDD,就可以确定出name和age的类型

1
2
3
4
5
6
7
8
9
case class User(name: String , age: Int)
// Create an RDD of User objects
usersRDD = spark. parallelize (
List(User (" Alice", 22), User (" Bob", 19)))
// View the RDD as a DataFrame
usersDF = usersRDD.toDF

views = ctx.table (" pageviews ")
usersDF.join(views , usersDF (" name ") === views (" user "))

上述最后两段,读取hive表,和刚刚在内存中创建的对象做join

支持 cache(),可以做列式的压缩,如 字典编码,length长度编码等

很多数据库也支持 UDF,如PG,但是他们需要先用写某个实现,注册进去,然后再执行SQL查询
也就是创建UDF的过程,和查询的过程是分开的,调试不方便
DataFrame 允许创建和查询放在一起:

1
2
3
4
val model: LogisticRegressionModel = ...
ctx.udf. register (" predict",
(x: Float , y: Float) => model.predict(Vector(x, y)))
ctx.sql (" SELECT predict(age , weight) FROM users ")

Catalyst 优化

设计 catalyst的两个目标

  • 可以轻松的将新特性、优化技术加入到Spark SQL中,尤其是各种半结构数据和高级分析
  • 可以让开发者定义他们的规则,支持规则下推,支持RBO、CBO,可以增加数据源、数据类型

函数式语言的设计部分是为了构建编译器,scala非常适合这一点
之前想要开发一个 优化会很麻烦,使用 scala则很简单
x + (1 + 2) 对应的表达式为:

1
Add( Attribute (x), Add(Literal (1), Literal (2)))

它会被转换为一个 AST
之后对这个 AST应用一个规则

1
2
3
tree. transform {
case Add(Literal(c1), Literal(c2)) => Literal(c1+c2)
}

于是就被转换为 x + 3 可以将很多规则,用于这些AST树之上,编写任意的scala代码,实现模式匹配功能
模式匹配是很多函数语言的特性,可以提取出值的内嵌结构数据类型
通过模式匹配,可以将 树A 转为树B
模式匹配只包含部分功能,所以它只匹配一棵树的 子集
模式匹配只会匹配到那些能匹配上的,对那些不匹配的则会跳过,整个过程是自动的,递归下降的

当转换 达到了一个临界点时,就停止转换
这个临界点 相当于不动点法,从根出发遍历一次后,没有发生变化,那么就停止运行
对一颗树一般执行下面四个阶段

  • 分析一个逻辑计划,并解析器引用
  • 逻辑计划优化
  • 物理执行计划
  • 代码生成,编译部分查询 到 java字节码

第三阶段可能会生成多个计划,通过比较其 代码选择一个最合适的,其他的都是基于规则的

分析

  • Spark SQL开始于一个关系计算,如通过SQL转换为AST、或者DataFrame 对象构建而成
  • 一开始都是包含了未解析的逻辑计划
  • 查询 SELECT col FROM sales,会对表和列做绑定
  • 从catalog中查询关系表达式中的名字
  • 将名字做映射
  • 根据唯一ID,决定属性是否映射到相同的值,如 col = col
  • 强制类型转换,如 col + 1,无法判断是否正确,只有解析完类型后才能确定

逻辑优化

  • 执行一些经典的RBO规则,如常量折叠、谓词下推、列裁剪、null传播、bool表达式简化等
  • catalyst使用的是模式匹配方式,增加一个规则比较简单

增加一个优化规则,处理聚合函数中的 decimal类型

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
object DecimalAggregates extends Rule[ LogicalPlan ] {
/** Maximum number of decimal digits in a Long */
val MAX_LONG_DIGITS = 18
def apply(plan: LogicalPlan ): LogicalPlan = {
	plan transformAllExpressions {
		case Sum(e @ DecimalType . Expression (prec , scale ))
		if prec + 10 <= MAX_LONG_DIGITS =>
			MakeDecimal (Sum( LongValue (e)), prec + 10, scale)
	}
}

物理执行计划

  • 逻辑计划会生成一个或者多个物理计划,然后通过CBO的方式选择一个代价最低的
  • 这篇论文所述,当时只将CBO用于join选择的场景
  • 物理阶段也有RBO的优化,如map操作的 管道投影、filter等

代码生成

  • catalyst依靠 quasiquotes 降低代码生成AST,然后让编译器生成字节码
  • 解释执行效率很低,需要遍历语法书,会有大量的分支跳转、虚函数调用

部分代码的编译生成方式:

1
2
3
4
5
6
def compile(node: Node ): AST = node match {
	case Literal(value) => q"$value"
	case Attribute (name) => q"row.get($name )"
	case Add(left , right) =>
		q"${compile(left )} + ${compile(right )}"
}

如上,一颗语法树:Add(Literal(1), Attribute("x")),会变成 1 + row.get("x")

数据类型

  • 实现自定义的数据类型,需要实现createRelation函数
  • 获取一系列key、value参数,以及返回BaseRelation对象
  • 成功加载后,就会返回一个schema和一个预估的字节数
  • TableScan返回一个表的所有列
  • PrunedScan只返回固定的列
  • PrunedFilteredScan,只返回固定的列,并带有下推,数据源根据filter只返回一些行

自定义类型

  • 用于机器学习、图计算等高级场景
  • 可以将自定义的对象,转换为Catalyst Row

一个自定义类型的例子

1
2
3
4
5
6
7
8
9
class PointUDT extends UserDefinedType [Point] {
	def dataType = StructType (Seq( // Our native structure
		StructField ("x", DoubleType ),
		StructField ("y", DoubleType )
	))
	def serialize (p: Point) = Row(p.x, p.y)
	def deserialize (r: Row) =
		Point(r. getDouble (0), r. getDouble (1))
}

高级分析特性

除了上述描述的,Catalyst框架还支持三个重要功能

  • 半结构化数据自动推断
  • 在机器学习场景下,提供更高级别的API
  • 联邦查询

半结构化数据推断

  • 大数据场景下的JSON使用频率很高,但有时候使用起来很麻烦,因为可能会增加字段
  • 有些人使用jackson库去做映射,将JSON转为Java对象,有的自己实现的很底层的转换
  • spark sql可以自动推断数据类型
  • 推断算法类似 推断XML类型,但不会执行任意深度的递归调用
  • 会尝试推断出各种类型,如果int32可以放下就是integer,不然就是long,再长就是decimal,万能类型string等等

机器学习场景的例子,用管道的方式提取特征,执行逻辑回归

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
data = <DataFrame of (text , label) records >
tokenizer = Tokenizer ()
. setInputCol (" text "). setOutputCol (" words ")
tf = HashingTF ()
. setInputCol (" words "). setOutputCol (" features ")
lr = LogisticRegression ()
. setInputCol (" features ")
pipeline = Pipeline (). setStages ([ tokenizer , tf , lr])
model = pipeline.fit(data)

联邦查询

  • 需要join不同数据源,比如提取用户的一些特性信息,这些信息存储在不同的地方
  • 正常来说是禁止的,或者查询成本很高
  • spark sql 可以用谓词下推的方式,将sql直接发送到mysql端

例子

1
2
3
4
5
6
7
8
9
CREATE TEMPORARY TABLE users USING jdbc
OPTIONS(driver "mysql" url "jdbc:mysql :// userDB/users ")
  
CREATE TEMPORARY TABLE logs
USING json OPTIONS (path "logs.json ")

SELECT users.id , users.name , logs.message
FROM users JOIN logs WHERE users.id = logs.userId
AND users. registrationDate > "2015 -01 -01"

之后会将mysql的部分提取出来,下推到mysql端执行

1
2
SELECT users.id , users.name FROM users
WHERE users. registrationDate > "2015 -01 -01"