Chapter 18. 异步和消息

Seam使得异步执行一个来自Web请求的工作变得非常容易。当大多数人在Java EE里考虑异步时,他们想到用JMS。 在Seam中,这确实是一种解决方案,当你有严格和明确定义的QoS服务需求时,这是正确的。Seam利用Seam组件让发送和接收JMS消息更容易进行。

但是对于多数用例来说,用JMS无异于杀鸡用牛刀。Seam将简单的异步方法和事件应用分层,置于你选择的 dispatchers 之上。

18.1. 异步

异步的事件和方法调用与底层的分配机制有着相同的服务期待质量。 基于 ScheduledThreadPoolExecutor 的默认dispatcher执行得很好,但不提供对持久化异步任务的支持,因此不保证一项任务真正会被执行。 如果你在一个支持EJB 3.0的环境中工作,并将下面这一行添加到 components.xml 中:

<async:timer-service-dispatcher/>

那么,你的异步任务将由容器的EJB定时服务处理。 如果你不熟悉Timer服务,也不必担心,如果你想要在Seam中使用异步方法,并不需要与它直接交互。 要了解一件重要的事情:任何好的EJB 3.0实现都将有使用持久化定时器的选择,它为任务最终得到处理提供了一些保证。

另一种选择是使用开源的Quartz库来管理异步的方法。 你要将Quartz库JAR(在 lib 路径中)绑定在你的EAR中,并在 application.xml 中将它声明成一个Java模块。 另外,你还需要将下面的行添加到 components.xml 中来安装Quartz Dispatcher。

<async:quartz-dispatcher/>

Seam的API对于默认的 ScheduledThreadPoolExecutor 的Seam API,及EJB3 Timer与Quartz Scheduler 大体相同。 它们可以只是通过在 components.xml 中添加一行来进行”即插即用(plug and play)“。

18.1.1. 异步方法

最简单的形式,一个异步的调用只是异步地处理来自访问者的方法调用(在不同的线程中)。 当我们要返回一个即时响应给客户端时,通常使用一个异步调用,并让一些费时的工作在后台处理。 此模式在使用AJAX的应用程序中运行良好,在AJAX应用中客户端能够自动地从服务器上获得工作结果。

对于EJB组件,我们在本地接口上进行注解,来指定某个方法要被异步地处理。

@Local
public interface PaymentHandler
{
    @Asynchronous
    public void processPayment(Payment payment);
}

(对于JavaBean组件,如果喜欢的话,我们可以注解组件实现类。)

异步的使用对于Bean类来说是透明的:

@Stateless
@Name("paymentHandler")
public class PaymentHandlerBean implements PaymentHandler
{
    public void processPayment(Payment payment)
    {
        //do some work!
    }
}

并且对客户端也是透明的:

@Stateful
@Name("paymentAction")
public class CreatePaymentAction
{
    @In(create=true) PaymentHandler paymentHandler;
    @In Bill bill;

    public String pay()
    {
        paymentHandler.processPayment( new Payment(bill) );
        return "success";
    }
}

异步方法在一个全新的事件上下文中处理,而且无法访问调用者的会话或对话上下文状态。 然而,业务流程上下文 得到了 传播。

异步方法调用可以利用 @Duration@Expiration@IntervalDuration注解为后续的执行定时。

@Local
public interface PaymentHandler
{
    @Asynchronous
    public void processScheduledPayment(Payment payment, @Expiration Date date);

    @Asynchronous
    public void processRecurringPayment(Payment payment, @Expiration Date date, @IntervalDuration Long interval)'
}
@Stateful
@Name("paymentAction")
public class CreatePaymentAction
{
    @In(create=true) PaymentHandler paymentHandler;
    @In Bill bill;

    public String schedulePayment()
    {
        paymentHandler.processScheduledPayment( new Payment(bill), bill.getDueDate() );
        return "success";
    }

    public String scheduleRecurringPayment()
    {
        paymentHandler.processRecurringPayment( new Payment(bill), bill.getDueDate(), ONE_MONTH );
        return "success";
    }
}

客户端和服务端两者都可以访问与调用相关联的 Timer 对象。当使用EJB3 Dispatcher时, The Timer 对象会显示在下面。对于默认的ScheduledThreadPoolExecutor,返回的是JDK的对象Future。对于Quartz Dispatcher,返回QuartzTriggerHandle,我们会在下部分对此进行讨论。

@Local
public interface PaymentHandler
{
    @Asynchronous
    public Timer processScheduledPayment(Payment payment, @Expiration Date date);
}
@Stateless
@Name("paymentHandler")
public class PaymentHandlerBean implements PaymentHandler
{
    @In Timer timer;

    public Timer processScheduledPayment(Payment payment, @Expiration Date date)
    {
        //do some work!

        return timer; // 注意返回值被完全忽略
    }

}
@Stateful
@Name("paymentAction")
public class CreatePaymentAction
{
    @In(create=true) PaymentHandler paymentHandler;
    @In Bill bill;

    public String schedulePayment()
    {
        Timer timer = paymentHandler.processScheduledPayment( new Payment(bill), bill.getDueDate() );
        return "success";
    }
}

异步方法不能返回任何其它值给调用者。

18.1.2. 包含Quartz Dispatcher的异步方法

Quartz dispatcher(它的安装方法请见前文)允许你使用 @Asynchronous@Duration@Expiration@IntervalDuration 注解。但它还有一些其他的强大功能。Quartz dispatcher还支持三种新注解。

@FinalExpiration 注解指定一个重现任务的终止日期。

    // Defines the method in the "processor" component
    @Asynchronous
    public QuartzTriggerHandle schedulePayment(@Expiration Date when,
                                 @IntervalDuration Long interval,
                                 @FinalExpiration Date endDate,
                                 Payment payment)
    {
        // do the repeating or long running task until endDate
    }

    ... ...

    // Schedule the task in the business logic processing code
    // Starts now, repeats every hour, and ends on May 10th, 2010
    Calendar cal = Calendar.getInstance ();
    cal.set (2010, Calendar.MAY, 10);
    processor.schedulePayment(new Date(), 60*60*1000, cal.getTime(), payment);

注意该方法返回 QuartzTriggerHandle 对象,你以后可以用它来中止、暂停和恢复定时器。 QuartzTriggerHandle 对象是可序列化的,因此,如果你需要保留更久一点,可以把它存到数据库中。

QuartzTriggerHandle handle =
         processor.schedulePayment(payment.getPaymentDate(),
                                   payment.getPaymentCron(),
                                   payment);
        payment.setQuartzTriggerHandle( handle );
        // Save payment to DB

        // later ...

        // Retrieve payment from DB
        // Cancel the remaining scheduled tasks
        payment.getQuartzTriggerHandle().cancel();

@IntervalCron 注解支持Unix cron语法的任务调度。例如,下面的异步方法在三月份每周三的2:10pm和2:44pm运行。

    // Define the method
    @Asynchronous
    public QuartzTriggerHandle schedulePayment(@Expiration Date when,
                                 @IntervalCron String cron,
                                 Payment payment)
    {
        // do the repeating or long running task
    }

    ... ...

    // Schedule the task in the business logic processing code
    QuartzTriggerHandle handle =
      processor.schedulePayment(new Date(), "0 10,44 14 ? 3 WED", payment);

@IntervalBusinessDay 注解支持在”第n个Business Day“调用。 例如,下面的异步方法在每个月的第2个business day的14:00运行。 默认时,它从business day中排除了2010年之前的所有周末和米国联邦假期。

    // Define the method
    @Asynchronous
    public QuartzTriggerHandle schedulePayment(@Expiration Date when,
                                 @IntervalBusinessDay NthBusinessDay nth,
                                 Payment payment)
    {
        // do the repeating or long running task
    }

    ... ...

    // Schedule the task in the business logic processing code
    QuartzTriggerHandle handle =
      processor.schedulePayment(new Date(),
          new NthBusinessDay(2, "14:00", WEEKLY), payment);

NthBusinessDay 对象包含调用触发器的配置。 你可以通过 additionalHolidays 属性指定更多的假期(例如,公司假期、非美国的假期等等。)

public class NthBusinessDay implements Serializable
{
      int n;
      String fireAtTime;
      List <Date> additionalHolidays;
      BusinessDayIntervalType interval;
      boolean excludeWeekends;
      boolean excludeUsFederalHolidays;

      public enum BusinessDayIntervalType { WEEKLY, MONTHLY, YEARLY }

      public NthBusinessDay ()
      {
        n = 1;
        fireAtTime = "12:00";
        additionalHolidays = new ArrayList <Date> ();
        interval = BusinessDayIntervalType.WEEKLY;
        excludeWeekends = true;
        excludeUsFederalHolidays = true;
      }
      ... ...
}

@IntervalDuration@IntervalCron@IntervalNthBusinessDay 注解相互排斥。 如果把它们用在同一个方法中,就会抛出 RuntimeException

18.1.3. 异步事件

组件驱动的事件也可以是异步的。 为了给异步处理提出事件,只要调用 Events 类的 raiseAsynchronousEvent() 方法就可以了。 要安排一个定时的事件,要调用 raiseTimedEvent() 的一个方法,并传递一个 schedule 对象 (对于默认的dispatcher或者定时服务dispatcher,要使用 TimerSchedule)。 组件可以用正常方式观察异步事件,但是要记住,只有业务处理上下文才被传播到异步线程上。

18.2. Seam中的消息

Seam让JMS消息发送到Seam组件和从Seam组件接收变得很容易。

18.2.1. 配置

为了给发送JMS消息配置Seam的基础结构,你需要告诉Seam关于任何你想发送消息到的主题(Topic)和队列(Queue),并且也要告诉Seam到哪里寻找 QueueConnectionFactory 和/或 TopicConnectionFactory

Seam默认使用 UIL2ConnectionFactory,它是使用JBossMQ时常用的连接工厂。 如果你正使用其他的JMS提供者,就需要在 seam.propertiesweb.xmlcomponents.xml 文件中设置一个或两个 queueConnection.queueConnectionFactoryJndiNametopicConnection.topicConnectionFactoryJndiName

你也需要在 components.xml 文件中列出主题(Topic)和队列(Queue),来安装Seam受控的 TopicPublisherQueueSender

<jms:managed-topic-publisher name="stockTickerPublisher" auto-create="true" topic-jndi-name="topic/stockTickerTopic"/>

<jms:managed-queue-sender name="paymentQueueSender" auto-create="true" queue-jndi-name="queue/paymentQueue"/>

18.2.2. 发送消息

现在,你可以注入一个JMS TopicPublisherTopicSession 到任何组件里:

@In
private TopicPublisher stockTickerPublisher;
@In
private TopicSession topicSession;

public void publish(StockPrice price) {
      try
      {
         stockTickerPublisher.publish( topicSession.createObjectMessage(price) );
      }
      catch (Exception ex)
      {
         throw new RuntimeException(ex);
      }
}

或用来同Queue一起使用

@In
private QueueSender paymentQueueSender;
@In
private QueueSession queueSession;

public void publish(Payment payment) {
      try
      {
         paymentQueueSender.send( queueSession.createObjectMessage(payment) );
      }
      catch (Exception ex)
      {
         throw new RuntimeException(ex);
      }
}

18.2.3. 利用消息驱动Bean接收消息

你可以利用任何EJB3消息驱动Bean来处理消息。 消息驱动Bean甚至可以是Seam组件,在这种情况下,它可能注入其他事件和应用程序作用域的Seam组件。

18.2.4. 在客户端接收消息

Seam Remoting允许你在客户端的JavaScript代码中订阅JMS主题(Topic)。这个在下一章里讲述。