More and more these days, it seems like every system we build has to talk to something, somewhere.[16] We’d hardly be doing anything if we didn’t actually talk with some other computers over some kind of network.
This chapter covers all of the normal remote communication modes you would expect—HTTP, TCP, UDP, and the like—as well as some relative newcomers[17] like message-oriented architectures.
Use slurp to make simple HTTP GET requests:
(slurp "http://example.com")
;; -> "<!doctype html> <html> <head> <title>Example Domain</title> ...
Use the clj-http library to make GET, POST, and other requests with specific parameters or headers, to handle redirects and other special circumstances, or to get specific details about the response.
To follow along, add [clj-http "0.7.7"] to your project’s dependencies, or use lein-try to start a REPL:
$ lein try clj-http
Use clj-http.client/get to make GET requests:
(require '[clj-http.client :as http])

(:status (http/get "http://clojure.org"))
;; -> 200

(-> (http/get "http://clojure.org")
    :headers
    (get "server"))
;; -> "nginx"

(-> (http/get "http://www.amazon.com/")
    :cookies
    keys)
;; -> ("session-id" "session-id-time" "x-wl-uid" "skin")
Parameters can be included in both GET and POST requests. Use clj-http.client/post to make POST requests:
(http/get "http://google.com/"
          {:query-params {:q "clojure"}})
;; -> {:status 200 ...}

(http/post "http://example.com"
           {:form-params {:username "joecoder"
                          :password "il0v3clojure"}})
;; -> {:status 200 ...}
You can even use the :multipart option to upload files, as from an HTML form via a web browser.
slurp works to make HTTP GET requests because its arguments are passed to clojure.java.io/reader, which in turn correctly handles opening URL strings. This is totally sufficient for issuing a quick HTTP GET to a well-behaved URL. Unfortunately, this is where slurp’s usefulness ends. Among other limitations, it will not behave correctly for responses with HTTP redirects.
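The URL handling described above is not specific to HTTP. As a minimal sketch (the scratch file and the names tmp-file and demo-url are made up for illustration), the following writes a temporary file and reads it back through a file: URL string, which takes the same route through clojure.java.io/reader that an http:// string does:

```clojure
(require '[clojure.java.io :as io])

;; Hypothetical scratch file, used only for this demonstration
(def tmp-file (doto (java.io.File/createTempFile "slurp-demo" ".txt")
                (.deleteOnExit)))

(spit tmp-file "hello from a URL")

;; A string of the form "file:/.../slurp-demo123.txt"
(def demo-url (str (.toURI tmp-file)))

;; slurp hands the string to io/reader, which recognizes it as a URL
(def via-slurp (slurp demo-url))

;; The explicit equivalent: open the reader yourself, then read it all
(def via-reader
  (with-open [r (io/reader demo-url)]
    (slurp r)))
```

Both via-slurp and via-reader hold the text that was written to the file. Anything beyond reading the body (status codes, headers, cookies) is outside what this route offers, which is where clj-http comes in.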
clj-http is an extremely flexible Clojure wrapper around the very robust Apache HttpComponents library. Its features include convenient functions for other HTTP verbs like PUT and DELETE; for reading and sending cookies, headers, and other request metadata; for reading and writing data using streams, files, or byte arrays; and lots more. Refer to the GitHub repository to learn about the huge variety of options available and to see many more examples.
If you’re building production systems that rely on external services, you may want to consider wrapping HTTP calls in Netflix’s Hystrix library to make your application more fault-tolerant and resilient. Hystrix provides Clojure bindings that you can use to wrap network calls and more easily manage complex failure scenarios involving external services.
clj-http’s GitHub repository.
Use HTTP Kit, a highly performant, event-driven HTTP client/server library.
Before starting, add [http-kit "2.1.12"] to your project’s dependencies, or follow along in a REPL using lein-try:
$ lein try http-kit
Use any of org.httpkit.client’s HTTP verb functions to perform asynchronous HTTP requests. In their base form, these functions return a promise that you can await with deref or the @ reader shorthand:
(require '[org.httpkit.client :as http])

(def response (http/get "http://example.com"))

;; Some time later...
(:status @response)
;; -> 200

;; Or, using deref to specify a timeout length in milliseconds and
;; a value to return if the timeout elapses
(deref response 2000 nil)
;; -> {:opts {:url "http://example.com", :method :get}
;;     :body "..."
;;     :headers {:content-type "text/html", :content-length "1270" ...}
;;     :status 200}
The bulk of time spent performing HTTP requests is establishing the connection and awaiting the server’s response. Asynchronous requests enable your application to continue working while awaiting the delivery of data.
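To make the promise-based style concrete without a network, here is a small sketch using only clojure.core. The function fake-request is a hypothetical stand-in for an asynchronous HTTP call, not part of HTTP Kit; it returns a promise that a background thread fulfills later, which is the shape of value the paragraph above describes:

```clojure
(defn fake-request
  "Hypothetical stand-in for an asynchronous HTTP call: returns a promise
  that a daemon thread fulfills after delay-ms milliseconds."
  [delay-ms]
  (let [response (promise)]
    (doto (Thread. (fn []
                     (Thread/sleep (long delay-ms))
                     (deliver response {:status 200 :body "ok"})))
      (.setDaemon true)
      (.start))
    response))

(def fast (fake-request 50))
(def slow (fake-request 60000))

;; The calling thread is free to do other work here...

;; Blocks only until the fast response has been delivered
(def fast-status (:status @fast))

;; Gives up after 100 ms and returns the fallback value instead
(def slow-result (deref slow 100 :timed-out))
```

Here fast-status is 200 and slow-result is :timed-out. The three-argument form of deref is what keeps a slow server from stalling the caller indefinitely.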
In this vein, HTTP Kit provides both a highly concurrent web server and a powerful HTTP client. It offers both callbacks and promises for asynchronous requests, as well as persistent connections and alternate SSL engines for dealing with unsigned SSL certificates.
The org.httpkit.client namespace defines asynchronous versions of numerous HTTP methods, including get, delete, head, post, put, options, and patch. Each of these verbs derives from org.httpkit.client/request, which defines a common interface. An asynchronous request of a given method is made, and a promise is returned. Upon completion of the request, the promise will be fulfilled with the response.
All request functions accept an optional map of options where you can specify keys like :query-params, :form-params, or :headers. Functions also allow specifying a callback function to be called upon request completion:
(http/get "http://example.com"
          {:timeout 1000 ;; ms
           :query-params {:search "value"}}
          (fn [{:keys [status headers body error]}]
            (if error
              (binding [*out* *err*]
                (println "Failed with, " error))
              (println body))))
;; -> #<core$promise$reify__6310@582e6c93: :pending>
;; *out*
;; <html>
;; <head>
;; <title>Example Domain</title>
;; ...
clj-http; see Recipe 5.1, “Making HTTP Requests”, for more information on the library.
Use the java.net.InetAddress class to test if the address isReachable:
(.isReachable (java.net.InetAddress/getByName "oreilly.com") 5000)
;; -> true
Using isReachable works great if the correct permissions can be obtained. On a typical Unix-like implementation, you will need to start your Clojure instance with sudo to get an actual ICMP ping sent. Otherwise, a standard connection will be attempted on port 7, which in most cases will be blocked by a firewall. More information can be found in the javadoc.
A common need when pinging another machine is to time the ping. You can wrap an .isReachable invocation in a function timed-ping to return timing values with every ping:
(defn timed-ping
  "Time an .isReachable ping to a given domain"
  [domain timeout]
  (let [addr (java.net.InetAddress/getByName domain)
        start (System/nanoTime)
        result (.isReachable addr timeout)
        total (/ (double (- (System/nanoTime) start)) 1000000.0)]
    {:time total
     :result result}))

(timed-ping "oreilly.com" 5000)
;; -> {:time 88.07, :result true}
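The timing logic in timed-ping does not depend on isReachable itself. One possible refactoring, shown here as a sketch, pulls it into a reusable helper. The names timed and timed-ping* are hypothetical and chosen for this example only:

```clojure
(defn timed
  "Calls the zero-argument function f and returns its result together
  with the elapsed wall-clock time in milliseconds."
  [f]
  (let [start (System/nanoTime)
        result (f)
        elapsed (/ (double (- (System/nanoTime) start)) 1000000.0)]
    {:time elapsed
     :result result}))

(defn timed-ping*
  "Same idea as timed-ping, built on the generic helper. The address
  lookup happens before the clock starts, as in the original."
  [domain timeout]
  (let [^java.net.InetAddress addr (java.net.InetAddress/getByName domain)]
    (timed #(.isReachable addr (int timeout)))))

;; The helper works for any computation, not just pings
(def sample (timed #(reduce + (range 1000))))
```

The :result of sample is 499500, and its :time is whatever the summation took on your machine. Keeping the measurement separate from the thing being measured makes it easy to time other network calls in this chapter the same way.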
Use the feedparser-clj library to parse RSS data.
Before starting, add [org.clojars.scsibug/feedparser-clj "0.4.0"] to your project’s dependencies, or follow along in a REPL using lein-try:
$ lein try org.clojars.scsibug/feedparser-clj
Invoke feedparser-clj.core/parse-feed with the URI of an RSS feed to retrieve that feed and parse it into Clojure data:
(require '[feedparser-clj.core :as rss])

(rss/parse-feed (str "https://github.com/clojure-cookbook/clojure-cookbook/"
                     "commits/master.atom"))
;; -> {:authors [...]
;;     :entries [{:link "LINK" :title "TITLE" :contents "CONTENT"} ...]
;;     ...}
You can also invoke parse-feed with a java.io.InputStream to read from a file or other location:
(with-open [writer (clojure.java.io/writer "master.atom")]
  (spit writer
        (slurp (str "https://github.com/clojure-cookbook/clojure-cookbook/"
                    "commits/master.atom"))))

(with-open [stream (clojure.java.io/input-stream "master.atom")]
  (rss/parse-feed stream))
;; -> {:authors [...]
;;     :entries [{:link "LINK" :title "TITLE" :contents "CONTENT"} ...]
;;     ...}
feedparser-clj is a wrapper around the Java ROME library that is capable of processing a variety of RSS and Atom feed formats. feedparser-clj.core/parse-feed returns a Clojure map that closely mimics the underlying XML feed.
Most of the time, what you care about will be under the :entries key, which contains a sequence of maps, one for each RSS entry.
Some RSS feeds will have <link rel="next"> elements that indicate that the returned list is incomplete and more entries can be retrieved by following the link. A lazy list of these RSS entries can be generated:
(defn next-uri
  "Return the rel=next href in a feed."
  [feed]
  (-> feed
      :entry-links
      (->> (filter #(= (:rel %) "next")))
      first
      :href))

(defn lazy-stream
  "Return a lazy stream of RSS entries."
  [uri]
  (let [raw-response (rss/parse-feed uri)]
    (lazy-cat (:entries raw-response)
              (if-let [nxt (next-uri raw-response)]
                (lazy-stream nxt)))))
To verify that lazy loading is happening, logging or tracing can be added to lazy-stream, but it is also easy to confirm that you can retrieve more entries than are present in a single fetch:
(def youtube-feed "http://gdata.youtube.com/feeds/api/videos")

(count (rss/parse-feed youtube-feed))
;; -> 15

(count (take 50 (lazy-stream youtube-feed)))
;; -> 50
Be careful when evaluating a lazy sequence in a REPL, since it will attempt to print the entire sequence. Use take to only realize part of the sequence.
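The laziness of lazy-stream can also be checked offline. The following sketch replaces the network with an in-memory map of pages and counts how often a page is fetched. The names pages, fetch-page, fetch-count, and lazy-entries are invented for this illustration, and the page structure is a simplification rather than feedparser-clj's actual output:

```clojure
;; Three hypothetical "pages" of a feed, each pointing at the next one
(def pages
  {:page-1 {:entries [{:title "a"} {:title "b"} {:title "c"}] :next :page-2}
   :page-2 {:entries [{:title "d"} {:title "e"} {:title "f"}] :next :page-3}
   :page-3 {:entries [{:title "g"}] :next nil}})

;; Counts how many pages have been fetched so far
(def fetch-count (atom 0))

(defn fetch-page
  "Stand-in for rss/parse-feed: looks a page up and records the fetch."
  [id]
  (swap! fetch-count inc)
  (get pages id))

(defn lazy-entries
  "Same shape as lazy-stream: the current page is fetched right away,
  the following pages only when the sequence is walked that far."
  [id]
  (let [page (fetch-page id)]
    (lazy-cat (:entries page)
              (when-let [nxt (:next page)]
                (lazy-entries nxt)))))

(def first-two (doall (take 2 (lazy-entries :page-1))))
(def fetches-after-two @fetch-count)   ;; -> 1

(def first-five (doall (take 5 (lazy-entries :page-1))))
(def fetches-after-five @fetch-count)  ;; -> 3 (one earlier, two for this call)
```

Taking two entries touches only the first page, while taking five has to cross into the second page and therefore fetches it. The third page is never requested.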
Use postal, a thin wrapper over the JavaMail package, to send email messages.
To follow along with this recipe, start a REPL using lein-try:
$ lein try com.draines/postal
Send a message by invoking the postal.core/send-message function with two maps, the first containing connection details and the second containing message details. For example, to send an email message to yourself via a Gmail account:
(require '[postal.core :refer [send-message]])

;; Replace the following with your own credentials
(def email "<your gmail address>")
(def pass "<your gmail password>")

(def conn {:host "smtp.gmail.com"
           :ssl true
           :user email
           :pass pass})

(send-message conn {:from email
                    :to email
                    :subject "A message, from the past"
                    :body "Hi there, me!"})
;; -> {:error :SUCCESS, :code 0, :message "messages sent"}
If all is well, you should receive an email from yourself shortly thereafter.
With the venerable JavaMail at its core, there isn’t much postal leaves for you to worry about. Even Gmail’s oft-maligned authentication setup can be tackled with a single :ssl key. While we might normally suggest giving the native Java API a try for simple email delivery, we prefer postal because it presents an API oriented around data rather than objects.
One of the places data orientation really shines is in specifying connection details. The first argument to the send-message function is a (versatile) map of connection details. Valid connection details are:
:host
:port, which defaults to 465 when :ssl is set or 25 when :tls is set
:user
:pass
:ssl
:tls
When provided no connection details (either by omitting the first argument or passing nil), postal will attempt to route email through a local sendmail instance.
Since Amazon’s Simple Email Service (SES) can operate over SMTP, it is possible to use postal to send email via Amazon’s infrastructure.
Similar to connection details, messages themselves are represented as simple maps of data. The full complement of standard headers is supported as message keys:
Sender options: :from, :reply-to
Recipient options: :to, :cc, :bcc
Content options: :subject, :body
Metadata options: :date, :message-id, :user-agent
Options specified beyond these will be attached to the message as ancillary headers.
When specifying recipients on the :to, :cc, or :bcc keys, values may be either a single address or a sequence of addresses:
{:to "[email protected]"
 :cc ["[email protected]", "[email protected]", "[email protected]"]
 :bcc "[email protected]"}
A message’s body can be specified as either a string or a sequence of part maps. While the former delivers a simple plain-text email, the latter will deliver a multipart MIME message. MIME (Multipurpose Internet Mail Extensions) is the standard that allows email messages to contain attachments or other rich content, such as HTML.
A part map is made up of two values: :type and :content. For message body parts, :type is the MIME type of the content, and :content is the textual representation of said content. For example, to create a message with both plain text and HTML representations of the content:
:body [:alternative
       {:type "text/plain"
        :content "You just won the lottery!"}
       {:type "text/html"
        :content "<html>
<body>
<p>You just <b>won</b> the lottery!</p>
</body>
</html>"}]
You’ll notice the first “part” in the preceding body was not, in fact, a part map, but the keyword :alternative. Messages are normally sent in “mixed” mode, indicating to an email client that each part constitutes a piece of the whole message. Messages of the :alternative type, however, inform a client that each part represents the entire message, albeit in differing formats.
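Because a message is plain data, it can be assembled and inspected without sending anything. The sketch below builds an :alternative body with a small helper. The function alternative-body and the example.com addresses are hypothetical and exist only for this illustration; the resulting map has the shape described above and could be passed as the second argument to send-message:

```clojure
(defn alternative-body
  "Hypothetical helper: builds an :alternative body from a plain-text
  and an HTML rendering of the same content."
  [plain html]
  [:alternative
   {:type "text/plain" :content plain}
   {:type "text/html" :content html}])

;; Placeholder addresses, for illustration only
(def message
  {:from "me@example.com"
   :to "you@example.com"
   :subject "Lottery"
   :body (alternative-body "You just won the lottery!"
                           "<p>You just <b>won</b> the lottery!</p>")})

;; The body is ordinary data: a keyword followed by part maps
(def body-mode (first (:body message)))
(def part-types (map :type (rest (:body message))))
```

Here body-mode is :alternative and part-types is ("text/plain" "text/html"). Being able to check a message this way, before any SMTP connection exists, is one practical benefit of the data-oriented API.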
If you need to send complicated multipart messages and require a high level of control over message creation, you should use the raw JavaMail API to construct messages.
For attachments, the :type parameter behaves a little differently, controlling whether the attachment resides inline (:inline) or as an attachment (:attachment). The contents of an attachment are specified by providing a File object for the :content key. An attachment’s content type and name are generally inferred from the File object, but they may be overridden via the :content-type and :file-name keys, respectively.
For example, forwarding all of your closest friends a picture of your cat might look something like this:
;; File here is java.io.File, e.g. after (import 'java.io.File)
:body [{:type "text/plain"
        :content "Hey folks, check out these pictures of my cat!"}
       {:type :inline
        :content (File. "/tmp/lester-flying-photoshop")
        :content-type "image/jpeg"
        :file-name "lester-flying.jpeg"}
       {:type :attachment
        :content (File. "/tmp/lester-upside-down.jpeg")}]
postal’s GitHub repository
You want to communicate between a number of applications using a queueing broker such as RabbitMQ.
Use Langohr, a small RabbitMQ client, to communicate with RabbitMQ.
Before starting, add [com.novemberain/langohr "1.6.0"] to your project’s dependencies, or follow along in a REPL using lein-try:
$ lein try com.novemberain/langohr
In order to follow along with this recipe, you need to have RabbitMQ installed and running.
Once installed, start a standalone RabbitMQ server with the command rabbitmq-server:
$ rabbitmq-server
Prior to performing any operations against RabbitMQ, you must connect to a server and open a communication channel. A channel is the medium over which you can produce and consume messages:
(require 'langohr.core
         'langohr.channel)

;; Connect to local RabbitMQ cluster node on localhost:5672
(def conn (langohr.core/connect {:hostname "localhost"}))

;; Open a channel against the connection
(def ch (langohr.channel/open conn))
In RabbitMQ, messages are published to exchanges, routed to queues via a binding, then finally consumed by consumers. There are a number of different exchange types that vary the semantics of delivery; the most basic exchange type is direct, which routes messages based on their routing key.
To construct a pipeline between producer and consumer, start by invoking langohr.queue/declare to create a queue with the desired name:
(require '[langohr.queue :as lq])

(def resize-queue "imaging.resize")

(lq/declare ch resize-queue)
;; -> {:queue "imaging.resize",
;;     :consumer-count 0,
;;     :message_count 0,
;;     :consumer_count 0,
;;     :message-count 0}
By default, RabbitMQ creates a binding between the default exchange (named by an empty string) and each queue. You can now publish a message to the "imaging.resize" queue by invoking langohr.basic/publish with the channel, the default exchange, a routing key (your queue name), and a message:
(require '[langohr.basic :as lb])

(lb/publish ch "" resize-queue "hello.jpg")
To consume messages from a queue synchronously, invoke langohr.basic/get with the channel and queue name:
(def hello-msg (lb/get ch resize-queue))

hello-msg
;; -> [{:routing-key "imaging.resize", :headers nil ...} #<byte[] [B@2b195c88>]

(String. (last hello-msg) "UTF-8")
;; -> "hello.jpg"
To consume messages asynchronously as they appear, use langohr.consumers/subscribe to subscribe to a queue. The handler function you provide to subscribe will be called for each message published to the queue:
(require '[langohr.consumers :as lc])

(defn resize-image-handler
  "Spawn a resize process for each resize message received"
  [ch metadata ^bytes payload]
  (let [filename (String. payload "UTF-8")]
    (println (format "Resizing file %s" filename))))

;; Subscribe to the queue with the handler function
(def tag (lc/subscribe ch resize-queue resize-image-handler))

;; The return value of subscribe is a subscription tag
tag
;; -> "amq.ctag-7hsNsSqLDEEoES5AkIC6XQ"

(lb/publish ch "" resize-queue "hello-again.jpg")
;; *out*
;; Resizing file hello-again.jpg

;; Unsubscribe resize-image-handler via the tag value
(lb/cancel ch tag)
At this point, you’ve round-tripped a few messages to RabbitMQ, but you’ve barely scratched the surface of what Langohr and RabbitMQ are capable of. Langohr is a small RabbitMQ client wrapping the Java RabbitMQ library that supports AMQP 0-9-1 and RabbitMQ extensions of AMQP, and provides an HTTP API client.
AMQP 0-9-1, and by extension, Langohr, centers around a few main concepts: exchanges, queues, and bindings.
An exchange is very much like a post office: when a message is published to an exchange, the exchange will route the message to one or more queues. How those messages are routed to queues is dependent on both the exchange type and the bindings between the exchange/queues.
There are multiple exchange types, each with its own routing semantics—see Table 5-1. Custom exchange types can be created to deal with sophisticated routing scenarios (e.g., routing based on content or geolocation data) or just for convenience.
Name | Behavior | Predeclared exchange
Direct | 1:1, routed based on routing key | "" (the default exchange), amq.direct
Fanout | 1:N, ignoring routing key | amq.fanout
Topic | 1:N, taking routing key into consideration | amq.topic
Headers | 1:1, taking into consideration any number of headers | amq.match (amq.headers in RabbitMQ)
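The routing behaviors in the table can be mimicked with a few lines of plain Clojure. This is a toy model for building intuition, not how RabbitMQ or Langohr implement routing. The functions topic-match? and route, and the sample bindings, are invented for this sketch, and the headers exchange type is left out:

```clojure
(require '[clojure.string :as str])

(defn topic-match?
  "True when routing-key matches an AMQP topic pattern, where * stands
  for exactly one dot-separated word and # for zero or more words."
  [pattern routing-key]
  (letfn [(match [ps ks]
            (cond
              (empty? ps) (empty? ks)
              (= "#" (first ps)) (or (match (rest ps) ks)
                                     (and (seq ks) (match ps (rest ks))))
              (empty? ks) false
              (or (= "*" (first ps))
                  (= (first ps) (first ks))) (match (rest ps) (rest ks))
              :else false))]
    (boolean (match (str/split pattern #"\.")
                    (str/split routing-key #"\.")))))

(defn route
  "Toy model of an exchange: returns the set of queues that would
  receive a message with the given routing key."
  [exchange-type bindings routing-key]
  (set (for [{:keys [queue binding-key]} bindings
             :when (case exchange-type
                     :direct (= binding-key routing-key)
                     :fanout true
                     :topic (topic-match? binding-key routing-key))]
         queue)))

;; Hypothetical bindings between one exchange and three queues
(def bindings
  [{:queue "resize" :binding-key "imaging.resize"}
   {:queue "audit" :binding-key "imaging.#"}
   {:queue "thumbs" :binding-key "imaging.*"}])

(route :direct bindings "imaging.resize")       ;; -> #{"resize"}
(route :fanout bindings "anything")             ;; all three queues
(route :topic bindings "imaging.resize.small")  ;; -> #{"audit"}
```

The same bindings yield different recipients depending only on the exchange type, which is the point the table makes.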
To declare one of the built-in exchanges, use one of langohr.exchange/fanout, langohr.exchange/topic, langohr.exchange/direct, or langohr.exchange/headers. Each of these functions exposes the relevant options for that exchange type, ultimately invoking langohr.exchange/declare:
(require '[langohr.exchange :as le])

;; Create a fanout exchange for image processing completion
(le/fanout ch "imaging.complete")
Exchanges have several attributes associated with them:
Name
Type
Durability
Auto-delete behavior
Custom arguments (sometimes known as x-arguments)
Using langohr.exchange/declare directly, you can customize these attributes to create your own types of exchanges.
A queue is like a mailbox in a post office. The langohr.queue/declare function creates named queues. Apart from the name, this function accepts a number of keyword arguments that vary the characteristics of the queue, including whether it is :durable, :exclusive, or :auto-delete. Other arguments can be specified in an :arguments value:
(lq/declare ch "imaging.transcode" :durable true)
;; -> {:queue "imaging.transcode", ...}
Queues with unique names can be generated using the langohr.queue/declare-server-named function. This functions similarly to langohr.queue/declare, but without a name argument:
(lq/declare-server-named ch)
;; -> "amq.gen-FcFv8JD9K8-4NuT8kC3jKA"
Unlike exchanges, queues in RabbitMQ are all of the same type.
As you saw in the solution, the default exchange (a direct exchange) has an implicit binding to every queue, by name. In the wild, however, queues are usually bound to exchanges explicitly. You can create your own bindings by invoking langohr.queue/bind with a channel, queue name, and exchange name:
;; Create a unique completion queue...
(def completion-queue (lq/declare-server-named ch))

;; and bind it to the imaging.complete fanout
(lq/bind ch completion-queue "imaging.complete")
Messages are published to an exchange using the langohr.basic/publish function. This function takes three primary arguments (beyond channel):
An exchange name, either one you declared yourself, like "imaging.complete", or a built-in like "amq.fanout" or ""
A routing key
A message payload
As optional arguments, publish allows users to specify a plethora of message headers as keyword arguments. For the full list, see the docstring for the publish function.
Having declared a number of queues, there are two ways to consume messages from them:
The pull API, using langohr.basic/get
The push API, using langohr.consumers/subscribe
In the pull API, you make a synchronous invocation of the get function to retrieve a single message from a queue. The return value of get is a tuple of a metadata map and a body. The body payload, as returned, is an array of bytes; for plain-text messages you can use the String constructor (String.) to decode those bytes into a string. Since the payload here was encoded using UTF-8, it is important to invoke the String constructor with an encoding option of "UTF-8":
(lb/publish ch "" resize-queue "hello.jpg")

(let [[_ body] (lb/get ch resize-queue)]
  (String. body "UTF-8"))
;; -> "hello.jpg"
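Why the explicit "UTF-8" matters can be seen without a broker. The following sketch encodes a string containing a non-ASCII character and decodes it twice, once with the matching charset and once with a mismatched one. The var names are illustrative only:

```clojure
;; "cafe" with an acute accent on the e, written with an escape so the
;; example does not depend on the encoding of the source file
(def original "caf\u00e9")

;; Encode explicitly as UTF-8, as a publisher would
(def payload (.getBytes original "UTF-8"))

;; Decoding with the matching charset restores the text
(def decoded (String. payload "UTF-8"))

;; Decoding with a different charset silently corrupts it
(def garbled (String. payload "ISO-8859-1"))

(count payload) ;; -> 5, because the accented character takes two bytes
```

Only decoded equals original. No exception is thrown in the mismatched case, which is why encoding bugs of this kind tend to surface late, as mangled filenames or text on the consumer side.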
When no messages are present on a queue, get will return nil.
In the push API, you subscribe to a queue using langohr.consumers/subscribe, providing a message handler function that will be invoked for each message the queue receives. This function will be invoked with three arguments: a channel, metadata, and the body bytes:
;; A run-of-the-mill handler function
(defn resize-image-handler
  "Spawn a resize process for each resize message received"
  [ch metadata ^bytes payload]
  (let [filename (String. payload "UTF-8")]
    (println (format "Resizing file %s" filename))))
subscribe is a nonblocking call, and upon completion will return a tag string that can be used to later cancel the subscription using langohr.basic/cancel, as in the solution.
The subscribe function also allows you to specify a large number of queue life cycle functions, documented at length in the langohr.consumers/create-default docstring.
Consumed messages need to be acknowledged. That can happen automatically (RabbitMQ will consider a message acknowledged as soon as it sends it to a consumer) or manually.
When a message is acknowledged, it is removed from the queue. If a channel closes unexpectedly before a delivery is acknowledged, it will be automatically requeued by RabbitMQ. Note that these acknowledgments have application-specific semantics and help ensure that messages are processed properly.
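The acknowledgment life cycle just described can be modeled with a small pure data structure. This is a simplified sketch for intuition, not RabbitMQ's implementation; every name in it (empty-queue, publish, deliver-one, ack, channel-closed) is invented for the example:

```clojure
;; A toy queue: messages waiting, deliveries awaiting an ack, next tag
(def empty-queue {:ready [] :unacked {} :next-tag 1})

(defn publish [q msg]
  (update q :ready conj msg))

(defn deliver-one
  "Moves the oldest ready message into the unacknowledged set under a
  fresh delivery tag."
  [{:keys [ready next-tag] :as q}]
  (if-let [msg (first ready)]
    (-> q
        (assoc :ready (vec (rest ready)))
        (assoc-in [:unacked next-tag] msg)
        (update :next-tag inc))
    q))

(defn ack
  "An acknowledged delivery is gone for good."
  [q tag]
  (update q :unacked dissoc tag))

(defn channel-closed
  "Every delivery that was never acknowledged goes back on the queue,
  ahead of messages that were still waiting."
  [{:keys [ready unacked] :as q}]
  (assoc q
         :ready (vec (concat (map val (sort-by key unacked)) ready))
         :unacked {}))

(def after-ack
  (-> empty-queue
      (publish "a.jpg")
      (publish "b.jpg")
      deliver-one
      deliver-one
      (ack 1)))

(def after-close (channel-closed after-ack))
```

In after-ack, a.jpg has been acknowledged and removed while b.jpg is still outstanding under delivery tag 2. After the simulated channel failure, b.jpg is back in :ready and will be delivered again.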
With manual acknowledgment, it is the application’s responsibility to either acknowledge or reject a delivery. This is done with langohr.basic/ack and langohr.basic/nack, respectively, each of which takes a metadata attribute called delivery-tag (the delivery ID). To enable manual acknowledgments, pass :auto-ack false to langohr.consumers/subscribe:
(defn manual-ack-handler
  "Spawn a resize process for each resize message received"
  [ch {:keys [delivery-tag]} ^bytes payload]
  (try
    (String. payload "UTF-8")
    ;; Do some work, then acknowledge the message
    (lb/ack ch delivery-tag)
    (catch Throwable t
      ;; Reject message
      (lb/nack ch delivery-tag))))

(lc/subscribe ch resize-queue manual-ack-handler :auto-ack false)
Note that if you requeue a message on a queue that has just one consumer, it will be redelivered to that same consumer immediately.
It is also possible to control how many messages will be pushed to the client before it must receive an ack for at least one of them. This is known as the prefetch setting and is set using langohr.basic/qos. This setting applies across an entire channel:
;; Prefetch a dozen messages
(lb/qos ch 12)
RabbitMQ queues can also be mirrored between cluster nodes for high availability, have a bounded length or expiration period for messages, and more. To learn more, see the RabbitMQ and Langohr documentation sites.
You want to communicate with embedded devices (think “Internet of things”) using a publish/subscribe model.
Use Machine Head, a Clojure library that enables machine-to-machine (M2M) communication via the MQTT protocol. The protocol requires an existing MQTT broker with which all devices (or machines) will communicate by publishing messages or subscribing to messages on specific topics. You can use the Mosquitto broker with its test installation at tcp://test.mosquitto.org:1883 (of course, you need a functional Internet connection on your machine).
To follow along with this recipe, launch a REPL using lein-try:
$ lein try clojurewerkz/machine_head
To start, create a simple connect-and-subscribe function that listens to a topic and prints messages it receives:
(require '[clojurewerkz.machine-head.client :as mh])

(defn message-handler
  [topic meta payload]
  ;; Decode the payload bytes as UTF-8 text
  (let [p (String. ^bytes payload "UTF-8")]
    (println "received" p "on topic" topic)))

(defn connect-and-subscribe
  [broker-addr topics subscriberid]
  (let [qos-levels (vec (repeat (count topics) 2)) ;; All at qos 2
        conn-sub (mh/connect broker-addr subscriberid)]
    (when (mh/connected? conn-sub)
      (mh/subscribe conn-sub topics message-handler {:qos qos-levels})
      ;; Return conn-sub for later mh/disconnect...
      conn-sub)))

(def subscriberid (mh/generate-id))
;; or use a unique id
;; (def subscriberid "SNSubscriber01")

(connect-and-subscribe "tcp://test.mosquitto.org:1883"
                       ["SNControlNetwork/Florida/device1"]
                       subscriberid)
Open another terminal window and start a second lein-try REPL session. Use the following code to publish messages to the broker. Note that the subscriber must be connected already so as not to lose incoming messages:
(require '[clojurewerkz.machine-head.client :as mh])

(defn connect-and-publish
  [broker-addr client-id topic]
  (let [qos 2
        retained false
        conn (mh/connect broker-addr client-id)]
    (when (mh/connected? conn)
      (dotimes [n 5]
        (let [payload (str "msg" n)]
          (mh/publish conn topic payload qos retained)
          (println "published" payload)))
      (mh/disconnect conn))))

(def pubclientid (mh/generate-id))

pubclientid
;; -> "ryan.1384135173618"

(connect-and-publish "tcp://test.mosquitto.org:1883"
                     pubclientid
                     "SNControlNetwork/Florida/device1")
;; *out* of publish REPL
;; published msg0
;; published msg1
;; published msg2
;; published msg3
;; published msg4
;; *out* of client REPL
;; received msg0 on topic SNControlNetwork/Florida/device1
;; received msg1 on topic SNControlNetwork/Florida/device1
;; received msg2 on topic SNControlNetwork/Florida/device1
;; received msg3 on topic SNControlNetwork/Florida/device1
;; received msg4 on topic SNControlNetwork/Florida/device1
MQTT is an open, lightweight publish/subscribe messaging protocol. It is useful for connections where bandwidth is at a premium and/or connections are unreliable. While the AMQP protocol excels at various scenarios for business messaging, MQTT is usually the choice for smaller payloads and last-mile connectivity because it is simple to implement in hardware. The MQTT protocol has the following properties that make it good for constrained networks:
The protocol defines three possible Quality of Service (QoS) values: 0, 1, and 2, corresponding to fire-and-forget, at-least-once, and exactly-once qualities of service. QoS parameters 1 and 2 require persistent storage on the client so as to save the message until an acknowledgment arrives. In the preceding recipe, the default persistence implementation provided by the library is used.
MQTT also has a concept of retention of messages. If you were to set retained to true in the connect-and-publish function, the broker would remember the last known retained message on the topic. When the subscriber connects, it is given the last message (for which retained was true) by the broker and does not have to wait to receive the first message.
WebSphere and RabbitMQ also implement MQTT and can be used instead of Mosquitto. While the preceding code used the Mosquitto test broker (tcp://test.mosquitto.org:1883), you can install your own Mosquitto broker using the MQTT installation instructions.
The topics are usually defined with the separator / defining hierarchies. As an example, the sensor devices of a particular domain, SNControl, might be publishing their values to SNControl/Florida/device1, SNControl/Florida/device2, and so on. Meanwhile, the devices in domain RKNControl might publish their values to RKNControl/Washington/device1, for example. Naming the topics in this way helps in subscribing to multiple topics based on wildcards.
This is how wildcards are used:
/ separates the levels of a topic
+ matches exactly one level
# matches any number of remaining levels and must be the last character of the subscription
For example, these subscriptions are possible:
SNControl/#: every topic under SNControl will match, including those under SNControl/Florida (e.g., SNControl/Florida/device1/sensor1 and SNControl/Florida/device1/sensor2) as well as SNControl/California/device1.
SNControl/+/device1: device1 in any state under domain SNControl will match (e.g., SNControl/Florida/device1 and SNControl/California/device1).
SNControl/+/+/sensor1: sensor1 on any device in any state under domain SNControl will match (e.g., SNControl/Florida/device1/sensor1 and SNControl/Florida/device2/sensor1).
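The wildcard rules can be expressed as a short matching function. The sketch below is a simplified model written for this explanation, not code from Machine Head or any broker; filter-match? is a hypothetical name, and edge cases such as empty topic levels or topics beginning with $ are ignored:

```clojure
(require '[clojure.string :as str])

(defn filter-match?
  "True when topic matches the MQTT subscription topic-filter. + stands
  for exactly one level; a trailing # stands for the parent level and
  everything below it."
  [topic-filter topic]
  (loop [fs (str/split topic-filter #"/")
         ts (str/split topic #"/")]
    (cond
      (= "#" (first fs)) true
      (and (empty? fs) (empty? ts)) true
      (or (empty? fs) (empty? ts)) false
      (or (= "+" (first fs))
          (= (first fs) (first ts))) (recur (rest fs) (rest ts))
      :else false)))

(filter-match? "SNControl/#" "SNControl/Florida/device1/sensor1")
;; -> true
(filter-match? "SNControl/+/device1" "SNControl/California/device1")
;; -> true
(filter-match? "SNControl/+/device1" "SNControl/Florida/device2")
;; -> false
(filter-match? "SNControl/+/+/sensor1" "SNControl/Florida/device2/sensor1")
;; -> true
```

Walking both the subscription and the topic level by level mirrors how the hierarchy is meant to be read: each + consumes one level, and # ends the comparison as soon as it is reached.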
In the preceding code, the connect-and-subscribe function uses the callback handler message-handler to process incoming messages arriving from the broker. Inside connect-and-subscribe, the connect function from the Machine Head library is invoked with the broker address and client ID (generated using generate-id, or some other unique ID). The code then checks that the connection has been established using the connected? function. The subscribe function is invoked with the connection, a vector of topics to subscribe to, a message handler, and a :qos option. Finally, connect-and-subscribe returns the connection so that the caller can later disconnect using the disconnect function.
The connect-and-publish function calls connect, which accepts the broker address and client ID and returns the connection conn. Then it checks if the connection is successful with the connected? function and invokes the publish function to publish messages (a few times) to the broker. The publish function accepts as parameters the connection, topic string, payload, QoS value, and retained. The QoS value of 2 corresponds to exactly-once delivery. The retained value of false instructs the broker not to retain messages. Finally, the disconnect function disconnects from the broker.
While the preceding code fragment just prints the incoming messages, you could potentially use the messages in some other way (e.g., triggering some actions based on an alarm that the code has received).
You want to use ZeroMQ concurrently, but ZeroMQ sockets are not thread-safe. You could manually set up mutual exclusion via locks or other Java concurrency primitives, but you’d rather use a simpler method.
Use the zmq-async library to simplify concurrent usage of ZeroMQ via core.async.
In order to follow along with this recipe, your system should have ZeroMQ 3.2 installed.
If you’re on a Mac and have the Homebrew package manager installed, use this command to install it:
$ brew install zeromq
Or, if you are on Ubuntu:
$ apt-get install libzmq3
Otherwise, visit ØMQ’s downloads page.
Before starting, add [com.keminglabs/zmq-async "0.1.0"] to your project’s dependencies, or follow along in a REPL using lein-try:
$ lein try com.keminglabs/zmq-async
Here’s a simple ping-pong between two asynchronous go blocks in core.async, communicating via a ZeroMQ in-process socket:
(require '[com.keminglabs.zmq-async.core :refer [register-socket!]]
         '[clojure.core.async :refer [>! <! go chan sliding-buffer close!]])

(def addr "inproc://ping-pong")

(def server-in (chan (sliding-buffer 64)))
(def server-out (chan (sliding-buffer 64)))
(def client-in (chan (sliding-buffer 64)))
(def client-out (chan (sliding-buffer 64)))

(register-socket! {:in server-in
                   :out server-out
                   :socket-type :rep
                   :configurator (fn [socket] (.bind socket addr))})

(register-socket! {:in client-in
                   :out client-out
                   :socket-type :req
                   :configurator (fn [socket] (.connect socket addr))})

(do
  ;; A simple server worker that waits for incoming requests and
  ;; responds with "pong"
  (go
    (dotimes [_ 3]
      (println (String. (<! server-out)))
      (>! server-in "pong"))
    (close! server-in))

  ;; A simple client worker that sends a "ping" request and awaits
  ;; a response
  (go
    (dotimes [_ 3]
      (>! client-in "ping")
      (println (String. (<! client-out))))
    (close! client-in)))
;; *out*
;; ping
;; pong
;; ping
;; pong
;; ping
;; pong
ZeroMQ is a message-oriented socket system that supports many communication styles (request/reply, pub/sub, etc.) on top of many transport layers (intra-process, inter-process, inter-machine via TCP, etc.) with bindings to many languages. ZeroMQ sockets are a great substrate upon which to build service-oriented architectures. ZeroMQ sockets have less overhead than HTTP and are architecturally more flexible, supporting publish/subscribe, fanout, and other topologies in addition to request/reply.
However, ZeroMQ sockets are not thread-safe—concurrent usage
typically requires explicit locking or dedicated threads and queues.
The zmq-async
library handles all of that for you, creating ZeroMQ
sockets on your behalf and giving you access to them via thread-safe
core.async
channels.
The zmq-async
library provides one function,
com.keminglabs.zmq-async.core/register-socket!
, which associates a
ZeroMQ socket with either one or two core.async
channels: :in
(to which you can
write strings or byte arrays) and :out
(from which you can read
byte arrays). Writing a Clojure collection of strings and/or byte
arrays to a channel using >!
sends a multipart message. Received
multipart messages are placed on core.async
channels. Reading these
messages with <!
will yield a vector of byte arrays.
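As a small illustration of the receiving side, the following sketch decodes a multipart message into strings. The frames vector is built by hand to stand in for what <! would yield from an :out channel; frames->strings is a hypothetical helper name, not part of zmq-async, and UTF-8 is an assumed encoding:

```clojure
;; Hypothetical helper (not part of zmq-async): decode each frame of a
;; multipart message, assuming the frames hold UTF-8 text.
(defn frames->strings
  "Turn a vector of byte arrays into a vector of strings."
  [frames]
  (mapv #(String. ^bytes % "UTF-8") frames))

;; Hand-built stand-in for a received multipart message; in real code
;; this value would come from (<! some-out-channel).
(def frames [(.getBytes "topic" "UTF-8")
             (.getBytes "payload" "UTF-8")])

(frames->strings frames)
;; -> ["topic" "payload"]
```

Single-frame messages arrive as one byte array, which is why the ping-pong sample can call String. on the value directly.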
To simulate two asynchronous processes interacting over ZeroMQ, the
preceding sample uses two go blocks that read from and write to the
registered channels. Each go block begins executing immediately on a
background thread pool. The “server” block will wait for and reply to
three requests (<! parks the block until it receives a value), replying
with “pong” each time. Concurrently, the “client” block will make three
“ping” requests, awaiting a reply before moving on to the next
request. Finally, once each block has finished its three exchanges, it
closes its input channel using close!.
The register-socket!
function can be given an already-created ZeroMQ
socket, but typically you would have the library create a socket for
you by passing the :socket-type
and a :configurator
. The
configurator is a function that is passed the raw ZeroMQ socket
object. This function is run on the socket after it is created in
order to connect/bind addresses, set pub/sub subscriptions, and
otherwise configure the socket.
The implicit context supporting register-socket! can only
handle one incoming/outgoing message at a time. If you need sockets to
work in parallel (e.g., you don’t want to miss a small control message
just because you’re slurping in a 10 GB message on another socket),
then you’ll need multiple zmq-async contexts.
core.async
, which provides a good overview
Use Java interop to create an instance of java.net.Socket
and
connect to a remote host.
For example, the following code uses a Socket
to create a TCP
connection and send an HTTP GET request, returning the result as a
string:
(require '[clojure.java.io :as io])
(import '[java.io StringWriter]
        '[java.net Socket])

(defn send-request
  "Sends an HTTP GET request to the specified host, port, and path"
  [host port path]
  (with-open [sock (Socket. host port)
              writer (io/writer sock)
              reader (io/reader sock)
              response (StringWriter.)]
    ;; The request line must end with a line terminator; without one the
    ;; server keeps waiting for the rest of the request
    (.append writer (str "GET " path "\r\n"))
    (.flush writer)
    (io/copy reader response)
    (str response)))
This function obtains instances of java.io.Writer
and
java.io.Reader
to send and receive data to and from the remote server. By
appending strings that conform to the HTTP specification to the
writer, it forms a rudimentary HTTP client and executes a GET
request to the specified endpoint. The results are then copied into an
instance of java.io.StringWriter
using the clojure.java.io/copy
utility
function, and returned as a string.
Invoking (send-request "google.com" 80 "/")
at the REPL should
return a very long string, consisting of the entire HTTP response that
is the Google home page.
This example uses the clojure.java.io
namespace to obtain
instances of java.io.Writer
and java.io.Reader
to read and write
textual data to/from the network socket. In point of fact, Socket
instances are
not actually limited to textual data, and it would be possible to
obtain raw binary input and output streams just as easily using
clojure.java.io/input-stream
and clojure.java.io/output-stream
,
respectively. Since HTTP is a textual protocol, however, it makes more
sense to use the higher-level features of Reader
and Writer
.
This example uses HTTP because it’s a protocol that many readers are familiar with. In the real world, using a raw TCP socket for HTTP requests is almost certainly a terrible idea. There are a plethora of libraries that provide a much higher-level interface to HTTP requests and responses, and encapsulate a lot of pesky details such as escaping, encoding, and formatting.
Also note that the reader, the writer, and the socket itself are bound
within the context of a with-open
macro. This guarantees that the
close
method is called when they are finished, which releases the
TCP connection. If the connection is not released, it will continue to
consume resources on both the client and the server and may be subject to
termination on the remote side.
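To make that guarantee concrete, here is a minimal sketch that swaps the real socket, writer, and reader for stand-in Closeable objects. The tracked helper and the closed atom are hypothetical names used only for this illustration; they record when close is called, even though the body throws:

```clojure
(import '[java.io Closeable])

;; Records the order in which the stand-in resources are closed.
(def closed (atom []))

(defn tracked
  "Hypothetical helper: a Closeable that appends label to `closed`
  when its close method is called."
  [label]
  (reify Closeable
    (close [_] (swap! closed conj label))))

(try
  (with-open [sock   (tracked :socket)
              writer (tracked :writer)
              reader (tracked :reader)]
    ;; Simulate a failure partway through the body
    (throw (ex-info "simulated failure" {})))
  (catch clojure.lang.ExceptionInfo _
    :recovered))

@closed
;; -> [:reader :writer :socket]
```

Resources are closed in the reverse of their binding order, so the socket itself is released last.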
When returning lazy sequences from a with-open
context, it is
important to fully realize those sequences using doall
. This is
because resources opened by with-open
are only available inside
the with-open
block. The doall
function fully realizes a
collection, retaining its entire contents in memory:
;; map returns a lazy sequence, so nothing is computed up front
(realized? (map inc (range 100)))
;; -> false

(realized? (doall (map inc (range 100))))
;; -> true
Depending on your application, you may prefer to use the doseq
macro. Instead of retaining the entire sequence, doseq executes its
body for each element of the sequence. This is useful if you need to
cause side effects for each element of a sequence, but don’t need to
hang on to the entire thing:
(doseq [n (range 3)]
  (println n))
;; *out*
;; 0
;; 1
;; 2
Use Java interop on the java.net.ServerSocket
class to create a TCP
listener. Use the functions in clojure.java.io
to obtain input and
output streams (or readers and writers) to read and write data to the
socket:
(require '[clojure.java.io :as io])
(import '[java.net ServerSocket])

(defn receive
  "Read a line of textual data from the given socket"
  [socket]
  (.readLine (io/reader socket)))

(defn send
  "Send the given string message out over the given socket"
  [socket msg]
  (let [writer (io/writer socket)]
    (.write writer msg)
    (.flush writer)))

(defn serve [port handler]
  (with-open [server-sock (ServerSocket. port)
              sock (.accept server-sock)]
    (let [msg-in (receive sock)
          msg-out (handler msg-in)]
      (send sock msg-out))))
This code defines three functions. receive
and send
deal with
reading and writing string data from and to a socket, using the
clojure.java.io/reader
and clojure.java.io/writer
functions. Both
of these accept a java.net.Socket
as an argument and will return a
java.io.Reader
or java.io.Writer
built from the socket’s input and
output streams.
serve handles actually creating an instance of ServerSocket on a
particular port. It also takes a handler function, which will be used
to process the incoming request and determine a response message.
After creating an instance of ServerSocket, serve immediately
calls its accept method, which blocks until a TCP connection is
established. When a client connects, it returns the session as an
instance of java.net.Socket.
serve then passes the socket to the receive function, which opens up a
reader on it and blocks until it receives a full line of input,
terminated by a newline character (\n). When it receives one, serve
calls the handler function with the resulting value, and calls send
to send the response using a writer opened on the same socket. send
also calls the flush method on the writer to ensure that all the
data is actually sent back to the client, instead of being buffered in
the Writer instance.
After sending the response, the serve function returns. Because it
used the with-open macro when creating the server socket and the TCP
session socket, it will invoke the close method on each before
returning, which disconnects the client and ends the session.
To try it out, invoke the serve
function in the REPL. For a simple
example, use (serve 8888 #(.toUpperCase %))
. Note that it won’t return
right away; it blocks, waiting for a client to connect.
To connect to the server you can use a telnet client, which is installed by default on nearly every operating system. To use it, open up a command-line window:
$ telnet localhost 8888
Trying ::1...
Connected to localhost.
Escape character is '^]'.
At this point you can type anything you like (in the following example, the input is “Hello, World!”). When you finish, make sure you type Enter or Return to send a newline character:
$ telnet localhost 8888
Trying ::1...
Connected to localhost.
Escape character is '^]'.
Hello, World!
HELLO, WORLD!Connection closed by foreign host
As you can see, as soon as you type a newline, the server responds
with the uppercase version of your input (as per the handler
function) and then immediately terminates the connection. In the REPL,
you will find that the serve
function has finally returned.
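If telnet is not available, the same exchange can be driven from Clojure. The following is only a sketch: request-line is a hypothetical helper name, and the one-shot server in the middle is a stand-in for the recipe’s serve function, bound to port 0 (which asks the operating system for any free port) so that the example is self-contained:

```clojure
(require '[clojure.java.io :as io])
(import '[java.net ServerSocket Socket])

(defn request-line
  "Hypothetical client helper: connect to host and port, send line plus
  a newline, and return everything the server writes before it closes
  the connection."
  [host port line]
  (with-open [sock (Socket. ^String host (int port))]
    (let [writer (io/writer sock)]
      (.write writer (str line "\n"))
      (.flush writer)
      (slurp (io/reader sock)))))

;; One-shot stand-in for (serve port #(.toUpperCase %))
(def server-sock (ServerSocket. 0))

(def server
  (future
    (with-open [ss   server-sock
                sock (.accept ss)]
      (let [msg    (.readLine (io/reader sock))
            writer (io/writer sock)]
        (.write writer (.toUpperCase msg))
        (.flush writer)))))

(def reply
  (request-line "localhost" (.getLocalPort server-sock) "Hello, World!"))

reply
;; -> "HELLO, WORLD!"
```

Because the server closes the connection after one response, the client can simply read until end of stream instead of looking for a delimiter.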
This example uses readers and writers, which deal solely in textual data, to make the concepts of working with sockets easier to demonstrate. Of course, an actual socket is not limited to strings and can send and receive any kind of binary data.
To do this, simply use
the clojure.java.io/input-stream
and clojure.java.io/output-stream
functions instead of the clojure.java.io/reader
and
clojure.java.io/writer
functions, respectively, which return
java.io.InputStream
and java.io.OutputStream
objects. These
provide APIs for reading and writing raw bytes, rather than just
strings and characters.
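As a rough sketch of the binary variant, the following code echoes raw bytes instead of lines of text. echo-bytes-once is a hypothetical function name, the 16-byte buffer is an arbitrary choice, and port 0 is used so the operating system picks a free port:

```clojure
(require '[clojure.java.io :as io])
(import '[java.io InputStream OutputStream]
        '[java.net ServerSocket Socket])

(defn echo-bytes-once
  "Accept one connection, read up to 16 raw bytes, and write them back
  unchanged."
  [^ServerSocket server-sock]
  (with-open [sock (.accept server-sock)]
    (let [^InputStream in   (io/input-stream sock)
          ^OutputStream out (io/output-stream sock)
          buf (byte-array 16)
          n   (.read in buf)]
      (when (pos? n)
        (.write out buf 0 n)
        (.flush out)))))

(def server-sock (ServerSocket. 0))
(def server (future (with-open [ss server-sock] (echo-bytes-once ss))))

(def reply
  (with-open [sock (Socket. "localhost" (.getLocalPort server-sock))]
    (let [^OutputStream out (io/output-stream sock)
          ^InputStream in   (io/input-stream sock)
          payload (byte-array [(byte 0) (byte 127) (byte -1)])
          buf     (byte-array 16)]
      (.write out ^bytes payload)
      (.flush out)
      ;; Keep only the bytes that were actually read
      (vec (take (.read in buf) buf)))))

reply
;; -> [0 127 -1]
```

A single read is enough for a three-byte message on the loopback interface, but a real protocol would need to loop until it has read a complete message.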
One thing you may have noticed about the example is that, unlike a
traditional server, it doesn’t actually continue to accept incoming
connections after the serve
function returns. For ongoing use,
typically you’d like to be able to serve multiple incoming
connections.
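Fortunately, this is relatively straightforward to do given the
concurrency tools that Clojure provides. Modifying the serve
function to work as a persistent server requires three changes:
running the server on a separate thread so that it doesn’t block the
REPL, keeping the server socket open after the first request is
handled, and looping back to accept another connection after each
request.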
Also, because the server will be running on a non-REPL thread, it would be good to provide a mechanism for terminating the server other than killing the whole JVM.
The modified code looks like this:
(defn serve-persistent [port handler]
  (let [running (atom true)]
    (future
      (with-open [server-sock (ServerSocket. port)]
        (while @running
          (with-open [sock (.accept server-sock)]
            (let [msg-in (receive sock)
                  msg-out (handler msg-in)]
              (send sock msg-out))))))
    running))
The key feature of this code is that it launches the server socket
asynchronously inside a future and calls the accept
method inside
of a loop. It also creates an atom called running
and returns it,
checking it each time it loops. To stop the server, reset the atom to
false
, and the loop will break:
(def a (serve-persistent 8888 #(.toUpperCase %)))
;; -> #'my-server/a
;; Server is running, will respond to multiple requests

(reset! a false)
;; -> false
;; Server is stopped, will stop serving requests after the next one
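As the last comment notes, the loop re-checks the atom only after accept returns, so one more connection is served after the reset. One possible alternative, sketched here and not taken from the recipe, is to return the ServerSocket and stop the server by closing it, which makes a blocked accept call throw a SocketException. serve-until-closed is a hypothetical name, and port 0 again means “any free port”:

```clojure
(require '[clojure.java.io :as io])
(import '[java.net ServerSocket Socket SocketException])

(defn serve-until-closed
  "Start an accept loop in a future. Returns a map holding the
  ServerSocket and the future; closing the socket ends the loop."
  [port handler]
  (let [server-sock (ServerSocket. (int port))
        task (future
               (try
                 (while true
                   (with-open [sock (.accept server-sock)]
                     (let [msg-in (.readLine (io/reader sock))
                           writer (io/writer sock)]
                       (.write writer (str (handler msg-in)))
                       (.flush writer))))
                 (catch SocketException e
                   ;; Closing the server socket is the expected way out;
                   ;; any other socket failure is rethrown
                   (if (.isClosed server-sock) :stopped (throw e)))))]
    {:server-sock server-sock :task task}))

(def server (serve-until-closed 0 #(.toUpperCase %)))

(def reply
  (with-open [sock (Socket. "localhost"
                            (.getLocalPort (:server-sock server)))]
    (let [writer (io/writer sock)]
      (.write writer "ping\n")
      (.flush writer)
      (slurp (io/reader sock)))))

reply
;; -> "PING"

(.close (:server-sock server))
(deref (:task server) 2000 :timed-out)
;; -> :stopped
```

This sketch still handles one connection at a time, just like serve-persistent.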
ServerSocket
and Socket
objects in Java
clojure.java.io
namespace
Use Java interop with the java.net.DatagramSocket
and
java.net.DatagramPacket
classes to send and receive UDP messages.
The following example demonstrates functions that send and receive short strings encoded into UDP packets:
(import '[java.net DatagramSocket DatagramPacket InetSocketAddress])

(defn send
  "Send a short textual message over a DatagramSocket to the specified
  host and port. If the string is over 512 bytes long, it will be
  truncated."
  [^DatagramSocket socket msg host port]
  (let [payload (.getBytes msg)
        length (min (alength payload) 512)
        address (InetSocketAddress. host port)
        packet (DatagramPacket. payload length address)]
    (.send socket packet)))

(defn receive
  "Block until a UDP message is received on the given DatagramSocket, and
  return the payload message as a string."
  [^DatagramSocket socket]
  (let [buffer (byte-array 512)
        packet (DatagramPacket. buffer 512)]
    (.receive socket packet)
    (String. (.getData packet) 0 (.getLength packet))))

(defn receive-loop
  "Given a function and DatagramSocket, will (in another thread) wait
  for the socket to receive a message, and whenever it does, will call
  the provided function on the incoming message."
  [socket f]
  (future (while true (f (receive socket)))))
The send
function is fairly straightforward—most of its content is
devoted to constructing a byte array as a payload for the
DatagramPacket
and invoking constructor forms. The most interesting
thing is its limitation of the payload size to 512 bytes, using the
length
argument to the DatagramPacket
constructor. This is because
it generally isn’t safe to attempt to send over 512 bytes of payload
in a single UDP packet; although some network infrastructures may
support it, others do not.
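Note that the limit is counted in bytes, not characters. The following sketch isolates the truncation step; truncated-payload is a hypothetical helper name, and the explicit UTF-8 encoding is an assumption (the recipe’s send uses the platform default encoding):

```clojure
(defn truncated-payload
  "Hypothetical helper: encode msg as UTF-8 and keep at most max-bytes
  bytes of the result."
  [^String msg max-bytes]
  (let [payload (.getBytes msg "UTF-8")
        length  (min (alength payload) max-bytes)]
    (java.util.Arrays/copyOf payload (int length))))

(alength (truncated-payload "hello" 512))
;; -> 5

(alength (truncated-payload (apply str (repeat 600 "x")) 512))
;; -> 512

;; Length is measured in bytes: "\u00e9" (e with an acute accent) is
;; one character but two bytes in UTF-8
(alength (.getBytes "\u00e9" "UTF-8"))
;; -> 2
```

A cut at a fixed byte count can therefore land in the middle of a multibyte character, which is worth keeping in mind for non-ASCII messages.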
The receive
function creates an incoming byte array, adds it to a
mutable empty DatagramPacket
instance, and invokes the DatagramSocket.receive
method on the socket. When incoming data is received, the receive
method will return after populating the instance of
DatagramPacket
. The Clojure code then constructs and returns a new
String
using the populated range of the byte array (that is, between
0 and the value reported by the DatagramPacket.getLength
method).
Because the receive
function blocks and only returns a single value,
it isn’t particularly useful for accepting multiple messages or using
from the REPL. receive-loop
wraps the receive
function, calling it
repeatedly on a separate thread. Whenever it returns a value, it invokes
the supplied function, then loops back to wait for more
input.
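Because receive-loop runs forever, it can be handy to have a variant that can be stopped. The following sketch is one possible approach, not part of the recipe: receive-loop-until is a hypothetical name, and the 100 ms socket timeout is an arbitrary value that lets the loop re-check a running atom even when no packets arrive:

```clojure
(import '[java.net DatagramPacket DatagramSocket InetSocketAddress
          SocketTimeoutException])

(defn receive-loop-until
  "Like receive-loop, but exits once the running atom is false."
  [^DatagramSocket socket running f]
  ;; Make .receive give up after 100 ms so the flag is re-checked
  (.setSoTimeout socket 100)
  (future
    (while @running
      (try
        (let [buffer (byte-array 512)
              packet (DatagramPacket. buffer 512)]
          (.receive socket packet)
          (f (String. (.getData packet) 0 (.getLength packet))))
        (catch SocketTimeoutException _ nil)))
    :stopped))

(def running (atom true))
(def received (promise))
(def socket (DatagramSocket. 0))  ; port 0: any free port
(def listener (receive-loop-until socket running #(deliver received %)))

(let [payload (.getBytes "hello, world!")
      address (InetSocketAddress. "localhost" (.getLocalPort socket))]
  (.send socket (DatagramPacket. payload (alength payload) address)))

(deref received 2000 :timed-out)
;; -> "hello, world!"

(reset! running false)
(deref listener 2000 :timed-out)
;; -> :stopped

(.close socket)
```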
To execute this code, you’ll first need to create an instance of
DatagramSocket
. At the REPL:
(def socket (DatagramSocket. 8888))
;; -> #'udp/socket
This creates a UDP socket on the specified port (in this case, 8888).
Next, start up a listener using the receive-loop
function. For this
example, simply pass it the println
function so it will print out
all received values:
(receive-loop socket println)
;; -> #<core$future_call$reify__6267@2783890e: :pending>
Then you can send a message! If you started the listener thread with
receive-loop
properly, you should see it print out the incoming
message immediately:
(send socket "hello, world!" "localhost" 8888)
;; *out*
;; hello, world!
;;
;; -> nil
In this case, sending to localhost, the message transmission happens
so quickly that the message is actually received before the send
function even returns.
Unlike TCP, UDP (the User Datagram Protocol) is a connectionless, asynchronous protocol that makes no guarantees regarding the order in which messages arrive or even whether they arrive at all; its only integrity check is a simple checksum. In exchange, UDP typically has a lower per-packet latency than protocols like TCP, since it performs no connection setup, acknowledgment, or retransmission.
Before you decide to use UDP, make sure your application is designed to continue working even if packets are dropped or corrupted.
Because UDP uses asynchronous messages as its model, it is fairly easy
to use core.async
to wrap the raw DatagramSocket
instances. core.async
provides a very nice channel abstraction that lets you consume and
produce inherently asynchronous events (such as UDP messages) in a
clean, managed way.
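The general shape of such a wrapper can be sketched without adding the core.async dependency by using a java.util.concurrent.LinkedBlockingQueue as a stand-in for a channel. This is a swapped-in technique, not core.async itself: a background thread puts each incoming message on the queue, and consumers take from it at their own pace. datagram-queue is a hypothetical name:

```clojure
(import '[java.net DatagramPacket DatagramSocket InetSocketAddress
          SocketException]
        '[java.util.concurrent LinkedBlockingQueue TimeUnit])

(defn datagram-queue
  "Start a background thread that puts every message received on socket
  onto a queue, and return the queue. The thread ends when the socket
  is closed."
  [^DatagramSocket socket]
  (let [queue (LinkedBlockingQueue.)]
    (future
      (try
        (while true
          (let [packet (DatagramPacket. (byte-array 512) 512)]
            (.receive socket packet)
            (.put queue (String. (.getData packet) 0 (.getLength packet)))))
        (catch SocketException _
          ;; .receive throws once the socket has been closed
          :socket-closed)))
    queue))

(def socket (DatagramSocket. 0))  ; port 0: any free port
(def messages (datagram-queue socket))

(let [payload (.getBytes "hello, world!")
      address (InetSocketAddress. "localhost" (.getLocalPort socket))]
  (.send socket (DatagramPacket. payload (alength payload) address)))

;; Wait up to two seconds for the next message
(def message (.poll messages 2 TimeUnit/SECONDS))

message
;; -> "hello, world!"

(.close socket)
```

With core.async, the queue would become a channel and the blocking put and poll calls would become channel operations.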
UDP is also capable of sending the same datagram packet to multiple
destinations using a technique called UDP multicast. To use
multicast, create an instance of java.net.MulticastSocket
instead of
java.net.DatagramSocket
.
MulticastSocket is well documented on Oracle’s website, so a full
explanation would be redundant here; using it is straightforward Java
interop. After reading the preceding example, extending it to
MulticastSocket should be relatively self-explanatory.