JBoss.orgCommunity Documentation

Chapter 5. Analyzing Events

5.1. Configuring an Event Processor Network
5.1.1. Defining the Network
5.1.2. Registering the Network
5.1.3. Supporting Multiple Versions
5.2. Event Processors
5.2.1. Drools Event Processor
5.2.2. JPA Event Processor
5.2.3. Mail Event Processor
5.2.4. MVEL Event Processor
5.2.5. Supporting Services
5.3. Predicates
5.3.1. MVEL Predicate

An Event Processor Network is a mechanism for processing a stream of events through a network of linked nodes established to perform specific filtering, transformation and/or analysis tasks.

The network can be defined as an object model or specified as a JSON representation for packaging in a suitable form, and subsequently de-serialized when deployed to the runtime governance server.

The following is an example of the JSON representation of an Event Processor Network. This particular example defines the "out of the box" EPN installed with the distribution:

{
  "name" : "Overlord-RTGov-EPN",
  "version" : "1.0.0.Final",
  "subscriptions" : [ {
    "nodeName" : "SOAEvents",
    "subject" : "ActivityUnits"
  },
  {
    "nodeName" : "ServiceDefinitions",
    "subject" : "ActivityUnits"
  },
  {
    "nodeName" : "SituationsStore",
    "subject" : "Situations"
  } ],
  "nodes" : [
    {
      "name" : "SOAEvents",
      "sourceNodes" : [ ],
      "destinationSubjects" : [ "SOAEvents" ],
      "maxRetries" : 3,
      "retryInterval" : 0,
      "eventProcessor" : {
        "@class" : "org.overlord.rtgov.content.epn.SOAActivityTypeEventSplitter"
      },
      "predicate" : null,
      "notifications" : [ ]
    },{
      "name" : "ServiceDefinitions",
      "sourceNodes" : [ ],
      "destinationSubjects" : [ ],
      "maxRetries" : 3,
      "retryInterval" : 0,
      "eventProcessor" : {
        "@class" : "org.overlord.rtgov.content.epn.ServiceDefinitionProcessor"
      },
      "predicate" : null,
      "notifications" : [ {
      	"type" : "Results",
      	"subject" : "ServiceDefinitions"
      } ]
    },{
      "name" : "ServiceResponseTimes",
      "sourceNodes" : [ "ServiceDefinitions" ],
      "destinationSubjects" : [ "ServiceResponseTimes" ],
      "maxRetries" : 3,
      "retryInterval" : 0,
      "eventProcessor" : {
        "@class" : "org.overlord.rtgov.content.epn.ServiceResponseTimeProcessor"
      },
      "predicate" : null,
      "notifications" : [ {
      	"type" : "Results",
      	"subject" : "ServiceResponseTimes"
      } ]
    },{
      "name" : "SituationsStore",
      "maxRetries" : 3,
      "retryInterval" : 0,
      "eventProcessor" : {
        "@class" : "org.overlord.rtgov.ep.jpa.JPAEventProcessor",
        "entityManager" : "overlord-rtgov-epn-non-jta"
      }
    }
  ]
}

Another example of a network, used within one of the quickstarts is:

{
  "name" : "AssessCreditPolicyEPN",
  "version" : "${project.version}",
  "subscriptions" : [ {
    "nodeName" : "AssessCredit",
    "subject" : "SOAEvents"
  } ],
  "nodes" : [
    {
      "name" : "AssessCredit",
      "sourceNodes" : [ ],
      "destinationSubjects" : [ ],
      "maxRetries" : 3,
      "retryInterval" : 0,
      "predicate" : {
        "@class" : "org.overlord.rtgov.ep.mvel.MVELPredicate",
        "expression" : "event.serviceProvider && !event.request && event.serviceType == \"{urn:switchyard-quickstart-demo:orders:0.1.0}OrderService\""
      },
      "eventProcessor" : {
        "@class" : "org.overlord.rtgov.ep.mvel.MVELEventProcessor",
        "script" : "AssessCredit.mvel",
        "services" : {
          "CacheManager" : {
            "@class" : "org.overlord.rtgov.common.infinispan.service.InfinispanCacheManager"
          }
        },
        "parameters" : {
        	"creditLimit" : 150
        }
      }
    }
  ]
}

This example illustrates the configuration of a service associate with the event processor, as well as a predicate.

The top level elements of this descriptior are:

FieldDescription

name

The name of the network.

subscriptions

The list of subscriptions associated with the network, discussed below.

nodes

The nodes that form the connected graph within the network, discussed below.

version

The version of the network. Versions can be expressed using three schemes:

Numeric - i.e. simply define the version as a number

Dot Format - i.e. 1.5.1.Final Any alpha, numeric and symbols

