org.infinispan.distexec.mapreduce
Class MapReduceTask<KIn,VIn,KOut,VOut>

java.lang.Object
  extended by org.infinispan.distexec.mapreduce.MapReduceTask<KIn,VIn,KOut,VOut>

public class MapReduceTask<KIn,VIn,KOut,VOut>
extends Object

MapReduceTask is a distributed task allowing a large scale computation to be transparently parallelized across Infinispan cluster nodes.

Users should instantiate MapReduceTask with a reference to the cache whose data is used as input for this task. The Infinispan execution environment will migrate and execute instances of the provided Mapper and Reducer seamlessly across Infinispan nodes.

Unless a subset of keys is specified using the onKeys(Object...) filter, all available key/value pairs of the specified cache are used as input data for this task. For example, a MapReduceTask that counts the number of word occurrences in a cache whose keys and values are String instances could be written as follows:

 MapReduceTask<String, String, String, Integer> task = new MapReduceTask<String, String, String, Integer>(cache);
 task.mappedWith(new WordCountMapper()).reducedWith(new WordCountReducer());
 Map<String, Integer> results = task.execute();
 
The final result is a map where each key is a word and each value is the number of occurrences of that word.

Accompanying Mapper and Reducer are defined as follows:

 private static class WordCountMapper implements Mapper<String, String, String, Integer> {

    // Emits (word, 1) for every whitespace-separated token in the value.
    public void map(String key, String value, Collector<String, Integer> collector) {
       StringTokenizer tokens = new StringTokenizer(value);
       while (tokens.hasMoreTokens()) {
          collector.emit(tokens.nextToken(), 1);
       }
    }
 }

 private static class WordCountReducer implements Reducer<String, Integer> {

    // Sums all counts emitted for the given word.
    public Integer reduce(String key, Iterator<Integer> iter) {
       int sum = 0;
       while (iter.hasNext()) {
          sum += iter.next();
       }
       return sum;
    }
 }
 

As of the Infinispan 5.2 release, MapReduceTask can also specify a Combiner function. The Combiner is executed on each node after the Mapper and before the global reduce phase. It receives the Mapper's output as its input, and its output is then sent to the reducers. It is useful to think of the Combiner as a node-local reduce phase that runs before the global reduce phase.

Combiners are especially useful when the reduce function is both commutative and associative. In such cases the Reducer itself can be used as the Combiner; all one needs to do is specify it:

 MapReduceTask<String, String, String, Integer> task = new MapReduceTask<String, String, String, Integer>(cache);
 task.mappedWith(new WordCountMapper()).reducedWith(new WordCountReducer()).combinedWith(new WordCountReducer());
 Map<String, Integer> results = task.execute();
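The effect of a Combiner can be illustrated without a cluster. The following is a minimal plain-Java sketch, not Infinispan code: the class CombinerSketch and its methods are hypothetical, and each inner list stands in for the cache values held by one node. It shows that, because summing is commutative and associative, reducing each node's output locally before the global reduce yields the same word counts while sending fewer intermediate values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

public class CombinerSketch {

    // Map phase for one simulated node: emit (word, 1) per token, grouped by word.
    public static Map<String, List<Integer>> mapPhase(List<String> values) {
        Map<String, List<Integer>> emitted = new HashMap<>();
        for (String value : values) {
            StringTokenizer tokens = new StringTokenizer(value);
            while (tokens.hasMoreTokens()) {
                emitted.computeIfAbsent(tokens.nextToken(), k -> new ArrayList<>()).add(1);
            }
        }
        return emitted;
    }

    // The same summing function serves as both combiner and reducer.
    public static int sum(List<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    // Runs the word count over per-node partitions, optionally combining locally.
    public static Map<String, Integer> run(List<List<String>> nodes, boolean useCombiner) {
        Map<String, List<Integer>> shuffled = new HashMap<>();
        for (List<String> nodeValues : nodes) {
            for (Map.Entry<String, List<Integer>> e : mapPhase(nodeValues).entrySet()) {
                List<Integer> toSend = useCombiner
                        ? Collections.singletonList(sum(e.getValue())) // one partial sum per word per node
                        : e.getValue();                                // every raw emitted 1
                shuffled.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).addAll(toSend);
            }
        }
        // Global reduce phase over everything the nodes sent.
        Map<String, Integer> result = new HashMap<>();
        for (Map.Entry<String, List<Integer>> e : shuffled.entrySet()) {
            result.put(e.getKey(), sum(e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> nodes = Arrays.asList(
                Arrays.asList("a b a", "b c"),
                Arrays.asList("a c c"));
        // Both variants produce the same counts.
        System.out.println(run(nodes, false).equals(run(nodes, true)));
    }
}
```

A reduce function that is not commutative and associative (for example, one computing an average directly) would not give the same result when reused as a combiner in this way.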
 
Note that Mapper and Reducer should not be specified as inner classes. Inner classes declared in non-static contexts contain implicit non-transient references to their enclosing class instances, so serializing such an inner class instance results in serialization of its associated outer class instance as well. Static nested classes, as used in the example above, do not have this problem.
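The serialization pitfall can be reproduced with plain Java serialization. The sketch below is illustrative only and does not use Infinispan or its marshaller; the class names are hypothetical. The outer class is deliberately not Serializable, so writing the non-static inner class fails because its hidden reference to the outer instance must be serialized too, while the static nested class is written without trouble.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class InnerClassSerializationSketch { // deliberately NOT Serializable

    private final String delimiters = " ";

    // Non-static inner class: holds an implicit reference to the enclosing instance.
    public class InnerMapper implements Serializable {
        private static final long serialVersionUID = 1L;

        // Reads outer state, so the enclosing-instance reference is really used.
        public String delimiters() {
            return delimiters;
        }
    }

    // Static nested class: no reference to any enclosing instance.
    public static class NestedMapper implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // Returns true if the object can be written with plain Java serialization.
    public static boolean canSerialize(Object o) {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        InnerClassSerializationSketch outer = new InnerClassSerializationSketch();
        System.out.println(canSerialize(outer.new InnerMapper())); // false
        System.out.println(canSerialize(new NestedMapper()));      // true
    }
}
```

If the outer class were itself Serializable, the inner instance would serialize, but the whole outer object would travel with it, which is the overhead the note above warns about.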

If you are not familiar with the MapReduce distributed execution model, start with Google's MapReduce research paper.

Since:
5.0
Author:
Manik Surtani, Vladimir Blagojevic, Sanne Grinovero

Field Summary
protected  AdvancedCache<KIn,VIn> cache
           
protected  List<org.infinispan.distexec.mapreduce.MapReduceTask.CancellableTaskPart> cancellableTasks
           
protected  CancellationService cancellationService
           
protected  ClusteringDependentLogic clusteringDependentLogic
           
protected  Reducer<KOut,VOut> combiner
           
static String DEFAULT_TMP_CACHE_CONFIGURATION_NAME
           
protected  boolean distributeReducePhase
           
protected  boolean isLocalOnly
           
protected  Collection<KIn> keys
           
protected  Mapper<KIn,VIn,KOut,VOut> mapper
           
protected  MapReduceManager mapReduceManager
           
protected  Marshaller marshaller
           
protected  Reducer<KOut,VOut> reducer
           
protected  RpcOptionsBuilder rpcOptionsBuilder
           
protected  UUID taskId
           
protected  boolean useIntermediateSharedCache
           
 
Constructor Summary
MapReduceTask(Cache<KIn,VIn> masterCacheNode)
          Create a new MapReduceTask given a master cache node.
MapReduceTask(Cache<KIn,VIn> masterCacheNode, boolean distributeReducePhase)
          Create a new MapReduceTask given a master cache node.
MapReduceTask(Cache<KIn,VIn> masterCacheNode, boolean distributeReducePhase, boolean useIntermediateSharedCache)
          Create a new MapReduceTask given a master cache node.
 
Method Summary
protected  void aggregateReducedResult(Map<KOut,List<VOut>> finalReduced, Map<KOut,VOut> mapReceived)
           
protected  Mapper<KIn,VIn,KOut,VOut> clone(Mapper<KIn,VIn,KOut,VOut> mapper)
           
protected  Reducer<KOut,VOut> clone(Reducer<KOut,VOut> reducer)
           
 MapReduceTask<KIn,VIn,KOut,VOut> combinedWith(Reducer<KOut,VOut> combiner)
          Specifies Combiner to use for this MapReduceTask
protected
<V> org.infinispan.distexec.mapreduce.MapReduceTask.ReduceTaskPart<V>
createReducePart(ReduceCommand<KOut,VOut> cmd, Address target, String destCacheName)
           
protected
<V> org.infinispan.distexec.mapreduce.MapReduceTask.MapTaskPart<V>
createTaskMapPart(MapCombineCommand<KIn,VIn,KOut,VOut> cmd, Address target, boolean distributedReduce)
           
protected  boolean distributeReducePhase()
           
 boolean equals(Object obj)
           
 Map<KOut,VOut> execute()
          Executes this task across Infinispan cluster nodes.
<R> R
execute(Collator<KOut,VOut,R> collator)
          Executes this task across Infinispan cluster but the final result is collated using specified Collator
 Future<Map<KOut,VOut>> executeAsynchronously()
          Executes this task across Infinispan cluster nodes asynchronously.
<R> Future<R>
executeAsynchronously(Collator<KOut,VOut,R> collator)
          Executes this task asynchronously across Infinispan cluster; final result is collated using specified Collator and wrapped by Future
protected  Set<KOut> executeMapPhase(boolean useCompositeKeys)
           
protected  Map<KOut,VOut> executeMapPhaseWithLocalReduction()
           
protected  Map<KOut,VOut> executeReducePhase(Set<KOut> allMapPhasesResponses, boolean useCompositeKeys)
           
protected  void executeTaskInit(String tmpCacheName)
           
 int hashCode()
           
protected  boolean inputTaskKeysEmpty()
           
protected
<T> Map<Address,? extends Collection<T>>
mapKeysToNodes(Collection<T> keysToMap)
           
protected
<T> Map<Address,? extends Collection<T>>
mapKeysToNodes(Collection<T> keysToMap, boolean useIntermediateCompositeKey)
           
protected
<T> Map<Address,? extends Collection<T>>
mapKeysToNodes(DistributionManager dm, Collection<T> keysToMap, boolean useIntermediateCompositeKey)
           
 MapReduceTask<KIn,VIn,KOut,VOut> mappedWith(Mapper<KIn,VIn,KOut,VOut> mapper)
          Specifies Mapper to use for this MapReduceTask
 MapReduceTask<KIn,VIn,KOut,VOut> onKeys(KIn... input)
          Rather than use all available keys as input onKeys allows users to specify a subset of keys as input to this task
 MapReduceTask<KIn,VIn,KOut,VOut> reducedWith(Reducer<KOut,VOut> reducer)
          Specifies Reducer to use for this MapReduceTask
 MapReduceTask<KIn,VIn,KOut,VOut> timeout(long timeout, TimeUnit unit)
          See timeout(TimeUnit).
 long timeout(TimeUnit outputTimeUnit)
           
 String toString()
           
protected  boolean useIntermediatePerTaskCache()
           
protected  boolean useIntermediateSharedCache()
           
 
Methods inherited from class java.lang.Object
clone, finalize, getClass, notify, notifyAll, wait, wait, wait
 

Field Detail

DEFAULT_TMP_CACHE_CONFIGURATION_NAME

public static final String DEFAULT_TMP_CACHE_CONFIGURATION_NAME
See Also:
Constant Field Values

mapper

protected Mapper<KIn,VIn,KOut,VOut> mapper

reducer

protected Reducer<KOut,VOut> reducer

combiner

protected Reducer<KOut,VOut> combiner

distributeReducePhase

protected final boolean distributeReducePhase

useIntermediateSharedCache

protected final boolean useIntermediateSharedCache

keys

protected final Collection<KIn> keys

cache

protected final AdvancedCache<KIn,VIn> cache

marshaller

protected final Marshaller marshaller

mapReduceManager

protected final MapReduceManager mapReduceManager

cancellationService

protected final CancellationService cancellationService

cancellableTasks

protected final List<org.infinispan.distexec.mapreduce.MapReduceTask.CancellableTaskPart> cancellableTasks

taskId

protected final UUID taskId

clusteringDependentLogic

protected final ClusteringDependentLogic clusteringDependentLogic

isLocalOnly

protected final boolean isLocalOnly

rpcOptionsBuilder

protected RpcOptionsBuilder rpcOptionsBuilder
Constructor Detail

MapReduceTask

public MapReduceTask(Cache<KIn,VIn> masterCacheNode)
Create a new MapReduceTask given a master cache node. All distributed task executions will be initiated from this cache node. By default, this task distributes only the map phase, while the reduction is executed on the Infinispan node from which the task originates.

Large and data-intensive tasks whose reduction phase would exceed the working memory of one Infinispan node should use the distributed reduce phase.

Parameters:
masterCacheNode - cache node initiating map reduce task

MapReduceTask

public MapReduceTask(Cache<KIn,VIn> masterCacheNode,
                     boolean distributeReducePhase)
Create a new MapReduceTask given a master cache node. All distributed task executions will be initiated from this cache node.

Parameters:
masterCacheNode - cache node initiating map reduce task
distributeReducePhase - if true this task will use distributed reduce phase execution

MapReduceTask

public MapReduceTask(Cache<KIn,VIn> masterCacheNode,
                     boolean distributeReducePhase,
                     boolean useIntermediateSharedCache)
Create a new MapReduceTask given a master cache node. All distributed task executions will be initiated from this cache node.

Parameters:
masterCacheNode - cache node initiating map reduce task
distributeReducePhase - if true this task will use distributed reduce phase execution
useIntermediateSharedCache - if true, this task will share the intermediate value cache with other MapReduceTasks executing on the grid. If false, this task will use its own dedicated cache for intermediate values
Method Detail

onKeys

public MapReduceTask<KIn,VIn,KOut,VOut> onKeys(KIn... input)
Rather than using all available keys as input, onKeys allows users to specify a subset of keys as input to this task

Parameters:
input - input keys for this task
Returns:
this task

mappedWith

public MapReduceTask<KIn,VIn,KOut,VOut> mappedWith(Mapper<KIn,VIn,KOut,VOut> mapper)
Specifies Mapper to use for this MapReduceTask

Note that a Mapper should not be specified as an inner class. Inner classes declared in non-static contexts contain implicit non-transient references to their enclosing class instances, so serializing such an inner class instance results in serialization of its associated outer class instance as well.

Parameters:
mapper - used to execute map phase of MapReduceTask
Returns:
this MapReduceTask itself

reducedWith

public MapReduceTask<KIn,VIn,KOut,VOut> reducedWith(Reducer<KOut,VOut> reducer)
Specifies Reducer to use for this MapReduceTask

Note that a Reducer should not be specified as an inner class. Inner classes declared in non-static contexts contain implicit non-transient references to their enclosing class instances, so serializing such an inner class instance results in serialization of its associated outer class instance as well.

Parameters:
reducer - used to reduce results of map phase
Returns:
this MapReduceTask itself

combinedWith

public MapReduceTask<KIn,VIn,KOut,VOut> combinedWith(Reducer<KOut,VOut> combiner)
Specifies Combiner to use for this MapReduceTask

Note that the Reducer used as a Combiner should not be specified as an inner class. Inner classes declared in non-static contexts contain implicit non-transient references to their enclosing class instances, so serializing such an inner class instance results in serialization of its associated outer class instance as well.

Parameters:
combiner - used to immediately combine results of map phase before reduce phase is invoked
Returns:
this MapReduceTask itself
Since:
5.2

timeout

public final MapReduceTask<KIn,VIn,KOut,VOut> timeout(long timeout,
                                                      TimeUnit unit)
See timeout(TimeUnit). Note: the timeout value is converted to milliseconds, and a value less than or equal to zero means wait forever.

Parameters:
timeout - the timeout value
unit - the time unit of the timeout value
Returns:
this MapReduceTask itself

timeout

public final long timeout(TimeUnit outputTimeUnit)
Returns:
the timeout value, in the given TimeUnit, to wait for the remote map/reduce task to finish. The default value is given by SyncConfiguration.replTimeout()
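The documented conversion behaviour of the two timeout methods can be sketched in plain Java. This is a hypothetical stand-in (TimeoutSketch is not an Infinispan class) that assumes the value is stored in milliseconds, as the note above states; the starting value of zero is an arbitrary choice here and does not model the default taken from the cache configuration.

```java
import java.util.concurrent.TimeUnit;

public class TimeoutSketch {

    // Stored in milliseconds; zero is only this sketch's starting value.
    private long timeoutMillis;

    // Setter: converts the supplied value to milliseconds and returns this for chaining.
    public TimeoutSketch timeout(long timeout, TimeUnit unit) {
        this.timeoutMillis = TimeUnit.MILLISECONDS.convert(timeout, unit);
        return this;
    }

    // Getter: converts the stored milliseconds to the requested unit (truncating).
    public long timeout(TimeUnit outputTimeUnit) {
        return outputTimeUnit.convert(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // A value less than or equal to zero is treated as "wait forever".
    public boolean waitsForever() {
        return timeoutMillis <= 0;
    }

    public static void main(String[] args) {
        TimeoutSketch task = new TimeoutSketch().timeout(2, TimeUnit.MINUTES);
        System.out.println(task.timeout(TimeUnit.SECONDS)); // 120
        System.out.println(task.waitsForever());            // false
    }
}
```

Because the value is held in milliseconds, reading it back in a coarser unit truncates: 1500 milliseconds read as seconds gives 1.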

execute

public Map<KOut,VOut> execute()
                       throws CacheException
Executes this task across Infinispan cluster nodes.

Returns:
a Map where each key is an output key and value is reduced value for that output key
Throws:
CacheException

distributeReducePhase

protected boolean distributeReducePhase()

useIntermediateSharedCache

protected boolean useIntermediateSharedCache()

useIntermediatePerTaskCache

protected boolean useIntermediatePerTaskCache()

executeTaskInit

protected void executeTaskInit(String tmpCacheName)

executeMapPhase

protected Set<KOut> executeMapPhase(boolean useCompositeKeys)
                             throws InterruptedException,
                                    ExecutionException
Throws:
InterruptedException
ExecutionException

executeMapPhaseWithLocalReduction

protected Map<KOut,VOut> executeMapPhaseWithLocalReduction()
                                                    throws InterruptedException,
                                                           ExecutionException
Throws:
InterruptedException
ExecutionException

createTaskMapPart

protected <V> org.infinispan.distexec.mapreduce.MapReduceTask.MapTaskPart<V> createTaskMapPart(MapCombineCommand<KIn,VIn,KOut,VOut> cmd,
                                                                                               Address target,
                                                                                               boolean distributedReduce)

executeReducePhase

protected Map<KOut,VOut> executeReducePhase(Set<KOut> allMapPhasesResponses,
                                            boolean useCompositeKeys)
                                     throws InterruptedException,
                                            ExecutionException
Throws:
InterruptedException
ExecutionException

createReducePart

protected <V> org.infinispan.distexec.mapreduce.MapReduceTask.ReduceTaskPart<V> createReducePart(ReduceCommand<KOut,VOut> cmd,
                                                                                                 Address target,
                                                                                                 String destCacheName)

executeAsynchronously

public Future<Map<KOut,VOut>> executeAsynchronously()
Executes this task across Infinispan cluster nodes asynchronously.

Returns:
a Future wrapping a Map where each key is an output key and value is reduced value for that output key
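One way a caller might consume the returned Future is to wait with a bound and cancel on expiry. The sketch below is plain java.util.concurrent code, not Infinispan code: AsyncResultSketch and its await helper are hypothetical, and an ExecutorService stands in for the asynchronous task execution.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncResultSketch {

    // Waits up to the given time; on timeout, cancels the task and returns an empty map.
    public static Map<String, Integer> await(Future<Map<String, Integer>> future,
                                             long timeout, TimeUnit unit) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            future.cancel(true);
            return Collections.emptyMap();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Stand-in for an asynchronously executing task.
            Callable<Map<String, Integer>> work = () -> Collections.singletonMap("word", 3);
            Future<Map<String, Integer>> future = pool.submit(work);
            System.out.println(await(future, 5, TimeUnit.SECONDS)); // {word=3}
        } finally {
            pool.shutdownNow();
        }
    }
}
```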

execute

public <R> R execute(Collator<KOut,VOut,R> collator)
Executes this task across Infinispan cluster but the final result is collated using specified Collator

Parameters:
collator - a Collator to use
Returns:
collated result
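A Collator turns the reduced map into a single result of type R. The sketch below uses a local stand-in interface so that it runs without Infinispan; its single method collate(Map) is assumed to mirror the shape of the real Collator interface, and MostFrequentWord is a hypothetical implementation that picks the word with the highest count from a word-count result.

```java
import java.util.HashMap;
import java.util.Map;

public class CollatorSketch {

    // Local stand-in; assumed to mirror the shape of Infinispan's Collator.
    public interface Collator<KOut, VOut, R> {
        R collate(Map<KOut, VOut> reducedResults);
    }

    // Collates a word-count map into the single most frequent word (null if empty).
    public static class MostFrequentWord implements Collator<String, Integer, String> {
        @Override
        public String collate(Map<String, Integer> reducedResults) {
            String best = null;
            int bestCount = Integer.MIN_VALUE;
            for (Map.Entry<String, Integer> e : reducedResults.entrySet()) {
                if (e.getValue() > bestCount) {
                    bestCount = e.getValue();
                    best = e.getKey();
                }
            }
            return best;
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("a", 3);
        counts.put("b", 2);
        counts.put("c", 5);
        System.out.println(new MostFrequentWord().collate(counts)); // c
    }
}
```

With the real API, an instance of such a class would be passed to execute(Collator) in place of calling collate directly.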

executeAsynchronously

public <R> Future<R> executeAsynchronously(Collator<KOut,VOut,R> collator)
Executes this task asynchronously across Infinispan cluster; final result is collated using specified Collator and wrapped by Future

Parameters:
collator - a Collator to use
Returns:
a Future wrapping the collated result

aggregateReducedResult

protected void aggregateReducedResult(Map<KOut,List<VOut>> finalReduced,
                                      Map<KOut,VOut> mapReceived)

mapKeysToNodes

protected <T> Map<Address,? extends Collection<T>> mapKeysToNodes(DistributionManager dm,
                                                                  Collection<T> keysToMap,
                                                                  boolean useIntermediateCompositeKey)

mapKeysToNodes

protected <T> Map<Address,? extends Collection<T>> mapKeysToNodes(Collection<T> keysToMap,
                                                                  boolean useIntermediateCompositeKey)

mapKeysToNodes

protected <T> Map<Address,? extends Collection<T>> mapKeysToNodes(Collection<T> keysToMap)

clone

protected Mapper<KIn,VIn,KOut,VOut> clone(Mapper<KIn,VIn,KOut,VOut> mapper)

clone

protected Reducer<KOut,VOut> clone(Reducer<KOut,VOut> reducer)

inputTaskKeysEmpty

protected boolean inputTaskKeysEmpty()

hashCode

public int hashCode()
Overrides:
hashCode in class Object

equals

public boolean equals(Object obj)
Overrides:
equals in class Object

toString

public String toString()
Overrides:
toString in class Object


Copyright © 2013 JBoss, a division of Red Hat. All Rights Reserved.