Class DeadLetterPublishingRecoverer
- java.lang.Object
  - org.springframework.kafka.listener.DeadLetterPublishingRecoverer
- All Implemented Interfaces:
java.util.function.BiConsumer<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>, java.lang.Exception>, ConsumerAwareRecordRecoverer, ConsumerRecordRecoverer
public class DeadLetterPublishingRecoverer extends java.lang.Object implements ConsumerAwareRecordRecoverer
A ConsumerRecordRecoverer that publishes a failed record to a dead-letter topic.
- Since:
- 2.2
-
-
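A minimal sketch of how a recoverer is typically wired into a listener container error handler. It assumes Spring Kafka 2.8+, where DefaultErrorHandler accepts a ConsumerRecordRecoverer; in 2.3-2.7 the same recoverer is passed to SeekToCurrentErrorHandler instead. The back-off values are illustrative.

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Publish to <topic>.DLT once the back-off (two retries here) is exhausted.
@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<Object, Object> template) {
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    return new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
}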
Nested Class Summary
Nested Classes
- static class DeadLetterPublishingRecoverer.HeaderNames
  Container class for the names of the headers that will be added to the produced record.
-
Field Summary
Fields
- protected org.springframework.core.log.LogAccessor logger
-
Constructor Summary
Constructors
- DeadLetterPublishingRecoverer(java.util.function.Function<org.apache.kafka.clients.producer.ProducerRecord<?,?>,KafkaOperations<?,?>> templateResolver, boolean transactional, java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.TopicPartition> destinationResolver)
  Create an instance with a template resolving function that returns the KafkaOperations to publish with, a flag indicating whether publishing from this instance will be transactional, and a destination resolving function.
- DeadLetterPublishingRecoverer(java.util.Map<java.lang.Class<?>,KafkaOperations<? extends java.lang.Object,? extends java.lang.Object>> templates)
  Create an instance with the provided templates and a default destination resolving function that returns a TopicPartition based on the original topic (appended with ".DLT") from the failed record, and the same partition as the failed record.
- DeadLetterPublishingRecoverer(java.util.Map<java.lang.Class<?>,KafkaOperations<? extends java.lang.Object,? extends java.lang.Object>> templates, java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.TopicPartition> destinationResolver)
  Create an instance with the provided templates and a destination resolving function that receives the failed consumer record and the exception and returns a TopicPartition.
- DeadLetterPublishingRecoverer(KafkaOperations<? extends java.lang.Object,? extends java.lang.Object> template)
  Create an instance with the provided template and a default destination resolving function that returns a TopicPartition based on the original topic (appended with ".DLT") from the failed record, and the same partition as the failed record.
- DeadLetterPublishingRecoverer(KafkaOperations<? extends java.lang.Object,? extends java.lang.Object> template, java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.TopicPartition> destinationResolver)
  Create an instance with the provided template and a destination resolving function that receives the failed consumer record and the exception and returns a TopicPartition.
-
Method Summary
Methods
- void accept(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, org.apache.kafka.clients.consumer.Consumer<?,?> consumer, java.lang.Exception exception)
  Recover the record.
- protected org.apache.kafka.clients.producer.ProducerRecord<java.lang.Object,java.lang.Object> createProducerRecord(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, org.apache.kafka.common.TopicPartition topicPartition, org.apache.kafka.common.header.Headers headers, byte[] key, byte[] value)
  Subclasses can override this method to customize the producer record to send to the DLQ.
- protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames()
  Override this if you want different header names to be used in the sent record.
- protected void publish(org.apache.kafka.clients.producer.ProducerRecord<java.lang.Object,java.lang.Object> outRecord, KafkaOperations<java.lang.Object,java.lang.Object> kafkaTemplate)
  Override this if you want more than just logging of the send result.
- protected void send(org.apache.kafka.clients.producer.ProducerRecord<java.lang.Object,java.lang.Object> outRecord, KafkaOperations<java.lang.Object,java.lang.Object> kafkaTemplate)
  Send the record.
- void setFailIfSendResultIsError(boolean failIfSendResultIsError)
  Set to true to wait for the send result and throw an exception if the send fails.
- void setHeadersFunction(java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.header.Headers> headersFunction)
  Set a function which will be called to obtain additional headers to add to the published record.
- void setPartitionInfoTimeout(java.time.Duration partitionInfoTimeout)
  Time to wait for partition information when verifying.
- void setReplaceOriginalHeaders(boolean replaceOriginalHeaders)
  Set to false if you do not want to replace existing dead-letter headers when they are already present.
- void setRetainExceptionHeader(boolean retainExceptionHeader)
  Set to true to retain a Java-serialized DeserializationException header.
- void setThrowIfNoDestinationReturned(boolean throwIfNoDestinationReturned)
  Set to true to throw an exception if the destination resolver function returns a null TopicPartition.
- void setTimeoutBuffer(long buffer)
  Set the number of milliseconds to add to the producer configuration delivery.timeout.ms property to avoid timing out before the Kafka producer.
- void setVerifyPartition(boolean verifyPartition)
  Set to false to disable partition verification.
- void setWaitForSendResultTimeout(java.time.Duration waitForSendResultTimeout)
  Set the minimum time to wait for message sending.
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.springframework.kafka.listener.ConsumerAwareRecordRecoverer
accept
-
Constructor Detail
-
DeadLetterPublishingRecoverer
public DeadLetterPublishingRecoverer(KafkaOperations<? extends java.lang.Object,? extends java.lang.Object> template)
Create an instance with the provided template and a default destination resolving function that returns a TopicPartition based on the original topic (appended with ".DLT") from the failed record, and the same partition as the failed record. Therefore the dead-letter topic must have at least as many partitions as the original topic.
- Parameters:
  template - the KafkaOperations to use for publishing.
-
DeadLetterPublishingRecoverer
public DeadLetterPublishingRecoverer(KafkaOperations<? extends java.lang.Object,? extends java.lang.Object> template, java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.TopicPartition> destinationResolver)
Create an instance with the provided template and a destination resolving function that receives the failed consumer record and the exception and returns a TopicPartition. If the partition in the TopicPartition is less than 0, no partition is set when publishing to the topic.
- Parameters:
  template - the KafkaOperations to use for publishing.
  destinationResolver - the resolving function.
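A sketch of a custom destination resolver, under the assumption that deserialization failures should go to their own topic; the topic name is made up. Returning a partition less than 0 lets the producer pick the partition, per the contract above.

import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.support.serializer.DeserializationException;

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (record, ex) -> {
            // Route poison pills to a dedicated topic (hypothetical name);
            // partition -1 lets the producer choose the partition.
            if (ex instanceof DeserializationException
                    || ex.getCause() instanceof DeserializationException) {
                return new TopicPartition("poison-pills.DLT", -1);
            }
            return new TopicPartition(record.topic() + ".DLT", record.partition());
        });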
-
DeadLetterPublishingRecoverer
public DeadLetterPublishingRecoverer(java.util.Map<java.lang.Class<?>,KafkaOperations<? extends java.lang.Object,? extends java.lang.Object>> templates)
Create an instance with the provided templates and a default destination resolving function that returns a TopicPartition based on the original topic (appended with ".DLT") from the failed record, and the same partition as the failed record. Therefore the dead-letter topic must have at least as many partitions as the original topic. The map keys are classes, and each value is the template to use for objects (producer record values) of that type. A LinkedHashMap is recommended when there is more than one template, to ensure the map is traversed in order. To send records with a null value, add a template with the Void class as a key; otherwise the first template from the map values iterator will be used.
- Parameters:
  templates - the KafkaOperations instances to use for publishing.
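A sketch, assuming two hypothetical KafkaTemplate beans named bytesTemplate and voidTemplate: the LinkedHashMap keeps lookup order deterministic, and the Void entry handles records published with a null value.

import java.util.LinkedHashMap;
import java.util.Map;

// bytesTemplate and voidTemplate are hypothetical, externally defined beans.
Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
templates.put(byte[].class, bytesTemplate); // outgoing values that are byte[]
templates.put(Void.class, voidTemplate);    // records published with a null value
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(templates);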
-
DeadLetterPublishingRecoverer
public DeadLetterPublishingRecoverer(java.util.Map<java.lang.Class<?>,KafkaOperations<? extends java.lang.Object,? extends java.lang.Object>> templates, java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.TopicPartition> destinationResolver)
Create an instance with the provided templates and a destination resolving function that receives the failed consumer record and the exception and returns a TopicPartition. If the partition in the TopicPartition is less than 0, no partition is set when publishing to the topic. The map keys are classes, and each value is the template to use for objects (producer record values) of that type. A LinkedHashMap is recommended when there is more than one template, to ensure the map is traversed in order. To send records with a null value, add a template with the Void class as a key; otherwise the first template from the map values iterator will be used.
- Parameters:
  templates - the KafkaOperations instances to use for publishing.
  destinationResolver - the resolving function.
-
DeadLetterPublishingRecoverer
public DeadLetterPublishingRecoverer(java.util.function.Function<org.apache.kafka.clients.producer.ProducerRecord<?,?>,KafkaOperations<?,?>> templateResolver, boolean transactional, java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.TopicPartition> destinationResolver)
Create an instance with a template resolving function that receives the outgoing producer record (see the signature above) and returns the KafkaOperations to publish with, a flag indicating whether publishing from this instance will be transactional, and a destination resolving function that receives the failed consumer record and the exception and returns a TopicPartition. If the partition in the TopicPartition is less than 0, no partition is set when publishing to the topic.
- Parameters:
  templateResolver - the function that resolves the KafkaOperations to use for publishing.
  transactional - whether publishing by this instance should be transactional.
  destinationResolver - the resolving function.
- Since:
- 2.7
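A sketch of the function-based variant: the resolver picks a template by the outgoing record's value type. bytesTemplate and jsonTemplate are hypothetical beans declared as KafkaOperations<?, ?>, and false makes publishing non-transactional.

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
        outRecord -> outRecord.value() instanceof byte[] ? bytesTemplate : jsonTemplate,
        false, // publishing from this instance is not transactional
        (record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition()));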
-
-
Method Detail
-
setRetainExceptionHeader
public void setRetainExceptionHeader(boolean retainExceptionHeader)
Set to true to retain a Java-serialized DeserializationException header. By default, such headers are removed from the published record, unless both key and value deserialization exceptions occur, in which case the DLT_* headers are created from the value exception and the key exception header is retained.
- Parameters:
  retainExceptionHeader - true to retain the header.
- Since:
- 2.5
-
setHeadersFunction
public void setHeadersFunction(java.util.function.BiFunction<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,java.lang.Exception,org.apache.kafka.common.header.Headers> headersFunction)
Set a function which will be called to obtain additional headers to add to the published record.
- Parameters:
  headersFunction - the headers function.
- Since:
- 2.5.4
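For example (a sketch; the header name is made up), the Headers returned by the function are added to the outgoing record:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.internals.RecordHeaders;

// String.valueOf guards against a null exception message.
recoverer.setHeadersFunction((record, ex) -> new RecordHeaders()
        .add("x-failure-message",
                String.valueOf(ex.getMessage()).getBytes(StandardCharsets.UTF_8)));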
-
setVerifyPartition
public void setVerifyPartition(boolean verifyPartition)
Set to false to disable partition verification. When true, verify that the partition returned by the resolver actually exists. If not, set the ProducerRecord.partition() to null, allowing the producer to determine the destination partition.
- Parameters:
  verifyPartition - false to disable.
- Since:
- 2.7
- See Also:
setPartitionInfoTimeout(Duration)
-
setPartitionInfoTimeout
public void setPartitionInfoTimeout(java.time.Duration partitionInfoTimeout)
Time to wait for partition information when verifying. Default is 5 seconds.
- Parameters:
  partitionInfoTimeout - the timeout.
- Since:
- 2.7
- See Also:
setVerifyPartition(boolean)
-
setReplaceOriginalHeaders
public void setReplaceOriginalHeaders(boolean replaceOriginalHeaders)
Set to false if you do not want to replace existing dead-letter headers (for example, from an earlier failure) when they are already present on the record.
- Parameters:
  replaceOriginalHeaders - set to false not to replace.
- Since:
- 2.7
-
setThrowIfNoDestinationReturned
public void setThrowIfNoDestinationReturned(boolean throwIfNoDestinationReturned)
Set to true to throw an exception if the destination resolver function returns a null TopicPartition.
- Parameters:
  throwIfNoDestinationReturned - true to enable.
- Since:
- 2.7
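A sketch pulling together the header- and destination-related settings above; the values shown are illustrative choices, not the defaults.

recoverer.setRetainExceptionHeader(true);         // keep the serialized DeserializationException header
recoverer.setVerifyPartition(false);              // trust the resolver's partition without checking
recoverer.setReplaceOriginalHeaders(false);       // keep dead-letter headers already on the record
recoverer.setThrowIfNoDestinationReturned(true);  // throw if the resolver returns a null TopicPartition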
-
setFailIfSendResultIsError
public void setFailIfSendResultIsError(boolean failIfSendResultIsError)
Set to true to wait for the send result and throw an exception if the send fails. The recoverer waits up to the waitForSendResultTimeout for the result.
- Parameters:
  failIfSendResultIsError - true to enable.
- Since:
- 2.7
- See Also:
setWaitForSendResultTimeout(Duration)
-
setWaitForSendResultTimeout
public void setWaitForSendResultTimeout(java.time.Duration waitForSendResultTimeout)
Set the minimum time to wait for message sending. Default is the producer configuration delivery.timeout.ms plus the setTimeoutBuffer(long).
- Parameters:
  waitForSendResultTimeout - the timeout.
- Since:
- 2.7
- See Also:
setFailIfSendResultIsError(boolean),setTimeoutBuffer(long)
-
setTimeoutBuffer
public void setTimeoutBuffer(long buffer)
Set the number of milliseconds to add to the producer configuration delivery.timeout.ms property to avoid timing out before the Kafka producer. Default 5000.
- Parameters:
  buffer - the buffer.
- Since:
- 2.7
- See Also:
setWaitForSendResultTimeout(Duration)
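Taken together, the three settings above make DLT publishing fail loudly rather than best-effort; a sketch with illustrative values:

import java.time.Duration;

recoverer.setFailIfSendResultIsError(true);                    // surface DLT send failures as exceptions
recoverer.setWaitForSendResultTimeout(Duration.ofSeconds(35)); // >= delivery.timeout.ms plus the buffer
recoverer.setTimeoutBuffer(5000L);                             // matches the 5000 ms default buffer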
-
accept
public void accept(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, @Nullable org.apache.kafka.clients.consumer.Consumer<?,?> consumer, java.lang.Exception exception)
Description copied from interface: ConsumerAwareRecordRecoverer
Recover the record.
- Specified by:
  accept in interface ConsumerAwareRecordRecoverer
- Parameters:
  record - the record.
  consumer - the consumer.
  exception - the exception.
-
send
protected void send(org.apache.kafka.clients.producer.ProducerRecord<java.lang.Object,java.lang.Object> outRecord, KafkaOperations<java.lang.Object,java.lang.Object> kafkaTemplate)
Send the record.
- Parameters:
  outRecord - the record.
  kafkaTemplate - the template.
- Since:
- 2.7
-
createProducerRecord
protected org.apache.kafka.clients.producer.ProducerRecord<java.lang.Object,java.lang.Object> createProducerRecord(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, org.apache.kafka.common.TopicPartition topicPartition, org.apache.kafka.common.header.Headers headers, @Nullable byte[] key, @Nullable byte[] value)
Subclasses can override this method to customize the producer record to send to the DLQ. The default implementation simply copies the key and value from the consumer record and adds the headers. The timestamp is not set (the original timestamp is in one of the headers). IMPORTANT: if the partition in the TopicPartition is less than 0, it must be set to null in the ProducerRecord.
- Parameters:
  record - the failed record.
  topicPartition - the TopicPartition returned by the destination resolver.
  headers - the headers - original record headers plus DLT headers.
  key - the key to use instead of the consumer record key.
  value - the value to use instead of the consumer record value.
- Returns:
  - the producer record to send.
- See Also:
KafkaHeaders
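A sketch of a subclass override, assuming you want to scrub the body before it reaches the DLT (the class name and replacement payload are made up). Note the null-partition contract from the description above.

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;

public class ScrubbingDltRecoverer extends DeadLetterPublishingRecoverer {

    public ScrubbingDltRecoverer(KafkaOperations<?, ?> template) {
        super(template);
    }

    @Override
    protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
            TopicPartition topicPartition, Headers headers, byte[] key, byte[] value) {
        // Honor the contract above: a partition < 0 must become null.
        Integer partition = topicPartition.partition() < 0 ? null : topicPartition.partition();
        byte[] scrubbed = "<redacted>".getBytes(StandardCharsets.UTF_8);
        return new ProducerRecord<>(topicPartition.topic(), partition, key, scrubbed, headers);
    }
}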
-
publish
protected void publish(org.apache.kafka.clients.producer.ProducerRecord<java.lang.Object,java.lang.Object> outRecord, KafkaOperations<java.lang.Object,java.lang.Object> kafkaTemplate)
Override this if you want more than just logging of the send result.
- Parameters:
  outRecord - the record to send.
  kafkaTemplate - the template.
- Since:
- 2.2.5
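A sketch of an override that records a metric before delegating to the default behavior; meterRegistry is an assumed Micrometer MeterRegistry field on the subclass.

@Override
protected void publish(ProducerRecord<Object, Object> outRecord,
        KafkaOperations<Object, Object> kafkaTemplate) {
    // Count DLT publications per destination topic, then let the default logic send and log.
    meterRegistry.counter("dlt.publications", "topic", outRecord.topic()).increment();
    super.publish(outRecord, kafkaTemplate);
}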
-
getHeaderNames
protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames()
Override this if you want different header names to be used in the sent record.
- Returns:
- the header names.
- Since:
- 2.7
-
-