Messaging

With all Domain Events persisted into the database, the only thing remaining to spread the news is to push them to our favorite messaging system. We prefer RabbitMQ, but any other system, such as ActiveMQ or ZeroMQ, will do the job. For integrating with RabbitMQ using PHP, there aren't many options, but php-amqplib will do the work.

First of all, we need a service capable of sending persisted Domain Events to RabbitMQ. You may want to query EventStore for all the Events and send each one, which isn't a bad idea. However, we could push the same Domain Event more than once, and generally speaking, we need to minimize the number of Domain Events republished. If the number of Domain Events republished is 0, that's even better. In order to not republish Domain Events, we need some sort of component to track which Domain Events have already been pushed and which ones are remaining. Last but not least, once we know which Domain Events we have to push, we send them and keep track of the last one published into our messaging system. Let's see a possible implementation for this service:

class NotificationService
{
    private $serializer;
    private $eventStore;
    private $publishedMessageTracker;
    private $messageProducer;

    public function __construct(
        EventStore $anEventStore,
        PublishedMessageTracker $aPublishedMessageTracker,
        MessageProducer $aMessageProducer,
        Serializer $aSerializer
    ) {
        $this->eventStore = $anEventStore;
        $this->publishedMessageTracker = $aPublishedMessageTracker;
        $this->messageProducer = $aMessageProducer;
        $this->serializer = $aSerializer;
    }

    /**
     * @param string $exchangeName
     * @return int Number of notifications published
     */
    public function publishNotifications($exchangeName)
    {
        $publishedMessageTracker = $this->publishedMessageTracker();
        $notifications = $this->listUnpublishedNotifications(
            $publishedMessageTracker
                ->mostRecentPublishedMessageId($exchangeName)
        );

        if (!$notifications) {
            return 0;
        }

        $messageProducer = $this->messageProducer();
        $messageProducer->open($exchangeName);

        // Declared outside the try block, as both are used after it
        $publishedMessages = 0;
        $lastPublishedNotification = null;
        try {
            foreach ($notifications as $notification) {
                $lastPublishedNotification = $this->publish(
                    $exchangeName,
                    $notification,
                    $messageProducer
                );
                $publishedMessages++;
            }
        } catch (Exception $e) {
            // Log your error (trigger_error, Monolog, etc.)
        }

        // Even after a failure, remember the last notification sent
        $this->trackMostRecentPublishedMessage(
            $publishedMessageTracker,
            $exchangeName,
            $lastPublishedNotification
        );

        $messageProducer->close($exchangeName);

        return $publishedMessages;
    }

    protected function publishedMessageTracker()
    {
        return $this->publishedMessageTracker;
    }

    /**
     * @param int|null $mostRecentPublishedMessageId
     * @return StoredEvent[]
     */
    private function listUnpublishedNotifications(
        $mostRecentPublishedMessageId
    ) {
        return $this
            ->eventStore()
            ->allStoredEventsSince($mostRecentPublishedMessageId);
    }

    protected function eventStore()
    {
        return $this->eventStore;
    }

    private function messageProducer()
    {
        return $this->messageProducer;
    }

    private function publish(
        $exchangeName,
        StoredEvent $notification,
        MessageProducer $messageProducer
    ) {
        $messageProducer->send(
            $exchangeName,
            $this->serializer()->serialize($notification, 'json'),
            $notification->typeName(),
            $notification->eventId(),
            $notification->occurredOn()
        );

        return $notification;
    }

    private function serializer()
    {
        return $this->serializer;
    }

    private function trackMostRecentPublishedMessage(
        PublishedMessageTracker $publishedMessageTracker,
        $exchangeName,
        $notification
    ) {
        $publishedMessageTracker->trackMostRecentPublishedMessage(
            $exchangeName, $notification
        );
    }
}
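Stripped of its collaborators, the service boils down to a cursor over the stored events: read the last published ID, send everything after it, and move the cursor forward only past events that were actually sent. The following is a minimal sketch of that idea using plain arrays and a callable. The publishSince function and its parameters are hypothetical names used for illustration only; they aren't part of the service above:

```php
<?php

/**
 * Hypothetical, array-based condensation of publishNotifications().
 *
 * @param array $events             Event ID => payload, ordered by ID
 * @param int|null $lastPublishedId Last ID already sent, or null if none
 * @param callable $send            function ($id, $payload)
 * @return array [number of events sent, new cursor position]
 */
function publishSince(array $events, $lastPublishedId, callable $send)
{
    $sent = 0;
    $cursor = $lastPublishedId;

    try {
        foreach ($events as $id => $payload) {
            if (null !== $lastPublishedId && $id <= $lastPublishedId) {
                continue; // Already published in a previous run
            }

            $send($id, $payload);
            $cursor = $id; // Advance only after a successful send
            $sent++;
        }
    } catch (Exception $e) {
        // Stop at the first failure; the cursor still points at the
        // last event that was sent successfully
    }

    return [$sent, $cursor];
}

// Usage: event 1 was published earlier, so only 2 and 3 are sent
$events = [1 => 'a', 2 => 'b', 3 => 'c'];
$log = [];
list($sent, $cursor) = publishSince(
    $events,
    1,
    function ($id, $payload) use (&$log) {
        $log[] = $id;
    }
);
```

If sending fails halfway through, the next run resumes right after the last event that went out, which is what keeps the number of republished Domain Events low.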

Besides a Serializer, NotificationService depends on three interfaces. We've already seen EventStore, which is responsible for appending and querying Domain Events. The second one is PublishedMessageTracker, which is responsible for keeping track of pushed messages. The third one is MessageProducer, an interface representing our messaging system:

interface PublishedMessageTracker
{
    /**
     * @param string $exchangeName
     * @return int|null null if nothing has been published yet
     */
    public function mostRecentPublishedMessageId($exchangeName);

    /**
     * @param string $exchangeName
     * @param StoredEvent|null $notification
     */
    public function trackMostRecentPublishedMessage(
        $exchangeName, $notification
    );
}

The mostRecentPublishedMessageId method returns the ID of the last published message, so that the process can start from the next one. trackMostRecentPublishedMessage records which message was sent last, so that the next run knows where to resume and you can republish messages if you need to. $exchangeName represents the communication channel we're going to use to send out our Domain Events. Let's see a Doctrine implementation of PublishedMessageTracker:

use Doctrine\ORM\EntityRepository;

class DoctrinePublishedMessageTracker extends EntityRepository
    implements PublishedMessageTracker
{
    /**
     * @param string $exchangeName
     * @return int|null
     */
    public function mostRecentPublishedMessageId($exchangeName)
    {
        $messageTracked = $this->findOneByExchangeName($exchangeName);
        if (!$messageTracked) {
            // Nothing has been published to this exchange yet
            return null;
        }

        return $messageTracked->mostRecentPublishedMessageId();
    }

    /**
     * @param string $exchangeName
     * @param StoredEvent|null $notification
     */
    public function trackMostRecentPublishedMessage(
        $exchangeName, $notification
    ) {
        if (!$notification) {
            return;
        }

        $maxId = $notification->eventId();

        $publishedMessage = $this->findOneByExchangeName($exchangeName);
        if (null === $publishedMessage) {
            $publishedMessage = new PublishedMessage(
                $exchangeName,
                $maxId
            );
        }

        $publishedMessage->updateMostRecentPublishedMessageId($maxId);

        $this->getEntityManager()->persist($publishedMessage);
        $this->getEntityManager()->flush($publishedMessage);
    }
}

This code is quite straightforward. The only edge case we have to consider is when no Domain Event has been published yet, in which case mostRecentPublishedMessageId returns null.
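For unit tests, or simply to see the contract in isolation, the same behavior can be sketched without Doctrine. The InMemoryPublishedMessageTracker and FakeStoredEvent classes below are hypothetical and exist only for illustration. We assume that the only thing the tracker needs from a notification is its eventId() method. To use the class alongside the interface shown earlier, declare it with implements PublishedMessageTracker:

```php
<?php

/**
 * Hypothetical in-memory tracker, for illustration and tests only.
 */
class InMemoryPublishedMessageTracker
{
    /** @var array Exchange name => last published event ID */
    private $lastIds = [];

    public function mostRecentPublishedMessageId($exchangeName)
    {
        // null means "nothing published to this exchange yet"
        return isset($this->lastIds[$exchangeName])
            ? $this->lastIds[$exchangeName]
            : null;
    }

    public function trackMostRecentPublishedMessage(
        $exchangeName, $notification
    ) {
        if (!$notification) {
            return; // Nothing was sent, so keep the previous position
        }

        $this->lastIds[$exchangeName] = $notification->eventId();
    }
}

/**
 * Hypothetical stand-in for StoredEvent, exposing only eventId().
 */
class FakeStoredEvent
{
    private $eventId;

    public function __construct($eventId)
    {
        $this->eventId = $eventId;
    }

    public function eventId()
    {
        return $this->eventId;
    }
}

// Usage: each exchange keeps its own position
$tracker = new InMemoryPublishedMessageTracker();
$tracker->trackMostRecentPublishedMessage(
    'my-bc-app', new FakeStoredEvent(42)
);
$known = $tracker->mostRecentPublishedMessageId('my-bc-app');
$unknown = $tracker->mostRecentPublishedMessageId('another-bc');
```

Here $known holds the ID that was just tracked, while $unknown is null because nothing has been published to that exchange.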

Why an Exchange Name?
We'll see this in more detail in Chapter 12, Integrating Bounded Contexts. However, when a system is running and a new Bounded Context comes into play, you might be interested in resending all the Domain Events to the new Bounded Context. So keeping track of the last Domain Event published and the channel where it was sent might come in handy later.

In order to keep track of published Domain Events, we need an exchange name and a notification ID. Here's a possible implementation:

class PublishedMessage
{
    private $mostRecentPublishedMessageId;
    private $trackerId;
    private $exchangeName;

    /**
     * @param string $exchangeName
     * @param int $aMostRecentPublishedMessageId
     */
    public function __construct(
        $exchangeName, $aMostRecentPublishedMessageId
    ) {
        $this->mostRecentPublishedMessageId =
            $aMostRecentPublishedMessageId;
        $this->exchangeName = $exchangeName;
    }

    public function mostRecentPublishedMessageId()
    {
        return $this->mostRecentPublishedMessageId;
    }

    public function updateMostRecentPublishedMessageId($maxId)
    {
        $this->mostRecentPublishedMessageId = $maxId;
    }

    public function trackerId()
    {
        return $this->trackerId;
    }
}

And here is its corresponding mapping:

Ddd\Domain\Event\PublishedMessage:
  type: entity
  table: event_published_message_tracker
  repositoryClass: Ddd\Infrastructure\Application\Notification\DoctrinePublishedMessageTracker
  id:
    trackerId:
      column: tracker_id
      type: integer
      generator:
        strategy: AUTO
  fields:
    mostRecentPublishedMessageId:
      column: most_recent_published_message_id
      type: bigint
    exchangeName:
      type: string
      column: exchange_name

Now let's see what the MessageProducer interface is used for, along with its implementation details:

interface MessageProducer
{
    /**
     * @param string $exchangeName
     */
    public function open($exchangeName);

    /**
     * @param string $exchangeName
     * @param string $notificationMessage
     * @param string $notificationType
     * @param int $notificationId
     * @param DateTimeImmutable $notificationOccurredOn
     * @return void
     */
    public function send(
        $exchangeName,
        $notificationMessage,
        $notificationType,
        $notificationId,
        DateTimeImmutable $notificationOccurredOn
    );

    /**
     * @param string $exchangeName
     */
    public function close($exchangeName);
}

Easy. The open and close methods open and close a connection with the messaging system. send takes a message body, together with the message type, the message ID, and the time the Event occurred, and sends them to our messaging engine, whatever it is. Because we've chosen RabbitMQ, we need to implement the connection and sending process:

use PhpAmqpLib\Connection\AMQPStreamConnection;

abstract class RabbitMqMessaging
{
    protected $connection;
    protected $channel;

    public function __construct(AMQPStreamConnection $aConnection)
    {
        $this->connection = $aConnection;
        $this->channel = null;
    }

    private function connect($exchangeName)
    {
        if (null !== $this->channel) {
            return;
        }

        $channel = $this->connection->channel();
        // Durable fanout exchange, with a durable queue of the same name
        $channel->exchange_declare(
            $exchangeName, 'fanout', false, true, false
        );
        $channel->queue_declare(
            $exchangeName, false, true, false, false
        );
        $channel->queue_bind($exchangeName, $exchangeName);

        $this->channel = $channel;
    }

    public function open($exchangeName)
    {
        // Nothing to do here: the channel is opened lazily on first use
    }

    protected function channel($exchangeName)
    {
        $this->connect($exchangeName);

        return $this->channel;
    }

    public function close($exchangeName)
    {
        // The channel may never have been opened
        if (null !== $this->channel) {
            $this->channel->close();
        }
        $this->connection->close();
    }
}

use PhpAmqpLib\Message\AMQPMessage;

class RabbitMqMessageProducer
    extends RabbitMqMessaging
    implements MessageProducer
{
    /**
     * @param string $exchangeName
     * @param string $notificationMessage
     * @param string $notificationType
     * @param int $notificationId
     * @param DateTimeImmutable $notificationOccurredOn
     */
    public function send(
        $exchangeName,
        $notificationMessage,
        $notificationType,
        $notificationId,
        DateTimeImmutable $notificationOccurredOn
    ) {
        $this->channel($exchangeName)->basic_publish(
            new AMQPMessage(
                $notificationMessage,
                [
                    'type' => $notificationType,
                    'timestamp' => $notificationOccurredOn->getTimestamp(),
                    'message_id' => $notificationId
                ]
            ),
            $exchangeName
        );
    }
}
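Because NotificationService only knows about the open, send, and close methods, the broker can be swapped for a test double. The following is a minimal sketch of such a double. InMemoryMessageProducer and its messagesSentTo method are hypothetical names, not part of php-amqplib or of the code above, and the array keys simply mirror the AMQP properties used by RabbitMqMessageProducer. To use it alongside the interface shown earlier, declare it with implements MessageProducer:

```php
<?php

/**
 * Hypothetical in-memory producer, for illustration and tests only.
 * It records messages instead of sending them to a broker.
 */
class InMemoryMessageProducer
{
    /** @var array Exchange name => list of recorded messages */
    private $messages = [];

    public function open($exchangeName)
    {
        // No connection to open
    }

    public function send(
        $exchangeName,
        $notificationMessage,
        $notificationType,
        $notificationId,
        DateTimeImmutable $notificationOccurredOn
    ) {
        $this->messages[$exchangeName][] = [
            'body' => $notificationMessage,
            'type' => $notificationType,
            'message_id' => $notificationId,
            'timestamp' => $notificationOccurredOn->getTimestamp(),
        ];
    }

    public function close($exchangeName)
    {
        // No connection to close
    }

    /**
     * @param string $exchangeName
     * @return array Messages recorded for the given exchange
     */
    public function messagesSentTo($exchangeName)
    {
        return isset($this->messages[$exchangeName])
            ? $this->messages[$exchangeName]
            : [];
    }
}

// Usage: record one message and inspect it afterwards
$producer = new InMemoryMessageProducer();
$producer->open('my-bc-app');
$producer->send(
    'my-bc-app',
    '{"name":"example"}',
    'ExampleEvent',
    1,
    new DateTimeImmutable('@1500000000')
);
$producer->close('my-bc-app');

$recorded = $producer->messagesSentTo('my-bc-app');
```

A test can then assert on $recorded without a running RabbitMQ instance.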

Now that we have a Domain Service for pushing Domain Events into a messaging system like RabbitMQ, it's time to execute it. We need to choose a delivery mechanism to run the service. We personally suggest creating a Symfony Console Command:

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class PushNotificationsCommand extends Command
{
    protected function configure()
    {
        $this
            ->setName('domain:events:spread')
            ->setDescription('Notify all domain events via messaging')
            ->addArgument(
                'exchange-name',
                InputArgument::OPTIONAL,
                'Exchange name to publish events to',
                'my-bc-app'
            );
    }

    protected function execute(
        InputInterface $input, OutputInterface $output
    ) {
        $app = $this->getApplication()->getContainer();

        $numberOfNotifications =
            $app['notification_service']
                ->publishNotifications(
                    $input->getArgument('exchange-name')
                );

        $output->writeln(
            sprintf(
                '<comment>%d</comment> ' .
                '<info>notification(s) sent!</info>',
                $numberOfNotifications
            )
        );
    }
}

Following the Silex example, let's see how $app['notification_service'] is defined in the Silex Pimple Service Container:

// ...
$app['event_store'] = $app->share(function ($app) {
    return $app['em']->getRepository('Ddd\Domain\Event\StoredEvent');
});

$app['message_tracker'] = $app->share(function ($app) {
    return $app['em']
        ->getRepository('Ddd\Domain\Event\PublishedMessage');
});

$app['message_producer'] = $app->share(function () {
    return new RabbitMqMessageProducer(
        new AMQPStreamConnection('localhost', 5672, 'guest', 'guest')
    );
});

$app['message_serializer'] = $app->share(function () {
    return SerializerBuilder::create()->build();
});

$app['notification_service'] = $app->share(function ($app) {
    return new NotificationService(
        $app['event_store'],
        $app['message_tracker'],
        $app['message_producer'],
        $app['message_serializer']
    );
});
// ...