First HadoopApp on the NCIT Cluster
- Details
- Hits: 6534
First HadoopApp on the NCIT Cluster
This example is based on the hadoop WordCount tutorial at: http://hadoop.apache.org/common/docs/current/mapred_tutorial.html. First of all we need our Map-Reduce program. All files presented here are in this archive.
package org.myorg;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class WordCount {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
Ok, then we want to compile this (build.sh). We use module load to set any environment variables.
#!/bin/bash
#
# Original tutorial at:
# http://hadoop.apache.org/common/docs/current/mapred_tutorial.html
. /opt/modules/Modules/3.2.5/init/bash
module load java/jdk1.6.0_23-64bit
module load libraries/hadoop-0.20.2
[[ ! -d build ]] && mkdir build;
javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar \
-sourcepath src \
-d build \
src/org/myorg/WordCount.java
jar -cvf wordcount.jar -C build/ .
Let's run it:
[alexandru.herisanu@fep-53-1 ex3]$ ./build.sh
Note: src/org/myorg/WordCount.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
added manifest
adding: org/(in = 0) (out= 0)(stored 0%)
adding: org/myorg/(in = 0) (out= 0)(stored 0%)
adding: org/myorg/WordCount$Reduce.class(in = 1611) (out= 649)(deflated 59%)
adding: org/myorg/WordCount.class(in = 1546) (out= 749)(deflated 51%)
adding: org/myorg/WordCount$Map.class(in = 1938) (out= 798)(deflated 58%)
Now you want to upload your files, see here. Let's run it using SGE integration. The HDFS filesystem is always up and running but the job trackers are not (run.sh).
#!/bin/bash
#
# i presume you just compiled the program and uploaded the input files
# into /user/alexandru.herisanu/myjob (well your directory)
#
qsub -q ibm-nehalem.q -pe hadoop 4 -N HadoopExample -cwd \
-jsv /opt/n1sge6/sge-6.2u5/ncit-hadoop/jsv.sh \
-l hdfs_input=/user/alexandru.herisanu/myjob <<EOF
module load java/jdk1.6.0_23-64bit
module load libraries/hadoop-0.20.2
hadoop --config \$TMPDIR/conf jar wordcount.jar org.myorg.WordCount \
/user/alexandru.herisanu/myjob /user/alexandru.herisanu/myjob/output
hadoop --config \$TMPDIR/conf fs -cat /user/hadoop-alexandru.herisanu/myjob/output/part*
EOF
Let's run it.
[alexandru.herisanu@fep-53-1 ex3]$ cat *149654
11/05/31 12:58:38 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
11/05/31 12:58:39 INFO mapred.FileInputFormat: Total input paths to process : 2
11/05/31 12:58:39 INFO mapred.JobClient: Running job: job_201105311258_0001
11/05/31 12:58:40 INFO mapred.JobClient: map 0% reduce 0%
11/05/31 12:58:49 INFO mapred.JobClient: map 33% reduce 0%
11/05/31 12:58:52 INFO mapred.JobClient: map 66% reduce 0%
11/05/31 12:58:53 INFO mapred.JobClient: map 100% reduce 0%
11/05/31 12:59:01 INFO mapred.JobClient: map 100% reduce 100%
11/05/31 12:59:03 INFO mapred.JobClient: Job complete: job_201105311258_0001
11/05/31 12:59:03 INFO mapred.JobClient: Counters: 19
11/05/31 12:59:03 INFO mapred.JobClient: Job Counters
11/05/31 12:59:03 INFO mapred.JobClient: Launched reduce tasks=1
11/05/31 12:59:03 INFO mapred.JobClient: Rack-local map tasks=2
11/05/31 12:59:03 INFO mapred.JobClient: Launched map tasks=3
11/05/31 12:59:03 INFO mapred.JobClient: Data-local map tasks=1
11/05/31 12:59:03 INFO mapred.JobClient: FileSystemCounters
11/05/31 12:59:03 INFO mapred.JobClient: FILE_BYTES_READ=79
11/05/31 12:59:03 INFO mapred.JobClient: HDFS_BYTES_READ=55
11/05/31 12:59:03 INFO mapred.JobClient: FILE_BYTES_WRITTEN=266
11/05/31 12:59:03 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=41
11/05/31 12:59:03 INFO mapred.JobClient: Map-Reduce Framework
11/05/31 12:59:03 INFO mapred.JobClient: Reduce input groups=5
11/05/31 12:59:03 INFO mapred.JobClient: Combine output records=6
11/05/31 12:59:03 INFO mapred.JobClient: Map input records=2
11/05/31 12:59:03 INFO mapred.JobClient: Reduce shuffle bytes=91
11/05/31 12:59:03 INFO mapred.JobClient: Reduce output records=5
11/05/31 12:59:03 INFO mapred.JobClient: Spilled Records=12
11/05/31 12:59:03 INFO mapred.JobClient: Map output bytes=82
11/05/31 12:59:03 INFO mapred.JobClient: Map input bytes=51
11/05/31 12:59:03 INFO mapred.JobClient: Combine input records=8
11/05/31 12:59:03 INFO mapred.JobClient: Map output records=8
11/05/31 12:59:03 INFO mapred.JobClient: Reduce input records=6
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
Starting Hadoop PE
$HADOOP_HOME = /opt/lib/hadoop/hadoop-0.20.2
starting jobtracker, logging to /export/home/ncit-cluster/prof/alexandru.herisanu/hadoop-alexandru.herisanu-jobtracker-nehalem-wn14.grid.pub.ro.out
modified context of job 149654
modified context of job 149654
Stopping Hadoop PE
stopping jobtracker
nehalem-wn11.grid.pub.ro: stopping tasktracker
nehalem-wn13.grid.pub.ro: stopping tasktracker
nehalem-wn14.grid.pub.ro: stopping tasktracker
nehalem-wn12.grid.pub.ro: stopping tasktracker
[alexandru.herisanu@fep-53-1 ex3]$
Done!
File input/output operations on HDFS
- Details
- Hits: 7696
All operations are made from worker nodes. The way SGE integration works, it creates the masters, slaves and mapred-site.xml dynamically and starts the JobTrackers for you. The Hadoop configuration is generated in $TMPDIR. Here are some short examples:
hadoop --config \$TMPDIR/conf fs -lsr /
hadoop --config \$TMPDIR/conf fs -mkdir myjob
hadoop --config \$TMPDIR/conf fs -put file01 myjob
hadoop --config \$TMPDIR/conf fs -put file02 myjob
hadoop --config \$TMPDIR/conf fs -lsr /
Ofcourse you have to write this using qsub. \$TMPDIR means we want this var inside our script at runtime. So here goes (example1.sh):
#!/bin/bash
#
# List contents of the HDFS
qsub -q ibm-nehalem.q -pe hadoop 4 -N HadoopExample -cwd <<EOF
module load java/jdk1.6.0_23-64bit
module load libraries/hadoop-0.20.2
hadoop --config \$TMPDIR/conf fs -lsr /
hadoop --config \$TMPDIR/conf fs -mkdir myjob
hadoop --config \$TMPDIR/conf fs -put file01 myjob
hadoop --config \$TMPDIR/conf fs -put file02 myjob
hadoop --config \$TMPDIR/conf fs -lsr /
EOF
Let's run it and see the results.
[alexandru.herisanu@fep-53-1 hadoop]$ ./example1.sh
http://blogs.oracle.com/ravee/entry/creating_hadoop_pe_under_sge
[alexandru.herisanu@fep-53-1 hadoop]$ hadoop dfs
Generic options supported are
-conf <configuration file> specify an application configuration file
-D <property=value> use value for given property
-fs <local|namenode:port> specify a namenode
-jt <local|jobtracker:port> specify a job tracker
-files <comma separated list of files> specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars> specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives> specify comma separated archives to be unarchived on the compute machines.
The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]