Processing a queue programmatically

Now that we have our queue worker that deletes teams (for all it knows, the teams don't even have to be without any players), we can explore how we can process this queue ourselves if we don't want the cron option. If we wanted it to be processed using a Drush command, we would not have to write that ourselves. Drush comes with one, and it would work like this:

drush queue-run team_cleaner

However, we may want to create an admin interface, a form of some kind, which allows the user to trigger the queue processing. In that case, we could do something like this:

$queue = Drupal::queue('team_cleaner'); 
/** @var DrupalCoreQueueQueueWorkerInterface $queue_worker */ 
$queue_worker = Drupal::service('plugin.manager.queue_worker')->createInstance('team_cleaner'); 
 
while($item = $queue->claimItem()) { 
  try { 
    $queue_worker->processItem($item->data); 
    $queue->deleteItem($item); 
  } 
  catch (SuspendQueueException $e) { 
    $queue->releaseItem($item); 
    break; 
  } 
  catch (Exception $e) { 
    // Log the exception. 
  } 
}  

In this example, we get our QueueInterface object just like we did before. But then, we also create an instance of our own QueueWorker plugin. Next, we use the claimItem() method inside a while loop, which returns an object that contains the data to be passed to the queue worker. Additionally, it blocks the item from being usable by another worker for a period of (lease) time (by default an hour).

Then, we try to use the worker to process the item, and if no exception is thrown, we delete the item. It's done! However, if we catch a SuspendQueueException, it means we expect the entire queue to be problematic. This exception type is thrown when there is the expectation that all other items are also likely to fail, in which case we release the item and break out of the loop. Releasing the item means that other workers are now free to process it using the claimItem() method. Or even better, our own worker can try it later on. Finally, we also catch any other exceptions, in which case we simply log the error but do not release the item to prevent an infinite loop. For the moment, that particular item cannot be processed, so we need to skip to the next one; it needs to stay blocked until our loop finishes. The latter can only happen when $queue->claimItem() no longer returns anything.

And that is pretty much the logic behind processing a queue ourselves: we claim an item, throw it to a worker and delete it. If something goes wrong, we work with exceptions to determine whether the queue can be continued or whether it should be skipped altogether.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset