Class ContainerProperties
- java.lang.Object
- org.springframework.kafka.listener.ConsumerProperties
- org.springframework.kafka.listener.ContainerProperties
public class ContainerProperties extends ConsumerProperties
Contains runtime properties for a listener container.
- Author:
- Gary Russell, Artem Bilan, Artem Yakshin, Johnny Lim, Lukasz Kaminski
-
-
Nested Class Summary
Nested Classes (modifier and type, class, description):
- static class ContainerProperties.AckMode: The offset commit behavior enumeration.
- static class ContainerProperties.AssignmentCommitOption: Offset commit behavior during assignment.
- static class ContainerProperties.EOSMode: Mode for exactly once semantics.
-
Field Summary
Fields (modifier and type, field, description):
- static int DEFAULT_MONITOR_INTERVAL: The default monitorInterval (s).
- static float DEFAULT_NO_POLL_THRESHOLD: The default noPollThreshold.
- static long DEFAULT_SHUTDOWN_TIMEOUT: The default shutDownTimeout (ms).
Fields inherited from class org.springframework.kafka.listener.ConsumerProperties
DEFAULT_POLL_TIMEOUT
-
-
Constructor Summary
Constructors (constructor, description):
- ContainerProperties(java.lang.String... topics): Create properties for a container that will subscribe to the specified topics.
- ContainerProperties(java.util.regex.Pattern topicPattern): Create properties for a container that will subscribe to topics matching the specified pattern.
- ContainerProperties(TopicPartitionOffset... topicPartitions): Create properties for a container that will assign itself the provided topic partitions.
-
Method Summary
Methods (modifier and type, method, description):
- int getAckCount()
- ContainerProperties.AckMode getAckMode()
- long getAckTime()
- org.aopalliance.aop.Advice[] getAdviceChain(): A chain of listener Advices.
- ContainerProperties.AssignmentCommitOption getAssignmentCommitOption()
- java.time.Duration getConsumerStartTimout()
- org.springframework.core.task.AsyncListenableTaskExecutor getConsumerTaskExecutor()
- ContainerProperties.EOSMode getEosMode(): Get the exactly once semantics mode.
- long getIdleBetweenPolls()
- java.lang.Long getIdleEventInterval()
- java.lang.Object getMessageListener()
- java.util.Map<java.lang.String,java.lang.String> getMicrometerTags()
- int getMonitorInterval()
- float getNoPollThreshold()
- org.springframework.scheduling.TaskScheduler getScheduler()
- long getShutdownTimeout()
- java.lang.Boolean getSubBatchPerPartition(): Return whether to split batches by partition; null if not set.
- org.springframework.transaction.TransactionDefinition getTransactionDefinition(): Get the transaction definition.
- org.springframework.transaction.PlatformTransactionManager getTransactionManager()
- boolean isAckOnError()
- boolean isDeliveryAttemptHeader()
- boolean isLogContainerConfig(): Log the container configuration if true (INFO).
- boolean isMicrometerEnabled()
- boolean isMissingTopicsFatal(): If true, the container won't start if any of the configured topics are not present on the broker.
- boolean isSubBatchPerPartition(): Return whether to split batches by partition.
- void setAckCount(int count): Set the number of outstanding records after which offsets should be committed when ContainerProperties.AckMode.COUNT or ContainerProperties.AckMode.COUNT_TIME is being used.
- void setAckMode(ContainerProperties.AckMode ackMode): Set the ack mode to use when auto ack (in the configuration properties) is false.
- void setAckOnError(boolean ackOnError): Deprecated. In favor of GenericErrorHandler.isAckAfterHandle().
- void setAckTime(long ackTime): Set the time (ms) after which outstanding offsets should be committed when ContainerProperties.AckMode.TIME or ContainerProperties.AckMode.COUNT_TIME is being used.
- void setAdviceChain(org.aopalliance.aop.Advice... adviceChain): Set a chain of listener Advices; must not be null or have null elements.
- void setAssignmentCommitOption(ContainerProperties.AssignmentCommitOption assignmentCommitOption): Set the assignment commit option.
- void setConsumerStartTimout(java.time.Duration consumerStartTimout): Set the timeout to wait for a consumer thread to start before logging an error.
- void setConsumerTaskExecutor(org.springframework.core.task.AsyncListenableTaskExecutor consumerTaskExecutor): Set the executor for threads that poll the consumer.
- void setDeliveryAttemptHeader(boolean deliveryAttemptHeader): Set to true to populate the KafkaHeaders.DELIVERY_ATTEMPT header when the error handler or after rollback processor implements DeliveryAttemptAware.
- void setEosMode(ContainerProperties.EOSMode eosMode): Set the exactly once semantics mode.
- void setIdleBetweenPolls(long idleBetweenPolls): The sleep interval in milliseconds used in the main loop between Consumer.poll(Duration) calls.
- void setIdleEventInterval(java.lang.Long idleEventInterval): Set the idle event interval; when set, an event is emitted if a poll returns no records and this interval has elapsed since a record was returned.
- void setLogContainerConfig(boolean logContainerConfig): Set to true to instruct each container to log this configuration.
- void setMessageListener(java.lang.Object messageListener): Set the message listener; must be a MessageListener or AcknowledgingMessageListener.
- void setMicrometerEnabled(boolean micrometerEnabled): Set to false to disable the Micrometer listener timers.
- void setMicrometerTags(java.util.Map<java.lang.String,java.lang.String> tags): Set additional tags for the Micrometer listener timers.
- void setMissingTopicsFatal(boolean missingTopicsFatal): Set to false to allow the container to start even if any of the configured topics are not present on the broker.
- void setMonitorInterval(int monitorInterval): The interval between checks for a non-responsive consumer in seconds; default 30.
- void setNoPollThreshold(float noPollThreshold): If the time since the last poll / poll timeout exceeds this value, a NonResponsiveConsumerEvent is published.
- void setScheduler(org.springframework.scheduling.TaskScheduler scheduler): A scheduler used with the monitor interval.
- void setShutdownTimeout(long shutdownTimeout): Set the timeout for shutting down the container.
- void setSubBatchPerPartition(boolean subBatchPerPartition): When using a batch message listener, whether to dispatch records by partition (with a transaction for each sub batch if transactions are in use) or the complete batch received by the poll().
- void setSyncCommitTimeout(java.time.Duration syncCommitTimeout): Set the timeout for commitSync operations (if ConsumerProperties.isSyncCommits()).
- void setTransactionDefinition(org.springframework.transaction.TransactionDefinition transactionDefinition): Set a transaction definition with properties (e.g. timeout) that will be copied to the container's transaction template.
- void setTransactionManager(org.springframework.transaction.PlatformTransactionManager transactionManager): Set the transaction manager to start a transaction; offsets are committed with semantics equivalent to ContainerProperties.AckMode.RECORD and ContainerProperties.AckMode.BATCH depending on the listener type (record or batch).
- java.lang.String toString()
Methods inherited from class org.springframework.kafka.listener.ConsumerProperties
getAuthorizationExceptionRetryInterval, getClientId, getCommitCallback, getCommitLogLevel, getCommitRetries, getConsumerRebalanceListener, getGroupId, getKafkaConsumerProperties, getPollTimeout, getSyncCommitTimeout, getTopicPartitions, getTopicPartitionsToAssign, getTopicPattern, getTopics, isOnlyLogRecordMetadata, isSyncCommits, renderProperties, setAuthorizationExceptionRetryInterval, setClientId, setCommitCallback, setCommitLogLevel, setCommitRetries, setConsumerRebalanceListener, setGroupId, setKafkaConsumerProperties, setOnlyLogRecordMetadata, setPollTimeout, setSyncCommits
-
-
-
-
Field Detail
-
DEFAULT_SHUTDOWN_TIMEOUT
public static final long DEFAULT_SHUTDOWN_TIMEOUT
The default shutDownTimeout (ms).
- See Also:
- Constant Field Values
-
DEFAULT_MONITOR_INTERVAL
public static final int DEFAULT_MONITOR_INTERVAL
The default monitorInterval (s).
- See Also:
- Constant Field Values
-
DEFAULT_NO_POLL_THRESHOLD
public static final float DEFAULT_NO_POLL_THRESHOLD
The default noPollThreshold.
- See Also:
- Constant Field Values
-
-
Constructor Detail
-
ContainerProperties
public ContainerProperties(java.lang.String... topics)
Create properties for a container that will subscribe to the specified topics.
- Parameters:
topics - the topics.
-
ContainerProperties
public ContainerProperties(java.util.regex.Pattern topicPattern)
Create properties for a container that will subscribe to topics matching the specified pattern. The framework will create a container that subscribes to all topics matching the specified pattern to get dynamically assigned partitions. The pattern matching will be performed periodically against topics existing at the time of check.
- Parameters:
topicPattern - the pattern.
- See Also:
CommonClientConfigs.METADATA_MAX_AGE_CONFIG
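As a rough illustration of the periodic pattern check described above, the following sketch filters a list of topic names with a java.util.regex.Pattern. It uses plain Java only and does not touch a Kafka client; the class TopicPatternSketch, the method matchingTopics and the topic names are hypothetical, and full-string matching is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Spring Kafka: models a single pattern check.
final class TopicPatternSketch {

    // Returns the topics, out of those existing at the time of the check, that match the pattern.
    static List<String> matchingTopics(Pattern topicPattern, List<String> existingTopics) {
        List<String> matched = new ArrayList<>();
        for (String topic : existingTopics) {
            if (topicPattern.matcher(topic).matches()) { // assumption: the whole name must match
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("orders\\..*");
        // First check: only one matching topic exists.
        System.out.println(matchingTopics(pattern, Arrays.asList("orders.eu", "payments")));
        // Later check: a newly created topic is picked up as well.
        System.out.println(matchingTopics(pattern, Arrays.asList("orders.eu", "payments", "orders.us")));
    }
}
```

Because the check is periodic, a topic created after the container starts is only noticed at a later check, which is why the metadata max age setting is referenced here.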
-
ContainerProperties
public ContainerProperties(TopicPartitionOffset... topicPartitions)
Create properties for a container that will assign itself the provided topic partitions.
- Parameters:
topicPartitions - the topic partitions.
-
-
Method Detail
-
setMessageListener
public void setMessageListener(java.lang.Object messageListener)
Set the message listener; must be a MessageListener or AcknowledgingMessageListener.
- Parameters:
messageListener - the listener.
-
setAckMode
public void setAckMode(ContainerProperties.AckMode ackMode)
Set the ack mode to use when auto ack (in the configuration properties) is false.
- RECORD: Ack after each record has been passed to the listener.
- BATCH: Ack after each batch of records received from the consumer has been passed to the listener.
- TIME: Ack after this number of milliseconds (should be greater than the pollTimeout; see setPollTimeout(long)).
- COUNT: Ack after at least this number of records have been received.
- MANUAL: Listener is responsible for acking; use an AcknowledgingMessageListener.
When a transaction manager is set, offsets are committed with semantics equivalent to RECORD or BATCH, depending on the listener type.
- Parameters:
ackMode - the ContainerProperties.AckMode; default BATCH.
- See Also:
setTransactionManager(PlatformTransactionManager)
-
setAckCount
public void setAckCount(int count)
Set the number of outstanding records after which offsets should be committed when ContainerProperties.AckMode.COUNT or ContainerProperties.AckMode.COUNT_TIME is being used.
- Parameters:
count - the count
-
setAckTime
public void setAckTime(long ackTime)
Set the time (ms) after which outstanding offsets should be committed when ContainerProperties.AckMode.TIME or ContainerProperties.AckMode.COUNT_TIME is being used. Should be larger than
- Parameters:
ackTime - the time
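To make the interplay of ackCount and ackTime concrete, here is a minimal sketch of the commit decision for the COUNT, TIME and COUNT_TIME modes. It is a simplified model, not the container's code: the class AckDecisionSketch, its Mode enum and the method shouldCommit are hypothetical, the exact boundary conditions (>= versus >) are assumptions, and COUNT_TIME is modeled as "whichever limit is reached first".

```java
// Hypothetical model of the count/time commit decision; not the container's actual code.
final class AckDecisionSketch {

    enum Mode { COUNT, TIME, COUNT_TIME }

    // outstanding: records processed since the last commit
    // elapsedMs:   milliseconds elapsed since the last commit
    static boolean shouldCommit(Mode mode, int outstanding, int ackCount, long elapsedMs, long ackTime) {
        boolean countReached = outstanding >= ackCount; // boundary is an assumption
        boolean timeReached = elapsedMs >= ackTime;     // boundary is an assumption
        switch (mode) {
            case COUNT:
                return countReached;
            case TIME:
                return timeReached;
            default: // COUNT_TIME: whichever limit is hit first
                return countReached || timeReached;
        }
    }

    public static void main(String[] args) {
        System.out.println(shouldCommit(Mode.COUNT_TIME, 5, 10, 1500L, 1000L)); // true: time limit reached
        System.out.println(shouldCommit(Mode.COUNT, 5, 10, 1500L, 1000L));      // false: count not reached
    }
}
```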
-
setConsumerTaskExecutor
public void setConsumerTaskExecutor(org.springframework.core.task.AsyncListenableTaskExecutor consumerTaskExecutor)
Set the executor for threads that poll the consumer.
- Parameters:
consumerTaskExecutor - the executor
-
setShutdownTimeout
public void setShutdownTimeout(long shutdownTimeout)
Set the timeout for shutting down the container. This is the maximum amount of time that the invocation of stop(Runnable) will block for before returning; default 10000L.
- Parameters:
shutdownTimeout - the shutdown timeout.
-
setSyncCommitTimeout
public void setSyncCommitTimeout(@Nullable java.time.Duration syncCommitTimeout)
Set the timeout for commitSync operations (if ConsumerProperties.isSyncCommits()). Overrides the default api timeout property. In order of precedence:
- this property
- ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG in ConsumerProperties.setKafkaConsumerProperties(Properties)
- ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG in the consumer factory properties
- 60 seconds
- Overrides:
setSyncCommitTimeout in class ConsumerProperties
- Parameters:
syncCommitTimeout - the timeout.
- See Also:
ConsumerProperties.setSyncCommits(boolean)
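The order of precedence above can be sketched as a simple fallback chain. This is an illustration under stated assumptions rather than the framework's code: SyncCommitTimeoutSketch and resolve are hypothetical names, both property sources are modeled as java.util.Properties for brevity, and the key string is the Kafka property name behind ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical model of the documented precedence; not the framework's implementation.
final class SyncCommitTimeoutSketch {

    // Kafka property name behind ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG.
    static final String DEFAULT_API_TIMEOUT_MS = "default.api.timeout.ms";

    static Duration resolve(Duration syncCommitTimeout, Properties kafkaConsumerProperties,
            Properties factoryProperties) {
        if (syncCommitTimeout != null) {
            return syncCommitTimeout;                                      // 1. this property
        }
        String millis = kafkaConsumerProperties.getProperty(DEFAULT_API_TIMEOUT_MS); // 2. container overrides
        if (millis == null) {
            millis = factoryProperties.getProperty(DEFAULT_API_TIMEOUT_MS);          // 3. consumer factory
        }
        return millis != null ? Duration.ofMillis(Long.parseLong(millis))
                : Duration.ofSeconds(60);                                  // 4. fallback of 60 seconds
    }

    public static void main(String[] args) {
        Properties overrides = new Properties();
        Properties factory = new Properties();
        factory.setProperty(DEFAULT_API_TIMEOUT_MS, "45000");
        System.out.println(resolve(null, overrides, factory));                  // PT45S
        System.out.println(resolve(Duration.ofSeconds(5), overrides, factory)); // PT5S
    }
}
```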
-
setIdleEventInterval
public void setIdleEventInterval(java.lang.Long idleEventInterval)
Set the idle event interval; when set, an event is emitted if a poll returns no records and this interval has elapsed since a record was returned.
- Parameters:
idleEventInterval - the interval.
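The condition described above can be written down as a small predicate. This is a simplified sketch: IdleEventSketch and shouldEmitIdleEvent are hypothetical names, the interval is assumed to be in milliseconds, and the real container may apply further bookkeeping (for example, pacing repeated events) that is not modeled here.

```java
// Hypothetical model of the idle-event condition; not the container's implementation.
final class IdleEventSketch {

    // recordsReturned:   number of records returned by the latest poll
    // nowMs:             current time in milliseconds
    // lastRecordMs:      time at which a record was last returned
    // idleEventInterval: the configured interval (assumed ms), or null when not set
    static boolean shouldEmitIdleEvent(int recordsReturned, long nowMs, long lastRecordMs,
            Long idleEventInterval) {
        if (idleEventInterval == null) {
            return false; // interval not set: no idle events
        }
        return recordsReturned == 0 && nowMs - lastRecordMs > idleEventInterval;
    }

    public static void main(String[] args) {
        System.out.println(shouldEmitIdleEvent(0, 70_000L, 0L, 60_000L)); // true: empty poll, interval elapsed
        System.out.println(shouldEmitIdleEvent(3, 70_000L, 0L, 60_000L)); // false: the poll returned records
    }
}
```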
-
setAckOnError
@Deprecated public void setAckOnError(boolean ackOnError)
Deprecated. In favor of GenericErrorHandler.isAckAfterHandle().
Set whether or not the container should commit offsets (ack messages) where the listener throws exceptions. This works in conjunction with ackMode and is effective only when the kafka property enable.auto.commit is false; it is not applicable to manual ack modes. When this property is set to true, all messages handled will have their offset committed. When set to false (the default), offsets will be committed only for successfully handled messages. Manual acks will always be applied. Bear in mind that, if the next message is successfully handled, its offset will be committed, effectively committing the offset of the failed message anyway, so this option has limited applicability, unless you are using a SeekToCurrentBatchErrorHandler which will seek the current record so that it is reprocessed.
Does not apply when transactions are used - in that case, whether or not the offsets are sent to the transaction depends on whether the transaction is committed or rolled back. If a listener throws an exception, the transaction will normally be rolled back unless an error handler is provided that handles the error and exits normally; in which case the offsets are sent to the transaction and the transaction is committed.
- Parameters:
ackOnError - whether the container should acknowledge messages that throw exceptions.
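The remark about the next successful message "effectively committing the offset of the failed message anyway" follows from how Kafka offsets work: a commit records the next offset to read, so it covers everything before it. The sketch below walks through that arithmetic for a single partition. It is a hypothetical model (AckOnErrorSketch and committedOffset are invented names) that assumes per-record commits, offsets starting at 0, and no seek after a failure.

```java
// Hypothetical single-partition model of ackOnError; not the container's implementation.
final class AckOnErrorSketch {

    // outcomes[i] is true when the listener handled the record at offset i successfully.
    // Returns the last committed position (the next offset to read), or -1 if nothing was committed.
    static long committedOffset(boolean[] outcomes, boolean ackOnError) {
        long committed = -1;
        for (int offset = 0; offset < outcomes.length; offset++) {
            if (outcomes[offset] || ackOnError) {
                committed = offset + 1; // a commit stores the next offset to consume
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        // Offset 1 fails, offset 2 succeeds: committing 3 also covers the failed record.
        System.out.println(committedOffset(new boolean[] {true, false, true}, false)); // 3
        // Offset 1 fails and is the last record: it stays uncommitted only when ackOnError is false.
        System.out.println(committedOffset(new boolean[] {true, false}, false)); // 1
        System.out.println(committedOffset(new boolean[] {true, false}, true));  // 2
    }
}
```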
-
getAckMode
public ContainerProperties.AckMode getAckMode()
-
getAckCount
public int getAckCount()
-
getAckTime
public long getAckTime()
-
getMessageListener
public java.lang.Object getMessageListener()
-
getConsumerTaskExecutor
public org.springframework.core.task.AsyncListenableTaskExecutor getConsumerTaskExecutor()
-
getShutdownTimeout
public long getShutdownTimeout()
-
getIdleEventInterval
public java.lang.Long getIdleEventInterval()
-
isAckOnError
public boolean isAckOnError()
-
getTransactionManager
public org.springframework.transaction.PlatformTransactionManager getTransactionManager()
-
setTransactionManager
public void setTransactionManager(org.springframework.transaction.PlatformTransactionManager transactionManager)
Set the transaction manager to start a transaction; offsets are committed with semantics equivalent to ContainerProperties.AckMode.RECORD and ContainerProperties.AckMode.BATCH depending on the listener type (record or batch).
- Parameters:
transactionManager - the transaction manager.
- Since:
- 1.3
- See Also:
setAckMode(AckMode)
-
getMonitorInterval
public int getMonitorInterval()
-
setMonitorInterval
public void setMonitorInterval(int monitorInterval)
The interval between checks for a non-responsive consumer in seconds; default 30.
- Parameters:
monitorInterval - the interval.
- Since:
- 1.3.1
-
getScheduler
public org.springframework.scheduling.TaskScheduler getScheduler()
-
setScheduler
public void setScheduler(org.springframework.scheduling.TaskScheduler scheduler)
A scheduler used with the monitor interval.
- Parameters:
scheduler - the scheduler.
- Since:
- 1.3.1
- See Also:
setMonitorInterval(int)
-
getNoPollThreshold
public float getNoPollThreshold()
-
setNoPollThreshold
public void setNoPollThreshold(float noPollThreshold)
If the time since the last poll / poll timeout exceeds this value, a NonResponsiveConsumerEvent is published. This value should be more than 1.0 to avoid a race condition that can cause spurious events to be published. Default 3.0f.
- Parameters:
noPollThreshold - the threshold
- Since:
- 1.3.1
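The threshold is a ratio, which the following sketch makes explicit. The names NoPollThresholdSketch and isNonResponsive are hypothetical, and the 5000 ms poll timeout in the example is only an illustrative value, not taken from this page.

```java
// Hypothetical model of the non-responsive consumer check; not the container's implementation.
final class NoPollThresholdSketch {

    // True when (time since the last poll / poll timeout) exceeds the threshold.
    static boolean isNonResponsive(long msSinceLastPoll, long pollTimeoutMs, float noPollThreshold) {
        return (float) msSinceLastPoll / pollTimeoutMs > noPollThreshold;
    }

    public static void main(String[] args) {
        long pollTimeoutMs = 5_000L; // illustrative value
        float threshold = 3.0f;      // the documented default
        System.out.println(isNonResponsive(16_000L, pollTimeoutMs, threshold)); // true: ratio 3.2
        System.out.println(isNonResponsive(10_000L, pollTimeoutMs, threshold)); // false: ratio 2.0
    }
}
```

With a threshold at or below 1.0, a consumer that is merely blocked in a normal poll for the full poll timeout could already cross the limit, which is the race the description warns about.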
-
isLogContainerConfig
public boolean isLogContainerConfig()
Log the container configuration if true (INFO).
- Returns:
- true to log.
- Since:
- 2.1.1
-
setLogContainerConfig
public void setLogContainerConfig(boolean logContainerConfig)
Set to true to instruct each container to log this configuration.
- Parameters:
logContainerConfig - true to log.
- Since:
- 2.1.1
-
isMissingTopicsFatal
public boolean isMissingTopicsFatal()
If true, the container won't start if any of the configured topics are not present on the broker. Does not apply when topic patterns are configured. Default false.
- Returns:
- the missingTopicsFatal.
- Since:
- 2.2
-
setMissingTopicsFatal
public void setMissingTopicsFatal(boolean missingTopicsFatal)
Set to false to allow the container to start even if any of the configured topics are not present on the broker. Does not apply when topic patterns are configured. Default false.
- Parameters:
missingTopicsFatal - the missingTopicsFatal.
- Since:
- 2.2
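A minimal sketch of the startup check this flag controls, assuming the set of topics known to the broker has already been fetched. MissingTopicsSketch, findMissing and checkTopics are hypothetical names, and the exception type and message are illustrative only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the missing-topics check; not the container's implementation.
final class MissingTopicsSketch {

    // Returns the configured topics that are not present on the broker.
    static List<String> findMissing(List<String> configuredTopics, Set<String> brokerTopics) {
        List<String> missing = new ArrayList<>();
        for (String topic : configuredTopics) {
            if (!brokerTopics.contains(topic)) {
                missing.add(topic);
            }
        }
        return missing;
    }

    // Fails only when the flag is set and at least one configured topic is missing.
    static void checkTopics(List<String> configuredTopics, Set<String> brokerTopics,
            boolean missingTopicsFatal) {
        List<String> missing = findMissing(configuredTopics, brokerTopics);
        if (missingTopicsFatal && !missing.isEmpty()) {
            throw new IllegalStateException("Configured topics not present on the broker: " + missing);
        }
    }

    public static void main(String[] args) {
        Set<String> onBroker = new HashSet<>(Arrays.asList("orders"));
        List<String> configured = Arrays.asList("orders", "audit");
        checkTopics(configured, onBroker, false); // not fatal: startup continues
        System.out.println(findMissing(configured, onBroker)); // [audit]
    }
}
```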
-
setIdleBetweenPolls
public void setIdleBetweenPolls(long idleBetweenPolls)
The sleep interval in milliseconds used in the main loop between Consumer.poll(Duration) calls. Defaults to 0 - no idling.
- Parameters:
idleBetweenPolls - the interval to sleep between polling cycles.
- Since:
- 2.3
-
getIdleBetweenPolls
public long getIdleBetweenPolls()
-
isMicrometerEnabled
public boolean isMicrometerEnabled()
-
setMicrometerEnabled
public void setMicrometerEnabled(boolean micrometerEnabled)
Set to false to disable the Micrometer listener timers. Default true.
- Parameters:
micrometerEnabled - false to disable.
- Since:
- 2.3
-
setMicrometerTags
public void setMicrometerTags(java.util.Map<java.lang.String,java.lang.String> tags)
Set additional tags for the Micrometer listener timers.
- Parameters:
tags - the tags.
- Since:
- 2.3
-
getMicrometerTags
public java.util.Map<java.lang.String,java.lang.String> getMicrometerTags()
-
getConsumerStartTimout
public java.time.Duration getConsumerStartTimout()
-
setConsumerStartTimout
public void setConsumerStartTimout(java.time.Duration consumerStartTimout)
Set the timeout to wait for a consumer thread to start before logging an error. Default 30 seconds.
- Parameters:
consumerStartTimout - the consumer start timeout.
-
isSubBatchPerPartition
public boolean isSubBatchPerPartition()
Return whether to split batches by partition.
- Returns:
- subBatchPerPartition.
- Since:
- 2.3.2
-
getSubBatchPerPartition
@Nullable public java.lang.Boolean getSubBatchPerPartition()
Return whether to split batches by partition; null if not set.
- Returns:
- subBatchPerPartition.
- Since:
- 2.5
-
setSubBatchPerPartition
public void setSubBatchPerPartition(boolean subBatchPerPartition)
When using a batch message listener, whether to dispatch records by partition (with a transaction for each sub batch if transactions are in use) or the complete batch received by the poll(). Useful when using transactions to enable zombie fencing, by using a transactional.id that is unique for each group/topic/partition. Defaults to true when using transactions with EOSMode.ALPHA and false when not using transactions or with EOSMode.BETA.
- Parameters:
subBatchPerPartition - true for a separate transaction for each partition.
- Since:
- 2.3.2
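Splitting a polled batch into per-partition sub batches amounts to grouping records by partition while keeping their order. The sketch below shows that grouping with plain Java collections; SubBatchSketch, its Rec class and subBatches are hypothetical stand-ins and do not use the Kafka ConsumerRecord type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of sub-batching by partition; not the container's implementation.
final class SubBatchSketch {

    // Minimal stand-in for a consumed record.
    static final class Rec {
        final int partition;
        final long offset;

        Rec(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Groups the polled batch by partition, preserving the order of records within each partition.
    static Map<Integer, List<Rec>> subBatches(List<Rec> batch) {
        Map<Integer, List<Rec>> byPartition = new LinkedHashMap<>();
        for (Rec rec : batch) {
            byPartition.computeIfAbsent(rec.partition, p -> new ArrayList<>()).add(rec);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        List<Rec> batch = Arrays.asList(new Rec(0, 10), new Rec(1, 5), new Rec(0, 11));
        // Each map entry would be dispatched to the listener separately (in its own transaction, if any).
        subBatches(batch).forEach((partition, recs) ->
                System.out.println("partition " + partition + ": " + recs.size() + " record(s)"));
    }
}
```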
-
getAssignmentCommitOption
public ContainerProperties.AssignmentCommitOption getAssignmentCommitOption()
-
setAssignmentCommitOption
public void setAssignmentCommitOption(ContainerProperties.AssignmentCommitOption assignmentCommitOption)
Set the assignment commit option. Default ContainerProperties.AssignmentCommitOption.LATEST_ONLY_NO_TX.
- Parameters:
assignmentCommitOption - the option.
- Since:
- 2.3.6
-
isDeliveryAttemptHeader
public boolean isDeliveryAttemptHeader()
-
setDeliveryAttemptHeader
public void setDeliveryAttemptHeader(boolean deliveryAttemptHeader)
Set to true to populate the KafkaHeaders.DELIVERY_ATTEMPT header when the error handler or after rollback processor implements DeliveryAttemptAware. There is a small overhead so this is false by default.
- Parameters:
deliveryAttemptHeader - true to populate
- Since:
- 2.5
-
getEosMode
public ContainerProperties.EOSMode getEosMode()
Get the exactly once semantics mode.
- Returns:
- the mode.
- Since:
- 2.5
- See Also:
setEosMode(EOSMode)
-
setEosMode
public void setEosMode(ContainerProperties.EOSMode eosMode)
Set the exactly once semantics mode. With ContainerProperties.EOSMode.ALPHA, a producer per group/topic/partition is used (enabling transactional.id fencing). ContainerProperties.EOSMode.BETA enables fetch-offset-request fencing, and requires brokers 2.5 or later. In the 2.6 client, the default will be BETA because the 2.6 client can automatically fall back to ALPHA.
- Parameters:
eosMode - the mode; default ALPHA.
- Since:
- 2.5
-
getTransactionDefinition
@Nullable public org.springframework.transaction.TransactionDefinition getTransactionDefinition()
Get the transaction definition.
- Returns:
- the definition.
- Since:
- 2.5.4
-
setTransactionDefinition
public void setTransactionDefinition(org.springframework.transaction.TransactionDefinition transactionDefinition)
Set a transaction definition with properties (e.g. timeout) that will be copied to the container's transaction template. Note that this is only generally useful when used with a ChainedKafkaTransactionManager configured with a non-Kafka transaction manager. Kafka has no concept of transaction timeout, for example.
- Parameters:
transactionDefinition - the definition.
- Since:
- 2.5.4
-
getAdviceChain
public org.aopalliance.aop.Advice[] getAdviceChain()
A chain of listener Advices.
- Returns:
- the adviceChain.
- Since:
- 2.5.6
-
setAdviceChain
public void setAdviceChain(org.aopalliance.aop.Advice... adviceChain)
Set a chain of listener Advices; must not be null or have null elements.
- Parameters:
adviceChain - the adviceChain to set.
- Since:
- 2.5.6
-
toString
public java.lang.String toString()
- Overrides:
toString in class ConsumerProperties
-
-