
Sample Dataflow Pipeline featuring Cloud Pub/Sub, Dataflow, and BigQuery…

Streaming data in Google Cloud Platform is typically published to Cloud Pub/Sub, a serverless real-time messaging service. Cloud Pub/Sub provides reliable delivery and can scale to more than a million messages per second. It stores copies of messages in multiple zones to provide “at least once” guaranteed delivery to subscribers, and there can be many simultaneous subscribers.
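
Publishing to a topic takes only a few lines of client code. The sketch below uses the google-cloud-pubsub Java client; the project name, topic name, payload, and the "ts" timestamp attribute are placeholders for illustration only (the real payload format and attribute name are defined by the simulator and the pipeline's Constants).

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

public class PublishOneMessage {
  public static void main(String[] args) throws Exception {
    // Placeholder project and topic names -- substitute your own.
    TopicName topic = TopicName.of("my-project", "my-topic");
    Publisher publisher = Publisher.newBuilder(topic).build();
    try {
      PubsubMessage message = PubsubMessage.newBuilder()
          // payload format is whatever the downstream pipeline expects; this is a placeholder
          .setData(ByteString.copyFromUtf8("test message"))
          // event-time attribute consumed later via withTimestampAttribute (name is illustrative)
          .putAttributes("ts", String.valueOf(System.currentTimeMillis()))
          .build();
      publisher.publish(message).get(); // block until the service acknowledges the message
    } finally {
      publisher.shutdown();
    }
  }
}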

The simulation code that we are writing here is only for quick experimentation with streaming data. Hence, I will not take the extra effort needed to make it fault-tolerant. If we had to do so, we could make the simulation fault-tolerant by starting from a BigQuery query that is bounded in terms of a time range with the start of that time range automatically inferred from the last-notified record in Cloud Pub/Sub. Because Cloud Pub/Sub subscriptions are not retroactive, we need to maintain a subscriber (perhaps on App Engine) that simply returns the last-notified record whenever asked. For now, therefore, let’s note that the simulation code as written will not automatically restart, and even if manually restarted, it will not resume where it last left off.
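
For reference, the time-bounded replay described above could start from something like the sketch below, which uses the BigQuery Java client; the table name, the event_time column, and the checkpoint value are hypothetical placeholders, not part of this project.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.TableResult;

public class BoundedReplay {
  public static void main(String[] args) throws InterruptedException {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Hypothetical checkpoint: timestamp of the last record already notified to Cloud Pub/Sub.
    String lastNotified = "2020-01-01 00:00:00.000000+00:00";

    QueryJobConfiguration query = QueryJobConfiguration.newBuilder(
            "SELECT * FROM `my-project.data.source_events` "  // placeholder table
          + "WHERE event_time > @start "                       // placeholder column
          + "ORDER BY event_time")
        .addNamedParameter("start", QueryParameterValue.timestamp(lastNotified))
        .build();

    TableResult rows = bigquery.query(query);
    for (FieldValueList row : rows.iterateAll()) {
      // publish each historical row to Pub/Sub here, then advance the checkpoint
    }
  }
}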

Overview

This article contains a sample data pipeline featuring Google Cloud’s Pub/Sub, Dataflow, and BigQuery products. The solution simulates streaming data into Pub/Sub, calculates a windowed average of that data with Dataflow, and stores the results in a BigQuery table.

Setup Project

Create a new GCP project and open the Google Cloud Shell. You can download the source code from my GitHub repository.

Use the command below in Cloud Shell to enable the Pub/Sub API:

gcloud services enable pubsub.googleapis.com

Use the command below in Cloud Shell to enable the Dataflow API:

gcloud services enable dataflow.googleapis.com

Setup Pub/Sub

Open the Google Cloud Shell and create a new Pub/Sub topic.

export PUBSUB_TOPIC=<pubsub-topic>
gcloud pubsub topics create $PUBSUB_TOPIC

Now create a new Pub/Sub subscription:

export PUBSUB_SUBSCRIPTION=<pubsub-subscription>
gcloud pubsub subscriptions create --topic $PUBSUB_TOPIC $PUBSUB_SUBSCRIPTION

Setup BigQuery

Open the Google Cloud Shell and create a new dataset using the command below:

export BIGQUERY_DATASET=data
bq mk $BIGQUERY_DATASET

Setup Dataflow

Open the Google Cloud Shell and create a new Google Cloud Storage bucket:

gsutil mb gs://<bucket_name> 

Change directories to the folder containing the Constants.java file:

cd ~/sample-dataflow-aggregations/dataflow-pipeline/src/main/java/com/mukesh/pubsubbq/shared

Update the Constants.java file using your favorite editor, setting the following values:

	public static final String GCS_BUCKET_NAME="<bucket-name>";
	public static final String PROJECT_ID="<project-id>";
	public static final String PUBSUB_SUBSCRIPTION="<pubsub subscription>";
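
The pipeline code below also references several other fields of Constants (staging and temp locations, dataset name, window sizes, and so on). Their real values live in the repository; the sketch below only illustrates what the class might look like, with illustrative values for everything beyond the three fields you must edit.

package com.mukesh.pubsubbq.shared;

public class Constants {
  // fields you must edit
  public static final String GCS_BUCKET_NAME = "<bucket-name>";
  public static final String PROJECT_ID = "<project-id>";
  public static final String PUBSUB_SUBSCRIPTION = "<pubsub subscription>";

  // remaining fields referenced by SamplePipeline (values here are illustrative only)
  public static final String APP_NAME = "pubsub2bq";
  public static final String GCS_URL_BASE = "gs://";
  public static final String GCS_STAGING_LOCATION = "staging";
  public static final String GCS_TEMP_LOCATION = "temp";
  public static final String DF_BASE_JOB_NAME = "pubsub2bq-";
  public static final String DATASET = "data";
  public static final int MAX_NUM_WORKER = 3;
  public static final long DURATION = 60; // sliding window length in seconds
  public static final long EVERY = 10;    // how often a new window starts, in seconds
}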

A quick look at our main program:

package com.mukesh.pubsubbq;

import com.mukesh.pubsubbq.shared.Constants;

import com.mukesh.pubsubbq.transforms.*;

import java.text.SimpleDateFormat;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.transforms.*;
import org.joda.time.Duration;

public class SamplePipeline {
  // date format used in the job name
  public static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
  
  public static void main(String[] args) {
    // define pipeline options
    DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    options.setAppName(Constants.APP_NAME);
    options.setStagingLocation(Constants.GCS_URL_BASE + Constants.GCS_BUCKET_NAME + "/" + Constants.GCS_STAGING_LOCATION);
    options.setTempLocation(Constants.GCS_URL_BASE + Constants.GCS_BUCKET_NAME + "/" + Constants.GCS_TEMP_LOCATION);
    options.setRunner(DataflowRunner.class);
    options.setStreaming(true);
    options.setProject(Constants.PROJECT_ID);
    options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
    options.setMaxNumWorkers(Constants.MAX_NUM_WORKER);
    options.setJobName(Constants.DF_BASE_JOB_NAME + dateFormat.format(new java.util.Date()));
	
	//create Pipeline with options
    Pipeline p = Pipeline.create(options);
    
    //read messages from Pub/Sub using a window
    PCollection<String> pubSubMessages = p.apply("Read messages from PubSub",
        PubsubIO.readStrings()
            .fromSubscription("projects/" + Constants.PROJECT_ID + "/subscriptions/" + Constants.PUBSUB_SUBSCRIPTION)
            .withTimestampAttribute(Msg.TIME));
    
    // Compute windowed averages per key and write them to BigQuery
    pubSubMessages
        .apply("Apply window", Window.<String>into(
                SlidingWindows.of(Duration.standardSeconds(Constants.DURATION))
                    .every(Duration.standardSeconds(Constants.EVERY)))
            .withTimestampCombiner(TimestampCombiner.EARLIEST))
        .apply("Map keys to values", ParDo.of(new Msg2KV()))
        .apply("Group by Keys", GroupByKey.<String, Double>create())
        .apply("Calculate Averages", ParDo.of(new Average()))
        .apply("Convert Map to Table Rows", ParDo.of(new KV2Row()))
        .apply("Write aggregations into BigQuery",
            BigQueryIO.writeTableRows()
                .to(Constants.PROJECT_ID + ":" + Constants.DATASET + "." + KV2Row.TABLE_NAME)
                .withSchema(KV2Row.getSchema())
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    // Also write the raw messages to BigQuery
    pubSubMessages
        .apply("Raw Data to Table Row", ParDo.of(new Msg()))
        .apply("Write raw data into BigQuery",
            BigQueryIO.writeTableRows()
                .to(Constants.PROJECT_ID + ":" + Constants.DATASET + "." + Msg.TABLE_NAME)
                .withSchema(Msg.getSchema())
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }
}
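
The transforms referenced above (Msg, Msg2KV, Average, and KV2Row) live in the transforms package of the repository. As one example of what they do, an averaging DoFn that fits the GroupByKey<String, Double> output upstream could look roughly like the sketch below; this is an illustration, not the repository's actual implementation.

package com.mukesh.pubsubbq.transforms;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Computes the mean of the grouped values for each key in the current window.
public class Average extends DoFn<KV<String, Iterable<Double>>, KV<String, Double>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    double sum = 0;
    long count = 0;
    for (Double value : c.element().getValue()) {
      sum += value;
      count++;
    }
    if (count > 0) {
      c.output(KV.of(c.element().getKey(), sum / count));
    }
  }
}

The Beam SDK's built-in Mean.perKey() transform could replace the GroupByKey plus Average pair, but the explicit DoFn makes the per-window grouping easier to follow.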

Change directories to the dataflow-pipeline folder and run the command below to deploy the pipeline to Dataflow:

cd ~/sample-dataflow-aggregations/dataflow-pipeline
mvn compile exec:java -Dexec.mainClass=com.mukesh.pubsubbq.SamplePipeline -Dexec.args="--runner=DataflowRunner"

Dataflow view

Below is the Dataflow DAG for the deployed pipeline:

Run the Pub/Sub Simulator

Open the Google Cloud Shell, change directories to the Pub/Sub simulator folder, and run the command below:

cd ~/sample-dataflow-aggregations/pubsub-simulator/
python sentme.py

The BigQuery aggregation table populated from the streamed messages:

Raw data in the table:

Next, we’ll go into more detail on the code used above.

Happy Machine Learning!
