Package org.springframework.kafka.config
Class AbstractKafkaListenerEndpoint<K,V>
- java.lang.Object
-
- org.springframework.kafka.config.AbstractKafkaListenerEndpoint<K,V>
-
- Type Parameters:
K - the key type.
V - the value type.
- All Implemented Interfaces:
org.springframework.beans.factory.Aware, org.springframework.beans.factory.BeanFactoryAware, org.springframework.beans.factory.InitializingBean, KafkaListenerEndpoint
- Direct Known Subclasses:
MethodKafkaListenerEndpoint
public abstract class AbstractKafkaListenerEndpoint<K,V> extends java.lang.Object implements KafkaListenerEndpoint, org.springframework.beans.factory.BeanFactoryAware, org.springframework.beans.factory.InitializingBean
Base model for a Kafka listener endpoint.
- See Also:
MethodKafkaListenerEndpoint
-
-
Constructor Summary
Constructor | Description
AbstractKafkaListenerEndpoint() |
-
Method Summary
Modifier and Type | Method | Description
void | afterPropertiesSet() |
protected abstract MessagingMessageListenerAdapter<K,V> | createMessageListener(MessageListenerContainer container, MessageConverter messageConverter) | Create a MessageListener that is able to serve this endpoint for the specified container.
java.lang.Boolean | getAutoStartup() | Return the autoStartup for this endpoint's container.
protected BatchToRecordAdapter<K,V> | getBatchToRecordAdapter() |
protected org.springframework.beans.factory.config.BeanExpressionContext | getBeanExpressionContext() |
protected org.springframework.beans.factory.BeanFactory | getBeanFactory() |
protected org.springframework.expression.BeanResolver | getBeanResolver() |
java.lang.String | getClientIdPrefix() | Return the client id prefix for the container; it will be suffixed by '-n' to provide a unique id when concurrency is used.
java.lang.Integer | getConcurrency() | Return the concurrency for this endpoint's container.
java.util.Properties | getConsumerProperties() | Get the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory.
protected java.lang.StringBuilder | getEndpointDescription() | Return a description for this endpoint.
java.lang.String | getGroup() | Return the group of this endpoint or null if not in a group.
java.lang.String | getGroupId() | Return the groupId of this endpoint - if present, overrides the group.id property of the consumer factory.
java.lang.String | getId() | Return the id of this endpoint.
protected RecordFilterStrategy<? super K,? super V> | getRecordFilterStrategy() |
protected org.springframework.retry.RecoveryCallback<?> | getRecoveryCallback() |
protected KafkaTemplate<?,?> | getReplyTemplate() |
protected org.springframework.beans.factory.config.BeanExpressionResolver | getResolver() |
protected org.springframework.retry.support.RetryTemplate | getRetryTemplate() |
TopicPartitionOffset[] | getTopicPartitionsToAssign() | Return the topicPartitions for this endpoint.
java.util.regex.Pattern | getTopicPattern() | Return the topicPattern for this endpoint.
java.util.Collection<java.lang.String> | getTopics() | Return the topics for this endpoint.
protected boolean | isAckDiscarded() |
boolean | isBatchListener() | Return true if this endpoint creates a batch listener.
boolean | isSplitIterables() | When true, Iterable return results will be split into discrete records.
protected boolean | isStatefulRetry() |
void | setAckDiscarded(boolean ackDiscarded) | Set to true if the setRecordFilterStrategy(RecordFilterStrategy) is in use.
void | setAutoStartup(java.lang.Boolean autoStartup) | Set the autoStartup for this endpoint's container.
void | setBatchListener(boolean batchListener) | Set to true if this endpoint should create a batch listener.
void | setBatchToRecordAdapter(BatchToRecordAdapter<K,V> batchToRecordAdapter) | Set a BatchToRecordAdapter.
void | setBeanFactory(org.springframework.beans.factory.BeanFactory beanFactory) |
void | setClientIdPrefix(java.lang.String clientIdPrefix) | Set the client id prefix; overrides the client id in the consumer configuration properties.
void | setConcurrency(java.lang.Integer concurrency) | Set the concurrency for this endpoint's container.
void | setConsumerProperties(java.util.Properties consumerProperties) | Set the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory.
void | setGroup(java.lang.String group) | Set the group for the corresponding listener container.
void | setGroupId(java.lang.String groupId) | Set the group id to override the group.id property in the ContainerFactory.
void | setId(java.lang.String id) |
void | setRecordFilterStrategy(RecordFilterStrategy<? super K,? super V> recordFilterStrategy) | Set a RecordFilterStrategy implementation.
void | setRecoveryCallback(org.springframework.retry.RecoveryCallback<? extends java.lang.Object> recoveryCallback) | Set a callback to be used with the setRetryTemplate(RetryTemplate).
void | setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer) | Set a configurer which will be invoked when creating a reply message.
void | setReplyTemplate(KafkaTemplate<?,?> replyTemplate) | Set the KafkaTemplate to use to send replies.
void | setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate) | Set a retryTemplate.
void | setSplitIterables(boolean splitIterables) | Set to false to disable splitting Iterable reply values into separate records.
void | setStatefulRetry(boolean statefulRetry) | When using a RetryTemplate, set to true to enable stateful retry.
void | setTopicPartitions(TopicPartitionOffset... topicPartitions) | Set the topicPartitions to use.
void | setTopicPattern(java.util.regex.Pattern topicPattern) | Set the topic pattern to use.
void | setTopics(java.lang.String... topics) | Set the topics to use.
void | setupListenerContainer(MessageListenerContainer listenerContainer, MessageConverter messageConverter) | Setup the specified message listener container with the model defined by this endpoint.
java.lang.String | toString() |
-
-
-
Method Detail
-
setBeanFactory
public void setBeanFactory(org.springframework.beans.factory.BeanFactory beanFactory) throws org.springframework.beans.BeansException
- Specified by:
setBeanFactory in interface org.springframework.beans.factory.BeanFactoryAware
- Throws:
org.springframework.beans.BeansException
-
getBeanFactory
@Nullable protected org.springframework.beans.factory.BeanFactory getBeanFactory()
-
getResolver
@Nullable protected org.springframework.beans.factory.config.BeanExpressionResolver getResolver()
-
getBeanExpressionContext
@Nullable protected org.springframework.beans.factory.config.BeanExpressionContext getBeanExpressionContext()
-
getBeanResolver
@Nullable protected org.springframework.expression.BeanResolver getBeanResolver()
-
setId
public void setId(java.lang.String id)
-
getId
@Nullable public java.lang.String getId()
Description copied from interface: KafkaListenerEndpoint
Return the id of this endpoint.
- Specified by:
getId in interface KafkaListenerEndpoint
- Returns:
- the id of this endpoint. The id can be further qualified when the endpoint is resolved against its actual listener container.
- See Also:
KafkaListenerContainerFactory.createListenerContainer(org.springframework.kafka.config.KafkaListenerEndpoint)
-
setGroupId
public void setGroupId(java.lang.String groupId)
Set the group id to override the group.id property in the ContainerFactory.
- Parameters:
groupId - the group id.
- Since:
- 1.3
-
getGroupId
@Nullable public java.lang.String getGroupId()
Description copied from interface: KafkaListenerEndpoint
Return the groupId of this endpoint; if present, it overrides the group.id property of the consumer factory.
- Specified by:
getGroupId in interface KafkaListenerEndpoint
- Returns:
- the group id; may be null.
-
setTopics
public void setTopics(java.lang.String... topics)
Set the topics to use. Provide exactly one of 'topics', 'topicPattern' or 'topicPartitions'; they cannot be mixed.
- Parameters:
topics - the topics to set.
- See Also:
setTopicPartitions(TopicPartitionOffset...), setTopicPattern(Pattern)
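The "one of three, never a mixture" rule can be illustrated with a small stand-alone check. This is only a sketch under stated assumptions: the class TopicSourceCheck and its methods are hypothetical, topic partitions are represented as plain strings rather than TopicPartitionOffset instances, and nothing here claims to mirror how the framework itself validates an endpoint.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Spring for Apache Kafka.
public class TopicSourceCheck {

    /** Counts how many of the three subscription styles are configured. */
    public static int configuredSources(List<String> topics, Pattern topicPattern,
            List<String> topicPartitions) {
        int count = 0;
        if (topics != null && !topics.isEmpty()) {
            count++;
        }
        if (topicPattern != null) {
            count++;
        }
        if (topicPartitions != null && !topicPartitions.isEmpty()) {
            count++;
        }
        return count;
    }

    /** Rejects a configuration that is empty or mixes subscription styles. */
    public static void validate(List<String> topics, Pattern topicPattern,
            List<String> topicPartitions) {
        int sources = configuredSources(topics, topicPattern, topicPartitions);
        if (sources != 1) {
            throw new IllegalStateException(
                    "Expected exactly one of topics, topicPattern, topicPartitions but found " + sources);
        }
    }

    public static void main(String[] args) {
        // Topics only: accepted.
        validate(List.of("orders"), null, null);
        try {
            // Topics plus a pattern: rejected as a mixture.
            validate(List.of("orders"), Pattern.compile("orders-.*"), null);
        }
        catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```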
-
getTopics
public java.util.Collection<java.lang.String> getTopics()
Return the topics for this endpoint.
- Specified by:
getTopics in interface KafkaListenerEndpoint
- Returns:
- the topics for this endpoint.
-
setTopicPartitions
public void setTopicPartitions(TopicPartitionOffset... topicPartitions)
Set the topicPartitions to use. Provide exactly one of 'topicPartitions', 'topics' or 'topicPattern'; they cannot be mixed.
- Parameters:
topicPartitions - the topic partitions to set.
- Since:
- 2.3
- See Also:
setTopics(String...), setTopicPattern(Pattern)
-
getTopicPartitionsToAssign
@Nullable public TopicPartitionOffset[] getTopicPartitionsToAssign()
Return the topicPartitions for this endpoint.
- Specified by:
getTopicPartitionsToAssign in interface KafkaListenerEndpoint
- Returns:
- the topicPartitions for this endpoint.
- Since:
- 2.3
-
setTopicPattern
public void setTopicPattern(java.util.regex.Pattern topicPattern)
Set the topic pattern to use. Cannot be used with topics or topicPartitions.
- Parameters:
topicPattern - the pattern.
- See Also:
setTopicPartitions(TopicPartitionOffset...), setTopics(String...)
-
getTopicPattern
@Nullable public java.util.regex.Pattern getTopicPattern()
Return the topicPattern for this endpoint.
- Specified by:
getTopicPattern in interface KafkaListenerEndpoint
- Returns:
- the topicPattern for this endpoint.
-
getGroup
@Nullable public java.lang.String getGroup()
Description copied from interface: KafkaListenerEndpoint
Return the group of this endpoint or null if not in a group.
- Specified by:
getGroup in interface KafkaListenerEndpoint
- Returns:
- the group of this endpoint or null if not in a group.
-
setGroup
public void setGroup(java.lang.String group)
Set the group for the corresponding listener container.
- Parameters:
group - the group.
-
isBatchListener
public boolean isBatchListener()
Return true if this endpoint creates a batch listener.
- Returns:
- true for a batch listener.
- Since:
- 1.1
-
setBatchListener
public void setBatchListener(boolean batchListener)
Set to true if this endpoint should create a batch listener.
- Parameters:
batchListener - true for a batch listener.
- Since:
- 1.1
-
setReplyTemplate
public void setReplyTemplate(KafkaTemplate<?,?> replyTemplate)
Set the KafkaTemplate to use to send replies.
- Parameters:
replyTemplate - the template.
- Since:
- 2.0
-
getReplyTemplate
@Nullable protected KafkaTemplate<?,?> getReplyTemplate()
-
getRecordFilterStrategy
@Nullable protected RecordFilterStrategy<? super K,? super V> getRecordFilterStrategy()
-
setRecordFilterStrategy
public void setRecordFilterStrategy(RecordFilterStrategy<? super K,? super V> recordFilterStrategy)
Set a RecordFilterStrategy implementation.
- Parameters:
recordFilterStrategy - the strategy implementation.
-
isAckDiscarded
protected boolean isAckDiscarded()
-
setAckDiscarded
public void setAckDiscarded(boolean ackDiscarded)
Set to true to acknowledge discarded records when a RecordFilterStrategy is in use; see setRecordFilterStrategy(RecordFilterStrategy).
- Parameters:
ackDiscarded - true to acknowledge discarded records.
-
getRetryTemplate
@Nullable protected org.springframework.retry.support.RetryTemplate getRetryTemplate()
-
setRetryTemplate
public void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)
Set a retryTemplate.
- Parameters:
retryTemplate - the template.
-
getRecoveryCallback
@Nullable protected org.springframework.retry.RecoveryCallback<?> getRecoveryCallback()
-
setRecoveryCallback
public void setRecoveryCallback(org.springframework.retry.RecoveryCallback<? extends java.lang.Object> recoveryCallback)
Set a callback to be used with the RetryTemplate configured via setRetryTemplate(RetryTemplate).
- Parameters:
recoveryCallback - the callback.
-
isStatefulRetry
protected boolean isStatefulRetry()
-
setStatefulRetry
public void setStatefulRetry(boolean statefulRetry)
When using a RetryTemplate, set to true to enable stateful retry. Use in conjunction with a SeekToCurrentErrorHandler when retry can take excessive time; each failure goes back to the broker, to keep the Consumer alive.
- Parameters:
statefulRetry - true to enable stateful retry.
- Since:
- 2.1.3
-
getClientIdPrefix
@Nullable public java.lang.String getClientIdPrefix()
Description copied from interface: KafkaListenerEndpoint
Return the client id prefix for the container; it will be suffixed by '-n' to provide a unique id when concurrency is used.
- Specified by:
getClientIdPrefix in interface KafkaListenerEndpoint
- Returns:
- the client id prefix.
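To make the '-n' suffixing concrete, the following stand-alone sketch derives one client id per concurrent consumer from a prefix. The class ClientIdSuffixSketch and the method clientIds are hypothetical names, and the assumption that n starts at 0 is mine; the description above only states that a '-n' suffix is appended.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not part of Spring for Apache Kafka.
public class ClientIdSuffixSketch {

    /** Builds one client id per consumer by appending "-n" to the prefix (n assumed to start at 0). */
    public static List<String> clientIds(String prefix, int concurrency) {
        List<String> ids = new ArrayList<>();
        for (int n = 0; n < concurrency; n++) {
            ids.add(prefix + "-" + n);
        }
        return ids;
    }

    public static void main(String[] args) {
        // prints [orders-0, orders-1, orders-2]
        System.out.println(clientIds("orders", 3));
    }
}
```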
-
setClientIdPrefix
public void setClientIdPrefix(java.lang.String clientIdPrefix)
Set the client id prefix; overrides the client id in the consumer configuration properties.
- Parameters:
clientIdPrefix - the prefix.
- Since:
- 2.1.1
-
getConcurrency
@Nullable public java.lang.Integer getConcurrency()
Description copied from interface: KafkaListenerEndpoint
Return the concurrency for this endpoint's container.
- Specified by:
getConcurrency in interface KafkaListenerEndpoint
- Returns:
- the concurrency.
-
setConcurrency
public void setConcurrency(java.lang.Integer concurrency)
Set the concurrency for this endpoint's container.
- Parameters:
concurrency - the concurrency.
- Since:
- 2.2
-
getAutoStartup
@Nullable public java.lang.Boolean getAutoStartup()
Description copied from interface: KafkaListenerEndpoint
Return the autoStartup for this endpoint's container.
- Specified by:
getAutoStartup in interface KafkaListenerEndpoint
- Returns:
- the autoStartup.
-
setAutoStartup
public void setAutoStartup(java.lang.Boolean autoStartup)
Set the autoStartup for this endpoint's container.
- Parameters:
autoStartup - the autoStartup.
- Since:
- 2.2
-
setReplyHeadersConfigurer
public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer)
Set a configurer which will be invoked when creating a reply message.
- Parameters:
replyHeadersConfigurer - the configurer.
- Since:
- 2.2
-
getConsumerProperties
@Nullable public java.util.Properties getConsumerProperties()
Description copied from interface: KafkaListenerEndpoint
Get the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory. group.id and client.id are ignored.
- Specified by:
getConsumerProperties in interface KafkaListenerEndpoint
- Returns:
- the properties.
- See Also:
ConsumerConfig, KafkaListenerEndpoint.getGroupId(), KafkaListenerEndpoint.getClientIdPrefix()
-
setConsumerProperties
public void setConsumerProperties(java.util.Properties consumerProperties)
Set the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory. group.id and client.id are ignored.
- Parameters:
consumerProperties - the properties.
- Since:
- 2.1.4
- See Also:
ConsumerConfig, setGroupId(String), setClientIdPrefix(String)
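The merge rule described here (endpoint properties supersede same-named factory properties, while group.id and client.id are ignored) might be sketched with plain java.util.Properties as follows. The class ConsumerPropertiesMergeSketch and its merge method are hypothetical and written only to illustrate the rule; they are not the framework's implementation.

```java
import java.util.Properties;
import java.util.Set;

// Hypothetical illustration, not part of Spring for Apache Kafka.
public class ConsumerPropertiesMergeSketch {

    // Names that the endpoint-level properties are not allowed to override.
    private static final Set<String> IGNORED = Set.of("group.id", "client.id");

    /** Returns the factory properties overlaid with the endpoint properties, skipping ignored names. */
    public static Properties merge(Properties factoryProps, Properties endpointProps) {
        Properties merged = new Properties();
        merged.putAll(factoryProps);
        for (String name : endpointProps.stringPropertyNames()) {
            if (!IGNORED.contains(name)) {
                merged.setProperty(name, endpointProps.getProperty(name));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties factoryProps = new Properties();
        factoryProps.setProperty("max.poll.records", "500");
        factoryProps.setProperty("group.id", "factory-group");

        Properties endpointProps = new Properties();
        endpointProps.setProperty("max.poll.records", "50");      // supersedes the factory value
        endpointProps.setProperty("group.id", "endpoint-group");  // ignored by the merge

        Properties merged = merge(factoryProps, endpointProps);
        System.out.println(merged.getProperty("max.poll.records")); // prints 50
        System.out.println(merged.getProperty("group.id"));         // prints factory-group
    }
}
```

In this sketch the group id and client id would instead be supplied through setGroupId(String) and setClientIdPrefix(String), as the See Also entries suggest.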
-
isSplitIterables
public boolean isSplitIterables()
Description copied from interface: KafkaListenerEndpoint
When true, Iterable return results will be split into discrete records.
- Specified by:
isSplitIterables in interface KafkaListenerEndpoint
- Returns:
- true to split.
-
setSplitIterables
public void setSplitIterables(boolean splitIterables)
Set to false to disable splitting Iterable reply values into separate records.
- Parameters:
splitIterables - false to disable; default true.
- Since:
- 2.3.5
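The effect of this flag can be pictured with a stand-alone sketch: when splitting is enabled, each element of an Iterable result becomes its own reply; otherwise the whole result is sent as a single reply. The class SplitIterablesSketch and the method toReplies are hypothetical and only model the described behaviour; they do not reproduce the framework's reply handling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not part of Spring for Apache Kafka.
public class SplitIterablesSketch {

    /** Returns the reply payloads that would be sent for a listener result. */
    public static List<Object> toReplies(Object result, boolean splitIterables) {
        List<Object> replies = new ArrayList<>();
        if (splitIterables && result instanceof Iterable<?>) {
            // One reply per element.
            for (Object element : (Iterable<?>) result) {
                replies.add(element);
            }
        }
        else {
            // The result is sent as a single reply, even if it is an Iterable.
            replies.add(result);
        }
        return replies;
    }

    public static void main(String[] args) {
        List<String> result = List.of("a", "b", "c");
        System.out.println(toReplies(result, true).size());  // prints 3
        System.out.println(toReplies(result, false).size()); // prints 1
    }
}
```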
-
getBatchToRecordAdapter
@Nullable protected BatchToRecordAdapter<K,V> getBatchToRecordAdapter()
-
setBatchToRecordAdapter
public void setBatchToRecordAdapter(BatchToRecordAdapter<K,V> batchToRecordAdapter)
Set a BatchToRecordAdapter.
- Parameters:
batchToRecordAdapter - the adapter.
- Since:
- 2.4.2
-
afterPropertiesSet
public void afterPropertiesSet()
- Specified by:
afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean
-
setupListenerContainer
public void setupListenerContainer(MessageListenerContainer listenerContainer, @Nullable MessageConverter messageConverter)
Description copied from interface: KafkaListenerEndpoint
Set up the specified message listener container with the model defined by this endpoint.
This endpoint must provide the requested missing option(s) of the specified container to make it usable. Usually, this is about setting the topics and the messageListener to use, but an implementation may override any default setting that was already set.
- Specified by:
setupListenerContainer in interface KafkaListenerEndpoint
- Parameters:
listenerContainer - the listener container to configure.
messageConverter - the message converter; can be null.
-
createMessageListener
protected abstract MessagingMessageListenerAdapter<K,V> createMessageListener(MessageListenerContainer container, @Nullable MessageConverter messageConverter)
Create a MessageListener that is able to serve this endpoint for the specified container.
- Parameters:
container - the MessageListenerContainer to create a MessageListener for.
messageConverter - the message converter; may be null.
- Returns:
- a MessageListener instance.
-
getEndpointDescription
protected java.lang.StringBuilder getEndpointDescription()
Return a description for this endpoint.
- Returns:
- a description for this endpoint.
Available to subclasses, for inclusion in their toString() result.
-
toString
public java.lang.String toString()
- Overrides:
toString in class java.lang.Object
-
-