public final class MultiProducerSequencer extends AbstractSequencer
Coordinator for claiming sequences for access to a data structure while tracking dependent Sequence
s.
Suitable for use for sequencing across multiple publisher threads.
*
Note on Cursored.getCursor()
: With this sequencer the cursor value is updated after the call
to Sequenced.next()
, to determine the highest available sequence that can be read, then
Sequencer.getHighestPublishedSequence(long, long)
should be used.
Modifier and Type | Field and Description |
---|---|
private int[] |
availableBuffer |
private static long |
BASE |
private Sequence |
gatingSequenceCache |
private int |
indexMask |
private int |
indexShift |
private static long |
SCALE |
private static sun.misc.Unsafe |
UNSAFE |
bufferSize, cursor, gatingSequences, waitStrategy
INITIAL_CURSOR_VALUE
Constructor and Description |
---|
MultiProducerSequencer(int bufferSize,
WaitStrategy waitStrategy)
Construct a Sequencer with the selected wait strategy and buffer size.
|
Modifier and Type | Method and Description |
---|---|
private int |
calculateAvailabilityFlag(long sequence) |
private int |
calculateIndex(long sequence) |
void |
claim(long sequence)
Claim a specific sequence.
|
long |
getHighestPublishedSequence(long lowerBound,
long availableSequence)
Get the highest sequence number that can be safely read from the ring buffer.
|
boolean |
hasAvailableCapacity(int requiredCapacity)
Has the buffer got capacity to allocate another sequence.
|
private boolean |
hasAvailableCapacity(Sequence[] gatingSequences,
int requiredCapacity,
long cursorValue) |
private void |
initialiseAvailableBuffer() |
boolean |
isAvailable(long sequence)
Confirms if a sequence is published and the event is available for use; non-blocking.
|
long |
next()
Claim the next event in sequence for publishing.
|
long |
next(int n)
Claim the next n events in sequence for publishing.
|
void |
publish(long sequence)
Publishes a sequence.
|
void |
publish(long lo,
long hi)
Batch publish sequences.
|
long |
remainingCapacity()
Get the remaining capacity for this sequencer.
|
private void |
setAvailable(long sequence)
The below methods work on the availableBuffer flag.
|
private void |
setAvailableBufferValue(int index,
int flag) |
long |
tryNext()
Attempt to claim the next event in sequence for publishing.
|
long |
tryNext(int n)
Attempt to claim the next n events in sequence for publishing.
|
addGatingSequences, getBufferSize, getCursor, getMinimumSequence, newBarrier, newPoller, removeGatingSequence, toString
private static final sun.misc.Unsafe UNSAFE
private static final long BASE
private static final long SCALE
private final Sequence gatingSequenceCache
private final int[] availableBuffer
private final int indexMask
private final int indexShift
public MultiProducerSequencer(int bufferSize, WaitStrategy waitStrategy)
bufferSize
- the size of the buffer that this will sequence over.waitStrategy
- for those waiting on sequences.public boolean hasAvailableCapacity(int requiredCapacity)
Sequenced
requiredCapacity
- in the bufferSequenced.hasAvailableCapacity(int)
private boolean hasAvailableCapacity(Sequence[] gatingSequences, int requiredCapacity, long cursorValue)
public void claim(long sequence)
Sequencer
sequence
- The sequence to initialise too.Sequencer.claim(long)
public long next()
Sequenced
Sequenced.next()
public long next(int n)
Sequenced
int n = 10; long hi = sequencer.next(n); long lo = hi - (n - 1); for (long sequence = lo; sequence <= hi; sequence++) { // Do work. } sequencer.publish(lo, hi);
n
- the number of sequences to claimSequenced.next(int)
public long tryNext() throws InsufficientCapacityException
Sequenced
requiredCapacity
slots
available.InsufficientCapacityException
Sequenced.tryNext()
public long tryNext(int n) throws InsufficientCapacityException
Sequenced
requiredCapacity
slots
available. Have a look at Sequenced.next()
for a description on how to
use this method.n
- the number of sequences to claimInsufficientCapacityException
Sequenced.tryNext(int)
public long remainingCapacity()
Sequenced
Sequenced.remainingCapacity()
private void initialiseAvailableBuffer()
public void publish(long sequence)
Sequenced
Sequenced.publish(long)
public void publish(long lo, long hi)
Sequenced
lo
- first sequence number to publishhi
- last sequence number to publishSequenced.publish(long, long)
private void setAvailable(long sequence)
The prime reason is to avoid a shared sequence object between publisher threads. (Keeping single pointers tracking start and end would require coordination between the threads).
-- Firstly we have the constraint that the delta between the cursor and minimum gating sequence will never be larger than the buffer size (the code in next/tryNext in the Sequence takes care of that). -- Given that; take the sequence value and mask off the lower portion of the sequence as the index into the buffer (indexMask). (aka modulo operator) -- The upper portion of the sequence becomes the value to check for availability. ie: it tells us how many times around the ring buffer we've been (aka division) -- Because we can't wrap without the gating sequences moving forward (i.e. the minimum gating sequence is effectively our last available position in the buffer), when we have new data and successfully claimed a slot we can simply write over the top.
private void setAvailableBufferValue(int index, int flag)
public boolean isAvailable(long sequence)
Sequencer
sequence
- of the buffer to checkSequencer.isAvailable(long)
public long getHighestPublishedSequence(long lowerBound, long availableSequence)
Sequencer
>= nextSequence
the return value will be
nextSequence - 1
. To work correctly a consumer should pass a value that
it 1 higher than the last sequence that was successfully processed.lowerBound
- The sequence to start scanning from.availableSequence
- The sequence to scan to.nextSequence - 1
.private int calculateAvailabilityFlag(long sequence)
private int calculateIndex(long sequence)