CHAPTER 2

Introducing Hadoop

I was at a data warehousing conference and talking with a top executive from a leading bank about Hadoop. As I was telling him about the technology, he interjected, “But does it have any use for us? We don’t have any Internet usage to analyze!” Well, he was voicing a common misconception. Hadoop is not a technology meant only for analyzing web usage or log files; it has genuine uses in the world of petabytes (a petabyte being 1,000 terabytes). It is a super-clever technology that can help you manage very large volumes of data efficiently and quickly, without spending a fortune on hardware.

Hadoop may have started in laboratories with some really smart people using it to analyze data for behavioral purposes, but it is increasingly finding support today in the corporate world. There are some changes it needs to undergo to survive in this new environment (such as added security), but with those additions, more and more companies are realizing the benefits it offers for managing and processing very large data volumes.

For example, the Ford Motor Company uses Big Data technology to process the large amount of data generated by their hybrid cars (about 25GB per hour), analyzing, summarizing, and presenting it to the driver via a mobile app that provides information about the car’s performance, the nearest charging station, and so on. Using Big Data solutions, Ford also analyzes the data available on social media through consumer feedback and comments about their cars. It wouldn’t be possible to use conventional data management and analysis tools to analyze such large volumes of diverse data.

The social networking site LinkedIn uses Hadoop along with custom-developed distributed databases, called Voldemort and Espresso, to manage its enormous volume of data, enabling it to serve popular features such as “People You May Know” or the LinkedIn social graph at great speed in response to a single click. This wouldn’t have been possible with conventional databases or storage.

Hadoop’s use of low-cost commodity hardware and its built-in redundancy are major factors that make it attractive to most companies using it for storage or archiving. In addition, features such as distributed processing (which multiplies your processing power by the number of nodes), the capability to handle petabytes of data with ease, capacity expansion without downtime, and a high degree of fault tolerance make Hadoop an attractive proposition for an increasing number of corporate users.

In the next few sections, you will learn about Hadoop architecture, the Hadoop stack, and also about the security issues that Hadoop architecture inherently creates. Please note that I will only discuss these security issues briefly in this chapter; Chapter 4 contains a more detailed discussion about these issues, as well as possible solutions.

Hadoop Architecture

The hadoop.apache.org web site defines Hadoop as “a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models.” Quite simply, that’s the philosophy: to provide a framework that’s simple to use, can be scaled easily, and provides fault tolerance and high availability for production usage.

The idea is to use existing low-cost hardware to build a powerful system that can process petabytes of data very efficiently and quickly. Hadoop achieves this by storing data locally on its DataNodes and processing it locally as well. All of this is coordinated by the NameNode, which is the brain of the Hadoop system: every client application locates its data through the NameNode, as you can see in the simplified Hadoop cluster in Figure 2-1.

Figure 2-1. Simple Hadoop cluster with NameNode (the brain) and DataNodes for data storage

Hadoop has two main components: the Hadoop Distributed File System (HDFS) and a framework for processing large amounts of data in parallel using the MapReduce paradigm. Let me introduce you to HDFS first.

HDFS

HDFS is a distributed file system layer that sits on top of the operating system’s native file system; on a Linux distribution such as Ubuntu, for example, HDFS can be installed on top of ext3, ext4, or XFS. It provides redundant storage for massive amounts of data using cheap, unreliable hardware. At load time, data is distributed across all the nodes, which helps with efficient MapReduce processing. By design, HDFS performs better with a small number of large (multi-gigabyte) files than with a large number of small files.

Files follow a “write once, read many times” model. Newer Hadoop versions add append support, but HDFS is meant for large, streaming reads, not random access. High sustained throughput is favored over low latency.

Files in HDFS are stored as blocks that are replicated for redundancy and reliability. By default, each block is replicated three times across DataNodes, so three copies of every file’s data are maintained. Also, the block size is much larger than in other file systems: NTFS (for Windows) and Linux ext3 both default to a 4KB block size, whereas HDFS uses a default block size of 64MB!
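
If you create files programmatically, both the replication factor and the block size can be overridden per file through the HDFS client API. The following is a minimal sketch, assuming a Hadoop client classpath; the NameNode URI and file path are hypothetical, and in practice the cluster-wide defaults come from the dfs.replication and block-size settings in hdfs-site.xml.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical NameNode address; substitute your cluster's fs.defaultFS
        FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);

        // Ask for 3 replicas and a 64MB block size for this particular file
        short replication = 3;
        long blockSize = 64L * 1024 * 1024;
        try (FSDataOutputStream out = fs.create(
                new Path("/data/events/2014-07-01.log"),  // hypothetical path
                true,   // overwrite if the file already exists
                4096,   // I/O buffer size in bytes
                replication,
                blockSize)) {
            out.writeUTF("write once, read many times");
        }
    }
}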

NameNode

NameNode (or the “brain”) stores metadata and coordinates access to HDFS. The metadata is held in the NameNode’s RAM for speedy retrieval, which keeps the NameNode’s response time low when it hands out the addresses of data blocks. This configuration provides simple, centralized management, but also a single point of failure (SPOF) for HDFS. In earlier versions, the Secondary NameNode offered only limited help in recovering from a NameNode failure; the current version can be configured with a Hot Standby NameNode in an Active/Passive configuration, where the standby takes over all NameNode functions without any user intervention, eliminating the SPOF and providing NameNode redundancy.

Since the metadata is stored in the NameNode’s RAM and each file entry (with its block locations) takes up some space, a large number of small files results in many entries and consumes far more RAM than a small number of entries for large files. Also, a file smaller than the block size (64MB by default) still occupies a block of its own, so its metadata entry costs as much as that of a much larger file. That’s why HDFS is better suited to large files than to many small ones.
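
To see this file-to-block mapping from a client’s point of view, you can ask the NameNode for the block locations of a file. Here is a minimal sketch, assuming a configured Hadoop client; the file path is hypothetical.

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockMapSketch {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path file = new Path("/data/events/2014-07-01.log");  // hypothetical file

        // Every block of every file is an entry in the NameNode's in-memory metadata;
        // this call asks the NameNode for the block-to-DataNode mapping of one file
        FileStatus status = fs.getFileStatus(file);
        for (BlockLocation block : fs.getFileBlockLocations(status, 0, status.getLen())) {
            System.out.println("offset=" + block.getOffset()
                    + " length=" + block.getLength()
                    + " hosts=" + Arrays.toString(block.getHosts()));
        }
    }
}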

Figure 2-2 illustrates the relationship between the components of an HDFS cluster.

Figure 2-2. HDFS cluster with its components

HDFS File Storage and Block Replication

The HDFS file storage and replication system is significant for its built-in intelligence in block placement, which enables better recovery from node failures. When the NameNode processes a file storage request from a client, it stores the first copy of a block locally on the client, if the client is part of the cluster; if not, it stores that copy on a DataNode that’s not too full or busy. It stores the second copy of the block on a different DataNode on the same rack (yes, HDFS considers the rack placement of DataNodes when deciding where blocks go) and the third on a DataNode on a different rack, to reduce the risk of complete data loss due to a rack failure. Figure 2-2 illustrates how two replicas of each block of the two files are spread over the available DataNodes.

DataNodes send heartbeats to the NameNode, and if a DataNode doesn’t send heartbeats for a particular duration, it is assumed to be “lost.” The NameNode then finds other DataNodes that hold copies of the blocks stored on the lost node and instructs them to copy those blocks to additional DataNodes. This way, the total number of replicas of every block always matches the configured replication factor (which determines how many copies of a file are maintained).
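
The replication factor is usually set cluster-wide via the dfs.replication property, but it can also be changed per file. A minimal sketch, assuming a configured Hadoop client and a hypothetical path:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReplicationSketch {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        // Lower an archived file to 2 replicas; the NameNode schedules the extra
        // copies for deletion (or schedules re-replication if you raise the factor)
        boolean accepted = fs.setReplication(new Path("/data/archive/2013.tar"), (short) 2);
        System.out.println("replication change accepted: " + accepted);
    }
}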

Adding or Removing DataNodes

It is surprisingly easy to add or remove DataNodes from an HDFS cluster. You just add the hostname of the new DataNode to a configuration file (a text file named slaves) and run an administrative utility to tell the NameNode about the addition. After that, the DataNode process is started on the new node, and your HDFS cluster has an additional DataNode.

DataNode removal is equally easy and simply reverses the process: remove the hostname entry from slaves and run the administrative utility to make the NameNode aware of the deletion. After this, the DataNode process can be shut down on that node and the node removed from the HDFS cluster. The NameNode quietly replicates the blocks from the decommissioned DataNode to other DataNodes, and life moves on.

Cluster Rebalancing

Adding or removing DataNodes is easy, but it may leave your HDFS cluster unbalanced, and other activities can create imbalance as well. Hadoop provides a utility (the Hadoop Balancer) that will rebalance your cluster. The Balancer moves blocks from overutilized DataNodes to underutilized ones, while still honoring Hadoop’s storage and replication policy of not placing all of a block’s replicas on DataNodes in a single rack.

Block movement continues until utilization (the ratio of used space to total capacity) for all the DataNodes falls within a threshold percentage of each other. For example, a 5% threshold means the utilization of all DataNodes is within 5% of one another. The Balancer runs in the background with low bandwidth so as not to tax the cluster.

Disk Storage

HDFS uses local storage for the NameNode, the Secondary NameNode, and the DataNodes, so it’s important to use the correct storage type. The NameNode, being the brain of the cluster, needs redundant and fault-tolerant storage; using RAID 10 (striping across mirrored pairs of disks) is highly recommended, and the Secondary NameNode should use RAID 10 as well. The DataNodes, on the other hand, can use local JBOD (just a bunch of disks) storage. Remember, data on these nodes is already replicated three times (or whatever the replication factor is), so there is no real need for RAID there.

Secondary NameNode

Let’s now consider how the Secondary NameNode helps preserve the NameNode’s metadata. The NameNode uses an image file called fsimage to store the current state of HDFS (a map of all files stored within the file system and the locations of their corresponding blocks) and a file called edits to store modifications to HDFS. Over time, the edits file can grow very large, so fsimage no longer reflects the current state of HDFS; in that situation, if the NameNode crashes, the current state of HDFS may be lost and the data left unusable.

To avoid this, the Secondary NameNode performs a checkpoint (every hour by default): it merges the fsimage and edits files from the NameNode locally and copies the resulting fsimage back to the NameNode. So, in a worst-case scenario, only the edits (modifications to HDFS) made since the last checkpoint are lost, since the Secondary NameNode stores the latest merged copy of fsimage locally. Figure 2-3 provides more insight into this process.

Figure 2-3. Checkpoint performed by Secondary NameNode
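
The checkpoint interval itself is configurable. The sketch below assumes a Hadoop 2.x-style property name (older releases use fs.checkpoint.period instead); in practice you would set this in hdfs-site.xml on the NameNode and Secondary NameNode rather than in code.

import org.apache.hadoop.conf.Configuration;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Checkpoint interval in seconds (default 3600 = 1 hour);
        // here it is shortened to 30 minutes
        conf.setLong("dfs.namenode.checkpoint.period", 1800L);

        System.out.println("checkpoint period (s): "
                + conf.getLong("dfs.namenode.checkpoint.period", 3600L));
    }
}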

What does all this mean for your data? Consider how HDFS processes a request. Figure 2-4 shows how a data request is addressed by the NameNode and how data is retrieved from the corresponding DataNodes.

Figure 2-4. Anatomy of a Hadoop data access request
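
From the client’s perspective, that whole exchange is hidden behind a single open() call: the client library asks the NameNode for block locations and then streams the blocks directly from the DataNodes that hold them. A minimal read sketch, assuming a configured Hadoop client and a hypothetical file path:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsReadSketch {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        // open() contacts the NameNode for metadata; the returned stream then
        // reads the file's blocks directly from the DataNodes that store them
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                fs.open(new Path("/data/events/2014-07-01.log"))))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}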

NameNode High Availability

As you will remember from the NameNode section, the NameNode is a SPOF. If a Hadoop cluster is used as a production system, there needs to be a way to eliminate this dependency and make sure the cluster keeps working normally even if the NameNode fails. One way to counter NameNode failure is NameNode high availability (HA), where a cluster is deployed with an active/passive pair of NameNodes. The edits write-ahead log needs to be available to both NameNodes and is therefore located on a shared NFS directory. The active NameNode writes to the edits log, and the standby NameNode replays the same transactions to stay up to date (ready to take over in case of a failure). DataNodes send block reports to both nodes.

You can configure an HA NameNode pair for manual or automatic failover (the active and passive nodes exchanging roles). For manual failover, a command must be executed to have the standby NameNode take over as the primary or active NameNode. For automatic failover, each NameNode runs an additional process called a failover controller, which monitors the NameNode process and coordinates state transitions as required. Apache ZooKeeper is commonly used to coordinate failover.

During a failover, it’s not possible to tell whether the active NameNode has really stopped or is merely unreachable from the standby NameNode. If both NameNode processes keep running in parallel, both can write to the shared state and corrupt the file system metadata. This is the classic split-brain scenario, and to avoid it you need to ensure that the failed NameNode is stopped, or “fenced.” Increasingly severe techniques are used to implement fencing, starting with a stop request via RPC (remote procedure call) and escalating to STONITH (“shoot the other node in the head”), implemented by remotely rebooting the machine or (programmatically) cutting its power for a short duration.
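
To give a flavor of what an HA setup involves, here is a minimal configuration sketch; these properties normally live in hdfs-site.xml and core-site.xml on every node, the nameservice name, host names, NFS path, and fencing script are hypothetical, and exact property names can vary between Hadoop 2.x releases.

import org.apache.hadoop.conf.Configuration;

public class NameNodeHaConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // A logical nameservice backed by two NameNodes
        conf.set("dfs.nameservices", "mycluster");
        conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.mycluster.nn1", "namenode1:8020");
        conf.set("dfs.namenode.rpc-address.mycluster.nn2", "namenode2:8020");

        // Shared location for the edits log that both NameNodes can reach
        conf.set("dfs.namenode.shared.edits.dir", "file:///mnt/nfs/hadoop/ha-edits");

        // Automatic failover coordinated through a ZooKeeper ensemble
        conf.set("dfs.ha.automatic-failover.enabled", "true");
        conf.set("ha.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");

        // Fencing: try an ssh-based fence first, then a custom STONITH-style script
        conf.set("dfs.ha.fencing.methods", "sshfence\nshell(/usr/local/bin/fence_namenode.sh)");
    }
}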

When using HA, since the standby NameNode takes over the role of the Secondary NameNode, no separate Secondary NameNode process is necessary.

Inherent Security Issues with HDFS Architecture

After reviewing HDFS architecture, you can see that this is not the traditional client/server model of processing data we are all used to. There is no server to process the data, authenticate the users, or manage locking. There was no security gateway or authentication mechanism in the original Hadoop design. Although Hadoop now has strong authentication built in (as you shall see later), complexity of integration with existing corporate systems and role-based authorization still presents challenges.

Any user with access to the server running the NameNode process and with execute permission on the Hadoop binaries can potentially request data from the NameNode, and can request deletion of that data, too! Access is limited only by Hadoop directory and file permissions, and it’s easy to impersonate another user (in this case a Hadoop superuser) and access everything. Moreover, Hadoop doesn’t enable you to provide role-based or object-level access, or offer enough granularity for attribute-level access (to a particular object). For example, it doesn’t offer special roles with the ability to run specific Hadoop daemons (or services). There is an all-powerful Hadoop superuser in the admin role, but everyone else is a mere mortal; users can simply connect to HDFS and access all files, unless file permissions are set for specific owners or groups.
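
Those POSIX-style permissions are the only built-in control, and they can be set through the client API (or the hadoop fs shell). A minimal sketch, assuming superuser rights and a hypothetical file; without strong authentication, nothing prevents a user from simply claiming to be the file’s owner.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class HdfsPermissionSketch {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path report = new Path("/data/finance/q2-report.csv");  // hypothetical file

        // Owner, group, and mode bits are all HDFS offers out of the box:
        // owner read/write, group read, no access for others (0640)
        fs.setOwner(report, "finance", "analysts");
        fs.setPermission(report, new FsPermission((short) 0640));
    }
}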

The flexibility that Hadoop architecture provides therefore also creates vulnerabilities, due to the lack of a central authentication mechanism. Because data is spread across a large number of DataNodes, the advantages of distributed storage and processing come with a price: every DataNode is also a potential entry point for an attack and needs to be secured well.

Hadoop clients perform metadata operations, such as creating and opening files, at the NameNode using the RPC protocol, and they read and write file data directly from DataNodes using a streaming socket protocol called the data-transfer protocol. Communication over RPC can be encrypted easily through Hadoop configuration files, but encrypting the data traffic between DataNodes and clients requires Kerberos or the SASL (Simple Authentication and Security Layer) framework.
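
As a rough illustration, the relevant settings look something like the sketch below; they normally go in core-site.xml and hdfs-site.xml on every node, the property names assume a Hadoop 2.x release, and they are only effective once Kerberos/SASL authentication is in place (covered in later chapters).

import org.apache.hadoop.conf.Configuration;

public class WireEncryptionConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // RPC traffic (client <-> NameNode, daemon <-> daemon):
        // "authentication" | "integrity" | "privacy" (privacy = encrypted)
        conf.set("hadoop.rpc.protection", "privacy");

        // Block data streamed between clients and DataNodes
        // over the data-transfer protocol
        conf.set("dfs.encrypt.data.transfer", "true");
    }
}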

The HTTP communication between web consoles and Hadoop daemons (NameNode, Secondary NameNode, DataNode, etc.) is unencrypted and unsecured (it allows access without any form of authentication by default), as seen in Figure 2-5. So, it’s very easy to access all the cluster metadata. To summarize, the following threats exist for HDFS due to its architecture:

  • An unauthorized client may access an HDFS file or cluster metadata via the RPC or HTTP protocols (since the communication is unencrypted and unsecured by default).
  • An unauthorized client may read/write a data block of a file at a DataNode via the pipeline streaming data-transfer protocol (again, unencrypted communication).
  • A task or node may masquerade as a Hadoop service component (such as DataNode) and modify the metadata or perform destructive activities.
  • A malicious user with network access could intercept unencrypted internode communications.
  • Data on failed disks in a large Hadoop cluster can leak private information if not handled properly.

Figure 2-5. Hadoop communication protocols and vulnerabilities

When Hadoop daemons (or services) communicate with each other, they don’t verify that the other service is really what it claims to be. So it’s entirely possible to start a rogue TaskTracker to gain access to data blocks. There are ways to make Hadoop services perform mutual authentication, but Hadoop doesn’t implement them by default; they require configuration changes as well as additional components to be installed. Figure 2-5 summarizes these threats.

We will revisit the security issues in greater detail (with pertinent solutions) in Chapters 4 and 5 (which cover authentication and authorization) and Chapter 8 (which focuses on encryption). For now, turn your attention to the other major Hadoop component: the framework for processing large amounts of data in parallel using MapReduce paradigm.

Hadoop’s Job Framework using MapReduce

In earlier sections, we reviewed one aspect of Hadoop: HDFS, which is responsible for distributing (and storing) data across multiple DataNodes. The other aspect is distributed processing of that data; this is handled by Hadoop’s job framework, which uses MapReduce.

MapReduce is a method for distributing a task across multiple nodes, where each node processes the data stored on that node (where possible). A MapReduce job consists of two phases: Map and Reduce. A Map task works on a split (a portion of the input data), transforming each input key-value pair and emitting intermediate key-value pairs. Then a shuffle (sort) phase exchanges data between nodes so that all intermediate data with the same key goes to the same Reducer.

When a Reducer receives output from various mappers, it sorts the incoming data using the key (of the key-value pair) and groups together all values for the same key. The reduce method is then invoked (by the Reducer). It generates a (possibly empty) list of key-value pairs by iterating over the values associated with a given key and writes output to an output file.
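
As a concrete illustration, here is a minimal sketch of the canonical word-count job written against the org.apache.hadoop.mapreduce API: the mapper emits a (word, 1) pair for every token it sees, and the reducer sums the counts it receives for each word after the shuffle.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

    // Map: processes one line of its input split at a time, emitting (word, 1) pairs
    public static class TokenizerMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    // Reduce: all values for the same key arrive together after the shuffle/sort
    public static class SumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}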

The MapReduce framework utilizes two Hadoop daemons (JobTracker and TaskTracker) to schedule and process MapReduce jobs. The JobTracker runs on the master node (usually the same node that’s running NameNode) and manages all jobs submitted for a Hadoop cluster. A JobTracker uses a number of TaskTrackers on slave nodes (DataNodes) to process parts of a job as required.

A task attempt is an instance of a task running on a slave (TaskTracker) node. Task attempts can fail, in which case they will be restarted. Thus there will be at least as many task attempts as there are tasks that need to be performed.

To summarize, a MapReduce program proceeds through the following steps (a minimal driver sketch follows the list):

  1. The client program submits a job (data request) to Hadoop.
  2. The job consists of a mapper, a reducer, and a list of inputs.
  3. The job is sent to the JobTracker process on the master node.
  4. Each slave node runs a process called the TaskTracker.
  5. The JobTracker instructs TaskTrackers to run and monitor tasks (a Map or Reduce task for input data).
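
The driver below is a sketch that reuses the hypothetical WordCount mapper and reducer shown earlier and assumes a Hadoop 2.x client: it packages the mapper, reducer, and input path into a job and submits it to the cluster (the JobTracker, or the ResourceManager on YARN), which then hands Map and Reduce tasks to the slave nodes and monitors them.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Steps 1-2: the client bundles a mapper, a reducer, and the inputs into a job
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setReducerClass(WordCount.SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/data/events"));        // hypothetical input
        FileOutputFormat.setOutputPath(job, new Path("/out/word-counts"));  // must not exist yet

        // Steps 3-5: submit the job and wait while tasks run on the slave nodes
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}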

Figure 2-6 illustrates Hadoop’s MapReduce framework and how it processes a job.

Figure 2-6. MapReduce framework and job processing

Task processes send heartbeats to the TaskTracker. TaskTrackers send heartbeats to the JobTracker. Any task that fails to report in 10 minutes is assumed to have failed and is killed by the TaskTracker. Also, any task that throws an exception is said to have failed.

Failed tasks are reported to the JobTracker by the TaskTracker. The JobTracker reschedules any failed tasks and tries to avoid rescheduling the task on the same TaskTracker where it previously failed. If a task fails more than four times, the whole job fails. Any TaskTracker that fails to report in 10 minutes is assumed to have crashed and all assigned tasks restart on another TaskTracker node.

Any TaskTracker reporting a high number of failed tasks is blacklisted (to prevent the node from blocking the entire job). There is also a global blacklist for TaskTrackers that fail on multiple jobs. The JobTracker manages the state of each job and partial results of failed tasks are ignored.

Figure 2-7 shows how the MapReduce paradigm works for input key-value pairs and results in a reduced output.

Figure 2-7. MapReduce processing for a job

Detailed coverage of MapReduce is beyond the scope of this book; interested readers can refer to Pro Hadoop by Jason Venner (Apress, 2009). Jason introduces MapReduce in Chapter 2 and discusses the anatomy of a MapReduce program at length in Chapter 5, covering each of its components in great detail.

Apache Hadoop YARN

The MapReduce framework used by earlier versions of Hadoop wasn’t sufficient for scenarios that required customized resource handling. With YARN, Hadoop now has a generic distributed data processing framework (with a built-in scheduler) on which you can define your own resource handling. Hadoop MapReduce is now just one of the distributed data processing applications that can run on YARN.

YARN splits the two major functions of the JobTracker (resource management and job scheduling/monitoring) into separate daemons: a global ResourceManager and a per-application ApplicationMaster. The ResourceManager and the NodeManager (which runs on each “slave” node) form a generic distributed data processing system in conjunction with the ApplicationMaster.

ResourceManager is the overall authority that allocates resources for all the distributed data processing applications within a cluster. ResourceManager uses a pluggable Scheduler (of your choice—e.g., Fair or first-in, first-out [FIFO] scheduler) that is responsible for allocating resources to various applications based on their need. This Scheduler doesn’t perform monitoring, track status, or restart failed tasks.

The per-application ApplicationMaster negotiates resources from the ResourceManager and works with the NodeManager(s) to execute the component tasks, tracking their status and monitoring their progress. In earlier versions of Hadoop, this per-job functionality was handled by the JobTracker, with TaskTrackers executing the individual tasks.

The NodeManager is responsible for launching the applications’ containers, monitoring their resource usage (CPU, memory, disk, network), and reporting it to the ResourceManager.

So, what are the differences between classic MapReduce and YARN? As noted earlier, YARN splits the JobTracker’s functions between the ResourceManager (cluster-wide resource management and scheduling) and the per-application ApplicationMaster (managing and monitoring an individual application’s tasks). Interestingly, this also moves all the application-framework-specific code into the ApplicationMaster, generalizing the system so that multiple distributed processing frameworks, such as MapReduce, MPI (Message Passing Interface, a message-passing system used to develop many scalable, large-scale parallel applications), and graph processing, can be supported.
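
For a sense of how MapReduce becomes “just another YARN application,” the sketch below shows the handful of settings involved; they normally live in mapred-site.xml and yarn-site.xml, the ResourceManager host name is hypothetical, and property names can differ slightly between Hadoop 2.x releases.

import org.apache.hadoop.conf.Configuration;

public class YarnConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Run MapReduce as a YARN application instead of classic MapReduce
        conf.set("mapreduce.framework.name", "yarn");

        // Where the global ResourceManager runs
        conf.set("yarn.resourcemanager.hostname", "resourcemanager");

        // NodeManagers host the shuffle service that MapReduce reducers pull from
        conf.set("yarn.nodemanager.aux-services", "mapreduce_shuffle");
    }
}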

Inherent Security Issues with Hadoop’s Job Framework

The security issues with the MapReduce framework revolve around the lack of authentication within Hadoop, the communication between Hadoop daemons being unsecured, and the fact that Hadoop daemons do not authenticate each other. The main security concerns are as follows:

  • An unauthorized user may submit a job to a queue or delete or change priority of the job (since Hadoop doesn’t authenticate or authorize and it’s easy to impersonate a user).
  • An unauthorized client may access the intermediate data of a Map job via its TaskTracker’s HTTP shuffle protocol (which is unencrypted and unsecured).
  • An executing task may use the host operating system interfaces to access other tasks and local data, which includes intermediate Map output or the local storage of the DataNode that runs on the same physical node (data at rest is unencrypted).
  • A task or node may masquerade as a Hadoop service component such as a DataNode, NameNode, JobTracker, TaskTracker, etc. (no host process authentication).
  • A user may submit a workflow (using a workflow package like Oozie) as another user (it’s easy to impersonate a user).

As you remember, Figure 2-6 illustrated how the MapReduce framework processes a job. Comparing Figure 2-6 with Figure 2-8 will give you a better insight into the security issues with the MapReduce framework. Figure 2-8 details the security issues in the same context: job execution.

Figure 2-8. MapReduce framework vulnerabilities

Hadoop’s Operational Security Woes

The security issues discussed so far stem from Hadoop’s architecture; others are operational issues that have to be dealt with on a daily basis. Some of these arise from Hadoop’s relative newness and its origins in isolated laboratories with insulated, secure environments. There was a time when Hadoop was “secured” by severely restricting network access to Hadoop clusters; any access request had to be accompanied by several waivers from security departments and the requestor’s own management hierarchy!

Also, some existing technologies have not yet had time to build interfaces or gateways to integrate with Hadoop; indeed, a few features that are missing as I write may well have been added by the time you read this. Like the Unix of yesteryear, Hadoop is still a work in progress, with new features and new technologies added almost daily. With that in mind, consider some of the operational security challenges Hadoop currently presents.

Inability to Use Existing User Credentials and Policies

Suppose your organization uses single sign-on or Active Directory domain accounts for connecting to its various applications. How can you use them with Hadoop? Hadoop does offer LDAP (Lightweight Directory Access Protocol) integration, but configuring it is not easy: the interface is still in a nascent stage and documentation is extremely sketchy (in some cases nonexistent). The situation is compounded by the variety of Linux flavors on which Hadoop runs, since issues vary with the operating system and its version. Hence, allocating selected Hadoop resources to Active Directory users is not always possible.

Also, how can you enforce existing access control policies such as read access for application users, read/write for developers, and so forth? The answer is that you can’t. The easiest way is to create separate credentials for Hadoop access and reestablish access control manually, following the organizational policies. Hadoop follows its own model for security, which is similar (in appearance) to Linux and confuses a lot of people. Hadoop and the Hadoop ecosystem combine many components with different configuration endpoints and varied authorization methods (POSIX file-based, SQL database-like), and this can present a big challenge in developing and maintaining security authorization policy. The community has projects to address these issues (e.g., Apache Sentry and Argus), but as of this writing no comprehensive solution exists.

Difficult to Integrate with Enterprise Security

Most organizations use an enterprise security solution to achieve a variety of objectives: mitigating the risk of cyberattacks, meeting security compliance requirements, or simply establishing customer trust. Hadoop, however, can’t integrate with any of these security solutions. It may be possible to write a custom plug-in to accommodate Hadoop, but it may not be possible to make Hadoop comply with all of the security policies.

Unencrypted Data in Transit

Hadoop is a distributed system and hence consists of several nodes (such as NameNode and a number of DataNodes) with data communication between them. That means data is transmitted over the network, but it is not encrypted. This may be sensitive financial data such as account information or personal data (such as a Social Security number), and it is open to attacks.

Internode communication in Hadoop uses protocols such as RPC, TCP/IP, and HTTP. Currently, only RPC communication can be encrypted easily (that’s communication between NameNode, JobTracker, DataNodes, and Hadoop clients), leaving the actual read/write of file data between clients and DataNodes (TCP/IP) and HTTP communication (web consoles, communication between NameNode/Secondary NameNode and MapReduce shuffle data) open to attacks.

It is possible to encrypt TCP/IP or HTTP communication, but that requires the Kerberos or SASL (Simple Authentication and Security Layer) framework. Also, Hadoop’s built-in encryption has a significant negative impact on performance and is not widely used.

No Data Encryption at Rest

At rest, data is stored on disk. Hadoop doesn’t encrypt data stored on disk, which can expose sensitive data to malicious attacks; currently, no codec or framework is provided for this purpose. The problem is compounded by the nature of Hadoop architecture, which spreads data across a large number of nodes, exposing data blocks at all of those unsecured entry points.

There are a number of choices for implementing encryption at rest with Hadoop, but they are offered by different vendors and rely on those vendors’ distributions. Most notable was the Intel Hadoop distribution, which provided encryption for data stored on disk using Apache as well as custom codecs. Some of that functionality is proposed to be made available through Project Rhino (an Apache open source project).

Understand that because Hadoop usually deals with large volumes of data and encryption/decryption takes time, the framework used must perform these operations fast enough that overall performance doesn’t suffer. The Intel distribution claimed to perform them with great speed, provided Intel CPUs were used along with Intel disk drives and the other related hardware.

Hadoop Doesn’t Track Data Provenance

There are situations where a multistep MapReduce job fails at an intermediate step, and since the execution is often batch oriented, it is very difficult to debug the failure because the output data set is all that’s available.

Data provenance is a process that captures how data is processed through the workflow and aids debugging by enabling backward tracing—finding the input data that resulted in output for any given step. If the output is unusual (or not what was expected), backward tracing can be used to determine the input that was processed.

Hadoop doesn’t provide any facilities for data provenance (or backward tracing); if you need it, you must use a third-party tool such as RAMP. This makes troubleshooting job failures hard and time consuming.

This concludes our discussion of Hadoop architecture and the related security issues. We will discuss the Hadoop Stack next.

The Hadoop Stack

Hadoop core modules and main components together are referred to as the Hadoop Stack. The Hadoop core modules provide the basic working functionality of a Hadoop cluster: the Hadoop Common module provides the shared libraries, HDFS offers distributed storage with the functionality of a fault-tolerant file system, and MapReduce or YARN provides the distributed data processing. Without any bells and whistles, that’s a functional Hadoop cluster: configure one node as the NameNode, add a couple of DataNodes, and you have a basic, working cluster.

Here’s a brief introduction to each of the core modules:

  • Hadoop Common: These are the common libraries or utilities that support functioning of other Hadoop modules. Since the other modules use these libraries heavily, this is the backbone of Hadoop and is absolutely necessary for its working.
  • Hadoop Distributed File System (HDFS): HDFS is at the heart of a Hadoop cluster. It is a distributed file system that is fault tolerant, easily scalable, and provides high throughput using local processing and local data storage at the data nodes. (I have already discussed HDFS in great detail in the “HDFS” section).
  • Hadoop YARN: YARN is a framework for job scheduling and cluster resource management. A global ResourceManager process manages the data processing resources of a Hadoop cluster in conjunction with a NodeManager on each DataNode.

    The ResourceManager also has a pluggable Scheduler (such as the FIFO or Fair scheduler) that schedules jobs and works with the per-application ApplicationMaster processes on the DataNodes. MapReduce is the default distributed data processing application run this way, but any other distributed processing application can be used as required.

  • Hadoop MapReduce: A YARN-based system for parallel processing of large data sets. MapReduce is the algorithm that takes “processing to the data.” All the DataNodes can run map tasks (transformations of input into the desired intermediate output) and reduce tasks (sorting and merging of that output) locally, independently, and in parallel, providing the high throughput required for very large data sets. I discussed MapReduce in detail earlier, in the “Hadoop’s Job Framework using MapReduce” section.

So, you now know what the Hadoop core modules are, but how do they relate to each other to form a cohesive system with the expected functionality? Figure 2-9 illustrates the interconnections.

Figure 2-9. Hadoop core modules and their interrelations

As you can see, the two major aspects of Hadoop are distributed storage and distributed data processing. You can also see clearly the dependency of both these aspects on Hadoop Common libraries and the operating system. Hadoop is like any other application that runs in the context of the operating system. But then what happens to the security? Is it inherited from the operating system? Well, that’s where the problem is. Security is not inherited from the operating system and Hadoop’s security, while improving, is still immature and difficult to configure. You therefore have to find ways to authenticate, authorize, and encrypt data within your Hadoop cluster. You will learn about those techniques in Chapters 4, 5, 8, and 9.

Lastly, please note that in the real world, it is very common to have the NameNode (which manages HDFS processing) and the JobTracker (which manages job processing) running on the same node. So, Figure 2-9 indicates only a logical division of processing; it does not necessarily reflect the physical implementation.

Main Hadoop Components

As you saw in the last section, the Hadoop core modules provide basic cluster functionality, but the main components are not limited to the core modules. After all, a bare-bones Hadoop cluster can’t be used as a production environment. Additional functionality, such as ETL and bulk-load capability from other (non-Hadoop) data sources, scheduling, fast key-based retrieval, and query capability for the data, is required of any data storage and management system, and Hadoop’s main components provide these missing capabilities.

For example, the Pig component provides a data flow language useful for designing ETL. Sqoop provides a way to transfer data between HDFS and relational databases. Hive provides query capability with an SQL-like language. Oozie provides scheduling functionality, and HBase adds columnar storage for massive data storage and fast key-based retrieval. Table 2-1 lists some popular components along with their usage.

Table 2-1. Popular Hadoop Components

  • HBase: An open source, distributed, versioned, column-oriented data store. It can be used to store large volumes of structured and unstructured data, provides key-based access to data (and hence can retrieve it very quickly), is highly scalable, and uses HDFS for data storage. HBase’s real strengths are its ability to store unstructured, schema-less data and to retrieve it very fast using row keys.

  • Hive: Provides a SQL-like query language (HiveQL) that can be used to query HDFS data. Hive converts the queries to MapReduce jobs, runs them, and displays the results. Hive “tables” are actually files within HDFS. Hive is suited to data warehouse use, as it doesn’t support row-level inserts, updates, or deletes. Over 95% of Facebook’s Hadoop jobs are now driven by a Hive front end.

  • Pig: A data flow language that can effectively be used as an ETL system for warehousing environments. Like actual pigs, which eat almost anything, the Pig programming language is designed to handle any kind of data (hence the name). Using Pig, you can load the HDFS data you want to manipulate, run it through a set of transformations (which, behind the scenes, are translated into MapReduce tasks), and display the results on screen or write them to a file.

  • Sqoop: Provides connectivity with relational databases (Microsoft SQL Server, Oracle, MySQL, etc.), data warehouses, and NoSQL databases (Cassandra, HBase, MongoDB, etc.). It is easy to transfer data between HDFS (or Hive/HBase tables) and any of these data sources using Sqoop “connectors.” Sqoop integrates with Oozie to schedule data transfer tasks. Sqoop’s first version was a command-line client; Sqoop2 adds a GUI front end and a server that can be shared by multiple Sqoop clients.

  • Oozie: A workflow scheduler; it runs jobs according to a workflow, which is a collection of actions arranged in a control-dependency DAG (directed acyclic graph). A control dependency between actions simply defines their sequence (for example, the second action can’t start until the first action is completed), and a DAG is a loopless graph that has a starting point and an end point and proceeds in one direction without ever reversing. To summarize, Oozie executes actions or jobs in a predefined sequence, honoring their dependencies; a step is not started until Oozie receives a completion response from the remote system executing the current step or job. Oozie is commonly used to schedule Pig or Sqoop workflows and integrates well with them.

  • Flume: A distributed system for moving large amounts of data from multiple sources (transforming or aggregating it as needed) to a centralized destination or data store. Flume has sources, decorators, and sinks. Sources are data origins such as log files, the output of processes, or traffic at a TCP/IP port, and Flume provides many predefined sources for ease of use. Decorators are operations on the source stream (e.g., compressing or uncompressing data, adding or removing characters from the stream, grouping and averaging numeric data). Sinks are targets such as text files, console displays, or HDFS files. A popular use of Flume is to move diagnostic or job log files to a central location and analyze them using keywords (e.g., “error” or “failure”).

  • Mahout: A machine-learning tool. Remember how Amazon or Netflix recommends products when you visit their sites, based on your browsing history or prior purchases? That’s Mahout or a similar machine-learning tool in action, generating recommendations using what’s termed collaborative filtering, one of the machine-learning techniques Mahout employs to produce recommendations based on a user’s clicks, ratings, or past purchases. Mahout uses several other techniques to “learn from” or make sense of data, and it provides an excellent means of developing machine-learning or data-mining libraries that are highly scalable (i.e., they can still be used when data volumes grow astronomically).

You might have observed that no component is dedicated to providing security. You will need to use open source products, such as Kerberos and Sentry, to supplement this functionality. You’ll learn more about these in Chapters 4 and 5.

This brief introduction to the main components is important because, throughout the book, I assume use of an “extended” Hadoop cluster (core modules plus main components) when discussing security implementation as well as monitoring (Chapter 7), logging (Chapter 6), and encryption (Chapters 8 and 9).

Summary

This chapter introduced Hadoop’s architecture, core modules, main components, and inherent security issues.

Hadoop is not a perfectly secure system, but what is? And how does Hadoop compare with it? What modifications will you need to make to Hadoop in order to make it a secure system? Chapter 1 briefly outlined a model secure system (SQL Server), and I will discuss how to secure Hadoop in Chapters 4 to 8 using various techniques.

In later chapters, you will also learn how a Hadoop cluster uses the Hadoop Stack (Hadoop core modules and main components together) presented here. Understanding the workings of the Hadoop Stack will also make it easier for you to understand the solutions I am proposing to supplement security. The next chapter provides an overview of the solutions I will discuss throughout the book. Chapter 3 will also help you decide which specific solutions you want to focus on and direct you to the chapter where you can find the details you need.
