
Why You Should Start Writing Spark Custom Native Functions


When people need something that doesn’t come out of the box in Spark, one of the first things they try is writing a UDF, a User Defined Function, that provides the functionality they’re looking for. But is that the best way of doing it? What are the performance implications of writing a UDF?

This post looks at implementing a function that returns a UUID using two different approaches, writing a UDF and writing custom Spark-native code, and compares their performance.

Let’s get started!

This post follows this structure:

  • Introduction
  • A quick introduction to UUIDs
  • UUID implementation with UDFs
  • UUID implementation with Catalyst Expressions
  • Performance Comparison
  • Conclusion
  • Resources

Note 1: This will only work using Scala.

Note 2: The uuid() function has been available in Spark since version 3.0.0, but implementing it ourselves is still a useful exercise thanks to its simplicity.
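For reference, here is a minimal sketch of how the built-in function can be called through a SQL expression (df is just a hypothetical DataFrame):

import org.apache.spark.sql.functions.expr

// Built-in uuid() is available as a SQL function since Spark 3.0.0
val withBuiltInIds = df.withColumn("uuid", expr("uuid()"))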

You can find the code: https://github.com/imdany/spark_catalyst_udf


Introduction

If you have done some work with Spark, you know that there are cases where Spark itself doesn’t provide the functionality you require, so you need to expand it.

Usually, you can do this by writing a UDF that does the job. But did you know there is an alternative? Catalyst expressions. I have to say they are not straightforward to write, but (spoiler alert) they can take your application to another level of performance.

So let’s get down to business!

A quick introduction to UUIDs

UUID stands for Universally Unique Identifier, and it’s a pretty common way of generating a unique string that can identify a piece of data. There are multiple versions of this mechanism, but the most commonly used is UUID version 4, and the IDs look like this:

c896f39a-6001-4e62-9296-a323bee9b047

The critical point of this version is that the bits that comprise the identifier are generated randomly, with no inherent logic. Because of this, there is no way to infer any information about the source by only looking at the UUID.

Another important aspect when generating IDs is “collisions”: generating the same string twice. Quoting from Wikipedia:

“The number of random version-4 UUIDs which need to be generated to have a 50% probability of at least one collision is 2.71 quintillion… This number is equivalent to generating 1 billion UUIDs per second for about 85 years. A file containing this many UUIDs, at 16 bytes per UUID, would be about 45 exabytes.”

So we can safely say that it’s very unlikely that we’ll ever face this issue.

Java (and therefore Scala) already implements this functionality in the class java.util.UUID. This class provides a method called randomUUID() (you can take a look at the source code here: http://hg.openjdk.java.net/jdk8/…/UUID.java#l141) that generates the UUID for us, but… how do we access that functionality from Spark?
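In plain Scala, generating one of these identifiers is a one-liner:

// Generate a random (version 4) UUID as a string
val id: String = java.util.UUID.randomUUID().toString
// e.g. "c896f39a-6001-4e62-9296-a323bee9b047"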

Let’s examine how to implement this UUID generator in Spark using both a UDF and a Catalyst expression approach.

UUID implementation with UDFs

Implementing a UUID generator as a UDF is straightforward, and it’s a piece of code I have seen in practically every project I have worked on.

With some variations in syntax, one way we can write this function is:

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// Wrap java.util.UUID.randomUUID() in a Spark UDF
def uuid: UserDefinedFunction = udf(() => java.util.UUID.randomUUID().toString)

// Register it for the SQL API under a name that doesn't collide with the
// built-in uuid() function available since Spark 3.0
spark.sqlContext.udf.register("uuid_udf", uuid)

This way, we can use the UUID function both in SQL expressions and with the DataFrame API.
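As a quick illustration (df and my_table are hypothetical names, just for the example):

// DataFrame API: call the UDF value directly
val withIds = df.withColumn("uuid", uuid())

// SQL API: call the UDF by its registered name
val withIdsSql = spark.sql("SELECT uuid_udf() AS uuid, * FROM my_table")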

The main point here is that we’re using an existing Java function that Spark doesn’t expose. To use it from Spark, we wrap the code in the udf method Spark provides and then register the UDF if we want to use it from the SQL API.

Easy, right? You can use this approach for many different things. It’s easy to do and it works, but there are other ways of extending Spark’s functionality.

UUID implementation with Catalyst Expressions

Writing a Catalyst expression is more involved than writing a UDF, but as you’ll see in the performance comparison below, there are performance advantages to doing so. Let’s start with the basics: how to write one.

To write a custom native function in Spark, we need at least two files: the first one implements the functionality by extending Catalyst’s Expression interface; the second one makes that function available to the DataFrame API.

Catalyst Expression

The file that contains the implementation of the code needs to be created in a particular package, that is:

org.apache.spark.sql.catalyst.expressions

Therefore, we need to create the file for our function in that folder in our Spark project.

Location of the file
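Assuming a standard sbt or Maven project layout (and a hypothetical file name), that means something like:

src/main/scala/org/apache/spark/sql/catalyst/expressions/Uuid.scala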

And here is where things can get quite involved. To make Spark use our function, we need to extend one of the available interfaces. There are many different interfaces we could implement, and finding the right one can be tricky, as the documentation around this is not abundant. To simplify things, let’s analyze what we want to achieve: a function that takes no input parameters and returns a string.

I found that I need to implement a “leaf expression” (an expression with no children), so my class starts like this:

case class Uuid() extends LeafExpression with CodegenFallback {
  override def nullable: Boolean = ???
  override def eval(input: InternalRow): Any = ???
  override def dataType: DataType = ???
}

So let’s fill in that definition, starting with the easy methods.

For dataType, as we want to return a string:

override def dataType: DataType = StringType

And for nullable, as we don’t want to return nulls from our function:

override def nullable: Boolean = false

The last bit, eval, is the actual evaluation of the expression; it’s what generates a UUID for each row.

override def eval(input: InternalRow): Any = UTF8String.fromString(java.util.UUID.randomUUID().toString)

That’s it! The only unusual thing that you may notice is the UTF8String.fromString(). If you try to run the code without that, you’ll see:

java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String

We call that method because Spark uses it to convert an “external” String into a “Spark” String: https://github.com/apache/spark/…/UTF8String.java#L49

The final code looks like this:

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String

case class Uuid() extends LeafExpression with CodegenFallback {
  override def nullable: Boolean = false
  override def dataType: DataType = StringType
  override def eval(input: InternalRow): Any =
    UTF8String.fromString(java.util.UUID.randomUUID().toString)
}

Easy, right? Well, in this case, yes, this was an easy implementation, but it’s not usually this simple.

Function wrapper

Now that we have the Catalyst expression written, we need to make it available to the DataFrame API. To do that, we need to create another file. Its location is not as important as the previous one, but to keep things tidy, I usually put it in:

org.apache.spark.sql

Location of the Wrapper file
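Again, in a standard layout that would be something like:

src/main/scala/org/apache/spark/sql/CustomFunctions.scala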

This time, I have called it CustomFunctions, and it contains the following:

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.{Expression, Uuid}

object CustomFunctions {
  private def withExpr(expr: Expression): Column = Column(expr)

  def UUID_CUSTOM(): Column = withExpr {
    Uuid()
  }
}

With this code, we’re making the Uuid expression available through the CustomFunctions object.

Usage

The final question is: how do we use this function? The answer is quite simple.

We need to import it as we would with any other function:

import org.apache.spark.sql.CustomFunctions.UUID_CUSTOM

and use it on our DataFrame:

.withColumn("uuid", UUID_CUSTOM())
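Putting it together, here is a minimal end-to-end sketch (the tiny range and column name are just for illustration):

import org.apache.spark.sql.CustomFunctions.UUID_CUSTOM

// Generate a UUID for every row of a small test DataFrame
val df = spark.range(5).toDF("ID").withColumn("uuid", UUID_CUSTOM())
df.show(truncate = false)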

Performance Comparison

The question you may be asking yourself is: is all of this really worth it? Well, let’s take a look at the numbers.

I have run the same code using the UDF and the Catalyst expression on DataFrames of different sizes, and the results are pretty interesting.

// UDF version
val data = spark.range(nRows).toDF("ID").withColumn("uuid_udf", expr("uuid_udf()"))
data.write.format("parquet").mode("overwrite").save(s"/tmp/test/UDF/${runID}")

--------

// Catalyst Version
val data = spark.range(nRows).toDF("ID").withColumn("uuid", UUID_CUSTOM())
data.write.format("parquet").mode("overwrite").save(s"/tmp/test/catalyst/${runID}")

I have run each function with 4 different numbers of rows, 100 times per combination, and then averaged those times. The result is this:
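For reference, here is a minimal sketch of the kind of timing loop described above; the exact row counts and the harness itself are assumptions based on the description, not the author’s actual test script:

import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.CustomFunctions.UUID_CUSTOM

// Measure the wall-clock time of a block, in seconds
def timeIt(block: => Unit): Double = {
  val start = System.nanoTime()
  block
  (System.nanoTime() - start) / 1e9
}

val sizes = Seq(1000L, 100000L, 10000000L, 100000000L) // assumed row counts
val nRuns = 100 // repetitions per combination

for (nRows <- sizes) {
  val avgUdf = (1 to nRuns).map { runID =>
    timeIt {
      spark.range(nRows).toDF("ID")
        .withColumn("uuid_udf", expr("uuid_udf()"))
        .write.format("parquet").mode("overwrite").save(s"/tmp/test/UDF/$runID")
    }
  }.sum / nRuns

  val avgCatalyst = (1 to nRuns).map { runID =>
    timeIt {
      spark.range(nRows).toDF("ID")
        .withColumn("uuid", UUID_CUSTOM())
        .write.format("parquet").mode("overwrite").save(s"/tmp/test/catalyst/$runID")
    }
  }.sum / nRuns

  println(s"$nRows rows -> UDF: $avgUdf s, Catalyst: $avgCatalyst s")
}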

Performance comparison (lower is better)

For small DataFrames, the difference is not very noticeable… but as you increase the size of the DataFrame, you start to see how the Catalyst expression performs much better than the UDF.

Conclusion

So should I stop using UDFs and start writing Catalyst expressions?

I cannot answer that for you, as it will depend on many different factors, like time, available resources, or expertise.

But what is clear from the test results is that if you need a highly performant application or want to reduce the execution time of your jobs, you should consider taking a look at how to write these kinds of Catalyst expressions.

Resources

I haven’t been able to find much documentation on Catalyst expressions and their implementation. So if you want to dive deeper into this, I recommend taking a look at the Spark source code, finding existing functions that do something similar to what you’re trying to achieve, and basing yours on those:

Examples:

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala

Expressions:

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala


Thanks for reading!