Class DefaultKafkaConsumerFactory<K,V>
- Type Parameters:
K - the key type.
V - the value type.
- All Implemented Interfaces:
Aware, BeanNameAware, ConsumerFactory<K,V>
ConsumerFactory implementation to produce new Consumer instances
for provided Map configs and optional Deserializers on each ConsumerFactory.createConsumer()
invocation.
If you are using Deserializers that have no-arg constructors and require no setup, then it is simplest to
specify Deserializer classes against ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG and
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG keys in the configs passed to the
DefaultKafkaConsumerFactory constructor.
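As a minimal sketch of this first option, the configuration map might be built as shown below. To stay runnable without Kafka on the classpath, it uses the literal property names that ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG and ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG resolve to ("key.deserializer" and "value.deserializer"). The class name ConsumerConfigSketch, the broker address, and the group id are placeholders chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class ConsumerConfigSketch {

    // Builds the map that would be passed to the DefaultKafkaConsumerFactory constructor.
    static Map<String, Object> consumerConfigs() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        configs.put("group.id", "example-group");           // placeholder group id
        // Deserializers named by class; Kafka instantiates them via their no-arg constructors.
        configs.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return configs;
    }

    public static void main(String[] args) {
        // With spring-kafka on the classpath, the map would be used as:
        //   new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
        consumerConfigs().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```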
If that is not possible, but you are using Deserializers that may be shared between all Consumer
instances (and specifically that their close() method is a no-op), then you can pass in Deserializer
instances for one or both of the key and value deserializers.
If neither of the above is true then you may provide a Supplier for one or both Deserializers
which will be used to obtain Deserializer(s) each time a Consumer is created by the factory.
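The difference between sharing one Deserializer instance and supplying a new one per Consumer can be illustrated without Kafka. In the sketch below, CloseableDeserializer is a hypothetical stand-in for a Deserializer whose close() is not a no-op, and deserializerForNewConsumer is a stand-in for the factory obtaining a deserializer when it creates a Consumer. Neither is part of the spring-kafka API.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

final class DeserializerSupplierSketch {

    // Hypothetical stand-in for a stateful Deserializer; close() is NOT a no-op.
    static final class CloseableDeserializer implements AutoCloseable {
        private boolean closed;

        String deserialize(byte[] data) {
            if (closed) {
                throw new IllegalStateException("deserializer already closed");
            }
            return new String(data, StandardCharsets.UTF_8);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Stand-in for consumer creation: the supplier is asked for a deserializer each time.
    static CloseableDeserializer deserializerForNewConsumer(Supplier<CloseableDeserializer> supplier) {
        return supplier.get();
    }

    public static void main(String[] args) {
        Supplier<CloseableDeserializer> supplier = CloseableDeserializer::new;
        CloseableDeserializer first = deserializerForNewConsumer(supplier);
        CloseableDeserializer second = deserializerForNewConsumer(supplier);

        // Closing the first consumer's deserializer leaves the second usable,
        // because each consumer received its own instance.
        first.close();
        System.out.println(second.deserialize("ok".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Had both consumers shared a single CloseableDeserializer, closing it for one would have broken the other, which is why shared instances are only appropriate when close() does nothing.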
- Author:
- Gary Russell, Murali Reddy, Artem Bilan, Chris Gilbert
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.kafka.core.ConsumerFactory
ConsumerFactory.Listener<K,V> -
Constructor Summary
Constructors
Constructor - Description
DefaultKafkaConsumerFactory(Map<String, Object> configs) - Construct a factory with the provided configuration.
DefaultKafkaConsumerFactory(Map<String, Object> configs, Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier, Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier) - Construct a factory with the provided configuration and deserializer suppliers.
DefaultKafkaConsumerFactory(Map<String, Object> configs, Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier, Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier, boolean configureDeserializers) - Construct a factory with the provided configuration and deserializer suppliers.
DefaultKafkaConsumerFactory(Map<String, Object> configs, org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer) - Construct a factory with the provided configuration and deserializers.
DefaultKafkaConsumerFactory(Map<String, Object> configs, org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer, boolean configureDeserializers) - Construct a factory with the provided configuration and deserializers.
-
Method Summary
Modifier and Type, Method - Description
void addListener(int index, ConsumerFactory.Listener<K, V> listener) - Add a listener at a specific index.
void addListener(ConsumerFactory.Listener<K, V> listener) - Add a listener.
void addPostProcessor(ConsumerPostProcessor<K, V> postProcessor) - Add a post processor.
org.apache.kafka.clients.consumer.Consumer<K,V> createConsumer(String groupId, String clientIdPrefix, String clientIdSuffix) - Create a consumer with an explicit group id; in addition, the client id suffix is appended to the clientIdPrefix which overrides the client.id property, if present.
org.apache.kafka.clients.consumer.Consumer<K,V> createConsumer(String groupId, String clientIdPrefix, String clientIdSuffixArg, Properties properties) - Create a consumer with an explicit group id; in addition, the client id suffix is appended to the clientIdPrefix which overrides the client.id property, if present.
protected org.apache.kafka.clients.consumer.Consumer<K,V> createKafkaConsumer(String groupId, String clientIdPrefixArg, String clientIdSuffixArg, Properties properties)
protected org.apache.kafka.clients.consumer.Consumer<K,V> createKafkaConsumer(Map<String, Object> configProps)
protected org.apache.kafka.clients.consumer.Consumer<K,V> createRawConsumer(Map<String, Object> configProps) - Create a Consumer.
Map<String, Object> getConfigurationProperties() - Return an unmodifiable reference to the configuration map for this factory.
org.apache.kafka.common.serialization.Deserializer<K> getKeyDeserializer() - Return the configured key deserializer (if provided as an object instead of a class name in the properties).
List<ConsumerFactory.Listener<K, V>> getListeners() - Get the current list of listeners.
List<ConsumerPostProcessor<K, V>> getPostProcessors() - Get the current list of post processors.
org.apache.kafka.common.serialization.Deserializer<V> getValueDeserializer() - Return the configured value deserializer (if provided as an object instead of a class name in the properties).
boolean isAutoCommit() - Return true if consumers created by this factory use auto commit.
void removeConfig(String configKey) - Remove the specified key from the configuration map.
boolean removeListener(ConsumerFactory.Listener<K, V> listener) - Remove a listener.
boolean removePostProcessor(ConsumerPostProcessor<K, V> postProcessor) - Remove a post processor.
void setBeanName(String name)
void setConfigureDeserializers(boolean configureDeserializers) - Set to false (default true) to prevent programmatically provided deserializers (via constructor or setters) from being configured using the consumer configuration, e.g. if the deserializers are already fully configured.
void setKeyDeserializer(org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer) - Set the key deserializer.
void setKeyDeserializerSupplier(Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier) - Set a supplier to supply instances of the key deserializer.
void setValueDeserializer(org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer) - Set the value deserializer.
void setValueDeserializerSupplier(Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier) - Set a supplier to supply instances of the value deserializer.
void updateConfigs(Map<String, Object> updates) - Update the consumer configuration map; useful for situations such as credential rotation.
Methods inherited from class org.springframework.kafka.core.KafkaResourceFactory
checkBootstrap, getBootstrapServers, setBootstrapServersSupplier
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.springframework.kafka.core.ConsumerFactory
createConsumer, createConsumer, createConsumer
-
Constructor Details
-
DefaultKafkaConsumerFactory
Construct a factory with the provided configuration.
- Parameters:
configs - the configuration.
-
DefaultKafkaConsumerFactory
public DefaultKafkaConsumerFactory(Map<String, Object> configs, @Nullable org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, @Nullable org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
Construct a factory with the provided configuration and deserializers. The deserializers' configure() methods will be called with the configuration map.
- Parameters:
configs - the configuration.
keyDeserializer - the keyDeserializer.
valueDeserializer - the valueDeserializer.
-
DefaultKafkaConsumerFactory
public DefaultKafkaConsumerFactory(Map<String, Object> configs, @Nullable org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer, @Nullable org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer, boolean configureDeserializers)
Construct a factory with the provided configuration and deserializers. The deserializers' configure() methods will be called with the configuration map unless configureDeserializers is false.
- Parameters:
configs - the configuration.
keyDeserializer - the keyDeserializer.
valueDeserializer - the valueDeserializer.
configureDeserializers - false to not configure the deserializers.
- Since:
- 2.8.7
-
DefaultKafkaConsumerFactory
public DefaultKafkaConsumerFactory(Map<String, Object> configs, @Nullable Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier, @Nullable Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier)
Construct a factory with the provided configuration and deserializer suppliers. When the suppliers are invoked to get an instance, the deserializers' configure() methods will be called with the configuration map.
- Parameters:
configs - the configuration.
keyDeserializerSupplier - the keyDeserializer supplier function.
valueDeserializerSupplier - the valueDeserializer supplier function.
- Since:
- 2.3
-
DefaultKafkaConsumerFactory
public DefaultKafkaConsumerFactory(Map<String, Object> configs, @Nullable Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier, @Nullable Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier, boolean configureDeserializers)
Construct a factory with the provided configuration and deserializer suppliers. When the suppliers are invoked to get an instance, the deserializers' configure() methods will be called with the configuration map unless configureDeserializers is false.
- Parameters:
configs - the configuration.
keyDeserializerSupplier - the keyDeserializer supplier function.
valueDeserializerSupplier - the valueDeserializer supplier function.
configureDeserializers - false to not configure the deserializers.
- Since:
- 2.8.7
-
-
Method Details
-
setBeanName
- Specified by:
setBeanName in interface BeanNameAware
-
setKeyDeserializer
public void setKeyDeserializer(@Nullable org.apache.kafka.common.serialization.Deserializer<K> keyDeserializer)
Set the key deserializer. The deserializer will be configured using the consumer configuration, unless configureDeserializers is false.
- Parameters:
keyDeserializer - the deserializer.
-
setValueDeserializer
public void setValueDeserializer(@Nullable org.apache.kafka.common.serialization.Deserializer<V> valueDeserializer)
Set the value deserializer. The deserializer will be configured using the consumer configuration, unless configureDeserializers is false.
- Parameters:
valueDeserializer - the value deserializer.
-
setKeyDeserializerSupplier
public void setKeyDeserializerSupplier(Supplier<org.apache.kafka.common.serialization.Deserializer<K>> keyDeserializerSupplier)
Set a supplier to supply instances of the key deserializer. The deserializer will be configured using the consumer configuration, unless configureDeserializers is false.
- Parameters:
keyDeserializerSupplier - the supplier.
- Since:
- 2.8
-
setValueDeserializerSupplier
public void setValueDeserializerSupplier(Supplier<org.apache.kafka.common.serialization.Deserializer<V>> valueDeserializerSupplier)
Set a supplier to supply instances of the value deserializer. The deserializer will be configured using the consumer configuration, unless configureDeserializers is false.
- Parameters:
valueDeserializerSupplier - the supplier.
- Since:
- 2.8
-
setConfigureDeserializers
public void setConfigureDeserializers(boolean configureDeserializers)
Set to false (default true) to prevent programmatically provided deserializers (via constructor or setters) from being configured using the consumer configuration, e.g. if the deserializers are already fully configured.
- Parameters:
configureDeserializers - false to not configure.
- Since:
- 2.8.7
- See Also:
-
getConfigurationProperties
Description copied from interface: ConsumerFactory
Return an unmodifiable reference to the configuration map for this factory. Useful for cloning to make a similar factory.
- Specified by:
getConfigurationProperties in interface ConsumerFactory<K,V>
- Returns:
- the configs.
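Because the returned map is unmodifiable, cloning means copying it into a new map before changing anything. The sketch below shows that step with plain java.util types; the class CloneConfigSketch, the helper copyWithOverrides, and the sample property values are illustrative assumptions, and the unmodifiable map built in main is only a stand-in for the one the factory returns.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class CloneConfigSketch {

    // Copies a read-only config view and applies overrides, leaving the source untouched.
    static Map<String, Object> copyWithOverrides(Map<String, Object> readOnly,
                                                 Map<String, Object> overrides) {
        Map<String, Object> copy = new HashMap<>(readOnly);
        copy.putAll(overrides);
        return copy;
    }

    public static void main(String[] args) {
        // Stand-in for the map returned by getConfigurationProperties().
        Map<String, Object> base = new HashMap<>();
        base.put("bootstrap.servers", "localhost:9092"); // placeholder
        base.put("group.id", "group-a");                 // placeholder
        Map<String, Object> original = Collections.unmodifiableMap(base);

        Map<String, Object> overrides = new HashMap<>();
        overrides.put("group.id", "group-b");

        // The copy could then be passed to a new DefaultKafkaConsumerFactory.
        Map<String, Object> copy = copyWithOverrides(original, overrides);
        System.out.println(copy);
    }
}
```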
-
getKeyDeserializer
Description copied from interface: ConsumerFactory
Return the configured key deserializer (if provided as an object instead of a class name in the properties).
- Specified by:
getKeyDeserializer in interface ConsumerFactory<K,V>
- Returns:
- the deserializer.
-
getValueDeserializer
Description copied from interface: ConsumerFactory
Return the configured value deserializer (if provided as an object instead of a class name in the properties).
- Specified by:
getValueDeserializer in interface ConsumerFactory<K,V>
- Returns:
- the deserializer.
-
getListeners
Get the current list of listeners.
- Specified by:
getListeners in interface ConsumerFactory<K,V>
- Returns:
- the listeners.
- Since:
- 2.5
-
getPostProcessors
Description copied from interface: ConsumerFactory
Get the current list of post processors.
- Specified by:
getPostProcessors in interface ConsumerFactory<K,V>
- Returns:
- the post processors.
-
addListener
Add a listener.
- Specified by:
addListener in interface ConsumerFactory<K,V>
- Parameters:
listener - the listener.
- Since:
- 2.5
-
addListener
Add a listener at a specific index.
- Specified by:
addListener in interface ConsumerFactory<K,V>
- Parameters:
index - the index (list position).
listener - the listener.
- Since:
- 2.5
-
addPostProcessor
Description copied from interface: ConsumerFactory
Add a post processor.
- Specified by:
addPostProcessor in interface ConsumerFactory<K,V>
- Parameters:
postProcessor - the post processor.
-
removePostProcessor
Description copied from interface: ConsumerFactory
Remove a post processor.
- Specified by:
removePostProcessor in interface ConsumerFactory<K,V>
- Parameters:
postProcessor - the post processor.
- Returns:
- true if removed.
-
removeListener
Remove a listener.
- Specified by:
removeListener in interface ConsumerFactory<K,V>
- Parameters:
listener - the listener.
- Returns:
- true if removed.
- Since:
- 2.5
-
updateConfigs
Description copied from interface: ConsumerFactory
Update the consumer configuration map; useful for situations such as credential rotation.
- Specified by:
updateConfigs in interface ConsumerFactory<K,V>
- Parameters:
updates - the configuration properties to update.
-
removeConfig
Description copied from interface: ConsumerFactory
Remove the specified key from the configuration map.
- Specified by:
removeConfig in interface ConsumerFactory<K,V>
- Parameters:
configKey - the key to remove.
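The intent of updateConfigs and removeConfig can be pictured as edits to a configuration map that later consumer creations read from. The sketch below models only that idea with plain java.util types, under the assumption that an update behaves like a map merge. ConfigRotationSketch and configsForNewConsumer are hypothetical names, the credential strings are placeholders, and the factory's real internals may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ConfigRotationSketch {

    // Hypothetical model of a factory's mutable configuration map.
    private final Map<String, Object> configs = new ConcurrentHashMap<>();

    ConfigRotationSketch(Map<String, Object> initial) {
        this.configs.putAll(initial);
    }

    // Assumption: an update is modeled as a plain merge into the existing map.
    void updateConfigs(Map<String, Object> updates) {
        this.configs.putAll(updates);
    }

    void removeConfig(String configKey) {
        this.configs.remove(configKey);
    }

    // Stand-in for consumer creation: each new consumer gets a snapshot of the current map.
    Map<String, Object> configsForNewConsumer() {
        return Map.copyOf(this.configs);
    }

    public static void main(String[] args) {
        Map<String, Object> initial = new HashMap<>();
        initial.put("sasl.jaas.config", "old-credentials"); // placeholder value
        ConfigRotationSketch factory = new ConfigRotationSketch(initial);

        Map<String, Object> rotated = new HashMap<>();
        rotated.put("sasl.jaas.config", "new-credentials"); // placeholder value
        factory.updateConfigs(rotated);

        // Consumers created after the update see the rotated value.
        System.out.println(factory.configsForNewConsumer().get("sasl.jaas.config"));
    }
}
```

In this model, snapshots taken before the update keep the old values, which mirrors the expectation that already-created consumers are not reconfigured in place.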
-
createConsumer
public org.apache.kafka.clients.consumer.Consumer<K,V> createConsumer(@Nullable String groupId, @Nullable String clientIdPrefix, @Nullable String clientIdSuffix)
Description copied from interface: ConsumerFactory
Create a consumer with an explicit group id; in addition, the client id suffix is appended to the clientIdPrefix which overrides the client.id property, if present.
- Specified by:
createConsumer in interface ConsumerFactory<K,V>
- Parameters:
groupId - the group id.
clientIdPrefix - the prefix.
clientIdSuffix - the suffix.
- Returns:
- the consumer.
-
createConsumer
public org.apache.kafka.clients.consumer.Consumer<K,V> createConsumer(@Nullable String groupId, @Nullable String clientIdPrefix, @Nullable String clientIdSuffixArg, @Nullable Properties properties)
Description copied from interface: ConsumerFactory
Create a consumer with an explicit group id; in addition, the client id suffix is appended to the clientIdPrefix which overrides the client.id property, if present. In addition, consumer properties can be overridden if the factory implementation supports it.
- Specified by:
createConsumer in interface ConsumerFactory<K,V>
- Parameters:
groupId - the group id.
clientIdPrefix - the prefix.
clientIdSuffixArg - the suffix.
properties - the properties to override.
- Returns:
- the consumer.
-
createKafkaConsumer
-
createKafkaConsumer
-
createRawConsumer
protected org.apache.kafka.clients.consumer.Consumer<K,V> createRawConsumer(Map<String, Object> configProps)
Create a Consumer.
- Parameters:
configProps - the configuration properties.
- Returns:
- the consumer.
- Since:
- 2.5
-
isAutoCommit
public boolean isAutoCommit()
Description copied from interface: ConsumerFactory
Return true if consumers created by this factory use auto commit.
- Specified by:
isAutoCommit in interface ConsumerFactory<K,V>
- Returns:
- true if auto commit.
-