Class AbstractAsyncPublisherHandler<Target,Output,InitialResponse,NextResponse>
- All Implemented Interfaces:
io.reactivex.rxjava3.functions.Action, io.reactivex.rxjava3.functions.LongConsumer
- Direct Known Subclasses:
QuerySubscription
When entries are requested via Subscription.request(long), the handler keeps track of the outstanding count and invokes either sendInitialCommand(Object, int) or sendNextCommand(Object, int), depending on whether it is the first request or a subsequent one.
This handler guarantees that there is only one outstanding request at a time. If the requested amount is larger than the batch size, it continues to send new commands one at a time until the requested amount has been satisfied.
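The request accounting described above can be illustrated with a small, self-contained simulation. This is only a sketch under stated assumptions, not the handler's implementation: the class DemandSketch and its method commandSizes are hypothetical names, a single target is assumed, and every command is assumed to return exactly as many entries as it was asked for.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not part of the documented API.
final class DemandSketch {
    // Splits an outstanding demand into the batch sizes of the commands that
    // would be sent one at a time. Assumes each command returns exactly the
    // number of entries it was asked for.
    static List<Integer> commandSizes(long requested, int maxBatchSize) {
        if (requested < 0 || maxBatchSize <= 0) {
            throw new IllegalArgumentException("requested must be >= 0 and maxBatchSize > 0");
        }
        List<Integer> sizes = new ArrayList<>();
        long outstanding = requested;
        while (outstanding > 0) {
            // Never ask for more than the batch size, and never more than is still needed.
            int batch = (int) Math.min(outstanding, maxBatchSize);
            sizes.add(batch);
            // The entries delivered by this command reduce the outstanding count.
            outstanding -= batch;
        }
        return sizes;
    }
}
```

With a request of 25 and a batch size of 10, commandSizes(25, 10) yields three commands of sizes 10, 10 and 5. Under the assumptions above, the first would correspond to the initial command and the remaining two to next commands.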
The handler processes each target via the provided Supplier one by one, exhausting all values from the target, until there are no more targets left or the publishing has been cancelled by the subscriber.
When a command returns successfully, either handleInitialResponse(Object, Object) or handleNextResponse(Object, Object) is invoked with the response object. The returned entries can then be emitted by invoking the onNext(Object) method for each value. Note that the return value of onNext should be checked in case the Subscriber has cancelled the publishing of values partway through.
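The advice to check the return value of onNext can be sketched as a simple loop. The example below is self-contained and does not use the handler itself: EmitSketch and emitAll are hypothetical names, and a java.util.function.Predicate stands in for the handler's onNext(Object) method. Returning the number of entries passed to onNext is a choice made for this sketch only.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration; the Predicate stands in for the handler's onNext method.
final class EmitSketch {
    // Passes entries to onNext in order and stops as soon as onNext returns false,
    // which models the subscriber cancelling in the middle of a response.
    // Returns how many entries were passed to onNext.
    static <T> int emitAll(List<T> entries, Predicate<T> onNext) {
        int emitted = 0;
        for (T entry : entries) {
            emitted++;
            if (!onNext.test(entry)) {
                // Publishing should not continue; leave the remaining entries unemitted.
                break;
            }
        }
        return emitted;
    }
}
```

A response handler written along these lines would skip the remaining entries of a response once the subscriber is no longer interested, rather than emitting values that will be discarded.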
The publisher continues to send requests to the initial target provided in the constructor (the firstTarget argument). When that target has no more entries to retrieve, the implementation should invoke targetComplete() to signal that it is complete. After this is invoked, the supplier provided in the constructor is invoked to obtain the next target to send to. This repeats until the supplier returns null, which signals to this class that there are no more entries left to retrieve.
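A supplier that follows the null-terminated contract described above might look like the sketch below. It assumes the constructor's Supplier is java.util.function.Supplier, which this page does not state explicitly. TargetSupplierSketch and fromList are hypothetical names, and the supplier is not thread safe.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical illustration; assumes the handler accepts a java.util.function.Supplier.
final class TargetSupplierSketch {
    // Builds a Supplier that hands out each target once, in order, and then
    // returns null to signal that there are no more targets.
    static <T> Supplier<T> fromList(List<T> targets) {
        Iterator<T> it = targets.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }
}
```

Because the first target is passed to the constructor separately, a list given to such a supplier would contain only the targets to visit after the first one.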
A command may also encounter a Throwable, and it is possible to customize what happens by overriding the handleThrowableInResponse(Throwable, Object) method. For example you may want to skip the given
The user is required to supply a maxBatchSize argument to the constructor. This setting ensures that the handler never has more than this number of entries enqueued at once. However, it may request fewer than this batch size from the underlying target(s). Each request is provided a batchSize argument, which the underlying resource should adhere to. Failure to do so may cause an OutOfMemoryError, since entries are only emitted to the Subscriber based on the requested amount and any additional entries are enqueued.
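To see why a resource that ignores batchSize can exhaust memory, consider the following simplified model of demand-bounded emission. It is a hypothetical sketch, not the handler's code: BufferSketch and all of its members are invented for illustration, and an in-memory list stands in for the Subscriber.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of demand-bounded emission; not the handler's real code.
final class BufferSketch<T> {
    private final Queue<T> queued = new ArrayDeque<>();   // received but not yet requested
    private final List<T> delivered = new ArrayList<>();  // handed to the subscriber
    private long demand;                                  // outstanding requested amount

    // Models Subscription.request(n): raises demand and emits what is already queued.
    void request(long n) {
        demand += n;
        drain();
    }

    // Models a response arriving with however many entries the resource returned.
    void onResponse(List<T> entries) {
        queued.addAll(entries);
        drain();
    }

    // Emits queued entries only while there is outstanding demand.
    private void drain() {
        while (demand > 0 && !queued.isEmpty()) {
            delivered.add(queued.poll());
            demand--;
        }
    }

    int queuedCount() {
        return queued.size();
    }

    List<T> delivered() {
        return delivered;
    }
}
```

If two entries are requested and a response carries five, two are delivered and three stay queued. A resource that returns far more entries than it was asked for would grow that queue without bound in this model.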
This handler also supports Subscription.cancel() through the sendCancel(Object) method, which implementations must provide. The underlying service must be cancelled in an asynchronous and non-blocking fashion. Once cancelled, this Publisher will not publish additional values or send further requests.
Note that this handler only supports a single Subscriber for the Publisher returned from startPublisher(). Subscribing more than once can cause multiple requests and unexpected problems.
-
Field Summary
-
Constructor Summary
| Modifier | Constructor | Description |
|---|---|---|
| protected | AbstractAsyncPublisherHandler(int maxBatchSize, Supplier<Target> supplier, Target firstTarget) | |
-
Method Summary
| Modifier and Type | Method | Description |
|---|---|---|
| void | accept(long count) | This method is invoked every time a new request is sent to the underlying publisher. |
| protected boolean | checkCancelled() | This method returns whether this subscription has been cancelled. |
| protected abstract long | handleInitialResponse(InitialResponse response, Target target) | |
| protected abstract long | handleNextResponse(NextResponse response, Target target) | |
| protected void | handleThrowableInResponse(Throwable t, Target target) | Allows any implementor to handle what happens when a Throwable is encountered. |
| protected boolean | onNext(Output value) | Method that should be called for each emitted output value. |
| void | run() | This is invoked when the Subscription is cancelled. |
| protected abstract void | sendCancel(Target target) | |
| protected abstract CompletionStage<InitialResponse> | sendInitialCommand(Target target, int batchSize) | |
| protected abstract CompletionStage<NextResponse> | sendNextCommand(Target target, int batchSize) | |
| org.reactivestreams.Publisher<Output> | startPublisher() | |
| protected void | targetComplete() | Method to invoke when a given target is found to have been completed and the next target should be used. |
-
Field Details
-
log
protected static final org.infinispan.commons.logging.Log log
-
batchSize
protected final int batchSize
-
supplier
-
-
Constructor Details
-
AbstractAsyncPublisherHandler
-
-
Method Details
-
startPublisher
-
run
public void run()
This is invoked when the Subscription is cancelled.
- Specified by:
run in interface io.reactivex.rxjava3.functions.Action
-
sendCancel
-
sendInitialCommand
protected abstract CompletionStage<InitialResponse> sendInitialCommand(Target target, int batchSize)
-
sendNextCommand
-
handleInitialResponse
-
handleNextResponse
-
handleThrowableInResponse
Allows any implementor to handle what happens when a Throwable is encountered. By default the returned publisher invokes Subscriber.onError(Throwable) and stops processing. It is possible to ignore the throwable and continue processing by invoking accept(long) with a value of 0. It may also be required to reset the currentTarget so it is initialized to the next supplied value.
- Parameters:
t - throwable that was encountered
target - the target which was invoked that caused the throwable
-
accept
public void accept(long count)
This method is invoked every time a new request is sent to the underlying publisher. We need to submit a request if there is not a pending one. Whenever requestedAmount is a number greater than 0, that means we must submit or there is a pending one.
- Specified by:
accept in interface io.reactivex.rxjava3.functions.LongConsumer
- Parameters:
count - request count
-
onNext
Method that should be called for each emitted output value. The returned boolean indicates whether the handler should continue publishing values.
- Parameters:
value - value to emit to the publisher
- Returns:
whether to continue emitting values
-
targetComplete
protected void targetComplete()
Method to invoke when a given target is found to have been completed and the next target should be used.
-
checkCancelled
protected boolean checkCancelled()
This method returns whether this subscription has been cancelled.
-