// PassbirdEventRegistry.kt
package de.pflugradts.passbird.application.eventhandling
import de.pflugradts.passbird.domain.model.ddd.AggregateRoot
import de.pflugradts.passbird.domain.model.ddd.DomainEvent
import de.pflugradts.passbird.domain.service.eventhandling.EventHandler
import de.pflugradts.passbird.domain.service.eventhandling.EventRegistry
import java.util.ArrayDeque
import java.util.Collections
import java.util.IdentityHashMap
import java.util.Queue
import java.util.logging.Level
import java.util.logging.Logger
/**
 * [EventRegistry] that gathers domain events from registered aggregate roots and from directly
 * registered events, and hands each one to every [EventHandler] whose `eventTypes` contain a type
 * assignable from the event's class.
 *
 * Deregistration is deferred: [deregister] only queues the aggregate root; the queue is applied at the
 * start and end of [processEvents] and at the start of [clearEvents].
 *
 * A failing handler never interrupts dispatching. Every failure is logged, all remaining handlers and
 * events are still served, and the first failure is rethrown once [processEvents] has finished its pass.
 * Each event is taken out of its source before it is dispatched, so an event is delivered at most once
 * even when a handler fails.
 */
class PassbirdEventRegistry(
    eventHandlers: Set<EventHandler>,
) : EventRegistry {
    private val eventHandlers = eventHandlers.toList()

    // Identity-based set: aggregate roots are tracked per instance, independent of their equals/hashCode.
    private val aggregateRoots: MutableSet<AggregateRoot> = Collections.newSetFromMap(IdentityHashMap())
    private val domainEvents: Queue<DomainEvent> = ArrayDeque()
    private val abandonedAggregateRoots: Queue<AggregateRoot> = ArrayDeque()

    /** Starts tracking [aggregateRoot]; its domain events are dispatched by [processEvents]. */
    override fun register(aggregateRoot: AggregateRoot) {
        aggregateRoots.add(aggregateRoot)
    }

    /** Queues a standalone [domainEvent] for the next [processEvents] pass. */
    override fun register(domainEvent: DomainEvent) {
        domainEvents.add(domainEvent)
    }

    /** Marks [aggregateRoot] for removal; the removal itself happens in [processEvents] or [clearEvents]. */
    override fun deregister(aggregateRoot: AggregateRoot) {
        abandonedAggregateRoots.add(aggregateRoot)
    }

    /**
     * Dispatches the events of all tracked aggregate roots, then all queued standalone events.
     *
     * @throws RuntimeException the first failure raised by any handler during this pass (a
     * non-[RuntimeException] failure is wrapped), thrown only after every pending event was dispatched
     * and the deferred deregistrations were applied.
     */
    override fun processEvents() {
        processAbandonedAggregateRoots()
        val aggregateFailure = processAggregateRoots()
        val queueFailure = processDomainEvents()
        processAbandonedAggregateRoots()
        (aggregateFailure ?: queueFailure)?.let { throw it }
    }

    /** Discards all pending events and pending deregistrations without dispatching anything. */
    override fun clearEvents() {
        processAbandonedAggregateRoots()
        aggregateRoots.forEach { it.clearDomainEvents() }
        domainEvents.clear()
        abandonedAggregateRoots.clear()
    }

    /**
     * Dispatches the pending events of every tracked aggregate root.
     *
     * @return the first handler failure of this pass, or `null` if all handlers succeeded.
     */
    private fun processAggregateRoots(): RuntimeException? {
        var firstFailure: RuntimeException? = null
        // Iterate over a snapshot: a handler may register further aggregate roots while we dispatch.
        for (aggregateRoot in aggregateRoots.toList()) {
            // Copy and clear before dispatching, so that a failing handler cannot cause re-delivery and
            // events raised on this aggregate root during dispatch are kept for the next pass.
            val pendingEvents = aggregateRoot.getDomainEvents().toList()
            aggregateRoot.clearDomainEvents()
            for (domainEvent in pendingEvents) {
                firstFailure = postEvent(domainEvent, firstFailure)
            }
        }
        return firstFailure
    }

    /**
     * Drains the queue of standalone events, including events that handlers enqueue while it is drained.
     *
     * @return the first handler failure of this pass, or `null` if all handlers succeeded.
     */
    private fun processDomainEvents(): RuntimeException? {
        var firstFailure: RuntimeException? = null
        while (true) {
            // Remove the event before dispatching it, so a failing handler cannot leave it stuck at the
            // head of the queue.
            val domainEvent = domainEvents.poll() ?: break
            firstFailure = postEvent(domainEvent, firstFailure)
        }
        return firstFailure
    }

    /**
     * Delivers [domainEvent] to every matching handler.
     *
     * @return [previousFailure] if it is non-null, otherwise the first failure raised for this event,
     * otherwise `null`.
     */
    private fun postEvent(domainEvent: DomainEvent, previousFailure: RuntimeException?): RuntimeException? =
        eventHandlers
            .filter { eventHandler -> eventHandler.eventTypes.any { eventType -> eventType.isAssignableFrom(domainEvent.javaClass) } }
            .fold(previousFailure) { failure, eventHandler -> eventHandler.handleCatching(domainEvent, failure) }

    /**
     * Invokes this handler and converts a failure into a return value instead of letting it propagate.
     * Failures are logged; throwables classified as fatal by `throwIfFatal` are rethrown immediately.
     *
     * @return [previousException] if it is non-null, otherwise this handler's failure as a
     * [RuntimeException], otherwise `null`.
     */
    private fun EventHandler.handleCatching(domainEvent: DomainEvent, previousException: RuntimeException?): RuntimeException? =
        runCatching { handle(domainEvent) }
            .fold(
                onSuccess = { previousException },
                onFailure = { exception ->
                    exception.throwIfFatal()
                    LOGGER.log(Level.SEVERE, "Exception thrown by event handler", exception)
                    previousException ?: exception.asRuntimeException()
                },
            )

    /** Removes every aggregate root that was queued by [deregister] from the tracked set. */
    private fun processAbandonedAggregateRoots() {
        while (!abandonedAggregateRoots.isEmpty()) aggregateRoots.remove(abandonedAggregateRoots.poll())
    }

    private companion object {
        val LOGGER: Logger = Logger.getLogger(PassbirdEventRegistry::class.java.name)
    }
}
/**
 * Returns this throwable unchanged when it already is a [RuntimeException]; otherwise returns a new
 * [RuntimeException] that carries this throwable as its cause.
 */
private fun Throwable.asRuntimeException(): RuntimeException =
    if (this is RuntimeException) this else RuntimeException(this)
/**
 * Rethrows this throwable when it is a [VirtualMachineError], [ThreadDeath] or [LinkageError];
 * returns normally for every other throwable.
 */
private fun Throwable.throwIfFatal() {
    when (this) {
        is VirtualMachineError, is ThreadDeath, is LinkageError -> throw this
        else -> Unit
    }
}