Interface NonBlockingStore<K,V>
- Type Parameters:
K - key value type
V - value value type
- All Known Implementing Classes:
AbstractSchemaJdbcStore, AsyncNonBlockingStore, BaseJdbcStore, DelegatingNonBlockingStore, JdbcStringBasedStore, NonBlockingSoftIndexFileStore, NonBlockingStoreAdapter, QueriesJdbcStore, RemoteStore, RocksDBStore, SFSToSIFSStore, SingleFileStore, TableJdbcStore
The first method invoked on this store is start(InitializationContext), which starts the store.
Once the returned stage has completed, the store is assumed to be in a working state and ready to handle operations.
Infinispan guarantees the visibility of variables written during the start method, so you do not need to
synchronize them manually unless they are mutated in the normal operations of the store itself.
After the store starts, Infinispan uses the characteristics() method to query the characteristics of
the store. It is highly recommended that this method never change the values it returns after the
store starts because Infinispan may or may not cache them. For more information, see
NonBlockingStore.Characteristic and its various values.
By default, this interface has only a few required methods. If you implement any of the optional methods,
ensure that you advertise the appropriate characteristic for that method so that Infinispan invokes it.
If Infinispan is instructed that a characteristic is available and the method is not overridden, an
UnsupportedOperationException will be thrown when trying to invoke the appropriate method. Each
NonBlockingStore.Characteristic defines what methods map to which characteristic.
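The link between optional methods and advertised characteristics can be sketched in plain Java. The types below (`Feature`, `SketchStore`, `CountingStore`) are hypothetical stand-ins rather than Infinispan's `NonBlockingStore` or `NonBlockingStore.Characteristic`. They only illustrate the idea that an optional method fails by default with an UnsupportedOperationException, and that a store should advertise a feature only when it overrides the matching method.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical stand-ins for illustration only; these are NOT Infinispan types.
enum Feature { BULK_READ, EXPIRATION }

interface SketchStore {
    // The only required method in this sketch.
    CompletionStage<Void> clear();

    // By default no optional feature is advertised.
    default Set<Feature> features() {
        return EnumSet.noneOf(Feature.class);
    }

    // Optional method: the default fails if it is invoked without an override.
    default CompletionStage<Long> size() {
        throw new UnsupportedOperationException("size() requires BULK_READ and an override");
    }
}

// Overrides size() and advertises BULK_READ so that a caller knows it may invoke size().
final class CountingStore implements SketchStore {
    private static final Set<Feature> FEATURES = Set.of(Feature.BULK_READ);

    @Override
    public CompletionStage<Void> clear() {
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public Set<Feature> features() {
        return FEATURES; // the same set is returned on every call
    }

    @Override
    public CompletionStage<Long> size() {
        return CompletableFuture.completedFuture(0L);
    }
}
```

Returning a constant set keeps the advertised features stable for the lifetime of the store, which is the behavior the text above recommends.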
Although recommended, segmentation support in store implementations is optional. Segment parameters are provided
for all methods where segment information is required, for example load(int, Object) and
publishEntries(IntSet, Predicate, boolean). If your store implementation does not support segmentation,
you can ignore these parameters. However, you should note that segmented stores allow Infinispan caches to more
efficiently perform bulk operations such as Cache.size() or Cache.entrySet().stream(). Segmentation
also decreases the duration of state transfers when a store with NonBlockingStore.Characteristic.BULK_READ is present,
as well as the time required to remove data by segments. To indicate that a store implementation supports segmentation,
the NonBlockingStore.Characteristic.SEGMENTABLE characteristic must be returned by the characteristics() method. Store
implementations can determine whether they are configured to be segmented by checking StoreConfiguration.segmented(),
which is available from the InitializationContext.
Store implementations might need to interact with blocking APIs to perform their required operations. However, the invoking
thread must never be blocked, so Infinispan provides a BlockingManager utility class
that handles blocking operations to ensure that they do not leak into the internal system. BlockingManager does this
by running any blocking operations on blocking threads, while any stages continue on non-blocking threads.
This utility class provides different methods that range from equivalents for commonly used methods, such as
CompletableFuture.supplyAsync(Supplier, Executor), to a wrapper around a Publisher that
ensures it is subscribed and observed on the correct threads. To obtain a BlockingManager, invoke the
InitializationContext.getBlockingManager() method on the provided context in the start method.
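The threading idea described above can be sketched with JDK classes only. This example does not use BlockingManager. It relies on CompletableFuture.supplyAsync(Supplier, Executor), which the text names as a comparable method, and on two executors with hypothetical names (`BLOCKING`, `NON_BLOCKING`) that stand in for the thread pools Infinispan would provide.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustration only: plain JDK executors stand in for the threads that a
// BlockingManager would manage. All names here are hypothetical.
final class BlockingOffloadSketch {
    static final ExecutorService BLOCKING = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "sketch-blocking");
        t.setDaemon(true);
        return t;
    });
    static final ExecutorService NON_BLOCKING = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-non-blocking");
        t.setDaemon(true);
        return t;
    });

    // Simulates a blocking call such as a JDBC query or a file read.
    static String blockingRead(String key) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "value-of-" + key + "@" + Thread.currentThread().getName();
    }

    // The caller is never blocked: the read runs on BLOCKING and the
    // continuation runs on NON_BLOCKING.
    static CompletionStage<String> load(String key) {
        return CompletableFuture
                .supplyAsync(() -> blockingRead(key), BLOCKING)
                .thenApplyAsync(v -> v + "->" + Thread.currentThread().getName(), NON_BLOCKING);
    }
}
```

The returned value records which thread ran each step, so the hand-off from the blocking thread to the non-blocking thread is visible in the result.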
Implementations of this store must be thread safe if concurrent operations are performed on it. The one exception
is that start(InitializationContext) and stop() are not invoked concurrently with other operations.
Note that this interface is Experimental and its methods may change slightly over time until it has matured.
- Since:
- 11.0
- Author:
- William Burns
-
Nested Class Summary
Modifier and Type | Interface | Description
static enum | NonBlockingStore.Characteristic | Enumeration defining the various characteristics of the underlying store to communicate what features it may or may not support.
static interface | NonBlockingStore.SegmentedPublisher | A Publisher that provides a stream of values and the segments to which those values map.
-
Field Summary
Modifier and Type | Field | Description
static final CompletableFuture<Long> | SIZE_UNAVAILABLE_FUTURE | Shortcut to return -1L when the size or approximate size is unavailable.
-
Method Summary
Modifier and Type | Method | Description
default CompletionStage<Void> | addSegments(IntSet segments) | Invoked when a node becomes an owner of the given segments.
default CompletionStage<Long> | approximateSize(IntSet segments) | Returns an estimation of the amount of entries that map to the given segments in the store.
default CompletionStage<Void> | batch(int publisherCount, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<Object>> removePublisher, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K, V>>> writePublisher) | Writes and removes the entries provided by the Publishers into the store.
default Set<NonBlockingStore.Characteristic> | characteristics() | Returns a set of characteristics for this store and its elements.
CompletionStage<Void> | clear() | Clears all entries from the store.
default CompletionStage<Void> | commit(jakarta.transaction.Transaction transaction) | Commit changes in the provided transaction to the underlying store.
default CompletionStage<Boolean> | containsKey(int segment, Object key) | Returns a stage that will contain whether the value can be found in the store.
CompletionStage<Boolean> | delete(int segment, Object key) | Removes the entry for the given key and segment from the store and optionally reports whether the entry was actually removed.
default CompletionStage<Void> | destroy() | This method is to be invoked when the store should clean up all underlying data and storage of said data.
default boolean | ignoreCommandWithFlags(long commandFlags) | Some stores may not want to perform operations based on whether a command has certain flags.
default CompletionStage<Boolean> | isAvailable() | Returns a stage that, when complete, returns a boolean indicating whether the current store can be accessed for requests.
CompletionStage<MarshallableEntry<K, V>> | load(int segment, Object key) | Returns a stage that will contain the value loaded from the store.
default CompletionStage<Void> | prepareWithModifications(jakarta.transaction.Transaction transaction, int publisherCount, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<Object>> removePublisher, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K, V>>> writePublisher) | Write remove and put modifications to the store in the prepare phase, which should not yet be persisted until the same transaction is committed via commit(Transaction), or they are discarded if the transaction is rolled back via rollback(Transaction).
default org.reactivestreams.Publisher<MarshallableEntry<K, V>> | publishEntries(IntSet segments, Predicate<? super K> filter, boolean includeValues) | Publishes entries from this store that are in one of the provided segments and also pass the provided filter.
default org.reactivestreams.Publisher<K> | publishKeys(IntSet segments, Predicate<? super K> filter) | Publishes keys from this store that are in one of the provided segments and also pass the provided filter.
default org.reactivestreams.Publisher<MarshallableEntry<K, V>> | purgeExpired() | Returns a Publisher that, after it is subscribed to, removes any expired entries from the store and publishes them to the returned Publisher.
default CompletionStage<Void> | removeSegments(IntSet segments) | Invoked when a node loses ownership of the given segments.
default CompletionStage<Void> | rollback(jakarta.transaction.Transaction transaction) | Roll back the changes from the provided transaction to the underlying store.
default CompletionStage<Long> | size(IntSet segments) | Returns the amount of entries that map to the given segments in the store.
CompletionStage<Void> | start(InitializationContext ctx) | The first method to invoke so that the store can be configured and additional steps, such as connecting through a socket or opening file descriptors, are performed.
CompletionStage<Void> | stop() | This method is invoked when the cache is being shut down.
CompletionStage<Void> | write(int segment, MarshallableEntry<? extends K, ? extends V> entry) | Writes the entry to the store for the given segment returning a stage that completes normally when it is finished.
-
Field Details
-
SIZE_UNAVAILABLE_FUTURE
Shortcut to return -1L when the size or approximate size is unavailable.
-
-
Method Details
-
start
The first method to invoke so that the store can be configured and additional steps, such as connecting through a socket or opening file descriptors, are performed.
The provided InitializationContext contains many helpful objects, including the configuration of the cache and store, and concurrency utilities such as BlockingManager or an executor reserved for non-blocking operations only, InitializationContext.getNonBlockingExecutor().
This method is guaranteed not to be invoked concurrently with other operations. This means other methods are not invoked on this store until after the returned Stage completes.
It is expected that an implementation should be able to "restart" by invoking start a second time if stop() has been invoked and its stage has been allowed to complete.
- Parameters:
ctx - initialization context used to initialize this store.
- Returns:
- a stage that, when complete, indicates that this store has started successfully.
-
stop
CompletionStage<Void> stop()
This method is invoked when the cache is being shut down. It is expected that all resources related to the store are freed when the returned stage is complete.
This method is guaranteed not to be invoked concurrently with other operations. This means other methods are not invoked on this store until after the returned Stage completes.
It is expected that an implementation should be able to "restart" by invoking start(InitializationContext) a second time if stop has been invoked and its stage has been allowed to complete.
- Returns:
- a stage that, when complete, indicates that this store has stopped.
-
destroy
This method is to be invoked when the store should clean up all underlying data and storage of said data. For example, a database store would remove the underlying table(s) that it is using, and a file-based store would remove all of the various files or directories it may have created.
- Returns:
- a stage that, when complete, indicates that this store is stopped and all data and storage for it are also cleaned up
-
characteristics
Returns a set of characteristics for this store and its elements. This method may be invoked multiple times to determine which methods of the store can be used and how the data in the store can be handled.
Refer to NonBlockingStore.Characteristic and its values for descriptions of each characteristic for stores.
- Returns:
- the set of characteristics that this store supports.
-
isAvailable
Returns a stage that, when complete, returns a boolean indicating whether the current store can be accessed for requests. This can be useful for store implementations that rely on an external source, such as a remote database, that may become unreachable. This can reduce sending requests to a store that is not available, as subsequent cache requests will result in a StoreUnavailableException being thrown until the store becomes available again.
Store availability is polled periodically to update the status of stores if their availability changes. This method is not invoked concurrently with itself. In other words, this method is not invoked until after the previous stage has completed. However, this method is invoked concurrently with other operations, except for start(InitializationContext) and stop().
If a store is configured to be StoreConfiguration.async() and the store becomes unavailable, then it is possible for the cache operations to be accepted in the interim period between the loss of availability and the modification-queue becoming full. This allows for this store to be unavailable for short periods of time without a StoreUnavailableException being thrown; however, if the store does not become available before the queue fills, then a StoreUnavailableException is thrown.
- Returns:
- stage that, when complete, indicates if the store is available.
-
load
Returns a stage that will contain the value loaded from the store. If a MarshallableEntry needs to be created here, InitializationContext.getMarshallableEntryFactory() and InitializationContext.getByteBufferFactory() should be used.
Summary of Characteristics Effects
Characteristic | Effect
NonBlockingStore.Characteristic.WRITE_ONLY | This method will never be invoked.
NonBlockingStore.Characteristic.EXPIRATION | When set, this method must not return expired entries.
NonBlockingStore.Characteristic.SEGMENTABLE | When this is not set or segmentation is disabled in the configuration, the segment parameter may be ignored.
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException and complete the stage exceptionally.
- Parameters:
segment - the segment for the given key if segmentation is enabled, otherwise 0.
key - key of the entry to load.
- Returns:
- a stage that, when complete, contains the store value or null if not present.
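The error-handling recommendation that recurs throughout this page can be sketched as follows. `SketchPersistenceException` is a hypothetical stand-in for PersistenceException, and `LoadSketch` is an in-memory toy rather than a NonBlockingStore implementation. The sketch assumes Java 9 or later for CompletableFuture.failedFuture.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for PersistenceException; not the Infinispan class.
final class SketchPersistenceException extends RuntimeException {
    SketchPersistenceException(Throwable cause) {
        super(cause);
    }
}

final class LoadSketch {
    private final Map<Object, String> data = new ConcurrentHashMap<>();

    void put(Object key, String value) {
        data.put(key, value);
    }

    // The segment is ignored because this sketch is not segmented.
    CompletionStage<String> load(int segment, Object key) {
        try {
            if (key == null) {
                throw new IllegalArgumentException("key must not be null");
            }
            // Completes with null when the key is absent.
            return CompletableFuture.completedFuture(data.get(key));
        } catch (RuntimeException e) {
            // Report the failure through the stage instead of throwing to the caller.
            return CompletableFuture.failedFuture(new SketchPersistenceException(e));
        }
    }
}
```

With this shape, callers observe every failure through the returned stage, so they need only one error path.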
-
containsKey
Returns a stage that will contain whether the value can be found in the store.
Summary of Characteristics Effects
Characteristic | Effect
NonBlockingStore.Characteristic.WRITE_ONLY | This method will never be invoked.
NonBlockingStore.Characteristic.EXPIRATION | When set, this method must not return true if the entry was expired.
NonBlockingStore.Characteristic.SEGMENTABLE | When this is not set or segmentation is disabled in the configuration, the segment parameter may be ignored.
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException and complete the stage exceptionally.
- Parameters:
segment - the segment for the given key if segmentation is enabled, otherwise 0.
key - key of the entry to check.
- Returns:
- a stage that, when complete, contains a boolean stating if the value is contained in the store.
-
write
Writes the entry to the store for the given segment returning a stage that completes normally when it is finished.
Summary of Characteristics Effects
Characteristic | Effect
NonBlockingStore.Characteristic.READ_ONLY | This method will never be invoked.
NonBlockingStore.Characteristic.EXPIRATION | When set, this method must store the expiration metadata.
NonBlockingStore.Characteristic.SEGMENTABLE | When set and segmentation is not disabled in the configuration, this method must ensure the segment is stored with the entry.
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException and complete the stage exceptionally.
- Parameters:
segment - the segment for the given key if segmentation is enabled, otherwise 0.
entry - the entry to persist to the store.
- Returns:
- a stage that when complete indicates that the store has written the value.
-
delete
Removes the entry for the given key and segment from the store and optionally reports whether the entry was actually removed.
Summary of Characteristics Effects
Characteristic | Effect
NonBlockingStore.Characteristic.READ_ONLY | This method will never be invoked.
NonBlockingStore.Characteristic.SEGMENTABLE | When this is not set or segmentation is disabled in the configuration, the segment parameter may be ignored.
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException and complete the stage exceptionally.
- Parameters:
segment - the segment for the given key if segmentation is enabled, otherwise 0.
key - key of the entry to delete from the store.
- Returns:
- a stage that completes with TRUE if the key existed in the store, FALSE if the key did not exist in the store, or null if the store does not report this information.
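The three possible results of delete (TRUE, FALSE, or null) can be illustrated with a small in-memory sketch. `DeleteSketch` and its `reportsRemoval` flag are hypothetical and exist only to show both reporting styles side by side. A real store would pick one style.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

// Illustration only: an in-memory map stands in for real storage.
final class DeleteSketch {
    private final Map<Object, String> data = new ConcurrentHashMap<>();
    private final boolean reportsRemoval;

    DeleteSketch(boolean reportsRemoval) {
        this.reportsRemoval = reportsRemoval;
    }

    void put(Object key, String value) {
        data.put(key, value);
    }

    // The segment is ignored because this sketch is not segmented.
    CompletionStage<Boolean> delete(int segment, Object key) {
        boolean existed = data.remove(key) != null;
        if (!reportsRemoval) {
            // A store that does not report whether the key existed completes with null.
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.completedFuture(existed ? Boolean.TRUE : Boolean.FALSE);
    }
}
```

Callers therefore have to treat null as "unknown" rather than as "not removed".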
-
addSegments
Invoked when a node becomes an owner of the given segments. Some store implementations may require initializing additional resources when a new segment is required. For example, a store could store entries in a different file per segment.
Summary of Characteristics Effects
Characteristic | Effect
NonBlockingStore.Characteristic.SHAREABLE | If the store has this characteristic and is configured to be StoreConfiguration.shared(), this method will never be invoked.
NonBlockingStore.Characteristic.SEGMENTABLE | This method is invoked only if the store has this characteristic and is configured to be segmented.
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException and complete the stage exceptionally.
- Parameters:
segments - the segments to add.
- Returns:
- a stage that, when complete, indicates that the segments have been added.
-
removeSegments
Invoked when a node loses ownership of the given segments. A store must then remove any entries that map to the given segments and can remove any resources related to the given segments. For example, a database store can delete rows of the given segment or a file-based store can delete files related to the given segments.
Summary of Characteristics Effects
Characteristic | Effect
NonBlockingStore.Characteristic.SHAREABLE | If the store has this characteristic and is configured to be shared, this method will never be invoked.
NonBlockingStore.Characteristic.SEGMENTABLE | This method is invoked only if the store has this characteristic and is configured to be segmented.
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException and complete the stage exceptionally.
- Parameters:
segments - the segments to remove.
- Returns:
- a stage that, when complete, indicates that the segments have been removed.
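A minimal sketch of segment ownership, assuming one in-memory map per segment in place of a per-segment file or table. `Set<Integer>` stands in for IntSet, and `SegmentedSketch` with its `write` and `size` helpers is hypothetical. None of this is the Infinispan implementation.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

// Illustration only: Set<Integer> stands in for IntSet, and one in-memory map
// per segment stands in for a per-segment file or table.
final class SegmentedSketch {
    private final Map<Integer, Map<Object, String>> segments = new ConcurrentHashMap<>();

    // Allocates the resources for newly owned segments.
    CompletionStage<Void> addSegments(Set<Integer> added) {
        for (int segment : added) {
            segments.computeIfAbsent(segment, s -> new ConcurrentHashMap<>());
        }
        return CompletableFuture.completedFuture(null);
    }

    // Drops the entries and resources of segments that are no longer owned.
    CompletionStage<Void> removeSegments(Set<Integer> removed) {
        for (int segment : removed) {
            segments.remove(segment);
        }
        return CompletableFuture.completedFuture(null);
    }

    CompletionStage<Void> write(int segment, Object key, String value) {
        Map<Object, String> map = segments.get(segment);
        if (map == null) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("segment " + segment + " is not owned"));
        }
        map.put(key, value);
        return CompletableFuture.completedFuture(null);
    }

    // Counts the entries in the requested segments only.
    long size(Set<Integer> wanted) {
        long total = 0;
        for (int segment : wanted) {
            Map<Object, String> map = segments.get(segment);
            if (map != null) {
                total += map.size();
            }
        }
        return total;
    }
}
```

Keeping data partitioned by segment is what makes removal by segment cheap here: dropping a segment is a single map removal rather than a scan over all entries.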
-
clear
CompletionStage<Void> clear()
Clears all entries from the store.
Summary of Characteristics Effects
Characteristic | Effect
NonBlockingStore.Characteristic.READ_ONLY | This method will never be invoked.
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException and complete the stage exceptionally.
- Returns:
- a stage that, when complete, indicates that the store has been cleared.
-
batch
default CompletionStage<Void> batch(int publisherCount, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<Object>> removePublisher, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K, V>>> writePublisher)
Writes and removes the entries provided by the Publishers into the store. Both are provided in the same method so that a batch may be performed as a single atomic operation if desired, although it is up to the store to manage its batching. If needed, a store may generate batches of a configured size by using the StoreConfiguration.maxBatchSize() setting.
Each of the Publishers may publish up to publisherCount publishers, where each publisher is separated by the segment each entry maps to. Failure to request at least publisherCount publishers from the Publisher may cause a deadlock. Many reactive tools have methods such as flatMap that take an argument for how many concurrent subscriptions they manage, which matches this argument.
WARNING: For performance reasons, neither Publisher will emit any NonBlockingStore.SegmentedPublishers until both the write and remove Publishers are subscribed to. These Publishers should also be subscribed to only once.
Summary of Characteristics Effects
Characteristic | Effect
NonBlockingStore.Characteristic.READ_ONLY | This method will never be invoked.
NonBlockingStore.Characteristic.SEGMENTABLE | If not set or segmentation is disabled in the configuration, the publisherCount parameter has a value of 1, which means there is only one SegmentedPublisher to subscribe to.
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException and complete the stage exceptionally.
- Parameters:
publisherCount - the maximum number of SegmentedPublishers either publisher will publish
removePublisher - publishes what keys should be removed from the store
writePublisher - publishes the entries to write to the store
- Returns:
- a stage that when complete signals that the store has written the values
-
size
Returns the amount of entries that map to the given segments in the store.
Summary of Characteristics Effects
Characteristic | Effect
NonBlockingStore.Characteristic.BULK_READ | This method is only invoked if the store has this characteristic.
NonBlockingStore.Characteristic.SEGMENTABLE | When this is not set or segmentation is disabled in the configuration, the segments parameter may be ignored.
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException and complete the stage exceptionally.
- Parameters:
segments - the segments for which the entries are counted.
- Returns:
- a stage that, when complete, contains the count of how many entries are present for the given segments.
-
approximateSize
Returns an estimation of the amount of entries that map to the given segments in the store. This is similar to size(IntSet) except that it is not strict about the returned size. For instance, this method might ignore if an entry is expired or if the store has some underlying optimizations to eventually have a consistent size.
The implementations should be O(1). If a size approximation cannot be returned without iterating over all the entries in the store, the implementation should return -1L.
Summary of Characteristics Effects
Characteristic | Effect
NonBlockingStore.Characteristic.BULK_READ | This method is only invoked if the store has this characteristic.
NonBlockingStore.Characteristic.SEGMENTABLE | When the store does not have this characteristic or segmentation is disabled in the configuration, the segments parameter is always IntSets.immutableRangeSet(numSegments).
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException and complete the stage exceptionally.
- Parameters:
segments - the segments for which the entries are counted.
- Returns:
- a stage that, when complete, contains the approximate count of the entries in the given segments, or -1L if an approximate count cannot be provided.
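One way to keep an approximate size cheap is to maintain a counter on the write path, and to return a shared "unavailable" stage when no counter exists. The sketch below assumes that approach. `ApproximateSizeSketch`, `UNAVAILABLE`, and the `tracksCount` flag are hypothetical names, with `UNAVAILABLE` playing a role similar to the SIZE_UNAVAILABLE_FUTURE field described earlier on this page.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Illustration only: a toy store that either tracks a counter or reports -1L.
final class ApproximateSizeSketch {
    // Hypothetical shared "size unavailable" stage.
    static final CompletableFuture<Long> UNAVAILABLE = CompletableFuture.completedFuture(-1L);

    private final Map<Object, String> data = new ConcurrentHashMap<>();
    private final AtomicLong counter = new AtomicLong();
    private final boolean tracksCount;

    ApproximateSizeSketch(boolean tracksCount) {
        this.tracksCount = tracksCount;
    }

    void write(Object key, String value) {
        // Only a new key changes the count; an overwrite does not.
        if (data.put(key, value) == null && tracksCount) {
            counter.incrementAndGet();
        }
    }

    // Reads a counter instead of iterating over the entries.
    CompletionStage<Long> approximateSize() {
        if (!tracksCount) {
            return UNAVAILABLE;
        }
        return CompletableFuture.completedFuture(counter.get());
    }
}
```

The counter is only an estimate in a real store, because it would not account for entries that have expired but have not yet been purged.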
-
publishEntries
default org.reactivestreams.Publisher<MarshallableEntry<K,V>> publishEntries(IntSet segments, Predicate<? super K> filter, boolean includeValues)
Publishes entries from this store that are in one of the provided segments and also pass the provided filter. The returned publisher must support being subscribed to any number of times. That is, subsequent invocations of Publisher.subscribe(Subscriber) should provide independent views of the underlying entries to the Subscribers. Entries should not be retrieved until a given Subscriber requests them via the Subscription.request(long) method.
Subscribing to the returned Publisher should not block the invoking thread. It is the responsibility of the store implementation to ensure this occurs. If, however, the store must block to perform an operation, it is recommended to wrap your Publisher with the BlockingManager.blockingPublisher(Publisher) method before returning it; the method handles subscription and observation on the blocking and non-blocking executors respectively.
Summary of Characteristics Effects
Characteristic | Effect
NonBlockingStore.Characteristic.BULK_READ | This method is only invoked if the store has this characteristic.
NonBlockingStore.Characteristic.EXPIRATION | When set, the returned publisher must not return expired entries.
NonBlockingStore.Characteristic.SEGMENTABLE | When this is not set or segmentation is disabled in the configuration, the segments parameter may be ignored.
- Parameters:
segments - a set of segments to filter entries by. This will always be non-null.
filter - a filter to filter the keys by. If this is null then no additional filtering should be done after segments.
- Returns:
- a publisher that provides the entries from the store.
-
publishKeys
Publishes keys from this store that are in one of the provided segments and also pass the provided filter. The returned publisher must support being subscribed to any number of times. That is, subsequent invocations of Publisher.subscribe(Subscriber) should provide independent views of the underlying keys to the Subscribers. Keys should not be retrieved until a given Subscriber requests them via the Subscription.request(long) method.
Subscribing to the returned Publisher should not block the invoking thread. It is the responsibility of the store implementation to ensure this occurs. If, however, the store must block to perform an operation, it is recommended to wrap your Publisher with the BlockingManager.blockingPublisher(Publisher) method before returning it; the method handles subscription and observation on the blocking and non-blocking executors respectively.
Summary of Characteristics Effects
Characteristic | Effect
NonBlockingStore.Characteristic.BULK_READ | This method is only invoked if the store has this characteristic.
NonBlockingStore.Characteristic.EXPIRATION | When set, the returned publisher must not return expired keys.
NonBlockingStore.Characteristic.SEGMENTABLE | When this is not set or segmentation is disabled in the configuration, the segments parameter may be ignored.
- Parameters:
segments - a set of segments to filter keys by. This will always be non-null.
filter - a filter to filter the keys by. If this is null then no additional filtering should be done after segments.
- Returns:
- a publisher that provides the keys from the store.
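The publisher contract shared by publishEntries and publishKeys (independent views per subscriber, nothing retrieved before request, a null filter meaning no extra filtering) can be sketched with the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces. This is an assumption made to keep the example free of dependencies; the real methods return org.reactivestreams.Publisher. `LazyKeyPublisher` is a hypothetical, synchronous, single-threaded toy and not a complete Reactive Streams implementation.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Illustration only: java.util.concurrent.Flow stands in for org.reactivestreams.
final class LazyKeyPublisher implements Flow.Publisher<String> {
    private final Supplier<List<String>> source;    // read once per subscriber
    private final Predicate<? super String> filter; // null means "no extra filtering"

    LazyKeyPublisher(Supplier<List<String>> source, Predicate<? super String> filter) {
        this.source = source;
        this.filter = filter;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        // Each subscriber gets its own Subscription, and so its own iterator.
        subscriber.onSubscribe(new Flow.Subscription() {
            private Iterator<String> it; // created on the first request, not on subscribe
            private long demand;
            private boolean emitting;
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // saturate on overflow
                if (emitting) return; // re-entrant call from onNext; the running loop sees the new demand
                emitting = true;
                try {
                    if (it == null) it = source.get().iterator();
                    while (!done && demand > 0 && it.hasNext()) {
                        String key = it.next();
                        if (filter == null || filter.test(key)) {
                            demand--;
                            subscriber.onNext(key);
                        }
                    }
                    if (!done && !it.hasNext()) {
                        done = true;
                        subscriber.onComplete();
                    }
                } catch (RuntimeException e) {
                    if (!done) {
                        done = true;
                        subscriber.onError(e);
                    }
                } finally {
                    emitting = false;
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}
```

Because the source is consulted inside request rather than in subscribe, a subscriber that never requests anything never causes a read.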
-
purgeExpired
Returns a Publisher that, after it is subscribed to, removes any expired entries from the store and publishes them to the returned Publisher.
When the Publisher is subscribed to, it is expected to do point-in-time expiration and should not return a Publisher that has infinite entries or never completes.
Subscribing to the returned Publisher should not block the invoking thread. It is the responsibility of the store implementation to ensure this occurs. If, however, the store must block to perform an operation, it is recommended to wrap your Publisher with the BlockingManager.blockingPublisher(Publisher) method before returning it; the method handles subscription and observation on the blocking and non-blocking executors respectively.
Summary of Characteristics Effects
Characteristic | Effect
NonBlockingStore.Characteristic.EXPIRATION | This method is only invoked if the store has this characteristic.
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException and complete the stage exceptionally.
- Returns:
- a Publisher that publishes the entries that are expired at the time of subscription.
-
prepareWithModifications
default CompletionStage<Void> prepareWithModifications(jakarta.transaction.Transaction transaction, int publisherCount, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<Object>> removePublisher, org.reactivestreams.Publisher<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K, V>>> writePublisher)
Write remove and put modifications to the store in the prepare phase, which should not yet be persisted until the same transaction is committed via commit(Transaction), or they are discarded if the transaction is rolled back via rollback(Transaction).
Each of the Publishers may publish up to publisherCount publishers, where each publisher is separated by the segment each entry maps to. Failure to request at least publisherCount publishers from the Publisher may cause a deadlock. Many reactive tools have methods such as flatMap that take an argument for how many concurrent subscriptions they manage, which matches this argument.
WARNING: For performance reasons, neither Publisher will emit any NonBlockingStore.SegmentedPublishers until both the write and remove Publishers are subscribed to. These Publishers should also be subscribed to only once.
Summary of Characteristics Effects
Characteristic | Effect
NonBlockingStore.Characteristic.TRANSACTIONAL | This method is invoked only if the store has this characteristic.
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException and complete the stage exceptionally.
- Parameters:
transaction - the current transactional context.
publisherCount - the maximum number of SegmentedPublishers either publisher will publish
removePublisher - publishes what keys should be removed from the store
writePublisher - publishes the entries to write to the store
- Returns:
- a stage that when complete signals that the store has written the values
-
commit
Commit changes in the provided transaction to the underlying store.
Summary of Characteristics Effects
Characteristic | Effect
NonBlockingStore.Characteristic.TRANSACTIONAL | This method is invoked only if the store has this characteristic.
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException and complete the stage exceptionally.
- Parameters:
transaction - the current transactional context.
- Returns:
- a stage that, when completed, indicates that the transaction was committed.
-
rollback
Roll back the changes from the provided transaction to the underlying store.
Summary of Characteristics Effects
Characteristic | Effect
NonBlockingStore.Characteristic.TRANSACTIONAL | This method is invoked only if the store has this characteristic.
If a problem is encountered, it is recommended to wrap any created/caught Throwable in a PersistenceException and complete the stage exceptionally.
- Parameters:
transaction - the current transactional context.
- Returns:
- a stage that, when completed, indicates that the transaction was rolled back.
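The prepare, commit, and rollback sequence described in the three preceding methods can be sketched as staging modifications per transaction and applying them only on commit. In the sketch below, `Object` stands in for jakarta.transaction.Transaction, and a plain list and map stand in for the remove and write Publishers. `TwoPhaseSketch` and its method names are hypothetical, and applying the staged changes is not atomic here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

// Illustration only: Object stands in for jakarta.transaction.Transaction and
// plain collections stand in for the remove and write Publishers.
final class TwoPhaseSketch {
    private final Map<Object, String> data = new ConcurrentHashMap<>();
    // Modifications staged during prepare, keyed by transaction.
    private final Map<Object, List<Runnable>> pending = new ConcurrentHashMap<>();

    // Records the modifications without touching the stored data.
    CompletionStage<Void> prepare(Object tx, List<Object> removes, Map<Object, String> writes) {
        List<Runnable> staged = new ArrayList<>();
        removes.forEach(key -> staged.add(() -> data.remove(key)));
        writes.forEach((key, value) -> staged.add(() -> data.put(key, value)));
        pending.put(tx, staged);
        return CompletableFuture.completedFuture(null);
    }

    // Applies the staged modifications of the transaction, if any.
    CompletionStage<Void> commit(Object tx) {
        List<Runnable> staged = pending.remove(tx);
        if (staged != null) {
            staged.forEach(Runnable::run);
        }
        return CompletableFuture.completedFuture(null);
    }

    // Discards the staged modifications of the transaction.
    CompletionStage<Void> rollback(Object tx) {
        pending.remove(tx);
        return CompletableFuture.completedFuture(null);
    }

    String get(Object key) {
        return data.get(key);
    }
}
```

A store backed by a database would more likely delegate this staging to the database's own transaction support than keep it in memory.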
-
ignoreCommandWithFlags
Some stores may not want to perform operations based on whether a command has certain flags. This method is currently only used for testing single write operations. This method is experimental and may be removed at any time; it is not recommended for end users to implement it.
- Parameters:
commandFlags - the flags attributed to the command when performing the operation.
- Returns:
- whether the operation should occur.
-