Application Development

New applications can be integrated into the federation by developing two simple components: a task generator and a worker.

  • Task generator. This module is created using a simple API and is loaded into the Task Manager. Its mission is to define, in a programmable way, the properties of all the tasks that an application needs to generate. The idea is to give users the ability to define dynamic workflows, where tasks are created at runtime depending on previously obtained results. The results of all stages of a workflow are accessible through the API, which provides tremendous flexibility: the workflow can evolve in different ways depending on the observed data. This is very useful, for example, for investigating large search spaces in a coordinated manner.

  • Worker. The worker’s sole responsibility is to execute tasks. In many cases a user may want to run third-party, perhaps closed-source, software; in such cases the worker becomes a mere container that acts as a facade for the target software (a sketch of this pattern is given at the end of the Worker section below). This significantly simplifies migration from traditional environments to our federation.

    Note

    Since Java 1.6 is still the environment most commonly found in our systems, our current code is compiled with Java 1.6. You may experience issues if you develop a new application using a newer version of Java.

Use Case

Let us assume that we want to develop an application with two stages: a MAP stage that creates N tasks per file found in the input data sources, and a REDUCE stage that collects all results and produces a final result.

PropertyFile for Task Generator

The property file contains properties that can be used by the task generator, and we can include as many as we like. The structure is the same as in the configuration files, i.e. one variable per line with the format variableName=value. In this use case we will use the following properties:

minTime=5
maxTime=10
PR=2

Note

These properties are not available on the worker side. If the worker needs any of these variables, include them in this property file and use the task generator methods to add them to the task tuples; for this you need to use the taskParams variable.
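
As an illustration, the following minimal sketch shows how a property value can be forwarded to the worker. The property name customLabel is hypothetical; getProperty, taskParams, and the other variables are the ones used in the map example later in this section:

    //inside a task generation method: read the value from the property file
    //("customLabel" is a hypothetical property used only for illustration)
    String customLabel=getProperty("customLabel");
    //append it to the parameter list of the task; the worker receives the list in the same order
    taskParams.add(taskid, Arrays.asList("map", output, inputs, taskDuration, customLabel));
    //on the worker side (computeTaskSpecific) it can then be recovered with:
    //String customLabel=(String)((List)dataobj).get(4);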

Task generator

The task generator class has to extend the GenerateTasksAbstract class and implement the createTasks method. This method is called by the Task Generator to create the tasks of a stage.

public GenerateTasksObject createTasks(java.lang.String stageId,
                                   java.util.List<FileProperties> input,
                                   FileProperties output,
                                   java.lang.String propertyFileValues,
                                   java.util.List dependencies,
                                   java.lang.String method)
  • Parameters:
    • stageId: Id of the stage for which we are generating tasks
    • input: list of input data sources. The system generates one object of the type FileProperties for each InputData specified in the workflow description.
    • output: This is the results directory. The system generates a FileProperties object with the information specified in the Results field of the workflow description.
    • propertyFileValues: It contains the values of all properties specified in the PropertyFile field of the workflow description.
    • dependencies: List of stages that need to be completed before this stage. This can be used to get result values from those previous stages.
    • method: Name of the actual method to use for task generation, as specified in the method value of the AppGenerateClass field.
  • Returns:
    • Returns an object of type GenerateTasksObject with all the information about generated tasks.
  • Example of a modular approach where the createTasks method calls other methods that generate tasks depending on the value of the method field.

    @Override
    public GenerateTasksObject createTasks(String stageId, List<FileProperties> input, FileProperties output,
                        String propertyFileValues, List dependencies, String method){
        GenerateTasksObject taskObj=null;
        //load properties into the environment so we can obtain them using the getProperty(varName) method.
        this.loadProperties(propertyFileValues);
        if(method.equals("map")){
            //call the method that generates the tasks
            taskObj=map(input,output,propertyFileValues);
        }else if(method.equals("reduce")){
            //obtain results of previous stages. We can use them as input for this stage.
            HashMap <String,FileProperties>previousFiles=this.generatePreviousResultFiles(stageId, dependencies);
            //call the method that generates the tasks
            taskObj=reduce(input,output,propertyFileValues,previousFiles);
        }
        return taskObj;
    }
    

Next we detail the map and reduce methods. Note that the signature of these methods is completely up to the developer; the only requirement is to return a GenerateTasksObject with the task information at the end.

  • The map method generates PR tasks for each file in the data sources (for example, with PR=2 and three input files it creates six tasks). We assume that the property file specified in the XML contains the minTime, maxTime, and PR properties shown above.

    public GenerateTasksObject map(List<FileProperties> input, FileProperties output, String propertyFile){
        //input and output locations have the form address://dir. We get the list of files using SSH
        int taskid=0;//NOT THE REAL TASKID(this is not used outside here)
    
        //we create lists to keep properties of each task
        List <Double> minTime= new ArrayList();
        List <Double> maxTime=new ArrayList();
        List <List>taskParams=new ArrayList();
        List <String>taskRequirement=new ArrayList<String>();
        List <List<FileProperties>>inputList=new ArrayList();
    
        //getProperty gets values from the property file specified in the workflow
        //the property file has two variables that indicate the execution time interval of tasks.
        double minTimeVal=Double.parseDouble(getProperty("minTime"));
        double maxTimeVal=Double.parseDouble(getProperty("maxTime"));
        //obtain the PR value
        int PR=Integer.parseInt(getProperty("PR"));
        //iterate on InputData sources
        for(FileProperties inputFP: input){
            //obtain information of each InputData source
            String inputS=inputFP.getLocation();
            String []parts=inputS.split(":");
            //obtain list of files and size
            String returnSsh=executeSsh(parts[0]," ls -l "+parts[1]+" | awk '{print $5, $9}'");
            String [] files=returnSsh.split("\n"); //returns size and name
            //for each file we generate PR tasks
            for(int i=0;i<files.length;i++){
                if(!files[i].trim().isEmpty()){
                    for (int j=0; j< PR;j++){
                            //Here each task only has one input, but the API requires a list
                            List <FileProperties>inputs=new ArrayList();
                            String [] fileParts=files[i].split(" ");//size and name
                            //create input file of task and add it to the list of input files of this task
                            inputs.add(new FileProperties(fileParts[1],inputS,Double.parseDouble(fileParts[0]),
                                                inputFP.getZone(), inputFP.getSitename(), inputFP.getConstraints()));
                            //generate random duration for this task, which is used by the worker to emulate the execution of the task
                            double taskDuration=minTimeVal + (Math.random() * (maxTimeVal - minTimeVal));
                            //Define task's parameters. This is used to pass information to the worker.
                            //Thus, these parameters will be received by the worker in the same order.
                            taskParams.add(taskid, Arrays.asList(
                                                         "map",                 //0
                                                         output,                //1
                                                         inputs,                //2
                                                         taskDuration,          //3
                                                         Integer.toString(j+1)  //4
                                            ));
                            //we can add one custom value as requirements to the task tuple.
                            taskRequirement.add("large"); //Requirement
                            //we add the execution interval of task (this is for initial scheduling when no information of tasks exist)
                            minTime.add(minTimeVal);
                            maxTime.add(maxTimeVal);
                            //add input files of task
                            inputList.add(inputs);
                            taskid++;
                    }
                }
            }
    
        }
        //return object with the information of all tasks
        return new GenerateTasksObject(taskParams,taskRequirement, minTime, maxTime,inputList,null);
    }
    
  • The reduce method generates one task that aggregates all previous results into a single final output.

    public GenerateTasksObject reduce(List<FileProperties> input, FileProperties output, String propertyFile,
                                HashMap <String,FileProperties>previousResults){
        int taskid=0;//NOT THE REAL TASKID(this is not used outside here)
        //we create lists to keep properties of each task
        List <Double> minTime= new ArrayList();
        List <Double> maxTime=new ArrayList();
        List <List>taskParams=new ArrayList();
        List <String>taskRequirement=new ArrayList<String>();
        List <List<FileProperties>>inputList=new ArrayList();
    
        //create one task that aggregates all previous results
        List <FileProperties>inputs=new ArrayList();
        for (String key:previousResults.keySet()){
            inputs.add(previousResults.get(key));
        }
        //getProperty gets values from the property file specified in the workflow
        //the property file has two variables that indicate the execution time interval of tasks.
        double minTimeVal=Double.parseDouble(getProperty("minTime"));
        double maxTimeVal=Double.parseDouble(getProperty("maxTime"));
        //generate random duration for this task, which is used by the worker to emulate the execution of the task
        double taskDuration=minTimeVal + (Math.random() * (maxTimeVal - minTimeVal));
        //Define task's parameters. This is used to pass information to the worker.
        //Thus, these parameters will be received by the worker in the same order.
        taskParams.add(taskid, Arrays.asList(
                                     "reduce",              //0
                                     output,                //1
                                     inputs,                //2
                                     taskDuration           //3
                        ));
        //we can add one custom value as requirements to the task tuple.
        taskRequirement.add("large"); //Requirement
        //we add the execution interval of task (this is for initial scheduling when no information of tasks exist)
        minTime.add(minTimeVal);
        maxTime.add(maxTimeVal);
        //add input files of task
        inputList.add(inputs);
        taskid++;
    
        return new GenerateTasksObject(taskParams,taskRequirement, minTime, maxTime,inputList,null);
    }
    

Worker

The worker class has to extend the WorkflowMeteorGenericWorker class and implement the computeTaskSpecific and the cancelJob methods.

public java.lang.Object computeTaskSpecific(java.lang.Object dataobj,
                               WorkflowTaskTuple tasktuple)
  • Parameters:
    • dataobj - The data associated with the task, i.e. the one sent from the master. In the previous example, a List is attached to each task (see taskParams.add(taskid, Arrays.asList(...))).
    • tasktuple - TaskTuple object with the properties of the task.
  • Returns:
    • Returns an array of Objects. It has to contain: (i) a String with the exit status (“OK” or “Fail”); (ii) a list of FileProperties objects, one for each result file generated; and (iii) an optional boolean that tells whether we want to iterate or not. This last parameter is only used when loops are defined (a short sketch of this variant follows the example below).
  • Example:

@Override
public Object computeTaskSpecific(Object dataobj, WorkflowTaskTuple tasktuple) {
    Logger.getLogger(AppWorker.class.getName()).log(Level.INFO, "AppWorker "+this.getPeerIP()+" gets taskid " + tasktuple.getTaskid());

    List data = (List) dataobj;

    String method=(String)data.get(0);
    FileProperties outputFP=(FileProperties)data.get(1);
    List <FileProperties> inputs=(List<FileProperties>)data.get(2);
    double time=(Double)data.get(3);
    //working directory specified in the Agent configuration
    String workingdir=System.getProperty("WorkingDir");
    //GET INPUT - group the input files by data source: site -> {file, file, ...}
    HashMap <String,List>inputsHash=new HashMap();
    for (FileProperties fp:inputs){
        List temp=inputsHash.get(fp.getLocation());
        if (temp==null){
            temp=new ArrayList();
            inputsHash.put(fp.getLocation(), temp);
        }
        temp.add(fp.getName());
    }
    //retrieve all input files, we make a call per data source
    for(String site:inputsHash.keySet()){
        //method that retrieves input files and places them in the working dir
        String status=this.getFile(true, site, inputsHash.get(site), workingdir);
    }
    //EXECUTE
    if(method.equals("map")){
       //do something for the map
    }else if (method.equals("reduce")){
       //do something for the reduce
    }
    //we can sleep to emulate execution
    try {
        Thread.sleep(((long)time)*1000);
    } catch (InterruptedException ex) {
        Logger.getLogger(AppWorker.class.getName()).log(Level.SEVERE, null, ex);
    }
    //we can generate a random file as output
    String outfile=tasktuple.getTaskid()+"_"+System.getProperty("Name")+".txt";
    File f=new File(workingdir+"/"+outfile);
    if(!f.isFile()){
        try {
            RandomAccessFile r=new RandomAccessFile(workingdir+"/"+outfile, "rw");
            r.setLength(50000);
            r.close();
        } catch (FileNotFoundException ex) {
            Logger.getLogger(AppWorker.class.getName()).log(Level.SEVERE, null, ex);
        }catch (IOException ex) {
            Logger.getLogger(AppWorker.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    //RESULTS. We add the file generated by our application in the working directory.
    List outfiles=new ArrayList();
    outfiles.add(outfile);

    //upload file and add it to results
    List<FileProperties> resultFiles=this.uploadResults(outfiles, workingdir, outputFP);
    //return OK, and list of results
    return new Object[]{"OK",resultFiles};
}
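
When the workflow defines a loop, the optional third element described above can be appended to the returned array to indicate whether the stage should iterate again. A minimal sketch of that return statement (the stopping condition used here is a hypothetical, application-specific check):

//hypothetical application-specific condition deciding whether to run another iteration
boolean shouldIterate = resultFiles.size() < 10;
//return exit status, result files, and the optional loop flag
return new Object[]{"OK", resultFiles, shouldIterate};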

The cancelJob method is not yet supported; leave it as follows.

@Override
public void cancelJob() {
    throw new UnsupportedOperationException("Not supported yet.");
}
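
As mentioned at the beginning of this section, the worker is often just a facade for third-party software. In that case the EXECUTE part of computeTaskSpecific typically launches an external program on the staged input files, and whatever the program produces is uploaded exactly as in the example above. The following is a minimal sketch of that EXECUTE part; the executable path /opt/myapp/bin/myapp and its command-line options are assumptions for illustration, while workingdir is the variable from the example above:

//EXECUTE (facade variant): run a third-party program on the staged input files
//"/opt/myapp/bin/myapp" and its options are hypothetical; adapt them to the target software
try {
    ProcessBuilder pb = new ProcessBuilder("/opt/myapp/bin/myapp", "--input-dir", workingdir);
    pb.directory(new File(workingdir));
    pb.redirectErrorStream(true);
    Process p = pb.start();
    if (p.waitFor() != 0) {
        //the external program failed, so we report a failed task with no result files
        return new Object[]{"Fail", new ArrayList<FileProperties>()};
    }
} catch (IOException ex) {
    Logger.getLogger(AppWorker.class.getName()).log(Level.SEVERE, null, ex);
    return new Object[]{"Fail", new ArrayList<FileProperties>()};
} catch (InterruptedException ex) {
    Logger.getLogger(AppWorker.class.getName()).log(Level.SEVERE, null, ex);
    return new Object[]{"Fail", new ArrayList<FileProperties>()};
}
//files produced by the external program in workingdir are then added to outfiles and
//uploaded with this.uploadResults(outfiles, workingdir, outputFP), as in the example above.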

Note

More information about the API can be found in the javadoc.