Azure Blob Storage file transfer using Mage Pro’s dynamic blocks

First published on October 31, 2024

 

9 minute read

Cole Freeman

TLDR

Companies are always looking for better ways to manage and process their data. In this blog post, we’ll explore a proof of concept developed for a leading beauty brand, demonstrating how Mage Pro can turn complicated data tasks into smooth operations. Whether you’re a data engineer, analyst, or simply interested in data management, this article will offer valuable insights into building effective data pipelines using Mage Pro, Azure Blob Storage, and Snowflake.

Table of contents

  1. Introduction

  2. Understanding the challenge

  3. Solution overview

  4. What are dynamic blocks?

  5. How dynamic blocks work

  6. Step-by-step implementation

  7. Loading data from Azure Blob Storage

  8. Splitting data into chunks

  9. Transforming data dynamically

  10. Exporting data to Snowflake

  11. Conclusion

Introduction

Recently, Mage was asked to help develop a proof of concept for a global leader in the beauty product industry. This company processes a substantial amount of data daily, and managing that data well is crucial to maintaining its competitive edge. The challenge was to extract a static file saved in blob storage and move it to a data warehouse. The data from the file needed to be processed in chunks and loaded into the warehouse in the exact order it was extracted. This is one of the most common use cases in data engineering.

Mage Pro offers a suite of tools that simplify data ingestion, transformation, and export. One of the tools used in this process is the dynamic block. By leveraging features like dynamic blocks in Mage Pro, organizations can build scalable and maintainable data pipelines without dealing with the underlying infrastructure issues present in Mage’s open source project.

Understanding the challenge

The global leader in beauty products needed to verify that Mage Pro could handle the following:

  1. Ingest Data: Retrieve files from Azure Blob Storage.

  2. Process in Chunks: Split large files into manageable chunks for efficient processing.

  3. Maintain Order: Ensure that the original order of data is preserved throughout processing.

  4. Export Data: Send the processed data to a generic exporter, such as Snowflake, maintaining data integrity and order.

This POC aimed to demonstrate Mage Pro’s capability to handle these requirements.

Solution overview

The solution involved building a data pipeline with the following steps:

  1. Load Data: Fetch the file from Azure Blob Storage.

  2. Split Data: Divide 20,000 rows into 10 chunks of 2,000 rows each using dynamic blocks.

  3. Transform Data: Apply necessary transformations to each chunk.

  4. Export Data: Transfer the data to Snowflake in its original order using Mage Pro’s exporter block.

What are dynamic blocks?

Dynamic blocks in Mage are a special type of block that can create multiple downstream blocks at runtime. This feature allows for incredible flexibility in pipeline design, enabling data engineers to create workflows that adapt to the data they’re processing. The power of dynamic blocks lies in their ability to generate a variable number of blocks based on the output of an upstream block. This means your pipeline can scale and adjust itself depending on the data it receives, without requiring manual intervention or redesign. Dynamic blocks run in parallel, reducing the processing time and improving the efficiency of your data pipelines.

How dynamic blocks work

Let’s break down the mechanics of dynamic blocks:

  • Output Structure: A dynamic block must return a list of two lists of dictionaries. The first list contains the data that will be passed to downstream blocks, while the second list contains metadata for each dynamically created block (see the sketch after this list).

  • Downstream Block Creation: The number of downstream blocks created is equal to the number of items in the output data multiplied by the number of direct downstream blocks.

  • Data Flow: Each dynamically created block receives a portion of the data from the dynamic block, allowing for parallel processing of different data subsets.

  • Metadata: The metadata provided by the dynamic block is used to uniquely identify each dynamically created block, ensuring proper data routing and execution.
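
For illustration only (this is not code from the POC), a minimal dynamic block following this output structure might look like the sketch below. The custom block type, the sample user_id values, and the block_uuid names are all placeholders:

if 'custom' not in globals():
    from mage_ai.data_preparation.decorators import custom


@custom
def fan_out(*args, **kwargs):
    # First list: one item of data per dynamically created downstream block.
    data = [
        {'user_id': 1},
        {'user_id': 2},
        {'user_id': 3},
    ]

    # Second list: metadata that uniquely identifies each dynamically created block.
    metadata = [
        {'block_uuid': 'user_1'},
        {'block_uuid': 'user_2'},
        {'block_uuid': 'user_3'},
    ]

    # A dynamic block returns a list of two lists: [data, metadata].
    return [data, metadata]

With three items in the data list and one direct downstream block, this dynamic block would create three downstream block runs, one per dictionary.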

Step-by-step implementation

Let’s explore each step in more detail:

1. Loading data from Azure Blob Storage

The first step involved fetching data from Azure Blob Storage. Mage Pro connects to a plethora of data sources through its data_loader decorator, and many data loader block templates are available in Mage Pro that make extracting data from different sources more efficient.

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

from mage_ai.data_preparation.shared.secrets import get_secret_value
import pandas as pd
from azure.storage.blob import BlobServiceClient
from io import StringIO

@data_loader
def load_data(*args, **kwargs) -> pd.DataFrame:
    """
    Loads data from Azure Blob Storage and returns it as a pandas DataFrame.

    Returns:
        pd.DataFrame: Data loaded from the CSV file in Azure Blob Storage.
    """
    try:
        # Step 1: Connect to Azure Blob Storage
        connection_string = get_secret_value('azure_connection_string')  # Replace with your secret retrieval method
        blob_service_client = BlobServiceClient.from_connection_string(connection_string)

        # Step 2: Access the container and list blobs
        container_name = 'sftpcontainer'  # Replace with your container name
        container_client = blob_service_client.get_container_client(container_name)

        prefix = 'medical'  # Replace with your blob prefix if necessary
        blobs_list = container_client.list_blobs(name_starts_with=prefix)
        blobs = [blob.name for blob in blobs_list]

        if not blobs:
            return pd.DataFrame()  # Return empty DataFrame if no blobs are found

        # Step 3: Select the latest blob
        latest_blob_name = sorted(blobs)[-1]

        # Step 4: Download and read the CSV data
        blob_client = container_client.get_blob_client(latest_blob_name)
        download_stream = blob_client.download_blob()
        blob_data = download_stream.readall().decode('utf-8')

        # Read CSV data into DataFrame
        df = pd.read_csv(StringIO(blob_data))

        return df

    except Exception as e:
        print(f'Failed to load data from Azure Blob Storage: {e}')
        return pd.DataFrame()  # Return empty DataFrame in case of error

Explanation of code:

  • Connection: The script connects to Azure Blob Storage using a secure connection string saved in and retrieved from the Mage Pro secrets manager.

  • Blob Selection: It accesses a specific container and lists blobs with a given prefix, selecting the latest one based on sorting.

  • Data Loading: The selected CSV file is downloaded and read into a pandas DataFrame for further processing.

2. Splitting data into chunks

Processing large datasets can be resource-intensive. To optimize performance, the data is split into smaller chunks using dynamic blocks. Here, 20,000 rows are divided into 10 chunks of 2,000 rows each. To mark the block as dynamic, click the three dots in the top right of the pipeline block and select “Set block as dynamic parent.” With a chunk size of 2,000, setting the block as dynamic creates 10 downstream runs, and those runs execute in parallel.
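
The chunking code is not reproduced above, so the block below is a minimal sketch of what the dynamic parent block could look like, assuming a chunk size of 2,000 and the [data, metadata] output structure described earlier. The block name split_into_chunks and the chunk_{index} metadata values are illustrative, and DataFrame chunks are used as the output items to match the explanation that follows:

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer

from typing import List
import pandas as pd


@transformer
def split_into_chunks(df: pd.DataFrame, *args, **kwargs):
    """
    Splits the loaded DataFrame into 2,000-row chunks for dynamic downstream processing.
    """
    chunk_size = 2000

    # If the upstream loader returned nothing, emit no dynamic children.
    if df.empty:
        return [[], []]

    # Slice the DataFrame into consecutive chunks, preserving the original row order.
    chunks: List[pd.DataFrame] = [
        df.iloc[start:start + chunk_size]
        for start in range(0, len(df), chunk_size)
    ]

    # Metadata uniquely identifies each dynamically created downstream block.
    metadata = [{'block_uuid': f'chunk_{index}'} for index in range(len(chunks))]

    return [chunks, metadata]

Numbering the chunks sequentially in the metadata is one way to keep track of the original order requirement described earlier.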

Explanation of code:

  • Chunking Logic: The function checks if the DataFrame is empty. If not, it calculates the total number of rows and splits the DataFrame into chunks of 2,000 rows each using pandas’ iloc indexing.

  • Return Structure: The function returns the list of chunk DataFrames, each representing a portion of the original data, along with metadata identifying each dynamically created block.

3. Transforming data dynamically

Each data chunk undergoes transformation to enhance or modify its structure. In this POC, a new column risk_score_2 is added based on the existing cardiac_risk_score. Because the previous block is set as a dynamic parent, this block runs as a dynamic child block, and all subsequent blocks remain dynamic child blocks until the dynamic output is reduced.

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

import pandas as pd

@transformer
def transform_data(chunk: pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
    """
    Transforms a single chunk by adding 'risk_score_2' based on 'cardiac_risk_score'.
    """
    chunk = chunk.copy()
    # Ensure 'cardiac_risk_score' is treated as a string to handle possible non-numeric values
    chunk['cardiac_risk_score'] = chunk['cardiac_risk_score'].astype(str)

    # Apply transformation logic
    def assign_risk_score(row):
        cardiac_risk_score = row['cardiac_risk_score']
        patient_id = row.get('patient_id')

        if cardiac_risk_score:
            try:
                score = float(cardiac_risk_score)
                if score < 5:
                    risk_category = 'High risk'
                elif 5 <= score <= 7:
                    risk_category = 'Medium risk'
                elif score > 7:
                    risk_category = 'Low risk'
                else:
                    risk_category = 'Unknown'

                return risk_category

            except ValueError:
                return 'Invalid score'
        else:
            return 'No score'

    chunk['risk_score_2'] = chunk.apply(assign_risk_score, axis=1)
    return chunk

Explanation of code:

  • Data Copy: To prevent altering the original DataFrame, a copy is made.

  • Data Type Handling: The cardiac_risk_score column is converted to a string to handle any non-numeric values gracefully.

  • Transformation Logic: A nested function assign_risk_score categorizes the risk based on the cardiac_risk_score. It handles various scenarios, including invalid or missing scores.

  • Applying Transformation: The transformation is applied to each row using pandas’ apply method, and the transformed chunk is returned for downstream processing.

4. Exporting data to Snowflake

The final step involves exporting the processed data to Snowflake, a cloud-based data warehousing solution. The data is exported in 10 chunks to the data warehouse, maintaining the original order of the extracted file.
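
The exporter code is not reproduced here, so the block below is a minimal sketch modeled on Mage’s standard Snowflake data exporter template. The table, database, schema, and config profile names are placeholders to replace with your own values:

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

from os import path

from pandas import DataFrame

from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.snowflake import Snowflake
from mage_ai.settings.repo import get_repo_path


@data_exporter
def export_data_to_snowflake(df: DataFrame, **kwargs) -> None:
    """
    Exports a transformed chunk to Snowflake using settings from io_config.yaml.
    """
    table_name = 'your_table_name'  # placeholder: target table
    database = 'your_database'      # placeholder: target database
    schema = 'your_schema'          # placeholder: target schema
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'      # placeholder: profile in io_config.yaml

    with Snowflake.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            table_name,
            database,
            schema,
            if_exists='replace',  # replaces existing entries, as described below
        )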

Explanation of code:

  • Configuration: The exporter reads configuration settings from an io_config.yaml file, ensuring secure and flexible connection parameters.

  • Export Logic: The Snowflake loader handles the data export, specifying the target table, database, and schema. The if_exists='replace' parameter ensures that the table is updated with the new data, replacing any existing entries.

  • Scalability: This setup allows for easy scalability and integration with other data warehouses or destinations by modifying the exporter configuration.

Conclusion

This proof of concept for a leading beauty company shows how Mage Pro simplifies the management of complex data pipelines. Thanks to easy-to-use features like dynamic blocks, the company can process large amounts of data efficiently and with minimal hassle.

If you’re aiming to build a similar pipeline or improve your current data processes, Mage Pro provides the tools and flexibility to make it happen. Take advantage of Mage Pro to streamline your data operations and keep things running smoothly.