When comparing versions, for example when determining whether a newly deployed EPN has a higher version than an existing network with the same name, then initially the versions will be compared as numeric values. If either are not numeric, then they will be compared using dot format, with each field being compared first as numeric values, and if not based on lexical comparison. If both fields don’t have a dot, then they will just be compared lexically.

This element is used to define a particular node in the graph that forms the network, and has the following fields:

FieldDescription

name

The name of the node.

sourceNodes

A list of node names that represent the source nodes, within the same network, that this node receives its events from. Therefore, if this list is empty, it means that the node is a root node and should be the target of a subscription.

destinationSubjects

A list of inter-EPN subjects to publish any resulting events to. Note: these subjects are only of relevance to other networks.

maxRetries

The maximum number of times an event should be retried, following a failure, before giving up on the event.

retryInterval

The delay that should occur between retry attempts - may only be supported in some environments.

eventProcessor

Defines the details for the event processor implementation being used. At a minimum, the value for this field should define a @class property to specify the Java class name for the event process implementation to use. Other general fields that can be configured are, the map of services and the map of parameters that can be used by the event processor. Depending upon which implementation is selected, the other fields within the value will apply to the event processor implementation.

predicate

This field is optional, but if specified will define a predicate implementation. As with the event processor, it must at a minimum define a @class field that specifies the Java class name for the implementation, with any additional fields be used to initialize the predicate implementation.

notifications

A list of notifications. A notification entry will define its type (explained below) and the notification subject upon which the information should be published. Unlike the destinationSubjects described above, which are subjects for inter-EPN communication, these notification subjects are the mechanism for distribution information out of the EPN capability, for presentation to end-users through various means.

Notify Types

The notify types field defines what type of notifications should be emitted from a node when processing an event. The notifications are the mechanism used by potentially interested applications to observe what information each node is processing, and the results they produce.

The possible values for this field are:

FieldDescription

Processed

This type indicates that a notification should be created when an event is considered suitable for processing by the node. An event is suitable either if no predicate is defined, or if the predicate indicates the event is valid.

Results

This type indicates that a notification should be created for any information produced as the result of the event processor processing the event.

Tip

Notifications are the mechanism for making information processed by the Event Processor Network accessible by interested parties. If a notity type(s) is not defined for a node, then it will only be used for internal processing, potentially supplying the processed event to other nodes in the network (or other networks if destination subject(s) are specified).

The Event Processor Network is deployed within the JEE container as a WAR file with the following structure:

warfile
|
|-META-INF
|    |- beans.xml
|
|-WEB-INF
|    |-classes
|    |    |-epn.json
|    |    |-<custom classes/resources>
|    |
|    |-lib
|       |-epn-loader-jee.jar
|       |-<additional libraries>

The epn.json file contains the JSON representation of the EPN configuration.

The epn-loader-jee.jar acts as a bootstrapper to load and register the Event Processor Network.

If custom predicates and/or event processors are defined, then the associated classes and resources can be defined in the WEB-INF/classes folder or within additional libraries located in the WEB-INF/lib folder.

