K - the key type.V - the value type.public class DefaultKafkaProducerFactory<K,V> extends KafkaResourceFactory implements ProducerFactory<K,V>, org.springframework.context.ApplicationContextAware, org.springframework.beans.factory.BeanNameAware, org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>, org.springframework.beans.factory.DisposableBean
ProducerFactory implementation for a singleton shared Producer instance.
This implementation will return the same Producer instance (if transactions are
not enabled) for the provided Map configs and optional Serializer
implementations on each createProducer() invocation.
If you are using Serializers that have no-arg constructors and require no setup, then simplest to
specify Serializer classes against ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG and
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG keys in the configs passed to the
DefaultKafkaProducerFactory constructor.
If that is not possible, but you are sure that at least one of the following is true:
Producer will use the SerializersSerializers that may be shared between Producer instances (and specifically
that their close() method is a no-op)Producer being closed while other
Producer instances with the same Serializers are in useSerializer instances for one or both of the key and value serializers.
If none of the above is true then you may provide a Supplier function for one or both Serializers
which will be used to obtain Serializer(s) each time a Producer is created by the factory.
The Producer is wrapped and the underlying KafkaProducer instance is
not actually closed when Producer.close() is invoked. The KafkaProducer
is physically closed when DisposableBean.destroy() is invoked or when the
application context publishes a ContextStoppedEvent. You can also invoke
reset().
Setting setTransactionIdPrefix(String) enables transactions; in which case, a
cache of producers is maintained; closing a producer returns it to the cache. The
producers are closed and the cache is cleared when the factory is destroyed, the
application context stopped, or the reset() method is called.
| Modifier and Type | Class and Description |
|---|---|
protected static class |
DefaultKafkaProducerFactory.CloseSafeProducer<K,V>
A wrapper class for the delegate.
|
ProducerFactory.Listener<K,V>DEFAULT_PHYSICAL_CLOSE_TIMEOUT| Constructor and Description |
|---|
DefaultKafkaProducerFactory(java.util.Map<java.lang.String,java.lang.Object> configs)
Construct a factory with the provided configuration.
|
DefaultKafkaProducerFactory(java.util.Map<java.lang.String,java.lang.Object> configs,
org.apache.kafka.common.serialization.Serializer<K> keySerializer,
org.apache.kafka.common.serialization.Serializer<V> valueSerializer)
Construct a factory with the provided configuration and
Serializers. |
DefaultKafkaProducerFactory(java.util.Map<java.lang.String,java.lang.Object> configs,
java.util.function.Supplier<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier,
java.util.function.Supplier<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier)
Construct a factory with the provided configuration and
Serializer Suppliers. |
| Modifier and Type | Method and Description |
|---|---|
void |
addListener(int index,
ProducerFactory.Listener<K,V> listener)
Add a listener at a specific index.
|
void |
addListener(ProducerFactory.Listener<K,V> listener)
Add a listener.
|
void |
addPostProcessor(ProducerPostProcessor<K,V> postProcessor)
Add a post processor.
|
void |
closeProducerFor(java.lang.String suffix)
Remove the specified producer from the cache and close it.
|
void |
closeThreadBoundProducer()
When using
setProducerPerThread(boolean) (true), call this method to close
and release this thread's producer. |
protected org.apache.kafka.clients.producer.Producer<K,V> |
createKafkaProducer()
Subclasses must return a raw producer which will be wrapped in a
DefaultKafkaProducerFactory.CloseSafeProducer. |
org.apache.kafka.clients.producer.Producer<K,V> |
createNonTransactionalProducer()
Create a non-transactional producer.
|
org.apache.kafka.clients.producer.Producer<K,V> |
createProducer()
Create a producer which will be transactional if the factory is so configured.
|
org.apache.kafka.clients.producer.Producer<K,V> |
createProducer(java.lang.String txIdPrefixArg)
Create a producer with an overridden transaction id prefix.
|
protected org.apache.kafka.clients.producer.Producer<K,V> |
createRawProducer(java.util.Map<java.lang.String,java.lang.Object> rawConfigs) |
protected org.apache.kafka.clients.producer.Producer<K,V> |
createTransactionalProducer()
Subclasses must return a producer from the
getCache() or a
new raw producer wrapped in a DefaultKafkaProducerFactory.CloseSafeProducer. |
protected org.apache.kafka.clients.producer.Producer<K,V> |
createTransactionalProducer(java.lang.String txIdPrefix) |
protected org.apache.kafka.clients.producer.Producer<K,V> |
createTransactionalProducerForPartition() |
protected org.apache.kafka.clients.producer.Producer<K,V> |
createTransactionalProducerForPartition(java.lang.String txIdPrefix) |
void |
destroy() |
protected java.util.concurrent.BlockingQueue<DefaultKafkaProducerFactory.CloseSafeProducer<K,V>> |
getCache() |
protected java.util.concurrent.BlockingQueue<DefaultKafkaProducerFactory.CloseSafeProducer<K,V>> |
getCache(java.lang.String txIdPrefix) |
java.util.Map<java.lang.String,java.lang.Object> |
getConfigurationProperties()
Return an unmodifiable reference to the configuration map for this factory.
|
java.util.function.Supplier<org.apache.kafka.common.serialization.Serializer<K>> |
getKeySerializerSupplier()
Return a supplier for a key serializer.
|
java.util.List<ProducerFactory.Listener<K,V>> |
getListeners()
Get the current list of listeners.
|
java.time.Duration |
getPhysicalCloseTimeout()
Get the physical close timeout.
|
java.util.List<ProducerPostProcessor<K,V>> |
getPostProcessors()
Get the current list of post processors.
|
java.lang.String |
getTransactionIdPrefix()
Return the transaction id prefix.
|
java.util.function.Supplier<org.apache.kafka.common.serialization.Serializer<V>> |
getValueSerializerSupplier()
Return a supplier for a value serializer.
|
boolean |
isProducerPerConsumerPartition()
Return the producerPerConsumerPartition.
|
boolean |
isProducerPerThread()
Return true when there is a producer per thread.
|
void |
onApplicationEvent(org.springframework.context.event.ContextStoppedEvent event) |
void |
removeConfig(java.lang.String configKey)
Remove the specified key from the configuration map.
|
boolean |
removeListener(ProducerFactory.Listener<K,V> listener)
Remove a listener.
|
boolean |
removePostProcessor(ProducerPostProcessor<K,V> postProcessor)
Remove a post processor.
|
protected boolean |
removeProducer(DefaultKafkaProducerFactory.CloseSafeProducer<K,V> producerToRemove,
java.time.Duration timeout)
Remove the single shared producer and a thread-bound instance if present.
|
void |
reset()
Close the
Producer(s) and clear the cache of transactional
Producer(s). |
void |
setApplicationContext(org.springframework.context.ApplicationContext applicationContext) |
void |
setBeanName(java.lang.String name) |
void |
setKeySerializer(org.apache.kafka.common.serialization.Serializer<K> keySerializer)
Set a key serializer.
|
void |
setMaxAge(java.time.Duration maxAge)
Set the maximum age for a producer; useful when using transactions and the broker
might expire a
transactional.id due to inactivity. |
void |
setPhysicalCloseTimeout(int physicalCloseTimeout)
The time to wait when physically closing the producer via the factory rather than
closing the producer itself (when
reset(), #closeProducerFor(String), or closeThreadBoundProducer() are invoked). |
void |
setProducerPerConsumerPartition(boolean producerPerConsumerPartition)
Set to false to revert to the previous behavior of a simple incrementing
transactional.id suffix for each producer instead of maintaining a producer
for each group/topic/partition.
|
void |
setProducerPerThread(boolean producerPerThread)
Set to true to create a producer per thread instead of singleton that is shared by
all clients.
|
void |
setTransactionIdPrefix(java.lang.String transactionIdPrefix)
Set a prefix for the
ProducerConfig.TRANSACTIONAL_ID_CONFIG config. |
void |
setValueSerializer(org.apache.kafka.common.serialization.Serializer<V> valueSerializer)
Set a value serializer.
|
boolean |
transactionCapable()
Return true if the factory supports transactions.
|
void |
updateConfigs(java.util.Map<java.lang.String,java.lang.Object> updates)
Update the producer configuration map; useful for situations such as
credential rotation.
|
checkBootstrap, getBootstrapServers, setBootstrapServersSupplierpublic DefaultKafkaProducerFactory(java.util.Map<java.lang.String,java.lang.Object> configs)
configs - the configuration.public DefaultKafkaProducerFactory(java.util.Map<java.lang.String,java.lang.Object> configs,
@Nullable
org.apache.kafka.common.serialization.Serializer<K> keySerializer,
@Nullable
org.apache.kafka.common.serialization.Serializer<V> valueSerializer)
Serializers.
Also configures a transactionIdPrefix as a value from the
ProducerConfig.TRANSACTIONAL_ID_CONFIG if provided.
This config is going to be overridden with a suffix for target Producer instance.configs - the configuration.keySerializer - the key Serializer.valueSerializer - the value Serializer.public DefaultKafkaProducerFactory(java.util.Map<java.lang.String,java.lang.Object> configs,
@Nullable
java.util.function.Supplier<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier,
@Nullable
java.util.function.Supplier<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier)
Serializer Suppliers.
Also configures a transactionIdPrefix as a value from the
ProducerConfig.TRANSACTIONAL_ID_CONFIG if provided.
This config is going to be overridden with a suffix for target Producer instance.configs - the configuration.keySerializerSupplier - the key Serializer supplier function.valueSerializerSupplier - the value Serializer supplier function.public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext)
throws org.springframework.beans.BeansException
setApplicationContext in interface org.springframework.context.ApplicationContextAwareorg.springframework.beans.BeansExceptionpublic void setBeanName(java.lang.String name)
setBeanName in interface org.springframework.beans.factory.BeanNameAwarepublic void setKeySerializer(@Nullable
org.apache.kafka.common.serialization.Serializer<K> keySerializer)
keySerializer - the key serializer.public void setValueSerializer(@Nullable
org.apache.kafka.common.serialization.Serializer<V> valueSerializer)
valueSerializer - the value serializer.public void setPhysicalCloseTimeout(int physicalCloseTimeout)
reset(), #closeProducerFor(String), or closeThreadBoundProducer() are invoked).
Specified in seconds; default ProducerFactory.DEFAULT_PHYSICAL_CLOSE_TIMEOUT.physicalCloseTimeout - the timeout in seconds.public java.time.Duration getPhysicalCloseTimeout()
getPhysicalCloseTimeout in interface ProducerFactory<K,V>public final void setTransactionIdPrefix(java.lang.String transactionIdPrefix)
ProducerConfig.TRANSACTIONAL_ID_CONFIG config. By
default a ProducerConfig.TRANSACTIONAL_ID_CONFIG value from configs is used
as a prefix in the target producer configs.transactionIdPrefix - the prefix.@Nullable public java.lang.String getTransactionIdPrefix()
ProducerFactorygetTransactionIdPrefix in interface ProducerFactory<K,V>public void setProducerPerThread(boolean producerPerThread)
closeThreadBoundProducer() to
physically close the producer when it is no longer needed. These producers will not
be closed by destroy() or reset().producerPerThread - true for a producer per thread.closeThreadBoundProducer()public boolean isProducerPerThread()
ProducerFactoryisProducerPerThread in interface ProducerFactory<K,V>public void setProducerPerConsumerPartition(boolean producerPerConsumerPartition)
producerPerConsumerPartition - false to revert.public boolean isProducerPerConsumerPartition()
isProducerPerConsumerPartition in interface ProducerFactory<K,V>public java.util.function.Supplier<org.apache.kafka.common.serialization.Serializer<K>> getKeySerializerSupplier()
ProducerFactorygetKeySerializerSupplier in interface ProducerFactory<K,V>public java.util.function.Supplier<org.apache.kafka.common.serialization.Serializer<V>> getValueSerializerSupplier()
ProducerFactorygetValueSerializerSupplier in interface ProducerFactory<K,V>public java.util.Map<java.lang.String,java.lang.Object> getConfigurationProperties()
getConfigurationProperties in interface ProducerFactory<K,V>public java.util.List<ProducerFactory.Listener<K,V>> getListeners()
getListeners in interface ProducerFactory<K,V>public java.util.List<ProducerPostProcessor<K,V>> getPostProcessors()
ProducerFactorygetPostProcessors in interface ProducerFactory<K,V>public void setMaxAge(java.time.Duration maxAge)
transactional.id due to inactivity.maxAge - the maxAge to setpublic void addListener(ProducerFactory.Listener<K,V> listener)
addListener in interface ProducerFactory<K,V>listener - the listener.public void addListener(int index,
ProducerFactory.Listener<K,V> listener)
addListener in interface ProducerFactory<K,V>index - the index (list position).listener - the listener.public boolean removeListener(ProducerFactory.Listener<K,V> listener)
removeListener in interface ProducerFactory<K,V>listener - the listener.public void addPostProcessor(ProducerPostProcessor<K,V> postProcessor)
ProducerFactoryaddPostProcessor in interface ProducerFactory<K,V>postProcessor - the post processor.public boolean removePostProcessor(ProducerPostProcessor<K,V> postProcessor)
ProducerFactoryremovePostProcessor in interface ProducerFactory<K,V>postProcessor - the post processor.public void updateConfigs(java.util.Map<java.lang.String,java.lang.Object> updates)
ProducerFactoryupdateConfigs in interface ProducerFactory<K,V>updates - the configuration properties to update.public void removeConfig(java.lang.String configKey)
ProducerFactoryremoveConfig in interface ProducerFactory<K,V>configKey - the key to remove.public boolean transactionCapable()
ProducerFactorytransactionCapable in interface ProducerFactory<K,V>public void destroy()
destroy in interface org.springframework.beans.factory.DisposableBeanpublic void onApplicationEvent(org.springframework.context.event.ContextStoppedEvent event)
onApplicationEvent in interface org.springframework.context.ApplicationListener<org.springframework.context.event.ContextStoppedEvent>public void reset()
Producer(s) and clear the cache of transactional
Producer(s).reset in interface ProducerFactory<K,V>public org.apache.kafka.clients.producer.Producer<K,V> createProducer()
ProducerFactorycreateProducer in interface ProducerFactory<K,V>ProducerFactory.transactionCapable()public org.apache.kafka.clients.producer.Producer<K,V> createProducer(@Nullable java.lang.String txIdPrefixArg)
ProducerFactorycreateProducer in interface ProducerFactory<K,V>txIdPrefixArg - the transaction id prefix.public org.apache.kafka.clients.producer.Producer<K,V> createNonTransactionalProducer()
ProducerFactorycreateNonTransactionalProducer in interface ProducerFactory<K,V>ProducerFactory.transactionCapable()protected org.apache.kafka.clients.producer.Producer<K,V> createKafkaProducer()
DefaultKafkaProducerFactory.CloseSafeProducer.protected org.apache.kafka.clients.producer.Producer<K,V> createTransactionalProducerForPartition()
protected org.apache.kafka.clients.producer.Producer<K,V> createTransactionalProducerForPartition(java.lang.String txIdPrefix)
protected final boolean removeProducer(DefaultKafkaProducerFactory.CloseSafeProducer<K,V> producerToRemove, java.time.Duration timeout)
producerToRemove - the producer.timeout - the close timeout.protected org.apache.kafka.clients.producer.Producer<K,V> createTransactionalProducer()
getCache() or a
new raw producer wrapped in a DefaultKafkaProducerFactory.CloseSafeProducer.protected org.apache.kafka.clients.producer.Producer<K,V> createTransactionalProducer(java.lang.String txIdPrefix)
protected org.apache.kafka.clients.producer.Producer<K,V> createRawProducer(java.util.Map<java.lang.String,java.lang.Object> rawConfigs)
@Nullable protected java.util.concurrent.BlockingQueue<DefaultKafkaProducerFactory.CloseSafeProducer<K,V>> getCache()
@Nullable protected java.util.concurrent.BlockingQueue<DefaultKafkaProducerFactory.CloseSafeProducer<K,V>> getCache(java.lang.String txIdPrefix)
public void closeProducerFor(java.lang.String suffix)
ProducerFactorycloseProducerFor in interface ProducerFactory<K,V>suffix - the producer's transaction id suffix.public void closeThreadBoundProducer()
setProducerPerThread(boolean) (true), call this method to close
and release this thread's producer. Thread bound producers are not closed by
destroy() or reset() methods.closeThreadBoundProducer in interface ProducerFactory<K,V>setProducerPerThread(boolean)