NashTech Insights

Build a lakehouse solution with AWS Glue

Vinh Phan Thanh

Key Takeaways
The data lakehouse plays a pivotal role in today’s data landscape by addressing the challenges of handling big data, enabling real-time analytics, fostering data exploration, supporting advanced analytics and machine learning, ensuring data governance, promoting agility, and offering cost efficiency. It empowers organizations to harness the full potential of their data and gain valuable insights for better decision-making and business success. In this post, I will show how to build a lakehouse solution using the AWS Glue service, which you can use to accelerate building a lakehouse architecture on AWS.

I. Data Lakehouse and its benefits

A data lakehouse is a concept that combines the capabilities of a data lake and a data warehouse, providing a unified and flexible platform for storing, managing, and analyzing vast amounts of data. It represents a modern data architecture approach that aims to address the limitations and challenges of traditional data warehousing and data lake solutions.

At its core, a data lakehouse serves as a centralized repository for storing structured, semi-structured, and unstructured data in its raw form. This allows organizations to capture and retain large volumes of diverse data types, including real-time streams, log files, social media data, and more.

It also incorporates features typically associated with data warehouses, such as ACID (Atomicity, Consistency, Isolation, Durability) transactions and support for SQL queries. This enables users to perform complex analytics, reporting, and ad-hoc querying on the data lakehouse, leveraging familiar SQL-based tools and interfaces.
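
As a hedged illustration of what this looks like in practice: with a table-format layer such as Delta Lake (the one used later in this post), a transactional update can be issued with plain SQL against files sitting in object storage. The snippet below is a minimal sketch only, assuming a Spark session already configured with the Delta extensions and a hypothetical sales table registered in the catalog.

# Minimal sketch: an ACID update and a SQL query on a lakehouse table.
# Assumes a Spark session configured with the Delta Lake extensions and a
# hypothetical Delta table named `sales` registered in the catalog.
spark.sql("""
    UPDATE sales
    SET status = 'shipped'
    WHERE order_id = 1001
""")

# Familiar SQL-based analytics over the same storage
spark.sql("SELECT status, COUNT(*) AS cnt FROM sales GROUP BY status").show()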

By combining the strengths of data lakes and data warehouses, a data lakehouse offers several advantages such as:

  • Unified Data Storage
  • Flexibility and Agility
  • Scalability
  • Cost Efficiency
  • Enhanced Data Governance
  • Advanced Analytics and Insights

II. Build Lakehouse solution using AWS Glue

1. Delta Lake Architecture

Delta Lake is an open-source storage layer that enables building a data lakehouse on top of existing cloud object storage, adding features such as ACID transactions, schema enforcement, and time travel. The stored data is organized into three layers:

  • Bronze tables: These contain the raw data ingested from multiple sources such as Internet of Things (IoT) systems, CRM, RDBMS, and JSON files.
  • Silver tables: This layer contains a more refined view of the data after it has undergone transformation and feature engineering.
  • Gold tables: This final layer is typically made available to end users for BI reporting and analysis, or for use in machine learning processes.

Together, the three layers form what is referred to as a multi-hop architecture:

Image from the Databricks website

To simplify the process in this post, I assume that we have already extracted some raw CSV files (for instance, category.csv) from the data sources and placed them in the Bronze layer (the raw data formats could be CSV, XML, JSON, Parquet, log files, IoT streams, and so on). AWS Glue is then used to transform the data and load it into the dim_category folder, which is also a table in the Gold layer, as in the architecture below:

architecture diagram

Note: Depending on the data complexity in the real world, we could add one more step to refine the data (the Silver layer) before loading it into the Gold layer, as shown in the diagram above; a rough sketch of such a step follows.
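
The PySpark snippet below is only a hedged illustration of what such a Silver-layer hop might look like; it is not part of the job built later in this post, and the lake/silver/ prefix is an assumption on top of the bucket layout used below.

# Hypothetical Silver-layer hop: read raw CSV from the Bronze layer, apply
# light cleansing, and persist the result as a Delta table under an assumed
# lake/silver/ prefix. Assumes a Spark session configured for Delta as in the
# Glue job later in this post.
df_bronze = (spark.read
    .option("header", True)
    .csv("s3://nashtechbucket/lake/bronze/"))

df_silver = (df_bronze
    .filter("catid is not NULL")
    .dropDuplicates(["catid"]))

(df_silver.write
    .format("delta")
    .mode("overwrite")
    .save("s3a://nashtechbucket/lake/silver/category/"))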

2. Preparation

Step 1: On S3, create a bucket named nashtechbucket and the folder structure as below:

structure folder

Step 2: Copy the Category.csv file to the Bronze folder

category file
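
If you prefer to script these preparation steps instead of using the S3 console, the boto3 sketch below does roughly the same thing. The prefix names are assumptions based on the paths used in the Glue job later in this post (lake/bronze/, lake/gold/) plus a jar/ prefix for the Delta library, and the region is assumed to be us-east-1.

# Optional boto3 sketch of the preparation steps. Prefixes and region are
# assumptions; adjust them to match your own bucket layout.
import boto3

s3 = boto3.client("s3", region_name="us-east-1")  # assumed region

s3.create_bucket(Bucket="nashtechbucket")
for prefix in ["lake/bronze/", "lake/gold/", "jar/"]:
    s3.put_object(Bucket="nashtechbucket", Key=prefix)  # zero-byte "folder" markers

# Upload the raw file into the Bronze layer
s3.upload_file("category.csv", "nashtechbucket", "lake/bronze/category.csv")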

Step 3: Download the Delta Lake library (delta-core_2.12-1.0.1.jar) from the link and put it into the jar folder.

Note: Depending on the Glue version, we MUST choose the corresponding Delta Lake library. In my case, I am using Glue 3.0, so delta-core_2.12-1.0.1.jar is the right one. For more detail, go to this link.

3. Build Glue Job

Step 1: In the AWS console, search for the Glue service -> select ETL jobs

Step 2: Select Spark script editor -> click the “Create” button

Step 3: Create the script as below:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from delta import *
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,BooleanType,TimestampType
import uuid
import logging
from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.functions import sha2, concat_ws
from pyspark.sql.functions import lit
from dateutil.parser import parse

# Create the table in the lakehouse (S3)
def CreateTable(spark, p_path, p_dbname, p_tablename):
    db_path = p_path + "/" + p_dbname + "/"
    spark.sql("CREATE DATABASE IF NOT EXISTS {0} LOCATION '{1}'".format(p_dbname,db_path))
    
    table_path = db_path + p_tablename + "/"  
    spark.sql("CREATE OR REPLACE TABLE {0}.{1} \
            ( \
                cat_key BIGINT,  \
                catid INT, \
                catgroup STRING, \
                catname STRING, \
                catdesc STRING, \
                update_date TIMESTAMP, \
                run_date STRING, \
                batch_id INT, \
                hash STRING \
            ) \
            USING delta \
            LOCATION '{2}'".format(p_dbname, p_tablename,table_path))
     
    spark.sql("""INSERT INTO {0}.{1} VALUES(-1,-1,'unknown','unknown','unknown','1970-10-01 00:00:00','1970-01-01', 0,'NULL')""".format(p_dbname, p_tablename))
    
    deltaTable = DeltaTable.forPath(spark, table_path)
    deltaTable.generate("symlink_format_manifest")
    return True
    
# Use the MERGE command to update & insert data
def UpSertTable(spark, df_Source, p_path, p_dbname, p_tablename):
    df_Source.createOrReplaceTempView("stg_category")
    spark.sql("""
        
            MERGE INTO {0}.{1} AS trg
            USING
            (
                SELECT row_number() OVER (PARTITION BY CASE WHEN dm.catid IS NULL THEN 1 ELSE 0 END ORDER BY stg.catid) as row_num,
                       (SELECT CASE WHEN MAX(cat_key)<0 THEN 0 
                                    ELSE MAX(cat_key) END FROM {0}.{1}) AS maxid,
                       stg.catid,
                       stg.catgroup,
                       stg.catname,
                       stg.catdesc,
                       stg.run_date,
                       stg.batch_id,
                       stg.hash_value
                FROM stg_category stg
                LEFT JOIN {0}.{1} dm ON stg.catid = dm.catid
            ) AS scr ON scr.catid = trg.catid --and scr.hash_value <> trg.hash
            WHEN MATCHED THEN
                    UPDATE SET
                                catid = scr.catid,
                                catgroup = scr.catgroup,
                                catname = scr.catname,
                                catdesc = scr.catdesc,
                                run_date = scr.run_date,
                                batch_id = scr.batch_id,
                                hash = scr.hash_value,
                                update_date = current_date()
            WHEN NOT MATCHED THEN 
                    INSERT(cat_key, catid,catgroup,catname,catdesc,update_date,run_date,batch_id,hash)
                    VALUES(scr.row_num + scr.maxid, scr.catid,scr.catgroup,scr.catname,scr.catdesc,current_date(),scr.run_date,scr.batch_id,scr.hash_value)  
    """.format(p_dbname, p_tablename))
    
    table_path = p_path + "/" + p_dbname + "/" + p_tablename + "/" 
    deltaTable = DeltaTable.forPath(spark, table_path)
    deltaTable.generate("symlink_format_manifest")
    return True

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
#spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Create a Spark session with the Delta Lake extensions enabled
spark = SparkSession \
    .builder \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Declare variables here; you can pass them in as job parameters instead
var_path ='s3a://nashtechbucket/lake/gold'
var_target_database = 'data_warehouse'
var_target_table ='dim_category'
var_run_date = datetime.now().strftime("%Y-%m-%d")
var_batch_id = 1

try:
    table_path = var_path +"/" + var_target_database + "/" + var_target_table + "/"
    if DeltaTable.isDeltaTable(spark,table_path)==False:
        CreateTable(spark, var_path, var_target_database, var_target_table)
    
    # Read the data source from the Bronze layer
    dfBronzeCategory = glueContext.create_dynamic_frame.from_options(
    format_options={"quoteChar": '"', "withHeader": True, "separator": ","},
    connection_type="s3",
    format="csv",
    connection_options={"paths": ["s3://nashtechbucket/lake/bronze/"], "recurse": True},
    transformation_ctx="S3bucket_node1",
    )
    src_count = dfBronzeCategory.count()
   
    # Convert the Glue DynamicFrame to a Spark DataFrame
    dfBronzeCategory  = dfBronzeCategory.toDF()  
   
    #Do some transformations
    df_Category_NotNull = dfBronzeCategory.filter("catid is not NULL")
    #Create hash column
    df_Category_NotNull = df_Category_NotNull.withColumn("hash_value", sha2(concat_ws("||", *df_Category_NotNull.columns[1:]), 256))
    
    #Remove duplicated data
    df_Category_NotNull.createOrReplaceTempView("stg_category")
    sqlCategory = spark.sql("SELECT *, ROW_NUMBER() OVER (PARTITION BY hash_value ORDER BY catid DESC) as row_num FROM stg_category")
    new_sqlCategory = sqlCategory.filter("row_num == 1")
    
    # Add 2 more columns: batch_id and run_date
    new_sqlCategory = new_sqlCategory.withColumn('batch_id', lit(var_batch_id))
    df_Source = new_sqlCategory.withColumn('run_date', lit(var_run_date))
    
    trg_count = df_Source.count()
    UpSertTable(spark, df_Source, var_path, var_target_database, var_target_table)
    process_status = "SUCCEEDED"
    end_run = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
except Exception as e:
    error_msg = getattr(e, 'message', repr(e))
    process_status = "FAILED"
    end_run = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
    sys.exit(0)
job.commit()

Step 4: Select the “Job details” tab and set the configuration as below:

conf1
conf2
conf3
  • Note: Job parameters:
    • Key: --conf
    • Value: spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
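
If you would rather script this configuration than click through the console, the boto3 sketch below creates an equivalent job. The job name, IAM role, and S3 paths for the script and the jar are assumptions; the --conf value is the one listed above, and the Delta jar is attached through the --extra-jars job parameter.

# Rough boto3 sketch of the job configuration. Job name, IAM role, script
# location, and jar path are assumptions; the --conf value matches the job
# parameter listed above.
import boto3

glue = boto3.client("glue")

delta_conf = (
    "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore "
    "--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension "
    "--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
)

glue.create_job(
    Name="load_dim_category",      # assumed job name
    Role="GlueServiceRole",        # assumed IAM role
    GlueVersion="3.0",
    Command={
        "Name": "glueetl",
        "PythonVersion": "3",
        "ScriptLocation": "s3://nashtechbucket/scripts/load_dim_category.py",  # assumed path
    },
    DefaultArguments={
        "--conf": delta_conf,
        "--extra-jars": "s3://nashtechbucket/jar/delta-core_2.12-1.0.1.jar",   # assumed jar path
    },
)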

Step 5: Save job

4. Run Glue Job

Step 1: To run the job, select it and then click the “Run” button. If it runs successfully, you will see something like the screenshot below:

result1

Step 2: Go to S3 to check data

result2

As you can see here, the data has been loaded into the dim_category folder (which is also a table in the lakehouse). Along with the data files (Parquet files), this folder also contains a _delta_log/ folder, which keeps the transaction log and checkpoints whenever we insert, update, or delete data.
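
As a quick sanity check outside the Glue job, the Gold table can also be read back with Spark; the snippet below is a minimal sketch, assuming a Spark session configured with the Delta extensions as in the job above.

# Minimal sketch: read the Gold table back to verify the load. Assumes a Spark
# session configured with the Delta Lake extensions as in the Glue job above.
table_path = "s3a://nashtechbucket/lake/gold/data_warehouse/dim_category/"

df = spark.read.format("delta").load(table_path)
df.show(10, truncate=False)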

5. Query table in lakehouse

We can use other services such as Athena or Redshift Spectrum to query tables in the lakehouse. Here, I use Athena to demonstrate how to work with it.

In the AWS console, select Athena and create the table as below:

CREATE EXTERNAL TABLE `dim_category`(
  `cat_key` bigint, 
  `catid` int, 
  `catgroup` string, 
  `catname` string, 
  `catdesc` string, 
  `update_date` timestamp, 
  `run_date` string, 
  `batch_id` int, 
  `hash` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION  's3://nashtechbucket/lake/gold/data_warehouse/dim_category/_symlink_format_manifest'
TBLPROPERTIES (
  'CrawlerSchemaDeserializerVersion'='1.0', 
  'CrawlerSchemaSerializerVersion'='1.0', 
  'classification'='parquet')

If it runs successfully, you will see something like the screenshot below:

athena1

Query the created table using standard SQL

athena2
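
The same kind of query can also be submitted programmatically; the boto3 sketch below is only an illustration, where the database name and the query-result S3 location are assumptions.

# Sketch: run a query against the external table through the Athena API.
# The database name and the result location are assumptions.
import boto3

athena = boto3.client("athena")

response = athena.start_query_execution(
    QueryString="SELECT cat_key, catid, catgroup, catname FROM dim_category ORDER BY cat_key",
    QueryExecutionContext={"Database": "default"},  # assumed database
    ResultConfiguration={"OutputLocation": "s3://nashtechbucket/athena-results/"},  # assumed location
)
print(response["QueryExecutionId"])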

Next, modify the category.csv file in S3 by adding one more record and changing some of the existing data, then rerun the Glue job.

athena3

Although only a few records in the data source were changed, the existing data in the table is not duplicated (quite different from a plain data lake mechanism), while the changed records were updated and the new ones inserted. We gain this benefit because the _delta_log/ folder keeps the transaction log and checkpoints, which is quite similar to how relational database engines work.
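
Because every change is recorded in _delta_log/, you can also inspect the table history and read an older version of the table (time travel). The snippet below is a minimal sketch, again assuming a Delta-enabled Spark session.

# Minimal sketch: inspect the transaction history kept in _delta_log/ and read
# an older version of the table (time travel). Assumes a Delta-enabled Spark session.
from delta import DeltaTable

table_path = "s3a://nashtechbucket/lake/gold/data_warehouse/dim_category/"

DeltaTable.forPath(spark, table_path).history().select(
    "version", "timestamp", "operation"
).show(truncate=False)

# Read the table as it was at version 0, before the rerun of the job
spark.read.format("delta").option("versionAsOf", 0).load(table_path).show()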

III. Final thoughts

In this post, I illustrated how to build an ETL job that ingests data from data sources into a data lakehouse using AWS Glue. There is still room for improvement, such as passing parameters, writing logs, and reconciling record counts between source and destination, but overall it gives a general picture of how to build such a job from scratch. By taking proper advantage of the lakehouse solution, organizations can not only save significant cost but also adapt it to their needs, such as decision-making, advanced analytics, machine learning, and AI.

NashTech has recently built modern data solutions using different tech stacks such as AWS Redshift, Azure Synapse, and Databricks. If you are interested in this topic and would like a deep-dive session on how to apply these solutions to accelerate your data journey, please visit NashTech Accelerator (nashtechglobal.com) for more information, and don’t hesitate to book a demo to discuss with our data experts.

Vinh Phan Thanh

Vinh Phan is a Data Architect based in Vietnam, specializing in Performance tuning, Data Migration, Data warehousing, BI solutions, and Advanced analytics utilizing AI/ML. During his free time, he dedicates himself to researching emerging technologies, enjoying quality time with his family, reading books, and exploring new travel destinations.
