Google Professional Data Engineer – Pub/Sub for Streaming Part 2
July 31, 2023

4. Lab: Setting Up A Pub/Sub Publisher Using The Python Library

At the end of this lecture, you should know the answer to this question: what two bits of data do you need to publish a message using the Python client library for Pub/Sub? In this lecture, you'll implement a Pub/Sub publisher programmatically in Python using the client library that Pub/Sub provides. All the code in this lab, and in the couple of labs that follow, is available in the python-docs-samples GitHub repository. Simply run git clone https://github.com/GoogleCloudPlatform/python-docs-samples to get the source code onto your Cloud Shell VM instance. We'll study the Python code for the publisher in Pub/Sub. The location of this file is python-docs-samples/pubsub/cloud-client. Let's take a look at publisher.py. We first import the libraries that we need.

We need argparse to be able to parse command-line arguments, and from google.cloud we import the pubsub library. We'll first look at the implementations of the various commands that this publisher program responds to. First up, you should be able to list all topics that are available in Pub/Sub. This involves instantiating a Pub/Sub Client object and then iterating through list_topics, which returns all topics present within Pub/Sub. This program allows us to create new topics as well. Once again, the entry point is the Pub/Sub Client instance; within that, we set up a topic with a particular name and call create on the topic. This create_topic function takes in one argument, the topic name. The delete_topic function deletes a previously created topic.
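The topic-management functions just described can be sketched roughly as follows. This is a paraphrase of the sample's logic, not the exact file, and it assumes the legacy google.cloud.pubsub Client API that these samples were written against (the library must be installed and credentials configured before any of these calls will succeed):

```python
def list_topics():
    """Print every topic visible to this project, as publisher.py's list does."""
    from google.cloud import pubsub  # legacy client library (pre-0.26 API)
    client = pubsub.Client()
    for topic in client.list_topics():
        print(topic.name)

def create_topic(topic_name):
    """Create a new Pub/Sub topic with the given name."""
    from google.cloud import pubsub
    client = pubsub.Client()
    topic = client.topic(topic_name)
    topic.create()

def delete_topic(topic_name):
    """Delete a previously created topic."""
    from google.cloud import pubsub
    client = pubsub.Client()
    client.topic(topic_name).delete()
```

The imports are kept inside the functions here only so the sketch can be loaded without the library present; the actual sample imports pubsub once at the top of the file.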

Once again, with the Pub/Sub client, we instantiate a topic with that name and then call delete on that topic. Once we have topics, we should be able to publish messages to them. The publish_message function takes in two arguments: the name of the topic we want to publish to and the data within the message. Instantiate a Pub/Sub client and access a topic within that client. Encode the data as a byte string, which is required for Pub/Sub messages, and finally call the publish method on this topic, passing in the data that we want to send. We receive a message ID, which we can print out to screen. In the main function, we instantiate an argument parser, which allows us to parse the command-line arguments that we specify to publisher.py.
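The publish flow described above can be sketched as a single function; again this assumes the legacy google.cloud.pubsub Client API used by these samples, and it will only run against a real project with credentials configured:

```python
def publish_message(topic_name, data):
    """Publish one message, following the publisher.py logic described above."""
    from google.cloud import pubsub  # legacy client library (pre-0.26 API)
    client = pubsub.Client()
    topic = client.topic(topic_name)
    # Pub/Sub message payloads must be byte strings, so encode the data first.
    message_id = topic.publish(data.encode('utf-8'))
    print('Message {} published.'.format(message_id))
    return message_id
```

The two arguments, topic name and message data, are exactly the two bits of information the lecture's opening question asks about.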

The default description for any command that we send is the docstring associated with that method. We need subparsers because the publisher program has to handle a whole variety of commands, such as list, create, delete, and so on. Based on what command we pass to the publisher, the command might have additional arguments that we need to check for. The list subparser looks for the list command, and its help documentation is the docstring associated with the list_topics method. The create parser looks for the create command, and when you create a new topic, an additional argument that you need to check for is the topic name. The delete parser looks for the delete command, and it also requires an additional argument.

That argument is the topic name: we have to know which topic it is that we are deleting. The publish parser looks for two arguments: the name of the topic you want to publish the message to and the actual data of the message. And here is the code which parses the command-line arguments and, depending on what was passed in, calls list_topics, create_topic, delete_topic, or publish_message. Switch over to Cloud Shell to run this program. If you just say python publisher.py, you'll get an error: too few arguments. The message also shows you the valid arguments to pass into publisher.py: list, create, delete, and publish. The Python argument parser outputs this information based on the commands you've registered with it.
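The command dispatch described above can be illustrated with a minimal, self-contained argparse sketch (command and argument names follow the lecture; the help strings are simplified stand-ins for the real docstrings):

```python
import argparse

def build_parser():
    # Top-level parser with one subparser per command, as in publisher.py.
    parser = argparse.ArgumentParser(description='Pub/Sub publisher demo')
    subparsers = parser.add_subparsers(dest='command')

    subparsers.add_parser('list', help='List all topics.')

    create = subparsers.add_parser('create', help='Create a new topic.')
    create.add_argument('topic_name')

    delete = subparsers.add_parser('delete', help='Delete a topic.')
    delete.add_argument('topic_name')

    publish = subparsers.add_parser('publish', help='Publish a message.')
    publish.add_argument('topic_name')
    publish.add_argument('data')

    return parser

args = build_parser().parse_args(['publish', 'greetings', 'Ahoy'])
print(args.command, args.topic_name, args.data)  # publish greetings Ahoy
```

In the real program, the value of args.command is what selects which of the four functions to call.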

Run python publisher.py list and you'll see there is exactly one topic in there. I've already pre-created this topic, called messages, and that's why you see it here on screen. You can delete this topic using the delete command. The delete command requires you to specify the topic name, though, so if you say delete messages, that will run through just fine. Essentially, what you've done here is recreate the gcloud command-line tool for Pub/Sub using a Python program. Let's use our publisher.py program to create a topic named greetings and publish a message to it. The message is "Ahoy". We haven't looked at the Python code for subscribers yet, so let's use the gcloud command-line tool to create a subscriber. We've done this before.

This should be very familiar to you. If my-sub requests a message from the greetings topic, there'll be no messages, because the subscription was created after the message was published. We'll go ahead and publish a new message to our greetings topic. The message is "Boo". And now, if you use the gcloud command-line tool to access this message, you'll find that it is received by our subscriber. The answer to the question that we asked at the beginning of this lecture: we require the name of the topic and the data content of the message before we can publish a message to a topic using the Python library.

5. Lab: Setting Up A Pub/Sub Subscriber Using The Python Library

When you are done with this lecture, you should be able to articulate what additional command-line parameter the list command needs when you want to list the subscribers. The demo in this lecture is a continuation of the last lecture; here we look at subscriber.py. We've looked at the Python code for the publisher before; this is the Python code to deal with subscriptions. In the same directory where publisher.py lived, you can find subscriber.py. This is in python-docs-samples/pubsub/cloud-client. The libraries we use here are the same ones that we saw earlier: argparse to parse command-line arguments, and the pubsub library from google.cloud. In response to the list command, this program will list all subscriptions it has for a particular topic.

Remember that the list of subscriptions is always on a per-topic basis. That's why it takes in a topic name as an input argument. Instantiate a Pub/Sub client and the topic object; the topic object has a list_subscriptions method which will give you all the subscribers. The create_subscription function takes in a topic name as well as the name of the subscription. Instantiate a Pub/Sub client, a topic with that client, and a subscription within that topic; calling the create method on this subscription creates a new subscriber. The delete_subscription function takes in a topic name and a subscription name, instantiates the subscription within the topic, and calls the delete method on it. Subscriptions can receive messages on particular topics.

The receive_message function takes in a topic name and a subscription name. Instantiate a Pub/Sub client, a topic, and a subscription within that topic with the name specified, and call the pull method on the subscription. The pull method takes in an argument which specifies whether that particular call should return immediately or not. If you say return_immediately=False, the call will block until messages are received. Once messages have been received, print them out to screen in a for loop; there might be more than one message received. Then, finally, acknowledge that you've received the messages by sending back the ack. In the main function, we parse the arguments that have been passed in on the command line.
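The pull-and-acknowledge flow just described might look like this; it is a hedged sketch against the legacy google.cloud.pubsub Client API, and needs the library plus credentials to actually run:

```python
def receive_message(topic_name, subscription_name):
    """Pull and acknowledge messages, mirroring subscriber.py's receive logic."""
    from google.cloud import pubsub  # legacy client library (pre-0.26 API)
    client = pubsub.Client()
    subscription = client.topic(topic_name).subscription(subscription_name)
    # return_immediately=False blocks until at least one message arrives.
    results = subscription.pull(return_immediately=False)
    for ack_id, message in results:
        print(message.data)
    if results:
        # Acknowledge everything we pulled so it is not redelivered.
        subscription.acknowledge([ack_id for ack_id, _ in results])
```

Skipping the acknowledge call would cause Pub/Sub to redeliver the same messages after the ack deadline expires.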

The list command for subscriptions requires that a topic be specified; only then will it list the subscribers. The remaining commands, create, delete, and receive, all require a topic name as well as a subscription name. Parse the command-line arguments that the user has specified and call the right function based on what those arguments are. For example, if you have the list argument, you'll call list_subscriptions. Let's say that we have one topic set up in our Pub/Sub: the greetings topic that we saw earlier. If you call subscriber.py, you need to specify a command, one amongst list, create, delete, or receive. Each of these commands has additional arguments that need to be specified. The list command requires a topic name.

If you list the subscriptions for greetings, the response should be nothing; there are no subscriptions yet. Let's create a new subscription for the greetings topic. Notice the command for creating a subscription: it's very similar to the gcloud command-line tool that we used earlier. Publish a message using publisher.py. This is the "Hello" message published to the greetings topic. You can now use subscriber.py to receive this message. Play around with this Python program to get a feel for it. Once you're done, you can go ahead and delete the subscription, and perhaps the topic as well. Going back to the question that we asked earlier: if you want to list subscriptions, you need to specify the topic name. Subscriptions are on a per-topic basis in Pub/Sub.

6. Lab: Publishing Streaming Data Into Pub/Sub

In this lecture, we'll see some more Python client library code to work with Pub/Sub. At the end of this lecture, you should be able to answer this question: how would you publish multiple events in a batch using the Pub/Sub Python client API? In this lecture, we'll simulate an event stream that we publish to Pub/Sub and receive these streaming messages using the command line. In this demo, we'll work with data that simulates traffic sensors on a San Diego highway. There are separate sensors for each lane, and each sensor records the speeds of the cars driving along a particular lane. This sensor data is available in a gzip file.

We'll read it in and publish it to Pub/Sub. The code for this lab is available in the training-data-analyst GitHub repo. Hopefully you've already cloned this repo onto your Cloud Shell VM instance, so you don't need to do that here. cd into the training-data-analyst/courses/streaming/publish directory. Create a new Pub/Sub topic called sandiego. This is the topic to which our Python code will publish messages. Run ls -l in this directory and you'll see the two files that we use in this demo: download_data.sh, which is a script file, and send_sensor_data.py, the Python file which we'll use to publish events. Let's examine the code in send_sensor_data.py. It imports a whole bunch of libraries that it uses.

argparse and pubsub are familiar to you; for the others, you'll see how they're used as you look through the code. The sensor data is arranged in the form of events, one event per line, and every line has an associated timestamp, which we format in a particular way. We publish to the sandiego topic, whose name is stored in a variable named TOPIC. The variable named INPUT contains the name of the gzipped CSV file with the data that we are going to stream to Pub/Sub. The publish method is what publishes events in batches. It takes in a topic instance and a list of events that need to be published to that topic. The Pub/Sub Python client allows you to publish to a topic in batches, so you collect together a bunch of events and publish them in one go.

Instantiate a batch object using the topic's batch method and publish to this batch. get_timestamp is a helper method which parses one line from the CSV file, extracts the timestamp, and formats it the way we want it to be. The simulate method forms the main logic of this program. It simulates lane sensors in San Diego sending traffic data and publishing it to Pub/Sub. I'll leave it to you to examine the precise details of the simulation code; instead, I'll give you a broad understanding of what exactly is going on within this method. The exact nitty-gritty of the simulation does not affect the integration with Pub/Sub. The important argument here is the speed factor.
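A minimal version of the timestamp helper might look like this; the exact format string and CSV layout are assumptions based on the lecture's description of one timestamped event per line, not copied from the sample:

```python
import datetime

TIME_FORMAT = '%Y-%m-%d %H:%M:%S'  # assumed format; check the sample's own constant

def get_timestamp(line):
    # Each CSV line is assumed to start with the event's timestamp field.
    timestamp_field = line.split(',')[0]
    return datetime.datetime.strptime(timestamp_field, TIME_FORMAT)

ts = get_timestamp('2008-11-01 00:00:00,32.7,-117.2,345,S,1,55.0')
print(ts)  # 2008-11-01 00:00:00
```

The simulation compares successive timestamps like this one to decide how far apart events should be replayed.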

The speed factor is something that you specify on the command line, and it determines how fast the data is sent to Pub/Sub. If this speed factor were equal to 1, it means 1 minute of data will be sent to Pub/Sub every minute; this is real-time data. If this speed factor were 60, it means 60 minutes, or 1 hour, of data will be sent to Pub/Sub every minute. Because this is a simulation and not real streaming data, you can use the speed-up factor to push through a whole bunch of data quickly. This simulation will sleep for a bit after it publishes a batch of events. The compute_sleep_secs helper takes the speed factor into account to determine how much time has passed in the simulation. This simulation is not real time.
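The sleep computation can be sketched as a pure function: given the speed factor, how many simulated seconds of data have been sent, and how much wall-clock time has actually elapsed, it returns how long to sleep so the replay keeps pace. This is a simplified reconstruction of the idea, not the sample's exact code:

```python
def compute_sleep_secs(speed_factor, start_time, sim_seconds_sent, now):
    # Real seconds that *should* have passed for this much simulated data.
    real_seconds_needed = sim_seconds_sent / speed_factor
    # Real seconds that actually passed since the replay started.
    real_seconds_passed = now - start_time
    # Never return a negative sleep; if we're behind schedule, don't sleep.
    return max(0, real_seconds_needed - real_seconds_passed)

# With speed_factor=60, an hour (3600 s) of simulated data maps to 60 real
# seconds; if only 10 real seconds have passed, sleep for the remaining 50.
print(compute_sleep_secs(60, 0, 3600, 10))  # 50.0
```

A speed factor of 1 makes real_seconds_needed equal to sim_seconds_sent, which is exactly the real-time case the lecture describes.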

It can be sped up using the speed factor. The reason it's not real time is that we are reading from a gzip file; we can read as fast or as slowly as we want to. The for loop down here is the code which parses the CSV file line by line and reads in the sensor events. These sensor events are accumulated in a list called to_publish and, at some point, published as a batch by calling the publish method. Once all the lines have been read, there might be a few last messages left over, so we call publish once again outside the for loop. peek_timestamp is a helper method which looks ahead to the next line and returns its timestamp.
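The accumulate-then-publish pattern can be demonstrated without touching Pub/Sub at all. The publish function below follows the lecture's description (the legacy topic.batch() context manager sends everything in one go), and the FakeTopic is a stand-in added purely so the sketch is runnable:

```python
def publish(topic, events):
    # Send all accumulated events in one batch, as send_sensor_data.py does;
    # `topic` is assumed to expose the legacy batch() context manager.
    with topic.batch() as batch:
        for event in events:
            batch.publish(event.encode('utf-8'))

# A tiny stand-in topic so the pattern can be exercised without Pub/Sub.
class FakeTopic(object):
    def __init__(self):
        self.sent = []
    def batch(self):
        topic = self
        class _Batch(object):
            def __enter__(self):
                return self
            def __exit__(self, *exc):
                return False
            def publish(self, data):
                topic.sent.append(data)
        return _Batch()

topic = FakeTopic()
to_publish = ['event-1', 'event-2', 'event-3']
publish(topic, to_publish)  # flush the accumulated batch
print(len(topic.sent))  # 3
```

With a real legacy Topic object in place of FakeTopic, exiting the with block is what triggers the single batched API call.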

In the main function, we first instantiate an argument parser which looks at the command-line arguments. The main argument here is the speed factor. Here is the help documentation which helps you understand it: a speed factor of 1 means 1 minute of data is sent to Pub/Sub in 1 minute; a speed factor of 60 is a speed-up of 60 times, which means 1 hour of data will be sent to Pub/Sub every minute; a speed factor of 30 means 30 minutes of data will be sent to Pub/Sub every minute. This bit of code here creates a topic named sandiego if one doesn't already exist. And here we start reading the input file, the gzipped file, and running the simulation.

The question to ask now would be: where does this sensor data, this gzip file that we are relying on, come from? This is where the other script comes in handy: download_data.sh. It copies the sensor data gzip file from cloud-training-demos, which is a publicly accessible bucket, to the current working directory on our Cloud Shell instance. Now it's time to switch over to Cloud Shell and run the simulation. Run download_data.sh to download the gzip file onto your VM instance. We are now ready to run send_sensor_data.py with a speed factor of 60: 60 minutes, or 1 hour, of data sent every minute. Now, it's possible that this Python program fails when you try to run it. There are a number of reasons why this could happen.

Look at what the error message is, and let's see how we can fix it. If you get an error which says a module could not be found, or that the pubsub module does not have an attribute named Client, you might need to run sudo pip install google-cloud-pubsub. If you've been following this course linearly, it's unlikely that you'll run into this issue, because we ran gcloud init to get the latest versions of the Pub/Sub module onto our Cloud Shell. If you try again to run this Python program and it still doesn't run, you might need to set up a virtual environment for Python. You can do this by following the four steps that you see here on screen. If you look at your Cloud Shell, you'll find that this program has been sending sensor data every minute or so.

These events are being published to the sandiego topic on Pub/Sub. Let's set up a subscription to receive these messages: create a subscription for the sandiego topic using the gcloud command-line tool, and then call pull on this subscription. Here is one message from the batch of data that has been published; what you see highlighted is the speed of the car that was recorded. So long as our Python code continues to publish events, we can pull from this topic and receive messages. You saw early on in this lecture, when we examined the Python code, how you can publish events in a batch: you use the topic's batch method to instantiate a batch object and call publish on it for each event in the list of event data to be published.
