Type Parameters:
K - the key type.
V - the value type.

public class DefaultKafkaProducerFactory<K,V> extends java.lang.Object implements ProducerFactory<K,V>, org.springframework.context.Lifecycle, org.springframework.beans.factory.DisposableBean
The ProducerFactory implementation for a singleton shared Producer
instance.
When transactions are not enabled, this implementation creates one Producer
from the provided Map configs and the optional Serializer keySerializer and
valueSerializer implementations, and returns that same shared instance on each
createProducer() invocation.
An internal wrapper shields the shared Producer from external Producer.close()
invocations. The real Producer.close() is called on the target Producer only
during Lifecycle.stop() or DisposableBean.destroy().
Calling setTransactionIdPrefix(String) enables transactions, in which case a cache
of producers is maintained and closing a producer returns it to the cache.
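
The close-safe behavior described above can be illustrated with a minimal, library-free sketch. The names SimpleProducer, RecordingProducer, CloseSafeWrapper, and physicalClose() are hypothetical stand-ins, not Spring Kafka or Kafka types. The sketch assumes only that the wrapper ignores close() and that the owning factory closes the delegate when it shuts down.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a producer; not the Kafka Producer interface. */
interface SimpleProducer extends AutoCloseable {
    void send(String record);

    @Override
    void close();
}

/** Records what it is sent and whether it has really been closed. */
class RecordingProducer implements SimpleProducer {
    final List<String> sent = new ArrayList<>();
    boolean closed;

    @Override
    public void send(String record) {
        if (closed) {
            throw new IllegalStateException("producer is closed");
        }
        sent.add(record);
    }

    @Override
    public void close() {
        closed = true;
    }
}

/** Wrapper handed to callers: close() is a no-op, so the shared delegate survives. */
class CloseSafeWrapper implements SimpleProducer {
    private final SimpleProducer delegate;

    CloseSafeWrapper(SimpleProducer delegate) {
        this.delegate = delegate;
    }

    @Override
    public void send(String record) {
        delegate.send(record);
    }

    @Override
    public void close() {
        // Intentionally ignored: callers must not close the shared producer.
    }

    /** Assumed to be called only by the owning factory, e.g. from stop() or destroy(). */
    void physicalClose() {
        delegate.close();
    }
}

class CloseSafeDemo {
    public static void main(String[] args) {
        RecordingProducer target = new RecordingProducer();
        CloseSafeWrapper shared = new CloseSafeWrapper(target);

        shared.send("first");
        shared.close();         // external close: the delegate stays open
        shared.send("second");  // the shared producer is still usable
        System.out.println(target.sent + " closed=" + target.closed);

        shared.physicalClose(); // factory shutdown: the delegate is really closed
        System.out.println("closed=" + target.closed);
    }
}
```

Keeping close() a no-op lets application code use try-with-resources or explicit close() calls without tearing down the instance that other callers share.
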
| Modifier and Type | Class and Description |
|---|---|
| protected static class | DefaultKafkaProducerFactory.CloseSafeProducer<K,V>: A wrapper class for the delegate. |
| Constructor and Description |
|---|
| DefaultKafkaProducerFactory(java.util.Map<java.lang.String,java.lang.Object> configs): Construct a factory with the provided configuration. |
| DefaultKafkaProducerFactory(java.util.Map<java.lang.String,java.lang.Object> configs, org.apache.kafka.common.serialization.Serializer<K> keySerializer, org.apache.kafka.common.serialization.Serializer<V> valueSerializer) |
| Modifier and Type | Method and Description |
|---|---|
| void | closeProducerFor(java.lang.String transactionIdSuffix): Remove the specified producer from the cache and close it. |
| protected org.apache.kafka.clients.producer.Producer<K,V> | createKafkaProducer(): Subclasses must return a raw producer which will be wrapped in a DefaultKafkaProducerFactory.CloseSafeProducer. |
| org.apache.kafka.clients.producer.Producer<K,V> | createProducer(): Create a producer. |
| protected org.apache.kafka.clients.producer.Producer<K,V> | createTransactionalProducer(): Subclasses must return a producer from the getCache() or a new raw producer wrapped in a DefaultKafkaProducerFactory.CloseSafeProducer. |
| void | destroy() |
| protected java.util.concurrent.BlockingQueue<DefaultKafkaProducerFactory.CloseSafeProducer<K,V>> | getCache() |
| java.util.Map<java.lang.String,java.lang.Object> | getConfigurationProperties(): Return an unmodifiable reference to the configuration map for this factory. |
| boolean | isProducerPerConsumerPartition(): Return the producerPerConsumerPartition. |
| boolean | isRunning() |
| void | setKeySerializer(org.apache.kafka.common.serialization.Serializer<K> keySerializer) |
| void | setPhysicalCloseTimeout(int physicalCloseTimeout) |
| void | setProducerPerConsumerPartition(boolean producerPerConsumerPartition): Set to false to revert to the previous behavior of a simple incrementing transactional.id suffix for each producer instead of maintaining a producer for each group/topic/partition. |
| void | setTransactionIdPrefix(java.lang.String transactionIdPrefix): Set the transactional.id prefix. |
| void | setValueSerializer(org.apache.kafka.common.serialization.Serializer<V> valueSerializer) |
| void | start() |
| void | stop() |
| boolean | transactionCapable(): Return true if the factory supports transactions. |

public DefaultKafkaProducerFactory(java.util.Map<java.lang.String,java.lang.Object> configs)
Construct a factory with the provided configuration.
Parameters: configs - the configuration.

public void setKeySerializer(org.apache.kafka.common.serialization.Serializer<K> keySerializer)

public void setValueSerializer(org.apache.kafka.common.serialization.Serializer<V> valueSerializer)
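
public void setPhysicalCloseTimeout(int physicalCloseTimeout)
The time to wait when physically closing the producer (when stop() or destroy() is invoked).
Specified in seconds; default .
Parameters: physicalCloseTimeout - the timeout in seconds.

public void setTransactionIdPrefix(java.lang.String transactionIdPrefix)
Set the transactional.id prefix.
Parameters: transactionIdPrefix - the prefix.

public void setProducerPerConsumerPartition(boolean producerPerConsumerPartition)
Set to false to revert to the previous behavior of a simple incrementing transactional.id suffix for each producer instead of maintaining a producer for each group/topic/partition.
Parameters: producerPerConsumerPartition - false to revert.

public boolean isProducerPerConsumerPartition()
Return the producerPerConsumerPartition.
Specified by: isProducerPerConsumerPartition in interface ProducerFactory<K,V>

public java.util.Map<java.lang.String,java.lang.Object> getConfigurationProperties()
Return an unmodifiable reference to the configuration map for this factory.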
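
The two transactional.id naming schemes that setProducerPerConsumerPartition(boolean) switches between can be pictured roughly as follows. This is a sketch only: the class TransactionalIdNaming, its method names, and the dot-separated group.topic.partition suffix layout are assumptions made for illustration, not the factory's actual implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper illustrating two ways to derive a transactional.id. */
class TransactionalIdNaming {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    TransactionalIdNaming(String prefix) {
        this.prefix = prefix;
    }

    /** Simple scheme: the prefix plus an incrementing suffix, one per new producer. */
    String nextIncrementing() {
        return prefix + counter.getAndIncrement();
    }

    /** Per-partition scheme; the suffix layout group.topic.partition is an assumption. */
    String forPartition(String group, String topic, int partition) {
        return prefix + group + "." + topic + "." + partition;
    }
}

class TransactionalIdNamingDemo {
    public static void main(String[] args) {
        TransactionalIdNaming naming = new TransactionalIdNaming("tx-");
        System.out.println(naming.nextIncrementing());
        System.out.println(naming.nextIncrementing());
        System.out.println(naming.forPartition("orders-group", "orders", 3));
    }
}
```

With the per-partition scheme the same consumer partition always maps to the same id, whereas the incrementing scheme ties an id only to the order in which producers were created.
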

public boolean transactionCapable()
Description copied from interface: ProducerFactory
Return true if the factory supports transactions.
Specified by: transactionCapable in interface ProducerFactory<K,V>

public void destroy() throws java.lang.Exception
Specified by: destroy in interface org.springframework.beans.factory.DisposableBean
Throws: java.lang.Exception

public void start()
Specified by: start in interface org.springframework.context.Lifecycle

public void stop()
Specified by: stop in interface org.springframework.context.Lifecycle

public boolean isRunning()
Specified by: isRunning in interface org.springframework.context.Lifecycle

public org.apache.kafka.clients.producer.Producer<K,V> createProducer()
Description copied from interface: ProducerFactory
Create a producer.
Specified by: createProducer in interface ProducerFactory<K,V>

protected org.apache.kafka.clients.producer.Producer<K,V> createKafkaProducer()
Subclasses must return a raw producer which will be wrapped in a DefaultKafkaProducerFactory.CloseSafeProducer.

protected org.apache.kafka.clients.producer.Producer<K,V> createTransactionalProducer()
Subclasses must return a producer from the getCache() or a new raw producer wrapped in a DefaultKafkaProducerFactory.CloseSafeProducer.

protected java.util.concurrent.BlockingQueue<DefaultKafkaProducerFactory.CloseSafeProducer<K,V>> getCache()
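
The contract of createTransactionalProducer() and getCache() (reuse a cached producer when one is idle, otherwise create a new one, and return it to the cache on close) might look like the following library-free sketch. PooledProducer, PooledProducerFactory, and idleCount() are hypothetical names, and the incrementing id suffix is an assumption. Only java.util.concurrent.BlockingQueue is taken from the signatures above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical pooled producer: close() hands it back to the cache instead of closing it. */
class PooledProducer implements AutoCloseable {
    final String transactionalId;
    private final BlockingQueue<PooledProducer> cache;

    PooledProducer(String transactionalId, BlockingQueue<PooledProducer> cache) {
        this.transactionalId = transactionalId;
        this.cache = cache;
    }

    @Override
    public void close() {
        cache.offer(this); // return this producer to the cache for reuse
    }
}

/** Hypothetical factory: reuse an idle cached producer, otherwise create a new one. */
class PooledProducerFactory {
    private final BlockingQueue<PooledProducer> cache = new LinkedBlockingQueue<>();
    private final AtomicInteger suffix = new AtomicInteger();
    private final String prefix;

    PooledProducerFactory(String prefix) {
        this.prefix = prefix;
    }

    PooledProducer createTransactionalProducer() {
        PooledProducer cached = cache.poll(); // non-blocking; null when the cache is empty
        if (cached != null) {
            return cached;
        }
        return new PooledProducer(prefix + suffix.getAndIncrement(), cache);
    }

    int idleCount() {
        return cache.size();
    }
}

class PooledProducerDemo {
    public static void main(String[] args) {
        PooledProducerFactory factory = new PooledProducerFactory("tx-");
        PooledProducer first = factory.createTransactionalProducer();
        first.close(); // goes back to the cache
        PooledProducer reused = factory.createTransactionalProducer();
        System.out.println("reused same instance: " + (reused == first));
    }
}
```
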
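
public void closeProducerFor(java.lang.String transactionIdSuffix)
Description copied from interface: ProducerFactory
Remove the specified producer from the cache and close it.
Specified by: closeProducerFor in interface ProducerFactory<K,V>
Parameters: transactionIdSuffix - the producer's transaction id suffix.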
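
As a rough illustration of "remove the specified producer from the cache and close it", the sketch below keeps one producer per transaction id suffix in a map. TrackedProducer, SuffixKeyedProducers, and producerFor() are hypothetical names, and the use of a ConcurrentHashMap is an assumption rather than a description of the factory's internals.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical producer that only tracks whether it has been closed. */
class TrackedProducer {
    private volatile boolean closed;

    void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

/** Hypothetical registry keeping one producer per transaction id suffix. */
class SuffixKeyedProducers {
    private final Map<String, TrackedProducer> producersBySuffix = new ConcurrentHashMap<>();

    /** Return the producer for the suffix, creating it on first use. */
    TrackedProducer producerFor(String transactionIdSuffix) {
        return producersBySuffix.computeIfAbsent(transactionIdSuffix, suffix -> new TrackedProducer());
    }

    /** Remove the producer for the suffix, if any, and close it. */
    void closeProducerFor(String transactionIdSuffix) {
        TrackedProducer removed = producersBySuffix.remove(transactionIdSuffix);
        if (removed != null) {
            removed.close();
        }
    }

    int size() {
        return producersBySuffix.size();
    }
}

class SuffixKeyedProducersDemo {
    public static void main(String[] args) {
        SuffixKeyedProducers registry = new SuffixKeyedProducers();
        TrackedProducer producer = registry.producerFor("group.topic.0");
        registry.closeProducerFor("group.topic.0");
        System.out.println("closed=" + producer.isClosed() + " remaining=" + registry.size());
    }
}
```
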