This repository was archived by the owner on Mar 9, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 214
sample: add publish flow control sample and other nits #429
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -55,7 +55,7 @@ def create_topic(project_id, topic_id): | |
|
|
||
| topic = publisher.create_topic(request={"name": topic_path}) | ||
|
|
||
| print("Created topic: {}".format(topic.name)) | ||
| print(f"Created topic: {topic.name}") | ||
| # [END pubsub_quickstart_create_topic] | ||
| # [END pubsub_create_topic] | ||
|
|
||
|
|
@@ -74,7 +74,7 @@ def delete_topic(project_id, topic_id): | |
|
|
||
| publisher.delete_topic(request={"topic": topic_path}) | ||
|
|
||
| print("Topic deleted: {}".format(topic_path)) | ||
| print(f"Topic deleted: {topic_path}") | ||
| # [END pubsub_delete_topic] | ||
|
|
||
|
|
||
|
|
@@ -94,7 +94,7 @@ def publish_messages(project_id, topic_id): | |
| topic_path = publisher.topic_path(project_id, topic_id) | ||
|
|
||
| for n in range(1, 10): | ||
| data = "Message number {}".format(n) | ||
| data = f"Message number {n}" | ||
| # Data must be a bytestring | ||
| data = data.encode("utf-8") | ||
| # When you publish a message, the client returns a future. | ||
|
|
@@ -120,7 +120,7 @@ def publish_messages_with_custom_attributes(project_id, topic_id): | |
| topic_path = publisher.topic_path(project_id, topic_id) | ||
|
|
||
| for n in range(1, 10): | ||
| data = "Message number {}".format(n) | ||
| data = f"Message number {n}" | ||
| # Data must be a bytestring | ||
| data = data.encode("utf-8") | ||
| # Add two attributes, origin and username, to the message | ||
|
|
@@ -136,8 +136,7 @@ def publish_messages_with_custom_attributes(project_id, topic_id): | |
| def publish_messages_with_error_handler(project_id, topic_id): | ||
| # [START pubsub_publish_with_error_handler] | ||
| """Publishes multiple messages to a Pub/Sub topic with an error handler.""" | ||
| import time | ||
|
|
||
| from concurrent import futures | ||
| from google.cloud import pubsub_v1 | ||
|
|
||
| # TODO(developer) | ||
|
|
@@ -146,31 +145,28 @@ def publish_messages_with_error_handler(project_id, topic_id): | |
|
|
||
| publisher = pubsub_v1.PublisherClient() | ||
| topic_path = publisher.topic_path(project_id, topic_id) | ||
| publish_futures = [] | ||
|
|
||
| futures = dict() | ||
|
|
||
| def get_callback(f, data): | ||
| def callback(f): | ||
| def get_callback(publish_future, data): | ||
| def callback(publish_future): | ||
| try: | ||
| print(f.result()) | ||
| futures.pop(data) | ||
| except: # noqa | ||
| print("Please handle {} for {}.".format(f.exception(), data)) | ||
| # Wait 100 ms for the publish call to succeed. | ||
| print(publish_future.result(timeout=0.1)) | ||
| except futures.TimeoutError: | ||
| print(f"Publishing {data} timed out.") | ||
|
|
||
| return callback | ||
|
|
||
| for i in range(10): | ||
| data = str(i) | ||
| futures.update({data: None}) | ||
| # When you publish a message, the client returns a future. | ||
| future = publisher.publish(topic_path, data.encode("utf-8")) | ||
| futures[data] = future | ||
| # Publish failures shall be handled in the callback function. | ||
| future.add_done_callback(get_callback(future, data)) | ||
| publish_future = publisher.publish(topic_path, data.encode("utf-8")) | ||
| # Non-blocking. Publish failures are handled in the callback function. | ||
| publish_future.add_done_callback(get_callback(publish_future, data)) | ||
| publish_futures.append(publish_future) | ||
|
|
||
| # Wait for all the publish futures to resolve before exiting. | ||
| while futures: | ||
| time.sleep(5) | ||
| futures.wait(publish_futures, return_when=futures.ALL_COMPLETED) | ||
|
|
||
| print(f"Published messages with error handler to {topic_path}.") | ||
| # [END pubsub_publish_with_error_handler] | ||
|
|
@@ -179,39 +175,93 @@ def callback(f): | |
| def publish_messages_with_batch_settings(project_id, topic_id): | ||
| """Publishes multiple messages to a Pub/Sub topic with batch settings.""" | ||
| # [START pubsub_publisher_batch_settings] | ||
| from concurrent import futures | ||
| from google.cloud import pubsub_v1 | ||
|
|
||
| # TODO(developer) | ||
| # project_id = "your-project-id" | ||
| # topic_id = "your-topic-id" | ||
|
|
||
| # Configure the batch to publish as soon as there is ten messages, | ||
| # one kilobyte of data, or one second has passed. | ||
| # Configure the batch to publish as soon as there are 10 messages | ||
| # or 1 KiB of data, or 1 second has passed. | ||
| batch_settings = pubsub_v1.types.BatchSettings( | ||
| max_messages=10, # default 100 | ||
| max_bytes=1024, # default 1 MB | ||
| max_bytes=1024, # default 1 MiB | ||
| max_latency=1, # default 10 ms | ||
| ) | ||
| publisher = pubsub_v1.PublisherClient(batch_settings) | ||
| topic_path = publisher.topic_path(project_id, topic_id) | ||
| publish_futures = [] | ||
|
|
||
| # Resolve the publish future in a separate thread. | ||
| def callback(future): | ||
| message_id = future.result() | ||
| print(message_id) | ||
|
|
||
| for n in range(1, 10): | ||
| data = "Message number {}".format(n) | ||
| data = f"Message number {n}" | ||
| # Data must be a bytestring | ||
| data = data.encode("utf-8") | ||
| future = publisher.publish(topic_path, data) | ||
| publish_future = publisher.publish(topic_path, data) | ||
| # Non-blocking. Allow the publisher client to batch multiple messages. | ||
| future.add_done_callback(callback) | ||
| publish_future.add_done_callback(callback) | ||
| publish_futures.append(publish_future) | ||
|
|
||
| futures.wait(publish_futures, return_when=futures.ALL_COMPLETED) | ||
|
|
||
| print(f"Published messages with batch settings to {topic_path}.") | ||
| # [END pubsub_publisher_batch_settings] | ||
|
|
||
|
|
||
| def publish_messages_with_flow_control_settings(project_id, topic_id): | ||
| """Publishes messages to a Pub/Sub topic with flow control settings.""" | ||
| # [START pubsub_publisher_flow_control] | ||
| from concurrent import futures | ||
| from google.cloud import pubsub_v1 | ||
| from google.cloud.pubsub_v1.types import ( | ||
| LimitExceededBehavior, | ||
| PublisherOptions, | ||
| PublishFlowControl, | ||
| ) | ||
|
|
||
| # TODO(developer) | ||
| # project_id = "your-project-id" | ||
| # topic_id = "your-topic-id" | ||
|
|
||
| # Configure how many messages the publisher client can hold in memory | ||
| # and what to do when messages exceed the limit. | ||
| flow_control_settings = PublishFlowControl( | ||
| message_limit=100, # 100 messages | ||
| byte_limit=10 * 1024 * 1024, # 10 MiB | ||
| limit_exceeded_behavior=LimitExceededBehavior.BLOCK, | ||
| ) | ||
| publisher = pubsub_v1.PublisherClient( | ||
| publisher_options=PublisherOptions(flow_control=flow_control_settings) | ||
| ) | ||
| topic_path = publisher.topic_path(project_id, topic_id) | ||
| publish_futures = [] | ||
|
|
||
| # Resolve the publish future in a separate thread. | ||
| def callback(publish_future): | ||
| message_id = publish_future.result() | ||
| print(message_id) | ||
|
|
||
| # Publish 1000 messages in quick succession to trigger flow control. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wouldn't say "to trigger flow control." It's possible that flow control will be triggered, but it's also entirely possible it won't be if messages get out quickly enough. Either way, the fact that flow control was triggered isn't something that will be easily visible to the user. Maybe say "Rapidly publishing 1000 messages in a loop may be constrained by flow control." |
||
| for n in range(1, 1000): | ||
| data = f"Message number {n}" | ||
| # Data must be a bytestring | ||
| data = data.encode("utf-8") | ||
| publish_future = publisher.publish(topic_path, data) | ||
| # Non-blocking. Allow the publisher client to batch messages. | ||
| publish_future.add_done_callback(callback) | ||
| publish_futures.append(publish_future) | ||
|
|
||
| futures.wait(publish_futures, return_when=futures.ALL_COMPLETED) | ||
|
|
||
| print(f"Published messages with flow control settings to {topic_path}.") | ||
| # [END pubsub_publisher_flow_control] | ||
|
|
||
|
|
||
| def publish_messages_with_retry_settings(project_id, topic_id): | ||
| """Publishes messages with custom retry settings.""" | ||
| # [START pubsub_publisher_retry_settings] | ||
|
|
@@ -244,7 +294,7 @@ def publish_messages_with_retry_settings(project_id, topic_id): | |
| topic_path = publisher.topic_path(project_id, topic_id) | ||
|
|
||
| for n in range(1, 10): | ||
| data = "Message number {}".format(n) | ||
| data = f"Message number {n}" | ||
| # Data must be a bytestring | ||
| data = data.encode("utf-8") | ||
| future = publisher.publish(topic=topic_path, data=data, retry=custom_retry) | ||
|
|
@@ -365,7 +415,8 @@ def detach_subscription(project_id, subscription_id): | |
|
|
||
| if __name__ == "__main__": | ||
| parser = argparse.ArgumentParser( | ||
| description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, | ||
| description=__doc__, | ||
| formatter_class=argparse.RawDescriptionHelpFormatter, | ||
| ) | ||
| parser.add_argument("project_id", help="Your Google Cloud project ID") | ||
|
|
||
|
|
@@ -388,7 +439,8 @@ def detach_subscription(project_id, subscription_id): | |
| publish_with_custom_attributes_parser.add_argument("topic_id") | ||
|
|
||
| publish_with_error_handler_parser = subparsers.add_parser( | ||
| "publish-with-error-handler", help=publish_messages_with_error_handler.__doc__, | ||
| "publish-with-error-handler", | ||
| help=publish_messages_with_error_handler.__doc__, | ||
| ) | ||
| publish_with_error_handler_parser.add_argument("topic_id") | ||
|
|
||
|
|
@@ -398,14 +450,21 @@ def detach_subscription(project_id, subscription_id): | |
| ) | ||
| publish_with_batch_settings_parser.add_argument("topic_id") | ||
|
|
||
| publish_with_flow_control_settings_parser = subparsers.add_parser( | ||
| "publish-with-flow-control", | ||
| help=publish_messages_with_flow_control_settings.__doc__, | ||
| ) | ||
| publish_with_flow_control_settings_parser.add_argument("topic_id") | ||
|
|
||
| publish_with_retry_settings_parser = subparsers.add_parser( | ||
| "publish-with-retry-settings", | ||
| help=publish_messages_with_retry_settings.__doc__, | ||
| ) | ||
| publish_with_retry_settings_parser.add_argument("topic_id") | ||
|
|
||
| publish_with_ordering_keys_parser = subparsers.add_parser( | ||
| "publish-with-ordering-keys", help=publish_with_ordering_keys.__doc__, | ||
| "publish-with-ordering-keys", | ||
| help=publish_with_ordering_keys.__doc__, | ||
| ) | ||
| publish_with_ordering_keys_parser.add_argument("topic_id") | ||
|
|
||
|
|
@@ -416,7 +475,8 @@ def detach_subscription(project_id, subscription_id): | |
| resume_publish_with_ordering_keys_parser.add_argument("topic_id") | ||
|
|
||
| detach_subscription_parser = subparsers.add_parser( | ||
| "detach-subscription", help=detach_subscription.__doc__, | ||
| "detach-subscription", | ||
| help=detach_subscription.__doc__, | ||
| ) | ||
| detach_subscription_parser.add_argument("subscription_id") | ||
|
|
||
|
|
@@ -436,6 +496,8 @@ def detach_subscription(project_id, subscription_id): | |
| publish_messages_with_error_handler(args.project_id, args.topic_id) | ||
| elif args.command == "publish-with-batch-settings": | ||
| publish_messages_with_batch_settings(args.project_id, args.topic_id) | ||
| elif args.command == "publish-with-flow-control": | ||
| publish_messages_with_flow_control_settings(args.project_id, args.topic_id) | ||
| elif args.command == "publish-with-retry-settings": | ||
| publish_messages_with_retry_settings(args.project_id, args.topic_id) | ||
| elif args.command == "publish-with-ordering-keys": | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Waiting only 100ms is likely to result in deadline exceeded failures. It's best not to set a deadline or if necessary, set a deadline closer to 60s.