Class RemoteStore<K,V>
- java.lang.Object
-
- org.infinispan.persistence.remote.RemoteStore<K,V>
-
- All Implemented Interfaces:
Lifecycle, AdvancedCacheExpirationWriter<K,V>, AdvancedCacheLoader<K,V>, AdvancedCacheWriter<K,V>, AdvancedLoadWriteStore<K,V>, CacheLoader<K,V>, CacheWriter<K,V>, ExternalStore<K,V>, FlagAffectedStore<K,V>, SegmentedAdvancedLoadWriteStore<K,V>
@ThreadSafe public class RemoteStore<K,V> extends Object implements SegmentedAdvancedLoadWriteStore<K,V>, FlagAffectedStore<K,V>
Cache store that delegates calls to an Infinispan cluster. This cache store communicates with the remote cluster through the Java Hot Rod client, which provides fault tolerance and smart dispatching of calls to the nodes most likely to contain the given key. This cache store supports both preloading and fetchPersistentState. Purging elements from this store is not possible, because Hot Rod does not support fetching all remote keys (which would also be a very costly operation). Purging takes place at the remote end (the Infinispan cluster).
- Since:
- 4.1
- Author:
- Mircea.Markus@jboss.com
- See Also:
RemoteStoreConfiguration, Hotrod Java Client
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.infinispan.persistence.spi.AdvancedCacheExpirationWriter
AdvancedCacheExpirationWriter.ExpirationPurgeListener<K,V>
-
Nested classes/interfaces inherited from interface org.infinispan.persistence.spi.AdvancedCacheLoader
AdvancedCacheLoader.CacheLoaderTask<K,V>, AdvancedCacheLoader.TaskContext
-
Nested classes/interfaces inherited from interface org.infinispan.persistence.spi.AdvancedCacheWriter
AdvancedCacheWriter.PurgeListener<K>
-
-
Field Summary
Fields
Modifier and Type, Field, Description
protected InitializationContext ctx
-
Constructor Summary
Constructors
Constructor, Description
RemoteStore()
-
Method Summary
Modifier and Type, Method, Description
CompletionStage<Void> bulkUpdate(org.reactivestreams.Publisher<MarshallableEntry<? extends K,? extends V>> publisher)
Persist all provided entries to the store in chunks, with the size of each chunk determined by the store implementation.
void clear()
Removes all the data from the storage.
void clear(IntSet segments)
Removes all the data that maps to the given segments from the storage.
boolean contains(Object key)
Returns true if the storage contains an entry associated with the given key.
boolean delete(Object key)
org.reactivestreams.Publisher<MarshallableEntry<K,V>> entryPublisher(Predicate<? super K> filter, boolean fetchValue, boolean fetchMetadata)
Publishes all entries from this store.
org.reactivestreams.Publisher<MarshallableEntry<K,V>> entryPublisher(IntSet segments, Predicate<? super K> filter, boolean fetchValue, boolean fetchMetadata)
Publishes all entries from this store.
RemoteStoreConfiguration getConfiguration()
RemoteCache<Object,Object> getRemoteCache()
void init(InitializationContext ctx)
Used to initialize a cache loader.
boolean isAvailable()
MarshallableEntry<K,V> loadEntry(Object key)
Fetches an entry from the storage.
io.reactivex.Flowable<K> publishKeys(Predicate<? super K> filter)
Publishes all the keys from this store.
io.reactivex.Flowable<K> publishKeys(IntSet segments, Predicate<? super K> filter)
Publishes all the keys that map to the given segments from this store.
void purge(Executor threadPool, AdvancedCacheWriter.PurgeListener task)
Using the threads in the pool, removes all the expired data from the persistence storage.
void setInternalCacheEntryFactory(org.infinispan.container.impl.InternalEntryFactory iceFactory)
boolean shouldWrite(long commandFlags)
int size()
Returns the number of elements in the store.
int size(IntSet segments)
Returns the number of elements in the store that map to the given segments that aren't expired.
void start()
Invoked on component start.
void stop()
Invoked on component stop.
void write(MarshallableEntry entry)
Persists the entry to the storage.
-
Methods inherited from interface org.infinispan.persistence.spi.AdvancedCacheLoader
process, publishEntries
-
Methods inherited from interface org.infinispan.persistence.spi.CacheLoader
load
-
Methods inherited from interface org.infinispan.persistence.spi.CacheWriter
deleteBatch, write, writeBatch
-
Methods inherited from interface org.infinispan.persistence.spi.ExternalStore
destroy
-
Methods inherited from interface org.infinispan.persistence.spi.SegmentedAdvancedLoadWriteStore
addSegments, contains, delete, get, load, publishEntries, purge, removeSegments, write, write
-
-
-
-
Field Detail
-
ctx
protected InitializationContext ctx
-
-
Method Detail
-
init
public void init(InitializationContext ctx)
Description copied from interface: CacheLoader
Used to initialize a cache loader. Typically invoked by the PersistenceManager when setting up cache loaders.
- Specified by:
init in interface CacheLoader<K,V>
- Specified by:
init in interface CacheWriter<K,V>
-
start
public void start() throws PersistenceException
Description copied from interface: Lifecycle
Invoked on component start.
- Specified by:
start in interface Lifecycle
- Throws:
PersistenceException
-
stop
public void stop() throws PersistenceException
Description copied from interface: Lifecycle
Invoked on component stop.
- Specified by:
stop in interface Lifecycle
- Throws:
PersistenceException
-
isAvailable
public boolean isAvailable()
- Specified by:
isAvailable in interface CacheLoader<K,V>
- Specified by:
isAvailable in interface CacheWriter<K,V>
- Specified by:
isAvailable in interface ExternalStore<K,V>
- Returns:
- true if the writer can be connected to, otherwise false
-
loadEntry
public MarshallableEntry<K,V> loadEntry(Object key) throws PersistenceException
Description copied from interface: CacheLoader
Fetches an entry from the storage. If a MarshallableEntry needs to be created here, InitializationContext.getMarshallableEntryFactory() and InitializationContext.getByteBufferFactory() should be used.
- Specified by:
loadEntry in interface CacheLoader<K,V>
- Returns:
- the entry, or null if the entry does not exist
- Throws:
PersistenceException
- in case of an error, e.g. communicating with the external storage
-
contains
public boolean contains(Object key) throws PersistenceException
Description copied from interface: CacheLoader
Returns true if the storage contains an entry associated with the given key.
- Specified by:
contains in interface CacheLoader<K,V>
- Throws:
PersistenceException
- in case of an error, e.g. communicating with the external storage
-
publishKeys
public io.reactivex.Flowable<K> publishKeys(Predicate<? super K> filter)
Description copied from interface: AdvancedCacheLoader
Publishes all the keys from this store. The given publisher can be used by as many Subscribers as desired. Keys are not retrieved until a given Subscriber requests them from the Subscription.
Stores will return only non-expired keys.
- Specified by:
publishKeys in interface AdvancedCacheLoader<K,V>
- Parameters:
filter - a filter; null is treated as allowing all entries
- Returns:
- a publisher that will provide the keys from the store
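The demand-driven behaviour described above (keys are fetched only when a Subscriber requests them, and a null filter admits every key) can be illustrated with a minimal sketch. It uses the JDK's java.util.concurrent.Flow interfaces as a stand-in for org.reactivestreams and io.reactivex.Flowable, and the class name LazyKeyPublisher and its backing Iterable are hypothetical; this is not how RemoteStore itself is implemented.

```java
import java.util.Iterator;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

// Hypothetical stand-in for a store's key publisher; not Infinispan code.
// Single-threaded sketch: it assumes request() is never called concurrently.
final class LazyKeyPublisher<K> implements Flow.Publisher<K> {
    private final Iterable<K> source;          // assumed backing key source
    private final Predicate<? super K> filter; // never null after normalisation

    LazyKeyPublisher(Iterable<K> source, Predicate<? super K> filter) {
        this.source = source;
        // A null filter is treated as allowing all keys.
        this.filter = filter != null ? filter : key -> true;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super K> subscriber) {
        // Every subscriber gets its own iterator, so the publisher is reusable.
        Iterator<K> keys = source.iterator();
        subscriber.onSubscribe(new Flow.Subscription() {
            private long pending;     // outstanding demand
            private boolean emitting; // guards against re-entrant request()
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("demand must be positive"));
                    return;
                }
                pending = (pending + n < 0) ? Long.MAX_VALUE : pending + n;
                if (emitting) return; // the running loop below sees the new demand
                emitting = true;
                try {
                    while (pending > 0 && !done) {
                        K next = null;
                        boolean found = false;
                        // Keys are pulled from the source only while demand exists.
                        while (!found && keys.hasNext()) {
                            K candidate = keys.next();
                            if (filter.test(candidate)) {
                                next = candidate;
                                found = true;
                            }
                        }
                        if (!found) {
                            done = true;
                            subscriber.onComplete();
                        } else {
                            pending--;
                            subscriber.onNext(next);
                        }
                    }
                } finally {
                    emitting = false;
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}
```

A subscriber that calls request(3) on a source of four keys receives exactly three of them; the fourth is never read from the source until more demand arrives.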
-
publishKeys
public io.reactivex.Flowable<K> publishKeys(IntSet segments, Predicate<? super K> filter)
Description copied from interface: SegmentedAdvancedLoadWriteStore
Publishes all the keys that map to the given segments from this store. The given publisher can be used by as many Subscribers as desired. Keys are not retrieved until a given Subscriber requests them from the Subscription.
Stores will return only non-expired keys.
The segments here must be adhered to, and the keys published must not include any that don't belong to the provided segments.
This method is not invoked when the store is not configured to be StoreConfiguration.segmented().
- Specified by:
publishKeys in interface SegmentedAdvancedLoadWriteStore<K,V>
- Parameters:
segments - the segments that the keys must map to. Always non-null.
filter - a filter
- Returns:
- a publisher that will provide the keys from the store
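The segment constraint above can be sketched as a plain filter: a key is published only if its segment is in the requested set. The sketch below is an illustration under stated assumptions, not Infinispan code. The segmentOf mapping (hash code modulo segment count) is hypothetical, since Infinispan derives segments through its own key partitioning; java.util.Set<Integer> stands in for IntSet; and treating a null filter as "allow all" is borrowed from the unsegmented variant.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical illustration of segment-restricted key selection.
final class SegmentFilterSketch {
    // Assumed key-to-segment mapping; the real mapping is defined by Infinispan.
    static int segmentOf(Object key, int numSegments) {
        return Math.floorMod(key.hashCode(), numSegments);
    }

    // Returns only the keys whose segment is in 'segments' and that pass 'filter'.
    static <K> List<K> keysInSegments(Iterable<K> allKeys, Set<Integer> segments,
                                      int numSegments, Predicate<? super K> filter) {
        List<K> result = new ArrayList<>();
        for (K key : allKeys) {
            boolean inSegment = segments.contains(segmentOf(key, numSegments));
            boolean accepted = filter == null || filter.test(key); // assumption: null allows all
            if (inSegment && accepted) {
                result.add(key);
            }
        }
        return result;
    }
}
```

The same per-key check would apply to a segment-restricted size or entry publisher: anything outside the requested segments is skipped before it is counted or emitted.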
-
entryPublisher
public org.reactivestreams.Publisher<MarshallableEntry<K,V>> entryPublisher(Predicate<? super K> filter, boolean fetchValue, boolean fetchMetadata)
Description copied from interface: AdvancedCacheLoader
Publishes all entries from this store. The given publisher can be used by as many Subscribers as desired. Entries are not retrieved until a given Subscriber requests them from the Subscription.
If fetchMetadata is true, this store must guarantee that it does not return any expired entries.
- Specified by:
entryPublisher in interface AdvancedCacheLoader<K,V>
- Parameters:
filter - a filter; null is treated as allowing all entries
fetchValue - whether or not to fetch the value from the persistent store. E.g. if the iteration is intended only over the key set, there is no point fetching the values from the persistent store as well
fetchMetadata - whether or not to fetch the metadata from the persistent store. E.g. if the iteration is intended only over the key set, then there is no point fetching the metadata from the persistent store as well
- Returns:
- a publisher that will provide the entries from the store
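One way to read the fetchValue and fetchMetadata flags is as a projection over each stored entry, with expired entries dropped whenever metadata is requested. The sketch below shows that reading with a hypothetical Entry class whose only metadata is an expiry timestamp; it is an assumption-laden illustration, not the MarshallableEntry API and not RemoteStore's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration of the fetchValue / fetchMetadata contract.
final class EntryProjectionSketch {
    // Simplified entry; a null expiresAtMillis means "never expires".
    static final class Entry {
        final String key;
        final String value;
        final Long expiresAtMillis;

        Entry(String key, String value, Long expiresAtMillis) {
            this.key = key;
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    static List<Entry> select(List<Entry> stored, Predicate<? super String> filter,
                              boolean fetchValue, boolean fetchMetadata, long nowMillis) {
        List<Entry> out = new ArrayList<>();
        for (Entry e : stored) {
            // A null filter is treated as allowing all entries.
            if (filter != null && !filter.test(e.key)) continue;
            boolean expired = e.expiresAtMillis != null && e.expiresAtMillis <= nowMillis;
            // With fetchMetadata set, expired entries must never be returned.
            if (fetchMetadata && expired) continue;
            // Omit the parts the caller did not ask for.
            out.add(new Entry(e.key,
                    fetchValue ? e.value : null,
                    fetchMetadata ? e.expiresAtMillis : null));
        }
        return out;
    }
}
```

A key-only iteration would pass false for both flags and read only the key field of each result.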
-
entryPublisher
public org.reactivestreams.Publisher<MarshallableEntry<K,V>> entryPublisher(IntSet segments, Predicate<? super K> filter, boolean fetchValue, boolean fetchMetadata)
Description copied from interface: SegmentedAdvancedLoadWriteStore
Publishes all entries from this store. The given publisher can be used by as many Subscribers as desired. Entries are not retrieved until a given Subscriber requests them from the Subscription.
If fetchMetadata is true, this store must guarantee that it does not return any expired entries.
The segments here must be adhered to, and the entries published must not include any that don't belong to the provided segments.
This method is not invoked when the store is not configured to be StoreConfiguration.segmented().
- Specified by:
entryPublisher in interface SegmentedAdvancedLoadWriteStore<K,V>
- Parameters:
segments - the segments that the keys of the entries must map to. Always non-null.
filter - a filter on the keys of the entries that, if passed, will allow the given entry to be returned from the publisher
fetchValue - whether the value should be included in the marshalled entry
fetchMetadata - whether the metadata should be included in the marshalled entry
- Returns:
- a publisher that will provide the entries from the store that map to the given segments
-
size
public int size()
Description copied from interface: AdvancedCacheLoader
Returns the number of elements in the store.
- Specified by:
size in interface AdvancedCacheLoader<K,V>
-
size
public int size(IntSet segments)
Description copied from interface: SegmentedAdvancedLoadWriteStore
Returns the number of elements in the store that map to the given segments and aren't expired.
The segments here must be adhered to, and the size must not count any entries that don't belong to the provided segments.
This method is not invoked when the store is not configured to be StoreConfiguration.segmented().
- Specified by:
size in interface SegmentedAdvancedLoadWriteStore<K,V>
- Parameters:
segments - the segments which should have their entries counted. Always non-null.
- Returns:
- the count of entries in the given segments
-
purge
public void purge(Executor threadPool, AdvancedCacheWriter.PurgeListener task)
Description copied from interface: AdvancedCacheWriter
Using the threads in the pool, removes all the expired data from the persistence storage. For each removed entry, the supplied listener is invoked.
When this method returns, all entries will be purged and no tasks will be running due to this loader in the provided executor. If, however, an exception is thrown, there could be tasks still pending or running in the executor.
- Specified by:
purge in interface AdvancedCacheExpirationWriter<K,V>
- Specified by:
purge in interface AdvancedCacheWriter<K,V>
-
write
public void write(MarshallableEntry entry) throws PersistenceException
Description copied from interface: CacheWriter
Persists the entry to the storage.
- Specified by:
write in interface CacheWriter<K,V>
- Throws:
PersistenceException - in case of an error, e.g. communicating with the external storage
- See Also:
MarshallableEntry
-
bulkUpdate
public CompletionStage<Void> bulkUpdate(org.reactivestreams.Publisher<MarshallableEntry<? extends K,? extends V>> publisher)
Description copied from interface: CacheWriter
Persist all provided entries to the store in chunks, with the size of each chunk determined by the store implementation. If chunking is not supported by the underlying store, then entries are written to the store individually via CacheWriter.write(MarshallableEntry).
- Specified by:
bulkUpdate in interface CacheWriter<K,V>
- Parameters:
publisher - a Publisher of MarshallableEntry instances
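The chunking behaviour described above can be sketched as buffering entries until a chunk is full and then handing the chunk to a batch writer. The names ChunkedWriteSketch, writeInChunks and batchWriter are hypothetical, and a plain Iterable replaces the reactive Publisher to keep the sketch short; how RemoteStore sizes or sends its chunks is not stated in this page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of writing entries in fixed-size chunks.
final class ChunkedWriteSketch {
    // Hands entries to batchWriter in chunks of at most chunkSize; returns the chunk count.
    static <E> int writeInChunks(Iterable<E> entries, int chunkSize, Consumer<List<E>> batchWriter) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<E> chunk = new ArrayList<>(chunkSize);
        int batches = 0;
        for (E entry : entries) {
            chunk.add(entry);
            if (chunk.size() == chunkSize) {
                batchWriter.accept(new ArrayList<>(chunk)); // copy so the buffer can be reused
                chunk.clear();
                batches++;
            }
        }
        if (!chunk.isEmpty()) { // flush the final, possibly partial, chunk
            batchWriter.accept(new ArrayList<>(chunk));
            batches++;
        }
        return batches;
    }
}
```

Passing a chunkSize of 1 degenerates to one write per entry, which mirrors the documented fallback for stores that do not support chunking.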
-
clear
public void clear() throws PersistenceException
Description copied from interface: AdvancedCacheWriter
Removes all the data from the storage.
- Specified by:
clear in interface AdvancedCacheWriter<K,V>
- Throws:
PersistenceException
- in case of an error, e.g. communicating with the external storage
-
clear
public void clear(IntSet segments)
Description copied from interface: SegmentedAdvancedLoadWriteStore
Removes all the data that maps to the given segments from the storage.
This method must only remove entries that map to the provided segments.
This method may be invoked irrespective of whether the configuration is StoreConfiguration.segmented() or not.
- Specified by:
clear in interface SegmentedAdvancedLoadWriteStore<K,V>
- Parameters:
segments - data mapping to these segments is removed. Always non-null.
-
delete
public boolean delete(Object key) throws PersistenceException
- Specified by:
delete in interface CacheWriter<K,V>
- Returns:
- true if the entry existed in the persistent store and it was deleted.
- Throws:
PersistenceException
- in case of an error, e.g. communicating with the external storage
-
setInternalCacheEntryFactory
public void setInternalCacheEntryFactory(org.infinispan.container.impl.InternalEntryFactory iceFactory)
-
getRemoteCache
public RemoteCache<Object,Object> getRemoteCache()
-
getConfiguration
public RemoteStoreConfiguration getConfiguration()
-
shouldWrite
public boolean shouldWrite(long commandFlags)
- Specified by:
shouldWrite in interface FlagAffectedStore<K,V>
-
-