package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/streams/DltAwareProcessor.class */
/**
 * A {@link Processor} that applies a delegate {@link BiFunction} to the key and value of
 * each incoming record and forwards the resulting {@link KeyValue} downstream.
 *
 * <p>If the delegate (or the forward) throws an {@link Exception}, the failed input record
 * and the exception are handed to a recoverer instead of propagating. The recoverer is
 * either supplied by the caller, or - when the processor was created with a DLT
 * destination - a default one that publishes the record value to that destination
 * through the {@link StreamBridge} obtained from the {@link DltSenderContext}.
 *
 * @param <KIn> key type of the incoming record
 * @param <VIn> value type of the incoming record
 * @param <KOut> key type of the forwarded record
 * @param <VOut> value type of the forwarded record
 */
public class DltAwareProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> {

    /** Business logic applied to the key and value of every incoming record. */
    private final BiFunction<KIn, VIn, KeyValue<KOut, VOut>> delegateFunction;

    /** Source of the timestamp stamped on every forwarded record. */
    private final Supplier<Long> recordTimeSupplier;

    /** Destination used by the default recoverer; only set by the DLT-based constructors. */
    private String dltDestination;

    /** Gives access to the {@link StreamBridge}; only set by the DLT-based constructors. */
    private DltSenderContext dltSenderContext;

    /** Recoverer invoked on failure; created lazily when none was supplied. */
    private BiConsumer<Record<KIn, VIn>, Exception> processorRecordRecoverer;

    /** Context captured in {@link #init(ProcessorContext)} and used to forward records. */
    private ProcessorContext<KOut, VOut> context;

    /**
     * Creates a processor that recovers through the default DLT recoverer and stamps
     * forwarded records with {@link System#currentTimeMillis()}.
     *
     * @param delegateFunction function applied to each record's key and value
     * @param dltDestination destination for failed records; must contain text
     * @param dltSenderContext context providing the {@link StreamBridge}; must not be null
     */
    public DltAwareProcessor(BiFunction<KIn, VIn, KeyValue<KOut, VOut>> delegateFunction, String dltDestination,
            DltSenderContext dltSenderContext) {
        this(delegateFunction, dltDestination, dltSenderContext, System::currentTimeMillis);
    }

    /**
     * Creates a processor that recovers through the default DLT recoverer and takes the
     * timestamp of forwarded records from the given supplier.
     *
     * @param delegateFunction function applied to each record's key and value
     * @param dltDestination destination for failed records; must contain text
     * @param dltSenderContext context providing the {@link StreamBridge}; must not be null
     * @param recordTimeSupplier supplies the timestamp of each forwarded record
     * @throws IllegalArgumentException if {@code dltDestination} has no text or
     * {@code dltSenderContext} is null
     */
    public DltAwareProcessor(BiFunction<KIn, VIn, KeyValue<KOut, VOut>> delegateFunction, String dltDestination,
            DltSenderContext dltSenderContext, Supplier<Long> recordTimeSupplier) {
        this.delegateFunction = delegateFunction;
        this.recordTimeSupplier = recordTimeSupplier;
        Assert.isTrue(StringUtils.hasText(dltDestination), "DLT Destination topic must be provided.");
        this.dltDestination = dltDestination;
        Assert.notNull(dltSenderContext, "DltSenderContext cannot be null");
        this.dltSenderContext = dltSenderContext;
    }

    /**
     * Creates a processor that recovers through a caller-supplied recoverer and stamps
     * forwarded records with {@link System#currentTimeMillis()}.
     *
     * @param delegateFunction function applied to each record's key and value
     * @param processorRecordRecoverer receives the failed record and the exception; must not be null
     */
    public DltAwareProcessor(BiFunction<KIn, VIn, KeyValue<KOut, VOut>> delegateFunction,
            BiConsumer<Record<KIn, VIn>, Exception> processorRecordRecoverer) {
        this(delegateFunction, processorRecordRecoverer, System::currentTimeMillis);
    }

    /**
     * Creates a processor that recovers through a caller-supplied recoverer and takes the
     * timestamp of forwarded records from the given supplier.
     *
     * @param delegateFunction function applied to each record's key and value
     * @param processorRecordRecoverer receives the failed record and the exception; must not be null
     * @param recordTimeSupplier supplies the timestamp of each forwarded record
     * @throws IllegalArgumentException if {@code processorRecordRecoverer} is null
     */
    public DltAwareProcessor(BiFunction<KIn, VIn, KeyValue<KOut, VOut>> delegateFunction,
            BiConsumer<Record<KIn, VIn>, Exception> processorRecordRecoverer, Supplier<Long> recordTimeSupplier) {
        this.delegateFunction = delegateFunction;
        this.recordTimeSupplier = recordTimeSupplier;
        Assert.notNull(processorRecordRecoverer, "You must provide a valid processor recoverer");
        this.processorRecordRecoverer = processorRecordRecoverer;
    }

    /**
     * Stores the processor context so that {@link #process(Record)} can forward records.
     *
     * @param context the context handed to this processor
     */
    @Override
    public void init(ProcessorContext<KOut, VOut> context) {
        // Processor is an interface: its default method must be reached through
        // the qualified Processor.super form, a bare super.init does not resolve.
        Processor.super.init(context);
        this.context = context;
    }

    /**
     * Applies the delegate function to the record and forwards the result, keeping the
     * incoming headers and using the timestamp from the record-time supplier. Any
     * {@link Exception} raised while doing so is passed, together with the input record,
     * to the recoverer rather than rethrown.
     *
     * @param record the incoming record
     */
    @Override
    public void process(Record<KIn, VIn> record) {
        try {
            KeyValue<KOut, VOut> keyValue = this.delegateFunction.apply(record.key(), record.value());
            Record<KOut, VOut> downstreamRecord = new Record<>(keyValue.key, keyValue.value,
                    this.recordTimeSupplier.get(), record.headers());
            this.context.forward(downstreamRecord);
        } catch (Exception exception) {
            // No recoverer was supplied at construction time: fall back to the DLT one.
            if (this.processorRecordRecoverer == null) {
                this.processorRecordRecoverer = defaultProcessorRecordRecoverer();
            }
            this.processorRecordRecoverer.accept(record, exception);
        }
    }

    /**
     * Delegates to the default {@link Processor#close()} implementation.
     */
    @Override
    public void close() {
        Processor.super.close();
    }

    /**
     * Builds the default recoverer, which sends the value of the failed record to the
     * configured DLT destination through the {@link StreamBridge}. The record key, the
     * headers and the exception are not sent, and nothing is sent when the sender context
     * returns no bridge.
     *
     * @return the default recoverer
     */
    BiConsumer<Record<KIn, VIn>, Exception> defaultProcessorRecordRecoverer() {
        return (failedRecord, exception) -> {
            StreamBridge streamBridge = this.dltSenderContext.getStreamBridge();
            if (streamBridge != null) {
                streamBridge.send(this.dltDestination, failedRecord.value());
            }
        };
    }
}
