XML Doc and Blob Field

Goals

  1. Query XML docs in HDFS - each XML doc contain one or more records.

  2. Write XML doc into a blob field of a RDBMS table

  3. Query XML stored in a blob field in a RDBMS table

Here I am using spark 2.1.1

Query XML data in HDFS

Upload the attached xml file in HDFS /user/cloudera/docs directory

Start spark shell with xml package support

$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-xml_2.11:0.4.1

In spark write the following code to retrieve nested data from xml

val docs = spark

.read

.format("xml")

.option("rowTag", "FIToFICstmrCdtTrf") //rowTag is a record identifier in the xml doc

.load("/user/cloudera/docs")

.select("CdtTrfTxInf.PmtId.InstrId", "CdtTrfTxInf.PmtId.EndToEndId", "CdtTrfTxInf.PmtId.TxId")

Select fields from nested XML structure using . operator

Note:

- if you have a record per file, put the root tag of the xml as a rowTag

- if you have multiple records in a single xml file, put the record element tag as rowTag

One additional note, each file contains a full record, total number of partitions will be large. You may wish to coalesce the number of partitions to a smaller number.

docs.show(false)

+-------------------------------+-----------------------------------+-----------------------------------+

|InstrId |EndToEndId |TxId |

+-------------------------------+-----------------------------------+-----------------------------------+

|20160516021200201BFFF0000000001|E2E-TxPrefix20160513223041391000000|TxPrefix....20160513223041391000000|

+-------------------------------+-----------------------------------+-----------------------------------+

Write XML doc into a blob field of a RDBMS table

Suppose you stored individual transaction record as separate files in /user/cloudera/docs directory. You want to load each xml into a blob field of a RDBMS. In fact you can use any database including nosql databases for the same purpose.

Login to mysql and create a table with blob field under a database, tnx.

$ mysql -uroot -p

<enter password cloudera when prompted>

mysql> create database tnx;

mysql> use tnx;

mysql> create table docs(name varchar(255) primary key, body blob);

From spark load individual file as one record of RDD and

val dbOptions = Map(

"url" -> "jdbc:mysql://localhost:3306/tnx",

"driver" -> "com.mysql.jdbc.Driver",

"dbtable" -> "docs",

"user" -> "root",

"password" -> "cloudera"

)

sc

.wholeTextFiles("/user/cloudera/docs") //Load each file as record. Name of file becomes a key and content is the value in the form of String

.toDF("name", "body") // Convert the tuple RDD into data frame so that we can apply udf

.select(col("name"), encode(col("body"), "UTF-8").alias("body")) // Encode the string value into binary using UTF8 encoding

.write

.format("jdbc") // JDBC connector to connect to RDBMS data source

.mode("append") //overwrite will truncate and load

.options(dbOptions) // DB options

.save()

Query XML stored in Blob field of a RDBMS table

Load the docs table from tnx db.

val docs = spark

.read

.format("jdbc")

.options(dbOptions)

.load()

View schema of docs

docs.printSchema

root

|-- name: string (nullable = true)

|-- body: binary (nullable = true)

Define a case class to hold the parsed fields from XML document.

case class CDT(

InstrId:String,

EndToEndId:String,

TxId:String)

Define a UDF to extract fields from the XML content

import org.apache.spark.sql.functions.udf

import scala.xml.XML

def toCDT = udf{(body:String) =>

val row = XML.loadString(body)

((row \\ "InstrId").text, (row \\ "EndToEndId").text, (row \\ "TxId").text)

}

Now you can use the toCDT UDF to parse xml doc

docs

.select(col("name"), decode(col("body"), "UTF-8").alias("body")) // Decoding the UTF encoded binary field type into String

.select(col("name"), toCDT(col("body")).alias("CDT"))

.show(false)

+---------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------+

|name |CDT |

+---------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------+

|hdfs://quickstart.cloudera:8020/user/cloudera/xml/pacs.008.001.06.xml|[20160516021200201BFFF0000000001,E2E-TxPrefix20160513223041391000000,TxPrefix....20160513223041391000000]|

+---------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------+