Notes on Hadoop


Notes on Hadoop

What is Hadoop?
Hadoop is an open source framework for writing and running distributed applications that process large amounts of data.
Hadoop is designed for data-intensive processing, and it adopts move-code-to-data philosophy.
Its main features are distributed storage and distributed computing.
1. Accessible, hadoop runs on large clusters of (low-end) commodity machines.
2. It's robust(Fault Tolerance). It can gracefully handle frequent hardware malfunction and failures.
3. It's scalable(Scalability), Hadoop scales linearly to handle larger data by adding more nodes to the cluster.
4. Simple—Hadoop allows users to quickly write efficient parallel code.
By using distributed storage and transferring code instead of data, Hadoop avoids the costly transmission step when working with large data sets. Moreover, the redundancy of data allows Hadoop to recover should a single node fail.
It's east to create programs with Hadoop MapReduce framework.
You don't have to worry about partitioning the data, determining which nodes will perform which tasks, or handling communication between nodes. Hadoop handles this for you, leaving you free to focus on what’s most important to you—your data and what you want to do with it.

Hadoop is a core part of the computing infrastructure for many web companies, such as Yahoo, Facebook, LinkedIn, and Twitte.
Compared with Parallel Databases
A few scales to low hundreds of nodes and about 5 PB
Primary design goal is “performance”
Requires homogeneous hardware
Anomalous behavior is not well tolerated:
A slow network can cause serious performance degradation
Most queries fail when one node fails
What is MapReduce?
MapReduce is a programming model and software framework for writing applications that rapidly process vast amounts of data in parallel on large clusters of compute nodes.

