Sending logging messages to a remote process

One high-performance design pattern is to have a cluster of processes that are being used to solve a single problem. We might have an application that is spread across multiple application servers or multiple database clients. For this kind of architecture, we often want a centralized log among all of the various processes.

One technique for creating a unified log is to include accurate timestamps and then sort records from separate log files into a single, unified log. This sorting and merging is extra processing that can be avoided. Another, more responsive technique is to send log messages from a number of concurrent producer processes to a single consumer process.

Our shared logging solution makes use of the shared queues from the multiprocessing module. For additional information on multiprocessing, see Chapter 13, Transmitting and Sharing Objects.

The following is the three-step process to build a multiprocessing application:

  • Firstly, we'll create a queue object shared by producers and consumers.
  • Secondly, we'll create the consumer process, which gets the logging records from the queue. The logging consumer can apply filters to the messages and write them to a unified file.
  • Thirdly, we'll create the pool of producer processes that do the real work of our application and produce logging records in the queue they share with the consumer.

As an additional feature, the ERROR and FATAL messages can provide immediate notification via an SMS or email to concerned users. The consumer can also handle the (relatively) slow processing associated with rotating log files.
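As a minimal sketch of that notification feature, an SMTPHandler filtered at the ERROR level could be attached to the consumer's combined logger (defined in the next listings). The mail host, addresses, and subject below are hypothetical placeholders, not values from this example:

import logging
import logging.handlers

# Sketch only: the mail host and addresses are made-up placeholders.
notifier = logging.handlers.SMTPHandler(
    mailhost="mail.example.com",
    fromaddr="logger@example.com",
    toaddrs=["oncall@example.com"],
    subject="Application ERROR/FATAL",
)
notifier.setLevel(logging.ERROR)  # only ERROR and above trigger an email
logging.getLogger("combined").addHandler(notifier)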

Here's the definition of a consumer process:

import collections
import logging
import logging.config
import multiprocessing

import yaml

class Log_Consumer_1(multiprocessing.Process):

    def __init__(self, queue):
        self.source = queue
        super().__init__()
        # consumer_config is a module-level string holding the YAML
        # configuration shown below.
        logging.config.dictConfig(yaml.safe_load(consumer_config))
        self.combined = logging.getLogger(
            f"combined.{self.__class__.__qualname__}")
        self.log = logging.getLogger(self.__class__.__qualname__)
        self.counts = collections.Counter()

    def run(self):
        self.log.info("Consumer Started")
        while True:
            log_record = self.source.get()
            if log_record is None:
                break
            self.combined.handle(log_record)
            self.counts[log_record.getMessage()] += 1
        self.log.info("Consumer Finished")
        self.log.info(self.counts)

This process is a subclass of multiprocessing.Process. The multiprocessing.Process class provides a start() method, which forks a subprocess and executes the run() method provided here.

The self.counts object tracks the individual messages from the producers. The idea is to create a summary showing the kinds of messages received. This is not common practice, but it helps reveal how the demonstration works.

While the process is running, it uses the Queue.get() method to fetch log records from the queue. Each record is routed to a logger instance: in this case, a special logger whose name begins with the parent name combined.; this logger is given each record that arrives from a source process.

A sentinel object, None, is used to signal the end of processing. When it is received, the while statement finishes and the final log messages are written. The self.counts object shows how many of each message were seen; this lets us tune the queue size to be sure messages aren't lost to queue overruns.

Here's a logging configuration file for this process:

version: 1
disable_existing_loggers: False
handlers:
  console:
    class: logging.StreamHandler
    stream: ext://sys.stderr
    formatter: basic
formatters:
  basic:
    style: "{"
    format: "{levelname:s}:{name:s}:{message:s}"
loggers:
  combined:
    handlers: [console]
    level: INFO
    propagate: False
root:
  handlers: [console]
  level: INFO
We defined a simple console handler with a basic format. We also defined the top level of a hierarchy of loggers with names that begin with combined.. These loggers will be used to display the combined output of the various producers.
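The consumer's __init__() method expects this document to be available as a module-level string named consumer_config. A minimal sketch of that connection, assuming the YAML shown above is stored verbatim in the string:

import logging.config
import yaml

consumer_config = """
version: 1
disable_existing_loggers: False
handlers:
  console:
    class: logging.StreamHandler
    stream: ext://sys.stderr
    formatter: basic
formatters:
  basic:
    style: "{"
    format: "{levelname:s}:{name:s}:{message:s}"
loggers:
  combined:
    handlers: [console]
    level: INFO
    propagate: False
root:
  handlers: [console]
  level: INFO
"""

# Applied inside Log_Consumer_1.__init__() via dictConfig().
logging.config.dictConfig(yaml.safe_load(consumer_config))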

Here's a logging producer:

import multiprocessing
import time
import logging
import logging.handlers

class Log_Producer(multiprocessing.Process):
    handler_class = logging.handlers.QueueHandler

    def __init__(self, proc_id, queue):
        self.proc_id = proc_id
        self.destination = queue
        super().__init__()
        self.log = logging.getLogger(
            f"{self.__class__.__qualname__}.{self.proc_id}")
        self.log.handlers = [self.handler_class(self.destination)]
        self.log.setLevel(logging.INFO)

    def run(self):
        self.log.info("Started")
        for i in range(100):
            self.log.info(f"Message {i:d}")
            time.sleep(0.001)
        self.log.info("Finished")

The producer doesn't do much in the way of configuration. It gets a logger named with the qualified class name and an instance identifier (self.proc_id). It sets that logger's list of handlers to just a QueueHandler wrapped around the destination, a multiprocessing.Queue instance. The level of this logger is set to INFO.

We made handler_class an attribute of the class definition because we plan to change it. For the first example, it is logging.handlers.QueueHandler. Making it a class attribute allows the example producer to be reused with other kinds of handlers, as sketched below.
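As a sketch of that reuse (the subclass and handler names here are invented for illustration, not part of the original example), swapping handlers only requires overriding the class attribute:

import logging.handlers

class Stamped_Queue_Handler(logging.handlers.QueueHandler):
    """Hypothetical variant: prefixes each message with the producing process name."""
    def prepare(self, record):
        record.msg = f"[{record.processName}] {record.msg}"
        return super().prepare(record)

class Log_Producer_2(Log_Producer):
    # Only the handler class changes; __init__() and run() are inherited.
    handler_class = Stamped_Queue_Handler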

The run() method does the real work of this demonstration: it uses the logger to create log messages, which are enqueued for processing by the centralized consumer. In this case, each process simply floods the queue with 102 messages (the start and finish messages plus 100 numbered messages) as quickly as possible.

Here's how we can start the consumer and producers. We'll show this in small groups of steps. First, we create the queue as follows:

import multiprocessing

queue = multiprocessing.Queue(10)

This queue is far too small to handle 10 producers blasting out 102 messages each in a fraction of a second. The idea of an undersized queue is to see what happens when messages are lost. Here's how we start the consumer process:

consumer = Log_Consumer_1(queue) 
consumer.start() 

Here's how we start an array of producer processes:

producers = [] 
for i in range(10): 
    proc = Log_Producer(i, queue)
    proc.start() 
    producers.append(proc) 

As expected, 10 concurrent producers will overflow the queue. Each producer will report a number of queue full errors, showing us that messages were lost.
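To confirm the lost-message diagnosis, one option is to re-run the demonstration with a queue large enough to hold everything the producers can emit before the consumer drains it; a sketch, assuming 10 producers and 102 messages each:

queue = multiprocessing.Queue(10 * 102 + 10)  # room for all producer messages, plus slack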

Here's how we cleanly finish the processing:

for p in producers: 
    p.join() 
queue.put(None) 
consumer.join() 

First, we wait for each producer process to finish and then rejoin the parent process. Then, we put a sentinel object into the queue so that the consumer will terminate cleanly. Finally, we wait for the consumer process to finish and join the parent process.
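One caution not shown in the fragments above: on platforms where multiprocessing uses the spawn start method (Windows, and macOS in recent Python versions), the queue creation and process startup must be kept under a main-module guard, roughly as follows:

if __name__ == "__main__":
    queue = multiprocessing.Queue(10)
    consumer = Log_Consumer_1(queue)
    consumer.start()
    producers = []
    for i in range(10):
        proc = Log_Producer(i, queue)
        proc.start()
        producers.append(proc)
    for p in producers:
        p.join()
    queue.put(None)  # sentinel to stop the consumer
    consumer.join()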
