Spark: extract fields from an XML column

Last week we needed to parse some XML data with Spark: we had a column of unstructured XML documents and wanted to extract text fields from it. This article shows how to do that in Spark using a UDF and ElementTree, the Python XML parser.

A big thanks to Gerard Jager for the collaboration on this code.

  1. Intro
  2. Preparation
    1. CD Catalog Test Data
  3. Extract the title (a single value)
  4. Extract title and artist (multiple values)
  5. Final thoughts

Preparation

Let's use a single cell to define the imports, which are:

import requests
import xml.etree.ElementTree as ET

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *

CD Catalog Test Data

Now, let's make some test data. I'm using the CD Catalog XML from W3Schools; each CD element will become one blob of XML in our dataframe.

cds_url = 'https://www.w3schools.com/xml/cd_catalog.xml'

# download data
cds_txt = requests.get(cds_url).text

# convert to XML
doc = ET.fromstring(cds_txt)

# extract CD XML
utf8 = 'utf-8'
cds = [ET.tostring(x, encoding=utf8) for x in doc.findall('CD')]

# create dataframe
normalizedCds = [str(cd, utf8).strip() for cd in cds]
rows = [Row(index=index, cd=cd) for index, cd in enumerate(normalizedCds)]
cd_df = spark.createDataFrame(rows)

The resulting dataframe has two columns, index and cd, where the cd column is filled with XML.
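To peek at the raw rows yourself, a plain show with truncation does the job (assuming Spark 2.3+, where truncate accepts a character count):

# print a few rows, cutting the XML blobs off at 60 characters
cd_df.show(3, truncate=60)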

Extract the title (a single value)

Let's extract the TITLE element from the XML field and return it as a column in our Dataframe. We need a User Defined Function (UDF) to parse the XML and extract the text from the selected tag. Decorating the function with @udf signals to Spark that it should be handled as a UDF.

@udf
def extract_title_udf(payload):
  doc = ET.fromstring(payload)
  result = [e.text for e in doc.findall('TITLE') if isinstance(e, ET.Element)]
  return next(iter(result), None)

Note: next(iter(result), None) is a nice way of getting the first element of a list, or None when it is empty. findall returns a list, but we are only interested in the first element here. Also note that @udf without an explicit return type defaults to StringType, which is exactly what we want for a text field.
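A quick standalone illustration of that idiom (not from the original notebook):

assert next(iter(['a', 'b']), None) == 'a'  # first element
assert next(iter([]), None) is None         # empty list -> None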

Now we can call the extract_title_udf with the cd column as an input. The result is added as a separate column.

(cd_df
 .select("index", extract_title_udf(col('cd')).alias('title'))
 .show(10, False))

Here is a sample of the data:

+-----+------------------------+
|index|title                   |
+-----+------------------------+
|0    |Empire Burlesque        |
|1    |Hide your heart         |
|2    |Greatest Hits           |
|3    |Still got the blues     |
|4    |Eros                    |
|5    |One night only          |
|6    |Sylvias Mother          |
|7    |Maggie May              |
|8    |Romanza                 |
|9    |When a man loves a woman|
+-----+------------------------+
only showing top 10 rows

It is really easy to extract a single value from an XML blob. But what if we want more values? We could make extract_title_udf more generic, but calling a generic UDF once per field means running ET.fromstring on the same blob over and over, which is not very cost-effective, as the sketch below shows. What if we could return multiple values from a single UDF call?
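For illustration only, a generic tag extractor might look like this (make_extract_tag_udf is a hypothetical helper, not code from this post); selecting two fields this way parses every blob twice:

def make_extract_tag_udf(tag):
  @udf
  def extract_tag(payload):
    # every call re-parses the full XML blob
    doc = ET.fromstring(payload)
    return next(iter([e.text for e in doc.findall(tag)]), None)
  return extract_tag

# two generic UDF calls -> two ET.fromstring parses per row
(cd_df
 .select("index",
         make_extract_tag_udf('TITLE')(col('cd')).alias('title'),
         make_extract_tag_udf('ARTIST')(col('cd')).alias('artist'))
 .show(10, False))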

Extract title and artist (multiple values)

To return multiple fields from a UDF, we need to add a schema. This schema tells Spark what we are returning from our UDF. In the previous example we decorated the function with @udf; in this version we're going to create a UDF using the udf function:

extract_cd_info_schema = StructType([
    StructField("title", StringType(), True),
    StructField("artist", StringType(), True)
])

def select_text(doc, xpath):
  nodes = [e.text for e in doc.findall(xpath) if isinstance(e, ET.Element)]
  return next(iter(nodes), None)

def extract_cd_info(payload):
  doc = ET.fromstring(payload)
  return {
    'title':  select_text(doc, 'TITLE'),
    'artist': select_text(doc, 'ARTIST')
  }

extract_cd_info_udf = udf(extract_cd_info, extract_cd_info_schema)

Let's add the data as a column and then select it:

(cd_df
 .withColumn("info", extract_cd_info_udf('cd'))
 .select('index', 'info.artist', 'info.title')
 .show(10, False))

Now we have both artist and title in our Dataframe:

+-----+---------------+------------------------+
|index|artist         |title                   |
+-----+---------------+------------------------+
|0    |Bob Dylan      |Empire Burlesque        |
|1    |Bonnie Tyler   |Hide your heart         |
|2    |Dolly Parton   |Greatest Hits           |
|3    |Gary Moore     |Still got the blues     |
|4    |Eros Ramazzotti|Eros                    |
|5    |Bee Gees       |One night only          |
|6    |Dr.Hook        |Sylvias Mother          |
|7    |Rod Stewart    |Maggie May              |
|8    |Andrea Bocelli |Romanza                 |
|9    |Percy Sledge   |When a man loves a woman|
+-----+---------------+------------------------+
only showing top 10 rows

Final thoughts

User Defined Functions (UDFs) can slow Spark down, because every row has to be shipped to a Python worker and back, but they are powerful when used correctly. You can use a single UDF to select multiple values from an XML blob and return them together: all you need is a schema.
