package org.springframework.cloud.stream.binder.kinesis;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.SimpleRecordsFetcherFactory;
import com.amazonaws.services.kinesis.model.InvalidArgumentException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
import org.springframework.cloud.stream.binder.kinesis.adapter.SpringDynamoDBAdapterClient;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisConsumerProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kinesis.properties.KinesisProducerProperties;
import org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisConsumerDestination;
import org.springframework.cloud.stream.binder.kinesis.provisioning.KinesisStreamProvisioner;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.core.convert.converter.Converter;
import org.springframework.expression.EvaluationContext;
import org.springframework.integration.aws.inbound.kinesis.KclMessageDrivenChannelAdapter;
import org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter;
import org.springframework.integration.aws.inbound.kinesis.KinesisMessageHeaderErrorMessageStrategy;
import org.springframework.integration.aws.inbound.kinesis.KinesisShardOffset;
import org.springframework.integration.aws.outbound.AbstractAwsMessageHandler;
import org.springframework.integration.aws.outbound.KinesisMessageHandler;
import org.springframework.integration.aws.outbound.KplMessageHandler;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.InterceptableChannel;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kinesis/KinesisMessageChannelBinder.class */
/**
 * Spring Cloud Stream binder that connects message channels to AWS Kinesis streams.
 *
 * <p>Producers are backed by a {@link KinesisMessageHandler} or, when
 * {@link KinesisBinderConfigurationProperties#isKplKclEnabled()} is {@code true}, by a
 * {@link KplMessageHandler}. Consumers are backed by a {@link KinesisMessageDrivenChannelAdapter}
 * or, in KPL/KCL mode, by a {@link KclMessageDrivenChannelAdapter}. A consumer may alternatively
 * read from the stream of a DynamoDB table when its {@code dynamoDbStreams} property is set.
 *
 * <p>NOTE(review): this source was produced by a decompiler; the {@code m1...}/{@code m2...}
 * method names and the widened access modifiers are decompiler artifacts that are kept as-is so
 * existing callers keep working.
 */
