AMQP

AMQP stands for Advanced Message Queuing Protocol. It is a hardened and proven MOM protocol used by massive data sources such as JP Morgan Chase, which processes over 1 billion messages per day, and the Ocean Observatories Initiative, which collects over 8 terabytes of oceanographic data each day. It was originally devised at JP Morgan Chase in 2003. The company then led the creation of a working group of 23 companies in 2006, chartered with the architecture and governance of the protocol. In 2011, the working group was merged into OASIS, where it is currently hosted.

Today it is well established in the banking and credit transaction industries, but it has a place in IoT as well. Furthermore, AMQP is standardized by ISO and IEC as ISO/IEC 19464:2014. A formal AMQP working group can be found at www.amqp.org.

The AMQP protocol resides on top of the TCP stack and uses port 5672 for communication. Data is serialized over AMQP, meaning that messages are transmitted in units called frames. Frames are sent over virtual channels, each identified by a unique channel_id. A frame consists of a header, a channel_id, payload information, and a footer. A channel, however, can only be associated with a single host. Messages are assigned a unique global identifier.
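To make the frame layout concrete, here is a minimal sketch of packing and unpacking a frame. It assumes the AMQP 0-9-1 general frame format (the version RabbitMQ speaks natively): a one-byte type, a two-byte channel ID, a four-byte payload size, the payload, and a closing frame-end octet of 0xCE. AMQP 1.0, the version standardized as ISO/IEC 19464, uses a different layout. The function names encode_frame and decode_frame are illustrative only and do not come from any library.

```python
import struct

FRAME_END = 0xCE   # frame-end octet (the "footer") in AMQP 0-9-1
FRAME_BODY = 3     # frame type 3 carries message body content in AMQP 0-9-1


def encode_frame(frame_type: int, channel_id: int, payload: bytes) -> bytes:
    """Pack header (type, channel, size), payload, and frame-end octet."""
    header = struct.pack(">BHI", frame_type, channel_id, len(payload))
    return header + payload + bytes([FRAME_END])


def decode_frame(data: bytes):
    """Unpack one frame and return (frame_type, channel_id, payload)."""
    frame_type, channel_id, size = struct.unpack(">BHI", data[:7])
    payload = data[7:7 + size]
    if len(data) < 8 + size or data[7 + size] != FRAME_END:
        raise ValueError("frame is truncated or missing the frame-end octet")
    return frame_type, channel_id, payload


frame = encode_frame(FRAME_BODY, 1, b"21.5 C")
print(decode_frame(frame))
```

The seven-byte header plus the one-byte footer means each frame adds eight bytes of overhead to its payload in this layout.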

AMQP is a flow-controlled, message-oriented communication system. It is a wire-level protocol and a low-level interface. A wire-level protocol specifies the format of the data exchanged over the network rather than a programming API, which allows different messaging services such as .NET (NMS) and Java (JMS) to communicate with each other. Like MQTT, AMQP attempts to decouple publishers from subscribers. Unlike MQTT, it has mechanisms for load balancing and formal queuing. A widely used implementation of AMQP is RabbitMQ, a message broker written in Erlang. Additionally, several AMQP clients are available, such as the RabbitMQ clients for Java, C#, JavaScript, and Erlang, as well as Apache Qpid for Python, C++, C#, Java, and Ruby.

One or more virtual hosts, each with its own namespace, exchanges, and message queues, reside on one or more central servers. Producers and consumers connect to the exchange service. The exchange service receives messages from a publisher and routes the data to an associated queue. This relationship is called a binding, and the binding can either be direct to one queue or fanned out to multiple queues (as in a broadcast). When the binding associates an exchange with a queue using a routing key, the exchange is formally called a direct exchange. Another type of exchange is the topic exchange.
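The difference between direct and fanout bindings can be sketched with a small in-memory model. This is not how a broker is implemented and it uses no AMQP library; the Exchange class and its bind and publish methods are hypothetical names chosen for illustration, and plain deques stand in for queues.

```python
from collections import deque


class Exchange:
    """Toy exchange that routes message bodies to bound queues."""

    def __init__(self, kind: str):
        if kind not in ("direct", "fanout"):
            raise ValueError("kind must be 'direct' or 'fanout'")
        self.kind = kind
        self.bindings = []  # list of (queue, routing_key) pairs

    def bind(self, queue: deque, routing_key: str = "") -> None:
        self.bindings.append((queue, routing_key))

    def publish(self, routing_key: str, body: str) -> None:
        for queue, key in self.bindings:
            # fanout ignores the key; direct requires an exact match
            if self.kind == "fanout" or key == routing_key:
                queue.append(body)


idaho_q, wisconsin_q = deque(), deque()

direct = Exchange("direct")
direct.bind(idaho_q, "Idaho")
direct.bind(wisconsin_q, "Wisconsin")
direct.publish("Idaho", "21.5 C")      # reaches idaho_q only

fanout = Exchange("fanout")
fanout.bind(idaho_q)
fanout.bind(wisconsin_q)
fanout.publish("", "storm warning")    # reaches both queues
```

After these calls, idaho_q holds both messages while wisconsin_q holds only the broadcast.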

A topic exchange uses a pattern to wildcard the routing key. For example, *.temp.# matches both idaho.temp.celsius and wisconsin.temp.fahrenheit.
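The wildcard rule can be sketched in a few lines of Python. This assumes the AMQP 0-9-1 topic conventions used by RabbitMQ, where words are separated by dots, * stands for exactly one word, and # stands for zero or more words. The function name topic_matches is hypothetical; a real broker performs this matching internally.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if routing_key satisfies the topic binding pattern."""
    p = pattern.split(".")
    k = routing_key.split(".") if routing_key else []

    def match(i: int, j: int) -> bool:
        if i == len(p):
            return j == len(k)          # pattern used up: key must be too
        if p[i] == "#":
            # '#' may absorb zero or more of the remaining words
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j < len(k) and (p[i] == "*" or p[i] == k[j]):
            return match(i + 1, j + 1)  # '*' or a literal consumes one word
        return False

    return match(0, 0)


for key in ("idaho.temp.celsius", "wisconsin.temp.fahrenheit", "idaho.humidity.percent"):
    print(key, topic_matches("*.temp.#", key))
```

The recursive form favors readability over speed, which is adequate for routing keys of a handful of words.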

AMQP architectural topology. There are producers and consumers in a typical AMQP implementation. Producers can use different languages and namespaces, as AMQP is language-agnostic given its APIs and wire protocol. The broker resides in the cloud and provides exchanges for each producer. Exchanges route messages to the proper queues based on binding rules. The queues are message buffers that deliver messages to awaiting consumers.

The network topology of an AMQP deployment is hub-and-spoke, with the ability for hubs to communicate with one another. AMQP consists of nodes and links. A node is a named source or sink of messages. A message frame moves between nodes across unidirectional links. If a message is passed through a node unmodified, its global identifier is unchanged. If the node performs any transformation, a new ID is assigned. A link has the ability to filter messages. There are three distinct messaging patterns that can be used in AMQP:

  • Asynchronous Directed Messages: Message is transmitted without requiring a receiver acknowledgment.
  • Request/Reply or Pub/Sub: Similar to MQTT with a central server acting as a pub/sub service.
  • Store and Forward: This is used for hub relaying, where a message is sent to an intermediate hub and then sent on towards its destination.
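The identifier rule for store-and-forward relaying can be illustrated with a short sketch: a hub that passes a message through unchanged keeps its ID, while a hub that transforms the message issues a new one. The Message class and relay function are hypothetical names, and a UUID is only assumed here as a stand-in for the global identifier.

```python
import uuid
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Message:
    body: str
    # assumption: a random UUID stands in for the global message identifier
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))


def relay(message: Message, transform=None) -> Message:
    """Forward a message through an intermediate node (hub)."""
    if transform is None:
        return message                         # pass-through: same ID
    return Message(body=transform(message.body))  # transformed: new ID


original = Message("21.5 c")
forwarded = relay(original)
rewritten = relay(original, transform=str.upper)

print(forwarded.message_id == original.message_id)
print(rewritten.message_id == original.message_id)
```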

A basic direct exchange written in Python is shown here, using RabbitMQ and the pika Python library. Here we create a simple direct exchange called Idaho and bind it to a queue called weather:

#!/usr/bin/env python
# Basic AMQP example using the pika Python library

from pika import BlockingConnection, ConnectionParameters

# Initialize the connection and open a channel
connection = BlockingConnection(ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='Idaho', exchange_type='direct')  # declare a direct exchange
channel.queue_declare(queue='weather')  # declare the queue

# Bind the queue to the exchange with a routing key
channel.queue_bind(exchange='Idaho', queue='weather', routing_key='Idaho')

# Produce the message
channel.basic_publish(exchange='Idaho', routing_key='Idaho', body='new important task')

# Consume the message; basic_get returns (None, None, None) if the queue is empty
method_frame, header_frame, body = channel.basic_get('weather')

# Acknowledge the message if one was received
if method_frame:
    channel.basic_ack(method_frame.delivery_tag)

connection.close()