
How to create an Apache Beam data pipeline and deploy it using Cloud Dataflow in Java

Cloud Dataflow is a fully managed Google Cloud service for executing data processing pipelines written with Apache Beam. What does fully managed mean? Like BigQuery, Cloud Dataflow dynamically provisions the optimal quantity and type of resources (i.e., CPU or memory instances) based on the volume and specific resource requirements of your job. Cloud Dataflow is a serverless, auto-scaling service.

Dataflow and Spark

Google Cloud Dataflow is closely analogous to Apache Spark in terms of API and engine. Both are also directed acyclic graph (DAG) based data processing engines. However, there are aspects of Dataflow that aren’t directly comparable to Spark. Whereas Spark is strictly an API and engine together with its supporting technologies, Google Cloud Dataflow is all that plus Google’s underlying infrastructure and operational support. More comparable to Google Cloud Dataflow is the managed Spark service available as part of the Databricks platform.

Setting up the development environment

Developing Apache Beam code in Java is done within an IDE such as Eclipse installed on your laptop. You can download the sample code from my git repository. Don’t forget to modify the path for the Maven compiler plugin. I am using a Windows environment, so my POM snippet looks like the one below:

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.1</version>
        <configuration>
          <fork>true</fork>
          <executable>C:\Java\jdk1.8.0_191\bin\javac.exe</executable>
        </configuration>
      </plugin>
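
If you would rather not hardcode the javac path, one alternative (assuming your JAVA_HOME already points at a JDK 8 install) is to drop the fork/executable settings and rely on the standard Maven compiler properties instead; a minimal sketch:

      <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
      </properties>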

This blog is not primarily about the Beam API and its functions, so I will leave the thought process behind the functions for another post. Here we will discuss how to create a program locally and execute the job on Dataflow.

Once you have downloaded my git repository, import it into Eclipse using File → Import → Maven → Existing Maven Projects. Browse to the src folder, and then click Finish.

Running a Java program from Eclipse may show an error like the one below:

Failed to detect whether we are running on Google Compute Engine. java.net.SocketException: Network is unreachable: connect

Resolution: the application needs to obtain application-default credentials from the gcloud tool:

gcloud auth application-default login
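
It can also help to confirm that the gcloud SDK is pointing at the project you intend to use; the project ID below is just a placeholder:

gcloud config set project YOUR_PROJECT_ID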

Below is our first CreateTrainingDataset.java program:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CreateTrainingDataset {
  private static final Logger LOG = LoggerFactory.getLogger(CreateTrainingDataset.class);

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());

    p.apply(Create.of("Hello", "World"))
    .apply(MapElements.via(new SimpleFunction<String, String>() {
      @Override
      public String apply(String input) {
        return input.toUpperCase();
      }
    }))
    .apply(ParDo.of(new DoFn<String, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c)  {
        LOG.info(c.element());
      }
    }));
    p.run();
  }
}
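
If you are building your own POM rather than using my sample project, the runner dependencies need to be declared explicitly: the direct runner for local execution and the Dataflow runner for the cloud. A minimal sketch (the Beam version property is a placeholder for whichever release you are using) might look like this:

      <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-direct-java</artifactId>
        <version>${beam.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
        <version>${beam.version}</version>
      </dependency>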

The CreateTrainingDataset.java above is a simple Hello World program; to execute it locally, just run it as a Java Application from Eclipse. Now, to submit the job on the cloud, run the command below:

mvn compile exec:java -Dexec.mainClass=CreateTrainingDataset -Dexec.args="--project=********** --stagingLocation=gs://*********/staging/ --runner=DataflowRunner"

Please don’t forget to replace the placeholders with your own project name and staging bucket.

In the above command, the staging location is the Cloud Storage area to which the JARs and related classes are uploaded for processing.
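
If you are curious about what gets staged, you can list the staging area after a job has been submitted (the bucket name below is a placeholder):

gsutil ls gs://YOUR_BUCKET/staging/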

Below is the DAG generated for my Hello World program. To view it, go to the Dataflow page in the Google Cloud console and click on the respective job name.

The next program changes the input to an events array and filters those lines in parallel using ParDo (“Parallel Do”). Functionally, it simulates the Unix tool grep: it reads some input and sends to its output only the lines that match a specific pattern. Here is the CreateTrainingDataset1.java program that does this job.

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CreateTrainingDataset1 {
  private static final Logger LOG = LoggerFactory.getLogger(CreateTrainingDataset1.class);

  @SuppressWarnings("serial")
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

    String[] events = {
        "2015-09-20,AA,19805,AA,1572,13303,1330303,32467,MIA,12889,1288903,32211,LAS,2015-09-20T22:50:00,2015-09-20T04:03:00,313.00,19.00,2015-09-20T04:22:00,,,2015-09-21T04:08:00,,,0.00,,,2174.00,25.79527778,-80.29000000,-14400.0,36.08000000,-115.15222222,-25200.0,wheelsoff,2015-09-20T04:22:00",
        "2015-09-20,AA,19805,AA,2495,11298,1129804,30194,DFW,12892,1289203,32575,LAX,2015-09-21T01:25:00,2015-09-20T06:04:00,279.00,15.00,2015-09-20T06:19:00,,,2015-09-21T04:55:00,,,0.00,,,1235.00,32.89722222,-97.03777778,-18000.0,33.94250000,-118.40805556,-25200.0,wheelsoff,2015-09-20T06:19:00",
        "2015-09-20,AA,19805,AA,2342,11292,1129202,30325,DEN,13303,1330303,32467,MIA,2015-09-21T05:59:00,2015-09-20T06:33:00,34.00,14.00,2015-09-20T06:47:00,,,2015-09-20T09:47:00,,,0.00,,,1709.00,39.86166667,-104.67305556,-21600.0,25.79527778,-80.29000000,-14400.0,wheelsoff,2015-09-20T06:47:00" };

    p //
        .apply(Create.of(Arrays.asList(events))) //
        .apply(ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) throws Exception {
            String input = c.element();
            if (input.contains("MIA")) {
              c.output(input);
            }
          }
        })) //
        .apply(ParDo.of(new DoFn<String, Void>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            LOG.info(c.element());
          }
        }));

    p.run();
  }
}

You can run it as a Java Application from Eclipse to execute it using a local runner.

To run it on the cloud, use the command below:

mvn compile exec:java -Dexec.mainClass=CreateTrainingDataset1 -Dexec.args="--project=********** --stagingLocation=gs://*********/staging/ --runner=DataflowRunner --jobName=dataflow-filter-sample-runner"

Now, instead of reading from hardcoded strings and logging the output, we want to read from and write to files. Rather than hardcoding the names of the inputs and outputs, it would be better to be able to specify the input file and the output directory.

Create an input file on your local system (say small.csv) and specify an output path (say a directory named output on your local drive).

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class CreateTrainingDataset2 {
  public static interface MyOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("small.csv")
    String getInput();

    void setInput(String s);

    @Description("Path of the output directory")
    @Default.String("output\\")
    String getOutput();

    void setOutput(String s);
  }

  @SuppressWarnings("serial")
  public static void main(String[] args) {
    MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
    Pipeline p = Pipeline.create(options);

    p //
        .apply("ReadLines", TextIO.read().from(options.getInput())) //
        .apply("FilterMIA", ParDo.of(new DoFn<String, String>() {

          @ProcessElement
          public void processElement(ProcessContext c) {
            String input = c.element();
            if (input.contains("MIA")) {
              c.output(input);
            }
          }
        })) //
        .apply("WriteFlights", //
            TextIO.write().to(options.getOutput() + "flights2") //
                .withSuffix(".txt").withoutSharding());

    p.run();
  }
}

Now, running CreateTrainingDataset2.java from Eclipse will generate a file in the output folder with the desired filtered records.
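
If you prefer the command line to Eclipse for the local run, something along these lines should also work, overriding the defaults declared in MyOptions (the input and output paths are just illustrative):

mvn compile exec:java -Dexec.mainClass=CreateTrainingDataset2 -Dexec.args="--input=small.csv --output=output/"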

To run on the cloud, let’s first copy our input text file to our Cloud Storage bucket. Go to the local directory containing the file using the command line and run the command below.

gsutil cp small.csv gs://**********/chapt/small.csv

Run the command below to submit a job to Cloud Dataflow:

mvn compile exec:java -Dexec.mainClass=CreateTrainingDataset2 -Dexec.args="--project=********** --stagingLocation=gs://**********/staging/  --input=gs://***********/chapt/small.csv --output=gs://*************/chapt/output/ --runner=DataflowRunner --jobName=dataflow-filter-sample-runner-myfiles"
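
Once the job has been submitted, you can also check its status from the command line instead of the console (assuming the Cloud SDK is installed and authenticated):

gcloud dataflow jobs list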

Below is the DAG generated for my program:

We have now seen how to create an Apache Beam data pipeline and deploy it using Cloud Dataflow.

Unlike with Spark on Cloud Dataproc, we did not need to spin up a cluster to execute Beam on Cloud Dataflow. We just submitted the Apache Beam pipeline and Cloud Dataflow took care of all the execution details.

Next, we play with objects and aggregate them using the Apache Beam API. 

Happy Machine Learning!
