To write a fault-tolerant system, you need at least two computers[27] and you need to distribute your program across them. Distributed systems lie at the heart of modern computing. In server-side programming, it is the exception rather than the rule to see a single computer performing a task of any difficulty; instead, a number of computers (or processors) will together provide a robust, efficient, and scalable platform upon which applications can be built.
Erlang distribution is built into the language, and from the user’s point of view, it can be completely transparent: processes are accessed by a pid, and this may equally well refer to a process on the local computer or a process on a system on the other side of the world. In this chapter, we will look at the theory behind distributed systems and see how it is applied to Erlang-based systems.
The essence of distributed systems is to provide, in a transparent way, a service of some kind through a number of computers, processors, or cores linked together by a network. A service can be specific, such as the storage provided by a distributed filesystem or database, or more general, as in a distributed operating system that provides all the facilities of a general-purpose OS across a network of computers. Distribution can be seen in tightly coupled parallel processors, but more clearly in the loosely coupled grids of e-science systems. Erlang provides distributed programming facilities so that Erlang systems can be run across networked Erlang nodes.
Take an installation of Ejabberd, an open source, Jabber-based instant messaging (IM) server written in Erlang. Its implementation is distributed across a cluster of two or more Erlang nodes. These nodes, residing on the same or separate machines, help each other by sharing the message and event loads. Should one of the nodes terminate because of a software or hardware error, or simply because of lack of memory, the other nodes take over the traffic, hiding the fault from the end user. In the worst case, end users might believe they experienced a network glitch when the socket reconnects to the new node, but all they would notice are other users signing out and in.
The Erlang Web framework, an open source application for Erlang-based web applications, uses distribution for scalability and reliability. A typical cluster consists of frontend and backend nodes. The frontend nodes contain the web servers (running in the Erlang node), a cache layer, and a layer handling XML parsing for inbound requests. They also contain the functionality for dynamically generating XHTML. Two or more backend nodes contain the databases and all of the glue and logic needed to generate the dynamic content. The real load will be on the frontend, as it handles the socket connections and most of the parsing. To scale the system, all you need to do is add more hardware and frontend nodes, increasing the backend support only when necessary. Should any of the nodes fail, the load balancers will automatically redirect the traffic to the nodes that are still alive.
If you want to scale a system in Erlang by distributing functionality across a number of nodes, one thing you need to consider is how the load might be balanced across the nodes. It would be possible to allocate tasks to nodes at random or to use a round-robin approach; either approach works well with tasks of a similar size. Otherwise, you need to estimate the size of the tasks to be distributed. Finally, you could use a master-slave model where tasks are delegated as required.
Whatever approach you use, it is crucial to monitor the system behavior and to adapt the distribution strategy—either in real time or via code updates—to respond to the system’s changing requirements.
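The two simplest allocation strategies can each be written in a few lines. This is a minimal sketch. The module name balance and its functions are hypothetical and not part of any library. The sketch assumes the caller keeps a task counter and a list of available node names, and it makes no attempt to account for task size.

```erlang
%% Hypothetical helper module: two naive ways of choosing a node for a task.
-module(balance).
-export([round_robin/2, random_node/1]).

%% Round-robin: task number N (counting from 0) goes to position
%% (N rem length(Nodes)) + 1 in the list, so tasks cycle through the nodes.
round_robin(Nodes, N) when is_list(Nodes), Nodes =/= [], is_integer(N), N >= 0 ->
    lists:nth((N rem length(Nodes)) + 1, Nodes).

%% Random: every node is chosen with equal probability
%% (rand:uniform(K) returns an integer in 1..K).
random_node(Nodes) when is_list(Nodes), Nodes =/= [] ->
    lists:nth(rand:uniform(length(Nodes)), Nodes).
```

A caller could then start task N with spawn(balance:round_robin(Nodes, N), Module, Function, Args). Both strategies spread work evenly only when tasks are of similar size, which is why estimating task sizes becomes necessary otherwise.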
Another example of distributed systems is in one of the first flagship Erlang products, the AXD301 ATM switch. The smallest Erlang cluster consists of two nodes, a call setup node and an operation and maintenance (O&M) node. If the O&M node fails, a failover occurs, and the O&M applications are restarted on the call setup node. When the O&M node comes back up, through automated recovery or manual intervention, a takeover occurs, and the O&M applications are migrated back to the original node.
Failures in the call setup node are considered critical, as they affect ATM traffic. If a call setup node terminates, a failover would move the call setup applications to the O&M node. Data distribution ensures that any calls whose setups were initiated before the failover are not lost. They are picked up by the new call setup application running on the O&M node. When the original call setup node is restarted, to ensure that traffic is not disrupted and no call setup requests are lost, a takeover of the O&M applications results in the O&M functionality being migrated to the newly restarted node and the call setup functionality remaining on what was formerly the O&M node.
Concurrency is central to all distributed systems, since computation and communication can proceed in parallel across the processors and networks comprising the system. Central to the challenges of distributed systems is robustness in the event of failure. This is memorably summarized by one of the pioneers in the field, Leslie Lamport:
A distributed system is one in which the failure of a computer you didn’t even know existed can render your own computer unusable.
But if you get it right, there are a number of advantages in building a distributed system:
It will provide performance that can be scaled with demand. A typical example here is a web server: if you are planning a new release of a piece of software, or you are planning to stream video of a football match in real time, distributing the server across a number of machines will make it possible to handle the peak load without the service failing.
This performance is given by replication of a service—in this case a web server—which is often found in the architecture of a distributed system.
Replication also provides fault tolerance: if one of the replicated web servers fails or becomes unavailable for some reason, HTTP requests can still be served by the other servers, albeit at a slower rate. This fault tolerance allows the system to be more robust and reliable.
Distribution allows transparent access to remote resources, and building on this, it is possible to federate a collection of different systems to provide an overall user service. Such a collection of facilities is provided by modern e-commerce systems, such as the Amazon.com website.
Finally, distributed system architecture makes a system extensible, with other services becoming available through remote access.
Telecom systems need to reflect all of this. But looking at the bigger picture, they are not the only ones. Trading systems, retail banking systems, air and railway traffic control systems, and web services are just some of the areas where highly transactional, mission-critical systems have benefited from Erlang distribution.
An Erlang node is an executing Erlang runtime system that has been given a name. Multiple nodes can run on a single host, but they can also run on different host computers, as shown in Figure 11-1, where three nodes are running on the hosts STC and FCC in their respective subnetworks.
As a first example, we’ll look at two nodes running on the same machine, STC, as shown in Figure 11-2. To run a node, the erl command needs to be given the -sname flag (the -name flag can also be used; we’ll discuss this shortly). For example:
erl -sname foo
Erlang (BEAM) emulator version 5.6.3 [source] [64-bit] [smp:2]
[async-threads:0] [kernel-poll:false]
Eshell V5.6.3 (abort with ^G)
(foo@STC)1>
Note that the prompt in the Erlang shell displays the name of the node, as well as its host computer: foo@STC. This is called the (unique) identifier of the node.
A similar command, erl -sname bar, will set up a second node on the STC system.
To understand what happens next, you need to look at the module dist.erl. This contains the function t:
t(From) -> From ! node().
The function takes as a parameter a pid, From, and in its single action it sends a message to the process with that pid. The message is the result of calling node(), which returns the identifier of the node where it is called.
Next, we’ll look at the user input typed at foo@STC. This uses the spawn/4 function, whose first argument is the node where the spawn should take place. The remaining arguments are as for spawn/3: the module, function, and initial arguments. The effect of this is as follows:
The process is spawned at the node bar@STC and starts executing function t with the pid of the shell running at foo@STC as the argument.
The effect of this is to send the value of node(), the identifier of the current node, which here will be bar@STC, to the pid; t then terminates.
This can be tested by flushing the remaining messages at foo@STC, which shows that it has been sent the identifier bar@STC.
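The spawn-and-reply pattern above can be wrapped in a function that checks whether code spawned on a node really runs there and answers. This is a minimal sketch. The module name where_demo and the function runs_on/2 are hypothetical. t/1 is repeated from dist.erl so that the module is self-contained, and where_demo itself must be loadable on the target node.

```erlang
-module(where_demo).
-export([t/1, runs_on/2]).

%% As in dist.erl: send the identifier of the node this runs on to From.
t(From) -> From ! node().

%% Spawn t/1 on Node and wait up to Timeout milliseconds for the reply.
%% Node is already bound, so the receive matches only a message equal to
%% Node's identifier and leaves any other messages in the mailbox.
runs_on(Node, Timeout) ->
    spawn(Node, ?MODULE, t, [self()]),
    receive
        Node -> ok
    after Timeout ->
        {error, timeout}
    end.
```

With the two nodes of Figure 11-2 running, where_demo:runs_on('bar@STC', 1000) evaluated at foo@STC should return ok. If the node cannot be reached or the module is missing there, it should return {error, timeout}.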
This example shows the transparency of communication. The command to send the message between the two nodes has exactly the same form as in the nondistributed case: Pid ! Message. Moreover, messages sent from one process to another will be delivered in the same order in which they were sent, whether or not the two processes are on the same node. The only difference between this and the nondistributed case is that a remote node may become unavailable.
Sending a message to a named process is different from the nondistributed case, in that naming is local to each node. So, to send a message to the process named frequency at the node foo@STC, the form {frequency, foo@STC} ! Message is used.
As we said before, a node is an executing Erlang system, and a node is said to be alive if it can communicate with other nodes; this is another way of saying that the node is named, and so can take part in communication.
The function erlang:is_alive() tests whether the local runtime system is alive. As you can see from the following example, it is possible to change the live status of a running runtime system using functions from the net_kernel module, and to find out the name of the current node using the node/0 BIF. Try it out:
1> erlang:is_alive().
false
2> net_kernel:start([foo]).
{ok,<0.33.0>}
([email protected])3> erlang:is_alive().
true
([email protected])4> node().
'[email protected]'
([email protected])5> net_kernel:stop().
ok
6> erlang:is_alive().
false
Each live node has to be named: these names must be unique on that host, but can be duplicated across different hosts. The name/host pair, called the identifier of the node, is used to uniquely identify the node in the network.
Names take two forms. You already saw the first one; the second one is new:
erl -sname foo ...
The -sname flag gives the node a short name of the form name@host, where host is the short name of a host on the local network (e.g., foo@STC).
erl -name foo ...
The -name flag gives the node a long name, in which the host part is the fully qualified host name or IP address of the host: this could be foo@server.kent.ac.uk, or (on the local network) foo@192.168.1.11. As you can see in command 4 of the preceding example, using the net_kernel:start function to start a distributed node results in the node being given a long name.
Nodes with long names can communicate only with other nodes with long names; similarly for nodes with short names.
To use hostnames such as server.kent.ac.uk, rather than raw IP addresses such as 192.168.1.11, it is necessary to resolve hostnames to IP addresses. A Domain Name System (DNS) server does this, but without access to a DNS server, names can be resolved locally using information contained in a hosts file. The details of how to do this vary across platforms, so consult the documentation for your particular operating system.
For two nodes to communicate, not only must both of them be alive, but also they must share some information contained in an atom called the secret cookie. Each node has a single cookie value at any time, and nodes sharing the same value can communicate.
Each node can be started with an explicit cookie value, as in the following:
erl -sname foo -setcookie blah
If no value is set on launch, the Erlang runtime system will pick up the value stored in the file .erlang.cookie. If the file does not exist, it will be created in the home directory of the user’s account. A randomly generated secret cookie value will be stored in it. As a result, nodes created on the same user account will share the same cookie value by default. If you have been experimenting with distributed Erlang without setting a cookie, look for the .erlang.cookie file. You can edit it to whatever value you want.
To show secret cookies and distribution in action, we have repeated the example from Figure 11-2 with nodes running on separate host computers on the same network; this is shown in Figure 11-3.
Note in Figure 11-3 how the two nodes are explicitly started with the same cookie value; this value will override any values contained in an .erlang.cookie file on either host.
Your first attempt to distribute the Erlang code may fail: why? The following call:
(foo@STC)1> spawn('bar@FCC', dist, t, [self()]).
is on the node foo at host STC, but the spawned code is executed on FCC. The module dist.erl will not be executable on FCC unless it is there, as it will not be transferred from STC to FCC automatically.
Moreover, the call will still fail unless a compiled version of the code is in the code search path of the remote Erlang node on host FCC.
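One way to get the compiled code onto the remote node is to ship the object code from the local node and load it there. The sketch below makes several assumptions: the module's .beam file is in the local code path, and the nodes are connected and share a cookie. The module name code_push is hypothetical. It uses code:get_object_code/1, code:load_binary/3, and rpc:call/4 (described later in this chapter). The shell command nl(Module) does much the same for all connected nodes.

```erlang
-module(code_push).
-export([push/2]).

%% Load the locally available object code for Mod on every node in Nodes.
%% Returns a list of {Node, Result} pairs, where Result is {module, Mod},
%% {error, What} from code:load_binary/3, or {badrpc, Reason} from rpc:call/4.
push(Mod, Nodes) ->
    case code:get_object_code(Mod) of
        {Mod, Bin, File} ->
            [{Node, rpc:call(Node, code, load_binary, [Mod, File, Bin])}
             || Node <- Nodes];
        error ->
            %% No .beam file for Mod in the local code path.
            {error, {no_object_code, Mod}}
    end.
```

For the example above, code_push:push(dist, ['bar@FCC']) evaluated on foo@STC would be expected to return [{'bar@FCC', {module, dist}}] on success.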
The most elementary communication is for one node to test whether it can communicate with another, a process informally known as pinging the node (see Figure 11-4).
Figure 11-4 shows two nodes initialized with different cookie values, which will prevent their communication. The foo node attempts to communicate using net_adm:ping/1: the pang response shows that this fails. The pinged node also gives an error report to signal the connection attempt, by way of warning that a potential security problem has occurred. After changing the cookie for foo to cake, the ping is successful, indicated by the pong result; such a successful attempt is not signaled at the bar node.
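Because net_adm:ping/1 returns pong or pang, it is easy to check a whole list of nodes at once, for example when a system starts up. This is a minimal sketch. The module name ping_all is hypothetical. Note that each successful ping also sets up a connection to that node.

```erlang
-module(ping_all).
-export([partition/1]).

%% Returns {Reachable, Unreachable}. net_adm:ping/1 answers pong when a
%% connection can be established (node running, same cookie) and pang otherwise.
partition(Nodes) ->
    lists:partition(fun(Node) -> net_adm:ping(Node) =:= pong end, Nodes).
```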
Next, we’ll look at an example in which a call to spawn/4 registers a process on a remote node, and then communicates with it; this is illustrated in Figure 11-5.
The first command in the foo node spawns the dist:s/0 process on the remote node. This process registers itself under the name server and then runs the loop loop/0, which repeatedly receives messages of the form {M, Pid} and returns the message M to Pid. In the second command at foo, the message hi is sent together with the pid of the shell running at foo. This is received by server, and the hi message is returned to foo; you can see it in the mailbox of foo when the mailbox is emptied with flush().
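The dist module is not listed in full in this section. The following is a sketch reconstructed from the descriptions of t/1, s/0, and loop/0, and the code in the figures may differ in detail. s/0 registers the calling process as server and then loops forever, echoing M back to Pid for every {M, Pid} it receives.

```erlang
-module(dist).
-export([t/1, s/0]).

%% Send the identifier of the node this runs on to From.
t(From) -> From ! node().

%% Register the calling process as server, then enter the echo loop.
s() ->
    register(server, self()),
    loop().

%% On {M, Pid}, send M back to Pid, then wait for the next message.
loop() ->
    receive
        {M, Pid} -> Pid ! M
    end,
    loop().
```

A session along the lines of Figure 11-5 would evaluate spawn('bar@STC', dist, s, []) at foo, then {server, 'bar@STC'} ! {hi, self()}, and finally flush().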
Distributed Erlang nodes are able to communicate with each other, provided that they share the same cookie information, but you haven’t seen how the connections between the nodes are set up. That is because the Erlang runtime system sets up a connection to a node automatically when the node is first referred to. This might be through a net_adm:ping/1 call or by sending a message to a registered process on it. Information about nodes is, by default, shared between connected nodes, so that if A knows about B, and B about C, then A will also find out about C.
Each node has a cookie value at any one time. Security in distributed Erlang is based on sharing cookie information: what happens when cookie values are changed? The following interaction shows this in action:
(foo@STC)1> net_adm:ping('bar@STC').
pang
(foo@STC)2> erlang:set_cookie(node(),cake).
true
(foo@STC)3> net_adm:ping('bar@STC').
pong
(foo@STC)4> erlang:set_cookie(node(),fish).
true
(foo@STC)5> net_adm:ping('bar@STC').
pong
The nodes foo and bar initially have different cookies, hence the negative reply to the first command. In command 2, the cookie of foo is set to that of bar, and so in command 3 the ping is successful, because a connection can be established. In command 4, the cookie is changed back to its original value, but command 5 shows that the two nodes remain connected despite the fact that they now have different cookies.
This “inclusive” model of connection may not be what is required, and using the facilities of the net_kernel module it is possible to control connections by hand. It is also possible to start nodes with the erl flag -connect_all false, which stops a node from automatically connecting to every node that its peers are connected to.
The net_kernel process at each node coordinates operations at a distributed Erlang node. BIFs such as spawn/4 are converted by the net_kernel to messages that are sent to the net_kernel on the remote node. The net_kernel process also handles authentication by cookies. Because the net_kernel is simply another Erlang process, it is possible for a user to modify it to provide a different behavior, such as changing the authentication scheme or not allowing processes from another node to spawn processes.
Even the most security-unconscious readers will have realized that basing your security on secret cookies alone is not very reliable. Because telecom clusters tend to run behind firewalls, improving the security of the distribution model has never been a priority. In the early days, cookies were in fact sent across the network unencrypted!
Considering the low level of security in distributed Erlang, how can you build a secure distributed system in Erlang? There are two answers to this question:
If you are building a distributed system for scalability and robustness, it’s likely that you are working in a closed and secure network environment. In this case, the Erlang distribution model directly supports what you require in a transparent and effective way.
If you want to build a geographically distributed system, it is best to communicate between nodes using existing secure mechanisms, such as SSL over TCP/IP. Erlang/OTP has library support for many protocols, including secure protocols such as SSL. We’ll cover the fundamentals of how to communicate using TCP/IP in Erlang in Chapter 15.
You can enhance security by writing your own net_kernel process, giving the process whatever behavior and level of security you might require.
When nodes get connected, they start monitoring each other, creating a fully meshed network. If you have four Erlang nodes, the fully meshed network would result in six TCP/IP connections among the nodes. Using the formula N * (N - 1) / 2, we quickly compute that 10 nodes require 45 connections. Not only would this result in the overhead of monitoring messages being sent among the nodes, but we might not have wanted to connect all of these nodes to each other in the first place. Going up to 100 or more nodes makes the situation even worse (100 nodes need 4,950 connections), especially if many of these nodes have no relation to each other.
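A one-line function makes it plain that the number of connections grows quadratically with the number of nodes. The module name mesh is hypothetical.

```erlang
-module(mesh).
-export([connections/1]).

%% Each of the N nodes connects to the N - 1 others, and every connection
%% is shared by two nodes, so a full mesh has N * (N - 1) / 2 connections.
connections(N) when is_integer(N), N >= 0 ->
    N * (N - 1) div 2.
```

mesh:connections(4) gives 6, mesh:connections(10) gives 45, and mesh:connections(100) gives 4950.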
The solution is to use hidden nodes and explicitly set up the connections where necessary. You start a hidden node by starting Erlang with the following:
erl -sname foo -hidden
Once started, connect to other nodes using the net_kernel:connect(NodeName) call. The nodes/0 BIF will not return any hidden nodes. To view them, you have to call nodes/1 with an atom as the argument: calling nodes(hidden) will list the hidden nodes that you are connected to, and nodes(connected) will give an aggregated list of all nodes, both hidden and not hidden.
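A small helper can collect the different views that nodes/0 and nodes/1 provide, which can be handy on an operation and maintenance node. This is a sketch. The module name topology is hypothetical, and the map syntax assumes Erlang/OTP 17 or later.

```erlang
-module(topology).
-export([summary/0]).

%% This node's view of the cluster, as reported by node/0, nodes/0 and nodes/1.
summary() ->
    #{this      => node(),
      visible   => nodes(),             % hidden nodes are not included
      hidden    => nodes(hidden),       % hidden connections only
      connected => nodes(connected)}.   % hidden and visible together
```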
In the following example, we start three nodes, naming them alpha, beta, and gamma, where gamma is a hidden node. Having started them, we connect to beta@STC and gamma@STC from alpha@STC. The result is as follows:
(alpha@STC)1> net_kernel:connect('beta@STC').
true
(alpha@STC)2> net_kernel:connect('gamma@STC').
true
(alpha@STC)3> nodes().
['beta@STC']
(alpha@STC)4> nodes(hidden).
['gamma@STC']
(alpha@STC)5> nodes(connected).
['beta@STC', 'gamma@STC']
Once we’ve executed all of the commands in alpha, we can inspect how the node connectivity has spread to beta and gamma. Let’s have a look:
UNIXSHELL> erl -sname beta
Erlang (BEAM) emulator version 5.5
Eshell V5.5  (abort with ^G)
(beta@STC)1> nodes().
['alpha@STC']
(beta@STC)2> nodes(connected).
['alpha@STC']
As you can see in the preceding output, the hidden node gamma does not appear. In gamma itself, no nodes are visible unless you view them with the hidden or connected flag. When doing so, the only node listed is alpha, as the information on beta was not spread when the connection was established:
UNIXSHELL> erl -sname gamma -hidden
Erlang (BEAM) emulator version 5.5
Eshell V5.5  (abort with ^G)
(gamma@STC)1> nodes().
[]
(gamma@STC)2> nodes(hidden).
['alpha@STC']
Hidden nodes can be used as gateways connecting smaller distributed clusters together. It is a technique which allows hundreds of nodes to be loosely connected in a grid, without the overhead of them having to monitor each other. Hidden nodes are also commonly used for operation and maintenance, as well as for trace nodes, where the node does not carry any traffic, and is not required for the system to operate, but has to be able to retrieve information and interact with remote nodes.
The classic construct in distributed computing is the remote procedure call (RPC) in which a local call to a procedure is replaced by a call to the same procedure running on a remote node. RPC is simple to implement in Erlang, and moreover, the Erlang implementation avoids many of the pitfalls of RPC in other languages.
In the basic Erlang implementation of RPC, the following (local) function call,
Val = fac(N)
is replaced by a message to send and receive, as shown here and illustrated in Figure 11-6:
remote_call(Message, Node) ->
    {facserver, Node} ! {self(), Message},
    receive
        {ok, Res} -> Res
    end.
In the following code, the facserver process is on the bar@STC node and runs in the facLoop/0 loop function:
server() ->
    register(facserver, self()),
    facLoop().

facLoop() ->
    receive
        {Pid, N} -> Pid ! {ok, fac(N)}
    end,
    facLoop().
The main difference between a local and a remote call is the fact that a remote node may go down. You can deal with this in a number of different ways. For example, you can add a timeout in the client code:
remote_call(Message, Node) ->
    {facserver, Node} ! {self(), Message},
    receive
        {ok, Res} -> Res
    after 1000 ->
        {error, timeout}
    end.
If no reply is received within one second, the tuple {error, timeout} is returned. You have to be careful when using timeouts, as the reply might still arrive after the timeout and be stored in the process mailbox: the remote server might be extremely busy, or the network may be highly congested. If you do not flush the message, the next time remote_call/2 is invoked and a new request is sent to the factorial server, you’ll end up retrieving the first message in the queue, which contains the reply to the previous call.
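A common remedy, not used in the text, is to tag each request with a unique reference created by make_ref/0 and to accept only a reply that carries that reference. The sketch below changes the message protocol to {Pid, Ref, N} and {ok, Ref, Result}. The module name facrpc is hypothetical, and fac/1 is the usual factorial.

```erlang
-module(facrpc).
-export([server/0, remote_call/2, fac/1]).

%% Server side: as facLoop/0 in the text, but echoing the request's Ref.
server() ->
    register(facserver, self()),
    fac_loop().

fac_loop() ->
    receive
        {Pid, Ref, N} -> Pid ! {ok, Ref, fac(N)}
    end,
    fac_loop().

fac(0) -> 1;
fac(N) when N > 0 -> N * fac(N - 1).

%% Client side: Ref is bound before the receive, so only the reply to
%% this particular request can match; a late reply to an earlier,
%% timed-out request carries a different reference and is never returned.
remote_call(N, Node) ->
    Ref = make_ref(),
    {facserver, Node} ! {self(), Ref, N},
    receive
        {ok, Ref, Res} -> Res
    after 1000 ->
        {error, timeout}
    end.
```

A late reply can then no longer be mistaken for the answer to a later call. It still sits in the mailbox, however, so a long-lived client should occasionally discard stray {ok, _, _} messages.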
Alternatively, it is possible to link to the server process, so that if the server fails, the client process is told about it (and, unless it traps exits, fails too). You can do this by launching the server process using spawn_link/4 rather than spawn/4:
setup() ->
    process_flag(trap_exit, true),
    spawn_link('bar@STC', myrpc, server, []).
If the remote process terminates, you will receive the usual 'EXIT' signal. If the network connection between the two nodes goes down, the network kernel sends an 'EXIT' signal with reason noconnection.
Finally, it is possible to monitor whether a node is alive: the monitor_node(Node, Bool) BIF switches monitoring of Node on or off according to the Boolean flag Bool, as in the following code. When monitoring is active, the message {nodedown, Node} will be sent to the monitoring process if the connection to Node is lost or cannot be established:
remote_call(Message, Node) ->
    monitor_node(Node, true),
    {facserver, Node} ! {self(), Message},
    receive
        {ok, Res} ->
            monitor_node(Node, false),
            Res;
        {nodedown, Node} ->
            {error, node_down}
    end.
Don’t forget to turn monitoring off when you are done, because calling monitor_node(Node, true) will generate a nodedown message once for each time the BIF was called.
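Another approach, not used in the text, replaces monitor_node/2 with erlang:monitor/2, which can monitor a registered process on a remote node directly. A single 'DOWN' message then covers both a missing or crashed server (reason noproc or the exit reason) and a lost connection (reason noconnection), and the monitor reference doubles as a unique request tag. This is a sketch with several assumptions. The module name facmon is hypothetical. The server is assumed to answer {Pid, Ref, Request} with {ok, Ref, Result}, that is, a facLoop/0 modified to echo the reference back.

```erlang
-module(facmon).
-export([call/3]).

%% Dest is {RegisteredName, Node}. Returns the result, or {error, Reason}.
call(Dest, Request, Timeout) ->
    MRef = erlang:monitor(process, Dest),
    %% A local send to an unregistered name raises badarg; the monitor
    %% reports that case as a 'DOWN' with reason noproc, so ignore it here.
    %% noconnect: do not try to reconnect if the monitor could not connect.
    try erlang:send(Dest, {self(), MRef, Request}, [noconnect])
    catch error:badarg -> ok
    end,
    receive
        {ok, MRef, Result} ->
            erlang:demonitor(MRef, [flush]),
            Result;
        {'DOWN', MRef, process, _Object, Reason} ->
            %% noproc: no such process; noconnection: node unreachable
            {error, Reason}
    after Timeout ->
        erlang:demonitor(MRef, [flush]),
        {error, timeout}
    end.
```

erlang:demonitor(MRef, [flush]) removes the monitor together with any 'DOWN' message that has already arrived, so nothing is left behind in the mailbox. This is essentially the pattern that gen_server:call/3 relies on.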
The rpc library module provides implementations of services that are similar to remote procedure calls, as well as facilities for broadcast and parallel evaluation of RPC calls. The most commonly used function is:
rpc:call(Node, Module, Function, Arguments)
which executes the function on the remote node. The Module must be in the code search path on the remote node, and the nodes must either be connected or share the cookie. The result is the return value of the call or, upon failure, {badrpc, Reason}.
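Because rpc:call/4 signals failure through a return value rather than an exception, it is worth converting that value into something the caller cannot overlook. This is a minimal sketch. The module name safe_rpc is hypothetical, and the wrapper cannot distinguish a failure from a remote function that itself returns a {badrpc, _} tuple.

```erlang
-module(safe_rpc).
-export([call/4]).

%% Returns {ok, Value} on success, or {error, Reason} when rpc:call/4
%% reports a failure as {badrpc, Reason} (for instance nodedown, or an
%% {'EXIT', ...} term when the remote function raised an exception).
call(Node, Module, Function, Args) ->
    case rpc:call(Node, Module, Function, Args) of
        {badrpc, Reason} -> {error, Reason};
        Value -> {ok, Value}
    end.
```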
If your applications are going to be distributed, spend some time reading through the manual pages of the rpc module and become familiar with it. You will find helper functions for synchronous, asynchronous, and blocking calls. There are also functions that broadcast calls, both synchronously and asynchronously, to a group of nodes. You never know when you are going to need these functions, so reviewing them now will save you from reinventing the wheel at a later date.
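As an example of a broadcast call, rpc:multicall/4 evaluates the same function on a list of nodes in parallel and returns {ResL, BadNodes}, where BadNodes lists the nodes that did not respond. The sketch asks each node which OTP release it is running. The module name broadcast is hypothetical.

```erlang
-module(broadcast).
-export([otp_releases/1]).

%% Ask every node in Nodes, in parallel, for its OTP release.
%% answers holds the results from the nodes that responded (this sketch
%% does not rely on their order); bad_nodes lists the nodes that did not.
otp_releases(Nodes) ->
    {ResL, BadNodes} = rpc:multicall(Nodes, erlang, system_info, [otp_release]),
    #{answers => ResL, bad_nodes => BadNodes}.
```

Calling broadcast:otp_releases([node() | nodes()]) would query this node and every visible node it is connected to.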
A number of key modules support distributed programming in Erlang. Some we have already covered, while others are new:
erl
The erl command starts an Erlang runtime system. You can change the runtime system behavior by setting various flags on launch. These include:
-connect_all false
With this flag, the system will not maintain a global list of connected nodes, thus preventing global naming.
-hidden
This has the effect of launching a hidden node, which is often used for operation and maintenance purposes.
-name Name/-sname Name
These flags give the node a long/short name, Name.
-setcookie Cookie
This flag sets the cookie value for the node to Cookie.
erlang
The erlang module collects the Erlang BIFs, many of which are auto-imported and can thus be called without the erlang: prefix. Here, the ones that are not auto-imported are prefixed with the erlang module:
disconnect_node(Node)
This will disconnect the Node passed as an argument.
erlang:get_cookie()
This returns the current cookie for the local node if it is alive; otherwise, it returns nocookie.
monitor_node(Node, Flag)
This turns on/off the monitoring of the node Node depending on whether the Flag is set to true or false. There is also a variant, monitor_node/3, which is not auto-imported.
node()
This returns the name of the local node, Name@Host, or it returns nonode@nohost if it is not alive.
node(Arg)
This returns the node where Arg is located: Arg can be a pid, a reference, or a port.
nodes()
This returns a list of visible nodes in the system, excluding the local node. nodes(Type) will return a list of particular nodes, where Type is an atom such as hidden or connected.
erlang:set_cookie(Node, Cookie)
This sets the cookie to use for Node to Cookie. If Node is the local node, it also sets the cookie used for all other nodes that have not been given a cookie of their own.
spawn(Node, Module, Function, ArgumentList)
This performs spawn(Module, Function, ArgumentList) on the node Node. spawn_link/4 is similarly analogous to spawn_link/3.
net_kernel
This module contains the infrastructure for manually starting, stopping, connecting, and monitoring nodes. These functions will be called automatically by the runtime system, but can also be used in modified ways by the user.
net_adm
This module contains various useful functions, including ping (described earlier) and functions to examine the local hosts file, among others.
When running the distributed Erlang examples in this chapter, you might have noticed an OS process running a command called epmd. This is a part of the Erlang runtime system that acts as a port mapper daemon for distributed Erlang nodes. One epmd daemon process is started per machine, regardless of the number of distributed Erlang nodes running on it. The daemon listens for incoming requests on port 4369 and tells the connecting node which port the node being connected to is listening on. If not already running, epmd is automatically launched when you start your first distributed Erlang node. Starting it manually, however, allows you to pass a set of commands and configuration parameters.
You will find the epmd command useful when troubleshooting problems relating to distribution, configuring Erlang distribution to work through firewalls, or trying to simulate busy networks. The executable is located in the Erlang root directory, together with the binaries of the virtual machine. Flags that can be passed to it include the following:
-help
Prints a list of debugging commands. These commands are not always listed in the manual pages.
-port PortNumber
Changes the listening port. This is useful when dealing with particular ports in firewalls.
-names
Lists the names of the local nodes. This is useful for finding name conflicts, for example when Erlang is running as a background process without a shell.
-daemon
Starts epmd as a daemon process.
-kill
Kills the epmd process. Nodes that are already connected remain connected, but new attempts to connect on that host will fail. Restarting epmd will result in the loss of information regarding all connected nodes. New nodes will be able to connect to each other, but old ones will not.
-packet_timeout
Sets the number of seconds a connection can be inactive before epmd times out and closes the connection. Connections are kept open by a keepalive; if there is no other traffic, tic messages are sent and acknowledged by a tok.
-delay_accept and -delay_write
These are used in testing environments to simulate busy servers and network congestion.
When running distributed Erlang nodes behind firewalls, you need to open the port on which epmd listens. By default, this is port 4369, but you can change it to whatever port you please, as long as it is consistent across your node cluster. You also need to open the ports that the individual nodes use to connect to each other. You can restrict the nodes to a port range by running the following commands before distribution is started on the node:
application:set_env(kernel, inet_dist_listen_min, 9100)
application:set_env(kernel, inet_dist_listen_max, 9105)
These commands force Erlang to use ports from 9100 to 9105 for distribution. You can replace these values with whatever range you want.
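Because these parameters are read when a node opens its distribution listen socket, it is usually simplest to set them at startup rather than from the shell. One way to do this, assuming the nodes are started with erl -config sys, is a sys.config file like the one below. The same values can also be given on the command line, as in erl -sname foo -kernel inet_dist_listen_min 9100 -kernel inet_dist_listen_max 9105.

```erlang
%% sys.config (sketch): restrict distribution to listen ports 9100-9105.
[{kernel, [{inet_dist_listen_min, 9100},
           {inet_dist_listen_max, 9105}]}].
```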
Design a distributed version of an associative store in which values are associated with tags. It is possible to store a tag/value pair, and to look up the value(s) associated with a tag. One example for this is an address book for email, in which email addresses (values) are associated with nicknames (tags).
Replicate the store across two nodes on the same host, send lookups to one of the nodes (chosen either at random or alternately), and send updates to both.
Reimplement your system with the store nodes on other hosts (from each other and from the frontend). What do you have to be careful about when you do this?
How could you reimplement the system to include three or four store nodes?
Design a system to test your answer to this exercise. This should generate random store and lookup requests.