I am trying to improve an event-driven processing system, which is having a few problems because the events are not guaranteed to arrive in the correct chronological sequence. This is due to batching and caching upstream, which is currently out of my control.
The sequence errors are not a complete disaster for my processor, mainly because it’s FSM-oriented, so it copes with “weird” transitions and mostly ignores them. Also, the events are timestamped and are expected to be delayed, so we are used to reconstructing history. However, there are a few cases that cause unwanted behaviour or unnecessary double-processing.
I’m looking for a technique that can either queue up, group and sort the events before processing, or perhaps identify “transactions” in the stream.
An example of a correct event stream looks like this (all events are timestamped):
    10:01 User Session Start (user A)
    10:02 Device event 1
    10:03 Device event 2
    10:10 User Session End (user A)
    10:15 User Session Start (user A)
    10:16 User Session End (user A)
    10:32 User Session Start (user B)
    10:34 Device event 3
    10:35 Device event 4
    10:50 User Session End (user B)
My downstream processor keeps track of who is using a device, but also needs to correlate the other device events with the users. It does that by keeping state of the sessions, while receiving the other events.
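As a rough illustration of the correlation step, assuming a device's sessions never overlap and that a session is only recorded once both its start and end are known, a per-device index sorted by start time lets a device event find its owner with a binary search. `SessionIndex`, `add` and `owner` are hypothetical names, and timestamps are zero-padded “HH:MM” strings (which compare chronologically); real code would use full datetimes.

```python
import bisect
from dataclasses import dataclass, field


@dataclass
class SessionIndex:
    """Hypothetical per-device index of completed sessions, kept sorted by start time.

    Assumes sessions on one device never overlap. Timestamps are zero-padded
    "HH:MM" strings for readability; real code would use full datetimes.
    """
    starts: list = field(default_factory=list)    # session start times, sorted
    sessions: list = field(default_factory=list)  # (start, end, user), same order

    def add(self, start, end, user):
        # Insert in start-time order, whatever order sessions arrive in.
        i = bisect.bisect_left(self.starts, start)
        self.starts.insert(i, start)
        self.sessions.insert(i, (start, end, user))

    def owner(self, ts):
        # Last session starting at or before ts; it owns ts only if ts <= its end.
        i = bisect.bisect_right(self.starts, ts) - 1
        if i >= 0:
            start, end, user = self.sessions[i]
            if start <= ts <= end:
                return user
        return None
```

With the three example sessions added, `owner("10:03")` gives user A, `owner("10:34")` gives user B, and a device event at 10:20 has no owner.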
In practice, each event is processed by a different message queue worker, with a central database, so there are potential race hazards, but that’s not the focus of this question.
The problems arise when the same stream arrives like this, in three “batches” that are each received much later, with ... marking the gaps between them:
    10:10 User Session End (user A)
    10:01 User Session Start (user A)
    10:02 Device event 1
    10:03 Device event 2
    ...
    10:16 User Session End (user A)
    10:15 User Session Start (user A)
    10:34 Device event 3
    10:35 Device event 4
    ...
    10:50 User Session End (user B)
    10:32 User Session Start (user B)
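A quick check with the example data (a sketch; the event tuples are a made-up representation) shows how far simply sorting each batch by timestamp on arrival gets: it repairs the inversions inside a batch, but it cannot move an event into a different batch.

```python
# Arrival order of the example stream as three batches of (timestamp, kind, detail).
BATCHES = [
    [("10:10", "session_end", "A"), ("10:01", "session_start", "A"),
     ("10:02", "device", 1), ("10:03", "device", 2)],
    [("10:16", "session_end", "A"), ("10:15", "session_start", "A"),
     ("10:34", "device", 3), ("10:35", "device", 4)],
    [("10:50", "session_end", "B"), ("10:32", "session_start", "B")],
]


def sort_batches(batches):
    """Sort each batch by timestamp independently (zero-padded HH:MM sorts correctly)."""
    return [sorted(batch, key=lambda event: event[0]) for batch in batches]


for batch in sort_batches(BATCHES):
    print([f"{ts} {kind} {detail}" for ts, kind, detail in batch])
```

The second batch becomes 10:15, 10:16, 10:34, 10:35, so device events 3 and 4 still reach the processor before the 10:32 session start they belong to.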
I am particularly interested in “the final device event in a session”. Here I need the 10:10 session end and the 10:03 Device event 2 to complete the picture. I know that any device event timeboxed between 10:01 and 10:10 is “owned” by user A, so when I receive device event 2, I can correlate it. That part is OK. When I receive the 10:01 start event I can ignore it, as I have already seen the corresponding end (just annoying). When I receive device event 1, I can’t tell whether it’s the final one, so I process it. Then I receive device event 2 immediately after, redo the same work and update the state to presume that this is the last one. I cannot predict whether any more device events will arrive, so the FSM has to keep that assumption, which in this case is correct.
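The “ignore older events” rule can be written as a monotonic register per session. This hypothetical `LastDeviceEvent` tracker (a sketch, not part of the existing system) only reports that work must be redone when a strictly newer device event arrives for a session.

```python
class LastDeviceEvent:
    """Hypothetical tracker of the newest device event seen for each session."""

    def __init__(self):
        self._latest = {}  # session key -> (timestamp, event id)

    def offer(self, session, ts, event_id):
        """Record an event; return True only if it becomes the presumed final event.

        A True result means the "final event" work must be (re)done; False means
        the event is older than (or the same age as) one already processed.
        """
        current = self._latest.get(session)
        if current is not None and ts <= current[0]:
            return False
        self._latest[session] = (ts, event_id)
        return True

    def final(self, session):
        """Return (timestamp, event id) of the presumed final event, or None."""
        return self._latest.get(session)
```

When events arrive 10:02 then 10:03, both calls return True, which is the double processing described above; when they arrive 10:03 then 10:02, the second call returns False. So a register like this only removes redundant work in the newest-first case; avoiding it in the oldest-first case requires delaying the decision.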
The next batch is harder to deal with. I get a second “empty” session from user A, which is not a problem in itself. Then I get some device events, out of sequence, that belong to the user B session, which I haven’t received yet. This isn’t a critical problem: I can update the associated device model with this information, but I cannot complete the processing yet.
Eventually the user B events arrive and I can correlate them with the device events, again ignoring “older” events where possible.
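Parking uncorrelated device events and replaying them when a session becomes known is one way to express this “update now, complete later” behaviour. A minimal sketch, assuming `on_session` is only called once a session’s start and end have both been seen; `Correlator` and its methods are hypothetical names:

```python
from collections import defaultdict


class Correlator:
    """Hypothetical correlator that parks device events until their session is known."""

    def __init__(self):
        self.sessions = []              # (start, end, user) for completed sessions
        self.pending = []               # (ts, event id) device events with no owner yet
        self.owned = defaultdict(list)  # (user, session start) -> [(ts, event id)]

    def _owner(self, ts):
        for start, end, user in self.sessions:
            if start <= ts <= end:
                return (user, start)
        return None

    def on_device(self, ts, event_id):
        key = self._owner(ts)
        if key is None:
            self.pending.append((ts, event_id))  # session not seen yet: park it
        else:
            self.owned[key].append((ts, event_id))

    def on_session(self, start, end, user):
        self.sessions.append((start, end, user))
        # Replay parked events; claim the ones inside this session's timebox.
        still_pending = []
        for ts, event_id in self.pending:
            if start <= ts <= end:
                self.owned[(user, start)].append((ts, event_id))
            else:
                still_pending.append((ts, event_id))
        self.pending = still_pending
```

Feeding device events 3 and 4 first leaves both in `pending`; the later user B session (10:32 to 10:50) then claims them, at which point the “final device event” work can run once.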
Hopefully you can see that this adds a lot of difficulty to the processing and is probably leading to some missed cases.
What can I do to massage this event stream to make it more processable?
I have been thinking about:
- event sourcing (but it requires the correct sequence)
- re-buffering the queue for X minutes (but I still can’t be sure how long)
- implementing something like Nagle’s algorithm for chunking and pause/gap detection
- combining all the workers into one, with an FSM (mirroring the session boxing) that outputs the events once they’ve satisfied the inter-dependency sequence checks
- not fixing the queue, and instead implementing a processor that is resilient to random ordering
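For the re-buffering and stream re-order options, a common technique is a reorder buffer driven by a watermark, similar in spirit to the watermarks used by stream processors such as Apache Flink and Apache Beam. Events are held in a min-heap keyed by timestamp and released in order once they are older than the highest timestamp seen minus an assumed maximum delay; events that turn up after their slot has been released go to a side list instead. A sketch, with timestamps as minutes since midnight and `max_delay` as an assumed tuning parameter:

```python
import heapq
import itertools


class ReorderBuffer:
    """Release events in timestamp order once they fall behind a watermark.

    watermark = highest timestamp seen - max_delay, where max_delay is an assumed
    bound on how late an event can arrive (it has to be tuned from real data).
    """

    def __init__(self, max_delay):
        self.max_delay = max_delay
        self._heap = []                 # (ts, arrival seq, event), min-heap on ts
        self._seq = itertools.count()   # tie-breaker: equal timestamps keep arrival order
        self._max_seen = None
        self._last_out = None           # timestamp of the last released event
        self.late = []                  # events that arrived after their slot was released

    def push(self, ts, event):
        """Add one event; return the events now safe to release, oldest first."""
        if self._last_out is not None and ts < self._last_out:
            self.late.append((ts, event))  # releasing it now would break the ordering
            return []
        heapq.heappush(self._heap, (ts, next(self._seq), event))
        self._max_seen = ts if self._max_seen is None else max(self._max_seen, ts)
        return self._release(self._max_seen - self.max_delay)

    def flush(self):
        """Release everything still buffered, e.g. after a long quiet period."""
        return self._release(float("inf"))

    def _release(self, watermark):
        out = []
        while self._heap and self._heap[0][0] <= watermark:
            ts, _, event = heapq.heappop(self._heap)
            self._last_out = ts
            out.append((ts, event))
        return out
```

Fed the out-of-order example stream, a `max_delay` of 20 minutes reproduces the correct order, while 5 minutes releases device events 3 and 4 before the 10:32 session start arrives, so that start ends up in `late`. That latency-versus-completeness trade-off is the “can’t be sure how long” problem; measuring the observed delays is one way to choose the value.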
Because I can make some assumptions about the likely contents of the stream, I could consider a “transaction detector”, or, making no assumptions, a more generalised “stream re-order” approach.
I know sequence numbers would solve it easily, but as mentioned I cannot presently modify the upstream publisher.
I am not looking for an entire solution – just pointers to algorithms or techniques used for this class of problem, so I can do further research.