WorkflowHadoopAdaptor

From Gcube Wiki
Revision as of 13:42, 15 June 2011 by Stefanos.tsaklas (Talk | contribs) (Verbose progress filter)

(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Jump to: navigation, search

Overview

This adaptor as part of the adaptors offered by the WorkflowEngine constructs an Execution Plan that can mediate to submit a job written under the Map Reduce design pattern and written against the utilities offered by the Hadoop infrastructure.. After its submission the job is monitored for its status and once completed the output files are retrieved and stored in the Storage System. The resources that are provided and need to be moved to the Hadoop infrastructure are all transfered through the Storage System. They are stored once the plan is constructed and are then retrieved once the execution has started. The Hadoop infrastructure utilized resides in a cloud infrastructure with which the ExecutionEngine negotiates the resource availability.

Plan Template

The entire execution process takes place in the Hadoop UI node. This node is picked from the Information System and is currently chosen randomly from all the available ones. Currently once the node has been picked, the execution cannot be moved to a different one even if there is a problem communicating with that node. The execution that takes place is a series of steps executed sequentially. These steps include the following:

  • Contact the remote node
  • Retrieval of the data stored in the Storage System and these include the resources marked as Archive, Configuration, Files, Libraries, Jar and Input resources
    • If some of these resources are declared as stored in the Hadoop HDFS instead of the Storage System, calls are made to the HDFS to retrieve the marked content
  • The Input resources are moved to the HDFS so that they are available to the MapReduce processes
  • Submit the job using the provided Jar file, specifying the Class containing the main method to execute and optionally any resources provided such as configuration, properties, files, library jars, archives and arguments
  • When the job is executed, and even if some error occurred, the standard output and standard error of the job is retrieved and stored to the Storage System
  • If the job terminated successfully and it is requested, retrieve the directory containing the output
    • A call to the HDFS is made to retrieve the directory, a tar.gz archive is created of this directory and stored to the Storage System
  • Finally remove the input and output directory if it has been requested.

Highlights

Processing filters

Verbose progress filter

Taking advantage the extensible filtering capabilities of the Filters the WorkflowEngine defines an external filter that can process the verbose output of the hadoop jar command. This way, outputs of the following format can be parsed and each line read by this output is emitted back to the caller as status report. This output is printed in the standard error of the command execution. The filter is applied to the output read from the command's standard error while it is being persisted to be moved to the Storage System.

10/02/24 09:46:17 INFO input.FileInputFormat: Total input paths to process : 7
10/02/24 09:46:18 INFO mapred.JobClient: Running job: job_201002181242_0052
10/02/24 09:46:19 INFO mapred.JobClient:  map 0% reduce 0%
10/02/24 09:46:29 INFO mapred.JobClient:  map 28% reduce 0%
10/02/24 09:46:30 INFO mapred.JobClient:  map 57% reduce 0%
10/02/24 09:46:33 INFO mapred.JobClient:  map 71% reduce 0%
10/02/24 09:46:35 INFO mapred.JobClient:  map 100% reduce 0%
10/02/24 09:46:38 INFO mapred.JobClient:  map 100% reduce 33%
10/02/24 09:46:44 INFO mapred.JobClient:  map 100% reduce 100%
10/02/24 09:46:46 INFO mapred.JobClient: Job complete: job_201002181242_0052
10/02/24 09:46:46 INFO mapred.JobClient: Counters: 18
10/02/24 09:46:46 INFO mapred.JobClient:   Job Counters
10/02/24 09:46:46 INFO mapred.JobClient:     Launched reduce tasks=1
10/02/24 09:46:46 INFO mapred.JobClient:     Rack-local map tasks=1
10/02/24 09:46:46 INFO mapred.JobClient:     Launched map tasks=7
10/02/24 09:46:46 INFO mapred.JobClient:     Data-local map tasks=6
10/02/24 09:46:46 INFO mapred.JobClient:   FileSystemCounters
10/02/24 09:46:46 INFO mapred.JobClient:     FILE_BYTES_READ=3538615
10/02/24 09:46:46 INFO mapred.JobClient:     HDFS_BYTES_READ=6866273
10/02/24 09:46:46 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=5927736
10/02/24 09:46:46 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=1165259
10/02/24 09:46:46 INFO mapred.JobClient:   Map-Reduce Framework
10/02/24 09:46:46 INFO mapred.JobClient:     Reduce input groups=0
10/02/24 09:46:46 INFO mapred.JobClient:     Combine output records=165570
10/02/24 09:46:46 INFO mapred.JobClient:     Map input records=146199
10/02/24 09:46:46 INFO mapred.JobClient:     Reduce shuffle bytes=2388897
10/02/24 09:46:46 INFO mapred.JobClient:     Reduce output records=0
10/02/24 09:46:46 INFO mapred.JobClient:     Spilled Records=409994
10/02/24 09:46:46 INFO mapred.JobClient:     Map output bytes=10780405
10/02/24 09:46:46 INFO mapred.JobClient:     Combine input records=1094930
10/02/24 09:46:46 INFO mapred.JobClient:     Map output records=1094930
10/02/24 09:46:46 INFO mapred.JobClient:     Reduce input records=165570

Usage

The following snippets demonstrate the usage of the adaptor.

Lets assume we want to run the following Word Count example:

public class WordCount {
	public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}
 
	public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
		private IntWritable result = new IntWritable();
		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) sum += val.get();
			result.set(sum);
			context.write(key, result);
		}
	}
 
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		if (args.length != 2) {
			System.err.println("Usage: wordcount <in> <out>");
			System.exit(2);
		}
		Job job = new Job(conf, "word count");
		job.setJarByClass(WordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		//The true arguments implied verbose execution so that we get back output 
		//on the execution of the mappers and reducers as well as the the counters
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

To submit the job, firstly we will need to define the resources that are needed for this job. These resources can include one of the following types

  • Jar - The jar file that contains the main method to execute (mandatory, exactly one resource of this type must be present)
  • MainClass - The class in the above jar that contains the main method (mandatory, exactly one resource of this type must be present)
  • Argument - Argument that must be provided to the main method (optional, one or more resources of this type can be present)
  • Configuration - Configuration to be provided to the application (optional, exactly one resource of this type can be present)
  • Property - Properties that should be made available to the application (optional, one or more resources of this type can be present)
  • File - Files that should be present in every node in the working directory of the executed mappers and reducers (optional, one or more resources of this type can be present)
  • Lib - Jars that should be in the classpath of the executed mappers and reducers (optional, one or more resources of this type can be present)
  • Archive - Hadoop Archives (.har) files that should be extracted in every node in the working directory of the executed mappers and reducers (optional, one or more resources of this type can be present)
  • Input - Input files that should be in the HDFS and are input files of the mappers (optional, one or more resources of this type can be present)
  • Output - Output location of the reducers products (optional, one or more resource of this type can be present)

For our case, the following snippet will declare the resources we need.

AdaptorHadoopResources attachedResources=new AdaptorHadoopResources();
//The jar that will be submitted for execution
attachedResources.Resources.add(AttachedHadoopResource.NewJarResource("WorCount.jar","local path/WordCount.jar",false));
//The main class in the above jar that contains the main method
attachedResources.Resources.add(AttachedHadoopResource.NewMainClassResource("some.package.name.WordCount"));
//The main method above waits as first argument the input folder where the input files are to be retrieved from
attachedResources.Resources.add(AttachedHadoopResource.NewArgumentResource(0,"testInput"));
//The main method above waits as second argument the input folder where the output files are to be stored
attachedResources.Resources.add(AttachedHadoopResource.NewArgumentResource(1,"testOutput"));
//Input files that are to be stroed in the input directory. The last argument if true specifies that the input folder needs to 
//be removed after completion. If one is set to true, then all the input payload is removed
attachedResources.Resources.add(AttachedHadoopResource.NewInputResource("testInput/textFile1.txt","local path/testFile1.txt",true));
attachedResources.Resources.add(AttachedHadoopResource.NewInputResource("testInput/textFile2.txt","local path/testFile2.txt",true));
attachedResources.Resources.add(AttachedHadoopResource.NewInputResource("testInput/textFile3.txt","local path/testFile3.txt",true));
//The location that the output files are stored. This whole folder is tar gzipped and retrieved after successful completion. The 
//last argument if true specifies that the output folder should be removed after successful retrieval of the output
attachedResources.Resources.add(AttachedHadoopResource.NewOutputResource("testOutput",true));

To create the actual ExecutionPlan using the adaptor the following snippet is enough

WorkflowHadoopAdaptor adaptor=new WorkflowHadoopAdaptor();
adaptor.SetAdaptorResources(attachedResources);
adaptor.CreatePlan();

The plan that is now created can be submitted to the ExecutionEngine for execution and after the execution is completed, we can retrieve the output of the job, which in our case will be std.err containing the output described in the Progress Filter, std.out of our main program as well as the tar gzipped output folder the with the following snippet

ExecutionPlan plan=adaptor.GetCreatedPlan();
for(IOutputResource res : adaptor.GetOutput()) {
  OutputHadoopResource out=(OutputHadoopResource)res;
  System.out.println("Output file "+out.Key+" of type "+out.TypeOfOutput.toString()+" is hosted in the StorageSystem with identifier "+plan.Variables.Get(out.VariableID));
}

Known Limitations

  • File staging can be performed in the context of the adaptor. But for large amounts of data an offline procedure is generally preferable and in some cases mandatory
  • Job submission must be defined to wait for the completion of the job
  • To be able to receive progress information while the hadoop job is running, verbose execution must be defined
  • Output is retrieved as tar.gz archives and not as individual files. These include also the logs of the job as well as the job hadoop configuration file