Class KafkaMessageListenerContainer<K,V>
java.lang.Object
org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V>
org.springframework.kafka.listener.KafkaMessageListenerContainer<K,V>
- Type Parameters:
K - the key type.
V - the value type.
- All Implemented Interfaces:
org.springframework.beans.factory.Aware, org.springframework.beans.factory.BeanNameAware, org.springframework.beans.factory.DisposableBean, org.springframework.context.ApplicationContextAware, org.springframework.context.ApplicationEventPublisherAware, org.springframework.context.Lifecycle, org.springframework.context.Phased, org.springframework.context.SmartLifecycle, ConsumerPauseResumeEventPublisher, GenericMessageListenerContainer<K,V>, MessageListenerContainer
public class KafkaMessageListenerContainer<K,V>
extends AbstractMessageListenerContainer<K,V>
implements ConsumerPauseResumeEventPublisher
Single-threaded message listener container using the Java Consumer, supporting automatic partition assignment or user-configured assignment. With user-configured assignment, initial partition offsets can be provided.
-
Field Summary
Fields inherited from class org.springframework.kafka.listener.AbstractMessageListenerContainer
consumerFactory, DEFAULT_PHASE, lifecycleMonitor, logger
-
Constructor Summary
Constructors
Constructor - Description
KafkaMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory, ContainerProperties containerProperties) - Construct an instance with the supplied configuration properties.
-
Method Summary
Modifier and Type, Method - Description
protected void doStart()
protected void doStop(Runnable callback, boolean normal) - Stop the container normally or abnormally.
Collection<org.apache.kafka.common.TopicPartition> getAssignedPartitions() - Return the TopicPartitions currently assigned to this container, either explicitly or by Kafka; may be null if not assigned yet.
Map<String,Collection<org.apache.kafka.common.TopicPartition>> getAssignmentsByClientId() - Return the assigned topics/partitions for this container, by client.id.
boolean isContainerPaused() - Return true if MessageListenerContainer.pause() has been called and all consumers in this container have actually paused.
boolean isInExpectedState() - Return true if the container is running, has never been started, or has been stopped.
boolean isPartitionPaused(org.apache.kafka.common.TopicPartition topicPartition) - Whether or not this topic's partition is currently paused.
Map<String,Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric>> metrics() - Return metrics kept by this container's consumer(s), grouped by client-id.
protected AbstractMessageListenerContainer<?,?> parentOrThis() - Return this or a parent container if this has a parent.
void pause() - Pause this container before the next poll().
void publishConsumerPausedEvent(Collection<org.apache.kafka.common.TopicPartition> partitions, String reason) - Publish a consumer paused event.
void publishConsumerResumedEvent(Collection<org.apache.kafka.common.TopicPartition> partitions) - Publish a consumer resumed event.
void resume() - Resume this container, if paused, after the next poll().
void resumePartition(org.apache.kafka.common.TopicPartition topicPartition) - Resume this partition, if paused, after the next poll().
void setClientIdSuffix(String clientIdSuffix) - Set a suffix to add to the client.id consumer property (if the consumer factory supports it).
void setEmergencyStop(Runnable emergencyStop) - Set a Runnable to call whenever a fatal error occurs on the listener thread.
String toString()
Methods inherited from class org.springframework.kafka.listener.AbstractMessageListenerContainer
checkGroupId, checkTopics, createSimpleLoggingConsumerRebalanceListener, doStop, getAfterRollbackProcessor, getApplicationContext, getApplicationEventPublisher, getBatchInterceptor, getBeanName, getCommonErrorHandler, getContainerProperties, getGenericErrorHandler, getGroupId, getListenerId, getListenerInfo, getMainListenerId, getPhase, getRecordInterceptor, isAutoStartup, isInterceptBeforeTx, isPartitionPauseRequested, isPaused, isPauseRequested, isRunning, isStoppedNormally, pausePartition, publishContainerStoppedEvent, setAfterRollbackProcessor, setApplicationContext, setApplicationEventPublisher, setAutoStartup, setBatchErrorHandler, setBatchInterceptor, setBeanName, setCommonErrorHandler, setErrorHandler, setGenericErrorHandler, setInterceptBeforeTx, setListenerInfo, setMainListenerId, setPhase, setRecordInterceptor, setRunning, setStoppedNormally, setTopicCheckTimeout, setupMessageListener, start, stop, stop, stop, stopAbnormally
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.springframework.kafka.listener.MessageListenerContainer
destroy, getContainerFor, isChildRunning
-
Constructor Details
-
KafkaMessageListenerContainer
public KafkaMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory, ContainerProperties containerProperties)
Construct an instance with the supplied configuration properties.
- Parameters:
consumerFactory - the consumer factory.
containerProperties - the container properties.
-
-
Method Details
-
setEmergencyStop
public void setEmergencyStop(Runnable emergencyStop)
Set a Runnable to call whenever a fatal error occurs on the listener thread.
- Parameters:
emergencyStop - the Runnable.
- Since:
- 2.2.1
-
setClientIdSuffix
public void setClientIdSuffix(String clientIdSuffix)
Set a suffix to add to the client.id consumer property (if the consumer factory supports it).
- Parameters:
clientIdSuffix - the suffix to add.
- Since:
- 1.0.6
-
getAssignedPartitions
public Collection<org.apache.kafka.common.TopicPartition> getAssignedPartitions()
Return the TopicPartitions currently assigned to this container, either explicitly or by Kafka; may be null if not assigned yet.
- Specified by:
getAssignedPartitions in interface MessageListenerContainer
- Returns:
- the TopicPartitions currently assigned to this container, either explicitly or by Kafka; may be null if not assigned yet.
-
getAssignmentsByClientId
@Nullable public Map<String,Collection<org.apache.kafka.common.TopicPartition>> getAssignmentsByClientId()
Description copied from interface: MessageListenerContainer
Return the assigned topics/partitions for this container, by client.id.
- Specified by:
getAssignmentsByClientId in interface MessageListenerContainer
- Returns:
- the topics/partitions.
-
isContainerPaused
public boolean isContainerPaused()
Description copied from interface: MessageListenerContainer
Return true if MessageListenerContainer.pause() has been called and all consumers in this container have actually paused.
- Specified by:
isContainerPaused in interface MessageListenerContainer
- Returns:
- true if the container is paused.
-
isPartitionPaused
public boolean isPartitionPaused(org.apache.kafka.common.TopicPartition topicPartition)
Description copied from interface: MessageListenerContainer
Whether or not this topic's partition is currently paused.
- Specified by:
isPartitionPaused in interface MessageListenerContainer
- Parameters:
topicPartition - the topic partition to check.
- Returns:
- true if this partition has been paused.
-
isInExpectedState
public boolean isInExpectedState()
Description copied from interface: MessageListenerContainer
Return true if the container is running, has never been started, or has been stopped.
- Specified by:
isInExpectedState in interface MessageListenerContainer
- Returns:
- true if the state is as expected.
-
pause
public void pause()
Description copied from interface: MessageListenerContainer
Pause this container before the next poll(). This is a thread-safe operation; the actual pause is processed by the consumer thread.
- Specified by:
pause in interface MessageListenerContainer
- Overrides:
pause in class AbstractMessageListenerContainer<K,V>
- See Also:
-
KafkaConsumer.pause(Collection)
-
resume
public void resume()
Description copied from interface: MessageListenerContainer
Resume this container, if paused, after the next poll(). This is a thread-safe operation; the actual resume is processed by the consumer thread.
- Specified by:
resume in interface MessageListenerContainer
- Overrides:
resume in class AbstractMessageListenerContainer<K,V>
- See Also:
-
KafkaConsumer.resume(Collection)
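
The hand-off described for pause() and resume() (any thread records a request, and the consumer thread applies it around its next poll) can be pictured with a small JDK-only model. This is a sketch under stated assumptions, not the container's implementation: PauseRequestModel, pollOnce() and isConsumerPaused() are hypothetical names invented for illustration, and no Kafka client is involved.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a container; models only the pause/resume hand-off. */
final class PauseRequestModel {

    // Written by any thread through pause() and resume().
    private final AtomicBoolean pauseRequested = new AtomicBoolean(false);

    // Written only by the consumer thread in pollOnce(); volatile so other threads can read it.
    private volatile boolean consumerPaused = false;

    /** Record a pause request; it is applied before the next poll. */
    void pause() {
        pauseRequested.set(true);
    }

    /** Record a resume request; it is applied after the next poll. */
    void resume() {
        pauseRequested.set(false);
    }

    boolean isPauseRequested() {
        return pauseRequested.get();
    }

    boolean isConsumerPaused() {
        return consumerPaused;
    }

    /** True only when a pause was requested and the consumer thread has applied it. */
    boolean isContainerPaused() {
        return pauseRequested.get() && consumerPaused;
    }

    /** One iteration of the consumer loop; call from the consumer thread only. */
    void pollOnce() {
        if (pauseRequested.get() && !consumerPaused) {
            // A real consumer thread would call KafkaConsumer.pause(Collection) here.
            consumerPaused = true;
        }
        // The poll itself would happen here; a paused consumer fetches no records.
        if (!pauseRequested.get() && consumerPaused) {
            // A real consumer thread would call KafkaConsumer.resume(Collection) here.
            consumerPaused = false;
        }
    }
}
```

In this model, calling pause() alone leaves isContainerPaused() false until pollOnce() has run, which mirrors the distinction between a requested pause and a pause that the consumer has actually applied.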
-
resumePartition
public void resumePartition(org.apache.kafka.common.TopicPartition topicPartition)
Description copied from interface: MessageListenerContainer
Resume this partition, if paused, after the next poll(). This is a thread-safe operation; the actual resume is processed by the consumer thread.
- Specified by:
resumePartition in interface MessageListenerContainer
- Overrides:
resumePartition in class AbstractMessageListenerContainer<K,V>
- Parameters:
topicPartition - the topicPartition to resume.
-
metrics
public Map<String,Map<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric>> metrics()
Description copied from interface: MessageListenerContainer
Return metrics kept by this container's consumer(s), grouped by client-id.
- Specified by:
metrics in interface MessageListenerContainer
- Returns:
- the consumer(s) metrics grouped by client-id
- See Also:
-
Consumer.metrics()
-
doStart
protected void doStart()
- Specified by:
doStart in class AbstractMessageListenerContainer<K,V>
-
doStop
protected void doStop(Runnable callback, boolean normal)
Description copied from class: AbstractMessageListenerContainer
Stop the container normally or abnormally.
- Specified by:
doStop in class AbstractMessageListenerContainer<K,V>
- Parameters:
callback - the callback.
normal - true for an expected stop.
-
publishConsumerPausedEvent
public void publishConsumerPausedEvent(Collection<org.apache.kafka.common.TopicPartition> partitions, String reason)
Description copied from interface: ConsumerPauseResumeEventPublisher
Publish a consumer paused event.
- Specified by:
publishConsumerPausedEvent in interface ConsumerPauseResumeEventPublisher
- Parameters:
partitions - the paused partitions.
reason - the reason.
-
publishConsumerResumedEvent
public void publishConsumerResumedEvent(Collection<org.apache.kafka.common.TopicPartition> partitions)
Description copied from interface: ConsumerPauseResumeEventPublisher
Publish a consumer resumed event.
- Specified by:
publishConsumerResumedEvent in interface ConsumerPauseResumeEventPublisher
- Parameters:
partitions - the resumed partitions.
-
parentOrThis
protected AbstractMessageListenerContainer<?,?> parentOrThis()
Description copied from class: AbstractMessageListenerContainer
Return this or a parent container if this has a parent.
- Overrides:
parentOrThis in class AbstractMessageListenerContainer<K,V>
- Returns:
- the parent or this.
-
toString
-