Class MessagingMessageListenerAdapter<K,V>
- java.lang.Object
-
- org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter<K,V>
-
- Type Parameters:
K - the key type.
V - the value type.
- All Implemented Interfaces:
ConsumerSeekAware
- Direct Known Subclasses:
BatchMessagingMessageListenerAdapter, RecordMessagingMessageListenerAdapter
public abstract class MessagingMessageListenerAdapter<K,V> extends java.lang.Object implements ConsumerSeekAware
An abstract MessageListener adapter providing the necessary infrastructure to extract the payload of a Message.
-
-
Nested Class Summary
- static class MessagingMessageListenerAdapter.ReplyExpressionRoot
Root object for reply expression evaluation.
-
Nested classes/interfaces inherited from interface org.springframework.kafka.listener.ConsumerSeekAware
ConsumerSeekAware.ConsumerSeekCallback
-
-
Field Summary
- protected org.springframework.core.log.LogAccessor logger
- protected static org.springframework.messaging.Message<KafkaNull> NULL_MESSAGE
Message used when no conversion is needed.
-
Constructor Summary
- MessagingMessageListenerAdapter(java.lang.Object bean, java.lang.reflect.Method method)
-
Method Summary
- protected java.lang.String createMessagingErrorMessage(java.lang.String description, java.lang.Object payload)
- protected java.lang.reflect.Type determineInferredType(java.lang.reflect.Method method)
Subclasses can override this method to use a different mechanism to determine the target type of the payload conversion.
- protected RecordMessageConverter getMessageConverter()
Return the MessagingMessageConverter for this listener, which is able to convert a Message.
- protected ReplyHeadersConfigurer getReplyHeadersConfigurer()
Return the reply configurer.
- protected java.lang.reflect.Type getType()
Returns the inferred type for conversion or, if null, the fallbackType.
- protected void handleResult(java.lang.Object resultArg, java.lang.Object request, java.lang.Object source)
Handle the given result object returned from the listener method, sending a response message to the SendTo topic.
- protected java.lang.Object invokeHandler(java.lang.Object data, Acknowledgment acknowledgment, org.springframework.messaging.Message<?> message, org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
Invoke the handler, wrapping any exception in a ListenerExecutionFailedException with a dedicated error message.
- protected boolean isConsumerRecordList()
- boolean isConsumerRecords()
- boolean isConversionNeeded()
- protected boolean isMessageList()
- protected boolean isSplitIterables()
When true, Iterable return results will be split into discrete records.
- void onIdleContainer(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback)
If the container is configured to emit idle container events, this method is called when the container idle event is emitted, allowing a seek operation.
- void onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback)
When using group management, called when partition assignments change.
- void onPartitionsRevoked(java.util.Collection<org.apache.kafka.common.TopicPartition> partitions)
When using group management, called when partition assignments are revoked.
- void registerSeekCallback(ConsumerSeekAware.ConsumerSeekCallback callback)
Register the callback to use when seeking at some arbitrary time.
- protected void sendResponse(java.lang.Object result, java.lang.String topic, java.lang.Object source, boolean returnTypeMessage)
Send the result to the topic.
- void setBeanResolver(org.springframework.expression.BeanResolver beanResolver)
Set a bean resolver for runtime SpEL expressions.
- void setFallbackType(java.lang.Class<?> fallbackType)
Set a fallback type to use when using a type-aware message converter and this adapter cannot determine the inferred type from the method.
- void setHandlerMethod(HandlerAdapter handlerMethod)
Set the HandlerAdapter to use to invoke the method processing an incoming ConsumerRecord.
- void setMessageConverter(RecordMessageConverter messageConverter)
Set the MessageConverter.
- void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer)
Set a configurer which will be invoked when creating a reply message.
- void setReplyTemplate(KafkaTemplate<?,?> replyTemplate)
Set the template to use to send any result from the method invocation.
- void setReplyTopic(java.lang.String replyTopicParam)
Set the topic to which to send any result from the method invocation.
- void setSplitIterables(boolean splitIterables)
Set to false to disable splitting Iterable reply values into separate records.
- protected org.springframework.messaging.Message<?> toMessagingMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> record, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.springframework.kafka.listener.ConsumerSeekAware
unregisterSeekCallback
-
-
-
-
Field Detail
-
NULL_MESSAGE
protected static final org.springframework.messaging.Message<KafkaNull> NULL_MESSAGE
Message used when no conversion is needed.
-
logger
protected final org.springframework.core.log.LogAccessor logger
-
-
Method Detail
-
setMessageConverter
public void setMessageConverter(RecordMessageConverter messageConverter)
Set the MessageConverter.
- Parameters:
messageConverter - the converter.
-
getMessageConverter
protected final RecordMessageConverter getMessageConverter()
Return the MessagingMessageConverter for this listener, which is able to convert a Message.
- Returns:
the MessagingMessageConverter for this listener, which is able to convert a Message.
-
getType
protected java.lang.reflect.Type getType()
Returns the inferred type for conversion or, if null, the fallbackType.
- Returns:
the type.
-
setFallbackType
public void setFallbackType(java.lang.Class<?> fallbackType)
Set a fallback type to use when using a type-aware message converter and this adapter cannot determine the inferred type from the method. An example of a type-aware message converter is the StringJsonMessageConverter. Defaults to Object.
- Parameters:
fallbackType - the type.
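The relationship between getType() and setFallbackType described above can be sketched in plain Java. This is a minimal illustration, not the adapter's actual code: the class FallbackTypeSketch and its fields are hypothetical stand-ins, and only the "inferred type, else fallback, default Object" rule is taken from the descriptions on this page.

```java
import java.lang.reflect.Type;

public class FallbackTypeSketch {

    // Hypothetical stand-in for the type inferred from the listener method (may be null).
    private final Type inferredType;

    // Documented default for the fallback type is Object.
    private Class<?> fallbackType = Object.class;

    public FallbackTypeSketch(Type inferredType) {
        this.inferredType = inferredType;
    }

    public void setFallbackType(Class<?> fallbackType) {
        this.fallbackType = fallbackType;
    }

    // Follows the documented contract: the inferred type or, if null, the fallback type.
    public Type getType() {
        return this.inferredType != null ? this.inferredType : this.fallbackType;
    }

    public static void main(String[] args) {
        FallbackTypeSketch noInference = new FallbackTypeSketch(null);
        System.out.println(noInference.getType());
        noInference.setFallbackType(String.class);
        System.out.println(noInference.getType());
        System.out.println(new FallbackTypeSketch(Integer.class).getType());
    }
}
```

The fallback only matters when nothing could be inferred; an inferred type always wins in this sketch.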
-
setHandlerMethod
public void setHandlerMethod(HandlerAdapter handlerMethod)
Set the HandlerAdapter to use to invoke the method processing an incoming ConsumerRecord.
- Parameters:
handlerMethod - HandlerAdapter instance.
-
isConsumerRecordList
protected boolean isConsumerRecordList()
-
isConsumerRecords
public boolean isConsumerRecords()
-
isConversionNeeded
public boolean isConversionNeeded()
-
setReplyTopic
public void setReplyTopic(java.lang.String replyTopicParam)
Set the topic to which to send any result from the method invocation. May be a SpEL expression !{...} evaluated at runtime.
- Parameters:
replyTopicParam - the topic or expression.
- Since:
2.0
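As a rough illustration of the two forms a reply topic can take (a literal topic name or a !{...} runtime expression), the sketch below only distinguishes them by their delimiters. The class ReplyTopicSketch and its methods are hypothetical, the sketch assumes the whole value is a single expression, and it does not evaluate SpEL.

```java
public class ReplyTopicSketch {

    // Delimiters taken from the !{...} form mentioned above.
    static final String EXPRESSION_PREFIX = "!{";
    static final String EXPRESSION_SUFFIX = "}";

    // Hypothetical check: true when the whole value is wrapped in !{ ... }.
    public static boolean isRuntimeExpression(String replyTopic) {
        return replyTopic != null
                && replyTopic.length() >= EXPRESSION_PREFIX.length() + EXPRESSION_SUFFIX.length()
                && replyTopic.startsWith(EXPRESSION_PREFIX)
                && replyTopic.endsWith(EXPRESSION_SUFFIX);
    }

    // Hypothetical helper: strips the delimiters, or returns the literal topic unchanged.
    public static String expressionBody(String replyTopic) {
        if (!isRuntimeExpression(replyTopic)) {
            return replyTopic;
        }
        return replyTopic.substring(EXPRESSION_PREFIX.length(),
                replyTopic.length() - EXPRESSION_SUFFIX.length());
    }

    public static void main(String[] args) {
        System.out.println(isRuntimeExpression("replies"));
        System.out.println(isRuntimeExpression("!{request.value()}"));
        System.out.println(expressionBody("!{request.value()}"));
    }
}
```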
-
setReplyTemplate
public void setReplyTemplate(KafkaTemplate<?,?> replyTemplate)
Set the template to use to send any result from the method invocation.
- Parameters:
replyTemplate - the template.
- Since:
2.0
-
setBeanResolver
public void setBeanResolver(org.springframework.expression.BeanResolver beanResolver)
Set a bean resolver for runtime SpEL expressions. Also configures the evaluation context with a standard type converter and map accessor.
- Parameters:
beanResolver - the resolver.
- Since:
2.0
-
isMessageList
protected boolean isMessageList()
-
getReplyHeadersConfigurer
protected ReplyHeadersConfigurer getReplyHeadersConfigurer()
Return the reply configurer.
- Returns:
the configurer.
- Since:
2.2
- See Also:
setReplyHeadersConfigurer(ReplyHeadersConfigurer)
-
setReplyHeadersConfigurer
public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer)
Set a configurer which will be invoked when creating a reply message.
- Parameters:
replyHeadersConfigurer - the configurer.
- Since:
2.2
-
isSplitIterables
protected boolean isSplitIterables()
When true, Iterable return results will be split into discrete records.
- Returns:
true to split.
- Since:
2.3.5
-
setSplitIterables
public void setSplitIterables(boolean splitIterables)
Set to false to disable splitting Iterable reply values into separate records.
- Parameters:
splitIterables - false to disable; default true.
- Since:
2.3.5
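The effect of the splitIterables flag can be sketched as follows. This is an illustrative, self-contained example rather than the adapter's implementation: SplitIterablesSketch and toReplyValues are hypothetical names, and each element of the returned list stands for one reply record that would be sent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitIterablesSketch {

    // Hypothetical helper: returns the values that would each become one reply record.
    public static List<Object> toReplyValues(Object result, boolean splitIterables) {
        List<Object> values = new ArrayList<>();
        if (splitIterables && result instanceof Iterable) {
            // Splitting enabled: every element becomes its own record.
            for (Object element : (Iterable<?>) result) {
                values.add(element);
            }
        }
        else {
            // Splitting disabled, or not an Iterable: the result is sent as a single record.
            values.add(result);
        }
        return values;
    }

    public static void main(String[] args) {
        List<String> reply = Arrays.asList("a", "b", "c");
        System.out.println(toReplyValues(reply, true).size());
        System.out.println(toReplyValues(reply, false).size());
    }
}
```

With splitting disabled, the whole collection is treated as one value, so whatever serializes the reply would need to handle the collection type itself.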
-
registerSeekCallback
public void registerSeekCallback(ConsumerSeekAware.ConsumerSeekCallback callback)
Description copied from interface: ConsumerSeekAware
Register the callback to use when seeking at some arbitrary time. When used with a ConcurrentMessageListenerContainer, or with the same listener instance in multiple containers, listeners should store the callback in a ThreadLocal.
- Specified by:
registerSeekCallback in interface ConsumerSeekAware
- Parameters:
callback - the callback.
-
onPartitionsAssigned
public void onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback)
Description copied from interface: ConsumerSeekAware
When using group management, called when partition assignments change.
- Specified by:
onPartitionsAssigned in interface ConsumerSeekAware
- Parameters:
assignments - the new assignments and their current offsets.
callback - the callback to perform an initial seek after assignment.
-
onPartitionsRevoked
public void onPartitionsRevoked(java.util.Collection<org.apache.kafka.common.TopicPartition> partitions)
Description copied from interface: ConsumerSeekAware
When using group management, called when partition assignments are revoked. Listeners should discard any callback saved from ConsumerSeekAware.registerSeekCallback(ConsumerSeekCallback) on this thread.
- Specified by:
onPartitionsRevoked in interface ConsumerSeekAware
- Parameters:
partitions - the partitions that have been revoked.
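The ThreadLocal guidance in registerSeekCallback and onPartitionsRevoked can be sketched on the listener side as below. This is an assumption-laden illustration: SeekCallback is a hypothetical stand-in for ConsumerSeekAware.ConsumerSeekCallback, and the class does not implement the real interface, so it compiles without the library.

```java
public class ThreadLocalSeekCallbackSketch {

    // Hypothetical stand-in for ConsumerSeekAware.ConsumerSeekCallback.
    public interface SeekCallback {
        void seek(String topic, int partition, long offset);
    }

    // One callback per consumer thread, so concurrent containers do not overwrite each other.
    private final ThreadLocal<SeekCallback> callbackForThread = new ThreadLocal<>();

    public void registerSeekCallback(SeekCallback callback) {
        this.callbackForThread.set(callback);
    }

    // Discards the callback saved on this thread, as recommended when partitions are revoked.
    public void onPartitionsRevoked() {
        this.callbackForThread.remove();
    }

    public SeekCallback currentCallback() {
        return this.callbackForThread.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadLocalSeekCallbackSketch listener = new ThreadLocalSeekCallbackSketch();
        listener.registerSeekCallback((topic, partition, offset) -> { });

        // A different thread has no callback of its own.
        Thread other = new Thread(() ->
                System.out.println("other thread has callback: " + (listener.currentCallback() != null)));
        other.start();
        other.join();

        System.out.println("registering thread has callback: " + (listener.currentCallback() != null));
        listener.onPartitionsRevoked();
        System.out.println("after revoke: " + (listener.currentCallback() != null));
    }
}
```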
-
onIdleContainer
public void onIdleContainer(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback)
Description copied from interface: ConsumerSeekAware
If the container is configured to emit idle container events, this method is called when the container idle event is emitted, allowing a seek operation.
- Specified by:
onIdleContainer in interface ConsumerSeekAware
- Parameters:
assignments - the new assignments and their current offsets.
callback - the callback to perform a seek.
-
toMessagingMessage
protected org.springframework.messaging.Message<?> toMessagingMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> record, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
-
invokeHandler
protected final java.lang.Object invokeHandler(java.lang.Object data, Acknowledgment acknowledgment, org.springframework.messaging.Message<?> message, org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
Invoke the handler, wrapping any exception in a ListenerExecutionFailedException with a dedicated error message.
- Parameters:
data - the data to process during invocation.
acknowledgment - the acknowledgment to use if any.
message - the message to process.
consumer - the consumer.
- Returns:
the result of invocation.
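The wrap-and-rethrow pattern described for invokeHandler can be sketched without the library. Everything here is hypothetical: ListenerFailedException stands in for ListenerExecutionFailedException, errorMessage loosely mirrors the role of createMessagingErrorMessage, and the message format is invented for the example.

```java
import java.util.function.Supplier;

public class InvokeHandlerSketch {

    // Hypothetical stand-in for ListenerExecutionFailedException.
    public static class ListenerFailedException extends RuntimeException {
        public ListenerFailedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical message format combining a description with the payload.
    static String errorMessage(String description, Object payload) {
        return description + " [payload=" + payload + "]";
    }

    // Invokes the handler and wraps any runtime failure with a dedicated message, keeping the cause.
    public static Object invoke(Supplier<Object> handler, Object payload) {
        try {
            return handler.get();
        }
        catch (RuntimeException ex) {
            throw new ListenerFailedException(
                    errorMessage("Listener method threw exception", payload), ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(invoke(() -> "handled", "payload-1"));
        try {
            invoke(() -> {
                throw new IllegalStateException("boom");
            }, "payload-2");
        }
        catch (ListenerFailedException ex) {
            System.out.println(ex.getMessage() + " caused by " + ex.getCause());
        }
    }
}
```

Keeping the original exception as the cause lets an error handler further up inspect what actually failed.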
-
handleResult
protected void handleResult(java.lang.Object resultArg, java.lang.Object request, java.lang.Object source)
Handle the given result object returned from the listener method, sending a response message to the SendTo topic.
- Parameters:
resultArg - the result object to handle (never null)
request - the original request message
source - the source data for the method invocation - e.g. o.s.messaging.Message<?>; may be null
-
sendResponse
protected void sendResponse(java.lang.Object result, java.lang.String topic, @Nullable java.lang.Object source, boolean returnTypeMessage)
Send the result to the topic.
- Parameters:
result - the result.
topic - the topic.
source - the source (input).
returnTypeMessage - true if we are returning message(s).
- Since:
2.1.3
-
createMessagingErrorMessage
protected final java.lang.String createMessagingErrorMessage(java.lang.String description, java.lang.Object payload)
-
determineInferredType
protected java.lang.reflect.Type determineInferredType(java.lang.reflect.Method method)
Subclasses can override this method to use a different mechanism to determine the target type of the payload conversion.
- Parameters:
method - the method.
- Returns:
the type.
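One possible "different mechanism" for determining the target type is sketched below using only java.lang.reflect. The rule used here (take the first parameter, and unwrap a single type argument such as the element type of a List) is an assumption made for illustration and is not claimed to match the adapter's default behavior; InferredTypeSketch, inferFor and the sample listener methods are hypothetical.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class InferredTypeSketch {

    // Assumed rule: use the first parameter; for a single-argument generic, use its type argument.
    public static Type determineInferredType(Method method) {
        Type[] parameterTypes = method.getGenericParameterTypes();
        if (parameterTypes.length == 0) {
            return null; // nothing to infer; a caller could then rely on a fallback type
        }
        Type first = parameterTypes[0];
        if (first instanceof ParameterizedType) {
            Type[] typeArguments = ((ParameterizedType) first).getActualTypeArguments();
            if (typeArguments.length == 1) {
                return typeArguments[0];
            }
        }
        return first;
    }

    // Sample listener methods used only for the demonstration.
    public void onString(String payload) { }

    public void onBatch(List<Integer> payloads) { }

    // Convenience lookup that hides the checked reflection exception.
    public static Type inferFor(String methodName, Class<?> parameterType) {
        try {
            return determineInferredType(InferredTypeSketch.class.getMethod(methodName, parameterType));
        }
        catch (NoSuchMethodException ex) {
            throw new IllegalArgumentException("No such sample method: " + methodName, ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(inferFor("onString", String.class));
        System.out.println(inferFor("onBatch", List.class));
    }
}
```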
-
-