HDFS
Hadoop Distributed File System is the primary storage system used by Hadoop applications.
HDFS divides data set into smaller (typically 64 MB) blocks that are spread among many machines in the cluster.
HDFS creates multiple replicas of data blocks and distributes them on compute nodes throughout a cluster.
This way, the data can be read in parallel and it provides a much higher throughput, also it is much more reliable, as even when one node fails, it can still read the data's replica from other nodes.
And using HDFS is also much cheaper, as a cluster of commodity machines turns out to be much cheaper than one high-end server!
Computationally intensive processing vs data-intensive processing
Hadoop is designed for data-intensive processing, and it adopts move-code-to-data philosophy.
The programs to run (“code”) are much smaller than the data and are easier to move around.
Let the data remain where it is and move the executable code to its hosting machine.
The clients send only the MapReduce programs to be executed, and these programs are usually small (often in kilobytes).
Data is broken up and distributed across the cluster, and as much as possible, computation on a piece of data takes place on the same machine where that piece of data resides.
SCALE-OUT INSTEAD OF SCALE-UP
Hadoop is designed to be a scale-out architecture operating on a cluster of com-
modity PC machines. Adding more resources means adding more machines to the Hadoop cluster. Hadoop clusters with ten to hundreds of machines is standard.
KEY/VALUE PAIRS INSTEAD OF RELATIONAL TABLES
Hadoop uses key/value pairs as its basic data unit, which is flexible enough to work with the less-structured data types.
FUNCTIONAL PROGRAMMING (MAPREDUCE) INSTEAD OF DECLARATIVE QUERIES (SQL)
Hadoop use MapReduce, Under MapReduce you specify the actual steps in processing the data.
Under SQL you have query statements; under MapReduce you have scripts and codes.
OFFLINE BATCH PROCESSING INSTEAD OF ONLINE TRANSACTIONS
Hadoop is designed for offline processing and analysis of large-scale data. It doesn’t work for random reading and writing of a few records, which is the type of load for online transaction processing.
Hadoop is best used as a write-once, read-many-times type of data store.
SQL database is by design targeted at structured data. Many of Hadoop’s initial applications deal with unstructured data such as text.
Understanding MapReduce
Mapper, reducer, combiner, patitioner
MapReduce programs are executed in two main phases, called mapping and reducing. Each phase is defined by a data processing function, and these functions are called mapper and reducer, respectively. In the mapping phase, MapReduce takes the input data and feeds each data element to the mapper. In the reducing phase, the reducer processes all the outputs from the mapper and arrives at a final result.
In simple terms, the mapper is meant to filter and transform the input into something that the reducer can aggregate over.
The building blocks of Hadoop
NameNode
Hadoop employs a master/slave architecture, The NameNode is the master of HDFS that directs the slave DataNode daemons to perform the low-level I/O tasks. The NameNode is the bookkeeper of HDFS; it keeps track of how your files are broken down into file blocks, which nodes store those blocks, and the overall health of the distributed filesystem.
It doesn’t store any user data or perform any computations for a MapReduce program to lower the workload on the machine.
The NameNode keeps track of the file metadata—which files are in the system and how each file is broken down into blocks.
NameNode is a single point of failure of your Hadoop cluster.
DataNode
In HDFS file system, the file is broken into blocks and distributed among the DataNodes.
The NameNode will tell your client which DataNode each block resides in.
Your client communicates directly with the DataNode daemons to process the local files corresponding to the blocks. Furthermore, a DataNode may communicate with other DataNodes to replicate its data blocks for redundancy.
The DataNodes provide backup store of the blocks and constantly report to the NameNode to keep the metadata current.
Upon initialization, each of the DataNodes informs the NameNode of the blocks it’s currently storing. After this mapping is complete, the DataNodes continually poll the NameNode to provide information regarding local changes as well as receive instructions to create, move, or delete blocks from the local disk.
Secondary NameNode - SNN
The Secondary NameNode (SNN) is an assistant daemon for monitoring the state of the cluster HDFS.
The SNN differs from the NameNode in that this process doesn’t receive or record any real-time changes to HDFS. Instead, it communicates with the NameNode to take snapshots of the HDFS metadata at intervals defined by the cluster configuration.
The NameNode is a single point of failure for a Hadoop cluster, and the SNN snapshots help minimize the downtime and loss of data. Nevertheless, a NameNode failure requires human intervention to reconfigure the cluster to use the SNN as the primary NameNode
JobTracker
The JobTracker daemon is the liaison between your application and Hadoop. Once you submit your code to your cluster, the JobTracker determines the execution plan by determining which files to process, assigns nodes to different tasks, and monitors all tasks as they’re running. Should a task fail, the JobTracker will automatically relaunch the task, possibly on a different node, up to a predefined limit of retries.
There is only one JobTracker daemon per Hadoop cluster. It’s typically run on a server as a master node of the cluster.
TaskTracker
The computing daemons also follow a master/slave architecture: the JobTracker is the master overseeing the overall execution of a MapReduce job and the TaskTrackers manage the execution of individual tasks on each slave node.
Each TaskTracker is responsible for executing the individual tasks that the JobTracker assigns. Although there is a single TaskTracker per slave node, each TaskTracker can spawn multiple JVMs to handle many map or reduce tasks in parallel.
One responsibility of the TaskTracker is to constantly communicate with the
JobTracker. If the JobTracker fails to receive a heartbeat from a TaskTracker within a specified amount of time, it will assume the TaskTracker has crashed and will resubmit the corresponding tasks to other nodes in the cluster.
NameNode: Maps a file to a file-id and list of DataNodes
DataNode: Maps a block-id to a physical location on disk
SecondaryNameNode: Periodic merge of Transaction log
Distributed File System
Single Namespace for entire cluster
Data Coherency
Write-once-read-many access model
Client can only append to existing files
Files are broken up into blocks
Typically 128 MB block size
Each block replicated on multiple DataNodes
Intelligent Client
Client can find location of blocks
Client accesses data directly from DataNode
NameNode Metadata
Meta-data in Memory
The entire metadata is in main memory
No demand paging of meta-data
Types of Metadata
List of files
List of Blocks for each file
List of DataNodes for each block
File attributes, e.g creation time, replication factor
A Transaction Log
Records file creations, file deletions. etc
DataNode
A Block Server
Stores data in the local file system (e.g. ext3)
Stores meta-data of a block (e.g. CRC32)
Serves data and meta-data to Clients
- Periodic validation of checksums
Block Report
Periodically sends a report of all existing blocks to t
NameNode
Facilitates Pipelining of Data
Forwards data to other specified DataNodes
Block Placement
Current Strategy
-- One replica on local node
-- Second replica on a remote rack
-- Third replica on same remote rack
-- Additional replicas are randomly placed
Clients read from nearest replica
Pluggable policy for placing block replicas
Data Pipelining
Client writes block to the first DataNode
The first DataNode forwards the data to the next DataNode in the Pipeline, and so on
When all replicas are written, the Client moves on to write the next block in file
Topology
This topology features a master node running the NameNode and JobTracker daemons and a standalone node with the SNN in case the master node fails.
Hadoop can be run in standalone mode, Pseudo-distributed mode, Fully distributed mode.
Local (standalone) mode
The standalone mode is the default mode for Hadoop.
Because there’s no need to communicate with other nodes, the standalone mode doesn’t use HDFS, nor will it launch any of the Hadoop daemons. Its primary use is for developing and debugging the application logic of a MapReduce program without the additional complexity of interacting with the daemons.
Pseudo-distributed mode
The pseudo-distributed mode is running Hadoop in a “cluster of one” with all daemons running on a single machine.
This mode complements the standalone mode for debugging your code, allowing you to examine memory usage, HDFS input/output issues, and other daemon interactions.
Setup Hadoop cluster
Master node communicates with other nodes via passphraseless SSH.
The public key is stored locally on every node in the cluster, and the master node sends the private key when attempting to access a remote machine.
Define a common account on all nodes
Generate SSH key pair, distribute public key to other nodes, and validate logins.
scp ~/.ssh/id_rsa.pub hadoop-user@target:~/master_key
mv ~/master_key ~/.ssh/authorized_keys
chmod 600 ~/.ssh/authorized_keys

