https://s3.console.aws.amazon.com/s3/buckets/crawler-public-ap-northeast-1/flight/
s3://crawler-public-ap-northeast-1/flight/2016/csv/
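The crawler steps below need this location in its s3:// form, not as the console URL. The sketch below shows how one maps to the other. It assumes the path-style console URL format shown above; newer console links may carry the prefix as a query parameter, and this sketch does not handle that case. `console_url_to_s3_uri` and `split_s3_uri` are hypothetical helper names.

```python
from urllib.parse import urlparse

# Assumption: path-style console URLs of the form
#   https://s3.console.aws.amazon.com/s3/buckets/<bucket>/<prefix>
CONSOLE_PREFIX = "/s3/buckets/"


def console_url_to_s3_uri(console_url: str) -> str:
    """Convert a path-style S3 console URL into the equivalent s3:// URI."""
    path = urlparse(console_url).path
    if not path.startswith(CONSOLE_PREFIX):
        raise ValueError("not a path-style S3 console URL: " + console_url)
    return "s3://" + path[len(CONSOLE_PREFIX):]


def split_s3_uri(uri: str) -> tuple:
    """Split an s3:// URI into (bucket, key prefix)."""
    if not uri.startswith("s3://"):
        raise ValueError("not an s3:// URI: " + uri)
    bucket, _, key = uri[len("s3://"):].partition("/")
    return bucket, key
```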
Steps:
git clone git@github.com:komushi/aws-glue-study.git
Log in to the AWS Management Console, then:
- Service -> S3 -> Create Bucket
- Create a folder named 'cfn':
<your_workshop_bucket>/
+-- cfn
- Upload the "iam.yaml", "sg.yaml" and "vpc.yaml" files from the "cfn" folder of the repository cloned in 1-0 into the "cfn" folder of the bucket
- Copy the full URL of each uploaded file and save them for later use
- Open "master.yaml" in the repository's "cfn" folder with a text editor
- Replace the URLs ending with "iam.yaml" / "sg.yaml" / "vpc.yaml" with the counterparts copied above, and save
- Service -> CloudFormation -> Create Stack
- Select template -> Upload template to Amazon S3
- Browse and select "master.yaml", which you just edited and saved.
- Stack name: aws-glue-study
- Enter values for UserName and UserPassword (leaving either blank makes stack creation fail)
- Check: I acknowledge that AWS CloudFormation might create IAM resources with custom names.
- Wait until the status changes to "CREATE_COMPLETE"
You will see stacks like:
aws-glue-study-SecurityGroupStack-XXXXXXXXXXXXX [NESTED]
aws-glue-study-IAMStack-MBEUULSULYPQ [NESTED]
aws-glue-study-VPCStack-GR86W4ABT8H1 [NESTED]
aws-glue-study
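Editing master.yaml by hand works. If you would rather script that step, the sketch below replaces every URL in the template that ends with one of the nested template file names. It assumes that master.yaml references the nested templates by http(s) URLs, as nested-stack TemplateURL values are. The helper names are hypothetical, and the URLs in the usage example are placeholders for the ones you copied.

```python
import re

# Matches an http(s) URL up to whitespace or a quote character.
URL_PATTERN = re.compile(r"https?://[^\s'\"]+")


def replace_template_urls(text, new_urls):
    """Swap each URL ending in '/<file name>' for the URL given in new_urls."""
    def substitute(match):
        url = match.group(0)
        for file_name, new_url in new_urls.items():
            if url.endswith("/" + file_name):
                return new_url
        return url  # unrelated URLs are left untouched

    return URL_PATTERN.sub(substitute, text)


def rewrite_master_template(path, new_urls):
    """Rewrite the template file at `path` in place."""
    with open(path, encoding="utf-8") as f:
        text = f.read()
    with open(path, "w", encoding="utf-8") as f:
        f.write(replace_template_urls(text, new_urls))
```

For example: rewrite_master_template("cfn/master.yaml", {"iam.yaml": "<URL copied for iam.yaml>", "sg.yaml": "<URL copied for sg.yaml>", "vpc.yaml": "<URL copied for vpc.yaml>"}). Matching on the "/" + file name suffix keeps a name like "sg.yaml" from matching inside a longer file name.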
- Service -> AWS Glue -> Crawlers -> Add Crawler
- Crawler name: crawl-public-flight-2016-csv
- Data store: s3://crawler-public-ap-northeast-1/flight/2016/csv/
- Choose an existing IAM role: AWSGlueServiceRole created by CloudFormation:
aws-glue-study-IAMStack-XXXXXX-AWSGlueServiceRole-XXXXXXXXXXXX
- Frequency: Run on demand
- Output Database: flight_data
- Run the crawler and wait until it finishes
- Service -> Athena -> Execute query
select count(1) from csv
Under the bucket created in 1-1, create two more folders:
<your_workshop_bucket>/
+-- cfn
+-- scripts # new
+-- data # new
- Service -> AWS Glue -> Jobs -> Add job
- Name: job_import_csv
- IAM Role: AWSGlueServiceRole created by CloudFormation:
aws-glue-study-IAMStack-XXXXXX-AWSGlueServiceRole-XXXXXXXXXXXX
- Script: use the script proposed by AWS Glue:
◉ A proposed script generated by AWS Glue
◉ Python
- Script file name: job_import_csv
- S3 path where the script is stored
s3://<your_workshop_bucket>/scripts # Created at 3-0
- Data source: csv (the table created in 2-1)
- Data target: create a gzip-compressed CSV table in your own S3 bucket
◉ Create tables in your data target
Data store: Amazon S3
Format: CSV
Compression type: gzip
Target path: s3://<your_workshop_bucket>/data/ # Created at 3-0
- Save job and edit scripts
- Run job
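Glue jobs, crawlers and CloudFormation stacks take a while to finish. Instead of refreshing the console, you can poll their state. The sketch below is generic: `wait_until` is a hypothetical helper, and the caller supplies the status lookup.

```python
import time
from typing import Callable, Iterable


def wait_until(get_status: Callable[[], str],
               done: Iterable[str],
               failed: Iterable[str] = (),
               interval: float = 15.0,
               timeout: float = 3600.0,
               sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_status() until it returns a state in `done`.

    Raises RuntimeError on a state in `failed`, and TimeoutError if
    `timeout` seconds pass first.
    """
    done_states, failed_states = set(done), set(failed)
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in done_states:
            return status
        if status in failed_states:
            raise RuntimeError("finished in state " + status)
        if time.monotonic() >= deadline:
            raise TimeoutError("still %s after %.0f s" % (status, timeout))
        sleep(interval)
```

For example, assuming boto3 is installed and credentials are configured:

- glue = boto3.client("glue")
- run_id = glue.start_job_run(JobName="job_import_csv")["JobRunId"]
- wait_until(lambda: glue.get_job_run(JobName="job_import_csv", RunId=run_id)["JobRun"]["JobRunState"], done={"SUCCEEDED"}, failed={"FAILED", "STOPPED", "TIMEOUT", "ERROR"})

The same helper can watch a stack through describe_stacks(StackName=...)["Stacks"][0]["StackStatus"], with done={"CREATE_COMPLETE"}.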
- Crawlers -> Add crawler
- Crawler name: crawl-private-flight-2016-csv
- Data store: choose the folder specified as the data target in 3-1
Data store: S3
Crawl data in
◉ Specified path in my account
Include path: s3://<your_workshop_bucket>/data/ # data generated at 3-1
- Choose an existing IAM role: AWSGlueServiceRole created by CloudFormation
aws-glue-study-IAMStack-XXXXXX-AWSGlueServiceRole-XXXXXXXXXXXX
- Frequency: Run on demand
- Output Database: flight_data (create the database if it does not exist yet)
- Prefix: private_
- Finish, run it now, and wait until it finishes
- Service -> Athena -> Query editor
select count(1) from private_data
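To sanity-check the Athena count, you can download the files that job_import_csv wrote and count the rows locally. One way to download them is aws s3 cp s3://<your_workshop_bucket>/data/ ./data --recursive. The sketch below makes two assumptions:

- the objects end in .gz
- each file starts with a header row (pass has_header=False if yours do not)

Whether Athena's count(1) includes header lines depends on how the crawler classified the table, so check any mismatch rather than assuming one side is right. `count_csv_gz_rows` is a hypothetical name.

```python
import csv
import gzip
from pathlib import Path


def count_csv_gz_rows(directory, has_header=True, pattern="*.gz"):
    """Count CSV data rows across all gzip files in `directory` matching `pattern`."""
    total = 0
    for path in sorted(Path(directory).glob(pattern)):
        with gzip.open(path, mode="rt", newline="", encoding="utf-8") as f:
            rows = sum(1 for _ in csv.reader(f))  # csv handles quoted newlines
        if has_header and rows > 0:
            rows -= 1  # drop this file's header line
        total += rows
    return total
```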
- Service -> S3 -> <your_workshop_bucket> -> Create folder -> flight_parquet
<your_workshop_bucket>/
+-- cfn
+-- scripts
+-- data
+-- flight_parquet # new
4-1. Run an ETL job to convert flight_data.private_data into private Parquet data on S3 using the AWS Glue console (only data for January)
- Service -> AWS Glue -> ETL -> Jobs -> Add job
- Name: conver-to-parquet
- IAM Role: AWSGlueServiceRole created by CloudFormation:
aws-glue-study-IAMStack-XXXXXX-AWSGlueServiceRole-XXXXXXXXXXXX
- Script: use the script proposed by AWS Glue:
This job runs
◉ A proposed script generated by AWS Glue
◉ Python
- Script file name: conver-to-parquet
- Store the script in the "scripts" folder created in 3-0:
S3 path where the script is stored:
s3://<your_workshop_bucket>/scripts/
- Data source: private_data:
◉ private_data
- Data target: parquet in S3:
Data store: Amazon S3
Format: Parquet
Target path: s3://<your_workshop_bucket>/flight_parquet/
- Save the job, run it, and wait until it finishes
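The 4-1 heading mentions January only, but the steps above convert the whole table. One way to restrict the job is a row predicate. The sketch below is pure Python and assumes fl_date holds 'YYYY-MM-DD' strings; check datasource0.printSchema() before relying on that. In the generated script, you could apply it with Glue's Filter transform before the data is written, for example Filter.apply(frame = datasource0, f = lambda rec: is_january(rec["fl_date"])). `is_january` is a hypothetical name.

```python
from datetime import datetime


def is_january(fl_date, year=2016):
    """True if an ISO 'YYYY-MM-DD' date string falls in January of `year`."""
    try:
        d = datetime.strptime(fl_date, "%Y-%m-%d")
    except (TypeError, ValueError):
        return False  # treat missing or malformed dates as "not January"
    return d.year == year and d.month == 1
```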
- Crawlers -> Add crawler
- Crawler name: crawl-private-parquet
- Data store: S3
- Crawl the data generated in 4-1:
Crawl data in:
◉ Specified path in my account
Include path: s3://<your_workshop_bucket>/flight_parquet/
- IAM Role: AWSGlueServiceRole created by CloudFormation:
aws-glue-study-IAMStack-XXXXXX-AWSGlueServiceRole-XXXXXXXXXXXX
- Frequency: Run on demand
- Database: flight_data
- Finish, run it now, and wait until it finishes
- Service -> Athena -> Execute query
select count(1) from flight_parquet
- Service -> AWS Glue -> Dev endpoints -> Add endpoint
Development endpoint name: any name you like
IAM Role: choose the existing Glue IAM role created by CloudFormation
Networking: choose "Choose a VPC, subnet, and security groups", then specify an appropriate subnet and security group
Public key contents: choose "Create SSH Key", then download the private key file (.pem) to your local PC
- Press Finish
- Dev endpoints -> Action -> Create notebook server
- Input notebook parameters as below:
CloudFormation stack name: aws-glue-[your-name-and-date]
IAM Role: choose the one whose name includes "AWSGlueNotebookRule"
EC2 key pair: use any existing key pair or create a new one
SSH private key: press the Upload button and choose the private key (.pem) file downloaded above
Notebook username: admin
Notebook password: any password you like
- Leave all other parameters as default and press Finish
- Service -> CloudFormation -> Stack
- Check that the Zeppelin notebook server stack is being created, and wait until it finishes (this may take a few minutes)
- Service -> AWS Glue -> Dev endpoints -> myDevEndpoint
- Check that your new instance is now listed in the Notebook servers pane
- Click Notebook Server URL to connect to Zeppelin notebook server via Web browser
User: admin
Password: password which you used
- Wait until the notebook EC2 instance is ready (you can check this under EC2 -> Instances)
- Open the link at the bottom of the "myDevEndpoint" detail page

- Firefox may display "Your connection is not secure"; if so:
Advanced -> Add Exception... -> Confirm Security Exception
- Log in (top right corner of the page) with the username / password set in #5-2, then create a new note

- Execute the code below:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import count, when, expr, col, sum, isnull
from pyspark.sql.functions import countDistinct
from awsglue.dynamicframe import DynamicFrame
glueContext = GlueContext(spark.sparkContext)
job = Job(glueContext)
job.init("delay_analytics_job")
## Read from data source created in #3-2
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "flight_data", table_name = "private_data", transformation_ctx = "datasource0")
datasource0.printSchema()
## Convert to a standard Spark DataFrame for the transformations below
df = datasource0.toDF()
## For large data sets, caching the DataFrame speeds up later actions
df.cache()
## Overview of flight dates and destinations
df.select(countDistinct("fl_date"), countDistinct("dest")).show()
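countDistinct counts the distinct non-null values of a column. The pure-Python equivalent below runs on a few hypothetical rows and shows what the two numbers mean. `count_distinct` is a hypothetical name.

```python
def count_distinct(rows, column):
    """Number of distinct non-null values in `column`, like Spark's countDistinct."""
    return len({row[column] for row in rows if row.get(column) is not None})
```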
- To analyse arrival delays at every airport on every day, aggregate by date and destination airport. For each group, count the flights that arrived ahead of time, on time, or delayed, plus those with no arr_delay value.
## Count on condition
cnt_cond = lambda cond: sum(when(cond, 1).otherwise(0))
## Aggregate flight delay statistics per day and destination airport
stat_df = df.groupBy("fl_date", "dest").agg(
    count("fl_date").alias("num_arrive"),
    cnt_cond(col("arr_delay") < 0).alias("ahead_of_time"),
    (cnt_cond(col("arr_delay") < 0) * 100 / count("fl_date")).alias("ahead_of_time_percent"),
    cnt_cond(col("arr_delay") == 0).alias("ontime"),
    (cnt_cond(col("arr_delay") == 0) * 100 / count("fl_date")).alias("ontime_percent"),
    cnt_cond(col("arr_delay") > 0).alias("delayed"),
    (cnt_cond(col("arr_delay") > 0) * 100 / count("fl_date")).alias("delayed_percent"),
    cnt_cond(isnull(col("arr_delay"))).alias("no_value"),
    (cnt_cond(isnull(col("arr_delay"))) * 100 / count("fl_date")).alias("no_value_percent")
).orderBy("fl_date", ascending=True)
## Seeing all airports for one day needs a fairly large number of rows; see #6-1 to export the full result
stat_df.show(400)
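The stat_df aggregation is hard to read as one expression. The sketch below restates it in plain Python. Its input is a list of hypothetical rows: dicts with fl_date, dest and arr_delay, where None stands for a null delay. In Spark, a null arr_delay makes each comparison null, and when() treats null as false. Such rows therefore count only toward no_value, and the sketch mirrors that. `delay_stats` is a hypothetical name.

```python
from collections import defaultdict


def delay_stats(rows):
    """Per (fl_date, dest): arrivals plus ahead/on-time/delayed/no-value counts and percentages."""
    groups = defaultdict(list)
    for row in rows:
        groups[(row["fl_date"], row["dest"])].append(row.get("arr_delay"))

    stats = []
    for (fl_date, dest), delays in groups.items():
        n = len(delays)
        counts = {
            "ahead_of_time": sum(1 for d in delays if d is not None and d < 0),
            "ontime": sum(1 for d in delays if d == 0),
            "delayed": sum(1 for d in delays if d is not None and d > 0),
            "no_value": sum(1 for d in delays if d is None),
        }
        record = {"fl_date": fl_date, "dest": dest, "num_arrive": n}
        for name, c in counts.items():
            record[name] = c
            record[name + "_percent"] = c * 100 / n  # same formula as the Spark columns
        stats.append(record)
    return sorted(stats, key=lambda r: r["fl_date"])  # like orderBy("fl_date")
```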
- Service -> S3 -> <your_workshop_bucket> -> Create folder -> "results"
- Convert the DataFrame back to a DynamicFrame to export the transformed data to a data sink such as an S3 bucket.
- Keep the case for the format you want to export and comment out the others (Case 3, ORC, is active below)
stat_dyf = DynamicFrame.fromDF(stat_df, glueContext, "stat_dyf")
## Case 1: Write to gzip-compressed CSV
# datasink1 = glueContext.write_dynamic_frame.from_options(frame = stat_dyf, connection_type = "s3",
#     connection_options = {"path": "s3://<your_workshop_bucket>/results/", "compression": "gzip"},
#     format = "csv", transformation_ctx = "datasink1")
## Case 2: Write to Parquet
# datasink2 = glueContext.write_dynamic_frame.from_options(frame = stat_dyf, connection_type = "s3",
#     connection_options = {"path": "s3://<your_workshop_bucket>/results/"},
#     format = "parquet", transformation_ctx = "datasink2")
## Case 3: Write to ORC
datasink3 = glueContext.write_dynamic_frame.from_options(frame = stat_dyf, connection_type = "s3",
    connection_options = {"path": "s3://<your_workshop_bucket>/results/"},
    format = "orc", transformation_ctx = "datasink3")
job.commit()
- Go to S3 and check the results folder; the output files should be there.
- The transformed data can then be consumed by services such as QuickSight and Athena.
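The export cases differ only in format and connection options. The sketch below is a hypothetical helper that builds the keyword arguments for write_dynamic_frame.from_options. It covers only the formats used in this workshop, and like Case 1 it compresses only CSV. `sink_options` is not part of the Glue API.

```python
def sink_options(fmt, path):
    """Build keyword arguments for glueContext.write_dynamic_frame.from_options."""
    fmt = fmt.lower()
    if fmt not in ("csv", "parquet", "orc", "avro"):
        raise ValueError("unsupported format: " + fmt)
    connection_options = {"path": path}
    if fmt == "csv":
        connection_options["compression"] = "gzip"  # only the CSV case compresses
    return {
        "connection_type": "s3",
        "connection_options": connection_options,
        "format": fmt,
        "transformation_ctx": "datasink_" + fmt,
    }
```

In the notebook it would be used as glueContext.write_dynamic_frame.from_options(frame = stat_dyf, **sink_options("parquet", "s3://<your_workshop_bucket>/results/")).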
- Service -> AWS Glue -> Jobs -> Add job
- Name: job_delay_stat_etl
- IAM Role: AWSGlueServiceRole created by CloudFormation:
aws-glue-study-IAMStack-XXXXXX-AWSGlueServiceRole-XXXXXXXXXXXX
- Script: use the script proposed by AWS Glue:
◉ A proposed script generated by AWS Glue
◉ Python
- Script file name: job_delay_stat_etl
- S3 path where the script is stored
s3://<your_workshop_bucket>/scripts/ # Created at 3-0
- Data source: private_data
- Data target: create an Avro table in your own S3 bucket
◉ Create tables in your data target
Data store: Amazon S3
Format: Avro
Target path: s3://<your_workshop_bucket>/results/ # Created at 6-3
- Map the source columns to target columns: delete all mappings except "fl_date"
- Save the job and edit the script as follows:
...
from pyspark.sql.functions import count, when, expr, col, sum, isnull
from pyspark.sql.functions import countDistinct
from awsglue.dynamicframe import DynamicFrame
...
## Convert to a standard Spark DataFrame for the transformations below
df = datasource0.toDF()
## For large data sets, caching the DataFrame speeds up later actions
df.cache()
## Count rows that satisfy a condition
cnt_cond = lambda cond: sum(when(cond, 1).otherwise(0))
## Aggregate flight delay statistics per day and destination airport
stat_df = df.groupBy("fl_date", "dest").agg(
    count("fl_date").alias("num_arrive"),
    cnt_cond(col("arr_delay") < 0).alias("ahead_of_time"),
    (cnt_cond(col("arr_delay") < 0) * 100 / count("fl_date")).alias("ahead_of_time_percent"),
    cnt_cond(col("arr_delay") == 0).alias("ontime"),
    (cnt_cond(col("arr_delay") == 0) * 100 / count("fl_date")).alias("ontime_percent"),
    cnt_cond(col("arr_delay") > 0).alias("delayed"),
    (cnt_cond(col("arr_delay") > 0) * 100 / count("fl_date")).alias("delayed_percent"),
    cnt_cond(isnull(col("arr_delay"))).alias("no_value"),
    (cnt_cond(isnull(col("arr_delay"))) * 100 / count("fl_date")).alias("no_value_percent")
).orderBy("fl_date", ascending=True)
## Convert back to a DynamicFrame and write it to S3 as Avro
stat_dyf = DynamicFrame.fromDF(stat_df, glueContext, "stat_dyf")
datasink4 = glueContext.write_dynamic_frame.from_options(frame = stat_dyf, connection_type = "s3",
    connection_options = {"path": "s3://<your_workshop_bucket>/results/"},
    format = "avro", transformation_ctx = "datasink4")
job.commit()
- Save and run the job





