Kafka, Spark and schema inference

At Wehkamp we use Apache Kafka in our event-driven service architecture. It handles high message loads really well. We use Apache Spark to run analyses. From time to time, I need to read a Kafka topic into my Databricks notebook. In this article, I'll show what I use to read from a Kafka topic that has no schema attached to it. We'll also dive into how we can render the JSON schema in a human-readable format.

Setup

First, we need to specify the address of the Kafka broker. We'll set it once and use it in the function that will read from Kafka.

kafka_broker = "kafka.queue.url:9092"

Next, we specify the imports. They are notebook-wide.

import json
import re

from pyspark.sql.functions import *
from pyspark.sql.types import *

Reading Kafka without a schema

We are reading a topic that does not have a schema. Fortunately, Spark can infer the schema for us. This is handy when a schema is bigger and harder to specify by hand. One drawback of this method is that Spark has to read the entire topic before it can safely infer the schema.
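To illustrate why the whole topic has to be read, here is a minimal plain-Python sketch. It is not Spark's inference algorithm; the helper infer_fields and the sample values are made up for this example. It only shows that a field can be missed when the inference stops before the last record.

```python
import json


def infer_fields(json_values):
    """Collect the top-level field names (and Python type names) seen in all records."""
    fields = {}
    for raw in json_values:
        record = json.loads(raw)
        for name, value in record.items():
            fields.setdefault(name, set()).add(type(value).__name__)
    return fields


# hypothetical message values; only the last one has an "orientation" field
sample_values = [
    '{"key": "a", "width": "100"}',
    '{"key": "b", "width": "200"}',
    '{"key": "c", "width": "300", "orientation": "landscape"}',
]

partial = infer_fields(sample_values[:2])
full = infer_fields(sample_values)

print(sorted(partial))  # → ['key', 'width']
print(sorted(full))     # → ['key', 'orientation', 'width']
```

A schema based on only the first two records would silently drop the orientation field.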

Most of the time you're only interested in the latest version of a key on your Kafka topic. To only get the latest record, we will group the data by its key and only take the value with the highest offset.

To infer the schema, we'll use the rdd.map function to hand the raw JSON strings to spark.read.json.

def read_kafka_topic(topic):

    df_json = (spark.read
               .format("kafka")
               .option("kafka.bootstrap.servers", kafka_broker)
               .option("subscribe", topic)
               .option("startingOffsets", "earliest")
               .option("endingOffsets", "latest")
               .option("failOnDataLoss", "false")
               .load()
               # cast the binary value to a string and filter out empty values
               .withColumn("value", expr("string(value)"))
               .filter(col("value").isNotNull())
               # get latest version of each record
               .select("key", expr("struct(offset, value) r"))
               .groupBy("key").agg(expr("max(r) r")) 
               .select("r.value"))
    
    # decode the json values
    df_read = spark.read.json(
      df_json.rdd.map(lambda x: x.value), multiLine=True)
    
    # drop corrupt records: keep only the rows that parsed correctly
    if "_corrupt_record" in df_read.columns:
        df_read = (df_read
                   .filter(col("_corrupt_record").isNull())
                   .drop("_corrupt_record"))
 
    return df_read

Why the select, groupBy and agg combination? Spark compares structs field by field, so max(struct(offset, value)) returns, for each key, the record with the highest offset. Note: I'm trying to find the latest record, but you might have a Kafka topic that does not work that way!
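The latest-per-key selection in read_kafka_topic can be mimicked in plain Python, because tuples compare element by element, much like the struct(offset, value) used in the aggregation. This is only a sketch to show the idea; latest_per_key and the sample records are hypothetical and not part of the notebook.

```python
def latest_per_key(records):
    """Return {key: value} holding, per key, the value with the highest offset.

    `records` is an iterable of (key, offset, value) tuples.
    """
    latest = {}
    for key, offset, value in records:
        candidate = (offset, value)
        # tuples compare on offset first, so the highest offset wins
        if key not in latest or candidate > latest[key]:
            latest[key] = candidate
    return {key: value for key, (_, value) in latest.items()}


# hypothetical records: asset-1 was written twice
records = [
    ("asset-1", 0, '{"width": "100"}'),
    ("asset-2", 1, '{"width": "200"}'),
    ("asset-1", 2, '{"width": "150"}'),
]

result = latest_per_key(records)
print(result["asset-1"])  # → {"width": "150"}
```

Putting the offset first in the tuple (or struct) is what makes the maximum pick the most recent record.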

Schema to JSON

Let's see how we can use this to extract a Spark schema in JSON format from this Kafka topic. We can use read_kafka_topic to get the contents of the topic into a dataframe, for example:

df = read_kafka_topic("dam-asset-classification")

We can now convert the Spark schema to JSON:

json_schema = df.schema.json()
print(json_schema)

This will look something like:

{"fields":[{"metadata":{},"name":"bucket","nullable":true,"type":"string"},{"metadata":{},"name":"classification-version","nullable":true,"type":"string"},{"metadata":{},"name":"colors","nullable":true,"type":"string"},{"metadata":{},"name":"fashion-image-type","nullable":true,"type":"string"},{"metadata":{},"name":"fashion-image-type-model-version","nullable":true,"type":"string"},{"metadata":{},"name":"fashion-image-type-prediction","nullable":true,"type":"string"},{"metadata":{},"name":"height","nullable":true,"type":"string"},{"metadata":{},"name":"home-living-style-type","nullable":true,"type":"string"},{"metadata":{},"name":"home-living-style-type-model-version","nullable":true,"type":"string"},{"metadata":{},"name":"home-living-style-type-prediction","nullable":true,"type":"string"},{"metadata":{},"name":"key","nullable":true,"type":"string"},{"metadata":{},"name":"orientation","nullable":true,"type":"string"},{"metadata":{},"name":"width","nullable":true,"type":"string"}],"type":"struct"}

Format the schema for humans

With a stored schema, Spark can read the Kafka topic faster, because it no longer has to infer it. But... the JSON is not so readable (for humans). Let's create a function that uses regular expressions to make the JSON more readable:

def prettify_spark_schema_json(json_schema: str) -> str:

  import json
  import re

  # parse the argument (not a notebook global) and re-indent it
  parsed = json.loads(json_schema)
  raw = json.dumps(parsed, indent=1, sort_keys=False)

  str1 = raw

  # replace empty meta data
  str1 = re.sub(r'"metadata": {},\n +', '', str1)

  # replace enters between properties
  str1 = re.sub(r'",\n +"', '", "', str1)
  str1 = re.sub(r'e,\n +"', 'e, "', str1)

  # replace endings and beginnings of simple objects
  str1 = re.sub(r'"\n +},', '" },', str1)
  str1 = re.sub(r'{\n +"', '{ "', str1)

  # replace end of complex objects
  str1 = re.sub(r'"\n +}', '" }', str1)
  str1 = re.sub(r'e\n +}', 'e }', str1)

  # introduce the meta data on a different place
  str1 = re.sub(r'(, "type": "[^"]+")', r'\1, "metadata": {}', str1)
  str1 = re.sub(r'(, "type": {)', r', "metadata": {}\1', str1)

  # make sure nested ending is not on a single line
  str1 = re.sub(r'}\n\s+},', '} },', str1)

  return str1

We can now call the function with the schema:

pretty_json_schema = prettify_spark_schema_json(json_schema)
# or:
pretty_json_schema = prettify_spark_schema_json(df.schema.json())

print(pretty_json_schema)

Now the schema looks like this:

{ "fields": [
  { "name": "bucket", "nullable": true, "type": "string", "metadata": {} },
  { "name": "classification-version", "nullable": true, "type": "string", "metadata": {} },
  { "name": "colors", "nullable": true, "type": "string", "metadata": {} },
  { "name": "fashion-image-type", "nullable": true, "type": "string", "metadata": {} },
  { "name": "fashion-image-type-model-version", "nullable": true, "type": "string", "metadata": {} },
  { "name": "fashion-image-type-prediction", "nullable": true, "type": "string", "metadata": {} },
  { "name": "height", "nullable": true, "type": "string", "metadata": {} },
  { "name": "home-living-style-type", "nullable": true, "type": "string", "metadata": {} },
  { "name": "home-living-style-type-model-version", "nullable": true, "type": "string", "metadata": {} },
  { "name": "home-living-style-type-prediction", "nullable": true, "type": "string", "metadata": {} },
  { "name": "key", "nullable": true, "type": "string", "metadata": {} },
  { "name": "orientation", "nullable": true, "type": "string", "metadata": {} },
  { "name": "width", "nullable": true, "type": "string", "metadata": {} }
 ],
 "type": "struct"
}

Yes, this is something I can read and understand!
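If the regular expressions feel fragile, a similar one-field-per-line layout can be produced by serializing each field separately. This is an alternative sketch, not the function above: prettify_fields_one_per_line is a hypothetical name, it only handles the top-level struct, and nested types stay on the line of their field.

```python
import json


def prettify_fields_one_per_line(schema_json: str) -> str:
    """Render a Spark struct schema as JSON with one top-level field per line."""
    schema = json.loads(schema_json)
    lines = []
    for field in schema["fields"]:
        # keep name, nullable and type first and move the metadata to the end
        ordered = {k: v for k, v in field.items() if k != "metadata"}
        ordered["metadata"] = field.get("metadata", {})
        lines.append("  " + json.dumps(ordered))
    body = ",\n".join(lines)
    return ('{ "fields": [\n' + body + '\n ],\n "type": '
            + json.dumps(schema["type"]) + '\n}')


# shortened, hypothetical version of the schema shown earlier
sample_schema_json = (
    '{"fields":['
    '{"metadata":{},"name":"bucket","nullable":true,"type":"string"},'
    '{"metadata":{},"name":"width","nullable":true,"type":"string"}'
    '],"type":"struct"}'
)

pretty = prettify_fields_one_per_line(sample_schema_json)
print(pretty)
```

The result is still valid JSON, so it can be parsed with json.loads just like the regex-based output.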

Reading JSON into a schema

Spark provides us with a nice feature that reads the JSON into a Spark schema:

obj = json.loads(pretty_json_schema)
topic_schema = StructType.fromJson(obj)
print(topic_schema)

It has all the fields in it -- I added some formatting for readability:

StructType(
  List(
    StructField(bucket, StringType, true),
    StructField(classification-version, StringType, true),
    StructField(colors, StringType, true),
    StructField(fashion-image-type, StringType, true),
    StructField(fashion-image-type-model-version, StringType, true),
    StructField(fashion-image-type-prediction, StringType, true),
    StructField(height, StringType, true),
    StructField(home-living-style-type, StringType, true),
    StructField(home-living-style-type-model-version, StringType, true),
    StructField(home-living-style-type-prediction, StringType, true),
    StructField(key, StringType, true),
    StructField(orientation, StringType, true),
    StructField(width, StringType, true)
  )
)

Final thoughts

This blog shows how easy it is to connect Kafka to Spark and do something with the data. Schema inference is slow because Spark has to parse the entire topic, so it might be better to use Avro or to save the schema for later use.
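Saving the schema for later use could be as simple as writing the JSON to a file and loading it again. The sketch below uses only the standard library; the helpers save_schema and load_schema_dict and the file name are assumptions for this example, and in a notebook the path would point to a storage location of your choice.

```python
import json
import tempfile
from pathlib import Path


def save_schema(schema_json: str, path: Path) -> None:
    """Write the schema JSON (pretty or compact) to a file."""
    path.write_text(schema_json, encoding="utf-8")


def load_schema_dict(path: Path) -> dict:
    """Read the schema file back into a dict."""
    return json.loads(path.read_text(encoding="utf-8"))


# shortened, hypothetical schema; in the notebook this would be df.schema.json()
schema_json = (
    '{"fields":[{"metadata":{},"name":"bucket","nullable":true,"type":"string"}],'
    '"type":"struct"}'
)

with tempfile.TemporaryDirectory() as tmp:
    schema_path = Path(tmp) / "dam-asset-classification.schema.json"
    save_schema(schema_json, schema_path)
    restored = load_schema_dict(schema_path)

# in the notebook, the dict can be turned into a schema again:
# topic_schema = StructType.fromJson(restored)
```

With the schema on disk, the slow inference only has to run when the shape of the topic changes.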

Improvements

2021-05-17: Introduced prettify_spark_schema_json function and did a small fix for complex objects (the trailing } is now on the same line).

  1. Chris says:

    Hey Kees,

    Thanks for the awesome blog!

  2. Rm says:

    Great article!
    I’m trying to accomplish the same in Scala, but I got stuck on the following line.
    df_read = spark.read.json(
    df_json.rdd.map(lambda x: x.value), multiLine=True)

    Equivalent in scala. Getting following error “cannot be applied to (org.apache.spark.rdd.RDD[org.apache.spark.sql.Row])”

    val df_read = spark.read.json(df_json.rdd.map(x=>x)). Any hacks to fix this? Kindly help.

    1. Kees C. Bakker says:

      No, sorry I’m not a Scala-person. We’ve created this script in Python so that anyone could use it. Most people are a bit intimidated by Scala :-D.

    2. Aziza Baratova says:

      Did you have any luck?
