org.infinispan.distexec.mapreduce
Class MapReduceTask<KIn,VIn,KOut,VOut>

java.lang.Object
  extended by org.infinispan.distexec.mapreduce.MapReduceTask<KIn,VIn,KOut,VOut>

public class MapReduceTask<KIn,VIn,KOut,VOut>
extends Object

MapReduceTask is a distributed task that allows a large-scale computation to be transparently parallelized across Infinispan cluster nodes.

Users should instantiate MapReduceTask with a reference to a cache whose data is used as input for this task. The Infinispan execution environment will migrate and execute instances of the provided Mapper and Reducer seamlessly across Infinispan nodes.

Unless a subset of keys is specified using the onKeys(Object...) filter, all available key/value pairs of the specified cache are used as input data for this task. For example, a MapReduceTask that counts the number of word occurrences in a cache whose keys and values are String instances could be written as follows:

 MapReduceTask<String, String, String, Integer> task = new MapReduceTask<String, String, String, Integer>(cache);
 task.mappedWith(new WordCountMapper()).reducedWith(new WordCountReducer());
 Map<String, Integer> results = task.execute();
 
The final result is a map where each key is a word and each value is the number of occurrences of that word.

The accompanying Mapper and Reducer are defined as follows:

 private static class WordCountMapper implements Mapper<String, String, String, Integer> {

    // Emit (word, 1) for every whitespace-separated token in the value.
    public void map(String key, String value, Collector<String, Integer> collector) {
       StringTokenizer tokens = new StringTokenizer(value);
       while (tokens.hasMoreTokens()) {
          collector.emit(tokens.nextToken(), 1);
       }
    }
 }

 private static class WordCountReducer implements Reducer<String, Integer> {

    // Sum all counts emitted for a single word.
    public Integer reduce(String key, Iterator<Integer> iter) {
       int sum = 0;
       while (iter.hasNext()) {
          sum += iter.next();
       }
       return sum;
    }
 }
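To make the data flow concrete, the following single-JVM sketch mimics what the word-count task above computes: every input entry goes through the mapper, emitted values are grouped by output key, and each group is handed to the reducer as an Iterator. LocalCollector, LocalMapper, LocalReducer and LocalWordCount are hypothetical stand-ins that only mirror the shapes of Infinispan's Collector, Mapper and Reducer; nothing here is distributed, and this is not how MapReduceTask itself is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

final class LocalWordCount {

    // Hypothetical stand-ins mirroring the shapes of Infinispan's Collector, Mapper and Reducer.
    interface LocalCollector<K, V> {
        void emit(K key, V value);
    }

    interface LocalMapper<KIn, VIn, KOut, VOut> {
        void map(KIn key, VIn value, LocalCollector<KOut, VOut> collector);
    }

    interface LocalReducer<KOut, VOut> {
        VOut reduce(KOut key, Iterator<VOut> iter);
    }

    // Same logic as WordCountMapper: emit (word, 1) for every whitespace-separated token.
    static final LocalMapper<String, String, String, Integer> MAPPER = (key, value, collector) -> {
        StringTokenizer tokens = new StringTokenizer(value);
        while (tokens.hasMoreTokens()) {
            collector.emit(tokens.nextToken(), 1);
        }
    };

    // Same logic as WordCountReducer: sum all counts emitted for one word.
    static final LocalReducer<String, Integer> REDUCER = (key, iter) -> {
        int sum = 0;
        while (iter.hasNext()) {
            sum += iter.next();
        }
        return sum;
    };

    // Map every input entry, group emitted values by output key, then reduce each group.
    static <KIn, VIn, KOut, VOut> Map<KOut, VOut> run(Map<KIn, VIn> input,
            LocalMapper<KIn, VIn, KOut, VOut> mapper, LocalReducer<KOut, VOut> reducer) {
        Map<KOut, List<VOut>> grouped = new HashMap<>();
        LocalCollector<KOut, VOut> collector =
                (k, v) -> grouped.computeIfAbsent(k, unused -> new ArrayList<>()).add(v);
        for (Map.Entry<KIn, VIn> entry : input.entrySet()) {
            mapper.map(entry.getKey(), entry.getValue(), collector);
        }
        Map<KOut, VOut> result = new HashMap<>();
        for (Map.Entry<KOut, List<VOut>> group : grouped.entrySet()) {
            result.put(group.getKey(), reducer.reduce(group.getKey(), group.getValue().iterator()));
        }
        return result;
    }
}
```

For input documents "to be or not to be" and "be quick", run(...) yields be=3, to=2, or=1, not=1, quick=1. In the real task the grouping step happens across nodes rather than in one HashMap.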
 
Note that Mapper and Reducer should not be implemented as inner (non-static nested) classes. An inner class declared in a non-static context holds an implicit, non-transient reference to its enclosing class instance, so serializing an instance of such an inner class also serializes the associated outer class instance.
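The pitfall can be reproduced with plain Java serialization, independent of Infinispan. In this minimal sketch EnclosingJob, InnerTask and NestedTask are hypothetical names; the enclosing class is deliberately not Serializable, so the inner instance fails to serialize while the static nested one succeeds. Infinispan may marshal tasks through its own Marshaller rather than ObjectOutputStream, so treat this only as a demonstration of the language-level issue.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical enclosing class; deliberately NOT Serializable.
class EnclosingJob {

    private String name = "word-count"; // non-final, so inner classes must reach it via EnclosingJob.this

    // Non-static inner class: holds an implicit reference to its EnclosingJob instance.
    class InnerTask implements Serializable {
        private static final long serialVersionUID = 1L;

        String describe() {
            return "task of " + name; // uses the enclosing instance
        }
    }

    // Static nested class: no reference to any EnclosingJob instance.
    static class NestedTask implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // Returns null if obj serializes cleanly, otherwise a short description of the failure.
    static String trySerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return "NotSerializableException: " + e.getMessage();
        } catch (IOException e) {
            return e.getClass().getSimpleName();
        }
    }
}
```

Serializing new EnclosingJob().new InnerTask() fails because the hidden reference drags in EnclosingJob, whereas new EnclosingJob.NestedTask() serializes without error. This is why the WordCountMapper and WordCountReducer examples are declared static.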

If you are not familiar with the MapReduce distributed execution model, start with Google's MapReduce research paper.

Since:
5.0
Author:
Manik Surtani, Vladimir Blagojevic, Sanne Grinovero

Field Summary
protected  Marshaller marshaller
           
 
Constructor Summary
MapReduceTask(Cache<KIn,VIn> masterCacheNode)
          Create a new MapReduceTask given a master cache node.
 
Method Summary
protected  Mapper<KIn,VIn,KOut,VOut> clone(Mapper<KIn,VIn,KOut,VOut> mapper)
           
protected  Reducer<KOut,VOut> clone(Reducer<KOut,VOut> reducer)
           
 Map<KOut,VOut> execute()
          Executes this task across Infinispan cluster nodes.
<R> R execute(Collator<KOut,VOut,R> collator)
          Executes this task across Infinispan cluster nodes and collates the final result using the specified Collator.
 Future<Map<KOut,VOut>> executeAsynchronously()
          Executes this task across Infinispan cluster nodes asynchronously.
<R> Future<R> executeAsynchronously(Collator<KOut,VOut,R> collator)
          Executes this task asynchronously across Infinispan cluster nodes; the final result is collated using the specified Collator and wrapped in a Future.
protected  void groupKeys(Map<KOut,List<VOut>> finalReduced, Map<KOut,VOut> mapReceived)
           
protected  Map<Address,List<KIn>> mapKeysToNodes()
           
 MapReduceTask<KIn,VIn,KOut,VOut> mappedWith(Mapper<KIn,VIn,KOut,VOut> mapper)
          Specifies the Mapper to use for this MapReduceTask.
 MapReduceTask<KIn,VIn,KOut,VOut> onKeys(KIn... input)
          Rather than using all available keys as input, onKeys allows users to specify a subset of keys as input to this task.
 MapReduceTask<KIn,VIn,KOut,VOut> reducedWith(Reducer<KOut,VOut> reducer)
          Specifies the Reducer to use for this MapReduceTask.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

marshaller

protected final Marshaller marshaller
Constructor Detail

MapReduceTask

public MapReduceTask(Cache<KIn,VIn> masterCacheNode)
Create a new MapReduceTask given a master cache node. All distributed task executions will be initiated from this cache node.

Parameters:
masterCacheNode - cache node initiating map reduce task
Method Detail

onKeys

public MapReduceTask<KIn,VIn,KOut,VOut> onKeys(KIn... input)
Rather than using all available keys as input, onKeys allows users to specify a subset of keys as input to this task.

Parameters:
input - input keys for this task
Returns:
this task
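The input-selection rule can be sketched locally as follows. InputSelection.select is a hypothetical helper, not part of MapReduceTask: with no keys it keeps every entry, otherwise only the listed ones. Silently skipping keys that are absent from the data is an assumption of this sketch, not documented behaviour of onKeys.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class InputSelection {

    // Hypothetical helper: no keys means "use every entry"; otherwise keep only the listed keys.
    // Skipping keys that are absent from the data is an assumption of this sketch.
    @SafeVarargs
    static <K, V> Map<K, V> select(Map<K, V> data, K... keys) {
        if (keys.length == 0) {
            return new LinkedHashMap<>(data); // no filter: all key/value pairs are input
        }
        Map<K, V> subset = new LinkedHashMap<>();
        for (K key : keys) {
            if (data.containsKey(key)) {
                subset.put(key, data.get(key));
            }
        }
        return subset;
    }
}
```

With the real API, the corresponding restriction would be expressed as task.onKeys("a", "c") before calling execute().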

mappedWith

public MapReduceTask<KIn,VIn,KOut,VOut> mappedWith(Mapper<KIn,VIn,KOut,VOut> mapper)
Specifies the Mapper to use for this MapReduceTask.

Note that the Mapper should not be implemented as an inner (non-static nested) class. An inner class declared in a non-static context holds an implicit, non-transient reference to its enclosing class instance, so serializing an instance of such an inner class also serializes the associated outer class instance.

Parameters:
mapper - the Mapper to use for this task
Returns:
this task

reducedWith

public MapReduceTask<KIn,VIn,KOut,VOut> reducedWith(Reducer<KOut,VOut> reducer)
Specifies the Reducer to use for this MapReduceTask.

Note that the Reducer should not be implemented as an inner (non-static nested) class. An inner class declared in a non-static context holds an implicit, non-transient reference to its enclosing class instance, so serializing an instance of such an inner class also serializes the associated outer class instance.

Parameters:
reducer - the Reducer to use for this task
Returns:
this task

execute

public Map<KOut,VOut> execute()
                       throws CacheException
Executes this task across Infinispan cluster nodes.

Returns:
a Map where each key is an output key and each value is the reduced value for that output key
Throws:
CacheException

executeAsynchronously

public Future<Map<KOut,VOut>> executeAsynchronously()
Executes this task across Infinispan cluster nodes asynchronously.

Returns:
a Future wrapping a Map where each key is an output key and each value is the reduced value for that output key
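Because executeAsynchronously() returns a java.util.concurrent.Future, the usual caller-side pattern applies: continue with other work, then call get(), preferably with a timeout. In the sketch below a local ExecutorService stands in for the cluster; AsyncResultDemo, the fixed result map and the five-second timeout are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AsyncResultDemo {

    // Waits for the reduced map, but never longer than the (hypothetical) timeout.
    static Map<String, Integer> awaitResult(Future<Map<String, Integer>> future)
            throws InterruptedException, ExecutionException, TimeoutException {
        return future.get(5, TimeUnit.SECONDS);
    }

    static Map<String, Integer> demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Stand-in for task.executeAsynchronously(): a local job returning a fixed result.
            Callable<Map<String, Integer>> fakeTask = () -> Map.of("be", 3, "to", 2);
            Future<Map<String, Integer>> future = executor.submit(fakeTask);
            // ... the caller is free to do other work while the task runs ...
            return awaitResult(future);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

A bounded get() keeps a slow or stuck task from blocking the caller indefinitely; what to do on timeout (retry, cancel, report) is an application decision.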

execute

public <R> R execute(Collator<KOut,VOut,R> collator)
Executes this task across Infinispan cluster nodes and collates the final result using the specified Collator.

Parameters:
collator - the Collator to use
Returns:
the collated result
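A collator turns the reduced Map<KOut,VOut> into a single value of type R, which execute(collator) then returns. The sketch below assumes a collator exposes one method that takes the reduced map; LocalCollator and TopWordsDemo are hypothetical local stand-ins (the exact contract of Infinispan's Collator should be checked in its own documentation), and the top-N ranking is only an illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class TopWordsDemo {

    // Hypothetical stand-in with the assumed collator shape: reduced map in, one R out.
    interface LocalCollator<KOut, VOut, R> {
        R collate(Map<KOut, VOut> reducedResults);
    }

    // Collates word counts into the n most frequent words; ties are broken alphabetically.
    static LocalCollator<String, Integer, List<String>> topN(int n) {
        return reducedResults -> {
            List<Map.Entry<String, Integer>> entries = new ArrayList<>(reducedResults.entrySet());
            entries.sort(Map.Entry.<String, Integer>comparingByValue().reversed()
                    .thenComparing(Map.Entry.<String, Integer>comparingByKey()));
            List<String> top = new ArrayList<>();
            for (int i = 0; i < Math.min(n, entries.size()); i++) {
                top.add(entries.get(i).getKey());
            }
            return top;
        };
    }
}
```

Applied to the counts be=3, to=2, or=1, not=1, topN(3) yields [be, to, not]. With the real API, an equivalent collator would be passed to execute(collator) so that only the List travels back to the caller instead of the full result map.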

executeAsynchronously

public <R> Future<R> executeAsynchronously(Collator<KOut,VOut,R> collator)
Executes this task asynchronously across Infinispan cluster nodes; the final result is collated using the specified Collator and wrapped in a Future.

Parameters:
collator - the Collator to use
Returns:
a Future wrapping the collated result

groupKeys

protected void groupKeys(Map<KOut,List<VOut>> finalReduced,
                         Map<KOut,VOut> mapReceived)
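groupKeys carries no description. One plausible reading, inferred only from its parameter types, is that it merges one node's reduced map into per-key lists accumulated across nodes, so that each list can later be fed to the Reducer. The sketch below implements that reading under a hypothetical class name; it is not Infinispan's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class GroupKeysSketch {

    // Assumed behaviour: append every value received from one node to the
    // list kept for its key, creating the list on first sight of the key.
    static <KOut, VOut> void groupKeys(Map<KOut, List<VOut>> finalReduced,
                                       Map<KOut, VOut> mapReceived) {
        for (Map.Entry<KOut, VOut> entry : mapReceived.entrySet()) {
            finalReduced.computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
                        .add(entry.getValue());
        }
    }
}
```

Merging {be=2, to=1} and then {be=1} gives be=[2, 1] and to=[1]; a final reduce over each list would then produce be=3 and to=1.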

mapKeysToNodes

protected Map<Address,List<KIn>> mapKeysToNodes()
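mapKeysToNodes is also undocumented; its return type suggests that it partitions the input keys by the node (Address) that should process them. The sketch below illustrates that idea with String node names in place of Address and a simple modulo hash. Both the class name and the hashing are hypothetical simplifications, not the key-ownership logic Infinispan actually applies.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class KeyPartitionSketch {

    // Assign each key to exactly one node; floorMod keeps the index non-negative
    // even for keys with negative hash codes.
    static <K> Map<String, List<K>> mapKeysToNodes(List<String> nodes, List<K> keys) {
        Map<String, List<K>> byNode = new LinkedHashMap<>();
        for (K key : keys) {
            int index = Math.floorMod(key.hashCode(), nodes.size());
            byNode.computeIfAbsent(nodes.get(index), n -> new ArrayList<>()).add(key);
        }
        return byNode;
    }
}
```

With nodes n0, n1, n2 and Integer keys 0 to 5, each key i lands on node i mod 3, so every node receives exactly the keys it would run the map phase on.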

clone

protected Mapper<KIn,VIn,KOut,VOut> clone(Mapper<KIn,VIn,KOut,VOut> mapper)

clone

protected Reducer<KOut,VOut> clone(Reducer<KOut,VOut> reducer)


Copyright © 2012 JBoss, a division of Red Hat. All Rights Reserved.