org.apache.kafka.clients.producer.internals
Class RecordAccumulator

java.lang.Object
  extended by org.apache.kafka.clients.producer.internals.RecordAccumulator

public final class RecordAccumulator
extends java.lang.Object

This class acts as a queue that accumulates records into MemoryRecords instances to be sent to the server.

The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless this behavior is explicitly disabled.


Constructor Summary
RecordAccumulator(int batchSize, long totalSize, long lingerMs, long retryBackoffMs, boolean blockOnBufferFull, Metrics metrics, Time time)
          Create a new record accumulator
 
Method Summary
 FutureRecordMetadata append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback)
          Add a record to the accumulator.
 void close()
          Close this accumulator and force all the record buffers to be drained
 void deallocate(RecordBatch batch)
          Deallocate the record batch
 java.util.List<RecordBatch> drain(java.util.List<TopicPartition> partitions, int maxSize, long now)
          Drain all the data for the given topic-partitions that will fit within the specified size.
 boolean hasUnsent()
           
 java.util.List<TopicPartition> ready(long now)
          Get a list of topic-partitions which are ready to be sent.
 void reenqueue(RecordBatch batch, long now)
          Re-enqueue the given record batch in the accumulator to retry
 
Methods inherited from class java.lang.Object
equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

RecordAccumulator

public RecordAccumulator(int batchSize,
                         long totalSize,
                         long lingerMs,
                         long retryBackoffMs,
                         boolean blockOnBufferFull,
                         Metrics metrics,
                         Time time)
Create a new record accumulator

Parameters:
batchSize - The size to use when allocating MemoryRecords instances
totalSize - The maximum memory the record accumulator can use.
lingerMs - An artificial delay time to add before declaring a records instance that isn't full ready for sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some latency for potentially better throughput due to more batching (and hence fewer, larger requests).
retryBackoffMs - An artificial delay time to retry the produce request upon receiving an error. This avoids exhausting all retries in a short period of time.
blockOnBufferFull - If true block when we are out of memory; if false throw an exception when we are out of memory
metrics - The metrics
time - The time instance to use
Method Detail

append

public FutureRecordMetadata append(TopicPartition tp,
                                   byte[] key,
                                   byte[] value,
                                   CompressionType compression,
                                   Callback callback)
                            throws java.lang.InterruptedException
Add a record to the accumulator.

This method will block if sufficient memory isn't available for the record unless blocking has been disabled.

Parameters:
tp - The topic/partition to which this record is being sent
key - The key for the record
value - The value for the record
compression - The compression codec for the record
callback - The user-supplied callback to execute when the request is complete
Throws:
java.lang.InterruptedException

reenqueue

public void reenqueue(RecordBatch batch,
                      long now)
Re-enqueue the given record batch in the accumulator to retry


ready

public java.util.List<TopicPartition> ready(long now)
Get a list of topic-partitions which are ready to be sent.

A partition is ready if ANY of the following are true:

  1. The record set is full
  2. The record set has sat in the accumulator for at least lingerMs milliseconds
  3. The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are immediately considered ready).
  4. The accumulator has been closed


hasUnsent

public boolean hasUnsent()
Returns:
Whether there is any unsent record in the accumulator.

drain

public java.util.List<RecordBatch> drain(java.util.List<TopicPartition> partitions,
                                         int maxSize,
                                         long now)
Drain all the data for the given topic-partitions that will fit within the specified size. This method attempts to avoid choosing the same topic-partitions over and over.

Parameters:
partitions - The list of partitions to drain
maxSize - The maximum number of bytes to drain
now - The current unix time
Returns:
A list of RecordBatch for partitions specified with total size less than the requested maxSize. TODO: There may be a starvation issue due to iteration order

deallocate

public void deallocate(RecordBatch batch)
Deallocate the record batch


close

public void close()
Close this accumulator and force all the record buffers to be drained