Snowflake is an excellent modern database that couples the power of traditional SQL with modern data lake architecture. AWS Glue is a native ETL environment built into the AWS serverless ecosystem. Together they make a powerful combination for building a modern data lake.
In this article, I will address the fundamentals of creating a job in Glue to automatically load a Snowflake table using the powerful COPY command.
Integrate your Snowflake Credentials into Secrets Manager
AWS provides a utility called Secrets Manager to store passwords and it has a number of features, including automated password rotation that make it very attractive for secure storage.
The best way to utilize Secrets Manager is to store your Snowflake credentials using Multifactor Authentication.
Open up Secrets Manager and add the credentials of your Snowflake user. Fill in all the required fields in the manager and store the secret.
Fill in the Snowflake Connection Information
Record the secrets ID and add it to your AWS::IAM::Role specification in a Cloud Formation file. Use environment variables to store the secret name. Don't worry, we will use AWS roles to restrict secret access to the Glue job explicitly.
- PolicyName: "AllowSecretsManager"
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Action: [
"secretsmanager:GetSecretValue",
"secretsmanager:DescribeSecret"
]
Resource: [
!Sub "arn:aws:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:${SecretName}*"
The policy will allow your Glue Job to connect to Snowflake to perform operations.
Snowflake Requirements
Create a Stage
The COPY requires STAGE connected to the S3 bucket in AWS. Follow the Snowflake method to create a stage. The best practice is to create a storage integration to your S3 bucket and then create multiple stages on the storage integration for application or customer. You can use SNOWSQL with variable substitution to automate.
!set variable_substitution=true;
create or replace stage &{database}.stage.OLYMPICS
storage_integration = &{storage_integration}
url = '&{S3DataHome}/stage/olympics/';
Create a Table in Snowflake to Contain the Data
We will create a table to contain the public data set containing athlete results for 120 years of Olympics events. The table below has the columns of the data set as well as standard columns that every data warehouse table should contain (highlighted in bold).
CREATE OR REPLACE TABLE STAGE.OLYMPICS_ATHELETE_EVENT
(
FILE_LOAD_ID INTEGER identity (1,1),
FILE_NAME VARCHAR,
FILE_ROW_NUMBER INTEGER,
ID VARCHAR(100) NOT NULL,
NAME VARCHAR(100) NOT NULL,
SEX VARCHAR(100) NOT NULL,
AGE VARCHAR(100) NOT NULL,
HEIGHT VARCHAR(100) NOT NULL,
WEIGHT VARCHAR(100) NOT NULL,
TEAM VARCHAR(100) NOT NULL,
NOC VARCHAR(100) NOT NULL,
GAMES VARCHAR(100) NOT NULL,
YEAR VARCHAR(100) NOT NULL,
SEASON VARCHAR(100) NOT NULL,
CITY VARCHAR(100) NOT NULL,
SPORT VARCHAR(100) NOT NULL,
EVENT VARCHAR(100) NOT NULL,
MEDAL VARCHAR(100) NOT NULL,
DW_CREATE_DATE TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP(),
DW_CREATE_USER VARCHAR NOT NULL DEFAULT CURRENT_USER(),
DW_UPDATE_DATE TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP(),
DW_UPDATE_USER VARCHAR NOT NULL DEFAULT CURRENT_USER()
);
Create a COPY Statement to Load Data from S3
The COPY statement in Snowflake is a powerful method to import and export data from the data warehouse. It will load any new data files and will ignore files previously loaded. Utilizing COPY is a simple and effective means of managing your warehouse.
copy into STAGE.OLYMPICS_ATHELETE_EVENT (FILE_NAME,
FILE_ROW_NUMBER,
ID,
NAME,
SEX,
AGE,
HEIGHT,
WEIGHT,
TEAM,
NOC,
GAMES,
YEAR,
SEASON,
CITY,
SPORT,
EVENT,
MEDAL)
from (
select METADATA$FILENAME file_name,
METADATA$FILE_ROW_NUMBER row_number,
t.$1,
t.$2,
t.$3,
t.$4,
t.$5,
t.$6,
t.$7,
t.$8,
t.$9,
t.$10,
t.$11,
t.$12,
t.$13,
t.$14,
t.$15
from @stage.OLYMPICS t
)
pattern = '.*athlete_events.csv.gz'
on_error = CONTINUE
force = false
file_format = (field_optionally_enclosed_by = '"'
type = 'csv'
compression = GZIP
field_delimiter = ','
skip_header = 1);
AWS Glue Job Requirements
The Cloud Formation Script
The foundation of AWS is Cloud Formation. A basic Cloud Formation file contains a parameters section, a role section, and a component section. The component section above contains the Secrets Manager snippet earlier in the Article.
AWSTemplateFormatVersion: "2010-09-09"
Description: >
This Template Configures a Job to Load Event Data into a Snowflake Table using Glue
Parameters:
JobName:
Type: String
Description: "The Glue Job name used for the Stack and tags in the Snowflake query"
JobSql:
Type: String
Description: "A SQL COPY function to load the data into Snowflake."
S3DataHome:
Type: String
MinLength: "1"
Description: "The S3 Bucket Containing the Data Lake Data"
SecretName:
Type: String
Description: "The secret containing the Snowflake login information"
Resources:
SnowflakeGlueJobRole:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- glue.amazonaws.com
Action:
- sts:AssumeRole
Policies:
- PolicyName: root
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- "s3:GetObject"
- "s3:PutObject"
- "s3:ListBucket"
- "s3:DeleteObject"
Resource:
- !Sub "arn:aws:s3:::${S3DataHome}"
- !Sub "arn:aws:s3:::${S3DataHome}/*"
- PolicyName: "AllowSecretsManager"
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Action: [
"secretsmanager:GetSecretValue",
"secretsmanager:DescribeSecret"
]
Resource: [
!Sub "arn:aws:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:${SecretName}*"
]
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
Path: "/"
LoadOlympicData:
Type: AWS::Glue::Job
Properties:
Command:
Name: pythonshell
PythonVersion: 3
ScriptLocation: !Sub "s3://${S3DataHome}/src/etl-scripts/copy_to_snowflake.py"
GlueVersion: 1.0
DefaultArguments:
"--job-bookmark-option": "job-bookmark-enable"
"--job-language": "python"
"--extra-py-files": !Sub "s3://${S3DataHome}/lib/snowflake_connector_python-2.4.2-cp37-cp37m-manylinux2014_x86_64.whl"
"--RegionName": !Sub "${AWS::Region}"
"--SecretName": !Ref SecretName
"--JobName": !Ref JobName
"--JobBucket": !Ref S3DataHome
"--JobSql": !Ref JobSql
ExecutionProperty:
MaxConcurrentRuns: 2
MaxRetries: 0
Name: snowflake-load-olympic-data
Role: !Ref SnowflakeGlueJobRole
Create the Snowflake Python Wheel in Docker
Take careful note of the line with the Snowflake connector library. Since Snowflake is not native to AWS, you will need to provide a Wheel file with the compiled binaries of the Snowflake Python library. Use docker or an EC2 instance with the AWS AMI to create the wheel file.
Here is the docker file.
python3.7 -m venv wheel-env
source wheel-env/bin/activate
pip install --upgrade pip
cat "snowflake-connector-python" > requirements.txt
for f in $(cat ../requirements.txt); do pip wheel $f -w ../wheelhouse; done
cd wheelhouse/
INDEXFILE="<html><head><title>Links</title></head><body><h1>Links</h1>"
for f in *.whl; do INDEXFILE+="<a href='$f'>$f</a><br>"; done
INDEXFILE+="</body></html>"
echo "$INDEXFILE" > index.html
cd ..
deactivate
rm -rf cache wheel-env
aws s3 sync wheelhouse s3://${S3DataHome}/lib/
Create the Python Script
The Glue job takes a Python script.
ScriptLocation: !Sub "s3://${S3DataHome}/src/etl-scripts/copy_to_snowflake.py"
The script is fairly generic and takes a SQL file to execute so it can be reused as well.
import sys
import boto3
import json
from botocore.exceptions import ClientError
from awsglue.utils import getResolvedOptions
import snowflake.connector
def get_secret_json(session, secret_name, region_name):
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
secret = None
get_secret_value_response = None
try:
get_secret_value_response = client.get_secret_value(
SecretId=secret_name
)
except ClientError as e:
raise e
else:
if 'SecretString' in get_secret_value_response:
secret = get_secret_value_response['SecretString']
return json.loads(secret)
def connect(user, password, account, database, warehouse, session_parameters=None):
"""
Connect to Snowflake
"""
return snowflake.connector.connect(
user=user,
password=password,
account=account,
database=database,
warehouse=warehouse,
session_parameters=session_parameters
)
def read_sql(session, bucket, filename):
s3_client = session.client("s3")
s3_object = s3_client.get_object(Bucket=bucket, Key=filename)
return s3_object['Body'].read().decode("utf-8")
def main():
# Create a Secrets Manager client
session = boto3.session.Session()
args = getResolvedOptions(sys.argv, ['JobName', 'RegionName', 'SecretName', 'JobBucket', 'JobSql'])
json_secret = get_secret_json(session, args["SecretName"], args["RegionName"])
sql = read_sql(session=session, bucket=args["JobBucket"], filename=args["JobSql"])
with connect(
user=json_secret['USERNAME'],
password=json_secret['PASSWORD'],
account=json_secret['ACCOUNT'],
database=json_secret['DB'],
warehouse=json_secret['WAREHOUSE'],
session_parameters={
'QUERY_TAG': args["JobName"]
}) as con:
cs = con.cursor()
result_cs = cs.execute(sql)
result_row = result_cs.fetchone()
print(result_row)
if __name__ == "__main__":
main()
Create the Glue Job with the Cloud Formation Script
Here is the Makefile form for executing the Cloud Formation script to create the Glue Job.
package-job:
aws cloudformation package \
--template-file resources/glue/snowflake-glue-load-history.yaml \
--s3-bucket ${S3_DEPLOYMENT_BUCKET} \
--output-template-file build/snowflake-glue-load-history.yaml
deploy-job: package-job
aws cloudformation deploy \
--template-file build/snowflake-glue-load-history.yaml \
--parameter-overrides S3DataHome=${S3_DATA_BUCKET} \
SecretName="${SECRET_NAME}" JobName=${LOAD_OLYMPICS_DATA_JOB} \
JobSql=${SQL_COPY_OLYMPIC_DATA} \
--stack-name ${LOAD_OLYMPICS_DATA_JOB} \
--capabilities CAPABILITY_IAM
Running the Code
Execute the Glue Job (or Schedule it)
You can run and execute the Glue Job in the AWS console.
The Glue process costs $0.44 per DPU-Hour, billed per second, with a 1-minute minimum, so it is going to be very competitive price-wise with even free ETL tools running on an EC2 instance.
Checking the Results
Now the fun part - running the query to check your data.
select TEAM,
sum(case when MEDAL='Gold' then 1 else 0 end) GOLD_MEDALS,
sum(case when MEDAL='Silver' then 1 else 0 end) SILVER_MEDALS,
sum(case when MEDAL='Bronze' then 1 else 0 end) BRONZE_MEDALS
from stage.OLYMPICS_ATHELETE_EVENT
where medal in ('Gold', 'Silver', 'Bronze')
and sport = 'Fencing'
group by team
order by sum(case when MEDAL='Gold' then 1 else 0 end) desc,
sum(case when MEDAL='Silver' then 1 else 0 end) desc,
sum(case when MEDAL='Bronze' then 1 else 0 end) desc;
So who is the best in Fencing? Italy, of course.
Commentaires