About this page: This page lists code and information that I found useful and/or hard to find
when programming with Hadoop. The Java code was tested under Hadoop 0.16.3.
Table of Contents
- Introduction
- HOWTO: Access HDFS from a Java Application
- HOWTO: Use the Distributed Cache Facility
- HOWTO: Decide the number of Map and Reduce tasks
Introduction
Hadoop is an open-source implementation of the MapReduce programming framework.
MapReduce has recently become more and more popular, thanks to its adoption by Google and Yahoo! and the availability of on-demand clusters such as Amazon's EC2.
It is therefore especially important that you understand its strengths and weaknesses.
In my opinion, its main strength is its ease of use for the programmer.
It takes care of the "ugly" details of parallel programming, such as communication, scheduling,
and fault tolerance, which can speed up development significantly.
However, it may not be the most efficient solution to a
given problem - that is the nature of its generality. Also, it may be hard to model parallel problems
involving complex interdependent subtasks as Map and Reduce.
I encourage you to read some critical articles on the subject to gain a perspective on the hype.
A good start is "MapReduce: A major step backwards" and its follow-up, "MapReduce II".
HOWTO: Access HDFS from a Java Application
The following code snippet shows how you can access the Hadoop Distributed File System (HDFS)
from a regular Java Application, i.e. outside of the Hadoop environment. It is assumed that the machine
running this code has been properly configured to operate with Hadoop, meaning the hadoop-site.xml and hadoop-default.xml
files have been set up.
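import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

Configuration config = new Configuration();
// add the Hadoop configuration files residing in the installation path of Hadoop
config.addResource(new Path("/home/hadoop/hadoop_install/conf/hadoop-default.xml"));
config.addResource(new Path("/home/hadoop/hadoop_install/conf/hadoop-site.xml"));
// set the user and group(s) used to access HDFS (as set up on the namenode);
// hadoop.job.ugi is a comma-separated user name followed by group names, not a password
config.set("hadoop.job.ugi", "username,groupname");
// returns the file system named in the configuration (HDFS here); may throw IOException
FileSystem fs = FileSystem.get(config);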
HOWTO: Use the Distributed Cache Facility
The Distributed Cache facility allows you to transfer files from the distributed file system to the local
filesystem (for reading only) of all participating nodes before the beginning of a job (conceptually).
This can boost efficiency when many map and/or reduce tasks need access to some common data, because each node
can read the file from its local filesystem instead of retrieving the file chunks from other nodes.
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Reducer;

// place this somewhere before starting the job, e.g. in the main function
// the URI signifies a file on the distributed file system
DistributedCache.addCacheFile(new URI("/home/hadoop/file_to_distribute"), conf);

// example Reduce class, works the same way for the mapper class
public static class Reduce extends MapReduceBase implements Reducer<...> {
    private FileSystem fs;      // local filesystem
    private Path[] localFiles;  // local filenames from the distributed cache

    // get the local filesystem and read the local filenames from the distributed cache
    public void configure(JobConf job) {
        try {
            fs = FileSystem.getLocal(new Configuration());
            localFiles = DistributedCache.getLocalCacheFiles(job);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // perform the reduce
    public void reduce(...) {
        // open the first file in the distributed cache on the local filesystem
        FSDataInputStream localFile = fs.open(localFiles[0]);
        // read from localFile here, then close the stream when done
    }

    // cleanup, close the local filesystem
    public void close() throws IOException {
        fs.close();
    }
}
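Note that the example above opens the cached file inside reduce(), which is called once per key. When the cached file is small enough to fit in memory, one common alternative is to read it once in configure() and keep the contents in a map. The sketch below shows that pattern in plain Java, without any Hadoop classes, so that it can be run on its own. The class name SideDataLookup, the tab-separated "key, value" file format, and the sample contents are all assumptions made for illustration; in a real job the path would come from localFiles[0].

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path; // java.nio Path, not Hadoop's Path class
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: loads a small "key<TAB>value" file into memory once.
class SideDataLookup {
    private final Map<String, String> table = new HashMap<>();

    // Call once per task (the analogue of configure()), not once per record.
    void load(Path localFile) throws IOException {
        try (BufferedReader in = Files.newBufferedReader(localFile, StandardCharsets.UTF_8)) {
            String line;
            while ((line = in.readLine()) != null) {
                int tab = line.indexOf('\t');
                if (tab < 0) {
                    continue; // skip lines without a separator
                }
                table.put(line.substring(0, tab), line.substring(tab + 1));
            }
        }
    }

    // In-memory lookup, cheap enough to call from every map() or reduce() invocation.
    String lookup(String key) {
        return table.get(key);
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a file delivered by the distributed cache.
        Path tmp = Files.createTempFile("side-data", ".tsv");
        Files.write(tmp, "en\tEnglish\nde\tGerman\n".getBytes(StandardCharsets.UTF_8));

        SideDataLookup lookupTable = new SideDataLookup();
        lookupTable.load(tmp);
        System.out.println(lookupTable.lookup("de")); // prints German

        Files.delete(tmp);
    }
}
```

Whether this is worthwhile depends on the size of the cached file: a large file is better streamed than held in memory.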
HOWTO: Decide the number of Map and Reduce tasks
This is just a summary of what is presented on the
Hadoop Wiki.
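Number of Map tasks: 10 to 100 * (number of cluster nodes)
Number of Reduce tasks: 0.95 or 1.75 * (nodes * mapred.tasktracker.tasks.maximum)
With a factor of 0.95, all reduce tasks can start at once; with 1.75, the faster nodes finish their first round of reduces and start a second wave, which balances the load better.
mapred.tasktracker.tasks.maximum is defined in your Hadoop installation under conf/hadoop-default.xml (site-specific overrides belong in conf/hadoop-site.xml).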
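As a worked instance of these rules of thumb, here is a small plain-Java sketch that evaluates both reduce factors. The class name TaskCountSketch is made up, and the cluster size (5 nodes, 2 tasks per node) is just a sample value; nothing here calls into Hadoop.

```java
// Hypothetical helper for the rules of thumb above; not part of Hadoop.
class TaskCountSketch {

    // Suggested reduce task count: factor (0.95 or 1.75) times the total number of task slots.
    static int reduceTasks(double factor, int nodes, int maxTasksPerNode) {
        return (int) Math.floor(factor * nodes * maxTasksPerNode);
    }

    public static void main(String[] args) {
        int nodes = 5;           // assumed cluster size
        int maxTasksPerNode = 2; // assumed value of mapred.tasktracker.tasks.maximum

        System.out.println("map tasks: " + (10 * nodes) + " to " + (100 * nodes));
        System.out.println("reduce tasks (0.95): " + reduceTasks(0.95, nodes, maxTasksPerNode));
        System.out.println("reduce tasks (1.75): " + reduceTasks(1.75, nodes, maxTasksPerNode));
    }
}
```

For this sample cluster the rules give 50 to 500 map tasks, and 9 or 17 reduce tasks depending on the factor.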
What exactly do these values influence?
Number of Map tasks: Determines the number and "size" of InputSplits (some boundary cases excluded).
If you think of your input as one large file, then Hadoop will split the file into equally sized chunks,
the number of chunks being the number of map tasks.
Equivalently, one map task is spawned for each InputSplit.
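The simplified picture above (one large input cut into equally sized chunks, one map task per chunk) can be sketched in plain Java. This is only a model of the idea and not Hadoop's actual split logic, which also takes things like block size and file boundaries into account. The class name SplitSketch and the sample sizes are invented for illustration.

```java
// Simplified model of cutting one large input into near-equal chunks; not Hadoop code.
class SplitSketch {

    // Start offset of chunk i when totalBytes is divided among numMapTasks chunks.
    // Passing i == numMapTasks returns totalBytes, the end of the last chunk.
    // Note: the multiplication would overflow for inputs near Long.MAX_VALUE / numMapTasks.
    static long chunkStart(long totalBytes, int numMapTasks, int i) {
        return totalBytes * i / numMapTasks;
    }

    // Length of chunk i; chunk lengths differ from each other by at most one byte.
    static long chunkLength(long totalBytes, int numMapTasks, int i) {
        return chunkStart(totalBytes, numMapTasks, i + 1) - chunkStart(totalBytes, numMapTasks, i);
    }

    public static void main(String[] args) {
        long totalBytes = 1000L; // assumed input size
        int numMapTasks = 3;     // assumed number of map tasks
        for (int i = 0; i < numMapTasks; i++) {
            System.out.println("map task " + i
                    + ": offset " + chunkStart(totalBytes, numMapTasks, i)
                    + ", length " + chunkLength(totalBytes, numMapTasks, i));
        }
    }
}
```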
Number of Reduce tasks: Each distinct reduce input key (that is, each key emitted by the map tasks) is assigned to exactly one reduce task,
while one reduce task may handle many distinct keys. Each reduce task will produce one output file
containing complete output records.
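To make the key-to-reducer assignment concrete, here is a minimal plain-Java sketch of hash partitioning, which to my knowledge is the scheme Hadoop's default HashPartitioner uses: the key's hash code, made non-negative, modulo the number of reduce tasks. The class name PartitionSketch and the sample keys are made up for illustration, and no Hadoop classes are involved.

```java
// Illustrative stand-in for hash partitioning; not a Hadoop class.
class PartitionSketch {

    // Returns the index of the reduce task (0 .. numReduceTasks - 1) responsible for the key.
    // Masking with Integer.MAX_VALUE clears the sign bit, so the result is never negative.
    static int partitionFor(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 9; // assumed number of reduce tasks
        String[] keys = {"hadoop", "hdfs", "map", "reduce", "hadoop"};
        for (String key : keys) {
            // The same key always lands on the same reduce task.
            System.out.println(key + " -> reduce task " + partitionFor(key, numReduceTasks));
        }
    }
}
```

Because the assignment depends only on the key and the number of reduce tasks, all values for a given key end up in the same reduce task and therefore in the same output file.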
public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WikiArticleIndexer.class);
    conf.setJobName("WikiDocMagnitudeIndexer");
    int nodes = 5;             // there are 5 cluster nodes in this setup
    int maxTasksPerNode = 2;   // value of mapred.tasktracker.tasks.maximum
    int mapTasks = 20 * nodes; // 20 map tasks per node, within the 10-100 range
    int reduceTasks = (int) Math.floor(0.95 * nodes * maxTasksPerNode); // 9 reduce tasks
    conf.setNumMapTasks(mapTasks);       // only a hint; the InputFormat decides the actual number
    conf.setNumReduceTasks(reduceTasks);
}