Writing Spark dataframes to file with a size limit
Problem
You’re working with Spark and you need to write a dataframe to files with a maximum size.
Solution
The main idea is that you will need to write twice: the first write goes to a staging location so you can measure the total size of the output, and the second write uses the correct number of partitions so you get files of the desired size.
Code example
First we write the dataframe to a staging location:
// df defined previously with our computation
val outputPath = "/tmp/output/" // Final path
val stgPath = "/tmp/staging/" // Staging path
val outputSizeLimitMB = 256 // We want files of max 256MB
df.write.format("json").save(stgPath)
Now you need to get the size of the files in that folder. How you do this depends on the file system you're using; some examples:
- DBFS (Databricks):
val dirSizeInBytes = dbutils.fs.ls(stgPath).map(x => x.size).sum
- HDFS:
import org.apache.hadoop.fs.{FileSystem, Path}
val fileSystem = FileSystem.get(sc.hadoopConfiguration)
val dirSizeInBytes = fileSystem.getContentSummary(new Path(stgPath)).getLength
- Locally (Who knows!)
import java.io.File
import org.apache.commons.io.FileUtils
val dirSizeInBytes = FileUtils.sizeOfDirectory(new File(stgPath))
- Cloud Storage
To get the size of a folder in an S3 bucket or an Azure Storage Account, you have a few options:
- If you’re working in Databricks, just use the DBFS option listed before.
- Use the official APIs to get the size of the folder. All of these cloud providers have an SDK that lets you interact with the storage service and retrieve the details (or see the sketch right after this list).
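Alternatively, since Spark just wrote the staging data through a Hadoop connector (s3a, abfss, …), you can often reuse that same connector to measure the folder instead of pulling in a cloud SDK. A scheme-agnostic sketch, assuming the connector for your stgPath is already configured on the cluster:
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
// Resolve the FileSystem for whatever scheme stgPath uses (s3a://, abfss://, hdfs://, ...)
val fs = FileSystem.get(new URI(stgPath), sc.hadoopConfiguration)
val dirSizeInBytes = fs.getContentSummary(new Path(stgPath)).getLength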
With the size of the staging files, you can now calculate the number of partitions that you need. For that you can do something like this:
val numFiles = math.ceil(dirSizeInBytes.toDouble / (outputSizeLimitMB * 1024 * 1024).toDouble).toInt
- Using math.ceil to round up the value, as the number of partitions needs to be an integer.
- The 1024*1024 converts the limit from megabytes to bytes, since the size of the staging folder is given in bytes.
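As a hypothetical example, say the staging folder came out at roughly 1.6 GB with the 256 MB limit from above:
// Hypothetical numbers, just to illustrate the formula
// dirSizeInBytes               = 1717986918 (~1.6 GB)
// outputSizeLimitMB*1024*1024  = 268435456  (256 MB in bytes)
// 1717986918 / 268435456 ≈ 6.4  =>  math.ceil gives numFiles = 7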
And finally, use numFiles to repartition the Dataframe so each partition ends up with (at most) roughly the desired size. Notice that we’re reading the data back from stgPath, so the original computation doesn’t have to run again! Also note that coalesce can only reduce the number of partitions; if numFiles is larger than the current partition count, use repartition(numFiles) instead.
val finalDF = spark.read.format("json").load(stgPath)
finalDF.coalesce(numFiles).write.format("json").save(outputPath)
And that’s it: you’ll have the output data in files smaller than the specified size limit.
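If you want to double-check the result, a quick sanity check could look like this (assuming you’re on Databricks and dbutils is available; adapt it to your file system otherwise):
// List the output files and print their sizes in MB
dbutils.fs.ls(outputPath)
  .filter(_.name.endsWith(".json"))
  .foreach(f => println(f"${f.name}: ${f.size / (1024.0 * 1024.0)}%.1f MB"))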
And don’t forget to remove the staging data!
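A cleanup sketch for a couple of the file systems mentioned above (pick whichever matches your setup):
// Databricks / DBFS: delete the staging folder, true = recursive
dbutils.fs.rm(stgPath, true)
// HDFS (or any Hadoop-compatible file system), true = recursive
import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.delete(new Path(stgPath), true)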