Spark Tutorial
1 Spark Computation Principles
RDD (Resilient Distributed Dataset): a fault-tolerant, distributed collection of data; "resilient" means lost partitions can be recovered (e.g. recomputed from lineage or restored from redundant copies). Spark turns local files (SparkContext.textFile) or in-memory collections (SparkContext.parallelize) into RDDs, defines new RDDs through transformations (lazy: nothing is computed at this point), and finally runs an action to execute the computation and store the result externally.
Common transformations include map, filter, groupBy, join, etc.; they take an RDD and return a new RDD.
Common actions include count, collect, reduce, fold, aggregate, etc.; they take an RDD and return a value or result as a Python (or Scala, etc.) object.
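A minimal sketch of this lazy-evaluation flow (the path reuses the people.txt example used later in this tutorial):
lines = sc.textFile("file:///usr/local/spark/people.txt")   # defines an RDD; nothing is read yet
line_lengths = lines.map(lambda line: len(line))            # transformation: lazily defines a new RDD
total = line_lengths.reduce(lambda a, b: a + b)             # action: triggers the actual computation and returns an int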
This chain of operations builds a DAG of dependencies, which is submitted to the DAGScheduler. The DAGScheduler splits the job into stages, each of which is a task set: the earlier stages do the map side of the work, the later stages the reduce side, and the two are connected by a shuffle. The shuffle passes the map output to the reduce side through serialization/deserialization (the CPU cost of converting between objects and bytes), cross-node network I/O, and disk I/O. The DAGScheduler works out the dependencies between stages and submits the task sets to the TaskScheduler, which distributes the tasks to executors; execution results are reported back to the TaskScheduler and then to the DAGScheduler.
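One way to see where the shuffle (and hence the stage boundary) falls is to inspect an RDD's lineage; a small sketch using a wide transformation:
pairs = sc.parallelize([('a', 1), ('b', 2), ('a', 3)])
summed = pairs.reduceByKey(lambda a, b: a + b)  # wide transformation: requires a shuffle
print(summed.toDebugString())                   # the printed lineage reveals the shuffle dependency (stage boundary)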
DataFrame: in most cases an RDD is used for basic transformations on unstructured data (e.g. a list stored with parallelize: each item is stored as a Java object, and the items are distributed across the cluster). A DataFrame is better suited to structured, relational data (like a table in a relational database): it is stored across the cluster as Row objects, and Spark knows each column's name and data type. DataFrames support map, filter, aggregate, average, sum, and direct SQL queries. A DataFrame can be converted back to an RDD by calling .rdd.
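A minimal sketch of the contrast, assuming a SparkSession named spark is available:
from pyspark.sql import Row
rdd = sc.parallelize([Row(name='Alice', age=30), Row(name='Bob', age=25)])  # distributed Row objects
df = spark.createDataFrame(rdd)  # Spark now knows the column names and types
df.printSchema()
rdd_again = df.rdd               # back to an RDD of Row objects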
2.1 Spark RDD API
Transformations:
rdd.cache()
Persists the RDD by keeping it in memory; this is well worth doing when the data is accessed repeatedly. cache is not an action, so calling it does not trigger any task.
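For example (a sketch; the benefit only shows on data that is expensive to recompute):
x = sc.parallelize(range(1000))
x.cache()   # no task is triggered here
x.count()   # the first action computes the RDD and keeps its partitions in memory
x.count()   # later actions reuse the cached partitions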
rdd.map()
Applies the given function to every element of the RDD and returns a new RDD; every element of the original RDD has a corresponding element in the new one.
x = sc.parallelize([1,2,3])
y = x.map(lambda x: (x, x*100, x**2))
print(y.collect()) # [(1, 100, 1), (2, 200, 4), (3, 300, 9)]
rdd.flatMap()
Like map, but flattens the results into a single flat sequence.
x = sc.parallelize([1,2,3])
y = x.flatMap(lambda x: (x, x*100, x**2))
print(y.collect()) # [1, 100, 1, 2, 200, 4, 3, 300, 9]
rdd.glom()
Coalesces the elements of each partition into a list (one list per partition).
x = sc.parallelize([1,2,3], 2)
x.glom().collect() # [[1], [2, 3]]
rdd.mapPartitions()
map applies the given function to every element of the RDD; mapPartitions applies the function to each partition, receiving an iterator over that partition's elements.
x = sc.parallelize([1,2,3], 2)
def f(iterator):
    yield sum(iterator)
y = x.mapPartitions(f)
x.glom().collect() # [[1], [2, 3]]
y.glom().collect() #[[1], [5]]
rdd.mapPartitionsWithIndex()
Same as mapPartitions, but the function also receives the partition index.
x = sc.parallelize([1,2,3], 2)
def f(partitionIndex, iterator):
    yield (partitionIndex, sum(iterator))
y = x.mapPartitionsWithIndex(f)
x.glom().collect() # [[1], [2, 3]]
y.glom().collect() #[[(0, 1)], [(1, 5)]]
rdd.getNumPartitions()
Returns the number of partitions of the RDD.
x = sc.parallelize([1,2,3], 2)
print(x.getNumPartitions()) # 2
rdd.filter()
Returns the elements that satisfy the given predicate.
x = sc.parallelize([1,2,3,4], 2)
y = x.filter(lambda x: x%2==1)
print(y.collect()) # [1,3]
rdd.distinct()
Removes duplicate elements.
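For example (the order of the result is not guaranteed):
x = sc.parallelize([1, 1, 2, 3, 3])
print(x.distinct().collect()) # e.g. [2, 1, 3]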
rdd.sample()
Draws a sample from the RDD. The first argument withReplacement=True/False selects sampling with or without replacement, the second argument fraction is the expected sampling fraction, and the third argument sets the random seed.
x = sc.parallelize(range(7))
ylist = [x.sample(False, 0.5) for i in range(5)]
for item in ylist:
    print(item.collect())
# [0, 1, 2, 4, 6]
# [0, 1, 3, 6]
# [1, 5]
# [0, 1, 4, 5, 6]
# [1, 2, 3]
rdd.takeSample()
An alternative to rdd.sample(); its second argument num is the number of elements to sample. It returns a plain collection rather than an RDD, so there is no need to call collect().
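For example (the sampled values vary from run to run):
x = sc.parallelize(range(7))
print(x.takeSample(False, 3)) # e.g. [6, 2, 0]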
rdd.union()
Merges two RDDs.
x = sc.parallelize(range(4))
y = sc.parallelize(range(4, 7))
z = x.union(y)
print(z.collect()) # [0, 1, 2, 3, 4, 5, 6]
rdd.intersection()
Used like union; returns the elements that the two RDDs have in common.
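For example (the order of the result is not guaranteed):
x = sc.parallelize([1, 2, 3, 4])
y = sc.parallelize([3, 4, 5])
print(x.intersection(y).collect()) # [3, 4]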
rdd.cartesian()
Returns the Cartesian product, e.g. [1,2] with itself -> [(1, 1), (1, 2), (2, 1), (2, 2)].
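A small example of the product described above (pair order may vary):
x = sc.parallelize([1, 2])
print(x.cartesian(x).collect()) # [(1, 1), (1, 2), (2, 1), (2, 2)]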
rdd.sortBy()
Sorts the elements by the given key function.
x = sc.parallelize(['Cat','Apple','Bat'])
y = x.sortBy(lambda x:x[0]) # sort by first character
print(y.collect()) # ['Apple', 'Bat', 'Cat']
rdd.sortByKey()
x = sc.parallelize([('B',1),('A',2),('C',3)])
y = x.sortByKey()
print(x.collect()) # [('B', 1), ('A', 2), ('C', 3)]
print(y.collect()) # [('A', 2), ('B', 1), ('C', 3)]
rdd.groupBy()
Groups the elements by the given key function.
x = sc.parallelize(range(6))
rst = x.groupBy(lambda x:x%3).collect()
for k, v in rst:
    print(k, sorted(v))
# (0, [0, 3])
# (2, [2, 5])
# (1, [1, 4])
rdd.pipe()
Pipes the elements through an external process.
sc.parallelize([1, 2, 3]).pipe('cat').collect()
# [u'1', u'2', u'3']
rdd.foreach()
Applies a function to every element of the RDD, similar to map, but it is used for side effects and returns None.
x = sc.parallelize([1,2,3,4])
def f(item):
    with open(filepath, 'a+') as wf:
        wf.write(str(item))
y = x.foreach(f)
print(y) # foreach returns None
rdd.foreachPartition()
foreachPartition is to foreach as mapPartitions is to map: the function runs once per partition and receives an iterator over that partition's elements.
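A sketch of the pattern, typically used to set up one expensive resource (e.g. a database connection) per partition rather than per element:
x = sc.parallelize([1, 2, 3, 4], 2)
def handle_partition(iterator):
    # open one connection/file handle here, then process every element of the partition with it
    for item in iterator:
        pass  # replace with the actual per-element side effect
x.foreachPartition(handle_partition)  # runs for its side effects and returns None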
Actions:
rdd.reduce()
Passes the first two elements to the function to produce a value, then repeatedly feeds the accumulated value together with the next element back into the function.
x = sc.parallelize([1,2,3])
y = x.reduce(lambda x, accumulated: x + accumulated)
print(y) # 6
rdd.fold()
Aggregates the elements within each partition first (starting from a zero value), then merges the per-partition results.
x = sc.parallelize([1,2,3])
neutral_zero_value = 0
y = x.fold(neutral_zero_value, lambda x, summ: x+summ)
print(y) # 6
rdd.aggregate()
reduce and fold require the intermediate value to have the same type as the RDD's elements; aggregate has no such restriction.
seqOp reduces the elements within each partition, and comOp then merges the per-partition results.
# compute the sum and the product
x = sc.parallelize([2,3,4])
neutral_zero_value = (0, 1)
seqOp = lambda aggregated, elem: (aggregated[0]+elem, aggregated[1]*elem)
comOp = lambda aggregated, elem: (aggregated[0]+elem[0], aggregated[1]*elem[1])
rst = x.aggregate(neutral_zero_value, seqOp, comOp) #(9, 24)
rdd.top()
Returns the largest k elements as a list.
rdd.takeOrdered(num=k)
Returns the smallest k elements.
rdd.take(num=k)
Returns the first k elements of the RDD.
rdd.first()
Returns the first element.
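A combined sketch of these retrieval actions (top, takeOrdered, take, first):
x = sc.parallelize([5, 1, 4, 2, 3])
print(x.top(2))         # [5, 4]
print(x.takeOrdered(2)) # [1, 2]
print(x.take(2))        # [5, 1] (the first elements in partition order)
print(x.first())        # 5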
rdd.histogram(buckets=k)
>>> x = sc.parallelize([1,3,1,2,3])
>>> y = x.histogram(buckets = 2)
>>> print(y)
([1, 2, 3], [2, 3]) # bucket boundaries [1, 2) and [2, 3], with counts 2 and 3 respectively
Other operations (mostly on key-value pair RDDs):
rdd.collectAsMap()
Takes an RDD of key-value pairs and returns a Python dict (an action).
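For example:
x = sc.parallelize([('A', 1), ('B', 2)])
print(x.collectAsMap()) # {'A': 1, 'B': 2}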
rdd.keys()
Returns an RDD containing just the keys.
rdd.values()
Likewise, returns an RDD containing just the values.
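For example:
x = sc.parallelize([('A', 1), ('B', 2)])
print(x.keys().collect())   # ['A', 'B']
print(x.values().collect()) # [1, 2]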
rdd.reduceByKey()
For key-value data, groups the values of each key together and reduces them with the given function.
x = sc.parallelize([('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)])
y = x.reduceByKey(lambda agg, x: agg+x)
print(y.collect()) # [('B', 3), ('A', 12)] (order not guaranteed)
rdd.countByKey()
Counts the number of elements for each key and returns the counts to the driver (an action).
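For example (the result is a dict-like object on the driver):
x = sc.parallelize([('B', 1), ('B', 2), ('A', 3)])
print(x.countByKey()) # e.g. defaultdict(<class 'int'>, {'B': 2, 'A': 1})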
join
inner join
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2), ("a", 3)])
>>> x.join(y).collect()
[('a', (1, 2)), ('a', (1, 3))]
combineByKey()
>>> x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
>>> createCombiner = (lambda el: [(el,el**2)])
>>> mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)]) # append to aggregated
>>> mergeComb = (lambda agg1,agg2: agg1 + agg2 ) # append agg1 with agg2
>>> y = x.combineByKey(createCombiner,mergeVal,mergeComb)
>>> y.collect()
[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]
aggregateByKey()
>>> x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
>>> zeroValue = []   # initial aggregate for each key
>>> mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)]) # merge a value into the per-key aggregate
>>> mergeComb = (lambda agg1,agg2: agg1 + agg2 ) # merge two per-key aggregates
>>> y = x.aggregateByKey(zeroValue,mergeVal,mergeComb)
>>> y.collect()
[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]
foldByKey()
Merges the values for each key, starting from a zero value.
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.foldByKey(0, add).collect()
# [('a', 2), ('b', 1)]
mapValues()
For key-value data, applies the function to each value while keeping the key unchanged.
>>> x = sc.parallelize([('A',(1,2,3)),('B',(4,5))])
>>> y = x.mapValues(lambda x: [i**2 for i in x])
>>> y.collect()
[('A', [1, 4, 9]), ('B', [16, 25])]
flatMapValues()
>>> x = sc.parallelize([('A',(1,2,3)),('B',(4,5))])
>>> y = x.flatMapValues(lambda x: [i**2 for i in x])
>>> y.collect()
[('A', 1), ('A', 4), ('A', 9), ('B', 16), ('B', 25)]
groupWith()
>>> x = sc.parallelize([('C',4),('B',(3,3)),('A',2),('A',(1,1))])
>>> y = sc.parallelize([('B',(7,7)),('A',6),('D',(5,5))])
>>> z = sc.parallelize([('D',9),('B',(8,8))])
>>> a = x.groupWith(y,z)
>>> for k, v in list(a.collect()):
...     print(k, [list(i) for i in v])
...
('B', [[(3, 3)], [(7, 7)], [(8, 8)]])
('D', [[], [(5, 5)], [9]])
('A', [[2, (1, 1)], [6], []])
('C', [[4], [], []])
keyBy()
>>> x = sc.parallelize([1,2,3])
>>> y = x.keyBy(lambda x: x**2)
>>> print(y.collect())
[(1, 1), (4, 2), (9, 3)]
zip()
x = sc.parallelize(['B','A','A'])
y = x.map(lambda x: ord(x))
z = x.zip(y)
>>> print(x.collect())
['B', 'A', 'A']
>>> print(y.collect())
[66, 65, 65]
>>> print(z.collect())
[('B', 66), ('A', 65), ('A', 65)]
write: saveAsTextFile
peopleDF.rdd.saveAsTextFile("file:///usr/local/spark/mycode/newpeople.txt")
2.2 Spark DataFrame API
Converting an RDD to a DataFrame, method 1: toDF()
This method does not require specifying the schema by hand.
from pyspark.sql import Row

def f(x):
    data = dict()
    data['name'] = x[0]
    data['age'] = x[1]
    return data

peopleDF = sc.textFile("file:///usr/local/spark/people.txt").map(lambda line: line.split(',')).map(lambda x: Row(**f(x))).toDF()
peopleDF.createOrReplaceTempView("people") # creates or replaces a local temporary (session-scoped) view; it returns None, so do not assign the result back to peopleDF
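Once the temporary view exists it can be queried with SQL; a small sketch, assuming a SparkSession named spark:
spark.sql("SELECT name, age FROM people").show()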
Converting an RDD to a DataFrame, method 2: spark.createDataFrame(rowRDD, schema)
Convert the RDD into Row(attributes[0], attributes[1]) objects, i.e. an RDD of Row objects, and then supply the schema.
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType

fields = list(map(lambda fieldName: StructField(fieldName, StringType(), nullable=True), ['name', 'age']))
schema = StructType(fields)
rowRDD = sc.textFile("file:///usr/local/spark/people.txt").map(lambda line: line.split(',')).map(lambda attributes: Row(attributes[0], attributes[1]))
peopleDF = spark.createDataFrame(rowRDD, schema)
df.show(n=20), df.head(n=1), df.take(), df.limit()
show, head and take are actions; limit is a transformation.
take returns the resulting rows to the driver.
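A quick sketch of the distinction (df stands for any DataFrame):
df.show(5)           # action: prints up to 5 rows
rows = df.head(2)    # action: returns the first 2 Row objects to the driver
rows = df.take(2)    # action: same as head(2)
small = df.limit(2)  # transformation: returns a new DataFrame, nothing is executed yet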
df.printSchema()
Prints the schema in a tree format.
df.select()
Selects a subset of columns.
df.select('*').collect()
df.select('name', 'age').collect()
df.select(df.name, (df.age+10).alias('age')).collect()
df = df.select(F.monotonically_increasing_id().alias('index'), "*") # add index column
df.col("id"), df.apply("id")
Retrieve a single column only (in PySpark, use df['id'] or df.id).
df.withColumn()
Adds a column, or replaces an existing column of the same name.
df.withColumn('newColumn', df.oldColumn+2)
df.drop()
Drops a column; only one column can be dropped per call.
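For example (df itself is unchanged; a new DataFrame is returned):
df.drop('age').show()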
df.filter()
df.filter(df.age>20).show()
df.groupBy("age").count().show()
df.agg()
df.agg(...) is shorthand for df.groupBy().agg(...)
df.agg({"age": "max", "salary": "avg"})
from pyspark.sql import functions as F
df.agg(F.min(df.age)).collect()
# the two lines below are equivalent; expr comes from pyspark.sql.functions
from pyspark.sql.functions import expr
df.agg(expr('count(distinct DIGITAL_CUSTOMER_ID)').alias('n_item')).collect()
df.select(expr('count(distinct DIGITAL_CUSTOMER_ID)').alias('n_item')).collect()
df.sort(df.age.desc(), df.name.asc()).show()
df.orderBy(df.age.desc()) is equivalent to df.orderBy(-df.age)
load & write
peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("csv").save("file:///usr/local/spark/mycode/newpeople.csv")
PySpark DataFrame: some common recipes
dataframe column to list
df.select('column').distinct().rdd.flatMap(lambda x: x).collect()
a substitute for pandas get_dummies (one-hot encoding)
offers = email_df.select("OFFER_NAME").distinct().rdd.flatMap(lambda x: x).collect()
offers_expr = [F.when(F.col("OFFER_NAME") == ty, 1).otherwise(0).alias("OFFER_NAME_" + ty) for ty in offers]
email_df.select("*", *offers_expr).show()
value counts (the pandas value_counts equivalent)
df.groupby('col').count().sort(F.desc('count')).show(50, truncate=False)