org.jboss.soa.esb.actions
Class Aggregator

java.lang.Object
  extended by org.jboss.soa.esb.actions.AbstractActionLifecycle
      extended by org.jboss.soa.esb.actions.AbstractActionPipelineProcessor
          extended by org.jboss.soa.esb.actions.Aggregator
All Implemented Interfaces:
ActionLifecycle, ActionPipelineProcessor

public class Aggregator
extends AbstractActionPipelineProcessor

Simple aggregator. The aggregator relies on aggregator tags ('aggregatorTags') to piece the individual messages of a series back together. The aggregator tag is set in the MessageRouter.deliverAsync() method. The aggregator adds each collected message in the series as an attachment to a new message. When all messages have been received, or when the timeout expires, the aggregated message is returned. In all other cases null is returned.

Future enhancements should be:
- all sorts of wait-for/timeout algorithms.
- persisting the map so that no messages get lost.
- making the map manageable.
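The collect-until-complete behaviour described above can be illustrated with a minimal, self-contained sketch. It is not the JBoss ESB implementation: the class AggregationSketch, the collect method, and its parameters (seriesId, sequence, total) are hypothetical names chosen for illustration. Plain strings stand in for Message objects, and the timeout path is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the aggregation bookkeeping; not the JBoss ESB code. */
public class AggregationSketch {
    // seriesId -> (sequence number -> payload); assumed to resemble the
    // nested map shape returned by getAggregatedMessageMap().
    private final Map<String, Map<Integer, String>> pending = new HashMap<>();

    /**
     * Collects one part of a series. Returns all parts in sequence order once
     * the expected number of parts has arrived, otherwise null.
     */
    public List<String> collect(String seriesId, int sequence, int total, String payload) {
        Map<Integer, String> parts = pending.computeIfAbsent(seriesId, k -> new TreeMap<>());
        parts.put(sequence, payload);
        if (parts.size() < total) {
            return null; // series still incomplete
        }
        pending.remove(seriesId); // series complete: release the bookkeeping entry
        return new ArrayList<>(parts.values());
    }

    public static void main(String[] args) {
        AggregationSketch sketch = new AggregationSketch();
        // Parts may arrive out of order; the first call returns null.
        System.out.println(sketch.collect("series-1", 2, 2, "second"));
        // The second call completes the series and returns both parts in order.
        System.out.println(sketch.collect("series-1", 1, 2, "first"));
    }
}
```

Keying the inner map by sequence number lets the parts be returned in their original order regardless of arrival order.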

Author:
kurt.stam@redhat.com

Field Summary
static java.lang.String AGGEGRATOR_TAG
           
protected  ConfigTree config
           
static java.lang.String SPLITTER_TIME_STAMP
           
 
Fields inherited from interface org.jboss.soa.esb.actions.ActionPipelineProcessor
PROCESS_EXCEPTION_METHOD, PROCESS_METHOD, PROCESS_SUCCESS_METHOD
 
Constructor Summary
Aggregator(ConfigTree config)
           
 
Method Summary
 Message createAggregateMessage(Message message)
          Aggregates a single message into an aggregated message.
 Message createAggregateMessage(java.lang.String uuId, java.util.Map<java.lang.Integer,Message> messageMap)
          Aggregates the messages into one new message with an attachment for each message.
static void decorate(Message message)
           
 void destroy()
          Destroy the action instance.
 java.util.Map<java.lang.String,java.util.Map<java.lang.Integer,Message>> getAggregatedMessageMap()
           
static AggregationDetails getAggregatorDetails(Message message, int tagIndex)
           
static java.util.List<java.lang.String> getAggregatorTags(Message message)
           
 void initialise()
          Initialise the action instance.
 Message process(Message message)
          Processes an incoming message, aggregates messages in a set and returns an aggregated message when isComplete() is satisfied.
static void setAggregatorTags(Message message, java.util.List<java.lang.String> tags)
           
 
Methods inherited from class org.jboss.soa.esb.actions.AbstractActionPipelineProcessor
processException, processSuccess
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

AGGEGRATOR_TAG

public static final java.lang.String AGGEGRATOR_TAG
See Also:
Constant Field Values

SPLITTER_TIME_STAMP

public static final java.lang.String SPLITTER_TIME_STAMP
See Also:
Constant Field Values

config

protected ConfigTree config
Constructor Detail

Aggregator

public Aggregator(ConfigTree config)
           throws ConfigurationException,
                  RegistryException
Throws:
ConfigurationException
RegistryException
Method Detail

initialise

public void initialise()
                throws ActionLifecycleException
Initialise the action instance.

This method is called after the action instance has been instantiated so that configuration options can be validated.

Specified by:
initialise in interface ActionLifecycle
Overrides:
initialise in class AbstractActionLifecycle
Throws:
ActionLifecycleException - for errors during initialisation.

getAggregatedMessageMap

public java.util.Map<java.lang.String,java.util.Map<java.lang.Integer,Message>> getAggregatedMessageMap()

destroy

public void destroy()
             throws ActionLifecycleException
Destroy the action instance.

This method is called prior to the release of the action instance. All resources associated with this action instance should be released as the instance will no longer be used.

Specified by:
destroy in interface ActionLifecycle
Overrides:
destroy in class AbstractActionLifecycle
Throws:
ActionLifecycleException

process

public Message process(Message message)
                throws ActionProcessingException
Processes an incoming message, aggregates messages in a set and returns an aggregated message when isComplete() is satisfied. The aggregated messages are set as attachments to this message. Next, the message can be sent to a transformer to do the second part of the aggregation, which is to convert the attached messages into one message.

Parameters:
message - the incoming message.
Returns:
an aggregated message, or null if the aggregation has not been completed.
Throws:
ActionProcessingException
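
The second stage mentioned in the description, where a transformer converts the attached messages into one message, might look roughly like the sketch below. This is an illustration under stated assumptions rather than an ESB transformer: AttachmentMergeSketch and merge are hypothetical names, plain strings stand in for the attached messages, and joining with newlines is an arbitrary choice of merge strategy.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

/** Hypothetical second-stage transformer; strings stand in for attached messages. */
public class AttachmentMergeSketch {
    /** Joins the attached payloads, in attachment order, into one body. */
    public static String merge(List<String> attachments) {
        StringJoiner body = new StringJoiner("\n");
        for (String attachment : attachments) {
            body.add(attachment);
        }
        return body.toString();
    }

    public static void main(String[] args) {
        // Two attachments collected by the aggregator become one combined body.
        System.out.println(merge(Arrays.asList("order-line-1", "order-line-2")));
    }
}
```

Because process returns null until the series is complete, a transformer of this kind would only see messages whose attachments are already collected.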

getAggregatorTags

public static java.util.List<java.lang.String> getAggregatorTags(Message message)

setAggregatorTags

public static void setAggregatorTags(Message message,
                                     java.util.List<java.lang.String> tags)

getAggregatorDetails

public static AggregationDetails getAggregatorDetails(Message message,
                                                      int tagIndex)
                                               throws ActionProcessingException
Throws:
ActionProcessingException

decorate

public static void decorate(Message message)

createAggregateMessage

public Message createAggregateMessage(Message message)
                               throws ActionProcessingException
Aggregates a single message into an aggregated message.

This method is called for messages that are received without aggregation tags.

Parameters:
message - the message received without aggregation tags.
Returns:
the aggregated message
Throws:
ActionProcessingException

createAggregateMessage

public Message createAggregateMessage(java.lang.String uuId,
                                      java.util.Map<java.lang.Integer,Message> messageMap)
                               throws ActionProcessingException
Aggregates the messages into one new message with an attachment for each message.

Parameters:
uuId -
messageMap -
Returns:
the aggregated message
Throws:
ActionProcessingException