Workflow Engine


Overview

The Workflow Engine operates on top of the ExecutionEngine. Its purpose is to abstract away the low-level details required by the ExecutionEngine and by the Execution Plan it is provided with.

Execution Environment

The Workflow Engine is designed to operate in a variety of environments. To do so, it must be able to identify hooks with which to attach itself to the available Information Space, and it needs access to permanent storage. These hooks are provided through the Execution Environment Providers.

gCube Web Service Interface

When the Workflow Engine acts in the context of the gCube platform, it provides a gCube-compliant Web Service interface. This Web Service is the front end to the Workflow definition facilities and is also the "face" of the component with respect to the gCube platform. The Running Instance profile of the service is the placeholder where the Execution Environment Providers of the underlying Workflow Engine instance push information that needs to be made available to other engine instances. Additionally, the service etc folder contains the logging properties that control the logging granularity of the service and of the underlying components. Configuration properties used throughout the Workflow Engine instance are retrieved from the service jndi and are used to initialize services and providers once the service receives the init event.

Adaptors

One of the functionalities offered by the WorkflowEngine is the ability to bridge between existing, well-known job description and workflow definition languages and the internally used Workflow Language, which is subsequently transformed into an Execution Plan. This bridging is performed by means of Adaptors.

Each adaptor is implemented to operate on a specific third-party language, which it can understand, parse, and translate into the internally used constructs. This broadens the usability of the WorkflowEngine, since existing workflows already defined in third-party languages can be easily incorporated. The learning curve for anyone wishing to use the execution and workflow capabilities of the system is also greatly reduced: depending on one's needs, one can simply focus on the supported language that best matches the job at hand. Furthermore, more than one adaptor can be implemented for the same language, each offering a different type of functionality, either by modifying the semantics of the produced Execution Plan or by incorporating external components into the evaluation.

The following list of adaptors is currently provided:

Workflow Layer

The aforementioned adaptors are exploited by the Workflow Layer, which provides a high-level interface for describing and handling the execution of workflows of jobs over heterogeneous processing infrastructures. An extension of JDL, namely gJDL, is used for the description of workflows.

Adaptor CLI

A Command Line Interface is provided to define jobs to be submitted to one of the adaptors, monitor their execution, and retrieve the output of the processing. These utilities are provided as part of the WorkflowEngineServiceClient package, available from the central distribution site.

This package includes the following utilities:

  • StartJDLAdaptorTest.sh
  • StartGridAdaptorTest.sh
  • StartCondorAdaptorTest.sh
  • StartHadoopAdaptorTest.sh
  • StartJobMonitor.sh
  • RetrieveFile.sh
  • Example files

The first four utilities, StartJDLAdaptorTest.sh, StartGridAdaptorTest.sh, StartCondorAdaptorTest.sh and StartHadoopAdaptorTest.sh, are responsible for submitting the work to the respective adaptor, as documented in the Adaptors section. All of them share the same interface: the first argument is a locally available file that describes the job to be submitted, and the second argument is a local output file in which a reference identifier for the submitted job is stored for use in later interactions. Depending on the adaptor used, a different syntax is needed for the resource file. The syntax for each adaptor is described in the following sections.

JDL Adaptor resource file syntax

The syntax of the resource file expected by the JDL adaptor test utility is described below.

# The scope of the job submitted. Scope is an internal gCube construct described elsewhere.
# For the purposes of these clients, keep in mind that the scope used as a
# value here must be one of the supported scopes defined in the gHN container installation
# that is available on the same machine as the one running the clients and to which the
# $GLOBUS_LOCATION environment variable points
 
scope#<the scope to use>
 
# The JDL-based description of the job, which should be available on the local machine running
# these clients. A definition of the JDL syntax is out of scope, but the attributes supported
# by the adaptor can be found at WorkflowJDLAdaptor
 
jdl#<path to the jdl file>
 
# While the job is running, the ExecutionEngine emits events about the progress of the execution
# as described in Execution Events. Using this flag one
# can choose to choke these events so that they are not emitted at all.
 
chokeProgressEvents#<true | false>
 
# While the job is running, the ExecutionEngine emits events about the performance of the execution
# as described in Execution Events. Using this flag one
# can choose to choke these events so that they are not emitted at all.
 
chokePerformanceEvents#<true | false>
 
# The testing utilities do not create an Execution Plan but
# simply contact the Workflow Engine Service
# and the respective adaptor where the plan is created. Setting this flag, one can request
# that the created plan is retrieved and stored locally. Note that this feature may be unavailable 
# depending on the version used as the respective functionality is moved to another utility
 
storePlans#<true | false>
 
# Resources that are mentioned in the provided jdl and are either inputs or executables not already
# available on the host machines must be made available to the adaptors so that they can be moved
# to the execution location. They are declared in the resource file using the name by which they
# appear in the jdl. In order to be transferred to the execution location, they must first be made
# available and accessible to the rest of the platform. This can be done by providing the full
# payload of the resource as an in-message blob using the "local" key, by referencing a copy already
# stored in the gCube content management system using the "ss" key, or by pointing to an ftp
# or http url using the "url" key. Depending on the way the resource is declared available, the respective
# identifier must be provided. As many of these entries as needed can be declared
 
<name of resource as mentioned in jdl>#<local | ss | url depending on where to access the payload from>#<the path / id / url to retrieve the payload from>
<name of resource as mentioned in jdl>#<local | ss | url depending on where to access the payload from>#<the path / id / url to retrieve the payload from>

Examples of the above syntax can be found in the testing utility package.
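
As an illustration of the syntax above, the following is a minimal sketch of how such a resource file could be read. It is not the parser used by the test utility, which is not shown here. The function name parse_jdl_resource_file, the returned dictionary layout, and the sample scope, paths and URL are all assumptions made for the example. Lines starting with '#' are treated as comments, and any entry whose second field is local, ss or url is treated as a resource declaration.

```python
# Hypothetical helper: the real test utility's parser is not documented here.
FLAG_KEYS = {"chokeProgressEvents", "chokePerformanceEvents", "storePlans"}
ACCESS_KINDS = {"local", "ss", "url"}


def parse_jdl_resource_file(text):
    """Parse '#'-separated JDL adaptor resource entries into a dictionary."""
    config = {"resources": []}
    for raw in text.splitlines():
        line = raw.strip()
        # Blank lines and comment lines (starting with '#') carry no entry.
        if not line or line.startswith("#"):
            continue
        # Split at most twice so that a '#' inside a url is preserved.
        fields = [field.strip() for field in line.split("#", 2)]
        key = fields[0]
        if key in ("scope", "jdl") and len(fields) == 2:
            config[key] = fields[1]
        elif key in FLAG_KEYS and len(fields) == 2:
            config[key] = fields[1].lower() == "true"
        elif len(fields) == 3 and fields[1] in ACCESS_KINDS:
            config["resources"].append(
                {"name": key, "access": fields[1], "location": fields[2]}
            )
        else:
            raise ValueError(f"unrecognized entry: {line!r}")
    return config


# Sample input; the scope, paths and url are made up for illustration.
SAMPLE = """\
# comment lines start with '#'
scope#/example/scope
jdl#/tmp/job.jdl
chokeProgressEvents#false
chokePerformanceEvents#true
storePlans#false
input.txt#local#/tmp/input.txt
run.sh#url#http://example.org/run.sh
"""

if __name__ == "__main__":
    print(parse_jdl_resource_file(SAMPLE))
```

Rejecting unrecognized entries with an error, rather than ignoring them, is a design choice of this sketch; it makes typos in key names visible early.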

Grid Adaptor resource file syntax

The syntax of the resource file expected by the Grid adaptor test utility is described below.

# The scope of the job submitted. Scope is an internal gCube construct described elsewhere.
# For the purposes of these clients, keep in mind that the scope used as a
# value here must be one of the supported scopes defined in the gHN container installation
# that is available on the same machine as the one running the clients and to which the
# $GLOBUS_LOCATION environment variable points
 
scope#<the scope to use>
 
# A period after which if the job has not completed, the job should be canceled. The timeout is defined
# in milliseconds. A negative value means no timeout
 
timeout#<The time in milliseconds to wait for the job before canceling it or negative for no timeout>
 
# After the job is submitted, the engine will periodically poll for the status and log of the job.
# This period is defined in this flag with the value expressed in milliseconds.
 
pollPeriod#<The period in milliseconds to wait before checking the job status>
 
# While the job is running, the ExecutionEngine emits events about the progress of the execution
# as described in Execution Events. Using this flag one
# can choose to choke these events so that they are not emitted at all.
 
chokeProgressEvents#<true | false>
 
# While the job is running, the ExecutionEngine emits events about the performance of the execution
# as described in Execution Events. Using this flag one
# can choose to choke these events so that they are not emitted at all.
 
chokePerformanceEvents#<true | false>
 
# The testing utilities do not create an Execution Plan but
# simply contact the Workflow Engine Service
# and the respective adaptor where the plan is created. Setting this flag, one can request
# that the created plan is retrieved and stored locally. Note that this feature may be unavailable 
# depending on the version used as the respective functionality is moved to another utility
 
storePlans#<true | false>
 
# In case an error occurs when trying to poll the status of a submitted job, this flag
# can be set to make this process resilient to such errors by retrying the same operation.
# The value specified here defines the number of retries the engine should attempt
 
retryOnErrorTimes : <The number of times to retry checking the status of a job after an error occurred>
 
# In case an error occurs when trying to poll the status of a submitted job, and if the respective
# flag has been set to be resilient to such errors and retry, this flag specifies the amount of
# time to wait before polling again for the status. The value is specified in milliseconds
 
retryOnErrorPeriod : <The period in milliseconds to wait before rechecking the status of a job after an error occurred>
 
# The JDL-based description of the job, which should be available on the local machine running
# these clients. A definition of the JDL syntax is out of scope
 
jdl#<path to the jdl file>
 
# The user proxy that should be used when submitting the provided job.
 
userProxy#<the name that this file should have once moved to the ui node>#<the path where the user proxy is stored in the local file system>
 
# In case an overriding configuration should be used when submitting the job, it can be defined 
# using this property
 
config#<the name that this file should have once moved to the ui node>#<the path where the overriding config file is stored in the local file system>
 
# Resources that are mentioned in the provided jdl and are either inputs or executables not already
# available on the host machines must be made available to the adaptors so that they can be moved
# to the execution location. They are declared in the resource file using the name by which they
# appear in the jdl. In order to be transferred to the execution location, they must first be made
# available and accessible to the rest of the platform. This can be done by providing the full
# payload of the resource as an in-message blob using the "local" key, by referencing a copy already
# stored in the gCube content management system using the "ss" key, or by pointing to an ftp
# or http url using the "url" key. Depending on the way the resource is declared available, the respective
# identifier must be provided. As many of these entries as needed can be declared
 
inData#<the name of the corresponding resource as it appears in the jdl>#<the path where the data that are to be moved to the ui are stored>#<local | ss | url where to retrieve the data from>
 
# Resources that are mentioned in the provided jdl, are included in the Output sandbox of some job, and
# should be made available for retrieval after the job has completed its execution must
# be declared using these elements. The reference is made through the name used in the jdl file.
 
outData : <the name of the data that are to be retrieved from the grid output as defined in the jdl>

Examples of the above syntax can be found in the testing utility package.
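
The interplay of the timeout, pollPeriod, retryOnErrorTimes and retryOnErrorPeriod entries can be sketched as a polling loop. This is only an illustration of the semantics described in the comments above, not the engine's implementation. The function wait_for_job, the check_status callable, the status strings "running", "done" and "failed", and the "canceled" return value are hypothetical; resetting the retry budget after a successful poll is likewise an assumption.

```python
import time


def wait_for_job(check_status, timeout_ms, poll_period_ms,
                 retry_times, retry_period_ms,
                 clock=time.monotonic, sleep=time.sleep):
    """Poll check_status() until the job finishes or the timeout elapses.

    check_status is a hypothetical callable that returns "running", "done"
    or "failed", and raises an exception when the status cannot be read.
    """
    start = clock()
    errors_left = retry_times
    while True:
        try:
            status = check_status()
            # Assumption: a successful poll restores the full retry budget.
            errors_left = retry_times
            if status in ("done", "failed"):
                return status
            wait_ms = poll_period_ms
        except Exception:
            # Give up once the allowed number of retries is used up.
            if errors_left <= 0:
                raise
            errors_left -= 1
            wait_ms = retry_period_ms
        # A negative timeout means that the job is never canceled.
        if timeout_ms >= 0 and (clock() - start) * 1000 >= timeout_ms:
            return "canceled"
        sleep(wait_ms / 1000.0)
```

The clock and sleep parameters exist only so that the loop can be exercised without real waiting; a caller would normally leave them at their defaults.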

Condor Adaptor resource file syntax

The syntax of the resource file expected by the Condor adaptor test utility is described below.

# The scope of the job submitted. Scope is an internal gCube construct described elsewhere.
# For the purposes of these clients, keep in mind that the scope used as a
# value here must be one of the supported scopes defined in the gHN container installation
# that is available on the same machine as the one running the clients and to which the
# $GLOBUS_LOCATION environment variable points
 
scope#<the scope to use>
 
# A period after which if the job has not completed, the job should be canceled. The timeout is defined
# in milliseconds. A negative value means no timeout
 
timeout#<The time in milliseconds to wait for the job before canceling it or negative for no timeout>
 
# After the job is submitted, the engine will periodically poll for the status and classad of the job.
# This period is defined in this flag with the value expressed in milliseconds.
 
pollPeriod#<The period in milliseconds to wait before checking the job status and classad>
 
# While the job is running, the ExecutionEngine emits events about the progress of the execution
# as described in Execution Events. Using this flag one
# can choose to choke these events so that they are not emitted at all.
 
chokeProgressEvents#<true | false>
 
# While the job is running, the ExecutionEngine emits events about the performance of the execution
# as described in Execution Events. Using this flag one
# can choose to choke these events so that they are not emitted at all.
 
chokePerformanceEvents#<true | false>
 
# The testing utilities do not create an Execution Plan but
# simply contact the Workflow Engine Service
# and the respective adaptor where the plan is created. Setting this flag, one can request
# that the created plan is retrieved and stored locally. Note that this feature may be unavailable 
# depending on the version used as the respective functionality is moved to another utility
 
storePlans#<true | false>
 
# Depending on whether the submission file describes a single job or a DAG of jobs, this flag
# should be set so that the correct submission executable is used
 
isDag#<true | false>
 
# After the job is submitted, one can request that after the defined polling period the engine
# should, once it has checked the status of the submitted job, retrieve the submitted job's classad
# and emit it back to the client for inspection. Note that if the submitted job is a DAG, the
# job reported by Condor includes the DAG managing process and not the user jobs
 
retrieveJobClassAd#<true | false>
 
# The Condor submission description of the job, which should be available on the local machine running
# these clients. A definition of the Condor submission file syntax is out of scope
 
submit#<path to the condor submission description file>
 
# Resources that are mentioned in the provided submission file and are inputs or other needed files not already
# available on the host machines must be made available to the adaptors so that they can be moved
# to the execution location. They are declared in the resource file using the name by which they
# appear in the submission file. In order to be transferred to the execution location, they must first be made
# available and accessible to the rest of the platform. This can be done by providing the full
# payload of the resource as an in-message blob using the "local" key, by referencing a copy already
# stored in the gCube content management system using the "ss" key, or by pointing to an ftp
# or http url using the "url" key. Depending on the way the resource is declared available, the respective
# identifier must be provided. As many of these entries as needed can be declared
 
inData#<the name of the corresponding resource as it should appear in the execution location>#<the path where the data that are to be moved to the ui are stored>#<local | ss | url where to retrieve the data from>
 
# Resources that are mentioned in the provided submission file and are executables not already
# available on the host machines must be made available to the adaptors so that they can be moved
# to the execution location. They are declared in the resource file using the name by which they
# appear in the submission file. In order to be transferred to the execution location, they must first be made
# available and accessible to the rest of the platform. This can be done by providing the full
# payload of the resource as an in-message blob using the "local" key, by referencing a copy already
# stored in the gCube content management system using the "ss" key, or by pointing to an ftp
# or http url using the "url" key. Depending on the way the resource is declared available, the respective
# identifier must be provided. As many of these entries as needed can be declared. The differentiation between
# executables and input files is made so that the proper attributes are set on the executables, allowing
# them to be treated as execution units by Condor, which does not set the execution flag on declared executables
 
executable#<the name of the corresponding executable as it should appear in the execution location>#<the path where the executable that is to be moved to the ui is stored>#<local | ss | url where to retrieve the executable from>

Examples of the above syntax can be found in the testing utility package.
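
The reason for declaring executables separately from input files can be illustrated with a small sketch. How the adaptor actually sets the attributes is not documented here; the function mark_staged_file and its kind argument are hypothetical, and the sketch assumes a POSIX file system where the owner execute bit is what allows a staged file to be run.

```python
import os
import stat


def mark_staged_file(path, kind):
    """Adjust permissions of a staged file; kind is "inData" or "executable".

    Hypothetical helper: only entries declared as "executable" receive the
    owner execute bit, input data is left untouched.
    """
    mode = stat.S_IMODE(os.stat(path).st_mode)
    if kind == "executable":
        # Add the owner execute bit so the file can be run as a job.
        mode |= stat.S_IXUSR
        os.chmod(path, mode)
    return mode
```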

Hadoop Adaptor resource file syntax

The syntax of the resource file expected by the Hadoop adaptor test utility is described below.

# The scope of the job submitted. Scope is an internal gCube construct described elsewhere.
# For the purposes of these clients, keep in mind that the scope used as a
# value here must be one of the supported scopes defined in the gHN container installation
# that is available on the same machine as the one running the clients and to which the
# $GLOBUS_LOCATION environment variable points
 
scope#<the scope to use>
 
# While the job is running, the ExecutionEngine emits events about the progress of the execution
# as described in Execution Events. Using this flag one
# can choose to choke these events so that they are not emitted at all.
 
chokeProgressEvents#<true | false>
 
# While the job is running, the ExecutionEngine emits events about the performance of the execution
# as described in Execution Events. Using this flag one
# can choose to choke these events so that they are not emitted at all.
 
chokePerformanceEvents#<true | false>
 
# The testing utilities do not create an Execution Plan but
# simply contact the Workflow Engine Service
# and the respective adaptor where the plan is created. Setting this flag, one can request
# that the created plan is retrieved and stored locally. Note that this feature may be unavailable 
# depending on the version used as the respective functionality is moved to another utility
 
storePlans#<true | false>
 
# The jar that contains the job to be run in Hadoop. This jar must be made available to the adaptor
# so that it can be moved to the execution location. In order to be transferred to the execution location,
# it must first be made available and accessible to the rest of the platform. This can be done by
# providing the full payload of the resource as an in-message blob using the "local" key, by referencing a copy
# already stored in the gCube content management system using the "ss" key, or by pointing to an ftp
# or http url using the "url" key. Additionally, the content might be stored in hdfs. Depending on the way
# the resource is declared available, the respective identifier must be provided.
 
Jar#<the name that this file should have once moved to the ui node>#<local | ss | url | hdfs>#<path to retrieve the file from>
 
# The qualified name of the main class, contained in the defined Jar, that is the entry point to the job
 
MainClass#<the name of the class containing the main method to run in the jar file>
 
# The arguments that must be provided to the main class. As many arguments as needed can be given by defining
# multiple instances of the entry, and the ordering of the arguments is explicit.
 
Argument#<the order of the argument in the call>#<the argument to pass>
 
# A configuration file that should be available for the job. This file must be made available to the adaptor
# so that it can be moved to the execution location. In order to be transferred to the execution location,
# it must first be made available and accessible to the rest of the platform. This can be done by
# providing the full payload of the resource as an in-message blob using the "local" key, by referencing a copy
# already stored in the gCube content management system using the "ss" key, or by pointing to an ftp
# or http url using the "url" key. Additionally, the content might be stored in hdfs. Depending on the way the
# resource is declared available, the respective identifier must be provided.
 
Configuration#<the name that this file should have once moved to the ui node>#<local | ss | url | hdfs>#<path to retrieve the file from>
 
# A Property that should be available in the JVM that will run the job. Any number of properties
# can be defined
 
Property#<the property value in the form of key=value>
 
# A file that should be available for the job. This file must be made available to the adaptor
# so that it can be moved to the execution location. In order to be transferred to the execution location,
# it must first be made available and accessible to the rest of the platform. This can be done by
# providing the full payload of the resource as an in-message blob using the "local" key, by referencing a copy
# already stored in the gCube content management system using the "ss" key, or by pointing to an ftp
# or http url using the "url" key. Additionally, the content might be stored in hdfs. Depending on the way
# the resource is declared available, the respective identifier must be provided. Any number of files can be defined.
 
File#<the name that this file should have once moved to the ui node>#<local | ss | url | hdfs>#<path to retrieve the file from>
 
# A library that should be included in the job's classpath. This library must be made available to the adaptor
# so that it can be moved to the execution location. In order to be transferred to the execution location,
# it must first be made available and accessible to the rest of the platform. This can be done by
# providing the full payload of the resource as an in-message blob using the "local" key, by referencing a copy
# already stored in the gCube content management system using the "ss" key, or by pointing to an ftp
# or http url using the "url" key. Additionally, the content might be stored in hdfs. Depending on the way the
# resource is declared available, the respective identifier must be provided. Any number of libraries can be defined.
 
Lib#<the name that this file should have once moved to the ui node>#<local | ss | url | hdfs>#<path to retrieve the file from>
 
# A Hadoop archive that should be included in the job's classpath. This archive must be made available to the adaptor
# so that it can be moved to the execution location. In order to be transferred to the execution location,
# it must first be made available and accessible to the rest of the platform. This can be done by
# providing the full payload of the resource as an in-message blob using the "local" key, by referencing a copy
# already stored in the gCube content management system using the "ss" key, or by pointing to an ftp
# or http url using the "url" key. Additionally, the content might be stored in hdfs. Depending on the way the
# resource is declared available, the respective identifier must be provided. Any number of archives can be defined.
 
Archive#<the name that this file should have once moved to the ui node>#<local | ss | url | hdfs>#<path to retrieve the file from>
 
# A file that should be available in the HDFS. This file must be made available to the adaptor
# so that it can be moved to the execution location. In order to be transferred to the execution location,
# it must first be made available and accessible to the rest of the platform. This can be done by
# providing the full payload of the resource as an in-message blob using the "local" key, by referencing a copy
# already stored in the gCube content management system using the "ss" key, or by pointing to an ftp
# or http url using the "url" key. Depending on the way the resource is declared available, the respective
# identifier must be provided. Any number of files can be defined. The location specified can contain any number
# of nested directories, which will be created as needed. After the execution is completed, the file can be marked
# for deletion or marked to persist and remain available in the HDFS for later usage (tmp | persist)
 
Input#<the name that this file should have once moved to the HDFS system>#<tmp | persist>#<local | ss | url>#<path to retrieve the file from>
 
# After the job is completed, these elements can be used to define subtrees or single files available in HDFS that 
# should be retrieved. The retrieved content is archived and compressed as a tar.gz archive. After the content is retrieved, 
# it can be marked to be deleted or persisted to be used by later jobs
 
Output#<the name that the output directory in hdfs has>#<tmp | persist>

Examples of the above syntax can be found in the testing utility package.
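
Because each Argument entry carries its own position, the entries may appear in any order in the resource file. The following sketch shows one way the final argument list could be assembled from them. The function ordered_arguments is hypothetical and only illustrates the explicit ordering described above; it makes no assumption about whether numbering starts at 0 or 1, and the sample class name and paths are made up.

```python
def ordered_arguments(lines):
    """Return the values of Argument#<order>#<value> entries, sorted by order."""
    found = []
    for raw in lines:
        line = raw.strip()
        # Comment lines start with "# " and other entries use different keys.
        if not line.startswith("Argument#"):
            continue
        # Split at most twice so that a '#' inside the argument is preserved.
        _, order, value = line.split("#", 2)
        found.append((int(order), value))
    return [value for _, value in sorted(found, key=lambda item: item[0])]


if __name__ == "__main__":
    sample = [
        "MainClass#org.example.WordCount",
        "Argument#1#/example/output",
        "Argument#0#/example/input",
    ]
    print(ordered_arguments(sample))
```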

Monitoring

Another utility available in the package enables monitoring of the submitted job. Using this utility one can retrieve the current status of the job, the events that have been emitted since the last time the job was polled for its status, the output of the job if it has completed successfully, the error that occurred if it did not, and the plan that is executed to accomplish the submitted job.

The utility expects three arguments. The first is the path to the output file of the Start***Test.sh utility. The second argument defines whether the plan that is executed should be retrieved or not. If the plan has finished its execution, it is always retrieved regardless of the choice made. The third argument defines the behavior of the monitoring script after its first communication with the service. It can be set either to stop or to persist and keep polling the status of the job. In the latter case, the polling is performed with a predefined fixed period of 60 seconds.

Result Retrieval

Once the monitoring utility indicates that the operation has completed, a printout is presented containing the available outputs from the job. This printout contains some human-friendly identifiers that indicate which output resource is referenced, for example the jdl name of the output sandbox in the case of the JDL adaptor. Along with that name, an identifier in the Content Management System is provided. This identifier can be supplied to the RetrieveFile.sh utility as the first argument. As a second argument, the utility needs the scope under which the job was submitted, which must be the same as the one defined in the respective resource attribute. The retrieved content is stored in a local temporary file.

CLI Deployment

The Workflow Engine CLI is packaged along with the WorkflowEngineService as a Software Archive, following the packaging methodology adopted by the gCube system. This way, it can be deployed with the automatic deployment module of gCube. For a standalone client setup, however, this operation has to be performed manually. To create a standalone installation of the CLI, the following actions need to be taken:

(Make sure that the downloaded releases that make up the client installation match the release of the target infrastructure.)



  • From the previously downloaded WorkflowService-servicearchive, the org.gcube.execution.workflowengine.service.stubs.jar library located in the WorkflowEngineService-stubs folder must be copied to the $GLOBUS_LOCATION/lib location



Workflow Language