Difference between revisions of "WorkflowJDLAdaptor"

From Gcube Wiki
Jump to: navigation, search
(Usage)
(Parellization factor)
Line 367: Line 367:
  
 
=Highlights=
 
=Highlights=
==Parellization factor==
+
==Parallelization factor==
 
Depending on the configuration, the adaptor will create the [[ExecutionEngine#ExecutionPlan | ExecutionPlan]] that will orchestrate the execution of a DAG of jobs either as a series of [[ExecutionPlan_Elements#Sequence | Sequence]] and [[ExecutionPlan_Elements#Flow | Flow]] elements or as a single [[ExecutionPlan_Elements#Bag | Bag]]. The first case allows for a well defined series of operation but since the creation of such a series of constructs is an exercise on graph topological sorting, which as a problem can provide multiple answers that depending on the nature of the original graph might restrict the parallelization factor of the overall DAG, in cases of complex graphs, this case can damage the parallelization capabilities of the overall plan. The second case is much more dynamic. It allows for execution time decision making of the nodes to be executed. This of course comes as a trade off with increased complexity at runtime with respect to the well defined plan, but it can provide optimal parallelization capabilities.
 
Depending on the configuration, the adaptor will create the [[ExecutionEngine#ExecutionPlan | ExecutionPlan]] that will orchestrate the execution of a DAG of jobs either as a series of [[ExecutionPlan_Elements#Sequence | Sequence]] and [[ExecutionPlan_Elements#Flow | Flow]] elements or as a single [[ExecutionPlan_Elements#Bag | Bag]]. The first case allows for a well defined series of operation but since the creation of such a series of constructs is an exercise on graph topological sorting, which as a problem can provide multiple answers that depending on the nature of the original graph might restrict the parallelization factor of the overall DAG, in cases of complex graphs, this case can damage the parallelization capabilities of the overall plan. The second case is much more dynamic. It allows for execution time decision making of the nodes to be executed. This of course comes as a trade off with increased complexity at runtime with respect to the well defined plan, but it can provide optimal parallelization capabilities.
  

Revision as of 13:27, 15 June 2011

Overview

This adaptor as part of the adaptors offered by the WorkflowEngine constructs an Execution Plan based on the description of a job defined in JDL syntax. This description can be of a single job or it can include a Directed Acyclic Graph (DAG) of jobs. The JDL description is parsed and the adaptor then processes the retrieved parsed info to create the Execution Plan. During the parsing procedure, the jobs Input and Output sandboxes are examined to determine what is the overall input and output set of the workflow. The input set is covered by the submitting client through the attached resources he provides. The output resources of the workflow are constructed from the elements found in all the jobs Output Sandboxes.

JDL Support

Attributes

Job

Supported attributes in a JDL description defining a job are the following:

  • JobType (mandatory)
    The type of the job. Currently only Normal job type is supported.
  • Executable (mandatory)
    The executable that performs the actual job.
  • Arguments (optional)
    The arguments to pass to the executable. The arguments that are included in this attribute are split using the space character as a delimiter. This means that no space containing phrase can be passed as a single argument.
  • StdInput (optional)
    What to write to the executable's standard input once the execution is started. The content of the attribute is expected to be a file that either exists in the host initially, or is transfered as one of the elements contained in the Input Sandbox of the job.
  • StdOutput (optional)
    Where to write the executable's standard output once the execution is started. The content of the attribute is expected to be a file that if needed to be later available will be included in the job's Output Sandbox.
  • StdError (optional)
    Where to write the executable's standard error once the execution is started. The content of the attribute is expected to be a file that if needed to be later available will be included in the job's Output Sandbox.
  • InputSandbox (optional)
    This element contains the files that need to be staged to the host that is executing the job.
  • OutputSandbox (optional)
    This element contains the files that are produced by the job and need to be retrievable.
  • Environment (optional)
    This element contains key value pairs that define the environmental variables and their values that need to be set for the execution of the job.
  • RetryCount (optional)
    The number of retries that are to be performed in case a job fails once the execution is at the stage of executing the actual executable.
  • ShallowRetryCount (optional)
    The number of retries that are to be performed in case there is a problem contacting the host to be used.
  • Rank (optional)
    This element defines the rank function to use for the selection of the job host.
  • Requirements (optional)
    This element defines the requirements function to use for the selection of the job host.

Workflow Engine Extensions to the JDL syntax

  • ParsingMode (optional)
    Whether the adaptor should create a plan that orchestrates the execution as a series of sequential and parallel steps, or by defining a bag of execution units with execution conditions made up from their dependency lists. Supported values for this are Plan and Bag for the two alternatives respectively.
  • ConnectionMode (optional)
    When contacting a remote execution unit, the execution engine can either maintain a steady connection with the remote site or wait for the remote side to perform a callback to report its completion. Supported values for this are KeepAlive and Callback for the two alternatives respectively.

DAG

Supported attributes in a JDL description defining a DAG of jobs are the following:

  • Max_Running_Nodes (optional)
    The maximum number of concurrently running execution units. In cases where node collocation is not requested, this is performed at the level of Boundary element, while in cases of node collocation, since a single Boundary element is used, it is applied to the Shell plan element. More details on execution restrictions can be found in Execution Plan Configuration section.
  • Nodes (mandatory)
    The Nodes attributes is where the jobs of a DAG are defined. At least one node must be defined.
  • Dependencies (optional)
    The Dependencies attribute contains a list of pairs where the dependencies that formulate the DAG are defined. Only pairs of dependencies are supported and not inner lists. A pair in the form of {A,B} means that node B cannot be executed before node A has been completed.
  • NodesCollocation (optional)
    Boolean indication on a requirement to host all DAG nodes in the same host.
  • Rank (optional)
    In cases where node collocation is requested, this defines the rank function to use for the selection of the common host.
  • Requirements (optional)
    In cases where node collocation is requested, this defines the requirements function to use for the selection of the common host.

Workflow Engine Extensions to the JDL syntax

  • ParsingMode (optional)
    Whether the adaptor should create a plan that orchestrates the execution as a series of sequential and parallel steps, or by defining a bag of execution units with execution conditions made up from their dependency lists. Supported values for this are Plan and Bag for the two alternatives respectively.
  • ConnectionMode (optional)
    When contacting a remote execution unit, the execution engine can either maintain a steady connection with the remote site or wait for the remote side to perform a callback to report its completion. Supported values for this are KeepAlive and Callback for the two alternatives respectively.

Examples

Job

As this is the "Hello World" equivalent for the WorkflowJDLAdaptor, lets assume we want to execute the following script on some node:

#!/bin/sh
# job.sh
echo $1 > job.output
echo $PATH >> job.output
echo $TEST_ENV_VAR1 >> job.output
echo $TEST_ENV_VAR2 >> job.output
cat sig.txt >> job.output
echo "Hello World of stdOut"
echo $1
cat sig.txt
echo "hello World of stdErr" 1>&2
echo $1 1>&2
cat sig.txt 1>&2

This script is expecting for a file named sig.txt to be available in the same folder as the one it is running in. This file could be the following:

Powered by Pomolo!!!

The respective jdl file to describe this job can be the following:

[
	Type = "Job";
	JobType = "Normal";
	Executable = "job.sh";
	Arguments = "Hello_World_Of_Diligent...";
	StdOutput = "job.out";
	StdError = "job.err";
	OutputSandbox = {
		"job.err",
		"job.out",
		"job.output"
	};
	InputSandbox = {
		"job.sh",
		"sig.txt"
	};
	Environment = {
		"TEST_ENV_VAR1=Environment Value1",
		"TEST_ENV_VAR2=Environment Value2"
	};
	Rank = (other.GlueCEPolicyMaxRunningJobs-other.GlueCEStateRunningJobs);
]

DAG

For an example to a DAG defining JDL description we will use the above script as the executable and define a workflow with the following JDL:

[
	Type = "DAG";
	NodesCollocation = false;
	Max_Running_Nodes=5;
	Nodes = 
	[
		nodeA = 
		[
			Description = 
			[
				JobType = "Normal";
				Executable = "job.sh";
				Arguments = "Hello_World_Of_Diligent_from_Node_A...";
				StdOutput = "job.out";
				StdError = "job.err";
				OutputSandbox = {
					"job.err",
					"job.out",
					"job.output"
				};
				InputSandbox = {
					"job.sh",
					"sig.txt"
				};
				Environment = {
					"TEST_ENV_VAR1=Environment Value1",
					"TEST_ENV_VAR2=Environment Value2"
				};
			];
		];
		nodeB = 
		[
			Description = 
			[
				JobType = "Normal";
				Executable = "job.sh";
				Arguments = "Hello_World_Of_Diligent_from_Node_B...";
				StdOutput = "job.out";
				StdError = "job.err";
				OutputSandbox = {
					"job.err",
					"job.out",
					"job.output"
				};
				InputSandbox = {
					"job.sh",
					"sig.txt"
				};
				Environment = {
					"TEST_ENV_VAR1=Environment Value1",
					"TEST_ENV_VAR2=Environment Value2"
				};
			];
		];
		nodeC = 
		[
			Description = 
			[
				JobType = "Normal";
				Executable = "job.sh";
				Arguments = "Hello_World_Of_Diligent_from_Node_C...";
				StdOutput = "job.out";
				StdError = "job.err";
				OutputSandbox = {
					"job.err",
					"job.out",
					"job.output"
				};
				InputSandbox = {
					"job.sh",
					"sig.txt"
				};
				Environment = {
					"TEST_ENV_VAR1=Environment Value1",
					"TEST_ENV_VAR2=Environment Value2"
				};
			];
		];
		nodeD = 
		[
			Description = 
			[
				JobType = "Normal";
				Executable = "job.sh";
				Arguments = "Hello_World_Of_Diligent_from_Node_D...";
				StdOutput = "job.out";
				StdError = "job.err";
				OutputSandbox = {
					"job.err",
					"job.out",
					"job.output"
				};
				InputSandbox = {
					"job.sh",
					"sig.txt"
				};
				Environment = {
					"TEST_ENV_VAR1=Environment Value1",
					"TEST_ENV_VAR2=Environment Value2"
				};
			];
		];
		nodeE = 
		[
			Description = 
			[
				JobType = "Normal";
				Executable = "job.sh";
				Arguments = "Hello_World_Of_Diligent_from_Node_E...";
				StdOutput = "job.out";
				StdError = "job.err";
				OutputSandbox = {
					"job.err",
					"job.out",
					"job.output"
				};
				InputSandbox = {
					"job.sh",
					"sig.txt"
				};
				Environment = {
					"TEST_ENV_VAR1=Environment Value1",
					"TEST_ENV_VAR2=Environment Value2"
				};
			];
		];
		nodeF = 
		[
			Description = 
			[
				JobType = "Normal";
				Executable = "job.sh";
				Arguments = "Hello_World_Of_Diligent_from_Node_F...";
				StdOutput = "job.out";
				StdError = "job.err";
				OutputSandbox = {
					"job.err",
					"job.out",
					"job.output"
				};
				InputSandbox = {
					"job.sh",
					"sig.txt"
				};
				Environment = {
					"TEST_ENV_VAR1=Environment Value1",
					"TEST_ENV_VAR2=Environment Value2"
				};
			];
		];
	];
	Dependencies = {{nodeA,nodeB},{nodeA,nodeC},{nodeB,nodeD},{nodeC,nodeD},{nodeB,nodeE},{nodeC,nodeE},{nodeA,nodeD},{nodeD,nodeF}};
]

DAG with intermediate file staging

To make an example of the usage of intermediate file staging lets assume we have a script that can concatenate multiple files such as the following:

#!/bin/sh
# Concat.sh
while [ $# != 0 ]
do
  cat $1 >> concat.out
  shift
done

We will use this script as the final aggregating node of a DAG which has as previous steps the production of the files that will be concatenated by this node. The above case is represented in the following JDL:

[
	Type = "DAG";
	ParsingMode="Bag";
	ConnectionMode="KeepAlive";
	NodesCollocation = false;
	Nodes = 
	[
		nodeA = 
		[
			Description = 
			[
				JobType = "Normal";
				Executable = "/bin/ls";
				Arguments = "-al";
				StdOutput = "lsA.out";
				StdError = "lsA.err";
				OutputSandbox = {
					"lsA.err",
					"lsA.out"
				};
			];
		];
		nodeB = 
		[
			Description = 
			[
				JobType = "Normal";
				Executable = "/bin/ls";
				Arguments = "-al";
				StdOutput = "lsB.out";
				StdError = "lsB.err";
				OutputSandbox = {
					"lsB.err",
					"lsB.out"
				};
			];
		];
		nodeC = 
		[
			Description = 
			[
				JobType = "Normal";
				Executable = "/bin/ls";
				Arguments = "-al";
				StdOutput = "lsC.out";
				StdError = "lsC.err";
				OutputSandbox = {
					"lsC.err",
					"lsC.out"
				};
			];
		];
		nodeD = 
		[
			Description = 
			[
				JobType = "Normal";
				Executable = "Concat.sh";
				Arguments = "lsA.out lsB.out lsC.out";
				StdOutput = "concatD.out";
				StdError = "concatD.err";
				InputSandbox = {
					"Concat.sh",
					root.nodes.nodeA.description.OutputSandbox[1],
					root.nodes.nodeB.description.OutputSandbox[1],
					root.nodes.nodeC.description.OutputSandbox[1]
				};
				OutputSandbox = {
					"concatD.out",
					"concatD.err",
					"concat.out"
				};
			];
		];
	];
 
	Dependencies = {{nodeA,nodeD},{nodeB,nodeD},{nodeC,nodeD}};
]

Highlights

Parallelization factor

Depending on the configuration, the adaptor will create the ExecutionPlan that will orchestrate the execution of a DAG of jobs either as a series of Sequence and Flow elements or as a single Bag. The first case allows for a well defined series of operation but since the creation of such a series of constructs is an exercise on graph topological sorting, which as a problem can provide multiple answers that depending on the nature of the original graph might restrict the parallelization factor of the overall DAG, in cases of complex graphs, this case can damage the parallelization capabilities of the overall plan. The second case is much more dynamic. It allows for execution time decision making of the nodes to be executed. This of course comes as a trade off with increased complexity at runtime with respect to the well defined plan, but it can provide optimal parallelization capabilities.

Staging

Staging of input for the executables constituting the execution units of the jobs, is performed at a level of Input Sandbox defined for each job. The resources that are attached to the adaptor are stored in the StorageSystem and are retrieved in the node that hosts the Input Sandbox that declares them. The files declared in the Output Sandbox of a job are stored in the StorageSystem and information on the way to retrieve the output is provided through the Output Resources defined by the adaptor and are valid after the completion of the execution.

Staging of intermediate files is also possible through reference expressions defined in a job's Input Sandbox if this job is part of a DAG. Through this expression a job can define that it needs as input the file produced from some other job and is declared in its Output Sandbox. This is the only case where reference expressions are allowed and used.

Usage

The following snippets demonstrate the usage of the adaptor.

To submit the job, firstly we will need to define the resources that are needed for this job. These resources include the resources that are consumed by the job and are defined as the set defined by all the Input Sanboxes that are included in the JDL description. In the following snippets we define how these resources should be declared for both the simple job presented in the example above, as well as for the DAG of jobs. Since the job we have used for each node of the DAG is the same, we are just passing different command line arguments, there is no need to define multiple times the input files. So for both cases the following snippet is enough. The first argument of the AttachedJDLResource is the name that we want the resource to have once it has been moved to the host for the execution. The second argument represents the path to the resource that we want to attach and is stored in the machine that we are running the adaptor code.

AdaptorJDLResources attachedResources=new AdaptorJDLResources();
attachedResources.Resources.add(new AttachedJDLResource("job.sh","local path/job.sh"));
attachedResources.Resources.add(new AttachedJDLResource("sig.txt","local path/sig.txt"));

For the third example with the intermediate file staging, the resources needed could be defined as follows:

AdaptorJDLResources attachedResources=new AdaptorJDLResources();
attachedResources.Resources.add(new AttachedJDLResource("Concat.sh","local path/Concat.sh"));

To create the actual Execution Plan using the adaptor the following snippet is enough

WorkflowJDLAdaptor adaptor=new WorkflowJDLAdaptor();
adaptor.SetAdaptorResources(attachedResources);
adaptor.SetJDL(jdlFile);
adaptor.CreatePlan();

The plan that is now created can be submitted to the ExecutionEngine for execution and after the execution is completed, we can retrieve the output of the job, which in our case will be job.err, job.out and job.output with the following snippet

ExecutionPlan plan=adaptor.GetCreatedPlan();
for(IOutputResource res : adaptor.GetOutput()){
  OutputSandboxJDLResource out=(OutputSandboxJDLResource)res;
  System.out.println("Output file of node "+out.NodeName+" with jdl name : "+out.SandboxName+" "+
                     "is hosted in the StorageSystem with identifier "+plan.Variables.Get(out.VariableID));
}

Known limitations

  • JDL parsing needs all declarations to start and begin with angle brackets ([,])
  • JDL attributes parsing is case sensitive
  • JDL Dependencies attribute only supports pairs not inner lists
  • Supported jobs are only those of type Normal
  • Node collocation
    The case of node collocation in DAG jobs is not handled correctly because multiple Boundary are created. The node used is still a single one but it is contacted multiple times and data locality is not exploited correctly.
  • The arguments defined for an executable in the respective JDL attribute, when passed to the Shell are split using the space character (' ') as a delimiter. This way no space containing phrase can be passed as a single argument
  • The Retry and Shallow Retry attributes of the JDL are treated equally and are used at the level of Shell and not at the level of Boundary depending on the case