
Demo: Delta Lake on big data workloads

First, what is the difference between Delta Lake and Change Data Capture (CDC)?

CDC is simply the log of changes made to a relational table. Delta Lake, by contrast, adds native administrative capabilities to a data lake implementation (schemas, transactions, cataloging). Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark™ and big data workloads, and it speeds up incremental data loads into the lake by using file metadata.
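
To make the point about incremental loads concrete, here is a minimal, hedged sketch of appending a new batch of rows to an existing Delta table; the file and table paths are purely illustrative and are not part of the demo that follows.

# Minimal sketch: append an incremental batch of rows to an existing Delta table.
# Both paths below are illustrative assumptions.
incremental_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/apps/landingzone/new_departuredelays.csv")

incremental_df.write \
    .format("delta") \
    .mode("append") \
    .save("/apps/pathtoeventtable/departuredelays.delta")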

For more information on Delta Lake, you can visit the official Delta Lake documentation. Now let's start with the demo, assuming you have some basic knowledge of data lakes and Spark Delta Lake.

The Delta Lake package is available through the --packages option. Run the command below on your Spark node to pull in the required packages:

pyspark --packages io.delta:delta-core_2.11:0.4.0 --conf "spark.databricks.delta.retentionDurationCheck.enabled=false" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
[root@namenode bin]# pyspark --packages io.delta:delta-core_2.11:0.4.0 --conf "spark.databricks.delta.retentionDurationCheck.enabled=false" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
Python 3.5.1 (default, May 29 2019, 14:49:13)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-23)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/spark-2.4.0-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
io.delta#delta-core_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-43e14ff6-417d-4234-a961-9ca705e1f577;1.0
        confs: [default]
        found io.delta#delta-core_2.11;0.4.0 in central
        found org.antlr#antlr4;4.7 in central
        found org.antlr#antlr4-runtime;4.7 in central
        found org.antlr#antlr-runtime;3.5.2 in central
        found org.antlr#ST4;4.0.8 in central
        found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
        found org.glassfish#javax.json;1.0.4 in central
        found com.ibm.icu#icu4j;58.2 in central
downloading https://repo1.maven.org/maven2/io/delta/delta-core_2.11/0.4.0/delta-core_2.11-0.4.0.jar ...
        [SUCCESSFUL ] io.delta#delta-core_2.11;0.4.0!delta-core_2.11.jar (9446ms)
downloading https://repo1.maven.org/maven2/org/antlr/antlr4/4.7/antlr4-4.7.jar ...
        [SUCCESSFUL ] org.antlr#antlr4;4.7!antlr4.jar (5242ms)
downloading https://repo1.maven.org/maven2/org/antlr/antlr4-runtime/4.7/antlr4-runtime-4.7.jar ...
        [SUCCESSFUL ] org.antlr#antlr4-runtime;4.7!antlr4-runtime.jar (2768ms)
downloading https://repo1.maven.org/maven2/org/antlr/antlr-runtime/3.5.2/antlr-runtime-3.5.2.jar ...
        [SUCCESSFUL ] org.antlr#antlr-runtime;3.5.2!antlr-runtime.jar (1198ms)
downloading https://repo1.maven.org/maven2/org/antlr/ST4/4.0.8/ST4-4.0.8.jar ...
        [SUCCESSFUL ] org.antlr#ST4;4.0.8!ST4.jar (1008ms)
downloading https://repo1.maven.org/maven2/org/abego/treelayout/org.abego.treelayout.core/1.0.3/org.abego.treelayout.core-1.0.3.jar ...
        [SUCCESSFUL ] org.abego.treelayout#org.abego.treelayout.core;1.0.3!org.abego.treelayout.core.jar(bundle) (477ms)
downloading https://repo1.maven.org/maven2/org/glassfish/javax.json/1.0.4/javax.json-1.0.4.jar ...
        [SUCCESSFUL ] org.glassfish#javax.json;1.0.4!javax.json.jar(bundle) (504ms)
downloading https://repo1.maven.org/maven2/com/ibm/icu/icu4j/58.2/icu4j-58.2.jar ...
        [SUCCESSFUL ] com.ibm.icu#icu4j;58.2!icu4j.jar (42963ms)
:: resolution report :: resolve 26749ms :: artifacts dl 63614ms
        :: modules in use:
        com.ibm.icu#icu4j;58.2 from central in [default]
        io.delta#delta-core_2.11;0.4.0 from central in [default]
        org.abego.treelayout#org.abego.treelayout.core;1.0.3 from central in [default]
        org.antlr#ST4;4.0.8 from central in [default]
        org.antlr#antlr-runtime;3.5.2 from central in [default]
        org.antlr#antlr4;4.7 from central in [default]
        org.antlr#antlr4-runtime;4.7 from central in [default]
        org.glassfish#javax.json;1.0.4 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   8   |   8   |   8   |   0   ||   8   |   8   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-43e14ff6-417d-4234-a961-9ca705e1f577
        confs: [default]
        8 artifacts copied, 0 already retrieved (15322kB/47ms)
2020-06-05 12:51:03 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/
Using Python version 3.5.1 (default, May 29 2019 14:49:13)
SparkSession available as 'spark'.
>>>

The sample dataset (departuredelays.csv.gz) used in this demo can be downloaded from GitHub.

I am using the containerized HDP cluster that I created in my previous blog. In case you don't have a cluster running, you can download it from Docker Hub or from my Git repo.

Assuming the cluster is up and running, move the sample dataset file to any node of the cluster where the Spark client or Spark master application is running.

docker cp departuredelays.csv.gz namenode:/mukesh/

Now unzip the sample data file and copy it to an HDFS location.

[root@namenode mukesh]# gzip -d departuredelays.csv.gz
[root@namenode mukesh]# ls -ltr
total 32616
-rwxr-xr-x 1 root root 33396236 Jun  5 12:52 departuredelays.csv
[root@namenode mukesh]# hadoop fs -copyFromLocal departuredelays.csv /apps/landingzone/
[root@namenode mukesh]# hadoop fs -ls /apps/landingzone/
Found 1 items
-rw-r--r--   1 root supergroup   33396236 2020-06-05 13:06 /apps/landingzone/departuredelays.csv

Define the path variables in the PySpark shell:

>>> tripdelaysFilePath = "/apps/landingzone/departuredelays.csv"
>>> pathToEventsTable = "/apps/pathtoeventtable/departuredelays.delta"
>>> file_type = "csv"

Read the dataset into a Spark DataFrame.

>>> df = spark.read.format(file_type) \
... .option("header", "true") \
... .option("inferSchema", "true") \
... .csv(tripdelaysFilePath)

By saving this table to Delta Lake storage (notice .format("delta") in the command below), we will be able to take advantage of its features, including ACID transactions, unified batch and streaming, and time travel.

>>> df \
.write \
.format("delta") \
.mode("overwrite") \
.save("/apps/pathtoeventtable/departuredelays.delta")

If you take a look at the underlying filesystem, you will notice the files created for the departureDelays Delta Lake table: a _delta_log transaction log directory plus the data files, stored in Parquet format, which is the file format Delta Lake builds on.

[root@namenode /]# hadoop fs -ls /apps/pathtoeventtable/departuredelays.delta/
Found 3 items
drwxr-xr-x   - root supergroup          0 2020-06-06 08:42 /apps/pathtoeventtable/departuredelays.delta/_delta_log
-rw-r--r--   1 root supergroup    3953962 2020-06-06 08:42 /apps/pathtoeventtable/departuredelays.delta/part-00000-6a28aa9e-4e1d-4fcf-b73c-e3c08a331688-c000.snappy.parquet
-rw-r--r--   1 root supergroup    3087901 2020-06-06 08:42 /apps/pathtoeventtable/departuredelays.delta/part-00001-9bebb846-1cd1-4def-acb5-8c356581d939-c000.snappy.parquet

Now, let's reload the data, but this time our DataFrame will be backed by Delta Lake.

>>> delays_delta = spark \
.read \
.format("delta") \
.load("/apps/pathtoeventtable/departuredelays.delta")

Check the count

>>> delays_delta.count()
1391578

From the DataFrame, create a temporary view. A temporary view is handy when you don't want to register a permanent table, but it is accessible only from the Spark session in which it was created. If you don't need it, you can skip this step.

>>> delays_delta.createOrReplaceTempView("vw_delays_delta")

Let's query the data using the view created above.

>>> spark.sql("select count(1) from vw_delays_delta where origin = 'SEA' and destination = 'SFO'").show()
+--------+
|count(1)|
+--------+
|    1698|
+--------+

Save the DataFrame as a Delta table. Once registered, the table can also be accessed by name from other Spark sessions.

>>> permanent_table_name = "tbl_delays_delta"
>>> delays_delta.write.format("delta").saveAsTable(permanent_table_name)

Data Deletion

To delete data from a traditional data lake table, you would need to take the following steps (sketched in code after the list):

1. Select all of the data from your table not including the rows you want to delete

2. Create a new table based on the previous query

3. Delete the original table

4. Rename the new table to the original table name for downstream dependencies.
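
For contrast, here is a rough, hedged sketch of what those four steps could look like against a plain Parquet table; the paths and the use of the hadoop CLI are illustrative assumptions, not part of this demo.

import subprocess

# Illustrative paths for a plain (non-Delta) Parquet copy of the data.
parquet_path = "/apps/landingzone/departuredelays_parquet"
tmp_path = "/apps/landingzone/departuredelays_parquet_tmp"

# 1. Select all of the data except the rows to delete (here: delay < 0)
kept = spark.read.parquet(parquet_path).where("delay >= 0")

# 2. Write the result out as a new table
kept.write.mode("overwrite").parquet(tmp_path)

# 3. Delete the original table, 4. rename the new one to the original name
subprocess.call(["hadoop", "fs", "-rm", "-r", "-skipTrash", parquet_path])
subprocess.call(["hadoop", "fs", "-mv", tmp_path, parquet_path])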

With Delta Lake, we can skip all of these steps and simply run a delete operation. To show this, let's delete all of the flights that departed early (i.e. delay < 0).

>>> pathToEventsTable = "/apps/pathtoeventtable/departuredelays.delta"
>>> from delta.tables import *
>>> from pyspark.sql.functions import *
>>> deltaTable = DeltaTable.forPath(spark, pathToEventsTable)
>>> deltaTable.delete("delay < 0")
>>> delays_delta.count()
722849
>>> spark.sql("select count(1) from vw_delays_delta where origin = 'SEA' and destination = 'SFO'").show()
+--------+
|count(1)|
+--------+
|     837|
+--------+

Let's look at the underlying filesystem again:

[root@namenode /]# hadoop fs -ls /apps/pathtoeventtable/departuredelays.delta/
Found 5 items
drwxr-xr-x   - root supergroup          0 2020-06-06 08:49 /apps/pathtoeventtable/departuredelays.delta/_delta_log
-rw-r--r--   1 root supergroup    2603126 2020-06-06 08:49 /apps/pathtoeventtable/departuredelays.delta/part-00000-27c3618c-0aa5-4287-8161-c5c0d187529f-c000.snappy.parquet
-rw-r--r--   1 root supergroup    3953962 2020-06-06 08:42 /apps/pathtoeventtable/departuredelays.delta/part-00000-6a28aa9e-4e1d-4fcf-b73c-e3c08a331688-c000.snappy.parquet
-rw-r--r--   1 root supergroup    3087901 2020-06-06 08:42 /apps/pathtoeventtable/departuredelays.delta/part-00001-9bebb846-1cd1-4def-acb5-8c356581d939-c000.snappy.parquet
-rw-r--r--   1 root supergroup    1811471 2020-06-06 08:49 /apps/pathtoeventtable/departuredelays.delta/part-00001-eb940dee-9e70-4bad-ad70-339d9c27dd67-c000.snappy.parquet

In traditional data lakes, deletes are performed by rewriting the entire table, excluding the values to be deleted. With Delta Lake, deletes are instead performed by selectively writing new versions of the files containing the data to be deleted, while the previous files are only marked as deleted in the transaction log. This is because Delta Lake uses multi-version concurrency control (MVCC) to perform atomic operations on the table: for example, while one user is deleting data, another user may be querying the previous version of the table. This multi-version model also enables us to travel back in time (i.e. time travel) and query previous versions, as we will see later.
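
To see this bookkeeping in action, you can read the commit written by the DELETE straight from the transaction log. The snippet below is a hedged sketch: the file name 00000000000000000001.json corresponds to version 1 (the DELETE, as the table history later confirms), and the old data files can eventually be removed physically with vacuum(), which is why spark.databricks.delta.retentionDurationCheck.enabled was disabled when the shell was started.

# Minimal sketch: inspect the JSON commit file produced by the DELETE (version 1).
# Each line of the file holds one action: commitInfo, add (new files) or remove (old files).
log_v1 = spark.read.json(
    "/apps/pathtoeventtable/departuredelays.delta/_delta_log/00000000000000000001.json")
log_v1.select("commitInfo.operation", "add.path", "remove.path").show(truncate=False)

# Files that are no longer referenced can later be deleted physically with vacuum().
# A 0-hour retention is only acceptable for a demo, since it breaks time travel.
# deltaTable.vacuum(0)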

Data Update

To update data in a traditional data lake table, you would need to:

1. Select all of the data from your table not including the rows you want to modify

2. Modify the rows that need to be updated/changed

3. Merge these two tables to create a new table

4. Delete the original table

5. Rename the new table to the original table name for downstream dependencies.

With Delta Lake, we can again skip all of these steps and simply run an update operation. To show this, let's change all of the flights originating from Detroit (DTW) so that they originate from Seattle (SEA) instead.

>>> deltaTable.update("origin = 'DTW'", { "origin": "'SEA'" } ) 
>>> spark.sql("select count(1) from vw_delays_delta where origin = 'SEA' and destination = 'SFO'").show()
+--------+
|count(1)|
+--------+
|     986|
+--------+

Data Merge

A common scenario when working with a data lake is to continuously append data to your table. This often results in duplicate data (rows you do not want inserted into your table again), new rows that need to be inserted, and some rows that need to be updated. With Delta Lake, all of this can be achieved by using the merge operation (similar to the SQL MERGE statement).

Let's start with a small sample of rows that we will want to update, insert, or de-duplicate, selected with the following query.

>>> spark.sql("select * from vw_delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show()
+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1010710|   31|     590|   SEA|        SFO|
|1010521|    0|     590|   SEA|        SFO|
|1010955|  104|     590|   SEA|        SFO|
|1010730|    5|     590|   SEA|        SFO|
+-------+-----+--------+------+-----------+

Next, let’s generate our own merge_table that contains data we will insert, update or de-duplicate with the following code snippet.

>>> items = [(1010710, 31, 590, 'SEA', 'SFO'), (1010521, 10, 590, 'SEA', 'SFO'), (1010822, 31, 590, 'SEA', 'SFO')]
>>> cols = ['date', 'delay', 'distance', 'origin', 'destination']
>>> merge_table = spark.createDataFrame(items, cols)
>>> merge_table.toPandas()
      date  delay  distance origin destination
0  1010710     31       590    SEA         SFO
1  1010521     10       590    SEA         SFO
2  1010822     31       590    SEA         SFO

Let's merge the new data into the Delta table:

>>> deltaTable.alias("flights") \
    .merge(merge_table.alias("updates"), "flights.date = updates.date") \
    .whenMatchedUpdate(set = { "delay" : "updates.delay" } ) \
    .whenNotMatchedInsertAll() \
    .execute()
>>> spark.sql("select * from vw_delays_delta where origin = 'SEA' and destination = 'SFO' and date like '1010%' limit 10").show()
+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1010955|  104|     590|   SEA|        SFO|
|1010710|   31|     590|   SEA|        SFO|
|1010730|    5|     590|   SEA|        SFO|
|1010521|   10|     590|   SEA|        SFO|
+-------+-----+--------+------+-----------+

View Table History

The full history of operations on the table can be viewed with the DeltaTable.history() method, as shown below.

>>> deltaTable.history().show()
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|version|           timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|      3|2020-06-06 09:30:...|  null|    null|    MERGE|[predicate -> (CA...|null|    null|     null|          2|          null|        false|
|      2|2020-06-06 08:59:...|  null|    null|   UPDATE|[predicate -> (or...|null|    null|     null|          1|          null|        false|
|      1|2020-06-06 08:49:...|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          0|          null|        false|
|      0|2020-06-06 08:42:...|  null|    null|    WRITE|[mode -> Overwrit...|null|    null|     null|       null|          null|        false|
+-------+--------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+

Alternatively, previous versions of the table can be queried directly (time travel):

>>> dfv0 = spark.read.format("delta").option("versionAsOf", 0).load(pathToEventsTable)
>>> dfv1 = spark.read.format("delta").option("versionAsOf", 1).load(pathToEventsTable)
>>> dfv2 = spark.read.format("delta").option("versionAsOf", 2).load(pathToEventsTable)
# Calculate the SEA to SFO flight counts for each version of history
>>> cnt0 = dfv0.where("origin = 'SEA'").where("destination = 'SFO'").count()
>>> cnt1 = dfv1.where("origin = 'SEA'").where("destination = 'SFO'").count()
>>> cnt2 = dfv2.where("origin = 'SEA'").where("destination = 'SFO'").count()
# Print out the values
>>> print("SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s" % (cnt0, cnt1, cnt2))
## Output
SEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986

An external table is more suitable for immutable data, or data that doesn't change frequently, since we can modify it only by recreating and overwriting it; it also doesn't offer any version control. A Delta table, on the contrary, can easily be modified through inserts, deletes, and merges, and all of these modifications can be rolled back to obtain an older version of the table. In this way Delta Lake offers us flexible storage and helps us keep control over the changes in our data.
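
As a closing, hedged illustration of that roll-back idea (not part of the recorded session above): one simple way to revert the table is to read an older version with versionAsOf and overwrite the current contents with it.

# Minimal sketch: roll the table back to version 1 (the state right after the DELETE)
# by reading that snapshot and overwriting the current table contents with it.
old_version = spark.read.format("delta") \
    .option("versionAsOf", 1) \
    .load("/apps/pathtoeventtable/departuredelays.delta")

old_version.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/apps/pathtoeventtable/departuredelays.delta")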
