Processing Event Hubs Capture files using Spark
Azure Event Hubs has a feature called “Capture” that allows you to easily persist all events going through the Event Hub into Azure Storage Blobs.
Note: This post was originally authored when the service was called “Event Hubs Archive”. It is now called “Event Hubs Capture”.
Once you have events in storage, you will probably want to process them using some kind of data processing tool. One of the most popular tools for this kind of task is Apache Spark. In this post I’ll describe how to analyze files generated by Event Hubs Capture using Apache Spark 2 on Azure HDInsight (pyspark to be precise).
If you don’t care for the details - jump straight to the code: https://gist.github.com/itaysk/e975bc70f24d4ccadf591bc975437e96
Step 1: Understand the data
The first thing to do is read the documentation on how Capture works: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-capture-overview.
Files are created in “Avro” format. Avro is a binary, schematized format for data transfer and persistence. Find more info at https://avro.apache.org.
When you enable the Capture feature, you specify an Azure Storage Container to save the files in. In this container, blobs will be created in the following pattern:
[Namespace]/[EventHub]/[Partition]/[YYYY]/[MM]/[DD]/[HH]/[mm]/[ss]
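To make the pattern concrete, here is a small sketch of a helper that builds a wildcard path covering one day of captured blobs across all partitions. The function name, the container and account names, and the zero-padding of the date parts are assumptions for illustration, so check them against the actual blob names in your container.

```python
from datetime import date


def capture_day_glob(base, namespace, event_hub, day, partition="*"):
    """Build a glob that matches every Capture blob written on `day`.

    Assumes the [Namespace]/[EventHub]/[Partition]/[YYYY]/[MM]/[DD]/[HH]/[mm]/[ss]
    layout with zero-padded date parts (an assumption, verify in your container).
    """
    parts = [
        base.rstrip("/"),
        namespace,
        event_hub,
        str(partition),
        f"{day.year:04d}",
        f"{day.month:02d}",
        f"{day.day:02d}",
        "*",  # hour
        "*",  # minute
        "*",  # second
    ]
    return "/".join(parts)


# Hypothetical container, account, namespace and Event Hub names
path = capture_day_glob(
    "wasb://containername@storageaccountname.blob.core.windows.net",
    "mynamespace",
    "myeventhub",
    date(2017, 3, 5),
)
print(path)
```

A path like this can then be passed to the Spark reader in place of a single file path, since Hadoop file systems generally accept glob patterns.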
Step 2: Load Avro into Spark
You would probably go ahead and search for how to work with Avro files in Spark. Most samples and answers will point you to spark-avro.
spark-avro is a library for Spark that allows you to use Spark SQL’s convenient DataFrameReader API to load Avro files.
Initially I hit a few hurdles with earlier versions of Spark and spark-avro. You can read the summary here; the workaround is to use the lower-level Avro API for Hadoop. Fortunately all issues were eventually resolved, and by the time you read this post you can safely use spark-avro.
2.1: Link with spark-avro
The first step in using spark-avro is to link with it.
For testing, I use a Jupyter notebook. Use the %%configure magic and point to the Maven coordinates of spark-avro. See here for more information.
In my case it looks like:
%%configure
{"conf": {"spark.jars.packages": "com.databricks:spark-avro_2.11:3.1.0" }}
For production I use Livy to submit a py file. Use the jars parameter of Livy and point to a location where the jar can be found. See here for more information.
In my case I have put the jar in Azure Storage, so it looks like:
{
"jars":["wasb://containername@storageaccountname.blob.core.windows.net/path/to/spark-avro_2.11-3.1.0.jar"]
}
2.2: Configure extension support
The spark-avro library is a wrapper around the original Avro MapReduce API, which has the annoying behavior of ignoring files without the .avro extension. As you might have noticed, Event Hubs Capture generates files without any extension, so we need to configure Avro to acknowledge our files. Unfortunately this setting is not configurable from spark-avro, so we have to use the lower-level Hadoop configuration. The key is called avro.mapred.ignore.inputs.without.extension and the value is a boolean string ('true' or 'false'); we set it to 'false' so that files without an extension are read.
Setting Hadoop configurations can be done in many ways: Hadoop xml files, Ambari UI, Spark configs, and more… Since I wanted to keep this setting in the realm of the spark application and not the entire cluster, I chose to configure it via Spark configuration.
Spark configuration itself can also be set in many ways: SparkConf object, spark-submit switches, and more… I chose to set it in the code:
In my py file, I added .config('spark.hadoop.avro.mapred.ignore.inputs.without.extension', 'false') to the SparkSession builder. Notice that since I’m configuring Hadoop via Spark, I prefixed the key with spark.hadoop. In my case, session (or context) creation looks like this:
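from pyspark.sql import SparkSession

# The spark.hadoop. prefix passes the setting through to the Hadoop configuration
spark = SparkSession \
    .builder \
    .appName("myappname") \
    .config('spark.hadoop.avro.mapred.ignore.inputs.without.extension', 'false') \
    .getOrCreate()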
For Jupyter, since the session (or context) is created for me, I couldn’t use that method. I tried finding a “pretty” way to set this setting, but eventually the only one that actually worked from a Jupyter cell was:
spark.sparkContext._jsc.hadoopConfiguration().set('avro.mapred.ignore.inputs.without.extension', 'false')
It is not “pretty” because I am using a private member, but as I said it’s the only thing that worked, and it’s just for testing purposes.
2.3: Read files
Now we can use Spark’s read API and specify the com.databricks.spark.avro format. For example:
avroDf = spark.read.format("com.databricks.spark.avro").load("path_to_files")
Update: As commenter Jan Mtzgr points out, EH Capture creates empty files if there are no events in a given time window. This causes a problem if ALL files in the loaded path are empty (error: ‘Not an Avro data file’). According to my tests, if at least one file is non-empty, the load works and simply ignores the empty files. Apparently there’s work underway to fix this behavior.
Followup Update: Thanks to the Event Hubs team, who fixed this issue following this feedback! Now, when there’s no data to write, the Avro file that is created still includes the basic headers required for it to be a valid Avro file.
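If you still have older files in storage that lack a valid Avro header, one way to spot them before loading is to check for the Avro container magic bytes (the ASCII characters Obj followed by the byte 1) at the start of each file. The sketch below does this for local files. The helper name is hypothetical, and for blobs you would need to fetch the first four bytes with your storage client instead.

```python
# Every Avro object container file starts with these four bytes
AVRO_MAGIC = b"Obj\x01"


def looks_like_avro(path):
    """Return True if the file at `path` starts with the Avro container magic."""
    with open(path, "rb") as f:
        return f.read(4) == AVRO_MAGIC
```

This only checks the header, not the rest of the file, so treat it as a cheap pre-filter rather than a full validation.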
Step 3: Extract your data from the EH Capture format
Event Hubs Capture saves Avro files in a specific format that represents a generic Event Hubs event. If we look at a file generated by Capture and examine its schema we will get:
{
"type":"record",
"name":"EventData",
"namespace":"Microsoft.ServiceBus.Messaging",
"fields":[
{"name":"SequenceNumber","type":"long"},
{"name":"Offset","type":"string"},
{"name":"EnqueuedTimeUtc","type":"string"},
{"name":"SystemProperties","type":{"type":"map","values":["long","double","string","bytes"]}},
{"name":"Properties","type":{"type":"map","values":["long","double","string","bytes"]}},
{"name":"Body","type":["null","bytes"]}
]
}
It’s important to understand that this is not your schema. Regardless of what kind of data you sent to Event Hubs, or whether it had a schema or not, this structure probably won’t do you any good when you want to work with your data.
The data you sent is actually stored in its entirety in the Body field (which is represented as bytes).
So what we need to do is take these bytes from the Body field, decode them using the desired encoding, and parse the result into our schema.
In my case, the events I send to Event Hubs are JSON documents. Each event becomes a record in the Avro file, where the Body contains the original JSON string that was sent, as UTF-8 bytes. I will need to take these bytes, convert them to a string, and parse that string into a JSON object that I can work with in Spark.
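Stripped of Spark, the transformation for a single record looks like the sketch below. The sample payload and its field names are made up for illustration; the point is only the sequence of bytes, then string, then parsed JSON.

```python
import json

# Hypothetical Body value, as it would appear in one Avro record
body = b'{"deviceId": "sensor-1", "temperature": 21.5}'

text = body.decode("utf-8")  # bytes -> str
event = json.loads(text)     # str -> dict

print(event["deviceId"], event["temperature"])
```

In Spark the same two steps are applied to the whole Body column at once instead of one record at a time.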
We will use the spark.read.json reader, which, in addition to files, can also read from an RDD.
The JSON reader takes an RDD[String], but what we have at hand is a DataFrame, which can be treated as an RDD[Row]. We will need to transform our DataFrame into the desired form:
jsonRdd = avroDf.select(avroDf.Body.cast("string")).rdd.map(lambda x: x[0])
Now we can go ahead and load this RDD into a DataFrame:
data = spark.read.json(jsonRdd)
The resulting data DataFrame is finally a Spark SQL DataFrame with our data, in our schema, that we can go ahead and query.