
How to archive processed data files in Spark Structured Streaming

Introduction

Requires Spark 3.0.0 or later

One of the most common use cases for Spark Structured Streaming is listening to a directory for new files, so that the stream reads only the newly arrived files and uses them in the subsequent transformations.

However, as the number of files in the directory grows, you may run into performance issues when starting the stream, because Spark has to list the directory and work out which files have already been processed (using the checkpoint). This can be especially problematic in cloud storage such as ADLS Gen2 or S3, where listing the files in a container is expensive in terms of both time and money.

The most common solution is to archive files (to a lower-cost storage tier) once they have been processed. However, this can be tricky in a streaming job, because you may not know whether a given file has already been processed.

Fortunately, Spark 3.0 introduced a solution to this problem: the “cleanSource” option, which you can set on the readStream method. Let’s take a look.

Example

Consider a standard streaming query like the one below, which reads files from a folder, adds a timestamp column, and writes the output to another folder:

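import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.types.{StringType, StructType}

val mainPath = "/tmp/spark/hot/"
val outputPath = "/tmp/spark/output/"
val checkpointPath = "/tmp/spark/checkpoint/"

// Streaming file sources need an explicit schema
// (hypothetical columns: adjust them to match your CSV files)
val schema = new StructType()
  .add("id", StringType)
  .add("value", StringType)

// Read files that are added to the folder and add a load timestamp
val data = spark.readStream
  .format("csv")
  .schema(schema)
  .load(mainPath)
  .withColumn("loadTime", current_timestamp())

// Write the results to another directory
val write = data.writeStream
  .format("csv")
  .option("checkpointLocation", checkpointPath)
  .option("path", outputPath)
  .start()

write.awaitTermination()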

You can archive the files that have already been read from “mainPath” simply by adding these options to the reader:

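// Archive folder (assumed path: pick any location outside mainPath)
val archivePath = "/tmp/spark/archive/"

val data = spark.readStream
  .format("csv")
  // keep the rest of the reader configuration unchanged
  .option("cleanSource", "archive")
  .option("sourceArchiveDir", archivePath)
  .load(mainPath)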

With these options, the files in “mainPath” are moved to “archivePath” once they have been processed.
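The Spark documentation describes where archived files end up: the source file’s full path is re-created underneath “sourceArchiveDir” (for example, “/a/b/dataset.txt” archived to “/archived/here” becomes “/archived/here/a/b/dataset.txt”). The sketch below mirrors that rule with a hypothetical helper, archivedPath, which is not part of Spark; it only manipulates strings and assumes plain, slash-separated paths.

```scala
// Hypothetical helper (not part of Spark) that mirrors the documented
// cleanSource=archive rule: the source file's full path is re-created
// underneath sourceArchiveDir.
object ArchivePaths {
  def archivedPath(sourceArchiveDir: String, sourceFile: String): String = {
    // Normalise the joining slash so "/archive/" + "/a/b.csv"
    // becomes "/archive/a/b.csv" rather than "/archive//a/b.csv"
    sourceArchiveDir.stripSuffix("/") + "/" + sourceFile.stripPrefix("/")
  }
}
```

With the paths used in this post and a hypothetical file named file1.csv, archivedPath("/tmp/spark/archive/", "/tmp/spark/hot/file1.csv") returns "/tmp/spark/archive/tmp/spark/hot/file1.csv", so you should expect a nested copy of the source path inside the archive folder rather than a flat list of files.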

The “cleanSource” option accepts two other values:

  • delete: removes the source files once they have been processed.
  • off: the default value; the files are left untouched.
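To keep the three modes, and the rule that “archive” also needs “sourceArchiveDir”, in one place, you could build the reader options with a small helper. This is a hypothetical sketch (the CleanSource object and its Mode type are not part of Spark); it only produces the option map, using the option names and values listed above.

```scala
// Hypothetical helper (not part of Spark) that builds the reader options
// for each cleanSource mode. "archive" always carries a sourceArchiveDir.
object CleanSource {
  sealed trait Mode
  case object Off extends Mode
  case object Delete extends Mode
  final case class Archive(sourceArchiveDir: String) extends Mode

  def options(mode: Mode): Map[String, String] = mode match {
    // Default behaviour: processed files are left where they are
    case Off => Map("cleanSource" -> "off")
    // Processed files are removed from the source folder
    case Delete => Map("cleanSource" -> "delete")
    // Processed files are moved under the given archive directory
    case Archive(dir) =>
      require(dir.nonEmpty, "sourceArchiveDir must not be empty")
      Map("cleanSource" -> "archive", "sourceArchiveDir" -> dir)
  }
}
```

The resulting map can be passed to the reader with DataStreamReader.options, for example spark.readStream.format("csv").schema(...).options(CleanSource.options(CleanSource.Delete)).load(mainPath). Modelling the archive directory as a field of Archive means an archive configuration without a directory cannot be built at all.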

Gif with example

The GIF below shows the process in action: files are generated in the “hot” folder, the stream loads them and writes the output to the “output” folder, and then Spark moves the source files to the “archive” folder.

Gif of streaming process archiving data

Notes:

  • Archiving is not instantaneous. From what I have seen, the files are usually moved only after new files appear in the source folder.
  • Archiving may slow down processing slightly, but it is likely cheaper than listing an ever-growing source folder.

Source:

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources