Type Parameters:
K - the key type.
V - the outbound data type.
R - the reply data type.

public class ReplyingKafkaTemplate<K,V,R> extends KafkaTemplate<K,V> implements BatchMessageListener<K,R>, org.springframework.beans.factory.InitializingBean, org.springframework.context.SmartLifecycle, org.springframework.beans.factory.DisposableBean, ReplyingKafkaOperations<K,V,R>
Nested classes inherited from interface KafkaOperations: KafkaOperations.OperationsCallback<K,V,T>, KafkaOperations.ProducerCallback<K,V,T>

Fields inherited from class KafkaTemplate: logger

| Constructor and Description |
|---|
| ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,R> replyContainer) |
| ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,R> replyContainer, boolean autoFlush) |
| Modifier and Type | Method and Description |
|---|---|
| void | afterPropertiesSet() |
| protected CorrelationKey | createCorrelationId(org.apache.kafka.clients.producer.ProducerRecord<K,V> record) - Deprecated. In favor of setCorrelationIdStrategy(Function). |
| void | destroy() |
| java.util.Collection<org.apache.kafka.common.TopicPartition> | getAssignedReplyTopicPartitions() - Return the topics/partitions assigned to the replying listener container. |
| int | getPhase() |
| protected long | getReplyTimeout() |
| protected boolean | handleTimeout(CorrelationKey correlationId, RequestReplyFuture<K,V,R> future) - Used to inform subclasses that a request has timed out so they can clean up state and, optionally, complete the future. |
| boolean | isAutoStartup() |
| protected boolean | isPending(CorrelationKey correlationId) - Return true if this correlation id is still active. |
| boolean | isRunning() |
| protected void | logLateArrival(org.apache.kafka.clients.consumer.ConsumerRecord<K,R> record, CorrelationKey correlationId) |
| void | onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>> data) - Invoked with data from Kafka. |
| RequestReplyFuture<K,V,R> | sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record) - Send a request and receive a reply. |
| void | setAutoStartup(boolean autoStartup) |
| void | setCorrelationIdStrategy(java.util.function.Function<org.apache.kafka.clients.producer.ProducerRecord<K,V>,CorrelationKey> correlationStrategy) - Set a function to be called to establish a unique correlation key for each request record. |
| void | setPhase(int phase) |
| void | setReplyTimeout(long replyTimeout) |
| void | setSharedReplyTopic(boolean sharedReplyTopic) - Set to true when multiple templates are using the same topic for replies. |
| void | setTaskScheduler(org.springframework.scheduling.TaskScheduler scheduler) |
| void | start() |
| void | stop() |
| void | stop(java.lang.Runnable callback) |
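The setCorrelationIdStrategy(Function) entry in the table above says the function must produce a unique correlation key for each request record. The following is a minimal, standard-library-only sketch of what such a strategy might look like. It is not the library's implementation: `SketchKey` is a hypothetical stand-in for CorrelationKey, and `CorrelationStrategySketch`, `randomUuid` and `fromRequestId` are names invented here for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;
import java.util.function.Function;

// Hypothetical stand-in for a correlation key: wraps raw bytes and compares
// by content so that it can serve as a map key.
final class SketchKey {
    private final byte[] bytes;

    SketchKey(byte[] bytes) {
        this.bytes = bytes.clone(); // defensive copy keeps the key immutable
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof SketchKey
                && Arrays.equals(bytes, ((SketchKey) other).bytes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(bytes);
    }
}

final class CorrelationStrategySketch {

    // Strategy 1 (assumption): ignore the record and pack a random UUID
    // into 16 bytes, so every call yields a fresh key.
    static <T> Function<T, SketchKey> randomUuid() {
        return record -> {
            UUID id = UUID.randomUUID();
            ByteBuffer buf = ByteBuffer.allocate(16);
            buf.putLong(id.getMostSignificantBits());
            buf.putLong(id.getLeastSignificantBits());
            return new SketchKey(buf.array());
        };
    }

    // Strategy 2 (assumption): derive the key from a request id carried by
    // the request itself. This is only unique if the ids are unique.
    static Function<String, SketchKey> fromRequestId() {
        return requestId -> new SketchKey(requestId.getBytes(StandardCharsets.UTF_8));
    }
}
```

With the real class, a function of this shape would take a ProducerRecord<K,V> and return a CorrelationKey, as the method signature in the table shows. The content-based variant makes the uniqueness requirement visible: two requests that share an id would map to the same key.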
Methods inherited from class KafkaTemplate: closeProducer, doSend, execute, executeInTransaction, flush, getDefaultTopic, getMessageConverter, getProducerFactory, getTransactionIdPrefix, inTransaction, isTransactional, metrics, partitionsFor, send, send, send, send, send, send, sendDefault, sendDefault, sendDefault, sendDefault, sendOffsetsToTransaction, sendOffsetsToTransaction, setDefaultTopic, setMessageConverter, setProducerListener, setTransactionIdPrefix

Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

Methods inherited from interface BatchMessageListener: onMessage, wantsPollResult

Methods inherited from interface GenericMessageListener: onMessage, onMessage, onMessage

public ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,R> replyContainer)

public ReplyingKafkaTemplate(ProducerFactory<K,V> producerFactory, GenericMessageListenerContainer<K,R> replyContainer, boolean autoFlush)

public void setTaskScheduler(org.springframework.scheduling.TaskScheduler scheduler)

protected long getReplyTimeout()

public void setReplyTimeout(long replyTimeout)

public boolean isRunning()
isRunning in interface org.springframework.context.Lifecycle

public int getPhase()
getPhase in interface org.springframework.context.Phased
getPhase in interface org.springframework.context.SmartLifecycle

public void setPhase(int phase)

public boolean isAutoStartup()
isAutoStartup in interface org.springframework.context.SmartLifecycle

public void setAutoStartup(boolean autoStartup)

public java.util.Collection<org.apache.kafka.common.TopicPartition> getAssignedReplyTopicPartitions()

public void setSharedReplyTopic(boolean sharedReplyTopic)
sharedReplyTopic - true if using a shared topic.

public void setCorrelationIdStrategy(java.util.function.Function<org.apache.kafka.clients.producer.ProducerRecord<K,V>,CorrelationKey> correlationStrategy)
correlationStrategy - the function.

public void afterPropertiesSet()
afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean

public void start()
start in interface org.springframework.context.Lifecycle

public void stop()
stop in interface org.springframework.context.Lifecycle

public void stop(java.lang.Runnable callback)
stop in interface org.springframework.context.SmartLifecycle

public RequestReplyFuture<K,V,R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
Description copied from interface: ReplyingKafkaOperations
sendAndReceive in interface ReplyingKafkaOperations<K,V,R>
record - the record to send.

protected boolean handleTimeout(CorrelationKey correlationId, RequestReplyFuture<K,V,R> future)
correlationId - the correlation id.
future - the future.

protected boolean isPending(CorrelationKey correlationId)
correlationId - the correlation id.

public void destroy()
destroy in interface org.springframework.beans.factory.DisposableBean

@Deprecated protected CorrelationKey createCorrelationId(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
Deprecated. In favor of setCorrelationIdStrategy(Function).
record - the record.

public void onMessage(java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>> data)
Description copied from interface: GenericMessageListener
onMessage in interface GenericMessageListener<java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>
data - the data to be processed.

protected void logLateArrival(org.apache.kafka.clients.consumer.ConsumerRecord<K,R> record, CorrelationKey correlationId)
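
The methods sendAndReceive, onMessage, isPending, handleTimeout and logLateArrival together suggest a bookkeeping pattern: a request is registered under its correlation id, a reply batch completes the matching requests, a timeout removes a request that is still pending, and a reply whose id is no longer pending is treated as a late arrival. The sketch below illustrates that pattern with the standard library only. It is an assumption about the general technique, not the class's actual code: `PendingReplies` and all of its members are hypothetical names, correlation ids are plain strings, and the timeout is triggered by hand rather than by a scheduler.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of request/reply correlation bookkeeping.
final class PendingReplies<R> {
    // Requests that are still waiting for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<R>> pending = new ConcurrentHashMap<>();
    // Correlation ids of replies that arrived after their request was removed.
    private final List<String> lateArrivals = new CopyOnWriteArrayList<>();

    // Called when a request is sent: remember the future under its id.
    CompletableFuture<R> register(String correlationId) {
        CompletableFuture<R> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // True while no reply and no timeout has been seen for this id.
    boolean isPending(String correlationId) {
        return pending.containsKey(correlationId);
    }

    // Called with a batch of (correlation id, reply) pairs.
    void onReplies(List<Map.Entry<String, R>> batch) {
        for (Map.Entry<String, R> reply : batch) {
            CompletableFuture<R> future = pending.remove(reply.getKey());
            if (future == null) {
                lateArrivals.add(reply.getKey()); // unknown or already timed out
            } else {
                future.complete(reply.getValue());
            }
        }
    }

    // Called when the reply timeout for this id elapses.
    void expire(String correlationId) {
        CompletableFuture<R> future = pending.remove(correlationId);
        if (future != null) {
            future.completeExceptionally(
                    new TimeoutException("Reply timed out: " + correlationId));
        }
    }

    List<String> lateArrivals() {
        return lateArrivals;
    }
}
```

Removing the entry from the map in both `onReplies` and `expire` means whichever happens first wins, so a reply that arrives after its timeout is recorded as late instead of completing a future that has already failed.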