Parsing JSON data from S3 (Kinesis) with Spark

Yesterday we encountered a curious problem: we needed to use Spark to parse JSON data produced by AWS Kinesis. Unfortunately, the data was in a concatenated JSON format. Spark does not support that format, so we had to come up with something ourselves.

A big thanks to Gerard Jager for the collaboration on this code.

JSON!?

Our data source contains the output of a concatenated JSON stream. This means each file looks like this:

{ "type": "success", "value": { "id": 463, "joke": "Chuck Norris doesn't use web standards as the web will conform to him.", "categories": ["nerdy"] } }{ "type": "success", "value": { "id": 599, "joke": "Chuck Norris can make fire using two ice cubes.", "categories": [] } }{ "type": "success", "value": { "id": 559, "joke": "Chuck Norris' unit tests don't run. They die.", "categories": ["nerdy"] } }

There is no separator between objects; a new object simply begins where the previous one ends. There is also no enclosing array, so the file as a whole is not valid JSON.
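To make the format concrete, here is how plain Python (outside Spark) can walk such a stream with the standard library; the sample objects are shortened versions of the records above:

```python
import json

# Concatenated JSON: objects back to back, no separators, no enclosing array.
stream = '{"id": 463}{"id": 599}{"id": 559}'

# raw_decode parses one value and reports where it stopped,
# which lets us consume the stream object by object.
decoder = json.JSONDecoder()
objects, pos = [], 0
while pos < len(stream):
    obj, pos = decoder.raw_decode(stream, pos)
    objects.append(obj)

print(objects)  # [{'id': 463}, {'id': 599}, {'id': 559}]
```

This works for a single string in memory, but not as a Spark-side solution, which is what the rest of this article builds.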

1. Read the data into a DataFrame

Spark cannot deal with this format, so we have to read it ourselves. Let's start by reading the data into a DataFrame; we'll end up with the contents of each file in a single row. The data has no line separator, so we need to disable it:

path = '/mnt/my_bucket_name/messages/*/*/*/*/'

df = spark.read.option("lineSep", None).text(path)
df.show(truncate=80)

This results in the following two rows:

+--------------------------------------------------------------------------------+
|                                                                           value|
+--------------------------------------------------------------------------------+
|{ "type": "success", "value": { "id": 463, "joke": "Chuck Norris doesn't use ...|
|{ "type": "success", "value": { "id": 478, "joke": "Chuck Norris can instanti...|
+--------------------------------------------------------------------------------+

2. Make proper JSON arrays

Now that we have the data loaded, we want to parse the content of each row to see if we can make proper JSON from it. We'll wrap the value into an array and introduce a separator between the JSON objects.

Some solutions suggest simply replacing the }{ strings, but we need to be careful: a string value may itself contain }{, and a blind replacement would corrupt it:

{ "line": "art\"tra}{art" }
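A quick plain-Python demonstration with this record shows the problem: the naive replacement still produces parseable JSON, but the string value has silently changed:

```python
import json

# The }{ sequence here is *inside* a string value, not an object boundary.
record = '{ "line": "art\\"tra}{art" }'
print(json.loads(record)["line"])   # art"tra}{art  -- the real value

# Naive fix-up: insert a comma at every }{ and wrap the result in brackets.
naive = "[" + record.replace("}{", "},{") + "]"
corrupted = json.loads(naive)[0]["line"]
print(corrupted)                    # art"tra},{art -- the data changed!
```

Because the result still parses, this kind of corruption is easy to miss; we need a replacement that understands string boundaries.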

In the first version we tried a regular-expression replacement, but we couldn't get it to work without mangling string contents. Let's create a UDF to do the parsing:

from pyspark.sql.functions import udf

@udf
def concatenated_json_to_array(text):
  final = "["
  comma = instr = escaped = False
  first = True
  depth = 0  # current nesting depth of braces

  for c in text:
    if escaped:
      escaped = False          # this character was escaped; keep it as-is
    elif instr and c == "\\":
      escaped = True           # the next character is escaped
    elif c == '"':
      instr = not instr        # entering or leaving a string literal
      comma = False
    elif not instr:
      if c == "{":
        depth += 1
        if depth == 1:         # a new top-level object begins here
          if first:
            first = False
          elif comma:
            comma = False      # the input already provided a separator
          else:
            final += ","
      elif c == "}":
        depth -= 1
      elif c == ",":
        comma = True

    final += c

  return final + "]"
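To sanity-check the scanner outside Spark, here is the same logic as a plain function (the @udf decorator removed, with an explicit flag for escaped characters), which can be run locally:

```python
import json

def concatenated_json_to_array_py(text):
  # Token scanner: wrap concatenated JSON objects into a JSON array string.
  final = "["
  comma = instr = escaped = False
  first = True
  depth = 0  # current nesting depth of braces

  for c in text:
    if escaped:
      escaped = False          # this character was escaped; keep it as-is
    elif instr and c == "\\":
      escaped = True           # the next character is escaped
    elif c == '"':
      instr = not instr        # entering or leaving a string literal
      comma = False
    elif not instr:
      if c == "{":
        depth += 1
        if depth == 1:         # a new top-level object begins here
          if first:
            first = False
          elif comma:
            comma = False      # the input already provided a separator
          else:
            final += ","
      elif c == "}":
        depth -= 1
      elif c == ",":
        comma = True

    final += c

  return final + "]"

result = concatenated_json_to_array_py('{"a": 1}{"b": {"c": 2}}')
print(result)  # [{"a": 1},{"b": {"c": 2}}]
```

Nested braces and }{ sequences inside string values pass through untouched; only top-level object boundaries get a comma.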

Note: in a previous version of this article, I used a regular expression. Scanning the text manually is a bit faster.

Let's use the function:

df2 = df.withColumn("value", concatenated_json_to_array("value"))
df2.show(truncate=80)

This results in:

+--------------------------------------------------------------------------------+
|                                                                           value|
+--------------------------------------------------------------------------------+
|[{ "type": "success", "value": { "id": 463, "joke": "Chuck Norris doesn't use...|
|[{ "type": "success", "value": { "id": 478, "joke": "Chuck Norris can instant...|
+--------------------------------------------------------------------------------+

Note the bracket at the beginning of each record. We now have valid JSON data.

3. Parse JSON

Now that we have proper JSON data, we can start parsing it. Let's define the schema first:

from pyspark.sql.types import *

schema = ArrayType(
  StructType([
    StructField("type", StringType(), True),
    StructField("value", StructType([
      StructField("id", IntegerType(), True),
      StructField("joke", StringType(), True),
      StructField("categories", ArrayType(StringType()), True)  
    ]), True)
  ])
)

We pass the schema to from_json to parse the value into actual objects:

from pyspark.sql.functions import *

df3 = df2.withColumn("value", from_json("value", schema))
df3.show(truncate=80)

This will render the following result:

+--------------------------------------------------------------------------------+
|                                                                           value|
+--------------------------------------------------------------------------------+
|[{success, {463, Chuck Norris doesn't use web standards as the web will confo...|
|    [{success, {478, Chuck Norris can instantiate an abstract class., [nerdy]}}]|
+--------------------------------------------------------------------------------+

Note: we don't see any JSON anymore in our result, just data. Each row is still an array of data.

4. Explode and expand

Now that we have our structured value, we only need to explode the array into rows of values and select the data we want:

df4 = (df3
       .withColumn("value", explode("value"))
       .select("value.type", "value.value.*"))

df4.show(truncate=57)

This produces:

+-------+---+---------------------------------------------------------+----------+
|   type| id|                                                     joke|categories|
+-------+---+---------------------------------------------------------+----------+
|success|463|Chuck Norris doesn't use web standards as the web will...|   [nerdy]|
|success|599|          Chuck Norris can make fire using two ice cubes.|        []|
|success|559|            Chuck Norris' unit tests don't run. They die.|   [nerdy]|
|success|478|          Chuck Norris can instantiate an abstract class.|   [nerdy]|
+-------+---+---------------------------------------------------------+----------+

Combine it

Let's combine all the steps into a single read_concatenated_json function:

from pyspark.sql.functions import *

def read_concatenated_json(path, schema):
  return (spark.read
          .option("lineSep", None)
          .text(path)
          .withColumn("value", concatenated_json_to_array("value"))
          .withColumn("value", from_json("value", schema))
          .withColumn("value", explode("value"))
          .select("value.*"))  


@udf
def concatenated_json_to_array(text):
  final = "["
  comma = instr = escaped = False
  first = True
  depth = 0  # current nesting depth of braces

  for c in text:
    if escaped:
      escaped = False          # this character was escaped; keep it as-is
    elif instr and c == "\\":
      escaped = True           # the next character is escaped
    elif c == '"':
      instr = not instr        # entering or leaving a string literal
      comma = False
    elif not instr:
      if c == "{":
        depth += 1
        if depth == 1:         # a new top-level object begins here
          if first:
            first = False
          elif comma:
            comma = False      # the input already provided a separator
          else:
            final += ","
      elif c == "}":
        depth -= 1
      elif c == ",":
        comma = True

    final += c

  return final + "]"

Provided you have the proper schema and have mounted the S3 bucket, you can now read your Kinesis data with Spark:

df = read_concatenated_json(path, schema).select("type", "value.*")

Changelog

2021-11-23: Initial article
2021-11-25: Swapped the regular expression out for a token scanner.
