Spark Tutorial

1 Spark计算原理

RDD(Resilient Distributed Dataset)弹性分布式数据,其中Resilent使用replicate(冗余备份)。Spark使用sparkcontext.textfile加载本地文件或内存中的数据变成RDD,通过transformation操作定义新的RDD(懒计算,这里只是定义),最终使用action执行计算并存储到外部节点。

常见的Transformation包括map, filter, groupby, join等,输入RDD,返回RDD。

常见的Action包括count, collect, reduce, fold, aggregate等,输入RDD,以python(或scala等)对象的形式返回一个值或结果。

过程构造DAG计算依赖图,提交给DAGScheduler,DAGScheduler会将任务划分为多个stage,每个stage都是一个任务集。前部分的stages做Map, 后部分的stages做Reduce,中间使用shuffle连接。shuffle将Map的结果通过序列化、反序列化(CPU将对象和二进制之间的转换)、跨节点IO和磁盘IO等操作传递给Reduce。DAGScheduler计算得到各stages之间的依赖关系,并将任务集提交给TaskScheduler,TaskScheduler将认为分发给excutor运行,执行结果先反馈给TaskScheduler,然后反馈给DAGScheduler

DataFrame 大部分情况下, RDD还是用来对数据集进行基本的转换, 它的数据是非结构化的(以parallelize存储list集合为例,每个item以java对象的形式存储,不同的item在集群中分布式存储), DataFrame则更适合处理结构化的(以Row对象的形式在集群中分布式存储,并且Spark知道row对象的列名、列数据类型等),关系型数据(table in relational database)。DataFrame支持map, filter, aggregate, average, sum, SQL直接操作。Datafram可以通过调用.rdd直接转换为RDD数据。

2.1 Spark RDD API

Transformation

rdd.cache()

对RDD进行持久化,把它保存到内存中。当数据被反复访问时,进行持久化是十分必要的。 cache并不是action, 执行它的时候没有触发task。

rdd.map()

对RDD中每个元素执行一个指定的函数,生成一个新的RDD,任何原RDD中的元素在新的RDD中都有一个元素与之对应。

x = sc.parallelize([1,2,3])
y = x.map(lambda x: (x, x*100, x**2))
print(y.collect()) # ((1, 100, 1), (2, 200, 4), (3, 300, 9))

rdd.flatMap()

与map类似,只是将结果转为一维。

x = sc.parallelize([1,2,3])
y = x.flatMap(lambda x: (x, x*100, x**2))
print(y.collect()) # (1, 100, 1, 2, 200, 4, 3, 300, 9))

rdd.glom()

flattens elements on the same partition

x = sc.parallelize([1,2,3])
x.glom().collect() # [[1], [2, 3]]

rdd.mapPartitions()

map将指定函数应用于输入的RDD的每个元素,mapPartition则将输入函数应用于每个分区。

x = sc.parallelize([1,2,3])
def f(iterator):
    yield sum(iterator)
y = x.mapPartitions(f)
x.glom().collect() # [[1], [2, 3]]
y.glom().collect() #[[1], [5]]

rdd.mapPartitionsWithIndex()

在mapPartitions的基础上加上partition index

x = sc.parallelize([1,2,3], 2)
def f(partitionIndex, iterator):
    yield (partitionIndex, sum(iterator))
y = x.mapPartitions(f)
x.glom().collect() # [[1], [2, 3]]
y.glom().collect() #[[(0, 1)], [(1, 5)]]

rdd.getNumPartitions()

返回RDD的partition数目

x = sc.parallelize([1,2,3], 2)
print(x.getNumPartitions()) # 2

rdd.filter()

返回符合过滤条件的元素


x = sc.parallelize([1,2,3,4], 2)
y = x.filter(lambda x: x%2==1)
print(y.collect()) # [1,3]

rdd.distince()

去除重复元素

rdd.sample()

做抽样,第一个参数withReplacement=True/False表示是否有放回抽样, 第二个参数fraction表示比例,第三个参数设置seed

x = sc.parallelize(range(7))
ylist = [x.sample(False, 0.5) for i in range(5)]

for item in ylist():
    print(item.collect())
# [0, 1, 2, 4, 6]
# [0, 1, 3, 6]
# [1, 5]
# [0, 1, 4, 5, 6]
# [1, 2, 3]

rdd.takeSample()

rdd.sample()的一种替代,第二参数为num表示抽样的数量。直接返回collection,而非rdd,因此不用再调用collect()

rdd.union()

合并两个rdd

x = sc.parallelize(range(4))
y = sc.parallelize(range(4, 7))
z = x.union(y)
print(z.collect()) # 1, 2, 3, 4, 5, 6, 

rdd.intersection()

与union使用方法按类似,求两个rdd的共同元素

rdd.cartesian()

