Class ConsumerProperties
- java.lang.Object
  - org.springframework.kafka.listener.ConsumerProperties
- Direct Known Subclasses:
ContainerProperties
public class ConsumerProperties extends java.lang.Object
Common consumer properties.
- Since:
- 2.3
- Author:
- Gary Russell
-
-
Field Summary
Fields
- static long DEFAULT_POLL_TIMEOUT: The default pollTimeout (ms).
-
Constructor Summary
Constructors
- ConsumerProperties(java.lang.String... topics): Create properties for a container that will subscribe to the specified topics.
- ConsumerProperties(java.util.regex.Pattern topicPattern): Create properties for a container that will subscribe to topics matching the specified pattern.
- ConsumerProperties(TopicPartitionOffset... topicPartitions): Create properties for a container that will assign itself the provided topic partitions.
-
Method Summary
Methods
- java.time.Duration getAuthExceptionRetryInterval(): Get the authentication/authorization retry interval.
- java.time.Duration getAuthorizationExceptionRetryInterval(): Deprecated in favor of getAuthExceptionRetryInterval().
- java.lang.String getClientId(): Return the client id.
- org.apache.kafka.clients.consumer.OffsetCommitCallback getCommitCallback(): Return the commit callback.
- LogIfLevelEnabled.Level getCommitLogLevel(): The level at which to log offset commits.
- int getCommitRetries(): The number of retries allowed when a RetriableCommitFailedException is thrown by the consumer when using setSyncCommits(boolean) set to true.
- org.apache.kafka.clients.consumer.ConsumerRebalanceListener getConsumerRebalanceListener(): Return the rebalance listener.
- java.lang.String getGroupId(): Return the container's group id.
- java.util.Properties getKafkaConsumerProperties(): Get the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory.
- OffsetAndMetadataProvider getOffsetAndMetadataProvider(): Return the offset and metadata provider.
- long getPollTimeout()
- java.time.Duration getSyncCommitTimeout(): Return the sync commit timeout.
- TopicPartitionOffset[] getTopicPartitions(): Return the configured TopicPartitionOffsets.
- java.util.regex.Pattern getTopicPattern(): Return the configured topic pattern.
- java.lang.String[] getTopics(): Return the configured topics.
- boolean isCheckDeserExWhenKeyNull(): Always check for a deserialization exception header with a null key.
- boolean isCheckDeserExWhenValueNull(): Always check for a deserialization exception header with a null value.
- boolean isFixTxOffsets(): Whether or not to correct terminal transactional offsets.
- boolean isOnlyLogRecordMetadata(): Deprecated.
- boolean isSyncCommits()
- protected java.lang.String renderProperties()
- void setAuthExceptionRetryInterval(java.time.Duration authExceptionRetryInterval): Set the interval between retries after an AuthenticationException or org.apache.kafka.common.errors.AuthorizationException is thrown by KafkaConsumer.
- void setAuthorizationExceptionRetryInterval(java.time.Duration authorizationExceptionRetryInterval): Deprecated in favor of setAuthExceptionRetryInterval(Duration).
- void setCheckDeserExWhenKeyNull(boolean checkDeserExWhenKeyNull): Set to true to always check for a DeserializationException header when a null key is received.
- void setCheckDeserExWhenValueNull(boolean checkDeserExWhenValueNull): Set to true to always check for a DeserializationException header when a null value is received.
- void setClientId(java.lang.String clientId): Set the client id; overrides the consumer factory client.id property.
- void setCommitCallback(org.apache.kafka.clients.consumer.OffsetCommitCallback commitCallback): Set the commit callback; by default a simple logging callback is used to log success at DEBUG level and failures at ERROR level.
- void setCommitLogLevel(LogIfLevelEnabled.Level commitLogLevel): Set the level at which to log offset commits.
- void setCommitRetries(int commitRetries): Set the number of retries allowed when a RetriableCommitFailedException is thrown by the consumer when using setSyncCommits(boolean) set to true.
- void setConsumerRebalanceListener(org.apache.kafka.clients.consumer.ConsumerRebalanceListener consumerRebalanceListener): Set the user defined ConsumerRebalanceListener implementation.
- void setFixTxOffsets(boolean fixTxOffsets): When consuming records produced by a transactional producer, and the consumer is positioned at the end of a partition, the lag can incorrectly be reported as greater than zero, due to the pseudo record used to indicate transaction commit/rollback and, possibly, the presence of rolled-back records.
- void setGroupId(java.lang.String groupId): Set the group id for this container.
- void setKafkaConsumerProperties(java.util.Properties kafkaConsumerProperties): Set the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory.
- void setOffsetAndMetadataProvider(OffsetAndMetadataProvider offsetAndMetadataProvider): Set the offset and metadata provider associated to a commit callback.
- void setOnlyLogRecordMetadata(boolean onlyLogRecordMetadata): Deprecated.
- void setPollTimeout(long pollTimeout): Set the max time to block in the consumer waiting for records.
- void setSyncCommits(boolean syncCommits): Set whether to call consumer.commitSync() or commitAsync() when the container is responsible for commits.
- void setSyncCommitTimeout(java.time.Duration syncCommitTimeout): Set the timeout for commitSync operations (if isSyncCommits()).
- java.lang.String toString()
-
-
-
Field Detail
-
DEFAULT_POLL_TIMEOUT
public static final long DEFAULT_POLL_TIMEOUT
The default pollTimeout (ms).
- See Also:
- Constant Field Values
-
-
Constructor Detail
-
ConsumerProperties
public ConsumerProperties(java.lang.String... topics)
Create properties for a container that will subscribe to the specified topics.
- Parameters:
topics - the topics.
-
ConsumerProperties
public ConsumerProperties(java.util.regex.Pattern topicPattern)
Create properties for a container that will subscribe to topics matching the specified pattern. The framework will create a container that subscribes to all topics matching the pattern and is dynamically assigned their partitions. Pattern matching is performed periodically against the topics that exist at the time of each check.
- Parameters:
topicPattern - the pattern.
- See Also:
CommonClientConfigs.METADATA_MAX_AGE_CONFIG
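To make the pattern subscription concrete, the following is a minimal sketch that uses only the JDK. It assumes that a topic is selected when the pattern matches the whole topic name; the class name, method name, and topic names are hypothetical and are not part of this API. It only illustrates which topics a given Pattern would select at one check, not how the framework or the Kafka client performs the check.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical illustration; not part of Spring for Apache Kafka.
class TopicPatternSketch {

    // Returns the topic names that the pattern matches in full
    // (assumption: matching is against the whole name, not a substring).
    public static List<String> matchingTopics(Pattern topicPattern, List<String> existingTopics) {
        return existingTopics.stream()
                .filter(topic -> topicPattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("orders\\..*");
        // Topics assumed to exist at the time of the check.
        List<String> topics = List.of("orders.eu", "orders.us", "payments.eu");
        System.out.println(matchingTopics(pattern, topics)); // [orders.eu, orders.us]
    }
}
```

A topic created later, such as a hypothetical orders.apac, would only be picked up at a subsequent periodic check.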
-
ConsumerProperties
public ConsumerProperties(TopicPartitionOffset... topicPartitions)
Create properties for a container that will assign itself the provided topic partitions.
- Parameters:
topicPartitions - the topic partitions.
-
-
Method Detail
-
getTopics
@Nullable public java.lang.String[] getTopics()
Return the configured topics.
- Returns:
- the topics.
-
getTopicPattern
@Nullable public java.util.regex.Pattern getTopicPattern()
Return the configured topic pattern.
- Returns:
- the topic pattern.
-
getTopicPartitions
@Nullable public TopicPartitionOffset[] getTopicPartitions()
Return the configured TopicPartitionOffsets.
- Returns:
- the topics/partitions.
- Since:
- 2.5
-
setPollTimeout
public void setPollTimeout(long pollTimeout)
Set the max time to block in the consumer waiting for records.
- Parameters:
pollTimeout - the timeout in ms; default 5000L.
-
getPollTimeout
public long getPollTimeout()
-
setGroupId
public void setGroupId(java.lang.String groupId)
Set the group id for this container. Overrides any group.id property provided by the consumer factory configuration.
- Parameters:
groupId - the group id.
-
getGroupId
@Nullable public java.lang.String getGroupId()
Return the container's group id.
- Returns:
- the group id.
-
getClientId
public java.lang.String getClientId()
Return the client id.
- Returns:
- the client id.
- See Also:
setClientId(String)
-
setClientId
public void setClientId(java.lang.String clientId)
Set the client id; overrides the consumer factory client.id property. When used in a concurrent container, the client id is suffixed with '-n' to provide a unique value for each consumer.
- Parameters:
clientId - the client id.
-
setConsumerRebalanceListener
public void setConsumerRebalanceListener(org.apache.kafka.clients.consumer.ConsumerRebalanceListener consumerRebalanceListener)
Set the user defined ConsumerRebalanceListener implementation.
- Parameters:
consumerRebalanceListener - the ConsumerRebalanceListener instance
-
getConsumerRebalanceListener
@Nullable public org.apache.kafka.clients.consumer.ConsumerRebalanceListener getConsumerRebalanceListener()
Return the rebalance listener.
- Returns:
- the listener.
-
setSyncCommitTimeout
public void setSyncCommitTimeout(@Nullable java.time.Duration syncCommitTimeout)
Set the timeout for commitSync operations (if isSyncCommits()). Overrides the default api timeout property.
- Parameters:
syncCommitTimeout - the timeout.
- See Also:
setSyncCommits(boolean)
-
getSyncCommitTimeout
@Nullable public java.time.Duration getSyncCommitTimeout()
Return the sync commit timeout.
- Returns:
- the timeout.
-
setCommitCallback
public void setCommitCallback(org.apache.kafka.clients.consumer.OffsetCommitCallback commitCallback)
Set the commit callback; by default a simple logging callback is used to log success at DEBUG level and failures at ERROR level. Used when syncCommits is false.
- Parameters:
commitCallback - the callback.
- See Also:
setSyncCommits(boolean)
-
setOffsetAndMetadataProvider
public void setOffsetAndMetadataProvider(OffsetAndMetadataProvider offsetAndMetadataProvider)
Set the offset and metadata provider associated to a commit callback.
- Parameters:
offsetAndMetadataProvider - an offset and metadata provider.
- Since:
- 2.8.5
- See Also:
setCommitCallback(OffsetCommitCallback)
-
getCommitCallback
@Nullable public org.apache.kafka.clients.consumer.OffsetCommitCallback getCommitCallback()
Return the commit callback.
- Returns:
- the callback.
-
getOffsetAndMetadataProvider
@Nullable public OffsetAndMetadataProvider getOffsetAndMetadataProvider()
Return the offset and metadata provider.
- Returns:
- the offset and metadata provider.
-
setSyncCommits
public void setSyncCommits(boolean syncCommits)
Set whether to call consumer.commitSync() or commitAsync() when the container is responsible for commits. Default true.
- Parameters:
syncCommits - true to use commitSync().
- See Also:
setSyncCommitTimeout(Duration), setCommitCallback(OffsetCommitCallback), setCommitLogLevel(org.springframework.kafka.support.LogIfLevelEnabled.Level), setCommitRetries(int)
-
isSyncCommits
public boolean isSyncCommits()
-
getCommitLogLevel
public LogIfLevelEnabled.Level getCommitLogLevel()
The level at which to log offset commits.
- Returns:
- the level.
-
setCommitLogLevel
public void setCommitLogLevel(LogIfLevelEnabled.Level commitLogLevel)
Set the level at which to log offset commits. Default: DEBUG.
- Parameters:
commitLogLevel - the level.
-
getKafkaConsumerProperties
public java.util.Properties getKafkaConsumerProperties()
Get the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory. You can add non-String-valued properties, but the property name (hashtable key) must be String; all others will be ignored. group.id and client.id are ignored.
- Returns:
- the properties.
- See Also:
ConsumerConfig, setGroupId(String), setClientId(String)
-
setKafkaConsumerProperties
public void setKafkaConsumerProperties(java.util.Properties kafkaConsumerProperties)
Set the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory. group.id and client.id are ignored. Property keys must be Strings.
- Parameters:
kafkaConsumerProperties - the properties.
- See Also:
ConsumerConfig, setGroupId(String), setClientId(String)
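The merge rules described for these properties can be illustrated with a small JDK-only sketch. It is an assumption-laden model of the documented behaviour (overrides win, group.id and client.id are skipped, non-String keys are skipped), not the framework's actual merge code; the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration of the documented merge rules; not the framework's code.
class ConsumerPropertyMergeSketch {

    // Overlays container-level overrides on the consumer factory configuration.
    public static Properties merge(Properties factoryProps, Properties overrides) {
        Properties merged = new Properties();
        merged.putAll(factoryProps);
        for (Map.Entry<Object, Object> entry : overrides.entrySet()) {
            Object key = entry.getKey();
            if (!(key instanceof String)) {
                continue; // property names must be Strings; other keys are ignored
            }
            if ("group.id".equals(key) || "client.id".equals(key)) {
                continue; // ignored here; see setGroupId(String) and setClientId(String)
            }
            merged.put(key, entry.getValue()); // supersedes any factory value with the same name
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties factory = new Properties();
        factory.put("max.poll.records", 500);
        factory.put("group.id", "factory-group");

        Properties overrides = new Properties();
        overrides.put("max.poll.records", 50);      // non-String value is allowed
        overrides.put("group.id", "ignored-group"); // ignored
        overrides.put(42, "ignored");               // non-String key, ignored

        Properties merged = merge(factory, overrides);
        System.out.println(merged.get("max.poll.records")); // 50
        System.out.println(merged.get("group.id"));         // factory-group
        System.out.println(merged.containsKey(42));         // false
    }
}
```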
-
getAuthorizationExceptionRetryInterval
@Deprecated @Nullable public java.time.Duration getAuthorizationExceptionRetryInterval()
Deprecated in favor of getAuthExceptionRetryInterval().
Get the authentication/authorization retry interval.
- Returns:
- the interval.
-
setAuthorizationExceptionRetryInterval
@Deprecated public void setAuthorizationExceptionRetryInterval(java.time.Duration authorizationExceptionRetryInterval)
Deprecated in favor of setAuthExceptionRetryInterval(Duration).
Set the interval between retries after an AuthenticationException or org.apache.kafka.common.errors.AuthorizationException is thrown by KafkaConsumer. By default the field is null and retries are disabled; in that case the container will be stopped. The interval must be less than the max.poll.interval.ms consumer property.
- Parameters:
authorizationExceptionRetryInterval - the duration between retries
- Since:
- 2.3.5
-
getAuthExceptionRetryInterval
@Nullable public java.time.Duration getAuthExceptionRetryInterval()
Get the authentication/authorization retry interval.
- Returns:
- the interval.
-
setAuthExceptionRetryInterval
public void setAuthExceptionRetryInterval(java.time.Duration authExceptionRetryInterval)
Set the interval between retries after an AuthenticationException or org.apache.kafka.common.errors.AuthorizationException is thrown by KafkaConsumer. By default the field is null and retries are disabled; in that case the container will be stopped. The interval must be less than the max.poll.interval.ms consumer property.
- Parameters:
authExceptionRetryInterval - the duration between retries
- Since:
- 2.8
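The constraint that the retry interval must be less than max.poll.interval.ms can be checked before configuring the container. The sketch below is a hypothetical pre-flight check written against the JDK only; the class and method names are assumptions, and it falls back to 300000 ms (the Kafka client default for max.poll.interval.ms) when the property is not set. It does not describe any validation the framework itself performs.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical pre-flight check; not part of Spring for Apache Kafka.
class AuthRetryIntervalSketch {

    // Kafka client default for max.poll.interval.ms (5 minutes).
    static final long DEFAULT_MAX_POLL_INTERVAL_MS = 300_000L;

    // True when the retry interval is strictly less than max.poll.interval.ms.
    public static boolean isValid(Duration retryInterval, Properties consumerProps) {
        Object configured = consumerProps.get("max.poll.interval.ms");
        long maxPollIntervalMs = configured == null
                ? DEFAULT_MAX_POLL_INTERVAL_MS
                : Long.parseLong(configured.toString());
        return retryInterval.toMillis() < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("max.poll.interval.ms", "60000");
        System.out.println(isValid(Duration.ofSeconds(10), props)); // true
        System.out.println(isValid(Duration.ofMinutes(2), props));  // false
    }
}
```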
-
getCommitRetries
public int getCommitRetries()
The number of retries allowed when a RetriableCommitFailedException is thrown by the consumer when using setSyncCommits(boolean) set to true.
- Returns:
- the number of retries.
- Since:
- 2.3.9
- See Also:
setSyncCommits(boolean)
-
setCommitRetries
public void setCommitRetries(int commitRetries)
Set the number of retries allowed when a RetriableCommitFailedException is thrown by the consumer when using setSyncCommits(boolean) set to true. Default 3 (4 attempts total).
- Parameters:
commitRetries - the number of retries.
- Since:
- 2.3.9
- See Also:
setSyncCommits(boolean)
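The relationship between the retry count and the total number of attempts ("Default 3 (4 attempts total)") can be sketched as a simple loop. This is an illustration only: RetriableFailure is a stand-in defined here, not the Kafka RetriableCommitFailedException, and the class and method names are hypothetical. The framework's own retry logic may differ in detail.

```java
// Hypothetical illustration of "N retries means N + 1 attempts"; not the framework's code.
class CommitRetrySketch {

    // Stand-in for a retriable commit failure; not the Kafka exception class.
    static class RetriableFailure extends RuntimeException {
        RetriableFailure(String message) {
            super(message);
        }
    }

    // Runs the commit action, retrying up to commitRetries times after the first failure.
    // Returns the number of attempts made; rethrows the failure once retries are exhausted.
    public static int commitWithRetries(Runnable commit, int commitRetries) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                commit.run();
                return attempts;
            } catch (RetriableFailure ex) {
                if (attempts > commitRetries) {
                    throw ex;
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        Runnable alwaysFails = () -> {
            calls[0]++;
            throw new RetriableFailure("simulated");
        };
        try {
            commitWithRetries(alwaysFails, 3);
        } catch (RetriableFailure ex) {
            System.out.println("attempts: " + calls[0]); // attempts: 4
        }
    }
}
```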
-
isOnlyLogRecordMetadata
@Deprecated public boolean isOnlyLogRecordMetadata()
Deprecated.
-
setOnlyLogRecordMetadata
@Deprecated public void setOnlyLogRecordMetadata(boolean onlyLogRecordMetadata)
Deprecated.
Set to false to log record.toString() in log messages instead of topic-partition@offset.
- Parameters:
onlyLogRecordMetadata - false to log the entire record.
- Since:
- 2.2.14
-
isFixTxOffsets
public boolean isFixTxOffsets()
Whether or not to correct terminal transactional offsets.
- Returns:
- true to fix.
- Since:
- 2.5.6
- See Also:
setFixTxOffsets(boolean)
-
setFixTxOffsets
public void setFixTxOffsets(boolean fixTxOffsets)
When consuming records produced by a transactional producer, and the consumer is positioned at the end of a partition, the lag can incorrectly be reported as greater than zero, due to the pseudo record used to indicate transaction commit/rollback and, possibly, the presence of rolled-back records. This does not functionally affect the consumer, but some users have expressed concern that the "lag" is non-zero. Set this to true and the container will correct such mis-reported offsets. The check is performed before the next poll to avoid adding significant complexity to the commit processing.
IMPORTANT: At the time of writing, the lag will only be corrected if the consumer is configured with isolation.level=read_committed and max.poll.records is greater than 1. See https://issues.apache.org/jira/browse/KAFKA-10683 for more information.
- Parameters:
fixTxOffsets - true to correct the offset(s).
- Since:
- 2.5.6
-
isCheckDeserExWhenKeyNull
public boolean isCheckDeserExWhenKeyNull()
Always check for a deserialization exception header with a null key.
- Returns:
- true to check.
- Since:
- 2.8.1
-
setCheckDeserExWhenKeyNull
public void setCheckDeserExWhenKeyNull(boolean checkDeserExWhenKeyNull)
Set to true to always check for a DeserializationException header when a null key is received. Useful when the consumer code cannot determine that an ErrorHandlingDeserializer has been configured, such as when using a delegating deserializer.
- Parameters:
checkDeserExWhenKeyNull - true to always check.
- Since:
- 2.8.1
-
isCheckDeserExWhenValueNull
public boolean isCheckDeserExWhenValueNull()
Always check for a deserialization exception header with a null value.
- Returns:
- true to check.
- Since:
- 2.8.1
-
setCheckDeserExWhenValueNull
public void setCheckDeserExWhenValueNull(boolean checkDeserExWhenValueNull)
Set to true to always check for a DeserializationException header when a null value is received. Useful when the consumer code cannot determine that an ErrorHandlingDeserializer has been configured, such as when using a delegating deserializer.
- Parameters:
checkDeserExWhenValueNull - true to always check.
- Since:
- 2.8.1
-
toString
public java.lang.String toString()
- Overrides:
toString in class java.lang.Object
-
renderProperties
protected final java.lang.String renderProperties()
-
-