Class MessagingMessageListenerAdapter<K,V>
java.lang.Object
org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter<K,V>
- Type Parameters:
K- the key type.V- the value type.
- All Implemented Interfaces:
ConsumerSeekAware
- Direct Known Subclasses:
BatchMessagingMessageListenerAdapter,RecordMessagingMessageListenerAdapter
public abstract class MessagingMessageListenerAdapter<K,V>
extends Object
implements ConsumerSeekAware
An abstract
MessageListener adapter
providing the necessary infrastructure to extract the payload of a
Message.- Author:
- Stephane Nicoll, Gary Russell, Artem Bilan, Venil Noronha
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic final recordRoot object for reply expression evaluation.Nested classes/interfaces inherited from interface org.springframework.kafka.listener.ConsumerSeekAware
ConsumerSeekAware.ConsumerSeekCallback -
Field Summary
FieldsModifier and TypeFieldDescriptionprotected final LogAccessorMessage used when no conversion is needed. -
Constructor Summary
ConstructorsModifierConstructorDescriptionprotectedMessagingMessageListenerAdapter(Object bean, Method method) Create an instance with the provided bean and method. -
Method Summary
Modifier and TypeMethodDescriptionprotected final StringcreateMessagingErrorMessage(String description, Object payload) protected TypedetermineInferredType(Method method) Subclasses can override this method to use a different mechanism to determine the target type of the payload conversion.protected final RecordMessageConverterReturn theMessagingMessageConverterfor this listener, being able to convertMessage.protected ReplyHeadersConfigurerReturn the reply configurer.protected TypegetType()Returns the inferred type for conversion or, if null, thefallbackType.protected voidhandleResult(Object resultArg, Object request, Object source) Handle the given result object returned from the listener method, sending a response message to the SendTo topic.protected final ObjectinvokeHandler(Object data, Acknowledgment acknowledgment, Message<?> message, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer) Invoke the handler, wrapping any exception to aListenerExecutionFailedExceptionwith a dedicated error message.protected booleanbooleanbooleanprotected booleanprotected booleanWhen true,Iterablereturn results will be split into discrete records.voidonIdleContainer(Map<org.apache.kafka.common.TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback) If the container is configured to emit idle container events, this method is called when the container idle event is emitted - allowing a seek operation.voidonPartitionsAssigned(Map<org.apache.kafka.common.TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback) When using group management, called when partition assignments change.voidonPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions) When using group management, called when partition assignments are revoked.voidRegister the callback to use when seeking at some arbitrary time.protected voidsendResponse(Object result, String topic, Object source, boolean returnTypeMessage) Send the result to the topic.voidsetBeanResolver(BeanResolver beanResolver) Set a bean resolver for runtime SpEL expressions.voidsetCorrelationHeaderName(String correlationHeaderName) Set a custom header name for the correlation id.voidsetFallbackType(Class<?> fallbackType) Set a fallback type to use when using a type-aware message converter and this adapter cannot determine the inferred type from the method.voidsetHandlerMethod(HandlerAdapter handlerMethod) Set theHandlerAdapterto use to invoke the method processing an incomingConsumerRecord.voidsetMessageConverter(RecordMessageConverter messageConverter) Set the MessageConverter.voidsetMessagingConverter(SmartMessageConverter messageConverter) Set theSmartMessageConverterto use with the defaultMessagingMessageConverter.voidsetReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer) Set a configurer which will be invoked when creating a reply message.voidsetReplyTemplate(KafkaTemplate<?, ?> replyTemplate) Set the template to use to send any result from the method invocation.voidsetReplyTopic(String replyTopicParam) Set the topic to which to send any result from the method invocation.voidsetSplitIterables(boolean splitIterables) Set to false to disable splittingIterablereply values into separate records.protected Message<?>toMessagingMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V> cRecord, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer) Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface org.springframework.kafka.listener.ConsumerSeekAware
onFirstPoll, unregisterSeekCallback
-
Field Details
-
NULL_MESSAGE
Message used when no conversion is needed. -
logger
-
-
Constructor Details
-
MessagingMessageListenerAdapter
Create an instance with the provided bean and method.- Parameters:
bean- the bean.method- the method.
-
-
Method Details
-
setCorrelationHeaderName
Set a custom header name for the correlation id. DefaultKafkaHeaders.CORRELATION_ID. This header will be echoed back in any reply message.- Parameters:
correlationHeaderName- the header name.- Since:
- 3.0
-
setMessageConverter
Set the MessageConverter.- Parameters:
messageConverter- the converter.
-
getMessageConverter
Return theMessagingMessageConverterfor this listener, being able to convertMessage.- Returns:
- the
MessagingMessageConverterfor this listener, being able to convertMessage.
-
setMessagingConverter
Set theSmartMessageConverterto use with the defaultMessagingMessageConverter. Not allowed when a custommessageConverteris provided.- Parameters:
messageConverter- the converter.- Since:
- 2.7.1
-
getType
Returns the inferred type for conversion or, if null, thefallbackType.- Returns:
- the type.
-
setFallbackType
Set a fallback type to use when using a type-aware message converter and this adapter cannot determine the inferred type from the method. An example of a type-aware message converter is theStringJsonMessageConverter. Defaults toObject.- Parameters:
fallbackType- the type.
-
setHandlerMethod
Set theHandlerAdapterto use to invoke the method processing an incomingConsumerRecord.- Parameters:
handlerMethod-HandlerAdapterinstance.
-
isConsumerRecordList
protected boolean isConsumerRecordList() -
isConsumerRecords
public boolean isConsumerRecords() -
isConversionNeeded
public boolean isConversionNeeded() -
setReplyTopic
Set the topic to which to send any result from the method invocation. May be a SpEL expression!{...}evaluated at runtime.- Parameters:
replyTopicParam- the topic or expression.- Since:
- 2.0
-
setReplyTemplate
Set the template to use to send any result from the method invocation.- Parameters:
replyTemplate- the template.- Since:
- 2.0
-
setBeanResolver
Set a bean resolver for runtime SpEL expressions. Also configures the evaluation context with a standard type converter and map accessor.- Parameters:
beanResolver- the resolver.- Since:
- 2.0
-
isMessageList
protected boolean isMessageList() -
getReplyHeadersConfigurer
Return the reply configurer.- Returns:
- the configurer.
- Since:
- 2.2
- See Also:
-
setReplyHeadersConfigurer
Set a configurer which will be invoked when creating a reply message.- Parameters:
replyHeadersConfigurer- the configurer.- Since:
- 2.2
-
isSplitIterables
protected boolean isSplitIterables()When true,Iterablereturn results will be split into discrete records.- Returns:
- true to split.
- Since:
- 2.3.5
-
setSplitIterables
public void setSplitIterables(boolean splitIterables) Set to false to disable splittingIterablereply values into separate records.- Parameters:
splitIterables- false to disable; default true.- Since:
- 2.3.5
-
registerSeekCallback
Description copied from interface:ConsumerSeekAwareRegister the callback to use when seeking at some arbitrary time. When used with aConcurrentMessageListenerContaineror the same listener instance in multiple containers listeners should store the callback in aThreadLocal.- Specified by:
registerSeekCallbackin interfaceConsumerSeekAware- Parameters:
callback- the callback.
-
onPartitionsAssigned
public void onPartitionsAssigned(Map<org.apache.kafka.common.TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback) Description copied from interface:ConsumerSeekAwareWhen using group management, called when partition assignments change.- Specified by:
onPartitionsAssignedin interfaceConsumerSeekAware- Parameters:
assignments- the new assignments and their current offsets.callback- the callback to perform an initial seek after assignment.
-
onPartitionsRevoked
Description copied from interface:ConsumerSeekAwareWhen using group management, called when partition assignments are revoked. Listeners should discard any callback saved fromConsumerSeekAware.registerSeekCallback(ConsumerSeekCallback)on this thread.- Specified by:
onPartitionsRevokedin interfaceConsumerSeekAware- Parameters:
partitions- the partitions that have been revoked.
-
onIdleContainer
public void onIdleContainer(Map<org.apache.kafka.common.TopicPartition, Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback) Description copied from interface:ConsumerSeekAwareIf the container is configured to emit idle container events, this method is called when the container idle event is emitted - allowing a seek operation.- Specified by:
onIdleContainerin interfaceConsumerSeekAware- Parameters:
assignments- the new assignments and their current offsets.callback- the callback to perform a seek.
-
toMessagingMessage
protected Message<?> toMessagingMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K, V> cRecord, @Nullable Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer) -
invokeHandler
protected final Object invokeHandler(Object data, @Nullable Acknowledgment acknowledgment, Message<?> message, org.apache.kafka.clients.consumer.Consumer<?, ?> consumer) Invoke the handler, wrapping any exception to aListenerExecutionFailedExceptionwith a dedicated error message.- Parameters:
data- the data to process during invocation.acknowledgment- the acknowledgment to use if any.message- the message to process.consumer- the consumer.- Returns:
- the result of invocation.
-
handleResult
Handle the given result object returned from the listener method, sending a response message to the SendTo topic.- Parameters:
resultArg- the result object to handle (nevernull)request- the original request messagesource- the source data for the method invocation - e.g.o.s.messaging.Message<?>; may be null
-
sendResponse
protected void sendResponse(Object result, String topic, @Nullable Object source, boolean returnTypeMessage) Send the result to the topic.- Parameters:
result- the result.topic- the topic.source- the source (input).returnTypeMessage- true if we are returning message(s).- Since:
- 2.1.3
-
createMessagingErrorMessage
-
determineInferredType
Subclasses can override this method to use a different mechanism to determine the target type of the payload conversion.- Parameters:
method- the method.- Returns:
- the type.
-