Google Professional Data Engineer – Pub/Sub for Streaming Part 3
August 1, 2023

7. Lab: Reading Streaming Data From PubSub And Writing To BigQuery

At the end of this lecture, you should know when you would choose to use the DataflowPipelineOptions interface to read in configuration parameters for your Beam pipeline. This demo pulls together a number of things that we’ve learned over the course of this class. We’ll process streaming data from Pub/Sub, transform it using Dataflow, and store it in BigQuery. In this lab, we’ll run the same code that we saw in the last demo to publish streaming data to Pub/Sub. We want to publish data to the sandiego topic, so make sure that topic has been created in Pub/Sub. The code for this lab is in the training-data-analyst repo. cd into the training-data-analyst/courses/streaming/publish folder.

This is where the Python script send_sensor_data.py lives. It reads from the gzip file that you can see on screen and publishes to the sandiego topic in Pub/Sub. Run this Python code with a speed factor of 60, which means 60 minutes of sensor data will be sent to Pub/Sub every minute. I get an error here. I had to reconnect to my Cloud Shell since my connection had expired, which is why I get User Not Authorized. We should be pros at this by now, since we’ve seen this error before. Run gcloud auth application-default login, enter the verification code that you get from the URL, and you’ll be reauthorized. I’ll run the Python code once again. So now I have streaming data being published to the sandiego topic on Pub/Sub.
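To make the speed factor concrete, here is a minimal sketch of the arithmetic behind a sped-up replay. It is not the code from send_sensor_data.py; the function name wall_clock_delay is hypothetical and only illustrates how a gap in recorded time shrinks to a gap in real time.

```python
def wall_clock_delay(event_gap_seconds: float, speed_factor: float) -> float:
    """Real-time seconds to wait for a given gap in recorded (event) time."""
    if speed_factor <= 0:
        raise ValueError("speed_factor must be positive")
    return event_gap_seconds / speed_factor


# One hour of recorded sensor data replayed at a speed factor of 60
# takes one minute of real time.
print(wall_clock_delay(3600, 60))  # 60.0
```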

We are going to run a Dataflow job which reads from Pub/Sub and writes into BigQuery. Open up your BigQuery web console. We are going to create a brand new dataset called demos. Our Dataflow code will create a table within this dataset. Let’s now take a look at the Dataflow code. Move into the training-data-analyst/courses/streaming/process directory. Within that, there should be a sandiego directory. cd into that, and if you run an ls command there, you’ll notice run_oncloud.sh. This is the script that we’ll use to run our Dataflow pipeline on the cloud. To run it, specify the project name, the bucket name, and the name of the class which contains the Java code that you want to execute. We’ll use Maven to compile this Java code.

And here are all the arguments that you specify to configure your Beam pipeline. You want the staging location and the project ID, and you want to specify the runner as DataflowRunner so that the pipeline executes on Google Cloud Platform. The Dataflow pipeline code that we’re going to execute is in AverageSpeeds.java. This source code is located in the training-data-analyst repository, and the full path is shown on screen. The current directory that you’re working in is the courses/streaming/process/sandiego folder. From here, you need to find the Java source folder, which is at the path that you see on the second line on screen. This Dataflow code performs transformations which find the average speed per sensor over a certain time interval.

Because this is streaming data, we need to specify a sliding window over which we want to perform our averaging. Here are all the libraries that we import. We read streaming data from Pub/Sub, which is why we need PubsubIO, and we’ll write into BigQuery after processing the data and finding averages, which is why we need BigQueryIO. We’ll perform our averages over a sliding window. And finally, when we write out to BigQuery, we’ll use these classes: TableFieldSchema, TableRow, and TableSchema. For this Dataflow pipeline, we have some custom configuration options, which is why our MyOptions interface extends DataflowPipelineOptions.

DataflowPipelineOptions can be used to configure the Dataflow runner. This is the interface that you’ll extend if you want to access cloud-specific parameters in your code, such as the project ID that you’re currently working in, the staging location in GCS, and so on. There are two command line parameters that we can configure in our pipeline. The first is the averaging interval. This is the length of the window over which we calculate averages. The second is the simulation speed-up factor. This should be the same factor that you specified when you ran the Python code that publishes to Pub/Sub. Since we used a speed-up factor of 60, we can leave this parameter at its default value of 60.
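As a rough illustration of options with defaults, here is a Python stand-in using argparse. It is not the Beam options interface from the lab. The flag names and the averaging-interval default of 60 minutes are assumptions made for this sketch; only the speed-up default of 60 comes from the lecture.

```python
import argparse


def parse_options(argv):
    """Parse toy pipeline options; custom flags fall back to defaults."""
    parser = argparse.ArgumentParser(description="Toy pipeline options")
    # Cloud-specific parameters that the runner needs.
    parser.add_argument("--project", required=True)
    parser.add_argument("--stagingLocation", required=True)
    # Custom parameters (names and the first default are assumptions).
    parser.add_argument("--averagingInterval", type=float, default=60.0)
    parser.add_argument("--speedupFactor", type=float, default=60.0)
    return parser.parse_args(argv)


# Hypothetical project and bucket names, used only for illustration.
opts = parse_options(
    ["--project=my-project", "--stagingLocation=gs://my-bucket/staging"]
)
print(opts.project, opts.averagingInterval, opts.speedupFactor)
```

Because neither custom flag is passed here, both keep their defaults, which mirrors leaving the speed-up factor at 60 in the lab.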

Now for some boilerplate code to set up our Dataflow pipeline. We parse the options from the command line and instantiate a pipeline. Because we know that we are working with streaming data, we set streaming to true within the options. The topic variable contains the full path to the topic that we read from in Pub/Sub. This is where we need to access the project ID, which is available through the DataflowPipelineOptions interface. The average speed table variable contains the full path to our BigQuery table, which will hold the result of these transformations: the average speeds on a per-sensor basis. The averaging interval is the duration that holds the length of our sliding window.

This is the time period of the sliding window. Over how much time do we want our averages? If the averages are over the last hour, our sliding window duration should be 60 minutes. We need a special calculation here because this has to take into account the speed-up factor of the sensor data. The data is not being sent to Pub/Sub in real time. With a speed-up factor of 60, one hour of data is sent within one minute, and the window length has to be scaled down accordingly. When we use sliding windows over streaming data, we have to consider two lengths of time: the length of the sliding window and the sliding interval. The sliding interval here is half the window length.
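The scaling described above can be sketched in a few lines. This is an illustration of the arithmetic rather than the lab’s Java code, and it assumes the averaging interval is given in minutes; the function name window_settings is hypothetical.

```python
from datetime import timedelta


def window_settings(averaging_interval_minutes: float, speedup_factor: float):
    """Return (window length, sliding interval) in real time."""
    # Scale the desired event-time window down by the replay speed-up.
    window = timedelta(minutes=averaging_interval_minutes / speedup_factor)
    # The sliding interval is half the window length.
    slide = window / 2
    return window, slide


window, slide = window_settings(60, 60)
print(window, slide)  # 0:01:00 0:00:30
```

So an hour-long average over data replayed 60 times faster becomes a one-minute window that slides every 30 seconds.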

The sliding interval determines by how much the window slides over for each averaging operation. Here is the code to set up the schema for our BigQuery table. Notice the columns that the table has: timestamp, latitude, longitude, highway, direction, lane, speed, and sensor ID. Here is where the pipeline processing begins. We first read data from Pub/Sub, from the sandiego topic that we’ve specified. One message from Pub/Sub represents a single line from the CSV file. We process each line and instantiate a LaneInfo object using the fields in that line. We’ve now got a PCollection of LaneInfo objects, each representing the speed of a single car on that San Diego highway. We now want to apply a time window to the stream in order to perform some calculations.

This is a sliding window whose length is the averaging interval and whose sliding interval is the averaging frequency. The remaining transformations in the pipeline happen within the sliding window. We want to split the cars that were identified on the highway on a per-sensor basis, so we get each sensor and split the cars based on the sensors. The output of this transformation is a key-value pair, where the key is the sensor ID and the value is the speed of the car as picked up by that sensor. We’ve now got car speeds on a per-sensor basis, and we apply an averaging operation: we find the mean per key. At this point, we have the average on a per-sensor basis over a sliding window.
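The mean-per-key step can be pictured with plain Python. This sketch stands in for what happens to the key-value pairs inside a single window. It does not use the Beam API, and the sensor IDs and speeds are made-up sample values.

```python
from collections import defaultdict
from statistics import mean


def mean_per_key(pairs):
    """Group (key, value) pairs by key and average the values per key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: mean(values) for key, values in grouped.items()}


# Contents of one window: (sensor ID, car speed) pairs, hypothetical data.
window_contents = [("sensor-1", 62.0), ("sensor-2", 55.0), ("sensor-1", 68.0)]
print(mean_per_key(window_contents))  # {'sensor-1': 65.0, 'sensor-2': 55.0}
```

Three input pairs become two output values, which is why an aggregation step emits far fewer elements than it receives.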

We now convert each result to a BigQuery row, filling in the values for every column in the row, and then write it out to a BigQuery table in our demos dataset. The only bit of code that we still need to look at is the LaneInfo class, which parses one line of sensor data and creates an object with getters and setters for easy access to the data about one car detected by a sensor. A LaneInfo is just an array of fields. This is the enum which represents each field. This is all the information that we get from our sensors. The newLaneInfo method instantiates a new LaneInfo object. It takes the one line passed in from the CSV data, splits it up into fields, and stores these field values as an array of strings.

There’s a generic get which you can use by specifying the enum for each field, or you can use the getters and setters, which simply access individual fields within LaneInfo. The getSensorKey method creates a unique identifier for each sensor by combining a number of fields from a single line of data. The fields that identify a unique sensor are latitude, longitude, freeway ID, freeway direction, and lane. So when would you use DataflowPipelineOptions to configure your pipeline? When you want to access specific parameters from your Dataflow runner, such as the project ID, the Google Cloud staging location, and so on.
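The LaneInfo idea described in this lecture, an array of string fields indexed by an enum plus a composite sensor key, can be sketched in Python as follows. This is not the lab’s Java class. The column order, the comma separator used in the key, and the sample line are all assumptions made for illustration.

```python
from enum import IntEnum


class Field(IntEnum):
    # Assumed column order of one CSV line.
    TIMESTAMP = 0
    LATITUDE = 1
    LONGITUDE = 2
    FREEWAY_ID = 3
    FREEWAY_DIR = 4
    LANE = 5
    SPEED = 6


class LaneInfo:
    def __init__(self, fields):
        self.fields = fields  # list of strings, one per column

    @classmethod
    def from_line(cls, line):
        """Split one CSV line into fields and wrap them."""
        return cls(line.split(","))

    def get(self, field):
        """Generic accessor: look up a field by its enum value."""
        return self.fields[field]

    @property
    def speed(self):
        return float(self.get(Field.SPEED))

    def sensor_key(self):
        """Combine the fields that together identify one sensor."""
        parts = (Field.LATITUDE, Field.LONGITUDE, Field.FREEWAY_ID,
                 Field.FREEWAY_DIR, Field.LANE)
        return ",".join(self.get(part) for part in parts)


# Made-up sample line in the assumed column order.
info = LaneInfo.from_line("2008-11-01 00:00:00,32.749679,-117.155519,163,S,1,71.2")
print(info.sensor_key())  # 32.749679,-117.155519,163,S,1
print(info.speed)         # 71.2
```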

8. Lab: Executing A Pipeline To Read Streaming Data And Write To BigQuery

When you stop a Dataflow job, you have two options: canceling it or draining it. This lecture should tell you what the difference is between the two. In this lecture, we’ll continue with the previous demo. We’ll read in streaming data from Pub/Sub, perform some transformations on it, and store it in BigQuery. We’ll see how we can run this job on the cloud. To execute this pipeline on the cloud, we’ll use the run_oncloud.sh script that we saw earlier. Specify the project ID as the first command line parameter, the bucket name as the second, and the name of the Java class that you want to execute as the last. In our case, it is AverageSpeeds.

Notice that I have another Cloud Shell tab open. That is the tab where my send_sensor_data.py program is running. Sensor data is being pushed to Pub/Sub as this Dataflow pipeline builds and then executes. Let’s go to the Dataflow page in the web console and view the status of this job. And there is our job at the very top. Click on the job and you’ll see the execution graph. The job seems to be progressing along just fine, processing streaming data and writing to BigQuery. The logs look fine as well, with no errors or warnings. The transformation where we calculate the average by sensor performs multiple transforms. Within it, you can see the GroupByKey and then the Combine operation on the grouped values. Let’s turn our attention to GetMessages.

This is the first transform, which reads from Pub/Sub. You’ll notice on the right that about 13,000 elements have been added and that the total data processed is shown as 905. There are some other interesting parameters here. Let’s look at those. The system lag is 5 seconds. This is the maximum amount of time that an element has been waiting for processing in this stage of the pipeline. Below that, we have the data watermark, which is a timestamp. Watermarks are special markers in streaming data that help measure the progress and completeness of our streams. The timestamp value of the watermark indicates that all windows ending at or before this timestamp are closed.

The processing step will no longer accept any streaming entities with a timestamp that is before this watermark. The BySensor transformation is where the individual car speeds, as recorded by different sensors, are split by sensor. You can see that it has processed roughly 31,000 elements, and the output is also roughly 31,000 elements. More elements are being processed as time goes on. The average-by-sensor transformation has seen almost 34,000 elements at its input. Its output collection, though, has just 2,385 elements. This step calculates the average speed per sensor over the sliding window interval, and the aggregation reduces the number of elements produced at the output.
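The watermark rule described in this lecture can be written down as two small checks. This is a toy model under stated assumptions, with timestamps as plain seconds and a made-up watermark value. It also ignores allowed lateness, which Beam lets a pipeline configure.

```python
def window_is_closed(window_end, watermark):
    """Windows ending at or before the watermark are treated as closed."""
    return window_end <= watermark


def is_late(element_timestamp, watermark):
    """In this toy model, an element stamped before the watermark is too late."""
    return element_timestamp < watermark


watermark = 300  # seconds, hypothetical value
print(window_is_closed(300, watermark))  # True
print(window_is_closed(330, watermark))  # False
print(is_late(290, watermark))           # True
```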

The transformation that converts the data to a BigQuery row is one which takes a long time to process. Its wall time is 4 minutes and 54 seconds. You can see this within that node in the execution graph as well as in the monitoring panel on the side. The wall time is a measure of how long the processing steps within that node take. Let’s switch over to BigQuery and see whether our table writes have been successful. Load the web console and you’ll see that within the demos dataset we have the average speeds table. The schema of this table matches the schema specification in our Dataflow program. Let’s sample the data in this table by running a quick query.

We run SELECT * on the average speeds table, and yes, there are entries that have been written out. Notice the average speed on a per-sensor basis over the last hour of the sliding window. Now, this Dataflow pipeline can potentially run for a long time since it reads in streaming data. We might want to stop the job once we’ve understood everything and are ready to move on. You can do this by clicking on Stop Job on the left. This brings up a dialog that gives you two options: you can either cancel the job or drain it. If you cancel the job, Dataflow will immediately stop processing, and work on any data that you’ve already ingested will be aborted. Any buffered data may be lost. If you choose to drain your job instead, the job stops more gracefully.

Dataflow will cease all data ingestion, but it will try to complete processing whatever is already in the pipeline. I’ve already made my choice to drain this job, which is why this option is highlighted. You can switch over to BigQuery now and check the last timestamp that was written out. This shouldn’t change once the job has been drained. Let’s get back to our question. What does it mean to cancel a job versus draining it? When you cancel a job, the Dataflow pipeline stops immediately and does not process even the data that has already been ingested into the pipeline. It’s an abrupt stop of operations. If you drain a job, Dataflow stops ingesting new data, but it completes processing any data that has already been ingested before it stops completely. It’s a more graceful way to stop your pipeline.
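The difference between the two stop modes can be modelled with a small buffer. This is a toy simulation, not how Dataflow is implemented. The class and method names are hypothetical, and the buffer simply stands in for data that has been ingested but not yet written out.

```python
from collections import deque


class ToyJob:
    def __init__(self):
        self.buffer = deque()   # ingested but not yet processed
        self.written = []       # results that reached the sink
        self.accepting = True

    def ingest(self, element):
        if self.accepting:
            self.buffer.append(element)

    def process_one(self):
        if self.buffer:
            self.written.append(self.buffer.popleft())

    def cancel(self):
        """Stop immediately; buffered elements are discarded."""
        self.accepting = False
        self.buffer.clear()

    def drain(self):
        """Stop ingesting, then finish whatever is already buffered."""
        self.accepting = False
        while self.buffer:
            self.process_one()


def run(stop_mode):
    job = ToyJob()
    for element in ("a", "b", "c"):
        job.ingest(element)
    job.process_one()           # only "a" is done before the stop request
    getattr(job, stop_mode)()
    job.ingest("d")             # arrives after the stop; ignored either way
    return job.written


print(run("cancel"))  # ['a']
print(run("drain"))   # ['a', 'b', 'c']
```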

9. Lab: Pubsub Source BigQuery Sink

At the end of this lecture, you should be able to clearly explain what the sliding window size and the sliding window period, or sliding interval, represent. In this lab, once again, we’ll read streaming data from Pub/Sub and write out our results to BigQuery. You’ll find that the actual transformations in the Dataflow pipeline are a little simpler than in the lab that we did earlier. The results will be written to a BigQuery dataset called demos. If it doesn’t already exist, go to the BigQuery web console and create it. This time around, we won’t run a program that streams data into Pub/Sub. We’ll manually publish messages to the Pub/Sub topic. This will allow us to more clearly identify what’s going on as the Dataflow pipeline executes.

Create a streamdemo topic. This is the topic to which we’ll publish our messages. Once the topic has been created, switch over to Cloud Shell and we’ll take a look at the Dataflow pipeline code that we are going to execute. The source code for this demo is in the training-data-analyst repository under courses/data_analysis/lab2/javahelp. In this directory, you’ll find a script called run_oncloud4.sh. This is what will execute our Dataflow pipeline on the cloud, and it contains the command line arguments that we’ll pass to the pipeline. The arguments for the project, staging location, temp location, and the runner, which is DataflowRunner, should all look familiar to you.

The output parameter indicates where the results are going to be stored once the pipeline has processed them. This will be the streamdemo table in the demos dataset. The input to this program comes from a Pub/Sub topic called streamdemo within our project. We’ll now take a look at the Java code that executes this pipeline. It is written in StreamDemoConsumer.java, which is in the source directory where the Java source files live. The complete path is shown on screen. Your current directory in Cloud Shell should be lab2/javahelp. From there, cd into the directory that you see on the second line on screen.

The code in this file simply processes the messages that it receives from Pub/Sub and outputs the number of words that it has encountered within a certain window interval. So the main transformation is simply summing up the number of words within that window. Here are the libraries that we import to read from the data source and write to a data sink. PubsubIO allows us to read from the streamdemo topic on Pub/Sub, which is our data source, and BigQueryIO allows us to write out the results to a BigQuery table, which is our data sink. And here are the libraries which help us perform our transforms: DoFn, which processes one element from the pipeline, and ParDo, which allows you to parallelize these processing tasks.

Sum is the aggregation that we’ll use. We’ll calculate the total number of words contained in our messages within a window of time. This will be a sliding window, with a corresponding window size and sliding interval. We also require the libraries which work with BigQuery. These allow you to set up the table schema, represent a table row, and write out to the table. The MyOptions interface, which derives from the DataflowPipelineOptions interface, specifies the configuration parameters for our pipeline. The output that we finally write to is a BigQuery table called streamdemo within the demos dataset, and the input that we read from is a Pub/Sub topic.

The values that we pass in for these command line parameters reference our own project. We instantiate the pipeline and set the streaming option to true. We are reading from Pub/Sub, after all, so this is streaming data. We get the topic from which we’re going to read messages and set up the schema for the BigQuery table that we’ll write into at the end. The first step in this pipeline is to read from the topic on Pub/Sub. That’s a simple PubsubIO read operation. Once we have the stream of messages in the form of a PCollection, we apply a window operation so that the remaining transformations in this pipeline apply to a particular window.

This window is a sliding window of size two minutes. That means that we’ll count the number of words that were received in the form of Pub/Sub messages within a two-minute interval. That is what the window size represents. This two-minute window moves every 30 seconds. This is the sliding period, or sliding interval. The sliding interval determines by how much a window moves over, that is, where the new window will be positioned. In this particular example, if the first window runs from zero to two minutes, the second window runs from 30 seconds to two minutes 30 seconds.
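To see which windows a given element lands in, here is a small sketch of sliding-window assignment in plain Python. It is an illustration of the idea rather than Beam’s implementation. Timestamps are in seconds, windows are treated as half-open ranges from start up to but not including end, and the element timestamp of 200 is made up.

```python
def sliding_windows(ts, size, period):
    """Return the (start, end) pairs of every window that contains ts."""
    # Latest window start at or before ts, aligned to the sliding period.
    start = ts - (ts % period)
    windows = []
    # Walk backwards while the window still covers ts.
    while start > ts - size:
        windows.append((start, start + size))
        start -= period
    return sorted(windows)


# A two-minute window that slides every 30 seconds.
print(sliding_windows(200, size=120, period=30))
# [(90, 210), (120, 240), (150, 270), (180, 300)]
```

Each consecutive window starts 30 seconds after the previous one, and the single element at 200 seconds belongs to four of them.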

The next transform processes the lines that it has received from Pub/Sub messages and splits them into words. For every string in the input PCollection, the output is an integer representing the number of words in that string. Within each window interval, we want to perform a sum operation, which we can do using the Sum aggregation. We call Sum.integersGlobally().withoutDefaults() to sum all the integers within the window, and then write out the total number of words that we’ve encountered within that window interval to BigQuery. This operation can be performed in parallel, which is why we use a ParDo to instantiate a new BigQuery table row.

For every result, we set the timestamp and the number of words and pass the row on to the output. Finally, we write each of these table rows out to BigQuery. Switch over to your Cloud Shell window and let’s execute this Dataflow program. Make sure that you are in the lab2/javahelp directory and run the script run_oncloud4.sh, specifying your current project ID and the name of your bucket. Submit this job and let’s examine the Dataflow dashboard to see how the execution of this pipeline goes. And there is the execution graph for our Dataflow pipeline. Everything looks nice and green. All the transforms are running. If you look at the monitoring panel on the right for the GetMessages step, you’ll find that the output collection has zero elements.

This is because we haven’t published any messages to the streamdemo topic which this Dataflow pipeline reads from. Let’s switch over to the Pub/Sub web console and publish some messages to the streamdemo topic. It doesn’t matter what the message is. I’m simply going to say, “How are you doing? This is my very first message.” Then switch over to your Dataflow pipeline and click on GetMessages. Notice that one element has been added to the output collection. Click on the window operation to see what that looks like. You’ll notice something interesting that bears mentioning. The input to the window was one element, and the output is four elements.

What does this mean? If you remember the Dataflow pipeline code, the sliding window size was two minutes and the sliding interval was 30 seconds. This means that as the window slides over the streaming entities, each entity is included in four different windows. The entity is output once for each of these four windows, so one element becomes four. The words-per-line step has received those messages within the different sliding window intervals and output the corresponding number of words: four elements were received and four elements were output. These elements are then passed on to the next step, words in time window, where four elements have been received. The results are then converted to BigQuery table rows.
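The fan-out from one element to four, followed by a per-window sum of word counts, can be reproduced with a toy version of the pipeline. This is plain Python under stated assumptions, not the Beam code from the lab. The timestamps and message texts are made up, and the window size is assumed to be a whole multiple of the sliding interval.

```python
from collections import defaultdict

SIZE, PERIOD = 120, 30  # seconds: two-minute window, 30-second slide


def window_starts(ts):
    """Start times of the SIZE // PERIOD windows that contain ts."""
    latest = ts - (ts % PERIOD)
    return [latest - i * PERIOD for i in range(SIZE // PERIOD)]


def words_per_window(messages):
    """Sum the word counts of all messages that fall into each window."""
    totals = defaultdict(int)
    for ts, text in messages:
        word_count = len(text.split())
        for start in window_starts(ts):  # one copy of the count per window
            totals[start] += word_count
    return dict(totals)


# Hypothetical (timestamp in seconds, message text) pairs.
messages = [(200, "how are you doing"), (230, "this is my very first message")]
print(len(window_starts(200)))  # 4
print(sorted(words_per_window(messages).items()))
# [(90, 4), (120, 10), (150, 10), (180, 10), (210, 6)]
```

The windows starting at 120, 150, and 180 seconds contain both messages, so their totals add the two word counts together.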

This is a slightly slower operation. The rows are then written out to BigQuery. Because you control the input to the streaming application, which is receiving the messages that you explicitly type out, you can type out a bunch of other messages and see how the pipeline processes them. How many elements does it receive at each step? How many does it output? This is a good way to debug your pipeline. I just published two more messages to Pub/Sub, so with the first one that I added, that brings the total to three. That means three messages have been received by my first stage, and so on. I’ll leave it to you to explore the graph. It’s a lot of fun. The last thing left for us to do is to check the streamdemo table to see whether things have been written out successfully.

We’ll run a simple SELECT * query here with a limit. And there you see it: the final results, the number of words in every window. This lecture should have made the concept of a sliding window much clearer. The sliding window size is the time interval over which the window processes streaming entities. A window size of two minutes processes all the entities which come in during that two-minute period. The sliding interval, or sliding window period, is the frequency at which the window slides over. Let’s say this is set to 30 seconds. Every 30 seconds, the two-minute window slides over. The sliding interval also determines how many times each entity in a stream will be included in a window calculation: with a two-minute window and a 30-second interval, each entity falls into four windows.
