Create UDF

There are two ways to create a UDF in Spark SQL:

  1. For DataFrames, where the UDF can be used in language-integrated queries such as select, agg, and withColumn.
  2. For Spark SQL, where the UDF is registered by name and called from SQL statements.

Create a UDF for DataFrame operations

scala> import java.util.Calendar
scala> import org.apache.spark.sql.functions.{udf, year, to_date}
scala> import sqlContext.implicits._   // enables the $"colname" syntax used below
scala> val age = udf((year: Int) => Calendar.getInstance().get(Calendar.YEAR) - year)
scala> val employees = sqlContext.read.format("json").load("/Volumes/SONY/Data/people.json")
scala> employees.printSchema
root
 |-- dob: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- ssn: string (nullable = true)

scala> employees.show(5)
+----------+---------+------+---------+----------+------+-----------+
|       dob|firstname|gender| lastname|middlename|salary|        ssn|
+----------+---------+------+---------+----------+------+-----------+
|1940-08-06|    Dacia|     F|Samborski|   Rosella|274357|932-39-7400|
|1964-01-31|    Loria|     F|  Cassino|     Suzie|166618|940-40-2137|
|1936-06-02|Lashaunda|     F| Rockhill|   Markita|185766|923-83-5563|
|1971-09-25|  Candace|     F|    Goike|     Marcy| 92497|935-40-2967|
|1926-06-29|   Marhta|     F|    Bonin|  Filomena| 40013|968-22-1158|
+----------+---------+------+---------+----------+------+-----------+
scala> employees.withColumn("age", age(year(to_date($"dob")))).show(5)
+----------+---------+------+---------+----------+------+-----------+---+
|       dob|firstname|gender| lastname|middlename|salary|        ssn|age|
+----------+---------+------+---------+----------+------+-----------+---+
|1940-08-06|    Dacia|     F|Samborski|   Rosella|274357|932-39-7400| 76|
|1964-01-31|    Loria|     F|  Cassino|     Suzie|166618|940-40-2137| 52|
|1936-06-02|Lashaunda|     F| Rockhill|   Markita|185766|923-83-5563| 80|
|1971-09-25|  Candace|     F|    Goike|     Marcy| 92497|935-40-2967| 45|
|1926-06-29|   Marhta|     F|    Bonin|  Filomena| 40013|968-22-1158| 90|
+----------+---------+------+---------+----------+------+-----------+---+
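The age computation itself is plain Scala; here is a minimal standalone sketch of the logic the UDF wraps (computeAge is a hypothetical name, and no Spark is required to try it):

```scala
import java.util.Calendar

// Same year-subtraction logic as the `age` UDF above. Note that it is an
// approximation: it ignores whether the birthday has already occurred this year.
def computeAge(birthYear: Int): Int =
  Calendar.getInstance().get(Calendar.YEAR) - birthYear
```

Keeping the logic in a named function like this makes it easy to unit-test before wrapping it with udf.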

Here is one more example: ageGroup buckets each age into a decade using integer division.

scala> val ageGroup = udf((age: Int) => age / 10)
scala> employees.withColumn("age", age(year(to_date($"dob")))).withColumn("ageGroup",ageGroup($"age")).show(5)
+----------+---------+------+---------+----------+------+-----------+---+--------+
|       dob|firstname|gender| lastname|middlename|salary|        ssn|age|ageGroup|
+----------+---------+------+---------+----------+------+-----------+---+--------+
|1940-08-06|    Dacia|     F|Samborski|   Rosella|274357|932-39-7400| 76|       7|
|1964-01-31|    Loria|     F|  Cassino|     Suzie|166618|940-40-2137| 52|       5|
|1936-06-02|Lashaunda|     F| Rockhill|   Markita|185766|923-83-5563| 80|       8|
|1971-09-25|  Candace|     F|    Goike|     Marcy| 92497|935-40-2967| 45|       4|
|1926-06-29|   Marhta|     F|    Bonin|  Filomena| 40013|968-22-1158| 90|       9|
+----------+---------+------+---------+----------+------+-----------+---+--------+

Create a UDF for Spark SQL

scala> val age_sql = (year: Int) => Calendar.getInstance().get(Calendar.YEAR) - year
scala> sqlContext.udf.register("age", age_sql)
scala> employees.registerTempTable("employees")   // the query below needs this table name
scala> sqlContext.sql("select dob, age(year(to_date(dob))) from employees").show(5)
+----------+---+
|       dob|_c1|
+----------+---+
|1940-08-06| 76|
|1964-01-31| 52|
|1936-06-02| 80|
|1971-09-25| 45|
|1926-06-29| 90|
+----------+---+
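The generated column name _c1 can be replaced with a SQL alias; a minimal sketch, assuming the same registered UDF and employees temp table (the sqlContext call is commented since it needs an active Spark shell):

```scala
// Alias the UDF result so the output column is named "age" rather than "_c1".
val query = "select dob, age(year(to_date(dob))) as age from employees"
// sqlContext.sql(query).show(5)
```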

Note:

  • You can define UDFs that take multiple columns as arguments.
  • You can also write a UDAF (User Defined Aggregate Function). See this post for an example: https://databricks.com/blog/2015/06/02/statistical-and-mathematical-functions-with-dataframes-in-spark.html
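To illustrate the first bullet, here is a hedged sketch of a UDF over two columns; makeFullName is a hypothetical helper, and the column names match the employees schema above (the Spark wrapping is shown in comments since it needs an active context):

```scala
// A pure function taking two inputs -- one per column the UDF will receive.
def makeFullName(first: String, last: String): String = s"$first $last"

// Wrapped with udf it accepts two Column arguments, e.g.:
//   val fullName = udf(makeFullName _)
//   employees.withColumn("fullname", fullName($"firstname", $"lastname")).show(5)
```

Keeping the Spark wrapping separate lets the pure function be tested without a Spark context.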