Create a streaming pipeline using a Dataflow template

This quickstart shows you how to create a streaming pipeline using a Google-provided Dataflow template. Specifically, this quickstart uses the Pub/Sub to BigQuery template as an example.

The Pub/Sub to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic and writes them to a BigQuery table.


To follow step-by-step guidance for this task directly in the Google Cloud console, click Guide me:

Guide me


Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Enable the APIs

  5. Create a Cloud Storage bucket (a scripted alternative is sketched after this list):
    1. In the Google Cloud console, go to the Cloud Storage Buckets page.

      Go to Buckets page

    2. Click Create bucket.
    3. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
      • For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
      • For Choose where to store your data, do the following:
        • Select a Location type option.
        • Select a Location option.
      • For Choose a default storage class for your data, select the following: Standard.
      • For Choose how to control access to objects, select an Access control option.
      • For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
    4. Click Create.
  6. Copy the following values; you need them in a later section:
    • Your Cloud Storage bucket name.
    • Your Google Cloud project ID.

      To find this ID, see Identifying projects.
  7. To complete the steps in this quickstart, your user account must have the Dataflow Admin role and the Service Account User role. The Compute Engine default service account must have the Dataflow Worker role, the Storage Object Admin role, the Pub/Sub Editor role, the BigQuery Data Editor role, and the Viewer role. To add the required roles in the Google Cloud console:

    1. Go to the IAM page and select your project.
      Go to IAM
    2. In the row containing your user account, click Edit principal. Click Add another role, and add the following roles: Dataflow Admin and Service Account User.
    3. Click Save.
    4. In the row containing the Compute Engine default service account (PROJECT_NUMBER-compute@developer.gserviceaccount.com), click Edit principal.
    5. Click Add another role, and add the following roles: Dataflow Worker, Storage Object Admin, Pub/Sub Editor, BigQuery Data Editor, Viewer.
    6. Click Save.

      For more information about granting roles, see Grant an IAM role by using the console.

  8. By default, each new project starts with a default network. If the default network for your project is disabled or was deleted, you need to have a network in your project for which your user account has the Compute Network User role (roles/compute.networkUser).
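
If you prefer to script the bucket setup, the following Python sketch creates the bucket with the google-cloud-storage client library. It is an illustrative alternative to the console steps above, not a required part of the quickstart; the project ID and bucket name are placeholders, and it assumes that Application Default Credentials are configured.

  # Sketch: create the quickstart bucket with the google-cloud-storage
  # client library (pip install google-cloud-storage).
  from google.cloud import storage

  PROJECT_ID = "your-project-id"      # placeholder: your Google Cloud project ID
  BUCKET_NAME = "your-unique-bucket"  # placeholder: bucket names are globally unique

  client = storage.Client(project=PROJECT_ID)

  # Standard storage class in the US multi-region, matching the console steps.
  bucket = storage.Bucket(client, name=BUCKET_NAME)
  bucket.storage_class = "STANDARD"
  created = client.create_bucket(bucket, location="US")

  print(f"Created bucket {created.name} in {created.location}")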

Create a BigQuery dataset and table

Create a BigQuery dataset and table with the appropriate schema for your Pub/Sub topic using the Google Cloud console.

In this example, the name of the dataset is taxirides and the name of the table is realtime. To create this dataset and table, follow these steps (a scripted alternative is sketched after the list):

  1. Go to the BigQuery page.
    Go to BigQuery
  2. In the Explorer panel, next to the project where you want to create the dataset, click View actions, and then click Create dataset.
  3. On the Create dataset panel, follow these steps:
    1. For Dataset ID, enter taxirides. Dataset IDs are unique for each Google Cloud project.
    2. For Location type, choose Multi-region, and then select US (multiple regions in United States). Public datasets are stored in the US multi-region location. For simplicity, place your dataset in the same location.
    3. Leave the other default settings, and then click Create dataset.
  4. In the Explorer panel, expand your project.
  5. Next to your taxirides dataset, click View actions, and then click Create table.
  6. On the Create table panel, follow these steps:
    1. In the Source section, for Create table from, select Empty table.
    2. In the Destination section, for Table, enter realtime.
    3. In the Schema section, click the Edit as text toggle and paste the following schema definition into the box:
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. In the Partition and cluster settings section, for Partitioning, select the timestamp field.
  7. Leave the other default settings in place and click Create table.
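
As an alternative to the console steps above, the following Python sketch creates the taxirides dataset and the partitioned realtime table with the google-cloud-bigquery client library. The project ID is a placeholder; the schema and partitioning mirror the values entered in the console.

  # Sketch: create the taxirides dataset and the partitioned realtime table
  # with the google-cloud-bigquery client library (pip install google-cloud-bigquery).
  from google.cloud import bigquery

  PROJECT_ID = "your-project-id"  # placeholder: your Google Cloud project ID

  client = bigquery.Client(project=PROJECT_ID)

  # Dataset in the US multi-region, as in the console steps.
  dataset = bigquery.Dataset(f"{PROJECT_ID}.taxirides")
  dataset.location = "US"
  client.create_dataset(dataset, exists_ok=True)

  # Schema matching the definition pasted into the console.
  schema = [
      bigquery.SchemaField("ride_id", "STRING"),
      bigquery.SchemaField("point_idx", "INTEGER"),
      bigquery.SchemaField("latitude", "FLOAT"),
      bigquery.SchemaField("longitude", "FLOAT"),
      bigquery.SchemaField("timestamp", "TIMESTAMP"),
      bigquery.SchemaField("meter_reading", "FLOAT"),
      bigquery.SchemaField("meter_increment", "FLOAT"),
      bigquery.SchemaField("ride_status", "STRING"),
      bigquery.SchemaField("passenger_count", "INTEGER"),
  ]

  table = bigquery.Table(f"{PROJECT_ID}.taxirides.realtime", schema=schema)
  # Partition on the timestamp field, as selected in the console.
  table.time_partitioning = bigquery.TimePartitioning(field="timestamp")
  table = client.create_table(table, exists_ok=True)

  print(f"Created {table.full_table_id}")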

Run the pipeline

Run a streaming pipeline using the Google-provided Pub/Sub to BigQuery template. The pipeline reads incoming messages from the input Pub/Sub topic and writes them to your BigQuery table. A scripted alternative is sketched after these steps.

  1. Go to the Dataflow Jobs page.
    Go to Jobs
  2. Click Create job from template.
  3. Enter taxi-data as the Job name for your Dataflow job.
  4. For Dataflow template, select the Pub/Sub to BigQuery template.
  5. For BigQuery output table, enter the following:
    PROJECT_ID:taxirides.realtime

    Replace PROJECT_ID with the project ID of the project where you created your BigQuery dataset.

  6. In the section Optional source parameters, for Input Pub/Sub topic, click Enter topic manually.
  7. In the dialog, for Topic name enter the following, and then click Save:
    projects/pubsub-public-data/topics/taxirides-realtime

    This publicly available Pub/Sub topic is based on the NYC Taxi & Limousine Commission's open dataset. The following is a sample message from this topic, in JSON format:

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  8. For Temp location, enter the following:
    gs://BUCKET_NAME/temp/

    Replace BUCKET_NAME with the name of your Cloud Storage bucket. The temp folder stores temporary files, like the staged pipeline job.

  9. If your project does not have a default network, enter a Network and a Subnetwork. For more information, see Specify a network and subnetwork.
  10. Click Run job.
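
The console form above launches a classic template. As a sketch of the same launch through the Dataflow REST API (projects.locations.templates.launch), the following Python example uses the google-api-python-client library. The template path and the inputTopic and outputTableSpec parameter names reflect the classic Pub/Sub to BigQuery template's documented interface, but confirm them against the current template documentation; the project, region, and bucket values are placeholders.

  # Sketch: launch the classic Pub/Sub to BigQuery template with the Dataflow
  # REST API (pip install google-api-python-client). Placeholder values below
  # are assumptions; replace them with your own.
  from googleapiclient.discovery import build

  PROJECT_ID = "your-project-id"      # placeholder
  REGION = "us-central1"              # placeholder: the region to run the job in
  BUCKET_NAME = "your-unique-bucket"  # placeholder: bucket from "Before you begin"

  dataflow = build("dataflow", "v1b3")

  body = {
      "jobName": "taxi-data",
      "parameters": {
          "inputTopic": "projects/pubsub-public-data/topics/taxirides-realtime",
          "outputTableSpec": f"{PROJECT_ID}:taxirides.realtime",
      },
      "environment": {"tempLocation": f"gs://{BUCKET_NAME}/temp/"},
  }

  response = dataflow.projects().locations().templates().launch(
      projectId=PROJECT_ID,
      location=REGION,
      gcsPath="gs://dataflow-templates/latest/PubSub_to_BigQuery",  # classic template path
      body=body,
  ).execute()

  print(response["job"]["id"])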

View your results

To view the data written to your realtime table, follow these steps (a scripted alternative is sketched after the list):
  1. Go to the BigQuery page.

    Go to BigQuery

  2. Click Compose a new query. A new Editor tab opens. In the query editor, enter the following query:

    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    Replace PROJECT_ID with the project ID of the project where you created your BigQuery dataset. It can take up to five minutes for data to start appearing in your table.

  3. Click Run.

    The query returns rows that have been added to your table in the past 24 hours. You can also run queries using standard SQL.
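
If you prefer to query from a script, the following Python sketch runs a similar query with the google-cloud-bigquery client library. The project ID is a placeholder, and the printed fields are chosen only for illustration.

  # Sketch: query the realtime table with the google-cloud-bigquery client library.
  from google.cloud import bigquery

  PROJECT_ID = "your-project-id"  # placeholder: your Google Cloud project ID

  client = bigquery.Client(project=PROJECT_ID)

  sql = f"""
  SELECT * FROM `{PROJECT_ID}.taxirides.realtime`
  WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
  LIMIT 10
  """

  # Print a few fields from each returned row.
  for row in client.query(sql).result():
      print(row["ride_id"], row["timestamp"], row["meter_reading"])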

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, follow these steps.

Delete the project

The easiest way to eliminate billing is to delete the Google Cloud project that you created for the quickstart.
  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete the individual resources

If you want to keep the Google Cloud project that you used in this quickstart, then delete the individual resources (a scripted alternative is sketched after these steps):

  1. Go to the Dataflow Jobs page.
    Go to Jobs
  2. Select your streaming job from the job list.
  3. In the navigation, click Stop.
  4. In the Stop job dialog, either cancel or drain your pipeline, and then click Stop job.
  5. Go to the BigQuery page.
    Go to BigQuery
  6. In the Explorer panel, expand your project.
  7. Next to the dataset you want to delete, click View actions, and then click Open.
  8. In the details panel, click Delete dataset, and then follow the instructions.
  9. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  10. Click the checkbox for the bucket that you want to delete.
  11. To delete the bucket, click Delete, and then follow the instructions.
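
The following Python sketch performs the same cleanup from a script: it requests cancellation of the streaming job through the Dataflow REST API, then deletes the dataset and the bucket with the client libraries. The project, region, job ID, and bucket values are placeholders, and the cancellation request is an assumption based on the jobs.update method (use JOB_STATE_DRAINED instead to drain); verify it against the API reference before relying on it.

  # Sketch: stop the job and delete the quickstart resources.
  # Requires: pip install google-api-python-client google-cloud-bigquery google-cloud-storage
  from google.cloud import bigquery, storage
  from googleapiclient.discovery import build

  PROJECT_ID = "your-project-id"      # placeholder
  REGION = "us-central1"              # placeholder: region where the job runs
  JOB_ID = "your-dataflow-job-id"     # placeholder: from the Jobs page or the launch response
  BUCKET_NAME = "your-unique-bucket"  # placeholder

  # Request cancellation of the streaming job.
  dataflow = build("dataflow", "v1b3")
  dataflow.projects().locations().jobs().update(
      projectId=PROJECT_ID,
      location=REGION,
      jobId=JOB_ID,
      body={"requestedState": "JOB_STATE_CANCELLED"},
  ).execute()

  # Delete the taxirides dataset, including the realtime table.
  bigquery.Client(project=PROJECT_ID).delete_dataset(
      "taxirides", delete_contents=True, not_found_ok=True
  )

  # Delete the bucket; force=True also deletes the objects it contains.
  storage.Client(project=PROJECT_ID).bucket(BUCKET_NAME).delete(force=True)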

What's next