Setting up NameNode

In this section, we will do a step-by-step installation and basic configuration of the NameNode service including the High Availability (HA) setup. Unlike many other guides and tutorials available online, which treat NameNode's HA setup as an advanced topic, we will focus on setting up NameNode's HA from the beginning. The reason for this is the critical role NameNode plays in the Hadoop setup. Basically, NameNode is a single point of failure for Hadoop cluster. Without this service, there is no way to access files on Hadoop Distributed File System (HDFS).

There are several approaches to setting up NameNode High Availability. Prior to CDH 4.1, HA could be implemented using a shared storage setup. In this case, the primary NameNode writes the filesystem metadata changes into an editlog, which is located on shared network storage, and the standby NameNode polls changes from the editlog and applies them to its own copy of the metadata snapshot. Additionally, all DataNodes update both NameNodes with information about current block locations, so the standby NameNode can take over the primary role in the case of the primary NameNode's failure.

Note

Prior to CDH version 4, the secondary NameNode didn't perform a standby function. Its only role was to perform checkpoint operations. With the HA implementation, a Standby NameNode performs both HA and checkpoint functions. You can think of a Standby NameNode as a secondary NameNode + hot standby.

This setup is less than ideal. It requires additional hardware, which in turn needs to be highly available. In CDH 4.1, a new version of the highly available setup for NameNode was released, which relies on distributed services to synchronize the two NameNodes and eliminates the need for shared network storage. This setup is called Quorum Journal Manager and it introduces several new components. There are two NameNodes: primary and standby. This is similar to the previous setup, but instead of writing the editlog to a shared network file, the primary NameNode writes it to a cluster of JournalNodes. JournalNode is a new type of daemon introduced in CDH 4.1. The idea behind JournalNodes is that the primary NameNode submits editlog changes to a group of JournalNodes, which store them on their local disks. A write is considered successful if a majority of JournalNodes are able to persist it on disk. This eliminates the shared storage requirement, but still guarantees that editlog writes are durable and that there is no single point of failure. One great thing about JournalNodes is that their operations are lightweight and you don't need to run them on separate hardware.

A common practice is to run three JournalNodes (an odd number guarantees a proper quorum). Two of these can run on the same servers as the NameNodes and one on the JobTracker. This is not a mandatory requirement and you can run JournalNodes on any servers in the cluster you choose. For our setup, we will choose this option.

The standby NameNode polls editlog changes from the cluster of JournalNodes and applies them to its copy of the filesystem image. The standby NameNode also performs the checkpoint function and ships the updated fsimage file back to the primary NameNode. In addition, DataNodes are configured to send heartbeats with information about block allocation to both NameNodes. In the case of a primary NameNode failure, the standby NameNode can seamlessly take over HDFS operations.

To make this cross-node coordination possible, the NameNodes rely on ZooKeeper to track which NameNode is primary and which one is standby, to prevent situations where both nodes decide they are primary and start writing the editlog to the JournalNodes. ZooKeeper is another Apache project and is a part of CDH. It provides distributed coordination services and is useful when many different nodes need to share state, locking information, or any other data. You can find more information about ZooKeeper at http://zookeeper.apache.org. The last piece of the NameNode HA puzzle is the ZooKeeper Failover Controller (ZKFC). ZKFC is a daemon that runs on both the primary and standby NameNodes, checks their health and status, and initiates a failover to the standby NameNode, if required. When it comes to dealing with NameNode failures, cluster administrators are presented with two options:

  • Use manual failover from failed NameNode to the standby NameNode. This is a simpler setup, but it means that the cluster administrator will have to carefully monitor the health of the active NameNode and quickly initiate the failover steps if something goes wrong.
  • Configure the automatic failover option, which relies on ZKFC to monitor the status of the active NameNode. ZKFC will initiate the failover if required and use the ZooKeeper cluster as a status synchronization point.

For our setup, we will choose the automatic NameNode failover option.

As you can see, there are many moving parts that are added with the NameNode HA setup. Here is a diagram that will help you visualize all the components involved and their relationship with each other:

NameNode HA with JournalNode Quorum setup diagram

All of the examples in the following sections are performed on a test cluster with the following nodes being set up and configured: nn1.hadoop.test.com, nn2.hadoop.test.com, and jt1.hadoop.test.com. Names should be self-explanatory: nn1 and nn2 are primary and standby NameNodes respectively, and jt1 is a JobTracker. I will omit DataNodes for now, as we will be talking about them later in this chapter.

Tip

Dealing with a cluster of machines on a large scale, obviously, requires some degree of automation of common tasks. One of the tasks that will need to be constantly repeated while setting up and configuring a cluster is the propagation of these configuration files across different machines. Cloudera Manager can help a lot with configuration management. You can also use tools such as Puppet, Chef, or Ansible for this.

We will start by installing the packages required by the NameNode on nn1, nn2, and jt1. The reason we are installing HDFS packages on the JobTracker server is that we will need to run a JournalNode there.

Note

Unless specified otherwise, all commands are to be executed as a root user.

You can do it by running a simple yum command on nn1, nn2, and jt1 servers:

# yum install hadoop-hdfs-namenode

This will install several dependent packages. Let's quickly take a look at what those are.

  • bigtop-jsvc and bigtop-utils: These packages come from the Apache Bigtop project (http://bigtop.apache.org). This project was created to help streamline the development and packaging of Hadoop components. It is responsible for proper environment setup, making sure JAVA_HOME is correctly detected on different systems, and so on. Generally, you don't have to be concerned with it, but you need to be aware of its existence, since some configuration file locations and usages have changed since Bigtop's introduction.
  • hadoop: This package contains core Hadoop components, configuration files, and shared libraries. It will be installed on all cluster nodes.
  • hadoop-hdfs: This one provides the configuration files for HDFS, the NameNode, JournalNode, and DataNode daemons, the built-in web server configurations, and so on.
  • zookeeper: We discussed ZooKeeper's role in NameNode HA previously, but it is also being used by HBase columnar storage.

One thing to note here is that, along with the HDFS packages, CDH will also create a new OS user named hdfs. All the daemon processes will be executed as this user.

JournalNode, ZooKeeper, and Failover Controller

The next step is to install the JournalNode package on all three servers:

# yum install hadoop-hdfs-journalnode

We have already installed the zookeeper package as a part of NameNode dependencies, but we also need to install scripts to start/stop the ZooKeeper server. Run the following command on nn1, nn2, and jt1:

# yum install zookeeper-server

And finally, we will need to install Failover Controller. This daemon needs to be executed only on primary and standby NameNodes, so we install it on nn1 and nn2:

# yum install hadoop-hdfs-zkfc

Before we can proceed with configuring NameNode and other components, we need to make sure that the ZooKeeper cluster is up and running. In our case, we have three ZooKeeper nodes on nn1, nn2, and jt1. The ZooKeeper configuration file, zoo.cfg, is located in /etc/zookeeper/conf/, and here is how it looks for our setup:

maxClientCnxns=50
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/var/lib/zookeeper
# the port at which the clients will connect
clientPort=2181

server.1=nn1.hadoop.test.com:2888:3888
server.2=nn2.hadoop.test.com:2888:3888
server.3=jt1.hadoop.test.com:2888:3888

The sample configuration file contains sensible defaults and no changes are required, unless you are doing some advanced tuning. You may need to change the dataDir option, depending on your setup. What needs to be added to this configuration file are the last three lines you can see in the preceding code. These lines describe the ZooKeeper cluster. The number after the word server is the server ID, and 2888 and 3888 are the ports used for connecting to ZooKeeper and for electing a new leader, respectively. We don't have to be concerned with these details right now, but one thing you need to do is double-check that these ports are open on the ZooKeeper nodes, and that the client port 2181 is accessible from any other servers that will need to use ZooKeeper, such as HBase nodes.

After the configuration file is updated (don't forget to update it on all the nodes!), you need to run the following command, which will create and initialize the data directory:

# service zookeeper-server init --myid=1

This command needs to be executed on nn1, nn2, and jt1. It will also create a file called myid in /var/lib/zookeeper/ (the location depends on the dataDir option) on all three nodes. This file contains a unique server ID for the ZooKeeper node, which is what you provide with the --myid option. So, you need to provide a different --myid value on each server. This is how the ZooKeeper daemon knows who it is in the cluster.
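
The ID has to match the server.N entries in zoo.cfg shown earlier, so after initializing nn1 with --myid=1 as shown above, you would run the following on nn2 and jt1, respectively:

# service zookeeper-server init --myid=2
# service zookeeper-server init --myid=3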

To start the ZooKeeper service, execute the following command on all three nodes:

# service zookeeper-server start

Make sure you check the contents of the log file, which ZooKeeper writes to /var/log/zookeeper/zookeeper.log by default. Sometimes, even in the case of a failure, the zookeeper-server start command still returns success, and the only way to see whether the server has actually started properly is to check the log file.
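
Another quick sanity check, assuming the nc utility is available on the node, is to send ZooKeeper its built-in ruok command on the client port; a healthy server responds with imok:

# echo ruok | nc localhost 2181
imok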

Now, we are ready to proceed with the NameNode configuration.

Hadoop configuration files

Before we dive into the details of the NameNode daemon configuration, a couple of words need to be said about the Hadoop configuration files. There are many different daemons involved in a Hadoop cluster and one might expect each of them to have its own configuration file. In fact, there are only a few configuration files that you need to use for the core Hadoop services. It can be confusing initially, because options for different roles are mixed together in these few files.

There are three main configuration files for the core Hadoop components: core-site.xml, hdfs-site.xml, and mapred-site.xml. The core-site.xml file contains configuration options that are common for all servers in the cluster. The hdfs-site.xml and mapred-site.xml files provide the configuration for HDFS and MapReduce components of the cluster respectively. There are other configuration files which control different aspects of the cluster and we will take a look at those shortly. CDH puts these configuration files into the /etc/hadoop/conf directory, which in turn is a symbolic link to the alternatives directory. CDH uses the Linux Alternatives project to maintain different versions of configuration and other files. We don't have to be concerned about the exact setup, because it doesn't really affect the steps we need to take to set up a cluster.

Note

You can learn more about Linux Alternatives at http://www.linuxalt.com.

Let's take a look at what files are in /etc/hadoop/conf on one of our NameNode servers, nn1:

# ls -lh /etc/hadoop/conf
-rw-r--r-- 1 root root 1.2K May 21 05:40 core-site.xml
-rw-r--r-- 1 root root 1.8K Apr 22 19:36 hadoop-metrics2.properties
-rw-r--r-- 1 root root 2.5K Apr 22 19:36 hadoop-metrics.properties
-rw-r--r-- 1 root root 2.4K May 22 04:51 hdfs-site.xml
-rw-r--r-- 1 root root 8.6K Apr 22 19:36 log4j.properties
-rw-r--r-- 1 root root   10 Apr 22 19:36 slaves
-rw-r--r-- 1 root root 2.3K Apr 22 19:36 ssl-client.xml.example
-rw-r--r-- 1 root root 2.2K Apr 22 19:36 ssl-server.xml.example

You can see that core-site.xml and hdfs-site.xml are in place, but mapred-site.xml is missing. This is because we haven't installed any MapReduce-related packages, such as JobTracker or TaskTracker on this server yet.

The hadoop-metrics.properties and hadoop-metrics2.properties files control the way Hadoop exposes its internal metrics. This will become important when configuring cluster monitoring, and we will talk about these files in greater detail in Chapter 5, Monitoring Hadoop Cluster.

The log4j.properties configuration file is used to specify details about the Hadoop logging facilities. It is extremely flexible and allows you to specify retention and archival options, log detail level, and even log formats. Hadoop comes with a good set of defaults, so we will not discuss all available options here, but if the defaults don't fit your needs, feel free to explore the Log4j and Hadoop documentation.

The slaves file is optional and is empty by default. You can populate it with the list of DataNodes. This list would be used by scripts, such as start-all.sh, which would start all daemons in the cluster. This method of starting services in CDH is discouraged and the service command should be used instead.

The example files ssl-client.xml.example and ssl-server.xml.example are sample configuration files which can be used to set up an encrypted shuffle phase for MapReduce.

In addition to the /etc/hadoop/conf directory, there is another location that you need to be aware of. With the introduction of the Bigtop project, some of the settings were moved into a set of shell scripts in the /etc/default directory. These scripts set up some of the environment variables used by different services. Here is an example of what the default hadoop-hdfs-namenode script looks like (headers are stripped out to save space):

export HADOOP_PID_DIR=/var/run/hadoop-hdfs
export HADOOP_LOG_DIR=/var/log/hadoop-hdfs
export HADOOP_NAMENODE_USER=hdfs
export HADOOP_SECONDARYNAMENODE_USER=hdfs
export HADOOP_DATANODE_USER=hdfs
export HADOOP_IDENT_STRING=hdfs
export HADOOP_NAMENODE_OPTS="-Xmx10g"
# export HADOOP_SECURE_DN_USER=hdfs
# export HADOOP_SECURE_DN_PID_DIR=/var/run/hadoop-hdfs
# export HADOOP_SECURE_DN_LOG_DIR=/var/log/hadoop-hdfs

As you can see, this file specifies the PID and log file locations, the OS user that will be used to run the NameNode daemon, and other options. In most cases, with the CDH package installation, the default paths will be sufficient. One variable that is not part of the defaults is HADOOP_NAMENODE_OPTS. This variable specifies the list of JVM options that will be used when starting the NameNode daemon. In this case, the NameNode JVM will be started with a maximum heap size of 10 GB. You will need to adjust this value based on your estimates of the number of files and blocks that you plan to store in HDFS. For details on RAM requirements for the NameNode, please refer to Chapter 1, Setting Up Hadoop Cluster – from Hardware to Distribution.

NameNode HA configuration

We will start configuring our NameNode HA setup by adding several options to the core-site.xml file. The following is the structure of the file for this particular step. It will give you an idea of the XML structure, if you are not familiar with it. The header comments are stripped out:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
    <name>fs.default.name</name>
    <value>hdfs://sample-cluster/</value>
</property>
<property>
    <name>ha.zookeeper.quorum</name>
    <value>nn1.hadoop.test.com:2181,nn2.hadoop.test.com:2181,
       jt1.hadoop.test.com:2181
    </value>
</property>
</configuration>

The configuration file format is pretty much self-explanatory; variables are surrounded by the <property> tag, and each variable has a name and a value.

There are only two variables that we need to add at this stage. fs.default.name is the logical name of the NameNode cluster and the value hdfs://sample-cluster/ is specific to the HA setup; we will define the servers that comprise the cluster in the hdfs-site.xml file. In a non-HA setup, this variable is assigned the host and port of the NameNode, since there is only one NameNode in the cluster.

The ha.zookeeper.quorum variable specifies locations and ports of the ZooKeeper servers. The ZooKeeper cluster can be used by other services, such as HBase, that is why it is defined in core-site.xml.

The next step is to configure the hdfs-site.xml file and add all HDFS-specific parameters there. I will omit the <property> tag and only include <name> and <value> to make the list less verbose.

<name>dfs.name.dir</name>
<value>/dfs/nn/</value>

NameNode will use the location specified by the dfs.name.dir variable to store the persistent snapshot of HDFS metadata. This is where the fsimage file will be stored. As discussed previously, the volume on which this directory resides needs to be backed by RAID. Losing this volume means losing the NameNode completely. The /dfs/nn path is an example; you are free to choose your own. You can actually specify several paths as the dfs.name.dir value, separating them by commas, and NameNode will mirror the metadata files in each directory specified. If you have shared network storage available, you can use it as one of the destinations for HDFS metadata, which will provide an additional offsite copy of the metadata.
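
For illustration, a dfs.name.dir value that mirrors the metadata to a second local volume and an NFS mount might look like the following; both the /data/nn and /mnt/nfs/nn paths here are hypothetical examples:

<name>dfs.name.dir</name>
<value>/dfs/nn,/data/nn,/mnt/nfs/nn</value>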

<name>dfs.nameservices</name>
<value>sample-cluster</value>

The dfs.nameservices variable specifies the logical name of the NameNode cluster and should be replaced by something that makes sense to you, such as prod-cluster or stage-cluster. The value of dfs.nameservices must match the logical name used in fs.default.name in the core-site.xml file.

<name>dfs.ha.namenodes.sample-cluster</name>
<value>nn1,nn2</value>

Here, we specify the NameNodes that make up our HA cluster setup. These are logical names, not real server hostnames or IPs. These logical names will be referenced in other configuration variables.

<name>dfs.namenode.rpc-address.sample-cluster.nn1</name>
<value>nn1.hadoop.test.com:8020</value>
<name>dfs.namenode.rpc-address.sample-cluster.nn2</name>
<value>nn2.hadoop.test.com:8020</value>

This pair of variables provides the mapping from the logical names nn1 and nn2 to the real host and port values. By default, NameNode daemons use port 8020 for communication with clients and each other. Make sure this port is open for the cluster nodes.

<name>dfs.namenode.http-address.sample-cluster.nn1</name>
<value>nn1.hadoop.test.com:50070</value>

<name>dfs.namenode.http-address.sample-cluster.nn2</name>
<value>nn2.hadoop.test.com:50070</value>

Each NameNode daemon runs a built-in HTTP server, which will be used by the NameNode web interface to expose various metrics and status information about HDFS operations. Additionally, standby NameNode uses HTTP calls to periodically copy the fsimage file from the primary server, perform the checkpoint operation, and ship it back.

<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://nn1.hadoop.test.com:8485;nn2.hadoop.test.com:8485;jt1.hadoop.test.com:8485/sample-cluster</value>

The dfs.namenode.shared.edits.dir variable specifies the setup of the JournalNode cluster. In our configuration, there are three JournalNodes running on nn1, nn2, and jt1. Both the primary and standby NameNodes will use this variable to identify which hosts they should contact to send or receive new editlog changes.

<name>dfs.journalnode.edits.dir</name>
<value>/dfs/journal</value>

JournalNodes need to persist editlog changes that are being submitted to them by the active NameNode. The dfs.journalnode.edits.dir variable specifies the location on the local filesystem where editlog changes will be stored. Keep in mind that this path must exist on all JournalNodes and the ownership of all directories must be set to hdfs:hdfs (user and group).
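
If this directory doesn't already exist on a JournalNode, one way to create it with the required ownership is:

# mkdir -p /dfs/journal
# chown -R hdfs:hdfs /dfs/journal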

<name>dfs.client.failover.proxy.provider.sample-cluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>

In an HA setup, clients that access HDFS need to know which NameNode to contact for their requests. The dfs.client.failover.proxy.provider.sample-cluster variable specifies the Java class name, which will be used by clients for determining the active NameNode.

At the moment, there is only ConfiguredFailoverProxyProvider available.

<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>

The dfs.ha.automatic-failover.enabled variable indicates whether the NameNode cluster will use manual or automatic failover.

<name>dfs.ha.fencing.methods</name>
<value>sshfence
       shell(/bin/true)
</value>

Orchestrating failover in a cluster setup is a complicated task involving multiple steps. One of the common problems, not unique to Hadoop but affecting any distributed system, is the "split-brain" scenario. Split-brain is a case where two NameNodes decide they both play an active role and start writing changes to the editlog. To prevent such an issue from occurring, the HA configuration maintains a marker in ZooKeeper, clearly stating which NameNode is active, and the JournalNodes accept writes only from that node. To be absolutely sure that the two NameNodes don't become active at the same time, a technique called fencing is used during failover. The idea is to force the shutdown of the active NameNode before transferring the active state to the standby.

There are two fencing methods currently available: sshfence and shell. The sshfence method requires passwordless ssh access, as the user that starts the NameNode daemon, from the active NameNode to the standby and vice versa. By default, this is the hdfs user. The fencing process checks if anyone is listening on the NameNode port using the nc command, and if the port is found busy, it tries to kill the NameNode process. The other option for dfs.ha.fencing.methods is shell, which executes a specified shell script to perform fencing. It is important to understand that failover will fail if fencing fails. In our case, we specified two methods, and ZKFC tries them in the order listed: if the first one fails, the second one is tried. The second method always returns success. This is a workaround for the case where the machine running the primary NameNode goes down completely: the ssh method would fail and no failover would be performed. We want to avoid this, so the fallback is to fail over anyway, even without fencing, which, as already mentioned, is safe with our setup. With this configuration, failover will be initiated even if the server running the primary NameNode is not reachable via ssh.

<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/var/lib/hadoop-hdfs/.ssh/id_rsa</value>

The last option we need to configure for the NameNode HA setup is the ssh key, which will be used by sshfence. Make sure you change the ownership of this file to the hdfs user. Two keys need to be generated, one for the primary and one for the standby NameNode. It is a good idea to test ssh access as the hdfs user in both directions to make sure it is working fine.
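
A possible way to generate and verify such a key for the hdfs user on nn1 (repeating the same steps in the opposite direction on nn2) is sketched below; the exact steps may differ in your environment:

# sudo -u hdfs mkdir -p /var/lib/hadoop-hdfs/.ssh
# sudo -u hdfs ssh-keygen -t rsa -N "" -f /var/lib/hadoop-hdfs/.ssh/id_rsa

Then append the generated id_rsa.pub to the authorized_keys file of the hdfs user on nn2 and confirm that a passwordless login works:

# sudo -u hdfs ssh nn2.hadoop.test.com hostname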

The hdfs-site.xml configuration file is now all set for testing the HA setup. Don't forget to sync these configuration files to all the nodes in the cluster. The next thing that needs to be done is to start the JournalNodes. Execute this command on nn1, nn2, and jt1 as a root user:

# service hadoop-hdfs-journalnode start

With CDH, it is recommended to always use the service command instead of calling scripts in /etc/init.d/ directly. This is done to guarantee that all environment variables are set up properly before the daemon is started. Always check the logfiles for daemons.

Now, we need to initially format HDFS. For this, run the following command on nn1:

# sudo -u hdfs hdfs namenode -format

This is the initial setup of the NameNode, so we don't have to worry about affecting any HDFS metadata; still, be careful with this command, because it will destroy any previous metadata. There is no strict requirement to run the format command on nn1, but to make it easier to follow, let's assume we want nn1 to become the active NameNode. The format command will also format the storage for the JournalNodes.

The next step is to create an entry for the HA cluster in ZooKeeper, and start NameNode and ZKFC on the first NameNode. In our case, this is nn1:

# sudo -u hdfs hdfs zkfc -formatZK
# service hadoop-hdfs-namenode start
# service hadoop-hdfs-zkfc start

Check the ZKFC log file (by default, it is in /var/log/hadoop-hdfs/) to make sure nn1 is now an active NameNode:

INFO org.apache.hadoop.ha.ZKFailoverController: Trying to make NameNode at nn1.hadoop.test.com/192.168.0.100:8020 active...
INFO org.apache.hadoop.ha.ZKFailoverController: Successfully transitioned NameNode at nn1.hadoop.test.com/192.168.0.100:8020 to active state

To activate the standby NameNode, an operation called bootstrapping needs to be performed. To do this, execute the following command on nn2:

# sudo -u hdfs hdfs namenode -bootstrapStandby

This will pull the current filesystem state from the active NameNode and synchronize the standby NameNode with the JournalNode quorum.

Now, you are ready to start the NameNode daemon and the ZKFC daemon on nn2. Use the same commands that you used for nn1. Check the ZKFC log file to make sure nn2 successfully acquired the standby NameNode role. You should see the following messages at the end of the logfile:

INFO org.apache.hadoop.ha.ZKFailoverController: ZK Election indicated that NameNode at nn2.hadoop.test.com/192.168.0.101:8020 should become standby
INFO org.apache.hadoop.ha.ZKFailoverController: Successfully transitioned NameNode at nn2.hadoop.test.com/192.168.0.101:8020 to standby state

This is the last step in configuring NameNode HA. It is a good idea to verify that automatic failover is configured correctly and that it will behave as expected in the case of a primary NameNode outage. Testing failover at the cluster setup stage is easier and safer than discovering that failover doesn't work in production and causes a cluster outage. You can perform a simple test: kill the primary NameNode daemon and verify that the standby takes over its role. After that, bring the old primary back online and make sure it takes over the standby role.

Note

You can execute the following command to get the current status of the NameNode nn1:

# sudo -u hdfs hdfs haadmin -getServiceState nn1

The hdfs haadmin command can also be used to initiate a failover in a manual failover setup.
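
A minimal version of the failover test described above, using only the commands we have already introduced, could look like this. Assuming nn1 is currently active, run the stop and start commands on nn1 and the haadmin checks from either NameNode:

# service hadoop-hdfs-namenode stop
# sudo -u hdfs hdfs haadmin -getServiceState nn2
# service hadoop-hdfs-namenode start
# sudo -u hdfs hdfs haadmin -getServiceState nn1

The first haadmin call should report nn2 as active, and the second should report the restarted nn1 as standby.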

At this point, you have a fully configured and functional NameNode HA setup.

JobTracker configuration

Like NameNode, JobTracker plays a master role in the MapReduce framework; it collects heartbeat messages from TaskTrackers in the cluster, maintains information about the cluster's current capacity, and is responsible for scheduling, submitting, and tracking the progress of user-defined jobs. This is a lot of work, and on a large cluster JobTracker can become very busy; but unlike NameNode, JobTracker has to maintain much less state information about running jobs, and its requirements for persistent storage, mostly for keeping job logs, are minimal.

For a long time, JobTracker, just like NameNode, was a single point of failure in a Hadoop cluster. If the JobTracker process fails, all currently running or scheduled jobs will fail and have to be restarted. Until the release of CDH 4.1, it was the user's responsibility to monitor the job status and re-submit jobs in case of JobTracker failures. In CDH 4.1, the first steps towards high availability were made and the job persistence feature was added to JobTracker. Now, JobTracker can automatically restart all the jobs that were running during a JobTracker crash or restart. In CDH 4.2, JobTracker High Availability was introduced. It allows configuring active and standby JobTrackers and performing automatic failover if the active JobTracker fails. One thing you need to be aware of when configuring JobTracker HA is that all jobs that were running at the moment of failover will have to be restarted from the beginning on the standby JobTracker. The JobTracker HA setup uses components similar to the NameNode HA setup. It uses ZooKeeper to store information about which node is currently active, and relies on ZKFC daemons to monitor JobTracker's health and perform automatic failover. JobTrackers share the job status information via files on HDFS, so no additional storage cluster, such as JournalNodes, is required.

For our sample installation, we will not be using JobTracker HA, but we will configure job status information persistence. There are several reasons for setting things up this way. First of all, JobTracker HA is not as critical as NameNode HA. Daemon process restarts or unplanned server reboots can be easily tolerated, with jobs being automatically restarted by JobTracker upon startup. In the case of a complete server failure, it is easy enough to set up a new JobTracker, since no previous state information is required to make JobTracker operational quickly. So, JobTracker HA doesn't provide much when it comes to daemon crashes and restarts, but it can shorten recovery time in the case of a catastrophic node failure. It is up to you to decide whether JobTracker HA is important for your cluster, depending on the SLAs you may have for the jobs running there. The second reason why JobTracker HA will not be discussed in detail is that setting it up is similar in many aspects to NameNode HA; you will need to configure a logical name for the JobTracker cluster, describe both servers participating in it in the configuration file, set up ZKFC, and so on.

You should be able to quickly configure HA for JobTracker by following the CDH HA guide, which can be found at http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH4/latest/CDH4-High-Availability-Guide/cdh4hag_topic_3.html

To install the JobTracker package, execute the following command on the dedicated server. In our sample cluster, the JobTracker hostname is jt1.hadoop.test.com:

# yum install hadoop-0.20-mapreduce-jobtracker

This command will install hadoop-0.20-mapreduce as a dependent package, which actually contains all the relevant .jar files and other related files. The JobTracker package itself only provides the defaults file and a service startup script. You should be seeing a trend in the way things are packaged by now.

MapReduce-specific parameters are stored in /etc/hadoop/conf/mapred-site.xml, which is used by both JobTracker and TaskTracker. A small quirk of the JobTracker package is that it doesn't provide a skeleton configuration file, so you need to create it manually.
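
A minimal skeleton for this file, mirroring the structure of core-site.xml shown earlier, could look like this; the properties discussed below go inside the <configuration> tag:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
</configuration>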

The first thing we need to configure is a JobTracker hostname and port. This option will be used by TaskTrackers to know where to look for master node:

<name>mapred.job.tracker</name>
<value>jt1.hadoop.test.com:8021</value>

As mentioned previously, local disk space requirements for JobTracker are minimal. Besides the standard log files, JobTracker also stores configurations for all the submitted user jobs as well as historical information on completed jobs. This logging goes to the standard Hadoop log directory, which, in CDH, is located at /var/log/hadoop-0.20-mapreduce/. Additionally, JobTracker needs to store some of its system information on the local disk. This data is not critical and normally doesn't take much disk space. The following variable specifies the location of this local directory:

<name>mapred.local.dir</name>
<value>/tmp/mapred/jt</value>

The mapred.local.dir directory needs to be owned by the mapred user and group. This is the user that CDH created during the package installation.

MapReduce jobs run on a large number of machines, so there is a need to propagate shared job files, such as job configuration files and .jar files, to all of them. JobTracker uses HDFS to store such files. This allows the shared files to be easily accessed by TaskTrackers on all cluster nodes.

The following variable specifies the location of such a shared directory:

<name>mapred.system.dir</name>
<value>/tmp/mapred/system</value>

There is no real indication of whether the paths in the mapred-site.xml file refer to the local filesystem or to HDFS, so it's important to remember which variables are responsible for what.

MapReduce also writes some information about the running and completed jobs into HDFS, so users who are submitting jobs can easily access it. To make this happen, the following variable needs to point to the root user directory, which is normally /user in HDFS:

<name>mapreduce.jobtracker.staging.root.dir</name>
<value>/user</value>

The next couple of variables are related to how the job status information is stored on HDFS. Previously, JobTracker stored the job status information in memory, so in the case of a daemon restart, there was a possibility that MapReduce clients would never get the status of their submitted jobs. To fix this issue, Job Status Store was created, which is basically a directory in HDFS. To activate Job Status Store, you need to enable the following variable:

<name>mapred.job.tracker.persist.jobstatus.active</name>
<value>true</value>

By default, job status information will be written to the /jobtracker/jobsInfo directory on HDFS. You can control this location with the mapred.job.tracker.persist.jobstatus.dir variable. We will use a default for our setup, so we need to create these directories in HDFS and set proper permissions:

# sudo -u hdfs hdfs dfs -mkdir /jobtracker
# sudo -u hdfs hdfs dfs -chown mapred:mapred /jobtracker

The next variable allows restarting the jobs that were running when JobTracker was restarted or crashed. JobTracker reads the job status information from HDFS and restarts the jobs. While this can provide an additional level of reliability and allow failed jobs to resume faster than when human intervention is required, you should decide whether this option is needed in your cluster. There are several potential issues with automatic job restarts. First of all, some clients may decide to restart jobs themselves in the case of a failure, which leads to a situation where multiple copies of the same job are submitted. In a multitenant environment, general rules regarding job submissions can be hard to impose.

Another potential problem is that JobTracker crashes can be actually related to the jobs that are currently running, and restarting all the jobs after JobTracker is back online, without proper investigation about what caused the crash, may not be the best idea. On the other hand, if you have a good level of control of jobs that are running on the cluster, or you have strict SLAs around jobs completion, enabling automatic jobs restart is a valid option.

<name>mapred.jobtracker.restart.recover</name>
<value>true</value>

You can also control how long the job status information is stored in HDFS by setting the following option:

<name>mapred.job.tracker.persist.jobstatus.hours</name>
<value>24</value>

Configuring the job scheduler

One of the main roles of JobTracker is to schedule user jobs and submit them to the TaskTrackers. There is probably no production Hadoop cluster out there that is used to run one job at a time. In most cases, there will be several jobs submitted by different users, and a valid question arises of how the cluster resources will be allocated among competing jobs. From the MapReduce framework's perspective, cluster capacity consists of available map and reduce slots. These parameters are defined during the TaskTrackers' configuration and are a fixed number for each worker node. TaskTrackers send regular messages to JobTracker with information about how many map and reduce slots are currently occupied on a given server, and it is the responsibility of the JobTracker scheduler to decide how to utilize those slots in the most efficient way.

There are several algorithms that JobTracker can use to schedule concurrent jobs. These algorithms are implemented as pluggable classes and need to be specified in the mapred-site.xml file. It is the responsibility of the administrator to choose the best scheduler for a given workload. The scheduler is identified by the mapred.jobtracker.taskScheduler variable, which is the Java class name of a specific scheduler. At the moment, there are three different schedulers available: JobQueueTaskScheduler, FairScheduler, and CapacityTaskScheduler. Since achieving good cluster utilization is a top priority for any Hadoop administrator, it is worth giving a brief overview of the main differences between the schedulers.

JobQueueTaskScheduler

The default, most primitive scheduler is JobQueueTaskScheduler, or the FIFO scheduler. This scheduler forms a queue of submitted jobs and lets the first job to arrive occupy all available slots in the cluster. All other jobs will have to wait in the queue for the first job to finish completely. This can cause significant wait times, even for small jobs, if they are unlucky enough to be scheduled after a long job. Such an approach is less than optimal in environments where long ETL processes have to co-exist with short-lived user requests. The FIFO scheduler allows assigning different priorities to submitted jobs, but each priority level just gets assigned to a separate queue, and jobs from a high-priority queue are submitted to the TaskTrackers before jobs from a lower-priority queue. This scheduler lacks important features of a proper resource manager and is not a good fit for most production installations.

FairScheduler

The second available scheduler is FairScheduler. The main idea behind FairScheduler is to allow equal distribution of cluster resources to different jobs over a period of time. Jobs are organized into pools, where each pool gets an equal amount of cluster slots. By default, FairScheduler is configured to create a pool per user, but other options are also available. If there is only one job running, it is allowed to consume all available slots, but slots that are freed (some map tasks will finish faster than others) can be re-used by other jobs. There is also a way to assign a minimal share of cluster resources to more important pools; for example, you may want to do this for critical production jobs. You can also configure this scheduler to kill running tasks to guarantee adequate capacity for high-priority jobs. FairScheduler provides much better cluster utilization for multitenant environments and has more advanced tuning options. We will choose this scheduler for our cluster configuration.

CapacityTaskScheduler

The last currently available scheduler is CapacityTaskScheduler. This scheduler uses an approach similar to FairScheduler, but with some differences. Jobs are assigned to a pre-defined set of queues and each queue gets a portion of cluster resources specified by the administrator. This share of resources is assigned to a queue and is not given away, even if there are no other jobs running. Inside a queue, jobs are scheduled in a FIFO fashion, meaning that jobs from one user will be serialized within a queue. One of the interesting features of CapacityTaskScheduler is that it can use information about system resource utilization, such as RAM, reported by TaskTrackers, and make scheduling decisions based on overall server utilization rather than only the available cluster slots. This is a popular option for production usage and works well when the type of load is more or less static.

As mentioned earlier, we will configure FairScheduler for our sample setup. We will use a basic setup with the task pool created for each user and all pools configured with no minimal capacity sharing requirements. First of all, we need to specify the scheduler we want to use:

<name>mapred.jobtracker.taskScheduler</name>
<value>org.apache.hadoop.mapred.FairScheduler</value>

Next, we need to configure how pools will be created. In the following example, each user submitting a job will get a separate pool:

<name>mapred.fairscheduler.poolnameproperty</name>
<value>user.name</value>

The value of the mapred.fairscheduler.poolnameproperty variable can be any job property. For example, to create a pool based on the user's primary OS group, you can set the value to group.name.

An administrator can pre-define pools in a separate XML file called the allocations file. Additional pool properties, such as minimal slot shares, are specified in this file. For our setup, we will allow FairScheduler to dynamically create pools for every user submitting a job, with no additional options. To do this, create a new file in /etc/hadoop/conf/ and name it fair-scheduler.xml. It will be an empty XML file with only one section in it:

<?xml version="1.0"?>
<allocations>
</allocations>
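
For illustration only, if you later decide to pre-define a pool with guaranteed capacity, a hypothetical allocations file might look like the following sketch; the production pool name and the slot numbers are made-up examples, and our setup keeps the file empty:

<?xml version="1.0"?>
<allocations>
  <pool name="production">
    <minMaps>10</minMaps>
    <minReduces>5</minReduces>
  </pool>
</allocations>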

Now, we need to specify the location of this file in mapred-site.xml:

<name>mapred.fairscheduler.allocation.file</name>
<value>/etc/hadoop/conf/fair-scheduler.xml</value>

Since we haven't declared any pools, we need to allow FairScheduler to allocate pools dynamically. This is done by setting the mapred.fairscheduler.allow.undeclared.pools variable to true:

<name>mapred.fairscheduler.allow.undeclared.pools</name>
<value>true</value>

If you would like to fine tune the pools' behavior, you can explore the available options in the scheduler documentation at http://hadoop.apache.org/docs/r1.2.1/fair_scheduler.html.

These are all the JobTracker configuration options we need to specify at this stage. You can now start the daemon using the service command:

# service hadoop-0.20-mapreduce-jobtracker start

DataNode configuration

Now that we have finished configuring Hadoop master services, NameNode and JobTracker, it's time to configure nodes where most of the data processing is done: the DataNodes.

DataNodes are Hadoop's worker nodes and comprise the majority of the cluster's server population. DataNodes play a dual role: they host the DataNode HDFS daemon, as well as the TaskTracker from the MapReduce framework. This blend allows the MapReduce jobs to utilize data locality and avoid expensive network transfers when possible.

Let's walk through the details of DataNode configuration. You will then be able to replicate these steps across all DataNodes in your cluster. For the following examples, I have added a new host to the sample cluster: dn1.hadoop.test.com. The same assumptions we made for the NameNode installation hold for this section as well: local disk volumes should be properly formatted and mounted, OS tuning applied, there should be no firewall restrictions for Hadoop ports, and the CDH repository should be configured.

To install a DataNode using the CDH package, use the following yum command:

# yum install hadoop-hdfs-datanode.x86_64

This will install Hadoop core, Hadoop HDFS, and DataNode specific packages. Actually, the hadoop-hdfs-datanode package includes only the default configuration file and service startup script. All the core components are being installed as a part of hadoop and hadoop-hdfs packages.

Now, we need to add several DataNode-specific options to hdfs-site.xml. Some of these options are used by DataNode exclusively, and some of them can be used by different HDFS clients, but to keep things consistent we will edit the same hdfs-site.xml file we already created during the NameNode configuration and propagate it to all the nodes in the cluster.

First, we need to specify which volumes DataNode will use to store actual HDFS data. This is done by setting the dfs.data.dir variable:

<name>dfs.data.dir</name>
<value>
/dfs/data1,/dfs/data2,/dfs/data3,/dfs/data4,/dfs/data5,/dfs/data6
</value>

This variable is a comma-separated list of volumes that are local on each DataNode. Since no RAID is usually configured for DataNodes, these volumes are actually separate disks. DataNode will use these disks in a round-robin fashion to store the HDFS data.

The next variable we add to hdfs-site.xml is dfs.block.size. This variable sets the default block size in bytes for files that are stored in HDFS. You can override this setting by specifying the block size for a given file. By default, Hadoop uses a 64 MB block size, but these days a 128 MB block size has become the standard in most production installations. You can vary the block size for different files when you upload them into HDFS by specifying a client-side option. If no such option is provided, Hadoop will use dfs.block.size from the configuration file. For our sample cluster, we will go with a 128 MB block size:

<name>dfs.block.size</name>
<value>134217728</value>
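
For example, a client could override the block size for a single upload by passing the property as a command-line option; the file name below is just an illustration:

# hdfs dfs -D dfs.block.size=268435456 -put /root/bigfile.dat /tmp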

Another core variable that controls HDFS behavior is dfs.replication. This variable specifies how many times each file block will be replicated. The default setting is 3 and in most cases this is the optimal trade-off between cluster resilience and total usable cluster storage capacity. Just like with block size, HDFS clients can specify a replication factor on a per-file basis. Even though there is no need to adjust this variable, it's a good idea to explicitly add it to the configuration file to make it easy to understand which setting is used:

<name>dfs.replication</name>
<value>3</value>
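
As an illustration of the per-file override, a client could request a different replication factor at upload time, or change it for a file that is already in HDFS; the file names are examples:

# hdfs dfs -D dfs.replication=2 -put /root/test.txt /tmp
# hdfs dfs -setrep 2 /tmp/test.txt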

There are two other variables related to dfs.replication. The first, dfs.replication.max, limits the number of possible replicas of one block in the cluster; by default, it is 512. You may want to lower this value if you don't trust all the clients that are using your cluster and don't want anyone to flood the filesystem with files set up to replicate 512 times. The second is dfs.namenode.replication.min. It controls the minimal number of replicas a given block needs before the block write is considered complete. The default for this variable is 1, which means that if at least one DataNode stored the block, the write operation is considered successful. This default behavior means that some files will be stored with fewer replicas than demanded by the dfs.replication setting. This can happen when there is a network issue in the cluster or not all DataNodes are available.

Hadoop will periodically scan the filesystem for such under-replicated blocks and will try to replicate them to match the system default. The default of 1 is a reasonable setting for dfs.namenode.replication.min and we will leave it as is for our setup, but you may want to review it, especially if you have strong requirements for data resilience.

The next variable that we will specify for the DataNode configuration is dfs.datanode.du.reserved. The name of this variable is not very descriptive, but the idea is that Hadoop can reserve some space on the disks that are being used by DataNodes. This is required because the MapReduce framework will need a local volume to store temporary data. Both the map and reduce phases of jobs will generate temporary files, and having enough space in the MapReduce temporary directory is critical. It's a common practice to utilize the same local disks that HDFS uses, to be able to split the IO load across multiple spindles, but this means that some space needs to be reserved to prevent a situation where all the disk space is consumed by HDFS and MapReduce jobs start failing because there is nowhere to write temporary data. It's a good idea to reserve about 10 percent of the total disk capacity on each disk, unless you plan to have a dedicated volume on each server for the MapReduce temporary directory. To reserve 10 GB per disk, we will use the following setting:

<name>dfs.datanode.du.reserved</name>
<value>10737418240</value>

Before we start the DataNode daemon, it is worth double checking the JVM heap settings. By default, DataNode will start with 1 GB heap memory size. It is enough in most cases, because DataNode is mostly focused on large sequential reads and writes, but if you need to adjust it, use the /etc/default/hadoop-hdfs-datanode file and set the HADOOP_DATANODE_OPTS variable, similar to what we did for the NameNode's heap size:

export HADOOP_DATANODE_OPTS="-Xmx1024m"

Now, we are ready to start the DataNode daemon using the service command:

# service hadoop-hdfs-datanode start

Always check logfiles for the daemons that were just started, especially when making changes to the configuration files. If DataNode started successfully and was able to communicate with the active and standby NameNode, you should see messages in the log file indicating that DataNode started sending block allocation information to the NameNodes.

To verify that DataNode is functioning properly, we will perform two simple tests. It is possible to use HDFS at this point, even though we have configured only one DataNode. Recall the dfs.namenode.replication.min setting. With a default value of 1, HDFS will be able to store files, if at least one replica of file blocks can be created. This will allow us to verify the configuration on one DataNode before investing time into propagating these settings across all DataNodes.

First of all, we will create a temporary directory in HDFS and then upload a sample file there. A temporary directory in Hadoop plays the same role as a /tmp directory in your Linux installation. Some of the ecosystem projects that we will be setting up in further chapters will require /tmp to be present in HDFS. To create a new directory in HDFS, we will use the hdfs command:

# sudo -u hdfs hdfs dfs -mkdir /tmp

The hdfs command-line client tool is used to perform various operations on HDFS. It accepts HDFS commands as arguments and generally tries to mimic standard Linux commands such as mkdir, ls, rm, among others. Note that we execute this command as the hdfs user, because we need to create a root-level directory in the filesystem and hdfs is to HDFS what the root user is to your local filesystem. If you try to execute this command as the root user, you will get a permission denied error.

Now, we need to set permissions on the /tmp directory in HDFS, in the same fashion you would do for the local filesystem; everyone is allowed to write and read to this directory, but users can only delete or rename their own files (sticky bit). There is a chmod command for HDFS which is similar to the following Linux command:

# sudo -u hdfs hdfs dfs -chmod 1777 /tmp

Let's create a simple text file, test.txt, and upload it into HDFS. Assuming the file is in /root, you can upload it using the following command:

# hdfs dfs -put /root/test.txt /tmp

To verify that the file was loaded correctly, we can use the -ls and -cat commands of hdfs. You can also use the hdfs fsck command to verify the health of the filesystem.
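
For example, the following checks list the uploaded file, print its contents, and report on its block health; the output is omitted here:

# hdfs dfs -ls /tmp
# hdfs dfs -cat /tmp/test.txt
# sudo -u hdfs hdfs fsck /tmp/test.txt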

At this point, you should proceed with configuring the rest of the DataNodes in your cluster. Obviously, when dealing with a large number of machines, using automation tools to propagate configuration files and execute commands in parallel comes in handy.

TaskTracker configuration

TaskTracker is the final piece of the Hadoop core that we need to set up and configure. TaskTracker is to the MapReduce framework what DataNode is to HDFS. TaskTrackers are responsible for launching and executing individual tasks of the jobs that JobTracker submits to them. TaskTracker daemons run on the same servers as DataNodes. This is required to maximize local data access during the map phase. JobTracker is smart enough to figure out on which DataNodes the file blocks required by individual tasks reside, and submits those tasks to TaskTrackers running on the same servers.

First of all, we need to install the required packages:

# yum install hadoop-0.20-mapreduce-tasktracker

Similarly to JobTracker, the hadoop-0.20-mapreduce package will be installed as a dependency.

The TaskTracker daemon uses the same mapred-site.xml configuration file as JobTracker does. This may not be the best design decision, because some settings have different values for TaskTracker and JobTracker. In the following examples, we will assume that you maintain two different versions of mapred-site.xml: one for JobTracker and one for all TaskTrackers.

The first setting we need to specify is the address and port of the JobTracker daemon:

<name>mapred.job.tracker</name>
<value>jt1.hadoop.test.com:8021</value>

This is exactly the same value we configured for JobTracker too.

We have also configured mapred.local.dir on the JobTracker server, but on TaskTracker it has a different function. While it still specifies local volumes to store temporary data, the requirements for local storage are much more demanding on TaskTracker; on JobTracker, the disk space requirements are much humbler. MapReduce jobs will use this local storage to save temporary data (the map task output), which is later passed to the reduce phase. The size of the data and the intensity of the IO can be significant, depending on the types of jobs you run and on job concurrency. A common practice is to allocate separate directories for the jobs' data on the same disks that DataNodes are using. This provides enough disk space, as well as utilizing multiple disks to improve IO throughput. Previously, we configured the dfs.datanode.du.reserved variable for the DataNode daemon. This is required to prevent HDFS from hogging all the disk space and causing jobs to fail because they can't write to a temporary location anymore. Here is how to configure this variable on TaskTracker:

<name>mapred.local.dir</name>
<value>
/dfs/data1/mapred,/dfs/data2/mapred,/dfs/data3/mapred,/dfs/data4/mapred,/dfs/data5/mapred,/dfs/data6/mapred
</value>

This assumes that your HDFS volumes are mounted on /dfs/data*. Make sure that the mapred OS user can access the mapred directories.
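
One way to create these directories with the correct ownership on a worker node, assuming the mount points listed above, is:

# for i in 1 2 3 4 5 6; do mkdir -p /dfs/data$i/mapred; chown mapred:mapred /dfs/data$i/mapred; done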

TaskTracker is where the Hadoop cluster capacity configuration happens. From MapReduce's perspective, cluster capacity consists of available map slots and available reduce slots. The number of slots that each TaskTracker will be able to allocate depends on the server hardware configuration, specifically the CPU and RAM available. Creating more slots than a single worker node can handle doesn't make much sense, because the job processes will start to compete for machine resources, causing significant slowdown. Let's assume we are setting up a smaller cluster of higher-end machines with 16 CPU cores and 64 GB of RAM per node, as discussed earlier. The rule of thumb is to approximately match the number of available slots with the number of available cores. For faster servers, oversubscribing the CPU a little bit is OK, and for a 16-core machine we can allocate a total of 24 task slots (16 * 1.5 = 24). Keep in mind that these are the total available slots, including map and reduce slots. Since map tasks do the initial data processing and one map task is created for each input split, you can expect more demand for map slots than for reduce slots. The reduce phase is executed over data that the map tasks produce and is generally expected to process much less data than the map phase did. The exact ratio of available map to reduce slots will depend on the actual workload, but a good starting point is to allocate 2/3 of all server slots to map tasks and 1/3 to reduce tasks. For our 16-core machine, we will have 16 map slots and 8 reduce slots:

<name>mapred.tasktracker.map.tasks.maximum</name>
<value>16</value>
<name>mapred.tasktracker.reduce.tasks.maximum</name>
<value>8</value>

TaskTracker creates a separate Java process for each map and reduce task, so apart from its own heap memory settings, it needs to provide those settings to the child processes as well. Just like with all the other Hadoop daemons we have configured so far, TaskTracker's heap size can be configured by setting the HADOOP_TASKTRACKER_OPTS variable in the /etc/default/hadoop-0.20-mapreduce file:

export HADOOP_TASKTRACKER_OPTS="-Xmx1G"

The default setting for the TaskTracker heap memory is 1G, which should be enough for most installations. This setting, though, is not related to the memory that will be available to the map and reduce processes. For that, the mapred.child.java.opts variable in mapred-site.xml is used. It can be used to pass various parameters to the map and reduce processes, but the most common ones are the minimum and maximum heap memory settings. The amount of RAM that you should allocate to individual tasks depends on the total number of slots configured and the server RAM. For our example, for a server that has 64 GB of RAM and a total of 24 task slots, allocating 2G per task is reasonable:

<name>mapred.child.java.opts</name>
<value>-Xmx2G</value>

The mapred.child.java.opts variable is just an option string that will be passed to the task processes, so you can include additional options, such as the amount of memory allocated to the process initially with the -Xms option.
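
For example, to pre-allocate the initial heap as well as cap the maximum, you could combine both options in a single string (the exact values here are purely illustrative):

<name>mapred.child.java.opts</name>
<value>-Xms1G -Xmx2G</value>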

This concludes the basic TaskTracker configuration and we are now ready to start it. Use the following command to bring it online:

service hadoop-0.20-mapreduce-tasktracker start

Check the TaskTracker log file to make sure there are no errors. Typical problems during initial setup are caused by incorrect permissions on local directories, typos in JobTracker address, and so on. You can also run the following command to list all TaskTrackers that have successfully registered with JobTracker:

# hadoop job -list-active-trackers

Before you propagate this setup to all the TaskTrackers, we will run a final validation test: executing an actual MapReduce job. This will be the basic WordCount example, which is often presented as the "Hello, World" equivalent for MapReduce. It is also included in the examples shipped with the CDH packages.

  1. First of all, create a file with some arbitrary text in it and save it in a local directory. Let's assume you are doing this on the jt1 server and operating as the root user. To execute a MapReduce job, you need to create a /user/root directory in HDFS. It will be used as the job staging directory. Make sure its ownership is assigned to root:
    # sudo -u hdfs hdfs dfs -mkdir /user/root
    # sudo -u hdfs hdfs dfs -chown root:root /user/root
    
  2. Note that we are performing these operations as the hdfs user, which is the superuser for HDFS, while root, from HDFS' perspective, is just an ordinary user. Next, create a job input directory in /user/root:
    # hdfs dfs -mkdir /user/root/words_input
    
  3. To upload a file into HDFS, use the following command:
    # hdfs dfs -put /tmp/words.txt /user/root/words_input
    
  4. The WordCount example, along with other sample jobs, is located in /usr/lib/hadoop-0.20-mapreduce/hadoop-examples.jar. To submit a MapReduce job, use the hadoop jar command:
    # hadoop jar /usr/lib/hadoop-0.20-mapreduce/hadoop-examples.jar wordcount /user/root/words_input /user/root/words_output
    
  5. The hadoop jar command takes several arguments: the location of the .jar file, the class name of the job we want to run, the directory where to look for input files, and the directory where to write the final results. A new output directory is created for every job run, so make sure it doesn't already exist before you start the job. If everything is configured properly, after executing this command you should see the job execution progress and some additional statistics, including the time spent in the map and reduce phases, the number of rows processed, and so on.
  6. Take a look at the output directory; you should see the following files:
    # hdfs dfs -ls /user/root/words_output
    Found 3 items
    -rw-r--r--   3 root supergroup          0  /user/root/words_output/_SUCCESS
    drwxrwxrwt   - root supergroup          0  /user/root/words_output/_logs
    -rw-r--r--   3 root supergroup         65  /user/root/words_output/part-r-00000
    
  7. The _SUCCESS file is a marker file, indicating that the job completed without errors. The _logs directory contains the job execution logs, and finally part-r-00000 is the result file containing the words and their respective counts from the input file. You can check its content by running this command:
    # hdfs dfs -cat /user/root/words_output/part-r-00000
    

If everything worked as described here, your MapReduce and HDFS configurations are valid and you can proceed with propagating them across all nodes in the cluster.
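
How you push the configuration files out is entirely up to you. A simple sketch using scp and a few hypothetical worker hostnames could look like the following; keep in mind that TaskTracker nodes get their own variant of mapred-site.xml:

# for node in dn1.hadoop.test.com dn2.hadoop.test.com dn3.hadoop.test.com; do
>   scp /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/hdfs-site.xml $node:/etc/hadoop/conf/
> done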

Advanced Hadoop tuning

The previous sections described the steps to set up and configure the core Hadoop components. After you learn more about your typical production workload and the types of jobs being executed, you may want to adjust some additional configuration settings to improve or balance the workload better. Hadoop exposes many more configuration options than those we have already discussed. You will probably never need to change most of them, but there are a few performance-related variables worth mentioning here.

hdfs-site.xml

NameNode needs to respond to many different requests from HDFS clients, including DataNodes and external applications. To handle this load efficiently, NameNode spawns several threads, and the number of threads it creates is configurable via the dfs.namenode.handler.count variable. The default number of threads is 10, but if you are setting up a large cluster you may want to increase it. There are different opinions on how to calculate this number, but generally you will not get a significant performance boost from setting this value too high. Set it to 100 if your cluster size exceeds 200 nodes, and to 50 if it is less than that:

<name>dfs.namenode.handler.count</name>
<value>50</value>

Adding new DataNodes to the cluster, either to increase the total capacity or to replace failed nodes, is a common task that a Hadoop administrator will need to perform sooner or later. While new nodes will start accepting blocks for newly created files and replicas, existing data will not be rebalanced automatically. This means your new nodes will not be fully utilized. To force Hadoop to move existing data blocks to servers that have spare capacity, you need to run the hdfs balancer command, but moving terabytes of data across the network can easily disrupt regular jobs. To prevent this, Hadoop throttles the balancing data transfers to the value defined in the dfs.datanode.balance.bandwidthPerSec variable, specified in bytes per second. The exact value will depend on the network bandwidth in your cluster; the default is 1 MB/sec. To increase it to 10 MB/sec, use the following setting:

<name>dfs.datanode.balance.bandwidthPerSec</name>
<value>10485760</value>
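
Once the limit is in place, you can trigger a rebalancing run manually. A minimal example, run as the hdfs superuser and with the default 10 percent utilization threshold spelled out explicitly, would be:

# sudo -u hdfs hdfs balancer -threshold 10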

mapred-site.xml

Similar to the NameNode, JobTracker can get pretty busy responding to multiple client requests. To make sure JobTracker has enough threads allocated for this purpose, you need to tune the mapred.job.tracker.handler.count variable. Use the same rule as for the NameNode thread count:

<name>mapred.job.tracker.handler.count</name>
<value>50</value>

MapReduce job performance is highly influenced by the sort-and-shuffle phase, when the output of an individual map task has to be sorted by key, merged into several larger files, and sent to the reducers. This phase normally generates a lot of network traffic and can also cause significant local IO. When a map task needs to sort its output, it uses an internal buffer, and once this buffer is full, it spills data onto local disks. To reduce the number of such spills, you can increase the value of the io.sort.mb variable. By default, it is set to 100 MB, which is a good buffer size for most workloads. If you are observing a lot of local IO during the sort phase, you can increase this value. To increase the sort buffer to 200 MB, add the following lines to the mapred-site.xml file:

<name>io.sort.mb</name>
<value>200</value>

Note

Keep in mind that this buffer is allocated as a part of the task's heap size, specified by the mapred.child.java.opts variable.

A related option is io.sort.factor. It specifies how many files will be merged in one batch. Again, there is no strict rule about this variable, but the default of 10 seems to be too low. You can start with 32, which is a somewhat arbitrary number, and observe the cluster's performance to tune it further:

<name>io.sort.factor</name>
<value>32</value>

During the reduce phase of a job, map outputs belonging to a given key range need to be copied to a specific reducer. There can be many such pieces scattered across the cluster, and to speed up the copy, reducers use several parallel copy processes. By default, five copy processes are created. You may want to increase this value if you are setting up a large cluster, or if you plan to run jobs that will need thousands of map tasks. For small to medium sized clusters (20-100 nodes), go with 12-16 parallel processes:

<name>mapred.reduce.parallel.copies</name>
<value>12</value>

core-site.xml

Large Hadoop clusters span multiple racks in a datacenter, and it is not unusual to see even smaller clusters split between two or three racks. For smaller clusters, a multi-rack setup provides additional redundancy in case one of the racks fails. Hadoop was designed to be rack-aware, that is, able to make certain optimizations depending on which rack a given server belongs to. One such optimization is the block replica placement strategy Hadoop uses: it will try to store at least one replica in a different rack to increase data resilience. When requesting data blocks, for example for a MapReduce job, Hadoop will try to minimize cross-rack network traffic by picking servers that are in the same rack.

To be able to use this feature, Hadoop needs to know which server belongs to which rack. The idea is that a Hadoop administrator provides an executable which, given a server IP or hostname, returns a rack ID. The implementation of this executable is completely up to the administrator. The most common approach is to create a comma-separated text file that maps servers to racks, and to write a shell/Python/Perl script that takes an IP or hostname as an argument and returns the rack ID. The rack ID can be any string. A production-ready script is beyond the scope of this book, and there are multiple versions available online, but a minimal sketch is shown at the end of this section. To enable rack-awareness, you need to specify your script path in the net.topology.script.file.name variable:

<name>net.topology.script.file.name</name>
<value>/etc/hadoop/conf/rack_topology.bash</value>

If no script is provided, Hadoop assumes that all servers are within one rack.
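
For illustration only, here is a minimal sketch of such a script. It assumes a hypothetical comma-separated mapping file, /etc/hadoop/conf/rack_topology.data, with lines such as 10.1.1.15,/rack1, and it falls back to a default rack for unknown hosts:

#!/bin/bash
# Minimal rack topology sketch: Hadoop passes one or more IPs/hostnames
# as arguments and expects a space-separated list of rack IDs in return.
MAP_FILE=/etc/hadoop/conf/rack_topology.data
DEFAULT_RACK="/default-rack"

output=""
for host in "$@"; do
  # Look up the first matching entry in the comma-separated mapping file
  rack=$(awk -F, -v h="$host" '$1 == h {print $2; exit}' "$MAP_FILE")
  [ -z "$rack" ] && rack=$DEFAULT_RACK
  output="$output $rack"
done
echo $output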
