This article provides a concise overview of Auto Loader in Databricks, which is built on Spark Structured Streaming. Let's start with a brief overview of Structured Streaming.
Overview of Structured Streaming
Apache Spark Structured Streaming is a processing engine that operates in near-real-time, ensuring end-to-end fault tolerance and exactly-once processing guarantees. It allows you to express computations on streaming data just like you would for batch computations on static data.
It lets you query an endless data source, automatically detecting new data and incrementally writing the results to a designated sink.
The incoming data stream is treated as rows being continuously appended to a table; this table, representing an infinite data source, is referred to as an unbounded table.
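As a sketch of this batch-like API (assuming a live SparkSession named spark; the socket source, host, and port below are illustrative, not from the original article):

```python
from pyspark.sql.functions import explode, split

# Read an unbounded stream of text lines from a socket -- each new line
# becomes a row appended to the unbounded input table
lines = (spark.readStream
         .format("socket")
         .option("host", "localhost")
         .option("port", 9999)
         .load())

# Transformations look exactly like batch DataFrame code
words = lines.select(explode(split(lines.value, " ")).alias("word"))
counts = words.groupBy("word").count()

# Continuously write the updated word counts to the console
query = (counts.writeStream
         .outputMode("complete")
         .format("console")
         .start())
```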
What Is Auto Loader and How Does It Work?
Auto Loader incrementally and efficiently processes new data files as they arrive in cloud storage. It can load data files from AWS S3 (s3://), Azure Data Lake Storage Gen2 (ADLS Gen2, abfss://), Google Cloud Storage (GCS, gs://), Azure Blob Storage (wasbs://), ADLS Gen1 (adl://), and the Databricks File System (DBFS, dbfs:/).
Auto Loader accommodates a diverse range of file formats, including JSON, CSV, XML, PARQUET, AVRO, ORC, TEXT, and BINARYFILE.
Auto Loader introduces a Structured Streaming source named cloudFiles. When provided with an input directory path on cloud file storage, the cloudFiles source autonomously handles incoming files, processing them as they arrive.
Fetching data into a Spark DataFrame
df = (spark.readStream                        # create a streaming DataFrame
      .format("cloudFiles")                   # use Auto Loader for incremental file handling
      .option("cloudFiles.format", "json")    # expected source file format
      .schema(custom_schema)                  # tailored schema for the incoming data
      .load(f"{data_location}/directory"))    # directory path to monitor
How does Auto Loader monitor data ingestion progress?
As files are discovered, their metadata is persisted in a scalable key-value store (RocksDB) in the checkpoint location of your Auto Loader pipeline.
In the event of a failure, Auto Loader seamlessly resumes from its last checkpoint using this stored state.
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(schema)
    .load("path")
    .writeStream
    .option("checkpointLocation", checkpoint_location)
    .trigger(once=True)
    .table(table_name))
Auto Loader Trigger Modes
1. Fixed Interval – Processes data in micro-batches at an interval specified by the user.
Method calling syntax – trigger(processingTime="5 minutes")
2. Single Batch – Processes the entire available dataset in a single batch operation, then stops.
Method calling syntax – trigger(once=True)
3. Micro-Batches – Processes all available data in micro-batches and then stops.
Method calling syntax – trigger(availableNow=True)
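Putting the three modes together, a hedged sketch of how each trigger attaches to a writeStream (assuming df is a streaming DataFrame and checkpoint_location/table_name are defined as in the earlier snippets):

```python
# 1. Fixed interval: run a micro-batch every 5 minutes
(df.writeStream
   .option("checkpointLocation", checkpoint_location)
   .trigger(processingTime="5 minutes")
   .table(table_name))

# 2. Single batch: process everything currently available once, then stop
(df.writeStream
   .option("checkpointLocation", checkpoint_location)
   .trigger(once=True)
   .table(table_name))

# 3. Available-now: drain all available data in multiple micro-batches, then stop
(df.writeStream
   .option("checkpointLocation", checkpoint_location)
   .trigger(availableNow=True)
   .table(table_name))
```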
Auto Loader File Detection Modes
1. Directory Listing Mode – Directory listing is Auto Loader's default behavior. In this mode, new files are identified by listing the input directory. It lets you start Auto Loader streams quickly, with no permission configuration beyond access to your cloud storage data.
2. File Notification Mode – In this mode, Auto Loader sets up a notification service and a queue service that subscribe to file events from the input directory. File notifications let Auto Loader scale to ingesting millions of files per hour. Compared with directory listing mode, file notification mode is more performant and scalable, making it well suited to large input directories or high file volumes; however, it requires additional cloud resources.
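Switching to file notification mode is a single option on the cloudFiles source. A sketch, reusing the custom_schema and data_location names from the earlier snippet:

```python
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.useNotifications", "true")  # file notifications instead of directory listing
      .schema(custom_schema)
      .load(f"{data_location}/directory"))
```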
COPY INTO vs Autoloader
The COPY INTO SQL command in Databricks loads data from a specified file location into a Delta table. Notably, the operation is both retryable and idempotent: files in the source location that have already been loaded are skipped.
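For comparison, a minimal COPY INTO sketch issued from Python via spark.sql (the table name and bucket path below are illustrative); re-running the same command skips files that were already loaded:

```python
spark.sql("""
    COPY INTO main.default.events
    FROM 's3://my-bucket/raw/events/'
    FILEFORMAT = JSON
""")
```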
Summary
Auto Loader enables incremental data ingestion, minimizing redundant processing. It scales to processing billions of files and guarantees fault tolerance, making data ingestion in Databricks both efficient and reliable.