I have a Spark Streaming Job which processes messages coming from Kafka.
The incoming JSON I process looks roughly like this:

    {"sv" : 1.0, "field1" : "some data"}
The only thing I do is insert these into a MySQL database.
However, I need to process these messages differently depending on their schema version number!
For instance, I may get all of the following in the same stream:

    {"sv" : 1.0, "field1" : "some data"}
    {"sv" : 1.1, "field1" : "some data", "field2" : "new data"}
    {"sv" : 1.2, "field1" : "some data", "field2" : "new data", "field3" : "data"}
Currently I have a function that formats the data for me, like so:

    import org.json.JSONObject // assuming the org.json library's JSONObject

    def formatData(json: String): Option[Data] = {
      val jsonObject = new JSONObject(json)
      formatDataBasedOnSchemaVersion(jsonObject)
    }
and another function that formats it based on the schema version number:

    private def formatDataBasedOnSchemaVersion(jsonObject: JSONObject): Option[Data] =
      jsonObject.getDouble("sv") match {
        case 1.0 => Some(formatVersion_1_0(jsonObject))
        case 1.1 => Some(formatVersion_1_1(jsonObject))
        case 1.2 => Some(formatVersion_1_2(jsonObject))
        case x =>
          logger.warn("No formatter found for schema version: " + x)
          None
      }
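To make this flow concrete, here is a minimal self-contained sketch of the same version dispatch. It is not the exact production code: it assumes a plain Map[String, String] (the hypothetical Msg alias) in place of org.json's JSONObject so that it runs without dependencies, and it has older versions leave the newer fields as empty strings, matching how version 1.0 messages end up with blank field2 and field3.

```scala
// Hypothetical stand-in for the parsed JSON message (instead of org.json's JSONObject).
type Msg = Map[String, String]

case class Data(field1: String, field2: String, field3: String)

// One formatter per schema version; fields a version lacks are left blank ("").
def formatVersion_1_0(m: Msg): Data =
  Data(m("field1"), "", "")

def formatVersion_1_1(m: Msg): Data =
  Data(m("field1"), m("field2"), "")

def formatVersion_1_2(m: Msg): Data =
  Data(m("field1"), m("field2"), m("field3"))

// Dispatch on the numeric "sv" value, mirroring the hard-coded match.
def formatDataBasedOnSchemaVersion(m: Msg): Option[Data] =
  m("sv").toDouble match {
    case 1.0 => Some(formatVersion_1_0(m))
    case 1.1 => Some(formatVersion_1_1(m))
    case 1.2 => Some(formatVersion_1_2(m))
    case _   => None // unknown version (the real code logs a warning here)
  }
```

Every new schema version means another case in the match plus another formatVersion_X_Y method, which is exactly the maintenance cost in question.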
An example of one of my format functions:

    private def formatVersion_1_2(jsonObject: JSONObject): Data = {
      val f1 = jsonObject.getString("field1")
      val f2 = jsonObject.getString("field2")
      val f3 = jsonObject.getString("field3")
      Data(f1, f2, f3)
    }
In the formatVersion_1_0 function, all I do is pull out the “field1” value.
My Data class is a simple DTO; it just looks like this:

    case class Data(field1: String, field2: String, field3: String)
If I get schema version 1.0, field2 and field3 are left blank and inserted into the DB as blank values.
The problem is that I have to hard-code the schema version numbers (“1.0”, “1.1”, etc.) and write a new method to pull out the extra fields. So for every schema change, I have to edit the code and add a new method to extract the new data. Is there a better pattern I can use to handle this, or maybe a framework? I’ve heard of ORMs; would one help with this problem, or would I still need to make similar code changes for every schema version change?
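For comparison, one direction sometimes used when schema changes are purely additive (each version only adds optional fields, as in the three examples above) is a single generic formatter that reads every known field if present and defaults it to blank. The sketch below is hypothetical: Msg stands in for the parsed JSON, and knownVersions is an illustrative allow-list that could equally come from configuration. It would not cover renamed or retyped fields, which would still need version-specific handling, and Data itself still has to grow when a new field appears.

```scala
// Hypothetical stand-in for the parsed JSON message.
type Msg = Map[String, String]

case class Data(field1: String, field2: String, field3: String)

// Illustrative allow-list of accepted schema versions (could be loaded from config).
val knownVersions: Set[Double] = Set(1.0, 1.1, 1.2)

// One formatter for all versions: each field is read if present, otherwise left blank.
def formatData(m: Msg): Option[Data] = {
  def field(name: String): String = m.getOrElse(name, "")
  m.get("sv")
    .flatMap(_.toDoubleOption)      // reject a missing or non-numeric version
    .filter(knownVersions.contains) // reject versions we don't know about
    .map(_ => Data(field("field1"), field("field2"), field("field3")))
}
```

With this shape, a new additive version means adding it to knownVersions and reading the new field, rather than writing a new per-version method.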