Replicating Database Changes to a Message Queue is Tricky

about | archive


[ 2022-December-12 21:01 ]

Let's imagine we have an program that stores its state in a database, and we want other programs to do things when changes occur. For example, we might want to send email notifications if a bank balance drops below a threshold. This is a very common reason applications use message queues like Kafka. Unfortunately, the "trivial" implementation does not work when components fail. I suspect there are many real applications that get this wrong. Most of the time, these applications work correctly, and the changes are replicated across multiple systems. However, when things restart, updates can go missing, or extra updates can appear. In this article, I'm going to try to explain how this can go wrong, and some ways to fix it.

Attempt one: Update both in parallel

The application performs the following operations:

  1. Write the change to the database.
  2. At the same time, publish the message to the message queue.

The problem in this case is that the database update could fail, but publishing the message succeeds. This means applications consuming the stream receive an "extra" update that does not exist in the database.

Attempt two: Update database, then publish message

Okay, let's try again, and make sure that updating the database succeeds:

  1. Write the change to the database.
  2. Wait for the database to confirm the write occurred.
  3. Publish the message to the message queue.

We fixed the "extra" message update problem! However, we still have a problems: If the application crashes after writing to the database, but before publishing the message, the stream is missing an update. This can be particularly bad if the message queue is unavailable. The application can retry publishing the message for a while. However, if the message queue is down for long enough, it is likely the application will run out of memory, or be restarted. In this case, all the pending updates are lost.

So now what? We can't do the operations sequentially, and we can't do them in parallel. The trick is to order the work so there is a single commit point: the point before which the operation does not happen, but after it does. I find this concept to be very helpful. If you can't find a single commit point in a protocol that should provide some form of atomicity, either you need to keep digging, or the protocol is wrong. In this case, we have two things that store the "correct" state: the message queue, and the database. We need to change it so there is only one.

Solution: Database is the source of truth

Probably the most straightforward option is to make the database the source of truth for both the updates and messages to be published. To do this, the application needs to perform two database operations in a database transaction: Update the application state, and append a new record to a "pending updates" table. This transaction becomes the "commit point": If it commits, the update to the database has occurred. We just need to also guarantee the message gets published. To do that, we run a separate process to periodically scan the pending updates table. This process publishes messages to the queue, then deletes the record. This message publishing happens with "at least once" semantics. In case of failures, we can retry until it succeeds. We might end up with more than one message in the queue for a single update, but we won't ever miss any.

A more efficient and sometimes more convenient way to implement this is to use a "change data capture" system that replicates a database's changes, by integrating with its existing write ahead log (e.g such as Debezium). This effectively does the same thing, but without the application being involved. The one disadvantage is the contents of the message are limited to what the database itself records, and cannot be application-specific.

Either of these two solutions seem like the best to me. The remaining two solutions are listed for completeness, but they do not seem as good to me.

Solution: Message queue is the source of truth

The other solution is to rely on the message queue. The application does not write directly to the database. Instead, it only writes to the message queue. The message queue is then replicated to the database and to other applications. This means the "commit point" is now publishing the message. The biggest disadvantage is if there are constraints that need to be enforced and the database might reject some updates, then we have the same problem where an update is in the message queue, but not applied to the database. For example, if updates represent "withdraw cash" and the bank application enforces that accounts must not be overdrawn, the database may need to reject the updates. An application could theoretically work around this. It needs to implement a form of two-phase commit: In the the first phase, the application directly accesses the database to check that the update can be applied, and to logically "lock" the relevant items. The update is then sent to the message queue. Processing the message then applies the update and "unlocks" the items. This is extremely complex, so you probably should just make the database store pending updates instead.

Another disadvantage is that it can be difficult to know when the update is applied, since the . This can lead to bugs where the application saves an update, but when the user reloads the state, they don't see the update until some time later. This can also be worked around by having the application poll the database until it sees the update get applied.

Maybe Solution: Two-phase commit?

Another solution would be for applications to use two-phase commit between the database and the queue. The advantage is the queue would effectively not have duplicates. The disadvantage is this is requires using rarely used features, and requires complex handling for application restarts. I think Kafka's transaction support could be used to do this, although their documentation says they do not support it. This also has the problem of "stuck transactions" when the application crashes. Those "stuck transactions" can probably be solved by making the database be the source of truth for the two-phase commit, but that requires very careful ordering of operations. The two-phase commit solution seems theoretically interesting, but not practical.