Type Parameters:
K - the key type.
V - the value type.

public abstract class MessagingMessageListenerAdapter<K,V> extends java.lang.Object implements ConsumerSeekAware

MessageListener adapter providing the necessary infrastructure
to extract the payload of a Message.

Nested classes/interfaces inherited from interface ConsumerSeekAware:
ConsumerSeekAware.ConsumerSeekCallback

| Modifier and Type | Field and Description |
|---|---|
| protected org.apache.commons.logging.Log | logger |
| Constructor and Description |
|---|
| MessagingMessageListenerAdapter(java.lang.Object bean, java.lang.reflect.Method method) |
| Modifier and Type | Method and Description |
|---|---|
| protected java.lang.reflect.Type | determineInferredType(java.lang.reflect.Method method)<br>Subclasses can override this method to use a different mechanism to determine the target type of the payload conversion. |
| protected RecordMessageConverter | getMessageConverter()<br>Return the MessagingMessageConverter for this listener, being able to convert Message. |
| protected java.lang.reflect.Type | getType()<br>Returns the inferred type for conversion or, if null, the fallbackType. |
| protected java.lang.Object | invokeHandler(java.lang.Object data, Acknowledgment acknowledgment, org.springframework.messaging.Message<?> message)<br>Invoke the handler, wrapping any exception in a ListenerExecutionFailedException with a dedicated error message. |
| protected boolean | isConsumerRecordList() |
| protected boolean | isMessageList() |
| void | onIdleContainer(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback)<br>If the container is configured to emit idle container events, this method is called when the container idle event is emitted, allowing a seek operation. |
| void | onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments, ConsumerSeekAware.ConsumerSeekCallback callback)<br>When using group management, called when partition assignments change. |
| void | registerSeekCallback(ConsumerSeekAware.ConsumerSeekCallback callback)<br>Register the callback to use when seeking at some arbitrary time. |
| void | setFallbackType(java.lang.Class<?> fallbackType)<br>Set a fallback type to use when using a type-aware message converter and this adapter cannot determine the inferred type from the method. |
| void | setHandlerMethod(HandlerAdapter handlerMethod)<br>Set the HandlerAdapter to use to invoke the method processing an incoming ConsumerRecord. |
| void | setMessageConverter(RecordMessageConverter messageConverter)<br>Set the MessageConverter. |
| protected org.springframework.messaging.Message<?> | toMessagingMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> record, Acknowledgment acknowledgment) |

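
The summary above says invokeHandler wraps any exception in a ListenerExecutionFailedException with a dedicated error message. The following is a minimal sketch of that wrapping idea using plain reflection only. It is not the adapter's implementation: SketchListenerExecutionFailedException, SketchListenerBean and InvokeHandlerSketch are hypothetical names introduced for illustration, and the message format is an assumption.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical stand-in for ListenerExecutionFailedException; not the Spring Kafka class.
class SketchListenerExecutionFailedException extends RuntimeException {
    SketchListenerExecutionFailedException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical listener bean used only for this illustration.
class SketchListenerBean {
    public String handle(String payload) {
        if (payload.isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
        return payload.toUpperCase();
    }
}

class InvokeHandlerSketch {

    // Invoke the method reflectively and wrap any failure in a single exception
    // type whose message names the method and the data being processed.
    static Object invoke(Object bean, Method method, Object data) {
        try {
            return method.invoke(bean, data);
        } catch (InvocationTargetException e) {
            // The handler itself threw: expose the original exception as the cause.
            throw new SketchListenerExecutionFailedException(
                    "Listener method '" + method.getName()
                            + "' threw an exception for data: " + data,
                    e.getCause());
        } catch (IllegalAccessException | IllegalArgumentException e) {
            // The call could not be made at all (access or argument mismatch).
            throw new SketchListenerExecutionFailedException(
                    "Listener method '" + method.getName()
                            + "' could not be invoked with data: " + data,
                    e);
        }
    }

    // Convenience lookup so callers do not handle checked reflection exceptions.
    static Method handlerMethod() {
        try {
            return SketchListenerBean.class.getMethod("handle", String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling InvokeHandlerSketch.invoke(new SketchListenerBean(), InvokeHandlerSketch.handlerMethod(), "") raises the wrapper exception with the original IllegalArgumentException as its cause, so callers deal with one exception type regardless of what the handler threw.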
public MessagingMessageListenerAdapter(java.lang.Object bean,
java.lang.reflect.Method method)

public void setMessageConverter(RecordMessageConverter messageConverter)
Set the MessageConverter.
Parameters:
messageConverter - the converter.

protected final RecordMessageConverter getMessageConverter()
Return the MessagingMessageConverter for this listener,
being able to convert Message.
Returns:
the MessagingMessageConverter for this listener,
being able to convert Message.

protected java.lang.reflect.Type getType()
Returns the inferred type for conversion or, if null, the
fallbackType.

public void setFallbackType(java.lang.Class<?> fallbackType)
Set a fallback type to use when using a type-aware message converter and this
adapter cannot determine the inferred type from the method. An example of a
type-aware message converter is the StringJsonMessageConverter. Defaults to
Object.
Parameters:
fallbackType - the type.

public void setHandlerMethod(HandlerAdapter handlerMethod)
Set the HandlerAdapter to use to invoke the method
processing an incoming ConsumerRecord.
Parameters:
handlerMethod - HandlerAdapter instance.

protected boolean isConsumerRecordList()

protected boolean isMessageList()
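
The relationship between getType() and setFallbackType(java.lang.Class<?>) described above (return the inferred type, or the fallback type when nothing was inferred, with Object as the default) can be sketched as follows. FallbackTypeSketch and its setInferredType method are hypothetical names used only for this illustration; the real adapter's fields are not shown in this page.

```java
import java.lang.reflect.Type;

// Hypothetical illustration of "inferred type, else fallback type".
class FallbackTypeSketch {

    // null means no type could be inferred from the listener method
    private Type inferredType;

    // the documentation above states that the fallback defaults to Object
    private Class<?> fallbackType = Object.class;

    // Hypothetical setter standing in for whatever inference produced the type.
    void setInferredType(Type inferredType) {
        this.inferredType = inferredType;
    }

    void setFallbackType(Class<?> fallbackType) {
        this.fallbackType = fallbackType;
    }

    // Prefer the inferred type; use the fallback only when none is available.
    Type getType() {
        return this.inferredType != null ? this.inferredType : this.fallbackType;
    }
}
```

With no inferred type, getType() yields Object.class until setFallbackType is called; once a type has been inferred, the fallback is ignored.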

public void registerSeekCallback(ConsumerSeekAware.ConsumerSeekCallback callback)
Description copied from interface: ConsumerSeekAware
Register the callback to use when seeking at some arbitrary time. When used with a
ConcurrentMessageListenerContainer or the same listener instance in multiple
containers, listeners should store the callback in a ThreadLocal.
Specified by:
registerSeekCallback in interface ConsumerSeekAware
Parameters:
callback - the callback.

public void onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments,
ConsumerSeekAware.ConsumerSeekCallback callback)
Description copied from interface: ConsumerSeekAware
When using group management, called when partition assignments change.
Specified by:
onPartitionsAssigned in interface ConsumerSeekAware
Parameters:
assignments - the new assignments and their current offsets.
callback - the callback to perform an initial seek after assignment.

public void onIdleContainer(java.util.Map<org.apache.kafka.common.TopicPartition,java.lang.Long> assignments,
ConsumerSeekAware.ConsumerSeekCallback callback)
Description copied from interface: ConsumerSeekAware
If the container is configured to emit idle container events, this method is called
when the container idle event is emitted, allowing a seek operation.
Specified by:
onIdleContainer in interface ConsumerSeekAware
Parameters:
assignments - the new assignments and their current offsets.
callback - the callback to perform a seek.

protected org.springframework.messaging.Message<?> toMessagingMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> record,
Acknowledgment acknowledgment)
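
The registerSeekCallback description above recommends keeping the callback in a ThreadLocal when one listener instance is shared by several consumer threads. A minimal sketch of that pattern follows, using only the JDK. SketchSeekCallback is a hypothetical stand-in for ConsumerSeekAware.ConsumerSeekCallback, and ThreadLocalSeekListener, ThreadLocalSeekDemo and the rewind method are names invented for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for ConsumerSeekAware.ConsumerSeekCallback.
interface SketchSeekCallback {
    void seek(String topic, int partition, long offset);
}

// A listener shared by several consumer threads; each thread keeps its own callback.
class ThreadLocalSeekListener {

    private final ThreadLocal<SketchSeekCallback> callbackForThread = new ThreadLocal<>();

    // Assumed to be called on the consumer thread that owns the callback.
    void registerSeekCallback(SketchSeekCallback callback) {
        this.callbackForThread.set(callback);
    }

    // Seeks using the callback registered on the calling thread.
    void rewind(String topic, int partition, long offset) {
        SketchSeekCallback callback = this.callbackForThread.get();
        if (callback == null) {
            throw new IllegalStateException("No seek callback registered on this thread");
        }
        callback.seek(topic, partition, offset);
    }
}

class ThreadLocalSeekDemo {

    // Runs two simulated consumer threads against one shared listener and
    // returns a sorted record of which callback handled which seek.
    static List<String> run() {
        ThreadLocalSeekListener listener = new ThreadLocalSeekListener();
        List<String> seen = Collections.synchronizedList(new ArrayList<String>());
        Thread[] threads = new Thread[2];
        for (int i = 0; i < threads.length; i++) {
            final int partition = i;
            threads[i] = new Thread(() -> {
                listener.registerSeekCallback((topic, p, offset) ->
                        seen.add("callback" + partition + " -> " + topic + "-" + p + "@" + offset));
                listener.rewind("demo", partition, 0L);
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        List<String> sorted = new ArrayList<>(seen);
        Collections.sort(sorted); // thread completion order is not deterministic
        return sorted;
    }
}
```

Because each thread reads only its own ThreadLocal slot, a seek requested on one consumer thread never goes through the callback that belongs to another thread.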

protected final java.lang.Object invokeHandler(java.lang.Object data,
Acknowledgment acknowledgment,
org.springframework.messaging.Message<?> message)
Invoke the handler, wrapping any exception in a
ListenerExecutionFailedException
with a dedicated error message.
Parameters:
data - the data to process during invocation.
acknowledgment - the acknowledgment to use if any.
message - the message to process.

protected java.lang.reflect.Type determineInferredType(java.lang.reflect.Method method)
Subclasses can override this method to use a different mechanism to determine
the target type of the payload conversion.
Parameters:
method - the method.
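
Since determineInferredType(java.lang.reflect.Method) is meant to be overridden, it may help to see one possible mechanism for deriving a payload type from a method. The sketch below is an assumption and not the adapter's built-in logic: it takes the first parameter's generic type and unwraps a java.util.List<T> to T. InferredTypeSketch, its nested Handlers class, and the helper names are hypothetical.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

class InferredTypeSketch {

    // Hypothetical listener methods used as inputs for the inference.
    static class Handlers {
        public void single(Integer payload) { }
        public void batch(List<String> payloads) { }
        public void none() { }
    }

    // One possible inference rule: use the first parameter's generic type,
    // unwrapping List<T> to T; return null when the method has no parameters.
    static Type inferPayloadType(Method method) {
        Type[] parameterTypes = method.getGenericParameterTypes();
        if (parameterTypes.length == 0) {
            return null;
        }
        Type first = parameterTypes[0];
        if (first instanceof ParameterizedType) {
            ParameterizedType parameterized = (ParameterizedType) first;
            Type[] arguments = parameterized.getActualTypeArguments();
            if (parameterized.getRawType() == List.class && arguments.length == 1) {
                return arguments[0];
            }
        }
        return first;
    }

    // Convenience lookup so callers do not handle checked reflection exceptions.
    static Type inferFor(String methodName, Class<?>... parameterTypes) {
        try {
            return inferPayloadType(Handlers.class.getMethod(methodName, parameterTypes));
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A null result here corresponds to the case where no type can be inferred, which is when the fallback type described under setFallbackType would apply.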