Design question on synchronization of two asynchronous data streams

Suppose I have two async streams: Trip: {tripId, date, city} and Bill: {billId, tripId, date, amount}. I need to design a system that provides a real-time aggregated view of the form: City, TripCount, TotalAmount. Events in both streams can arrive out of order or be duplicated, but the result needs to be accurate and real-time.

My solutions:

1.) Create two DB tables: Trip and Bill (indexed on TripID and BillID respectively). Read the messages from the streams and persist them in these tables, with a status column set to pending in both the Bill table and the Trip table. Then write a worker that reads from the Bill table and looks in the Trip table for the record with the given TripID. If the record is found, the worker updates the aggregated view in a third table (City, TripCount, TotalAmount) and then changes the status of the bill and trip records to processed. A background job runs periodically and removes all records with status processed from both the Bill and Trip tables.
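To make the flow concrete, here is a minimal sketch of the worker and the cleanup job. It uses an in-memory SQLite database only so that it runs as-is; the table and column names (`trip`, `bill`, `city_aggregate`, `status`) and the function names are my own illustrative choices, and it assumes one bill per trip, which may not hold in the real system.

```python
import sqlite3

# Illustrative schema: names are assumptions, not a fixed design.
SCHEMA = """
CREATE TABLE trip (trip_id TEXT PRIMARY KEY, date TEXT, city TEXT,
                   status TEXT NOT NULL DEFAULT 'pending');
CREATE TABLE bill (bill_id TEXT PRIMARY KEY, trip_id TEXT, date TEXT, amount REAL,
                   status TEXT NOT NULL DEFAULT 'pending');
CREATE TABLE city_aggregate (city TEXT PRIMARY KEY,
                             trip_count INTEGER NOT NULL,
                             total_amount REAL NOT NULL);
"""


def process_pending_bills(conn):
    """Fold every pending bill whose trip has arrived into the aggregate.

    Assumes one bill per trip. Returns the number of matched pairs.
    """
    matched = conn.execute(
        "SELECT b.bill_id, b.trip_id, b.amount, t.city "
        "FROM bill b JOIN trip t ON t.trip_id = b.trip_id "
        "WHERE b.status = 'pending' AND t.status = 'pending'"
    ).fetchall()
    for bill_id, trip_id, amount, city in matched:
        # One transaction per pair: the aggregate update and both status
        # changes commit together or not at all.
        with conn:
            cur = conn.execute(
                "UPDATE city_aggregate SET trip_count = trip_count + 1, "
                "total_amount = total_amount + ? WHERE city = ?",
                (amount, city),
            )
            if cur.rowcount == 0:  # first trip seen for this city
                conn.execute(
                    "INSERT INTO city_aggregate VALUES (?, 1, ?)", (city, amount)
                )
            conn.execute(
                "UPDATE bill SET status = 'processed' WHERE bill_id = ?", (bill_id,)
            )
            conn.execute(
                "UPDATE trip SET status = 'processed' WHERE trip_id = ?", (trip_id,)
            )
    return len(matched)


def purge_processed(conn):
    """Periodic cleanup job: delete rows that are already aggregated."""
    with conn:
        conn.execute("DELETE FROM bill WHERE status = 'processed'")
        conn.execute("DELETE FROM trip WHERE status = 'processed'")
```

A bill that arrives before its trip simply stays pending until a later pass finds the matching trip row.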

The problem I see with the above solution is that the indexes on TripID and BillID will become a bottleneck if I delete records at a very high frequency. Apart from that, do you see any other problems with this solution? I have read that this is a well-known anti-pattern, because I am using the DB as a queue.

2.) Here is another solution: take the data from the streams and persist it in the tables Trip and Bill (to keep an audit trail and to avoid duplicates). To store the Trip data temporarily, use a fast distributed key-value store; I am using Redis for this purpose. After writing the trip data to the DB, I will write the same data to the cache with the tripId as key and the record as value. Then I will put the bill data in a queue. Workers read from the queue and look up the tripId in the cache. If the tripId is present, the worker reads the data from the cache, updates the aggregated view, and deletes the tripId from the cache and the bill message from the queue. If the tripId is not found in the cache, the worker puts the same message back in the queue.
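Roughly, the worker logic I have in mind looks like the sketch below. To keep it runnable, a plain `dict` stands in for Redis and a `collections.deque` stands in for the real queue; the function names and the in-memory `aggregate` dict are illustrative assumptions, and the DB persistence step is left out.

```python
from collections import deque


def on_trip(trip, cache):
    """Trip stream handler: cache the trip under its tripId."""
    cache[trip["tripId"]] = trip


def on_bill(bill, queue):
    """Bill stream handler: enqueue the bill for the workers."""
    queue.append(bill)


def work_once(queue, cache, aggregate):
    """Process one bill from the queue.

    Returns True if the bill was aggregated, False if its trip has not
    arrived yet and the bill was put back on the queue.
    """
    bill = queue.popleft()
    trip = cache.get(bill["tripId"])
    if trip is None:
        queue.append(bill)  # trip not seen yet: requeue and retry later
        return False
    count, total = aggregate.get(trip["city"], (0, 0.0))
    aggregate[trip["city"]] = (count + 1, total + bill["amount"])
    del cache[bill["tripId"]]  # trip is consumed once its bill is applied
    return True
```

In this sketch a bill whose trip never arrives is requeued indefinitely, which is one of the things I am unsure about.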

To avoid duplicates, the insert will fail if we try to insert the same tripId or billId into the tables. When the insert fails, I will not put the message in the queue or the cache.
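As a minimal sketch of that duplicate check, assuming the key column carries a primary-key (or unique) constraint: the insert raises an integrity error for a repeated id, and the caller uses the return value to decide whether to forward the message. SQLite is used here only to keep the example self-contained, and the `trip` table layout and the `persist_trip_once` name are hypothetical.

```python
import sqlite3


def persist_trip_once(conn, trip_id, date, city):
    """Insert the trip; return True if it is new, False if it is a duplicate."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO trip (trip_id, date, city) VALUES (?, ?, ?)",
                (trip_id, date, city),
            )
        return True
    except sqlite3.IntegrityError:
        # Primary-key violation: this tripId was already persisted.
        return False


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trip (trip_id TEXT PRIMARY KEY, date TEXT, city TEXT)")

first = persist_trip_once(conn, "t1", "2020-01-01", "Paris")
second = persist_trip_once(conn, "t1", "2020-01-01", "Paris")  # duplicate event
```

Only when the function returns True would the record go on to the cache or the queue; the same pattern would apply to bills keyed on billId.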

Please let me know your thoughts on the above two solutions, and please propose a better solution if you have one.