Class DefaultKafkaConsumerFactory<K,V>
- java.lang.Object
-
- org.springframework.kafka.core.KafkaResourceFactory
-
- org.springframework.kafka.core.DefaultKafkaConsumerFactory<K,V>
-
- Type Parameters:
K - the key type.
V - the value type.
- All Implemented Interfaces:
org.springframework.beans.factory.Aware, org.springframework.beans.factory.BeanNameAware, ConsumerFactory<K,V>
public class DefaultKafkaConsumerFactory<K,V> extends KafkaResourceFactory implements ConsumerFactory<K,V>, org.springframework.beans.factory.BeanNameAware
The ConsumerFactory implementation to produce new Consumer instances for provided Map configs and optional Deserializers on each ConsumerFactory.createConsumer() invocation.
If you are using Deserializers that have no-arg constructors and require no setup, then it is simplest to specify Deserializer classes against the ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG and ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG keys in the configs passed to the DefaultKafkaConsumerFactory constructor.
If that is not possible, but you are using Deserializers that may be shared between all Consumer instances (and specifically that their close() method is a no-op), then you can pass in Deserializer instances for one or both of the key and value deserializers.
If neither of the above is true, then you may provide a Supplier for one or both Deserializers, which will be used to obtain Deserializer(s) each time a Consumer is created by the factory.
- Author:
- Gary Russell, Murali Reddy, Artem Bilan, Chris Gilbert
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.springframework.kafka.core.ConsumerFactory
ConsumerFactory.Listener<K,V>
-
-
Constructor Summary
DefaultKafkaConsumerFactory(java.util.Map<java.lang.String,java.lang.Object> configs)
Construct a factory with the provided configuration.
-
DefaultKafkaConsumerFactory(java.util.Map<java.lang.String,java.lang.Object> configs, java.util.function.Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier, java.util.function.Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier)
Construct a factory with the provided configuration and deserializer suppliers.
-
DefaultKafkaConsumerFactory(java.util.Map<java.lang.String,java.lang.Object> configs, java.util.function.Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier, java.util.function.Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier, boolean configureDeserializers)
Construct a factory with the provided configuration and deserializer suppliers.
-
DefaultKafkaConsumerFactory(java.util.Map<java.lang.String,java.lang.Object> configs, org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
Construct a factory with the provided configuration and deserializers.
-
DefaultKafkaConsumerFactory(java.util.Map<java.lang.String,java.lang.Object> configs, org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer, boolean configureDeserializers)
Construct a factory with the provided configuration and deserializers.
-
Method Summary
void addListener(int index, ConsumerFactory.Listener<K,V> listener)
Add a listener at a specific index.
-
void addListener(ConsumerFactory.Listener<K,V> listener)
Add a listener.
-
void addPostProcessor(ConsumerPostProcessor<K,V> postProcessor)
Add a post processor.
-
org.apache.kafka.clients.consumer.Consumer<K,V> createConsumer(java.lang.String groupId, java.lang.String clientIdPrefix, java.lang.String clientIdSuffix)
Create a consumer with an explicit group id; in addition, the client id suffix is appended to the clientIdPrefix which overrides the client.id property, if present.
-
org.apache.kafka.clients.consumer.Consumer<K,V> createConsumer(java.lang.String groupId, java.lang.String clientIdPrefix, java.lang.String clientIdSuffixArg, java.util.Properties properties)
Create a consumer with an explicit group id; in addition, the client id suffix is appended to the clientIdPrefix which overrides the client.id property, if present.
-
protected org.apache.kafka.clients.consumer.Consumer<K,V> createKafkaConsumer(java.lang.String groupId, java.lang.String clientIdPrefixArg, java.lang.String clientIdSuffixArg, java.util.Properties properties)
-
protected org.apache.kafka.clients.consumer.Consumer<K,V> createKafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configProps)
-
protected org.apache.kafka.clients.consumer.Consumer<K,V> createRawConsumer(java.util.Map<java.lang.String,java.lang.Object> configProps)
Create a Consumer.
-
java.util.Map<java.lang.String,java.lang.Object> getConfigurationProperties()
Return an unmodifiable reference to the configuration map for this factory.
-
org.apache.kafka.common.serialization.Deserializer<K> getKeyDeserializer()
Return the configured key deserializer (if provided as an object instead of a class name in the properties).
-
java.util.List<ConsumerFactory.Listener<K,V>> getListeners()
Get the current list of listeners.
-
java.util.List<ConsumerPostProcessor<K,V>> getPostProcessors()
Get the current list of post processors.
-
org.apache.kafka.common.serialization.Deserializer<V> getValueDeserializer()
Return the configured value deserializer (if provided as an object instead of a class name in the properties).
-
boolean isAutoCommit()
Return true if consumers created by this factory use auto commit.
-
void removeConfig(java.lang.String configKey)
Remove the specified key from the configuration map.
-
boolean removeListener(ConsumerFactory.Listener<K,V> listener)
Remove a listener.
-
boolean removePostProcessor(ConsumerPostProcessor<K,V> postProcessor)
Remove a post processor.
-
void setBeanName(java.lang.String name)
-
void setConfigureDeserializers(boolean configureDeserializers)
Set to false (default true) to prevent programmatically provided deserializers (via constructor or setters) from being configured using the consumer configuration, e.g. if the deserializers are already fully configured.
-
void setKeyDeserializer(org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer)
Set the key deserializer.
-
void setKeyDeserializerSupplier(java.util.function.Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier)
Set a supplier to supply instances of the key deserializer.
-
void setValueDeserializer(org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
Set the value deserializer.
-
void setValueDeserializerSupplier(java.util.function.Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier)
Set a supplier to supply instances of the value deserializer.
-
void updateConfigs(java.util.Map<java.lang.String,java.lang.Object> updates)
Update the consumer configuration map; useful for situations such as credential rotation.
-
Methods inherited from class org.springframework.kafka.core.KafkaResourceFactory
checkBootstrap, getBootstrapServers, setBootstrapServersSupplier
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface org.springframework.kafka.core.ConsumerFactory
createConsumer, createConsumer, createConsumer
-
-
-
-
Constructor Detail
-
DefaultKafkaConsumerFactory
public DefaultKafkaConsumerFactory(java.util.Map<java.lang.String,java.lang.Object> configs)
Construct a factory with the provided configuration.
- Parameters:
configs - the configuration.
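As a minimal sketch of the configuration map this constructor expects, the following builds one that names the deserializers by class. The literal keys are the string values of ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, GROUP_ID_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG; string literals are used so the sketch compiles without Kafka on the classpath. The broker address, the group id and the class name ConsumerConfigSketch are placeholder assumptions.

```java
import java.util.HashMap;
import java.util.Map;

class ConsumerConfigSketch {

    /** Builds a config map suitable for the single-argument constructor. */
    static Map<String, Object> consumerConfigs() {
        Map<String, Object> configs = new HashMap<>();
        // Placeholder broker address and group id (assumptions for illustration).
        configs.put("bootstrap.servers", "localhost:9092");
        configs.put("group.id", "example-group");
        // Deserializers with no-arg constructors can simply be named by class.
        configs.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return configs;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = consumerConfigs();
        // With spring-kafka on the classpath, the map would be passed as:
        // ConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
        System.out.println(configs.get("key.deserializer"));
    }
}
```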
-
DefaultKafkaConsumerFactory
public DefaultKafkaConsumerFactory(java.util.Map<java.lang.String,java.lang.Object> configs, @Nullable org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, @Nullable org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
Construct a factory with the provided configuration and deserializers. The deserializers' configure() methods will be called with the configuration map.
- Parameters:
configs - the configuration.
keyDeserializer - the key Deserializer.
valueDeserializer - the value Deserializer.
-
DefaultKafkaConsumerFactory
public DefaultKafkaConsumerFactory(java.util.Map<java.lang.String,java.lang.Object> configs, @Nullable org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, @Nullable org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer, boolean configureDeserializers)
Construct a factory with the provided configuration and deserializers. The deserializers' configure() methods will be called with the configuration map unless configureDeserializers is false.
- Parameters:
configs - the configuration.
keyDeserializer - the key Deserializer.
valueDeserializer - the value Deserializer.
configureDeserializers - false to not configure the deserializers.
- Since:
- 2.8.7
-
DefaultKafkaConsumerFactory
public DefaultKafkaConsumerFactory(java.util.Map<java.lang.String,java.lang.Object> configs, @Nullable java.util.function.Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier, @Nullable java.util.function.Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier)
Construct a factory with the provided configuration and deserializer suppliers. When the suppliers are invoked to get an instance, the deserializers' configure() methods will be called with the configuration map.
- Parameters:
configs - the configuration.
keyDeserializerSupplier - the key Deserializer supplier function.
valueDeserializerSupplier - the value Deserializer supplier function.
- Since:
- 2.3
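The following sketch illustrates why a Supplier is needed when a deserializer's close() is not a no-op: a shared instance closed at the end of one consumer's life is unusable by the next, while a supplier hands out a fresh instance each time. StatefulDeserializer and consumeOnce are hypothetical stand-ins written so the example runs without Kafka on the classpath; they are not the Kafka Deserializer interface or the factory's actual code.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

class DeserializerSupplierSketch {

    /** Hypothetical stand-in for a deserializer whose close() is NOT a no-op. */
    static class StatefulDeserializer {
        private boolean closed;

        String deserialize(byte[] data) {
            if (closed) {
                throw new IllegalStateException("deserializer already closed");
            }
            return new String(data, StandardCharsets.UTF_8);
        }

        void close() {
            closed = true;
        }
    }

    /**
     * Models one consumer lifecycle: obtain a deserializer, use it, close it.
     * Returns true if deserialization succeeded.
     */
    static boolean consumeOnce(Supplier<StatefulDeserializer> source) {
        StatefulDeserializer deserializer = source.get();
        try {
            deserializer.deserialize("payload".getBytes(StandardCharsets.UTF_8));
            return true;
        } catch (IllegalStateException e) {
            return false;
        } finally {
            deserializer.close();
        }
    }

    public static void main(String[] args) {
        StatefulDeserializer shared = new StatefulDeserializer();
        // Shared instance: the first "consumer" works, the second sees a closed instance.
        System.out.println(consumeOnce(() -> shared)); // true
        System.out.println(consumeOnce(() -> shared)); // false
        // Supplier of new instances: every "consumer" gets its own deserializer.
        System.out.println(consumeOnce(StatefulDeserializer::new)); // true
        System.out.println(consumeOnce(StatefulDeserializer::new)); // true
    }
}
```

With the real API, the equivalent choice is between the constructors taking Deserializer instances and those taking Supplier arguments.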
-
DefaultKafkaConsumerFactory
public DefaultKafkaConsumerFactory(java.util.Map<java.lang.String,java.lang.Object> configs, @Nullable java.util.function.Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier, @Nullable java.util.function.Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier, boolean configureDeserializers)
Construct a factory with the provided configuration and deserializer suppliers. When the suppliers are invoked to get an instance, the deserializers' configure() methods will be called with the configuration map unless configureDeserializers is false.
- Parameters:
configs - the configuration.
keyDeserializerSupplier - the key Deserializer supplier function.
valueDeserializerSupplier - the value Deserializer supplier function.
configureDeserializers - false to not configure the deserializers.
- Since:
- 2.8.7
-
-
Method Detail
-
setBeanName
public void setBeanName(java.lang.String name)
- Specified by:
setBeanName in interface org.springframework.beans.factory.BeanNameAware
-
setKeyDeserializer
public void setKeyDeserializer(@Nullable org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer)
Set the key deserializer. The deserializer will be configured using the consumer configuration, unless configureDeserializers is false.
- Parameters:
keyDeserializer - the deserializer.
-
setValueDeserializer
public void setValueDeserializer(@Nullable org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
Set the value deserializer. The deserializer will be configured using the consumer configuration, unless configureDeserializers is false.
- Parameters:
valueDeserializer - the value deserializer.
-
setKeyDeserializerSupplier
public void setKeyDeserializerSupplier(java.util.function.Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier)
Set a supplier to supply instances of the key deserializer. The deserializer will be configured using the consumer configuration, unless configureDeserializers is false.
- Parameters:
keyDeserializerSupplier - the supplier.
- Since:
- 2.8
-
setValueDeserializerSupplier
public void setValueDeserializerSupplier(java.util.function.Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier)
Set a supplier to supply instances of the value deserializer. The deserializer will be configured using the consumer configuration, unless configureDeserializers is false.
- Parameters:
valueDeserializerSupplier - the supplier.
- Since:
- 2.8
-
setConfigureDeserializers
public void setConfigureDeserializers(boolean configureDeserializers)
Set to false (default true) to prevent programmatically provided deserializers (via constructor or setters) from being configured using the consumer configuration, e.g. if the deserializers are already fully configured.
- Parameters:
configureDeserializers - false to not configure.
- Since:
- 2.8.7
- See Also:
setKeyDeserializer(Deserializer), setKeyDeserializerSupplier(Supplier), setValueDeserializer(Deserializer), setValueDeserializerSupplier(Supplier)
-
getConfigurationProperties
public java.util.Map<java.lang.String,java.lang.Object> getConfigurationProperties()
Description copied from interface: ConsumerFactory
Return an unmodifiable reference to the configuration map for this factory. Useful for cloning to make a similar factory.
- Specified by:
getConfigurationProperties in interface ConsumerFactory<K,V>
- Returns:
- the configs.
-
getKeyDeserializer
public org.apache.kafka.common.serialization.Deserializer<K> getKeyDeserializer()
Description copied from interface: ConsumerFactory
Return the configured key deserializer (if provided as an object instead of a class name in the properties).
- Specified by:
getKeyDeserializer in interface ConsumerFactory<K,V>
- Returns:
- the deserializer.
-
getValueDeserializer
public org.apache.kafka.common.serialization.Deserializer<V> getValueDeserializer()
Description copied from interface: ConsumerFactory
Return the configured value deserializer (if provided as an object instead of a class name in the properties).
- Specified by:
getValueDeserializer in interface ConsumerFactory<K,V>
- Returns:
- the deserializer.
-
getListeners
public java.util.List<ConsumerFactory.Listener<K,V>> getListeners()
Get the current list of listeners.
- Specified by:
getListeners in interface ConsumerFactory<K,V>
- Returns:
- the listeners.
- Since:
- 2.5
-
getPostProcessors
public java.util.List<ConsumerPostProcessor<K,V>> getPostProcessors()
Description copied from interface: ConsumerFactory
Get the current list of post processors.
- Specified by:
getPostProcessors in interface ConsumerFactory<K,V>
- Returns:
- the post processors.
-
addListener
public void addListener(ConsumerFactory.Listener<K,V> listener)
Add a listener.
- Specified by:
addListener in interface ConsumerFactory<K,V>
- Parameters:
listener - the listener.
- Since:
- 2.5
-
addListener
public void addListener(int index, ConsumerFactory.Listener<K,V> listener)
Add a listener at a specific index.
- Specified by:
addListener in interface ConsumerFactory<K,V>
- Parameters:
index - the index (list position).
listener - the listener.
- Since:
- 2.5
-
addPostProcessor
public void addPostProcessor(ConsumerPostProcessor<K,V> postProcessor)
Description copied from interface: ConsumerFactory
Add a post processor.
- Specified by:
addPostProcessor in interface ConsumerFactory<K,V>
- Parameters:
postProcessor - the post processor.
-
removePostProcessor
public boolean removePostProcessor(ConsumerPostProcessor<K,V> postProcessor)
Description copied from interface: ConsumerFactory
Remove a post processor.
- Specified by:
removePostProcessor in interface ConsumerFactory<K,V>
- Parameters:
postProcessor - the post processor.
- Returns:
- true if removed.
-
removeListener
public boolean removeListener(ConsumerFactory.Listener<K,V> listener)
Remove a listener.
- Specified by:
removeListener in interface ConsumerFactory<K,V>
- Parameters:
listener - the listener.
- Returns:
- true if removed.
- Since:
- 2.5
-
updateConfigs
public void updateConfigs(java.util.Map<java.lang.String,java.lang.Object> updates)
Description copied from interface: ConsumerFactory
Update the consumer configuration map; useful for situations such as credential rotation.
- Specified by:
updateConfigs in interface ConsumerFactory<K,V>
- Parameters:
updates - the configuration properties to update.
-
removeConfig
public void removeConfig(java.lang.String configKey)
Description copied from interface: ConsumerFactory
Remove the specified key from the configuration map.
- Specified by:
removeConfig in interface ConsumerFactory<K,V>
- Parameters:
configKey - the key to remove.
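The interplay of updateConfigs, removeConfig and getConfigurationProperties can be pictured with a small model. ConfigHolder below is a hypothetical stand-in that mimics only the documented behavior (merge updates, remove a key, expose an unmodifiable view); it is not the factory's actual implementation, and the credential strings are placeholders. The key sasl.jaas.config is a standard Kafka client property, chosen here to echo the credential rotation use case.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ConfigUpdateSketch {

    /** Hypothetical stand-in modeling only the factory's configuration handling. */
    static class ConfigHolder {
        private final Map<String, Object> configs;

        ConfigHolder(Map<String, Object> initial) {
            this.configs = new ConcurrentHashMap<>(initial);
        }

        /** Merge new or changed properties into the configuration. */
        void updateConfigs(Map<String, Object> updates) {
            this.configs.putAll(updates);
        }

        /** Remove a single property from the configuration. */
        void removeConfig(String configKey) {
            this.configs.remove(configKey);
        }

        /** Expose the configuration as a read-only view. */
        Map<String, Object> getConfigurationProperties() {
            return Collections.unmodifiableMap(this.configs);
        }
    }

    public static void main(String[] args) {
        ConfigHolder holder = new ConfigHolder(
                Map.of("sasl.jaas.config", "old-credentials", "client.id", "example"));
        // Rotate credentials: consumers created afterwards would see the new value.
        holder.updateConfigs(Map.of("sasl.jaas.config", "new-credentials"));
        holder.removeConfig("client.id");
        System.out.println(holder.getConfigurationProperties());
    }
}
```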
-
createConsumer
public org.apache.kafka.clients.consumer.Consumer<K,V> createConsumer(@Nullable java.lang.String groupId, @Nullable java.lang.String clientIdPrefix, @Nullable java.lang.String clientIdSuffix)
Description copied from interface: ConsumerFactory
Create a consumer with an explicit group id; in addition, the client id suffix is appended to the clientIdPrefix which overrides the client.id property, if present.
- Specified by:
createConsumer in interface ConsumerFactory<K,V>
- Parameters:
groupId - the group id.
clientIdPrefix - the prefix.
clientIdSuffix - the suffix.
- Returns:
- the consumer.
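To make the client id rule concrete, here is a rough model of how the effective properties could be derived from the arguments. The helper effectiveConfigs is hypothetical and models only what the description states: an explicit group id replaces group.id, and prefix plus suffix replaces client.id. Treating a null group id or null prefix as "leave the property unchanged" is an assumption of this sketch, not a statement about the factory's behavior.

```java
import java.util.HashMap;
import java.util.Map;

class ClientIdSketch {

    /**
     * Returns a copy of baseConfigs with group.id and client.id overridden.
     * Null arguments leave the corresponding property untouched (sketch assumption).
     */
    static Map<String, Object> effectiveConfigs(Map<String, Object> baseConfigs,
            String groupId, String clientIdPrefix, String clientIdSuffix) {
        Map<String, Object> effective = new HashMap<>(baseConfigs);
        if (groupId != null) {
            effective.put("group.id", groupId);
        }
        if (clientIdPrefix != null) {
            String suffix = clientIdSuffix == null ? "" : clientIdSuffix;
            // The suffix is appended to the prefix; the result overrides client.id.
            effective.put("client.id", clientIdPrefix + suffix);
        }
        return effective;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put("group.id", "default-group");
        base.put("client.id", "default-client");
        Map<String, Object> effective = effectiveConfigs(base, "orders", "listener", "-0");
        System.out.println(effective.get("client.id")); // listener-0
        System.out.println(effective.get("group.id"));  // orders
    }
}
```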
-
createConsumer
public org.apache.kafka.clients.consumer.Consumer<K,V> createConsumer(@Nullable java.lang.String groupId, @Nullable java.lang.String clientIdPrefix, @Nullable java.lang.String clientIdSuffixArg, @Nullable java.util.Properties properties)
Description copied from interface: ConsumerFactory
Create a consumer with an explicit group id; in addition, the client id suffix is appended to the clientIdPrefix which overrides the client.id property, if present. In addition, consumer properties can be overridden if the factory implementation supports it.
- Specified by:
createConsumer in interface ConsumerFactory<K,V>
- Parameters:
groupId - the group id.
clientIdPrefix - the prefix.
clientIdSuffixArg - the suffix.
properties - the properties to override.
- Returns:
- the consumer.
-
createKafkaConsumer
protected org.apache.kafka.clients.consumer.Consumer<K,V> createKafkaConsumer(@Nullable java.lang.String groupId, @Nullable java.lang.String clientIdPrefixArg, @Nullable java.lang.String clientIdSuffixArg, @Nullable java.util.Properties properties)
-
createKafkaConsumer
protected org.apache.kafka.clients.consumer.Consumer<K,V> createKafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configProps)
-
createRawConsumer
protected org.apache.kafka.clients.consumer.Consumer<K,V> createRawConsumer(java.util.Map<java.lang.String,java.lang.Object> configProps)
Create a Consumer.
- Parameters:
configProps - the configuration properties.
- Returns:
- the consumer.
- Since:
- 2.5
-
isAutoCommit
public boolean isAutoCommit()
Description copied from interface: ConsumerFactory
Return true if consumers created by this factory use auto commit.
- Specified by:
isAutoCommit in interface ConsumerFactory<K,V>
- Returns:
- true if auto commit.
-
-