package io.streamnative.pulsar.handlers.kop.coordinator.group;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata;
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
import io.streamnative.pulsar.handlers.kop.utils.MessageIdUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager.class */
public class GroupMetadataManager {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) GroupMetadataManager.class);
    private final byte magicValue = 2;
    private final CompressionType compressionType;
    private final OffsetConfig offsetConfig;
    private final ConcurrentMap<String, GroupMetadata> groupMetadataCache;
    private final ReentrantLock partitionLock;
    private final Set<Integer> loadingPartitions;
    private final Set<Integer> ownedPartitions;
    private final AtomicBoolean shuttingDown;
    private final int groupMetadataTopicPartitionCount;
    private final ConcurrentMap<Integer, CompletableFuture<Producer<ByteBuffer>>> offsetsProducers;
    private final ConcurrentMap<Integer, CompletableFuture<Reader<ByteBuffer>>> offsetsReaders;
    private final ScheduledExecutorService scheduler;
    private final Map<Long, Set<String>> openGroupsForProducer;
    private final ProducerBuilder<ByteBuffer> metadataTopicProducerBuilder;
    private final ReaderBuilder<ByteBuffer> metadataTopicReaderBuilder;
    private final Time time;
    private final Function<String, Integer> partitioner;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager$BaseKey.class */
    public interface BaseKey {
        short version();

        Object key();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager$GroupMetadataKey.class */
    public static class GroupMetadataKey implements BaseKey {
        private final short version;
        private final String key;

        public String toString() {
            return this.key;
        }

        public GroupMetadataKey(short s, String str) {
            this.version = s;
            this.key = str;
        }

        @Override // io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager.BaseKey
        public short version() {
            return this.version;
        }

        @Override // io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager.BaseKey
        public String key() {
            return this.key;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof GroupMetadataKey)) {
                return false;
            }
            GroupMetadataKey groupMetadataKey = (GroupMetadataKey) obj;
            if (!groupMetadataKey.canEqual(this) || version() != groupMetadataKey.version()) {
                return false;
            }
            String key = key();
            String key2 = groupMetadataKey.key();
            return key == null ? key2 == null : key.equals(key2);
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof GroupMetadataKey;
        }

        public int hashCode() {
            int version = (1 * 59) + version();
            String key = key();
            return (version * 59) + (key == null ? 43 : key.hashCode());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager$GroupTopicPartition.class */
    public static class GroupTopicPartition {
        private final String group;
        private final TopicPartition topicPartition;

        /* JADX INFO: Access modifiers changed from: package-private */
        public GroupTopicPartition(String str, String str2, int i) {
            this.group = str;
            this.topicPartition = new TopicPartition(str2, i);
        }

        public String toString() {
            return String.format("[%s, %s, %d]", this.group, this.topicPartition.topic(), Integer.valueOf(this.topicPartition.partition()));
        }

        public String group() {
            return this.group;
        }

        public TopicPartition topicPartition() {
            return this.topicPartition;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof GroupTopicPartition)) {
                return false;
            }
            GroupTopicPartition groupTopicPartition = (GroupTopicPartition) obj;
            if (!groupTopicPartition.canEqual(this)) {
                return false;
            }
            String group = group();
            String group2 = groupTopicPartition.group();
            if (group == null) {
                if (group2 != null) {
                    return false;
                }
            } else if (!group.equals(group2)) {
                return false;
            }
            TopicPartition topicPartition = topicPartition();
            TopicPartition topicPartition2 = groupTopicPartition.topicPartition();
            return topicPartition == null ? topicPartition2 == null : topicPartition.equals(topicPartition2);
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof GroupTopicPartition;
        }

        public int hashCode() {
            String group = group();
            int hashCode = (1 * 59) + (group == null ? 43 : group.hashCode());
            TopicPartition topicPartition = topicPartition();
            return (hashCode * 59) + (topicPartition == null ? 43 : topicPartition.hashCode());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/group/GroupMetadataManager$OffsetKey.class */
    public static class OffsetKey implements BaseKey {
        private final short version;
        private final GroupTopicPartition key;

        public String toString() {
            return this.key.toString();
        }

        public OffsetKey(short s, GroupTopicPartition groupTopicPartition) {
            this.version = s;
            this.key = groupTopicPartition;
        }

        @Override // io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager.BaseKey
        public short version() {
            return this.version;
        }

        @Override // io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadataManager.BaseKey
        public GroupTopicPartition key() {
            return this.key;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof OffsetKey)) {
                return false;
            }
            OffsetKey offsetKey = (OffsetKey) obj;
            if (!offsetKey.canEqual(this) || version() != offsetKey.version()) {
                return false;
            }
            GroupTopicPartition key = key();
            GroupTopicPartition key2 = offsetKey.key();
            return key == null ? key2 == null : key.equals(key2);
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof OffsetKey;
        }

        public int hashCode() {
            int version = (1 * 59) + version();
            GroupTopicPartition key = key();
            return (version * 59) + (key == null ? 43 : key.hashCode());
        }
    }

    public GroupMetadataManager(OffsetConfig offsetConfig, ProducerBuilder<ByteBuffer> producerBuilder, ReaderBuilder<ByteBuffer> readerBuilder, ScheduledExecutorService scheduledExecutorService, Time time) {
        this(offsetConfig, producerBuilder, readerBuilder, scheduledExecutorService, time, str -> {
            return Integer.valueOf(getPartitionId(str, offsetConfig.offsetsTopicNumPartitions()));
        });
    }

    public static int getPartitionId(String str, int i) {
        return MathUtils.signSafeMod(str.hashCode(), i);
    }

    GroupMetadataManager(OffsetConfig offsetConfig, ProducerBuilder<ByteBuffer> producerBuilder, ReaderBuilder<ByteBuffer> readerBuilder, ScheduledExecutorService scheduledExecutorService, Time time, Function<String, Integer> function) {
        this.magicValue = (byte) 2;
        this.partitionLock = new ReentrantLock();
        this.loadingPartitions = new HashSet();
        this.ownedPartitions = new HashSet();
        this.shuttingDown = new AtomicBoolean(false);
        this.offsetsProducers = new ConcurrentHashMap();
        this.offsetsReaders = new ConcurrentHashMap();
        this.openGroupsForProducer = new HashMap();
        this.offsetConfig = offsetConfig;
        this.compressionType = offsetConfig.offsetsTopicCompressionType();
        this.groupMetadataCache = new ConcurrentHashMap();
        this.groupMetadataTopicPartitionCount = offsetConfig.offsetsTopicNumPartitions();
        this.metadataTopicProducerBuilder = producerBuilder;
        this.metadataTopicReaderBuilder = readerBuilder;
        this.scheduler = scheduledExecutorService;
        this.time = time;
        this.partitioner = function;
    }

    public void startup(boolean z) {
        if (z) {
            this.scheduler.scheduleAtFixedRate(this::cleanupGroupMetadata, this.offsetConfig.offsetsRetentionCheckIntervalMs(), this.offsetConfig.offsetsRetentionCheckIntervalMs(), TimeUnit.MILLISECONDS);
        }
    }

    public void shutdown() {
        this.shuttingDown.set(true);
        this.scheduler.shutdown();
        List list = (List) this.offsetsProducers.entrySet().stream().map(entry -> {
            return ((CompletableFuture) entry.getValue()).thenComposeAsync(producer -> {
                return producer.closeAsync();
            }, (Executor) this.scheduler);
        }).collect(Collectors.toList());
        this.offsetsProducers.clear();
        List list2 = (List) this.offsetsReaders.entrySet().stream().map(entry2 -> {
            return ((CompletableFuture) entry2.getValue()).thenComposeAsync(reader -> {
                return reader.closeAsync();
            }, (Executor) this.scheduler);
        }).collect(Collectors.toList());
        this.offsetsReaders.clear();
        FutureUtil.waitForAll(list).whenCompleteAsync((r6, th) -> {
            if (th != null) {
                log.error("Error when close all the {} offsetsProducers in GroupMetadataManager", Integer.valueOf(list.size()), th);
            }
            if (log.isDebugEnabled()) {
                log.debug("Closed all the {} offsetsProducers in GroupMetadataManager", Integer.valueOf(list.size()));
            }
        }, (Executor) this.scheduler);
        FutureUtil.waitForAll(list2).whenCompleteAsync((r62, th2) -> {
            if (th2 != null) {
                log.error("Error when close all the {} offsetsReaders in GroupMetadataManager", Integer.valueOf(list2.size()), th2);
            }
            if (log.isDebugEnabled()) {
                log.debug("Closed all the {} offsetsReaders in GroupMetadataManager.", Integer.valueOf(list2.size()));
            }
        }, (Executor) this.scheduler);
    }

    public ConcurrentMap<Integer, CompletableFuture<Producer<ByteBuffer>>> getOffsetsProducers() {
        return this.offsetsProducers;
    }

    public ConcurrentMap<Integer, CompletableFuture<Reader<ByteBuffer>>> getOffsetsReaders() {
        return this.offsetsReaders;
    }

    public Iterable<GroupMetadata> currentGroups() {
        return this.groupMetadataCache.values();
    }

    public Stream<GroupMetadata> currentGroupsStream() {
        return this.groupMetadataCache.values().stream();
    }

    public boolean isPartitionOwned(int i) {
        return ((Boolean) CoreUtils.inLock(this.partitionLock, () -> {
            return Boolean.valueOf(this.ownedPartitions.contains(Integer.valueOf(i)));
        })).booleanValue();
    }

    public boolean isPartitionLoading(int i) {
        return ((Boolean) CoreUtils.inLock(this.partitionLock, () -> {
            return Boolean.valueOf(this.loadingPartitions.contains(Integer.valueOf(i)));
        })).booleanValue();
    }

    public int partitionFor(String str) {
        return this.partitioner.apply(str).intValue();
    }

    public String getTopicPartitionName() {
        return this.offsetConfig.offsetsTopicName();
    }

    public String getTopicPartitionName(int i) {
        return getTopicPartitionName(this.offsetConfig.offsetsTopicName(), i);
    }

    public static String getTopicPartitionName(String str, int i) {
        return str + "-partition-" + i;
    }

    public int getGroupMetadataTopicPartitionCount() {
        return this.groupMetadataTopicPartitionCount;
    }

    public boolean isGroupLocal(String str) {
        return isPartitionOwned(partitionFor(str));
    }

    public boolean isGroupLoading(String str) {
        return isPartitionLoading(partitionFor(str));
    }

    public boolean isLoading() {
        return ((Boolean) CoreUtils.inLock(this.partitionLock, () -> {
            return Boolean.valueOf(!this.loadingPartitions.isEmpty());
        })).booleanValue();
    }

    public OffsetConfig offsetConfig() {
        return this.offsetConfig;
    }

    public boolean groupNotExists(String str) {
        return ((Boolean) CoreUtils.inLock(this.partitionLock, () -> {
            return Boolean.valueOf(isGroupLocal(str) && ((Boolean) getGroup(str).map(groupMetadata -> {
                return (Boolean) groupMetadata.inLock(() -> {
                    return Boolean.valueOf(groupMetadata.is(GroupState.Dead));
                });
            }).orElse(true)).booleanValue());
        })).booleanValue();
    }

    boolean isGroupOpenForProducer(long j, String str) {
        return this.openGroupsForProducer.getOrDefault(Long.valueOf(j), Collections.emptySet()).contains(str);
    }

    public Optional<GroupMetadata> getGroup(String str) {
        return Optional.ofNullable(this.groupMetadataCache.getOrDefault(str, null));
    }

    public GroupMetadata addGroup(GroupMetadata groupMetadata) {
        GroupMetadata putIfAbsent = this.groupMetadataCache.putIfAbsent(groupMetadata.groupId(), groupMetadata);
        return null != putIfAbsent ? putIfAbsent : groupMetadata;
    }

    public CompletableFuture<Errors> storeGroup(GroupMetadata groupMetadata, Map<String, byte[]> map) {
        TimestampType timestampType = TimestampType.CREATE_TIME;
        long milliseconds = this.time.milliseconds();
        byte[] groupMetadataKey = GroupMetadataConstants.groupMetadataKey(groupMetadata.groupId());
        byte[] groupMetadataValue = GroupMetadataConstants.groupMetadataValue(groupMetadata, map, (short) 1);
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes((byte) 2, this.compressionType, Lists.newArrayList(new SimpleRecord[]{new SimpleRecord(milliseconds, groupMetadataKey, groupMetadataValue)}))), (byte) 2, this.compressionType, timestampType, 0L);
        builder.append(milliseconds, groupMetadataKey, groupMetadataValue);
        MemoryRecords build = builder.build();
        return getOffsetsTopicProducer(groupMetadata.groupId()).thenComposeAsync(producer -> {
            return producer.newMessage().keyBytes(groupMetadataKey).value(build.buffer()).eventTime(milliseconds).sendAsync();
        }, (Executor) this.scheduler).thenApplyAsync((Function<? super U, ? extends U>) messageId -> {
            if (!isGroupLocal(groupMetadata.groupId())) {
                if (log.isDebugEnabled()) {
                    log.warn("add partition ownership for group {}", groupMetadata.groupId());
                }
                addPartitionOwnership(partitionFor(groupMetadata.groupId()));
            }
            return Errors.NONE;
        }, (Executor) this.scheduler).exceptionally(th -> {
            return Errors.COORDINATOR_NOT_AVAILABLE;
        });
    }

    CompletableFuture<MessageId> storeOffsetMessage(String str, byte[] bArr, ByteBuffer byteBuffer, long j) {
        return getOffsetsTopicProducer(str).thenComposeAsync(producer -> {
            return producer.newMessage().keyBytes(bArr).value(byteBuffer).eventTime(j).sendAsync();
        }, (Executor) this.scheduler);
    }

    public CompletableFuture<Map<TopicPartition, Errors>> storeOffsets(GroupMetadata groupMetadata, String str, Map<TopicPartition, OffsetAndMetadata> map) {
        return storeOffsets(groupMetadata, str, map, -1L, (short) -1);
    }

    public CompletableFuture<Map<TopicPartition, Errors>> storeOffsets(GroupMetadata groupMetadata, String str, Map<TopicPartition, OffsetAndMetadata> map, long j, short s) {
        Map map2 = (Map) map.entrySet().stream().filter(entry -> {
            return validateOffsetMetadataLength(((OffsetAndMetadata) entry.getValue()).metadata());
        }).collect(Collectors.toMap(entry2 -> {
            return (TopicPartition) entry2.getKey();
        }, entry3 -> {
            return (OffsetAndMetadata) entry3.getValue();
        }));
        groupMetadata.inLock(() -> {
            if (groupMetadata.hasReceivedConsistentOffsetCommits()) {
                return null;
            }
            log.warn("group: {} with leader: {} has received offset commits from consumers as well as transactional offsetsProducers. Mixing both types of offset commits will generally result in surprises and should be avoided.", groupMetadata.groupId(), groupMetadata.leaderOrNull());
            return null;
        });
        boolean z = j != -1;
        if (map2.isEmpty()) {
            return CompletableFuture.completedFuture((Map) map.entrySet().stream().collect(Collectors.toMap(entry4 -> {
                return (TopicPartition) entry4.getKey();
            }, entry5 -> {
                return Errors.OFFSET_METADATA_TOO_LARGE;
            })));
        }
        TimestampType timestampType = TimestampType.CREATE_TIME;
        long milliseconds = this.time.milliseconds();
        List list = (List) map2.entrySet().stream().map(entry6 -> {
            return new SimpleRecord(milliseconds, GroupMetadataConstants.offsetCommitKey(groupMetadata.groupId(), (TopicPartition) entry6.getKey()), GroupMetadataConstants.offsetCommitValue((OffsetAndMetadata) entry6.getValue()));
        }).collect(Collectors.toList());
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes((byte) 2, this.compressionType, list)), (byte) 2, this.compressionType, timestampType, 0L, milliseconds, j, s, 0, z, -1);
        builder.getClass();
        list.forEach(builder::append);
        MemoryRecords build = builder.build();
        if (z) {
            groupMetadata.inLock(() -> {
                addProducerGroup(j, groupMetadata.groupId());
                groupMetadata.prepareTxnOffsetCommit(j, map);
                return null;
            });
        } else {
            groupMetadata.inLock(() -> {
                groupMetadata.prepareOffsetCommit(map);
                return null;
            });
        }
        return storeOffsetMessage(groupMetadata.groupId(), GroupMetadataConstants.offsetCommitKey(groupMetadata.groupId(), new TopicPartition("", -1)), build.buffer(), milliseconds).thenApplyAsync(messageId -> {
            if (!groupMetadata.is(GroupState.Dead)) {
                MessageIdImpl messageIdImpl = (MessageIdImpl) messageId;
                long mockOffset = MessageIdUtils.getMockOffset(messageIdImpl.getLedgerId(), messageIdImpl.getEntryId());
                map2.forEach((topicPartition, offsetAndMetadata) -> {
                    GroupMetadata.CommitRecordMetadataAndOffset commitRecordMetadataAndOffset = new GroupMetadata.CommitRecordMetadataAndOffset(Optional.of(Long.valueOf(mockOffset)), offsetAndMetadata);
                    if (z) {
                        groupMetadata.onTxnOffsetCommitAppend(j, topicPartition, commitRecordMetadataAndOffset);
                    } else {
                        groupMetadata.onOffsetCommitAppend(topicPartition, commitRecordMetadataAndOffset);
                    }
                });
            }
            return Errors.NONE;
        }, (Executor) this.scheduler).exceptionally((Function<Throwable, ? extends U>) th -> {
            if (!groupMetadata.is(GroupState.Dead)) {
                if (!groupMetadata.hasPendingOffsetCommitsFromProducer(j)) {
                    removeProducerGroup(j, groupMetadata.groupId());
                }
                map2.forEach((topicPartition, offsetAndMetadata) -> {
                    if (z) {
                        groupMetadata.failPendingTxnOffsetCommit(j, topicPartition);
                    } else {
                        groupMetadata.failPendingOffsetWrite(topicPartition, offsetAndMetadata);
                    }
                });
            }
            if (log.isDebugEnabled()) {
                log.debug("Offset commit {} from group {}, consumer {} with generation {} failed when appending to log due to ", map2, groupMetadata.groupId(), str, Integer.valueOf(groupMetadata.generationId()), th);
            }
            return Errors.UNKNOWN_SERVER_ERROR;
        }).thenApplyAsync(errors -> {
            return (Map) map.entrySet().stream().collect(Collectors.toMap(entry7 -> {
                return (TopicPartition) entry7.getKey();
            }, entry8 -> {
                return validateOffsetMetadataLength(((OffsetAndMetadata) entry8.getValue()).metadata()) ? errors : Errors.OFFSET_METADATA_TOO_LARGE;
            }));
        }, (Executor) this.scheduler);
    }

    public Map<TopicPartition, OffsetFetchResponse.PartitionData> getOffsets(String str, Optional<List<TopicPartition>> optional) {
        if (log.isTraceEnabled()) {
            log.trace("Getting offsets of {} for group {}.", optional.map((v0) -> {
                return v0.toString();
            }).orElse("all partitions"), str);
        }
        GroupMetadata groupMetadata = this.groupMetadataCache.get(str);
        return null == groupMetadata ? (Map) optional.orElse(Collections.emptyList()).stream().collect(Collectors.toMap(topicPartition -> {
            return topicPartition;
        }, topicPartition2 -> {
            return new OffsetFetchResponse.PartitionData(-1L, "", Errors.NONE);
        })) : (Map) groupMetadata.inLock(() -> {
            return groupMetadata.is(GroupState.Dead) ? (Map) ((List) optional.orElse(Collections.emptyList())).stream().collect(Collectors.toMap(topicPartition3 -> {
                return topicPartition3;
            }, topicPartition4 -> {
                return new OffsetFetchResponse.PartitionData(-1L, "", Errors.NONE);
            })) : (Map) optional.map(list -> {
                return (Map) list.stream().collect(Collectors.toMap(topicPartition5 -> {
                    return topicPartition5;
                }, topicPartition6 -> {
                    return (OffsetFetchResponse.PartitionData) groupMetadata.offset(topicPartition6).map(offsetAndMetadata -> {
                        return new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset(), offsetAndMetadata.metadata(), Errors.NONE);
                    }).orElseGet(() -> {
                        return new OffsetFetchResponse.PartitionData(-1L, "", Errors.NONE);
                    });
                }));
            }).orElseGet(() -> {
                return (Map) groupMetadata.allOffsets().entrySet().stream().collect(Collectors.toMap(entry -> {
                    return (TopicPartition) entry.getKey();
                }, entry2 -> {
                    OffsetAndMetadata offsetAndMetadata = (OffsetAndMetadata) entry2.getValue();
                    return new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset(), offsetAndMetadata.metadata(), Errors.NONE);
                }));
            });
        });
    }

    private void addProducerGroup(long j, String str) {
        synchronized (this.openGroupsForProducer) {
            this.openGroupsForProducer.computeIfAbsent(Long.valueOf(j), l -> {
                return new HashSet();
            }).add(str);
        }
    }

    private void removeProducerGroup(long j, String str) {
        synchronized (this.openGroupsForProducer) {
            Set<String> set = this.openGroupsForProducer.get(Long.valueOf(j));
            if (null != set) {
                set.remove(str);
                if (set.isEmpty()) {
                    this.openGroupsForProducer.remove(Long.valueOf(j));
                }
            }
            this.openGroupsForProducer.computeIfAbsent(Long.valueOf(j), l -> {
                return new HashSet();
            }).remove(Long.valueOf(j));
        }
    }

    private Set<String> groupsBelongingToPartitions(long j, Set<Integer> set) {
        Set<String> set2;
        synchronized (this.openGroupsForProducer) {
            set2 = (Set) this.openGroupsForProducer.computeIfAbsent(Long.valueOf(j), l -> {
                return new HashSet();
            }).stream().filter(str -> {
                return set.contains(Integer.valueOf(partitionFor(str)));
            }).collect(Collectors.toSet());
        }
        return set2;
    }

    private void removeGroupFromAllProducers(String str) {
        synchronized (this.openGroupsForProducer) {
            this.openGroupsForProducer.forEach((l, set) -> {
                set.remove(str);
            });
        }
    }

    private boolean validateOffsetMetadataLength(String str) {
        return str == null || str.length() <= this.offsetConfig.maxMetadataSize();
    }

    public CompletableFuture<Void> scheduleLoadGroupAndOffsets(int i, Consumer<GroupMetadata> consumer) {
        String str = this.offsetConfig.offsetsTopicName() + "-partition-" + i;
        if (!addLoadingPartition(i)) {
            log.info("Already loading offsets and group metadata from {}", str);
            return CompletableFuture.completedFuture(null);
        }
        log.info("Scheduling loading of offsets and group metadata from {}", str);
        long milliseconds = this.time.milliseconds();
        return getOffsetsTopicProducer(i).thenComposeAsync(producer -> {
            return producer.newMessage().value(ByteBuffer.allocate(0)).eventTime(this.time.milliseconds()).sendAsync();
        }, (Executor) this.scheduler).thenComposeAsync((Function<? super U, ? extends CompletionStage<U>>) messageId -> {
            if (log.isTraceEnabled()) {
                log.trace("Successfully write a placeholder record into {} @ {}", str, messageId);
            }
            return doLoadGroupsAndOffsets(getOffsetsTopicReader(i), messageId, consumer);
        }, (Executor) this.scheduler).whenCompleteAsync((r13, th) -> {
            if (null != th) {
                log.error("Error loading offsets from {}", str, th);
                removeLoadingPartition(i);
            } else {
                log.info("Finished loading offsets and group metadata from {} in {} milliseconds", str, Long.valueOf(this.time.milliseconds() - milliseconds));
                CoreUtils.inLock(this.partitionLock, () -> {
                    this.ownedPartitions.add(Integer.valueOf(i));
                    this.loadingPartitions.remove(Integer.valueOf(i));
                    return null;
                });
            }
        }, (Executor) this.scheduler);
    }

    private CompletableFuture<Void> doLoadGroupsAndOffsets(CompletableFuture<Reader<ByteBuffer>> completableFuture, MessageId messageId, Consumer<GroupMetadata> consumer) {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        HashMap hashMap3 = new HashMap();
        HashSet hashSet = new HashSet();
        CompletableFuture<Void> completableFuture2 = new CompletableFuture<>();
        loadNextMetadataMessage(completableFuture, messageId, completableFuture2, consumer, hashMap, hashMap2, hashMap3, hashSet);
        return completableFuture2;
    }

    private void loadNextMetadataMessage(CompletableFuture<Reader<ByteBuffer>> completableFuture, MessageId messageId, CompletableFuture<Void> completableFuture2, Consumer<GroupMetadata> consumer, Map<GroupTopicPartition, GroupMetadata.CommitRecordMetadataAndOffset> map, Map<Long, Map<GroupTopicPartition, GroupMetadata.CommitRecordMetadataAndOffset>> map2, Map<String, GroupMetadata> map3, Set<String> set) {
        try {
            unsafeLoadNextMetadataMessage(completableFuture, messageId, completableFuture2, consumer, map, map2, map3, set);
        } catch (Throwable th) {
            log.error("Unknown exception caught when loading group and offsets from topic", th);
            completableFuture2.completeExceptionally(th);
        }
    }

    private void unsafeLoadNextMetadataMessage(CompletableFuture<Reader<ByteBuffer>> completableFuture, MessageId messageId, CompletableFuture<Void> completableFuture2, Consumer<GroupMetadata> consumer, Map<GroupTopicPartition, GroupMetadata.CommitRecordMetadataAndOffset> map, Map<Long, Map<GroupTopicPartition, GroupMetadata.CommitRecordMetadataAndOffset>> map2, Map<String, GroupMetadata> map3, Set<String> set) {
        if (this.shuttingDown.get()) {
            completableFuture2.completeExceptionally(new Exception("Group metadata manager is shutting down"));
            return;
        }
        if (log.isTraceEnabled()) {
            log.trace("Reading the next metadata message from topic {}", completableFuture.join().getTopic());
        }
        BiConsumer biConsumer = (message, th) -> {
            if (log.isTraceEnabled()) {
                log.trace("Metadata consumer received a metadata message from {} @ {}", ((Reader) completableFuture.join()).getTopic(), message.getMessageId());
            }
            if (null != th) {
                completableFuture2.completeExceptionally(th);
                return;
            }
            if (message.getMessageId().compareTo(messageId) >= 0) {
                processLoadedAndRemovedGroups(completableFuture2, consumer, map, map2, map3, set);
            } else if (!message.hasKey()) {
                loadNextMetadataMessage(completableFuture, messageId, completableFuture2, consumer, map, map2, map3, set);
            } else {
                MemoryRecords.readableRecords((ByteBuffer) message.getValue()).batches().forEach(mutableRecordBatch -> {
                    boolean isTransactional = mutableRecordBatch.isTransactional();
                    if (mutableRecordBatch.isControlBatch()) {
                        Iterator it = mutableRecordBatch.iterator();
                        if (it.hasNext()) {
                            if (ControlRecordType.parse(((Record) it.next()).key()) == ControlRecordType.COMMIT) {
                                ((Map) map2.getOrDefault(Long.valueOf(mutableRecordBatch.producerId()), Collections.emptyMap())).forEach((groupTopicPartition, commitRecordMetadataAndOffset) -> {
                                    if (!map.containsKey(groupTopicPartition) || ((GroupMetadata.CommitRecordMetadataAndOffset) map.get(groupTopicPartition)).olderThan(commitRecordMetadataAndOffset)) {
                                        map.put(groupTopicPartition, commitRecordMetadataAndOffset);
                                    }
                                });
                            }
                            map2.remove(Long.valueOf(mutableRecordBatch.producerId()));
                            return;
                        }
                        return;
                    }
                    Optional empty = Optional.empty();
                    Iterator it2 = mutableRecordBatch.iterator();
                    while (it2.hasNext()) {
                        Record record = (Record) it2.next();
                        Preconditions.checkArgument(record.hasKey(), "Group metadata/offset entry key should not be null");
                        if (!empty.isPresent()) {
                            empty = Optional.of(Long.valueOf(record.offset()));
                        }
                        BaseKey readMessageKey = GroupMetadataConstants.readMessageKey(record.key());
                        if (log.isTraceEnabled()) {
                            log.trace("Applying metadata record {} received from {}", readMessageKey, ((Reader) completableFuture.join()).getTopic());
                        }
                        if (readMessageKey instanceof OffsetKey) {
                            OffsetKey offsetKey = (OffsetKey) readMessageKey;
                            if (isTransactional && !map2.containsKey(Long.valueOf(mutableRecordBatch.producerId()))) {
                                map2.put(Long.valueOf(mutableRecordBatch.producerId()), new HashMap());
                            }
                            GroupTopicPartition key = offsetKey.key();
                            if (record.hasValue()) {
                                GroupMetadata.CommitRecordMetadataAndOffset commitRecordMetadataAndOffset2 = new GroupMetadata.CommitRecordMetadataAndOffset(empty, GroupMetadataConstants.readOffsetMessageValue(record.value()));
                                if (isTransactional) {
                                    ((Map) map2.get(Long.valueOf(mutableRecordBatch.producerId()))).put(key, commitRecordMetadataAndOffset2);
                                } else {
                                    map.put(key, commitRecordMetadataAndOffset2);
                                }
                            } else if (isTransactional) {
                                ((Map) map2.get(Long.valueOf(mutableRecordBatch.producerId()))).remove(key);
                            } else {
                                map.remove(key);
                            }
                        } else {
                            if (!(readMessageKey instanceof GroupMetadataKey)) {
                                completableFuture2.completeExceptionally(new IllegalStateException("Unexpected message key " + readMessageKey + " while loading offsets and group metadata"));
                                return;
                            }
                            String key2 = ((GroupMetadataKey) readMessageKey).key();
                            GroupMetadata readGroupMessageValue = GroupMetadataConstants.readGroupMessageValue(key2, record.value());
                            if (readGroupMessageValue != null) {
                                set.remove(key2);
                                map3.put(key2, readGroupMessageValue);
                            } else {
                                map3.remove(key2);
                                set.add(key2);
                            }
                        }
                    }
                });
                loadNextMetadataMessage(completableFuture, messageId, completableFuture2, consumer, map, map2, map3, set);
            }
        };
        completableFuture.thenComposeAsync(reader -> {
            return reader.readNextAsync();
        }).whenCompleteAsync((BiConsumer<? super U, ? super Throwable>) (message2, th2) -> {
            try {
                biConsumer.accept(message2, th2);
            } catch (Throwable th2) {
                log.error("Unknown exception caught when processing the received metadata message from topic {}", ((Reader) completableFuture.join()).getTopic(), th2);
                completableFuture2.completeExceptionally(th2);
            }
        }, (Executor) this.scheduler);
    }

    private void processLoadedAndRemovedGroups(CompletableFuture<Void> completableFuture, Consumer<GroupMetadata> consumer, Map<GroupTopicPartition, GroupMetadata.CommitRecordMetadataAndOffset> map, Map<Long, Map<GroupTopicPartition, GroupMetadata.CommitRecordMetadataAndOffset>> map2, Map<String, GroupMetadata> map3, Set<String> set) {
        if (log.isTraceEnabled()) {
            log.trace("Completing loading : {} loaded groups, {} removed groups, {} loaded offsets, {} pending offsets", Integer.valueOf(map3.size()), Integer.valueOf(set.size()), Integer.valueOf(map.size()), Integer.valueOf(map2.size()));
        }
        try {
            Map partition = CoreUtils.partition((Map) map.entrySet().stream().collect(Collectors.groupingBy(entry -> {
                return ((GroupTopicPartition) entry.getKey()).group();
            }, Collectors.toMap(entry2 -> {
                return ((GroupTopicPartition) entry2.getKey()).topicPartition();
            }, entry3 -> {
                return (GroupMetadata.CommitRecordMetadataAndOffset) entry3.getValue();
            }))), str -> {
                return map3.containsKey(str);
            });
            Map map4 = (Map) partition.get(true);
            Map map5 = (Map) partition.get(false);
            HashMap hashMap = new HashMap();
            map2.forEach((l, map6) -> {
                map6.keySet().stream().map((v0) -> {
                    return v0.group();
                }).forEach(str2 -> {
                    addProducerGroup(l.longValue(), str2);
                });
                ((Map) map6.entrySet().stream().collect(Collectors.groupingBy(entry4 -> {
                    return ((GroupTopicPartition) entry4.getKey()).group;
                }, Collectors.toMap(entry5 -> {
                    return ((GroupTopicPartition) entry5.getKey()).topicPartition();
                }, entry6 -> {
                    return (GroupMetadata.CommitRecordMetadataAndOffset) entry6.getValue();
                })))).forEach((str3, map6) -> {
                    ((Map) ((Map) hashMap.computeIfAbsent(str3, str3 -> {
                        return new HashMap();
                    })).computeIfAbsent(l, l -> {
                        return new HashMap();
                    })).putAll(map6);
                });
            });
            Map partition2 = CoreUtils.partition(hashMap, str2 -> {
                return map3.containsKey(str2);
            });
            Map map7 = (Map) partition2.get(true);
            Map map8 = (Map) partition2.get(false);
            map3.values().forEach(groupMetadata -> {
                Map<TopicPartition, GroupMetadata.CommitRecordMetadataAndOffset> map9 = (Map) map4.getOrDefault(groupMetadata.groupId(), Collections.emptyMap());
                Map<Long, Map<TopicPartition, GroupMetadata.CommitRecordMetadataAndOffset>> map10 = (Map) map7.getOrDefault(groupMetadata.groupId(), Collections.emptyMap());
                if (log.isDebugEnabled()) {
                    log.debug("Loaded group metadata {} with offsets {} and pending offsets {}", groupMetadata, map9, map10);
                }
                loadGroup(groupMetadata, map9, map10);
                consumer.accept(groupMetadata);
            });
            Sets.union(map5.keySet(), map8.keySet()).forEach(str3 -> {
                GroupMetadata groupMetadata2 = new GroupMetadata(str3, GroupState.Empty);
                Map<TopicPartition, GroupMetadata.CommitRecordMetadataAndOffset> map9 = (Map) map5.getOrDefault(str3, Collections.emptyMap());
                Map<Long, Map<TopicPartition, GroupMetadata.CommitRecordMetadataAndOffset>> map10 = (Map) map8.getOrDefault(str3, Collections.emptyMap());
                if (log.isDebugEnabled()) {
                    log.debug("Loaded group metadata {} with offsets {} and pending offsets {}", groupMetadata2, map9, map10);
                }
                loadGroup(groupMetadata2, map9, map10);
                consumer.accept(groupMetadata2);
            });
            set.forEach(str4 -> {
                if (this.groupMetadataCache.containsKey(str4) && !map5.containsKey(str4)) {
                    throw new IllegalStateException("Unexpected unload of active group " + str4 + " while loading partition");
                }
            });
            completableFuture.complete(null);
        } catch (RuntimeException e) {
            completableFuture.completeExceptionally(e);
        }
    }

    private void loadGroup(GroupMetadata groupMetadata, Map<TopicPartition, GroupMetadata.CommitRecordMetadataAndOffset> map, Map<Long, Map<TopicPartition, GroupMetadata.CommitRecordMetadataAndOffset>> map2) {
        Map<TopicPartition, GroupMetadata.CommitRecordMetadataAndOffset> mapValue = CoreUtils.mapValue(map, commitRecordMetadataAndOffset -> {
            OffsetAndMetadata offsetAndMetadata;
            OffsetAndMetadata offsetAndMetadata2 = commitRecordMetadataAndOffset.offsetAndMetadata();
            if (offsetAndMetadata2.expireTimestamp() == -1) {
                offsetAndMetadata = OffsetAndMetadata.apply(offsetAndMetadata2.offset(), offsetAndMetadata2.metadata(), offsetAndMetadata2.commitTimestamp(), offsetAndMetadata2.commitTimestamp() + this.offsetConfig.offsetsRetentionMs());
            } else {
                offsetAndMetadata = offsetAndMetadata2;
            }
            return new GroupMetadata.CommitRecordMetadataAndOffset(commitRecordMetadataAndOffset.appendedBatchOffset(), offsetAndMetadata);
        });
        if (log.isTraceEnabled()) {
            log.trace("Initialized offsets {} from group {}", mapValue, groupMetadata.groupId());
        }
        groupMetadata.initializeOffsets(mapValue, map2);
        GroupMetadata addGroup = addGroup(groupMetadata);
        if (groupMetadata != addGroup) {
            log.debug("Attempt to load group {} from log with generation {} failed because there is already a cached group with generation {}", groupMetadata.groupId(), Integer.valueOf(groupMetadata.generationId()), Integer.valueOf(addGroup.generationId()));
        }
    }

    public void removeGroupsForPartition(int i, Consumer<GroupMetadata> consumer) {
        TopicPartition topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, i);
        log.info("Scheduling unloading of offsets and group metadata from {}", topicPartition);
        this.scheduler.submit(() -> {
            AtomicInteger atomicInteger = new AtomicInteger();
            AtomicInteger atomicInteger2 = new AtomicInteger();
            CoreUtils.inLock(this.partitionLock, () -> {
                this.ownedPartitions.remove(Integer.valueOf(i));
                for (GroupMetadata groupMetadata : this.groupMetadataCache.values()) {
                    if (partitionFor(groupMetadata.groupId()) == i) {
                        consumer.accept(groupMetadata);
                        this.groupMetadataCache.remove(groupMetadata.groupId(), groupMetadata);
                        removeGroupFromAllProducers(groupMetadata.groupId());
                        atomicInteger2.incrementAndGet();
                        atomicInteger.addAndGet(groupMetadata.numOffsets());
                    }
                }
                CompletableFuture<Producer<ByteBuffer>> remove = this.offsetsProducers.remove(Integer.valueOf(i));
                CompletableFuture<Reader<ByteBuffer>> remove2 = this.offsetsReaders.remove(Integer.valueOf(i));
                if (remove != null) {
                    remove.thenApplyAsync(producer -> {
                        return producer.closeAsync();
                    }).whenCompleteAsync((BiConsumer<? super U, ? super Throwable>) (completableFuture, th) -> {
                        if (th != null) {
                            log.error("Failed to close producer when remove partition {}.", ((Producer) remove.join()).getTopic());
                        }
                    }, (Executor) this.scheduler);
                }
                if (remove2 == null) {
                    return null;
                }
                remove2.thenApplyAsync(reader -> {
                    return reader.closeAsync();
                }).whenCompleteAsync((BiConsumer<? super U, ? super Throwable>) (completableFuture2, th2) -> {
                    if (th2 != null) {
                        log.error("Failed to close reader when remove partition {}.", ((Reader) remove2.join()).getTopic());
                    }
                }, (Executor) this.scheduler);
                return null;
            });
            log.info("Finished unloading {}. Removed {} cached offsets and {} cached groups.", topicPartition, atomicInteger, atomicInteger2);
        });
    }

    CompletableFuture<Void> cleanupGroupMetadata() {
        long milliseconds = this.time.milliseconds();
        return cleanGroupMetadata(this.groupMetadataCache.values().stream(), groupMetadata -> {
            return groupMetadata.removeExpiredOffsets(this.time.milliseconds());
        }).thenAcceptAsync(num -> {
            log.info("Removed {} expired offsets in {} milliseconds.", num, Long.valueOf(this.time.milliseconds() - milliseconds));
        }, (Executor) this.scheduler);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Integer> cleanGroupMetadata(Stream<GroupMetadata> stream, Function<GroupMetadata, Map<TopicPartition, OffsetAndMetadata>> function) {
        return FutureUtils.collect((List) stream.map(groupMetadata -> {
            String groupId = groupMetadata.groupId();
            Triple triple = (Triple) groupMetadata.inLock(() -> {
                Map synchronizedMap = Collections.synchronizedMap((Map) function.apply(groupMetadata));
                if (groupMetadata.is(GroupState.Empty) && !groupMetadata.hasOffsets()) {
                    log.info("Group {} transitioned to Dead in generation {}", groupId, Integer.valueOf(groupMetadata.generationId()));
                    groupMetadata.transitionTo(GroupState.Dead);
                }
                return Triple.of(synchronizedMap, Boolean.valueOf(groupMetadata.is(GroupState.Dead)), Integer.valueOf(groupMetadata.generationId()));
            });
            Map map = (Map) triple.getLeft();
            boolean booleanValue = ((Boolean) triple.getMiddle()).booleanValue();
            int intValue = ((Integer) triple.getRight()).intValue();
            TimestampType timestampType = TimestampType.CREATE_TIME;
            long milliseconds = this.time.milliseconds();
            ArrayList arrayList = new ArrayList();
            map.forEach((topicPartition, offsetAndMetadata) -> {
                arrayList.add(new SimpleRecord(milliseconds, GroupMetadataConstants.offsetCommitKey(groupId, topicPartition), (byte[]) null));
            });
            if (booleanValue && this.groupMetadataCache.remove(groupId, groupMetadata) && intValue > 0) {
                arrayList.add(new SimpleRecord(milliseconds, GroupMetadataConstants.groupMetadataKey(groupMetadata.groupId()), (byte[]) null));
            }
            if (arrayList.isEmpty()) {
                return CompletableFuture.completedFuture(0);
            }
            MemoryRecords withRecords = MemoryRecords.withRecords((byte) 2, 0L, this.compressionType, timestampType, (SimpleRecord[]) arrayList.toArray(new SimpleRecord[arrayList.size()]));
            byte[] groupMetadataKey = GroupMetadataConstants.groupMetadataKey(groupMetadata.groupId());
            return getOffsetsTopicProducer(groupMetadata.groupId()).thenComposeAsync(producer -> {
                return producer.newMessage().keyBytes(groupMetadataKey).value(withRecords.buffer()).eventTime(milliseconds).sendAsync();
            }, (Executor) this.scheduler).thenApplyAsync((Function<? super U, ? extends U>) messageId -> {
                return Integer.valueOf(map.size());
            }, (Executor) this.scheduler).exceptionally(th -> {
                log.error("Failed to append {} tombstones to topic {} for expired/deleted offsets and/or metadata for group {}", Integer.valueOf(arrayList.size()), this.offsetConfig.offsetsTopicName() + '-' + this.partitioner.apply(groupMetadata.groupId()), groupMetadata.groupId(), th);
                return 0;
            });
        }).collect(Collectors.toList())).thenApplyAsync(list -> {
            return Integer.valueOf(list.stream().mapToInt((v0) -> {
                return v0.intValue();
            }).sum());
        }, (Executor) this.scheduler);
    }

    public CompletableFuture<Void> scheduleHandleTxnCompletion(long j, Set<Integer> set, boolean z) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.scheduler.submit(() -> {
            handleTxnCompletion(j, set, z, completableFuture);
        });
        return completableFuture;
    }

    protected void handleTxnCompletion(long j, Set<Integer> set, boolean z, CompletableFuture<Void> completableFuture) {
        ArrayList arrayList = new ArrayList();
        groupsBelongingToPartitions(j, set).forEach(str -> {
            CompletableFuture completableFuture2 = new CompletableFuture();
            arrayList.add(completableFuture2);
            getGroup(str).map(groupMetadata -> {
                return groupMetadata.inLock(() -> {
                    if (!groupMetadata.is(GroupState.Dead)) {
                        groupMetadata.completePendingTxnOffsetCommit(j, z);
                        removeProducerGroup(j, str);
                    }
                    completableFuture2.complete(null);
                    return null;
                });
            }).orElseGet(() -> {
                log.info("Group {} has moved away from this coordinator after transaction marker was written but before the cache was updated. The cache on the new group owner will be updated instead.", str);
                completableFuture2.complete(null);
                return null;
            });
        });
        FutureUtil.waitForAll(arrayList).whenComplete((r4, th) -> {
            if (th == null) {
                completableFuture.complete(null);
            } else {
                log.error("Failed to handle txn completion.");
                completableFuture.completeExceptionally(th);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addPartitionOwnership(int i) {
        CoreUtils.inLock(this.partitionLock, () -> {
            this.ownedPartitions.add(Integer.valueOf(i));
            return null;
        });
    }

    boolean addLoadingPartition(int i) {
        return ((Boolean) CoreUtils.inLock(this.partitionLock, () -> {
            return Boolean.valueOf(this.loadingPartitions.add(Integer.valueOf(i)));
        })).booleanValue();
    }

    boolean removeLoadingPartition(int i) {
        return ((Boolean) CoreUtils.inLock(this.partitionLock, () -> {
            return Boolean.valueOf(this.loadingPartitions.remove(Integer.valueOf(i)));
        })).booleanValue();
    }

    CompletableFuture<Producer<ByteBuffer>> getOffsetsTopicProducer(String str) {
        return this.offsetsProducers.computeIfAbsent(Integer.valueOf(partitionFor(str)), num -> {
            if (log.isDebugEnabled()) {
                log.debug("Created Partitioned producer: {} for consumer group: {}", this.offsetConfig.offsetsTopicName() + "-partition-" + num, str);
            }
            return this.metadataTopicProducerBuilder.clone().topic(this.offsetConfig.offsetsTopicName() + "-partition-" + num).createAsync();
        });
    }

    CompletableFuture<Producer<ByteBuffer>> getOffsetsTopicProducer(int i) {
        return this.offsetsProducers.computeIfAbsent(Integer.valueOf(i), num -> {
            if (log.isDebugEnabled()) {
                log.debug("Will create Partitioned producer: {}", this.offsetConfig.offsetsTopicName() + "-partition-" + num);
            }
            return this.metadataTopicProducerBuilder.clone().topic(this.offsetConfig.offsetsTopicName() + "-partition-" + num).createAsync();
        });
    }

    CompletableFuture<Reader<ByteBuffer>> getOffsetsTopicReader(int i) {
        return this.offsetsReaders.computeIfAbsent(Integer.valueOf(i), num -> {
            if (log.isDebugEnabled()) {
                log.debug("Will create Partitioned reader: {}", this.offsetConfig.offsetsTopicName() + "-partition-" + num);
            }
            return this.metadataTopicReaderBuilder.clone().topic(this.offsetConfig.offsetsTopicName() + "-partition-" + i).readCompacted(true).createAsync();
        });
    }

    public OffsetConfig getOffsetConfig() {
        return this.offsetConfig;
    }
}
