TLDR
Companies are always looking for better ways to manage and process their data. In this blog post, we’ll explore a proof of concept developed for a leading beauty brand, demonstrating how Mage Pro can turn complicated data tasks into smooth operations. Whether you’re a data engineer, analyst, or simply interested in data management, this article will offer valuable insights into building effective data pipelines using Mage Pro, Azure Blob Storage, and Snowflake.
Table of contents
Introduction
Understanding the challenge
Solution overview
Step-by-step implementation
Loading data from Azure Blob Storage
Splitting data into chunks
Transforming data in chunks
Combining chunks while maintaining order
Exporting data to Snowflake
Conclusion
Introduction
Recently, Mage was asked to help develop a proof of concept for a global leader in the beauty product industry. This company processes a substantial amount of data daily, and managing that data well is crucial to maintaining its competitive edge. The challenge was to extract a static file saved in blob storage and move it to a data warehouse. Data from the file needed to be processed in chunks and loaded into the warehouse in the exact order it was extracted, one of the most common scenarios in data engineering.
Mage Pro offers a suite of tools that simplify data ingestion, transformation, and export. One of the tools used in this project is dynamic blocks. By leveraging features like dynamic blocks, organizations can build scalable and maintainable data pipelines without dealing with the underlying infrastructure issues present in Mage's open source project.
Understanding the challenge
The global leader in beauty products needed to verify that Mage Pro could handle the following:
Ingest Data: Retrieve files from Azure Blob Storage.
Process in Chunks: Split large files into manageable chunks for efficient processing.
Maintain Order: Ensure that the original order of data is preserved throughout processing.
Export Data: Send the processed data to a generic exporter, such as Snowflake, maintaining data integrity and order.
This POC aimed to demonstrate Mage Pro’s capability to handle these requirements.
Solution overview
The solution involved building a data pipeline with the following steps:
Load Data: Fetch the file from Azure Blob Storage.
Split Data: Divide 20,000 rows into 10 chunks of 2,000 rows each using dynamic blocks.
Transform Data: Apply necessary transformations to each chunk.
Export Data: Transfer data to Snowflake in its original order using Mage Pro's exporter block.
What are dynamic blocks?
Dynamic blocks in Mage are a special type of block that can create multiple downstream blocks at runtime. This feature allows for incredible flexibility in pipeline design, enabling data engineers to create workflows that adapt to the data they’re processing. The power of dynamic blocks lies in their ability to generate a variable number of blocks based on the output of an upstream block. This means your pipeline can scale and adjust itself depending on the data it receives, without requiring manual intervention or redesign. Dynamic blocks run in parallel, reducing the processing time and improving the efficiency of your data pipelines.
How dynamic blocks work
Let’s break down the mechanics of dynamic blocks:
Output Structure: A dynamic block must return a list of two lists of dictionaries. The first list contains the data that will be passed to downstream blocks, while the second list contains metadata for each dynamically created block (see the sketch after this list).
Downstream Block Creation: The number of downstream blocks created is equal to the number of items in the output data multiplied by the number of direct downstream blocks.
Data Flow: Each dynamically created block receives a portion of the data from the dynamic block, allowing for parallel processing of different data subsets.
Metadata: The metadata provided by the dynamic block is used to uniquely identify each dynamically created block, ensuring proper data routing and execution.
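To make the output structure concrete, here is a minimal sketch of a dynamic block. The function name and sample data are hypothetical, not from the POC:

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_customers(*args, **kwargs):
    # First list: the data each dynamically created block will receive
    customers = [
        {'id': 1, 'name': 'Ann'},
        {'id': 2, 'name': 'Ben'},
        {'id': 3, 'name': 'Cal'},
    ]
    # Second list: metadata that uniquely identifies each dynamically created block
    metadata = [
        {'block_uuid': 'customer_1'},
        {'block_uuid': 'customer_2'},
        {'block_uuid': 'customer_3'},
    ]
    return [customers, metadata]

With one direct downstream block, this output would produce three dynamically created block runs, one per customer.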
Step-by-step implementation
Let’s explore each step in more detail:
1. Loading data from Azure Blob Storage
The first step involved fetching data from Azure Blob Storage. Mage Pro connects to a plethora of different data sources through its data_loader decorator, and many data loader block templates are available in Mage Pro that make extracting data from different sources more efficient.
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test
from mage_ai.data_preparation.shared.secrets import get_secret_value
import pandas as pd
from azure.storage.blob import BlobServiceClient
from io import StringIO


@data_loader
def load_data(*args, **kwargs) -> pd.DataFrame:
    """
    Loads data from Azure Blob Storage and returns it as a pandas DataFrame.

    Returns:
        pd.DataFrame: Data loaded from the CSV file in Azure Blob Storage.
    """
    try:
        # Step 1: Connect to Azure Blob Storage
        connection_string = get_secret_value('azure_connection_string')  # Replace with your secret retrieval method
        blob_service_client = BlobServiceClient.from_connection_string(connection_string)

        # Step 2: Access the container and list blobs
        container_name = 'sftpcontainer'  # Replace with your container name
        container_client = blob_service_client.get_container_client(container_name)
        prefix = 'medical'  # Replace with your blob prefix if necessary
        blobs_list = container_client.list_blobs(name_starts_with=prefix)
        blobs = [blob.name for blob in blobs_list]

        if not blobs:
            return pd.DataFrame()  # Return empty DataFrame if no blobs are found

        # Step 3: Select the latest blob
        latest_blob_name = sorted(blobs)[-1]

        # Step 4: Download and read the CSV data
        blob_client = container_client.get_blob_client(latest_blob_name)
        download_stream = blob_client.download_blob()
        blob_data = download_stream.readall().decode('utf-8')

        # Read CSV data into a DataFrame
        df = pd.read_csv(StringIO(blob_data))
        return df
    except Exception as e:
        return pd.DataFrame()  # Return empty DataFrame in case of error
Explanation of code:
Connection: The script connects to Azure Blob Storage using a secure connection string saved in and retrieved from the Mage Pro secrets manager.
Blob Selection: It accesses a specific container and lists blobs with a given prefix, selecting the latest one based on sorting.
Data Loading: The selected CSV file is downloaded and read into a pandas DataFrame for further processing.
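The block also imports Mage's test decorator, so a lightweight data quality check can run after the loader. A minimal sketch; the specific assertions are illustrative, not from the POC:

@test
def test_output(output, *args) -> None:
    # Fail the block run if nothing was loaded from Blob Storage
    assert output is not None, 'The output is undefined'
    assert len(output) > 0, 'The loaded DataFrame is empty'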
2. Splitting data into chunks
Processing large datasets can be resource-intensive. To optimize performance, the data is split into smaller chunks using dynamic blocks. Here, 20,000 rows are divided into 10 chunks of 2,000 rows each. To mark the block as dynamic, click the three dots in the top right of the pipeline block and select "Set block as dynamic parent." With a chunk size of 2,000, setting the block as dynamic creates 10 downstream runs that execute in parallel.
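A minimal sketch of the dynamic parent block, matching the chunking logic explained below; the function name is illustrative:

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
from typing import List
import pandas as pd


@transformer
def split_into_chunks(df: pd.DataFrame, *args, **kwargs) -> List[pd.DataFrame]:
    """
    Splits the loaded DataFrame into chunks of 2,000 rows each.
    """
    chunk_size = 2000
    if df.empty:
        return []

    total_rows = len(df)
    # Slice with iloc so the original row order is preserved within and across chunks
    return [df.iloc[start:start + chunk_size] for start in range(0, total_rows, chunk_size)]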
Explanation of code:
Chunking Logic: The function checks if the DataFrame is empty. If not, it calculates the total number of rows and splits the DataFrame into chunks of 2,000 rows each using pandas' iloc indexing.
Return Structure: The function returns a list of DataFrames, each representing a chunk of the original data.
3. Transforming data dynamically
Each data chunk undergoes transformation to enhance or modify its structure. In this POC, a new column risk_score_2 is added based on the existing cardiac_risk_score. Because the previous block is marked as dynamic, this block runs as a dynamic child of it, and all subsequent blocks remain dynamic child blocks until the dynamic output is reduced.
if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test
import pandas as pd


@transformer
def transform_data(chunk: pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
    """
    Transforms each chunk by adding 'risk_score_2' based on 'cardiac_risk_score'.
    """
    chunk = chunk.copy()

    # Ensure 'cardiac_risk_score' is treated as a string to handle possible non-numeric values
    chunk['cardiac_risk_score'] = chunk['cardiac_risk_score'].astype(str)

    # Apply transformation logic
    def assign_risk_score(row):
        cardiac_risk_score = row['cardiac_risk_score']
        if cardiac_risk_score:
            try:
                score = float(cardiac_risk_score)
                if score < 5:
                    risk_category = 'High risk'
                elif 5 <= score <= 7:
                    risk_category = 'Medium risk'
                elif score > 7:
                    risk_category = 'Low risk'
                else:
                    risk_category = 'Unknown'
                return risk_category
            except ValueError:
                return 'Invalid score'
        else:
            return 'No score'

    chunk['risk_score_2'] = chunk.apply(assign_risk_score, axis=1)
    return chunk
Explanation of code:
Data Copy: To prevent altering the original DataFrame, a copy is made.
Data Type Handling: The cardiac_risk_score column is converted to a string to handle any non-numeric values gracefully.
Transformation Logic: A nested function assign_risk_score categorizes the risk based on the cardiac_risk_score. It handles various scenarios, including invalid or missing scores.
Applying Transformation: The transformation is applied to each row using pandas' apply method, and the transformed chunk is returned for the downstream block.
4. Exporting data to Snowflake
The final step involves exporting the processed data to Snowflake, a cloud-based data warehousing solution. The data is exported in 10 chunks to the data warehouse, maintaining the original order of the extracted file.
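The exporter block follows Mage Pro's standard Snowflake template. A minimal sketch, assuming an io_config.yaml with a default Snowflake profile; the table, database, and schema names are placeholders:

from os import path

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.snowflake import Snowflake
from pandas import DataFrame

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_snowflake(df: DataFrame, **kwargs) -> None:
    """
    Exports a transformed chunk to Snowflake.
    """
    table_name = 'your_table_name'    # Placeholder: target table
    database = 'your_database_name'   # Placeholder: target database
    schema = 'your_schema_name'       # Placeholder: target schema
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with Snowflake.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            table_name,
            database,
            schema,
            if_exists='replace',  # Replace any existing entries in the target table
        )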
Explanation of code:
Configuration: The exporter reads configuration settings from an io_config.yaml file, ensuring secure and flexible connection parameters.
Export Logic: The Snowflake loader handles the data export, specifying the target table, database, and schema. The if_exists='replace' parameter ensures that the table is updated with the new data, replacing any existing entries.
Scalability: This setup allows for easy scalability and integration with other data warehouses or destinations by modifying the exporter configuration.
Conclusion
This proof of concept for a leading beauty company shows how Mage Pro simplifies the management of complex data pipelines. Thanks to easy-to-use features like dynamic blocks, the company can process large amounts of data efficiently and with minimal hassle.
If you’re aiming to build a similar pipeline or improve your current data processes, Mage Pro provides the tools and flexibility to make it happen. Take advantage of Mage Pro to streamline your data operations and keep things running smoothly.