Use the BigQuery connector with Spark

The spark-bigquery-connector is used with Apache Spark to read and write data from and to BigQuery. This tutorial provides example code that uses the spark-bigquery-connector within a Spark application. For instructions on creating a cluster, see the Dataproc Quickstarts.

Make the connector available to your application

You can make the spark-bigquery-connector available to your application in one of the following ways:

  1. Install the spark-bigquery-connector in the Spark jars directory of every node by using the Dataproc connectors initialization action when you create your cluster.

  2. Provide the connector URI when you submit your job:

    1. Google Cloud console: Use the Spark job Jars files item on the Dataproc Submit a job page.
    2. gcloud CLI: Use the gcloud dataproc jobs submit spark --jars flag.
    3. Dataproc API: Use the SparkJob.jarFileUris field.
  3. Include the jar in your Scala or Java Spark application as a dependency (see Compiling against the connector).

How to specify the connector jar URI

Spark-BigQuery connector versions are listed in the GitHub GoogleCloudDataproc/spark-bigquery-connector repository.

Specify the connector jar by substituting the Scala and connector version information in the following URI string:

gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
  • Use Scala 2.12 with Dataproc image versions 1.5 and later:

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
    

    gcloud CLI example:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
        -- job-args
    
  • Use Scala 2.11 with Dataproc image versions 1.4 and earlier:

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
    

    gcloud CLI example:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
        -- job-args
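
The substitution rule above can be sketched as a small helper. This is an illustrative sketch, not part of the connector: the function names `scala_version_for_image` and `connector_jar_uri` are hypothetical, and the parsing assumes the image version string starts with a `major.minor` pair (for example `1.5` or `2.0-debian10`).

```python
def scala_version_for_image(image_version: str) -> str:
    """Pick the Scala version for a Dataproc image version string.

    Assumes the string starts with "major.minor", e.g. "1.4.80-debian10".
    """
    # Drop any OS suffix after "-", then keep only the major and minor parts.
    major, minor = (int(part) for part in image_version.split("-")[0].split(".")[:2])
    # Image versions 1.5 and later use Scala 2.12; 1.4 and earlier use Scala 2.11.
    return "2.12" if (major, minor) >= (1, 5) else "2.11"


def connector_jar_uri(scala_version: str, connector_version: str) -> str:
    """Fill SCALA_VERSION and CONNECTOR_VERSION into the jar URI template."""
    return (
        "gs://spark-lib/bigquery/"
        f"spark-bigquery-with-dependencies_{scala_version}-{connector_version}.jar"
    )


print(connector_jar_uri(scala_version_for_image("1.5"), "0.23.2"))
# gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar
```

The resulting string is what you would pass to the --jars flag or the SparkJob.jarFileUris field.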
    

Calculating costs

In this document, you use the following billable components of Google Cloud:

  • Dataproc
  • BigQuery
  • Cloud Storage

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

Reading and writing data from BigQuery

This example reads data from BigQuery into a Spark DataFrame to perform a word count using the standard data source API.
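
The word count in this example is a grouped sum: rows that share the same word have their word_count values added together. As a rough plain-Python illustration of that aggregation (no Spark or BigQuery involved), the following sketch uses a hypothetical helper `sum_word_counts` and made-up sample rows.

```python
from collections import defaultdict


def sum_word_counts(rows):
    """Mirror of: SELECT word, SUM(word_count) AS word_count ... GROUP BY word."""
    totals = defaultdict(int)
    for word, word_count in rows:
        totals[word] += word_count
    return dict(totals)


# Hypothetical input: (word, word_count) pairs in which a word can repeat.
sample_rows = [("hope", 3), ("art", 5), ("hope", 4)]
print(sum_word_counts(sample_rows))  # {'hope': 7, 'art': 5}
```

In the tutorial itself, Spark performs this aggregation in a distributed way over the DataFrame loaded from BigQuery.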

The connector writes the data to BigQuery by first buffering all of it in a temporary location in Cloud Storage. It then copies all the data into BigQuery in one operation. The connector attempts to delete the temporary files once the BigQuery load operation has succeeded and once again when the Spark application terminates. If the job fails, remove any remaining temporary Cloud Storage files. Typically, the connector's temporary files are located in gs://[bucket]/.spark-bigquery-[jobid]-[UUID].
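
After a failed job, one way to find leftover temporary files is to filter a listing of the bucket (for example, the output of gcloud storage ls) by the path pattern described above. The following is a minimal sketch under that assumption: the helper name `leftover_temp_paths`, the bucket name, and the sample URIs are all hypothetical, and the code only selects candidates without deleting anything.

```python
def leftover_temp_paths(object_uris, bucket):
    """Return the URIs that match gs://[bucket]/.spark-bigquery-..."""
    prefix = f"gs://{bucket}/.spark-bigquery-"
    return [uri for uri in object_uris if uri.startswith(prefix)]


# Hypothetical bucket listing; real names contain a job ID and a UUID.
listing = [
    "gs://my-bucket/.spark-bigquery-job1-uuid1/",
    "gs://my-bucket/reports/summary.csv",
    "gs://my-bucket/.spark-bigquery-job2-uuid2/",
]
for uri in leftover_temp_paths(listing, "my-bucket"):
    print(uri)
```

Review the selected paths before removing them, because a job that is still running may be using its temporary files.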

Configuring billing

By default, the project associated with the credentials or service account is billed for API usage. To bill a different project, set the following configuration: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

It can also be added to a read/write operation, as follows: .option("parentProject", "<BILLED-GCP-PROJECT>").
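
When the billed project varies per operation, it can be convenient to build the option set in one place and pass it to the reader or writer with PySpark's options(**kwargs) method. The sketch below is only an illustration: `bigquery_options` is a hypothetical helper, and `my-billed-project` is a placeholder project ID.

```python
def bigquery_options(table, billed_project=None):
    """Build connector options, adding parentProject only when requested."""
    options = {"table": table}
    if billed_project is not None:
        # Bills this project instead of the one tied to the credentials.
        options["parentProject"] = billed_project
    return options


# With an existing SparkSession named `spark` (not created here), usage could be:
#   spark.read.format("bigquery").options(**bigquery_options(...)).load()
print(bigquery_options("wordcount_dataset.wordcount_output", "my-billed-project"))
```

Leaving `billed_project` unset keeps the default behavior, in which the project associated with the credentials or service account is billed.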

Running the code

Before running this example, create a dataset named "wordcount_dataset" or change the output dataset in the code to an existing BigQuery dataset in your Google Cloud project.

Use the bq command to create the wordcount_dataset:

bq mk wordcount_dataset

Use the Google Cloud CLI command to create a Cloud Storage bucket, which will be used to export to BigQuery:

gcloud storage buckets create gs://[bucket]

Scala

  1. Examine the code and replace the [bucket] placeholder with the Cloud Storage bucket you created earlier.
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      (spark.read.format("bigquery")
      .option("table","bigquery-public-data:samples.shakespeare")
      .load()
      .cache())
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .option("table","wordcount_dataset.wordcount_output")
      .save())
    
    
  2. Run the code on your cluster
    1. Use SSH to connect to the Dataproc cluster master node
      1. Go to the Dataproc Clusters page in the Google Cloud console, then click the name of your cluster
      2. On the Cluster details page, select the VM Instances tab. Then, click SSH to the right of the name of the cluster master node

        A browser window opens at your home directory on the master node
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Create wordcount.scala with the pre-installed vi, vim, or nano text editor, then paste in the Scala code from the Scala code listing
      nano wordcount.scala
        
    3. Launch the spark-shell REPL.
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. Run wordcount.scala with the :load wordcount.scala command to create the BigQuery wordcount_output table. The output listing displays 20 lines from the wordcount output.
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      To preview the output table, open the BigQuery page, select the wordcount_output table, and then click Preview.

PySpark

  1. Examine the code and replace the [bucket] placeholder with the Cloud Storage bucket you created earlier.
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
    
  2. Run the code on your cluster
    1. Use SSH to connect to the Dataproc cluster master node
      1. Go to the Dataproc Clusters page in the Google Cloud console, then click the name of your cluster
      2. On the Cluster details page, select the VM Instances tab. Then, click SSH to the right of the name of the cluster master node

        A browser window opens at your home directory on the master node
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Create wordcount.py with the pre-installed vi, vim, or nano text editor, then paste in the PySpark code from the PySpark code listing
      nano wordcount.py
      
    3. Run wordcount.py with spark-submit to create the BigQuery wordcount_output table. The output listing displays 20 lines from the wordcount output.
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      To preview the output table, open the BigQuery page, select the wordcount_output table, and then click Preview.

For more information