14

Batches, Queues, and Cron

If the previous chapter was a bit more theoretical, with me throwing "rules" at you, in this chapter, I am going to make up for it, and we are going to have some fun. This means we are going to write some code that demonstrates concepts related to data processing, especially larger amounts of it. And in doing so, we are going to cover a few topics.

First, we are going to look back at the hook_post_update_NAME() hook we saw in Chapter 8, The Database API. More specifically, we are going to see how the &$sandbox parameter can be used to handle updates that need to process some data that may take a bit longer and should be split across multiple requests. Next up, we are going to look at standalone batches (which basically use the same system) to process data in batches across multiple requests. And what better example to illustrate this technique than our Importer, which needs to process an undefined number of products?

We will explore a related subsystem that allows us to queue things for later processing (either in batches, during cron, or in simple requests). And since we are talking about cron, we will also go into a bit of detail and see how this system works in Drupal. Finally, we will finish this chapter by taking a look at the Lock API, an API that allows us to ensure multiple requests don’t run a process at the same time.

The main topics we will cover in this chapter are as follows:

  • Batch-powered update hooks
  • Batch operations
  • Cron
  • The Queue API
  • The Lock API

By the end of this chapter, you will be a lean, mean, data-processing machine. So, let’s get to it.

Batch-powered update hooks

The first thing we are going to look at is post update hooks, revisiting our previous Sports module created in Chapter 8, The Database API. We will focus on the &$sandbox parameter, which is present in both the update hooks (hook_update_N()) and the post update hooks (hook_post_update_NAME()). The goal is to run an update on each of the records in the players table and mark them as retired. The point is to illustrate how we can process these records one at a time, in individual requests, to prevent a PHP timeout. This is handy if we have many records.

So, to get us going, here is all the code, and we’ll see right after what everything means. If you remember, this will go in the sports.post_update.php file of our module:

/**
 * Update all the players to mark them as retired.
 */
function sports_post_update_retire_players(&$sandbox) {
  $database = \Drupal::database();

  if (empty($sandbox)) {
    $results = $database->query("SELECT [id] FROM {players}")->fetchAllAssoc('id');
    $sandbox['progress'] = 0;
    $sandbox['ids'] = array_keys($results);
    $sandbox['max'] = count($results);
  }

  // If there is nothing to process, mark the update as finished.
  if ($sandbox['max'] === 0) {
    $sandbox['#finished'] = 1;
    return;
  }

  // Take one ID off the list and process it in this request.
  $id = array_shift($sandbox['ids']);
  $player = $database->query("SELECT * FROM {players} WHERE [id] = :id", [':id' => $id])->fetch();
  $data = $player->data ? unserialize($player->data) : [];
  $data['retired'] = TRUE;
  $database->update('players')
    ->fields(['data' => serialize($data)])
    ->condition('id', $id)
    ->execute();

  $sandbox['progress']++;
  $sandbox['#finished'] = $sandbox['progress'] / $sandbox['max'];
}

When this hook is fired, the $sandbox argument (passed by reference) is empty. Its goal is to act as temporary storage between the requests needed to process everything inside the function. We can use it to store arbitrary data, but we should be mindful of the size, as it must fit inside a LONGBLOB table column.

The first thing we are doing is getting our hands on the database service to make queries to our players table. But more importantly, we are checking whether the $sandbox variable is empty, which indicates that this is the start of the process. If it is, we add some data to it that is specific to our process. In this case, we want to store the progress (this is quite common), the IDs of the players that need to be updated, and the total number of records (also quite common). To do this, we make a simple query.

Once the sandbox is set, we can get the first ID in the list while also removing it so that, iteratively, we have fewer records to process. Based on that ID, we load the relevant player, add our data to it, and update it back in the database. Once that is done, we increment the progress by 1 (as we have processed one record). Finally, the #finished key in the sandbox is what Drupal looks at to determine whether the process is finished. It expects a number between 0 and 1, the latter signifying that we are done. If anything below 1 is found, the function gets called again, and the $sandbox array will contain the data as we left it (incremented progress and one less ID to process). In this case, the main body of the function runs again, processing the next record, and so on, until the progress divided by the maximum number of records equals 1. If we have 100 records, when the progress reaches 100, the following is true: 100/100 = 1. Then, Drupal knows to finish the process and not call the function again.

This process is also called batching in Drupal terms and is very useful because Drupal will make as many requests as needed to finish it, and we can control how much work gets done in each of those requests. The previous example might be a bit of overkill in the sense that a single request is perfectly capable of processing more than one player. We are actually losing time because Drupal needs to bootstrap itself again and again for each request. So, it's up to us to find that sweet spot. In our previous example, what we could have done was break up the array of IDs into chunks of maybe five and allow a request to process five records instead of one. That would have surely increased the speed, so I encourage you to try that on your own now that you understand the principles behind using $sandbox for batching.
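If you want to check your work afterward, here is a minimal sketch of that chunked variant (the chunk size of five is arbitrary and worth tuning to your data):

/**
 * Update all the players to mark them as retired (chunked sketch).
 */
function sports_post_update_retire_players(&$sandbox) {
  $database = \Drupal::database();

  if (empty($sandbox)) {
    $results = $database->query("SELECT [id] FROM {players}")->fetchAllAssoc('id');
    $sandbox['progress'] = 0;
    $sandbox['ids'] = array_keys($results);
    $sandbox['max'] = count($results);
  }

  // Take up to five IDs off the list and process them in this request.
  foreach (array_splice($sandbox['ids'], 0, 5) as $id) {
    $player = $database->query("SELECT * FROM {players} WHERE [id] = :id", [':id' => $id])->fetch();
    $data = $player->data ? unserialize($player->data) : [];
    $data['retired'] = TRUE;
    $database->update('players')
      ->fields(['data' => serialize($data)])
      ->condition('id', $id)
      ->execute();
    $sandbox['progress']++;
  }

  $sandbox['#finished'] = $sandbox['max'] === 0 ? 1 : $sandbox['progress'] / $sandbox['max'];
}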

Now that we have a basic understanding of Drupal’s capabilities of doing multi-request processing, let’s switch gears and look at the Batch API.

Batch operations

To demonstrate how batching works, we are going to rebuild the way our product JsonImporter plugin processes the data it retrieves. Currently, we simply load all the values into an array of objects and loop through each, saving them to the database. So, if there are 100,000 products in the JSON response, we might get into trouble with this approach. To be fair, if the remote provider has that many products, it usually provides a paginated way of requesting them by passing an offset and a limit. This keeps the payloads smaller (which is good for both servers involved) and makes processing easier. For now, we'll go with the assumption that the number of returned products is large, but not so large as to pose problems with communication or with the ability of PHP to store them in memory.

Moreover, while illustrating the Batch API, we will also perform an operation we “forgot” in Chapter 7, Your Own Custom Entity and Plugin Types. During the import, we also want to delete any products that have been previously imported but that are no longer in the JSON response. It is a kind of synchronization between the two data sources if you will. So, let’s get to it.

Creating the batch

Inside the JsonImporter::import() method, once we get our hands on the $products array, let’s replace the loop with the following:

$batch_builder = (new BatchBuilder())
  ->setTitle($this->t('Importing products'))
  ->setFinishCallback([$this, 'importProductsFinished']);
$batch_builder->addOperation([$this, 'clearMissing'], [$products]);
$batch_builder->addOperation([$this, 'importProducts'], [$products]);
batch_set($batch_builder->toArray());

And the new use statement at the top:

use Drupal\Core\Batch\BatchBuilder;

Creating a batch involves a few steps, the first one being the creation of a batch definition using the BatchBuilder.

The batch can have a title (used on the progress page). Similarly, it can also have optional init, progress, and error messages that can be set with the corresponding methods, but these also come with sensible defaults. For more information as to what exactly you can do with them and what other options you have, make sure you check out the BatchBuilder class and the batch_set() global function.
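For instance, the optional messages could be set like this (the message texts here are just examples):

$batch_builder = (new BatchBuilder())
  ->setTitle($this->t('Importing products'))
  ->setInitMessage($this->t('Starting the product import...'))
  ->setProgressMessage($this->t('Completed @current out of @total operations.'))
  ->setErrorMessage($this->t('The product import encountered an error.'))
  ->setFinishCallback([$this, 'importProductsFinished']);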

The most important part of the batch definition is the list of operations, in which we specify what needs to take place in the batch. These are defined as any kind of valid PHP callback, plus an array of arguments to pass to it. If the callback resides in a file that has not been loaded, the setFile() method can be used to specify a file path to include. Each operation runs in its own PHP request, in the sequence in which the operations are defined. Moreover, each operation can also run across multiple requests, similar to how we wrote our post update hook earlier.

Our first operation will be responsible for removing from Drupal the products that no longer exist in the JSON response, while the second will do the import. Both receive only one parameter—the array of products.

The finished key in the definition (set using the setFinishCallback() method) is another callback that gets fired at the end of the batch processing after all the operations are done.

Finally, we call the global batch_set() function, which statically sets the batch definition and marks it as ready to be run. There is just one more step to trigger the batch, and that is a call to batch_process(). But the reason we have not used it is that if the import runs as part of a form submission, the Form API triggers it automatically, so it won't work if we also trigger it here. The reason why the Form API does this for us is that most of the time, we want batches to run only because an action is taken, and usually, this is done via forms. However, the other common possibility is to trigger the batch via a Drush command. In this case, we need to use the drush_backend_batch_process() function instead.

So, what we will do first is check that we are in a command-line environment (aka Drush) and trigger it only in that case:

if (PHP_SAPI == 'cli') {
  drush_backend_batch_process();
}

Otherwise, we leave it up to the Form API. In doing this, we can trigger the import both from a Form submit handler and via Drush.
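To picture the form side, here is a minimal, hypothetical submit handler; the products.importer_manager service name and the json_importer plugin ID are assumptions that may differ in your module:

public function submitForm(array &$form, FormStateInterface $form_state) {
  // Hypothetical plugin manager service and plugin ID; adjust to your
  // module.
  $plugin = \Drupal::service('products.importer_manager')->createInstance('json_importer');
  // import() calls batch_set() internally. Since we are not on the
  // command line, no batch_process() call follows: the Form API detects
  // the batch that was set and triggers it after this handler returns.
  $plugin->import();
}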

Batch operations

Now that we have our batch definition in place, we are missing those three callback methods we are referencing in it. So, let’s see the first one:

public function clearMissing($products, &$context) {
  if (!isset($context['results']['cleared'])) {
    $context['results']['cleared'] = [];
  }

  if (!$products) {
    return;
  }

  // The remote IDs of all the products in the JSON response.
  $ids = [];
  foreach ($products as $product) {
    $ids[] = $product->id;
  }

  // The entity IDs of the products that are no longer in the response.
  $missing_ids = $this->entityTypeManager->getStorage('product')
    ->getQuery()
    ->condition('remote_id', $ids, 'NOT IN')
    ->accessCheck(FALSE)
    ->execute();

  if (!$missing_ids) {
    return;
  }

  $entities = $this->entityTypeManager->getStorage('product')->loadMultiple($missing_ids);
  /** @var \Drupal\products\Entity\ProductInterface $entity */
  foreach ($entities as $entity) {
    $context['results']['cleared'][] = $entity->getName();
  }
  $context['message'] = $this->t('Removing @count products', ['@count' => count($entities)]);
  $this->entityTypeManager->getStorage('product')->delete($entities);
}

This is the first operation in the batch process. As an argument, it receives all the variables we defined in the batch definition (in our case, the products array). But it also gets a $context array variable passed by reference, which we can use in a similar way to how we used $sandbox in the post update hook (with some extra capabilities).

The task at hand is simple. We prepare a list of IDs of all the products in the JSON and, based on those, we query our product entities for those that are NOT IN that list. If any are found, we delete them. You’ll notice already that in this operation, we are not relying on the actual multi-request capabilities of Drupal’s Batch API because we expect the workload to be minimal. After all, how many products could be missing at any given time and would need to be deleted? We’ll assume not many for our use case.

But while we are doing all this, we are interacting somewhat with the batch processing. You’ll notice that the $context array has a results key. This is used to store information related to the outcome of each operation in the batch. We are not supposed to use it for managing progress but instead for keeping track of what was done so that at the end, we can present the user with some useful information as to what has happened. So, in our example, we create an array keyed by cleared (to namespace the data for this particular operation), to which we add the names of each product that has been deleted.

Moreover, we also have a message key that we use to print a message as the action is happening. This gets printed out in real time to indicate to the user what is currently being processed. If the batch is run via the UI through a form, it very well might be that you won’t see all the messages due to the speed of the processing. However, if triggered by Drush (as it will be in our case), each of these messages will be printed on the terminal screen.

With this, our first operation is done. It’s time to look at the second, more complex one:

public function importProducts($products, &$context) {
  if (!isset($context['results']['imported'])) {
    $context['results']['imported'] = [];
  }

  if (!$products) {
    return;
  }

  $sandbox = &$context['sandbox'];
  if (!$sandbox) {
    $sandbox['progress'] = 0;
    $sandbox['max'] = count($products);
    $sandbox['products'] = $products;
  }

  $slice = array_splice($sandbox['products'], 0, 3);
  foreach ($slice as $product) {
    $context['message'] = $this->t('Importing product @name', ['@name' => $product->name]);
    $this->persistProduct($product);
    $context['results']['imported'][] = $product->name;
    $sandbox['progress']++;
  }

  $context['finished'] = $sandbox['progress'] / $sandbox['max'];
}

The arguments it receives are the same as with our previous operation since we defined them in the same way.

Here, again, we ensure we have some products and start up our results array, this time to keep track of the imported records. But we also work with the sandbox key of the $context array to use the multi-request processing capabilities. The approach is similar to what we did in the post update hook—we keep a progress count, store the maximum number of products, and then we calculate the $context['finished'] key based on the division between the two. However, in this case, we opt to process three products at a time instead of one. Again, as with our previous operation, we are using the message key to inform the user as to what is going on and the results key to compile a list of products that have been imported.

Before moving on, let’s talk a bit about the way we are importing the products. Had the JSON resource been able to return paginated results, we would have had to change our approach. First, we could not have deleted the missing products in the same way. Instead, we would have had to keep track of the IDs of the imported products and only afterward delete the missing ones. Hence, the order of the two operations would have been reversed. Second, the retrieval of the products would have been done from inside the importProducts operation using an offset and a limit stored in the sandbox. So, each Drupal batch request would have made a new request to the JSON resource. Of course, we would have had to keep track of all the processed products so that we would know which ones were able to be deleted.
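To make that more concrete, here is a rough sketch of what such a paginated operation could look like; getProducts() is a hypothetical helper that would wrap the remote request with an offset and a limit, and the page size of 50 is made up:

public function importProducts(&$context) {
  $sandbox = &$context['sandbox'];
  if (!$sandbox) {
    $sandbox['offset'] = 0;
    $sandbox['limit'] = 50;
    $context['results']['imported'] = [];
  }

  // Hypothetical helper that requests one page of products remotely.
  $products = $this->getProducts($sandbox['offset'], $sandbox['limit']);
  if (!$products) {
    // An empty page means we have consumed the entire resource.
    $context['finished'] = 1;
    return;
  }

  foreach ($products as $product) {
    $this->persistProduct($product);
    $context['results']['imported'][] = $product->name;
  }

  // Move the window along and stay unfinished; since we don't know the
  // total upfront, we only stop when we receive an empty page.
  $sandbox['offset'] += $sandbox['limit'];
  $context['finished'] = 0;
}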

Finally, let’s see the callback used when the batch processing finishes:

public function importProductsFinished($success, $results, $operations) {
  if (!$success) {
    $this->messenger->addError($this->t('There was a problem with the batch.'));
    return;
  }

  $cleared = count($results['cleared']);
  if ($cleared == 0) {
    $this->messenger->addStatus($this->t('No products had to be deleted.'));
  }
  else {
    $this->messenger->addStatus($this->formatPlural($cleared, '1 product had to be deleted.', '@count products had to be deleted.'));
  }

  $imported = count($results['imported']);
  if ($imported == 0) {
    $this->messenger->addStatus($this->t('No products found to be imported.'));
  }
  else {
    $this->messenger->addStatus($this->formatPlural($imported, '1 product imported.', '@count products imported.'));
  }
}

This callback receives three parameters: a Boolean indicating whether the processing was successful, the results array we used inside our $context to keep track of what has been done, and the array of operations. What we are doing is pretty simple. We first print a generic message if the batch has failed, in which case we also return early. Otherwise, we print messages relevant to the operations we have performed, using the $results array. Note the use of the t() and formatPlural() methods you learned about in the previous chapter. Moreover, note the $messenger property, which holds the messenger service used for printing the messages. We have not injected it yet, but by now, you should know how to do this, so I will let you do it on your own.

Our reworked JSON Importer now uses batching to make the process more stable in case the number of records it needs to process gets too big. So, now if we run the Drush command we wrote in Chapter 7, Your Own Custom Entity and Plugin Types, to trigger our importer, we get an output similar to this:

Figure 14.1: Drush command output

Note the messages set when importing each record, as well as the messages we set at the end of the process, which provide a kind of summary of what went down.

Note

When calling batch_process(), we can also pass in a URL to redirect to when the processing has finished. A better way, however, is to return a RedirectResponse from inside the finished callback. It goes without saying that if we trigger the batch from Drush, there will be no actual redirect, but it will work just fine in a form context.
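A minimal sketch of that return, assuming we want to land on /admin/content (an arbitrary choice), could look like this:

use Symfony\Component\HttpFoundation\RedirectResponse;

public function importProductsFinished($success, $results, $operations) {
  // ... the messages from before ...
  // In a form context, returning a RedirectResponse from the finished
  // callback sends the user to this URL once the batch completes.
  return new RedirectResponse('/admin/content');
}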

Cron

In the previous section, we created an awesome multi-request batch processing of our JSON product import. In the next section, we’ll jump into the Queue API and see how we can plan the processing of multiple items at a later stage. However, before we dive into that, let’s talk a bit about how the Drupal cron works and what we can do with it. This is because our discussion about the Queue API is closely related to it.

Drupal doesn’t actually have a fully-fledged cron system. That is because it’s an application and not a server capable of scheduling tasks that run at specified times of the day. However, what it does have is a cron-like system, which can come very close, especially on busy websites. Often, it is affectionately referred to as the poor man’s cron. Why? Since Drupal cannot by itself do anything without any sort of input, it relies on visitors coming to the website to trigger the cron tasks. So, even if we can configure the frequency of Drupal’s cron, we are relying on visitors coming to the website and triggering it inadvertently. Drupal then keeps track of when the cron ran and ensures that the next time it runs is only after the configured amount of time has elapsed. So, in essence, if the cron is set to run every hour but the next visitor only comes in three hours, it will only run then:

Figure 14.2: Cron management page

The Drupal cron is very useful for maintenance tasks and relatively small jobs that don’t take too many resources away from the site visitors. Moreover, it can be triggered manually from the UI, from an outside script, or even with Drush, by using the following command:

drush cron

There are many Drupal core and contributed modules that rely on this system to perform various tasks, and we, as module developers, can do the same by implementing hook_cron(). The latter gets fired every time the cron runs, so basically, Drupal’s cron is a collection of function calls to various modules. For this reason, we must avoid overloading the request with heavy processing; otherwise, the request might crash. But as we will see in the next section, we can do something to control this if we have such jobs to run.

First, though, let’s look at an example implementation and see how it works. What we want to accomplish is that whenever cron runs, we delete all the records in the teams table (which we created in Chapter 8, The Database API) that are no longer referenced by any player. Essentially, if the teams don’t have any players, they need to go. So, we could do something simple like this:

/**
 * Implements hook_cron().
 */
function sports_cron() {
  $database = \Drupal::database();
  $result = $database->query("SELECT [id] FROM {teams} WHERE [id] NOT IN (SELECT [team_id] FROM {players} WHERE [team_id] IS NOT NULL)")->fetchAllAssoc('id');
  if (!$result) {
    return;
  }

  $ids = array_keys($result);
  $database->delete('teams')
    ->condition('id', $ids, 'IN')
    ->execute();
}

We are implementing hook_cron(), and inside, we basically figure out which teams have no players and delete them. You'll notice that the query is a bit more complex, as we are using a subquery, but it is still not rocket science. Feel free to check Chapter 8, The Database API, for a refresher on the Drupal database API.

This function will then be fired every time our Drupal cron runs, and we could argue that doing this task is not such a big strain on our resources. However, in the next section, we will see how we can handle cases like those. Moreover, we’ll see why that approach might even be better than this one, regardless of resource intensiveness.

Queues

It’s finally time to talk a bit about the Queue API, how it works, and what its main components are; the theory, basically. We will do this before diving into code examples, which we all thoroughly enjoy.

Introduction to the Queue API

The main purpose of the Queue API is to provide a way for us to add items to a queue to have them processed at a later time. In charge of processing these items are the queue worker plugins, which can be enlisted either automatically by the Drupal cron, manually (programmatically) by us, or by Drush. We will look at examples of all three.

The central player in this API is an implementation of the QueueInterface, which is the actual queue into which we put items. There are two types of queues Drupal can handle: reliable and unreliable. The first preserves the order in which the items are processed (first in, first out) and guarantees that each item gets processed at least once. In this chapter, we will focus only on this type of queue. But there is also the possibility of working with unreliable queues, which give their best effort when maintaining the item order but do not guarantee that all items get processed.

By default, when we are working with queues in Drupal, we use a reliable queue that is based on a database table to store the items. This is represented by the DatabaseQueue implementation. The Batch API in fact uses a type of queue that extends from the default one Drupal comes with. Okay, but what does a queue do?

A queue has three main roles:

  • It creates items (adds stuff to a list that needs processing at some point).
  • It claims items (puts a hold on them while a worker does the processing).
  • It deletes items (removes the items from the queue once they have finished processing). Alternatively, it can also release them if another worker needs to process them, or if something went wrong, and they should be retrieved later.

We will soon see a practical example of how this works. But first, let’s look at how a queue comes about.

The QueueInterface implementation is created with the help of the QueueFactory service, named queue. The factory delegates to another factory service specific to the type of queue being created. By default, this is the QueueDatabaseFactory service (named queue.database), which, as expected, returns an instance of the DatabaseQueue class. The table used by the latter is simply called queue.

Finally, the crux of the Queue API for us module developers is the system of QueueWorker plugins that are responsible for processing a single item in the queue. These can be written in two ways. The simplest approach is to have them triggered by cron. In this case, the plugin ID needs to match the name of the queue it needs to process items for. This way, we don’t have to worry about claiming, releasing, or deleting items. The cron system does it for us. However, a more flexible approach is the one in which we actually do that. We don’t rely on cron but process the items ourselves whenever we want. Moreover, both types of queue workers can be enlisted via Drush using a command that triggers the processing of a queue with a given name.

Cron-based queues

In the previous section, we wrote the sports_cron() implementation, which, at each run, looks for teams that no longer have players and deletes them from the database. However, if we run the Drupal cron every hour, we keep running that query even though teams rarely lose all their players. Moreover, we are going by the simple assumption (a functionality we have not written so far) that there is some code responsible for removing a player from a team. That code would actually be the ideal place to check whether a team has lost all its players. The idea, then, is to check whether the team has been left empty and, if so, add it to a queue to be deleted later (whenever cron runs).

We won’t go into the code specific to player and team management, but instead, focus on the part that adds the team that needs to be deleted to the queue.

The first thing we need to do is get our hands on the QueueFactory service:

/** @var \Drupal\Core\Queue\QueueFactory $queue_factory */
$queue_factory = \Drupal::service('queue');

Then, we need to create an instance of the default QueueInterface (database) with the name of our future worker plugin ID:

/** @var \Drupal\Core\Queue\QueueInterface $queue */
$queue = $queue_factory->get('team_cleaner');

This is obviously the static approach to loading services, and you should be injecting them instead whenever possible. But if you cannot, there is also the following shorthand, which can achieve the same thing in one line:

$queue = \Drupal::queue('team_cleaner');

$queue is an instance of DatabaseQueue with the name team_cleaner.

The next thing we need to do is add items to it (assuming that we’ve identified a team without players):

$item = new stdClass();
$item->id = $team_id;
$queue->createItem($item);

It’s standard practice to create a PHP object to wrap the data for the queue item. Inside, we can put anything we want that can serialize properly, and that’s all. We can now turn to our TeamCleaner worker plugin, which naturally goes into the Plugin/QueueWorker namespace of our module:

namespace Drupal\sports\Plugin\QueueWorker;

use Drupal\Core\Database\Connection;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Symfony\Component\DependencyInjection\ContainerInterface;

/**
 * A worker plugin that removes a team from the database.
 *
 * @QueueWorker(
 *   id = "team_cleaner",
 *   title = @Translation("Team Cleaner"),
 *   cron = {"time" = 10}
 * )
 */
class TeamCleaner extends QueueWorkerBase implements ContainerFactoryPluginInterface {

  /**
   * The database connection.
   *
   * @var \Drupal\Core\Database\Connection
   */
  protected $database;

  /**
   * Constructs a TeamCleaner worker.
   *
   * @param array $configuration
   * @param string $plugin_id
   * @param mixed $plugin_definition
   * @param \Drupal\Core\Database\Connection $database
   */
  public function __construct(array $configuration, $plugin_id, $plugin_definition, Connection $database) {
    parent::__construct($configuration, $plugin_id, $plugin_definition);
    $this->database = $database;
  }

  /**
   * {@inheritdoc}
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $configuration,
      $plugin_id,
      $plugin_definition,
      $container->get('database')
    );
  }

  /**
   * {@inheritdoc}
   */
  public function processItem($data) {
    $id = isset($data->id) && $data->id ? $data->id : NULL;
    if (!$id) {
      throw new \Exception('Missing team ID');
    }

    $this->database->delete('teams')
      ->condition('id', $id)
      ->execute();
  }

}

As we are already used to, our plugin extends the base plugin class of its type to inherit any potential base functionality. In our case, this is limited to the implementation of the QueueWorkerInterface, which has one method whose name easily describes its responsibility: processItem($data). Also, not new to us is the implementation of ContainerFactoryPluginInterface, which allows us to inject the database service into our plugin. We use that to delete the queued team.

All the action in fact happens in the processItem() method, where we simply look into the $data object and delete the team with the specified ID. We also throw a simple exception if something goes wrong. We’ll talk about exceptions in queue processing shortly.

Somewhat more interesting for the Queue API, however, is the plugin annotation. Apart from the standard expected plugin definition, we also encounter the following:

cron = {"time" = 10}

This simply indicates that this plugin should be used by the cron system. In other words, when the Drupal cron runs, it loads all the worker plugin definitions, and whichever plugin has this information gets processed. And the key here is the time information, which we have set to 10 seconds. This essentially means that when the cron runs, we are saying: go ahead and process as many queue items as you can within 10 seconds; once that time limit is up, stop and continue with the rest of the cron tasks. This is actually very powerful because we allocated an amount of time from the PHP request and dedicated it to our queue. This means that we don’t have to guess how many items to allocate for a request (as we did with the batching). However, it also means that the rest of the time left needs to be enough for everything else. So, we need to adjust this carefully. As for the queue items that don’t fit into those 10 seconds, they will simply be processed at the next cron run.

This approach is better than our previous one, in which we ourselves implemented hook_cron(), because we don’t want to always keep checking teams for players, but can instead create queue items and defer the deletion until a later time, as needed.

Very similarly, we could refactor our JSON product importer. When calling the import() method, the products would get queued, and then a separate worker plugin would handle the product data creation/update whenever cron runs. This of course depends on whether we are okay with splitting the import functionality into two classes, which is not a big deal. We are actually fine with the way things are at the moment, so to illustrate the programmatic processing of the queue, we will use another example.

Processing a queue programmatically

Now that we have our queue worker that deletes teams (for all it knows, the teams don’t even have to be without any players), we can explore how we can process this queue ourselves if we don’t want the cron option. If we wanted it to be processed using a Drush command, we would not have to write that ourselves. Drush comes with one, and it would work like this:

drush queue:run team_cleaner

However, we may want to create an admin interface, a form of some kind, which allows the user to trigger the queue processing. In that case, we could do something like this:

$queue = \Drupal::queue('team_cleaner');
/** @var \Drupal\Core\Queue\QueueWorkerInterface $queue_worker */
$queue_worker = \Drupal::service('plugin.manager.queue_worker')->createInstance('team_cleaner');

while ($item = $queue->claimItem()) {
  try {
    $queue_worker->processItem($item->data);
    $queue->deleteItem($item);
  }
  catch (\Drupal\Core\Queue\SuspendQueueException $e) {
    // The whole queue is deemed unworkable; put the item back and stop.
    $queue->releaseItem($item);
    break;
  }
  catch (\Exception $e) {
    // Log the exception and leave the item claimed so this loop does
    // not pick it up again.
  }
}

In this example, we get our QueueInterface object just like we did before. But then, we also create an instance of our own QueueWorker plugin. Next, we use the claimItem() method inside a while loop, which returns an object containing the data to be passed to the queue worker. Additionally, it blocks the item from being claimed by another worker for a lease time (by default, an hour).

Then, we try to use the worker to process the item, and if no exception is thrown, we delete the item. It’s done! However, if we catch a SuspendQueueException, that means we expect the entire queue to be problematic. This exception type is thrown when there is the expectation that all other items are also likely to fail, in which case we release the item and break out of the loop. Releasing the item means that other workers are now free to process it using the claimItem() method. Or even better, our own worker can try it later. Finally, we also catch any other exceptions, in which case we simply log the error but do not release the item to prevent an infinite loop. For the moment, that particular item cannot be processed, so we need to skip to the next one; it needs to stay blocked until our loop finishes. The latter can only happen when $queue->claimItem() no longer returns anything.

And that is pretty much the logic behind processing a queue ourselves: we claim an item, throw it to a worker, and delete it. If something goes wrong, we work with exceptions to determine whether the queue can be continued or whether it should be skipped altogether.

The Lock API

Whenever we process data on a regular basis, especially if it takes a while to complete, we might run into a situation in which parallel requests want to trigger that process again while the first is still running. Most of the time, this is not a good thing, as it can lead to conflicts and/or data corruption. A good example from Drupal core in which this can happen is the cron. If we start it, the process can end up taking a few seconds. Remember, it needs to pull together the hook_cron() implementations and run them all. So, while that is happening, if we trigger another cron run, it will give us a nice message asking us to chill because the cron is already running. It does this with the help of the Lock API.

The Lock API is a low-level Drupal solution for ensuring that processes don’t trample each other. Since in this chapter we are talking about things such as batch operations, queues, and other kinds of potentially time-consuming processes, let’s look at the Lock API to see how we can leverage it for our custom code. But first, let’s get an understanding of how this locking works.

The concept is very simple. Before starting a process, we acquire a lock based on a given name; in other words, we check whether this process has already been started. If we get the green light (we acquired the lock), we go ahead and start the process. At this point, the API locks down this named process so that other requests cannot acquire it until the initial one has released it. This normally happens when the process is finished, and other requests may then start it up again. Until then, we get a red light telling us we cannot start it—to maintain the analogy of traffic lights. Speaking of which, the main Lock API implementation in Drupal, namely the one using the database, takes this analogy to heart, as it names the table where the locks are stored semaphore.

The API is actually pretty simple. We have a lock service, which is an implementation of LockBackendInterface. By default, Drupal comes with two: DatabaseLockBackend and PersistentDatabaseLockBackend. Usually, the former is used. The difference between the two is that the latter can keep a lock across multiple requests, whereas the former releases all its locks at the end of the request. We'll be using the regular DatabaseLockBackend to demonstrate how the API works, as that is also what Drupal core mostly uses.

If you remember from Chapter 7, Your Own Custom Entity and Plugin Types, we created a Drush command that would run all of our Product importers. Of course, we so far have only created one plugin. But what we want to do is ensure that if this Drush command is executed multiple times at more or less the same time (before the actual import finishes), we don’t run the imports simultaneously. It’s probably not the most realistic example, as Drush commands must be run by someone so there is good control over their timing. However, the same approach, as we will see, can be applied to processes triggered by unpredictable requests.

We defined the ProductCommands::runPluginImport() helper method, which runs the import for a specific plugin. We can wrap this trigger with a lock block. First, though, we need to inject the service, and we can get to it using the lock key (or the static shorthand if we cannot inject it: Drupal::lock()). By now, you should know how to inject a new service, so I will not repeat that step here.
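If you'd like a refresher anyway, a simplified sketch could look like this; only the lock argument is shown, your constructor will already have other dependencies, and the @lock service would also need to be added to the command's arguments in the module's drush.services.yml file:

use Drupal\Core\Lock\LockBackendInterface;
use Drush\Commands\DrushCommands;

class ProductCommands extends DrushCommands {

  /**
   * The lock backend, registered in the container under the "lock" key.
   *
   * @var \Drupal\Core\Lock\LockBackendInterface
   */
  protected $lock;

  public function __construct(LockBackendInterface $lock) {
    parent::__construct();
    $this->lock = $lock;
  }

}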

So instead of just running the import() method on the plugin, we can first have this (make sure you use the StringTranslationTrait if you haven’t already):

if (!$this->lock->acquire($plugin->getPluginId())) {
  $this->logger()->log('notice', $this->t('The plugin @plugin is already running.', ['@plugin' => $plugin->getPluginDefinition()['label']]));
  return;
}

We try to acquire the lock by passing an arbitrary name (in this case, our plugin ID). We are sticking to one plugin at a time here, so multiple plugins should in fact be able to run at the same time. If the acquire() method returns FALSE, that means we have a red light; a lock has already been acquired. In this case, we print a message to that effect and get out of there. However, if not, it means we have a green light and we can proceed with the rest of our code as it was. The acquire() method has locked it down, and other requests can no longer acquire it until we release it. Speaking of which, there is one thing we need to add at the end (after the import):

$this->lock->release($plugin->getPluginId());

We need to release the lock so other requests can run it again if they like. That is pretty much it. If we run our Drush command twice, more or less simultaneously, we will have something like this in the terminal:

Figure 14.3: The Lock API preventing parallel processes

As you can see, only one call to the Drush command actually went through. As expected.

But we can also do it a bit differently. Let’s say that we want to wait with the second request until the first one is finished, and then still run it. After all, we don’t want to miss out on any updates. We can do this using the wait() method of LockBackendInterface. The rework is minor:

if (!$this->lock->acquire($plugin->getPluginId())) {
  $this->logger()->log('notice', $this->t('The plugin @plugin is already running. Waiting for it to finish.', ['@plugin' => $plugin->getPluginDefinition()['label']]));
  if ($this->lock->wait($plugin->getPluginId())) {
    $this->logger()->log('notice', $this->t('The wait is killing me. Giving up.'));
    return;
  }
}

So basically, if we don't acquire a lock, we print a message saying that we are waiting for the go-ahead. Then, we use the wait() method, which puts the request to sleep for a maximum of 30 seconds. Within that time frame, it keeps checking whether the lock has become available, starting every 25 milliseconds and gradually backing off until it checks every 500 milliseconds. If the lock becomes available, it breaks out of the loop and returns FALSE (meaning that we can go ahead, as the lock has become available). Otherwise, if the 30 seconds pass, it returns TRUE, which means that we would still need to wait; at this point, we give up. And guess what: the second parameter of the wait() method is the maximum number of seconds to wait, so we can control that as well. I recommend you check out the code to better understand what it does.

Like this, we can run our two Drush commands in parallel and ensure that the second one that was requested only runs after the first finishes. If it takes longer than 30 seconds, we give up, because something probably went wrong. And there we have the Lock API.

Summary

In this chapter, we looked at some of the ways we, as module developers, can set up simple and complex data-processing tasks that can run at any time we want.

We started by looking into using the multi-request capabilities of the post update hooks. This was a continuation from Chapter 8, The Database API, where we introduced them for the first time, and we have now seen how we can expand on their capabilities. Then, we turned to the more complex Batch API, which uses similar, albeit more complex, techniques. This system allowed us to construct a series of operations that leveraged Drupal's multi-request capabilities. Our playground was the JSON products importer, which can now handle large amounts of data without the worry of PHP timeouts or memory exhaustion. Next, we looked at how Drupal's cron system works and why it is there, and even saw an example of how, as module developers, we can hook into it and process our own tasks whenever it runs. But then, we took things to the next level with the introduction of the Queue API, which allowed us to add items to a queue so that they can get processed at a later stage. This processing, as we saw, can be triggered by cron, or we can take matters into our own hands and handle the items one by one. Not to mention the Drush option, which can also make things easy. Finally, we looked at the Lock API, which allows us to get control over the triggering of certain processes that take longer to complete. All this is done to prevent them from being run multiple times simultaneously, causing errors or data corruption.

In the next chapter, we are going to talk about Views and how we can programmatically interact with them as module developers.
