K - the key type.V - the value type.public abstract class MessagingMessageListenerAdapter<K,V> extends java.lang.Object implements ConsumerSeekAware
MessageListener adapter
providing the necessary infrastructure to extract the payload of a
Message.| Modifier and Type | Class and Description |
|---|---|
static class |
MessagingMessageListenerAdapter.ReplyExpressionRoot
Root object for reply expression evaluation.
|
ConsumerSeekAware.ConsumerSeekCallback| Modifier and Type | Field and Description |
|---|---|
protected org.springframework.core.log.LogAccessor |
logger |
protected static org.springframework.messaging.Message<KafkaNull> |
NULL_MESSAGE
Message used when no conversion is needed.
|
| Constructor and Description |
|---|
MessagingMessageListenerAdapter(java.lang.Object bean,
java.lang.reflect.Method method) |
| Modifier and Type | Method and Description |
|---|---|
protected java.lang.String |
createMessagingErrorMessage(java.lang.String description,
java.lang.Object payload) |
protected java.lang.reflect.Type |
determineInferredType(java.lang.reflect.Method method)
Subclasses can override this method to use a different mechanism to determine
the target type of the payload conversion.
|
protected RecordMessageConverter |
getMessageConverter()
Return the
MessagingMessageConverter for this listener,
being able to convert Message. |
protected ReplyHeadersConfigurer |
getReplyHeadersConfigurer()
Return the reply configurer.
|
protected java.lang.reflect.Type |
getType()
Returns the inferred type for conversion or, if null, the
fallbackType. |
protected void |
handleResult(java.lang.Object resultArg,
java.lang.Object request,
java.lang.Object source)
Handle the given result object returned from the listener method, sending a
response message to the SendTo topic.
|
protected java.lang.Object |
invokeHandler(java.lang.Object data,
Acknowledgment acknowledgment,
org.springframework.messaging.Message<?> message,
org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
Invoke the handler, wrapping any exception to a
ListenerExecutionFailedException
with a dedicated error message. |
protected boolean |
isConsumerRecordList() |
boolean |
isConsumerRecords() |
boolean |
isConversionNeeded() |
protected boolean |
isMessageList() |
protected boolean |
isSplitIterables()
When true,
Iterable return results will be split into discrete records. |
void |
onIdleContainer(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments,
ConsumerSeekAware.ConsumerSeekCallback callback)
If the container is configured to emit idle container events, this method is called
when the container idle event is emitted - allowing a seek operation.
|
void |
onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments,
ConsumerSeekAware.ConsumerSeekCallback callback)
When using group management, called when partition assignments change.
|
void |
onPartitionsRevoked(java.util.Collection<org.apache.kafka.common.TopicPartition> partitions)
When using group management, called when partition assignments are revoked.
|
void |
registerSeekCallback(ConsumerSeekAware.ConsumerSeekCallback callback)
Register the callback to use when seeking at some arbitrary time.
|
protected void |
sendResponse(java.lang.Object result,
java.lang.String topic,
java.lang.Object source,
boolean returnTypeMessage)
Send the result to the topic.
|
void |
setBeanResolver(org.springframework.expression.BeanResolver beanResolver)
Set a bean resolver for runtime SpEL expressions.
|
void |
setFallbackType(java.lang.Class<?> fallbackType)
Set a fallback type to use when using a type-aware message converter and this
adapter cannot determine the inferred type from the method.
|
void |
setHandlerMethod(HandlerAdapter handlerMethod)
Set the
HandlerAdapter to use to invoke the method
processing an incoming ConsumerRecord. |
void |
setMessageConverter(RecordMessageConverter messageConverter)
Set the MessageConverter.
|
void |
setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer)
Set a configurer which will be invoked when creating a reply message.
|
void |
setReplyTemplate(KafkaTemplate<?,?> replyTemplate)
Set the template to use to send any result from the method invocation.
|
void |
setReplyTopic(java.lang.String replyTopicParam)
Set the topic to which to send any result from the method invocation.
|
void |
setSplitIterables(boolean splitIterables)
Set to false to disable splitting
Iterable reply values into separate
records. |
protected org.springframework.messaging.Message<?> |
toMessagingMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> record,
Acknowledgment acknowledgment,
org.apache.kafka.clients.consumer.Consumer<?,?> consumer) |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitunregisterSeekCallbackprotected static final org.springframework.messaging.Message<KafkaNull> NULL_MESSAGE
protected final org.springframework.core.log.LogAccessor logger
public MessagingMessageListenerAdapter(java.lang.Object bean,
java.lang.reflect.Method method)
public void setMessageConverter(RecordMessageConverter messageConverter)
messageConverter - the converter.protected final RecordMessageConverter getMessageConverter()
MessagingMessageConverter for this listener,
being able to convert Message.MessagingMessageConverter for this listener,
being able to convert Message.protected java.lang.reflect.Type getType()
fallbackType.public void setFallbackType(java.lang.Class<?> fallbackType)
StringJsonMessageConverter. Defaults to
Object.fallbackType - the type.public void setHandlerMethod(HandlerAdapter handlerMethod)
HandlerAdapter to use to invoke the method
processing an incoming ConsumerRecord.handlerMethod - HandlerAdapter instance.protected boolean isConsumerRecordList()
public boolean isConsumerRecords()
public boolean isConversionNeeded()
public void setReplyTopic(java.lang.String replyTopicParam)
!{...} evaluated at runtime.replyTopicParam - the topic or expression.public void setReplyTemplate(KafkaTemplate<?,?> replyTemplate)
replyTemplate - the template.public void setBeanResolver(org.springframework.expression.BeanResolver beanResolver)
beanResolver - the resolver.protected boolean isMessageList()
protected ReplyHeadersConfigurer getReplyHeadersConfigurer()
setReplyHeadersConfigurer(ReplyHeadersConfigurer)public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer)
replyHeadersConfigurer - the configurer.protected boolean isSplitIterables()
Iterable return results will be split into discrete records.public void setSplitIterables(boolean splitIterables)
Iterable reply values into separate
records.splitIterables - false to disable; default true.public void registerSeekCallback(ConsumerSeekAware.ConsumerSeekCallback callback)
ConsumerSeekAwareConcurrentMessageListenerContainer or the same listener instance in multiple
containers listeners should store the callback in a ThreadLocal.registerSeekCallback in interface ConsumerSeekAwarecallback - the callback.public void onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments,
ConsumerSeekAware.ConsumerSeekCallback callback)
ConsumerSeekAwareonPartitionsAssigned in interface ConsumerSeekAwareassignments - the new assignments and their current offsets.callback - the callback to perform an initial seek after assignment.public void onPartitionsRevoked(java.util.Collection<org.apache.kafka.common.TopicPartition> partitions)
ConsumerSeekAwareConsumerSeekAware.registerSeekCallback(ConsumerSeekCallback) on this thread.onPartitionsRevoked in interface ConsumerSeekAwarepartitions - the partitions that have been revoked.public void onIdleContainer(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments,
ConsumerSeekAware.ConsumerSeekCallback callback)
ConsumerSeekAwareonIdleContainer in interface ConsumerSeekAwareassignments - the new assignments and their current offsets.callback - the callback to perform a seek.protected org.springframework.messaging.Message<?> toMessagingMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> record, Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
protected final java.lang.Object invokeHandler(java.lang.Object data,
Acknowledgment acknowledgment,
org.springframework.messaging.Message<?> message,
org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
ListenerExecutionFailedException
with a dedicated error message.data - the data to process during invocation.acknowledgment - the acknowledgment to use if any.message - the message to process.consumer - the consumer.protected void handleResult(java.lang.Object resultArg,
java.lang.Object request,
java.lang.Object source)
resultArg - the result object to handle (never null)request - the original request messagesource - the source data for the method invocation - e.g.
o.s.messaging.Message<?>; may be nullprotected void sendResponse(java.lang.Object result,
java.lang.String topic,
@Nullable
java.lang.Object source,
boolean returnTypeMessage)
result - the result.topic - the topic.source - the source (input).returnTypeMessage - true if we are returning message(s).protected final java.lang.String createMessagingErrorMessage(java.lang.String description,
java.lang.Object payload)
protected java.lang.reflect.Type determineInferredType(java.lang.reflect.Method method)
method - the method.