Sample Java Program on Google Cloud Pub/Sub


This article presents a sample Java program that uses Google Cloud Pub/Sub to publish messages read from a file in Google Cloud Storage. The steps are simple: set up the environment, create a topic, subscribe to that topic, and read the messages back using a Java program.


  1. Create a new GCP project
  2. Enable the Pub/Sub API
  3. Set the environment variables
  4. Java 1.8
  5. Eclipse IDE with the Java SDK
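
The list above does not say which environment variables are meant. One common candidate, assumed here, is GOOGLE_APPLICATION_CREDENTIALS, which the Google Cloud client libraries consult to locate a service-account key file when a program runs outside Google Cloud (for example, from Eclipse on a laptop). The sketch below only checks that the variable is set and points to an existing file; the class name CredentialsCheck is hypothetical and not part of the article's repository.

```java
import java.io.File;

/** Hypothetical pre-flight check; not part of the article's repository. */
final class CredentialsCheck {
    // Assumption: this is the environment variable the setup step refers to.
    static final String ENV_VAR = "GOOGLE_APPLICATION_CREDENTIALS";

    private CredentialsCheck() {}

    /** Returns a short status message for the given key-file path (may be null). */
    static String describe(String keyFilePath) {
        if (keyFilePath == null || keyFilePath.trim().isEmpty()) {
            return ENV_VAR + " is not set";
        }
        File keyFile = new File(keyFilePath);
        return keyFile.isFile()
                ? ENV_VAR + " points to " + keyFile.getAbsolutePath()
                : ENV_VAR + " is set but no file exists at " + keyFilePath;
    }

    public static void main(String[] args) {
        System.out.println(describe(System.getenv(ENV_VAR)));
    }
}
```

Running this once before the publisher or subscriber programs can save some time when authentication errors appear.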

Set Up Pub/Sub

Create a Pub/Sub topic: open Google Cloud Shell and create a new Pub/Sub topic using the commands below:
export PUBSUB_TOPIC=mynewtopic
gcloud pubsub topics create $PUBSUB_TOPIC

Create a new Pub/Sub subscription: open Google Cloud Shell and create a new Pub/Sub subscription using the commands below:

export PUBSUB_SUBSCRIPTION=mynewsub
gcloud pubsub subscriptions create --topic $PUBSUB_TOPIC $PUBSUB_SUBSCRIPTION
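
The gcloud commands take short IDs, while the Pub/Sub API identifies resources by fully qualified names of the form projects/<project>/topics/<topic> and projects/<project>/subscriptions/<subscription>. As a rough illustration of that mapping, the sketch below builds both strings by hand. The class ResourceNames is hypothetical, and "myproject", "mynewtopic" and "mynewsub" are only the placeholder values used elsewhere in this article; in the client library, ProjectTopicName.of and ProjectSubscriptionName.of play this role.

```java
/** Hypothetical helper; not part of the article's repository. */
final class ResourceNames {
    private ResourceNames() {}

    /** Returns the full topic name, e.g. projects/myproject/topics/mynewtopic. */
    static String topic(String projectId, String topicId) {
        return "projects/" + requireId(projectId) + "/topics/" + requireId(topicId);
    }

    /** Returns the full subscription name, e.g. projects/myproject/subscriptions/mynewsub. */
    static String subscription(String projectId, String subscriptionId) {
        return "projects/" + requireId(projectId) + "/subscriptions/" + requireId(subscriptionId);
    }

    // Rejects null or blank IDs and trims surrounding whitespace.
    private static String requireId(String id) {
        if (id == null || id.trim().isEmpty()) {
            throw new IllegalArgumentException("ID must not be empty");
        }
        return id.trim();
    }

    public static void main(String[] args) {
        System.out.println(topic("myproject", "mynewtopic"));
        System.out.println(subscription("myproject", "mynewsub"));
    }
}
```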

Copy the file to Google Cloud Storage so that it can be published to the topic:

gsutil cp output-00000-of-00003  gs://mybucketname/output-00000-of-00003

Clone my GitHub repository, open it in Eclipse, and modify Constants.java to match your environment:

package com.mukesh.apps;

public class Constants {
	public static final String GCS_BUCKET_NAME="mybucket";
	public static final String PROJECT_ID="myproject";
	public static final String PUBSUB_SUBSCRIPTION="mukesh_sub";
	public static final String APP_NAME="My Simple program for google messaging";
	public static final String DF_BASE_JOB_NAME="ps-df-bq";
	public static final String GCS_URL_BASE="gs://";
	public static final String GCS_TEMP_LOCATION="dataflow/temp";
	public static final String GCS_STAGING_LOCATION="dataflow/staging";

	public static final int MAX_NUM_WORKER=10;
	// size of window
	public static final int DURATION = 300;
	// frequency of window
	public static final int EVERY = 60;
}
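
The article does not show how these constants are combined. One plausible reading, assumed here, is that GCS_URL_BASE, the bucket name and the relative temp or staging path are joined into a single gs:// URI. The sketch below illustrates that idea with a hypothetical class GcsPaths that copies the relevant values so that it compiles on its own.

```java
/** Hypothetical helper showing one way the Constants values might be joined. */
final class GcsPaths {
    // Copies of the relevant values from Constants, so the sketch is self-contained.
    static final String GCS_URL_BASE = "gs://";
    static final String GCS_BUCKET_NAME = "mybucket";
    static final String GCS_TEMP_LOCATION = "dataflow/temp";
    static final String GCS_STAGING_LOCATION = "dataflow/staging";

    private GcsPaths() {}

    /** Joins the URL base, a bucket name and a relative path into one gs:// URI. */
    static String gcsUri(String bucket, String relativePath) {
        // Drop a leading slash so the result never contains "//" after the bucket.
        String path = relativePath.startsWith("/") ? relativePath.substring(1) : relativePath;
        return GCS_URL_BASE + bucket + "/" + path;
    }

    public static void main(String[] args) {
        System.out.println(gcsUri(GCS_BUCKET_NAME, GCS_TEMP_LOCATION));
        System.out.println(gcsUri(GCS_BUCKET_NAME, GCS_STAGING_LOCATION));
    }
}
```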

Run the Java program below to send messages from Google Cloud Storage to Pub/Sub:

package com.mukesh.apps;

import java.io.BufferedReader;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class GCSReadAndPublish {
	  private static Storage storage = StorageOptions.getDefaultInstance().getService();
	public static void main(String [] args) {
		//[project-name] [bucket-name] [file-name] [topic]
		if(args.length < 4) {
			throw new IllegalArgumentException("Ensure the parameters are provided in the format [project-name] [bucket-name] [file-name] [topic]");
		} else {
			try {
				String project = args[0];
				String bucketName = args[1];
				String fileName = args[2];
				String topicName = args[3];
				Publisher publisher = PublishUtil.createPublisher(PublishUtil.createTopic(project, topicName));
				List<ApiFuture<String>> messageIdFutures = new ArrayList<>();
				// Stream the object from Cloud Storage line by line; try-with-resources closes the reader.
				try (BufferedReader reader = new BufferedReader(Channels.newReader(storage.reader(bucketName, fileName), "UTF-8"))) {
					String read = null;
					while ((read = reader.readLine()) != null) {
						System.out.println("read::" + read);
						// Publish each line of the file as one Pub/Sub message.
						PublishUtil.publishMessage(publisher, read);
					}
				} finally {
					// Release the publisher's resources once all lines have been handed over.
					publisher.shutdown();
				}
			} catch (Exception e) {
				System.err.println("Error encountered:" + e.getMessage());
			}
		}
	}
}
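
To try the read-and-publish loop without a GCP project, the loop can be separated from the Pub/Sub call. The sketch below is one way to do that, not the repository's code: the class LinePublisher and its method are hypothetical, and a Consumer<String> stands in for PublishUtil.publishMessage. Skipping blank lines is an added assumption, made because Pub/Sub does not accept a message with empty data and no attributes.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper; the sink stands in for the real publish call. */
final class LinePublisher {
    private LinePublisher() {}

    /** Passes every non-blank line to the sink and returns how many lines were passed. */
    static int publishLines(BufferedReader reader, Consumer<String> sink) {
        int published = 0;
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // assumption: blank lines are not worth publishing
                }
                sink.accept(line);
                published++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return published;
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        BufferedReader reader = new BufferedReader(new StringReader("first\n\nsecond\n"));
        int count = publishLines(reader, sent::add);
        System.out.println(count + " lines published: " + sent);
    }
}
```

In the real program the sink would be something like line -> PublishUtil.publishMessage(publisher, line), with the reader created from storage.reader(bucketName, fileName) as in the listing above.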

Use the Java program below to read the messages from Pub/Sub in your Eclipse console:

package com.mukesh.apps;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;

/**
 * A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub pull subscription and
 * asynchronously pull messages from it.
 */
public class CreateSubscriptionAndConsumeMessages {

  public static void main(String... args) throws Exception {
    ProjectTopicName topic = ProjectTopicName.of("myproject", "mynewtopic");
    ProjectSubscriptionName subscription =
        ProjectSubscriptionName.of("myproject", "mynewsub");

   // try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
   //   subscriptionAdminClient.createSubscription(
   //       subscription, topic, PushConfig.getDefaultInstance(), 0);
   // }

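    MessageReceiver receiver =
        new MessageReceiver() {
          @Override
          public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            System.out.println("Received message: " + message.getData().toStringUtf8());
            // Acknowledge the message so that Pub/Sub does not redeliver it.
            consumer.ack();
          }
        };
    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscription, receiver).build();
      subscriber.addListener(
          new Subscriber.Listener() {
            @Override
            public void failed(Subscriber.State from, Throwable failure) {
              // Handle failure. This is called when the Subscriber encountered a fatal error and is
              // shutting down.
              System.err.println(failure);
            }
          },
          MoreExecutors.directExecutor());
      subscriber.startAsync().awaitRunning();

      // Pull messages for one minute (60,000 ms), then stop.
      Thread.sleep(60000);
    } finally {
      if (subscriber != null) {
        subscriber.stopAsync().awaitTerminated();
      }
    }
  }
}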

The program above pulls messages for one minute (60,000 ms) and then stops. In a real application, this sleep-then-stop is not necessary: simply call stopAsync().awaitTerminated() when the server is shutting down.
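
One way to tie the stop call to shutdown is a JVM shutdown hook. The sketch below is only an illustration under that assumption: the class GracefulStop is hypothetical, and a plain Runnable stands in for subscriber.stopAsync().awaitTerminated() so that the example runs without the Pub/Sub library.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper; the Runnable stands in for the real subscriber stop call. */
final class GracefulStop {
    private GracefulStop() {}

    /** Wraps a stop action in a thread suitable for a shutdown hook; the action runs at most once. */
    static Thread stopHook(Runnable stopAction) {
        AtomicBoolean stopped = new AtomicBoolean(false);
        return new Thread(() -> {
            if (stopped.compareAndSet(false, true)) {
                stopAction.run();
            }
        }, "subscriber-stop-hook");
    }

    public static void main(String[] args) {
        // Stand-in for: subscriber.stopAsync().awaitTerminated()
        Runnable stopSubscriber = () -> System.out.println("Subscriber stopped");
        Runtime.getRuntime().addShutdownHook(stopHook(stopSubscriber));
        System.out.println("Receiving messages until the JVM exits...");
    }
}
```

With a hook like this registered, the Thread.sleep call is no longer what decides when the subscriber stops; the hook runs when the JVM exits normally or is interrupted with Ctrl+C.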

Happy Machine Learning!
