Posted on Mon 21 March 2016 under Databases

A Billion Taxi Rides on Amazon EMR running Spark

This blog post will cover how I took a billion+ records containing six years of taxi ride metadata in New York City and analysed them using Spark SQL on Amazon EMR. I stored the data on S3 instead of HDFS so that I could launch EMR clusters only when I need them while only paying a few dollars a month to permanently store the data on S3.

The links in this blog post pointing to the AWS console use the eu-west-1 region. This may or may not be the most suitable location for you. You can switch the region by clicking the drop down that reads "Ireland" in the top right of the console.

AWS CLI Up and Running

All the following commands were run on a fresh install of Ubuntu 14.04.3.

To start, I'll install the AWS CLI tool and a few dependencies it needs to run.

$ sudo apt update
$ sudo apt install \
    python-pip \

EMR's CLI endpoint gets a fair amount of maintenance so ensure you're running at least version 1.10.14 of the AWS CLI tool.

$ virtualenv amazon
$ source amazon/bin/activate
$ pip install --upgrade \

The following will capture AWS credentials used by the AWS CLI tool.


Launching the Spark Cluster on EMR

The following will launch a 5-node cluster, there will be one master node, 2 core nodes and a 2 task nodes. I'll only install Hive and Spark on this cluster.

If you haven't launched an EMR cluster before then I suggest doing so via the AWS Console. Make sure you turn on advanced options so you can pick spot instances instead of on-demand instances for your core and task nodes. When you use the EMR wizard it'll create the roles and security groups needed for your cluster nodes to communicate with one another.

Before you run the command below generate a key pair called "emr". Download and store the emr.pem file in the ~/.ssh/ folder on your system. This file is the SSH private key that lets you access the Hadoop cluster nodes via SSH.

Below you will need to change a few items before executing the create-cluster command:

  • If the key name you generated was called anything other than emr then change the KeyName attribute.
  • The InstanceProfile is set to the name the wizard automatically generated. If you haven't used the wizard before then make sure this value matches your role.
  • The --region and AvailabilityZone parameters are set to eu-west-1a, change this if this region isn't the most appropriate for you. This region should match the region where you've stored the denormalised CSV data on S3.
  • The log-uri parameter's bucket name needs to be changed to another bucket name that hasn't already been taken.
  • The BidPrice amounts are in US Dollars and were appropriate at the time I was bidding for them. Please see the current spot prices for m3.xlarge instances in your region and adjust these accordingly. There is a pricing history button on the EC2 Spot Requests page.
  • You might want to consider adding more task nodes as these are the only nodes doing any serious work when you're queries are executing via Spark SQL.
  • EMR release 4.3.0 is being used. Amazon released 4.4.0 on March 14th and I haven't yet had a chance to try it out. Consider using the latest EMR release and keep an eye on Jeff Barr's EMR blog posts where he announces new releases.
$ aws emr create-cluster \
    --applications \
      Name=Hive \
      Name=Spark \
    --ec2-attributes '{
        "KeyName": "emr",
        "InstanceProfile": "EMR_EC2_DefaultRole",
        "AvailabilityZone": "eu-west-1a",
        "EmrManagedSlaveSecurityGroup": "sg-89cd3eff",
        "EmrManagedMasterSecurityGroup": "sg-d4cc3fa2"
    }' \
    --release-label emr-4.3.0 \
    --service-role EMR_DefaultRole \
    --log-uri 's3n://aws-logs-591231097547-eu-west-1/elasticmapreduce/' \
    --name 'A Billion Taxi Trips' \
    --instance-groups '[{
        "InstanceCount": 2,
        "BidPrice": "0.048",
        "InstanceGroupType": "CORE",
        "InstanceType": "m3.xlarge",
        "Name": "Core instance group - 2"
    }, {
        "InstanceCount": 2,
        "BidPrice": "0.048",
        "InstanceGroupType": "TASK",
        "InstanceType": "m3.xlarge",
        "Name": "Task instance group - 3"
    }, {
        "InstanceCount": 1,
        "InstanceGroupType": "MASTER",
        "InstanceType": "m3.xlarge",
        "Name": "Master instance group - 1"
    }]' \
    --region eu-west-1

Once that has executed make sure you add access for your IP address on port 22 to the ElasticMapReduce-master security group so that you can SSH into the master node.

The cluster can take 20 - 30 minutes to finish provisioning and bootstrapping all of the nodes so keep an eye on its status in the EMR console.

Creating Tables in the Hive Metastore

To access your master node change the EC2 hostname below to the hostname of your master node.

$ ssh -i ~/.ssh/emr.pem \
    [email protected]

The first command to run will fix the permissions on the hive logs folder.

$ sudo su -c 'mkdir -p /var/log/hive/user/hadoop &&
              touch /var/log/hive/user/hadoop/hive.log &&
              chown -R hadoop /var/log/hive/user/hadoop'

In my Billion Taxi Rides in Redshift blog post I took the raw taxi trip metadata and created denormalised CSV files that were gzip compressed.

In my Billion Taxi Rides on Amazon EMR running Presto blog post I turned that CSV data into ORC files and stored them on S3. The following shows that data as it currently lives on S3.

$ aws s3 ls <s3_bucket>/orc/
2016-03-14 13:54:41  398631347 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000000
2016-03-14 13:54:40  393489828 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000001
2016-03-14 13:54:40  339863956 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000002
2016-03-14 14:00:26  266048453 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000189
2016-03-14 14:00:23  265076102 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000190
2016-03-14 14:00:12  115110402 1dcb5447-87d5-4418-b7fb-455e4e39e5f6-000191

