Export data to Pub/Sub (reverse ETL)

Exporting data to Pub/Sub requires using BigQuery continuous queries. To enroll in the continuous queries preview, fill out the request form. To give feedback or request support for this feature, contact [email protected].

This document describes how you can set up reverse extract-transform-load (RETL) from BigQuery to Pub/Sub. You can do this by using the EXPORT DATA statement in a continuous query to export data from BigQuery to a Pub/Sub topic.

You can use a RETL workflow to Pub/Sub to combine BigQuery's analytics capabilities with Pub/Sub's asynchronous and scalable global messaging service. This workflow lets you serve data to downstream applications and services in an event-driven manner.

Prerequisites

You must create a service account. A service account is required to run a continuous query that exports results to a Pub/Sub topic.

You must create a Pub/Sub topic to receive the continuous query results as messages, and a Pub/Sub subscription that the target application can use to receive those messages.
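
For example, you can create these resources with the Google Cloud CLI. The following is a minimal sketch; my-query-sa, my-topic, and my-subscription are placeholder names, not values defined elsewhere in this document:

# Create a service account to run the continuous query.
gcloud iam service-accounts create my-query-sa \
  --display-name="Continuous query export"

# Create the topic that receives the continuous query results.
gcloud pubsub topics create my-topic

# Create a subscription for the target application to read from.
gcloud pubsub subscriptions create my-subscription --topic=my-topic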

Required roles

This section provides information about the roles and permissions required by the user account that creates the continuous query, and the service account that runs the continuous query.

User account permissions

To create a job in BigQuery, the user account must have the bigquery.jobs.create IAM permission. Each of the following IAM roles grants the bigquery.jobs.create permission:

  • BigQuery User (roles/bigquery.user)
  • BigQuery Job User (roles/bigquery.jobUser)
  • BigQuery Admin (roles/bigquery.admin)

To submit a job that runs using a service account, the user account must have the Service Account User (roles/iam.serviceAccountUser) role. If you are using the same user account to create the service account, then the user account must have the Service Account Admin (roles/iam.serviceAccountAdmin) role. For information on how to limit a user's access to a single service account, rather than to all service accounts within a project, see Grant a single role.
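
For example, one way to grant the Service Account User role on a single service account with the gcloud CLI (a sketch; SERVICE_ACCOUNT_EMAIL and USER_EMAIL are placeholders):

# Grant the Service Account User role on one service account only,
# rather than on all service accounts in the project.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_EMAIL \
  --member="user:USER_EMAIL" \
  --role="roles/iam.serviceAccountUser"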

If the user account is used to enable the APIs required for your continuous query use case, then it must have the Service Usage Admin (roles/serviceusage.serviceUsageAdmin) role.

Service account permissions

To export data from a BigQuery table, the service account must have the bigquery.tables.export IAM permission. Each of the following IAM roles grants the bigquery.tables.export permission:

  • BigQuery Data Viewer (roles/bigquery.dataViewer)
  • BigQuery Data Editor (roles/bigquery.dataEditor)
  • BigQuery Data Owner (roles/bigquery.dataOwner)
  • BigQuery Admin (roles/bigquery.admin)

For the service account to access Pub/Sub, you must grant the service account both of the following IAM roles:

  • Pub/Sub Viewer (roles/pubsub.viewer)
  • Pub/Sub Publisher (roles/pubsub.publisher)

You might also be able to get the required permissions through custom roles.
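
For example, you could grant both Pub/Sub roles with the gcloud CLI (a sketch; PROJECT_ID and SERVICE_ACCOUNT_EMAIL are placeholders):

# Let the service account read topic metadata.
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:SERVICE_ACCOUNT_EMAIL" \
  --role="roles/pubsub.viewer"

# Let the service account publish messages to the topic.
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:SERVICE_ACCOUNT_EMAIL" \
  --role="roles/pubsub.publisher"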

Before you begin

Enable the BigQuery and Pub/Sub APIs.

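For example, you can enable both APIs with the gcloud CLI:

gcloud services enable bigquery.googleapis.com pubsub.googleapis.com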

Export to Pub/Sub

Use the EXPORT DATA statement to export data to a Pub/Sub topic:

Console

  1. In the Google Cloud console, go to the BigQuery page.


  2. In the query editor, click More > Query settings.

  3. In the Continuous Query section, select the Use continuous query mode checkbox.

  4. In the Service account box, select the service account that you created.

  5. Click Save.

  6. In the query editor, enter the following statement:

    EXPORT DATA
      OPTIONS (
        format = 'CLOUD_PUBSUB',
        uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID')
    AS (
      QUERY
    );

    Replace the following:

    • PROJECT_ID: your project ID.
    • TOPIC_ID: the Pub/Sub topic ID. You can get the topic ID from the Topics page of the Google Cloud console.
    • QUERY: the SQL statement to select the data to export. The SQL statement must only contain supported operations.
  7. Click Run.

bq

  1. In the Google Cloud console, activate Cloud Shell.


    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. On the command line, run the continuous query by using the bq query command with the following flags:

    • Set the --continuous flag to true to make the query continuous.
    • Use the --connection_property flag to specify a service account to use.

    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
      --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
      'EXPORT DATA OPTIONS (format = "CLOUD_PUBSUB", uri = "https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID") AS (QUERY);'

    Replace the following:

    • PROJECT_ID: your project ID.
    • TOPIC_ID: the Pub/Sub topic ID. You can get the topic ID from the Topics page of the Google Cloud console.
    • SERVICE_ACCOUNT_EMAIL: the service account email. You can get the service account email on the Service accounts page of the Google Cloud console.
    • QUERY: the SQL statement to select the data to export. The SQL statement must only contain supported operations.

API

  1. Run the continuous query by calling the jobs.insert method. Set the following fields in the JobConfigurationQuery resource of the Job resource that you pass in:

    • Set the continuous field to true to make the query continuous.
    • Use the connectionProperties field to specify a service account to use.

    curl --request POST \
      'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs' \
      --header "Authorization: Bearer $(gcloud auth print-access-token)" \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '{"configuration":{"query":{"query":"EXPORT DATA OPTIONS (format = \"CLOUD_PUBSUB\", uri = \"https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID\") AS (QUERY);","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \
      --compressed

    Replace the following:

    • PROJECT_ID: your project ID.
    • TOPIC_ID: the Pub/Sub topic ID. You can get the topic ID from the Topics page of the Google Cloud console.
    • QUERY: the SQL statement to select the data to export. The SQL statement must only contain supported operations.
    • SERVICE_ACCOUNT_EMAIL: the service account email. You can get the service account email on the Service accounts page of the Google Cloud console.

Export multiple columns to Pub/Sub

If you want to include multiple columns in your output, you can create a struct column to contain the column values, and then convert the struct value to a JSON string by using the TO_JSON_STRING function. The following example exports data from four columns, formatted as a JSON string:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM `myproject.real_time_taxi_streaming.taxi_rides`
  WHERE ride_status = 'enroute'
);
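
To confirm that messages are arriving, you can pull from a subscription attached to the topic. This is a sketch; my-subscription is a placeholder subscription name:

# Pull and acknowledge one message to inspect the exported JSON.
gcloud pubsub subscriptions pull my-subscription --auto-ack --limit=1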

Export optimization

If your continuous query job performance appears to be limited by available compute resources, try increasing the number of slots in the BigQuery reservation that backs your CONTINUOUS reservation assignment.
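
For example, if you manage reservations with the bq command-line tool, a sketch like the following increases a reservation's slot count; ADMIN_PROJECT_ID, LOCATION, and RESERVATION_NAME are placeholders:

# Increase the slot capacity of the reservation that backs the
# CONTINUOUS assignment (placeholder reservation name and location).
bq update \
  --project_id=ADMIN_PROJECT_ID \
  --location=LOCATION \
  --reservation \
  --slots=500 \
  RESERVATION_NAME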

Limitations

  • The exported data must consist of a single STRING or BYTES column. The column name can be whatever you choose.
  • You must use a continuous query to export to Pub/Sub.
  • You can't pass a schema to a Pub/Sub topic in the continuous query.
  • You can't export data to a Pub/Sub topic that uses a schema.
  • You can't export data that contains NULL values. You can exclude NULL values from the query results by including a WHERE message IS NOT NULL filter in the continuous query.
  • Exported data must not exceed Pub/Sub quotas.

Pricing

When you export data in a continuous query, you are billed using BigQuery capacity compute pricing. To run continuous queries, you must have a reservation that uses the Enterprise or Enterprise Plus edition, and a reservation assignment that uses the CONTINUOUS job type.
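
As an illustration, a reservation assignment with the CONTINUOUS job type might be created with the bq tool along these lines (a sketch with placeholder values; verify the flags against the current BigQuery reservations documentation):

# Assign a project to a reservation for continuous query jobs.
bq mk \
  --project_id=ADMIN_PROJECT_ID \
  --location=LOCATION \
  --reservation_assignment \
  --reservation_id=ADMIN_PROJECT_ID:LOCATION.RESERVATION_NAME \
  --job_type=CONTINUOUS \
  --assignee_type=PROJECT \
  --assignee_id=PROJECT_ID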

After the data is exported, you're charged for using Pub/Sub. For more information, see Pub/Sub pricing.