Introduction
Infinispan provides distributed execution through a standard JDK ExecutorService interface. Tasks submitted for execution are executed on the entire cluster of Infinispan nodes rather than in the local JVM alone. Every DistributedExecutorService is bound to one particular cache. Submitted tasks have access to key/value pairs from that particular cache if and only if the task is an instance of DistributedCallable. Also note that nothing prevents users from submitting a familiar Runnable or Callable just as they would to any other ExecutorService. However, DistributedExecutorService, as its name implies, will likely migrate the submitted Callable or Runnable to another JVM in the Infinispan cluster, execute it and return the result to the task invoker. Because a task may be migrated to other nodes, every Callable, Runnable and/or DistributedCallable submitted must be either Serializable or Externalizable, and so must the value returned from a callable. If the returned value is not serializable a NotSerializableException will be thrown.
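For example, a minimal task sketch such as the following satisfies these requirements, since both the class and its Boolean result are serializable; the name SomeCallable is illustrative and matches the placeholder used in the snippets later in this section:
import java.io.Serializable;
import java.util.concurrent.Callable;

// Minimal sketch of a task eligible for distributed submission: the class
// implements Serializable and its Boolean result is serializable as well.
public class SomeCallable implements Callable<Boolean>, Serializable {

   private static final long serialVersionUID = 1L;

   @Override
   public Boolean call() throws Exception {
      // some computation performed on whichever node the task migrates to
      return Boolean.TRUE;
   }
}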
Infinispan's distributed task executors use data from Infinispan cache nodes as input for execution tasks. Most other distributed frameworks do not have that leverage, and users have to fetch input for distributed tasks from some well-known location. Furthermore, users of the Infinispan distributed execution framework do not have to configure a store for intermediate and final results, which removes another layer of complexity and maintenance.
Our distributed execution framework capitalizes on the fact that input data in the Infinispan data grid is already load balanced (in DIST mode). Since the input data is already balanced, execution tasks are automatically balanced as well; users do not have to explicitly assign work tasks to specific Infinispan nodes. However, the framework also allows users to specify an arbitrary subset of cache keys as input for distributed execution tasks.
DistributedCallable API
In case users need access to Infinispan cache data for the execution of a task, we recommend encapsulating the task in the DistributedCallable interface. DistributedCallable is a subtype of the existing Callable from the java.util.concurrent package; a DistributedCallable can be executed in a remote JVM and receive input from the Infinispan cache. The task's main algorithm can essentially remain unchanged; only the input source changes. An existing Callable implementation most likely gets its input in the form of some Java object or primitive, while a DistributedCallable gets its input from the Infinispan cache. Therefore, users who have already implemented the Callable interface to describe their task units can simply extend DistributedCallable and use keys from the Infinispan execution environment as input for the task. An implementation of DistributedCallable can in fact continue to support an already existing Callable implementation while at the same time being ready for distributed execution by extending DistributedCallable.
public interface DistributedCallable<K, V, T> extends Callable<T> {

   /**
    * Invoked by execution environment after DistributedCallable
    * has been migrated for execution to a specific Infinispan node.
    *
    * @param cache
    *           cache whose keys are used as input data for this
    *           DistributedCallable task
    * @param inputKeys
    *           keys used as input for this DistributedCallable task
    */
   public void setEnvironment(Cache<K, V> cache, Set<K> inputKeys);
}
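For illustration only (this class is not part of the Infinispan API), a DistributedCallable that sums the Integer values stored under its input keys might look like the following sketch; the environment fields are marked transient because they are set on the executing node after the task has migrated:
import java.io.Serializable;
import java.util.Set;

import org.infinispan.Cache;
import org.infinispan.distexec.DistributedCallable;

// Illustrative only: sums the Integer values stored under the input keys
// assigned to this node. The cache and keys are handed in by the execution
// environment through setEnvironment before call() is invoked.
public class SumTask implements DistributedCallable<String, Integer, Integer>, Serializable {

   private static final long serialVersionUID = 1L;

   private transient Cache<String, Integer> cache;
   private transient Set<String> inputKeys;

   @Override
   public void setEnvironment(Cache<String, Integer> cache, Set<String> inputKeys) {
      this.cache = cache;
      this.inputKeys = inputKeys;
   }

   @Override
   public Integer call() throws Exception {
      int sum = 0;
      for (String key : inputKeys) {
         Integer value = cache.get(key);
         if (value != null)
            sum += value;
      }
      return sum;
   }
}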
Callable and CDI
Users who do not want to or cannot implement DistributedCallable, yet need a reference to the input cache used by DistributedExecutorService, have the option of having that cache injected by the CDI mechanism. Upon arrival of the user's Callable at an Infinispan executing node, the Infinispan CDI mechanism provides the appropriate cache reference and injects it into the executing Callable. All one has to do is declare a Cache field in the Callable and annotate it with the org.infinispan.cdi.Input annotation along with the mandatory @Inject annotation.
public class CallableWithInjectedCache implements Callable<Integer>, Serializable {

   @Inject
   @Input
   private Cache<String, String> cache;

   @Override
   public Integer call() throws Exception {
      //use injected cache reference
      return 1;
   }
}
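Such a callable is then submitted like any other task; the cache of the executing node is injected before call() runs. A minimal usage sketch, assuming the same cache-bound DefaultExecutorService as elsewhere in this section:
DistributedExecutorService des = new DefaultExecutorService(cache);
Future<Integer> future = des.submit(new CallableWithInjectedCache());
Integer result = future.get();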
DistributedExecutorService, DistributedTaskBuilder and DistributedTask API
DistributedExecutorService is a simple extension of the familiar ExecutorService from the java.util.concurrent package. However, the advantages of DistributedExecutorService are not to be overlooked. Existing Callable tasks, instead of being executed in the JDK's ExecutorService, are also eligible for execution on an Infinispan cluster. The Infinispan execution environment migrates a task to the execution node(s), runs the task and returns the result(s) to the calling node. Of course, not all Callable tasks benefit from parallel distributed execution. Excellent candidates are long-running and computationally intensive tasks that can run concurrently and/or tasks whose input data can be processed concurrently. For more details about good candidates for parallel execution and about parallel algorithms in general, refer to Introduction to Parallel Computing.
The second advantage of the DistributedExecutorService is that it allows a quick and simple implementation of tasks that take input from Infinispan cache nodes, execute a certain computation and return results to the caller. Users specify which keys to use as input for the given DistributedCallable and submit that callable for execution on the Infinispan cluster. The Infinispan runtime locates the appropriate keys, migrates the DistributedCallable to the target execution node(s) and finally returns a list of results, one per executed Callable. Of course, users can omit the input keys, in which case Infinispan executes the DistributedCallable on all keys of the specified cache.
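For instance, assuming a DistributedCallable implementation like the SumTask sketched in the DistributedCallable API section above, submitting it against a chosen subset of keys could look like the following (the key names are illustrative):
DistributedExecutorService des = new DefaultExecutorService(cache);
// run the task only against these keys, on the node(s) owning them
Future<Integer> future = des.submit(new SumTask(), "key1", "key2", "key3");
Integer partialSum = future.get();
// or run it on every node, using all keys of the cache as input
List<Future<Integer>> futures = des.submitEverywhere(new SumTask());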
Let's see how we can use DistributedExecutorService. If you already have Callable/Runnable tasks defined, simply submit them to an instance of DefaultExecutorService for execution:
ExecutorService des = new DefaultExecutorService(cache);
Future<Boolean> future = des.submit(new SomeCallable());
Boolean r = future.get();
In case you need to specify more task parameters, such as a task timeout, a custom failover policy or an execution policy, use the DistributedTaskBuilder and DistributedTask API:
DistributedExecutorService des = new DefaultExecutorService(cache);
DistributedTaskBuilder<Boolean> taskBuilder = des.createDistributedTaskBuilder(new SomeCallable());
taskBuilder.timeout(10,TimeUnit.SECONDS);
...
...
DistributedTask<Boolean> distributedTask = taskBuilder.build();
Future<Boolean> future = des.submit(distributedTask);
Boolean r = future.get();
Distributed task failover
The distributed execution framework supports task failover. By default no failover policy is installed and the task's Runnable/Callable/DistributedCallable will simply fail. The failover mechanism is invoked in the following cases:
a) Failover due to a failure of the node where the task is executing
b) Failover due to a task failure (e.g. the Callable task throws an exception)
Infinispan provides a random node failover policy which will attempt execution of a part of the distributed task on another random node, if such a node is available. However, users who need a more sophisticated failover policy can implement the DistributedTaskFailoverPolicy interface. For example, users might want to use a consistent hashing (CH) mechanism for failover of uncompleted tasks. A CH-based failover might, for example, migrate a failed task T to the cluster node(s) holding a backup of the input data that was being processed on the failed node F.
/**
 * DistributedTaskFailoverPolicy allows pluggable fail over target selection for a failed remotely
 * executed distributed task.
 *
 */
public interface DistributedTaskFailoverPolicy {

   /**
    * Parts of a distributively executed task can fail due to the task itself throwing an exception
    * or due to an Infinispan system failure (e.g. a node failed or left the cluster during task
    * execution). This method selects the node on which the failed part should be retried.
    *
    * @param context
    *           the FailoverContext of the failed execution
    * @return the Address of the Infinispan node selected for fail over execution
    */
   Address failover(FailoverContext context);

   /**
    * Maximum number of fail over attempts permitted by this DistributedTaskFailoverPolicy
    *
    * @return max number of fail over attempts
    */
   int maxFailoverAttempts();
}
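As a sketch of a more sophisticated policy (illustrative, not part of Infinispan), the following selects the first candidate node other than the one on which the task failed. It assumes FailoverContext exposes executionCandidates() and executionFailureLocation(); check the exact method names against your Infinispan version. Such a policy would be installed through the same taskBuilder.failoverPolicy(...) call shown below.
import org.infinispan.distexec.DistributedTaskFailoverPolicy;
import org.infinispan.distexec.FailoverContext;
import org.infinispan.remoting.transport.Address;

// Illustrative only: fail over to the first candidate node that is not the
// node on which the task originally failed.
public class FirstAvailableNodeFailoverPolicy implements DistributedTaskFailoverPolicy {

   @Override
   public Address failover(FailoverContext context) {
      for (Address candidate : context.executionCandidates()) {
         if (!candidate.equals(context.executionFailureLocation()))
            return candidate;
      }
      // no alternative node available, retry on the node that failed
      return context.executionFailureLocation();
   }

   @Override
   public int maxFailoverAttempts() {
      return 3;
   }
}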
Therefore, one could for example specify the random node failover policy simply by:
DistributedExecutorService des = new DefaultExecutorService(cache);
DistributedTaskBuilder<Boolean> taskBuilder = des.createDistributedTaskBuilder(new SomeCallable());
taskBuilder.failoverPolicy(DefaultExecutorService.RANDOM_NODE_FAILOVER);
DistributedTask<Boolean> distributedTask = taskBuilder.build();
Future<Boolean> future = des.submit(distributedTask);
Boolean r = future.get();
Distributed task execution policy
DistributedTaskExecutionPolicy is an enum that allows tasks to specify a custom execution policy across the Infinispan cluster. DistributedTaskExecutionPolicy effectively scopes execution of a task to a subset of nodes. For example, someone might want to execute a task exclusively on the local network site rather than on the backup remote network centre as well. Others might, for example, use only a dedicated subset of a certain Infinispan rack's nodes for specific task execution. DistributedTaskExecutionPolicy is set per instance of DistributedTask.
DistributedExecutorService des = new DefaultExecutorService(cache);
DistributedTaskBuilder<Boolean> taskBuilder = des.createDistributedTaskBuilder(new SomeCallable());
taskBuilder.executionPolicy(DistributedTaskExecutionPolicy.SAME_RACK);
DistributedTask<Boolean> distributedTask = taskBuilder.build();
Future<Boolean> future = des.submit(distributedTask);
Boolean r = future.get();
Examples
Pi approximation can greatly benefit from parallel distributed execution using DistributedExecutorService. Recall that the area of the square is Sa = 4r^2 and the area of the circle is Ca = pi*r^2. Substituting r^2 from the second equation into the first one, it turns out that pi = 4 * Ca/Sa. Now, imagine that we can shoot a very large number of darts into the square; if we take the ratio of the darts that land inside the circle to the total number of darts shot, we approximate the value of Ca/Sa. Since we know that pi = 4 * Ca/Sa, we can easily derive an approximate value of pi. The more darts we shoot, the better the approximation we get. In the example below we shoot 10 million darts, but instead of "shooting" them serially we parallelize the dart-shooting work across the entire Infinispan cluster.
public class PiAppx {

   public static void main(String[] arg) throws Exception {
      List<Cache> caches = ...;
      Cache cache = ...;

      int numPoints = 10000000;
      int numServers = caches.size();
      int numberPerWorker = numPoints / numServers;

      DistributedExecutorService des = new DefaultExecutorService(cache);
      long start = System.currentTimeMillis();
      CircleTest ct = new CircleTest(numberPerWorker);
      List<Future<Integer>> results = des.submitEverywhere(ct);
      int countCircle = 0;
      for (Future<Integer> f : results) {
         countCircle += f.get();
      }
      double appxPi = 4.0 * countCircle / numPoints;

      System.out.println("Distributed PI appx is " + appxPi +
            " completed in " + (System.currentTimeMillis() - start) + " ms");
   }

   private static class CircleTest implements Callable<Integer>, Serializable {

      /** The serialVersionUID */
      private static final long serialVersionUID = 3496135215525904755L;

      private final int loopCount;

      public CircleTest(int loopCount) {
         this.loopCount = loopCount;
      }

      @Override
      public Integer call() throws Exception {
         int insideCircleCount = 0;
         for (int i = 0; i < loopCount; i++) {
            double x = Math.random();
            double y = Math.random();
            if (insideCircle(x, y))
               insideCircleCount++;
         }
         return insideCircleCount;
      }

      private boolean insideCircle(double x, double y) {
         return (Math.pow(x - 0.5, 2) + Math.pow(y - 0.5, 2))
               <= Math.pow(0.5, 2);
      }
   }
}
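The example above elides how the cache is obtained. One possible way to bootstrap a distributed cache for such a test, assuming programmatic configuration (class and method names may vary slightly between Infinispan versions), is:
// Illustrative bootstrap only: start a clustered cache manager with a
// distributed (DIST) cache so that submitEverywhere spreads the work
// across the cluster members.
GlobalConfiguration globalConfig = GlobalConfigurationBuilder.defaultClusteredBuilder().build();
Configuration config = new ConfigurationBuilder()
      .clustering().cacheMode(CacheMode.DIST_SYNC)
      .build();
EmbeddedCacheManager cacheManager = new DefaultCacheManager(globalConfig, config);
Cache<String, Integer> cache = cacheManager.getCache();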