SCATTERED CACHE DESIGN
Idea
Distributed caches have fixed owners for each key. Operations where the originator is one of the owners require fewer messages (and possibly fewer roundtrips), so let's make the originator always the primary owner. In case of a node crash, we can retrieve the data by inspecting all nodes. To find the last-written entry in case of a crashed primary owner, entries keep write versions in their metadata. These versions also determine the order of writes, so we don't have to use locking anymore.
- faster writes
- no locking contention
- reads always have to go to the primary (slower reads in small clusters)
- complex reconciliation during rebalance
Scope of implementation
- Scattered cache is implemented as resilient to single node failure (equivalent to 2 owners distributed cache).
- Transactional mode is not implemented yet.
- Functional commands are not fully implemented yet.
- All other features (state transfer, distributed streams, persistence...) should work.
Operations
We need to keep tombstones with versions after entry removal. These tombstones have a limited lifespan: we keep them only until the invalidations have been applied on all nodes.
The versions have to grow monotonically; therefore the version counter is kept per segment rather than per entry (as tombstones are eventually removed, a per-entry version would be lost). The version is implemented by SimpleClusteredVersion and therefore contains the topology id.
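The per-segment counter described above can be sketched as follows. This is an illustrative sketch, not Infinispan's actual implementation; the class and method names are hypothetical.

```java
// Hypothetical sketch of a per-segment version counter. One monotonic counter
// per segment survives entry removal, unlike a per-entry counter would.
import java.util.concurrent.atomic.AtomicLongArray;

class SegmentVersionCounter {
    private final AtomicLongArray versions;  // one monotonic counter per segment
    private final int topologyId;            // embedded so versions can be ordered across topologies

    SegmentVersionCounter(int numSegments, int topologyId) {
        this.versions = new AtomicLongArray(numSegments);
        this.topologyId = topologyId;
    }

    /** Returns a fresh version for a write to the given segment. */
    long incrementVersion(int segment) {
        return versions.incrementAndGet(segment);
    }

    int topologyId() { return topologyId; }
}
```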
Unlike in other cache modes, entry commit does not happen in EntryWrappingInterceptor but before the replication to the backup (see below for the detailed operation descriptions). As scattered cache synchronizes only on the data container (instead of using locking interceptors), the value in the data container can change between loading it in EntryWrappingInterceptor and committing it. Therefore, for commands that read previous values, the version seen before the modification is checked against the actual data-container value; if it does not match, a ConcurrentChangeException is thrown. This is caught in RetryingEntryWrappingInterceptor and the command is retried in that case.
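The check-and-retry scheme above can be illustrated with a toy container. This is a sketch, not Infinispan internals; all names here are made up for illustration.

```java
// Illustrative sketch: at commit time the version seen before the modification is
// compared against the current data-container version; a mismatch signals that
// the command must be retried (as RetryingEntryWrappingInterceptor does).
import java.util.concurrent.ConcurrentHashMap;

class VersionedContainer {
    static final class Entry {
        final Object value; final long version;
        Entry(Object value, long version) { this.value = value; this.version = version; }
    }

    private final ConcurrentHashMap<String, Entry> container = new ConcurrentHashMap<>();

    /** Commits only if the version is unchanged; false means "retry the command". */
    boolean commitIfUnchanged(String key, long seenVersion, Object newValue, long newVersion) {
        boolean[] committed = {false};
        container.compute(key, (k, current) -> {        // atomic per key, like syncing on the DC
            long currentVersion = current == null ? 0L : current.version;
            if (currentVersion != seenVersion) return current;  // concurrent change detected
            committed[0] = true;
            return new Entry(newValue, newVersion);
        });
        return committed[0];
    }

    long versionOf(String key) {
        Entry e = container.get(key);
        return e == null ? 0L : e.version;
    }
}
```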
Single entry write (put, getAndPut, putIfAbsent, replace...)
originator == primary owner
- Primary increments version for segment
- Primary commits entry
- Primary picks one node (next member in CH) and sends backup RPC
- Backup commits entry
- Backup sends RPC response
- Primary returns after it gets the response
- Primary schedules invalidation of entry with lower versions
Selection of backup could be random, but having it ~fixed probably reduces overall memory consumption
Updating the value on the primary before the backup finishes does not hurt data consistency: even in a distributed cache, if the backup RPC fails we can't know whether the backup has committed the entry, so the value can be published anyway.
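The steps above can be modeled with a toy in-memory simulation. This is a sketch under simplifying assumptions (RPCs are direct calls, a single segment); none of this is Infinispan code.

```java
// Toy simulation of the write path when originator == primary: the primary assigns
// a segment version, commits locally, then replicates to a single backup (the
// next member in the CH). The backup RPC is modeled as a direct call.
import java.util.HashMap;
import java.util.Map;

class ScatteredWriteSim {
    static final class Node {
        final Map<String, String> data = new HashMap<>();
        final Map<String, Long> versions = new HashMap<>();
    }

    private long segmentVersion = 0;     // per-segment counter held by the primary
    final Node primary = new Node();
    final Node backup = new Node();      // "next member in CH"

    /** Executes the write and returns the version it was assigned. */
    long put(String key, String value) {
        long version = ++segmentVersion;          // 1. primary increments the segment version
        primary.data.put(key, value);             // 2. primary commits the entry
        primary.versions.put(key, version);
        backup.data.put(key, value);              // 3-5. backup RPC: backup commits and responds
        backup.versions.put(key, version);
        // 6. primary returns; 7. invalidation of older versions is scheduled (omitted here)
        return version;
    }
}
```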
originator != primary owner
- Origin sends sync RPC to primary owner
- Primary increments version for segment
- Primary commits entry
- Primary returns response with version (+ return value if appropriate)
- Origin commits entry
- Origin schedules invalidation of entry with lower versions
Invalidation must be scheduled by the origin, because the primary does not know whether the backup (the origin) has committed the entry.
Single entry read
originator == primary owner
- Just a local read
originator != primary owner
- Currently, just invoke a sync RPC to the primary owner
Possible improvement (not implemented yet)
- Origin locally loads entry with SKIP_CACHE_LOAD
- Origin sends sync RPC including the version to primary
- Primary compares version with its own
- If version matches, origin gets just successful response and returns locally-loaded value
- If version does not match, value + version is sent back
- Allow reading local values only (if present) - risk of stale reads
- Store read value locally with expiration (L1 enabled) - as invalidations are broadcast anyway, there's not much overhead with that. This will still require RPC on read (unless stale reads are allowed) but not marshalling the value.
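The proposed version-checked read can be sketched as below. This optimization is explicitly not implemented yet, so the code models the idea only; all names are illustrative.

```java
// Sketch of the proposed (not yet implemented) read optimization: the origin sends
// only the version of its locally loaded copy; the primary ships the value back
// solely when the version does not match, sparing the marshalling otherwise.
import java.util.Optional;

class VersionCheckedRead {
    private String primaryValue;
    private long primaryVersion;

    void writeOnPrimary(String value, long version) {
        primaryValue = value;
        primaryVersion = version;
    }

    /**
     * Models the RPC: an empty result means "your locally loaded value is current",
     * so only a success flag crosses the wire.
     */
    Optional<String> readFromPrimary(long originVersion) {
        if (originVersion == primaryVersion) {
            return Optional.empty();           // version matches: just a successful response
        }
        return Optional.of(primaryValue);      // mismatch: value (+ version) is sent back
    }
}
```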
Multiple entries writes
- Increment version for primary-owned entries and commit them
- Backup these entries to next node
- Send all other entries to their primary owner
- Commit entries after successful response from primary
Entries for which this node is the primary owner won't be backed up just to the next member, but to a node that is the primary owner of other entries in the multiwrite. That way some messages can be spared by merging the primary(keyA) -> backup and origin -> primary(keyB) requests.
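The first step of a multi-write, partitioning keys by primary owner so one RPC per owner suffices, can be sketched as follows. The owner selection here is a toy stand-in for the real consistent hash.

```java
// Sketch of grouping a multi-write's keys by primary owner; Math.floorMod over
// hashCode stands in for the real consistent-hash owner lookup.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MultiWriteGrouping {
    static Map<Integer, List<String>> groupByPrimaryOwner(List<String> keys, int numNodes) {
        Map<Integer, List<String>> byOwner = new HashMap<>();
        for (String key : keys) {
            int owner = Math.floorMod(key.hashCode(), numNodes); // toy owner selection
            byOwner.computeIfAbsent(owner, o -> new ArrayList<>()).add(key);
        }
        return byOwner;
    }
}
```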
Multiple entries reads
Same as single entry reads; just merge RPCs going to the same primary owner.
Invalidations
It would be inefficient to send invalidations (key + version) one by one, so they are merged and sent in batches using InvalidateVersionsCommand.
Possible improvement (not implemented yet):
The list of invalidations-to-be-sent could be updated when we get invalidation from another node, in order to reduce the overall noise.
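The batching can be sketched as below. This is an illustrative model, not the real component; a real flush would broadcast an InvalidateVersionsCommand rather than just count batches.

```java
// Sketch of merging invalidations (key + version) into batches instead of
// sending them one by one.
import java.util.ArrayList;
import java.util.List;

class InvalidationBatcher {
    record Invalidation(String key, long version) {}

    private final List<Invalidation> pending = new ArrayList<>();
    private final int batchSize;
    int flushedBatches = 0;                  // exposed only for inspection in this sketch

    InvalidationBatcher(int batchSize) { this.batchSize = batchSize; }

    void schedule(String key, long version) {
        pending.add(new Invalidation(key, version));
        if (pending.size() >= batchSize) flush();
    }

    void flush() {
        if (pending.isEmpty()) return;
        flushedBatches++;                    // here: broadcast the batch to all nodes
        pending.clear();
    }

    int pendingCount() { return pending.size(); }
}
```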
State Transfer
During rebalance, scattered cache always uses the pending CH for both reads and writes. It does not implement the four-phase rebalance; as the segment state and the ability to read/write on a node are tracked in ScatteredVersionManager, only a two-phase rebalance is used.
When a command traverses the interceptor stack, the segment state is checked, and either the remote value is retrieved (ahead of the regular state transfer) or the command is blocked until the state transfer is finished (for commands that need all values - there's no need to start a second retrieval of all values).
The state transfer of a segment has several phases:
- NOT_OWNED: this node is not the primary owner, though it can back up entries
- BLOCKED: the node has just become an owner, but the old owner has not revoked segment ownership yet
- KEYS_TRANSFER: the node knows the highest version for the given segment and requests keys + versions (no values) from all other nodes
- VALUES_TRANSFER: we have all keys with metadata and now record, for each key, the highest version and the node holding the value
- OWNED: all data for the segment is in place
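The phase progression can be sketched as an enum. The real enum is ScatteredVersionManager.SegmentState (see the summary at the end of this page); the linear transition shown here is a simplification of the actual state machine.

```java
// Hedged sketch of the segment-state progression during rebalance; real
// transitions in ScatteredVersionManager.SegmentState are richer than this chain.
enum SegmentStateSketch {
    NOT_OWNED, BLOCKED, KEYS_TRANSFER, VALUES_TRANSFER, OWNED;

    /** Next phase in the happy path; OWNED is terminal. */
    SegmentStateSketch next() {
        SegmentStateSketch[] all = values();
        return this == OWNED ? OWNED : all[ordinal() + 1];
    }
}
```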
Additional state-transfer operations are invoked through StateRequestCommand, namely:
- CONFIRM_REVOKED_SEGMENTS, which makes sure that all old owners have adopted the new topology and won't serve further requests according to the old one
- START_KEYS_TRANSFER, which is very similar to START_STATE_TRANSFER but moves only the keys
When a node crashes, we go through three topologies:
- CH_UPDATE just removing the dead member (STABLE topology)
- REBALANCE starts shuffling entries around (TRANSITORY topology)
- CH_UPDATE with final (STABLE) topology
Operations are always driven by the new primary owner of given segment.
If the segment has not changed its primary owner:
- Replicate all data from this segment to the next node (push transfer)
- Send InvalidateVersionsCommands with all keys in this segment to all nodes but the next member (which receives the state via the push transfer)
If the segment just got a new primary owner:
- Synchronously retrieve the highest version for this segment from all nodes
- Request all nodes to send all keys + versions from this segment (and do the same locally)
- Retrieve the values from the nodes holding the highest versions
- Send invalidations to all other nodes, removing the stale entries
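The reconciliation step, keeping the highest-versioned replica of each key, can be sketched as follows. This is a toy in-memory model of the idea, not the actual state-transfer code.

```java
// Toy reconciliation for a segment with a new primary owner: collect key -> version
// maps from every node and keep, per key, the highest version (whose holder would
// then be asked for the value).
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class Reconciliation {
    static Map<String, Long> highestVersions(List<Map<String, Long>> perNodeKeys) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> node : perNodeKeys) {
            node.forEach((key, version) -> result.merge(key, version, Math::max));
        }
        return result;
    }
}
```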
Clean rebalance (after join, no data is lost)
An optimization for the rebalance when there is a single owner with all the data (the previous primary) has not been implemented yet.
Partition handling
A partition becomes degraded any time it loses more than one node compared to the members of the last stable topology. In degraded mode, all operations are prohibited. Unlike in distributed caches, we cannot allow operations even in a partition that contains the primary owner, because we don't know which node is the backup owner: the other partition may still be available, so we would risk inconsistency or serving stale data.
Persistence
As we don't use locking for anything after EntryWrappingInterceptor, we need another form of synchronization for storing an entry into the cache store. We don't want to block the data container for a potentially long cache-store update; therefore, ScatteredCacheWriterInterceptor enters the data container (acquiring the lock) just to compare versions and to create a CompletableFuture that serves as a lock that can be waited upon in a non-blocking way.
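The CompletableFuture-as-lock idea can be sketched as below. The class and method names are assumptions for illustration; the real interceptor additionally performs the version comparison under the data-container lock.

```java
// Sketch of using a CompletableFuture as a non-blockingly awaitable per-key lock:
// the slow cache-store update is chained outside the data-container lock, and a
// later write for the same key simply chains after the in-flight one.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class StoreWriteGuard {
    private final ConcurrentHashMap<String, CompletableFuture<Void>> inFlight = new ConcurrentHashMap<>();

    /** Chains the store update after any in-flight write for the same key. */
    CompletableFuture<Void> write(String key, Runnable storeUpdate) {
        return inFlight.compute(key, (k, previous) ->
            (previous == null ? CompletableFuture.<Void>completedFuture(null) : previous)
                .thenRunAsync(storeUpdate));
    }
}
```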
Listeners
- The pre- listeners may be invoked multiple times, with stale values (in that case the command does not update the DC and retries).
- However, if the command itself does not read the value, it can commit even if the value has changed in the meantime, and the listener will get an out-of-date value.
- As the ordering of DC updates is based on versions, an operation can arrive at the DC and find that a newer update (according to the versions) has already been applied. In that case the operation correctly finishes, but no event is fired for it (we don't have the previous value), and the event fired for the newer update carries the value from before this update.
Interface Summary
- BiasManager: tracks whether this node can read locally stored data despite not being an owner, and which nodes will read local data that is primary-owned by this node
- BiasManager.Revocation
- ScatteredStateProvider
- ScatteredVersionManager<K>: manages versions of entries and states of segments

Enum Summary
- ScatteredVersionManager.SegmentState