Running Hadoop
Switching between modes
Create a separate configuration folder for each of the modes and place the appropriate version of the XML files in the corresponding folder, then create symbolic links to switch between Hadoop modes
Web-based cluster UI
http://localhost:50070/
Working with files in HDFS
HDFS is a filesystem designed for large-scale distributed data processing under frameworks such as MapReduce. You can store a big data set of (say) 100 TB as a single file in HDFS.
As HDFS isn’t a native Unix filesystem, standard Unix file tools don’t work on it,
Hadoop does provide a set of command line utilities.
There is a project MountableHDFS that is trying to make HDFS mountable as a Unix filesystem.
Hadoop commands
Hadoop file commands can interact with both the HDFS filesystem and the local
filesystem.
Edit the script conf/hadoop-env.sh to set JAVA_HOME
Run in standlone mode
hadoop jar hadoop-examples-*.jar wordcount /home/essniuser/dscli/log output
hadoop namenode -format
jps,start-all.sh
hadoop fs -ls, -lsr
hadoop fs -cat hdfs://localhost:9000/user/chuck/example.txt
hadoop fs -mkdir /user/chuck, -put example.txt., -get example.txt.-cat example.txt, -cat example.txt | head, -tail example.txt, –rm example.txt
In general, Hadoop works more effectively with a single large file rather than a number of smaller ones.
Configuration conf = new Configuration(); // Factory method pattern
FileSystem hdfs = FileSystem.get(conf); // get HDFS fiel system
FileSystem local = FileSystem.getLocal(conf); // get local file system
Anatomy of a MapReduce program
a MapReduce program processes data by manipulating
(key/value) pairs in the general form
map: (K1,V1) -> list(K2,V2)
reduce: (K2,list(V2)) -> list(K3,V3)
After distributing input data to different nodes, the only time nodes communicate with each other is at the “shuffle” step. This restriction on communication greatly helps scalability.