Spark SQL doesn't interact with ORC files as fast as it can interact with Parquet files so below I'll create tables representing the data in both ORC and Parquet format and then copy the data from the ORC table into the Parquet table.

The tables are created with the EXTERNAL attribute which means they don't expect to take full control of the files they represent. If the trips_orc table was declared without the EXTERNAL attribute it would wipe out any existing data in the s3://<s3_bucket>/orc/ folder.

But with the EXTERNAL attribute the trips_orc table will discover the existing files in the s3://<s3_bucket>/orc/ folder and the trips_parquet table will start to store data in the s3://<s3_bucket>/parquet/ folder even though I haven't created that folder beforehand.

$ hive
    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
  LOCATION 's3://<s3_bucket>/orc/';

    trip_id                 INT,
    vendor_id               STRING,
    pickup_datetime         TIMESTAMP,
    dropoff_datetime        TIMESTAMP,
    store_and_fwd_flag      STRING,
    rate_code_id            SMALLINT,
    pickup_longitude        DOUBLE,
    pickup_latitude         DOUBLE,
    dropoff_longitude       DOUBLE,
    dropoff_latitude        DOUBLE,
    passenger_count         SMALLINT,
    trip_distance           DOUBLE,
    fare_amount             DOUBLE,
    extra                   DOUBLE,
    mta_tax                 DOUBLE,
    tip_amount              DOUBLE,
    tolls_amount            DOUBLE,
    ehail_fee               DOUBLE,
    improvement_surcharge   DOUBLE,
    total_amount            DOUBLE,
    payment_type            STRING,
    trip_type               SMALLINT,
    pickup                  STRING,
    dropoff                 STRING,

    cab_type                STRING,

    precipitation           SMALLINT,
    snow_depth              SMALLINT,
    snowfall                SMALLINT,
    max_temperature         SMALLINT,
    min_temperature         SMALLINT,
    average_wind_speed      SMALLINT,

    pickup_nyct2010_gid     SMALLINT,
    pickup_ctlabel          STRING,
    pickup_borocode         SMALLINT,
    pickup_boroname         STRING,
    pickup_ct2010           STRING,
    pickup_boroct2010       STRING,
    pickup_cdeligibil       STRING,
    pickup_ntacode          STRING,
    pickup_ntaname          STRING,
    pickup_puma             STRING,

    dropoff_nyct2010_gid    SMALLINT,
    dropoff_ctlabel         STRING,
    dropoff_borocode        SMALLINT,
    dropoff_boroname        STRING,
    dropoff_ct2010          STRING,
    dropoff_boroct2010      STRING,
    dropoff_cdeligibil      STRING,
    dropoff_ntacode         STRING,
    dropoff_ntaname         STRING,
    dropoff_puma            STRING
) STORED AS parquet
  LOCATION 's3://<s3_bucket>/parquet/';

1.1 Billion Trips in Parquet Format

The following will copy all the data in the trips_orc table into the trips_parquet. This process should take around 2 hours but could be quicker if you add more task nodes to your cluster.

$ screen
$ echo "INSERT INTO TABLE trips_parquet
        SELECT * FROM trips_orc;" | hive

Benchmarking Queries in Spark SQL

Make sure Spark is using at least 8 GB of memory when you launch it. The fourth query executed in the benchmark will run out of heap space with the amount of memory specified in the stock EMR 4.3.0 configuration settings.

$ export SPARK_MEM

The following will launch the Spark SQL CLI interface.

$ spark-sql

I was surprised how quickly Spark SQL could query the data. Unlike Presto, which could use both core and task nodes to complete queries, Spark only used the task nodes to do all the heavy lifting.

Though Spark can do analytics its main strengths seem to be in running machine learning code, graph analysis and other recursive and memory-hungry workloads. So it might not be fair comparing results of the Presto benchmarks I did in the Presto on EMR blog post. Nonetheless it's interesting to see such differences in performance.

The following completed in 4 minutes and 24 seconds (3.5x slower than Presto).

SELECT cab_type,
FROM trips_parquet
GROUP BY cab_type;

The following completed in 5 minutes and 13 seconds (5x slower than Presto).

SELECT passenger_count,
FROM trips_parquet
GROUP BY passenger_count;

The following completed in 10 minutes and 20 seconds (7.5x slower than Presto).

SELECT passenger_count,
FROM trips_parquet
GROUP BY passenger_count,

The following completed in 16 minutes and 1 second (13x slower than Presto).

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       count(*) trips
FROM trips_parquet
GROUP BY passenger_count,
ORDER BY trip_year,
         trips desc;

Spark's ORC performance leaves a lot to be desired. I ran the first two benchmark queries on the trips_orc table and got back results that took 7 - 8x longer to return then their Parquet counterparts. $0.485 / hour for a Spark cluster might not be a lot of money to pay but paying $0.485 for a query that takes the better part of an hour to complete feels like a lot. I can see how ORC is Presto's favourite file format and how Parquet is Spark's favourite.

The following completed in 33 minutes and 50 seconds (27x slower than Presto).

SELECT cab_type,
FROM trips_orc
GROUP BY cab_type;

The following completed in 44 minutes and 29 seconds (40x slower than Presto).

SELECT passenger_count,
FROM trips_orc
GROUP BY passenger_count;

I'm interested in learning of any optimisations that can bring Spark's analytical performance in line with Presto's. Please drop me a line if you've got any configuration settings to share.

Thank you for taking the time to read this post. I offer both consulting and hands-on development services to clients in North America and Europe. If you'd like to discuss how my offerings can help your business please contact me via LinkedIn.

