MapReduce is a processing framework originally developed by Google in the early 2000s for performing web page searches across thousands of physically separated machines. The MapReduce approach is extremely general. Complete MapReduce systems can be implemented in a variety of languages, although the most significant implementation is in Java. MapReduce is really a UDF (user-defined function) execution framework, where the “F” can be extraordinarily complex. Although originally targeted at building Google’s web page search index, a MapReduce job can be defined for virtually any data structure and any application. The target processors that actually perform the requested computation can be identical (a “cluster”), or can be a heterogeneous mix of processor types (a “grid”). The data on each processor upon which the ultimate computation is performed can be stored in a database, or more commonly in a file system, and can be in any digital format.
The most significant implementation of MapReduce is Apache Hadoop, known simply as Hadoop. Hadoop is an open source, top-level Apache project, with thousands of contributors and a whole industry of diverse applications. Hadoop runs natively on its own distributed file system (HDFS) and can also read from and write to Amazon S3 and other file systems. Conventional database vendors are also implementing interfaces to allow Hadoop jobs to be run over massively distributed instances of their databases.
As we will see when we give a brief overview of how a Hadoop job works, bandwidth between the separate processors can be a huge issue. HDFS is a so-called “rack aware” file system because the central name node knows which nodes reside on the same rack and which are connected by more than one network hop. Hadoop exploits the relationship between the central job dispatcher and HDFS to significantly optimize a massively distributed processing task by having detailed knowledge of where data actually resides. This also implies that a critical aspect of performance control is co-locating segments of data on actual physical hardware racks so that the MapReduce communication can be accomplished at backplane speeds rather than slower network speeds. Note that remote cloud-based file systems such as Amazon S3 and CloudStore are, by their nature, unable to provide the rack aware benefit. Of course, cloud-based file systems have a number of compelling advantages which we’ll discuss later.
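The idea behind rack awareness can be illustrated with a small distance calculation. The following is a minimal sketch and not Hadoop's own code: it assumes each node is identified by a hypothetical path of the form /datacenter/rack/node, and it counts the hops from each node up to their closest common ancestor. The function name and the sample paths are illustrative assumptions.

```python
def network_distance(path_a: str, path_b: str) -> int:
    """Count tree hops between two nodes named like '/dc1/rack1/node1'."""
    a = path_a.strip("/").split("/")
    b = path_b.strip("/").split("/")
    # Length of the shared prefix, i.e. the closest common ancestor.
    common = 0
    for x, y in zip(a, b):
        if x != y:
            break
        common += 1
    # Hops up from each node to that common ancestor.
    return (len(a) - common) + (len(b) - common)


if __name__ == "__main__":
    local = "/dc1/rack1/node1"
    candidates = ["/dc1/rack2/node7", "/dc1/rack1/node4", "/dc1/rack1/node1"]
    # Prefer the copy of the data with the fewest hops from the requester.
    for c in sorted(candidates, key=lambda c: network_distance(local, c)):
        print(c, network_distance(local, c))
```

Under this scheme a copy on the same node scores 0, one on the same rack scores 2, and one on a different rack scores 4, so a scheduler that sorts by distance naturally favors backplane-speed transfers.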
How MapReduce works in Hadoop
A MapReduce job is submitted to a centralized JobTracker, which in turn schedules parts of the job to a number of TaskTracker nodes. Although a TaskTracker may fail and have its task reassigned by the JobTracker, the JobTracker itself is a single point of failure. If the JobTracker halts, the MapReduce job must be restarted or resumed from intermediate snapshots.
A MapReduce job is always divided into two distinct phases, map and reduce. The overall input to a MapReduce job is divided into many equal-sized splits, each of which is assigned a map task. The map function is then applied to each record in each split.
For large jobs, the JobTracker schedules these map tasks in parallel. The overall performance of a MapReduce job depends significantly on striking a balance: enough parallel splits to keep many machines busy, but not so many that the interprocess communication needed to manage them bogs down the overall job. When MapReduce is run over HDFS, a typical default split size is 64 MB of input data.
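The relationship between input size, split size, and the number of map tasks is simple arithmetic. The sketch below assumes one map task per split and a fixed 64 MB split size; the function and constant names are hypothetical and are not part of any Hadoop API.

```python
SPLIT_SIZE_BYTES = 64 * 1024 * 1024  # 64 MB, the typical default cited above


def count_splits(input_bytes: int, split_bytes: int = SPLIT_SIZE_BYTES) -> int:
    """Number of splits (and hence map tasks) needed to cover the input."""
    if input_bytes <= 0:
        return 0
    # Integer ceiling division: a partial final split still needs a map task.
    return (input_bytes + split_bytes - 1) // split_bytes


if __name__ == "__main__":
    one_terabyte = 1024 ** 4
    print(count_splits(one_terabyte))
```

Under these assumptions a 1 TB input produces 16,384 splits. Doubling the split size halves the number of map tasks to be managed, at the cost of coarser parallelism.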
As the name suggests, the map task is the first half of the MapReduce job. Each map task produces a set of intermediate result records, which are written to the local disk of the machine performing the map task. The second half of the MapReduce job, the reduce task, may run on any processing node. The outputs of the mappers (nodes running map tasks) are sorted and partitioned so that they can be transferred to the reducers (nodes running reduce tasks). The final outputs of the reducers comprise the sorted and partitioned result set of the overall MapReduce job. In MapReduce running over HDFS, the result set is written to HDFS and is replicated for reliability.
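The sorting and partitioning of mapper output can be sketched in a few lines. This is an illustration under stated assumptions rather than Hadoop's implementation: keys are strings, the reducer is chosen by hashing the key modulo the number of reducers, and `zlib.crc32` is used only as a deterministic stand-in hash. The function names are hypothetical.

```python
import zlib
from collections import defaultdict


def partition_for(key: str, num_reducers: int) -> int:
    """Pick a reducer index from the key; equal keys always agree."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def partition_and_sort(map_output, num_reducers):
    """Group (key, value) pairs by reducer index and sort each group by key."""
    partitions = defaultdict(list)
    for key, value in map_output:
        partitions[partition_for(key, num_reducers)].append((key, value))
    return {index: sorted(pairs) for index, pairs in partitions.items()}


if __name__ == "__main__":
    output = [("TX", 41.5), ("AK", 25.0), ("TX", 39.0), ("CA", 30.0)]
    for index, pairs in sorted(partition_and_sort(output, 2).items()):
        print(index, pairs)
```

The property that matters is that every pair with a given key lands in the same partition, so a single reducer sees all the values for that key.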
Figure 3 shows this task flow for a MapReduce job with three mapper nodes feeding two reducer nodes. It reproduces Figure 2.3 from Tom White’s book, Hadoop: The Definitive Guide, 2nd Edition (O’Reilly, 2010).
Figure 3. An example MapReduce job
In Tom White’s book, a simple MapReduce job is described which we extend somewhat here. Suppose that the original data before the splits are applied consists of a very large number (perhaps billions) of unsorted temperature measurements, one per record. Such measurements could come from many thousands of automatic sensors located around the United States. The splits are assigned to the separate mapper nodes so as to equalize, as much as possible, the number of records going to each node. The mapper inputs take the form of key-value pairs, in this case a sequential record identifier and the full record containing the temperature measurement as well as other data. The job of each mapper is simply to parse the records presented to it and extract the year, the state, and the temperature, which become the second set of key-value pairs passed from the mapper to the reducer.
The job of each reducer is to find the maximum reported temperature for each state and each distinct year in the records passed to it. Each reducer is responsible for one state, so the output of each mapper must be sorted so that the key-value pairs can be dispatched to the appropriate reducers. In this case there would be 50 reducers, one for each state. These sorted blocks are then transferred to the reducers in a step called the “shuffle,” which is a critical feature of the MapReduce architecture.
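The temperature example can be traced end to end in a single process. The sketch below is only an illustration of the data flow: the comma-separated record layout (sensor, state, date, temperature) is an assumption made for this example, as are all function names, and nothing here is distributed or uses the Hadoop API.

```python
from collections import defaultdict


def map_record(record_id, line):
    """Parse one raw record and emit ((state, year), temperature)."""
    _sensor, state, date, temp = line.split(",")
    yield (state, int(date[:4])), float(temp)


def shuffle(mapped_pairs):
    """Route pairs to one reducer per state, sorted by key for each reducer."""
    by_state = defaultdict(list)
    for (state, year), temp in mapped_pairs:
        by_state[state].append(((state, year), temp))
    return {state: sorted(pairs) for state, pairs in by_state.items()}


def reduce_state(pairs):
    """Find the maximum temperature for each (state, year) key."""
    maxima = {}
    for key, temp in pairs:
        maxima[key] = max(temp, maxima.get(key, temp))
    return maxima


def run_job(records):
    """Apply map to every record, shuffle by state, then reduce each state."""
    mapped = [kv for rec_id, line in enumerate(records)
              for kv in map_record(rec_id, line)]
    result = {}
    for _state, pairs in shuffle(mapped).items():
        result.update(reduce_state(pairs))
    return result


if __name__ == "__main__":
    sample = [
        "S1,TX,2009-07-14,41.5",
        "S2,TX,2009-08-01,39.0",
        "S3,AK,2009-07-14,25.0",
        "S1,TX,2010-07-14,40.0",
    ]
    print(run_job(sample))
```

In a real job each of these three functions would run on different machines, and the dictionary built by `shuffle` corresponds to data physically moved across the network.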
Notice that the shuffle involves a true physical transfer of data between processing nodes. This makes the value of the rack aware feature more obvious, since a lot of data needs to be moved from the mappers to the reducers. The clever reader may wonder if this data transfer could be reduced by having the mapper outputs combined so that many readings from a single state and year are given to the reducer as a single key-value pair rather than many. The answer is “yes,” and Hadoop provides a combiner function to accomplish exactly this end.
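The effect of a combiner on this example can be sketched as follows. This is a plain Python illustration, not Hadoop's combiner interface; the function name and the sample readings are hypothetical. It collapses one mapper's output to a single pair per state and year before anything is sent to a reducer.

```python
def combine(mapper_output):
    """Collapse one mapper's output to a single maximum per (state, year)."""
    local_max = {}
    for key, temp in mapper_output:
        if key not in local_max or temp > local_max[key]:
            local_max[key] = temp
    return sorted(local_max.items())


if __name__ == "__main__":
    one_mapper = [
        (("TX", 2009), 38.0),
        (("TX", 2009), 41.5),
        (("TX", 2009), 39.0),
        (("AK", 2009), 25.0),
        (("AK", 2009), 21.0),
    ]
    combined = combine(one_mapper)
    print(len(one_mapper), "pairs before,", len(combined), "pairs after")
    print(combined)
```

This shortcut is safe here because the maximum of several local maxima equals the overall maximum. A calculation without that property, such as a plain average, could not be combined this simply.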
Each reducer receives a large number of state/year-temperature key-value pairs and finds the maximum temperature for each year. These yearly maximum temperatures are the final output from each reducer.
This approach can be scaled more or less indefinitely. Really serious MapReduce jobs running on HDFS may have hundreds or thousands of mappers and reducers, processing petabytes of input data.
At this point the appeal of the MapReduce/Hadoop approach should be clear. There are virtually no restrictions on the form of the inputs to the overall job. There only needs to be some rational basis for creating splits and reading records, in this case the record identifier in Tom White’s example. Actual logic in the mappers and the reducers can be programmed in virtually any programming language and can be as simple as the above example, or much more complicated UDFs. The reader should be able to visualize how some of the more complex use cases (e.g., comparison of satellite images) described earlier in the paper could fit into this framework.
Tools for the Hadoop environment
What we have described thus far is the core processing component when MapReduce is run in the Hadoop environment. This is roughly equivalent to describing the inner processing loop in a relational database management system. In both cases there’s a lot more to these systems to implement a complete functioning environment. The following is a brief overview of typical tools used in a MapReduce/Hadoop environment. We group these tools by overall function. Tom White’s book, mentioned above, is an excellent starting point for understanding how these tools are used.
Getting data in and getting data out
- ETL platforms – ETL platforms, with their long history of importing and exporting data to relational databases, provide specific interfaces for moving data into and out of HDFS. The platform-based approach, as contrasted with hand coding, provides extensive support for metadata, data quality, documentation, and a visual style of system building.
- Sqoop – Sqoop, developed by Cloudera, is an open source tool that allows importing data from a relational source to HDFS and exporting data from HDFS to a relational target. Data imported by Sqoop into HDFS can be used both by MapReduce applications and HBase applications. HBase is described below.
- Scribe – Scribe, developed at Facebook and released as open source, is used to aggregate log data from a large number of Web servers.
- Flume – Flume, developed by Cloudera, is a distributed, reliable streaming data collection service. It uses a central configuration managed by ZooKeeper and supports tunable reliability and automatic failover and recovery.
- Low-level MapReduce programming – primary code for mappers and reducers can be written in a number of languages. Hadoop’s native language is Java, but Hadoop exposes APIs for writing code in other languages such as Ruby and Python. A C++ interface named Hadoop Pipes is also provided. Programming MapReduce at the lowest level obviously provides the most potential power, but this level of programming is very much like assembly language programming. It can be very laborious, especially when attempting conceptually simple tasks such as joining two data sets.
- High-level MapReduce programming – Apache Pig, or simply Pig, is a client-side open-source application providing a high-level programming language for processing large data sets in MapReduce. The programming language itself is called Pig Latin. Hive is an alternative application designed to look much more like SQL, and is used for data warehousing use cases. When employed for the appropriate use cases, Pig and Hive provide enormous programming productivity benefits over low-level MapReduce programming, often by a factor of 10 or more. Pig and Hive lift the application developer’s perspective up from managing the detailed mapper and reducer processes to more of an applications focus.
- Integrated development environment – MapReduce/Hadoop development needs to move decisively away from bare hand coding to be adopted by mainstream IT shops. An integrated development environment for MapReduce/Hadoop needs to include editors for source code, compilers, tools for automating system builds, debuggers, and a version control system.
- Integrated application environment – an even higher layer above an integrated development environment could be called an integrated application environment, where complex reusable analytic routines are assembled into complete applications via a graphical user interface. This kind of environment might be able to use open source algorithms such as those provided by the Apache Mahout project, which distributes machine learning algorithms on the Hadoop platform.
- Cascading – Cascading is another abstraction layer for writing complex MapReduce applications. It is best described as a thin Java library, typically invoked from the command line, that serves as a query API and process scheduler. It is not intended to be a comprehensive alternative to Pig or Hive.
- HBase – HBase is an open-source, nonrelational, column-oriented database that runs directly on Hadoop. It is not a MapReduce implementation. A principal differentiator of HBase from Pig or Hive (MapReduce implementations) is its ability to provide real-time random read and write access to very large data sets.
- Oozie – Oozie is a server-based workflow engine specialized in running workflow jobs with actions that execute Hadoop jobs, such as MapReduce, Pig, Hive, Sqoop, HDFS operations, and sub-workflows.
- ZooKeeper – ZooKeeper is a centralized configuration manager for distributed applications. ZooKeeper can be used independently of Hadoop as well.
- Embedded Hadoop admin features – Hadoop supports a comprehensive runtime environment including edit log, safe mode operation, audit logging, filesystem check, data node block verifier, data node block distribution balancer, performance monitor, comprehensive log files, metrics for administrators, counters for MapReduce users, metadata backup, data backup, filesystem balancer, commissioning and decommissioning nodes.
- Java management extensions – a standard Java API for monitoring and managing applications.
- GangliaContext – an open source distributed monitoring system for very large clusters.
Feature convergence in the coming decade
It is safe to say that relational database management systems and MapReduce/Hadoop systems will increasingly find ways to coexist gracefully in the coming decade. But the systems have distinct characteristics, as depicted in the following table:
In the upcoming decade RDBMSs will extend their support for hosting complex data types as “blobs”, and will extend APIs for arbitrary analytic routines to operate on the contents of records. MapReduce/Hadoop systems, especially Hive, will deepen their support for SQL interfaces and fuller support of the complete SQL language. But neither will take over the market for big data analytics exclusively. As remarked earlier, RDBMSs cannot provide “relational” semantics for many of the complex use cases required by big data analytics. At best, RDBMSs will provide relational structure surrounding the complex payloads.
Similarly, MapReduce/Hadoop systems will never take over ACID-compliant transaction processing, or become superior to RDBMSs for indexed queries on row and column oriented tables.
As this paper is being written, significant advances are being made in developing hybrid systems using both relational database technology and MapReduce/Hadoop technology. Figure 4 illustrates two primary alternatives. The first alternative delivers the data directly into a MapReduce/Hadoop configuration for primary non-relational analysis. As we have described, this analysis can range the full gamut from complex analytical routines to simple sorting that looks like a conventional ETL step. When the MapReduce/Hadoop step is complete, the results are loaded into an RDBMS for conventional structured querying with SQL.
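The hand-off at the end of this first alternative, from MapReduce output to an RDBMS, can be sketched with an in-memory SQLite database standing in for the relational target. The table name, column names, and sample rows are assumptions made for illustration; a production pipeline would more likely use a bulk loader or a tool such as Sqoop.

```python
import sqlite3


def load_results(conn, results):
    """Load reducer output rows (state, year, max_temp) into a table."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS max_temperature ("
        "state TEXT, year INTEGER, max_temp REAL, "
        "PRIMARY KEY (state, year))"
    )
    conn.executemany(
        "INSERT OR REPLACE INTO max_temperature VALUES (?, ?, ?)", results
    )
    conn.commit()


def record_high_by_state(conn):
    """Conventional structured query over the loaded results."""
    cursor = conn.execute(
        "SELECT state, MAX(max_temp) FROM max_temperature "
        "GROUP BY state ORDER BY state"
    )
    return cursor.fetchall()


if __name__ == "__main__":
    connection = sqlite3.connect(":memory:")
    # Hypothetical reducer output from the temperature example.
    load_results(connection, [("TX", 2009, 41.5), ("TX", 2010, 40.0),
                              ("AK", 2009, 25.0)])
    print(record_high_by_state(connection))
```

Once the results sit in a relational table, they can be joined, filtered, and aggregated with ordinary SQL by any BI tool.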
The second alternative configuration loads the data directly to an RDBMS, even when the primary data payloads are not conventional scalar measurements. At that point two analysis modes are possible. The data can be analyzed with specially crafted user-defined functions, effectively from the BI layer, or passed to a downstream MapReduce/Hadoop application.
In the future even more complex combinations will tie these architectures more closely together, including MapReduce systems whose mappers and reducers are actually relational databases, and relational database systems whose underlying storage consists of HDFS files.
Figure 4. Alternative hybrid architectures using both RDBMS and Hadoop.
It will probably be difficult for IT organizations to sort out the claims of vendors, who will almost certainly assert that their systems do everything. In some cases these claims are “objection removers,” meaning that they have a grain of truth to them and are made to make you feel good, but do not stand up to scrutiny in a competitive and practical environment. Buyer beware!
Up to this point we have sidestepped the question of where all the special analytic software comes from. Big data analytics will never prosper if every instance is a custom-coded solution. Both the RDBMS and the open-source communities recognize this, and two main development themes have emerged. High-end statistical analysis vendors, such as SAS, have developed extensive and proprietary reusable libraries for a wide range of analytic applications, including advanced statistics, data mining, predictive analytics, feature detection, linear models, discriminant analysis, and many others. The open source community has a number of initiatives, the most notable of which are Hadoop-ML and Apache Mahout. Quoting from Hadoop-ML’s website:
“ Hadoop-ML (is) an infrastructure to facilitate the implementation of parallel machine learning/data mining (ML/DM) algorithms on Hadoop. Hadoop-ML has been designed to allow for the specification of both task-parallel and data-parallel ML/DM algorithms. Furthermore, it supports the composition of parallel ML/DM algorithms using both serial as well as parallel building blocks — this allows one to write reusable parallel code. The proposed abstraction eases the implementation process by requiring the user to only specify computations and their dependencies, without worrying about scheduling, data management, and communication. As a consequence, the codes are portable in that the user never needs to write Hadoop-specific code. This potentially allows one to leverage future parallelization platforms without rewriting one’s code.”
Apache Mahout provides free implementations of machine learning algorithms on the Hadoop platform.
Complex event processing (CEP)
Complex event processing (CEP) consists of processing events happening inside and outside an organization to identify meaningful patterns in order to take subsequent action in real time. For example, CEP is used in utility networks (electrical, gas and water) to identify possible issues before they become detrimental. These CEP deployments allow for real-time intervention for critical network or infrastructure situations. The combination of deep DW analytics and CEP can be applied in retail customer settings to analyze behavior and identify situations where a company may lose a customer or be able to sell them additional products or services at the time of their direct engagement. In banking, sophisticated analytics might help to identify the 10 most common patterns of fraud and CEP can then be used to watch for those patterns so they may be thwarted before a loss.
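Watching an event stream for a known pattern can be sketched with a sliding time window. The example below is a deliberately simple illustration and not a CEP product's API: the pattern is "the same key produces a given number of events within a given number of seconds," events are assumed to arrive in timestamp order, and the class and parameter names are hypothetical.

```python
from collections import defaultdict, deque


class BurstDetector:
    """Flag a key when it produces `threshold` events within `window` seconds."""

    def __init__(self, threshold: int, window: float):
        self.threshold = threshold
        self.window = window
        self.recent = defaultdict(deque)  # key -> timestamps still in window

    def observe(self, key: str, timestamp: float) -> bool:
        """Record one event and report whether the pattern has just matched."""
        times = self.recent[key]
        times.append(timestamp)
        # Discard timestamps that have aged out of the window.
        while timestamp - times[0] > self.window:
            times.popleft()
        return len(times) >= self.threshold


if __name__ == "__main__":
    detector = BurstDetector(threshold=3, window=60.0)
    stream = [("acct-17", 0.0), ("acct-42", 5.0), ("acct-17", 10.0),
              ("acct-17", 20.0)]
    for account, when in stream:
        if detector.observe(account, when):
            print("pattern matched for", account, "at", when)
```

In the banking scenario above, the thresholds and patterns would come from the deep analytics performed in the data warehouse, while a detector of this kind runs continuously against live events.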
At the time of this white paper, CEP is not generally thought of as part of the EDW, but this author believes that technical advances in continuous query processing will cause CEP and EDW to share data and work more closely together in the coming decade.