Type Parameters:
K - the key type.
V - the value type.

public class KafkaProducerMessageHandler<K,V>
extends org.springframework.integration.handler.AbstractReplyProducingMessageHandler

When supplied with a ReplyingKafkaTemplate, it is used as the handler in an outbound gateway. When supplied with a simple KafkaTemplate, it is used as the handler in an outbound channel adapter.

Fields inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler:
messagingTemplate

Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport:
EXPRESSION_PARSER, logger

| Constructor and Description |
|---|
| KafkaProducerMessageHandler(org.springframework.kafka.core.KafkaTemplate<K,V> kafkaTemplate) |

| Modifier and Type | Method and Description |
|---|---|
| protected void | doInit() |
| java.lang.String | getComponentType() |
| org.springframework.kafka.core.KafkaTemplate<?,?> | getKafkaTemplate() |
| protected org.springframework.messaging.MessageChannel | getSendFailureChannel() |
| protected org.springframework.messaging.MessageChannel | getSendSuccessChannel() |
| protected java.lang.Object | handleRequestMessage(org.springframework.messaging.Message<?> message) |
| void | processSendResult(org.springframework.messaging.Message<?> message, org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord, org.springframework.util.concurrent.ListenableFuture<org.springframework.kafka.support.SendResult<K,V>> future, org.springframework.messaging.MessageChannel metadataChannel) |
| void | setErrorMessageStrategy(org.springframework.integration.support.ErrorMessageStrategy errorMessageStrategy): Set the error message strategy implementation to use when sending error messages after send failures. |
| void | setHeaderMapper(org.springframework.kafka.support.KafkaHeaderMapper headerMapper): Set the header mapper to use. |
| void | setMessageKeyExpression(org.springframework.expression.Expression messageKeyExpression) |
| void | setPartitionIdExpression(org.springframework.expression.Expression partitionIdExpression) |
| void | setReplyMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter): Set a message converter for gateway replies. |
| void | setReplyPayloadType(java.lang.reflect.Type payloadType): When using a type-aware message converter (such as StringJsonMessageConverter), set the payload type the converter should create. |
| void | setSendFailureChannel(org.springframework.messaging.MessageChannel sendFailureChannel): Set the failure channel. |
| void | setSendFailureChannelName(java.lang.String sendFailureChannelName): Set the failure channel name. |
| void | setSendSuccessChannel(org.springframework.messaging.MessageChannel sendSuccessChannel): Set the success channel. |
| void | setSendSuccessChannelName(java.lang.String sendSuccessChannelName): Set the success channel name. |
| void | setSendTimeout(long sendTimeout): Specify a timeout in milliseconds for how long this KafkaProducerMessageHandler should wait for send operation results. |
| void | setSendTimeoutExpression(org.springframework.expression.Expression sendTimeoutExpression): Specify a SpEL expression to evaluate a timeout in milliseconds for how long this KafkaProducerMessageHandler should wait for send operation results. |
| void | setSync(boolean sync): A boolean indicating if the KafkaProducerMessageHandler should wait for the send operation results or not. |
| void | setTimestampExpression(org.springframework.expression.Expression timestampExpression): Specify a SpEL expression to evaluate a timestamp that will be added in the Kafka record. |
| void | setTopicExpression(org.springframework.expression.Expression topicExpression) |

Methods inherited from class org.springframework.integration.handler.AbstractReplyProducingMessageHandler:
doInvokeAdvisedRequestHandler, getRequiresReply, handleMessageInternal, hasAdviceChain, onInit, setAdviceChain, setBeanClassLoader, setRequiresReply

Methods inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler:
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, shouldCopyRequestHeaders, shouldSplitOutput, updateNotPropagatedHeaders

Methods inherited from class org.springframework.integration.handler.AbstractMessageHandler:
configureMetrics, getActiveCount, getActiveCountLong, getDuration, getErrorCount, getErrorCountLong, getHandleCount, getHandleCountLong, getManagedName, getManagedType, getMaxDuration, getMeanDuration, getMinDuration, getOrder, getOverrides, getStandardDeviationDuration, handleMessage, isCountsEnabled, isLoggingEnabled, isStatsEnabled, onComplete, onError, onNext, onSubscribe, registerMetricsCaptor, reset, setCountsEnabled, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, setStatsEnabled

Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport:
afterPropertiesSet, extractTypeIfPossible, getApplicationContext, getApplicationContextId, getBeanFactory, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString

Methods inherited from class java.lang.Object:
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait

public void setTopicExpression(org.springframework.expression.Expression topicExpression)
public void setMessageKeyExpression(org.springframework.expression.Expression messageKeyExpression)
public void setPartitionIdExpression(org.springframework.expression.Expression partitionIdExpression)
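
The three expression setters above carry no description in this reference. As a rough sketch, assuming each expression is evaluated against the request message (so that `headers[...]` and `payload` are reachable), they might be wired as shown here. The configuration class, the channel name `toKafka`, the topic `orders`, and the header names `orderId` and `partition` are all hypothetical, and the Spring Integration Kafka dependencies are assumed to be on the classpath.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.MessageHandler;

@Configuration
public class OutboundAdapterSketch {

    @Bean
    @ServiceActivator(inputChannel = "toKafka") // hypothetical input channel
    public MessageHandler kafkaHandler(KafkaTemplate<String, String> kafkaTemplate) {
        // A plain KafkaTemplate makes this an outbound channel adapter.
        KafkaProducerMessageHandler<String, String> handler =
                new KafkaProducerMessageHandler<>(kafkaTemplate);

        SpelExpressionParser parser = new SpelExpressionParser();

        // Fixed topic name (hypothetical).
        handler.setTopicExpression(new LiteralExpression("orders"));
        // Record key taken from a message header (hypothetical header name).
        handler.setMessageKeyExpression(parser.parseExpression("headers['orderId']"));
        // Target partition taken from a message header (hypothetical header name).
        handler.setPartitionIdExpression(parser.parseExpression("headers['partition']"));
        return handler;
    }
}
```

A LiteralExpression suits values that never change, while a parsed SpEL expression lets the value vary per message.
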

public void setTimestampExpression(org.springframework.expression.Expression timestampExpression)
Specify a SpEL expression to evaluate a timestamp that will be added in the Kafka record. The resulting value should be a Long type representing epoch time in milliseconds.
Parameters:
timestampExpression - the Expression that evaluates to the record timestamp.

public void setHeaderMapper(org.springframework.kafka.support.KafkaHeaderMapper headerMapper)
Set the header mapper to use.
Parameters:
headerMapper - the mapper; can be null to disable header mapping.

public org.springframework.kafka.core.KafkaTemplate<?,?> getKafkaTemplate()
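
The timestamp expression is documented as producing a Long holding epoch time in milliseconds. The following standard-library sketch shows one way such a value could be derived from an ISO-8601 instant. The class and method names are hypothetical and are not part of this handler's API.

```java
import java.time.Instant;

final class RecordTimestampSketch {

    // Converts an ISO-8601 instant such as "2020-01-01T00:00:00Z"
    // to epoch time in milliseconds, boxed as a Long.
    static Long toEpochMillis(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    public static void main(String[] args) {
        // Print the epoch-millisecond value for a fixed instant and for "now".
        System.out.println(toEpochMillis("2020-01-01T00:00:00Z"));
        System.out.println(Instant.now().toEpochMilli());
    }
}
```

A value of this shape is what a timestamp expression would be expected to return, for example by reading it from a message header.
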

public void setSync(boolean sync)
A boolean indicating if the KafkaProducerMessageHandler should wait for the send operation results or not. Defaults to false. In sync mode a downstream send operation exception will be re-thrown.
Parameters:
sync - the send mode; async by default.

public void setSendTimeout(long sendTimeout)
Specify a timeout in milliseconds for how long this KafkaProducerMessageHandler should wait for send operation results. Defaults to 10 seconds. The timeout is applied only in sync mode. Also applies when sending to the success or failure channels.
Overrides:
setSendTimeout in class org.springframework.integration.handler.AbstractMessageProducingHandler
Parameters:
sendTimeout - the timeout to wait for the result of the send operation.

public void setSendTimeoutExpression(org.springframework.expression.Expression sendTimeoutExpression)
Specify a SpEL expression to evaluate a timeout in milliseconds for how long this KafkaProducerMessageHandler should wait for send operation results. Defaults to 10 seconds. The timeout is applied only in sync mode.
Parameters:
sendTimeoutExpression - the Expression for the timeout to wait for the result of the send operation.

public void setSendFailureChannel(org.springframework.messaging.MessageChannel sendFailureChannel)
Set the failure channel. After a send failure, an ErrorMessage will be sent to this channel with a payload of a KafkaSendFailureException with the failed message and cause.
Parameters:
sendFailureChannel - the failure channel.

public void setSendFailureChannelName(java.lang.String sendFailureChannelName)
Set the failure channel name. After a send failure, an ErrorMessage will be sent to this channel name with a payload of a KafkaSendFailureException with the failed message and cause.
Parameters:
sendFailureChannelName - the failure channel name.

public void setSendSuccessChannel(org.springframework.messaging.MessageChannel sendSuccessChannel)
Set the success channel.
Parameters:
sendSuccessChannel - the success channel.

public void setSendSuccessChannelName(java.lang.String sendSuccessChannelName)
Set the success channel name.
Parameters:
sendSuccessChannelName - the success channel name.

public void setErrorMessageStrategy(org.springframework.integration.support.ErrorMessageStrategy errorMessageStrategy)
Set the error message strategy implementation to use when sending error messages after send failures.
Parameters:
errorMessageStrategy - the implementation.

public void setReplyMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
Set a message converter for gateway replies.
Parameters:
messageConverter - the converter.
See Also:
setReplyPayloadType(Type)

public void setReplyPayloadType(java.lang.reflect.Type payloadType)
When using a type-aware message converter (such as StringJsonMessageConverter), set the payload type the converter should create. Defaults to Object.
Parameters:
payloadType - the type.
See Also:
setReplyMessageConverter(RecordMessageConverter)

public java.lang.String getComponentType()
Specified by:
getComponentType in interface org.springframework.integration.support.context.NamedComponent
Overrides:
getComponentType in class org.springframework.integration.handler.AbstractMessageHandler

protected org.springframework.messaging.MessageChannel getSendFailureChannel()
protected org.springframework.messaging.MessageChannel getSendSuccessChannel()
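
The interaction between setSync and setSendTimeout can be pictured with a small standard-library model. This is only an analogy built on CompletableFuture, not the handler's implementation: the class, enum, and method names are hypothetical, and a failure is reported as a return value here, whereas the handler in sync mode is documented to re-throw the send exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SendModeSketch {

    enum Outcome { NOT_AWAITED, ACKNOWLEDGED, FAILED, TIMED_OUT, INTERRUPTED }

    // Mirrors the documented default of 10 seconds.
    static final long DEFAULT_TIMEOUT_MILLIS = 10_000L;

    // Models what a caller observes for a pending send result.
    static Outcome send(CompletableFuture<String> pendingResult, boolean sync, long timeoutMillis) {
        if (!sync) {
            // Async mode: the caller does not wait, so the timeout is not applied.
            return Outcome.NOT_AWAITED;
        }
        try {
            // Sync mode: block for the result, up to the timeout.
            pendingResult.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return Outcome.ACKNOWLEDGED;
        } catch (ExecutionException e) {
            // The send itself failed; the real handler would re-throw here.
            return Outcome.FAILED;
        } catch (TimeoutException e) {
            return Outcome.TIMED_OUT;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return Outcome.INTERRUPTED;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));

        System.out.println(send(CompletableFuture.completedFuture("ok"), true, DEFAULT_TIMEOUT_MILLIS));
        System.out.println(send(failed, true, DEFAULT_TIMEOUT_MILLIS));
        System.out.println(send(new CompletableFuture<>(), true, 50L));
        System.out.println(send(new CompletableFuture<>(), false, DEFAULT_TIMEOUT_MILLIS));
    }
}
```

In this model a never-completed result only matters in sync mode, which matches the statement that the timeout is applied only in sync mode.
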

protected void doInit()
Overrides:
doInit in class org.springframework.integration.handler.AbstractReplyProducingMessageHandler

protected java.lang.Object handleRequestMessage(org.springframework.messaging.Message<?> message)
Specified by:
handleRequestMessage in class org.springframework.integration.handler.AbstractReplyProducingMessageHandler

public void processSendResult(org.springframework.messaging.Message<?> message,
        org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord,
        org.springframework.util.concurrent.ListenableFuture<org.springframework.kafka.support.SendResult<K,V>> future,
        org.springframework.messaging.MessageChannel metadataChannel)
        throws java.lang.InterruptedException,
               java.util.concurrent.ExecutionException
Throws:
java.lang.InterruptedException
java.util.concurrent.ExecutionException