package io.streamnative.pulsar.handlers.kop.coordinator.group;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata;
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
import io.streamnative.pulsar.handlers.kop.utils.timer.Timer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.ReaderBuilderImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.class */
public class GroupCoordinator {
    // Placeholder values (empty strings and -1). Judging by their names they stand for
    // "no state / protocol type / protocol / leader / generation / member id".
    // NOTE(review): presumably mirrors the sentinels of Kafka's GroupCoordinator — confirm.
    static final String NoState = "";
    static final String NoProtocolType = "";
    static final String NoProtocol = "";
    static final String NoLeader = "";
    static final int NoGeneration = -1;
    static final String NoMemberId = "";
    // Set to true at the end of startup() and to false at the beginning of shutdown();
    // read by validateGroupStatus() and handleListGroups().
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private final GroupConfig groupConfig;
    private final GroupMetadataManager groupManager;
    // Purgatories for delayed heartbeat and delayed join operations; both are shut down in shutdown().
    private final DelayedOperationPurgatory<DelayedHeartbeat> heartbeatPurgatory;
    private final DelayedOperationPurgatory<DelayedJoin> joinPurgatory;
    // Clock used to timestamp heartbeats in completeAndScheduleNextHeartbeatExpiration().
    private final Time time;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) GroupCoordinator.class);
    // Summary returned by handleDescribeGroup() when group validation fails.
    static final GroupMetadata.GroupSummary EmptyGroup = new GroupMetadata.GroupSummary("", "", "", Collections.emptyList());
    // Summary returned by handleDescribeGroup() when the group is not known to the group manager.
    static final GroupMetadata.GroupSummary DeadGroup = new GroupMetadata.GroupSummary(GroupState.Dead.toString(), "", "", Collections.emptyList());

    /**
     * Creates a coordinator backed by the given Pulsar client.
     *
     * <p>The factory builds a scheduler named {@code group-coordinator-executor}, a
     * {@code ByteBuffer} producer builder limited to 100000 pending messages, and a
     * {@code ByteBuffer} reader builder positioned at {@link MessageId#earliest}. These are handed
     * to a new {@link GroupMetadataManager}; the heartbeat and join purgatories share the supplied
     * timer.
     *
     * @param pulsarClientImpl client used to create the producer and reader builders
     * @param groupConfig      group configuration stored on the coordinator
     * @param offsetConfig     offset configuration passed to the metadata manager
     * @param timer            timeout timer shared by both purgatories
     * @param time             clock passed to the metadata manager and the coordinator
     * @return a new, not yet started coordinator
     */
    public static GroupCoordinator of(PulsarClientImpl pulsarClientImpl, GroupConfig groupConfig, OffsetConfig offsetConfig, Timer timer, Time time) {
        ScheduledExecutorService coordinatorExecutor = OrderedScheduler.newSchedulerBuilder()
            .name("group-coordinator-executor")
            .build();

        // Parameterized builders instead of raw types so the element type is checked at compile time.
        ProducerBuilder<ByteBuffer> offsetsProducerBuilder = pulsarClientImpl.newProducer(Schema.BYTEBUFFER)
            .maxPendingMessages(100000);
        ReaderBuilderImpl<ByteBuffer> offsetsReaderBuilder = new ReaderBuilderImpl<>(pulsarClientImpl, Schema.BYTEBUFFER);
        offsetsReaderBuilder.startMessageId(MessageId.earliest);

        GroupMetadataManager metadataManager = new GroupMetadataManager(
            offsetConfig, offsetsProducerBuilder, offsetsReaderBuilder, coordinatorExecutor, time);

        // Explicit type witnesses: a chained builder call cannot infer the operation type from
        // the assignment target.
        DelayedOperationPurgatory<DelayedHeartbeat> heartbeats = DelayedOperationPurgatory.<DelayedHeartbeat>builder()
            .purgatoryName("group-coordinator-delayed-heartbeat")
            .timeoutTimer(timer)
            .build();
        DelayedOperationPurgatory<DelayedJoin> joins = DelayedOperationPurgatory.<DelayedJoin>builder()
            .purgatoryName("group-coordinator-delayed-join")
            .timeoutTimer(timer)
            .build();

        return new GroupCoordinator(groupConfig, metadataManager, heartbeats, joins, time);
    }

    /**
     * Assembles a coordinator from already-built collaborators. The coordinator starts inactive;
     * call {@link #startup(boolean)} to activate it.
     *
     * @param groupConfig               group configuration
     * @param groupMetadataManager      manager that owns group metadata and offsets
     * @param delayedOperationPurgatory purgatory holding delayed heartbeat operations
     * @param delayedOperationPurgatory2 purgatory holding delayed join operations
     * @param time                      clock used for heartbeat timestamps
     */
    public GroupCoordinator(GroupConfig groupConfig, GroupMetadataManager groupMetadataManager, DelayedOperationPurgatory<DelayedHeartbeat> delayedOperationPurgatory, DelayedOperationPurgatory<DelayedJoin> delayedOperationPurgatory2, Time time) {
        this.time = time;
        this.groupConfig = groupConfig;
        this.groupManager = groupMetadataManager;
        this.joinPurgatory = delayedOperationPurgatory2;
        this.heartbeatPurgatory = delayedOperationPurgatory;
    }

    /**
     * Starts the group metadata manager and then marks the coordinator as active.
     *
     * @param z passed unchanged to {@code GroupMetadataManager.startup}; its meaning is defined
     *          there (NOTE(review): presumably enables periodic metadata expiration — confirm)
     */
    public void startup(boolean z) {
        log.info("Starting up group coordinator.");
        groupManager.startup(z);
        isActive.set(true);
        log.info("Group coordinator started.");
    }

    /**
     * Marks the coordinator inactive, then shuts down the metadata manager followed by the
     * heartbeat and join purgatories.
     */
    public void shutdown() {
        log.info("Shutting down group coordinator ...");
        // Deactivate first so validateGroupStatus() starts rejecting requests right away.
        isActive.set(false);
        groupManager.shutdown();
        heartbeatPurgatory.shutdown();
        joinPurgatory.shutdown();
        log.info("Shutdown group coordinator completely.");
    }

    /**
     * Delegates to the group metadata manager.
     *
     * @param str value handed to {@code GroupMetadataManager.partitionFor}
     *            (presumably the group id — confirm)
     * @return the partition number computed by the manager
     */
    public int partitionFor(String str) {
        return groupManager.partitionFor(str);
    }

    /**
     * Delegates to the group metadata manager.
     *
     * @param i partition number
     * @return the topic partition name the manager reports for {@code i}
     */
    public String getTopicPartitionName(int i) {
        return groupManager.getTopicPartitionName(i);
    }

    /**
     * @return the manager's map of producer futures, keyed by integer
     *         (presumably the offsets-topic partition — confirm)
     */
    public ConcurrentMap<Integer, CompletableFuture<Producer<ByteBuffer>>> getOffsetsProducers() {
        return groupManager.getOffsetsProducers();
    }

    /**
     * @return the manager's map of reader futures, keyed by integer
     *         (presumably the offsets-topic partition — confirm)
     */
    public ConcurrentMap<Integer, CompletableFuture<Reader<ByteBuffer>>> getOffsetsReaders() {
        return groupManager.getOffsetsReaders();
    }

    /** @return the group metadata manager this coordinator delegates to */
    public GroupMetadataManager getGroupManager() {
        return groupManager;
    }

    /** @return the group configuration supplied at construction time */
    public GroupConfig groupConfig() {
        return groupConfig;
    }

    /** @return the offset configuration held by the group metadata manager */
    public OffsetConfig offsetConfig() {
        return groupManager.offsetConfig();
    }

    /**
     * Entry point for a join-group request.
     *
     * <p>The request is rejected when group validation fails or when the session timeout lies
     * outside the configured min/max bounds. A group is created on demand only when the caller
     * has no member id yet (empty string); a non-empty member id for an unknown group yields
     * {@code UNKNOWN_MEMBER_ID}.
     *
     * @param str  group id
     * @param str2 member id; the empty string means "not assigned yet"
     * @param str3 forwarded to the join logic (presumably the client id — confirm)
     * @param str4 forwarded to the join logic (presumably the client host — confirm)
     * @param i    forwarded to the join logic (presumably the rebalance timeout — confirm)
     * @param i2   session timeout in milliseconds, checked against the configured bounds
     * @param str5 protocol type
     * @param map  supported protocols keyed by protocol name
     * @return future carrying the join result, including error results
     */
    public CompletableFuture<JoinGroupResult> handleJoinGroup(String str, String str2, String str3, String str4, int i, int i2, String str5, Map<String, byte[]> map) {
        Optional<Errors> statusError = validateGroupStatus(str, ApiKeys.JOIN_GROUP);
        if (statusError.isPresent()) {
            return CompletableFuture.completedFuture(joinError(str2, statusError.get()));
        }

        boolean sessionTimeoutOutOfRange = i2 < this.groupConfig.groupMinSessionTimeoutMs()
            || i2 > this.groupConfig.groupMaxSessionTimeoutMs();
        if (sessionTimeoutOutOfRange) {
            return CompletableFuture.completedFuture(joinError(str2, Errors.INVALID_SESSION_TIMEOUT));
        }

        return this.groupManager.getGroup(str)
            .map(existing -> doJoinGroup(existing, str2, str3, str4, i, i2, str5, map))
            .orElseGet(() -> {
                if (!"".equals(str2)) {
                    // A known member id without a known group: reject instead of creating the group.
                    return CompletableFuture.completedFuture(joinError(str2, Errors.UNKNOWN_MEMBER_ID));
                }
                GroupMetadata created = this.groupManager.addGroup(new GroupMetadata(str, GroupState.Empty));
                return doJoinGroup(created, str2, str3, str4, i, i2, str5, map);
            });
    }

    /**
     * Runs {@link #unsafeJoinGroup} through {@code groupMetadata.inLock}. Arguments are
     * forwarded unchanged.
     */
    private CompletableFuture<JoinGroupResult> doJoinGroup(GroupMetadata groupMetadata, String str, String str2, String str3, int i, int i2, String str4, Map<String, byte[]> map) {
        return groupMetadata.inLock(() -> unsafeJoinGroup(groupMetadata, str, str2, str3, i, i2, str4, map));
    }

    /**
     * Core join logic. It takes no lock itself; {@link #doJoinGroup} invokes it through
     * {@code groupMetadata.inLock}.
     *
     * <p>Rejections, in order:
     * <ol>
     *   <li>non-empty group whose protocol type differs from {@code str4} or that does not
     *       support any of the offered protocols: {@code INCONSISTENT_GROUP_PROTOCOL};</li>
     *   <li>empty group joined with no protocols or an empty protocol type:
     *       {@code INCONSISTENT_GROUP_PROTOCOL};</li>
     *   <li>non-empty member id that the group does not contain: {@code UNKNOWN_MEMBER_ID}.</li>
     * </ol>
     * Otherwise the outcome depends on the group state. After the state-specific handling, a
     * group in {@code PreparingRebalance} gets its pending delayed join re-checked.
     *
     * @param groupMetadata group being joined
     * @param str           member id; the empty string means a new member
     * @param str2          forwarded to {@code addMemberAndRebalance} (presumably client id — confirm)
     * @param str3          forwarded to {@code addMemberAndRebalance} (presumably client host — confirm)
     * @param i             forwarded to {@code addMemberAndRebalance} (presumably rebalance timeout — confirm)
     * @param i2            forwarded to {@code addMemberAndRebalance} (presumably session timeout — confirm)
     * @param str4          protocol type
     * @param map           supported protocols keyed by protocol name
     * @return future carrying the join result; fails with {@link IllegalStateException} on an
     *         unrecognised group state
     */
    private CompletableFuture<JoinGroupResult> unsafeJoinGroup(GroupMetadata groupMetadata, String str, String str2, String str3, int i, int i2, String str4, Map<String, byte[]> map) {
        if (!groupMetadata.is(GroupState.Empty) && (!groupMetadata.protocolType().isPresent() || !Objects.equals(groupMetadata.protocolType().get(), str4) || !groupMetadata.supportsProtocols(map.keySet()))) {
            return CompletableFuture.completedFuture(joinError(str, Errors.INCONSISTENT_GROUP_PROTOCOL));
        }
        if (groupMetadata.is(GroupState.Empty) && (map.isEmpty() || str4.isEmpty())) {
            return CompletableFuture.completedFuture(joinError(str, Errors.INCONSISTENT_GROUP_PROTOCOL));
        }
        boolean newMember = "".equals(str);
        if (!newMember && !groupMetadata.has(str)) {
            return CompletableFuture.completedFuture(joinError(str, Errors.UNKNOWN_MEMBER_ID));
        }

        CompletableFuture<JoinGroupResult> result;
        switch (groupMetadata.currentState()) {
            case Dead:
                result = CompletableFuture.completedFuture(joinError(str, Errors.UNKNOWN_MEMBER_ID));
                break;
            case PreparingRebalance:
                if (newMember) {
                    result = addMemberAndRebalance(i, i2, str2, str3, str4, map, groupMetadata);
                } else {
                    result = updateMemberAndRebalance(groupMetadata, groupMetadata.get(str), map);
                }
                break;
            case CompletingRebalance:
                if (newMember) {
                    result = addMemberAndRebalance(i, i2, str2, str3, str4, map, groupMetadata);
                } else {
                    MemberMetadata rejoining = groupMetadata.get(str);
                    if (rejoining.matches(map)) {
                        // Same protocols as before: answer with the current generation. Only the
                        // leader receives the full member metadata.
                        result = CompletableFuture.completedFuture(new JoinGroupResult(groupMetadata.isLeader(str) ? groupMetadata.currentMemberMetadata() : Collections.emptyMap(), str, groupMetadata.generationId(), groupMetadata.protocolOrNull(), groupMetadata.leaderOrNull(), Errors.NONE));
                    } else {
                        result = updateMemberAndRebalance(groupMetadata, rejoining, map);
                    }
                }
                break;
            case Empty:
            case Stable:
                if (newMember) {
                    result = addMemberAndRebalance(i, i2, str2, str3, str4, map, groupMetadata);
                } else {
                    MemberMetadata existing = groupMetadata.get(str);
                    if (groupMetadata.isLeader(str) || !existing.matches(map)) {
                        // A rejoining leader, or a follower whose protocols changed, goes through
                        // updateMemberAndRebalance.
                        result = updateMemberAndRebalance(groupMetadata, existing, map);
                    } else {
                        result = CompletableFuture.completedFuture(new JoinGroupResult(Collections.emptyMap(), str, groupMetadata.generationId(), groupMetadata.protocolOrNull(), groupMetadata.leaderOrNull(), Errors.NONE));
                    }
                }
                break;
            default:
                result = FutureUtil.failedFuture(new IllegalStateException("Unknown state " + groupMetadata.currentState()));
                break;
        }

        if (groupMetadata.is(GroupState.PreparingRebalance)) {
            this.joinPurgatory.checkAndComplete(new DelayedOperationKey.GroupKey(groupMetadata.groupId()));
        }
        return result;
    }

    /**
     * Future-based wrapper around the callback overload of {@code handleSyncGroup}.
     *
     * @param str  group id
     * @param i    generation id
     * @param str2 member id
     * @param map  assignments keyed by member id
     * @return future completed with the (error, assignment bytes) pair once the callback fires
     */
    public CompletableFuture<KeyValue<Errors, byte[]>> handleSyncGroup(String str, int i, String str2, Map<String, byte[]> map) {
        CompletableFuture<KeyValue<Errors, byte[]>> syncResult = new CompletableFuture<>();
        handleSyncGroup(str, i, str2, map, (assignment, error) -> syncResult.complete(new KeyValue<>(error, assignment)));
        return syncResult;
    }

    /**
     * Callback-based sync-group handler.
     *
     * <p>When validation reports {@code COORDINATOR_LOAD_IN_PROGRESS} the caller is told
     * {@code REBALANCE_IN_PROGRESS} instead; any other validation error is passed through. An
     * unknown group yields {@code UNKNOWN_MEMBER_ID}.
     *
     * @param str        group id
     * @param i          generation id
     * @param str2       member id
     * @param map        assignments keyed by member id
     * @param biConsumer callback receiving the assignment bytes and the error code
     */
    public void handleSyncGroup(String str, int i, String str2, Map<String, byte[]> map, BiConsumer<byte[], Errors> biConsumer) {
        Optional<Errors> statusError = validateGroupStatus(str, ApiKeys.SYNC_GROUP);
        if (statusError.isPresent()) {
            Errors reported = statusError.get();
            if (reported == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                reported = Errors.REBALANCE_IN_PROGRESS;
            }
            biConsumer.accept(new byte[0], reported);
            return;
        }

        Optional<GroupMetadata> maybeGroup = this.groupManager.getGroup(str);
        if (!maybeGroup.isPresent()) {
            biConsumer.accept(new byte[0], Errors.UNKNOWN_MEMBER_ID);
            return;
        }
        doSyncGroup(maybeGroup.get(), i, str2, map, biConsumer);
    }

    /**
     * Handles a sync-group request for a known group; the whole body runs through
     * {@code groupMetadata.inLock}.
     *
     * @param groupMetadata group being synced
     * @param i             generation id sent by the caller; must equal the group's generation
     * @param str           member id of the caller
     * @param map           assignments keyed by member id (used only when the caller is the leader)
     * @param biConsumer    callback receiving the assignment bytes and the error code
     * @throws IllegalStateException if the group reports a state not handled by the switch
     */
    private void doSyncGroup(GroupMetadata groupMetadata, int i, String str, Map<String, byte[]> map, BiConsumer<byte[], Errors> biConsumer) {
        groupMetadata.inLock(() -> {
            if (!groupMetadata.has(str)) {
                biConsumer.accept(new byte[0], Errors.UNKNOWN_MEMBER_ID);
                return null;
            }
            if (i != groupMetadata.generationId()) {
                biConsumer.accept(new byte[0], Errors.ILLEGAL_GENERATION);
                return null;
            }
            switch (groupMetadata.currentState()) {
                case Dead:
                case Empty:
                    biConsumer.accept(new byte[0], Errors.UNKNOWN_MEMBER_ID);
                    return null;
                case PreparingRebalance:
                    biConsumer.accept(new byte[0], Errors.REBALANCE_IN_PROGRESS);
                    return null;
                case CompletingRebalance:
                    // Park the callback on the member; it is answered later by propagateAssignment().
                    groupMetadata.get(str).awaitingSyncCallback(biConsumer);
                    if (!groupMetadata.isLeader(str)) {
                        return null;
                    }
                    log.info("Assignment received from leader for group {} for generation {}", groupMetadata.groupId(), Integer.valueOf(groupMetadata.generationId()));
                    // Members the leader did not mention get an empty assignment.
                    Sets.SetView difference = Sets.difference(groupMetadata.allMembers(), map.keySet());
                    HashMap hashMap = new HashMap();
                    hashMap.putAll(map);
                    hashMap.putAll((Map) difference.stream().collect(Collectors.toMap(str2 -> {
                        return str2;
                    }, str3 -> {
                        return new byte[0];
                    })));
                    this.groupManager.storeGroup(groupMetadata, hashMap).thenApply(errors -> {
                        return groupMetadata.inLock(() -> {
                            // The group may have changed while the store was in flight; do nothing
                            // unless it is still in CompletingRebalance at the same generation.
                            if (!groupMetadata.is(GroupState.CompletingRebalance) || i != groupMetadata.generationId()) {
                                return null;
                            }
                            if (errors != Errors.NONE) {
                                resetAndPropagateAssignmentError(groupMetadata, errors);
                                maybePrepareRebalance(groupMetadata);
                                return null;
                            }
                            setAndPropagateAssignment(groupMetadata, hashMap);
                            groupMetadata.transitionTo(GroupState.Stable);
                            return null;
                        });
                    });
                    return null;
                case Stable:
                    // Already stable: answer with the member's current assignment and refresh its heartbeat.
                    biConsumer.accept(groupMetadata.get(str).assignment(), Errors.NONE);
                    completeAndScheduleNextHeartbeatExpiration(groupMetadata, groupMetadata.get(str));
                    return null;
                default:
                    throw new IllegalStateException("Should not reach here");
            }
        });
    }

    /**
     * Removes a member from its group.
     *
     * @param str  group id
     * @param str2 member id
     * @return future with the validation error if validation fails; {@code UNKNOWN_MEMBER_ID}
     *         when the group is unknown, dead, or does not contain the member; otherwise
     *         {@code NONE} after the member has been removed
     */
    public CompletableFuture<Errors> handleLeaveGroup(String str, String str2) {
        Optional<Errors> statusError = validateGroupStatus(str, ApiKeys.LEAVE_GROUP);
        if (statusError.isPresent()) {
            return CompletableFuture.completedFuture(statusError.get());
        }

        return this.groupManager.getGroup(str)
            .map(group -> group.inLock(() -> {
                Errors outcome;
                if (group.is(GroupState.Dead) || !group.has(str2)) {
                    outcome = Errors.UNKNOWN_MEMBER_ID;
                } else {
                    MemberMetadata leaving = group.get(str2);
                    removeHeartbeatForLeavingMember(group, leaving);
                    if (log.isDebugEnabled()) {
                        log.debug("Member {} in group {} has left, removing it from the group", leaving.memberId(), group.groupId());
                    }
                    removeMemberAndUpdateGroup(group, leaving);
                    outcome = Errors.NONE;
                }
                return CompletableFuture.completedFuture(outcome);
            }))
            .orElseGet(() -> CompletableFuture.completedFuture(Errors.UNKNOWN_MEMBER_ID));
    }

    /**
     * Deletes the given groups and reports a per-group error code.
     *
     * <p>Only groups in the {@code Empty} state are deleted: they are moved to {@code Dead} and
     * their metadata is cleaned via {@code cleanGroupMetadata}. The cleanup future is not awaited;
     * eligible groups are reported as {@code NONE} immediately.
     *
     * @param set ids of the groups to delete
     * @return synchronized map from group id to the resulting error code
     */
    public Map<String, Errors> handleDeleteGroups(Set<String> set) {
        Map<String, Errors> synchronizedMap = Collections.synchronizedMap(new HashMap());
        // Groups that were Empty and have been transitioned to Dead; cleaned up below.
        ArrayList arrayList = new ArrayList();
        set.forEach(str -> {
            // The Optional chain is used for its side effects only: each branch records the
            // group's error in synchronizedMap; the returned value is ignored.
            validateGroupStatus(str, ApiKeys.DELETE_GROUPS).map(errors -> {
                synchronizedMap.put(str, errors);
                return errors;
            }).orElseGet(() -> {
                return (Errors) this.groupManager.getGroup(str).map(groupMetadata -> {
                    return (Errors) groupMetadata.inLock(() -> {
                        switch (groupMetadata.currentState()) {
                            case Dead:
                                if (!this.groupManager.groupNotExists(str)) {
                                    synchronizedMap.put(str, Errors.NOT_COORDINATOR);
                                    break;
                                } else {
                                    synchronizedMap.put(str, Errors.GROUP_ID_NOT_FOUND);
                                    break;
                                }
                            case Empty:
                                groupMetadata.transitionTo(GroupState.Dead);
                                arrayList.add(groupMetadata);
                                break;
                            default:
                                // Any other state is reported as a non-empty group.
                                synchronizedMap.put(str, Errors.NON_EMPTY_GROUP);
                                break;
                        }
                        return Errors.NONE;
                    });
                }).orElseGet(() -> {
                    synchronizedMap.put(str, this.groupManager.groupNotExists(str) ? Errors.GROUP_ID_NOT_FOUND : Errors.NOT_COORDINATOR);
                    return Errors.NONE;
                });
            });
        });
        if (!arrayList.isEmpty()) {
            // Asynchronous cleanup; the log line is emitted when the cleanup future completes normally.
            this.groupManager.cleanGroupMetadata(arrayList.stream(), groupMetadata -> {
                return groupMetadata.removeAllOffsets();
            }).thenAccept(num -> {
                log.info("The following groups were deleted {}. A total of {} offsets were removed.", arrayList.stream().map((v0) -> {
                    return v0.groupId();
                }).collect(Collectors.joining(",")), num);
            });
            synchronizedMap.putAll((Map) arrayList.stream().collect(Collectors.toMap((v0) -> {
                return v0.groupId();
            }, groupMetadata2 -> {
                return Errors.NONE;
            })));
        }
        return synchronizedMap;
    }

    /**
     * Processes a member heartbeat.
     *
     * <p>A {@code COORDINATOR_LOAD_IN_PROGRESS} validation error is answered with {@code NONE};
     * other validation errors are passed through. For a known group the answer depends on its
     * state; in {@code PreparingRebalance} and {@code Stable} a valid heartbeat also reschedules
     * the member's heartbeat expiration.
     *
     * @param str  group id
     * @param str2 member id
     * @param i    generation id sent by the member
     * @return future carrying the resulting error code
     */
    public CompletableFuture<Errors> handleHeartbeat(String str, String str2, int i) {
        Optional<Errors> statusError = validateGroupStatus(str, ApiKeys.HEARTBEAT);
        if (statusError.isPresent()) {
            Errors reported = statusError.get();
            return CompletableFuture.completedFuture(
                reported == Errors.COORDINATOR_LOAD_IN_PROGRESS ? Errors.NONE : reported);
        }

        return this.groupManager.getGroup(str)
            .map(group -> group.inLock(() -> {
                Errors outcome;
                switch (group.currentState()) {
                    case Dead:
                    case Empty:
                        outcome = Errors.UNKNOWN_MEMBER_ID;
                        break;
                    case CompletingRebalance:
                        outcome = group.has(str2) ? Errors.REBALANCE_IN_PROGRESS : Errors.UNKNOWN_MEMBER_ID;
                        break;
                    case PreparingRebalance:
                        if (!group.has(str2)) {
                            outcome = Errors.UNKNOWN_MEMBER_ID;
                        } else if (i != group.generationId()) {
                            outcome = Errors.ILLEGAL_GENERATION;
                        } else {
                            completeAndScheduleNextHeartbeatExpiration(group, group.get(str2));
                            outcome = Errors.REBALANCE_IN_PROGRESS;
                        }
                        break;
                    case Stable:
                        if (!group.has(str2)) {
                            outcome = Errors.UNKNOWN_MEMBER_ID;
                        } else if (i != group.generationId()) {
                            outcome = Errors.ILLEGAL_GENERATION;
                        } else {
                            completeAndScheduleNextHeartbeatExpiration(group, group.get(str2));
                            outcome = Errors.NONE;
                        }
                        break;
                    default:
                        outcome = Errors.NONE;
                        break;
                }
                return CompletableFuture.completedFuture(outcome);
            }))
            .orElseGet(() -> CompletableFuture.completedFuture(Errors.UNKNOWN_MEMBER_ID));
    }

    /**
     * Commits offsets on behalf of a transactional producer. The group is created in the
     * {@code Empty} state when it does not exist yet. The commit is issued with an empty member
     * id and generation {@code -1}.
     *
     * @param str group id
     * @param j   forwarded to {@code doCommitOffsets} (presumably the producer id — confirm)
     * @param s   forwarded to {@code doCommitOffsets} (presumably the producer epoch — confirm)
     * @param map offsets to commit, keyed by partition
     * @return future with a per-partition error code; when validation fails every partition
     *         carries the validation error
     */
    public CompletableFuture<Map<TopicPartition, Errors>> handleTxnCommitOffsets(String str, long j, short s, Map<TopicPartition, OffsetAndMetadata> map) {
        Optional<Errors> statusError = validateGroupStatus(str, ApiKeys.TXN_OFFSET_COMMIT);
        if (statusError.isPresent()) {
            Errors rejection = statusError.get();
            return CompletableFuture.completedFuture(CoreUtils.mapValue(map, ignored -> rejection));
        }

        GroupMetadata group = this.groupManager.getGroup(str)
            .orElseGet(() -> this.groupManager.addGroup(new GroupMetadata(str, GroupState.Empty)));
        return doCommitOffsets(group, "", -1, j, s, map);
    }

    /**
     * Commits offsets for a consumer.
     *
     * <p>For an unknown group, a negative generation id creates the group in the {@code Empty}
     * state and commits; a non-negative generation id is rejected with
     * {@code ILLEGAL_GENERATION} for every partition.
     *
     * @param str  group id
     * @param str2 member id
     * @param i    generation id
     * @param map  offsets to commit, keyed by partition
     * @return future with a per-partition error code
     */
    public CompletableFuture<Map<TopicPartition, Errors>> handleCommitOffsets(String str, String str2, int i, Map<TopicPartition, OffsetAndMetadata> map) {
        Optional<Errors> statusError = validateGroupStatus(str, ApiKeys.OFFSET_COMMIT);
        if (statusError.isPresent()) {
            Errors rejection = statusError.get();
            return CompletableFuture.completedFuture(CoreUtils.mapValue(map, ignored -> rejection));
        }

        return this.groupManager.getGroup(str)
            .map(existing -> doCommitOffsets(existing, str2, i, -1L, (short) -1, map))
            .orElseGet(() -> {
                if (i >= 0) {
                    return CompletableFuture.completedFuture(CoreUtils.mapValue(map, ignored -> Errors.ILLEGAL_GENERATION));
                }
                GroupMetadata created = this.groupManager.addGroup(new GroupMetadata(str, GroupState.Empty));
                return doCommitOffsets(created, str2, i, -1L, (short) -1, map);
            });
    }

    /**
     * Forwards a transaction completion to the group metadata manager.
     *
     * @param j                 forwarded to the manager (presumably the producer id — confirm)
     * @param stream            partitions involved; only their partition numbers are forwarded
     * @param transactionResult outcome of the transaction; {@code COMMIT} is forwarded as
     *                          {@code true}, anything else as {@code false}
     * @return the future returned by the manager
     */
    public CompletableFuture<Void> scheduleHandleTxnCompletion(long j, Stream<TopicPartition> stream, TransactionResult transactionResult) {
        Set<Integer> partitionNumbers = stream.map(TopicPartition::partition).collect(Collectors.toSet());
        boolean committed = TransactionResult.COMMIT == transactionResult;
        return this.groupManager.scheduleHandleTxnCompletion(j, partitionNumbers, committed);
    }

    /**
     * Validates and stores an offset commit through {@code groupMetadata.inLock}.
     *
     * <p>Decision order: a dead group rejects with {@code UNKNOWN_MEMBER_ID}; a commit with a
     * negative generation against an empty group, or any commit with {@code j != -1}, is stored
     * without membership checks; otherwise the group must not be completing a rebalance, must
     * contain the member, and the generation must match. A commit that passes these checks also
     * refreshes the member's heartbeat.
     *
     * @param groupMetadata target group
     * @param str           member id
     * @param i             generation id
     * @param j             {@code -1} for a regular commit; other values take the unchecked
     *                      storage path (presumably the producer id — confirm)
     * @param s             forwarded to {@code storeOffsets} on the unchecked path
     *                      (presumably the producer epoch — confirm)
     * @param map           offsets to commit, keyed by partition
     * @return future with a per-partition error code
     */
    private CompletableFuture<Map<TopicPartition, Errors>> doCommitOffsets(GroupMetadata groupMetadata, String str, int i, long j, short s, Map<TopicPartition, OffsetAndMetadata> map) {
        return groupMetadata.inLock(() -> {
            final Errors rejection;
            if (groupMetadata.is(GroupState.Dead)) {
                rejection = Errors.UNKNOWN_MEMBER_ID;
            } else if ((i < 0 && groupMetadata.is(GroupState.Empty)) || j != -1) {
                return this.groupManager.storeOffsets(groupMetadata, str, map, j, s);
            } else if (groupMetadata.is(GroupState.CompletingRebalance)) {
                rejection = Errors.REBALANCE_IN_PROGRESS;
            } else if (!groupMetadata.has(str)) {
                rejection = Errors.UNKNOWN_MEMBER_ID;
            } else if (i != groupMetadata.generationId()) {
                rejection = Errors.ILLEGAL_GENERATION;
            } else {
                completeAndScheduleNextHeartbeatExpiration(groupMetadata, groupMetadata.get(str));
                return this.groupManager.storeOffsets(groupMetadata, str, map);
            }
            // Every partition of a rejected commit carries the same error.
            return CompletableFuture.completedFuture(CoreUtils.mapValue(map, ignored -> rejection));
        });
    }

    /**
     * Fetches committed offsets for a group.
     *
     * @param str      group id
     * @param optional partitions to fetch, forwarded unchanged to {@code getOffsets}
     * @return the validation error with an empty map when validation fails; otherwise
     *         {@code NONE} with whatever the group manager returns
     */
    public KeyValue<Errors, Map<TopicPartition, OffsetFetchResponse.PartitionData>> handleFetchOffsets(String str, Optional<List<TopicPartition>> optional) {
        Optional<Errors> statusError = validateGroupStatus(str, ApiKeys.OFFSET_FETCH);
        if (statusError.isPresent()) {
            return new KeyValue<>(statusError.get(), new HashMap<>());
        }
        return new KeyValue<>(Errors.NONE, this.groupManager.getOffsets(str, optional));
    }

    /**
     * Lists overviews of all groups currently held by the group manager.
     *
     * @return {@code COORDINATOR_NOT_AVAILABLE} with an empty list while the coordinator is
     *         inactive; otherwise the overviews, paired with {@code COORDINATOR_LOAD_IN_PROGRESS}
     *         if the manager reports it is loading, else {@code NONE}
     */
    public KeyValue<Errors, List<GroupMetadata.GroupOverview>> handleListGroups() {
        if (!this.isActive.get()) {
            return new KeyValue<>(Errors.COORDINATOR_NOT_AVAILABLE, new ArrayList<>());
        }

        Errors status;
        if (this.groupManager.isLoading()) {
            status = Errors.COORDINATOR_LOAD_IN_PROGRESS;
        } else {
            status = Errors.NONE;
        }

        List<GroupMetadata.GroupOverview> overviews = new ArrayList<>();
        this.groupManager.currentGroups().forEach(group -> overviews.add(group.overview()));
        return new KeyValue<>(status, overviews);
    }

    /**
     * Describes a single group.
     *
     * @param str group id
     * @return the validation error with {@code EmptyGroup} when validation fails; {@code NONE}
     *         with {@code DeadGroup} when the group is unknown; otherwise {@code NONE} with the
     *         group's summary, obtained through {@code inLock}
     */
    public KeyValue<Errors, GroupMetadata.GroupSummary> handleDescribeGroup(String str) {
        Optional<Errors> statusError = validateGroupStatus(str, ApiKeys.DESCRIBE_GROUPS);
        if (statusError.isPresent()) {
            return new KeyValue<>(statusError.get(), EmptyGroup);
        }

        return this.groupManager.getGroup(str)
            .map(group -> group.inLock(
                () -> new KeyValue<Errors, GroupMetadata.GroupSummary>(Errors.NONE, group.summary())))
            .orElseGet(() -> new KeyValue<>(Errors.NONE, DeadGroup));
    }

    /**
     * Removes offsets of the given partitions from all current groups.
     *
     * @param list partitions whose offsets should be dropped
     * @return future with the number the cleanup reports, logged as the count of removed offsets
     */
    public CompletableFuture<Integer> handleDeletedPartitions(List<TopicPartition> list) {
        CompletableFuture<Integer> cleanup = this.groupManager.cleanGroupMetadata(
            this.groupManager.currentGroupsStream(),
            group -> group.removeOffsets(list.stream()));

        return cleanup.thenApply(removedCount -> {
            String partitionNames = list.stream()
                .map(TopicPartition::toString)
                .collect(Collectors.joining(","));
            log.info("Removed {} offsets associated with deleted partitions: {}", removedCount, partitionNames);
            return removedCount;
        });
    }

    /**
     * Checks whether a group id is acceptable for the given API.
     *
     * <p>{@code OFFSET_COMMIT}, {@code OFFSET_FETCH}, {@code DESCRIBE_GROUPS} and
     * {@code DELETE_GROUPS} accept any non-null id, including the empty string; every other API
     * requires a non-null, non-empty id.
     *
     * @param str     group id, may be {@code null}
     * @param apiKeys API the request belongs to
     * @return {@code true} if the id is acceptable for that API
     */
    private boolean isValidGroupId(String str, ApiKeys apiKeys) {
        boolean present = str != null;
        switch (apiKeys) {
            case OFFSET_COMMIT:
            case OFFSET_FETCH:
            case DESCRIBE_GROUPS:
            case DELETE_GROUPS:
                return present;
            default:
                return present && !str.isEmpty();
        }
    }

    /**
     * Runs the common pre-checks for a group request and returns the first failure.
     *
     * <p>Order of checks: invalid group id, coordinator inactive, group still loading, group not
     * local to this coordinator.
     *
     * @param str     group id
     * @param apiKeys API the request belongs to
     * @return the first applicable error, or empty if the request may proceed
     */
    private Optional<Errors> validateGroupStatus(String str, ApiKeys apiKeys) {
        if (!isValidGroupId(str, apiKeys)) {
            return Optional.of(Errors.INVALID_GROUP_ID);
        }
        if (!this.isActive.get()) {
            return Optional.of(Errors.COORDINATOR_NOT_AVAILABLE);
        }
        if (this.groupManager.isGroupLoading(str)) {
            return Optional.of(Errors.COORDINATOR_LOAD_IN_PROGRESS);
        }
        if (!this.groupManager.isGroupLocal(str)) {
            return Optional.of(Errors.NOT_COORDINATOR);
        }
        return Optional.empty();
    }

    /**
     * Callback passed to {@code removeGroupsForPartition} (see {@code handleGroupEmigration}).
     * Through {@code inLock} it moves the group to {@code Dead} and fails any pending join or
     * sync callbacks with {@code NOT_COORDINATOR}, chosen by the state the group was in before
     * the transition.
     *
     * @param groupMetadata group being unloaded
     */
    private void onGroupUnloaded(GroupMetadata groupMetadata) {
        groupMetadata.inLock(() -> {
            log.info("Unloading group metadata for {} with generation {}", groupMetadata.groupId(), Integer.valueOf(groupMetadata.generationId()));
            // Capture the state before the transition; the switch below acts on the previous state.
            GroupState currentState = groupMetadata.currentState();
            groupMetadata.transitionTo(GroupState.Dead);
            switch (currentState) {
                case Dead:
                case PreparingRebalance:
                case Empty:
                    // Dead and Empty share this branch with PreparingRebalance: fail pending
                    // join callbacks, then re-check the delayed join for the group.
                    for (MemberMetadata memberMetadata : groupMetadata.allMemberMetadata()) {
                        if (memberMetadata.awaitingJoinCallback() != null) {
                            memberMetadata.awaitingJoinCallback().complete(joinError(memberMetadata.memberId(), Errors.NOT_COORDINATOR));
                            memberMetadata.awaitingJoinCallback(null);
                        }
                    }
                    this.joinPurgatory.checkAndComplete(new DelayedOperationKey.GroupKey(groupMetadata.groupId()));
                    return null;
                case CompletingRebalance:
                case Stable:
                    // Fail pending sync callbacks and re-check each member's delayed heartbeat.
                    for (MemberMetadata memberMetadata2 : groupMetadata.allMemberMetadata()) {
                        if (memberMetadata2.awaitingSyncCallback() != null) {
                            memberMetadata2.awaitingSyncCallback().accept(new byte[0], Errors.NOT_COORDINATOR);
                            memberMetadata2.awaitingSyncCallback(null);
                        }
                        this.heartbeatPurgatory.checkAndComplete(new DelayedOperationKey.MemberKey(memberMetadata2.groupId(), memberMetadata2.memberId()));
                    }
                    return null;
                default:
                    return null;
            }
        });
    }

    /**
     * Callback passed to {@code scheduleLoadGroupAndOffsets} (see {@code handleGroupImmigration}).
     * Through {@code inLock} it checks that the group is {@code Stable} or {@code Empty} and
     * schedules a fresh heartbeat expiration for every member.
     *
     * @param groupMetadata group that has been loaded
     * @throws IllegalArgumentException if the group is in any other state
     */
    private void onGroupLoaded(GroupMetadata groupMetadata) {
        groupMetadata.inLock(() -> {
            log.info("Loading group metadata for {} with generation {}", groupMetadata.groupId(), Integer.valueOf(groupMetadata.generationId()));
            boolean loadableState = groupMetadata.is(GroupState.Stable) || groupMetadata.is(GroupState.Empty);
            Preconditions.checkArgument(loadableState);
            for (MemberMetadata member : groupMetadata.allMemberMetadata()) {
                completeAndScheduleNextHeartbeatExpiration(groupMetadata, member);
            }
            return null;
        });
    }

    /**
     * Asks the group manager to load groups and offsets for a partition; every loaded group is
     * handed to {@code onGroupLoaded}.
     *
     * @param i partition number
     * @return the future returned by the manager
     */
    public CompletableFuture<Void> handleGroupImmigration(int i) {
        return groupManager.scheduleLoadGroupAndOffsets(i, loaded -> onGroupLoaded(loaded));
    }

    /**
     * Asks the group manager to remove the groups of a partition; every removed group is handed
     * to {@code onGroupUnloaded}.
     *
     * @param i partition number
     */
    public void handleGroupEmigration(int i) {
        groupManager.removeGroupsForPartition(i, unloaded -> onGroupUnloaded(unloaded));
    }

    /**
     * Stores each member's assignment from {@code map} and then answers pending sync callbacks
     * with {@code NONE}.
     *
     * @param groupMetadata group in {@code CompletingRebalance}
     * @param map           assignment bytes keyed by member id
     * @throws IllegalStateException if the group is not in {@code CompletingRebalance}
     */
    private void setAndPropagateAssignment(GroupMetadata groupMetadata, Map<String, byte[]> map) {
        Preconditions.checkState(groupMetadata.is(GroupState.CompletingRebalance));
        for (MemberMetadata member : groupMetadata.allMemberMetadata()) {
            byte[] assigned = map.get(member.memberId());
            member.assignment(assigned);
        }
        propagateAssignment(groupMetadata, Errors.NONE);
    }

    /**
     * Resets every member's assignment to an empty byte array and answers pending sync callbacks
     * with the given error.
     *
     * @param groupMetadata group in {@code CompletingRebalance}
     * @param errors        error to report to waiting members
     * @throws IllegalStateException if the group is not in {@code CompletingRebalance}
     */
    private void resetAndPropagateAssignmentError(GroupMetadata groupMetadata, Errors errors) {
        Preconditions.checkState(groupMetadata.is(GroupState.CompletingRebalance));
        for (MemberMetadata member : groupMetadata.allMemberMetadata()) {
            member.assignment(new byte[0]);
        }
        propagateAssignment(groupMetadata, errors);
    }

    /**
     * Delivers each member's current assignment, together with {@code errors}, to the
     * member's pending sync callback. Members with no pending sync callback are skipped.
     * After a callback is invoked it is cleared and the member's heartbeat expiration
     * is rescheduled.
     *
     * @param groupMetadata the group whose members are notified
     * @param errors        the error code passed to every invoked callback
     */
    private void propagateAssignment(GroupMetadata groupMetadata, Errors errors) {
        for (MemberMetadata member : groupMetadata.allMemberMetadata()) {
            if (member.awaitingSyncCallback() == null) {
                // Nobody is waiting on this member's sync response.
                continue;
            }
            member.awaitingSyncCallback().accept(member.assignment(), errors);
            member.awaitingSyncCallback(null);
            completeAndScheduleNextHeartbeatExpiration(groupMetadata, member);
        }
    }

    /**
     * Builds a {@link JoinGroupResult} that carries only an error: an empty member map,
     * the given member id, generation {@code 0} and two empty strings in the slots that
     * {@code onCompleteJoin} fills with the group's protocol and leader.
     *
     * @param str    the member id placed in the result
     * @param errors the error placed in the result
     * @return the error-only join result
     */
    private JoinGroupResult joinError(String str, Errors errors) {
        final int noGeneration = 0;
        final String noProtocol = "";
        final String noLeader = "";
        return new JoinGroupResult(Collections.emptyMap(), str, noGeneration, noProtocol, noLeader, errors);
    }

    /**
     * Records the current time as the member's latest heartbeat, completes any delayed
     * heartbeat currently watched under the member's key, and registers a new
     * {@link DelayedHeartbeat} whose deadline is the latest heartbeat plus the member's
     * session timeout.
     *
     * @param groupMetadata  the group the member belongs to
     * @param memberMetadata the member whose heartbeat is refreshed
     */
    private void completeAndScheduleNextHeartbeatExpiration(GroupMetadata groupMetadata, MemberMetadata memberMetadata) {
        memberMetadata.latestHeartbeat(time.milliseconds());

        DelayedOperationKey.MemberKey watchKey =
            new DelayedOperationKey.MemberKey(memberMetadata.groupId(), memberMetadata.memberId());
        heartbeatPurgatory.checkAndComplete(watchKey);

        long deadlineMs = memberMetadata.latestHeartbeat() + memberMetadata.sessionTimeoutMs();
        DelayedHeartbeat nextHeartbeat =
            new DelayedHeartbeat(this, groupMetadata, memberMetadata, deadlineMs, memberMetadata.sessionTimeoutMs());
        heartbeatPurgatory.tryCompleteElseWatch(nextHeartbeat, Lists.<Object>newArrayList(watchKey));
    }

    /**
     * Flags the member as leaving and asks the heartbeat purgatory to check (and, if
     * possible, complete) the delayed operations watched under the member's key.
     *
     * @param groupMetadata  unused by this method
     * @param memberMetadata the member that is leaving
     */
    private void removeHeartbeatForLeavingMember(GroupMetadata groupMetadata, MemberMetadata memberMetadata) {
        memberMetadata.isLeaving(true);
        DelayedOperationKey.MemberKey watchKey =
            new DelayedOperationKey.MemberKey(memberMetadata.groupId(), memberMetadata.memberId());
        heartbeatPurgatory.checkAndComplete(watchKey);
    }

    /**
     * Creates a new member, adds it to the group and triggers a rebalance if the group
     * allows one.
     *
     * <p>The member id is {@code str + "-" + <suffix generated by the group>}. If the
     * group is in {@code PreparingRebalance} with generation {@code 0}, the group's
     * {@code newMemberAdded} flag is set before the member is added.
     *
     * @param i             first int forwarded to the {@code MemberMetadata} constructor
     * @param i2            second int forwarded to the {@code MemberMetadata} constructor
     * @param str           prefix of the generated member id; also forwarded to the constructor
     * @param str2          forwarded to the {@code MemberMetadata} constructor
     * @param str3          forwarded to the {@code MemberMetadata} constructor
     * @param map           forwarded to the {@code MemberMetadata} constructor
     * @param groupMetadata the group the member joins
     * @return a new, not yet completed future registered as the member's join callback
     */
    private CompletableFuture<JoinGroupResult> addMemberAndRebalance(int i, int i2, String str, String str2, String str3, Map<String, byte[]> map, GroupMetadata groupMetadata) {
        String memberId = str + "-" + groupMetadata.generateMemberIdSuffix();
        MemberMetadata newMember = new MemberMetadata(memberId, groupMetadata.groupId(), str, str2, i, i2, str3, map);

        CompletableFuture<JoinGroupResult> joinFuture = new CompletableFuture<>();
        newMember.awaitingJoinCallback(joinFuture);

        boolean rebalancingFirstGeneration =
            groupMetadata.is(GroupState.PreparingRebalance) && groupMetadata.generationId() == 0;
        if (rebalancingFirstGeneration) {
            groupMetadata.newMemberAdded(true);
        }

        groupMetadata.add(newMember);
        maybePrepareRebalance(groupMetadata);
        return joinFuture;
    }

    /**
     * Updates an existing member's supported protocols, installs a fresh join callback
     * on it and triggers a rebalance if the group allows one.
     *
     * @param groupMetadata  the group the member belongs to
     * @param memberMetadata the member being updated
     * @param map            the member's new supported protocols
     * @return a new, not yet completed future registered as the member's join callback
     */
    private CompletableFuture<JoinGroupResult> updateMemberAndRebalance(GroupMetadata groupMetadata, MemberMetadata memberMetadata, Map<String, byte[]> map) {
        memberMetadata.supportedProtocols(map);

        CompletableFuture<JoinGroupResult> joinFuture = new CompletableFuture<>();
        memberMetadata.awaitingJoinCallback(joinFuture);

        maybePrepareRebalance(groupMetadata);
        return joinFuture;
    }

    /**
     * Under the group lock, starts preparing a rebalance when the group reports that it
     * can rebalance; otherwise does nothing.
     *
     * @param groupMetadata the group to (possibly) rebalance
     */
    private void maybePrepareRebalance(GroupMetadata groupMetadata) {
        groupMetadata.inLock(() -> {
            if (groupMetadata.canReblance()) {
                prepareRebalance(groupMetadata);
            }
            return null;
        });
    }

    /**
     * Moves the group into {@code PreparingRebalance} and registers a delayed join
     * operation for it in the join purgatory.
     *
     * <p>If the group is currently in {@code CompletingRebalance}, member assignments are
     * first reset and pending sync callbacks receive {@code REBALANCE_IN_PROGRESS}. The
     * delayed operation is chosen before the state transition: an
     * {@link InitialDelayedJoin} when the group is {@code Empty}, otherwise a plain
     * {@link DelayedJoin} bounded by the group's rebalance timeout.
     *
     * @param groupMetadata the group to rebalance
     */
    private void prepareRebalance(GroupMetadata groupMetadata) {
        if (groupMetadata.is(GroupState.CompletingRebalance)) {
            resetAndPropagateAssignmentError(groupMetadata, Errors.REBALANCE_IN_PROGRESS);
        }

        // Must be decided while the group is still in its pre-transition state.
        final DelayedJoin delayedJoin;
        if (groupMetadata.is(GroupState.Empty)) {
            delayedJoin = new InitialDelayedJoin(
                this,
                joinPurgatory,
                groupMetadata,
                groupConfig.groupInitialRebalanceDelayMs(),
                groupConfig.groupInitialRebalanceDelayMs(),
                Math.max(groupMetadata.rebalanceTimeoutMs() - groupConfig.groupInitialRebalanceDelayMs(), 0));
        } else {
            delayedJoin = new DelayedJoin(this, groupMetadata, groupMetadata.rebalanceTimeoutMs());
        }

        groupMetadata.transitionTo(GroupState.PreparingRebalance);
        log.info("Preparing to rebalance group {} with old generation {} ({}-{})",
            groupMetadata.groupId(),
            groupMetadata.generationId(),
            Topic.GROUP_METADATA_TOPIC_NAME,
            groupManager.partitionFor(groupMetadata.groupId()));

        DelayedOperationKey.GroupKey groupKey = new DelayedOperationKey.GroupKey(groupMetadata.groupId());
        joinPurgatory.tryCompleteElseWatch(delayedJoin, Lists.<Object>newArrayList(groupKey));
    }

    /**
     * Removes the member from the group and reacts according to the group's state
     * after the removal.
     *
     * <ul>
     *   <li>{@code PreparingRebalance}: asks the join purgatory to check the delayed join
     *       watched under the group's key.</li>
     *   <li>{@code Stable} / {@code CompletingRebalance}: triggers a rebalance if the
     *       group allows one.</li>
     *   <li>Any other state (including {@code Dead} and {@code Empty}): nothing further.</li>
     * </ul>
     *
     * @param groupMetadata  the group the member is removed from
     * @param memberMetadata the member to remove
     */
    private void removeMemberAndUpdateGroup(GroupMetadata groupMetadata, MemberMetadata memberMetadata) {
        groupMetadata.remove(memberMetadata.memberId());
        switch (groupMetadata.currentState()) {
            case PreparingRebalance:
                joinPurgatory.checkAndComplete(new DelayedOperationKey.GroupKey(groupMetadata.groupId()));
                break;
            case Stable:
            case CompletingRebalance:
                maybePrepareRebalance(groupMetadata);
                break;
            default:
                // Dead, Empty and anything else: no follow-up action.
                break;
        }
    }

    /**
     * Attempts to complete a delayed join while holding the group lock.
     *
     * <p>{@code supplier} is consulted only when the group reports no members that have
     * yet to rejoin; otherwise the attempt is reported as not completed.
     *
     * <p>JADX note: the decompiler widened this method's access from package-private.
     *
     * @param groupMetadata the group whose join is being completed
     * @param supplier      the completion action; its result is returned when invoked
     * @return the supplier's result, or {@code false} if some members have not rejoined
     */
    public boolean tryCompleteJoin(GroupMetadata groupMetadata, Supplier<Boolean> supplier) {
        Boolean completed = (Boolean) groupMetadata.inLock(() -> {
            if (!groupMetadata.notYetRejoinedMembers().isEmpty()) {
                return false;
            }
            return supplier.get();
        });
        return completed.booleanValue();
    }

    /**
     * Hook for the expiration of a delayed join. Intentionally a no-op: the body is empty.
     *
     * <p>JADX note: the decompiler widened this method's access from package-private.
     */
    public void onExpireJoin() {
    }

    /**
     * Completes a join round for the group while holding the group lock.
     *
     * <ol>
     *   <li>Every member that has not rejoined is flagged as leaving, has its heartbeat
     *       key checked in the purgatory, and is removed from the group.</li>
     *   <li>If the group is {@code Dead}, nothing else happens.</li>
     *   <li>Otherwise the next generation is initialised. If that leaves the group
     *       {@code Empty}, the empty group is stored through the group manager and, once
     *       the store finishes normally, partition ownership for the group's partition is
     *       added (a store error is only logged as a warning).</li>
     *   <li>If the group is not empty, every member's pending join future is completed
     *       with a {@link JoinGroupResult}; only the leader receives the full member
     *       metadata map, all others get an empty map. The callback is then cleared and
     *       the member's heartbeat expiration rescheduled.</li>
     * </ol>
     *
     * <p>JADX note: the decompiler widened this method's access from package-private.
     *
     * @param groupMetadata the group whose join round is completing
     * @throws NullPointerException if a remaining member has no pending join callback
     */
    public void onCompleteJoin(GroupMetadata groupMetadata) {
        groupMetadata.inLock(() -> {
            // Drop everyone who failed to rejoin in time.
            groupMetadata.notYetRejoinedMembers().forEach(memberMetadata -> {
                removeHeartbeatForLeavingMember(groupMetadata, memberMetadata);
                groupMetadata.remove(memberMetadata.memberId());
            });
            if (groupMetadata.is(GroupState.Dead)) {
                return null;
            }
            groupMetadata.initNextGeneration();
            if (groupMetadata.is(GroupState.Empty)) {
                log.info("Group {} with generation {} is now empty {}-{}", groupMetadata.groupId(), groupMetadata.generationId(), Topic.GROUP_METADATA_TOPIC_NAME, this.groupManager.partitionFor(groupMetadata.groupId()));
                this.groupManager.storeGroup(groupMetadata, Collections.emptyMap()).thenAccept(errors -> {
                    if (errors != Errors.NONE) {
                        // A failed write is tolerated here: it is reported but does not stop
                        // the ownership update below.
                        log.warn("Failed to write empty metadata for group {}: {}", groupMetadata.groupId(), errors.message());
                    }
                    if (log.isDebugEnabled()) {
                        // Diagnostic only: emit at DEBUG to match the guard (was WARN).
                        log.debug("add partition ownership for group {}", groupMetadata.groupId());
                    }
                    this.groupManager.addPartitionOwnership(this.groupManager.partitionFor(groupMetadata.groupId()));
                });
                return null;
            }
            log.info("Stabilized group {} generation {} ({}-{})", groupMetadata.groupId(), groupMetadata.generationId(), Topic.GROUP_METADATA_TOPIC_NAME, this.groupManager.partitionFor(groupMetadata.groupId()));
            for (MemberMetadata memberMetadata2 : groupMetadata.allMemberMetadata()) {
                // Every member still in the group is expected to be waiting on a join response.
                Objects.requireNonNull(memberMetadata2.awaitingJoinCallback());
                memberMetadata2.awaitingJoinCallback().complete(new JoinGroupResult(groupMetadata.isLeader(memberMetadata2.memberId()) ? groupMetadata.currentMemberMetadata() : Collections.emptyMap(), memberMetadata2.memberId(), groupMetadata.generationId(), groupMetadata.protocolOrNull(), groupMetadata.leaderOrNull(), Errors.NONE));
                memberMetadata2.awaitingJoinCallback(null);
                completeAndScheduleNextHeartbeatExpiration(groupMetadata, memberMetadata2);
            }
            return null;
        });
    }

    /**
     * Attempts to complete a delayed heartbeat while holding the group lock.
     *
     * <p>{@code supplier} is consulted when the member should be kept alive at time
     * {@code j} (see {@code shouldKeepMemberAlive}) or when the member is flagged as
     * leaving; otherwise the attempt is reported as not completed.
     *
     * <p>JADX note: the decompiler widened this method's access from package-private.
     *
     * @param groupMetadata  the group that owns the member
     * @param memberMetadata the member whose heartbeat is being checked
     * @param j              the time value compared against the member's session deadline
     * @param supplier       the completion action; its result is returned when invoked
     * @return the supplier's result, or {@code false} if neither condition holds
     */
    public boolean tryCompleteHeartbeat(GroupMetadata groupMetadata, MemberMetadata memberMetadata, long j, Supplier<Boolean> supplier) {
        Boolean completed = (Boolean) groupMetadata.inLock(() -> {
            boolean mayComplete = shouldKeepMemberAlive(memberMetadata, j) || memberMetadata.isLeaving();
            if (!mayComplete) {
                return false;
            }
            return supplier.get();
        });
        return completed.booleanValue();
    }

    /**
     * Handles the expiration of a delayed heartbeat while holding the group lock.
     *
     * <p>If the member should no longer be kept alive at time {@code j}, the failure is
     * logged and the member is removed from the group; otherwise nothing happens.
     *
     * <p>JADX note: the decompiler widened this method's access from package-private.
     *
     * @param groupMetadata  the group that owns the member
     * @param memberMetadata the member whose heartbeat expired
     * @param j              the time value compared against the member's session deadline
     */
    public void onExpireHeartbeat(GroupMetadata groupMetadata, MemberMetadata memberMetadata, long j) {
        groupMetadata.inLock(() -> {
            if (!shouldKeepMemberAlive(memberMetadata, j)) {
                log.info("Member {} in group {} has failed, removing it from the group", memberMetadata.memberId(), groupMetadata.groupId());
                removeMemberAndUpdateGroup(groupMetadata, memberMetadata);
            }
            return null;
        });
    }

    /**
     * Hook for the completion of a delayed heartbeat. Intentionally a no-op: the body is empty.
     *
     * <p>JADX note: the decompiler widened this method's access from package-private.
     */
    public void onCompleteHeartbeat() {
    }

    /**
     * Decides whether a member is still considered alive at time {@code j}.
     *
     * <p>A member is kept alive when it has a pending join callback, or a pending sync
     * callback, or when its latest heartbeat plus its session timeout lies strictly
     * after {@code j}.
     *
     * @param memberMetadata the member to evaluate
     * @param j              the time value compared against the member's session deadline
     * @return {@code true} if any of the conditions above holds, {@code false} otherwise
     */
    private boolean shouldKeepMemberAlive(MemberMetadata memberMetadata, long j) {
        if (memberMetadata.awaitingJoinCallback() != null) {
            return true;
        }
        if (memberMetadata.awaitingSyncCallback() != null) {
            return true;
        }
        long sessionDeadline = memberMetadata.latestHeartbeat() + (long) memberMetadata.sessionTimeoutMs();
        return sessionDeadline > j;
    }

    /**
     * Reports whether the group manager considers the given group local.
     *
     * @param str the group id
     * @return the result of {@code groupManager.isGroupLocal(str)}
     */
    private boolean isCoordinatorForGroup(String str) {
        final boolean groupIsLocal = groupManager.isGroupLocal(str);
        return groupIsLocal;
    }

    /**
     * Reports whether the group manager considers the given group to be loading.
     *
     * @param str the group id
     * @return the result of {@code groupManager.isGroupLoading(str)}
     */
    private boolean isCoordinatorLoadInProgress(String str) {
        final boolean groupIsLoading = groupManager.isGroupLoading(str);
        return groupIsLoading;
    }
}
