Event Sourcing

CQRS is a powerful and flexible architecture. It has an added benefit: while gathering and saving the Domain Events that occurred during an Aggregate operation, it gives you a high level of detail about what's going on within your Domain. Domain Events are one of the key tactical patterns because of their significance within the Domain: they describe past occurrences.

Be careful with recording too many events
An ever-growing number of events is a smell. It might reveal an addiction to recording events in the Domain, most likely incentivized by the business. As a rule of thumb, keep it simple.

By using CQRS, we've been able to record all the relevant events that occurred in the Domain Layer. The state of the Domain Model can be represented by reproducing the Domain Events we previously recorded. We just need a tool for storing all those events in a consistent way. We need an event store.

The fundamental idea behind Event Sourcing is to express the state of Aggregates as a linear sequence of events.

With CQRS, we partially achieved this: the Post entity alters its state by applying Domain Events, but it's persisted as explained already, mapping the object to a database table.

Event Sourcing takes this a step further. If we were using a database table to store the state of all the blog posts, another to store the state of all the blog post comments, and so on, using Event Sourcing would allow us to use a single database table: a single append-only database table that would store all the Domain Events published by all the Aggregates within the Domain Model. Yes, you read that correctly. A single database table.

With this model in mind, tools like object-relational mappers are no longer needed. The only tool needed would be a simple database abstraction layer by which events can be appended:

interface EventSourcedAggregateRoot
{
    public static function reconstitute(EventStream $events);
}

class Post extends AggregateRoot implements EventSourcedAggregateRoot
{
    public static function reconstitute(EventStream $history)
    {
        $post = new static($history->getAggregateId());

        foreach ($history as $event) {
            $post->applyThat($event);
        }

        return $post;
    }
}
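The applyThat method used above is what turns each recorded event back into a state change. A minimal sketch of one common convention, where the handler method is named after the event class (the PostWasCreated event and the simplified Post fields here are illustrative, not the book's actual definitions):

```php
abstract class AggregateRoot
{
    // Dispatch an event to a handler named after the event class:
    // e.g. a PostWasCreated event is applied by applyPostWasCreated().
    public function applyThat($event)
    {
        $parts = explode('\\', get_class($event));

        $this->{'apply' . end($parts)}($event);
    }
}

// Illustrative event: carries the data needed to rebuild state.
class PostWasCreated
{
    public $title;

    public function __construct($title)
    {
        $this->title = $title;
    }
}

// Simplified Post: constructor and remaining fields omitted for the sketch.
class Post extends AggregateRoot
{
    private $title;

    public function title()
    {
        return $this->title;
    }

    protected function applyPostWasCreated(PostWasCreated $event)
    {
        $this->title = $event->title;
    }
}
```

Because the Aggregate never mutates state directly in its command methods but always through applyThat, replaying history and handling a fresh operation go through exactly the same code path.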

Now the Post Aggregate has a method which, when given a set of events (in other words, an event stream), is able to replay its state step by step until it reaches the current one. The next step is building an adapter of the PostRepository port that will fetch all the published events from the Post Aggregate and append them to the data store where all the events are kept. This is what we call an event store:

class EventStorePostRepository implements PostRepository
{
    private $eventStore;
    private $projector;

    public function __construct($eventStore, $projector)
    {
        $this->eventStore = $eventStore;
        $this->projector = $projector;
    }

    public function save(Post $post)
    {
        $events = $post->recordedEvents();

        $this->eventStore->append(new EventStream(
            $post->id(),
            $events
        ));

        $post->clearEvents();

        $this->projector->project($events);
    }
}

This is how the implementation of the PostRepository looks when we use an event store to save all the events published by the Post Aggregate. Now we need a way to restore an Aggregate from its event history. The reconstitute method, implemented by the Post Aggregate to rebuild a blog post's state from its triggered events, comes in handy:

class EventStorePostRepository implements PostRepository
{
    public function byId(PostId $id)
    {
        return Post::reconstitute(
            $this->eventStore->getEventsFor($id)
        );
    }
}

The event store is the workhorse that carries out all the responsibility in regard to saving and restoring event streams. Its public API is composed of two simple methods: append and getEventsFor. The former appends an event stream to the event store, and the latter loads event streams to allow Aggregate rebuilding.
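That two-method contract can be exercised with an in-memory implementation before committing to any particular data store. A sketch, with a minimal EventStream standing in for the chapter's class (the InMemoryEventStore name is illustrative):

```php
// Minimal stand-in for the chapter's EventStream: an Aggregate id
// plus an ordered list of events, iterable in insertion order.
class EventStream implements IteratorAggregate
{
    private $aggregateId;
    private $events;

    public function __construct($aggregateId, array $events)
    {
        $this->aggregateId = $aggregateId;
        $this->events = $events;
    }

    public function getAggregateId()
    {
        return $this->aggregateId;
    }

    public function events()
    {
        return $this->events;
    }

    public function getIterator(): Traversable
    {
        return new ArrayIterator($this->events);
    }
}

// In-memory event store honoring the append/getEventsFor contract.
class InMemoryEventStore
{
    private $streams = [];

    public function append(EventStream $stream)
    {
        $id = (string) $stream->getAggregateId();

        foreach ($stream as $event) {
            $this->streams[$id][] = $event;
        }
    }

    public function getEventsFor($id)
    {
        return new EventStream($id, $this->streams[(string) $id] ?? []);
    }
}
```

An implementation like this is also handy in tests, where spinning up Redis for every run would be wasteful.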

We could use a key-value implementation to store all events:

class EventStore
{
    private $redis;
    private $serializer;

    public function __construct($redis, $serializer)
    {
        $this->redis = $redis;
        $this->serializer = $serializer;
    }

    public function append(EventStream $eventStream)
    {
        foreach ($eventStream as $event) {
            $data = $this->serializer->serialize(
                $event, 'json'
            );

            $date = (new DateTimeImmutable())->format('YmdHis');

            $this->redis->rpush(
                'events:' . $event->getAggregateId(),
                $this->serializer->serialize([
                    'type' => get_class($event),
                    'created_on' => $date,
                    'data' => $data
                ], 'json')
            );
        }
    }

    public function getEventsFor($id)
    {
        $serializedEvents = $this->redis->lrange('events:' . $id, 0, -1);

        $eventStream = [];
        foreach ($serializedEvents as $serializedEvent) {
            $eventData = $this->serializer->deserialize(
                $serializedEvent,
                'array',
                'json'
            );

            $eventStream[] = $this->serializer->deserialize(
                $eventData['data'],
                $eventData['type'],
                'json'
            );
        }

        return new EventStream($id, $eventStream);
    }
}

This event store implementation is built upon Redis, a widely used key-value store. The events are appended to a list under keys with the events: prefix. In addition, before persisting the events, we extract some metadata, such as the event class and the creation date, as it will come in handy later.

Obviously, in terms of performance, it's expensive for an Aggregate to go over its full event history to reach its final state every time. This is especially the case when an event stream has hundreds or even thousands of events. The best way to overcome this is to take a snapshot of the Aggregate and replay only the events in the event stream that occurred after the snapshot was taken. A snapshot is just a serialized version of the Aggregate state at a given moment. Snapshotting can be based on the number of events in the Aggregate's event stream, or it can be time based: with the first approach, a snapshot is taken every n triggered events (every 50, 100, or 200, for example); with the second, a snapshot is taken every n seconds.
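The Snapshot used in the following examples is just a small value object pairing an Aggregate with the version at which it was taken; a minimal sketch consistent with how it's constructed and read below:

```php
// Immutable pair of (version, aggregate state at that version).
class Snapshot
{
    private $version;
    private $aggregate;

    public function __construct($version, $aggregate)
    {
        $this->version = $version;
        $this->aggregate = $aggregate;
    }

    public function version()
    {
        return $this->version;
    }

    public function aggregate()
    {
        return $this->aggregate;
    }
}
```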

To follow the example, we'll use the first way of snapshotting. In the snapshot's metadata, we store an additional field, the version, from which we'll start replaying the Aggregate history:

class SnapshotRepository
{
    public function byId($id)
    {
        $key = 'snapshots:' . $id;
        $serialized = $this->redis->get($key);

        if (null === $serialized) {
            return null;
        }

        $metadata = $this->serializer->deserialize(
            $serialized,
            'array',
            'json'
        );

        return new Snapshot(
            $metadata['version'],
            $this->serializer->deserialize(
                $metadata['snapshot']['data'],
                $metadata['snapshot']['type'],
                'json'
            )
        );
    }

    public function save($id, Snapshot $snapshot)
    {
        $key = 'snapshots:' . $id;
        $aggregate = $snapshot->aggregate();

        $this->redis->set($key, $this->serializer->serialize([
            'version' => $snapshot->version(),
            'snapshot' => [
                'type' => get_class($aggregate),
                'data' => $this->serializer->serialize(
                    $aggregate, 'json'
                )
            ]
        ], 'json'));
    }
}

And now we need to refactor the EventStorePostRepository class so that it uses the SnapshotRepository to load Aggregates within acceptable performance bounds:

class EventStorePostRepository implements PostRepository
{
    public function byId(PostId $id)
    {
        $snapshot = $this->snapshotRepository->byId($id);

        if (null === $snapshot) {
            return Post::reconstitute(
                $this->eventStore->getEventsFor($id)
            );
        }

        $post = $snapshot->aggregate();

        $post->replay(
            $this->eventStore->fromVersion($id, $snapshot->version())
        );

        return $post;
    }
}

We just need to take Aggregate snapshots periodically. We could do this synchronously or asynchronously by a process responsible for monitoring the event store. The following code is a simple example demonstrating the implementation of Aggregate snapshotting:

class EventStorePostRepository implements PostRepository
{
    public function save(Post $post)
    {
        $id = $post->id();
        $events = $post->recordedEvents();
        $post->clearEvents();

        $this->eventStore->append(new EventStream($id, $events));

        $countOfEvents = $this->eventStore->countEventsFor($id);
        $version = (int) ($countOfEvents / 100);

        if (!$this->snapshotRepository->has($id, $version)) {
            $this->snapshotRepository->save(
                $id,
                new Snapshot($version, $post)
            );
        }

        $this->projector->project($events);
    }
}
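The has method used above doesn't appear in the SnapshotRepository shown earlier; it only needs to answer whether a snapshot already exists for the given Aggregate at (or beyond) the given version. A sketch against an in-memory store, with illustrative names (a Redis-backed version would compare against the version field stored in the snapshot metadata in the same way):

```php
// Illustrative in-memory variant tracking the latest snapshotted
// version per Aggregate id.
class InMemorySnapshotRepository
{
    private $versions = [];

    public function has($id, $version)
    {
        $id = (string) $id;

        return isset($this->versions[$id])
            && $this->versions[$id] >= $version;
    }

    // Hypothetical helper: records that a snapshot was taken
    // at the given version.
    public function markSnapshotted($id, $version)
    {
        $this->versions[(string) $id] = $version;
    }
}
```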
To ORM or Not To ORM 
It's clear from the use case of this architectural style that using an ORM just to persist and fetch events would be overkill. Even if we use a relational database to store them, we only need to persist and fetch events from the data store.
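A thin data access layer over a single append-only table is all a relational store needs. A sketch using PDO with SQLite (the class, table, and column names are illustrative, and events are assumed to arrive already serialized):

```php
// Minimal relational event store: one append-only table for the
// events of every Aggregate, no ORM involved.
class PdoEventStore
{
    private $pdo;

    public function __construct(PDO $pdo)
    {
        $this->pdo = $pdo;
        $this->pdo->exec(
            'CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                aggregate_id TEXT NOT NULL,
                type TEXT NOT NULL,
                created_on TEXT NOT NULL,
                data TEXT NOT NULL
            )'
        );
    }

    public function append($aggregateId, $type, $serializedData)
    {
        $statement = $this->pdo->prepare(
            'INSERT INTO events (aggregate_id, type, created_on, data)
             VALUES (?, ?, ?, ?)'
        );
        $statement->execute([
            (string) $aggregateId,
            $type,
            (new DateTimeImmutable())->format('YmdHis'),
            $serializedData,
        ]);
    }

    public function getEventsFor($aggregateId)
    {
        $statement = $this->pdo->prepare(
            'SELECT type, data FROM events
             WHERE aggregate_id = ? ORDER BY id'
        );
        $statement->execute([(string) $aggregateId]);

        return $statement->fetchAll(PDO::FETCH_ASSOC);
    }
}
```

The insertion order of the auto-incrementing primary key preserves the event sequence, which is the only ordering guarantee replaying requires.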