Create UDF
There are two ways to use UDFs in Spark SQL:
For DataFrames, in language-integrated queries such as select, agg, etc.
For Spark SQL queries
Create a UDF for DataFrame queries
scala> import java.util.Calendar
scala> import org.apache.spark.sql.functions.{udf, year, to_date}
scala> import sqlContext.implicits._   // for the $"colname" syntax (auto-imported in spark-shell)
scala> def age = udf((year: Int) => Calendar.getInstance().get(Calendar.YEAR) - year)
scala> val employees = sqlContext.read.format("json").load("/Volumes/SONY/Data/people.json")
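The UDF body is ordinary Scala, so the logic can be pulled out and checked on its own. A minimal sketch, using a hypothetical helper (not part of the session) with the current year passed in explicitly so it is deterministic:

```scala
// Same logic as the udf body above; currentYear is explicit so the
// function does not depend on the wall clock (hypothetical helper).
def ageOf(birthYear: Int, currentYear: Int): Int = currentYear - birthYear

println(ageOf(1940, 2016))  // 76, matching the sample output for this dataset
```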
scala> employees.printSchema
root
|-- dob: string (nullable = true)
|-- firstname: string (nullable = true)
|-- gender: string (nullable = true)
|-- lastname: string (nullable = true)
|-- middlename: string (nullable = true)
|-- salary: long (nullable = true)
|-- ssn: string (nullable = true)
scala> employees.show(5)
+----------+---------+------+---------+----------+------+-----------+
| dob|firstname|gender| lastname|middlename|salary| ssn|
+----------+---------+------+---------+----------+------+-----------+
|1940-08-06| Dacia| F|Samborski| Rosella|274357|932-39-7400|
|1964-01-31| Loria| F| Cassino| Suzie|166618|940-40-2137|
|1936-06-02|Lashaunda| F| Rockhill| Markita|185766|923-83-5563|
|1971-09-25| Candace| F| Goike| Marcy| 92497|935-40-2967|
|1926-06-29| Marhta| F| Bonin| Filomena| 40013|968-22-1158|
+----------+---------+------+---------+----------+------+-----------+
scala> employees.withColumn("age", age(year(to_date($"dob")))).show(5)
+----------+---------+------+---------+----------+------+-----------+---+
| dob|firstname|gender| lastname|middlename|salary| ssn|age|
+----------+---------+------+---------+----------+------+-----------+---+
|1940-08-06| Dacia| F|Samborski| Rosella|274357|932-39-7400| 76|
|1964-01-31| Loria| F| Cassino| Suzie|166618|940-40-2137| 52|
|1936-06-02|Lashaunda| F| Rockhill| Markita|185766|923-83-5563| 80|
|1971-09-25| Candace| F| Goike| Marcy| 92497|935-40-2967| 45|
|1926-06-29| Marhta| F| Bonin| Filomena| 40013|968-22-1158| 90|
+----------+---------+------+---------+----------+------+-----------+---+
Here is one more example:
scala> def ageGroup = udf((age:Int) => age / 10)
scala> employees.withColumn("age", age(year(to_date($"dob")))).withColumn("ageGroup",ageGroup($"age")).show(5)
+----------+---------+------+---------+----------+------+-----------+---+--------+
| dob|firstname|gender| lastname|middlename|salary| ssn|age|ageGroup|
+----------+---------+------+---------+----------+------+-----------+---+--------+
|1940-08-06| Dacia| F|Samborski| Rosella|274357|932-39-7400| 76| 7|
|1964-01-31| Loria| F| Cassino| Suzie|166618|940-40-2137| 52| 5|
|1936-06-02|Lashaunda| F| Rockhill| Markita|185766|923-83-5563| 80| 8|
|1971-09-25| Candace| F| Goike| Marcy| 92497|935-40-2967| 45| 4|
|1926-06-29| Marhta| F| Bonin| Filomena| 40013|968-22-1158| 90| 9|
+----------+---------+------+---------+----------+------+-----------+---+--------+
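The ageGroup UDF is plain integer division, so an age of 76 falls in bucket 7 and 45 in bucket 4. A stand-alone sketch of the same logic (a hypothetical helper, not part of the session):

```scala
// Decade bucket via integer division, mirroring the ageGroup UDF body
def decadeOf(age: Int): Int = age / 10

println(decadeOf(76))  // 7
println(decadeOf(45))  // 4
```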
UDF for Spark SQL
To use a UDF in SQL queries, register the function with the SQLContext, and register the DataFrame as a temporary table so the query can reference it by name:
scala> def age_sql = {(year:Int) => Calendar.getInstance().get(Calendar.YEAR) - year}
scala> sqlContext.udf.register("age", age_sql(_:Int))
scala> employees.registerTempTable("employees")
scala> sqlContext.sql("select dob, age(year(to_date(dob))) from employees").show(5)
+----------+---+
| dob|_c1|
+----------+---+
|1940-08-06| 76|
|1964-01-31| 52|
|1936-06-02| 80|
|1971-09-25| 45|
|1926-06-29| 90|
+----------+---+
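The computed column comes back as _c1 because the expression has no alias; adding one in the SQL (an assumption, not part of the original session) gives a readable header:

```scala
// Same spark-shell session as above; "as age" names the UDF column
sqlContext.sql("select dob, age(year(to_date(dob))) as age from employees").show(5)
```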
Note:
A UDF can take multiple columns as arguments.
You can also write a UDAF (User-Defined Aggregate Function). See this post for an example: https://databricks.com/blog/2015/06/02/statistical-and-mathematical-functions-with-dataframes-in-spark.html
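A multi-column UDF follows the same pattern as the single-column ones above. A minimal sketch with hypothetical names (the Spark wrapping is shown in comments since it needs the spark-shell session):

```scala
// Plain two-argument function; in the session above it would be wrapped
// and applied like the single-column UDFs (hypothetical example):
//   def fullName = udf(fullNameOf _)
//   employees.withColumn("fullname", fullName($"firstname", $"lastname")).show(5)
def fullNameOf(first: String, last: String): String = s"$first $last"

println(fullNameOf("Dacia", "Samborski"))  // Dacia Samborski
```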