Python: Streaming data ingestion into a database in real time using Dataflow.

In my previous articles, we solved real-time data ingestion problems using tools such as Apache Kafka, Storm, Flink, and Spark, and I showed in detail how to create such pipelines for real-time processing. In this blog, we will simulate a similar problem using Apache Beam and Dataflow with Python. Let’s say we have the sample data below, in which the FL_DATE and DEP_TIME columns represent local dates and times without a time zone. You can find the dataset and Python code in my GitHub repository too. This is a flight dataset; the origin and destination airports often lie in different time zones, yet no time zone offset is present in the data. Because the time zone depends on the airport location, we will convert all times to Coordinated Universal Time (UTC) and add the offsets to our dataset.

Therefore, let us start with a transformation of our sample dataset that converts all time fields to UTC. Additionally, we add three fields each for the origin and destination airports: the latitude, longitude, and time zone offset.

FL_DATE,UNIQUE_CARRIER,AIRLINE_ID,CARRIER,FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,ORIGIN,DEST_AIRPORT_ID,DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CANCELLED,CANCELLATION_CODE,DIVERTED,DISTANCE
2015-01-01,AA,19805,AA,1,12478,1247802,31703,JFK,12892,1289203,32575,LAX,0900,0855,-5.00,17.00,0912,1230,7.00,1230,1237,7.00,0.00,,0.00,2475.00
2015-01-01,AA,19805,AA,2,12892,1289203,32575,LAX,12478,1247802,31703,JFK,0900,0856,-4.00,16.00,0912,1643,8.00,1735,1651,-44.00,0.00,,0.00,2475.00
2015-01-01,AA,19805,AA,3,12478,1247802,31703,JFK,12892,1289203,32575,LAX,1230,1226,-4.00,19.00,1245,1543,5.00,1550,1548,-2.00,0.00,,0.00,2475.00
2015-01-01,AA,19805,AA,4,12892,1289203,32575,LAX,12478,1247802,31703,JFK,1220,1214,-6.00,23.00,1237,2021,12.00,2050,2033,-17.00,0.00,,0.00,2475.00
2015-01-01,AA,19805,AA,5,11298,1129803,30194,DFW,12173,1217302,32134,HNL,1305,1754,289.00,21.00,1815,2234,6.00,1740,2240,300.00,0.00,,0.00,3784.00
2015-01-01,AA,19805,AA,6,13830,1383002,33830,OGG,11298,1129803,30194,DFW,1805,,,,,,,0510,,,1.00,A,0.00,3711.00

Let’s say we have another master dataset that contains information about the airports. We first extract the latitude and longitude. To do this, we run the pipeline locally with the DirectRunner using the code below. The first Map takes each line and passes it to csv.reader; the second Map picks out the fields we need.

import apache_beam as beam
import csv

if __name__ == '__main__':
   with beam.Pipeline('DirectRunner') as pipeline:

      airports = (pipeline
         | beam.io.ReadFromText('airports.csv.gz')
         | beam.Map(lambda line: next(csv.reader([line])))
         | beam.Map(lambda fields: (fields[0], (fields[21], fields[26])))
      )

      (airports
         | beam.Map(lambda airport_data: '{},{}'.format(airport_data[0], ','.join(airport_data[1])))
         | beam.io.textio.WriteToText('extracted_airports')
      )
      # the with block runs the pipeline on exit, so no explicit pipeline.run() is needed

Note: if you have not installed Apache Beam, use the command below to install it (if you plan to run on Cloud Dataflow, install the GCP extra instead: pip install apache-beam[gcp]).

sudo pip install apache-beam

Because we specified the DirectRunner, this program runs locally rather than on the Google Cloud Dataflow service; later we will launch the same pipeline on multiple workers in the cloud. The run produces an output file named “extracted_airports-00000-of-00001” in the same folder, and the results look like this:

1000101,58.10944444,-152.90666667
1000301,65.54805556,-161.07166667

The next transformation is to convert the times to UTC. For that, we first copy the original dataset to Cloud Storage:

gsutil cp 201501_part.csv gs://myproject/mydata_part.csv
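
Once the file is in Cloud Storage, Beam’s ReadFromText can consume the gs:// path directly. A minimal standalone sketch (assuming the GCP extras for Beam are installed so that gs:// paths are readable):

import apache_beam as beam

with beam.Pipeline('DirectRunner') as pipeline:
   # ReadFromText accepts Cloud Storage paths as well as local files
   flights = pipeline | beam.io.ReadFromText('gs://myproject/mydata_part.csv')
   flights | beam.Map(print)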

Let us first install the timezonefinder Python library using pip:

sudo pip install timezonefinder
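
As a quick sanity check of the install, you can look up one point by hand. A small sketch (the coordinates are JFK’s, used purely for illustration):

import timezonefinder

tf = timezonefinder.TimezoneFinder()
# the same call our pipeline makes; for JFK we expect 'America/New_York'
print(tf.timezone_at(lng=-73.7781, lat=40.6413))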

Now run the code below to add the time zone to our dataset. This is the same as the previous pipeline, except that we add the time zone column to the output:

import apache_beam as beam
import csv

def addtimezone(lat, lon):
   try:
      import timezonefinder
      tf = timezonefinder.TimezoneFinder()
      tz = tf.timezone_at(lng=float(lon), lat=float(lat))
      if tz is None:
         tz = 'UTC'
      return (lat, lon, tz)
   except ValueError:
      return (lat, lon, 'TIMEZONE') # header

if __name__ == '__main__':
   with beam.Pipeline('DirectRunner') as pipeline:

      airports = (pipeline
         | beam.io.ReadFromText('airports.csv.gz')
         | beam.Map(lambda line: next(csv.reader([line])))
         | beam.Map(lambda fields: (fields[0], addtimezone(fields[21], fields[26])))
      )

      (airports
         | beam.Map(lambda airport_data: '{},{}'.format(airport_data[0], ','.join(airport_data[1])))
         | beam.io.textio.WriteToText('airports_with_tz')
      )

The output looks like this:

AIRPORT_SEQ_ID,LATITUDE,LONGITUDE,TIMEZONE
1000101,58.10944444,-152.90666667,America/Anchorage
1000301,65.54805556,-161.07166667,America/Anchorage
1000401,68.08333333,-163.16666667,America/Nome
1000501,67.57000000,-148.18388889,America/Anchorage
1000601,57.74527778,-152.88277778,America/Anchorage
1000701,55.55472222,-133.10166667,America/Sitka

We need to join the flights data with the airports data to find the time zone corresponding to each flight. To do that, we make the airports PCollection a “side input.” Side inputs in Beam are like views into the original PCollection and are either lists or dicts. In this case, we will create a dict that maps each airport ID to information about that airport.
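
Before wiring it into the flights pipeline, here is a tiny standalone sketch (with hypothetical airport/time zone pairs) of how beam.pvalue.AsDict turns a PCollection of key/value pairs into an ordinary dict that is handed to every call of the consuming function:

import apache_beam as beam

with beam.Pipeline('DirectRunner') as pipeline:
   # the side input: a PCollection of (key, value) pairs
   tz_lookup = pipeline | 'tz' >> beam.Create(
      [('JFK', 'America/New_York'), ('LAX', 'America/Los_Angeles')])
   codes = pipeline | 'codes' >> beam.Create(['JFK', 'LAX'])
   # AsDict materializes tz_lookup as a plain dict passed to each Map call
   (codes
      | beam.Map(lambda code, tz_map: '{},{}'.format(code, tz_map[code]),
                 beam.pvalue.AsDict(tz_lookup))
      | beam.Map(print))

The actual pipeline, with the airports dict as the side input to the flights branch, is below: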

import apache_beam as beam
import csv

def addtimezone(lat, lon):
   try:
      import timezonefinder
      tf = timezonefinder.TimezoneFinder()
      return (lat, lon, tf.timezone_at(lng=float(lon), lat=float(lat)))
      #return (lat, lon, 'America/Los_Angeles') # FIXME
   except ValueError:
      return (lat, lon, 'TIMEZONE') # header

def as_utc(date, hhmm, tzone):
   try:
      if len(hhmm) > 0 and tzone is not None:
         import datetime, pytz
         loc_tz = pytz.timezone(tzone)
         loc_dt = loc_tz.localize(datetime.datetime.strptime(date,'%Y-%m-%d'), is_dst=False)
         # can't just parse hhmm because the data contains 2400 and the like ...
         loc_dt += datetime.timedelta(hours=int(hhmm[:2]), minutes=int(hhmm[2:]))
         utc_dt = loc_dt.astimezone(pytz.utc)
         return utc_dt.strftime('%Y-%m-%d %H:%M:%S')
      else:
         return '' # empty string corresponds to canceled flights
   except ValueError as e:
      print('{} {} {}'.format(date, hhmm, tzone))
      raise e

def tz_correct(line, airport_timezones):
   fields = line.split(',')
   if fields[0] != 'FL_DATE' and len(fields) == 27:
      # convert all times to UTC
      dep_airport_id = fields[6]
      arr_airport_id = fields[10]
      dep_timezone = airport_timezones[dep_airport_id][2]
      arr_timezone = airport_timezones[arr_airport_id][2]

      for f in [13, 14, 17]: #crsdeptime, deptime, wheelsoff
         fields[f] = as_utc(fields[0], fields[f], dep_timezone)
      for f in [18, 20, 21]: #wheelson, crsarrtime, arrtime
         fields[f] = as_utc(fields[0], fields[f], arr_timezone)

      yield ','.join(fields)

if __name__ == '__main__':
   with beam.Pipeline('DirectRunner') as pipeline:

      airports = (pipeline
         | 'airports:read' >> beam.io.ReadFromText('airports.csv.gz')
         | 'airports:fields' >> beam.Map(lambda line: next(csv.reader([line])))
         | 'airports:tz' >> beam.Map(lambda fields: (fields[0], addtimezone(fields[21], fields[26])))
      )

      flights = (pipeline
         | 'flights:read' >> beam.io.ReadFromText('201501_part.csv')
         | 'flights:tzcorr' >> beam.FlatMap(tz_correct, beam.pvalue.AsDict(airports))
      )

      flights | beam.io.textio.WriteToText('all_flights')

The FlatMap() method calls out to a method tz_correct(), which takes a line from 201501_part.csv (containing a single flight’s information) and a Python dictionary (containing all the airports’ time zone information). Why FlatMap() instead of Map() to call tz_correct()? A Map is a 1-to-1 relation between input and output, whereas a FlatMap() can return 0–N outputs per input; here, for example, header lines and malformed rows yield no output at all.
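
A toy example (hypothetical data, independent of the flights pipeline) makes the difference concrete:

import apache_beam as beam

def split_if_data(line):
   # a FlatMap-style function: zero outputs for the header, several otherwise
   if line != 'HEADER':
      for field in line.split(','):
         yield field

with beam.Pipeline('DirectRunner') as pipeline:
   (pipeline
      | beam.Create(['HEADER', 'a,b', 'c'])
      | beam.FlatMap(split_if_data)  # emits a, b, c: 0, 2, and 1 outputs
      | beam.Map(print))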

A line that originally (in the raw data) looked like this:

2015-01-01,AA,19805,AA,8,12173,1217302,32134,HNL,11298,1129803,30194,DFW,1745,1933,108.00,15.00,1948,0648,11.00,0510,0659,109.00,0.00,,0.00,3784.00

now becomes:

2015-01-01,AA,19805,AA,8,12173,1217302,32134,HNL,11298,1129803,30194,DFW,2015-01-02 03:45:00,2015-01-02 05:33:00,108.00,15.00,2015-01-02 05:48:00,2015-01-01 12:48:00,11.00,2015-01-01 11:10:00,2015-01-01 …

Now let’s create the events. The first is a departed event, to be published at the departure time; the second is an arrived event, to be published at the arrival time. The departed event has a number of empty fields corresponding to data that is not knowable at departure time.

import apache_beam as beam
import csv

DATETIME_FORMAT='%Y-%m-%dT%H:%M:%S'

def addtimezone(lat, lon):
   try:
      import timezonefinder
      tf = timezonefinder.TimezoneFinder()
      return (lat, lon, tf.timezone_at(lng=float(lon), lat=float(lat)))
      #return (lat, lon, 'America/Los_Angeles') # FIXME
   except ValueError:
      return (lat, lon, 'TIMEZONE') # header

def as_utc(date, hhmm, tzone):
   try:
      if len(hhmm) > 0 and tzone is not None:
         import datetime, pytz
         loc_tz = pytz.timezone(tzone)
         loc_dt = loc_tz.localize(datetime.datetime.strptime(date,'%Y-%m-%d'), is_dst=False)
         # can't just parse hhmm because the data contains 2400 and the like ...
         loc_dt += datetime.timedelta(hours=int(hhmm[:2]), minutes=int(hhmm[2:]))
         utc_dt = loc_dt.astimezone(pytz.utc)
         return utc_dt.strftime(DATETIME_FORMAT), loc_dt.utcoffset().total_seconds()
      else:
         return '',0 # empty string corresponds to canceled flights
   except ValueError as e:
      print('{} {} {}'.format(date, hhmm, tzone))
      raise e

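# For overnight (red-eye) flights, the arrival hhmm belongs to the day after
# FL_DATE, so the converted arrival-side timestamps can fall before the departure;
# add_24h_if_before pushes such timestamps forward by 24 hours.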
def add_24h_if_before(arrtime, deptime):
   import datetime
   if len(arrtime) > 0 and len(deptime) > 0 and (arrtime < deptime):
      adt = datetime.datetime.strptime(arrtime, DATETIME_FORMAT)
      adt += datetime.timedelta(hours=24)
      return adt.strftime(DATETIME_FORMAT)
   else:
      return arrtime

def tz_correct(line, airport_timezones):
   fields = line.split(',')
   if fields[0] != 'FL_DATE' and len(fields) == 27:
      # convert all times to UTC
      dep_airport_id = fields[6]
      arr_airport_id = fields[10]
      dep_timezone = airport_timezones[dep_airport_id][2] 
      arr_timezone = airport_timezones[arr_airport_id][2]
      
      for f in [13, 14, 17]: #crsdeptime, deptime, wheelsoff
         fields[f], deptz = as_utc(fields[0], fields[f], dep_timezone)
      for f in [18, 20, 21]: #wheelson, crsarrtime, arrtime
         fields[f], arrtz = as_utc(fields[0], fields[f], arr_timezone)
      
      for f in [17, 18, 20, 21]:
         fields[f] = add_24h_if_before(fields[f], fields[14])

      fields.extend(airport_timezones[dep_airport_id])
      fields[-1] = str(deptz)
      fields.extend(airport_timezones[arr_airport_id])
      fields[-1] = str(arrtz)

      yield fields

def get_next_event(fields):
   if len(fields[14]) > 0:
      event = list(fields) # copy
      event.extend(['departed', fields[14]])
      for f in [16,17,18,19,21,22,25]:
         event[f] = ''  # not knowable at departure time
      yield event
   if len(fields[21]) > 0:
      event = list(fields)
      event.extend(['arrived', fields[21]])
      yield event

def run():
   with beam.Pipeline('DirectRunner') as pipeline:

      airports = (pipeline
         | 'airports:read' >> beam.io.ReadFromText('airports.csv.gz')
         | 'airports:fields' >> beam.Map(lambda line: next(csv.reader([line])))
         | 'airports:tz' >> beam.Map(lambda fields: (fields[0], addtimezone(fields[21], fields[26])))
      )

      flights = (pipeline
         | 'flights:read' >> beam.io.ReadFromText('201501_part.csv')
         | 'flights:tzcorr' >> beam.FlatMap(tz_correct, beam.pvalue.AsDict(airports))
      )

      (flights
         | 'flights:tostring' >> beam.Map(lambda fields: ','.join(fields))
         | 'flights:out' >> beam.io.textio.WriteToText('all_flights')
      )

      events = flights | beam.FlatMap(get_next_event)

      (events
         | 'events:tostring' >> beam.Map(lambda fields: ','.join(fields))
         | 'events:out' >> beam.io.textio.WriteToText('all_events')
      )


if __name__ == '__main__':
   run()

Running the Python program with the parameters below submits the job to the cloud. Cloud Dataflow autoscales each step of the pipeline based on throughput, and streams the events data into BigQuery. You can monitor the running job in the Cloud Dataflow section of the Google Cloud Console.

Note: below are the parameters we pass to run our Python script in cloud mode:

   argv = [
      '--project={0}'.format(project),
      '--job_name=ch04timecorr',
      '--save_main_session',
      '--staging_location=gs://{0}/airport/staging/'.format(bucket),
      '--temp_location=gs://{0}/airport/temp/'.format(bucket),
      '--setup_file=./setup.py',
      '--max_num_workers=8',
      '--autoscaling_algorithm=THROUGHPUT_BASED',
      '--runner=DataflowRunner'
   ]
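
For context, a minimal sketch (assuming the run() structure shown earlier) of how this argv list is handed to the pipeline so the same transforms run on Cloud Dataflow instead of locally:

import apache_beam as beam

def run(project, bucket):
   argv = [
      '--project={0}'.format(project),
      '--runner=DataflowRunner',
      # ... remaining options as listed above ...
   ]
   with beam.Pipeline(argv=argv) as pipeline:
      # same airports, flights, and events transforms as before
      pass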

Execute the pipeline using the command below:

python df06.py -p myproject -b mybucket -d fdatasalts

Even as the events data is being written out, we can query it from the BigQuery Console to verify our pipeline and its results (change the project and dataset names as appropriate).
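
The original query did not survive here, but a sketch of the kind of check you might type is below. The dataset name follows the -d flag above; the table name and the appended EVENT/NOTIFY_TIME columns are assumptions based on how get_next_event() extends each record:

-- hypothetical verification query: table and column names are assumptions
SELECT EVENT, NOTIFY_TIME, FL_NUM, ORIGIN, DEST
FROM fdatasalts.simevents
ORDER BY NOTIFY_TIME DESC
LIMIT 10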

Next, we’ll explore the Apache Beam Library in more depth.

Credit: Original Code Snippet from Book: Data Science on GCP by Valliappa Lakshmanan

Happy Machine Learning!
