the/experts. Blog

Cover image for Replaying for a single aggregate in Axon Framework
Mitchell Herrijgers
Mitchell Herrijgers

Posted on • Updated on

Replaying for a single aggregate in Axon Framework

A recent migration of functionality to a new service went wrong. When ships cross the exit line of the harbor, we check if currently an open visit exists for that ship. If so, we close the visit and delete it from certain projections.

Then something a ship did I never imagine it would; it turned around right before entering the harbor, sailing back over the exit point line. When it wanted to enter, the Harbor Control Center could not find the visit, since it was closed.

Besides fixing the validations in our aggregate, we needed a way to restore the data. We could replay the entire projections. However, over the past 6 months we have accumulated around 2.5M events and even though we tuned it very well, a replay would take an estimated two hours. This replay would make the data for all other ships invalid with the Harbor Control Center having to fall back to painful methods. Not a good option.

We needed a way to replay events for a single aggregate. There was nothing on the topic to be found on the web, except questions a bout how to do it and some vague "I don't understand why you would do such a thing" or "It's not possible" replies.

Building the solution

This led me to a search on how to get it done. I already had quite some experience looking through Axon's internals and source code and thought it should be possible. Turns out it was!

Most people configure Axon through Spring Boot, which is probably the best way to do it. You can however, fully configure it yourself. Axon uses quite a few components to achieve a fully working system.
We can configure a mini-Axon environment specifically for replaying these events. We'll need some parts to achieve this.

Event Store

First off, we need the events of that specific aggregate. We can use one of two approaches:

  • Use the regular event store and filter the events. This effectively does a full replay on all events, but filters them in-memory before applying them to the projection
  • Collect the specific events for the aggregate, creating an in-memory event store and using that for our replay

I chose the second option since it saves a lot of time loading events from the database. We can collect the events by creating an EmbeddedEventStore and provinding an InMemoryStorageEngine. Then, we can append events to that event engine, which will effectively store them in that EventStore for our replay.

val engine = InMemoryEventStorageEngine()
val eventStore = EmbeddedEventStore
    .builder()
    .storageEngine(engine)
    .build()

# Now append the events
val events = eventStore
  .readEvents(aggregateId, 0)
  .asStream()
  .toList()
engine.appendEvents(events)
Enter fullscreen mode Exit fullscreen mode

Note that we convert the stream to a list to read this finite stream of events fully.

We now have the Eventstore we need. Let's move on!

Token Store

When using an asynchronous event processor Axon stores a token indicating which events it processed, but saving a position that correlates to the event index last processed.

Again, we have multiple options. We can use the already configured TokenStore, which probably stores tokens in the database using Jpa or JDBC. Not bad, but it would mean we have to clean up the token after.

We can also use an InMemoryTokenStore specifically created for this projection. If we do that, we won't have to worry about the cleanup!

val inMemoryTokenStore = InMemoryTokenStore()
Enter fullscreen mode Exit fullscreen mode

That's all, we are now ready for the last part.. an EventHandlerInvoker.

EventHandlerInvoker

