Defining Workflows

In our system we model a data-driven workflow as a graph, \(G=(N,E,o,\lambda)\), where \(N\) is the set of stages composing the workflow and \(E\) is the set of edges representing the data dependencies between stages. Each stage \(n_{k} \in N\) is composed of a set of independent tasks \(T_{k}= \lbrace t_{k1},...,t_{kp} \rbrace\). Additionally, objectives and constraints for stage \(n_{k}\) are represented by \(o(n_{k})\) and \(\lambda(n_{k})\), respectively. Objectives and constraints can be used to specify resource requirements (e.g., compute, memory, location) and desired qualities of service (QoS). Note that we use a general graph rather than a directed acyclic graph (DAG) to model our workflows, as they may contain loops and other complex structures.
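As an illustration, the two-stage map/reduce workflow defined later in this section, in which the reduce stage may trigger a new iteration of the map stage, can be modeled as \(N=\lbrace n_{1},n_{2} \rbrace\) and \(E=\lbrace (n_{1},n_{2}),(n_{2},n_{1}) \rbrace\), where representing the loop as the additional edge \((n_{2},n_{1})\) is one possible way to capture iteration in the graph and is what makes it cyclic; \(o(n_{1})\) and \(o(n_{2})\) would be the deadline objectives assigned to each stage, and \(\lambda(n_{1})\) and \(\lambda(n_{2})\) their data movement constraints.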

Assuming that all applications required by our workflow are already deployed in the infrastructure, we simply need to create a workflow description. As an example, we next define a simple two-stage map/reduce workflow. A workflow description is divided into four sections, namely stages definition, scheduling policies, dependencies, and loops.
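The overall layout of such a description is sketched below; the element names match the full example given at the end of this section, and the ellipses stand for the per-section content described next.

<xflow name="SimpleWorkflow">

  <!-- Stages definition -->
  <stages>
    ...
  </stages>

  <!-- Scheduling policies -->
  <objectives>
    ...
  </objectives>

  <!-- Dependencies -->
  <transitions>
    ...
  </transitions>

  <!-- Loops (optional) -->
  <loops>
    ...
  </loops>

</xflow>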

Stages Definition

In the stages definition we specify the properties of each stage.

  • AppGenerateClass identifies the class and method that generate the tasks of the stage. Different stages can specify different classes, and a class can have several methods to generate tasks. More information can be found in the Application Development Section.

  • PropertyFile specifies the path to a file where input properties can be defined. More information about property files can be found in the PropertyFile section.

  • Application identifies the application that will be used to compute the tasks of the stage. Application names are specified in the Agent configuration (see the Agent Configuration Section).

  • InputData contains a list of input data sources. Each input data entry contains:
    • An scp/rsync-like path (e.g., value=user@machine:/path/to/inputdir/)
    • A zone that defines a region where the site is located. This can be used to enforce data movement constraints.
    • A site name where the files are located (site). If this site is an Agent of our federation, its name has to match the agent name defined in that Agent's configuration (see the Agent Configuration Section).
    • A list of constraints that allows us to restrict the movement of data to specific sites or regions. We can define a comma-separated list of zones and specific site names; alternatively, it can be left empty if no constraints are defined.
  • Results defines the destination of the results of a stage. The format is the same as for the input data sources. A results entry may be filled with empty strings (“”) in the case of intermediate results; this allows the scheduler to leave intermediate results in the staging area of the site where they were generated, from where they can be reused by upcoming stages of the workflow. As before, we can restrict the data movement of the results.

Scheduling Policies

In the scheduling policies section we can define an objective for each stage (e.g., a deadline or a budget). Currently we support the following policies:

  • MinRunningTime: Finds, for each task, the machine that provides the minimum completion time, calculated as the sum of the estimated execution time of the task on a machine m, the estimated time needed to transfer its input data to m, and the estimated waiting time on m.
  • MinRunningTimeLimit: Schedules tasks in the same way as MinRunningTime, but limits the maximum number of machines that can be allocated. This limit is controlled by the user.
  • DeadlineLocalityAwareProc: Finds the minimal set of resources needed to complete all tasks within a given deadline while satisfying an objective function, in this case the performance of the machine where the task is going to be executed (Proc). Tasks are allocated to the most powerful resource until they can no longer be completed within the deadline there, then to the next most powerful available machine, and so on.
  • DeadlineLocalityAware: Finds the minimal set of resources needed to complete all tasks within a given deadline while satisfying an objective function, in this case the combination of the performance of the machine where the task is going to be executed and the estimated time needed to transfer its input data (ProcData). Tasks are allocated to the resource with the best trade-off between the highest performance and the lowest estimated transfer time.
  • DeadlineLocalityAwareCost: Finds the minimal set of resources needed to complete all tasks within a given deadline while satisfying an objective function, in this case the cost of the machine where the task is going to be executed (Cost). Tasks are allocated to the cheapest available resource that can complete them within the given deadline.
  • DeadlineLocalityAwareData: Finds the minimal set of resources needed to complete all tasks within a given deadline while satisfying an objective function, in this case the time required to transfer the input data (Data). Tasks are allocated to the resource with the minimum estimated transfer time.
  • BudgetConstraint: Finds a solution that minimizes the total execution time (i.e., the critical path) while keeping the cost of the solution within the given budget.
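As a sketch, the snippet below combines a deadline-driven stage with a budget-driven one. The DeadlineLocalityAware line is taken from the full example later in this section; for BudgetConstraint, the assumption that the value attribute carries the available budget is ours, since the example only shows deadline values.

<objectives>
  <!-- Deadline-driven stage (as in the full example below) -->
  <objective id="S1" type="DeadlineLocalityAware" value="1377" />
  <!-- Budget-driven stage; we assume here that value carries the budget -->
  <objective id="S2" type="BudgetConstraint" value="100" />
</objectives>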

Dependencies

Next, the dependencies section is used to establish data dependencies across stages. A dependency between stages S1 and S2 can be blocking, which means that all tasks in S1 have to finish before S2 can be scheduled, or non-blocking, which means that tasks from S2 can be scheduled as soon as their input data dependencies are available (i.e., an S2 task only depends on some of the results of S1).
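A minimal sketch of both cases follows. The blocking transition is taken from the full example later in this section; the non-blocking form, including the extra stage S3 and the assumption that blocking="false" expresses it, is illustrative.

<transitions>
  <!-- Blocking: all tasks of S1 must finish before S2 can be scheduled -->
  <transition from="S1" to="S2" blocking="true"/>
  <!-- Non-blocking (assumed syntax): tasks of a hypothetical stage S3 can be
       scheduled as soon as their input data from S2 becomes available -->
  <transition from="S2" to="S3" blocking="false"/>
</transitions>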

Loops

This section is optional and allows us to define iterations (loops) over a sequence of stages. We assume that the last stage of the loop, typically an aggregation or reduce stage, decides whether a new iteration is needed. This is a typical assumption in iterative procedures such as machine learning algorithms.

Note

At this moment we do not support nested loops.

Workflow Example

<xflow name="SimpleWorkflow">

<!--Stages definition -->
<stages>

<!-- Stage 1 -->
  <stage id="S1" type="AppGenerateClass" value="tassl.application.cometcloud.sample.GenerateTasks" method="map"/>
  <stage id="S1" type="PropertyFile" value="./sample.properties"/>
  <stage id="S1" type="Application" value="cbir"/>
  <stage id="S1" type="InputData">
    <InputData value="jdiaz@sierra.futuregrid.org:/home/inputs/" zone="zoneA" site="siteSierra" constraint="zoneA,siteIndia,siteAlamo"/>
    <InputData value="jdiaz@hotel.futuregrid.org:/home/inputs/" zone="zoneA" site="siteHotel" constraint="zoneA,siteIndia"/>
    <InputData value="jdiaz@alamo.futuregrid.org:/home/inputs/" zone="zoneB" site="siteAlamo" constraint=""/>
  </stage>
  <stage id="S1" type="Results" value="" zone="" site="" constraint="zoneA,siteIndia,siteSierra,zoneB"/>

<!-- Stage 2 -->
  <stage id="S2" type="AppGenerateClass" value="tassl.application.cometcloud.sample.GenerateTasks" method="reduce"/>
  <stage id="S2" type="PropertyFile" value="./sample.properties"/>
  <stage id="S2" type="Application" value="cbir"/>
  <stage id="S2" type="InputData">
        <InputData value="jdiaz@alamo.futuregrid.org:/home/inputs/" zone="zoneB" site="siteAlamo"  constraint=""/>
  </stage>
  <stage id="S2" type="Results" value="jdiaz@sierra.futuregrid.org:/home/output/" zone="zoneB" site="siteSierra" constraint="siteSierra"/>

</stages>

<!-- Scheduling Policies -->
<objectives>
  <objective id="S1" type="DeadlineLocalityAware" value="1377" />
  <objective id="S2" type="DeadlineLocalityAware" value="4776" />
</objectives>

<!-- Dependencies -->
<transitions>
  <transition from="S1" to="S2" blocking="true"/>
</transitions>

<!-- Loops -->
<loops>
   <loop from="S1" to="S2"/>
</loops>

</xflow>

PropertyFile

The property file allows us to pass variables to the method that generates our tasks. The structure is similar to the one used in the configuration files, i.e., one variable per line with the format variableName=value. Any variable defined here can be accessed in the AppGenerateClass class using the method getProperty("variableName") defined in the API. A small example file is sketched below.

Special variables are:

  • RelatedWorkflowIds, which allows us to load results from previously completed workflows, for example when we have an ensemble of independent workflows and would like to aggregate their results. This variable consists of a comma-separated list of workflow Ids, e.g., RelatedWorkflowIds=workflowId1,workflowId2.
  • ReferenceBenchmark, which allows the user to indicate a reference benchmark for his/her application. For example, the user can define two variables that represent the maximum and minimum estimated completion time of each task in a stage. These values are assumed to be relative to a machine known by the user. The user can then run on that machine the same benchmark used on the rest of the resources (e.g., UnixBench), so that the system can estimate what the performance of this application would be on other resources. Note that this is only used if the Metrics service is disabled or if the Metrics service does not have information about this application.
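A minimal sketch of a property file follows. Only the RelatedWorkflowIds line follows a format given above; the remaining variable names and values, the benchmark value, and the use of # comment lines are illustrative assumptions, since variable names are defined by each application.

# Application-specific variables, accessible through getProperty("variableName")
# in the AppGenerateClass class (names and values below are illustrative)
inputChunkSize=64
# Load and aggregate results from previously completed workflows
RelatedWorkflowIds=workflowId1,workflowId2
# Reference benchmark information used when the Metrics service cannot provide it;
# the benchmark name and the two task completion time variables are illustrative
ReferenceBenchmark=unixbench
maxTaskCompletionTime=600
minTaskCompletionTime=120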