In my previous article, I used Scala to demonstrate the Spark RDD API. Many of us use PySpark to work with RDDs and lambda functions. Although the function names and the outputs are the same as in Scala, the PySpark syntax for RDD operations is different. Here I will explain PySpark RDDs using a different approach and from a different perspective.

Let us assume we are streaming data with Spark, have created an RDD from this streaming application, and want to perform RDD operations on this stream of data at a particular time interval. I am using PyCharm to run the PySpark code; the snippet is below:

countByKey

import os
import sys
import findspark

findspark.init('/usr/hdp/2.5.6.0-40/spark')  # point findspark at the Spark installation

try:
    from pyspark.streaming import StreamingContext
    from pyspark import SparkContext
    from pyspark import SparkConf
    from pyspark.sql import SQLContext
    from pyspark import RDD
    print("Successfully imported Spark Modules")
except ImportError as e:
    print("Can not import Spark Modules", e)
    sys.exit(1)

print("Successfully imported Spark os.env")
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 pyspark-shell'

sc = SparkContext(appName="PythonSparkStreaming")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 10)  # 10-second batch interval
print('sc =================== {} {}')
sqlContext = SQLContext(sc)

print('RDD API...')
x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
y = x.countByKey()  # action: returns a dict of per-key counts to the driver
print(x.collect())
print(y)

To focus on the RDD operations themselves, I am not calling ssc.start() here; the output is below, followed by a sketch of how the same operations could run on each micro-batch:

/usr/bin/python2.7 /root/PycharmProjects/untitled/Deleteme.py
Successfully imported Spark Modules
Successfully imported Spark os.env
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/hdp/2.5.6.0-40/spark/lib/spark-assembly-1.6.3.2.5.6.0-40-hadoop2.7.3.2.5.6.0-40.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-streaming-kafka_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
	confs: [default]
..
	found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 2173ms :: artifacts dl 126ms
	:: modules in use:
	org.spark-project.spark#unused;1.0.0 from central in [default]
	org.xerial.snappy#snappy-java;1.1.2 from central in [default]
--------------------------------------------------------
|            modules            ||   artifacts   |
conf       | number| search|dwnlded|evicted|| number|dwnlded|
-------------------------------------------------------
default     |   10  |   0   |   0   |   0   ||   10  |   0   |
-------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
	confs: [default]
	0 artifacts copied, 10 already retrieved (0kB/112ms)
...
17/12/07 02:22:39 INFO EventLoggingListener: Logging events to hdfs:///spark-history/local-1512631357278
sc =================== {} {}
RDD API...
[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
defaultdict(<type 'int'>, {'A': 3, 'B': 2})
Process finished with exit code 0
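
To show how these operations would plug into the streaming loop, here is a minimal sketch. The socket source on localhost:9999 and the comma-separated record format are assumptions of mine, not part of the run above:

lines = ssc.socketTextStream('localhost', 9999)        # hypothetical text source
pairs = lines.map(lambda rec: (rec.split(',')[0], 1))   # assumes comma-separated records with the key in the first field

def show_counts(rdd):
    print(rdd.countByKey())                             # runs once per 10-second batch

pairs.foreachRDD(show_counts)
ssc.start()
ssc.awaitTermination()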

join

x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])
y = sc.parallelize([('A',8),('B',7),('A',6),('D',5)])
z = x.join(y)
print(x.collect())
print(y.collect())
print(z.collect())


OUTPUT>>
[('C', 4), ('B', 3), ('A', 2), ('A', 1)]
[('A', 8), ('B', 7), ('A', 6), ('D', 5)]
[('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('B', (3, 7))]

leftOuterJoin

x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])
y = sc.parallelize([('A',8),('B',7),('A',6),('D',5)])
z = x.leftOuterJoin(y)
print(x.collect())
print(y.collect())
print(z.collect())
OUTPUT>>
[('C', 4), ('B', 3), ('A', 2), ('A', 1)]
[('A', 8), ('B', 7), ('A', 6), ('D', 5)]
[('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('C', (4, None)), ('B', (3, 7))]
Process finished with exit code 0

rightOuterJoin

x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])
y = sc.parallelize([('A',8),('B',7),('A',6),('D',5)])
z = x.rightOuterJoin(y)
print(x.collect())
print(y.collect())
print(z.collect())
OUTPUT>>
[('C', 4), ('B', 3), ('A', 2), ('A', 1)]
[('A', 8), ('B', 7), ('A', 6), ('D', 5)]
[('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('D', (None, 5)), ('B', (3, 7))]

partitionBy

x = sc.parallelize([(0,1),(1,2),(2,3)],2)
y = x.partitionBy(numPartitions = 3, partitionFunc = lambda x: x)  # only the key is passed to partitionFunc
print(x.glom().collect())
print(y.glom().collect())
OUTPUT>>
[[(0, 1)], [(1, 2), (2, 3)]]
[[(0, 1)], [(1, 2)], [(2, 3)]]
Process finished with exit code 0
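
Since only the key reaches partitionFunc and Spark takes the result modulo numPartitions, a modulo-based function is an easy way to control placement. A small sketch of my own, separating even and odd keys:

x = sc.parallelize([(0,'a'),(1,'b'),(2,'c'),(3,'d')], 2)
y = x.partitionBy(numPartitions=2, partitionFunc=lambda k: k % 2)  # even keys -> partition 0, odd keys -> partition 1
print(y.glom().collect())  # typically [[(0, 'a'), (2, 'c')], [(1, 'b'), (3, 'd')]]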

combineByKey

x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
createCombiner = (lambda el: [(el,el**2)])
mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)]) # append to aggregated
mergeComb = (lambda agg1,agg2: agg1 + agg2 )  # append agg1 with agg2
y = x.combineByKey(createCombiner,mergeVal,mergeComb)
print(x.collect())
print(y.collect())
OUTPUT>>
[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]
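
A classic use of combineByKey is a per-key average in a single pass over the data; this sketch is my own, not part of the original example:

data = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
sum_count = data.combineByKey(
    lambda v: (v, 1),                          # createCombiner: first value seen for a key in a partition
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # mergeVal: fold another value into the (sum, count) accumulator
    lambda a, b: (a[0] + b[0], a[1] + b[1]))   # mergeComb: merge accumulators from different partitions
avg = sum_count.mapValues(lambda t: float(t[0]) / t[1])
print(avg.collect())  # per-key averages, e.g. ('A', 4.0) and ('B', 1.5)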

aggregateByKey

x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
zeroValue = [] # empty list is 'zero value' for append operation
mergeVal = (lambda aggregated, el: aggregated + [(el,el**2)])
mergeComb = (lambda agg1,agg2: agg1 + agg2 )
y = x.aggregateByKey(zeroValue,mergeVal,mergeComb)
print(x.collect())
print(y.collect())
OUTPUT>>
[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]

foldByKey

x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
zeroValue = 1 # one is 'zero value' for multiplication
y = x.foldByKey(zeroValue,lambda agg,x: agg*x )  # computes cumulative product within each key
print(x.collect())
print(y.collect())
OUTPUT>>
[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
[('A', 60), ('B', 2)]
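
The zeroValue has to be the identity of the fold function, because Spark may apply it once per partition; with addition the identity is 0. A quick sketch:

x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
y = x.foldByKey(0, lambda agg, v: agg + v)  # 0 is the identity for addition
print(y.collect())  # per-key sums: ('A', 12) and ('B', 3)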

groupByKey

x = sc.parallelize([('B',5),('B',4),('A',3),('A',2),('A',1)])
y = x.groupByKey()
print(x.collect())
print([(j[0],[i for i in j[1]]) for j in y.collect()])
OUTPUT>>
[('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)]
[('A', [3, 2, 1]), ('B', [5, 4])]
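
The list comprehension above just materialises the ResultIterable that groupByKey returns; mapValues(list) is the more idiomatic way to get the same result:

print(x.groupByKey().mapValues(list).collect())  # same result: [('A', [3, 2, 1]), ('B', [5, 4])]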

flatMapValues

x = sc.parallelize([('A',(1,2,3)),('B',(4,5))])
y = x.flatMapValues(lambda x: [i**2 for i in x]) # function is applied to entire value, then result is flattened
print(x.collect())
print(y.collect())
OUTPUT>>
[('A', (1, 2, 3)), ('B', (4, 5))]
[('A', 1), ('A', 4), ('A', 9), ('B', 16), ('B', 25)]
Process finished with exit code 0

mapValues

x = sc.parallelize([('A',(1,2,3)),('B',(4,5))])
y = x.mapValues(lambda x: [i**2 for i in x]) # function is applied to entire value
print(x.collect())
print(y.collect())
OUTPUT>>
[('A', (1, 2, 3)), ('B', (4, 5))]
[('A', [1, 4, 9]), ('B', [16, 25])]
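
mapValues touches only the value and leaves the key alone (it also preserves the partitioner, since keys cannot change); the same transformation written with map has to rebuild the pair by hand:

print(x.mapValues(sum).collect())                       # [('A', 6), ('B', 9)]
print(x.map(lambda kv: (kv[0], sum(kv[1]))).collect())  # equivalent, but the pair is rebuilt manually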

groupWith

x = sc.parallelize([('C',4),('B',(3,3)),('A',2),('A',(1,1))])
y = sc.parallelize([('B',(7,7)),('A',6),('D',(5,5))])
z = sc.parallelize([('D',9),('B',(8,8))])
a = x.groupWith(y,z)
print(x.collect())
print(y.collect())
print(z.collect())
print("Result:")
for key,val in list(a.collect()):
    print(key, [list(i) for i in val])
OUTPUT>>
[('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))]
[('B', (7, 7)), ('A', 6), ('D', (5, 5))]
[('D', 9), ('B', (8, 8))]
Result:
('B', [[(3, 3)], [(7, 7)], [(8, 8)]])
('D', [[], [(5, 5)], [9]])
('A', [[2, (1, 1)], [6], []])
('C', [[4], [], []])

cogroup

x = sc.parallelize([('C',4),('B',(3,3)),('A',2),('A',(1,1))])
y = sc.parallelize([('A',8),('B',7),('A',6),('D',(5,5))])
z = x.cogroup(y)
print(x.collect())
print(y.collect())
for key,val in list(z.collect()):
    print(key, [list(i) for i in val])
OUTPUT>>
[('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))]
[('A', 8), ('B', 7), ('A', 6), ('D', (5, 5))]
('A', [[2, (1, 1)], [8, 6]])
('D', [[], [(5, 5)]])
('C', [[4], []])
('B', [[(3, 3)], [7]])

sampleByKey

x = sc.parallelize([('A',1),('B',2),('C',3),('B',4),('A',5)])
y = x.sampleByKey(withReplacement=False, fractions={'A':0.5, 'B':1, 'C':0.2})
print(x.collect())
print(y.collect())
OUTPUT>>
[('A', 1), ('B', 2), ('C', 3), ('B', 4), ('A', 5)]
[('A', 1), ('B', 2), ('B', 4)]
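
sampleByKey is random, so the output above is just one possible sample: the 'B' elements are always kept because their fraction is 1, while 'A' and 'C' elements are kept with the given probabilities. Passing a seed makes a run reproducible:

y = x.sampleByKey(withReplacement=False, fractions={'A': 0.5, 'B': 1, 'C': 0.2}, seed=42)
print(y.collect())  # contents vary from seed to seed; every ('B', ...) pair is always present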

subtractByKey

x = sc.parallelize([('C',1),('B',2),('A',3),('A',4)])
y = sc.parallelize([('A',5),('D',6),('A',7),('D',8)])
z = x.subtractByKey(y)
print(x.collect())
print(y.collect())
print(z.collect())
OUTPUT>>
[('C', 1), ('B', 2), ('A', 3), ('A', 4)]
[('A', 5), ('D', 6), ('A', 7), ('D', 8)]
[('C', 1), ('B', 2)]

subtract

x = sc.parallelize([('C',4),('B',3),('A',2),('A',1)])
y = sc.parallelize([('C',8),('A',2),('D',1)])
z = x.subtract(y)
print(x.collect())
print(y.collect())
print(z.collect())
OUTPUT>>
[('C', 4), ('B', 3), ('A', 2), ('A', 1)]
[('C', 8), ('A', 2), ('D', 1)]
[('C', 4), ('B', 3), ('A', 1)]

keyBy

x = sc.parallelize([1,2,3])
y = x.keyBy(lambda x: x**2)
print(x.collect())
print(y.collect())
OUTPUT>>
[1, 2, 3]
[(1, 1), (4, 2), (9, 3)]
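
keyBy is mainly a convenience for turning a plain RDD into a pair RDD so the key-based operations above can be applied. A sketch on made-up data, keying words by their first letter:

words = sc.parallelize(['apple', 'banana', 'avocado', 'blueberry'])
by_initial = words.keyBy(lambda w: w[0])                  # ('a', 'apple'), ('b', 'banana'), ...
print(by_initial.groupByKey().mapValues(list).collect())  # e.g. [('a', ['apple', 'avocado']), ('b', ['banana', 'blueberry'])]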

repartition

x = sc.parallelize([1,2,3,4,5],2)
y = x.repartition(numPartitions=3)
print(x.glom().collect())
print(y.glom().collect())
OUTPUT>>
[[1, 2], [3, 4, 5]]
[[], [1, 2, 3, 4], [5]]

coalesce

x = sc.parallelize([1,2,3,4,5],2)
y = x.coalesce(numPartitions=1)
print(x.glom().collect())
print(y.glom().collect())
OUTPUT>>
[[1, 2], [3, 4, 5]]
[[1, 2, 3, 4, 5]]
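
Unlike repartition, coalesce does not shuffle by default, which makes it cheaper for reducing the partition count; the flip side is that it cannot increase the count unless shuffle=True is passed:

print(x.coalesce(3).getNumPartitions())                # still 2: without a shuffle, coalesce can only merge partitions
print(x.coalesce(3, shuffle=True).getNumPartitions())  # 3: with shuffle=True it behaves like repartition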

zip

x = sc.parallelize(['B','A','A'])
# zip expects x and y to have same #partitions and #elements/partition
y = x.map(lambda x: ord(x))
z = x.zip(y)
print(x.collect())
print(y.collect())
print(z.collect())
OUTPUT>>
['B', 'A', 'A']
[66, 65, 65]
[('B', 66), ('A', 65), ('A', 65)]

zipWithIndex

x = sc.parallelize(['B','A','A'],2)
y = x.zipWithIndex()
print(x.glom().collect())
print(y.collect())
OUTPUT>>
[['B'], ['A', 'A']]
[('B', 0), ('A', 1), ('A', 2)]

zipWithUniqueId

x = sc.parallelize(['B','A','A'],2)
y = x.zipWithUniqueId()
print(x.glom().collect())
print(y.collect())
OUTPUT>>
[['B'], ['A', 'A']]
[('B', 0), ('A', 1), ('A', 3)]
Process finished with exit code 0
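
The ids 0, 1, 3 come from the formula zipWithUniqueId uses: an element at position k in partition p of an RDD with n partitions gets the id k*n + p, so no extra pass over the data is needed (unlike zipWithIndex, which first counts the elements in each partition):

# n = 2 partitions here:
#   partition 0: 'B' at position 0 -> 0*2 + 0 = 0
#   partition 1: 'A' at position 0 -> 0*2 + 1 = 1
#                'A' at position 1 -> 1*2 + 1 = 3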
