Class LocalPublisherManagerImpl<K,​V>

  • All Implemented Interfaces:
    LocalPublisherManager<K,​V>

    public class LocalPublisherManagerImpl<K,​V>
    extends Object
    implements LocalPublisherManager<K,​V>
    LocalPublisherManager that publishes entries from the local node only. This class marks segments as suspected if they are lost while they are still being processed. Callers notify it of lost segments by invoking the segmentsLost(IntSet) method.
    Since:
    10.0
    Author:
    wburns
    • Field Detail

      • nonBlockingScheduler

        protected io.reactivex.rxjava3.core.Scheduler nonBlockingScheduler
      • maxSegment

        protected int maxSegment
      • cpuCount

        protected final int cpuCount
    • Constructor Detail

      • LocalPublisherManagerImpl

        public LocalPublisherManagerImpl()
    • Method Detail

      • inject

        public void inject​(ExecutorService nonBlockingExecutor)
        Injects the cache. Unfortunately this cannot be done in start: tests rewire certain components, which in turn reinjects the cache, but they do not call the start method. If the latter is fixed, this can be moved to the start method and @Inject added to the variable.
      • start

        public void start()
      • stop

        public void stop()
      • entryReduction

        public <R> CompletionStage<org.infinispan.reactive.publisher.impl.commands.reduction.PublisherResult<R>> entryReduction​(boolean parallelPublisher,
                                                                                                                                IntSet segments,
                                                                                                                                Set<K> keysToInclude,
                                                                                                                                Set<K> keysToExclude,
                                                                                                                                long explicitFlags,
                                                                                                                                DeliveryGuarantee deliveryGuarantee,
                                                                                                                                Function<? super org.reactivestreams.Publisher<CacheEntry<K,​V>>,​? extends CompletionStage<R>> collator,
                                                                                                                                Function<? super org.reactivestreams.Publisher<R>,​? extends CompletionStage<R>> finalizer)
        Description copied from interface: LocalPublisherManager
        Performs the given transformer (the collator parameter) and finalizer on data in the cache that is local, resulting in a single value. Depending on the deliveryGuarantee the transformer may be invoked 1..numSegments times. It could be that the transformer is invoked for every segment and produces a result. All of these results are then fed into the finalizer to produce a final result. If the publisher is parallel, the finalizer will be invoked on each node to ensure there is only a single result per node.

        The effects of the provided deliveryGuarantee are as follows:

        Guarantee      Parallel  Behavior
        AT_MOST_ONCE   TRUE      Each segment is a publisher passed to the transformer individually. Each result of the transformer is supplied to the finalizer. All segments are always complete, ignoring loss of data.
        AT_MOST_ONCE   FALSE     A single publisher for all segments is created and passed to the transformer. That result is returned; the finalizer is never used. All segments are always complete, ignoring loss of data.
        AT_LEAST_ONCE  TRUE      Same as AT_MOST_ONCE, but if a segment is lost in the middle it is returned as a suspected segment, always returning all values.
        AT_LEAST_ONCE  FALSE     Same as AT_MOST_ONCE, but if a segment is lost in the middle it is returned as a suspected segment, always returning all values.
        EXACTLY_ONCE   TRUE      Each segment is a publisher passed to the transformer individually. Each result is only accepted if the segment was owned for the entire duration of the Subscription.
        EXACTLY_ONCE   FALSE     Same as EXACTLY_ONCE/TRUE, except the publishers are consumed one at a time.
        Specified by:
        entryReduction in interface LocalPublisherManager<K,​V>
        Type Parameters:
        R - return value type
        Parameters:
        parallelPublisher - Whether the publisher should be parallelized
        segments - determines what entries should be evaluated by only using ones that map to the given segments (must not be null)
        keysToInclude - set of keys that should only be used. May be null, in which case all provided entries for the given segments will be evaluated
        keysToExclude - set of keys that should not be used. May be null, in which case all provided entries will be evaluated
        explicitFlags - cache flags
        deliveryGuarantee - delivery guarantee for given entries
        collator - reduces the given publisher of data eventually into a single value. Must not be null.
        finalizer - reduces all of the single values produced by the transformer or this finalizer into one final value. May be null if not parallel
        Returns:
        CompletionStage that contains the resulting value when complete
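The split between the collator and the finalizer can be illustrated with a simplified model. This is only a sketch under stated assumptions: plain lists stand in for the per-segment publishers, no delivery guarantee or segment loss is modelled, and the class and method names (SegmentReductionSketch, reducePerSegment, reduceAllAtOnce) are hypothetical, not part of the Infinispan API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Simplified model only: lists replace publishers, and none of these
// names exist in Infinispan.
final class SegmentReductionSketch {

    // Parallel case: the collator runs once per segment, then the finalizer
    // reduces the per-segment results into a single value.
    static <E, R> CompletionStage<R> reducePerSegment(
            Map<Integer, List<E>> entriesBySegment,
            Function<List<E>, CompletionStage<R>> collator,
            Function<List<R>, CompletionStage<R>> finalizer) {
        List<CompletableFuture<R>> perSegment = new ArrayList<>();
        for (List<E> segmentEntries : entriesBySegment.values()) {
            perSegment.add(collator.apply(segmentEntries).toCompletableFuture());
        }
        return CompletableFuture
                .allOf(perSegment.toArray(new CompletableFuture[0]))
                .thenCompose(ignored -> {
                    List<R> results = new ArrayList<>();
                    for (CompletableFuture<R> stage : perSegment) {
                        results.add(stage.join()); // already complete, does not block
                    }
                    return finalizer.apply(results);
                });
    }

    // Non-parallel AT_MOST_ONCE case: a single collator call covers all
    // segments and the finalizer is never used.
    static <E, R> CompletionStage<R> reduceAllAtOnce(
            Map<Integer, List<E>> entriesBySegment,
            Function<List<E>, CompletionStage<R>> collator) {
        List<E> all = new ArrayList<>();
        for (List<E> segmentEntries : entriesBySegment.values()) {
            all.addAll(segmentEntries);
        }
        return collator.apply(all);
    }
}
```

For a counting reduction, the collator would count the entries it is given and the finalizer would sum the per-segment counts; both paths then yield the same total.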
      • entryPublisher

        public <R> SegmentAwarePublisherSupplier<R> entryPublisher​(IntSet segments,
                                                                   Set<K> keysToInclude,
                                                                   Set<K> keysToExclude,
                                                                   long explicitFlags,
                                                                   DeliveryGuarantee deliveryGuarantee,
                                                                   Function<? super org.reactivestreams.Publisher<CacheEntry<K,​V>>,​? extends org.reactivestreams.Publisher<R>> transformer)
        Description copied from interface: LocalPublisherManager
        Performs the given transformer on data in the cache that is local, resulting in a stream of values of possibly varying size. The transformer will be invoked numSegments times. The table below shows the behavior for the various delivery guarantees.

        The effects of the provided deliveryGuarantee are as follows:

        Guarantee      Behavior
        AT_MOST_ONCE   For each segment a publisher is passed to the transformer sequentially. All segments are always complete, ignoring loss of data.
        AT_LEAST_ONCE  Same as AT_MOST_ONCE, but if a segment is lost in the middle it is returned as a suspected segment, possibly dropping values in that segment.
        EXACTLY_ONCE   Same as AT_LEAST_ONCE, except whenever a segment is lost the value(s) collected in the same response for that segment are always dropped.

        The returned publisher supplier method SegmentAwarePublisherSupplier.publisherWithLostSegments() guarantees that all entries from a given segment are returned first, followed by a segment lost or segment completed notification. This publisher will not intermingle entries from different segments. Callers can rely on this guarantee to optimize: segments can be completed sooner and fewer entries have to be retained in memory.

        Specified by:
        entryPublisher in interface LocalPublisherManager<K,​V>
        Type Parameters:
        R - return value type
        Parameters:
        segments - determines what entries should be evaluated by only using ones that map to the given segments (must not be null)
        keysToInclude - set of keys that should only be used. May be null, in which case all provided entries for the given segments will be evaluated
        keysToExclude - set of keys that should not be used. May be null, in which case all provided entries will be evaluated
        explicitFlags - cache flags
        deliveryGuarantee - delivery guarantee for given entries
        transformer - transforms the values to another value (0 to many). Must not be null.
        Returns:
        SegmentAwarePublisher that will publish the values when subscribed to along with segment completions and losses
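The ordering guarantee described for publisherWithLostSegments() lets a consumer buffer only the segment currently in flight. The following is a minimal sketch of such a consumer, assuming a hypothetical Notification type that models the three kinds of signal (value, segment completed, segment lost); it is a stand-in written for illustration and not the Infinispan notification type.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the notifications described above;
// not the Infinispan type.
final class SegmentNotificationSketch {

    enum Kind { VALUE, SEGMENT_COMPLETE, SEGMENT_LOST }

    static final class Notification {
        final Kind kind;
        final String value; // set only for VALUE
        final int segment;  // set only for SEGMENT_COMPLETE and SEGMENT_LOST

        private Notification(Kind kind, String value, int segment) {
            this.kind = kind;
            this.value = value;
            this.segment = segment;
        }

        static Notification value(String value) {
            return new Notification(Kind.VALUE, value, -1);
        }

        static Notification complete(int segment) {
            return new Notification(Kind.SEGMENT_COMPLETE, null, segment);
        }

        static Notification lost(int segment) {
            return new Notification(Kind.SEGMENT_LOST, null, segment);
        }
    }

    // Because values of different segments never intermingle, one buffer is
    // enough: commit it on completion, discard it when the segment is lost.
    static List<String> acceptCompletedOnly(List<Notification> stream,
                                            List<Integer> lostSegments) {
        List<String> accepted = new ArrayList<>();
        List<String> inFlight = new ArrayList<>();
        for (Notification n : stream) {
            switch (n.kind) {
                case VALUE:
                    inFlight.add(n.value);
                    break;
                case SEGMENT_COMPLETE:
                    accepted.addAll(inFlight);
                    inFlight.clear();
                    break;
                case SEGMENT_LOST:
                    lostSegments.add(n.segment);
                    inFlight.clear();
                    break;
            }
        }
        return accepted;
    }
}
```

Discarding the buffer on a lost notification is one possible policy, chosen here because it mirrors the EXACTLY_ONCE row of the table; a caller that tolerates duplicates could keep the buffered values and retry the segment elsewhere.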
      • segmentsLost

        public void segmentsLost​(IntSet lostSegments)
        Description copied from interface: LocalPublisherManager
        Method to invoke when a set of segments are being removed from this node. This way operations can be aware of possible data loss while processing.
        Specified by:
        segmentsLost in interface LocalPublisherManager<K,​V>
        Parameters:
        lostSegments - the segments that are being removed from this node
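One way to picture how an in-flight operation can become aware of lost segments is a small tracker that records which of its segments were reported lost. This is a sketch under assumptions: the class LostSegmentTrackerSketch and its methods are hypothetical, and java.util.Set<Integer> is used in place of IntSet to keep the example self-contained.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical tracker, not the SegmentListener used by this class.
final class LostSegmentTrackerSketch {
    private final Set<Integer> watched;
    // Concurrent set because loss notifications may arrive from another thread.
    private final Set<Integer> lost = new ConcurrentSkipListSet<>();

    LostSegmentTrackerSketch(Set<Integer> segmentsBeingProcessed) {
        this.watched = new TreeSet<>(segmentsBeingProcessed);
    }

    // Called when segments are being removed from this node; only segments
    // this operation is processing are recorded.
    void segmentsLost(Set<Integer> lostSegments) {
        for (Integer segment : lostSegments) {
            if (watched.contains(segment)) {
                lost.add(segment);
            }
        }
    }

    // Segments whose results should be treated as suspect.
    Set<Integer> suspectedSegments() {
        return new TreeSet<>(lost);
    }

    // True if the segment was requested and has not been reported lost.
    boolean isComplete(int segment) {
        return watched.contains(segment) && !lost.contains(segment);
    }
}
```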
      • exactlyOnceHandleLostSegments

        protected <R> CompletionStage<org.infinispan.reactive.publisher.impl.commands.reduction.PublisherResult<R>> exactlyOnceHandleLostSegments​(CompletionStage<R> finalValue,
                                                                                                                                                  org.infinispan.reactive.publisher.impl.LocalPublisherManagerImpl.SegmentListener listener)
      • exactlyOnceParallel

        protected <I,​R> io.reactivex.rxjava3.core.Flowable<R> exactlyOnceParallel​(CacheSet<I> set,
                                                                                        Set<K> keysToExclude,
                                                                                        Function<I,​K> toKeyFunction,
                                                                                        IntSet segments,
                                                                                        Function<? super org.reactivestreams.Publisher<I>,​? extends CompletionStage<R>> collator,
                                                                                        org.infinispan.reactive.publisher.impl.LocalPublisherManagerImpl.SegmentListener listener,
                                                                                        IntSet concurrentSegments)
        This method iteratively submits tasks to the CPU-bound thread pool, up to the number of cores minus one. The tasks perform a form of work stealing: each attempts to retrieve the next available segment and process it as fast as possible. A given task may never be submitted if the other tasks complete all the segments asynchronously first. After the main thread has submitted all the tasks, it attempts to steal a segment itself and, if it gets one, goes on to complete the remaining segments in the same fashion as the other threads. NOTE: this is not how reactive streams normally behave, since operations are not usually performed until the returned Flowable is subscribed to, but for performance reasons this method publishes entries eagerly. This avoids a context switch to an additional thread, and the Flowable is known to be subscribed to immediately afterwards.

        The result for each segment is then published as a single item in the returned Flowable. Because results are retrieved eagerly, they may queue up if the Subscriber of the Flowable is slow. However, since the results are reduced to a single value per segment, this should not become an issue.

        Type Parameters:
        I - input type of the data
        R - resulting value
        Parameters:
        set - CacheSet to retrieve the publisher for (non-nullable)
        keysToExclude - keys that should be excluded from the processing (nullable)
        toKeyFunction - function to convert an entry to a key to determine if it is excluded (must be non null if keysToExclude is)
        segments - the segments to process results for (non-nullable)
        collator - reducer to collate all the entries for a given segment into a single result (non-nullable)
        listener - listener that handles segments being lost and determining what results should be discarded (non-nullable)
        concurrentSegments - the segments left to complete. An entry is removed when a segment is completed, to prevent a data rehash from causing a retry for that segment
        Returns:
        Flowable that publishes a result for each segment
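The task submission and segment stealing described for exactlyOnceParallel can be sketched with plain java.util.concurrent types. This is an illustration under assumptions, not the actual implementation: SegmentStealingSketch and processSegments are hypothetical names, a shared queue stands in for the set of remaining segments, segment loss is ignored, and the results are returned as a list after all helpers finish instead of being published through a Flowable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.IntFunction;

// Hypothetical sketch of the stealing pattern; not the Infinispan code.
final class SegmentStealingSketch {

    // segmentWork must not return null (ConcurrentLinkedQueue rejects nulls).
    static <R> List<R> processSegments(List<Integer> segments, int cpuCount,
                                       IntFunction<R> segmentWork) {
        Queue<Integer> pending = new ConcurrentLinkedQueue<>(segments);
        Queue<R> results = new ConcurrentLinkedQueue<>();

        // Each worker keeps taking the next available segment until none remain.
        Runnable stealLoop = () -> {
            Integer segment;
            while ((segment = pending.poll()) != null) {
                results.add(segmentWork.apply(segment));
            }
        };

        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, cpuCount - 1));
        try {
            List<Future<?>> helpers = new ArrayList<>();
            // Submit at most cpuCount - 1 helpers; stop early if earlier
            // helpers have already drained the queue.
            for (int i = 0; i < cpuCount - 1 && !pending.isEmpty(); i++) {
                helpers.add(pool.submit(stealLoop));
            }
            // The submitting thread also steals segments instead of idling.
            stealLoop.run();
            for (Future<?> helper : helpers) {
                helper.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(results); // one result per segment, in completion order
    }
}
```

Letting the submitting thread run the same loop keeps one core busy that would otherwise wait, which matches the stated motivation of avoiding an extra context switch.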
      • exactlyOnceSequential

        protected <I,​R> io.reactivex.rxjava3.core.Flowable<R> exactlyOnceSequential​(CacheSet<I> set,
                                                                                          Set<K> keysToExclude,
                                                                                          Function<I,​K> toKeyFunction,
                                                                                          IntSet segments,
                                                                                          Function<? super org.reactivestreams.Publisher<I>,​? extends CompletionStage<R>> collator,
                                                                                          org.infinispan.reactive.publisher.impl.LocalPublisherManagerImpl.SegmentListener listener,
                                                                                          IntSet concurrentSegments)
      • handleLostSegments

        protected <R> CompletionStage<org.infinispan.reactive.publisher.impl.commands.reduction.PublisherResult<R>> handleLostSegments​(CompletionStage<R> stage,
                                                                                                                                       org.infinispan.reactive.publisher.impl.LocalPublisherManagerImpl.SegmentListener segmentListener)