# Application Development¶

New applications can be integrated into the federation by developing two simple components: a task generator and a worker.

• Task generator. This module is created using a simple API and loaded into the Task Manager. Its mission is to define, in a programmable way, the properties of all the tasks that an application needs to generate. The idea is to give users the ability to define dynamic workflows, where tasks are created at runtime depending on previously obtained results. The results of all stages of a workflow are accessible through the API. This provides tremendous flexibility, as the workflow can evolve in different ways depending on the observed data, which proves very useful to, for example, investigate large search spaces in a coordinated manner.

• Worker. The worker’s sole responsibility is to execute tasks. In many cases a user might be interested in executing third-party, perhaps closed-source, software. In such cases, the resulting worker becomes a mere container that acts as a facade for the target software. This significantly simplifies migration from traditional environments to our federation.
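Since such a facade typically just launches the target binary and captures its output, a minimal sketch of that pattern could look like the following. This is not part of the framework's API; it only uses Java 1.6-compatible standard-library classes, and the command run in `main` is illustrative.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;

public class FacadeWorker {
    // Runs an external command and captures its combined stdout/stderr,
    // as a facade-style worker might do with third-party software.
    public static String runTool(String... command) throws Exception {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectErrorStream(true);        // merge stderr into stdout
        Process p = pb.start();
        StringBuilder out = new StringBuilder();
        BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream()));
        String line;
        while ((line = r.readLine()) != null) {
            out.append(line).append('\n');
        }
        p.waitFor();                          // wait for the tool to finish
        return out.toString();
    }

    public static void main(String[] args) throws Exception {
        // Illustrative command; a real worker would invoke the target application binary.
        System.out.print(runTool("echo", "hello"));
    }
}
```

A real worker would build the command line from the task parameters it receives and report the tool's exit status back through its return value.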

Note

Since Java 1.6 is still the environment most commonly found in our systems, our current code is compiled with Java 1.6. You may experience issues if you develop a new application using a newer version of Java.

## Use Case¶

Let us assume that we want to develop an application with two stages: a MAP stage that creates N tasks per file found in the input data sources, and a REDUCE stage that collects all results and produces a final result.

We can include properties to be used by the task generator, as many as we would like. The structure is the same as in the configuration files, i.e. one variable per line in the format variableName=value. In this use case we have the following properties:

minTime=5
maxTime=10
PR=2


Note

These properties are not available on the worker side. If the worker needs any of these values, include them in this property file and use the task generator methods to add them to the task tuples. For this you need to use the variable taskParams.
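The variableName=value format is the same one understood by java.util.Properties, so a file like the one above can be parsed with the standard library alone. The sketch below is independent of the framework's own getProperty helper and only illustrates the file format semantics:

```java
import java.io.StringReader;
import java.util.Properties;

public class PropertyExample {
    // Parses "variableName=value" lines, one variable per line,
    // exactly as they appear in the workflow property file.
    public static Properties parse(String text) throws Exception {
        Properties p = new Properties();
        p.load(new StringReader(text));  // java.util.Properties understands name=value lines
        return p;
    }

    public static void main(String[] args) throws Exception {
        Properties p = parse("minTime=5\nmaxTime=10\nPR=2\n");
        // Values are read back as strings and converted as needed,
        // mirroring what the task generator does with getProperty.
        double minTime = Double.parseDouble(p.getProperty("minTime"));
        int pr = Integer.parseInt(p.getProperty("PR"));
        System.out.println(minTime + " " + pr);
    }
}
```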

The task generator class has to extend the GenerateTasksAbstract class and implement the createTasks method. This method is called by the Task Generator to create the tasks of a stage.

public GenerateTasksObject createTasks(java.lang.String stageId,
                                       java.util.List<FileProperties> input,
                                       FileProperties output,
                                       java.lang.String propertyFileValues,
                                       java.util.List dependencies,
                                       java.lang.String method)

• Parameters:
• stageId: Id of the stage for which we are generating tasks
• input: list of input data sources. The system generates one object of the type FileProperties for each InputData specified in the workflow description.
• output: This is the results directory. The system generates a FileProperties object with the information specified in the Results field of the workflow description.
• propertyFileValues: It contains the values of all properties specified in the PropertyFile field of the workflow description.
• dependencies: List of stages that need to be completed before this stage. This can be used to get results values from those previous stages.
• method: Name of the actual method to use for task generation, specified in the method value of the AppGenerateClass field.
• Returns:
• A GenerateTasksObject with the information of all generated tasks.
• Example:
• A modular approach where we use the createTasks method to call other methods that generate tasks, depending on the value of the method field.

@Override
public GenerateTasksObject createTasks(String stageId, List<FileProperties> input,
        FileProperties output, String propertyFileValues, List dependencies, String method){
    //properties from propertyFileValues are loaded into the environment, so we can
    //obtain them later using the getProperty(varName) method.
    if(method.equals("map")){
        return map(input, output, propertyFileValues);
    }else if(method.equals("reduce")){
        //obtain results of previous stages. We can use them as input of the following stage.
        HashMap<String,FileProperties> previousFiles=this.generatePreviousResultFiles(stageId, dependencies);
        return reduce(input, output, propertyFileValues, previousFiles);
    }
    return null;
}


Next we detail the map and reduce methods. Note that the signatures of these methods are completely up to the developer; the only requirement is to return a GenerateTasksObject with the task information at the end.

• The map method generates PR tasks for each file found in the data sources. We assume that the property file specified in the XML workflow description contains the minTime, maxTime, and PR variables shown above.

public GenerateTasksObject map(List<FileProperties> input, FileProperties output, String propertyFile){
    //input and output locations have the form address://dir. We list the files using SSH.

    //we create lists to keep properties of each task
    List<Double> minTime=new ArrayList<Double>();
    List<Double> maxTime=new ArrayList<Double>();
    List<List<FileProperties>> inputList=new ArrayList<List<FileProperties>>();

    //getProperty gets values from the property file specified in the workflow.
    //the property file has two variables to indicate the execution time interval of tasks.
    double minTimeVal=Double.parseDouble(getProperty("minTime"));
    double maxTimeVal=Double.parseDouble(getProperty("maxTime"));
    //obtain the PR value
    int PR=Integer.parseInt(getProperty("PR"));
    int taskId=0;
    //iterate over the InputData sources
    for(FileProperties inputFP: input){
        //obtain information of each InputData source
        String inputS=inputFP.getLocation();
        String[] parts=inputS.split(":");
        //obtain the list of files and their sizes
        String returnSsh=executeSsh(parts[0]," ls -l "+parts[1]+" | awk '{print $5,$9}'");
        String[] files=returnSsh.split("\n"); //each line contains size and name
        //for each file we generate PR tasks
        for(int i=0;i<files.length;i++){
            if(!files[i].trim().isEmpty()){
                for(int j=0;j<PR;j++){
                    //Here each task only has one input, but the API requires a list
                    List<FileProperties> inputs=new ArrayList<FileProperties>();
                    String[] fileParts=files[i].split(" ");//size and name
                    //create the input file of the task and add it to the task's input list
                    //(the leading constructor arguments are illustrative; check the FileProperties API)
                    inputs.add(new FileProperties(inputS+"/"+fileParts[1], Long.parseLong(fileParts[0]),
                            inputFP.getZone(), inputFP.getSitename(), inputFP.getConstraints()));
                    //generate a random duration for this task, which is used by the worker
                    //to emulate the execution of the task
                    double taskDuration=minTimeVal + (Math.random() * (maxTimeVal - minTimeVal));
                    //Define the task's parameters. This is used to pass information to the worker.
                    //Thus, these parameters will be received by the worker in the same order.
                    taskParams.add(taskId, Arrays.asList(
                            "map",                 //0
                            output,                //1
                            inputs,                //2
                            taskDuration,          //3
                            Integer.toString(j+1)  //4
                            ));
                    //we can add custom values as requirements of the task tuple.
                    //we add the execution interval of the task
                    //(this is for initial scheduling when no information about tasks exists)
                    minTime.add(minTimeVal);
                    maxTime.add(maxTimeVal);
                    inputList.add(inputs);
                    taskId++;
                }
            }
        }
    }
    //return the object with the information of all tasks
    //(the exact GenerateTasksObject constructor depends on the API)
    return new GenerateTasksObject(inputList, taskParams, minTime, maxTime);
}
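The executeSsh call above returns one "size name" pair per line of the remote ls -l | awk output. As a standalone illustration of that parsing step (the sample listing below is made up, and blank or incomplete lines such as the "total N" header are skipped, as in the map method):

```java
import java.util.ArrayList;
import java.util.List;

public class LsOutputParser {
    // Parses the "size name" lines produced by: ls -l <dir> | awk '{print $5,$9}'
    // Returns one {size, name} pair per regular file, skipping blank lines
    // (the "total N" header of ls -l yields an incomplete pair and is dropped).
    public static List<String[]> parse(String returnSsh) {
        List<String[]> files = new ArrayList<String[]>();
        String[] lines = returnSsh.split("\n");
        for (int i = 0; i < lines.length; i++) {
            String line = lines[i].trim();
            if (line.isEmpty()) {
                continue;                       // skip blank lines
            }
            String[] fileParts = line.split(" "); // size and name
            if (fileParts.length == 2) {
                files.add(fileParts);
            }
        }
        return files;
    }

    public static void main(String[] args) {
        // Made-up listing: two files of 512 and 1024 bytes.
        List<String[]> files = parse("\n512 a.dat\n1024 b.dat\n");
        System.out.println(files.size());
    }
}
```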

• The reduce method generates one task that aggregates all previous results into a single final output.

public GenerateTasksObject reduce(List<FileProperties> input, FileProperties output, String propertyFile,
        HashMap<String,FileProperties> previousResults){
    //we create lists to keep properties of each task
    List<Double> minTime=new ArrayList<Double>();
    List<Double> maxTime=new ArrayList<Double>();
    List<List<FileProperties>> inputList=new ArrayList<List<FileProperties>>();

    //create one task that aggregates all previous results
    List<FileProperties> inputs=new ArrayList<FileProperties>();
    for(String key:previousResults.keySet()){
        inputs.add(previousResults.get(key));
    }
    //getProperty gets values from the property file specified in the workflow.
    //the property file has two variables to indicate the execution time interval of tasks.
    double minTimeVal=Double.parseDouble(getProperty("minTime"));
    double maxTimeVal=Double.parseDouble(getProperty("maxTime"));
    //generate a random duration for this task, which is used by the worker to emulate its execution
    double taskDuration=minTimeVal + (Math.random() * (maxTimeVal - minTimeVal));
    //Define the task's parameters. This is used to pass information to the worker.
    //Thus, these parameters will be received by the worker in the same order.
    taskParams.add(0, Arrays.asList(
            "reduce",              //0
            output,                //1
            inputs,                //2
            taskDuration           //3
            ));
    //we can add custom values as requirements of the task tuple.
    //we add the execution interval of the task
    //(this is for initial scheduling when no information about tasks exists)
    minTime.add(minTimeVal);
    maxTime.add(maxTimeVal);
    inputList.add(inputs);
    //return the object with the information of the task
    //(the exact GenerateTasksObject constructor depends on the API)
    return new GenerateTasksObject(inputList, taskParams, minTime, maxTime);
}


## Worker¶

The worker class has to extend the WorkflowMeteorGenericWorker class and implement the computeTaskSpecific and the cancelJob methods.

public java.lang.Object computeTaskSpecific(java.lang.Object dataobj)

• Parameters:
• dataobj - The data associated with the task, i.e. the one sent from the master. In the previous example, a List is attached to each task (see taskParams.add(taskid, Arrays.asList(..) ).
• Returns:
• Returns an array of Objects that has to contain: (i) a String with the exit status (“OK” or “Fail”); (ii) a list of FileProperties objects, one per result file generated; and (iii) an optional boolean that tells whether we want to iterate or not. This last parameter is only used when loops are defined.
• Example:

@Override
public Object computeTaskSpecific(Object dataobj){
    List data=(List)dataobj;

    //unpack the task parameters in the same order they were added by the task generator
    String method=(String)data.get(0);
    FileProperties outputFP=(FileProperties)data.get(1);
    List<FileProperties> inputs=(List<FileProperties>)data.get(2);
    double time=(Double)data.get(3);
    //working directory specified in the Agent configuration
    String workingdir=System.getProperty("WorkingDir");
    //GET INPUT - group input files by data source, i.e. site:{file,file,...}
    HashMap<String,List> inputsHash=new HashMap<String,List>();
    for(FileProperties fp:inputs){
        List temp=inputsHash.get(fp.getLocation());
        if(temp==null){
            temp=new ArrayList();
            inputsHash.put(fp.getLocation(), temp);
        }
        temp.add(fp.getName()); //the file-name getter is illustrative; check the FileProperties API
    }
    //retrieve all input files; we make one call per data source
    for(String site:inputsHash.keySet()){
        //method that retrieves input files and places them in the working directory
        String status=this.getFile(true, site, inputsHash.get(site), workingdir);
    }
    //EXECUTE
    if(method.equals("map")){
        //do something for the map
    }else if (method.equals("reduce")){
        //do something for the reduce
    }
    //we can sleep to emulate execution, using the duration received at index 3 (in seconds)
    try {
        Thread.sleep((long)(time*1000));
    } catch (InterruptedException ex) {
        Logger.getLogger(AppWorker.class.getName()).log(Level.SEVERE, null, ex);
    }
    //we can generate a random file as output (the name is illustrative; any unique name works)
    String outfile="output-"+System.nanoTime()+".out";
    File f=new File(workingdir+"/"+outfile);
    if(!f.isFile()){
        try {
            RandomAccessFile r=new RandomAccessFile(workingdir+"/"+outfile, "rw");
            r.setLength(50000);
            r.close();
        } catch (FileNotFoundException ex) {
            Logger.getLogger(AppWorker.class.getName()).log(Level.SEVERE, null, ex);
        } catch (IOException ex) {
            Logger.getLogger(AppWorker.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    //RESULTS. We add the file generated by our application in the working directory.
    //(the FileProperties constructor shown is illustrative; check the API)
    List resultFiles=new ArrayList();
    resultFiles.add(new FileProperties(outputFP.getLocation()+"/"+outfile, f.length(),
            outputFP.getZone(), outputFP.getSitename(), outputFP.getConstraints()));

    //return OK, and the list of results
    return new Object[]{"OK",resultFiles};
}

//this function is not yet supported, so we leave it as follows.
@Override
public void cancelJob() {
    throw new UnsupportedOperationException("Not supported yet.");
}
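The output-emulation step in computeTaskSpecific relies on RandomAccessFile.setLength to materialize a result file of a fixed size without writing any content. In isolation, that step looks like the following (the file name and helper are illustrative):

```java
import java.io.File;
import java.io.RandomAccessFile;

public class OutputEmulator {
    // Creates a placeholder result file of exactly `size` bytes,
    // as the worker does to emulate application output.
    public static File createPlaceholder(String path, long size) throws Exception {
        RandomAccessFile r = new RandomAccessFile(path, "rw");
        r.setLength(size);   // extends (or truncates) the file to exactly `size` bytes
        r.close();
        return new File(path);
    }

    public static void main(String[] args) throws Exception {
        // Illustrative path; the worker would use the configured working directory.
        File f = createPlaceholder(System.getProperty("java.io.tmpdir") + "/placeholder.out", 50000);
        System.out.println(f.length());
        f.delete();
    }
}
```

On most file systems such a file is sparse, so even large placeholder sizes cost little disk space.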


Note