Pub/Sub Lite: Qwik Start


Complementing Pub/Sub, Pub/Sub Lite is a zonal service for messaging systems with predictable traffic patterns.

If you publish 1 MiB to 1 GiB of messages per second, Pub/Sub Lite is a low-cost option for high-volume event ingestion.

Publishers send messages to Lite topics and subscribers receive messages from Lite subscriptions.

Lite topics and Lite subscriptions are zonal resources that must be in the same Cloud project and zone.
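The same-project, same-zone rule can be sketched with plain Python. The path layout follows the subscription path printed later in this post (projects/.../locations/REGION-ZONE/subscriptions/...). The LiteResource class and the same_project_and_zone helper are hypothetical names made up for illustration; they are not part of the Pub/Sub Lite client library.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LiteResource:
    """Hypothetical stand-in for a Lite topic or Lite subscription path."""

    project_number: int
    region: str
    zone_id: str
    kind: str  # "topics" or "subscriptions"
    resource_id: str

    @property
    def location(self) -> str:
        # A zone is written as the region plus a zone letter, e.g. "us-east4-b".
        return f"{self.region}-{self.zone_id}"

    def __str__(self) -> str:
        return (
            f"projects/{self.project_number}/locations/{self.location}"
            f"/{self.kind}/{self.resource_id}"
        )


def same_project_and_zone(topic: LiteResource, subscription: LiteResource) -> bool:
    """Return True when both resources share a project and a zone."""
    return (
        topic.project_number == subscription.project_number
        and topic.location == subscription.location
    )


topic = LiteResource(62201625285, "us-east4", "b", "topics", "my-lite-topic")
subscription = LiteResource(
    62201625285, "us-east4", "b", "subscriptions", "my-lite-subscription"
)
other_zone = LiteResource(
    62201625285, "us-east4", "c", "subscriptions", "other-subscription"
)

print(subscription)
print(same_project_and_zone(topic, subscription))  # True
print(same_project_and_zone(topic, other_zone))    # False
```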


Create Lite topics and Lite subscriptions using the Cloud Console.

Send and receive messages using the Pub/Sub Lite client library for Python.



student_04_ae4d70a09902@cloudshell:~ (qwiklabs-gcp-01-b8c06f76b1c9)$  pip3 install --upgrade google-cloud-pubsublite

Collecting google-cloud-pubsublite
  Downloading google_cloud_pubsublite-1.8.3-py2.py3-none-any.whl (288 kB)
     |████████████████████████████████| 288 kB 9.2 MB/s 
Requirement already satisfied: google-api-core[grpc]!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,<3.0.0dev,>=1.33.2 in /usr/local/lib/python3.9/dist-packages (from google-cloud-pubsublite) (2.11.1)
Requirement already satisfied: grpcio<2.0.0dev

student_04_ae4d70a09902@cloudshell:~ (qwiklabs-gcp-01-b8c06f76b1c9)$ vi send_messages.py



from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)
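# TODO(developer): replace these values with your own project and topic.
project_number = 62201625285
cloud_region = "us-east4"
zone_id = "b"
topic_id = "my-lite-topic"
num_messages = 100  # Not used below; this script publishes a single message.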
location = CloudZone(CloudRegion(cloud_region), zone_id)
topic_path = TopicPath(project_number, location, topic_id)
# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    data = "Hello world!"
    api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
    # result() blocks. To resolve API futures asynchronously, use add_done_callback().
    message_id = api_future.result()
    publish_metadata = MessageMetadata.decode(message_id)
    print(
        f"Published a message to partition {publish_metadata.partition.value} and offset {publish_metadata.cursor.offset}."
    )
student_04_ae4d70a09902@cloudshell:~ (qwiklabs-gcp-01-b8c06f76b1c9)$ 
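
The comment in send_messages.py mentions add_done_callback() as the non-blocking alternative to result(). Here is a minimal sketch of that pattern using only the standard library. It assumes the object returned by publish() behaves like a concurrent.futures.Future; the fake future and the "fake-message-id" value are placeholders, so nothing is sent to Pub/Sub Lite.

```python
from concurrent.futures import Future

received_ids = []


def on_publish_done(future: Future) -> None:
    # Called once the future has resolved, so result() returns immediately.
    received_ids.append(future.result())


# Stand-in for the future that publisher_client.publish(...) would return.
fake_publish_future = Future()
fake_publish_future.add_done_callback(on_publish_done)

# Simulate the service acknowledging the publish.
fake_publish_future.set_result("fake-message-id")

print(received_ids)
```

With a real publisher, keep the `with PublisherClient()` block open until the callbacks have run, otherwise the client may shut down before pending publishes complete.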

student_04_ae4d70a09902@cloudshell:~ (qwiklabs-gcp-01-b8c06f76b1c9)$ vi receive_messages.py


from concurrent.futures import TimeoutError
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    FlowControlSettings,
    SubscriptionPath,
)
# TODO(developer): replace these values with your own project and subscription.
project_number = 62201625285
cloud_region = "us-east4"
zone_id = "b"
subscription_id = "my-lite-subscription"
timeout = 90
location = CloudZone(CloudRegion(cloud_region), zone_id)
subscription_path = SubscriptionPath(project_number, location, subscription_id)
# Configure when to pause the message stream for more incoming messages based on the
# maximum size or number of messages that a single-partition subscriber has received,
# whichever condition is met first.
per_partition_flow_control_settings = FlowControlSettings(
    # 1,000 outstanding messages. Must be >0.
    messages_outstanding=1000,
    # 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
    bytes_outstanding=10 * 1024 * 1024,
)
def callback(message):
    message_data = message.data.decode("utf-8")
    print(f"Received {message_data} of ordering key {message.ordering_key}.")
    message.ack()
# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:
    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=per_partition_flow_control_settings,
    )
    print(f"Listening for messages on {str(subscription_path)}...")
    try:
        streaming_pull_future.result(timeout=timeout)
    except (TimeoutError, KeyboardInterrupt):
        streaming_pull_future.cancel()
        assert streaming_pull_future.done()
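
The flow control comments in receive_messages.py say the stream pauses when either the message count or the byte count limit is reached, whichever comes first. The toy model below illustrates only that "whichever comes first" rule. ToyFlowControl is a hypothetical class written for this post; it is not how the client library implements flow control internally.

```python
class ToyFlowControl:
    """Toy model: pause when either outstanding limit is reached."""

    def __init__(self, messages_outstanding: int, bytes_outstanding: int) -> None:
        self.messages_limit = messages_outstanding
        self.bytes_limit = bytes_outstanding
        self.messages = 0
        self.bytes = 0

    def on_receive(self, size: int) -> None:
        # A delivered but not yet acknowledged message counts as outstanding.
        self.messages += 1
        self.bytes += size

    def on_ack(self, size: int) -> None:
        # Acknowledging a message frees its share of both budgets.
        self.messages -= 1
        self.bytes -= size

    def should_pause(self) -> bool:
        return self.messages >= self.messages_limit or self.bytes >= self.bytes_limit


MIB = 1024 * 1024
flow = ToyFlowControl(messages_outstanding=1000, bytes_outstanding=10 * MIB)

# Ten 1 MiB messages hit the byte limit long before the 1,000-message limit.
for _ in range(10):
    flow.on_receive(MIB)
paused_after_ten = flow.should_pause()

# Acknowledging one message drops below the byte limit again.
flow.on_ack(MIB)
paused_after_ack = flow.should_pause()

print(paused_after_ten, paused_after_ack)
```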

student_04_ae4d70a09902@cloudshell:~ (qwiklabs-gcp-01-b8c06f76b1c9)$ python3 send_messages.py

Published a message to partition 0 and offset 0.



student_04_ae4d70a09902@cloudshell:~ (qwiklabs-gcp-01-b8c06f76b1c9)$ python3 receive_messages.py

Listening for messages on projects/62201625285/locations/us-east4-b/subscriptions/my-lite-subscription...
Received Hello world! of ordering key .