A maven pom.xml that will create this structure is:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  	<modelVersion>4.0.0</modelVersion>
	<groupId>....</groupId>
	<artifactId>....</artifactId>
	<version>....</version>
	<packaging>war</packaging>
	<name>....</name>

	<properties>
		<rtgov.version>....</rtgov.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.overlord.rtgov.event-processor-network</groupId>
			<artifactId>epn-core</artifactId>
			<version>${rtgov.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.overlord.rtgov.event-processor-network</groupId>
			<artifactId>epn-loader-jee</artifactId>
			<version>${rtgov.version}</version>
		</dependency>
		....
	</dependencies>

</project>

If deploying in JBoss Application Server, then the following fragment also needs to be included, to define the dependency on the core Overlord Runtime Governance modules:

.....
	<build>
		<finalName>slamonitor-epn</finalName>
		<resources>
			<resource>
				<directory>src/main/resources</directory>
				<filtering>true</filtering>
			</resource>
		</resources>
		<plugins>
			<plugin>
				<artifactId>maven-war-plugin</artifactId>
				<configuration>
					<failOnMissingWebXml>false</failOnMissingWebXml>
 					<archive>
						<manifestEntries>
							<Dependencies>deployment.overlord-rtgov.war</Dependencies>
						</manifestEntries>
					</archive>
				</configuration>
			</plugin>
		</plugins>
	</build>
	.....

The Event Processor Network is deployed within the OSGi container as a JAR file with the following structure:

jarfile
|
|-META-INF
|    |- MANIFEST.MF
|
|-epn.json
|-epn-loader-osgi.jar
|-<custom classes/resources>
|-<additional libraries>

The MANIFEST.MF file is important, as it contains the OSGi metadata required for the container to understand the contents and imported packages.

The epn.json file contains the JSON representation of the EPN configuration.

The epn-loader-osgi.jar acts as a bootstrapper to load and register the Event Processor Network.

If custom predicates and/or event processors are defined, then the associated classes, resources and additional libraries can be located in the top level folder.

A maven pom.xml that will create this structure is:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  	<modelVersion>4.0.0</modelVersion>
	<groupId>....</groupId>
	<artifactId>....</artifactId>
	<version>....</version>
	<packaging>war</packaging>
	<name>....</name>

	<properties>
		<rtgov.version>....</rtgov.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.overlord.rtgov.event-processor-network</groupId>
			<artifactId>epn-core</artifactId>
			<version>${rtgov.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.overlord.rtgov.event-processor-network</groupId>
			<artifactId>epn-loader-osgi</artifactId>
			<version>${rtgov.version}</version>
		</dependency>
		....
	</dependencies>

	<build>
		<finalName>....</finalName>
		<resources>
			<resource>
				<directory>src/main/resources</directory>
				<filtering>true</filtering>
			</resource>
		</resources>
		<plugins>
			<plugin>
				<groupId>org.apache.felix</groupId>
				<artifactId>maven-bundle-plugin</artifactId>
 				<extensions>true</extensions>
				<configuration>
					<instructions>
						<Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
						<Bundle-Version>${project.version}</Bundle-Version>
						<Bundle-Activator>org.overlord.rtgov.epn.loader.osgi.EPNActivator</Bundle-Activator>
						<Import-Package>
							!javax.inject.*,!javax.enterprise.*,!javax.persistence.*,
                            ....,
							*
						</Import-Package>
						<Embed-Dependency>*;scope=compile|runtime</Embed-Dependency>
					</instructions>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

As previously mentioned, all EventProcessor implementations can define the following information:

FieldDescription

services

The optional map of names to services. The current service types are listed at the bottom of this section.

parameters

The optional map of names to parameters. These parameters can be used to customize the behaviour of an event processor.

asynchronous

This optional and experimental boolean flag enables an event processor to produce its results asynchronously. This has been added to support CEP, and currently means any results are processed as individual events which may be less efficient.

Although custom event processors can be defined, there are some "out of the box" implementations. These are discussed in the following sub-sections.

The Drools Event Processor implementation (org.overlord.rtgov.ep.drools.DroolsEventProcessor) enables events to be processed by a Complex Event Processing (CEP) rule. This implementation defines the following additional fields:

FieldDescription

ruleName

The name of the rule, used to locate the rule definition in a file called "<ruleName>.drl".

eventProcessingMode

This optional field identifies the event processing mode. Valid values are cloud (default) and stream. If stream is chosen, then you will also need to set the asynchronous property to true.

clockType

The optional clock type. Valid values are realtime (default) and pseudo.

An example of such a rule is:

import org.overlord.rtgov.activity.model.soa.RequestReceived
import org.overlord.rtgov.activity.model.soa.ResponseSent

global org.overlord.rtgov.ep.EPContext epc

declare RequestReceived
    @role( event )
    @timestamp( timestamp )
    @expires( 2m20s )
end

declare ResponseSent
    @role( event )
    @timestamp( timestamp )
    @expires( 2m20s )
end

rule "correlate request and response"
when
    $req : RequestReceived( $id : messageId ) from entry-point "Purchasing"
    $resp : ResponseSent( replyToId == $id, this after[0,2m20s] $req )  from entry-point "Purchasing"
then

    epc.logInfo("REQUEST: "+$req+" RESPONSE: "+$resp);

    java.util.Properties props=new java.util.Properties();
    props.put("requestId", $req.getMessageId());
    props.put("responseId", $resp.getMessageId());

    long responseTime=$resp.getTimestamp()-$req.getTimestamp();

    epc.logDebug("CORRELATION on id '"+$id+"' response time "+responseTime);

    props.put("responseTime", responseTime);

    epc.handle(props);

end

This is an example of a rule used to correlate request and response events. When a correlation is found, then a ResponseTime object is created and "forwarded" to the Event Processor Network for further processing using the handle method.

The source of the events into the rule are named entry points, where the name relates to the source node or subject that supplies the events.

The rule has access to external capabilities through the EPContext, which is defined in the statements:

global org.overlord.rtgov.ep.EPContext epc

This component is used at the end of the above example to handle the result of the event processing (i.e. to forward a derived event back into the network).

The rule can also access parameters using the getParameter(name) method on the context. See the javadoc for the org.overlord.rtgov.ep.EPContext interface for more information.

If an error occurs, that requires the event to be retried (within the Event Processor Network), or the business transaction blocked (when used as a synchronous policy), then the rule can either throw an exception or return the exception as the result using the handle() method.

Caution

Temporal rules do not currently work in a clustered environment. This is because correlation between events occurs in working memory, which is not shared across servers. Therefore for the correlation to work, all relevant events must be received by a single server.

Although custom predicates can be defined, there are some "out of the box" implementations: