So, the event-driven architecture has some great benefits, but the catch is that, for a low-level implementation, we need to write our code in a completely different style. Let's write an event-driven chat server to illustrate this.
Note that this example will not work on Windows, as Windows lacks the poll interface that we will be employing here. There is an older interface, called select, which Windows does support; however, it is slower and more complicated to work with. The event-driven frameworks that we look at later automatically switch to select for us if we're running on Windows.
There is a higher-performance alternative to poll called epoll, available on Linux, but it is also more complicated to use, so for simplicity we'll stick with poll here. Again, the frameworks that we discuss later automatically take advantage of epoll if it is available.
Finally, and counter-intuitively, Python's poll interface lives in a module called select, hence we will import select in our program.
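You can confirm this layout for yourself. The following sketch simply checks whether poll is present in the select module on the current platform:

```python
import select

# poll lives in the select module, but only on Unix-like systems;
# on Windows, only select.select() is available
if hasattr(select, 'poll'):
    print('poll is available on this platform')
else:
    print('no poll here; fall back to select.select()')
```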
Create a file called 3.1-chat_server-poll.py and save the following code in it:
import select
import tincanchat

from types import SimpleNamespace
from collections import deque

HOST = tincanchat.HOST
PORT = tincanchat.PORT

clients = {}

def create_client(sock):
    """ Return an object representing a client """
    return SimpleNamespace(sock=sock,
                           rest=bytes(),
                           send_queue=deque())

def broadcast_msg(msg):
    """ Add message to all connected clients' queues """
    data = tincanchat.prep_msg(msg)
    for client in clients.values():
        client.send_queue.append(data)
        poll.register(client.sock, select.POLLOUT)

if __name__ == '__main__':
    listen_sock = tincanchat.create_listen_socket(HOST, PORT)
    poll = select.poll()
    poll.register(listen_sock, select.POLLIN)
    addr = listen_sock.getsockname()
    print('Listening on {}'.format(addr))

    # This is the event loop. Loop indefinitely, processing events
    # on all sockets when they occur
    while True:
        # Iterate over all sockets with events
        for fd, event in poll.poll():
            # Clear up a closed socket
            if event & (select.POLLHUP |
                        select.POLLERR |
                        select.POLLNVAL):
                poll.unregister(fd)
                del clients[fd]
            # Accept new connection, add client to clients dict
            elif fd == listen_sock.fileno():
                client_sock, addr = listen_sock.accept()
                client_sock.setblocking(False)
                fd = client_sock.fileno()
                clients[fd] = create_client(client_sock)
                poll.register(fd, select.POLLIN)
                print('Connection from {}'.format(addr))
            # Handle received data on socket
            elif event & select.POLLIN:
                client = clients[fd]
                addr = client.sock.getpeername()
                recvd = client.sock.recv(4096)
                if not recvd:
                    # The client state will get cleaned up in the
                    # next iteration of the event loop, as close()
                    # sets the socket to POLLNVAL
                    client.sock.close()
                    print('Client {} disconnected'.format(addr))
                    continue
                data = client.rest + recvd
                (msgs, client.rest) = tincanchat.parse_recvd_data(data)
                # If we have any messages, broadcast them to all
                # clients
                for msg in msgs:
                    msg = '{}: {}'.format(addr, msg)
                    print(msg)
                    broadcast_msg(msg)
            # Send message to ready client
            elif event & select.POLLOUT:
                client = clients[fd]
                data = client.send_queue.popleft()
                sent = client.sock.send(data)
                if sent < len(data):
                    client.send_queue.appendleft(data[sent:])
                if not client.send_queue:
                    poll.modify(client.sock, select.POLLIN)
The crux of this program is the poll object, which we create at the start of execution. This is an interface to the kernel's poll service, which lets us register sockets for the OS to watch, and which notifies us when they are ready for us to work with.
We register a socket by calling the poll.register() method, passing the socket as an argument along with the type of activity that we want the kernel to watch for. There are several conditions that we can monitor by specifying the various select.POLL* constants. We're using POLLIN and POLLOUT in this program to watch for when a socket is ready to receive and send data respectively. Accepting a new incoming connection on our listening socket counts as a read.
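As a standalone sketch, separate from the chat server, the registration API looks like this; the connected socket pair here is purely for illustration:

```python
import select
import socket

# A connected pair of sockets, just so we have something to register
a, b = socket.socketpair()

poll = select.poll()
poll.register(a, select.POLLIN)                    # watch for readability
poll.register(b, select.POLLIN | select.POLLOUT)   # masks combine with |

poll.modify(b, select.POLLIN)   # change the event mask later on
poll.unregister(b)              # stop watching the socket entirely

print(poll.poll(0))   # -> [] since no data is waiting on 'a'

a.close()
b.close()
```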
Once a socket is registered with poll, the OS will watch it and record when it becomes ready to carry out the activity that we requested. When we call poll.poll(), it returns a list of all the sockets that have become ready for us to work with. For each socket, it also returns an event flag, which indicates the state of the socket. We can use this event flag to tell whether we can read from the socket (a POLLIN event) or write to it (a POLLOUT event), or whether an error has occurred (POLLHUP, POLLERR, or POLLNVAL events).
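To see the event flag checks in isolation, here is a small sketch, again using a socket pair rather than the server's real sockets:

```python
import select
import socket

a, b = socket.socketpair()
poll = select.poll()
poll.register(a, select.POLLIN)

b.sendall(b'hi')   # make 'a' readable

# The event value is a bitmask, so we test it with bitwise AND
for fd, event in poll.poll():
    if event & select.POLLIN:
        print('fd {} is readable: {!r}'.format(fd, a.recv(4096)))
    elif event & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
        print('fd {} is in an error state'.format(fd))

a.close()
b.close()
```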
To make use of this, we enter our event loop, repeatedly calling poll.poll(), iterating through the ready objects that it returns, and operating on them as per their event flags.
Because we're only running in a single thread, we don't need any of the synchronization mechanisms that we had to employ in the multithreaded server. We're just using a regular dict to keep track of our clients. If you've not come across it before, the SimpleNamespace object that we use in the create_client() function is just a new idiom for creating an empty object with a __dict__ (this is needed because object instances don't have a __dict__, so they won't accept arbitrary attributes). Previously, we might have used the following to give us an object that we can assign arbitrary attributes to:
class Client:
    pass

client = Client()
Python version 3.3 and later give us the new, more explicit SimpleNamespace object.
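A quick comparison shows the difference; the attribute names here are arbitrary examples, not part of the chat server:

```python
from types import SimpleNamespace

# A bare object() has no __dict__, so arbitrary attributes fail
o = object()
try:
    o.nickname = 'alice'
except AttributeError:
    print("can't set attributes on a plain object instance")

# SimpleNamespace has a __dict__, so attributes can be set freely
client = SimpleNamespace(rest=bytes())
client.nickname = 'alice'
print(client.nickname, client.rest)
```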
We can run our multithreaded client against this server. The server is still using the same network protocol, and the architecture of the two programs won't affect the communication. Give it a try and verify that it works as expected.
This style of programming, employing poll and non-blocking sockets, is often referred to as non-blocking and asynchronous, since we use sockets in non-blocking mode, and the thread of control handles I/O reactively, as it needs to happen, rather than blocking on a single I/O channel until an operation completes. However, you should note that our program isn't completely non-blocking, since it still blocks on the poll.poll() call. This is pretty much inevitable in an I/O-bound system, because when nothing's happening, you have to wait for I/O activity somewhere.
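If blocking indefinitely in poll.poll() were a problem, say, because we wanted to do periodic housekeeping between events, poll.poll() also accepts a timeout in milliseconds. This is just a sketch of that option, not something our chat server needs:

```python
import select
import socket

a, b = socket.socketpair()
poll = select.poll()
poll.register(a, select.POLLIN)

# Wait at most 50 ms; if no events occur in that time, we get back
# an empty list and can do other work before polling again
events = poll.poll(50)
print(events)   # [] here, since nothing was sent to 'a'

a.close()
b.close()
```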