K - the key type.
V - the value type.
public class DefaultKafkaProducerFactory<K,V> extends java.lang.Object implements ProducerFactory<K,V>, org.springframework.context.ApplicationContextAware, org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>, org.springframework.beans.factory.DisposableBean
ProducerFactory implementation for a singleton shared
Producer instance.
This implementation will return the same Producer instance (if transactions are
not enabled) for the provided Map configs and optional Serializer
keySerializer, valueSerializer implementations on each
createProducer() invocation.
The Producer is wrapped and the underlying KafkaProducer instance is
not actually closed when Producer.close() is invoked. The KafkaProducer
is physically closed when DisposableBean.destroy() is invoked or when the
application context publishes a ContextStoppedEvent. You can also invoke
reset().
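
The close-safe behavior described above can be pictured with a small, self-contained model. This is only a sketch under stated assumptions: `CloseSafeSketch`, `RawProducer`, `CloseSafe`, and `Factory` are hypothetical stand-ins written for illustration, not the classes of this library, and the real factory may differ in threading and lifecycle details.

```java
/** Simplified model of a shared, close-safe producer; all names here are hypothetical. */
final class CloseSafeSketch {

    /** Stand-in for the raw client (the role KafkaProducer plays in the text above). */
    static final class RawProducer implements AutoCloseable {
        private boolean closed;

        @Override
        public void close() {
            this.closed = true;
        }

        boolean isClosed() {
            return this.closed;
        }
    }

    /** Wrapper whose close() is deliberately a no-op. */
    static final class CloseSafe implements AutoCloseable {
        private final RawProducer delegate;

        CloseSafe(RawProducer delegate) {
            this.delegate = delegate;
        }

        RawProducer delegate() {
            return this.delegate;
        }

        @Override
        public void close() {
            // Ignored: the factory, not the caller, owns the delegate's lifecycle.
        }
    }

    /** Factory that hands out one shared wrapper until reset() is called. */
    static final class Factory {
        private CloseSafe shared;

        synchronized CloseSafe createProducer() {
            if (this.shared == null) {
                this.shared = new CloseSafe(new RawProducer());
            }
            return this.shared;
        }

        /** Physically closes the shared delegate; the next createProducer() builds a new one. */
        synchronized void reset() {
            if (this.shared != null) {
                this.shared.delegate().close();
                this.shared = null;
            }
        }
    }

    public static void main(String[] args) {
        Factory factory = new Factory();
        CloseSafe first = factory.createProducer();
        first.close(); // logical close only
        System.out.println(first == factory.createProducer()); // same shared instance
        System.out.println(first.delegate().isClosed());       // delegate still open
        factory.reset();
        System.out.println(first.delegate().isClosed());       // closed by the factory
    }
}
```

In this model, callers can safely use try-with-resources on the wrapper, because only `reset()` touches the underlying delegate.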
Calling setTransactionIdPrefix(String) enables transactions, in which case a
cache of producers is maintained and closing a producer returns it to the cache. The
producers are closed and the cache is cleared when the factory is destroyed, the
application context is stopped, or the reset() method is called.
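
The transactional cache described above can be sketched in the same spirit. The model below assumes a `BlockingQueue`-backed cache (the type returned by getCache()); `ProducerCacheSketch` and `PooledProducer` are hypothetical names, the sketch is single-threaded for brevity, and it does not track producers that are currently checked out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Simplified model of a transactional producer cache; all names here are hypothetical. */
final class ProducerCacheSketch {

    private final BlockingQueue<PooledProducer> cache = new LinkedBlockingQueue<>();
    private int created;

    /** Stand-in for a wrapped transactional producer. */
    final class PooledProducer implements AutoCloseable {
        private boolean physicallyClosed;

        /** Logical close: hand the producer back to the cache for reuse. */
        @Override
        public void close() {
            if (!this.physicallyClosed && !cache.contains(this)) {
                cache.offer(this);
            }
        }

        boolean isPhysicallyClosed() {
            return this.physicallyClosed;
        }
    }

    /** Reuse a cached producer if one is available, otherwise create a new one. */
    PooledProducer createTransactionalProducer() {
        PooledProducer cached = this.cache.poll();
        if (cached != null) {
            return cached;
        }
        this.created++;
        return new PooledProducer();
    }

    /** Physically close every cached producer and leave the cache empty. */
    void reset() {
        PooledProducer producer;
        while ((producer = this.cache.poll()) != null) {
            producer.physicallyClosed = true;
        }
    }

    int createdCount() {
        return this.created;
    }

    int cachedCount() {
        return this.cache.size();
    }
}
```

Closing a producer and then asking for another one yields the same instance in this model, so the number of physical producers stays bounded by the peak number in use at once.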
| Modifier and Type | Class and Description |
|---|---|
| protected static class | DefaultKafkaProducerFactory.CloseSafeProducer<K,V> - A wrapper class for the delegate. |
| Constructor and Description |
|---|
| DefaultKafkaProducerFactory(java.util.Map<java.lang.String,java.lang.Object> configs) - Construct a factory with the provided configuration. |
| DefaultKafkaProducerFactory(java.util.Map<java.lang.String,java.lang.Object> configs, org.apache.kafka.common.serialization.Serializer<K> keySerializer, org.apache.kafka.common.serialization.Serializer<V> valueSerializer) - Construct a factory with the provided configuration and Serializers. |
| Modifier and Type | Method and Description |
|---|---|
| void | closeProducerFor(java.lang.String transactionIdSuffix) - Remove the specified producer from the cache and close it. |
| protected org.apache.kafka.clients.producer.Producer<K,V> | createKafkaProducer() - Subclasses must return a raw producer which will be wrapped in a DefaultKafkaProducerFactory.CloseSafeProducer. |
| org.apache.kafka.clients.producer.Producer<K,V> | createProducer() - Create a producer. |
| protected org.apache.kafka.clients.producer.Producer<K,V> | createTransactionalProducer() - Subclasses must return a producer from the getCache() or a new raw producer wrapped in a DefaultKafkaProducerFactory.CloseSafeProducer. |
| void | destroy() |
| protected java.util.concurrent.BlockingQueue<DefaultKafkaProducerFactory.CloseSafeProducer<K,V>> | getCache() |
| java.util.Map<java.lang.String,java.lang.Object> | getConfigurationProperties() - Return an unmodifiable reference to the configuration map for this factory. |
| boolean | isProducerPerConsumerPartition() - Return the producerPerConsumerPartition. |
| boolean | isRunning() - Deprecated. Lifecycle is no longer implemented. |
| void | onApplicationEvent(org.springframework.context.event.ContextStoppedEvent event) |
| protected void | removeProducer(DefaultKafkaProducerFactory.CloseSafeProducer<K,V> producerToRemove) - Remove the single shared producer if present. |
| void | reset() - Close the Producer(s) and clear the cache of transactional Producer(s). |
| void | setApplicationContext(org.springframework.context.ApplicationContext applicationContext) |
| void | setKeySerializer(org.apache.kafka.common.serialization.Serializer<K> keySerializer) |
| void | setPhysicalCloseTimeout(int physicalCloseTimeout) - The time to wait when physically closing the producer via the factory rather than closing the producer itself (when reset() or closeProducerFor(String) is invoked). |
| void | setProducerPerConsumerPartition(boolean producerPerConsumerPartition) - Set to false to revert to the previous behavior of a simple incrementing transactional.id suffix for each producer instead of maintaining a producer for each group/topic/partition. |
| void | setTransactionIdPrefix(java.lang.String transactionIdPrefix) - Set the transactional.id prefix. |
| void | setValueSerializer(org.apache.kafka.common.serialization.Serializer<V> valueSerializer) |
| void | start() - Deprecated. Lifecycle is no longer implemented. |
| void | stop() - Deprecated. Lifecycle is no longer implemented; use reset() to close the Producer(s). |
| boolean | transactionCapable() - Return true if the factory supports transactions. |
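
To make the two transactional.id suffix styles mentioned for setProducerPerConsumerPartition(boolean) more concrete, the following sketch builds ids from a prefix. The class and method names are hypothetical, and the exact suffix layout (group, topic, and partition joined with dots, counter starting at zero) is an assumption made for illustration; the description above only states that one style increments and the other is per group/topic/partition.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative transactional.id naming; the formats used here are assumptions. */
final class TransactionIdSketch {

    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    TransactionIdSketch(String prefix) {
        this.prefix = prefix;
    }

    /** Incrementing style: the prefix followed by the next integer. */
    String nextIncrementingId() {
        return this.prefix + this.counter.getAndIncrement();
    }

    /** Per-partition style: the prefix followed by a group/topic/partition suffix. */
    String idFor(String group, String topic, int partition) {
        return this.prefix + group + "." + topic + "." + partition;
    }
}
```

With a per-partition suffix, the id is a function of the consumed partition rather than of creation order, so a producer serving the same group, topic, and partition always gets the same id in this model.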
public DefaultKafkaProducerFactory(java.util.Map<java.lang.String,java.lang.Object> configs)
Construct a factory with the provided configuration.
Parameters:
configs - the configuration.

public DefaultKafkaProducerFactory(java.util.Map<java.lang.String,java.lang.Object> configs, @Nullable org.apache.kafka.common.serialization.Serializer<K> keySerializer, @Nullable org.apache.kafka.common.serialization.Serializer<V> valueSerializer)
Construct a factory with the provided configuration and Serializers.
Parameters:
configs - the configuration.
keySerializer - the key Serializer.
valueSerializer - the value Serializer.

public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext) throws org.springframework.beans.BeansException
Specified by: setApplicationContext in interface org.springframework.context.ApplicationContextAware
Throws: org.springframework.beans.BeansException

public void setKeySerializer(@Nullable org.apache.kafka.common.serialization.Serializer<K> keySerializer)

public void setValueSerializer(@Nullable org.apache.kafka.common.serialization.Serializer<V> valueSerializer)
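
public void setPhysicalCloseTimeout(int physicalCloseTimeout)
The time to wait when physically closing the producer via the factory rather than closing the producer itself (when reset() or closeProducerFor(String) is invoked). Specified in seconds; default DEFAULT_PHYSICAL_CLOSE_TIMEOUT.
Parameters:
physicalCloseTimeout - the timeout in seconds.

public void setTransactionIdPrefix(java.lang.String transactionIdPrefix)
Set the transactional.id prefix.
Parameters:
transactionIdPrefix - the prefix.

public void setProducerPerConsumerPartition(boolean producerPerConsumerPartition)
Set to false to revert to the previous behavior of a simple incrementing transactional.id suffix for each producer instead of maintaining a producer for each group/topic/partition.
Parameters:
producerPerConsumerPartition - false to revert.

public boolean isProducerPerConsumerPartition()
Return the producerPerConsumerPartition.
Specified by: isProducerPerConsumerPartition in interface ProducerFactory<K,V>

public java.util.Map<java.lang.String,java.lang.Object> getConfigurationProperties()
Return an unmodifiable reference to the configuration map for this factory.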
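
What an unmodifiable reference means for callers of getConfigurationProperties() can be shown with plain JDK collections. This is a sketch, not the factory's code: `ConfigViewSketch` is a hypothetical class, the defensive copy in its constructor is an assumption, and `localhost:9092` is a placeholder address. The keys `bootstrap.servers` and `key.serializer` are standard Kafka producer configuration names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Simplified model of exposing configuration as a read-only view; names are hypothetical. */
final class ConfigViewSketch {

    private final Map<String, Object> configs;

    ConfigViewSketch(Map<String, Object> configs) {
        // Defensive copy: an assumption of this sketch, not a documented behavior.
        this.configs = new HashMap<>(configs);
    }

    /** Returns a view that rejects put/remove but still reads from the held map. */
    Map<String, Object> getConfigurationProperties() {
        return Collections.unmodifiableMap(this.configs);
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9092"); // placeholder address
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Map<String, Object> view = new ConfigViewSketch(configs).getConfigurationProperties();
        System.out.println(view.get("bootstrap.servers"));
        try {
            view.put("acks", "all");
        } catch (UnsupportedOperationException e) {
            System.out.println("read-only view");
        }
    }
}
```

Callers that need a modified configuration would copy the returned map into a new `HashMap` rather than writing to the view.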

public boolean transactionCapable()
Description copied from interface: ProducerFactory
Return true if the factory supports transactions.
Specified by: transactionCapable in interface ProducerFactory<K,V>

public void destroy()
Specified by: destroy in interface org.springframework.beans.factory.DisposableBean

public void onApplicationEvent(org.springframework.context.event.ContextStoppedEvent event)
Specified by: onApplicationEvent in interface org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>

@Deprecated public void start()
Deprecated. Lifecycle is no longer implemented.

@Deprecated public void stop()
Deprecated. Lifecycle is no longer implemented; use reset() to close the Producer(s).

public void reset()
Close the Producer(s) and clear the cache of transactional Producer(s).

@Deprecated public boolean isRunning()
Deprecated. Lifecycle is no longer implemented.

public org.apache.kafka.clients.producer.Producer<K,V> createProducer()
Description copied from interface: ProducerFactory
Create a producer.
Specified by: createProducer in interface ProducerFactory<K,V>

protected org.apache.kafka.clients.producer.Producer<K,V> createKafkaProducer()
Subclasses must return a raw producer which will be wrapped in a DefaultKafkaProducerFactory.CloseSafeProducer.

protected final void removeProducer(DefaultKafkaProducerFactory.CloseSafeProducer<K,V> producerToRemove)
Remove the single shared producer if present.
Parameters:
producerToRemove - the producer.

protected org.apache.kafka.clients.producer.Producer<K,V> createTransactionalProducer()
Subclasses must return a producer from the getCache() or a new raw producer wrapped in a DefaultKafkaProducerFactory.CloseSafeProducer.

protected java.util.concurrent.BlockingQueue<DefaultKafkaProducerFactory.CloseSafeProducer<K,V>> getCache()
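
public void closeProducerFor(java.lang.String transactionIdSuffix)
Description copied from interface: ProducerFactory
Remove the specified producer from the cache and close it.
Specified by: closeProducerFor in interface ProducerFactory<K,V>
Parameters:
transactionIdSuffix - the producer's transaction id suffix.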
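
The remove-then-close contract of closeProducerFor(String) can be modeled with a map keyed by transaction id suffix. This is a minimal sketch under assumptions: `SuffixCacheSketch` and `TrackedProducer` are hypothetical names, and storing producers in a `ConcurrentHashMap` is an illustrative choice rather than a statement about how this class stores them.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified model of a per-suffix producer cache; all names here are hypothetical. */
final class SuffixCacheSketch {

    /** Stand-in for a producer that records whether it was closed. */
    static final class TrackedProducer {
        private boolean closed;

        void close() {
            this.closed = true;
        }

        boolean isClosed() {
            return this.closed;
        }
    }

    private final Map<String, TrackedProducer> producers = new ConcurrentHashMap<>();

    /** Return the producer for the suffix, creating it on first use. */
    TrackedProducer producerFor(String transactionIdSuffix) {
        return this.producers.computeIfAbsent(transactionIdSuffix, suffix -> new TrackedProducer());
    }

    /** Remove the producer for the suffix from the cache and close it, if present. */
    void closeProducerFor(String transactionIdSuffix) {
        TrackedProducer removed = this.producers.remove(transactionIdSuffix);
        if (removed != null) {
            removed.close();
        }
    }

    int size() {
        return this.producers.size();
    }
}
```

Removing the entry before closing it means a later request for the same suffix creates a fresh producer instead of receiving a closed one.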