I am planning to share my knowledge of the Apache Spark RDD API, the DataFrame API, and some tips and tricks. Combining everything into a single piece would make for a very lengthy article, so I am splitting it into three separate articles, of which this is the first:

  1. Spark RDD API
  2. DataFrame API
  3. Tips and tricks for the RDD and DataFrame APIs.

Let us start with the basics of the RDD API. A Resilient Distributed Dataset (RDD) is, essentially, Spark's representation of a set of data spread across multiple machines, with APIs that let you act on it. An RDD can be built from almost any data source, e.g. text files, JSON, CSV files, a database via JDBC, etc.
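
For reference, the two most common ways to build an RDD in spark-shell are loading a text file and parallelizing an in-memory collection. The snippet below is a minimal sketch; the file path /tmp/data.txt is only a placeholder and the result echo lines are omitted:

scala> val fileRdd = sc.textFile("/tmp/data.txt")     // RDD[String], one element per line (placeholder path)
scala> val collRdd = sc.parallelize(Seq(10, 20, 30))  // RDD[Int] built from a local collection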

In this demo I am using the Scala prompt of spark-shell to show the usage of the API, as below:

[root@victoria bin]# /usr/hdp/current/spark-client/bin/spark-shell
17/12/06 01:48:39 INFO SecurityManager: Changing view acls to: root
17/12/06 01:48:39 INFO SecurityManager: Changing modify acls to: root
17/12/06 01:48:39 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
17/12/06 01:48:41 INFO HttpServer: Starting HTTP Server
17/12/06 01:48:41 INFO Server: jetty-8.y.z-SNAPSHOT
17/12/06 01:48:41 INFO AbstractConnector: Started SocketConnector@0.0.0.0:46329
17/12/06 01:48:41 INFO Utils: Successfully started service 'HTTP class server' on port 46329.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.3
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_77)
Type in expressions to have them evaluated.
..
..
scala>

map

map applies a transformation to the dataset: the function is executed on each element of the RDD to produce a new element.

map(f: T => U): RDD[T] => RDD[U] (one-to-one transformation)

scala> val arr = Array((1,"a"),(2,"b"),(3,"c"),(4,"d"),(5,"e"),(6,"f"),(7,"g"))
arr: Array[(Int, String)] = Array((1,a), (2,b), (3,c), (4,d), (5,e), (6,f), (7,g))

scala> val rdd = sc.parallelize(arr).map(f=>("A"+f._1*10,f._2+"#"))
rdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[1] at map at <console>:23

scala> println(rdd.collect().mkString(","))
17/12/06 02:08:01 INFO SparkContext: Starting job: collect at <console>:26
17/12/06 02:08:01 INFO DAGScheduler: Got job 0 (collect at <console>:26) with 2 output partitions
17/12/06 02:08:04 INFO DAGScheduler: ResultStage 0 (collect at <console>:26) finished in 1.687 s
17/12/06 02:08:04 INFO DAGScheduler: Job 0 finished: collect at <console>:26, took 3.053065 s

(A10,a#),(A20,b#),(A30,c#),(A40,d#),(A50,e#),(A60,f#),(A70,g#)

flatMap

Same as map, but each element may produce zero or more output elements.

flatMap(f: T => Seq[U]): RDD[T] => RDD[U] (one-to-many transformation)

scala> val arr = Array("1#2#3","4#5","6")
17/12/06 02:10:59 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:44644 in memory (size: 1208.0 B, free: 511.1 MB)
17/12/06 02:10:59 INFO ContextCleaner: Cleaned accumulator 1
arr: Array[String] = Array(1#2#3, 4#5, 6)

scala> val rdd = sc.parallelize(arr).flatMap(f=>f.split("#"))
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at <console>:23

scala> println(rdd.collect().mkString(","))
1,2,3,4,5,6

filter

filter returns the elements for which the function f evaluates to true.

filter(f: T => Boolean): RDD[T] => RDD[T]

scala> val arr = Array("1#2#3","4#5","6")
arr: Array[String] = Array(1#2#3, 4#5, 6)

scala> val rdd = sc.parallelize(arr).filter(f=>f.length>=3)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at filter at <console>:23

scala> println(rdd.collect().mkString(","))
1#2#3,4#5

distinct

distinct removes duplicate elements. It is a relatively heavy operation, since it requires a shuffle.

scala> val arr = Array(1,2,3,2,3,4,5,4,6)
arr: Array[Int] = Array(1, 2, 3, 2, 3, 4, 5, 4, 6)

scala> val rdd = sc.parallelize(arr).distinct()
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at distinct at <console>:23

scala> println(rdd.collect().mkString(","))
4,6,2,1,3,5

sample (random sampling)

sample performs random sampling. If the first parameter (withReplacement) is true, the result may contain duplicate elements; if it is false, there will be no duplicates. The second parameter (fraction) takes a value in [0,1]; the number of sampled elements is roughly this fraction multiplied by the total count. The third parameter is the random seed.

scala> val arr = 1 to 20
arr: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

scala> val rdd = sc.parallelize(arr,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:23

scala> val a = rdd.sample(true,0.5,10)
a: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[11] at sample at <console>:25

scala> val b = rdd.sample(false,0.5,10)
b: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[12] at sample at <console>:25

scala> println("a:"+a.collect().mkString(","))
a:2,7,11,12,12,15,15,18

scala> println("b:"+b.collect().mkString(","))
b:7,8,11,13,15,16,17,18,19,20

groupByKey

groupByKey groups together all the values that share the same key.

scala> val arr = Array((1,"a"),(2,"b"),(2,"c"),(1,"b"),(3,"c"),(4,"d"),(2,"d"))
arr: Array[(Int, String)] = Array((1,a), (2,b), (2,c), (1,b), (3,c), (4,d), (2,d))

scala> val rdd = sc.parallelize(arr,3)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[13] at parallelize at <console>:23

scala> val a = rdd.groupByKey()
a: org.apache.spark.rdd.RDD[(Int, Iterable[String])] = ShuffledRDD[14] at groupByKey at <console>:25

scala> println(a.collect().mkString(","))
(3,CompactBuffer(c)),(4,CompactBuffer(d)),(1,CompactBuffer(a, b)),(2,CompactBuffer(b, c, d))
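
As a quick illustration of working with the grouped result, the sketch below counts how many values each key has by taking the size of each Iterable (for plain counting, reduceByKey is usually the cheaper choice):

scala> val counts = a.mapValues(v => v.size)
scala> println(counts.collect().mkString(","))   // expected something like (1,2),(2,3),(3,1),(4,1); ordering may vary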

combineByKey

combineByKey processes and aggregates the elements that share the same key. It takes three functions: createCombiner: V => C, applied to the first value of a key encountered in a partition; mergeValue: (C, V) => C, which merges each subsequent value of that key into the combiner produced by the first function; and mergeCombiners: (C, C) => C, which merges the combiners produced for the same key in different partitions.

scala> val arr = Array((1,"a"),(2,"c"),(2,"b"),(1,"b"),(3,"c"),(4,"d"),(2,"d"))
arr: Array[(Int, String)] = Array((1,a), (2,c), (2,b), (1,b), (3,c), (4,d), (2,d))

scala> val rdd = sc.parallelize(arr,3)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[15] at parallelize at <console>:23

scala> val a = rdd.combineByKey(f=>new StringBuffer(f),(a:StringBuffer,b:String)=>(a.append(b)),(c:StringBuffer,d:StringBuffer)=>(c.append(d)))
a: org.apache.spark.rdd.RDD[(Int, StringBuffer)] = ShuffledRDD[16] at combineByKey at <console>:39

scala>

scala> println(a.collect().mkString(","))
(3,c),(4,d),(1,ab),(2,cbd)
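
To make the three functions more concrete, here is a minimal sketch (with made-up numeric data) that computes a per-key average: the combiner is a (sum, count) pair built per partition and then merged across partitions.

scala> val nums = sc.parallelize(Array((1, 10.0), (1, 20.0), (2, 5.0)), 2)
scala> val sumCount = nums.combineByKey(
     |   (v: Double) => (v, 1),                                         // createCombiner: first value of a key in a partition
     |   (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),   // mergeValue: fold further values into the combiner
     |   (x: (Double, Int), y: (Double, Int)) => (x._1 + y._1, x._2 + y._2))  // mergeCombiners: merge partitions
scala> val avg = sumCount.mapValues { case (sum, n) => sum / n }
scala> println(avg.collect().mkString(","))   // expected: (1,15.0),(2,5.0); ordering may vary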

reduceByKey

reduceByKey reduces all the values that share the same key using the given function.

scala> val arr = Array((1,"a"),(2,"c"),(2,"b"),(1,"b"),(3,"c"),(4,"d"),(2,"d"))
arr: Array[(Int, String)] = Array((1,a), (2,c), (2,b), (1,b), (3,c), (4,d), (2,d))

scala> val rdd = sc.parallelize(arr,3)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[17] at parallelize at <console>:23

scala> val a = rdd.reduceByKey((a,b)=>a+"|"+b)
a: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[18] at reduceByKey at <console>:39

scala> println(a.collect().mkString(","))
(3,c),(4,d),(1,a|b),(2,c|b|d)
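
The classic use of reduceByKey is word counting. The following sketch (with a made-up word list) maps each word to (word, 1) and then sums the counts per key:

scala> val words = sc.parallelize(Array("a", "b", "a", "c", "b", "a"))
scala> val wordCounts = words.map(w => (w, 1)).reduceByKey((x, y) => x + y)
scala> println(wordCounts.collect().mkString(","))   // expected: (a,3),(b,2),(c,1); ordering may vary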

union

union concatenates two RDDs (duplicates are not removed).

scala> val arr = Array((1,"a"),(2,"c"),(2,"b"))
arr: Array[(Int, String)] = Array((1,a), (2,c), (2,b))

scala> val arr2 = Array((1,"b"),(3,"c"),(4,"d"),(2,"d"))
arr2: Array[(Int, String)] = Array((1,b), (3,c), (4,d), (2,d))

scala> val rdd1 = sc.parallelize(arr,3)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[19] at parallelize at <console>:23

scala> val rdd2 = sc.parallelize(arr2)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[20] at parallelize at <console>:23

scala> val a = rdd1.union(rdd2)
a: org.apache.spark.rdd.RDD[(Int, String)] = UnionRDD[21] at union at <console>:29

scala> println(a.collect().mkString(","))
(1,a),(2,c),(2,b),(1,b),(3,c),(4,d),(2,d)

join

join combines two RDDs by key. Besides join there are fullOuterJoin, leftOuterJoin and rightOuterJoin; their semantics are similar to the corresponding SQL operations.

scala> val arr = Array((1,"a"),(2,"c"),(2,"b"),(5,"g"))
arr: Array[(Int, String)] = Array((1,a), (2,c), (2,b), (5,g))

scala> val arr2 = Array((1,"B"),(3,"C"),(4,"D"),(2,"D"),(2,"E"))
arr2: Array[(Int, String)] = Array((1,B), (3,C), (4,D), (2,D), (2,E))

scala> val rdd1 = sc.parallelize(arr,3)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[22] at parallelize at <console>:23

scala> val rdd2 = sc.parallelize(arr2)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[23] at parallelize at <console>:23

scala> val a = rdd1.join(rdd2)
a: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[26] at join at <console>:29

scala> val b = rdd1.fullOuterJoin(rdd2)
b: org.apache.spark.rdd.RDD[(Int, (Option[String], Option[String]))] = MapPartitionsRDD[29] at fullOuterJoin at <console>:29

scala> val c = rdd1.leftOuterJoin(rdd2)
c: org.apache.spark.rdd.RDD[(Int, (String, Option[String]))] = MapPartitionsRDD[32] at leftOuterJoin at <console>:29

scala> val d = rdd1.rightOuterJoin(rdd2)
d: org.apache.spark.rdd.RDD[(Int, (Option[String], String))] = MapPartitionsRDD[35] at rightOuterJoin at <console>:29

scala> println("join:"+a.collect().mkString(","))
join:(1,(a,B)),(2,(c,D)),(2,(c,E)),(2,(b,D)),(2,(b,E))

scala> println("fullOuterJoin:"+b.collect().mkString(","))
fullOuterJoin:(3,(None,Some(C))),(4,(None,Some(D))),(1,(Some(a),Some(B))),(5,(Some(g),None)),(2,(Some(c),Some(D))),(2,(Some(c),Some(E))),(2,(Some(b),Some(D))),(2,(Some(b),Some(E)))

scala> println("leftOuterJoin:"+c.collect().mkString(","))
leftOuterJoin:(1,(a,Some(B))),(5,(g,None)),(2,(c,Some(D))),(2,(c,Some(E))),(2,(b,Some(D))),(2,(b,Some(E)))

scala> println("rightOuterJoin:"+d.collect().mkString(","))
rightOuterJoin:(3,(None,C)),(4,(None,D)),(1,(Some(a),B)),(2,(Some(c),D)),(2,(Some(c),E)),(2,(Some(b),D)),(2,(Some(b),E))

cartesian

cartesian computes the Cartesian product of two RDDs: every element of the first is paired with every element of the second.

scala> val arr = Array((1,"a"),(2,"c"))
arr: Array[(Int, String)] = Array((1,a), (2,c))

scala> val arr2 = Array((3,"C"),(4,"D"),(5,"E"))
arr2: Array[(Int, String)] = Array((3,C), (4,D), (5,E))

scala> val rdd1 = sc.parallelize(arr,3)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[50] at parallelize at <console>:23

scala> val rdd2 = sc.parallelize(arr2)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[51] at parallelize at <console>:23

scala> val a = rdd1.cartesian(rdd2)
a: org.apache.spark.rdd.RDD[((Int, String), (Int, String))] = CartesianRDD[52] at cartesian at <console>:29

scala> println(a.collect().mkString("\n"))
((1,a),(3,C))
((1,a),(4,D))
((1,a),(5,E))
((2,c),(3,C))
((2,c),(4,D))
((2,c),(5,E))

cogroup

cogroup groups multiple RDDs by key. As the output shows, a key that is missing from one of the RDDs yields an empty collection (CompactBuffer()) for that RDD rather than being dropped.

scala> val arr = Array((1,"a"),(2,"c"))
arr: Array[(Int, String)] = Array((1,a), (2,c))

scala> val arr2 = Array((1,"B"),(3,"C"))
arr2: Array[(Int, String)] = Array((1,B), (3,C))

scala> val arr3 = Array((1,"C"),(3,"D"))
arr3: Array[(Int, String)] = Array((1,C), (3,D))

scala> val rdd1 = sc.parallelize(arr,3)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[36] at parallelize at <console>:23

scala> val rdd2 = sc.parallelize(arr2)
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[37] at parallelize at <console>:23

scala> val rdd3 = sc.parallelize(arr3)
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[38] at parallelize at <console>:23

scala> val a = rdd1.cogroup(rdd2, rdd3)  // cogroup accepts 1-3 other RDDs as arguments
a: org.apache.spark.rdd.RDD[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = MapPartitionsRDD[40] at cogroup at <console>:33

scala> println(a.collect().mkString(","))
(3,(CompactBuffer(),CompactBuffer(C),CompactBuffer(D))),(1,(CompactBuffer(a),CompactBuffer(B),CompactBuffer(C))),(2,(CompactBuffer(c),CompactBuffer(),CompactBuffer()))

mapValues

mapValues applies a map function to the value part of key/value (two-element tuple) data; the keys are left unchanged.

scala> val arr = Array((1,"a"),(2,"c"),(2,"b"),(5,"g"))
arr: Array[(Int, String)] = Array((1,a), (2,c), (2,b), (5,g))

scala> val rdd1 = sc.parallelize(arr,3)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[41] at parallelize at <console>:23

scala> val a = rdd1.mapValues(f=>f.toUpperCase())
a: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[42] at mapValues at <console>:25

scala> println(a.collect().mkString(","))

(1,A),(2,C),(2,B),(5,G)

sort

RDDs can be sorted with sortByKey, sortBy and related methods. sortByKey is shown first; a sortBy sketch follows the two examples below.

scala> val arr = Array((1,"a"),(2,"c"),(3,"b"),(5,"g"))
arr: Array[(Int, String)] = Array((1,a), (2,c), (3,b), (5,g))

scala> val rdd1 = sc.parallelize(arr,3)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[43] at parallelize at <console>:23

scala> val a = rdd1.sortByKey(true)

scala> println(a.collect().mkString(","))
(1,a),(2,c),(3,b),(5,g)

scala> val b = rdd1.sortByKey(false)

scala> println(b.collect().mkString(","))
(5,g),(3,b),(2,c),(1,a)
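
For completeness, here is a minimal sortBy sketch on the same rdd1, ordering by the value instead of the key (the expected output is based on the data above):

scala> val e = rdd1.sortBy(f => f._2, true)
scala> println(e.collect().mkString(","))   // expected: (1,a),(3,b),(2,c),(5,g)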

count

count returns the number of elements in the RDD.

scala> val arr = Array((1,"a"),(2,"c"),(2,"c"))
arr: Array[(Int, String)] = Array((1,a), (2,c), (2,c))

scala> val rdd1 = sc.parallelize(arr,3)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[53] at parallelize at <console>:23

scala> val a = rdd1.count()
a: Long = 3

scala> println(a)
3

collect

collect is an action: it triggers execution of the RDD's lineage and returns the result set to the driver as a local array.
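
Since collect pulls the entire dataset back to the driver, it should only be used on results that are small enough to fit in driver memory. A minimal sketch:

scala> val local: Array[Int] = sc.parallelize(1 to 5).collect()
scala> println(local.mkString(","))   // expected: 1,2,3,4,5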

reduce

reduce aggregates all the elements of the RDD pairwise with the given function and returns the final result to the driver.

scala> val arr = Array((1,"a"),(2,"c"),(2,"D"))
arr: Array[(Int, String)] = Array((1,a), (2,c), (2,D))

scala> val rdd1 = sc.parallelize(arr,3)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[54] at parallelize at <console>:23

scala> val a = rdd1.reduce((a,b)=>(a._1+b._1,a._2+b._2))
a: (Int, String) = (5,aDc)

scala> println(a)
(5,aDc)

lookup

lookup returns, as a local sequence, all the values in the RDD that are associated with the given key.

scala> val arr = Array((1,"a"),(2,"c"),(2,"D"))
arr: Array[(Int, String)] = Array((1,a), (2,c), (2,D))

scala> val rdd1 = sc.parallelize(arr,3)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[55] at parallelize at <console>:23

scala> val a = rdd1.lookup(2)
a: Seq[String] = WrappedArray(c, D)

scala> println(a.mkString(","))
c,D

zip

zip pairs the elements of two RDDs together element by element. Note that the two RDDs must have the same number of partitions and the same number of elements in each partition. The related zipWithIndex and zipWithUniqueId pair each element with a generated index, as shown below.

scala> val a = Array(1,2,3,4,5,6,7,8,9)
a: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> val b = Array("a","b","c","d","e","f","g","h","i")
b: Array[String] = Array(a, b, c, d, e, f, g, h, i)

scala> val m1 = sc.parallelize(a,3)
m1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[60] at parallelize at <console>:23

scala> val m2 = sc.parallelize(b,3)
m2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[61] at parallelize at <console>:23

scala> val m3 = m1.zip(m2)
m3: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[62] at zip at <console>:29

scala> println("m3:"+m3.collect().mkString(","))
m3:(1,a),(2,b),(3,c),(4,d),(5,e),(6,f),(7,g),(8,h),(9,i)

scala> val m4 = m1.zipWithIndex()
m4: org.apache.spark.rdd.RDD[(Int, Long)] = ZippedWithIndexRDD[63] at zipWithIndex at <console>:25

scala> println("m4:"+m4.collect().mkString(","))
m4:(1,0),(2,1),(3,2),(4,3),(5,4),(6,5),(7,6),(8,7),(9,8)

scala> val m5 = m1.zipWithUniqueId()
m5: org.apache.spark.rdd.RDD[(Int, Long)] = MapPartitionsRDD[64] at zipWithUniqueId at <console>:25

scala> println("m5:"+m5.collect().mkString(","))
m5:(1,0),(2,3),(3,6),(4,1),(5,4),(6,7),(7,2),(8,5),(9,8)

subtract

subtract returns the elements of the first RDD that do not appear in the second RDD.

scala> val a = Array(1,2,3,4,5,6,7,8,9)
a: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> val b = Array(3,4)
b: Array[Int] = Array(3, 4)

scala> val m1 = sc.parallelize(a,3)
m1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:23

scala> val m2 = sc.parallelize(b,2)
m2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[66] at parallelize at <console>:23

scala> val m3 = m1.subtract(m2)
m3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[70] at subtract at <console>:29

scala> println("m3:"+m3.collect().mkString(","))
m3:6,9,1,7,2,5,8

coalesce/repartition

coalesce and repartition change the number of partitions of an RDD. coalesce(n, false) can only reduce the partition count and avoids a shuffle, while repartition(n) is equivalent to coalesce(n, shuffle = true) and can either increase or decrease it; a sketch with shuffle = true follows the example below.

scala> val rdd = sc.parallelize(Array(1, 2, 3, 4, 5, 6), 3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[71] at parallelize at <console>:21

scala> val rdd1 = rdd.coalesce(1, false)
rdd1: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[72] at coalesce at <console>:23

scala> val rdd2 = rdd.repartition(1)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[76] at repartition at <console>:23

scala> println("rdd:"+rdd.partitions.length)
rdd:3

scala> println("rdd1:"+rdd1.partitions.length)
rdd1:1

scala> println("rdd2:"+rdd2.partitions.length)
rdd2:1
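
As a further sketch of the shuffle flag, coalesce can also increase the partition count when shuffle is set to true (with shuffle = false it can only reduce it):

scala> val rdd3 = rdd.coalesce(6, true)            // shuffle = true, so the partition count may grow
scala> println("rdd3:" + rdd3.partitions.length)   // expected: rdd3:6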

To be continued…
