Photo by Author with the Never Summer Wilderness in the Background
Introduction
I enjoy integrating my love of music into my professional development, so when I get the chance, I listen to KEXP and work on a Snowflake Data Warehouse I built to track radio shows and playlists.
In this article, I tackle streams - a hot topic in my day job - and dig into the SQL nitty-gritty of creating and using them.
You might have seen the New Music Through Data Analytics post if you've read my blog before. The data here is from that project; you can set it up by following that article.
About Streams
Capturing and processing only changed data is a core feature of efficient data pipelines. Snowflake streams track the changes (inserts, updates, and deletes) made to a table so that pipelines can process only the new or modified rows.
See Introduction to Streams from the Snowflake documentation.
See also Change Data Capture using Snowflake Streams from ThinkETL.
In this article, I listen to KEXP and trace the stream data collected in Snowflake from this quiet Saturday morning's broadcast with the amazing KEXP DJ Greta Rose.
Creating Streams on Existing Tables
Streams on Tables
Snowflake recommends having a unique stream for every consumer, so I create a table stream on each of my KEXP import tables: one for the radio shows and one for the playlists.
create or replace stream STAGE.STREAM_IMPORT_KEXP_SHOW_CDC
on table STAGE.IMPORT_KEXP_SHOW;
create or replace stream STAGE.STREAM_IMPORT_KEXP_PLAYLIST_CDC
on table STAGE.IMPORT_KEXP_PLAYLIST;
The stream starts tracking changes on its source table as soon as it is created. A consistent naming strategy is a crucial component of a data warehouse; here I use the pattern STREAM_<table_name>_<consumer>, where the consumer is the "Change Data Capture" (CDC) component. Snowflake recommends a separate stream for each consumer because every consumption resets the stream: it looks much like a table, but it does not behave like one, and naive table expectations will not hold.
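Snowflake's show streams and describe stream commands are a quick way to confirm that the streams exist and to see exactly what they return:
-- List the streams in the STAGE schema, their source tables, and whether they are stale.
show streams in schema STAGE;
-- Show the columns the stream returns: the source table's columns plus
-- METADATA$ACTION, METADATA$ISUPDATE, and METADATA$ROW_ID.
describe stream STAGE.STREAM_IMPORT_KEXP_SHOW_CDC;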
Streams on Views
Snowflake also allows streams on views, with several exclusions:
You can't track a view based on a "select distinct" query.
You can't track a view with any aggregation or grouping logic.
All underlying tables must be native tables.
No materialized views are allowed.
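A stream on a view uses the same create syntax. As a minimal sketch, with a hypothetical view STAGE.V_KEXP_SHOW that satisfies the restrictions above:
-- A plain view over the import table: no distinct, aggregation, or grouping.
create or replace view STAGE.V_KEXP_SHOW as
select SHOW_ID, PROGRAM_NAME, HOST_NAMES, START_TIME
from STAGE.IMPORT_KEXP_SHOW;
-- The stream tracks changes to the rows the view returns.
create or replace stream STAGE.STREAM_V_KEXP_SHOW_CDC
on view STAGE.V_KEXP_SHOW;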
In my day job, I use streams on views extensively: putting a view between the tables and the stream lets me combine multiple dimensional sources into a single managed CDC table, but that is a topic for another article.
Consuming a Table Stream
Repeatable Read Isolation
A simple select statement will not consume the stream because of repeatable read isolation. The stream's offset only advances when its rows are consumed by DML inside a transaction that commits, so we do not lose data if the processing fails partway through. Repeatable read isolation is different from read committed, the standard mode in which table queries see whatever data has been committed when the statement runs.
Once the stream is consumed within a committed transaction, it no longer returns the consumed rows. This makes streams ideal for capturing only the changed data on a table and transforming that data into your analytical data warehouse.
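As a minimal sketch of that behavior, assuming a hypothetical landing table STAGE.TMP_KEXP_SHOW_CHANGES with columns SHOW_ID, PROGRAM_NAME, ACTION, and IS_UPDATE:
-- Selecting from the stream does not advance its offset; re-running this
-- query returns the same change rows.
select count(*) from STAGE.STREAM_IMPORT_KEXP_SHOW_CDC;
-- Consuming the stream with DML inside a committed transaction advances
-- the offset past the rows that were read.
begin;
insert into STAGE.TMP_KEXP_SHOW_CHANGES (SHOW_ID, PROGRAM_NAME, ACTION, IS_UPDATE)
select SHOW_ID, PROGRAM_NAME, metadata$action, metadata$isupdate
from STAGE.STREAM_IMPORT_KEXP_SHOW_CDC;
commit;
-- The consumed rows are no longer returned.
select count(*) from STAGE.STREAM_IMPORT_KEXP_SHOW_CDC;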
Putting it all Together - The Data Pipeline
The data pipeline is the core architectural component of any data product. I present a short data pipeline based on a Makefile with simple, discrete steps so the reader can integrate these ideas into their existing data applications.
Extracting Data with Python API
A Python script calls the KEXP Web API and synchronizes the data to the S3 bucket.
sync-kexp-api:
python python/sync_kexp_s3.py
Running the Snowflake Pipeline in SnowSQL
SnowSQL is a powerful command line interface for executing commands in Snowflake. It pairs well with Makefiles, and here, I use SnowSQL to run the pipeline to copy the data from S3 to Snowflake and then process the data into the dimensional model using the defined streams.
pipeline: sync-kexp-api
snowsql -f snowflake/sql/copy_stage_import_show.sql
snowsql -f snowflake/sql/copy_stage_import_playlist.sql
snowsql -f snowflake/sql/merge_stream_dim_kexp_show.sql
snowsql -f snowflake/sql/merge_stream_dim_kexp_playlist.sql
Importing the shows copies any shows added since the last sync into a stage table. Each sync writes a small manifest that records the time window, the S3 keys, and the number of songs loaded:
{
"airdate_after_date": "2022-09-25T09:36:10-0700",
"airdate_before_date": "2022-09-25T10:13:59-0700",
"run_datetime_key": "20220925101359",
"run_date_key": "20220925",
"playlist_key": "stage/kexp/playlists/20220925101359/playlist20220925101359.json",
"shows_key": "stage/kexp/shows/20220925101359/show20220925101359.json",
"number_songs": 15
}
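The copy scripts referenced in the Makefile are short. A sketch of copy_stage_import_show.sql, assuming an external stage named @STAGE.KEXP_S3 over the bucket and a columnar import table (the stage name and copy options here are mine, not necessarily the project's):
-- Copy any new show files from the S3 stage into the import table; Snowflake
-- remembers which files it has already loaded, so only files added since the
-- last sync are picked up, and the stream captures the newly inserted rows.
copy into STAGE.IMPORT_KEXP_SHOW
from @STAGE.KEXP_S3/stage/kexp/shows/
file_format = (type = 'JSON' strip_outer_array = true)
match_by_column_name = case_insensitive;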
Querying the stream tells me what shows were loaded.
select PROGRAM_NAME
, PROGRAM_TAGS
, HOST_NAMES
, TAGLINE
, START_TIME
from STAGE.STREAM_IMPORT_KEXP_SHOW_CDC;
+-------------------+-------------------+--------------------------+
|PROGRAM_NAME |PROGRAM_TAGS |TAGLINE |
+-------------------+-------------------+--------------------------+
|Preachin' the Blues|Blues,Country,Roots|Seattle's #1 Hangover Show|
+-------------------+-------------------+--------------------------+
And also what songs were loaded:
select ARTIST, SONG, ALBUM, metadata$ACTION, metadata$ISUPDATE
from STAGE.STREAM_IMPORT_KEXP_PLAYLIST_CDC;
+--------------------------------------------------+-------------------------+-------------------------------------+---------------+-----------------+
|ARTIST |SONG |ALBUM |METADATA$ACTION|METADATA$ISUPDATE|
+--------------------------------------------------+-------------------------+-------------------------------------+---------------+-----------------+
|NULL |NULL |NULL |INSERT |false |
|Betty Everett |Tell Me Darling |Ike Turner Sessions |INSERT |false |
|Lula Reed |Every Second |The Soulful Side Of Lula Reed |INSERT |false |
|B.B. King |Be Careful With A Fool |RPM & Kent Vaults |INSERT |false |
|B.B. King |Country Girl |Lucille |INSERT |false |
|Jerry McCain |Steady |The Ace Story Volume 3 |INSERT |false |
|NULL |NULL |NULL |INSERT |false |
|Jody Williams |I Feel So All Alone |Chicago: The Blues Yesterday Volume 9|INSERT |false |
|Albert King |Blues At Sunrise |Deep Feeling |INSERT |false |
|Albert King |Little Boy Blue |Chicago Guitar Killers |INSERT |false |
|NULL |NULL |NULL |INSERT |false |
|Howlin' Wolf |Change My Way |The Wolf Is At Your Door |INSERT |false |
|Mabel Franklin |Unhappy Woman |Ivory single |INSERT |false |
|Magic Sam |Baby, You Torture My Soul|The Late Great Magic Sam |INSERT |false |
|Little Esther & The Robins w Johnny Otis Orchestra|Double Crossing Blues |Midnight At The Barrelhouse |INSERT |false |
+--------------------------------------------------+-------------------------+-------------------------------------+---------------+-----------------+
With the merge, all this new data flows into the data warehouse, and I don't need to reprocess the historical data.
The Powerful and Flexible Merge Statement
The merge statement acts as the glue in change data capture using streams. It consumes the stream and transfers the data from the stage import to the dimensional tables in the data warehouse.
merge into WAREHOUSE.DIM_KEXP_SHOW dim
using STAGE.STREAM_IMPORT_KEXP_SHOW_CDC str on dim.SHOW_ID = str.SHOW_ID
when matched and str.metadata$action = 'DELETE' and str.metadata$isupdate = 'FALSE'
then delete
when matched and str.metadata$action = 'INSERT' and str.metadata$isupdate = 'TRUE'
then UPDATE SET
dim.LOAD_ID = str.LOAD_ID
, dim.SHOW_ID = str.SHOW_ID
, dim.PROGRAM_ID = str.PROGRAM_ID
, dim.PROGRAM_NAME = str.PROGRAM_NAME
, dim.PROGRAM_TAGS = str.PROGRAM_TAGS
, dim.HOST_NAMES = str.HOST_NAMES
, dim.TAGLINE = str.TAGLINE
, dim.START_TIME = str.START_TIME
, DW_UPDATE_DATE = current_timestamp
, DW_UPDATE_USER = current_user
when not matched and str.metadata$action = 'INSERT' and str.metadata$isupdate = 'FALSE'
then INSERT (LOAD_ID, SHOW_ID, PROGRAM_ID, PROGRAM_NAME, PROGRAM_TAGS, HOST_NAMES, TAGLINE, START_TIME)
values (str.LOAD_ID, str.SHOW_ID, str.PROGRAM_ID, str.PROGRAM_NAME, str.PROGRAM_TAGS, str.HOST_NAMES,
str.TAGLINE, str.START_TIME);
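Because the merge commits as its own transaction, it consumes the stream. Immediately afterward, the stream should report no unconsumed changes until the next sync:
-- After the merge commits, the consumed rows are gone from the stream.
select SYSTEM$STREAM_HAS_DATA('STAGE.STREAM_IMPORT_KEXP_SHOW_CDC');  -- FALSE
select count(*) from STAGE.STREAM_IMPORT_KEXP_SHOW_CDC;              -- 0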
Check out the Code
All the code here is published and open source on my GitHub account.
Conclusion
Streams hide the complexity of tracking position on changing tables, and in a simple case like this one, it is easy to see how they work.
I see streams as particularly valuable in "Real World" examples where we need to sync multiple streams into a single consumer table. The streams ensure we are not doing more work than necessary by re-reading data we've already processed. Additionally, they simplify our merge SQL so that we can focus on building business logic instead of debugging SQL.