Hadoop

Sample Java Program on Google Cloud Pub/Sub

Overview

This article presents a sample Java program that uses Google Cloud Pub/Sub to publish messages read from a file in Google Cloud Storage. The solution is simple: set up the environment, create a topic, subscribe to that topic, and read the messages using a Java program.

Prerequisite

  1. Create a new GCP project
  2. Enable the Pub/Sub API
  3. Set the required environment variables
  4. Java 1.8
  5. Java SDK and Eclipse

Setup Pub/Sub

  1. Create a topic with Cloud Pub/Sub: open Google Cloud Shell and create a new Pub/Sub topic using the commands below:
export PUBSUB_TOPIC=mynewtopic
gcloud pubsub topics create $PUBSUB_TOPIC

Create a new Pub/Sub subscription: open Google Cloud Shell and create a new Pub/Sub subscription using the commands below:

export PUBSUB_SUBSCRIPTION=mynewsub
gcloud pubsub subscriptions create --topic $PUBSUB_TOPIC $PUBSUB_SUBSCRIPTION

 Transfer the file to Google Cloud Storage so that it can be published to the topic:

gsutil cp output-00000-of-00003  gs://mybucketname/output-00000-of-00003

Clone my GitHub repository, open it in Eclipse, and modify Constants.java to match your environment:

package com.mukesh.apps;

/**
 * Shared configuration constants for the sample programs.
 *
 * <p>The constants in the first group are placeholders that must be edited to match your own
 * environment; the remaining ones are fixed values that should be left alone.
 *
 * <p>This class only holds constants, so it is final and cannot be instantiated.
 */
public final class Constants {
	// UPDATE: replace these placeholder values with the ones for your environment.
	public static final String GCS_BUCKET_NAME = "mybucket";
	public static final String PROJECT_ID = "myproject";
	public static final String PUBSUB_SUBSCRIPTION = "mukesh_sub";

	// DON'T UPDATE
	public static final String APP_NAME = "My Simple program for google messaging";
	public static final String DF_BASE_JOB_NAME = "ps-df-bq";
	public static final String GCS_URL_BASE = "gs://";
	public static final String GCS_TEMP_LOCATION = "dataflow/temp";
	public static final String GCS_STAGING_LOCATION = "dataflow/staging";

	public static final int MAX_NUM_WORKER = 10;

	// size of window (unit is not stated here; presumably seconds -- confirm against usage)
	public static final int DURATION = 300;
	// frequency of window (same unit as DURATION, presumably -- confirm against usage)
	public static final int EVERY = 60;

	/** Not instantiable: this class exists only to hold constants. */
	private Constants() {
	}
}

Run the Java program below to send messages to Pub/Sub from Google Cloud Storage:

package com.mukesh.apps;

import java.io.BufferedReader;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

/**
 * Reads an object from Google Cloud Storage line by line and hands each line to
 * {@code PublishUtil.publishMessage} for publishing.
 *
 * <p>Usage: {@code GCSReadAndPublish [project-name] [bucket-name] [file-name] [topic]}
 */
public class GCSReadAndPublish {

	private static final Storage storage = StorageOptions.getDefaultInstance().getService();

