XML Doc and Blob Field
Goals
Query XML docs in HDFS, where each XML doc contains one or more records
Write an XML doc into a blob field of an RDBMS table
Query XML stored in a blob field of an RDBMS table
Here I am using Spark 2.1.1.
Query XML data in HDFS
Upload the attached XML file to the HDFS directory /user/cloudera/docs.
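The usual way to upload is `hdfs dfs -put` from a terminal. You can also do it from Scala with the Hadoop FileSystem API in a running spark-shell (started as in the next step). The sketch below makes two assumptions. First, the local path file:///tmp/pacs.008.001.06.xml is a hypothetical location for the attached file. Second, the shell's Hadoop configuration points at the cluster's HDFS.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Reuse the Hadoop configuration of the running SparkContext (sc is predefined in spark-shell)
val fs = FileSystem.get(sc.hadoopConfiguration)
val target = new Path("/user/cloudera/docs")

// Create the target directory; succeeds quietly if it already exists
fs.mkdirs(target)

// Hypothetical local path of the attached XML file: adjust to where you saved it
fs.copyFromLocalFile(new Path("file:///tmp/pacs.008.001.06.xml"), target)

// List the files now present in the HDFS directory
fs.listStatus(target).foreach(status => println(status.getPath))
```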
Start the Spark shell with the spark-xml package:
$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-xml_2.11:0.4.1
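In the Spark shell, run the following code to retrieve nested data from the XML. Enter multi-line snippets like this one in :paste mode. Otherwise the Scala REPL evaluates each line on its own and rejects lines that start with a dot.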
val docs = spark
.read
.format("xml")
.option("rowTag", "FIToFICstmrCdtTrf") //rowTag is a record identifier in the xml doc
.load("/user/cloudera/docs")
.select("CdtTrfTxInf.PmtId.InstrId", "CdtTrfTxInf.PmtId.EndToEndId", "CdtTrfTxInf.PmtId.TxId")
Fields in the nested XML structure are selected with the . (dot) operator, e.g. CdtTrfTxInf.PmtId.InstrId.
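Before choosing dot paths, it helps to look at the schema spark-xml inferred. The sketch below reuses the read options from above; the name raw is introduced only for illustration. printSchema shows the nesting, and col with a dot path plus alias gives explicit output column names. If an element repeats inside a record, spark-xml infers it as an array. A dot path through that array then returns an array of values rather than a single value.

```scala
import org.apache.spark.sql.functions.col

// Same source as above, but without the select, so the full nested schema is visible
val raw = spark
  .read
  .format("xml")
  .option("rowTag", "FIToFICstmrCdtTrf")
  .load("/user/cloudera/docs")

raw.printSchema() // prints the inferred nested structure

// Dot paths drill into nested structs; alias sets the output column name
val ids = raw.select(
  col("CdtTrfTxInf.PmtId.InstrId").alias("InstrId"),
  col("CdtTrfTxInf.PmtId.EndToEndId").alias("EndToEndId"),
  col("CdtTrfTxInf.PmtId.TxId").alias("TxId"))
```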
Note:
- if you have one record per file, use the root tag of the XML as the rowTag
- if you have multiple records in a single XML file, use the record element's tag as the rowTag
One additional note: when each file contains a single record, the number of partitions will be large, because Spark creates at least one partition per input file. You may wish to coalesce to a smaller number of partitions.
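Here is a sketch of the coalesce step, assuming docs is the DataFrame loaded above. The target of 4 partitions is an arbitrary example value; pick one that suits your data volume and cluster. Unlike repartition, coalesce merges existing partitions without a full shuffle.

```scala
// Partitions produced by the read (typically at least one per input file)
println(s"partitions before: ${docs.rdd.getNumPartitions}")

// Merge into at most 4 partitions without a shuffle; 4 is an arbitrary example value
val compacted = docs.coalesce(4)
println(s"partitions after: ${compacted.rdd.getNumPartitions}")
```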
docs.show(false)
+-------------------------------+-----------------------------------+-----------------------------------+
|InstrId                        |EndToEndId                         |TxId                               |
+-------------------------------+-----------------------------------+-----------------------------------+
|20160516021200201BFFF0000000001|E2E-TxPrefix20160513223041391000000|TxPrefix....20160513223041391000000|
+-------------------------------+-----------------------------------+-----------------------------------+
Write an XML doc into a blob field of an RDBMS table
Suppose you have stored each transaction record as a separate file in the /user/cloudera/docs directory, and you want to load each XML document into a blob field of an RDBMS table. The same idea works with other databases, including NoSQL databases, as long as Spark has a connector for them.
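Log in to MySQL, create a database named tnx, and create a table with a blob field in it. Note that a MySQL BLOB column holds at most 65,535 bytes; use MEDIUMBLOB or LONGBLOB if your XML documents can be larger.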
$ mysql -uroot -p
<enter password cloudera when prompted>
mysql> create database tnx;
mysql> use tnx;
mysql> create table docs(name varchar(255) primary key, body blob);
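From Spark, load each file as one record of an RDD and write it to the docs table over JDBC. The MySQL JDBC driver (com.mysql.jdbc.Driver) must be on the Spark classpath, for example added with --jars or --packages when starting spark-shell. First define the connection options: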
val dbOptions = Map(
"url" -> "jdbc:mysql://localhost:3306/tnx",
"driver" -> "com.mysql.jdbc.Driver",
"dbtable" -> "docs",
"user" -> "root",
"password" -> "cloudera"
)
import org.apache.spark.sql.functions.{col, encode}
import spark.implicits._ // needed for toDF (already imported automatically in spark-shell)
sc
.wholeTextFiles("/user/cloudera/docs") // Load each file as one record: the file path is the key and the content is the value, as a String
.toDF("name", "body") // Convert the (path, content) tuple RDD into a DataFrame so that column functions can be applied
.select(col("name"), encode(col("body"), "UTF-8").alias("body")) // Encode the string value into binary using UTF-8
.write
.format("jdbc") // JDBC connector to connect to the RDBMS data source
.mode("append") // append adds rows; overwrite drops and recreates the table unless the truncate option is set to true
.options(dbOptions) // DB options
.save()
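To check the load, you can read the table back and inspect the stored byte lengths. A document that does not fit the column type is either rejected or truncated, depending on MySQL's SQL mode. Comparing lengths against the source files helps catch that. This sketch runs in the same session, with dbOptions as defined above.

```scala
import org.apache.spark.sql.functions.{col, length}

// Read the table back over JDBC
val stored = spark.read.format("jdbc").options(dbOptions).load()

// For a binary column, length() returns the number of bytes
stored
  .select(col("name"), length(col("body")).alias("bytes"))
  .show(false)

// If the table was empty before the load, expect one row per input file
val fileCount = sc.wholeTextFiles("/user/cloudera/docs").count()
println(s"files: $fileCount, rows: ${stored.count()}")
```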
Query XML stored in a blob field of an RDBMS table
Load the docs table from the tnx database:
val docs = spark
.read
.format("jdbc")
.options(dbOptions)
.load()
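If you need only some of the documents, you can push the filter down to MySQL. The Spark JDBC source accepts anything that is valid in a SQL FROM clause as dbtable, including a parenthesized subquery with an alias. In the sketch below, the LIKE pattern is a hypothetical example.

```scala
// Override dbtable with a subquery so the filtering runs inside MySQL
val someDocs = spark
  .read
  .format("jdbc")
  .options(dbOptions + ("dbtable" -> "(select name, body from docs where name like '%pacs.008%') as t"))
  .load()
```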
View the schema of docs:
docs.printSchema
root
|-- name: string (nullable = true)
|-- body: binary (nullable = true)
Define a case class to hold the parsed fields from the XML document.
case class CDT(
InstrId:String,
EndToEndId:String,
TxId:String)
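Define a UDF to extract the fields from the XML content. It returns a CDT, so the result column is a struct whose fields are named after the case class fields.
import org.apache.spark.sql.functions.udf
import scala.xml.XML
val toCDT = udf { (body: String) =>
// Return null for a missing body instead of failing the whole job
if (body == null) null
else {
val row = XML.loadString(body) // parse the whole XML document
// \\ searches all descendants by tag name; .text is "" when the tag is absent
CDT((row \\ "InstrId").text, (row \\ "EndToEndId").text, (row \\ "TxId").text)
}
}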
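Because the parsing logic is plain Scala, you can check it outside Spark before wrapping it in a UDF. The sketch below uses parseCDT, a hypothetical helper with the same logic as the UDF body. The XML string is a cut-down illustrative document, not the attached pacs.008 file. Note that .text concatenates all matches, so a document holding several transactions would yield concatenated values.

```scala
import scala.xml.XML

case class CDT(InstrId: String, EndToEndId: String, TxId: String)

// Hypothetical helper mirroring the UDF body so it can be tested without Spark
def parseCDT(body: String): CDT = {
  val row = XML.loadString(body)
  CDT((row \\ "InstrId").text, (row \\ "EndToEndId").text, (row \\ "TxId").text)
}

// Cut-down sample document (illustrative only)
val sample =
  """<Document>
    |  <FIToFICstmrCdtTrf>
    |    <CdtTrfTxInf>
    |      <PmtId>
    |        <InstrId>20160516021200201BFFF0000000001</InstrId>
    |        <EndToEndId>E2E-TxPrefix20160513223041391000000</EndToEndId>
    |        <TxId>TxPrefix....20160513223041391000000</TxId>
    |      </PmtId>
    |    </CdtTrfTxInf>
    |  </FIToFICstmrCdtTrf>
    |</Document>""".stripMargin

println(parseCDT(sample))
```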
Now use the toCDT UDF to parse each XML document:
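import org.apache.spark.sql.functions.{col, decode}
docs
.select(col("name"), decode(col("body"), "UTF-8").alias("body")) // Decode the UTF-8 encoded binary column back into a String
.select(col("name"), toCDT(col("body")).alias("CDT")) // Parse the XML string into a struct column
.show(false)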
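To get one column per field instead of a single struct, expand the struct with CDT.*. When the UDF returns the CDT case class, the columns are named InstrId, EndToEndId and TxId. A UDF returning a plain tuple would give _1, _2 and _3 instead. This sketch assumes docs and toCDT from above in the same session.

```scala
import org.apache.spark.sql.functions.{col, decode}

// Decode the blob, parse it with the UDF, then expand the struct into top-level columns
val parsed = docs
  .select(col("name"), decode(col("body"), "UTF-8").alias("body"))
  .select(col("name"), toCDT(col("body")).alias("CDT"))
  .select("name", "CDT.*")

parsed.printSchema()
parsed.show(false)
```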
+---------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------+
|name                                                                 |CDT                                                                                                      |
+---------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------+
|hdfs://quickstart.cloudera:8020/user/cloudera/xml/pacs.008.001.06.xml|[20160516021200201BFFF0000000001,E2E-TxPrefix20160513223041391000000,TxPrefix....20160513223041391000000]|
+---------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------+