
Malaysia Weather Data Pipeline

Weather data is collected from weather stations throughout Malaysia by government and third-party providers. Unfortunately, these datasets are not publicly accessible and are error-prone. This project aims to create an automated end-to-end data pipeline and a dashboard for Malaysian weather.

Questions that can be answered:

  • How has our environment changed over the past 10 years?
  • Which are the best places to live in terms of weather?
  • Are there any patterns in air quality, temperature and humidity according to the time of day?

Features

  • Dashboard: Looker Studio Report
  • Weather data from 1996-Present*
  • Fresh daily data, publicly available. Refer to Analyzing the data below
  • Daily workflow that can be observed through Prefect Cloud. GitHub Actions, Prefect & dbt send an email if the workflow fails at any stage.

Architecture

[Architecture diagram]

  • Orchestration: GitHub Actions, Prefect Cloud, dbt Cloud
  • Infrastructure: GitHub Actions, GCP, Terraform
  • Data Extraction: Python
  • Data Transformation & Load: dbt, Pandas
  • Data Lake & Warehouse: Google Cloud Storage, BigQuery
  • Data Visualization: Google Looker Studio

Data Transformation

Both pandas and dbt are used to clean, transform and model the data.

Schema

[Schema diagram]

Clustering: TODO; clustering is recommended here.

Partitioning: It may be more efficient to partition by state or city. However, BigQuery does not allow partitioning on STRING fields. Some workarounds exist that could help, as suggested in the Medium article by Guillaume. More consideration is needed for now.

Normalization: Some tables are denormalized to speed up queries for the dashboard, e.g. the city field is duplicated many times. Data integrity remains intact, however, as dbt builds the tables and ensures the downstream tables always follow the upstream tables.
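
As a rough illustration of the clustering and partitioning ideas above, the sketch below creates a BigQuery table partitioned on a timestamp column and clustered by state and city using the google-cloud-bigquery client. The table and column names here are assumptions for illustration, not the project's actual schema.

# Hypothetical sketch: partition on a timestamp column and cluster on the string columns,
# since BigQuery cannot partition on STRING fields (table/column names are assumed).
from google.cloud import bigquery

client = bigquery.Client(project="quality-of-life-364309")

schema = [
    bigquery.SchemaField("datetime", "TIMESTAMP"),
    bigquery.SchemaField("state", "STRING"),
    bigquery.SchemaField("city", "STRING"),
    bigquery.SchemaField("temperature", "FLOAT"),
]

table = bigquery.Table("quality-of-life-364309.dev.weather_partitioned", schema=schema)
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="datetime",
)
table.clustering_fields = ["state", "city"]

client.create_table(table, exists_ok=True)

Queries filtered by date can then prune partitions, while clustering keeps rows for the same state/city physically close, which is the usual workaround for the STRING-partitioning limitation.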

Data Sources

The weather data is proprietary, so unfortunately this code is not reproducible without the API key. Credits to Weather Underground for the data. Contact me for more details.

The air quality data is extracted from the government website APIMS Table.

Historical air quality data: Big thanks to YoungShung's API Project for this data. You can download the data from that repo and give him a star too!

Dashboard

Access the dashboard here: Looker Studio Report

Analyzing the data

You can obtain the datasets in two ways.

  1. Use BigQuery/SQL/Kaggle to query the dataset
  2. Request the CSV files from me at ([email protected]). The files are large and there are various tables, so we can discuss what sort of data you'd like. I'd love to help you out!

This project uses BigQuery as a data warehouse, so you can use SQL to query the data. All the tables in the prod dataset are public. You can star the dataset by doing the following:

Star the dataset quality-of-life-364309

Example SQL Statements

  SELECT *
  FROM `quality-of-life-364309.prod.full_weather_places`
  LIMIT 1000

# Other available datasets
`quality-of-life-364309.prod.air_quality`
`quality-of-life-364309.prod.uv`
`quality-of-life-364309.prod.weather`
`quality-of-life-364309.prod.personal_weather`

You can play around with BigQuery SQL using Kaggle. A sample notebook: Shahmir's AQ Kaggle Notebook
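
If you prefer Python to the BigQuery console, something like the snippet below should also work against the public prod dataset, assuming you have your own GCP project and credentials set up (queries are billed to your project).

# Query the public prod dataset from Python (requires your own GCP project and credentials;
# needs the google-cloud-bigquery, pandas and db-dtypes packages installed).
from google.cloud import bigquery

client = bigquery.Client()  # picks up your default credentials and project

query = """
SELECT *
FROM `quality-of-life-364309.prod.air_quality`
LIMIT 1000
"""

df = client.query(query).to_dataframe()
print(df.head())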

Additional Features

Tests: Some transformations are done in Python and some in dbt. Several tests are run afterwards to ensure the processed data is as intended.
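
The project's actual tests live in the repo and the dbt models; purely as a flavour of what such a post-run check can look like, here is a hypothetical pandas validation with assumed column names.

# Hypothetical post-transformation check (column names are assumed, not the project's schema).
import pandas as pd

def validate_air_quality(df: pd.DataFrame) -> None:
    # readings should exist and sit in a plausible index range
    assert df["api_reading"].notna().all(), "null API readings found"
    assert df["api_reading"].between(0, 500).all(), "API reading out of expected range"
    # at most one reading per city per timestamp
    assert not df.duplicated(subset=["city", "datetime"]).any(), "duplicate readings"

if __name__ == "__main__":
    sample = pd.DataFrame({
        "city": ["Shah Alam", "Kuching"],
        "datetime": ["2024-10-01 00:00", "2024-10-01 00:00"],
        "api_reading": [55, 62],
    })
    validate_air_quality(sample)
    print("checks passed")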

GitHub Actions: Before merging into main, a CI/CD pipeline checks that the unit tests pass.

Adding new weather/aq stations

Edit the following files:

  • dbt/seeds/state_locations.csv
  • dbt/seeds/city_places.csv
  • dbt/seeds/city_states.csv

Make sure the identifying_location is unique; you can refer to pipelines/etl/utils/util_weather.py for geocoding new locations, as in the sketch below.
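
For geocoding a new location, a minimal sketch along these lines (using geopy's Nominatim geocoder; the repo's util_weather.py may do it differently) can fill in the coordinates for the seed files:

# Hypothetical geocoding helper; the actual logic lives in pipelines/etl/utils/util_weather.py.
from geopy.geocoders import Nominatim

def geocode_location(name: str) -> tuple[float, float]:
    geolocator = Nominatim(user_agent="quality-of-life-seeds")
    location = geolocator.geocode(f"{name}, Malaysia")
    if location is None:
        raise ValueError(f"could not geocode {name!r}")
    return location.latitude, location.longitude

if __name__ == "__main__":
    lat, lon = geocode_location("Shah Alam, Selangor")
    print(lat, lon)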

Installation

Python 3 is required for this project. The entire project also runs daily on the cloud, so you will need GCP and Prefect Cloud accounts. Set everything up as follows:

  1. Set up your environment (see RealPython's tutorial):
  git clone <url>
  cd <project-name>

  python -m venv venv     # create a virtual environment
  source venv/bin/activate    # activate the virtual environment

  pip install -r requirements.txt   # installing dependencies
  2. Download the service_account_json_file from GCP and store the JSON file in the root of this project. Follow service_account_file_download.
  3. Fill in the .env.example file and rename it to .env. Do not remove the # symbols!
## Prefect Config
PREFECT_API_ACCOUNT_ID=<PREFECT_API_ACCOUNT_ID>
PREFECT_API_WORKSPACE_ID=<PREFECT_API_WORKSPACE_ID>
PREFECT_API_KEY=<PREFECT_API_KEY>
#
## Prefect Blocks
#
## GCP Config
PROJECT_ID=<PROJECT_ID>
REGION=<REGION>
GCP_CREDENTIALS_FILEPATH=<GCP_SERVICE_ACCOUNT_FILENAME>
#
## Weather API
WEATHER_API=<WEATHER_API_KEY>
#
## GitHub Email Config (Work in Progress)
SMTP=<smtp+starttls://user:password@server:port>
# GitHub Blocks (Optional)
GITHUB_REPO=<GITHUB_REPO>
GITHUB_BRANCH=<GITHUB_BRANCH>
GITHUB_BLOCK=<GITHUB_BLOCK>
#
  • Refer to the profile section of Prefect for the API Key and Account ID.
  • Refer to the GCP Console for the project ID. You can choose a region for the storage buckets and the dataset (see Regions and Zones in GCP: https://cloud.google.com/compute/docs/regions-zones).
  • For the weather API key, contact me if you require one.
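
For reference, the pipeline reads these settings as environment variables; below is a minimal sketch of loading them, assuming python-dotenv (the project's own loading code may differ).

# Hypothetical sketch: load the .env values with python-dotenv (the repo may load them differently).
import os
from dotenv import load_dotenv

load_dotenv()  # reads .env from the project root / current working directory

weather_api_key = os.environ["WEATHER_API"]
project_id = os.environ["PROJECT_ID"]
credentials_path = os.environ["GCP_CREDENTIALS_FILEPATH"]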

Next, fill in the terraform.tfvars.example file in the infra folder and rename it to terraform.tfvars.

google_credentials_file = "../GCP_SERVICE_ACCOUNT_FILENAME"
project_id = "PROJECT_ID"
  4. Set up the infrastructure. In your terminal, from the root folder of this project, run the following command:
# This will create the GCP resources (buckets + bigquery dataset), create the prefect connection and blocks.
bash setup_infra.sh
  5. You are ready to run the main ELT pipeline. Run the following command to extract air quality, weather and personal weather station data from 2024-10-01 to 2024-10-02 using a single process. This will load the data into the dev dataset in BigQuery.
python main.py \
  --testing \
  --air_quality \
  --weather \
  --personal_weather \
  --start_date=20241001 \
  --end_date=20241002 \
  --time=0000 \
  --parallel=1
  6. Set up dbt. First, modify the dbt/profile_template.yml file with your own project details. Change the dataset to prod if your data is there.
fixed:
  dataset: dev
  job_execution_timeout_seconds: 300
  job_retries: 1
  keyfile: <PATH_TO_GCP_CREDENTIALS_JSON_FILE>
  location: <REGION>
  method: service-account
  priority: interactive
  project: <PROJECT_ID>
  type: bigquery
prompts:
  user:
    type: string
    hint: [email protected]
  threads:
    hint: "number of threads"
    type: int
    default: 4
  7. Run dbt:
cd dbt
dbt init    # answer the prompts
dbt deps
dbt seed
dbt run
dbt test    # run tests against data models

Add addresses

workbook.ipynb has some scripts to help automate identifying new locations.

Contributing

Contributions are always welcome!

Improvements (To-Do):

  • It might be good to partition and cluster based on certain attributes to provide long term scalability
  • Check for more outliers in the data and report them
  • The repo alive checker is not working, causing missed pipelines :')

Credits

Thank you to everyone who made the Data Engineering Zoomcamp.