	/**
	 * Program entry point.
	 *
	 * <p>Errors raised while reading or publishing are reported on standard error rather than
	 * propagated. The reader is always closed and the publisher is always shut down before
	 * this method returns.
	 *
	 * @param args {@code [project-name] [bucket-name] [file-name] [topic]}; extra arguments are ignored
	 * @throws IllegalArgumentException if fewer than four arguments are supplied
	 */
	public static void main(String[] args) {
		//[project-name] [bucket-name] [file-name] [topic]
		if (args.length < 4) {
			throw new IllegalArgumentException("Ensure the parameters are provided in the format [project-name] [bucket-name] [file-name] [topic]");
		}

		String project = args[0];
		String bucketName = args[1];
		String fileName = args[2];
		String topicName = args[3];

		Publisher publisher = null;
		try {
			publisher = PublishUtil.createPublisher(PublishUtil.createTopic(project, topicName));
			// The String-charset overload is used deliberately: the Charset overload of
			// Channels.newReader only exists from Java 10, and this sample targets Java 1.8.
			try (BufferedReader reader = new BufferedReader(
					Channels.newReader(storage.reader(bucketName, fileName), "UTF-8"))) {
				String line;
				while ((line = reader.readLine()) != null) {
					// Throttle: wait half a second before handling each line.
					TimeUnit.MILLISECONDS.sleep(500);
					System.out.println("read::" + line);
					PublishUtil.publishMessage(publisher, line);
				}
			}
		} catch (InterruptedException e) {
			// Restore the interrupt flag so callers/the JVM can still observe the interruption.
			Thread.currentThread().interrupt();
			reportError(e);
		} catch (Exception e) {
			reportError(e);
		} finally {
			shutdownPublisher(publisher);
		}
	}

	/** Prints the failure message and stack trace to standard error. */
	private static void reportError(Exception e) {
		System.err.println("Error encountered:" + e.getMessage());
		e.printStackTrace(System.err);
	}

	/** Shuts the publisher down if it was created, reporting (not propagating) any failure. */
	private static void shutdownPublisher(Publisher publisher) {
		if (publisher == null) {
			return;
		}
		try {
			publisher.shutdown();
		} catch (Exception e) {
			reportError(e);
		}
	}
}

To read messages from Pub/Sub in your Eclipse console, use the Java program below:

package com.mukesh.apps;


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;

/**
 * A Google Cloud Pub/Sub snippet that asynchronously pulls messages from a subscription and
 * prints them to standard output.
 *
 * <p>The code that would create the subscription is kept below as a comment only; as written,
 * the program attaches to the subscription named in {@code main} without creating it.
 */
public class CreateSubscriptionAndConsumeMessages {

  /** How long the subscriber is left running before it is stopped, in milliseconds. */
  private static final long PULL_WINDOW_MILLIS = 60000;

  /**
   * Starts a subscriber, lets it receive messages for {@link #PULL_WINDOW_MILLIS}, then stops it
   * and waits for it to terminate.
   *
   * @param args unused
   * @throws Exception if building, starting, or waiting on the subscriber fails
   */
  public static void main(String... args) throws Exception {
    // Only referenced by the disabled subscription-creation snippet below.
    ProjectTopicName topic = ProjectTopicName.of("myproject", "mynewtopic");
    ProjectSubscriptionName subscription = ProjectSubscriptionName.of("myproject", "mynewsub");

    // Disabled: programmatic creation of the subscription.
    //
    // try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
    //   subscriptionAdminClient.createSubscription(
    //       subscription, topic, PushConfig.getDefaultInstance(), 0);
    // }

    // Print every message's payload as UTF-8 text, then acknowledge it.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          System.out.println("Received message: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = Subscriber.newBuilder(subscription, receiver).build();
    try {
      Subscriber.Listener failureReporter =
          new Subscriber.Listener() {
            @Override
            public void failed(Subscriber.State from, Throwable failure) {
              // Invoked when the Subscriber hits a fatal error and is shutting down.
              System.err.println(failure);
            }
          };
      subscriber.addListener(failureReporter, MoreExecutors.directExecutor());
      subscriber.startAsync().awaitRunning();

      // Keep the main thread alive while messages are delivered asynchronously.
      Thread.sleep(PULL_WINDOW_MILLIS);
    } finally {
      subscriber.stopAsync().awaitTerminated();
    }
  }
}

In the above program, we will pull messages for one minute (60,000ms) then stop. In a real application, this sleep-then-stop is not necessary. Simply call stopAsync().awaitTerminated() when the server is shutting down, etc.

Happy Machine Learning!

Leave a Reply

Your email address will not be published. Required fields are marked *