iMadeThem/aws-glue-study

 
 

https://s3.console.aws.amazon.com/s3/buckets/crawler-public-ap-northeast-1/flight/

s3://crawler-public-ap-northeast-1/flight/2016/csv/

Steps:

1. Prepare environment

1-0. Fetch source code

git clone git@github.com:komushi/aws-glue-study.git

1-1. Create S3 bucket and upload CloudFormation configuration files

Log in to the AWS Management Console, then:

  • Service -> S3 -> Create Bucket
  • Create a folder named 'cfn':
<your_workshop_bucket>/
  +-- cfn
  • Upload the "iam.yaml" / "sg.yaml" / "vpc.yaml" files from the "cfn" folder of the repository cloned in 1-0 into this folder
  • Copy the full path of each file and save it for later use

1-2. Edit master.yaml file

  • Open "master.yaml" under "cfn" in a text editor
  • Replace the URLs ending with "iam.yaml" / "sg.yaml" / "vpc.yaml" with their counterparts copied above, then save
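
The replacement in 1-2 can also be scripted. The following is a minimal sketch, assuming the nested-template links in master.yaml are plain http(s) URLs ending in the three file names; the function name `rewrite_template_urls`, the sample excerpt, and the bucket URLs are hypothetical and only illustrate the idea.

```python
import re

NESTED_TEMPLATES = ("iam.yaml", "sg.yaml", "vpc.yaml")


def rewrite_template_urls(master_text, new_urls):
    """Replace every URL ending in a nested template name with its new URL.

    new_urls maps a file name (e.g. "iam.yaml") to the full URL copied in 1-1.
    """
    for name in NESTED_TEMPLATES:
        # Match an http(s) URL without whitespace that ends in "/<file name>".
        pattern = re.compile(r"https?://\S*/" + re.escape(name))
        master_text = pattern.sub(lambda m, n=name: new_urls[n], master_text)
    return master_text


# Hypothetical excerpt of master.yaml and hypothetical bucket URLs.
sample = (
    "TemplateURL: https://s3.amazonaws.com/old-bucket/cfn/iam.yaml\n"
    "TemplateURL: https://s3.amazonaws.com/old-bucket/cfn/vpc.yaml\n"
)
urls = {n: "https://s3.amazonaws.com/my-workshop-bucket/cfn/" + n
        for n in NESTED_TEMPLATES}
print(rewrite_template_urls(sample, urls))
```

To apply it to the real file, read master.yaml into a string, pass the URLs you copied in 1-1, and write the result back.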

1-3. Use CloudFormation to prepare environment

  • Service -> CloudFormation -> Create new stack
  • Select template -> Upload a template to Amazon S3
  • Browse and select "master.yaml", which you just edited and saved
  • Stack name: aws-glue-study
  • Enter a UserName and UserPassword (blank values will cause stack creation to fail)
  • Check: I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  • Wait until the status changes to "CREATE_COMPLETE"

You will see stacks like:

aws-glue-study-SecurityGroupStack-XXXXXXXXXXXXX [NESTED]
aws-glue-study-IAMStack-MBEUULSULYPQ [NESTED]
aws-glue-study-VPCStack-GR86W4ABT8H1 [NESTED]
aws-glue-study
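
The console steps in 1-3 map onto the CloudFormation API. The sketch below is one possible equivalent, assuming boto3 is used and that master.yaml is small enough to pass inline as `TemplateBody`; the helper names and the sample user values are hypothetical. The client is passed in as an argument so the argument-building part runs without AWS access.

```python
def build_create_stack_args(template_body, user_name, user_password,
                            stack_name="aws-glue-study"):
    """Build keyword arguments for CloudFormation's create_stack call."""
    if not user_name or not user_password:
        # The walkthrough warns that blank values make stack creation fail.
        raise ValueError("UserName and UserPassword must not be blank")
    return {
        "StackName": stack_name,
        "TemplateBody": template_body,
        "Parameters": [
            {"ParameterKey": "UserName", "ParameterValue": user_name},
            {"ParameterKey": "UserPassword", "ParameterValue": user_password},
        ],
        # Same effect as ticking the "custom names" IAM acknowledgement box.
        "Capabilities": ["CAPABILITY_NAMED_IAM"],
    }


def create_stack_and_wait(cfn_client, args):
    """Create the stack and block until it reaches CREATE_COMPLETE."""
    cfn_client.create_stack(**args)
    waiter = cfn_client.get_waiter("stack_create_complete")
    waiter.wait(StackName=args["StackName"])


# Hypothetical values; replace the template text with the edited master.yaml.
args = build_create_stack_args("<contents of master.yaml>",
                               "workshop-user", "S3cret-example")
print(args["StackName"], args["Capabilities"])
# With boto3 installed and credentials configured (not run here):
#   import boto3
#   create_stack_and_wait(boto3.client("cloudformation"), args)
```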

2. Play with the original data

2-1. Crawl the original data to create a table, flight_data.csv, using the AWS Glue Crawlers GUI

  • Service -> AWS Glue -> Crawlers -> Add Crawler
  • Crawler name: crawl-public-flight-2016-csv
  • Data store: s3://crawler-public-ap-northeast-1/flight/2016/csv/
  • Choose an existing IAM role: AWSGlueServiceRole created by CloudFormation:
aws-glue-study-IAMStack-XXXXXX-AWSGlueServiceRole-XXXXXXXXXXXX
  • Frequency: Run on demand
  • Output Database: flight_data
  • Run the crawler and wait until it finishes
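
The same crawler can be defined through the Glue API instead of the GUI. This is a rough sketch assuming boto3; the helper names are hypothetical, and the role placeholder must be replaced with the role created by CloudFormation. Leaving out a schedule corresponds to "Run on demand", and `TablePrefix` corresponds to the Prefix field used for the private crawler in 3-2.

```python
def build_crawler_args(name, role, database, s3_path, table_prefix=""):
    """Build keyword arguments for the Glue create_crawler call."""
    args = {
        "Name": name,
        "Role": role,
        "DatabaseName": database,
        "Targets": {"S3Targets": [{"Path": s3_path}]},
    }
    if table_prefix:
        # Only set a prefix when one is wanted (e.g. "private_" in 3-2).
        args["TablePrefix"] = table_prefix
    return args


def create_and_start_crawler(glue_client, args):
    """Create the crawler, then start a single on-demand run."""
    glue_client.create_crawler(**args)
    glue_client.start_crawler(Name=args["Name"])


public = build_crawler_args(
    "crawl-public-flight-2016-csv",
    "<AWSGlueServiceRole created by CloudFormation>",
    "flight_data",
    "s3://crawler-public-ap-northeast-1/flight/2016/csv/",
)
private = build_crawler_args(
    "crawl-private-flight-2016-csv",
    "<AWSGlueServiceRole created by CloudFormation>",
    "flight_data",
    "s3://<your_workshop_bucket>/data/",
    table_prefix="private_",
)
print(public)
print(private)
# With boto3 installed and credentials configured (not run here):
#   import boto3
#   create_and_start_crawler(boto3.client("glue"), public)
```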

2-2. Query the data in Athena

  • Service -> Athena -> Execute query
select count(1) from csv

3. Import into a private S3 bucket as CSV

3-0. Prepare the data sink by creating folders

Under the bucket created in 1-1, create two more folders as below:

<your_workshop_bucket>/
  +-- cfn
  +-- scripts    # new
  +-- data       # new

3-1. Run an ETL job to write private CSV data to S3 using the AWS Glue Jobs GUI

  • Service -> AWS Glue -> Jobs -> Add job
  • Name: job_import_csv
  • IAM Role: AWSGlueServiceRole created by CloudFormation:
aws-glue-study-IAMStack-XXXXXX-AWSGlueServiceRole-XXXXXXXXXXXX
  • Script: Use the proposed script by AWS Glue:
  ◉ A proposed script generated by AWS Glue
  ◉ Python
  • Script file name: job_import_csv
  • S3 path where the script is stored
  s3://<your_workshop_bucket>/scripts               # Created at 3-0
  • Data source: csv (created in step 2-1)
  • Data target: Create a gzip-compressed CSV table in your own S3 bucket
 ◉ Create tables in your data target
    Data store: Amazon S3
    Format: CSV
    Compression type: gzip
    Target path: s3://<your_workshop_bucket>/data/   # Created at 3-0
  • Save the job and edit the script
  • Run job

3-2. Crawl to create a table - flight_data.private_data

  • Crawlers -> Add crawler
  • Crawler name: crawl-private-flight-2016-csv
  • Data store: choose the folder specified at 3-1 as data target
Data store: S3
Crawl data in
  ◉ Specified path in my account
  Include path: s3://<your_workshop_bucket>/data/    # data generated at 3-1
  • Choose an existing IAM role: AWSGlueServiceRole created by CloudFormation
aws-glue-study-IAMStack-XXXXXX-AWSGlueServiceRole-XXXXXXXXXXXX
  • Frequency: Run on demand
  • Output Database: flight_data (create the database if it does not exist yet)
  • Prefix: private_
  • Finish, run it now, and wait until it completes

3-3. Query the data

  • Service -> Athena -> Query editor
select count(1) from private_data

4. ETL to Parquet

4-0. Prepare parquet folders

  • Service -> S3 -> <your_workshop_bucket> -> Create folder -> flight_parquet
<your_workshop_bucket>/
  +-- cfn
  +-- scripts
  +-- data
  +-- flight_parquet    # new

4-1. Run an ETL job to convert flight_data.private_data into private Parquet data on S3 using the AWS Glue Jobs GUI (data for January only)

  • Service -> AWS Glue -> ETL -> Jobs -> Add job
  • Name: convert-to-parquet
  • IAM Role: AWSGlueServiceRole created by CloudFormation:
aws-glue-study-IAMStack-XXXXXX-AWSGlueServiceRole-XXXXXXXXXXXX
  • Script: Use the proposed script by AWS Glue:
This job runs
  ◉ A proposed script generated by AWS Glue
  ◉ Python
  • Script file name: convert-to-parquet
  • Store script in the "scripts" folder that is created at 3-0:
S3 path where the script is stored:
s3://<your_workshop_bucket>/scripts/
  • Data source: private_data:
 ◉ private_data
  • Data target: parquet in S3:
Data store: Amazon S3
Format: Parquet
Target path: s3://<your_workshop_bucket>/flight_parquet/
  • Save the job, run it, and wait until it finishes

4-2. Crawl to create a table - flight_data.flight_parquet

  • Crawlers -> Add crawler
  • Crawler name: crawl-private-parquet
  • Data store: S3
  • Crawl data generated at 4-1:
Crawl data in:
◉ Specified path in my account
  Include path: s3://<your_workshop_bucket>/flight_parquet/
  • IAM Role: AWSGlueServiceRole created by CloudFormation:
aws-glue-study-IAMStack-XXXXXX-AWSGlueServiceRole-XXXXXXXXXXXX
  • Frequency: Run on demand
  • Database: flight_data
  • Finish, run it now, and wait until it completes

4-3. Query the data

  • Service -> Athena -> Execute query
select count(1) from flight_parquet

5. Set up a managed Zeppelin notebook environment

5-0. Create a development endpoint

  • Service -> AWS Glue -> Dev endpoints -> Add endpoint
    Development endpoint name: any name you like
    IAM Role: choose the existing Glue IAM role created by CloudFormation
    Networking: choose "Choose a VPC, subnet, and security groups", then specify the appropriate subnet and security group
    Public key contents: choose "Create SSH Key", then download the private key file (.pem) to your local PC
  • Press Finish

5-1. Create Zeppelin notebook server

  • Dev endpoints -> Action -> Create notebook server
  • Input notebook parameters as below:
  CloudFormation stack name: aws-glue-[your-name-and-date]
  IAM Role: choose the one whose name includes "AWSGlueNotebookRole"
  EC2 key pair: use an existing one or create a new one
  SSH private key: press the Upload button and choose the private key file (.pem) downloaded in 5-0
  Notebook username: admin
  Notebook password: any password you like
  • Leave all other parameters at their defaults and press Finish
  • Service -> CloudFormation -> Stack
  • Check that the Zeppelin notebook server is being created, and wait until it finishes (it may take a few minutes)

5-2. Connect to Zeppelin

  • Service -> AWS Glue -> Dev endpoints -> myDevEndpoint
  • Check that your new instance is now listed under the Notebook servers pane
  • Click the Notebook Server URL to connect to the Zeppelin notebook server in a web browser
  User: admin
  Password: the password you set in 5-1

6. Develop custom ETL script using Zeppelin notebook

6-0. Login to Zeppelin notebook

  • Wait until the notebook EC2 instance is ready (this can be checked under EC2 instances)
  • Open the link at the bottom of the "myDevEndpoint" detail page in a new tab
  • Firefox may display "Your connection is not secure"; if so:
Advanced -> Add Exception... -> Confirm Security Exception

  • Login (top right corner of the page) -> enter the username / password set in #5-1 -> Create new note

6-1. Extract data and quick explore

  • Execute code below
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import count, when, expr, col, sum, isnull
from pyspark.sql.functions import countDistinct
from awsglue.dynamicframe import DynamicFrame

glueContext = GlueContext(spark.sparkContext)

job = Job(glueContext)
job.init("delay_analytics_job")

## Read from data source created in #3-2
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "flight_data", table_name = "private_data", transformation_ctx = "datasource0")
datasource0.printSchema()

## Convert to a standard Spark DataFrame for the transformations that follow
df = datasource0.toDF()

## For large data sets, caching the data accelerates later execution.
df.cache()

## Overview of flight dates and destinations
df.select(countDistinct("fl_date"), countDistinct("dest")).show()


6-2. Transform to get insight into delay statistics

  • To analyse arrival delay statistics for every airport on every day, aggregate by date and destination airport, counting the flights that arrived ahead of time, on time, or delayed
## Count on condition
cnt_cond = lambda cond: sum(when(cond, 1).otherwise(0))

## Aggregate flight delay statistics
stat_df = df.groupBy("fl_date", "dest").agg(count("fl_date").alias("num_arrive"), cnt_cond(col("arr_delay") < 0).alias("ahead_of_time"), (cnt_cond(col("arr_delay") < 0)*100/count("fl_date")).alias("ahead_of_time_percent"), cnt_cond(col("arr_delay") == 0).alias("ontime"), (cnt_cond(col("arr_delay") == 0)*100/count("fl_date")).alias("ontime_percent"), cnt_cond(col("arr_delay") > 0).alias("delayed"), (cnt_cond(col("arr_delay") > 0)*100/count("fl_date")).alias("delayed_percent"), cnt_cond(isnull(col("arr_delay"))).alias("no_value"), (cnt_cond(isnull(col("arr_delay")))*100/count("fl_date")).alias("no_value_percent")).orderBy('fl_date', ascending=True)

## To see all airports on one day, show a fairly large number of rows (see the distinct counts from #6-1)
stat_df.show(400)

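
To make the conditional counting easier to follow, here is a plain-Python sketch of the same grouping logic on a handful of rows. It is not the Spark code: the function name `delay_stats` and the sample rows are made up for illustration, and it assumes `fl_date` is never missing, so every row counts towards `num_arrive`.

```python
from collections import defaultdict


def delay_stats(rows):
    """Group rows by (fl_date, dest) and count early, on-time, delayed
    and missing arrival delays, plus each count as a percentage."""
    groups = defaultdict(lambda: {"num_arrive": 0, "ahead_of_time": 0,
                                  "ontime": 0, "delayed": 0, "no_value": 0})
    for row in rows:
        stats = groups[(row["fl_date"], row["dest"])]
        stats["num_arrive"] += 1
        delay = row["arr_delay"]
        if delay is None:
            stats["no_value"] += 1
        elif delay < 0:
            stats["ahead_of_time"] += 1
        elif delay == 0:
            stats["ontime"] += 1
        else:
            stats["delayed"] += 1
    result = {}
    for key in sorted(groups):  # ordered by fl_date first, then dest
        stats = dict(groups[key])
        for name in ("ahead_of_time", "ontime", "delayed", "no_value"):
            stats[name + "_percent"] = stats[name] * 100 / stats["num_arrive"]
        result[key] = stats
    return result


# Made-up sample rows, for illustration only.
sample = [
    {"fl_date": "2016-01-01", "dest": "SEA", "arr_delay": -5},
    {"fl_date": "2016-01-01", "dest": "SEA", "arr_delay": 0},
    {"fl_date": "2016-01-01", "dest": "SEA", "arr_delay": 12},
    {"fl_date": "2016-01-01", "dest": "SEA", "arr_delay": None},
    {"fl_date": "2016-01-02", "dest": "JFK", "arr_delay": 30},
]
for key, stats in delay_stats(sample).items():
    print(key, stats)
```

Each `cnt_cond(...)` in the Spark version plays the role of one branch of the `if`/`elif` chain here: it adds 1 for every row that satisfies the condition and 0 otherwise.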

6-3. Export the transformed data to data sinks in various formats for loading into data consumers

  • Service -> S3 -> "fight2016csv" -> Add folder -> "results"
  • Convert the DataFrame back to a DynamicFrame to export the transformed data to a data sink such as an S3 bucket.
  • Change the format to the one you want to export by uncommenting the corresponding case
stat_dyf = DynamicFrame.fromDF(stat_df, glueContext, "stat_dyf")
## Case 1: Write to CSV
## datasink1 = glueContext.write_dynamic_frame.from_options(frame = stat_dyf, connection_type = "s3", connection_options = {"path": "s3://fight2016csv/results/", "compression": "gzip"}, format = "csv", transformation_ctx = "datasink1")
## Case 2: Write to parquet
#datasink2 = glueContext.write_dynamic_frame.from_options(frame = stat_dyf, connection_type = "s3", connection_options = {"path": "s3://fight2016csv/results/"}, format = "parquet", transformation_ctx = "datasink2")
## Case 3: Write to orc
datasink3 = glueContext.write_dynamic_frame.from_options(frame = stat_dyf, connection_type = "s3", connection_options = {"path": "s3://fight2016csv/results/"}, format = "orc", transformation_ctx = "datasink3")
job.commit()
  • Go to S3 and check the results folder; the output files should be there.
  • The transformed data can be loaded into various data consumers such as QuickSight and Athena.
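
Rather than commenting cases in and out, the sink arguments can be built from a format name. This is only a sketch: `build_sink_args` and `SUPPORTED_FORMATS` are hypothetical helpers, limited to the formats that appear in this walkthrough, and the result is meant to be unpacked into the same `write_dynamic_frame.from_options` call used above.

```python
# Formats that appear in this walkthrough.
SUPPORTED_FORMATS = ("csv", "parquet", "orc", "avro")


def build_sink_args(path, fmt, compression=None):
    """Build keyword arguments for write_dynamic_frame.from_options."""
    if fmt not in SUPPORTED_FORMATS:
        raise ValueError("unsupported format: " + fmt)
    connection_options = {"path": path}
    if compression:
        # For example "gzip", as in the CSV case above.
        connection_options["compression"] = compression
    return {
        "connection_type": "s3",
        "connection_options": connection_options,
        "format": fmt,
        "transformation_ctx": "datasink_" + fmt,
    }


csv_args = build_sink_args("s3://fight2016csv/results/", "csv", compression="gzip")
orc_args = build_sink_args("s3://fight2016csv/results/", "orc")
print(csv_args)
print(orc_args)
# Inside the notebook or job (not run here):
#   glueContext.write_dynamic_frame.from_options(frame=stat_dyf, **orc_args)
```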

7. Create a Glue job to run the ETL automatically

  • Service -> AWS Glue -> Jobs -> Add job
  • Name: job_delay_stat_etl
  • IAM Role: AWSGlueServiceRole created by CloudFormation:
aws-glue-study-IAMStack-XXXXXX-AWSGlueServiceRole-XXXXXXXXXXXX
  • Script: Use the proposed script by AWS Glue:
  ◉ A proposed script generated by AWS Glue
  ◉ Python
  • Script file name: job_delay_stat_etl
  • S3 path where the script is stored
  s3://<your_workshop_bucket>/scripts/               # Created at 3-0
  • Data source: private_data
  • Data target: Create an Avro table in your own S3 bucket
 ◉ Create tables in your data target
    Data store: Amazon S3
    Format: Avro
    Target path: s3://<your_workshop_bucket>/results/   # Created at 6-3
  • Map the source columns to target columns -> delete all mappings except "fl_date"
  • Save the job and edit the script
  1. Delete the auto-generated code (marked with a red rectangle in the original screenshot)
  2. Add the necessary code developed in step 6, as follows
...

from pyspark.sql.functions import count, when, expr, col, sum, isnull
from pyspark.sql.functions import countDistinct
from awsglue.dynamicframe import DynamicFrame

...

## Convert to a standard Spark DataFrame for the transformations that follow
df = datasource0.toDF()

## For large data sets, caching the data accelerates later execution.
df.cache()

## Count on condition
cnt_cond = lambda cond: sum(when(cond, 1).otherwise(0))

## Aggregate flight delay statistics
stat_df = df.groupBy("fl_date", "dest").agg(count("fl_date").alias("num_arrive"), cnt_cond(col("arr_delay") < 0).alias("ahead_of_time"), (cnt_cond(col("arr_delay") < 0)*100/count("fl_date")).alias("ahead_of_time_percent"), cnt_cond(col("arr_delay") == 0).alias("ontime"), (cnt_cond(col("arr_delay") == 0)*100/count("fl_date")).alias("ontime_percent"), cnt_cond(col("arr_delay") > 0).alias("delayed"), (cnt_cond(col("arr_delay") > 0)*100/count("fl_date")).alias("delayed_percent"), cnt_cond(isnull(col("arr_delay"))).alias("no_value"), (cnt_cond(isnull(col("arr_delay")))*100/count("fl_date")).alias("no_value_percent")).orderBy('fl_date', ascending=True)

stat_dyf = DynamicFrame.fromDF(stat_df, glueContext, "stat_dyf")
datasink4 = glueContext.write_dynamic_frame.from_options(frame = stat_dyf, connection_type = "s3", connection_options = {"path": "s3://fight2016csv/results/"}, format = "avro", transformation_ctx = "datasink4")
job.commit()


  • Save and Run job
