Skip to main content

How to define an UDFs in Spark with scala

Introduction

There are a few ways of defining UDFs in spark (scala), but this is the one I found most useful, as it allows you to separate the logic of the UDF from the actual spark wrapper, so you can easily test the logic of the function without having to invoke spark.

Definition

First, we need to define the function we want to use as an UDF. For this example, we will create a very simple function that gets a string and returns the same string but in upper case.

// Converts a string to Upper Case
def toUpperCase(text: String): String = {
text.toUpperCase
}

Then, we just need to wrap the function in the UDF that spark gives us:

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// Wrap previous function with UDF
def to_uppercase_udf: UserDefinedFunction = udf((s1: String) => toUpperCase(s1))

// Register UDF to be used in Spark SQL
spark.sqlContext.udf.register("to_uppercase_udf", to_uppercase_udf)

Usage

Once it has been defined, you can use it like this:

val data = Seq(("test 1"), ("test 2")).toDF("C1")

// With the Dataframe API
data.withColumn("C2", to_uppercase_udf(col("C1"))).show()

// With the SQL API
data.selectExpr("C1", "to_uppercase_udf(C1) as C2").show()

Testing

One of the benefits of splitting the function definition of the UDF wrapper is that we can write unit test to check that the function works fine without having to deal with spark:

@Test
def toUpperCase_test(): Unit = {
val inputText = "hello"
val expectedOutput = "HELLO"

val output = toUpperCase(inputText)

output should be (expectedOutput)
}