At Wehkamp we use Apache Kafka in our event-driven service architecture. It handles high message volumes really well. We use Apache Spark to run analyses. From time to time, I need to read a Kafka topic into my Databricks notebook. In this article, I’ll show what I used to read from a Kafka topic that has no schema attached to it. We’ll also dive into how we can render the inferred JSON schema in a human-readable format.

Setup

First, we need to specify the URL of the Kafka broker. We’ll set it once and use it in the function that will read from Kafka.

kafka_broker = "kafka.queue.url:9092"

Next, we specify the imports. They are notebook-wide.

import json
import re

from pyspark.sql.functions import *
from pyspark.sql.types import *

Reading Kafka without a schema

We are reading a topic that does not have a schema. Fortunately, Spark can infer the schema for us. This is handy when a schema is big and hard to specify by hand. One drawback of this method is that Spark has to read the entire topic before it can safely infer the schema.

Most of the time you’re only interested in the latest version of a key on your Kafka topic. To get only the latest record, we group the data by key and take the value with the highest offset.

To infer the schema, we’ll feed the raw JSON values to spark.read.json via rdd.map.

def read_kafka_topic(topic):

    df_json = (spark.read
               .format("kafka")
               .option("kafka.bootstrap.servers", kafka_broker)
               .option("subscribe", topic)
               .option("startingOffsets", "earliest")
               .option("endingOffsets", "latest")
               .option("failOnDataLoss", "false")
               .load()
               # filter out empty values
               .withColumn("value", expr("string(value)"))
               .filter(col("value").isNotNull())
               # get latest version of each record
               .select("key", expr("struct(offset, value) r"))
               .groupBy("key").agg(expr("max(r) r")) 
               .select("r.value"))
    
    # decode the json values
    df_read = spark.read.json(
      df_json.rdd.map(lambda x: x.value), multiLine=True)
    
    # drop corrupt records
    if "_corrupt_record" in df_read.columns:
        df_read = (df_read
                   .filter(col("_corrupt_record").isNotNull())
                   .drop("_corrupt_record"))
 
    return df_read

Schema to JSON

Let’s see how we can use this to extract a Spark schema in JSON format from the Kafka topic. We can use the read_kafka_topic function to get the contents of the topic into a dataframe, for example:

df = read_kafka_topic("dam-asset-classification")

We can now convert the Spark schema to JSON:

json_schema = df.schema.json()
print(json_schema)

This will look something like:

{"fields":[{"metadata":{},"name":"bucket","nullable":true,"type":"string"},{"metadata":{},"name":"classification-version","nullable":true,"type":"string"},{"metadata":{},"name":"colors","nullable":true,"type":"string"},{"metadata":{},"name":"fashion-image-type","nullable":true,"type":"string"},{"metadata":{},"name":"fashion-image-type-model-version","nullable":true,"type":"string"},{"metadata":{},"name":"fashion-image-type-prediction","nullable":true,"type":"string"},{"metadata":{},"name":"height","nullable":true,"type":"string"},{"metadata":{},"name":"home-living-style-type","nullable":true,"type":"string"},{"metadata":{},"name":"home-living-style-type-model-version","nullable":true,"type":"string"},{"metadata":{},"name":"home-living-style-type-prediction","nullable":true,"type":"string"},{"metadata":{},"name":"key","nullable":true,"type":"string"},{"metadata":{},"name":"orientation","nullable":true,"type":"string"},{"metadata":{},"name":"width","nullable":true,"type":"string"}],"type":"struct"}

Format the schema for humans

With this schema it becomes easier (and faster) to read the topic. But… the JSON itself is not very readable. Let’s fix that by applying some regular expressions:

parsed = json.loads(json_schema)
raw = json.dumps(parsed, indent=1, sort_keys=False)

pretty_schema = raw

# remove the empty metadata entries
pretty_schema = re.sub('"metadata": {},\n +', '', pretty_schema)

# replace newlines between properties
pretty_schema = re.sub('",\n +"', '", "', pretty_schema)
pretty_schema = re.sub('e,\n +"', 'e, "', pretty_schema)

# collapse endings and beginnings of simple objects
pretty_schema = re.sub('"\n +},', '" },', pretty_schema)
pretty_schema = re.sub('{\n +"', '{ "', pretty_schema)

# collapse the end of complex objects
pretty_schema = re.sub('"\n +}', '" }', pretty_schema)
pretty_schema = re.sub('e\n +}', 'e }', pretty_schema)

# reintroduce the metadata in a different place
pretty_schema = re.sub('(, "type": "[^"]+")', '\\1, "metadata": {}', pretty_schema)
pretty_schema = re.sub('(, "type": {)', ', "metadata": {}\\1', pretty_schema)

print(pretty_schema)

Now the schema looks like this:

{ "type": "struct", "fields": [
  { "name": "bucket", "type": "string", "metadata": {}, "nullable": true },
  { "name": "classification-version", "type": "string", "metadata": {}, "nullable": true },
  { "name": "colors", "type": "string", "metadata": {}, "nullable": true },
  { "name": "fashion-image-type", "type": "string", "metadata": {}, "nullable": true },
  { "name": "fashion-image-type-model-version", "type": "string", "metadata": {}, "nullable": true },
  { "name": "fashion-image-type-prediction", "type": "string", "metadata": {}, "nullable": true },
  { "name": "height", "type": "string", "metadata": {}, "nullable": true },
  { "name": "home-living-style-type", "type": "string", "metadata": {}, "nullable": true },
  { "name": "home-living-style-type-model-version", "type": "string", "metadata": {}, "nullable": true },
  { "name": "home-living-style-type-prediction", "type": "string", "metadata": {}, "nullable": true },
  { "name": "key", "type": "string", "metadata": {}, "nullable": true },
  { "name": "orientation", "type": "string", "metadata": {}, "nullable": true },
  { "name": "width", "type": "string", "metadata": {}, "nullable": true }
 ]
}

This is something I can read and understand.
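If you want to reuse the schema later without re-inferring it, you could also persist this formatted string. A minimal sketch using Databricks’ dbutils (the path is just an example):

# save the formatted schema for later use (example path)
dbutils.fs.put("/mnt/schemas/dam-asset-classification.json", pretty_schema, True)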

Reading JSON into a schema

Spark provides us with a nice feature that reads the JSON into a Spark schema:

topic_schema = StructType.fromJson(json.loads(pretty_schema))
print(topic_schema)

It has all the fields in it:

StructType(List(StructField(bucket,StringType,true),StructField(classification-version,StringType,true),StructField(colors,StringType,true),StructField(fashion-image-type,StringType,true),StructField(fashion-image-type-model-version,StringType,true),StructField(fashion-image-type-prediction,StringType,true),StructField(height,StringType,true),StructField(home-living-style-type,StringType,true),StructField(home-living-style-type-model-version,StringType,true),StructField(home-living-style-type-prediction,StringType,true),StructField(key,StringType,true),StructField(orientation,StringType,true),StructField(width,StringType,true)))
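Now that we have a proper StructType, we can skip schema inference entirely on the next read. Below is a minimal sketch of a variant of read_kafka_topic — the function name and structure are my own, not part of the original notebook — that parses the Kafka values with from_json and the explicit schema:

def read_kafka_topic_with_schema(topic, schema):
    """Read a Kafka topic and parse the JSON values with a known schema."""
    return (spark.read
            .format("kafka")
            .option("kafka.bootstrap.servers", kafka_broker)
            .option("subscribe", topic)
            .option("startingOffsets", "earliest")
            .option("endingOffsets", "latest")
            .option("failOnDataLoss", "false")
            .load()
            # filter out empty values
            .withColumn("value", expr("string(value)"))
            .filter(col("value").isNotNull())
            # keep only the latest version of each record
            .select("key", expr("struct(offset, value) r"))
            .groupBy("key").agg(expr("max(r) r"))
            # parse the JSON with the explicit schema instead of inferring it
            .select(from_json(col("r.value"), schema).alias("value"))
            .select("value.*"))

df = read_kafka_topic_with_schema("dam-asset-classification", topic_schema)

Because the schema is supplied up front, Spark no longer needs the inference pass, which is where most of the time goes on larger topics.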

Final thoughts

This blog shows how easy it is to connect Kafka to Spark and do something with the data. Schema inference is slow, because Spark has to parse the entire topic; that’s why it might be better to use Avro or to save the inferred schema for later use.