Basic Datawarehouse ETL Pipeline

This blueprint illustrates how to create a data curation pipeline for a Data Lakehouse: raw data in the data lake is curated into dimension and fact datasets, which are then loaded into Redshift data warehouse tables.

This blueprint may be suitable when files are regularly uploaded to the data lake and need to be periodically (e.g. nightly) curated and loaded into data warehouse tables.

While the blueprint doesn't immediately handle partitioning or additional transformations, the Glue ETL job can easily be extended to provide these capabilities.
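As one sketch of such an extension, a partition-key argument could be passed to the curation jobs via `defaultArguments` in `jobs.yaml` and consumed inside the script when writing parquet. The `--partition_keys` argument below is hypothetical — the sample scripts do not read it until you add the corresponding logic (e.g. via `getResolvedOptions`) in `curate-dataset-a.py`:

```yaml
jobs:
  curate-dataset-a:
    template: "GlueJobTemplate"
    defaultArguments:
      # Hypothetical argument: read it in the script and pass the keys
      # to the parquet writer to produce partitioned output.
      --partition_keys: "year,month,day"
```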

Glue CSV Parquet Transformer


Usage Instructions

The following instructions assume you have already deployed your Data Lake (possibly using MDAA). If you are already using MDAA, you can merge these sample blueprint configs into your existing mdaa.yaml.

  1. Deploy sample configurations into the specified directory structure (or obtain from the MDAA repo under sample_blueprints/basic_dwh_etl_pipeline).

  2. Edit the mdaa.yaml to specify an organization name to replace <unique-org-name>. This must be a globally unique name, as it is used in the naming of all deployed resources, some of which are globally named (such as S3 buckets).

  3. Review and edit the *.yaml files to specify values for attributes marked in angle brackets <>, e.g. <your-glue-catalog-database-name-for-raw-datasets>.

  4. Edit basic_dwh_etl_pipeline/basic_etl_pipeline/src/glue/curate-dataset-*.py to add your processing logic.

  5. Ensure you are authenticated to your target AWS account.

  6. Optionally, run <path_to_mdaa_repo>/bin/mdaa -l ls from the directory containing mdaa.yaml to understand what stacks will be deployed.

  7. Optionally, run <path_to_mdaa_repo>/bin/mdaa -l synth from the directory containing mdaa.yaml and review the produced templates.

  8. Run <path_to_mdaa_repo>/bin/mdaa -l deploy from the directory containing mdaa.yaml to deploy all modules.

  9. Before loading CSV files, you will need to grant the generated glue-etl role access to your datalake bucket(s).
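One way to satisfy step 9, sketched under the assumption that you are using the generated `roles.yaml` below, is to add a statement to `GlueJobPolicy` granting the glue-etl role access to your datalake bucket(s). The bucket name, `Sid`, and actions below are illustrative placeholders — scope them down to match your environment:

```yaml
# Hypothetical addition to basic_etl_pipeline/roles.yaml
generatePolicies:
  GlueJobPolicy:
    policyDocument:
      Statement:
        - Sid: DatalakeBucketAccess
          Effect: Allow
          Resource:
            - "arn:{{partition}}:s3:::<your-datalake-bucket-name>"
            - "arn:{{partition}}:s3:::<your-datalake-bucket-name>/*"
          Action:
            - s3:ListBucket
            - s3:GetObject
            - s3:PutObject
```

If the bucket is KMS-encrypted, the role will also need decrypt (and, for writes, data-key generation) permissions on the datalake KMS key.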

Additional MDAA deployment commands/procedures can be reviewed in DEPLOYMENT.


Configurations

The sample configurations for this blueprint are provided below. They are also available under sample_blueprints/basic_dwh_etl_pipeline within the MDAA repo.

Config Directory Structure

basic_dwh_etl_pipeline
│   mdaa.yaml
│   tags.yaml
│
└───basic_etl_pipeline
    └───roles.yaml
    └───project.yaml
    └───crawler.yaml
    └───jobs.yaml
    └───workflow.yaml

mdaa.yaml

This configuration specifies the global, domain, env, and module configurations required to configure and deploy this sample architecture.

Note - Before deployment, populate the mdaa.yaml with appropriate organization and context values for your environment

# Contents available in mdaa.yaml
# All resources will be deployed to the default region specified in the environment or AWS configurations.
# Can optionally specify a specific AWS Region name.
region: default

# One or more tag files containing tags which will be applied to all deployed resources
tag_configs:
  - ./tags.yaml

## Pre-Deployment Instructions

# TODO: Set an appropriate, unique organization name, likely matching the org name used in other MDAA configs.
# Failure to do so may result in global naming conflicts.
organization: <unique-org-name>

# One or more domains may be specified. Domain name will be incorporated by default naming implementation
# to prefix all resource names.
domains:  
  # Where resources may be shared across multiple domains, a domain name of 'shared' may be appropriate.
  # The domain name can be referenced within MDAA CDK App configs via the inline {{domain}} syntax.
  <your-domain-name>:
    # One or more environments may be specified, typically along the lines of 'dev', 'test', and/or 'prod'
    environments:
      # The environment name will be incorporated into resource name by the default naming implementation.
      dev:
        # The target deployment account can be specified per environment.
        # If 'default' or not specified, the account configured in the environment will be assumed.
        account: default
        #TODO: Set context values appropriate to your env
        context:
          # The arn of a role which will be provided admin privileges to dataops resources
          data_admin_role_arn : <your-data-admin-role-arn>
          # The arn of a role which will be provided read-write privileges to dataops resources
          data_engineer_role_arn: <your-data-engineer-role-arn>
          # The name of the datalake S3 bucket where the csv files will be uploaded
          datalake_src_bucket_name: <your-src-datalake-bucket-name>
          # The prefix on the datalake S3 bucket where the csv files will be uploaded
          datalake_src_prefix: <your/path/to/csv>
          # The name of the datalake S3 bucket where the parquet files will be written
          datalake_dest_bucket_name: <your-dest-datalake-bucket-name>
          # The prefix on the datalake S3 bucket where the parquet files will be written
          datalake_dest_prefix: <your/path/to/parquet>
          # The arn of the KMS key used to encrypt the datalake bucket
          datalake_kms_arn: <your-datalake-kms-key-arn>
          # The arn of the KMS key used to encrypt the Glue Catalog
          glue_catalog_kms_arn: <your-datalake-kms-key-arn>
          # ID of VPC in which data ops resources will be configured to run
          vpc_id: <your-vpc-id>
          # ID of security group in which DWH database is deployed. Egress rules to this SG will be defined on dataops security group
          database_security_group: <your-dwh-security-group-id>
          # Subnet ID in which glue job connection will be setup
          subnet_1_id: <your-subnet-id>
          # Name of Datalake Bucket that contains raw and curated datasets
          datalake_bucket_name: <your-datalake-bucket-name>
        # The list of modules which will be deployed. A module points to a specific MDAA CDK App, and
        # specifies a deployment configuration file if required.
        modules:
          # This module will create all of the roles required for the Glue ETL Job
          roles:
            module_path: "@aws-mdaa/roles"
            module_configs:
              - ./basic_etl_pipeline/roles.yaml
          # This module will create DataOps Project resources which can be shared
          # across multiple DataOps modules
          dataops-project:
            module_path: "@aws-mdaa/dataops-project"
            module_configs:
              - ./basic_etl_pipeline/project.yaml
          crawler:
            module_path: "@aws-mdaa/dataops-crawler"
            module_configs:
              - ./basic_etl_pipeline/crawler.yaml
          # This module will create the CSV-to-parquet Glue ETL Job
          jobs:
            module_path: "@aws-mdaa/dataops-job"
            module_configs:
              - ./basic_etl_pipeline/jobs.yaml
          # This module will create an AWS Glue Workflow which will schedule the CSV-to-parquet Glue ETL Job
          workflow:
            module_path: "@aws-mdaa/dataops-workflow"
            tag_configs:
              - ./tags.yaml
            module_configs:
              - ./basic_etl_pipeline/workflow.yaml

tags.yaml

This configuration specifies the tags to be applied to all deployed resources.

# Contents available in tags.yaml
tags:
  costcentre: '123456'
  project: data-ecosystem

basic_etl_pipeline/roles.yaml

This configuration will be used by the MDAA Roles module to deploy IAM roles and Managed Policies required for this sample architecture.

# Contents available in roles.yaml
# The list of roles which will be generated
generatePolicies:
  GlueJobPolicy:
    policyDocument:
      Statement:
        - Sid: GlueCloudwatch
          Effect: Allow
          Resource:
            - "arn:{{partition}}:logs:{{region}}:{{account}}:log-group:/aws-glue/*"
          Action:
            - logs:CreateLogStream
            - logs:AssociateKmsKey
            - logs:CreateLogGroup
            - logs:PutLogEvents
    suppressions:
      - id: "AwsSolutions-IAM5"
        reason: "Glue log group name not known at deployment time."

generateRoles:
  glue-etl:
    trustedPrincipal: service:glue.amazonaws.com
    # A list of AWS managed policies which will be added to the role
    awsManagedPolicies:
      - service-role/AWSGlueServiceRole
    generatedPolicies:
      - GlueJobPolicy
    suppressions:
      - id: "AwsSolutions-IAM4"
        reason: "AWSGlueServiceRole approved for usage"

basic_etl_pipeline/project.yaml

This configuration will create a DataOps Project which can be used to support a wide variety of data ops activities. Specifically, it will create a number of Glue Catalog databases and apply fine-grained access control to them.

# Contents available in dataops/project.yaml
# ARNs of IAM roles which will be authoring code within the project
dataEngineerRoles:
  - arn: "{{context:data_engineer_role_arn}}"

# ARNs of IAM roles which will be granted admin access to the project's resources (e.g. the project bucket)
dataAdminRoles:
  - arn: "{{context:data_admin_role_arn}}"

projectExecutionRoles:
  - id: generated-role-id:glue-etl

# Email addresses which will receive failure notifications.
# For glue jobs, this includes state changes of "FAILED", "TIMEOUT", and "STOPPED".
# For crawlers, this includes state changes of "Failed".
failureNotifications:
  email:
    - user1@example.com
    - distribution-list@example.com

# A list of security groups which will be created for
# use by various project resources (such as Lambda functions, Glue jobs, etc)
securityGroupConfigs:
  dataops-project-security-group:
    # The id of the VPC on which the SG will be used
    vpcId: "{{context:vpc_id}}"
    # Optional - The list of custom egress rules which will be added to the SG.
    # If not specified, the SG will allow all egress traffic by default.
    securityGroupEgressRules:
      sg:
        - sgId: "{{context:database_security_group}}"
          protocol: TCP
          port: 443

# The ID of the KMS key which will encrypt all S3 outputs of Jobs run under this project
s3OutputKmsKeyArn: "{{context:datalake_kms_arn}}"

# The Arn of the KMS key used to encrypt the Glue Catalog. Specific access to this key
# will be granted to Glue executor roles for the purpose of decrypting
# Glue connections.
glueCatalogKmsKeyArn: "{{context:glue_catalog_kms_arn}}"

# (optional)  Definitions for crawler connections. Referred to by name in the crawler configuration files.
connections:
  # (optional)  Example of a Network Connection which uses the SG produced in the project config
  connectionVpcWithProjectSG:
    connectionType: NETWORK
    description: VPC Connection using DataOps Project Security Group
    physicalConnectionRequirements:
      availabilityZone: "{{region}}a"
      subnetId: "{{context:subnet_1_id}}"
      projectSecurityGroupNames:
        - dataops-project-security-group


# (Optional) List of Databases to create. Referred to by name in the crawler configuration files.
databases:
  <your-glue-catalog-database-name-for-raw-datasets>:
    description: Database for raw datasets landed from upstream systems
    locationBucketName: "{{context:datalake_bucket_name}}"
    locationPrefix: <your-s3-directory-prefix-containing-raw-datasets>

  # Condensed DB config
  <your-glue-catalog-database-name-for-curated-datasets>:
    description: Database for curated datasets produced by ETL process
    locationBucketName: "{{context:datalake_bucket_name}}"
    locationPrefix: <your-s3-directory-prefix-containing-curated-datasets>

basic_etl_pipeline/crawler.yaml

This configuration will create the Glue crawler using the DataOps Glue Crawler module.

# Contents available in dataops/crawler.yaml
# The name of the dataops project this crawler will be created within.
# The dataops project name is the MDAA module name for the project.
projectName: dataops-project
crawlers:
  crawler-a:
    executionRoleArn: generated-role-arn:glue-etl
    # (required) Reference back to the database name in the 'databases:' section of project.yaml
    databaseName: project:databaseName/<your-glue-catalog-database-name-for-raw-datasets>
    # (required) Description of the crawler
    description: Example for a Crawler
    # (required) At least one target definition.  See: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-glue-crawler-targets.html
    targets:
      # (at least one).  S3 Target.  See: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-glue-crawler-s3target.html
      s3Targets:
        - path: s3://{{context:datalake_bucket_name}}/<your-s3-directory-prefix-containing-raw-datasets-to-crawl>
    recrawlBehavior: CRAWL_EVERYTHING
    # Extra crawler configuration options
    extraConfiguration:
      Version: 1
      CrawlerOutput:
        Partitions:
          AddOrUpdateBehavior: InheritFromTable

basic_etl_pipeline/jobs.yaml

This configuration will create the transformation Glue ETL Jobs using the DataOps Glue Job module.

# Contents available in dataops/jobs.yaml
# (required) Name of the Data Ops Project this Job will run within. 
# Resources provided by the Project, such as security configuration, encryption keys, 
# temporary S3 buckets, and execution roles will automatically be wired into the Job config.
# Other resources provided by the project can be optionally referenced 
# by the Job config using a "project:" prefix on the config value.
projectName: dataops-project
templates:
  # An example job template. Can be referenced from other jobs. Will not itself be deployed.
  GlueJobTemplate:
    # (required) the role used to execute the job
    executionRoleArn: generated-role-arn:glue-etl
    # (required) Command definition for the glue job
    command:
      # (required) Either of "glueetl" | "pythonshell"
      name: "glueetl"
      # (optional) Python version.  Either "2" or "3"
      pythonVersion: "3"
    # (required) Description of the Glue Job
    description: Template of a Glue ETL Transformation Job
    # (optional) List of connections for the glue job to use.  Reference back to the connection name in the 'connections:' section of the project.yaml
    connections:
      - project:connections/connectionVpcWithProjectSG
    # (optional) key: value pairs for the glue job to use.  see: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
    defaultArguments:
      --job-bookmark-option: job-bookmark-enable
    executionProperty:
      maxConcurrentRuns: 1
    # (optional) Glue version to use as a string.  See: https://docs.aws.amazon.com/glue/latest/dg/release-notes.html
    glueVersion: "4.0"
    # (optional) Maximum retries.  See the MaxRetries section: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-jobs-job.html
    maxRetries: 1
    # (optional) Number of minutes to wait before considering the job timed out
    timeout: 60
    # (optional) Worker type to use.  Any of: "Standard" | "G.1X" | "G.2X"
    # Use maxCapacity or WorkerType.  Not both.
    workerType: "G.2X"
    # Viewing real-time logs provides you with a better perspective on the running job.
    # https://docs.aws.amazon.com/glue/latest/dg/monitor-continuous-logging.html
    continuousLogging:
      # For allowed values, refer https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_logs.RetentionDays.html
      # Possible values are: 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 3653, and 0.
      logGroupRetentionDays: 14

  RedshiftLoadJobTemplate:
    # (required) the role used to execute the job
    executionRoleArn: generated-role-arn:glue-etl
    # (required) Command definition for the glue job
    command:
      # (required) Either of "glueetl" | "pythonshell"
      name: "glueetl"
      # (optional) Python version.  Either "2" or "3"
      pythonVersion: "3"
    # (required) Description of the Glue Job
    description: Template of Redshift load job.
    # (optional) List of connections for the glue job to use.  Reference back to the connection name in the 'connections:' section of the project.yaml
    connections:
      - project:connections/connectionVpcWithProjectSG
    # (optional) key: value pairs for the glue job to use.  see: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
    defaultArguments:
      --job-bookmark-option: job-bookmark-enable
    # (optional) maximum concurrent runs.  See: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-jobs-job.html#aws-glue-api-jobs-job-ExecutionProperty
    executionProperty:
      maxConcurrentRuns: 1
    # (optional) Glue version to use as a string.  See: https://docs.aws.amazon.com/glue/latest/dg/release-notes.html
    glueVersion: "3.0"
    # (optional) Maximum capacity.  See the MaxCapacity section: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-jobs-job.html
    # Use maxCapacity or WorkerType.  Not both.
    #maxCapacity: 1
    # (optional) Maximum retries.  See the MaxRetries section: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-jobs-job.html
    maxRetries: 0
    # (optional) Number of minutes to wait before sending a job run delay notification.
    notificationProperty:
      notifyDelayAfter: 1
    # (optional) Number of workers to provision
    #numberOfWorkers: 1
    # (optional) Number of minutes to wait before considering the job timed out
    timeout: 60
    # (optional) Worker type to use.  Any of: "Standard" | "G.1X" | "G.2X"
    # Use maxCapacity or WorkerType.  Not both.
    workerType: Standard

jobs:
  # Job definitions below
  curate-dataset-a: # Job Name
    template: "GlueJobTemplate" # Reference a job template.
    defaultArguments:
      --raw_database: project:databaseName/<your-glue-catalog-database-name-for-raw-datasets>
      --raw_table: <raw-table-a>
      --curated_s3_bucket: "{{resolve:ssm:/{{org}}/{{domain}}/ssm_path/to/s3/bucket/name}}"
      --curated_folder: <your-s3-directory-prefix-containing-curated-datasets>
      --curated_database: project:databaseName/<your-glue-catalog-database-name-for-curated-datasets>
    command:
      scriptLocation: ./src/glue-jobs/curate-dataset-a.py
    allocatedCapacity: 3
    description: Glue job to read raw source file and create transformed and curated datasets.

  curate-dataset-b: # Job Name
    template: "GlueJobTemplate" # Reference a job template.
    defaultArguments:
      --raw_database: project:databaseName/<your-glue-catalog-database-name-for-raw-datasets>
      --raw_table: <raw-table-b>
      --curated_s3_bucket: "{{resolve:ssm:/{{org}}/{{domain}}/ssm_path/to/s3/bucket/name}}"
      --curated_folder: <your-s3-directory-prefix-containing-curated-datasets>
      --curated_database: project:databaseName/<your-glue-catalog-database-name-for-curated-datasets>
    command:
      scriptLocation: ./src/glue-jobs/curate-dataset-b.py
    allocatedCapacity: 3
    description: Glue job to read raw source file and create transformed and curated datasets.

  fact-table-1-load:
    template: "RedshiftLoadJobTemplate"
    defaultArguments:
      --redshift_secret: "<your-redshift-secret-name>"
      --redshift_database: <your-redshift-dwh-database-name>
      --redshift_schema: <your-redshift-dwh-schema-name>
      --redshift_table: fact_table_1
      --catalog_database: project:databaseName/<your-glue-catalog-database-name-for-curated-datasets>
      --catalog_table: fact_table_1
      --role_iam: "{{resolve:ssm:/{{org}}/{{domain}}/<ssm_path/to/generated-role/redshift-execution-role/arn>}}"
      --redshift_tmp_dir: "s3://{{resolve:ssm:/{{org}}/{{domain}}/dataops-project/bucket/name}}/temp/"
      --truncate_before_load: "false"
    command:
      scriptLocation: ./src/glue-jobs/generic-redshift-load.py
    allocatedCapacity: 2
    description: Glue job to read curated file and load into Redshift table.

  dim-table-1-load:
    template: "RedshiftLoadJobTemplate"
    defaultArguments:
      --redshift_secret: "<your-redshift-secret-name>"
      --redshift_database: <your-redshift-dwh-database-name>
      --redshift_schema: <your-redshift-dwh-schema-name>
      --redshift_table: dim_table_1
      --catalog_database: project:databaseName/<your-glue-catalog-database-name-for-curated-datasets>
      --catalog_table: dim_table_1
      --role_iam: "{{resolve:ssm:/{{org}}/{{domain}}/<ssm_path/to/generated-role/redshift-execution-role/arn>}}"
      --redshift_tmp_dir: "s3://{{resolve:ssm:/{{org}}/{{domain}}/dataops-project/bucket/name}}/temp/"
      --truncate_before_load: "true"
    command:
      scriptLocation: ./src/glue-jobs/generic-redshift-load.py
    allocatedCapacity: 2
    description: Glue job to read curated file and load into Redshift table.

  dim-table-2-load:
    template: "RedshiftLoadJobTemplate"
    defaultArguments:
      --redshift_secret: "<your-redshift-secret-name>"
      --redshift_database: <your-redshift-dwh-database-name>
      --redshift_schema: <your-redshift-dwh-schema-name>
      --redshift_table: dim_table_2
      --catalog_database: project:databaseName/<your-glue-catalog-database-name-for-curated-datasets>
      --catalog_table: dim_table_2
      --role_iam: "{{resolve:ssm:/{{org}}/{{domain}}/generated-role/redshift-execution-role/arn}}"
      --redshift_tmp_dir: "s3://{{resolve:ssm:/{{org}}/{{domain}}/dataops-project/bucket/name}}/temp/"
      --truncate_before_load: "true"
    command:
      scriptLocation: ./src/glue-jobs/generic-redshift-load.py
    allocatedCapacity: 2
    description: Glue job to read curated file and load into Redshift table.

basic_etl_pipeline/workflow.yaml

This configuration will create the Glue Workflow, to orchestrate the crawler and jobs, using the DataOps Glue Workflow module.

# Contents available in dataops/workflow.yaml

# (Required) Name of the Data Ops Project
# The name of the project whose resources will be used by this workflow.
# Other resources within the project can be referenced in the workflow config using
# the "project:" prefix on the config value.
projectName: dataops-project-name
# List of workflow definitions as produced by 'aws glue get-workflow --name <name> --include-graph'
workflowDefinitions:
    # The rawWorkflowDef can be specified directly, or can be Json/Yaml representation of the output of the
    # 'aws glue get-workflow --name <name> --include-graph' command. This allows workflows to be created in the Glue
    # interface, exported, and pasted directly into this config. The parts of the command output which are not required
    # will be ignored.
  - rawWorkflowDef:
      Workflow:
        Name: curate-my-application-data-wf
        DefaultRunProperties: {}
        Graph:
          Nodes:
          - Type: TRIGGER
            Name: EveryNight2AMTrigger
            TriggerDetails:
              Trigger:
                Name: EveryNight2AMTrigger
                WorkflowName: curate-my-application-data-wf
                Type: SCHEDULED
                State: ACTIVATED
                Schedule: cron(0 7 * * ? *) # Glue cron schedules are in UTC; 07:00 UTC = 2 AM US Eastern (standard time)
                Actions:
                  - CrawlerName: "{{resolve:ssm:/{{org}}/{{domain}}/dataops-project-name/crawler/name/crawler-a}}"
          - Type: TRIGGER
            Name: RawCrawlerSuccess
            TriggerDetails:
              Trigger:
                Name: RawCrawlerSuccess
                WorkflowName: curate-my-application-data-wf
                Type: CONDITIONAL
                State: ACTIVATED
                Actions:
                  - JobName: "{{resolve:ssm:/{{org}}/{{domain}}/dataops-project-name/job/name/curate-dataset-a}}"
                  - JobName: "{{resolve:ssm:/{{org}}/{{domain}}/dataops-project-name/job/name/curate-dataset-b}}"
                Predicate:
                  Logical: ANY
                  Conditions:
                  - LogicalOperator: EQUALS
                    CrawlerName: "{{resolve:ssm:/{{org}}/{{domain}}/dataops-project-name/crawler/name/crawler-a}}"
                    CrawlState: SUCCEEDED
          - Type: TRIGGER
            Name: CurateConnectCTRSuccess
            TriggerDetails:
              Trigger:
                Name: CurateConnectCTRSuccess
                WorkflowName: curate-my-application-data-wf
                Type: CONDITIONAL
                State: ACTIVATED
                Actions:
                  - JobName: "{{resolve:ssm:/{{org}}/{{domain}}/dataops-project-name/job/name/fact-table-1-load}}"
                  - JobName: "{{resolve:ssm:/{{org}}/{{domain}}/dataops-project-name/job/name/dim-table-1-load}}"
                  - JobName: "{{resolve:ssm:/{{org}}/{{domain}}/dataops-project-name/job/name/dim-table-2-load}}"
                Predicate:
                  Logical: AND
                  Conditions:
                  - LogicalOperator: EQUALS
                    JobName: "{{resolve:ssm:/{{org}}/{{domain}}/dataops-project-name/job/name/curate-dataset-a}}"
                    State: SUCCEEDED
                  - LogicalOperator: EQUALS
                    JobName: "{{resolve:ssm:/{{org}}/{{domain}}/dataops-project-name/job/name/curate-dataset-b}}"
                    State: SUCCEEDED  

  - rawWorkflowDef:
      Workflow:
        Name: crawl-my-application-data-raw-wf
        DefaultRunProperties: {}
        Graph:
          Nodes:
            - Type: TRIGGER
              Name: EveryDay930AMTrigger
              TriggerDetails:
                Trigger:
                  Name: EveryDay930AMTrigger
                  WorkflowName: crawl-my-application-data-raw-wf
                  Type: SCHEDULED
                  Schedule: "cron(30 14 * * ? *)" # 14:30 UTC = 9:30 AM US Eastern (standard time)
                  State: ACTIVATED
                  Actions:
                    - CrawlerName: "{{resolve:ssm:/{{org}}/{{domain}}/dataops-project-name/crawler/name/crawler-a}}"