Package org.springframework.kafka.config
Class AbstractKafkaListenerEndpoint<K,V>
java.lang.Object
org.springframework.kafka.config.AbstractKafkaListenerEndpoint<K,V>
- Type Parameters:
K - the key type.
V - the value type.
- All Implemented Interfaces:
Aware, BeanFactoryAware, InitializingBean, KafkaListenerEndpoint
- Direct Known Subclasses:
MethodKafkaListenerEndpoint
public abstract class AbstractKafkaListenerEndpoint<K,V>
extends Object
implements KafkaListenerEndpoint, BeanFactoryAware, InitializingBean
Base model for a Kafka listener endpoint.
- Author:
- Stephane Nicoll, Gary Russell, Artem Bilan
-
Constructor Summary
Constructors
AbstractKafkaListenerEndpoint()
-
Method Summary
Modifier and Type, Method, Description
void afterPropertiesSet()
protected abstract MessagingMessageListenerAdapter<K,V> createMessageListener(MessageListenerContainer container, MessageConverter messageConverter)
    Create a MessageListener that is able to serve this endpoint for the specified container.
Boolean getAutoStartup()
    Return the autoStartup for this endpoint's container.
Boolean getBatchListener()
    Return the current batch listener flag for this endpoint, or null if not explicitly set.
protected BatchToRecordAdapter<K,V> getBatchToRecordAdapter()
protected BeanExpressionContext getBeanExpressionContext()
protected BeanFactory getBeanFactory()
protected BeanResolver getBeanResolver()
String getClientIdPrefix()
    Return the client id prefix for the container; it will be suffixed by '-n' to provide a unique id when concurrency is used.
Integer getConcurrency()
    Return the concurrency for this endpoint's container.
Properties getConsumerProperties()
    Get the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory.
protected StringBuilder getEndpointDescription()
    Return a description for this endpoint.
String getGroup()
    Return the group of this endpoint or null if not in a group.
String getGroupId()
    Return the groupId of this endpoint - if present, overrides the group.id property of the consumer factory.
String getId()
    Return the id of this endpoint.
byte[] getListenerInfo()
    Get the listener info to insert in the record header.
String getMainListenerId()
    Return the main listener id if this container is for a retry topic.
protected RecordFilterStrategy<? super K,? super V> getRecordFilterStrategy()
protected KafkaTemplate<?,?> getReplyTemplate()
protected BeanExpressionResolver getResolver()
TopicPartitionOffset[] getTopicPartitionsToAssign()
    Return the topicPartitions for this endpoint.
Pattern getTopicPattern()
    Return the topicPattern for this endpoint.
Collection<String> getTopics()
    Return the topics for this endpoint.
protected boolean isAckDiscarded()
boolean isBatchListener()
    Return true if this endpoint creates a batch listener.
boolean isSplitIterables()
    When true, Iterable return results will be split into discrete records.
void setAckDiscarded(boolean ackDiscarded)
    Set to true if the setRecordFilterStrategy(RecordFilterStrategy) is in use.
void setAutoStartup(Boolean autoStartup)
    Set the autoStartup for this endpoint's container.
void setBatchListener(boolean batchListener)
    Set to true if this endpoint should create a batch listener.
void setBatchToRecordAdapter(BatchToRecordAdapter<K, V> batchToRecordAdapter)
    Set a BatchToRecordAdapter.
void setBeanFactory(BeanFactory beanFactory)
void setClientIdPrefix(String clientIdPrefix)
    Set the client id prefix; overrides the client id in the consumer configuration properties.
void setConcurrency(Integer concurrency)
    Set the concurrency for this endpoint's container.
void setConsumerProperties(Properties consumerProperties)
    Set the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory.
void setCorrelationHeaderName(String correlationHeaderName)
    Set a custom header name for the correlation id.
void setGroup(String group)
    Set the group for the corresponding listener container.
void setGroupId(String groupId)
    Set the group id to override the group.id property in the ContainerFactory.
void setId(String id)
void setListenerInfo(byte[] listenerInfo)
    Set the listener info to insert in the record header.
void setMainListenerId(String id)
void setRecordFilterStrategy(RecordFilterStrategy<? super K, ? super V> recordFilterStrategy)
    Set a RecordFilterStrategy implementation.
void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer)
    Set a configurer which will be invoked when creating a reply message.
void setReplyTemplate(KafkaTemplate<?, ?> replyTemplate)
    Set the KafkaTemplate to use to send replies.
void setSplitIterables(boolean splitIterables)
    Set to false to disable splitting Iterable reply values into separate records.
void setTopicPartitions(TopicPartitionOffset... topicPartitions)
    Set the topicPartitions to use.
void setTopicPattern(Pattern topicPattern)
    Set the topic pattern to use.
void setTopics(String... topics)
    Set the topics to use.
void setupListenerContainer(MessageListenerContainer listenerContainer, MessageConverter messageConverter)
    Setup the specified message listener container with the model defined by this endpoint.
String toString()
-
Constructor Details
-
AbstractKafkaListenerEndpoint
public AbstractKafkaListenerEndpoint()
-
-
Method Details
-
setBeanFactory
- Specified by:
setBeanFactory in interface BeanFactoryAware
- Throws:
BeansException
-
getBeanFactory
-
getResolver
-
getBeanExpressionContext
-
getBeanResolver
-
setId
-
setMainListenerId
-
getMainListenerId
Description copied from interface: KafkaListenerEndpoint
Return the main listener id if this container is for a retry topic.
- Specified by:
getMainListenerId in interface KafkaListenerEndpoint
- Returns:
- the main listener id or null.
-
getId
Description copied from interface: KafkaListenerEndpoint
Return the id of this endpoint.
- Specified by:
getId in interface KafkaListenerEndpoint
- Returns:
- the id of this endpoint. The id can be further qualified when the endpoint is resolved against its actual listener container.
-
setGroupId
Set the group id to override the group.id property in the ContainerFactory.
- Parameters:
groupId - the group id.
- Since:
- 1.3
-
getGroupId
Description copied from interface: KafkaListenerEndpoint
Return the groupId of this endpoint - if present, overrides the group.id property of the consumer factory.
- Specified by:
getGroupId in interface KafkaListenerEndpoint
- Returns:
- the group id; may be null.
-
setTopics
Set the topics to use. Either these or 'topicPattern' or 'topicPartitions' should be provided, but not a mixture.
- Parameters:
topics - the topics to set.
-
getTopics
Return the topics for this endpoint.
- Specified by:
getTopics in interface KafkaListenerEndpoint
- Returns:
- the topics for this endpoint.
-
setTopicPartitions
Set the topicPartitions to use. Either these or 'topics' or 'topicPattern' should be provided, but not a mixture.
- Parameters:
topicPartitions - the topic partitions to set.
- Since:
- 2.3
-
getTopicPartitionsToAssign
Return the topicPartitions for this endpoint.
- Specified by:
getTopicPartitionsToAssign in interface KafkaListenerEndpoint
- Returns:
- the topicPartitions for this endpoint.
- Since:
- 2.3
-
setTopicPattern
Set the topic pattern to use. Cannot be used with topics or topicPartitions.
- Parameters:
topicPattern - the pattern.
-
getTopicPattern
Return the topicPattern for this endpoint.
- Specified by:
getTopicPattern in interface KafkaListenerEndpoint
- Returns:
- the topicPattern for this endpoint.
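The rule that topics, topicPartitions and topicPattern are alternatives can be sketched as a small check. This is a hedged illustration in plain Java, not the library's validation code: the class and method names are hypothetical, and topic partitions are modelled as plain strings instead of TopicPartitionOffset so the sketch has no dependencies.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Spring for Apache Kafka.
final class TopicSelectionSketch {

    // Count how many of the three selection styles are populated.
    static int configuredStyles(List<String> topics, List<String> topicPartitions, Pattern topicPattern) {
        int count = 0;
        if (topics != null && !topics.isEmpty()) {
            count++;
        }
        if (topicPartitions != null && !topicPartitions.isEmpty()) {
            count++;
        }
        if (topicPattern != null) {
            count++;
        }
        return count;
    }

    // A selection is usable when exactly one style is set ("not a mixture").
    static boolean isValidSelection(List<String> topics, List<String> topicPartitions, Pattern topicPattern) {
        return configuredStyles(topics, topicPartitions, topicPattern) == 1;
    }

    public static void main(String[] args) {
        // Topics only: valid.
        System.out.println(isValidSelection(List.of("orders"), List.of(), null));
        // Topics plus a pattern: a mixture, so invalid.
        System.out.println(isValidSelection(List.of("orders"), List.of(), Pattern.compile("orders-.*")));
    }
}
```

Whether and where the real endpoint enforces this rule is not stated on this page; the sketch only restates the documented constraint.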
-
getGroup
Description copied from interface: KafkaListenerEndpoint
Return the group of this endpoint or null if not in a group.
- Specified by:
getGroup in interface KafkaListenerEndpoint
- Returns:
- the group of this endpoint or null if not in a group.
-
setGroup
Set the group for the corresponding listener container.
- Parameters:
group - the group.
-
isBatchListener
public boolean isBatchListener()
Return true if this endpoint creates a batch listener.
- Returns:
- true for a batch listener.
- Since:
- 1.1
-
getBatchListener
Return the current batch listener flag for this endpoint, or null if not explicitly set.
- Specified by:
getBatchListener in interface KafkaListenerEndpoint
- Returns:
- the batch listener flag.
- Since:
- 2.8
-
setBatchListener
public void setBatchListener(boolean batchListener)
Set to true if this endpoint should create a batch listener.
- Parameters:
batchListener - true for a batch listener.
- Since:
- 1.1
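The three batch-listener accessors describe a flag with an "unset" state: getBatchListener() can return null, while isBatchListener() always returns a boolean. A minimal sketch of that pattern follows, in plain Java with a hypothetical class name. Treating an unset flag as false in isBatchListener() is an assumption made for this sketch.

```java
// Hypothetical stand-in for the endpoint's batch listener flag handling.
final class BatchFlagSketch {

    // null until setBatchListener(...) is called explicitly.
    private Boolean batchListener;

    // Mirrors the documented contract: the flag, or null if not explicitly set.
    Boolean getBatchListener() {
        return this.batchListener;
    }

    // Assumption: an unset flag is reported as false.
    boolean isBatchListener() {
        return this.batchListener != null && this.batchListener;
    }

    void setBatchListener(boolean batchListener) {
        this.batchListener = batchListener;
    }

    public static void main(String[] args) {
        BatchFlagSketch endpoint = new BatchFlagSketch();
        System.out.println(endpoint.getBatchListener()); // prints null
        System.out.println(endpoint.isBatchListener());  // prints false
        endpoint.setBatchListener(true);
        System.out.println(endpoint.getBatchListener()); // prints true
    }
}
```

Keeping the unset state visible lets a caller, such as a container factory, tell "explicitly false" apart from "not configured" and apply its own default in the second case.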
-
setReplyTemplate
Set the KafkaTemplate to use to send replies.
- Parameters:
replyTemplate - the template.
- Since:
- 2.0
-
getReplyTemplate
-
getRecordFilterStrategy
-
setRecordFilterStrategy
Set a RecordFilterStrategy implementation.
- Parameters:
recordFilterStrategy - the strategy implementation.
-
isAckDiscarded
protected boolean isAckDiscarded()
-
setAckDiscarded
public void setAckDiscarded(boolean ackDiscarded)
Set to true to acknowledge discarded records when a filter strategy (see setRecordFilterStrategy(RecordFilterStrategy)) is in use.
- Parameters:
ackDiscarded - true to acknowledge discarded records.
-
getClientIdPrefix
Description copied from interface: KafkaListenerEndpoint
Return the client id prefix for the container; it will be suffixed by '-n' to provide a unique id when concurrency is used.
- Specified by:
getClientIdPrefix in interface KafkaListenerEndpoint
- Returns:
- the client id prefix.
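To illustrate the '-n' suffixing, here is a minimal sketch in plain Java that derives one client id per consumer from a prefix. The class and method names are hypothetical, and starting n at zero is an assumption; this page only says that a '-n' suffix is appended.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not the container's own code.
final class ClientIdSketch {

    // Build one client id per concurrent consumer: prefix-0, prefix-1, ...
    // Assumption: n is a zero-based consumer index.
    static List<String> clientIds(String prefix, int concurrency) {
        List<String> ids = new ArrayList<>();
        for (int n = 0; n < concurrency; n++) {
            ids.add(prefix + "-" + n);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(clientIds("orders", 3)); // prints [orders-0, orders-1, orders-2]
    }
}
```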
-
setClientIdPrefix
Set the client id prefix; overrides the client id in the consumer configuration properties.
- Parameters:
clientIdPrefix - the prefix.
- Since:
- 2.1.1
-
getConcurrency
Description copied from interface: KafkaListenerEndpoint
Return the concurrency for this endpoint's container.
- Specified by:
getConcurrency in interface KafkaListenerEndpoint
- Returns:
- the concurrency.
-
setConcurrency
Set the concurrency for this endpoint's container.
- Parameters:
concurrency - the concurrency.
- Since:
- 2.2
-
getAutoStartup
Description copied from interface: KafkaListenerEndpoint
Return the autoStartup for this endpoint's container.
- Specified by:
getAutoStartup in interface KafkaListenerEndpoint
- Returns:
- the autoStartup.
-
setAutoStartup
Set the autoStartup for this endpoint's container.
- Parameters:
autoStartup - the autoStartup.
- Since:
- 2.2
-
setReplyHeadersConfigurer
Set a configurer which will be invoked when creating a reply message.
- Parameters:
replyHeadersConfigurer - the configurer.
- Since:
- 2.2
-
getConsumerProperties
Description copied from interface: KafkaListenerEndpoint
Get the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory. group.id and client.id are ignored.
- Specified by:
getConsumerProperties in interface KafkaListenerEndpoint
- Returns:
- the properties.
-
setConsumerProperties
Set the consumer properties that will be merged with the consumer properties provided by the consumer factory; properties here will supersede any with the same name(s) in the consumer factory. group.id and client.id are ignored.
- Parameters:
consumerProperties - the properties.
- Since:
- 2.1.4
- See Also:
- ConsumerConfig
- setGroupId(String)
- setClientIdPrefix(String)
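The merge rule described for the consumer properties can be sketched with java.util.Properties alone. This is a hedged illustration, not the library's implementation: the class and method names are hypothetical, and "ignored" is interpreted here as "group.id and client.id entries in the endpoint properties are skipped", with setGroupId(String) and setClientIdPrefix(String) being the intended way to set those values.

```java
import java.util.Properties;
import java.util.Set;

// Hypothetical illustration of the documented merge semantics.
final class ConsumerPropertiesMergeSketch {

    // Assumption: these keys are dropped from the endpoint-level properties.
    private static final Set<String> IGNORED = Set.of("group.id", "client.id");

    static Properties merge(Properties factoryProps, Properties endpointProps) {
        Properties merged = new Properties();
        // Start from the consumer factory's configuration.
        merged.putAll(factoryProps);
        // Endpoint properties supersede same-named factory properties.
        for (String name : endpointProps.stringPropertyNames()) {
            if (!IGNORED.contains(name)) {
                merged.setProperty(name, endpointProps.getProperty(name));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties factory = new Properties();
        factory.setProperty("group.id", "factory-group");
        factory.setProperty("max.poll.records", "500");

        Properties endpoint = new Properties();
        endpoint.setProperty("group.id", "ignored-group");
        endpoint.setProperty("max.poll.records", "50");
        endpoint.setProperty("auto.offset.reset", "earliest");

        Properties merged = merge(factory, endpoint);
        System.out.println(merged.getProperty("max.poll.records"));  // prints 50
        System.out.println(merged.getProperty("group.id"));          // prints factory-group
        System.out.println(merged.getProperty("auto.offset.reset")); // prints earliest
    }
}
```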
-
isSplitIterables
public boolean isSplitIterables()
Description copied from interface: KafkaListenerEndpoint
When true, Iterable return results will be split into discrete records.
- Specified by:
isSplitIterables in interface KafkaListenerEndpoint
- Returns:
- true to split.
-
setSplitIterables
public void setSplitIterables(boolean splitIterables)
Set to false to disable splitting Iterable reply values into separate records.
- Parameters:
splitIterables - false to disable; default true.
- Since:
- 2.3.5
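The effect of the splitIterables flag on a listener's return value can be sketched as follows. This is plain Java with hypothetical names and shows only the documented behaviour (one reply record per element versus one record for the whole value); it is not the reply-sending code of the listener adapter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of splitting Iterable reply values.
final class SplitIterablesSketch {

    // Turn a listener result into the list of values that would each become one reply record.
    static List<Object> toReplyValues(Object result, boolean splitIterables) {
        List<Object> values = new ArrayList<>();
        if (splitIterables && result instanceof Iterable<?>) {
            // One reply record per element.
            for (Object item : (Iterable<?>) result) {
                values.add(item);
            }
        }
        else {
            // The whole result is sent as a single reply record.
            values.add(result);
        }
        return values;
    }

    public static void main(String[] args) {
        List<String> result = List.of("a", "b", "c");
        System.out.println(toReplyValues(result, true).size());  // prints 3
        System.out.println(toReplyValues(result, false).size()); // prints 1
    }
}
```

Disabling the split is useful when the collection itself is the reply payload, for example a list that the reply template's serializer is expected to write as one value.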
-
getListenerInfo
Description copied from interface: KafkaListenerEndpoint
Get the listener info to insert in the record header.
- Specified by:
getListenerInfo in interface KafkaListenerEndpoint
- Returns:
- the info.
-
setListenerInfo
Set the listener info to insert in the record header.
- Parameters:
listenerInfo - the info.
- Since:
- 2.8.4
-
getBatchToRecordAdapter
-
setBatchToRecordAdapter
Set a BatchToRecordAdapter.
- Parameters:
batchToRecordAdapter - the adapter.
- Since:
- 2.4.2
-
setCorrelationHeaderName
Set a custom header name for the correlation id. The default is KafkaHeaders.CORRELATION_ID. This header will be echoed back in any reply message.
- Parameters:
correlationHeaderName - the header name.
- Since:
- 3.0
-
afterPropertiesSet
public void afterPropertiesSet()
- Specified by:
afterPropertiesSet in interface InitializingBean
-
setupListenerContainer
public void setupListenerContainer(MessageListenerContainer listenerContainer, @Nullable MessageConverter messageConverter)
Description copied from interface: KafkaListenerEndpoint
Set up the specified message listener container with the model defined by this endpoint.
This endpoint must provide the requested missing option(s) of the specified container to make it usable. Usually, this is about setting the queues and the messageListener to use, but an implementation may override any default setting that was already set.
- Specified by:
setupListenerContainer in interface KafkaListenerEndpoint
- Parameters:
listenerContainer - the listener container to configure.
messageConverter - the message converter; can be null.
-
createMessageListener
protected abstract MessagingMessageListenerAdapter<K,V> createMessageListener(MessageListenerContainer container, @Nullable MessageConverter messageConverter)
Create a MessageListener that is able to serve this endpoint for the specified container.
- Parameters:
container - the MessageListenerContainer to create a MessageListener for.
messageConverter - the message converter; may be null.
- Returns:
- a MessageListener instance.
-
getEndpointDescription
Return a description for this endpoint.
- Returns:
- a description for this endpoint. Available to subclasses, for inclusion in their toString() result.
-
toString
-