返回笛卡尔乘积 [1,2] -> [(1, 1), (1, 2), (2, 1), (2, 2)]

rdd.sorBy()

按照规则对元素排序

x = sc.parallelize(['Cat','Apple','Bat'])
y = x.sortBy(lambda x:x[0]) # sort by first character
print(y.collect()) # ['Apple', 'Bat', 'Cat']

rdd.sortByKey()

x = sc.parallelize([('B',1),('A',2),('C',3)])
y = x.sortByKey()
print(x.collect()) # [('B', 1), ('A', 2), ('C', 3)]
print(y.collect()) # [('A', 2), ('B', 1), ('C', 3)]

rdd.groupBy()

按照条件把元素归到各group

x = sc.parallelize(range(6))
rst = x.groupBy(lambda x:x%3).collect()
for k, v in rst:
    print(k, sorted(v))
# (0, [0, 3])
# (2, [2, 5])
# (1, [1, 4])

rdd.pipe()

将元素传递给外部进程

sc.parallelize([1, 2, 3]).pipe('cat').collect()
# [u'1', u'2', u'3']

rdd.foreach()

Applies a funtion to all elements of this rdd, similar to map

rdd = sc.parallelize([1,2,3,4])
for f(item):
    with open(filepath, 'a+') as wf:
        wf.write(item)

y = x.foreach(f)
print(y) # foreach returns None

rdd.foreachPartition()

map-> mapPartitions 与 foreach -> foreachPartition的关系

Actions

reduce

将前两个元素传入函数,产生新的值,逐个将新的值和下个元素传入到函数中。

x = sc.parallelize([1,2,3])
y = x.reduce(lambda x, accumulated: x + accumulated)
print(y)

rdd.fold()

将同一个partition下的元素做聚合,然后再聚合各个partition

x = sc.parallelize([1,2,3])
neutral_zero_value = 0
y = x.fold(neutral_zero_value, lambda x, summ: x+summ)
print(y) # 6

rdd.aggregate()

reduce 和 fold都要求中间返回值与rdd中元素的数据类型保持一致,而aggregate则没有这个限制。
seqOp和comOp可以理解为先进行在partition内做使用seqOp做reduce操作,后使用comOp在partitions之间做操作。

# compute summation and production

x = sc.parallelize([2,3,4])
neutral_zero_value = (0, 1)
seqOp = lambda aggregated, elem: (aggregated[0]+elem, aggregated[1]*elem)
comOp = lambda aggregated, elem: (aggregated[0]+elem[0], aggregated[1]*elem[1])

rst = x.aggregate(neutral_zero_value, seqOp, comOp) #(9, 24)

rdd.top()

以list形式返回最大的k各元素

rdd.takeOrdered(nums=k)

返回最小的k个元素

rdd.take(num=k)

返回数据的前k个元素

rdd.first

返回第一个元素

rdd.histogram(buckets=k)

>>> x = sc.parallelize([1,3,1,2,3])
>>> y = x.histogram(buckets = 2)
>>> print(y)
([1, 2, 3], [2, 3]) # [1 2), [2, 3] 以2作为x axis分界值,count分别为2,3

other transformations:

rdd.collectAsMap()

输入rdd,返回dict

rdd.keys()

输入rdd, 返回keys组成的list

rdd.values()

同上,返回values

rdd.reduceByKey()

针对kv结构的数据,将相同的key的values放到一起,执行一个reduce操作

x = sc.parallelize([('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)])
y = x.reduceByKey(lambda agg, x: agg+x)

rdd.countByKey()

使用方法同上

join

inner join

>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2), ("a", 3)])
>>> x.join(y).collect()
[('a', (1, 2)), ('a', (1, 3))]

combineByKey()

>>> x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
>>> createCombiner = (lambda el: [(el,el**2)])
>>> mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)]) # append to aggregated
>>> mergeComb = (lambda agg1,agg2: agg1 + agg2 )  # append agg1 with agg2
>>> y = x.combineByKey(createCombiner,mergeVal,mergeComb)
>>> y.collect()
[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]

aggregateByKey()

>>> x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
>>> createCombiner = (lambda el: [(el,el**2)])
>>> mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)]) # append to aggregated
>>> mergeComb = (lambda agg1,agg2: agg1 + agg2 )  # append agg1 with agg2
>>> y = x.combineByKey(createCombiner,mergeVal,mergeComb)
>>> y.collect()
[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]

foldByKey()

合并同一key下的values

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.foldBykey(0, add).collect()
# [("a":2), ("b":1)]

mapValues()

对kv数据,对value进行map操作,不修改key

>>> x = sc.parallelize([('A',(1,2,3)),('B',(4,5))])
>>> y = x.mapValues(lambda x: [i**2 for i in x])
>>> y.collect()
[('A', [1, 4, 9]), ('B', [16, 25])]

