public class KafkaTopicOffsetManager extends AbstractOffsetManager implements org.springframework.beans.factory.InitializingBean
OffsetManager that uses a Kafka topic as the underlying support.
For its proper functioning, the Kafka server(s) must set log.cleaner.enable=true. It relies on the property
cleanup.policy=compact to be set on the target topic, and if the topic is not found,
it will create a topic with the appropriate settings.| Modifier and Type | Class and Description |
|---|---|
static class |
KafkaTopicOffsetManager.Key
Wraps the partition and consumer information and will be used as a key on the Kafka topic
|
static class |
KafkaTopicOffsetManager.KeySerializerDecoder |
| Modifier and Type | Field and Description |
|---|---|
static java.lang.String |
CLEANUP_POLICY |
static java.lang.String |
CLEANUP_POLICY_COMPACT |
static java.lang.String |
DELETE_RETENTION |
static java.lang.String |
SEGMENT_BYTES |
connectionFactory, consumerId, highestUpdatedOffsets, initialOffsets, log, referenceTimestamp| Constructor and Description |
|---|
KafkaTopicOffsetManager(ZookeeperConnect zookeeperConnect,
java.lang.String topic) |
KafkaTopicOffsetManager(ZookeeperConnect zookeeperConnect,
java.lang.String topic,
java.util.Map<Partition,java.lang.Long> initialOffsets) |
| Modifier and Type | Method and Description |
|---|---|
void |
afterPropertiesSet() |
void |
close() |
protected java.lang.Long |
doGetOffset(Partition partition) |
protected void |
doRemoveOffset(Partition partition) |
protected void |
doUpdateOffset(Partition partition,
long offset) |
void |
flush() |
void |
setBatchBytes(int batchBytes)
The maximum batch size in bytes for offset writes
|
void |
setCompressionCodec(ProducerMetadata.CompressionType compressionType)
The compression type for writing to the offset topic
|
void |
setMaxQueueBufferingTime(int maxQueueBufferingTime)
For how long will producers buffer data before writing to the topic
|
void |
setMaxSize(int maxSize)
Sets the maximum size of a fetch request, allowing to tune the initialization process.
|
void |
setReplicationFactor(int replicationFactor)
The replication factor of the offset topic
|
void |
setRequiredAcks(int requiredAcks)
The number of required acks on write operations
|
void |
setRetentionTime(int retentionTime)
How long are dead records retained in the offset topic
|
void |
setSegmentSize(int segmentSize)
The size of a segment in the offset topic
|
deleteOffset, destroy, getConsumerId, getOffset, resetOffsets, setConsumerId, setReferenceTimestamp, updateOffsetpublic static final java.lang.String CLEANUP_POLICY
public static final java.lang.String CLEANUP_POLICY_COMPACT
public static final java.lang.String DELETE_RETENTION
public static final java.lang.String SEGMENT_BYTES
public KafkaTopicOffsetManager(ZookeeperConnect zookeeperConnect, java.lang.String topic)
public KafkaTopicOffsetManager(ZookeeperConnect zookeeperConnect, java.lang.String topic, java.util.Map<Partition,java.lang.Long> initialOffsets)
public void setMaxSize(int maxSize)
maxSize - the maximum amount of data to be brought on a fetchpublic void setCompressionCodec(ProducerMetadata.CompressionType compressionType)
compressionType - the compression typepublic void setMaxQueueBufferingTime(int maxQueueBufferingTime)
maxQueueBufferingTime - the maximum buffering window (in milliseconds)public void setSegmentSize(int segmentSize)
segmentSize - the segment size of an offset topicpublic void setRetentionTime(int retentionTime)
retentionTime - the retention time for dead records (in seconds)public void setReplicationFactor(int replicationFactor)
replicationFactor - the replication factorpublic void setBatchBytes(int batchBytes)
batchBytes - maximum batching windowpublic void setRequiredAcks(int requiredAcks)
requiredAcks - the number of required ackspublic void afterPropertiesSet()
throws java.lang.Exception
afterPropertiesSet in interface org.springframework.beans.factory.InitializingBeanjava.lang.Exceptionprotected void doUpdateOffset(Partition partition, long offset)
doUpdateOffset in class AbstractOffsetManagerprotected void doRemoveOffset(Partition partition)
doRemoveOffset in class AbstractOffsetManagerprotected java.lang.Long doGetOffset(Partition partition)
doGetOffset in class AbstractOffsetManagerpublic void flush()
throws java.io.IOException
flush in interface java.io.Flushablejava.io.IOExceptionpublic void close()
throws java.io.IOException
close in interface java.io.Closeableclose in interface java.lang.AutoCloseablejava.io.IOException