top of page
Search
  • Writer's pictureTim Burns

Integrating Snowflake with Glue

Snowflake is an excellent modern database that couples the power of traditional SQL with modern data lake architecture. AWS Glue is a native ETL environment built into the AWS serverless ecosystem. Together they make a powerful combination for building a modern data lake.


In this article, I will address the fundamentals of creating a job in Glue to automatically load a Snowflake table using the powerful COPY command.


Integrate your Snowflake Credentials into Secrets Manager

AWS provides a utility called Secrets Manager to store passwords and it has a number of features, including automated password rotation that make it very attractive for secure storage.


The best way to utilize Secrets Manager is to store your Snowflake credentials using Multifactor Authentication.


Open up Secrets Manager and add the credentials of your Snowflake user. Fill in all the required fields in the manager and store the secret.

Fill in the Snowflake Connection Information


Record the secrets ID and add it to your AWS::IAM::Role specification in a Cloud Formation file. Use environment variables to store the secret name. Don't worry, we will use AWS roles to restrict secret access to the Glue job explicitly.


- PolicyName: "AllowSecretsManager"
  PolicyDocument:
    Version: "2012-10-17"
    Statement:
      - Effect: "Allow"
        Action: [
            "secretsmanager:GetSecretValue",
            "secretsmanager:DescribeSecret"
        ]
        Resource: [
            !Sub "arn:aws:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:${SecretName}*"

The policy will allow your Glue Job to connect to Snowflake to perform operations.


Snowflake Requirements

Create a Stage

The COPY requires STAGE connected to the S3 bucket in AWS. Follow the Snowflake method to create a stage. The best practice is to create a storage integration to your S3 bucket and then create multiple stages on the storage integration for application or customer. You can use SNOWSQL with variable substitution to automate.

!set variable_substitution=true;
create or replace stage &{database}.stage.OLYMPICS
storage_integration = &{storage_integration}
url = '&{S3DataHome}/stage/olympics/';


Create a Table in Snowflake to Contain the Data

We will create a table to contain the public data set containing athlete results for 120 years of Olympics events. The table below has the columns of the data set as well as standard columns that every data warehouse table should contain (highlighted in bold).


CREATE OR REPLACE TABLE STAGE.OLYMPICS_ATHELETE_EVENT
(
    FILE_LOAD_ID     INTEGER identity (1,1),
    FILE_NAME         VARCHAR,
    FILE_ROW_NUMBER         INTEGER,
    ID             VARCHAR(100) NOT NULL,
    NAME           VARCHAR(100) NOT NULL,
    SEX            VARCHAR(100) NOT NULL,
    AGE            VARCHAR(100) NOT NULL,
    HEIGHT         VARCHAR(100) NOT NULL,
    WEIGHT         VARCHAR(100) NOT NULL,
    TEAM           VARCHAR(100) NOT NULL,
    NOC            VARCHAR(100) NOT NULL,
    GAMES          VARCHAR(100) NOT NULL,
    YEAR           VARCHAR(100) NOT NULL,
    SEASON         VARCHAR(100) NOT NULL,
    CITY           VARCHAR(100) NOT NULL,
    SPORT          VARCHAR(100) NOT NULL,
    EVENT          VARCHAR(100) NOT NULL,
    MEDAL          VARCHAR(100) NOT NULL,
    DW_CREATE_DATE TIMESTAMPTZ           DEFAULT CURRENT_TIMESTAMP(),
    DW_CREATE_USER VARCHAR      NOT NULL DEFAULT CURRENT_USER(),
    DW_UPDATE_DATE TIMESTAMPTZ           DEFAULT CURRENT_TIMESTAMP(),
    DW_UPDATE_USER VARCHAR      NOT NULL DEFAULT CURRENT_USER()
);

Create a COPY Statement to Load Data from S3

The COPY statement in Snowflake is a powerful method to import and export data from the data warehouse. It will load any new data files and will ignore files previously loaded. Utilizing COPY is a simple and effective means of managing your warehouse.

copy into STAGE.OLYMPICS_ATHELETE_EVENT (FILE_NAME,
                                         FILE_ROW_NUMBER,
                                         ID,
                                         NAME,
                                         SEX,
                                         AGE,
                                         HEIGHT,
                                         WEIGHT,
                                         TEAM,
                                         NOC,
                                         GAMES,
                                         YEAR,
                                         SEASON,
                                         CITY,
                                         SPORT,
                                         EVENT,
                                         MEDAL)
    from (
        select METADATA$FILENAME        file_name,
               METADATA$FILE_ROW_NUMBER row_number,
               t.$1,
               t.$2,
               t.$3,
               t.$4,
               t.$5,
               t.$6,
               t.$7,
               t.$8,
               t.$9,
               t.$10,
               t.$11,
               t.$12,
               t.$13,
               t.$14,
               t.$15
        from @stage.OLYMPICS t
    )
    pattern = '.*athlete_events.csv.gz'
    on_error = CONTINUE
    force = false
    file_format = (field_optionally_enclosed_by = '"'
        type = 'csv'
        compression = GZIP
        field_delimiter = ','
        skip_header = 1);

AWS Glue Job Requirements

The Cloud Formation Script

The foundation of AWS is Cloud Formation. A basic Cloud Formation file contains a parameters section, a role section, and a component section. The component section above contains the Secrets Manager snippet earlier in the Article.

AWSTemplateFormatVersion: "2010-09-09"
Description: >
  This Template Configures a Job to Load Event Data into a Snowflake Table using Glue

Parameters:
  JobName:
    Type: String
    Description: "The Glue Job name used for the Stack and tags in the Snowflake query"

  JobSql:
    Type: String
    Description: "A SQL COPY function to load the data into Snowflake."

  S3DataHome:
    Type: String
    MinLength: "1"
    Description: "The S3 Bucket Containing the Data Lake Data"

  SecretName:
    Type: String
    Description: "The secret containing the Snowflake login information"

Resources:
  SnowflakeGlueJobRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - glue.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - "s3:GetObject"
                  - "s3:PutObject"
                  - "s3:ListBucket"
                  - "s3:DeleteObject"
                Resource:
                  - !Sub "arn:aws:s3:::${S3DataHome}"
                  - !Sub "arn:aws:s3:::${S3DataHome}/*"

        - PolicyName: "AllowSecretsManager"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action: [
                    "secretsmanager:GetSecretValue",
                    "secretsmanager:DescribeSecret"
                ]
                Resource: [
                    !Sub "arn:aws:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:${SecretName}*"
                ]

      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
      Path: "/"

  LoadOlympicData:
    Type: AWS::Glue::Job
    Properties:
      Command:
        Name: pythonshell
        PythonVersion: 3
        ScriptLocation: !Sub "s3://${S3DataHome}/src/etl-scripts/copy_to_snowflake.py"
      GlueVersion: 1.0
      DefaultArguments:
        "--job-bookmark-option": "job-bookmark-enable"
        "--job-language": "python"
        "--extra-py-files": !Sub "s3://${S3DataHome}/lib/snowflake_connector_python-2.4.2-cp37-cp37m-manylinux2014_x86_64.whl"
        "--RegionName": !Sub "${AWS::Region}"
        "--SecretName": !Ref SecretName
        "--JobName": !Ref JobName
        "--JobBucket": !Ref S3DataHome
        "--JobSql": !Ref JobSql
      ExecutionProperty:
        MaxConcurrentRuns: 2
      MaxRetries: 0
      Name: snowflake-load-olympic-data
      Role: !Ref SnowflakeGlueJobRole

Create the Snowflake Python Wheel in Docker

Take careful note of the line with the Snowflake connector library. Since Snowflake is not native to AWS, you will need to provide a Wheel file with the compiled binaries of the Snowflake Python library. Use docker or an EC2 instance with the AWS AMI to create the wheel file.


Here is the docker file.

python3.7 -m venv wheel-env
source wheel-env/bin/activate
pip install --upgrade pip
cat "snowflake-connector-python" > requirements.txt
for f in $(cat ../requirements.txt); do pip wheel $f -w ../wheelhouse; done
cd wheelhouse/
INDEXFILE="<html><head><title>Links</title></head><body><h1>Links</h1>"
for f in *.whl; do INDEXFILE+="<a href='$f'>$f</a><br>"; done
INDEXFILE+="</body></html>"
echo "$INDEXFILE" > index.html
cd ..
deactivate
rm -rf cache wheel-env
aws s3 sync wheelhouse s3://${S3DataHome}/lib/

Create the Python Script

The Glue job takes a Python script.

ScriptLocation: !Sub "s3://${S3DataHome}/src/etl-scripts/copy_to_snowflake.py"

The script is fairly generic and takes a SQL file to execute so it can be reused as well.


import sys

import boto3
import json

from botocore.exceptions import ClientError
from awsglue.utils import getResolvedOptions

import snowflake.connector


def get_secret_json(session, secret_name, region_name):

    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    secret = None
    get_secret_value_response = None

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
            raise e
    else:
        if 'SecretString' in get_secret_value_response:
            secret = get_secret_value_response['SecretString']

    return json.loads(secret)


def connect(user, password, account, database, warehouse, session_parameters=None):
    """
    Connect to Snowflake
    """
    return snowflake.connector.connect(
        user=user,
        password=password,
        account=account,
        database=database,
        warehouse=warehouse,
        session_parameters=session_parameters
    )


def read_sql(session, bucket, filename):
    s3_client = session.client("s3")
    s3_object = s3_client.get_object(Bucket=bucket, Key=filename)
    return s3_object['Body'].read().decode("utf-8")


def main():
    # Create a Secrets Manager client
    session = boto3.session.Session()

    args = getResolvedOptions(sys.argv, ['JobName', 'RegionName', 'SecretName', 'JobBucket', 'JobSql'])

    json_secret = get_secret_json(session, args["SecretName"], args["RegionName"])

    sql = read_sql(session=session, bucket=args["JobBucket"], filename=args["JobSql"])

    with connect(
            user=json_secret['USERNAME'],
            password=json_secret['PASSWORD'],
            account=json_secret['ACCOUNT'],
            database=json_secret['DB'],
            warehouse=json_secret['WAREHOUSE'],
            session_parameters={
                'QUERY_TAG': args["JobName"]
            }) as con:
        cs = con.cursor()

        result_cs = cs.execute(sql)

        result_row = result_cs.fetchone()

        print(result_row)


if __name__ == "__main__":
    main()


Create the Glue Job with the Cloud Formation Script

Here is the Makefile form for executing the Cloud Formation script to create the Glue Job.

package-job:
   aws cloudformation package \
      --template-file resources/glue/snowflake-glue-load-history.yaml \
             --s3-bucket ${S3_DEPLOYMENT_BUCKET} \
       --output-template-file build/snowflake-glue-load-history.yaml

deploy-job: package-job
   aws cloudformation deploy \
      --template-file build/snowflake-glue-load-history.yaml \
      --parameter-overrides S3DataHome=${S3_DATA_BUCKET} \
         SecretName="${SECRET_NAME}" JobName=${LOAD_OLYMPICS_DATA_JOB} \
         JobSql=${SQL_COPY_OLYMPIC_DATA} \
       --stack-name ${LOAD_OLYMPICS_DATA_JOB} \
       --capabilities CAPABILITY_IAM

Running the Code

Execute the Glue Job (or Schedule it)

You can run and execute the Glue Job in the AWS console.


The Glue process costs $0.44 per DPU-Hour, billed per second, with a 1-minute minimum, so it is going to be very competitive price-wise with even free ETL tools running on an EC2 instance.


Checking the Results

Now the fun part - running the query to check your data.


select TEAM,
       sum(case when MEDAL='Gold' then 1 else 0 end) GOLD_MEDALS,
sum(case when MEDAL='Silver' then 1 else 0 end) SILVER_MEDALS,
sum(case when MEDAL='Bronze' then 1 else 0 end) BRONZE_MEDALS
from stage.OLYMPICS_ATHELETE_EVENT
where medal in ('Gold', 'Silver', 'Bronze')
 and sport = 'Fencing'
    group by team
order by sum(case when MEDAL='Gold' then 1 else 0 end) desc,
         sum(case when MEDAL='Silver' then 1 else 0 end) desc,
         sum(case when MEDAL='Bronze' then 1 else 0 end) desc;

So who is the best in Fencing? Italy, of course.





30 views0 comments

Recent Posts

See All

Commenti


bottom of page