The EventProcessor requires an EventHandlerInvoker, which is responsible for invoking the event handlers (it's in the name). In Spring, axon automatically configures this for us, but now we need to do that for ourselves.

You can create a SimpleEventHandlerInvoker by handing it a list of classes in its builder and the invoker will take care of the rest (scanning annotations within it and such).
Assuming we have an @ProcessingGroup annotation on every event handler class (which is really nice to give it a readable name), we can use spring to scan for it!

val projection = "processing-group-to-replay"
val eventHandlers = applicationContext
    .getBeansWithAnnotation(ProcessingGroup::class.java)
    .map {it.value }
    .filter {
        it.javaClass.getAnnotation(ProcessingGroup::class.java).value == projection
    }
# Now we can create the invoker
val invoker = SimpleEventHandlerInvoker.builder()
    .eventHandlers(eventHandlers)
    .build()
Enter fullscreen mode Exit fullscreen mode

Summing up, we've now got the EventHandlerInvoker to handle the events, the TokenStore to store the tokens of the EventProcessor and the EventStore to provide it with the events of our specific aggregate

The processor

Now we can create our processor to handle our events and effectively replay the aggregate to the specific projection! See how all the components fit together:

val processor = TrackingEventProcessor.builder()
    .name(name)
    .messageSource(eventStore)
    .tokenStore(inMemoryTokenStore)
    .eventHandlerInvoker(invoker)
    .transactionManager(transactionManager)
    .build()
Enter fullscreen mode Exit fullscreen mode

For logging the fact that it all is happening like it should, I attached a LoggingInterceptor on the processor.

processor.registerHandlerInterceptor { unitOfWork, interceptorChain ->
    logger.info("Handling specific-aggregate replay for projection {} and aggregateId {} for event event: {}", projection, aggregateId, unitOfWork?.message?.payload)
    interceptorChain.proceed()
}
Enter fullscreen mode Exit fullscreen mode

Now it will log the events and for which projection and aggregate we are replaying. Let's start the projection!

processor.start()
Enter fullscreen mode Exit fullscreen mode

Cool! We have what we want, we just need a little cleanup. We can await in this thread until the projection is done, then log that we are done and stopping the processor

var completed = false
var iterations = 0
while (!completed && iterations < 60) {
    logger.info("Waiting until replay for visit is completed, iteration {}/60...", iterations)
    Thread.sleep(1000)
    val currentToken = inMemoryTokenStore.fetchToken(name, 0)
    completed = currentToken.position().orElse(-1) == engine.createHeadToken().position()?.orElse(-1)
    iterations += 1
}
logger.info("Stopping processor...")
processor.shutDown()
Enter fullscreen mode Exit fullscreen mode

We now have replayed all events and shut down the processor. Java garbage collection will take care of cleanup of the EventStore and the TokenStore, so we have nothing more to clean up.

Summing up

Replaying events for a single aggregate is certainly possible. By using in-memory versions of the Axon infrastructure and the TrackingEventProcessor to handle the events we can only replay a subset of events.

Full code

For educationary (or copy/past) purposes, here is the full code of the endpoint.

@RestController
class ReplayEndpoint(
    private val eventStore: EventStore,
    private val logger: Logger,
    private val applicationContext: AbstractApplicationContext,
    private val transactionManager: TransactionManager
) {
    @GetMapping("/replay/{projection}/{ucrn}")
    fun startReplay(@PathVariable("projection") projection: String, @PathVariable("ucrn") aggregateId: String): String {
        val events = eventStore.readEvents(aggregateId, 0).asStream().toList()
        logger.info("replaying events: {}", events)
        val engine = InMemoryEventStorageEngine()
        val eventHandlers = applicationContext
            .getBeansWithAnnotation(ProcessingGroup::class.java)
            .map { it.value }
            .filter {
                it.javaClass.getAnnotation(ProcessingGroup::class.java).value == projection
            }
        val invoker = SimpleEventHandlerInvoker.builder()
            .eventHandlers(eventHandlers)
            .build()
        val inMemoryTokenStore = InMemoryTokenStore()
        engine.appendEvents(events)
        val name = "$projection-replay-$aggregateId"
        val eventStore = EmbeddedEventStore.builder().storageEngine(engine).build()
        val processor = TrackingEventProcessor.builder()
            .name(name)
            .messageSource(eventStore)
            .tokenStore(inMemoryTokenStore)
            .eventHandlerInvoker(invoker)
            .transactionManager(transactionManager)
            .build()
        processor.registerHandlerInterceptor { unitOfWork, interceptorChain ->
            logger.info("Handling specific-aggregate replay for projection {} and aggregateId {} for event event: {}", projection, aggregateId, unitOfWork?.message?.payload)
            interceptorChain.proceed()
        }
        processor.start()

        var completed = false
        var iterations = 0
        while (!completed && iterations < 60) {
            logger.info("Waiting until replay for visit is completed, iteration {}/60...", iterations)
            Thread.sleep(1000)
            val currentToken = inMemoryTokenStore.fetchToken(name, 0)
            completed = currentToken.position().orElse(-1) == engine.createHeadToken().position()?.orElse(-1)
            iterations += 1
        }
        logger.info("Stopping processor...")
        processor.shutDown()

        return "OK, replay done"
    }
}
Enter fullscreen mode Exit fullscreen mode

Discussion (1)

Collapse
patrickdronk profile image
Patrick Dronk

Awesome work! Well done! 👍