Hadoop data types . The MapReduce framework won’t allow keys or values to be any arbitrary class.
This is because the MapReduce framework has a certain defined way of serializing the key/value pairs to move them across the cluster’s network, and only classes that support this kind of serialization can function as keys or values in the framework.
More specifically, classes that implement the Writable interface can be values, and classes that implement the WritableComparable interface can be either keys or values.
Predefined data types
BooleanWritable, ByteWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, Text, NullWritable
You can create your own custom type as long as it implements the Writable
(or WritableComparable) interface.
Mapper
To serve as the mapper, a class implements from the Mapper interface and inherits the MapReduceBase class
In Mapper, we need implement its map function, which generates a (possibly empty) list of (K2, V2) pairs for a given (K1, V1) input pair.
Predefined Mappper
IdentityMapper, InverseMapper, RegexMapper, TokenCountMapper
Reducer
void reduce(K2 key,Iterator values,OutputCollector output, Reporter reporter) throws IOException
When the reducer task receives the output from the various mappers, it sorts the incoming data on the key of the (key/value) pair and groups together all values of the same key. The reduce() function is then called, and it generates a (possibly empty) list of (K3, V3) pairs by iterating over the values associated with a given key.
IdentityReducer, LongSumReducer
Partitioner— redirecting output from Mapper
A common misconception for first-time MapReduce programmers is to use only a single reducer.
With multiple reducers, we need some way to determine the appropriate one to send a (key/value) pair outputted by a mapper. The default behavior is to hash the key to determine the reducer. Hadoop enforces this strategy by use of the HashPartitioner class, we can define our own Partitioner.
It defines method that returns an integer between 0 and the number of reduce tasks indexing to which reducer the (key/value) pair will be sent.
Shuffling
Between the map and reduce stages, a MapReduce application must take the output from the mapper tasks and distribute the results among the reducer tasks. This process is typically called shuffling, because the output of a mapper on a single node may be sent to reducers across multiple nodes in the cluster.
Combiner—local reduce
Perform a “local reduce“ before we distribute the mapper results, this can reduce the results/data that are distributed to reducers.
One of the fundamental principles of MapReduce’s processing power is the splitting of the input data into chunks. You can process these chunks in parallel using multiple machines.
HDFS provides efficient way to random read access.
HDFS stores files in blocks spread over multiple machines. Roughly speaking, each file block is a split. As different machines will likely have different blocks, parallelization is automatic if each split/block is processed by the machine that it’s residing at. Furthermore, as HDFS replicates blocks in multiple nodes for reliability, MapReduce can choose any of the nodes that have a copy of a split/block.
InputFormat
TextInputFormat: Key is the byte offset of the line, and value is the content of the line.
KeyValueTextInputFormat, SequenceFileInputFormat
NLineInputFormat: Same as TextInputFormat, but each split is guaranteed to have exactly N lines.
OutputFormat
The output has no splits, as each reducer writes its output only to its own file. The output files reside in a common directory and are typically named part-nnnnn, where nnnnn is the partition ID of the reducer.
RecordWriter
Objects format the output and RecordReaders parse the format of the input.
TextOutputFormat, SequenceFileOutputFormat, NullOutputFormat
Writing basic MapReduce programs
In general, MapReduce programs follows basic template, we generally take an existing MapReduce program and modify it until it does what we want.
Adapting for Hadoop’s API changes[How]
The 0.20 release supports the future
API while maintaining backward-compatibility with the old one by marking it as deprecated.
Streaming in Hadoop
Hadoop supports other languages via a generic API called Streaming.
Hadoop Streaming interacts with programs using the Unix streaming paradigm.
Inputs come in through STDIN and outputs go to STDOUT.
Streaming with Unix commands
Under Streaming, each mapper sees the entire stream of data, and it’s the mapper that takes on the responsibility of breaking the stream into (line-oriented) records.
In the standard Java model, the framework itself breaks input data into records, and gives the map() method only one record at a time. The Streaming model makes it easy to keep state information across records in a split.
hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar -input input/cite75_99.txt -output output -mapper 'cut -f 2 -d,' -reducer 'uniq'
hadoop jar hadoop-0.19.1-streaming.jar -input output -output output_a -mapper 'wc -l' -D mapred.reduce.tasks=0
We want the mapper to directly output the record count without any reducer, so we set mapred.reduce.tasks to 0 and don’t specify the -reducer option at all.
Streaming with scripts
hadoop jar hadoop-0.19.1-streaming.jar -input input/cite75_99.txt -output output -mapper 'RandomSample.py 10' -file RandomSample.py -D mapred.reduce.tasks=1
hadoop jar hadoop-0.18.1-streaming.jar -input input/apat63_99.txt -output output -mapper 'AttributeMax.py 8' -reducer 'AttributeMax.py 0' -file AttributeMax.py -D mapred.reduce.tasks=1
As we haven’t specified any particular reducer, it will use the default IdentityReducer.
Classes of aggregation functions: distributive, algebraic, and holistic
Streaming with the Aggregate package
You only have to provide a mapper that processes records and sends out a specially formatted output. Each line of the mapper’s output looks like
function:key\tvalue.
Improving performance with combiners
 It’s supposed to whittle down the output of the mapper to lessen the load on the network and on the reducer.
If we specify a combiner, the MapReduce framework may apply it to the intermediate data from mapper.
Chaining Multiple MapReduce jobs
Chain multiple MapReduce jobs to run sequentially, with the output of one MapReduce job being the input to the next.
As JobClient.runJob() blocks until the end of a job, chaining MapReduce jobs involves calling the driver of one MapReduce job after another. The driver at each job will have to create a new JobConf object and set its input path to be the output path of the previous job. You can delete the intermediate data generated at each step of the chain at the end.
Chaining MapReduce jobs with complex dependency
In addition to holding job configuration information, Job also holds dependency information, specified through the addDependingJob() method.
JobControl objects do the managing and monitoring of the job execution. You can add jobs to a JobControl object via the addJob() method.
After adding all the jobs and dependencies, call JobControl’s run() method to spawn a thread to submit and monitor jobs for execution. JobControl has methods like allFinished() and getFailedJobs() to track the execution of various jobs within the batch.

