Package org.springframework.kafka.config
Class AbstractKafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>,K,V>
- java.lang.Object
- org.springframework.kafka.config.AbstractKafkaListenerContainerFactory<C,K,V>
- Type Parameters:
C - the AbstractMessageListenerContainer implementation type.
K - the key type.
V - the value type.
- All Implemented Interfaces:
org.springframework.beans.factory.Aware, org.springframework.beans.factory.InitializingBean, org.springframework.context.ApplicationContextAware, org.springframework.context.ApplicationEventPublisherAware, KafkaListenerContainerFactory<C>
- Direct Known Subclasses:
ConcurrentKafkaListenerContainerFactory
public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>,K,V> extends java.lang.Object implements KafkaListenerContainerFactory<C>, org.springframework.context.ApplicationEventPublisherAware, org.springframework.beans.factory.InitializingBean, org.springframework.context.ApplicationContextAware
Base KafkaListenerContainerFactory for Spring's base container implementation.
- Author:
- Stephane Nicoll, Gary Russell, Artem Bilan
- See Also:
AbstractMessageListenerContainer
-
Field Summary
Modifier and Type | Field | Description
protected org.springframework.core.log.LogAccessor | logger |
-
Constructor Summary
Constructor | Description
AbstractKafkaListenerContainerFactory() |
-
Method Summary
Modifier and Type | Method | Description
void | afterPropertiesSet() |
C | createContainer(java.lang.String... topics) | Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations.
C | createContainer(java.util.regex.Pattern topicPattern) | Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations.
C | createContainer(TopicPartitionOffset... topicsAndPartitions) | Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations.
protected abstract C | createContainerInstance(KafkaListenerEndpoint endpoint) | Create an empty container instance.
C | createListenerContainer(KafkaListenerEndpoint endpoint) | Create a MessageListenerContainer for the given KafkaListenerEndpoint.
ConsumerFactory<? super K,? super V> | getConsumerFactory() |
ContainerProperties | getContainerProperties() | Obtain the properties template for this factory - set properties as needed and they will be copied to a final properties instance for the endpoint.
protected void | initializeContainer(C instance, KafkaListenerEndpoint endpoint) | Further initialize the specified container.
java.lang.Boolean | isBatchListener() | Return true if this endpoint creates a batch listener.
void | setAckDiscarded(java.lang.Boolean ackDiscarded) | Set to true to ack discards when a filter strategy is in use.
void | setAfterRollbackProcessor(AfterRollbackProcessor<? super K,? super V> afterRollbackProcessor) | Set a processor to invoke after a transaction rollback; typically will seek the unprocessed topic/partition to reprocess the records.
void | setApplicationContext(org.springframework.context.ApplicationContext applicationContext) |
void | setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher) |
void | setAutoStartup(java.lang.Boolean autoStartup) | Specify an autoStartup boolean flag.
void | setBatchErrorHandler(BatchErrorHandler errorHandler) | Set the batch error handler to call when the listener throws an exception.
void | setBatchInterceptor(BatchInterceptor<K,V> batchInterceptor) | Set a batch interceptor to be called before and after calling the listener.
void | setBatchListener(java.lang.Boolean batchListener) | Set to true if this endpoint should create a batch listener.
void | setBatchToRecordAdapter(BatchToRecordAdapter<K,V> batchToRecordAdapter) | Set a BatchToRecordAdapter.
void | setConsumerFactory(ConsumerFactory<? super K,? super V> consumerFactory) | Specify a ConsumerFactory to use.
void | setContainerCustomizer(ContainerCustomizer<K,V,C> containerCustomizer) | Set a customizer used to further configure a container after it has been created.
void | setErrorHandler(ErrorHandler errorHandler) | Set the error handler to call when the listener throws an exception.
void | setMessageConverter(MessageConverter messageConverter) | Set the message converter to use if dynamic argument type matching is needed.
void | setMissingTopicsFatal(boolean missingTopicsFatal) | Set to false to allow the container to start even if any of the configured topics are not present on the broker.
void | setPhase(int phase) | Specify a phase to use.
void | setRecordFilterStrategy(RecordFilterStrategy<? super K,? super V> recordFilterStrategy) | Set the record filter strategy.
void | setRecordInterceptor(RecordInterceptor<K,V> recordInterceptor) | Set an interceptor to be called before calling the listener.
void | setRecoveryCallback(org.springframework.retry.RecoveryCallback<? extends java.lang.Object> recoveryCallback) | Set a callback to be used with the retryTemplate.
void | setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer) | Set a configurer which will be invoked when creating a reply message.
void | setReplyTemplate(KafkaTemplate<?,?> replyTemplate) | Set the KafkaTemplate to use to send replies.
void | setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate) | Set a retryTemplate.
void | setStatefulRetry(boolean statefulRetry) | When using a RetryTemplate, set to true to enable stateful retry.
-
Method Detail
-
setApplicationContext
public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext) throws org.springframework.beans.BeansException
- Specified by:
setApplicationContext in interface org.springframework.context.ApplicationContextAware
- Throws:
org.springframework.beans.BeansException
-
setConsumerFactory
public void setConsumerFactory(ConsumerFactory<? super K,? super V> consumerFactory)
Specify a ConsumerFactory to use.
- Parameters:
consumerFactory - The consumer factory.
-
getConsumerFactory
public ConsumerFactory<? super K,? super V> getConsumerFactory()
-
setAutoStartup
public void setAutoStartup(java.lang.Boolean autoStartup)
Specify an autoStartup boolean flag.
- Parameters:
autoStartup - true for auto startup.
- See Also:
AbstractMessageListenerContainer.setAutoStartup(boolean)
-
setPhase
public void setPhase(int phase)
Specify a phase to use.
- Parameters:
phase - The phase.
- See Also:
AbstractMessageListenerContainer.setPhase(int)
-
setMessageConverter
public void setMessageConverter(MessageConverter messageConverter)
Set the message converter to use if dynamic argument type matching is needed.
- Parameters:
messageConverter - the converter.
-
setRecordFilterStrategy
public void setRecordFilterStrategy(RecordFilterStrategy<? super K,? super V> recordFilterStrategy)
Set the record filter strategy.
- Parameters:
recordFilterStrategy - the strategy.
-
setAckDiscarded
public void setAckDiscarded(java.lang.Boolean ackDiscarded)
Set to true to ack discards when a filter strategy is in use.
- Parameters:
ackDiscarded - the ackDiscarded.
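The interaction between a record filter and ackDiscarded can be pictured with a small stand-alone model. This is only a sketch under stated assumptions: FilteringSketch, its fields, and the "skip-" prefix rule are hypothetical and are not Spring Kafka classes. The one convention taken from the library is that a filter returning true means the record is discarded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Simplified stand-in for a filtering listener adapter; not the Spring Kafka classes. */
final class FilteringSketch {

    /** Records handed to the listener (those the filter did not discard). */
    final List<String> delivered = new ArrayList<>();

    /** Discarded records that were acknowledged because ackDiscarded was true. */
    final List<String> ackedDiscards = new ArrayList<>();

    private final Predicate<String> discardIf; // true means "discard this record"
    private final boolean ackDiscarded;

    FilteringSketch(Predicate<String> discardIf, boolean ackDiscarded) {
        this.discardIf = discardIf;
        this.ackDiscarded = ackDiscarded;
    }

    void onMessage(String value) {
        if (discardIf.test(value)) {
            if (ackDiscarded) {
                ackedDiscards.add(value); // the listener never sees it, so ack on its behalf
            }
            return;
        }
        delivered.add(value);
    }

    /** Helper for the usage example: feed several values through a new sketch. */
    static FilteringSketch run(boolean ackDiscarded, String... values) {
        FilteringSketch sketch = new FilteringSketch(v -> v.startsWith("skip-"), ackDiscarded);
        for (String value : values) {
            sketch.onMessage(value);
        }
        return sketch;
    }
}
```

Calling FilteringSketch.run(true, "a", "skip-b", "c") delivers "a" and "c" and acknowledges "skip-b"; with false, the discarded record is dropped without an acknowledgment.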
-
setRetryTemplate
public void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)
Set a retryTemplate.
- Parameters:
retryTemplate - the template.
-
setRecoveryCallback
public void setRecoveryCallback(org.springframework.retry.RecoveryCallback<? extends java.lang.Object> recoveryCallback)
Set a callback to be used with the retryTemplate.
- Parameters:
recoveryCallback - the callback.
-
setStatefulRetry
public void setStatefulRetry(boolean statefulRetry)
When using a RetryTemplate, set to true to enable stateful retry. Use in conjunction with a SeekToCurrentErrorHandler when retry can take excessive time; each failure goes back to the broker, to keep the Consumer alive.
- Parameters:
statefulRetry - true to enable stateful retry.
- Since:
- 2.1.3
-
isBatchListener
public java.lang.Boolean isBatchListener()
Return true if this endpoint creates a batch listener.
- Returns:
- true for a batch listener.
- Since:
- 1.1
-
setBatchListener
public void setBatchListener(java.lang.Boolean batchListener)
Set to true if this endpoint should create a batch listener.
- Parameters:
batchListener - true for a batch listener.
- Since:
- 1.1
-
setApplicationEventPublisher
public void setApplicationEventPublisher(org.springframework.context.ApplicationEventPublisher applicationEventPublisher)
- Specified by:
setApplicationEventPublisher in interface org.springframework.context.ApplicationEventPublisherAware
-
setReplyTemplate
public void setReplyTemplate(KafkaTemplate<?,?> replyTemplate)
Set the KafkaTemplate to use to send replies.
- Parameters:
replyTemplate - the template.
- Since:
- 2.0
-
setErrorHandler
public void setErrorHandler(ErrorHandler errorHandler)
Set the error handler to call when the listener throws an exception.
- Parameters:
errorHandler - the error handler.
- Since:
- 2.2
-
setBatchErrorHandler
public void setBatchErrorHandler(BatchErrorHandler errorHandler)
Set the batch error handler to call when the listener throws an exception.
- Parameters:
errorHandler - the error handler.
- Since:
- 2.2
-
setAfterRollbackProcessor
public void setAfterRollbackProcessor(AfterRollbackProcessor<? super K,? super V> afterRollbackProcessor)
Set a processor to invoke after a transaction rollback; typically will seek the unprocessed topic/partition to reprocess the records. The default does so, including the failed record.
- Parameters:
afterRollbackProcessor - the processor.
- Since:
- 1.3.5
-
setReplyHeadersConfigurer
public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer)
Set a configurer which will be invoked when creating a reply message.
- Parameters:
replyHeadersConfigurer - the configurer.
- Since:
- 2.2
-
setMissingTopicsFatal
public void setMissingTopicsFatal(boolean missingTopicsFatal)
Set to false to allow the container to start even if any of the configured topics are not present on the broker. Does not apply when topic patterns are configured. Default true.
- Parameters:
missingTopicsFatal - the missingTopicsFatal.
- Since:
- 2.3
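The effect of the flag can be sketched as a startup check against the set of topics known to the broker. This is a hypothetical model, not the Spring Kafka implementation: MissingTopicsSketch, checkTopics, and the exception message are invented for illustration, and the sketch covers explicit topic names only, since the flag does not apply to topic patterns.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical model of a startup check for missing topics; not the Spring Kafka implementation. */
final class MissingTopicsSketch {

    /**
     * Returns the configured topics that the broker does not know about.
     * Throws instead when missingTopicsFatal is true and at least one topic is missing.
     */
    static List<String> checkTopics(Set<String> brokerTopics, List<String> configured,
            boolean missingTopicsFatal) {
        List<String> missing = configured.stream()
                .filter(topic -> !brokerTopics.contains(topic))
                .collect(Collectors.toList());
        if (missingTopicsFatal && !missing.isEmpty()) {
            // Fatal mode: refuse to start rather than run with absent topics.
            throw new IllegalStateException("Configured topics not present: " + missing);
        }
        return missing; // Non-fatal mode: the caller may log these and start anyway.
    }
}
```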
-
getContainerProperties
public ContainerProperties getContainerProperties()
Obtain the properties template for this factory - set properties as needed and they will be copied to a final properties instance for the endpoint.
- Returns:
- the properties.
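The "template, then copy" behaviour described above can be illustrated with a small stand-alone model. Everything here is hypothetical: TemplatePropertiesSketch, Props, Container, and the "pollTimeout" key are stand-ins chosen for the sketch, not the real ContainerProperties API. The point is only that each container receives its own snapshot of the template.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a factory that keeps a properties template; not the Spring Kafka classes. */
final class TemplatePropertiesSketch {

    /** Stand-in for ContainerProperties: a simple mutable bag of settings. */
    static final class Props {
        final Map<String, Object> values = new HashMap<>();

        Props copy() {
            Props copy = new Props();
            copy.values.putAll(values);
            return copy;
        }
    }

    /** Stand-in for a container that owns its own final properties instance. */
    static final class Container {
        final Props props;

        Container(Props props) {
            this.props = props;
        }
    }

    private final Props template = new Props();

    /** Callers mutate the template; containers created later receive a snapshot of it. */
    Props getContainerProperties() {
        return template;
    }

    Container createContainer() {
        return new Container(template.copy());
    }
}
```

In this model, changing the template after a container has been created affects only containers created afterwards, and changing a container's own properties never alters the template.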
-
setRecordInterceptor
public void setRecordInterceptor(RecordInterceptor<K,V> recordInterceptor)
Set an interceptor to be called before calling the listener. Does not apply to batch listeners.
- Parameters:
recordInterceptor - the interceptor.
- Since:
- 2.2.7
-
setBatchInterceptor
public void setBatchInterceptor(BatchInterceptor<K,V> batchInterceptor)
Set a batch interceptor to be called before and after calling the listener.
- Parameters:
batchInterceptor - the interceptor.
- Since:
- 2.6.8
-
setBatchToRecordAdapter
public void setBatchToRecordAdapter(BatchToRecordAdapter<K,V> batchToRecordAdapter)
Set a BatchToRecordAdapter.
- Parameters:
batchToRecordAdapter - the adapter.
- Since:
- 2.4.2
-
setContainerCustomizer
public void setContainerCustomizer(ContainerCustomizer<K,V,C> containerCustomizer)
Set a customizer used to further configure a container after it has been created.
- Parameters:
containerCustomizer - the customizer.
- Since:
- 2.3.4
-
afterPropertiesSet
public void afterPropertiesSet()
- Specified by:
afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean
-
createListenerContainer
public C createListenerContainer(KafkaListenerEndpoint endpoint)
Description copied from interface: KafkaListenerContainerFactory
Create a MessageListenerContainer for the given KafkaListenerEndpoint. Containers created using this method are added to the listener endpoint registry.
- Specified by:
createListenerContainer in interface KafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>>
- Parameters:
endpoint - the endpoint to configure
- Returns:
- the created container
-
createContainerInstance
protected abstract C createContainerInstance(KafkaListenerEndpoint endpoint)
Create an empty container instance.
- Parameters:
endpoint - the endpoint.
- Returns:
- the new container instance.
-
initializeContainer
protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint)
Further initialize the specified container.
Subclasses can override this method to apply extra configuration if necessary.
- Parameters:
instance - the container instance to configure.
endpoint - the endpoint.
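The relationship between createContainerInstance and initializeContainer resembles a template method: one call builds an empty instance, a second hook configures it. The sketch below models that flow with plain Java types. FactorySketch and RecordingFactory are hypothetical, the endpoint is reduced to a String id, and the create-then-initialize ordering is an assumption drawn from the method descriptions rather than from the actual source.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the create-then-initialize flow; not the Spring Kafka classes. */
abstract class FactorySketch<C> {

    /** Subclasses build an empty container for the endpoint (here just an endpoint id). */
    protected abstract C createContainerInstance(String endpointId);

    /** Hook for extra configuration; the base version does nothing in this sketch. */
    protected void initializeContainer(C instance, String endpointId) {
    }

    /** Assumed ordering: create the empty instance first, then initialize it. */
    public final C createListenerContainer(String endpointId) {
        C instance = createContainerInstance(endpointId);
        initializeContainer(instance, endpointId);
        return instance;
    }
}

/** Example subclass whose "container" is simply a list of configuration steps. */
final class RecordingFactory extends FactorySketch<List<String>> {

    @Override
    protected List<String> createContainerInstance(String endpointId) {
        List<String> steps = new ArrayList<>();
        steps.add("created:" + endpointId);
        return steps;
    }

    @Override
    protected void initializeContainer(List<String> instance, String endpointId) {
        super.initializeContainer(instance, endpointId); // keep the base behaviour
        instance.add("concurrency=3"); // extra, subclass-specific configuration
    }
}
```

Calling super.initializeContainer first keeps whatever the base class configures and layers the subclass settings on top.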
-
createContainer
public C createContainer(TopicPartitionOffset... topicsAndPartitions)
Description copied from interface: KafkaListenerContainerFactory
Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations. Containers created using this method are not added to the listener endpoint registry.
- Specified by:
createContainer in interface KafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>>
- Parameters:
topicsAndPartitions - the topicPartitions to assign.
- Returns:
- the container.
-
createContainer
public C createContainer(java.lang.String... topics)
Description copied from interface: KafkaListenerContainerFactory
Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations. Containers created using this method are not added to the listener endpoint registry.
- Specified by:
createContainer in interface KafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>>
- Parameters:
topics - the topics.
- Returns:
- the container.
-
createContainer
public C createContainer(java.util.regex.Pattern topicPattern)
Description copied from interface: KafkaListenerContainerFactory
Create and configure a container without a listener; used to create containers that are not used for KafkaListener annotations. Containers created using this method are not added to the listener endpoint registry.
- Specified by:
createContainer in interface KafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K,V>>
- Parameters:
topicPattern - the topicPattern.
- Returns:
- the container.
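Because the createContainer variants return a container without a listener, the caller is expected to supply one before starting it. The fragment below is a minimal sketch of that usage, assuming spring-kafka is on the classpath, an already configured ConcurrentKafkaListenerContainerFactory<String, String>, and a reachable broker. The class name StandaloneContainerExample, the topic "orders", and the group id "orders-audit" are hypothetical.

```java
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

/** Sketch only: needs spring-kafka on the classpath and a reachable broker to start. */
final class StandaloneContainerExample {

    static ConcurrentMessageListenerContainer<String, String> start(
            ConcurrentKafkaListenerContainerFactory<String, String> factory) {

        // "orders" is a hypothetical topic name.
        ConcurrentMessageListenerContainer<String, String> container =
                factory.createContainer("orders");

        // No listener is attached by the factory, so set one on the container's properties.
        container.getContainerProperties().setGroupId("orders-audit"); // hypothetical group id
        container.getContainerProperties().setMessageListener(
                (MessageListener<String, String>) record ->
                        System.out.println(record.key() + " -> " + record.value()));

        // The container is not in the endpoint registry, so the caller manages its lifecycle.
        container.start();
        return container;
    }
}
```

Since such a container is not added to the listener endpoint registry, stopping it (for example at application shutdown) is also the caller's responsibility.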