At Wehkamp we use Apache Kafka in our event-driven service architecture. It handles high loads of messages really well. We use Apache Spark to run analysis. From time to time, I need to read a Kafka topic into my Databricks notebook. In this article, I'll show what I use to read from a Kafka topic that has no schema attached to it. We'll also dive into how we can render the JSON schema in a human-readable format.
Setup
First, we need to specify the URL of the Kafka broker. We'll set it once and use it in the function that reads from Kafka.
kafka_broker = "kafka.queue.url:9092"
Next, we specify the imports. They are notebook-wide.
import json
import re
from pyspark.sql.functions import *
from pyspark.sql.types import *
Reading Kafka without a schema
We are reading a topic that does not have a schema. Fortunately, Spark can infer the schema for us. This is handy when a schema is large and hard to specify by hand. One drawback of this method is that Spark has to read the entire topic before it can safely infer the schema.
Most of the time you're only interested in the latest version of a key on your Kafka topic. To get only that record, we group the data by key and keep the value with the highest offset.
To infer the schema, we'll feed the JSON values to spark.read.json through the rdd.map function.
def read_kafka_topic(topic):

    df_json = (spark.read
               .format("kafka")
               .option("kafka.bootstrap.servers", kafka_broker)
               .option("subscribe", topic)
               .option("startingOffsets", "earliest")
               .option("endingOffsets", "latest")
               .option("failOnDataLoss", "false")
               .load()
               # filter out empty values
               .withColumn("value", expr("string(value)"))
               .filter(col("value").isNotNull())
               # get latest version of each record
               .select("key", expr("struct(offset, value) r"))
               .groupBy("key").agg(expr("max(r) r"))
               .select("r.value"))

    # decode the json values
    df_read = spark.read.json(
        df_json.rdd.map(lambda x: x.value), multiLine=True)

    # drop corrupt records
    if "_corrupt_record" in df_read.columns:
        df_read = (df_read
                   .filter(col("_corrupt_record").isNull())
                   .drop("_corrupt_record"))

    return df_read
Read here why I'm using a select, groupBy and agg to get the record with the maximum offset per key. Note: I'm after the latest record per key, but your Kafka topic might not work that way!
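If the max over a struct looks like magic: Spark compares structs field by field, so max(struct(offset, value)) selects the row with the highest offset and carries its value along. Here is a minimal sketch on a hand-made dataframe (the toy data is just for illustration):

from pyspark.sql.functions import expr

# toy data: two versions of key "a", one version of key "b"
df = spark.createDataFrame(
    [("a", 1, "old"), ("a", 5, "new"), ("b", 2, "only")],
    ["key", "offset", "value"])

latest = (df
    .select("key", expr("struct(offset, value) r"))
    # max compares struct fields left to right, so the highest offset wins
    .groupBy("key").agg(expr("max(r) r"))
    .select("key", "r.value"))

latest.show()
# key "a" keeps "new", key "b" keeps "only"

This is the same trick used in read_kafka_topic above.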
Schema to JSON
Let's see how we can use this to extract a Spark schema in JSON format from this Kafka topic. We can use the read_kafka_topic function to get the contents of the topic into a dataframe, for example:
df = read_kafka_topic("dam-asset-classification")
We can now convert the Spark schema to JSON:
json_schema = df.schema.json()
print(json_schema)
This will look something like:
{"fields":[{"metadata":{},"name":"bucket","nullable":true,"type":"string"},{"metadata":{},"name":"classification-version","nullable":true,"type":"string"},{"metadata":{},"name":"colors","nullable":true,"type":"string"},{"metadata":{},"name":"fashion-image-type","nullable":true,"type":"string"},{"metadata":{},"name":"fashion-image-type-model-version","nullable":true,"type":"string"},{"metadata":{},"name":"fashion-image-type-prediction","nullable":true,"type":"string"},{"metadata":{},"name":"height","nullable":true,"type":"string"},{"metadata":{},"name":"home-living-style-type","nullable":true,"type":"string"},{"metadata":{},"name":"home-living-style-type-model-version","nullable":true,"type":"string"},{"metadata":{},"name":"home-living-style-type-prediction","nullable":true,"type":"string"},{"metadata":{},"name":"key","nullable":true,"type":"string"},{"metadata":{},"name":"orientation","nullable":true,"type":"string"},{"metadata":{},"name":"width","nullable":true,"type":"string"}],"type":"struct"}
Format the schema for humans
With this schema, Spark can read the Kafka topic faster, because it no longer needs to infer the schema itself. But... it is not so readable (for humans). Let's create a function that uses regular expressions to make the JSON more readable:
def prettify_spark_schema_json(schema_json: str):
    import re, json

    parsed = json.loads(schema_json)
    raw = json.dumps(parsed, indent=1, sort_keys=False)
    str1 = raw

    # remove empty metadata
    str1 = re.sub('"metadata": {},\n +', '', str1)

    # remove newlines between properties
    str1 = re.sub('",\n +"', '", "', str1)
    str1 = re.sub('e,\n +"', 'e, "', str1)

    # replace endings and beginnings of simple objects
    str1 = re.sub('"\n +},', '" },', str1)
    str1 = re.sub('{\n +"', '{ "', str1)

    # replace end of complex objects
    str1 = re.sub('"\n +}', '" }', str1)
    str1 = re.sub('e\n +}', 'e }', str1)

    # re-introduce the metadata in a different place
    str1 = re.sub('(, "type": "[^"]+")', '\\1, "metadata": {}', str1)
    str1 = re.sub('(, "type": {)', ', "metadata": {}\\1', str1)

    # make sure a nested ending is not on a single line
    str1 = re.sub('}\n\\s+},', '} },', str1)

    return str1
We can now call the function with the schema:
pretty_json_schema = prettify_spark_schema_json(json_schema)
# or:
pretty_json_schema = prettify_spark_schema_json(df.schema.json())
print(pretty_json_schema)
Now the schema looks like this:
{ "fields": [
{ "name": "bucket", "nullable": true, "type": "string", "metadata": {} },
{ "name": "classification-version", "nullable": true, "type": "string", "metadata": {} },
{ "name": "colors", "nullable": true, "type": "string", "metadata": {} },
{ "name": "fashion-image-type", "nullable": true, "type": "string", "metadata": {} },
{ "name": "fashion-image-type-model-version", "nullable": true, "type": "string", "metadata": {} },
{ "name": "fashion-image-type-prediction", "nullable": true, "type": "string", "metadata": {} },
{ "name": "height", "nullable": true, "type": "string", "metadata": {} },
{ "name": "home-living-style-type", "nullable": true, "type": "string", "metadata": {} },
{ "name": "home-living-style-type-model-version", "nullable": true, "type": "string", "metadata": {} },
{ "name": "home-living-style-type-prediction", "nullable": true, "type": "string", "metadata": {} },
{ "name": "key", "nullable": true, "type": "string", "metadata": {} },
{ "name": "orientation", "nullable": true, "type": "string", "metadata": {} },
{ "name": "width", "nullable": true, "type": "string", "metadata": {} }
],
"type": "struct"
}
Yes, this is something I can read and understand!
Reading JSON into a schema
Spark provides us with a nice feature that reads the JSON into a Spark schema:
obj = json.loads(pretty_json_schema)
topic_schema = StructType.fromJson(obj)
print(topic_schema)
It has all the fields in it -- I added some formatting for readability:
StructType(
List(
StructField(bucket, StringType, true),
StructField(classification-version, StringType, true),
StructField(colors, StringType, true),
StructField(fashion-image-type, StringType, true),
StructField(fashion-image-type-model-version, StringType, true),
StructField(fashion-image-type-prediction, StringType, true),
StructField(height, StringType, true),
StructField(home-living-style-type, StringType, true),
StructField(home-living-style-type-model-version, StringType, true),
StructField(home-living-style-type-prediction, StringType, true),
StructField(key, StringType, true),
StructField(orientation, StringType, true),
StructField(width, StringType, true)
)
)
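With the schema recovered, we can skip the slow inference step entirely and parse the JSON values with from_json. The function below is a sketch of how read_kafka_topic could be adapted to take a schema; it is not part of the original script, but it reuses kafka_broker and the same latest-record-per-key trick:

def read_kafka_topic_with_schema(topic, schema):
    return (spark.read
            .format("kafka")
            .option("kafka.bootstrap.servers", kafka_broker)
            .option("subscribe", topic)
            .option("startingOffsets", "earliest")
            .option("endingOffsets", "latest")
            .option("failOnDataLoss", "false")
            .load()
            # filter out empty values
            .withColumn("value", expr("string(value)"))
            .filter(col("value").isNotNull())
            # latest record per key, same trick as before
            .select("key", expr("struct(offset, value) r"))
            .groupBy("key").agg(expr("max(r) r"))
            # parse the JSON with the known schema instead of inferring it
            .select(from_json(col("r.value"), schema).alias("value"))
            .select("value.*"))

df_fast = read_kafka_topic_with_schema("dam-asset-classification", topic_schema)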
Final thoughts
This blog shows how easy it is to connect Kafka to Spark and do something with the data. Schema inference is slow, because Spark has to parse the entire topic; that's why it might be better to use Avro or to save the schema for later use.
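Saving the schema for later use can be as simple as writing the JSON to a file and reading it back in a later run, for example (the path below is just an illustration):

import os

schema_path = "/dbfs/schemas/dam-asset-classification.json"  # illustrative path
os.makedirs(os.path.dirname(schema_path), exist_ok=True)

# save the inferred schema once
with open(schema_path, "w") as f:
    f.write(df.schema.json())

# in a later run: load the schema and skip inference
with open(schema_path) as f:
    topic_schema = StructType.fromJson(json.loads(f.read()))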
Improvements
2021-05-17: Introduced the prettify_spark_schema_json function and did a small fix for complex objects (the trailing } is now on the same line).
Great article! I'm trying to accomplish the same in Scala, but I got stuck on the following line:
df_read = spark.read.json(
df_json.rdd.map(lambda x: x.value), multiLine=True)
The Scala equivalent, val df_read = spark.read.json(df_json.rdd.map(x => x)), gives the following error: "cannot be applied to (org.apache.spark.rdd.RDD[org.apache.spark.sql.Row])". Any hacks to fix this? Kindly help.
No, sorry I’m not a Scala-person. We’ve created this script in Python so that anyone could use it. Most people are a bit intimidated by Scala :-D.
Did you have any luck?