flatMapValues()

>>> x = sc.parallelize([('A',(1,2,3)),('B',(4,5))])
>>> y = x.flatMapValues(lambda x: [i**2 for i in x])
>>> y.collect()
[('A', 1), ('A', 4), ('A', 9), ('B', 16), ('B', 25)]

groupWith()

>>> x = sc.parallelize([('C',4),('B',(3,3)),('A',2),('A',(1,1))])
>>> y = sc.parallelize([('B',(7,7)),('A',6),('D',(5,5))])
>>> z = sc.parallelize([('D',9),('B',(8,8))])
>>> a = x.groupWith(y,z)

>>> for k, v in list(a.collect()):
...     print(k, [list(i) for i in v])
...
('B', [[(3, 3)], [(7, 7)], [(8, 8)]])
('D', [[], [(5, 5)], [9]])
('A', [[2, (1, 1)], [6], []])
('C', [[4], [], []])

keyBy()

>>> x = sc.parallelize([1,2,3])
>>> y = x.keyBy(lambda x: x**2)
>>> print(y.collect())
[(1, 1), (4, 2), (9, 3)]

zip()

x = sc.parallelize(['B','A','A'])
y = x.map(lambda x: ord(x))
z = x.zip(y)
>>> print(x.collect())
['B', 'A', 'A']
>>> print(y.collect())
[66, 65, 65]
>>> print(z.collect())
[('B', 66), ('A', 65), ('A', 65)]

write: saveAsTextFile

peopleDF.rdd.saveAsTextFile("file:///usr/local/spark/mycode/newpeople.txt")

2.2 Spark DataFrame API

将RDD转为Dataframe:1. 使用toDF()

这种方法不需要手动指定schema

def f(x):
    data = dict()
    data['name'] = x[0]
    data['age'] = x[1]
    return data

peopleDF = sc.textFile("file:///usr/local/spark/people.txt").map(lambda line: line.split(',')).map(lambda x: Row(**f(x))).toDF()
peopleDF = peopleDF.createOrReplaceTempView("people") # Creates or replaces a local temporary view (session based view) with this DataFrame.

将RDD转为Dataframe:2. 使用spark.createDataFrame(rowRDD, schema)

将RDD转化为Row(attributes[0], attributes[1]),即一系列row对象,最后在指定schema

from pyspark.sql.types import Row
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType

fields = list(map(lambda filedName: StructField(fildName, StringType(), nullable=True), ['name', 'age']))
schema = StructType(fields)

rowRDD = sc.textFile("file:///usr/local/spark/people.txt").map(lambda line, line.split(',')).map(lambda attributes: Row(attributes[0], attributes[1]))

peopleDF = spark.createDataFrame(rowRDD, schema)

df.show(n=20), df.head(n=1), df.take(), df.limit()

show, head, take都是action, limit是transformation

take会将得到的数据返回driver端,

df.printScheme()

print out the schema in the tree format

df.select()

选择子列

df.select('*').collect()
df.select('name', 'age').collect()
df.select(df.name, (df.age+10).alias('age')).collect()
df = df.select(F.monotonically_increasing_id().alias('index'), "*") # add index column

df.col(“id”), df.apply(“id”)

只能获得一个column

df.withColumn()

used to adding a column or replacing the existing column that has the same name

df.withColumn('newColumn', df.oldColumn+2)

df.drop()

删除列,每次只能删除一列

df.filter()

df.filter(df.age>20).show()

df.groupBy(“age”).count().show()

df.agg()

df.agg() === df.groupBy.agg().collect()
df.agg({"age": "max", "salary": "avg"})

from pyspark.sql import functions as F
df.agg(F.min(df.age)).collect()

# select
df.agg(expr('count(distinct DIGITAL_CUSTOMER_ID)').alias('n_item')).collect() 
df.select(expr('count(distinct DIGITAL_CUSTOMER_ID)').alias('n_item')).collect() 

df.sort(df.age.desc(), df.name.asc()).show()

df.orderBy(df.age.desc) === df.orderBy(- df.age)

load & write

peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
 
peopleDF.select("name", "age").write.format("csv").save("file:///usr/local/spark/mycode/newpeople.csv")

pyspark dataframe: 一些常用的行为

dataframe column to list

df.select('column').distinct().rdd.flatMap(lambda x: x).collect()

substitude to get_dummies in pandas

offers = email_df.select("OFFER_NAME").distinct().rdd.flatMap(lambda x: x).collect()
offers_expr = [F.when(F.col("OFFER_NAME") == ty, 1).otherwise(0).alias("OFFER_NAME_" + ty) for ty in offers]
email_df.select("*", *offers_expr).show()

count_values

df.groupby('col').count().sort(F.desc('count')).show(50, truncate=False)