
From Configuration to Orchestration: Building an ETL Workflow with AWS Is No Longer a Struggle


AWS continues to lead the cloud industry with a whopping 32% share, thanks to its early market entry, robust technology and comprehensive service offerings. However, many users find AWS challenging to navigate, and this discontent has led more companies and organisations to prefer its competitors Microsoft Azure and Google Cloud Platform.

Despite its steeper learning curve and less intuitive interface, AWS remains the top cloud provider thanks to its reliability, hybrid cloud capabilities and the broadest selection of services. More importantly, choosing the right strategies can significantly reduce configuration complexity, streamline workflows and boost performance.

In this article, I’ll introduce an efficient way to set up a complete ETL pipeline with orchestration on AWS, based on my own experience. It should also give you a refreshed view of building data pipelines with AWS, or make the configuration feel less of a struggle if this is your first time using AWS for such tasks.

Strategy for Designing an Efficient Data Pipeline

AWS has the most comprehensive ecosystem, with a vast range of services. Building a production-ready data warehouse on AWS requires at least the following services:

  • IAM – Although this service isn’t part of the workflow itself, it’s the foundation for accessing all the other services.
  • AWS S3 – Data Lake storage
  • AWS Glue – ETL processing
  • Amazon Redshift – Data Warehouse
  • CloudWatch – Monitoring and logging

You’ll also need Airflow if you have to schedule jobs with more complex dependencies or need advanced retries for error handling, although Redshift can handle basic cron-style scheduled queries on its own.

To make your work easier, I highly recommend installing an IDE (Visual Studio Code or PyCharm, or of course your own favourite IDE). An IDE dramatically improves your efficiency for complex Python code, local testing and debugging, version control integration and team collaboration. In the next section, I’ll provide step-by-step configurations.

Initial Setup

Here are the initial configuration steps:

  • Launch a virtual environment in your IDE
  • Install dependencies – basically, we need to install the libraries that will be used later on:
pip install apache-airflow==2.7.0 boto3 pandas pyspark sqlalchemy
  • Install AWS CLI – this step allows you to write scripts that automate various AWS operations and makes managing AWS resources more efficient.
  • AWS Configuration – run aws configure and enter these IAM user credentials when prompted:
    • AWS Access Key ID: From your IAM user.
    • AWS Secret Access Key: From your IAM user.
    • Default region: us-east-1 (or your preferred region)
    • Default output format: json.
  • Integrate Airflow – here are the steps:
    • Initialize Airflow
    • Create DAG files in Airflow (see the minimal example DAG after the commands below)
    • Run the web server at http://localhost:8080 (login: admin/admin)
    • Open another terminal tab and start the scheduler
# Initialize Airflow
export AIRFLOW_HOME=$(pwd)/airflow
airflow db init

# Create an admin user (placeholder email)
airflow users create \
  --username admin \
  --password admin \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email admin@example.com

# Run the webserver
airflow webserver --port 8080

# Start the scheduler in another terminal tab
airflow scheduler
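Once the webserver and scheduler are running, you can check that DAG files are being picked up by dropping a minimal DAG into the dags folder under AIRFLOW_HOME. The file below is just a hypothetical sanity check; the real pipeline DAGs come later in this article.

# airflow/dags/hello_dag.py - hypothetical sanity-check DAG
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    'hello_dag',                      # should appear in the UI at http://localhost:8080
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,           # trigger it manually from the UI
    catchup=False
) as dag:
    BashOperator(task_id='say_hello', bash_command="echo 'Airflow is up'")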

Development Workflow: COVID-19 Data Case Study

I’m using JHU’s public COVID-19 dataset (CC BY 4.0 licensed) for demonstration purposes. You can refer to the data here.

The chart below shows the workflow from data ingestion to loading data into Redshift tables in the development environment.

Development workflow created by author

Data Ingestion

In the first step, ingesting data into AWS S3, I processed the data by melting it to long format and converting the date format. I saved the data in Parquet format to improve storage efficiency, enhance query performance and reduce storage costs. The code for this step is below:

import pandas as pd
from datetime import datetime
import os
import boto3
import sys

def process_covid_data():
    try:
        # Load raw data
        url = "https://github.com/CSSEGISandData/COVID-19/raw/master/archived_data/archived_time_series/time_series_19-covid-Confirmed_archived_0325.csv"
        df = pd.read_csv(url)
        
        # --- Data Processing ---
        # 1. Melt to long format
        df = df.melt(
            id_vars=['Province/State', 'Country/Region', 'Lat', 'Long'], 
            var_name='date_str',
            value_name='confirmed_cases'
        )
        
        # 2. Convert dates (JHU format: MM/DD/YY) and drop rows with unparseable dates
        df['date'] = pd.to_datetime(
            df['date_str'],
            format='%m/%d/%y',
            errors='coerce'
        )
        df = df.dropna(subset=['date'])
        
        # 3. Save as partitioned Parquet
        output_dir = "covid_processed"
        df.to_parquet(
            output_dir,
            engine='pyarrow',
            compression='snappy',
            partition_cols=['date']
        )
        
        # 4. Upload to S3
        s3 = boto3.client('s3')
        total_files = 0
        
        for root, _, files in os.walk(output_dir):
            for file in files:
                local_path = os.path.join(root, file)
                s3_path = os.path.join(
                    'raw/covid/',
                    os.path.relpath(local_path, output_dir)
                )
                s3.upload_file(
                    Filename=local_path,
                    Bucket='my-dev-bucket',
                    Key=s3_path
                )
            total_files += len(files)
        
        print(f"Successfully processed and uploaded {total_files} Parquet files")
        print(f"Data covers from {df['date'].min()} to {df['date'].max()}")
        return True

    except Exception as e:
        print(f"Error: {str(e)}", file=sys.stderr)
        return False

if __name__ == "__main__":
    process_covid_data()

After running the Python code, you should be able to see the Parquet files in the S3 bucket, under the ‘raw/covid/’ prefix.

Screenshot by author
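If you’d rather verify the upload from code than from the console, a quick boto3 listing works too (assuming the same my-dev-bucket bucket used in the script above):

import boto3

# List the uploaded Parquet files under the raw/covid/ prefix
s3 = boto3.client('s3')
resp = s3.list_objects_v2(Bucket='my-dev-bucket', Prefix='raw/covid/')
print(f"Found {resp.get('KeyCount', 0)} objects under raw/covid/")
for obj in resp.get('Contents', [])[:5]:
    print(obj['Key'], obj['Size'])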

ETL Pipeline Development

AWS Glue is mainly used for ETL pipeline development. Although it can also handle data ingestion even when the data hasn’t landed in S3 yet, its strength lies in processing data once it’s in S3 for data warehousing purposes. Here’s the PySpark script for the data transformation:

# transform_covid.py
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql.functions import current_date

glueContext = GlueContext(SparkContext.getOrCreate())
df = glueContext.create_dynamic_frame.from_options(
    "s3",
    {"paths": ["s3://my-dev-bucket/raw/covid/"]},
    format="parquet"
).toDF()

# Add transformations here
df_transformed = df.withColumn("load_date", current_date())

# Write to processed zone
df_transformed.write.parquet(
    "s3://my-dev-bucket/processed/covid/",
    mode="overwrite"
)
Screenshot by author
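If you prefer to create and run the Glue job from code instead of the console, a boto3 sketch along these lines works; the job name, IAM role and script location below are assumptions, so adjust them to your own setup.

import boto3

glue = boto3.client('glue')

# Register the PySpark script above as a Glue job (role and script location are placeholders)
glue.create_job(
    Name='dev_covid_transformation',
    Role='GlueServiceRole',
    Command={
        'Name': 'glueetl',
        'ScriptLocation': 's3://my-dev-bucket/scripts/transform_covid.py',
        'PythonVersion': '3'
    },
    GlueVersion='4.0'
)

# Start a run and print its id so you can follow it in the Glue console
run = glue.start_job_run(JobName='dev_covid_transformation')
print(run['JobRunId'])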

The next step is to load the data into Redshift. In the Redshift console, click on “Query Editor v2” on the left side, where you can edit your SQL code and run the Redshift COPY.

-- Create a table covid_data in the dev schema
CREATE TABLE dev.covid_data (
    "Province/State" VARCHAR(100),  
    "Country/Region" VARCHAR(100),
    "Lat" FLOAT8,
    "Long" FLOAT8,
    date_str VARCHAR(100),
    confirmed_cases FLOAT8  
)
DISTKEY("Country/Region")   
SORTKEY(date_str);
-- COPY data to Redshift
COPY dev.covid_data (
    "Province/State",
    "Country/Region",
    "Lat",
    "Long",
    date_str,
    confirmed_cases
)
FROM 's3://my-dev-bucket/processed/covid/'
IAM_ROLE 'arn:aws:iam::your-account-id:role/RedshiftLoadRole'
REGION 'your-region'
FORMAT PARQUET;

Then you’ll see the data successfully uploaded to the data warehouse.

Screenshot by author
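If you want to double-check the load from Python instead of the query editor, the Redshift Data API can run a quick count; the cluster identifier, database and user below are placeholders for your own setup.

import boto3

rsd = boto3.client('redshift-data')

# Submit a count query against the freshly loaded table (adjust the placeholders)
resp = rsd.execute_statement(
    ClusterIdentifier='my-dev-cluster',
    Database='dev',
    DbUser='awsuser',
    Sql='SELECT COUNT(*) FROM dev.covid_data;'
)
print('Statement submitted:', resp['Id'])

Once describe_statement reports that the query has finished, get_statement_result returns the actual count.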

Pipeline Automation

The easiest way to automate your data pipeline is to schedule jobs in the Redshift query editor v2 by creating a stored procedure (I have a more detailed introduction to SQL stored procedures; you can refer to this article).

CREATE OR REPLACE PROCEDURE dev.run_covid_etl()
AS $$
BEGIN
  TRUNCATE TABLE dev.covid_data;
  COPY dev.covid_data 
  FROM 's3://my-dev-bucket/processed/covid/'
  IAM_ROLE 'arn:aws:iam::your-account-id:role/RedshiftLoadRole'
  REGION 'your-region'
  FORMAT PARQUET;
END;
$$ LANGUAGE plpgsql;
Screenshot by author

Alternatively, you can run Airflow for scheduled jobs.

from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 2
}

with DAG(
    'redshift_etl_dev',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
) as dag:

    run_etl = RedshiftSQLOperator(
        task_id='run_covid_etl',
        redshift_conn_id='redshift_dev',
        sql='CALL dev.run_covid_etl()',
    )
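One thing to note: redshift_conn_id='redshift_dev' refers to an Airflow connection that you still need to create, either in the UI under Admin → Connections or programmatically. The sketch below registers it against the Airflow metadata database; the host, user and password are placeholders for your own cluster.

from airflow.models import Connection
from airflow import settings

# Register a Redshift connection for the DAG above (all values are placeholders)
conn = Connection(
    conn_id='redshift_dev',
    conn_type='redshift',
    host='your-cluster.xxxx.us-east-1.redshift.amazonaws.com',
    schema='dev',
    login='awsuser',
    password='your-password',
    port=5439
)
session = settings.Session()
session.add(conn)
session.commit()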

Production Workflow

An Airflow DAG is powerful for orchestrating your entire ETL pipeline when there are many dependencies, and it’s also good practice in a production environment.

After developing and testing your ETL pipeline, you can automate your tasks in production environment using Airflow.

Production workflow created by author

Here is a checklist of key preparation steps to help ensure a successful deployment in Airflow:

  • Create S3 bucket my-prod-bucket 
  • Create Glue job prod_covid_transformation in AWS Console
  • Create Redshift Stored Procedure prod.load_covid_data()
  • Configure Airflow
  • Configure SMTP for emails in airflow.cfg

The data pipeline can then be deployed in Airflow as follows:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
from airflow.operators.email import EmailOperator

# 1. DAG CONFIGURATION
default_args = {
    'owner': 'data_team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2023, 1, 1)
}

# 2. DATA INGESTION FUNCTION
def load_covid_data():
    import pandas as pd
    import boto3
    
    url = "https://github.com/CSSEGISandData/COVID-19/raw/master/archived_data/archived_time_series/time_series_19-covid-Confirmed_archived_0325.csv"
    df = pd.read_csv(url)

    df = df.melt(
        id_vars=['Province/State', 'Country/Region', 'Lat', 'Long'], 
        var_name='date_str',
        value_name='confirmed_cases'
    )
    df['date'] = pd.to_datetime(df['date_str'], format='%m/%d/%y')
    
    # Writing directly to an s3:// path requires the s3fs package to be installed
    df.to_parquet(
        's3://my-prod-bucket/raw/covid/',
        engine='pyarrow',
        partition_cols=['date']
    )

# 3. DAG DEFINITION
with DAG(
    'covid_etl',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
) as dag:

    # Task 1: Ingest Data
    ingest = PythonOperator(
        task_id='ingest_data',
        python_callable=load_covid_data
    )

    # Task 2: Transform with Glue
    transform = GlueJobOperator(
        task_id='transform_data',
        job_name='prod_covid_transformation',
        script_args={
            '--input_path': 's3://my-prod-bucket/raw/covid/',
            '--output_path': 's3://my-prod-bucket/processed/covid/'
        }
    )

    # Task 3: Load to Redshift
    load = RedshiftSQLOperator(
        task_id='load_data',
        sql="CALL prod.load_covid_data()"
    )

    # Task 4: Notifications
    notify = EmailOperator(
        task_id='send_email',
        to='your-email-address',
        subject='ETL Status: {{ ds }}',
        html_content='ETL job completed: View Logs'
    )

    # Chain the tasks so they run in order
    ingest >> transform >> load >> notify

My Final Thoughts

Although some users, especially those who are new to the cloud and seeking simple solutions, tend to be daunted by AWS’s high barrier to entry and overwhelmed by the massive choice of services, it’s worth the time and effort, and here are the reasons:

  • The process of configuring, designing, building and testing the data pipelines gives you a deep understanding of a typical data engineering workflow. These skills will benefit you even if you build your projects with other cloud services, such as Azure, GCP or Alibaba Cloud.
  • AWS’s mature ecosystem and the vast array of services it offers enable users to customise their data architecture strategies and enjoy more flexibility and scalability in their projects.

Thank you for reading! I hope this article helps you build your cloud-based data pipeline!

