Synchronizing the Write Model with the Read Model

Here comes the tricky part. How do we synchronize the Read Model with the Write Model? We already said we would do it by using Domain Events captured in a Write Model transaction. For each type of Domain Event captured, a specific projection will be executed. So a one-to-one relationship between Domain Events and projections will be set.

Let's have a look at an example of configuring projections so that we can get a better idea. First of all, we need to define a skeleton for the projections:

interface Projection 
{
public function listensTo();
public function project($event);
}

So defining an Elasticsearch projection for a PostWasCreated event would be as simple as this:

namespace InfrastructureProjectionElasticsearch;

use ElasticsearchClient;
use PostWasCreated;

class PostWasCreatedProjection implements Projection
{
private $client;

public function __construct(Client $client)
{
$this->client = $client;
}

public function listensTo()
{
return PostWasCreated::class;
}

public function project($event)
{
$this->client->index([
'index' => 'posts',
'type' => 'post',
'id' => $event->getPostId(),
'body' => [
'content' => $event->getPostContent(),
// ...
]
]);
}
}

The Projector implementation is a kind of specialized Domain Event listener. The main difference between that and the default Domain Event listener is that the Projector reacts to a group of Domain Events instead of only one:

namespace InfrastructureProjection;

class Projector
{
private $projections = [];

public function register(array $projections)
{
foreach ($projections as $projection) {
$this->projections[$projection->eventType()] = $projection;
}
}

public function project( array $events)
{
foreach ($events as $event) {
if (isset($this->projections[get_class($event)])) {
$this->projections[get_class($event)]
->project($event);
}
}
}
}

The following code shows how the flow between the projector and the events would appear:

$client = new ElasticsearchClientBuilder::create()->build();

$projector = new Projector();
$projector->register([
new InfrastructureProjectionElasticsearch
PostWasCreatedProjection($client),
new InfrastructureProjectionElasticsearch
PostWasPublishedProjection($client),
new InfrastructureProjectionElasticsearch
PostWasCategorizedProjection($client),
new InfrastructureProjectionElasticsearch
PostContentWasChangedProjection($client),
new InfrastructureProjectionElasticsearch
PostTitleWasChangedProjection($client),
]);

$events = [
new PostWasCreated(/* ... */),
new PostWasPublished(/* ... */),
new PostWasCategorized(/* ... */),
new PostContentWasChanged(/* ... */),
new PostTitleWasChanged(/* ... */),
];

$projector->project($event);

This code is kind of synchronous, but the process can be asynchronous if needed. And you could make your customers aware of this out-of-sync data by placing some alerts in the view layer.

For the next example, we'll use the amqplib PHP extension in combination with ReactPHP:

// Connect to an AMQP broker
$cnn = new AMQPConnection();
$cnn->connect();

// Create a channel
$ch = new AMQPChannel($cnn);

// Declare a new exchange
$ex = new AMQPExchange($ch);
$ex->setName('events');

$ex->declare();

// Create an event loop
$loop = ReactEventLoopFactory::create();

// Create a producer that will send any waiting messages every half a second
$producer = new GosComponentReactAMQPProducer($ex, $loop, 0.5);

$serializer = JMSSerializerSerializerBuilder::create()->build();

$projector = new AsyncProjector($producer, $serializer);

$events = [
new PostWasCreated(/* ... */),
new PostWasPublished(/* ... */),
new PostWasCategorized(/* ... */),
new PostContentWasChanged(/* ... */),
new PostTitleWasChanged(/* ... */),
];

$projector->project($event);

For this to work, we need an asynchronous projector. Here's a naive implementation of that:

namespace InfrastructureProjection;

use GosComponentReactAMQPProducer;
use JMSSerializerSerializer;

class AsyncProjector
{
private $producer;
private $serializer;

public function __construct(
Producer $producer,
Serializer $serializer
) {
$this->producer = $producer;
$this->serializer = $serializer;
}

public function project(array $events)
{
foreach ($events as $event) {
$this->producer->publish(
$this->serializer->serialize(
$event, 'json'
)
);
}
}
}

And the event consumer on the RabbitMQ exchange would look something like this:

// Connect to an AMQP broker
$cnn = new AMQPConnection();
$cnn-> connect();

// Create a channel
$ch = new AMQPChannel($cnn);

// Create a new queue
$queue = new AMQPQueue($ch);
$queue->setName('events');
$queue->declare();

// Create an event loop
$loop = ReactEventLoopFactory::create();

$serializer = JMSSerializerSerializerBuilder::create()->build();

$client = new ElasticsearchClientBuilder::create()->build();

$projector = new Projector();
$projector->register([
new InfrastructureProjectionElasticsearch
PostWasCreatedProjection($client),
new InfrastructureProjectionElasticsearch
PostWasPublishedProjection($client),
new InfrastructureProjectionElasticsearch
PostWasCategorizedProjection($client),
new InfrastructureProjectionElasticsearch
PostContentWasChangedProjection($client),
new InfrastructureProjectionElasticsearch
PostTitleWasChangedProjection($client),
]);

// Create a consumer
$consumer = new GosComponentReactAMQPConsumer($queue, $loop, 0.5, 10);

// Check for messages every half a second and consume up to 10 at a time.
$consumer->on(
'consume',
function ($envelope, $queue) use ($projector, $serializer) {
$event = $serializer->unserialize($envelope->getBody(), 'json');
$projector->project($event);
}
);

$loop->run();

From now on, it could be as simple as making all the needed Repositories consume an instance of the projector and then making them invoke the projection process:

class DoctrinePostRepository implements PostRepository
{
private $em;
private $projector;

public function __construct(EntityManager $em, Projector $projector)
{
$this->em = $em;
$this->projector = $projector;
}

public function save(Post $post)
{
$this->em->transactional(
function (EntityManager $em) use ($post)
{
$em->persist($post);

foreach ($post->recordedEvents() as $event) {
$em->persist($event);
}
}
);

$this->projector->project($post->recordedEvents());
}

public function byId(PostId $id)
{
return $this->em->find($id);
}
}

The Post instance and the recorded events are triggered and persisted in the same transaction. This ensures that no events are lost, as we'll project them to the Read Model if the transaction is successful. As a result, no inconsistencies will exist between the Write Model and the Read Model.

To ORM or Not To ORM  
One of the most common questions when implementing CQRS is if an Object-Relational Mapper (ORM) is really needed. We strongly believe that using an ORM for the Write Model is perfectly fine and has all of the advantages of using a tool, which will help us save a lot of work in case we use a relational database. But we shouldn't forget that we still need to persist and retrieve the Write Model's state in a relational database.
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset