JBoss.orgCommunity Documentation

Chapter 41. Reactive programming support

41.1. CompletionStage
41.2. CompletionStage in JAX-RS
41.3. Beyond CompletionStage
41.4. Pluggable reactive types: RxJava 2 in RESTEasy
41.5. Proxies
41.6. Adding extensions

With version 2.1, the JAX-RS specification (https://jcp.org/en/jsr/detail?id=370) takes its first steps into the world of Reactive Programming. There are many discussions of reactive programming on the internet, and a general introduction is beyond the scope of this document, but there are a few things worth discussing. Some primary aspects of reactive programming are the following:

In java 1.8 and JAX-RS 2.1, the support for reactive programming is fairly limited. Java 1.8 introduces the interface java.util.concurrent.CompletionStage, and JAX-RS 2.1 mandates support for the javax.ws.rs.client.CompletionStageRxInvoker, which allows a client to obtain a response in the form of a CompletionStage.

One implementation of CompletionStage is the java.util.concurrent.CompleteableFuture. For example:

@Test
public void testCompletionStage() throws Exception {
   CompletionStage<String> stage = getCompletionStage();
   log.info("result: " + stage.toCompletableFuture().get());
}

private CompletionStage<String> getCompletionStage() {
   CompletableFuture<String> future = new CompletableFuture<String>();
   future.complete("foo");
   return future;
}

Here, a CompleteableFuture is created with the value "foo", and its value is extracted by the method CompletableFuture.get(). That's fine, but consider the altered version:

@Test
public void testCompletionStageAsync() throws Exception {
   log.info("start");
   CompletionStage<String> stage = getCompletionStageAsync();
   String result = stage.toCompletableFuture().get();
   log.info("do some work");
   log.info("result: " + result);
}

private CompletionStage<String> getCompletionStageAsync() {
   CompletableFuture<String> future = new CompletableFuture<String>();
   Executors.newCachedThreadPool().submit(() -> {sleep(2000); future.complete("foo");});
   return future;
}

private void sleep(long l) {
   try {
      Thread.sleep(l);
   } catch (InterruptedException e) {
      e.printStackTrace();
   }
}

with output something like:

3:10:51 PM INFO: start
3:10:53 PM INFO: do some work
3:10:53 PM INFO: result: foo

It also works, but it illustrates the fact that CompletableFuture.get() is a blocking call. The CompletionStage is constructed and returned immediately, but the value isn't returned for two seconds. A version that is more in the spirit of the reactive style is:

@Test
public void testCompletionStageAsyncAccept() throws Exception {
   log.info("start");
   CompletionStage<String> stage = getCompletionStageAsync();
   stage.thenAccept((String s) -> log.info("s: " + s));
   log.info("do some work");
   ...
}

In this case, the lambda (String s) -> log.info("s: " + s) is registered with the CompletionStage as a "subscriber", and, when the CompletionStage eventually has a value, that value is passed to the lambda. Note that the output is something like

3:23:05 INFO: start
3:23:05 INFO: do some work
3:23:07 INFO: s: foo

Executing CompletionStages asynchronously is so common that there are several supporting convenience methods. For example:

@Test
public void testCompletionStageSupplyAsync() throws Exception {
   CompletionStage<String> stage = getCompletionStageSupplyAsync();;
   stage.thenAccept((String s) -> log.info("s: " + s));
}

private CompletionStage<String> getCompletionStageSupplyAsync() {
   return CompletableFuture.supplyAsync(() -> "foo");
}

The static method ComputableFuture.supplyAsync() creates a ComputableFuture, the value of which is supplied asynchronously by the lambda () -> "foo", running, by default, in the default pool of java.util.concurrent.ForkJoinPool.

One final example illustrates a more complex computational structure:

@Test
public void testCompletionStageComplex() throws Exception {
   ExecutorService executor = Executors.newCachedThreadPool();
   CompletionStage<String> stage1 = getCompletionStageSupplyAsync1("foo", executor);
   CompletionStage<String> stage2 = getCompletionStageSupplyAsync1("bar", executor);
   CompletionStage<String> stage3 = stage1.thenCombineAsync(stage2, (String s, String t) -> s + t, executor);
   stage3.thenAccept((String s) -> log.info("s: " + s));
}

private CompletionStage<String> getCompletionStageSupplyAsync1(String s, ExecutorService executor) {
   return CompletableFuture.supplyAsync(() -> s, executor);
}

stage1 returns "foo", stage2 returns "bar", and stage3, which runs when both stage1 and stage2 have completed, returns the concatenation of "foo" and "bar". Note that, in this example, an explict ExecutorService is provided for asynchronous processing.

On the client side, the JAX-RS 2.1 specification mandates an implementation of the interface javax.ws.rs.client.CompletionStageRxInvoker:

public interface CompletionStageRxInvoker extends RxInvoker<CompletionStage> {

    @Override
    public CompletionStage<Response> get();

    @Override
    public <T> CompletionStage<T> get(Class<T> responseType);

    @Override
    public <T> CompletionStage<T> get(GenericType<T> responseType);
    ...

That is, there are invocation methods for the standard HTTP verbs, just as in the standard javax.ws.rs.client.SyncInvoker. A CompletionStageRxInvoker is obtained by calling rx() on a javax.ws.rs.client.Invocation.Builder, which extends SyncInvoker. For example,

Invocation.Builder builder = client.target(generateURL("/get/string")).request();
CompletionStageRxInvoker invoker = builder.rx(CompletionStageRxInvoker.class);
CompletionStage<Response> stage = invoker.get();
Response response = stage.toCompletableFuture().get();
log.info("result: " + response.readEntity(String.class));

or

CompletionStageRxInvoker invoker = client.target(generateURL("/get/string")).request().rx(CompletionStageRxInvoker.class);
CompletionStage<String> stage = invoker.get(String.class);
String s = stage.toCompletableFuture().get();
log.info("result: " + s);

On the server side, the JAX-RS 2.1 specification requires support for resource methods with return type CompletionStage<T>. For example,

@GET
@Path("get/async")
public CompletionStage<String> longRunningOpAsync() {
   CompletableFuture<String> cs = new CompletableFuture<>();
   executor.submit(
      new Runnable() {
         public void run() {
            executeLongRunningOp();
            cs.complete("Hello async world!");
         }
      });
   return cs;
}

The way to think about longRunningOpAsync() is that it is asynchronously creating and returning a String. After cs.complete() is called, the server will return the String "Hello async world!" to the client.

An important thing to understand is that the decision to produce a result asynchronously on the server and the decision to retrieve the result asynchronously on the client are independent. Suppose that there is also a resource method

@GET
@Path("get/sync")
public String longRunningOpSync() {
   return "Hello async world!";
}

Then all three of the following invocations are valid:

public void testGetStringAsyncAsync() throws Exception {
   CompletionStageRxInvoker invoker = client.target(generateURL("/get/async")).request().rx();
   CompletionStage<String> stage = invoker.get(String.class);
   log.info("s: " + stage.toCompletableFuture().get());
}
public void testGetStringSyncAsync() throws Exception {
   Builder request = client.target(generateURL("/get/async")).request();
   String s = request.get(String.class);
   log.info("s: " + s);
}

and

public void testGetStringAsyncSync() throws Exception {
   CompletionStageRxInvoker invoker = client.target(generateURL("/get/sync")).request().rx();
   CompletionStage<String> stage = invoker.get(String.class);
   log.info("s: " + stage.toCompletableFuture().get());
}

Note

Since running code asynchronously is so common in this context, it is worth pointing out that objects obtained by way of the annotation @Context or by way of calling ResteasyProviderFactory.getContextData() are sensitive to the executing thread. For example, given resource method

@GET
@Path("test")
@Produces("text/plain")
public CompletionStage<String> text(@Context HttpRequest request) {
   System.out.println("request (inline): " + request);
   System.out.println("application (inline): " + ResteasyProviderFactory.getContextData(Application.class));
   CompletableFuture<String> cs = new CompletableFuture<>();
   ExecutorService executor = Executors.newSingleThreadExecutor();
   executor.submit(
         new Runnable() {
            public void run() {
               try {
                  System.out.println("request (async): " + request); 
                  System.out.println("application (async): " + ResteasyProviderFactory.getContextData(Application.class));
                  cs.complete("hello");
               } catch (Exception e) {
                  e.printStackTrace();
               }
            }
         });
   return cs;
}

the output will look something like

application (inline): org.jboss.resteasy.experiment.Test1798CompletionStage$TestApp@23c57474
request (inline): org.jboss.resteasy.plugins.server.servlet.Servlet3AsyncHttpRequest@2ce23138
application (async): null
org.jboss.resteasy.spi.LoggableFailure: RESTEASY003880: Unable to find contextual data of type: org.jboss.resteasy.spi.HttpRequest

The point is that it is the developer's responsibility to extract information from these context objects in advance. For example:

@GET
@Path("test")
@Produces("text/plain")
public CompletionStage<String> text(@Context HttpRequest req) {
   System.out.println("request (inline): " + request);
   System.out.println("application (inline): " + ResteasyProviderFactory.getContextData(Application.class));
   CompletableFuture<String> cs = new CompletableFuture<>();
   ExecutorService executor = Executors.newSingleThreadExecutor();
   final String httpMethodFinal = request.getHttpMethod();
   final Map<String, Object> mapFinal = ResteasyProviderFactory.getContextData(Application.class).getProperties();
   executor.submit(
         new Runnable() {
            public void run() {
               System.out.println("httpMethod (async): " + httpMethodFinal); 
               System.out.println("map (async): " + mapFinal); 
               cs.complete("hello");
            }
         });
   return cs;
}

The picture becomes more complex and interesting when sequences are added. A CompletionStage holds no more than one potential value, but other reactive objects can hold multiple, even unlimited, values. Currently, most Java implementations of reactive programming are based on the project Reactive Streams (http://www.reactive-streams.org/), which defines a set of four interfaces and a specification, in the form of a set of rules, describing how they interact:

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

A Producer pushes objects to a Subscriber, a Subscription mediates the relationship between the two, and a Processor which is derived from both, helps to construct pipelines through which objects pass.

One important aspect of the specification is flow control, the ability of a Suscriber to control the load it receives from a Producer by calling Suscription.request(). The general term in this context for flow control is backpressure.

There are a number of implementations of Reactive Streams, including

RESTEasy currently supports RxJava (deprecated) and RxJava2.

JAX-RS 2.1 doesn't require support for any Reactive Streams implementations, but it does allow for extensibility to support various reactive libraries. RESTEasy's optional modules resteasy-rxjava1 and resteasy-rxjava2 add support for RxJava 1 and 2. [Only resteasy-rxjava2 will be discussed here, since resteasy-rxjava1 is deprecated, but the treatment of the two is quite similar.]

In particular, resteasy-rxjava2 contributes support for reactive types io.reactivex.Single, io.reactivex.Flowable, and io.reactivex.Observable. Of these, Single is similar to CompletionStage in that it holds at most one potential value. Flowable implements io.reactivex.Publisher, and Observable is very similar to Flowable except that it doesn't support backpressure. So, if you import resteasy-rxjava2, you can just start returning these reactive types from your resource methods on the server side and receiving them on the client side.

Given the class Thing, which can be represented in JSON:

public class Thing {

   private String name;

   public Thing() {
   }

   public Thing(String name) {
      this.name = name;
   }
   ...
}

the method postThingList() in the following is a valid resource method:

...
@POST
@Path("post/thing/list")
@Produces(MediaType.APPLICATION_JSON)
@Stream
public Flowable<List<Thing>> postThingList(String s) {
   return buildFlowableThingList(s, 2, 3);
}

static Flowable<List<Thing>> buildFlowableThingList(String s, int listSize, int elementSize) {
   return Flowable.create(
      new FlowableOnSubscribe<List<Thing>>() {

         @Override
         public void subscribe(FlowableEmitter<List<Thing>> emitter) throws Exception {
            for (int i = 0; i < listSize; i++) {
               List<Thing> list = new ArrayList<Thing>();
               for (int j = 0; j < elementSize; j++) {
                  list.add(new Thing(s));
               }
               emitter.onNext(list);
            }
            emitter.onComplete();
         }
      },
      BackpressureStrategy.BUFFER);
}

The somewhat imposing method buildFlowableThingList() probably deserves some explanation. First,

Flowable<List<Thing>> Flowable.create(FlowableOnSubscribe<List<Thing>> source, BackpressureStrategy mode);

creates a Flowable<List<Thing>> by describing what should happen when the Flowable<List<Thing>> is subscribed to. FlowableEmitter<List<Thing>> extends io.reactivex.Emitter<List<Thing>>:

/**
 * Base interface for emitting signals in a push-fashion in various generator-like source
 * operators (create, generate).
 *
 * @param <T> the value type emitted
 */
public interface Emitter<T> {

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(@NonNull T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(@NonNull Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}

and FlowableOnSubscribe uses a FlowableEmitter to send out values from the Flowable<List<Thing>>:

/**
 * A functional interface that has a {@code subscribe()} method that receives
 * an instance of a {@link FlowableEmitter} instance that allows pushing
 * events in a backpressure-safe and cancellation-safe manner.
 *
 * @param <T> the value type pushed
 */
public interface FlowableOnSubscribe<T> {

    /**
     * Called for each Subscriber that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull FlowableEmitter<T> e) throws Exception;
}

So, what will happen when a subscription to the Flowable<List<Thing>> is created is, the FlowableEmitter.onNext() will be called, once for each <List<Thing>> created, followed by a call to FlowableEmitter.onComplete() to indicate that the sequence has ended. Under the covers, RESTEasy subscribes to the Flowable<List<Thing>> and handles each element passed in by way of onNext().

On the client side, JAX-RS 2.1 supports extensions for reactive classes by adding the method

/**
 * Access a reactive invoker based on a {@link RxInvoker} subclass provider. Note
 * that corresponding {@link RxInvokerProvider} must be registered in the client runtime.
 * 
 * This method is an extension point for JAX-RS implementations to support other types
 * representing asynchronous computations.
 *
 * @param clazz {@link RxInvoker} subclass.
 * @return reactive invoker instance.
 * @throws IllegalStateException when provider for given class is not registered.
 * @see javax.ws.rs.client.Client#register(Class)
 * @since 2.1
 */
public <T extends RxInvoker> T rx(Class<T> clazz);

to interface javax.ws.rs.client.Invocation.Builder. Resteasy module resteasy-rxjava2 adds support for classes:

  1. org.jboss.resteasy.rxjava2.SingleRxInvoker,
  2. org.jboss.resteasy.rxjava2.FlowableRxInvoker
  3. org.jbosss.resteasy.rxjava2.ObservableRxInvoker

which allow accessing Singles, Observables, and Flowables on the client side.

For example, given the resource method postThingList() above, a Flowable<List<Thing>> can be retrieved from the server by calling

@SuppressWarnings("unchecked")
@Test
public void testPostThingList() throws Exception {
   CountDownLatch latch = new CountdownLatch(1);
   FlowableRxInvoker invoker = client.target(generateURL("/post/thing/list")).request().rx(FlowableRxInvoker.class);
   Flowable<List<Thing>> flowable = (Flowable<List<Thing>>) invoker.post(Entity.entity("a", MediaType.TEXT_PLAIN_TYPE), new GenericType<List<Thing>>() {});
   flowable.subscribe(
         (List<?> l) -> thingListList.add(l),
         (Throwable t) -> latch.countDown(),
         () -> latch.countDown());
   latch.await();
   Assert.assertEquals(aThingListList, thingListList);
}

where aThingListList is

[[Thing[a], Thing[a], Thing[a]], [Thing[a], Thing[a], Thing[a]]]

Note the call to Flowable.suscribe(). On the server side, RESTEasy subscribes to a returning Flowable in order to receive its elements and send them over the wire. On the client side, the user subscribes to the Flowable in order to receive its elements and do whatever it wants to with them. In this case, three lambdas determine what should happen 1) for each element, 2) if a Throwable is thrown, and 3) when the Flowable is done passing elements.

Neither Reactive Streams nor JAX-RS have anything to say about representing reactive types on the network. RESTEasy offers a number of representations, each suitable for different circumstances. The wire protocol is determined by 1) the presence or absence of the @Stream annotation on the resource method, and 2) the value of the value field in the @Stream annotation:

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface Stream
{
   public enum MODE {RAW, GENERAL};
   public String INCLUDE_STREAMING_PARAMETER = "streaming";
   public MODE value() default MODE.GENERAL;
   public boolean includeStreaming() default false;
}

Note that MODE.GENERAL is the default value, so @Stream is equivalent to @Stream(Stream.MODE.GENERAL).

No @Stream annotation on the resource method
Resteasy will collect every value until the stream is complete, then wrap them into a java.util.List entity and send to the client.
@Stream(Stream.MODE.GENERAL)
This case uses a variant of the SSE format, modified to eliminate some restrictions inherent in SSE. (See the specification at https://html.spec.whatwg.org/multipage/server-sent-events.html for details.) In particular, 1) SSE events are meant to hold text data, represented in character set UTF-8. In the general streaming mode, certain delimiting characters in the data ('\r', '\n', and '\') are escaped so that arbitrary binary data can be transmitted. Also, 2) the SSE specification requires the client to reconnect if it gets disconnected. If the stream is finite, reconnecting will induce a repeat of the stream, so SSE is really meant for unlimited streams. In general streaming mode, the client will close, rather than automatically reconnect, at the end of the stream. It follows that this mode is suitable for finite streams.

Note. The Content-Type header in general streaming mode is set to

          applicaton/x-stream-general;"element-type=<element-type>"
                

where <element-type> is the media type of the data elements in the stream. The element media type is derived from the @Produces annotation. For example,

      @GET
      @Path("flowable/thing")
      @Stream
      @Produces("application/json")
      public Flowable<Thing> getFlowable() { ... }
                

induces the media type

          application/x-stream-general;"element-type=application/json"
                

which describes a stream of JSON elements.

@Stream(Stream.MODE.RAW)
In this case each value is written directly to the wire, without any formatting, as it becomes available. This is most useful for values that can be cut in pieces, such as strings, bytes, buffers, etc., and then re-concatenated on the client side. Note that without delimiters as in general mode, it isn't possible to reconstruct something like List<List<String>>.

Note. The Content-Type header in raw streaming mode is derived from the @Produces annotation. The @Stream annotation offers the possibility of an optional MediaType parameter called "streaming". The point is to be able to suggest that the stream of data emanating from the server is unbounded, i.e., that the client shouldn't try to read it all as a single byte array, for example. The parameter is set by explicitly setting the @Stream parameter includeStreaming() to true. For example,

   @GET
   @Path("byte/default")
   @Produces("application/octet-stream;x=y")
   @Stream(Stream.MODE.RAW)
   public Flowable<Byte> aByteDefault() {
      return Flowable.fromArray((byte) 0, (byte) 1, (byte) 2);
   }

induces the MediaType "application/octet-stream;x=y", and

   @GET
   @Path("byte/true")
   @Produces("application/octet-stream;x=y")
   @Stream(value=Stream.MODE.RAW, includeStreaming=true)
   public Flowable<Byte> aByteTrue() {
      return Flowable.fromArray((byte) 0, (byte) 1, (byte) 2);
   }

induces the MediaType "application/octet-stream;x=y;streaming=true".

Note that browsers such as Firefox and Chrome seem to be comfortable with reading unlimited streams without any additional hints.

Example 1.

@POST
@Path("post/thing/list")
@Produces(MediaType.APPLICATION_JSON)
@Stream(Stream.MODE.GENERAL)
public Flowable<List<Thing>> postThingList(String s) {
   return buildFlowableThingList(s, 2, 3);
}
...
@SuppressWarnings("unchecked")
@Test
public void testPostThingList() throws Exception {
   CountDownLatch latch = new CountdownLatch(1);
   FlowableRxInvoker invoker = client.target(generateURL("/post/thing/list")).request().rx(FlowableRxInvoker.class);
   Flowable<List<Thing>> flowable = (Flowable<List<Thing>>) invoker.post(Entity.entity("a", MediaType.TEXT_PLAIN_TYPE), new GenericType<List<Thing>>() {});
   flowable.subscribe(
         (List<?> l) -> thingListList.add(l),
         (Throwable t) -> latch.countDown(),
         () -> latch.countDown());
   latch.await();
   Assert.assertEquals(aThingListList, thingListList);
}

This is the example given previously, except that the mode in the @Stream annotation (which defaults to MODE.GENERAL) is given explicitly. In this scenario, the Flowable emits <List<Thing>> elements on the server, they are transmitted over the wire as SSE events:

data: [{"name":"a"},{"name":"a"},{"name":"a"}]
data: [{"name":"a"},{"name":"a"},{"name":"a"}]

and the FlowableRxInvoker reconstitutes a Flowable on the client side.

Example 2.

@POST
@Path("post/thing/list")
@Produces(MediaType.APPLICATION_JSON)
public Flowable<List<Thing>> postThingList(String s) {
   return buildFlowableThingList(s, 2, 3);
}
...
@Test
public void testPostThingList() throws Exception {
   Builder request = client.target(generateURL("/post/thing/list")).request();
   List<List<Thing>> list = request.post(Entity.entity("a", MediaType.TEXT_PLAIN_TYPE), new GenericType<List<List<Thing>>>() {});
   Assert.assertEquals(aThingListList, list);
}  

In this scenario, in which the resource method has no @Stream annotation, the Flowable emits stream elements which are accumulated by the server until the Flowable is done, at which point the entire JSON list is transmitted over the wire:

[[{"name":"a"},{"name":"a"},{"name":"a"}],[{"name":"a"},{"name":"a"},{"name":"a"}]]

and the list is reconstituted on the client side by an ordinary invoker.

Example 3.

@GET
@Path("get/bytes")
@Produces(MediaType.APPLICATION_OCTET_STREAM)
@Stream(Stream.MODE.RAW)
public Flowable<byte[]> getBytes() {
   return Flowable.create(
      new FlowableOnSubscribe<byte[]>() {

         @Override
         public void subscribe(FlowableEmitter<byte[]> emitter) throws Exception {
            for (int i = 0; i < 3; i++) {
               byte[] b = new byte[10];
               for (int j = 0; j < 10; j++) {
                  b[j] = (byte) (i + j);
               }
               emitter.onNext(b);
            }
            emitter.onComplete();
         }
      },
      BackpressureStrategy.BUFFER);
}
...
@Test
public void testGetBytes() throws Exception {
   Builder request = client.target(generateURL("/get/bytes")).request();
   InputStream is = request.get(InputStream.class);
   int n = is.read();
   while (n > -1) {
      System.out.print(n);
      n = is.read();
   }
}

Here, the byte arrays are written to the network as they are created by the Flowable. On the network, they are concatenated, so the client sees one stream of bytes.

Since general streaming mode and SSE share minor variants of the same wire protocol, they are, modulo the SSE restriction to character data, interchangeable. That is, an SSE client can connect to a resource method that returns a Flowable or an Observable, and a FlowableRxInvoker, for example, can connect to an SSE resource method.

Note. SSE requires a @Produces("text/event-stream") annotation, so, unlike the cases of raw and general streaming, the element media type cannot be derived from the @Produces annotation. To solve this problem, Resteasy introduces the

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface SseElementType
{
   public String value();
}

annotation, from which the element media type is derived.

Example 1.

@GET
@Path("eventStream/thing")
@Produces("text/event-stream")
@SseElementType("application/json")
public void eventStreamThing(@Context SseEventSink eventSink, @Context Sse sse) {
   new ScheduledThreadPoolExecutor(5).execute(() -> {
      try (SseEventSink sink = eventSink) {
         OutboundSseEvent.Builder  builder = sse.newEventBuilder();
         eventSink.send(builder.data(new Thing("e1")).build());
         eventSink.send(builder.data(new Thing("e2")).build());
         eventSink.send(builder.data(new Thing("e3")).build());
      }
   });
}
...
@SuppressWarnings("unchecked")
@Test
public void testFlowableToSse() throws Exception {
   CountDownLatch latch = new CountDownLatch(1);
   final AtomicInteger errors = new AtomicInteger(0);
   FlowableRxInvoker invoker = client.target(generateURL("/eventStream/thing")).request().rx(FlowableRxInvoker.class);
   Flowable<Thing> flowable = (Flowable<Thing>) invoker.get(Thing.class);
   flowable.subscribe(
      (Thing t) -> thingList.add(t),
      (Throwable t) -> errors.incrementAndGet(),
      () -> latch.countDown());
   boolean waitResult = latch.await(30, TimeUnit.SECONDS);
   Assert.assertTrue("Waiting for event to be delivered has timed out.", waitResult);
   Assert.assertEquals(0, errors.get());
   Assert.assertEquals(eThingList, thingList);
}  

Here, a FlowableRxInvoker is connecting to an SSE resource method. On the network, the data looks like

data: {"name":"e1"}
data: {"name":"e2"}
data: {"name":"e3"}

Note that the character data is suitable for an SSE resource method.

Also, note that the eventStreamThing() method in this example induces the media type

    text/event-stream;element-type="application/json"

Example 2.

@GET
@Path("flowable/thing")
@Produces("text/event-stream")
@SseElementType("application/json")
public Flowable<Thing> flowableSSE() {
   return Flowable.create(
      new FlowableOnSubscribe<Thing>() {

         @Override
         public void subscribe(FlowableEmitter<Thing> emitter) throws Exception {
            emitter.onNext(new Thing("e1"));
            emitter.onNext(new Thing("e2"));
            emitter.onNext(new Thing("e3"));
            emitter.onComplete();
         }
      },
      BackpressureStrategy.BUFFER);
}
...
@Test
public void testSseToFlowable() throws Exception {
   final CountDownLatch latch = new CountDownLatch(3);
   final AtomicInteger errors = new AtomicInteger(0);
   WebTarget target = client.target(generateURL("/flowable/thing"));
   SseEventSource msgEventSource = SseEventSource.target(target).build();
   try (SseEventSource eventSource = msgEventSource)
   {
      eventSource.register(
         event -> {thingList.add(event.readData(Thing.class, MediaType.APPLICATION_JSON_TYPE)); latch.countDown();},
         ex -> errors.incrementAndGet());
      eventSource.open();

      boolean waitResult = latch.await(30, TimeUnit.SECONDS);
      Assert.assertTrue("Waiting for event to be delivered has timed out.", waitResult);
      Assert.assertEquals(0, errors.get());
      Assert.assertEquals(eThingList, thingList);
   }
}

Here, an SSE client is connecting to a resource method that returns a Flowable. Again, the server is sending character data, which is suitable for the SSE client, and the data looks the same on the network.

Proxies, discussed in RESTEasy Proxy Framework, are a RESTEasy extension that supports a natural programming style in which generic JAX-RS invoker calls are replaced by application specific interface calls. The proxy framework is extended to include both CompletionStage and the RxJava2 types Single, Observable, and Flowable.

Example 1.

@Path("")
public interface RxCompletionStageResource {

   @GET
   @Path("get/string")
   @Produces(MediaType.TEXT_PLAIN)
   public CompletionStage<String> getString();
}

@Path("")
public class RxCompletionStageResourceImpl {

   @GET
   @Path("get/string")
   @Produces(MediaType.TEXT_PLAIN)
   public CompletionStage<String> getString() { .... }
}

public class RxCompletionStageProxyTest {

   private static ResteasyClient client;
   private static RxCompletionStageResource proxy;
   
   static {
      client = new ResteasyClientBuilder().build();
      proxy = client.target(generateURL("/")).proxy(RxCompletionStageResource.class);
   }
   
   @Test
   public void testGet() throws Exception {
      CompletionStage<String> completionStage = proxy.getString();
      Assert.assertEquals("x", completionStage.toCompletableFuture().get());
   }
}

Example 2.

public interface Rx2FlowableResource {

   @GET
   @Path("get/string")
   @Produces(MediaType.TEXT_PLAIN)
   @Stream
   public Flowable<String> getFlowable();
}

@Path("")
public class Rx2FlowableResourceImpl {

   @GET
   @Path("get/string")
   @Produces(MediaType.TEXT_PLAIN)
   @Stream
   public Flowable<String> getFlowable() { ... }
}

public class Rx2FlowableProxyTest {

   private static ResteasyClient client;
   private static Rx2FlowableResource proxy;
   
   static {
      client = new ResteasyClientBuilder().build();
      proxy = client.target(generateURL("/")).proxy(Rx2FlowableResource.class);
   }
   
   @Test
   public void testGet() throws Exception {
      Flowable<String> flowable = proxy.getFlowable();
      flowable.subscribe(
         (String o) -> stringList.add(o),
         (Throwable t) -> errors.incrementAndGet(),
         () -> latch.countDown());
      boolean waitResult = latch.await(30, TimeUnit.SECONDS);
      Assert.assertTrue("Waiting for event to be delivered has timed out.", waitResult);
      Assert.assertEquals(0, errors.get());
      Assert.assertEquals(xStringList, stringList);
   }
}

RESTEasy implements a framework that supports extensions for additional reactive classes. To understand the framework, it is necessary to understand the existing support for CompletionStage and other reactive classes.

Server side. When a resource method returns a CompletionStage, RESTEasy subscribes to it using the class org.jboss.resteasy.core.AsyncResponseConsumer.CompletionStageResponseConsumer. When the CompletionStage completes, it calls CompletionStageResponseConsumer.accept(), which sends the result back to the client.

Support for CompletionStage is built in to RESTEasy, but it's not hard to extend that support to a class like Single by providing a mechanism for transforming a Single into a CompletionStage. In module resteasy-rxjava2, that mechanism is supplied by org.jboss.resteasy.rxjava2.SingleProvider, which implements interface org.jboss.resteasy.spi.AsyncResponseProvider<Single<?>>:

public interface AsyncResponseProvider<T> {
   public CompletionStage toCompletionStage(T asyncResponse);
}

Given SingleProvider, RESTEasy can take a Single, transform it into a CompletionStage, and then use CompletionStageResponseConsumer to handle the eventual value of the Single.

Similarly, when a resource method returns a streaming reactive class like Flowable, RESTEasy subscribes to it, receives a stream of data elements, and sends them to the client. AsyncResponseConsumer has several supporting classes, each of which implements a different mode of streaming. For example, AsyncResponseConsumer.AsyncGeneralStreamingSseResponseConsumer handles general streaming and SSE streaming. Subscribing is done by calling org.reactivestreams.Publisher.subscribe(), so a mechanism is needed for turning, say, a Flowable into a Publisher. That is, an implementation of org.jboss.resteasy.spi.AsyncStreamProvider<Flowable> is called for, where AsyncStreamProvider is defined:

public interface AsyncStreamProvider<T> {
   public Publisher toAsyncStream(T asyncResponse);
}

In module resteasy-rxjava2, org.jboss.resteasy.FlowableProvider provides that mechanism for Flowable. [Actually, that's not too hard since, in rxjava2, a Flowable is a Provider.]

So, on the server side, adding support for other reactive types can be done by declaring a @Provider for the interface AsyncStreamProvider (for streams) or AsyncResponseProvider (for single values), which both have a single method to convert the new reactive type into (respectively) a Publisher (for streams) or a CompletionStage (for single values).

Client side. The JAX-RS specification version 2.1 imposes two requirements for support of reactive classes on the client side:

  1. support for CompletionStage in the form of an implementation of the interface javax.ws.rs.client.CompletionStageRxInvoker, and
  2. extensibility in the form of support for registering providers that implement
    public interface RxInvokerProvider<T extends RxInvoker> {
        public boolean isProviderFor(Class<T> clazz);
        public T getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService);
    }
    
    Once an RxInvokerProvider is registered, an RxInvoker can be requested by calling the javax.ws.rs.client.Invocation.Builder method
    public <T extends RxInvoker> T rx(Class<T> clazz);
    
    That RxInvoker can then be used for making an invocation that returns the appropriate reactive class. For example,
    FlowableRxInvoker invoker = client.target(generateURL("/get/string")).request().rx(FlowableRxInvoker.class);
    Flowable<String> flowable = (Flowable<String>) invoker.get();
    

RESTEasy provides partial support for implementing RxInvokers. For example, SingleProvider, mentioned above, also implements org.jboss.resteasy.spi.AsyncClientResponseProvider<Single<?>>, where AsyncClientResponseProvider is defined

public interface AsyncClientResponseProvider<T> {
   public T fromCompletionStage(CompletionStage<?> completionStage);
}

SingleProvider's ability to turn a CompletionStage into a Single is used in the implementation of org.jboss.resteasy.rxjava2.SingleRxInvokerImpl.

The same concept might be useful in implementing other RxInvokers. Note, though, that ObservableRxInvokerImpl and FlowableRxInvokerImpl in module resteasy-rxjava2 are each derived directly from the SSE implementation.