Streaming Data Processing: Publish Streaming Data into PubSub

 Perform the following tasks:

  • Create a Pub/Sub topic and subscription
  • Simulate your traffic sensor data into Pub/Sub


Pub/Sub is a fully managed real-time messaging service that allows you to send and receive messages between independent applications. Use Pub/Sub to publish and subscribe to data from multiple sources, then use Dataflow to understand your data, all in real time.
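The key idea is that publishers and subscribers are decoupled: they never reference each other, only a named topic. The sketch below is a minimal in-memory stand-in for that pattern (it is not the Cloud Pub/Sub API; the `Broker` class is purely illustrative). It also shows a behavior this lab relies on: a subscription only receives messages published after it was created.

```python
# Minimal in-memory sketch of publish/subscribe decoupling.
# NOT the Cloud Pub/Sub API -- Broker is a hypothetical illustration.
from collections import defaultdict

class Broker:
    def __init__(self):
        # topic name -> list of per-subscription message queues
        self._subscriptions = defaultdict(list)

    def subscribe(self, topic):
        queue = []
        self._subscriptions[topic].append(queue)
        return queue

    def publish(self, topic, message):
        # Only subscriptions that exist at publish time receive the message.
        for queue in self._subscriptions[topic]:
            queue.append(message)

broker = Broker()
broker.publish('sandiego', 'lost')   # no subscribers yet: message is dropped
sub = broker.subscribe('sandiego')
broker.publish('sandiego', 'hello')
print(sub)  # ['hello'] -- the pre-subscription message was never delivered
```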

In this lab you will simulate traffic sensor data into a Pub/Sub topic. In later labs, a Dataflow pipeline will process it before it finally lands in a BigQuery table for further analysis.

Note: At the time of this writing, streaming pipelines are not available in the Dataflow Python SDK, so the streaming labs are written in Java.




Verify initialization is complete



student-01-ebabe6658191@training-vm:~$ ls /training
bq_magic.sh  project_env.sh  sensor_magic.sh

student-01-ebabe6658191@training-vm:~$ cat /training/bq_magic.sh 
#! /bin/bash

bq mk --dataset $DEVSHELL_PROJECT_ID:demos

bq load --skip_leading_rows=1 --source_format=CSV demos.average_speeds gs://cloud-training/gcpdei/results-20180608-102960.csv timestamp:TIMESTAMP,latitude:FLOAT,longitude:FLOAT,highway:STRING,direction:STRING,lane:INTEGER,speed:FLOAT,sensorId:STRING
bq load --skip_leading_rows=1 --source_format=CSV demos.current_conditions gs://cloud-training/gcpdei/results-20180608-102960.csv timestamp:TIMESTAMP,latitude:FLOAT,longitude:FLOAT,highway:STRING,direction:STRING,lane:INTEGER,speed:FLOAT,sensorId:STRING


student-01-ebabe6658191@training-vm:~$ cat /training/sensor_magic.sh 
#! /bin/bash

# User tasks:
#  1. copy repo to ~/training-data-analyst
#  2. create $DEVSHELL_PROJECT_ID
#
# Install PIP
# sudo apt-get install -y python-pip
# Use PIP to install pubsub API
# sudo pip install -U google-cloud-pubsub
# Download the data file
gsutil cp gs://cloud-training-demos/sandiego/sensor_obs2008.csv.gz ~/training-data-analyst/courses/streaming/publish/
# cd to directory
cd ~/training-data-analyst/courses/streaming/publish/
# Run sensor simulator
python3 ./send_sensor_data.py --speedFactor=60 --project $DEVSHELL_PROJECT_ID



student-01-ebabe6658191@training-vm:~$ cat /training/project_env.sh 
#! /bin/bash

# Create the DEVSHELL_PROJECT_ID on a VM
curl "http://metadata.google.internal/computeMetadata/v1/project/project-id" -H "Metadata-Flavor: Google" > Project_ID
awk '{print "export DEVSHELL_PROJECT_ID=" $0, "\n" "export BUCKET=" $0, "\n" "export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/jre" }' Project_ID > env.txt
source env.txt
echo $DEVSHELL_PROJECT_ID

Download the code repository



student-01-ebabe6658191@training-vm:~$ git clone https://github.com/GoogleCloudPlatform/training-data-analyst
Cloning into 'training-data-analyst'...
remote: Enumerating objects: 68642, done.
remote: Counting objects: 100% (156/156), done.
remote: Compressing objects: 100% (134/134), done.
remote: Total 68642 (delta 99), reused 22 (delta 20), pack-reused 68486 (from 3)
Receiving objects: 100% (68642/68642), 707.16 MiB | 25.70 MiB/s, done.
Resolving deltas: 100% (44101/44101), done.
Checking out files: 100% (12919/12919), done.

student-01-ebabe6658191@training-vm:~$ export DEVSHELL_PROJECT_ID=$(gcloud config get-value project)


student-01-ebabe6658191@training-vm:~$ echo $DEVSHELL_PROJECT_ID
qwiklabs-gcp-01-57cb1bca2434



Task 2. Create Pub/Sub topic and subscription



student-01-ebabe6658191@training-vm:~$ cd ~/training-data-analyst/courses/streaming/publish




Verify that the Pub/Sub service is accessible and working using the gcloud command.

Create your topic and publish a simple message:


student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ gcloud pubsub topics create sandiego
Created topic [projects/qwiklabs-gcp-01-57cb1bca2434/topics/sandiego].


student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ gcloud pubsub topics publish sandiego --message "hello"
messageIds:
- '18371544379309904'



Create a subscription:


student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ gcloud pubsub subscriptions create --topic sandiego mySub1

Created subscription [projects/qwiklabs-gcp-01-57cb1bca2434/subscriptions/mySub1].

student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ gcloud pubsub subscriptions pull --auto-ack mySub1
Listed 0 items.

Do you see any result? If not, why?


Try to publish another message and then pull it using the subscription:

student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ gcloud pubsub topics publish sandiego --message "hello again"
messageIds:
- '18374154964739822'


student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ gcloud pubsub subscriptions pull --auto-ack mySub1
┌─────────────┬───────────────────┬────────────┐
│     DATA    │     MESSAGE_ID    │ ATTRIBUTES │
├─────────────┼───────────────────┼────────────┤
│ hello again │ 18374154964739822 │            │
└─────────────┴───────────────────┴────────────┘

student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ gcloud pubsub subscriptions delete mySub1
Deleted subscription [projects/qwiklabs-gcp-01-57cb1bca2434/subscriptions/mySub1].



Task 3. Simulate traffic sensor data into Pub/Sub

Explore the Python script that simulates San Diego traffic sensor data. Do not make any changes to the code.
cd ~/training-data-analyst/courses/streaming/publish
nano send_sensor_data.py

Look at the simulate function. This function lets the script behave as if traffic sensors were sending data to Pub/Sub in real time. The speedFactor parameter determines how fast the simulation runs. Exit the file by pressing Ctrl+X.


Download the traffic simulation dataset:

./download_data.sh



student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ ./download_data.sh 
Copying gs://cloud-training-demos/sandiego/sensor_obs2008.csv.gz...
\ [1 files][ 34.6 MiB/ 34.6 MiB]                                                
Operation completed over 1 objects/34.6 MiB.                                     

Simulate streaming sensor data

Run the send_sensor_data.py:


This command simulates sensor data by replaying recorded sensor data as Pub/Sub messages. The script extracts the original timestamp of each sensor reading and pauses between messages to simulate realistic timing. The speedFactor value changes the time between messages proportionally: a speedFactor of 60 means "60 times faster" than the recorded timing, so about an hour of data is sent every 60 seconds.
Leave this terminal open and the simulator running.
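The timing arithmetic can be sketched in a few lines. This is a simplified version of the sleep computation in send_sensor_data.py (the function name `sim_sleep_secs` is mine, not from the script): with speedFactor=60, each 5-minute (300-second) batch of recorded data is replayed after 300/60 = 5 wall-clock seconds, which matches the "Sleeping 5.0 seconds" lines in the simulator output below.

```python
# Simplified sketch of send_sensor_data.py's sleep computation.
# sim_sleep_secs is a hypothetical name for illustration.
def sim_sleep_secs(obs_gap_secs, wall_elapsed_secs, speed_factor):
    # Recorded time is compressed by speed_factor...
    sim_elapsed = obs_gap_secs / speed_factor
    # ...and we sleep only for whatever wall time remains.
    return sim_elapsed - wall_elapsed_secs

print(sim_sleep_secs(300, 0.0, 60))    # 5.0  -- one 5-minute batch every 5 s
print(sim_sleep_secs(3600, 10.0, 60))  # 50.0 -- an hour of data in a minute
```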



 

student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ echo $DEVSHELL_PROJECT_ID

qwiklabs-gcp-01-57cb1bca2434

student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ ./send_sensor_data.py --speedFactor=60 --project $DEVSHELL_PROJECT_ID


INFO: Reusing pub/sub topic sandiego
INFO: Sending sensor data from 2008-11-01 00:00:00
INFO: Publishing 477 events from 2008-11-01 00:00:00
INFO: Sleeping 5.0 seconds
INFO: Publishing 477 events from 2008-11-01 00:05:00
INFO: Sleeping 5.0 seconds
INFO: Publishing 477 events from 2008-11-01 00:10:00
INFO: Sleeping 5.0 seconds
INFO: Publishing 477 events from 2008-11-01 00:15:00
INFO: Sleeping 5.0 seconds
INFO: Publishing 477 events from 2008-11-01 00:20:00
INFO: Sleeping 5.0 seconds




Open a second SSH terminal to the training VM:

student-01-ebabe6658191@training-vm:~$ cd ~/training-data-analyst/courses/streaming/publish

student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ gcloud pubsub subscriptions create --topic sandiego mySub2

Created subscription [projects/qwiklabs-gcp-01-57cb1bca2434/subscriptions/mySub2].

student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ gcloud pubsub subscriptions pull --auto-ack mySub2

┌────────────────────────────────────────────────────────┬───────────────────┬────────────┐
│                          DATA                          │     MESSAGE_ID    │ ATTRIBUTES │
├────────────────────────────────────────────────────────┼───────────────────┼────────────┤
│ 2008-11-01 03:55:00,32.749679,-117.155519,163,S,1,71.8 │ 18369681673453243 │            │
└────────────────────────────────────────────────────────┴───────────────────┴────────────┘
student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ 


student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ cd ~/training-data-analyst/courses/streaming/publish

student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ ls -l
total 16
-rwxr-xr-x 1 student-01-ebabe6658191 google-sudoers   69 Apr 23 17:59 download_data.sh
-rw-r--r-- 1 student-01-ebabe6658191 google-sudoers 1614 Apr 23 17:59 pubsub_pull.py
-rw-r--r-- 1 student-01-ebabe6658191 google-sudoers  933 Apr 23 17:59 README.txt
-rwxr-xr-x 1 student-01-ebabe6658191 google-sudoers 3889 Apr 23 17:59 send_sensor_data.py


student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ cat README.txt 

(1) First of all, simply try to run:
        python send_sensor_data.py --speedFactor=30

    If this fails, look at your error.  Is it because a module could not be found
    or is it because the pubsub module has no attribute named 'Client'?

(2) If this fails because google.cloud.pubsub can not be found, then do:
        sudo pip install google-cloud-pubsub
    Then, try again

(3) If you get a failure that the module pubsub has no attribute called Client
    then you are either:
    - running into path problems because an older version of pub/sub is installed on your machine
    - trying to use a newer version of pub/sub

    The solution is to use virtualenv:

    (a) virtualenv cpb104
    (b) source cpb104/bin/activate
    (c) pip install google-cloud-pubsub==0.27.0
    (d) gcloud auth application-default login

    Then, try the send_sensor_data.py again

    To exit the virtualenv environment, type 'deactivate'



student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ cat pubsub_pull.py 

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# demo of message pull with pubsub
# make sure you have pubsub installed. On cloud shell you can just run
# sudo pip install --upgrade google-cloud-pubsub
#
# you will also need to create a topic and subscription. From cloud shell
# gcloud pubsub topics create cp300
# gcloud pubsub subscriptions create cpsubs --topic=cp300

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()

def callback(message):
  print(('Received message: {}'.format(message)))
  message.ack()

# make sure you replace "javier" with your project name 
subscription_path = 'projects/javier/subscriptions/cpsubs'
subscriber.subscribe(subscription_path, callback=callback)

# just go to https://console.cloud.google.com/cloudpubsub/subscriptions/cpsubs
# and publish some messages. You will see the payload inmediately on cloudshell
#
# note even if we are pulling behind the scenes, the client libraries are designed so from the developer's point of view 
# it works like a push. You just register a callback and forget. No need to keep looping and pulling and sleeping



student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ cat download_data.sh 

gsutil cp gs://cloud-training-demos/sandiego/sensor_obs2008.csv.gz .








student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ cat send_sensor_data.py 

#!/usr/bin/env python3

# Copyright 2018 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import gzip
import logging
import argparse
import datetime
from google.cloud import pubsub

TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
TOPIC = 'sandiego'
INPUT = 'sensor_obs2008.csv.gz'

def publish(publisher, topic, events):
   numobs = len(events)
   if numobs > 0:
       logging.info('Publishing {0} events from {1}'.format(numobs, get_timestamp(events[0])))
       for event_data in events:
         publisher.publish(topic,event_data)

def get_timestamp(line):
   ## convert from bytes to str
   line = line.decode('utf-8')

   # look at first field of row
   timestamp = line.split(',')[0]
   return datetime.datetime.strptime(timestamp, TIME_FORMAT)

def simulate(topic, ifp, firstObsTime, programStart, speedFactor):
   # sleep computation
   def compute_sleep_secs(obs_time):
        time_elapsed = (datetime.datetime.utcnow() - programStart).seconds
        sim_time_elapsed = ((obs_time - firstObsTime).days * 86400.0 + (obs_time - firstObsTime).seconds) / speedFactor
        to_sleep_secs = sim_time_elapsed - time_elapsed
        return to_sleep_secs

   topublish = list() 

   for line in ifp:
       event_data = line   # entire line of input CSV is the message
       obs_time = get_timestamp(line) # from first column

       # how much time should we sleep?
       if compute_sleep_secs(obs_time) > 1:
          # notify the accumulated topublish
          publish(publisher, topic, topublish) # notify accumulated messages
          topublish = list() # empty out list

          # recompute sleep, since notification takes a while
          to_sleep_secs = compute_sleep_secs(obs_time)
          if to_sleep_secs > 0:
             logging.info('Sleeping {} seconds'.format(to_sleep_secs))
             time.sleep(to_sleep_secs)
       topublish.append(event_data)

   # left-over records; notify again
   publish(publisher, topic, topublish)

def peek_timestamp(ifp):
   # peek ahead to next line, get timestamp and go back
   pos = ifp.tell()
   line = ifp.readline()
   ifp.seek(pos)
   return get_timestamp(line)


if __name__ == '__main__':
   parser = argparse.ArgumentParser(description='Send sensor data to Cloud Pub/Sub in small groups, simulating real-time behavior')
   parser.add_argument('--speedFactor', help='Example: 60 implies 1 hour of data sent to Cloud Pub/Sub in 1 minute', required=True, type=float)
   parser.add_argument('--project', help='Example: --project $DEVSHELL_PROJECT_ID', required=True)
   args = parser.parse_args()

   # create Pub/Sub notification topic
   logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)
   publisher = pubsub.PublisherClient()
   event_type = publisher.topic_path(args.project,TOPIC)
   try:
      publisher.get_topic(event_type)
      logging.info('Reusing pub/sub topic {}'.format(TOPIC))
   except:
      publisher.create_topic(event_type)
      logging.info('Creating pub/sub topic {}'.format(TOPIC))

   # notify about each line in the input file
   programStartTime = datetime.datetime.utcnow() 
   with gzip.open(INPUT, 'rb') as ifp:
      header = ifp.readline()  # skip header
      firstObsTime = peek_timestamp(ifp)
      logging.info('Sending sensor data from {}'.format(firstObsTime))
      simulate(event_type, ifp, firstObsTime, programStartTime, args.speedFactor)
student-01-ebabe6658191@training-vm:~/training-data-analyst/courses/streaming/publish$ 

