XML Doc and Blob Field


  1. Query XML docs in HDFS - each XML doc contains one or more records.
  2. Write an XML doc into a blob field of an RDBMS table
  3. Query XML stored in a blob field of an RDBMS table

Here I am using Spark 2.1.1.

Query XML data in HDFS

Upload the attached XML file into the HDFS /user/cloudera/docs directory.

Start the Spark shell with XML package support:

$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-xml_2.11:0.4.1

In the Spark shell, write the following code to retrieve nested data from the XML:

val docs = spark.read
.format("com.databricks.spark.xml")
.option("rowTag", "FIToFICstmrCdtTrf") //rowTag is the record identifier in the xml doc
.load("/user/cloudera/docs")
.select("CdtTrfTxInf.PmtId.InstrId", "CdtTrfTxInf.PmtId.EndToEndId", "CdtTrfTxInf.PmtId.TxId")

Select fields from the nested XML structure using the . operator.


- if you have one record per file, use the root tag of the xml as the rowTag

- if you have multiple records in a single xml file, use the record element tag as the rowTag

One additional note: when each file contains a full record, the total number of partitions will be large. You may wish to coalesce to a smaller number of partitions.
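The coalescing step above can be sketched as follows (assuming the docs DataFrame from the previous snippet; the target of 8 partitions is an arbitrary illustration, not a recommendation):

```scala
// coalesce reduces the partition count without a full shuffle
val compacted = docs.coalesce(8)
println(compacted.rdd.getNumPartitions) // at most 8
```

Choose the target based on the total data volume rather than the file count.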

|InstrId                        |EndToEndId                         |TxId |

Write XML doc into a blob field of an RDBMS table

Suppose you stored individual transaction records as separate files in the /user/cloudera/docs directory, and you want to load each XML file into a blob field of an RDBMS. In fact, you can use any database, including NoSQL databases, for the same purpose.

Log in to MySQL and create a table with a blob field under a database named tnx.

$ mysql -uroot -p
<enter password cloudera when prompted>
mysql> create database tnx;
mysql> use tnx;
mysql> create table docs(name varchar(255) primary key, body blob);

From Spark, load each individual file as one record of an RDD and write the records into the table:

import org.apache.spark.sql.functions.{col, encode}
import spark.implicits._

val dbOptions = Map(
    "url" ->  "jdbc:mysql://localhost:3306/tnx",
    "driver" -> "com.mysql.jdbc.Driver",
    "dbtable" -> "docs",
    "user" -> "root",
    "password" -> "cloudera"
)

spark.sparkContext
.wholeTextFiles("/user/cloudera/docs") //Load each file as a record. The file name becomes the key and the content is the value, in the form of a String
.toDF("name", "body") // Convert the tuple RDD into a data frame so that we can apply functions
.select(col("name"), encode(col("body"), "UTF-8").alias("body")) // Encode the string value into binary using UTF-8 encoding
.write
.format("jdbc") // JDBC connector to connect to the RDBMS data source
.mode("append") //overwrite would truncate and load
.options(dbOptions) // DB options
.save()
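As a quick sanity check on the encode step, here is a sketch using a hypothetical one-row DataFrame standing in for the wholeTextFiles output (runnable in the same spark-shell):

```scala
import org.apache.spark.sql.functions.{col, decode, encode}
import spark.implicits._

// Toy record: file name plus XML content as a String
val toy = Seq(("a.xml", "<Doc/>")).toDF("name", "body")
val bin = toy.select(col("name"), encode(col("body"), "UTF-8").alias("body"))
bin.printSchema() // body is now binary, matching the blob column type
// Decoding with the same charset restores the original string
bin.select(col("name"), decode(col("body"), "UTF-8").alias("body")).show()
```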

Query XML stored in a blob field of an RDBMS table

Load the docs table from the tnx database:

val docs = spark.read.format("jdbc").options(dbOptions).load()

View the schema of docs with docs.printSchema():

|-- name: string (nullable = true)
|-- body: binary (nullable = true)

Define a case class to hold the parsed fields from XML document.

case class CDT(InstrId: String, EndToEndId: String, TxId: String)

Define a UDF to extract the fields from the XML content:

import org.apache.spark.sql.functions.udf
import scala.xml.XML
def toCDT = udf{(body: String) =>
    val row = XML.loadString(body)
    CDT((row \\ "InstrId").text, (row \\ "EndToEndId").text, (row \\ "TxId").text)
}
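A quick sketch of how the \\ projection operator behaves, using a tiny hypothetical inline document (the element names mirror the fields above):

```scala
import scala.xml.XML

// Minimal sample record for illustration only
val sample = XML.loadString(
  "<PmtId><InstrId>I-001</InstrId><EndToEndId>E-001</EndToEndId><TxId>T-001</TxId></PmtId>")

// \\ searches the whole subtree for matching elements; .text returns their text content
println((sample \\ "InstrId").text) // I-001
println((sample \\ "TxId").text)    // T-001
```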

Now you can use the toCDT UDF to parse the XML docs:

import org.apache.spark.sql.functions.{col, decode}
docs
.select(col("name"), decode(col("body"), "UTF-8").alias("body")) // Decode the UTF-8 encoded binary field back into a String
.select(col("name"), toCDT(col("body")).alias("CDT"))
.show(false)


|name |CDT |