In this chapter we present the different ProActive APIs: the Scheduler API, the Dataspace API, the Synchronization API and the REST API.

1 Scheduler API

The Task Scheduler API can be used inside a ProActive task to interact directly with the Scheduler server. This functionality is called re-entrance. The Task Scheduler API:

  • is provided as a script binding named schedulerapi (a Java object).
  • can be used in JVM-based script languages such as Groovy, Jython, JRuby and Scala, as well as in CPython.
A connect() method, taking no parameters, connects to the scheduler using the current user account. The API is defined here:
The API can be used, for example, to submit jobs stored in the ProActive Catalog, to build workflows manually using the workflow API, to kill jobs, etc.
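As a sketch of one of these operations, killing a job looks like this (the killJob method is part of the Scheduler interface exposed by the schedulerapi binding; the job id used here is hypothetical):

```groovy
schedulerapi.connect()
// Kill a running job by its id (passed as a string); "125" is a made-up example id
schedulerapi.killJob("125")
schedulerapi.disconnect()
```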

Let's illustrate the Scheduler API with an example. Create a new job with a single Groovy task. Inside the task, enter the following code:

```groovy
schedulerapi.connect()
// Submit a workflow from the catalog. Here you give the bucket and the workflow name
schedulerapi.submitFromCatalog(variables.get("PA_CATALOG_REST_URL"), "basic-examples", "Native_Task_Linux")
schedulerapi.disconnect()
```

Observe in the Scheduler Portal how the task submits another workflow and terminates immediately (it does not wait for the second workflow to terminate).

Let's modify the previous workflow to wait for subworkflow termination. Edit the previous code as follows:

```groovy
schedulerapi.connect()
jobid = schedulerapi.submitFromCatalog(variables.get("PA_CATALOG_REST_URL"), "basic-examples", "Native_Task_Linux")
// Wait for the job to terminate, with a 1 minute timeout
schedulerapi.waitForJob(jobid, 60000)
schedulerapi.disconnect()
```

Observe in the Scheduler Portal how the two jobs are synchronized.
The Task Scheduler API can be used in certain scripts only; for more information:

2 Dataspace API

Let's imagine the following use case, the Image Processor workflow:

  • Image Processor lists all JPEG files located in the user space; this is done by a List task.
  • For each image file, it executes a Processing task operation on the file. Processing will occur on multiple machines.

As you can see, in order to write such a workflow, one needs to dynamically compute the list of files to be processed and to transfer each file to a dedicated task. Because of this dynamicity, static File Transfer directives are not well suited.

To address this kind of use case, the Task DataSpace API comes in very handy. Here are the main features:

To illustrate this, drag and drop the Replicate workflow from the Controls bucket.
Rename the Split task to List_Task.
Replace its implementation code with this Groovy script:

```groovy
result = new java.util.ArrayList()
// Connect to the user space
userspaceapi.connect()
// List all jpg files
jpgFiles = userspaceapi.listFiles(".", "*.jpg")
for (int i = 0; i < jpgFiles.size(); i++) {
    file = jpgFiles.get(i)
    // Add the file names to the result list
    result.add(file)
}
```

Replace the Process task implementation code with this Groovy script (here we return the image content in result):

```groovy
// The image will be available through the 'Preview' tab in the scheduler.
// You can download it or save it
import com.google.common.net.MediaType

// Connect to the user space
userspaceapi.connect()
// Get the file to be processed from the parent result
fileName = results[0].value().get(variables.get("PA_TASK_REPLICATION"))
// Download the file
file = userspaceapi.pullFile(fileName, new File(fileName))
// Get the content and display it
println "Processing Image : " + file
result = file.getBytes()
resultMetadata.put("file.extension", ".jpg")
resultMetadata.put("content.type", MediaType.JPEG.toString())
```

Now, download 3 or 4 images from Google and click on the userspace file manager button.

And upload your files.
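Uploading can also be done from inside a task. A minimal sketch, assuming a pushFile method is available on the userspaceapi binding (check the Dataspace API documentation for your ProActive version) and using a hypothetical local file name:

```groovy
// Connect to the user space, then push a local file to it
userspaceapi.connect()
// "image1.jpg" is a hypothetical local file; assumed signature: pushFile(localFile, remotePath)
userspaceapi.pushFile(new File("image1.jpg"), "image1.jpg")
```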

Run the workflow and observe how it dynamically creates tasks to process images.
Here, each task stores an image and makes it accessible using the result and resultMetadata objects.
From the scheduler portal, click on a terminated Process task.
Under the Task Preview tab, you can either open the result (here, the image) or simply download it.

3 Synchronization API

Task synchronization is traditionally handled by the ProActive Scheduler through Task Dependencies.
Task Dependencies do not allow fine-grained synchronization (for example, between sibling tasks or across multiple workflows).
Task Synchronization API is the tool to use when complex synchronization patterns are needed.
Task Synchronization API works as a key-value store organized in channels.
Each channel has a unique identifier name, and each channel behaves as a HashMap.
This means that each put operation is identified by a 3-tuple:

<Channel_identifier, Key_binding, Value>

Users of the Synchronization API can create or remove channels, and put key/value pairs inside channels.
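To make the 3-tuple concrete, here is how it maps to a put call (the put method is shown in the workflow example later in this section; here the job id is used as the channel identifier):

```groovy
jobid = variables.get("PA_JOB_ID")   // channel identifier
// The 3-tuple <jobid, "lock", true> corresponds to this put operation:
synchronizationapi.put(jobid, "lock", true)
```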
The synchronization service is started with the scheduler automatically.
Channels can be made persistent (preserved at scheduler restart) or kept in memory only.
Each channel is implemented internally as a Java HashMap, so all methods defined in the Java 8 Map interface can be used, with some signature changes (each method takes the channel identifier as an extra first argument).
Importantly, all lambda-related methods make it possible to perform operations on the map entries instead of simply replacing them.
A key point is that all operations on the Synchronization API are atomic: two operations coming from two different tasks never run concurrently.
In addition to the methods provided by the Map interface, wait methods are implemented; this allows, for example, blocking a task until a certain condition on the map occurs.
When lambdas are used, they must be passed as string values defining a Groovy closure. This ensures language interoperability (a Python script can thus use the Synchronization API with lambdas).
For example, {k, x -> x + 1} should be used instead of (k, x) -> x + 1.
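A sketch of how such string closures can be passed in practice; treat the compute and waitUntil method names and signatures as assumptions to verify against the Synchronization API JavaDoc for your ProActive version:

```groovy
jobid = variables.get("PA_JOB_ID")
// Atomically increment a counter; the Groovy closure is passed as a string
synchronizationapi.compute(jobid, "counter", "{k, x -> x + 1}")
// Block this task until another task sets 'ready' to true
synchronizationapi.waitUntil(jobid, "ready", "{k, x -> x == true}")
```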
Documentation: JavaDoc for the Synchronization API:

Let's illustrate this mechanism with a simple workflow. Create an empty workflow and select the Key_Value workflow under Controls -> Synchronization API Examples.
The implementation code of the Init task:

```groovy
// Get the current job id
jobid = variables.get("PA_JOB_ID")
// Create a channel inside the job
synchronizationapi.createChannel(jobid, false)
// Add a lock entry
synchronizationapi.put(jobid, "lock", true)
println "Channel " + jobid + " created."
```

The selection script of the task TaskB_Wait:

```groovy
// Wait until lock is false
selected = !(synchronizationapi.get(variables.get("PA_JOB_ID"), "lock"))
```

The implementation code of TaskA:

```groovy
println "Sleeping 5 seconds"
Thread.sleep(5000)
println "Unlocking Task B"
synchronizationapi.put(variables.get("PA_JOB_ID"), "lock", false) // set the lock to false
```

The implementation code of the Clean task:

```groovy
jobid = variables.get("PA_JOB_ID")
// Delete the channel
synchronizationapi.deleteChannel(jobid)
println "Channel " + jobid + " deleted."
```

Execute the workflow and observe how the tasks are synchronized:

```
[;14:24:22] Channel 28 created.
[;14:24:28] Sleeping 5 seconds
[;14:24:33] Unlocking Task B
[;14:24:40] Task B has been unlocked by Task A
[;14:24:46] Channel 28 deleted.
```

Synchronization API logs are available in the Scheduler Portal, in the Server Logs panel:

```
[2018-04-24 14:24:22,685 de72716883 INFO o.o.p.s.u.TaskLogger] task 28t2 (Init) [Synchronization]Created new memory channel '28'
[2018-04-24 14:24:22,786 de72716883 INFO o.o.p.s.u.TaskLogger] task 28t2 (Init) [Synchronization][Channel=28] Put true on key 'lock', previous value was null
[2018-04-24 14:24:34,431 de72716883 INFO o.o.p.s.u.TaskLogger] task 28t0 (TaskA) [Synchronization][Channel=28] Put false on key 'lock', previous value was true
[2018-04-24 14:24:46,882 de72716883 INFO o.o.p.s.u.TaskLogger] task 28t3 (Clean) [Synchronization]Deleted memory channel '28'
```


4 REST API

The REST API web page:

Examples of REST API usage can be found in the tutorials, such as: