XML Doc and Blob Field

Goals

  1. Query XML docs in HDFS, where each XML doc contains one or more records.
  2. Write XML docs into a blob field of an RDBMS table.
  3. Query XML stored in a blob field of an RDBMS table.

Here I am using Spark 2.1.1.

Query XML data in HDFS

Upload the attached XML file to the HDFS directory /user/cloudera/docs.

Start the Spark shell with the spark-xml package:

$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-xml_2.11:0.4.1

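In the Spark shell, run the following code to retrieve nested data from the XML. Because the chained calls start with a dot, enter multi-line snippets like this one in :paste mode (finish with Ctrl-D); otherwise the Scala REPL evaluates each line on its own.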

val docs = spark
.read
.format("xml")
.option("rowTag", "FIToFICstmrCdtTrf") //rowTag is a record identifier in the xml doc
.load("/user/cloudera/docs")
.select("CdtTrfTxInf.PmtId.InstrId", "CdtTrfTxInf.PmtId.EndToEndId", "CdtTrfTxInf.PmtId.TxId") 

Nested fields are selected with the dot (.) operator, as in CdtTrfTxInf.PmtId.InstrId above.

Note:

- If each file holds a single record, use the root tag of the XML as the rowTag.

- If a single XML file holds multiple records, use the tag of the repeating record element as the rowTag.
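As an illustration of the second case, a single pacs.008 message can carry several CdtTrfTxInf elements. The sketch below is a minimal example, assuming a spark-shell session started with the spark-xml package (paste it in :paste mode); the sample file, its temporary directory and the values INSTR-1, TX-1 and so on are hypothetical. Using CdtTrfTxInf as the rowTag turns each transaction into one row, so field paths start below that element.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

// Hypothetical sample: one message (root tag) holding two transaction records
val sample =
  """<FIToFICstmrCdtTrf>
    |  <CdtTrfTxInf><PmtId><InstrId>INSTR-1</InstrId><EndToEndId>E2E-1</EndToEndId><TxId>TX-1</TxId></PmtId></CdtTrfTxInf>
    |  <CdtTrfTxInf><PmtId><InstrId>INSTR-2</InstrId><EndToEndId>E2E-2</EndToEndId><TxId>TX-2</TxId></PmtId></CdtTrfTxInf>
    |</FIToFICstmrCdtTrf>""".stripMargin

// Write the sample to a local temp directory (stands in for the HDFS directory)
val sampleDir = Files.createTempDirectory("xml-multi")
Files.write(sampleDir.resolve("multi.xml"), sample.getBytes(StandardCharsets.UTF_8))

// rowTag names the repeating record element rather than the root tag,
// so the selected paths are relative to CdtTrfTxInf
val txns = spark
  .read
  .format("xml")
  .option("rowTag", "CdtTrfTxInf")
  .load(sampleDir.toUri.toString)
  .select("PmtId.InstrId", "PmtId.EndToEndId", "PmtId.TxId")

txns.show(false) // one row per CdtTrfTxInf element
```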

One additional note: if each file contains a single record, the total number of partitions will be large, because Spark creates at least one partition per input file. You may wish to coalesce the DataFrame into a smaller number of partitions.
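A minimal sketch of what coalesce does, assuming a spark-shell session. It uses spark.range with 200 partitions as a stand-in for a DataFrame read from 200 single-record files; the target of 8 partitions is an arbitrary example value.

```scala
// Stand-in for a DataFrame read from many small files: 200 input partitions
val manySmall = spark.range(0, 200, 1, 200).toDF("id")

// coalesce merges existing partitions without a full shuffle
val compacted = manySmall.coalesce(8)

println(manySmall.rdd.getNumPartitions) // 200
println(compacted.rdd.getNumPartitions) // 8
```

For the docs DataFrame above the equivalent call would be docs.coalesce(8). Unlike repartition, coalesce can only reduce the number of partitions, which is why it avoids a shuffle.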

docs.show(false)
+-------------------------------+-----------------------------------+-----------------------------------+
|InstrId                        |EndToEndId                         |TxId                               |
+-------------------------------+-----------------------------------+-----------------------------------+
|20160516021200201BFFF0000000001|E2E-TxPrefix20160513223041391000000|TxPrefix....20160513223041391000000|
+-------------------------------+-----------------------------------+-----------------------------------+

Write XML docs into a blob field of an RDBMS table

Suppose you have stored each transaction record as a separate file in the /user/cloudera/docs directory, and you want to load each XML document into a blob field of an RDBMS table. The same approach can be adapted to other databases, including NoSQL stores, as long as Spark has a connector for them.

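Log in to MySQL and create a database, tnx, containing a table with a blob field. Note that a MySQL BLOB column holds at most 65,535 bytes; use MEDIUMBLOB or LONGBLOB for larger documents.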

$ mysql -uroot -p
<enter password cloudera when prompted>
 
mysql> create database tnx;
mysql> use tnx;
mysql> create table docs(name varchar(255) primary key, body blob);

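From Spark, load each file as one record of an RDD and write the records to the docs table. The MySQL JDBC driver (com.mysql.jdbc.Driver) must be on the Spark shell's classpath, for example by passing the MySQL connector jar with --jars when starting spark-shell.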

import org.apache.spark.sql.functions.{col, encode}

val dbOptions = Map(
    "url" -> "jdbc:mysql://localhost:3306/tnx",
    "driver" -> "com.mysql.jdbc.Driver",
    "dbtable" -> "docs",
    "user" -> "root",
    "password" -> "cloudera"
)
sc
.wholeTextFiles("/user/cloudera/docs") // Load each file as one record: the full file path becomes the key and the content the String value
.toDF("name", "body") // Convert the tuple RDD into a DataFrame so that DataFrame functions can be applied
.select(col("name"), encode(col("body"), "UTF-8").alias("body")) // Encode the String value into binary using UTF-8
.write
.format("jdbc") // JDBC connector to connect to the RDBMS data source
.mode("append") // "overwrite" drops and re-creates the table unless the "truncate" option is set
.options(dbOptions) // DB options
.save()
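The encode step can be checked on its own before any data reaches MySQL. A minimal sketch, assuming a spark-shell session and a small hypothetical XML file written to a local temporary directory: it converts the content to binary the same way as the write pipeline, then decodes it back with decode to confirm that nothing is lost.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.functions.{col, decode, encode}
import org.apache.spark.sql.types.BinaryType
import spark.implicits._

// Hypothetical one-record XML file in a local temp directory
val xml = "<Document><TxId>TX-1</TxId></Document>"
val tmpDir = Files.createTempDirectory("blob-check")
Files.write(tmpDir.resolve("tx1.xml"), xml.getBytes(StandardCharsets.UTF_8))

// Same conversion as the write pipeline: (path, content) -> (name, binary body)
val raw = sc.wholeTextFiles(tmpDir.toUri.toString).toDF("name", "body")
val encoded = raw.select(col("name"), encode(col("body"), "UTF-8").alias("body"))

// Decode the binary column back to a String to confirm the round trip
val decoded = encoded.select(decode(col("body"), "UTF-8").alias("body"))

println(encoded.schema("body").dataType) // BinaryType
println(decoded.first().getString(0))    // <Document><TxId>TX-1</TxId></Document>
```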

Query XML stored in a blob field of an RDBMS table

Load the docs table from the tnx database, reusing dbOptions from the previous step.

val docs = spark
.read
.format("jdbc")
.options(dbOptions)
.load()

View the schema of docs:

docs.printSchema
root
 |-- name: string (nullable = true)
 |-- body: binary (nullable = true)

Define a case class to hold the fields parsed from the XML document.

case class CDT(
    InstrId:String,
    EndToEndId:String,
    TxId:String)

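Define a UDF that extracts the fields from the XML content and returns them as a CDT, so the resulting struct fields are named InstrId, EndToEndId and TxId:

import org.apache.spark.sql.functions.udf
import scala.xml.XML

val toCDT = udf { (body: String) =>
    val row = XML.loadString(body) // Parse the XML string into a scala.xml element
    // \\ searches all descendants, so the nesting depth of each field does not matter
    CDT((row \\ "InstrId").text, (row \\ "EndToEndId").text, (row \\ "TxId").text)
}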
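Because the UDF body is plain Scala, the extraction logic can be tried outside Spark first. A minimal sketch, assuming scala-xml is on the classpath (as it is in the spark-shell used above); the helper parseCDT and the sample document, including its urn:example namespace, are hypothetical. It shows that \\ matches elements by local name even under a default namespace, as pacs.008 messages typically use, and that a missing element yields an empty string.

```scala
import scala.xml.XML

case class CDT(InstrId: String, EndToEndId: String, TxId: String)

// Hypothetical helper with the same extraction logic as the UDF body
def parseCDT(body: String): CDT = {
  val row = XML.loadString(body)
  // \\ searches all descendants and matches on the element's local name
  CDT((row \\ "InstrId").text, (row \\ "EndToEndId").text, (row \\ "TxId").text)
}

// Hypothetical document with a default namespace
val sample = """<Document xmlns="urn:example"><PmtId><InstrId>INSTR-1</InstrId><EndToEndId>E2E-1</EndToEndId><TxId>TX-1</TxId></PmtId></Document>"""

println(parseCDT(sample))        // CDT(INSTR-1,E2E-1,TX-1)
println(parseCDT("<Document/>")) // CDT(,,): missing elements become ""
```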

Now use the toCDT UDF to parse each XML document:

import org.apache.spark.sql.functions.{col, decode}

docs
.select(col("name"), decode(col("body"), "UTF-8").alias("body")) // Decode the UTF-8 encoded binary column back into a String
.select(col("name"), toCDT(col("body")).alias("CDT"))
.show(false)

+---------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------+
|name                                                                 |CDT                                                                                                      |
+---------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------+
|hdfs://quickstart.cloudera:8020/user/cloudera/xml/pacs.008.001.06.xml|[20160516021200201BFFF0000000001,E2E-TxPrefix20160513223041391000000,TxPrefix....20160513223041391000000]|
+---------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------+
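To get one column per field, as in the output of the first section, the struct returned by the UDF can be expanded with CDT.*. A minimal sketch, assuming a spark-shell session (paste it in :paste mode); the in-memory sample row, the file name a.xml and the UDF name extractCDT are hypothetical. The UDF returns the CDT case class, so the expanded columns carry the case class field names.

```scala
import org.apache.spark.sql.functions.{col, udf}
import scala.xml.XML
import spark.implicits._

case class CDT(InstrId: String, EndToEndId: String, TxId: String)

// Hypothetical UDF returning the case class, so struct fields are named after it
val extractCDT = udf { (body: String) =>
  val row = XML.loadString(body)
  CDT((row \\ "InstrId").text, (row \\ "EndToEndId").text, (row \\ "TxId").text)
}

// Hypothetical in-memory stand-in for the decoded docs table
val sampleDocs = Seq(
  ("a.xml", "<Doc><InstrId>INSTR-1</InstrId><EndToEndId>E2E-1</EndToEndId><TxId>TX-1</TxId></Doc>")
).toDF("name", "body")

// "CDT.*" expands the struct into one top-level column per field
val parsed = sampleDocs.select(col("name"), extractCDT(col("body")).alias("CDT"))
val flat = parsed.select(col("name"), col("CDT.*"))

flat.show(false)
```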