public class KinesisMessageChannelBinder
        extends AbstractMessageChannelBinder<ExtendedConsumerProperties<KinesisConsumerProperties>, ExtendedProducerProperties<KinesisProducerProperties>, KinesisStreamProvisioner>
        implements ExtendedPropertiesBinder<MessageChannel, KinesisConsumerProperties, KinesisProducerProperties> {

    /** Shared strategy used for every error message produced by the consumer endpoints. */
    private static final ErrorMessageStrategy ERROR_MESSAGE_STRATEGY = new KinesisMessageHeaderErrorMessageStrategy();

    /** Names of the Kinesis streams bound so far (DynamoDB Streams destinations are not recorded). */
    private final List<String> streamsInUse = new ArrayList<>();

    private final KinesisBinderConfigurationProperties configurationProperties;

    /** Adapter over the DynamoDB Streams client; {@code null} when no such client was supplied. */
    private final AmazonDynamoDBStreamsAdapterClient dynamoDBStreamsAdapter;

    private final AmazonKinesisAsync amazonKinesis;

    private final AWSCredentialsProvider awsCredentialsProvider;

    /** May be {@code null}; handed to the KCL adapter as-is. */
    private final AmazonCloudWatch cloudWatchClient;

    /** May be {@code null}; required only for DynamoDB Streams consumers, handed to the KCL adapter as-is. */
    private final AmazonDynamoDB dynamoDBClient;

    private KinesisExtendedBindingProperties extendedBindingProperties = new KinesisExtendedBindingProperties();

    @Nullable
    private ConcurrentMetadataStore checkpointStore;

    @Nullable
    private LockRegistry lockRegistry;

    @Nullable
    private KinesisProducerConfiguration kinesisProducerConfiguration;

    /** Created in {@link #onInit()}; used to evaluate the partition key expression. */
    private EvaluationContext evaluationContext;

    /** Optional pre-defined KCL configurations; {@code null} until the setter is called. */
    private List<KinesisClientLibConfiguration> kinesisClientLibConfigurations;

    /**
     * Creates the binder.
     *
     * @param kinesisBinderConfigurationProperties binder-level configuration; must not be {@code null}
     * @param kinesisStreamProvisioner the provisioner passed to the parent binder
     * @param amazonKinesisAsync the Kinesis client; must not be {@code null}
     * @param aWSCredentialsProvider credentials used for generated KCL configurations; must not be {@code null}
     * @param amazonDynamoDB optional DynamoDB client
     * @param amazonDynamoDBStreams optional DynamoDB Streams client; wrapped into an adapter when present
     * @param amazonCloudWatch optional CloudWatch client
     */
    public KinesisMessageChannelBinder(KinesisBinderConfigurationProperties kinesisBinderConfigurationProperties, KinesisStreamProvisioner kinesisStreamProvisioner, AmazonKinesisAsync amazonKinesisAsync, AWSCredentialsProvider aWSCredentialsProvider, @Nullable AmazonDynamoDB amazonDynamoDB, @Nullable AmazonDynamoDBStreams amazonDynamoDBStreams, @Nullable AmazonCloudWatch amazonCloudWatch) {
        super(headersToMap(kinesisBinderConfigurationProperties), kinesisStreamProvisioner);
        Assert.notNull(amazonKinesisAsync, "'amazonKinesis' must not be null");
        Assert.notNull(aWSCredentialsProvider, "'awsCredentialsProvider' must not be null");
        this.configurationProperties = kinesisBinderConfigurationProperties;
        this.amazonKinesis = amazonKinesisAsync;
        this.cloudWatchClient = amazonCloudWatch;
        this.dynamoDBClient = amazonDynamoDB;
        this.awsCredentialsProvider = aWSCredentialsProvider;
        this.dynamoDBStreamsAdapter = amazonDynamoDBStreams != null
                ? new SpringDynamoDBAdapterClient(amazonDynamoDBStreams)
                : null;
    }

    /** Replaces the per-binding extended properties this binder delegates to. */
    public void setExtendedBindingProperties(KinesisExtendedBindingProperties kinesisExtendedBindingProperties) {
        this.extendedBindingProperties = kinesisExtendedBindingProperties;
    }

    /** Sets the checkpoint store applied to non-KCL consumer adapters; when unset the adapter keeps its own default. */
    public void setCheckpointStore(ConcurrentMetadataStore concurrentMetadataStore) {
        this.checkpointStore = concurrentMetadataStore;
    }

    /** Sets the lock registry passed to non-KCL consumer adapters (may be {@code null}). */
    public void setLockRegistry(LockRegistry lockRegistry) {
        this.lockRegistry = lockRegistry;
    }

    /** Sets the KPL configuration used to create a {@link KinesisProducer} in KPL/KCL mode. */
    public void setKinesisProducerConfiguration(KinesisProducerConfiguration kinesisProducerConfiguration) {
        this.kinesisProducerConfiguration = kinesisProducerConfiguration;
    }

    /** Sets pre-defined KCL configurations that are matched by stream name (and consumer group) before one is generated. */
    public void setKinesisClientLibConfigurations(List<KinesisClientLibConfiguration> list) {
        this.kinesisClientLibConfigurations = list;
    }

    /* renamed from: getExtendedConsumerProperties, reason: merged with bridge method [inline-methods] */
    /** Returns the Kinesis-specific consumer properties for the given binding name. */
    public KinesisConsumerProperties m2getExtendedConsumerProperties(String str) {
        return this.extendedBindingProperties.m6getExtendedConsumerProperties(str);
    }

    /* renamed from: getExtendedProducerProperties, reason: merged with bridge method [inline-methods] */
    /** Returns the Kinesis-specific producer properties for the given binding name. */
    public KinesisProducerProperties m1getExtendedProducerProperties(String str) {
        return this.extendedBindingProperties.m5getExtendedProducerProperties(str);
    }

    /** Delegates to the extended binding properties. */
    public String getDefaultsPrefix() {
        return this.extendedBindingProperties.getDefaultsPrefix();
    }

    /** Delegates to the extended binding properties. */
    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
        return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
    }

    /** Returns the Kinesis client this binder was created with. */
    public AmazonKinesisAsync getAmazonKinesis() {
        return this.amazonKinesis;
    }

    /** Returns the live (not copied) list of Kinesis stream names bound by this binder. */
    public List<String> getStreamsInUse() {
        return this.streamsInUse;
    }

    /** Runs the parent initialization and then creates the SpEL evaluation context from the bean factory. */
    protected void onInit() throws Exception {
        super.onInit();
        this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
    }

    /** Returns the parent's binder identity prefixed with {@code "kinesis-"}. */
    public String getBinderIdentity() {
        return "kinesis-" + super.getBinderIdentity();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the outbound message handler for a producer binding and records the stream as in use.
     *
     * <p>The partition key is the {@link BinderHeaders#PARTITION_HEADER} header when present,
     * otherwise the hash code of the payload.
     *
     * @param producerDestination the destination whose name is the target stream
     * @param extendedProducerProperties producer properties supplying {@code sync} and {@code sendTimeout}
     * @param messageChannel the channel set as the handler's failure channel
     * @return a KPL-based handler in KPL/KCL mode, a plain Kinesis handler otherwise
     */
    public MessageHandler createProducerMessageHandler(ProducerDestination producerDestination, ExtendedProducerProperties<KinesisProducerProperties> extendedProducerProperties, MessageChannel messageChannel) {
        FunctionExpression<Message<?>> partitionKeyExpression = new FunctionExpression<>(message ->
                message.getHeaders().containsKey(BinderHeaders.PARTITION_HEADER)
                        ? message.getHeaders().get(BinderHeaders.PARTITION_HEADER)
                        : Integer.valueOf(message.getPayload().hashCode()));

        AbstractAwsMessageHandler<?> handler = this.configurationProperties.isKplKclEnabled()
                ? createKplMessageHandler(producerDestination, partitionKeyExpression)
                : createKinesisMessageHandler(producerDestination, partitionKeyExpression);

        KinesisProducerProperties producerProps = extendedProducerProperties.getExtension();
        handler.setSync(producerProps.isSync());
        handler.setSendTimeout(producerProps.getSendTimeout());
        handler.setFailureChannel(messageChannel);
        handler.setBeanFactory(getBeanFactory());
        this.streamsInUse.add(producerDestination.getName());
        return handler;
    }

    /** Builds a handler that writes through the plain Kinesis client. */
    private AbstractAwsMessageHandler<?> createKinesisMessageHandler(ProducerDestination destination, FunctionExpression<Message<?>> partitionKeyExpression) {
        KinesisMessageHandler handler = new KinesisMessageHandler(this.amazonKinesis);
        handler.setStream(destination.getName());
        handler.setPartitionKeyExpression(partitionKeyExpression);
        return handler;
    }

    /**
     * Builds a handler that writes through a newly created {@link KinesisProducer}.
     *
     * @throws IllegalStateException if no {@link KinesisProducerConfiguration} has been set
     */
    private AbstractAwsMessageHandler<?> createKplMessageHandler(ProducerDestination destination, FunctionExpression<Message<?>> partitionKeyExpression) {
        // The field is @Nullable; fail with a descriptive message instead of an NPE inside the KPL.
        Assert.state(this.kinesisProducerConfiguration != null,
                "'kinesisProducerConfiguration' must be set when KPL/KCL mode is enabled");
        KplMessageHandler handler = new KplMessageHandler(new KinesisProducer(this.kinesisProducerConfiguration));
        handler.setStream(destination.getName());
        handler.setPartitionKeyExpression(partitionKeyExpression);
        return handler;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * For a partitioned producer on an interceptable channel, registers (at index 0) an interceptor
     * that copies the extracted partition key into the {@link BinderHeaders#PARTITION_OVERRIDE}
     * header of every outgoing message. Does nothing otherwise.
     */
    public void postProcessOutputChannel(MessageChannel messageChannel, final ExtendedProducerProperties<KinesisProducerProperties> extendedProducerProperties) {
        if (!(messageChannel instanceof InterceptableChannel) || !extendedProducerProperties.isPartitioned()) {
            return;
        }
        final PartitionKeyExtractorStrategy keyExtractor = resolvePartitionKeyExtractor(extendedProducerProperties);
        ((InterceptableChannel) messageChannel).addInterceptor(0, new ChannelInterceptor() {
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                return MessageBuilder.fromMessage(message)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE, keyExtractor.extractKey(message))
                        .build();
            }
        });
    }

    /**
     * Returns the named {@link PartitionKeyExtractorStrategy} bean when a name is configured,
     * otherwise a strategy that evaluates the configured partition key expression.
     */
    private PartitionKeyExtractorStrategy resolvePartitionKeyExtractor(ExtendedProducerProperties<KinesisProducerProperties> producerProperties) {
        String extractorName = producerProperties.getPartitionKeyExtractorName();
        if (StringUtils.hasText(extractorName)) {
            return getBeanFactory().getBean(extractorName, PartitionKeyExtractorStrategy.class);
        }
        // The evaluation context is read at send time, not captured here, because it is set in onInit().
        return message -> producerProperties.getPartitionKeyExpression().getValue(this.evaluationContext, message);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the inbound endpoint for a consumer binding.
     *
     * <p>For a DynamoDB Streams consumer the destination name is treated as a table name and is
     * replaced by the table's latest stream ARN; otherwise the stream name is recorded as in use.
     *
     * @param consumerDestination the provisioned destination
     * @param str the consumer group; blank means an anonymous consumer
     * @param extendedConsumerProperties the consumer properties
     * @return a KCL-based adapter in KPL/KCL mode, a plain Kinesis adapter otherwise
     * @throws ProvisioningException if the DynamoDB table has no stream ARN
     * @throws IllegalStateException if DynamoDB Streams is requested but the DynamoDB clients were not supplied
     */
    public MessageProducer createConsumerEndpoint(ConsumerDestination consumerDestination, String str, ExtendedConsumerProperties<KinesisConsumerProperties> extendedConsumerProperties) {
        ConsumerDestination effectiveDestination = consumerDestination;
        if (extendedConsumerProperties.getExtension().isDynamoDbStreams()) {
            String tableName = consumerDestination.getName();
            // Both clients are @Nullable constructor arguments; report the misconfiguration explicitly.
            Assert.state(this.dynamoDBClient != null && this.dynamoDBStreamsAdapter != null,
                    "Consuming DynamoDB Streams for table [" + tableName
                            + "] requires both 'AmazonDynamoDB' and 'AmazonDynamoDBStreams' clients");
            String latestStreamArn = this.dynamoDBClient.describeTable(tableName).getTable().getLatestStreamArn();
            if (!StringUtils.hasText(latestStreamArn)) {
                throw new ProvisioningException("The DynamoDB table [" + tableName + "] doesn't have Streams enabled.");
            }
            effectiveDestination = new KinesisConsumerDestination(latestStreamArn, Collections.emptyList());
        } else {
            this.streamsInUse.add(consumerDestination.getName());
        }
        return this.configurationProperties.isKplKclEnabled()
                ? createKclConsumerEndpoint(effectiveDestination, str, extendedConsumerProperties)
                : createKinesisConsumerEndpoint(effectiveDestination, str, extendedConsumerProperties);
    }

    /** Builds a KCL-based inbound adapter; a configured {@code shardId} is ignored with a warning. */
    private MessageProducer createKclConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<KinesisConsumerProperties> properties) {
        KinesisConsumerProperties consumerProps = properties.getExtension();
        if (consumerProps.getShardId() != null) {
            this.logger.warn("Kinesis Client Library does not support explicit shard configuration. Ignoring 'shardId' property");
        }
        KinesisClientLibConfiguration kclConfig = obtainKinesisClientLibConfiguration(consumerProps, destination.getName(), group);

        // The two clients share no declared type visible in this file, so the adapter is created
        // per branch rather than through a common local variable.
        KclMessageDrivenChannelAdapter adapter;
        if (consumerProps.isDynamoDbStreams()) {
            Assert.state(this.dynamoDBStreamsAdapter != null,
                    "'AmazonDynamoDBStreams' client is required to consume DynamoDB Streams");
            adapter = new KclMessageDrivenChannelAdapter(kclConfig, this.dynamoDBStreamsAdapter, this.cloudWatchClient, this.dynamoDBClient);
        } else {
            adapter = new KclMessageDrivenChannelAdapter(kclConfig, this.amazonKinesis, this.cloudWatchClient, this.dynamoDBClient);
        }

        adapter.setCheckpointMode(consumerProps.getCheckpointMode());
        adapter.setCheckpointsInterval(consumerProps.getCheckpointInterval().longValue());
        adapter.setListenerMode(consumerProps.getListenerMode());
        if (properties.isUseNativeDecoding()) {
            adapter.setConverter(null);
        } else {
            // Identity converter: the raw bytes are passed through unchanged.
            adapter.setConverter(bytes -> bytes);
        }
        // The error infrastructure is registered under the KCL application name, which may be a generated one.
        AbstractMessageChannelBinder.ErrorInfrastructure errorInfrastructure =
                registerErrorInfrastructure(destination, kclConfig.getApplicationName(), properties);
        adapter.setErrorMessageStrategy(ERROR_MESSAGE_STRATEGY);
        adapter.setErrorChannel(errorInfrastructure.getErrorChannel());
        adapter.setBindSourceRecord(true);
        return adapter;
    }

    /**
     * Finds a pre-defined KCL configuration for the stream, or generates one.
     *
     * <p>Among pre-defined configurations with a matching stream name, one whose application name
     * equals the group wins; otherwise the last one with a matching stream name is used.
     *
     * @throws IllegalArgumentException if the generated configuration would need a sequence-number
     *         based initial position, which is rejected for the KCL
     */
    private KinesisClientLibConfiguration obtainKinesisClientLibConfiguration(KinesisConsumerProperties consumerProps, String streamName, String group) {
        KinesisClientLibConfiguration predefined = null;
        // The list is only populated through its setter, so treat null as "nothing pre-defined".
        if (this.kinesisClientLibConfigurations != null) {
            for (KinesisClientLibConfiguration candidate : this.kinesisClientLibConfigurations) {
                if (streamName.equals(candidate.getStreamName())) {
                    predefined = candidate;
                    if (Objects.equals(group, candidate.getApplicationName())) {
                        break;
                    }
                }
            }
        }
        if (predefined != null) {
            return predefined;
        }

        boolean anonymous = !StringUtils.hasText(group);
        String applicationName = anonymous ? "anonymous." + UUID.randomUUID() : group;
        String workerId = consumerProps.getWorkerId() != null ? consumerProps.getWorkerId() : UUID.randomUUID().toString();

        // NOTE(review): the trailing labels follow the KCL 1.x constructor parameter order as
        // understood by the reviewer - confirm against the KCL version on the classpath.
        KinesisClientLibConfiguration generated = new KinesisClientLibConfiguration(
                applicationName,
                streamName,
                (String) null, // kinesis endpoint
                (String) null, // dynamodb endpoint
                InitialPositionInStream.LATEST,
                this.awsCredentialsProvider, // kinesis credentials
                (AWSCredentialsProvider) null, // dynamodb credentials
                (AWSCredentialsProvider) null, // cloudwatch credentials
                10000L, // failover time
                workerId,
                10000, // max records
                consumerProps.getIdleBetweenPolls(),
                false, // call processRecords for empty lists
                10000L, // parent shard poll interval
                60000L, // shard sync interval
                true, // cleanup terminated shards before expiry
                new ClientConfiguration(),
                new ClientConfiguration(),
                new ClientConfiguration(),
                consumerProps.getConsumerBackoff(),
                10000L, // metrics buffer time
                10000, // metrics max queue size
                true, // validate sequence number before checkpointing
                (String) null, // region name
                5000L, // shutdown grace
                KinesisClientLibConfiguration.DEFAULT_DDB_BILLING_MODE,
                new SimpleRecordsFetcherFactory(),
                Duration.ofMinutes(1L).toMillis(),
                Duration.ofMinutes(5L).toMillis(),
                Duration.ofMinutes(30L).toMillis());

        // Anonymous consumers, or an explicitly configured iterator type, use the configured offset
        // (LATEST when nothing is configured); a named group without configuration starts at TRIM_HORIZON.
        String shardIteratorType = consumerProps.getShardIteratorType();
        KinesisShardOffset initialOffset = (anonymous || StringUtils.hasText(shardIteratorType))
                ? parseShardOffset(shardIteratorType)
                : KinesisShardOffset.trimHorizon();

        ShardIteratorType iteratorType = initialOffset.getIteratorType();
        if (ShardIteratorType.AT_TIMESTAMP.equals(iteratorType)) {
            generated.withTimestampAtInitialPositionInStream(initialOffset.getTimestamp());
        } else if (ShardIteratorType.AT_SEQUENCE_NUMBER.equals(iteratorType)
                || ShardIteratorType.AFTER_SEQUENCE_NUMBER.equals(iteratorType)) {
            throw new IllegalArgumentException("The KCL does not support 'AT_SEQUENCE_NUMBER' or 'AFTER_SEQUENCE_NUMBER' initial position in stream.");
        } else {
            generated.withInitialPositionInStream(InitialPositionInStream.valueOf(iteratorType.name()));
        }
        return generated;
    }

    /**
     * Builds a plain (non-KCL) inbound adapter.
     *
     * <p>The adapter listens on an explicit {@code shardId} when configured, on this instance's
     * share of the destination's shards when {@code instanceCount > 1}, and on the whole stream otherwise.
     *
     * @throws InvalidArgumentException if both {@code instanceCount > 1} and {@code shardId} are configured
     */
    private MessageProducer createKinesisConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<KinesisConsumerProperties> properties) {
        KinesisConsumerProperties consumerProps = properties.getExtension();
        String shardId = consumerProps.getShardId();
        int instanceCount = properties.getInstanceCount();
        if (instanceCount > 1 && shardId != null) {
            throw new InvalidArgumentException("'instanceCount' more than 1 and 'shardId' cannot be provided together.");
        }

        String streamName = destination.getName();
        String shardIteratorType = consumerProps.getShardIteratorType();
        KinesisShardOffset configuredOffset = parseShardOffset(shardIteratorType);

        // Explicit shard offsets to listen on; null means "listen on the whole stream by name".
        KinesisShardOffset[] explicitOffsets = null;
        if (shardId != null) {
            explicitOffsets = new KinesisShardOffset[] {shardOffsetFor(configuredOffset, streamName, shardId)};
        } else if (instanceCount > 1) {
            // Shards are distributed round-robin by their index in the destination's shard list.
            HashSet<KinesisShardOffset> assigned = new HashSet<>();
            List<Shard> shards = ((KinesisConsumerDestination) destination).getShards();
            int instanceIndex = properties.getInstanceIndex();
            for (int i = 0; i < shards.size(); i++) {
                if (i % instanceCount == instanceIndex) {
                    assigned.add(shardOffsetFor(configuredOffset, streamName, shards.get(i).getShardId()));
                }
            }
            if (!CollectionUtils.isEmpty(assigned)) {
                explicitOffsets = assigned.toArray(new KinesisShardOffset[0]);
            }
        }

        // The two clients share no declared type visible in this file, so the adapter is created
        // per branch rather than through a common local variable.
        KinesisMessageDrivenChannelAdapter adapter;
        if (consumerProps.isDynamoDbStreams()) {
            Assert.state(this.dynamoDBStreamsAdapter != null,
                    "'AmazonDynamoDBStreams' client is required to consume DynamoDB Streams");
            adapter = explicitOffsets == null
                    ? new KinesisMessageDrivenChannelAdapter(this.dynamoDBStreamsAdapter, new String[] {streamName})
                    : new KinesisMessageDrivenChannelAdapter(this.dynamoDBStreamsAdapter, explicitOffsets);
        } else {
            adapter = explicitOffsets == null
                    ? new KinesisMessageDrivenChannelAdapter(this.amazonKinesis, new String[] {streamName})
                    : new KinesisMessageDrivenChannelAdapter(this.amazonKinesis, explicitOffsets);
        }

        boolean anonymous = !StringUtils.hasText(group);
        String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID() : group;
        adapter.setConsumerGroup(consumerGroup);
        // Same rule as for KCL: a named group without an explicit iterator type starts at TRIM_HORIZON.
        adapter.setStreamInitialSequence((anonymous || StringUtils.hasText(shardIteratorType))
                ? configuredOffset
                : KinesisShardOffset.trimHorizon());
        adapter.setListenerMode(consumerProps.getListenerMode());
        if (properties.isUseNativeDecoding()) {
            adapter.setConverter(null);
        } else {
            // Identity converter: the raw bytes are passed through unchanged.
            adapter.setConverter(bytes -> bytes);
        }
        adapter.setCheckpointMode(consumerProps.getCheckpointMode());
        adapter.setRecordsLimit(consumerProps.getRecordsLimit());
        adapter.setIdleBetweenPolls(consumerProps.getIdleBetweenPolls());
        adapter.setConsumerBackoff(consumerProps.getConsumerBackoff());
        adapter.setCheckpointsInterval(consumerProps.getCheckpointInterval().longValue());
        if (this.checkpointStore != null) {
            adapter.setCheckpointStore(this.checkpointStore);
        }
        adapter.setLockRegistry(this.lockRegistry);
        adapter.setConcurrency(properties.getConcurrency());
        adapter.setStartTimeout(consumerProps.getStartTimeout());
        adapter.setDescribeStreamBackoff(this.configurationProperties.getDescribeStreamBackoff());
        adapter.setDescribeStreamRetries(this.configurationProperties.getDescribeStreamRetries());

        AbstractMessageChannelBinder.ErrorInfrastructure errorInfrastructure =
                registerErrorInfrastructure(destination, consumerGroup, properties);
        adapter.setErrorMessageStrategy(ERROR_MESSAGE_STRATEGY);
        adapter.setErrorChannel(errorInfrastructure.getErrorChannel());
        adapter.setBindSourceRecord(true);
        return adapter;
    }

    /**
     * Parses a {@code shardIteratorType} property of the form {@code TYPE} or {@code TYPE:VALUE}.
     *
     * <p>For {@code AT_TIMESTAMP} the value is parsed as epoch milliseconds; for any other type it
     * is used as the sequence number. A blank property yields {@link KinesisShardOffset#latest()}.
     *
     * @throws IllegalArgumentException if {@code TYPE} is not a {@link ShardIteratorType} name or
     *         the timestamp value is not a number
     */
    private static KinesisShardOffset parseShardOffset(String shardIteratorType) {
        if (!StringUtils.hasText(shardIteratorType)) {
            return KinesisShardOffset.latest();
        }
        String[] typeAndValue = shardIteratorType.split(":", 2);
        ShardIteratorType iteratorType = ShardIteratorType.valueOf(typeAndValue[0]);
        KinesisShardOffset offset = new KinesisShardOffset(iteratorType);
        if (typeAndValue.length > 1) {
            if (ShardIteratorType.AT_TIMESTAMP.equals(iteratorType)) {
                offset.setTimestamp(new Date(Long.parseLong(typeAndValue[1])));
            } else {
                offset.setSequenceNumber(typeAndValue[1]);
            }
        }
        return offset;
    }

    /** Copies the template offset and points the copy at the given stream and shard. */
    private static KinesisShardOffset shardOffsetFor(KinesisShardOffset template, String streamName, String shardId) {
        KinesisShardOffset offset = new KinesisShardOffset(template);
        offset.setStream(streamName);
        offset.setShard(shardId);
        return offset;
    }

    /** Returns the shared Kinesis error message strategy. */
    protected ErrorMessageStrategy getErrorMessageStrategy() {
        return ERROR_MESSAGE_STRATEGY;
    }

    /**
     * Returns the header names passed to the parent binder: the standard binder headers followed
     * by any custom headers from the configuration.
     *
     * @throws IllegalArgumentException if the configuration properties are {@code null}
     */
    private static String[] headersToMap(KinesisBinderConfigurationProperties kinesisBinderConfigurationProperties) {
        Assert.notNull(kinesisBinderConfigurationProperties, "'configurationProperties' must not be null");
        String[] customHeaders = kinesisBinderConfigurationProperties.getHeaders();
        if (ObjectUtils.isEmpty(customHeaders)) {
            return BinderHeaders.STANDARD_HEADERS;
        }
        int standardCount = BinderHeaders.STANDARD_HEADERS.length;
        String[] combined = Arrays.copyOf(BinderHeaders.STANDARD_HEADERS, standardCount + customHeaders.length);
        System.arraycopy(customHeaders, 0, combined, standardCount, customHeaders.length);
        return combined;
    }
}