Joining data from different sources
Reduce-side joining
Hadoop has a contrib package called datajoin that works as a generic framework for data joining in Hadoop. Its jar file is at contrib/datajoin/hadoop-*-datajoin.jar.
Hadoop MapReduce code example
public class CitationHistogram extends Configured implements Tool {
    public static class MapClass extends MapReduceBase
        implements Mapper {
        private final static IntWritable uno = new IntWritable(1);
        private IntWritable citationCount = new IntWritable();
        // these two are defined to improve preformance
        // The map() method will be called as many times as there are records
        // (in a split, for each JVM). Reducing the number of objects created
        // inside the map() method can increase performance and reduce
        // garbage collection  .
        public void map(Text key, Text value,
                        OutputCollector output,
                        Reporter reporter) throws IOException {
            citationCount.set(Integer.parseInt(value.toString()));
            output.collect(citationCount, uno);
        }
    }
    public static class Reduce extends MapReduceBase
        implements Reducer
    {
        public void reduce(IntWritable key, Iterator values,
                           OutputCollectoroutput,
                           Reporter reporter) throws IOException {
            int count = 0;
            while (values.hasNext()) {
                count += values.next().get();
            }
            output.collect(key, new IntWritable(count));
        }
    }
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, CitationHistogram.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("CitationHistogram");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormat(KeyValueTextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        JobClient.runJob(job);
        return 0;
    }
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(),
                                 new CitationHistogram(),
                                 args);
        System.exit(res);
    }
}       
Revised version of the WordCount example
public class WordCount2 {
    public static void main(String[] args) {
        JobClient client = new JobClient();
        JobConf conf = new JobConf(WordCount2.class);
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);
        conf.setMapperClass(TokenCountMapper.class);    q         conf.setCombinerClass(LongSumReducer.class);
        conf.setReducerClass(LongSumReducer.class);    w
        client.setConf(conf);
        try {
            JobClient.runJob(conf);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Resource
Post a Comment

Labels

Java (159) Lucene-Solr (112) Interview (61) All (58) J2SE (53) Algorithm (45) Soft Skills (38) Eclipse (33) Code Example (31) Linux (25) JavaScript (23) Spring (22) Windows (22) Web Development (20) Tools (19) Nutch2 (18) Bugs (17) Debug (16) Defects (14) Text Mining (14) J2EE (13) Network (13) Troubleshooting (13) PowerShell (11) Chrome (9) Design (9) How to (9) Learning code (9) Performance (9) Problem Solving (9) UIMA (9) html (9) Http Client (8) Maven (8) Security (8) bat (8) blogger (8) Big Data (7) Continuous Integration (7) Google (7) Guava (7) JSON (7) Shell (7) ANT (6) Coding Skills (6) Database (6) Lesson Learned (6) Programmer Skills (6) Scala (6) Tips (6) css (6) Algorithm Series (5) Cache (5) Dynamic Languages (5) IDE (5) System Design (5) adsense (5) xml (5) AIX (4) Code Quality (4) GAE (4) Git (4) Good Programming Practices (4) Jackson (4) Memory Usage (4) Miscs (4) OpenNLP (4) Project Managment (4) Spark (4) Testing (4) ads (4) regular-expression (4) Android (3) Apache Spark (3) Become a Better You (3) Concurrency (3) Eclipse RCP (3) English (3) Happy Hacking (3) IBM (3) J2SE Knowledge Series (3) JAX-RS (3) Jetty (3) Restful Web Service (3) Script (3) regex (3) seo (3) .Net (2) Android Studio (2) Apache (2) Apache Procrun (2) Architecture (2) Batch (2) Bit Operation (2) Build (2) Building Scalable Web Sites (2) C# (2) C/C++ (2) CSV (2) Career (2) Cassandra (2) Distributed (2) Fiddler (2) Firefox (2) Google Drive (2) Gson (2) How to Interview (2) Html Parser (2) Http (2) Image Tools (2) JQuery (2) Jersey (2) LDAP (2) Life (2) Logging (2) Python (2) Software Issues (2) Storage (2) Text Search (2) xml parser (2) AOP (1) Application Design (1) AspectJ (1) Chrome DevTools (1) Cloud (1) Codility (1) Data Mining (1) Data Structure (1) ExceptionUtils (1) Exif (1) Feature Request (1) FindBugs (1) Greasemonkey (1) HTML5 (1) Httpd (1) I18N (1) IBM Java Thread Dump Analyzer (1) JDK Source Code (1) JDK8 (1) JMX (1) Lazy Developer (1) Mac (1) Machine Learning (1) Mobile (1) My Plan for 2010 (1) Netbeans (1) Notes (1) Operating System (1) Perl (1) Problems (1) Product Architecture (1) Programming Life (1) Quality (1) Redhat (1) Redis (1) Review (1) RxJava (1) Solutions logs (1) Team Management (1) Thread Dump Analyzer (1) Visualization (1) boilerpipe (1) htm (1) ongoing (1) procrun (1) rss (1)

Popular Posts