Introduction
Apache Spark is a powerful tool for processing data, but I've found it's easy to get stuck when learning the technology. If a problem confounds me, it is usually because I lack an understanding of the fundamentals behind it.
In the words of Sir Isaac Newton:
A Vulgar Mechanick can practice what he has been taught or seen done, but if he is in an error he knows not how to find it out and correct it, and if you put him out of his road he is at a stand. Whereas he that is able to reason nimbly and judiciously about figure, force, and motion, is never at rest till he gets over every rub. -- Sir Isaac Newton, 1694
The language is archaic, but being a Vulgar Mechanick is often our lot when working with the powerful analytical tools of the giants of our industry. When I attempted to create a Machine Learning Transformation with AWS Glue, I found myself a Vulgar Mechanick, unable to find my errors and correct them, mostly because I didn't understand Spark.
This article uses the DBLP benchmark data set as example data and PySpark to create data structures for a SQL engine.
I chose PySpark as my platform because Python is standard for most professional teams, and I chose AWS because I know it well. However, I pay close attention to the details of building things from scratch, rather than working by rote, in order to communicate true understanding.
If you get stuck in this article, then go back to basics. You will need to know Python, and you will need a local installation of Spark that you can run from the PyCharm IDE. You will also need to know about file formats. Here is a good article: Big Data File Formats Explained. I will go through, step by step, the process of running Spark to perform a machine learning transformation.
PySpark Transformations by Example
Transform the Deck of Cards Data into Parquet
A simple solution is the foundation of solid understanding. Transforming a row-oriented format like CSV into a column-oriented format for SQL querying is a common Spark transformation. A deck of cards is a simple data structure that we can load and then query.
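To make the row-versus-column distinction concrete, here is a small plain-Python sketch with no Spark involved. The three sample cards and the column names `color`, `suit`, and `pip` are assumptions made up for illustration; the real file's header may differ.

```python
import csv
import io

# Hypothetical sample in a pipe-delimited style; not the real deck-of-cards file.
SAMPLE = "color|suit|pip\nRED|HEART|2\nRED|DIAMOND|K\nBLACK|SPADE|A\n"


def to_columns(text, sep="|"):
    """Pivot row-oriented delimited text into a dict of column lists."""
    reader = csv.DictReader(io.StringIO(text), delimiter=sep)
    columns = {name: [] for name in reader.fieldnames}
    for row in reader:
        for name, value in row.items():
            columns[name].append(value)
    return columns


columns = to_columns(SAMPLE)
# A query that needs only one column reads only that column's values.
red_count = columns["color"].count("RED")
print(columns["suit"])
print(red_count)
```

In the column-oriented layout, counting the red cards never touches the `suit` or `pip` values, which is the property that makes formats like Parquet attractive for SQL-style queries.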
Make sure you have a Spark installation. You can follow the excellent blog post: How to use PySpark in PyCharm IDE.
The following PySpark code loads the data into a data frame and stores that data frame as Parquet. When the code runs, it creates a folder containing a column-oriented representation of the CSV data, ready for analysis.
import shutil

from pyspark import SparkContext
from pyspark.sql import SQLContext

if __name__ == "__main__":
    sc = SparkContext(appName="CSV2Parquet")
    sqlContext = SQLContext(sc)
    output_folder = '../../data/out/deckofcards.parquet'
    # Remove output from a previous run; by default Spark refuses to write to an existing folder.
    try:
        shutil.rmtree(output_folder)
    except OSError as e:
        print("Warning: %s : %s" % (output_folder, e.strerror))
    # Read the pipe-delimited file, taking column names from the header row.
    df = sqlContext.read.csv("../../data/cards/deckofcards.txt",
                             inferSchema=True,
                             header=True,
                             sep="|")
    df.write.parquet(output_folder)
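The `inferSchema=True` option asks Spark to make an extra pass over the data and pick a type for each column instead of reading everything as strings. The sketch below is a deliberately simplified, plain-Python illustration of that idea, not Spark's actual inference rules; the function names, column names, and sample values are all hypothetical.

```python
def infer_type(values):
    """Return the narrowest of 'int', 'double', 'string' that fits every value."""
    def all_parse(cast):
        try:
            for value in values:
                cast(value)
            return True
        except ValueError:
            return False

    if all_parse(int):
        return "int"
    if all_parse(float):
        return "double"
    return "string"


def infer_schema(header, rows):
    """Infer one type per column from a list of row tuples of strings."""
    columns = list(zip(*rows))
    return {name: infer_type(col) for name, col in zip(header, columns)}


# Hypothetical header and rows, only to exercise the function.
schema = infer_schema(
    ["suit", "pip_value", "weight"],
    [("HEART", "2", "1.5"), ("SPADE", "10", "2")],
)
print(schema)
```

A column keeps the `string` type as soon as a single value fails to parse as a number, which is why one stray text value in a numeric column changes the inferred schema.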
Transform a Realistic CSV File into Parquet
The ACM data for author publications represents a typical CSV file encountered in Data Science. It uses commas to separate fields and double quotes to enclose field values. The following code will transform it into Parquet.
import shutil

from pyspark import SparkContext
from pyspark.sql import SQLContext

if __name__ == "__main__":
    sc = SparkContext(appName="CSV2Parquet")
    sqlContext = SQLContext(sc)
    output_folder = '../../../data/out/dblp_acm_records.parquet'
    # Remove output from a previous run so the write does not fail.
    try:
        shutil.rmtree(output_folder)
    except OSError as e:
        print("Warning: %s : %s" % (output_folder, e.strerror))
    # Fields are enclosed in double quotes; a quote inside a field is escaped by doubling it.
    df = sqlContext.read.csv("../../../data/scholar/dblp_acm_records.csv",
                             inferSchema=True,
                             header=True,
                             quote="\"",
                             escape="\"")
    df.write.parquet(output_folder)
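The `quote` and `escape` options matter because a quoted field can itself contain commas and double quotes. As a plain-Python illustration, using the standard `csv` module rather than Spark, the sketch below parses one such record. The sample title and author are invented for the example and are not taken from the DBLP data.

```python
import csv
import io

# Invented record: the title contains a comma and a doubled ("") quote.
RAW = (
    'id,title,authors,year\n'
    '1,"The ""Art"" of Parsing, Revisited","A. Author",1999\n'
)

reader = csv.DictReader(
    io.StringIO(RAW),
    delimiter=",",
    quotechar='"',
    doublequote=True,  # a doubled quote inside a quoted field is one literal quote
)
records = list(reader)
print(records[0]["title"])
print(records[0]["year"])
```

Without the quoting rules, the comma inside the title would split the record into five fields instead of four.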
Using AWS Athena
AWS Athena allows for SQL queries against the data. Synchronize the folders created with the *.parquet suffix to S3.
aws s3 sync out/ s3://${S3_DATA_BUCKET}/stage/dblp-scholar/parquet/ --include="*.parquet"
The Athena SQL points to the data in S3.
-- Athena SQL for creating a Table from a Parquet Folder
create external table if not exists dblp_acm_record (
id string,title string,authors string,venue string,year int,source string
)
stored as parquet
location 's3://${S3_DATA_BUCKET}/stage/dblp-scholar/parquet/dblp_acm_records.parquet/';
Counting the records per year shows how the publications are distributed over time.
SELECT year, count(*)
FROM "scholar-data"."dblp_acm_record"
GROUP BY year
ORDER BY year;
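To see what this aggregate returns without an AWS account, the same query shape can be run locally. This sketch uses Python's built-in `sqlite3` rather than Athena, and the three rows are invented placeholders, not DBLP data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table dblp_acm_record ("
    "id text, title text, authors text, venue text, year int, source text)"
)
# Invented placeholder rows, only to exercise the query.
conn.executemany(
    "insert into dblp_acm_record values (?, ?, ?, ?, ?, ?)",
    [
        ("a1", "Paper One", "X", "V", 1999, "ACM"),
        ("a2", "Paper Two", "Y", "V", 1999, "DBLP"),
        ("a3", "Paper Three", "Z", "V", 2001, "ACM"),
    ],
)
counts = conn.execute(
    "select year, count(*) from dblp_acm_record group by year order by year"
).fetchall()
print(counts)
conn.close()
```

Each result row pairs a year with the number of records published in that year, ordered chronologically.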
Understanding Spark through a Name-Matching Algorithm
Spark has "dataframes," but it isn't a fancy version of Pandas. It is for distributed processing of large data sets. I am abusing its power by using it to solve a toy problem, but I'm doing it with the intent of understanding it so that I am never at a stand.
Spark will partition the data sets I create and process the partitions in parallel.
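As a rough analogy for partitioned, parallel processing, the plain-Python sketch below splits a list into slices, works on each slice independently, and combines the partial results. It uses threads in a single process, whereas Spark distributes partitions across executors, so treat it as a conceptual illustration only. The helper names are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def partition(items, num_partitions):
    """Split items into num_partitions slices, dealing them out round-robin."""
    return [items[i::num_partitions] for i in range(num_partitions)]


def count_long_names(names):
    """The work done independently on each partition."""
    return sum(1 for name in names if len(name) > 5)


names = ["Isaac", "Newton", "Donald", "Knuth", "Ada", "Lovelace"]
parts = partition(names, 3)

# Each partition is handled by its own worker; results are combined at the end.
with ThreadPoolExecutor(max_workers=3) as pool:
    partial_counts = list(pool.map(count_long_names, parts))

total = sum(partial_counts)
print(parts)
print(total)
```

The per-partition function never needs to see the other partitions, which is what lets the work scale out.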
Say we have a merged list of names, and some of those names may have alternate spellings or typographic errors. A common way to clean such a list is by phonetic data matching using the Metaphone package.
The process is as follows:
1. Load the CSV data into a Spark Dataframe
2. Create a column in the dataset containing the double Metaphone
3. Join the table to itself with an inner join
4. Create unique ids for each row
5. Pick the best-match record for each name
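The steps above can be sketched in plain Python before involving Spark. Everything here is an assumption made for illustration: `crude_key` is a crude stand-in and is not Double Metaphone, the three names are invented sample rows, and the best-match rule (highest `difflib` similarity) is a hypothetical choice, since the selection rule is not specified here.

```python
import difflib
from itertools import count


def crude_key(name):
    """Crude phonetic stand-in (NOT Double Metaphone): keep the first letter,
    then skip vowels, H/W/Y, and any letter equal to the last kept letter."""
    letters = [c for c in name.upper() if c.isalpha()]
    key = letters[:1]
    for c in letters[1:]:
        if c in "AEIOUHWY" or c == key[-1]:
            continue
        key.append(c)
    return "".join(key)


# Step 1: invented rows standing in for the loaded CSV.
people = [("Isaac", "Newton"), ("Isac", "Newtonn"), ("Donald", "Knuth")]

# Step 2: add the phonetic key column.
keyed = [(f, l, crude_key(f) + "_" + crude_key(l)) for f, l in people]

# Steps 3 and 4: inner self-join on the key (skipping self-pairs), then number the rows.
ids = count()
matches = [
    (next(ids), a[:2], b[:2])
    for a in keyed
    for b in keyed
    if a[2] == b[2] and a != b
]


def similarity(row):
    """Hypothetical scoring rule: string similarity of the two full names."""
    _, left, right = row
    return difflib.SequenceMatcher(None, " ".join(left), " ".join(right)).ratio()


# Step 5: for each name keep the highest-scoring candidate.
best = {}
for row in matches:
    left = row[1]
    if left not in best or similarity(row) > similarity(best[left]):
        best[left] = row

print(keyed)
print(best)
```

Names with no phonetic twin, such as the third row here, simply drop out of the inner join.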
Loading the data into a Spark data frame initiates the process.
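from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="TestMetaphone")
sql_context = SQLContext(sc)
# output_folder must already be defined; it is not shown in this article.
df = sql_context.read.csv(f'{output_folder}/person.csv',
                          inferSchema=True,
                          header=True,
                          sep=",")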
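Create the column with the Metaphone library. A user-defined function (UDF) wraps the Python function so that Spark can apply it to every row. Here `fun` is taken to be `pyspark.sql.functions`, and `personator` is a helper module that is not shown in this article.
from pyspark.sql import functions as fun
from pyspark.sql.types import StringType
udf_df = fun.udf(personator.double_metaphone_sep, StringType())
df_metaphone = df.withColumn("metaphone", udf_df("first_name", "last_name"))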
The double Metaphone column contains a phonetic representation of the name.
Donald Knuth --> TNLTKN0_TNLTKNT
Isaac Newton --> ASKNTN_
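The join matches names that sound the same as potential duplicates. The `monotonically_increasing_id` function then gives each matched row a unique id, which the best-match stage uses for further refinement. Since the table is joined to itself, `df_metaphone2` is presumably a second copy of `df_metaphone`; its construction is not shown here.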
df_matches = df_metaphone2\
    .join(df_metaphone, "metaphone", "inner")\
    .withColumn("id", fun.monotonically_increasing_id())
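The ids produced by `monotonically_increasing_id` are unique and increasing but not consecutive. Per the Spark documentation, the current implementation places the partition id in the upper 31 bits and the row's position within its partition in the lower 33 bits. The plain-Python sketch below mimics that layout; the function name `sketch_id` is hypothetical, and this is an illustration rather than Spark's code.

```python
def sketch_id(partition_id, row_in_partition):
    """Mimic the documented layout: partition id in the high bits,
    per-partition row counter in the low 33 bits."""
    return (partition_id << 33) + row_in_partition


# Two partitions with three rows each.
ids = [sketch_id(p, r) for p in range(2) for r in range(3)]
print(ids)
```

The jump between the third and fourth id marks the partition boundary, so the ids should be treated as opaque labels rather than row numbers.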