T - the type of entries stored in the buffer
C - the type of consumer
public final class RingBuffer<T,C> extends Object
consumers. The design of this ring buffer attempts to eliminate or minimize contention between the different consumers. The ring buffer can be completely lock-free, although by default the consumers of the ring buffer use a WaitStrategy that blocks if they have processed all available entries and are waiting for more to be added.
Conceptually, this buffer consists of a fixed-size ring of elements; entries are added at the ring's "cursor" while multiple consumers follow behind the cursor, processing each of the entries as quickly as they can. Each consumer runs in its own thread and works toward the cursor at its own pace, independently of all other consumers. Most importantly, every consumer sees the exact same order of entries.
When the ring buffer starts out, it is empty and the cursor is at the starting position. As entries are added, the cursor travels around the ring, keeping track of its position and the position of all consumers. The cursor can never "lap" any of the consumers, and this ensures that the consumers see a consistent and ordered set of entries. Typically, consumers are fast enough that they trail relatively closely behind the cursor; plus, ring buffers are usually sized large enough so that the cursor rarely (if ever) closes on the slowest consumer. (If this does happen, consider increasing the size of the buffer or changing the consumers to process the entries more quickly, perhaps using a separate durable queue for those slow consumers.)
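The "no lapping" rule can be illustrated with a small, single-threaded sketch. This is not the RingBuffer implementation; the class `LapCheckSketch` and all of its method names are hypothetical, and it assumes positions are ever-increasing `long` values that map onto slots by a modulo of the buffer size.

```java
import java.util.Arrays;

/** Illustrative only: models cursor and consumer positions, with no threading. */
final class LapCheckSketch {
    private final int bufferSize;
    private long cursor = -1L;               // position of the newest entry; -1 while empty
    private final long[] consumerPositions;  // last position each consumer has processed

    LapCheckSketch(int bufferSize, int consumerCount) {
        this.bufferSize = bufferSize;
        this.consumerPositions = new long[consumerCount];
        Arrays.fill(consumerPositions, -1L);
    }

    /** Maps an ever-increasing position onto a slot of the fixed-size ring. */
    int slotFor(long position) {
        return (int) (position % bufferSize);
    }

    /** True if the next entry would not overwrite a slot the slowest consumer still needs. */
    boolean canAdd() {
        long slowest = Arrays.stream(consumerPositions).min().orElse(cursor);
        return (cursor + 1) - slowest <= bufferSize;
    }

    /** Advances the cursor by one position, or returns false if that would lap a consumer. */
    boolean tryAdd() {
        if (!canAdd()) return false;
        cursor++;
        return true;
    }

    /** Records that the given consumer has finished everything up to and including the position. */
    void consumerProcessedUpTo(int consumer, long position) {
        consumerPositions[consumer] = position;
    }
}
```

With a buffer of size 4 and one idle consumer, four calls to `tryAdd()` succeed and the fifth is refused until the consumer reports progress. The real buffer blocks the producer in that situation rather than returning false.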
Consumers can be added after the ring buffer has entries, but such consumers will only see those entries that are added after the consumer has been attached to the buffer. Additionally, the ring buffer guarantees that each consumer will be called from a single thread, so consumers do not need to be concurrent or thread-safe.
Even though there is almost no locking within the ring buffer, the ring buffer uses another technique to make it as fast as possible: batching. A producer can add multiple entries, called a "batch", at once. Rather than having to check the values that are shared among the different threads for each entry, adding entries via a batch means the shared data needs to be checked only once per batch.
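The saving from producer-side batching can be sketched as follows. This is a simplified illustration, not the buffer's actual code: `BatchClaimSketch` and its methods are hypothetical, the shared state is reduced to one `AtomicLong`, and publishing the claimed entries to consumers is left out.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative only: claims positions singly or as a batch and counts shared-state accesses. */
final class BatchClaimSketch {
    private final AtomicLong nextPosition = new AtomicLong(0L); // shared among threads in a real buffer
    private int sharedAccesses = 0;                             // demo counter; single-threaded use assumed

    /** Claims 'count' consecutive positions with one access to the shared counter. */
    long claim(int count) {
        sharedAccesses++;
        return nextPosition.getAndAdd(count); // first position of the claimed range
    }

    int sharedAccesses() {
        return sharedAccesses;
    }
}
```

Claiming three entries one at a time touches the shared counter three times, while `claim(3)` touches it once and returns the first position of the range.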
The consumer threads also process batches, although most of this is hidden within the runnable that calls the Consumer.consume(Object, long, long) method. When ready to process an entry, this code asks for one entry and will get as many entries as are available. All of the returned entries can then be processed without having to check any of the shared data.
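The consumer side of batching can be sketched in the same spirit. The class `ConsumerBatchSketch` and the method `nextBatch` are hypothetical names, and the sketch assumes the caller has read the cursor once and passes that value in, so no shared state is consulted inside the loop.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: a consumer asks for one position and receives everything available. */
final class ConsumerBatchSketch {
    /**
     * Returns every entry from 'nextWanted' through 'cursor' inclusive.
     * The list is empty when the wanted position has not been added yet.
     */
    static List<String> nextBatch(String[] ring, long nextWanted, long cursor) {
        List<String> batch = new ArrayList<>();
        for (long pos = nextWanted; pos <= cursor; pos++) {
            batch.add(ring[(int) (pos % ring.length)]); // position maps to a slot by modulo
        }
        return batch;
    }
}
```

A consumer that wants position 1 while the cursor is at 3 receives positions 1, 2, and 3 in one call, and can process all three before looking at the cursor again.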
The shutdown() method is a graceful termination that immediately prevents adding new entries and that allows all consumer threads to continue processing all previously-added entries. When each thread has consumed all entries, the consumer's thread will terminate and the consumer is "unregistered" from the ring buffer. The method will block until all consumers have completed and are terminated.
Once a ring buffer has been shut down, it cannot be restarted.
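The shutdown semantics described above can be sketched with a flag and a single consumer thread. This is an assumption-laden illustration rather than the RingBuffer code: `GracefulShutdownSketch` is a hypothetical class, it substitutes a `LinkedBlockingQueue` for the ring, and it assumes `add` and `shutdown` are called from the same producer thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: rejects new entries after shutdown but drains what was already added. */
final class GracefulShutdownSketch {
    private final AtomicBoolean addEntries = new AtomicBoolean(true);
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>(); // stand-in for the ring
    private final AtomicInteger processed = new AtomicInteger();
    private final Thread consumerThread;

    GracefulShutdownSketch() {
        consumerThread = new Thread(() -> {
            try {
                // Keep going while adds are allowed or previously added entries remain.
                while (addEntries.get() || !pending.isEmpty()) {
                    String entry = pending.poll(10, TimeUnit.MILLISECONDS);
                    if (entry != null) processed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumerThread.start();
    }

    /** Returns false, without adding, once shutdown has been requested. */
    boolean add(String entry) {
        if (!addEntries.get()) return false;
        pending.add(entry);
        return true;
    }

    /** Stops accepting entries, then blocks until the consumer thread has drained and exited. */
    void shutdown() {
        addEntries.set(false);
        try {
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int processedCount() {
        return processed.get();
    }
}
```

After `shutdown()` returns, every entry accepted before the call has been processed and any later `add` returns false, which mirrors the behavior the text describes for the real buffer.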
Modifier and Type | Class and Description |
---|---|
static interface | RingBuffer.ConsumerAdapter<EntryType,ConsumerType> Adapts the RingBuffer.ConsumerAdapter.consume(Object, Object, long, long), RingBuffer.ConsumerAdapter.close(Object) and RingBuffer.ConsumerAdapter.handleException(Object, Throwable, Object, long, long) methods to other methods on an unknown type. |
protected class | RingBuffer.ConsumerRunner |
protected static class | RingBuffer.NoOpLock |
Modifier and Type | Field and Description |
---|---|
protected AtomicBoolean | addEntries |
protected RingBuffer.ConsumerAdapter<T,C> | consumerAdapter |
protected Cursor | cursor |
protected Logger | logger |
Modifier and Type | Method and Description |
---|---|
boolean | add(T entry) Add to this buffer a single entry. |
boolean | add(T[] entries) Add to this buffer multiple entries. |
boolean | addConsumer(C consumer) Add the supplied consumer, and have it start processing entries in a separate thread. |
boolean | addConsumer(C consumer, int timesToRetryUponTimeout) Add the supplied consumer, and have it start processing entries in a separate thread. |
protected void | clearEntry(long position) |
protected void | disconnect(RingBuffer.ConsumerRunner runner) Method called by the RingBuffer.ConsumerRunner.run() method just before the method returns and the thread terminates. |
protected int | getBufferSize() |
protected T | getEntry(long position) |
boolean | hasConsumers() Checks if there are any consumers registered. |
boolean | remove(C consumer) Remove the supplied consumer, and block until it stops running and is closed and removed from this buffer. |
void | shutdown() Shutdown this ring buffer by preventing any further entries, but allowing all existing entries to be processed by all consumers. |
protected final Cursor cursor
protected final AtomicBoolean addEntries
protected final RingBuffer.ConsumerAdapter<T,C> consumerAdapter
protected final Logger logger
public boolean add(T entry)
entry - the entry to be added; may not be null
shutdown()
public boolean add(T[] entries)
entries - the entries that are to be added; may not be null
shutdown() and none of the entries were added
protected T getEntry(long position)
protected void clearEntry(long position)
public boolean addConsumer(C consumer)
Note that the thread will block when there are no more entries to be consumed. If the thread gets a timeout when waiting for an entry, this method will retry the wait only one time before stopping.
The consumer is automatically removed from the ring buffer when it returns false from its Consumer.consume(Object, long, long) method.
consumer - the component that will process the entries; may not be null
public boolean addConsumer(C consumer, int timesToRetryUponTimeout)
The consumer is automatically removed from the ring buffer when it returns false from its Consumer.consume(Object, long, long) method.
consumer - the component that will process the entries; may not be null
timesToRetryUponTimeout - the number of times that the thread should retry after timing out while waiting for the next entry; retries will not be attempted if the value is less than 1
IllegalStateException - if the ring buffer has already been shutdown()
public boolean remove(C consumer)
consumer - the consumer component to be removed
IllegalStateException - if the ring buffer has already been shutdown()
protected void disconnect(RingBuffer.ConsumerRunner runner)
Method called by the RingBuffer.ConsumerRunner.run() method just before the method returns and the thread terminates. This method invocation allows this buffer to clean up its reference to the runner.
runner - the runner that has completed
protected int getBufferSize()
public boolean hasConsumers()
true if this buffer has any consumers, false otherwise.
public void shutdown()
Copyright © 2008–2016 JBoss, a division of Red Hat. All rights reserved.