package co.cask.common.security.authentication;

import co.cask.common.io.Codec;
import co.cask.common.security.Constants;
import co.cask.common.security.config.SecurityConfiguration;
import co.cask.common.security.kerberos.SecurityUtil;
import co.cask.common.security.zookeeper.ResourceListener;
import co.cask.common.security.zookeeper.SharedResourceCache;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.twill.api.ElectionHandler;
import org.apache.twill.internal.zookeeper.LeaderElection;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClients;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/common/security/authentication/DistributedKeyManager.class */
public class DistributedKeyManager extends AbstractKeyManager implements ResourceListener<KeyIdentifier> {
    private static final long KEY_UPDATE_FREQUENCY = 60000;
    private static final Logger LOG = LoggerFactory.getLogger(DistributedKeyManager.class);
    private final SharedResourceCache<KeyIdentifier> keyCache;
    private final String parentZNode;
    private Timer timer;
    private long lastKeyUpdate;
    protected final AtomicBoolean leader;
    private LeaderElection leaderElection;
    private ZKClient zookeeper;
    private final long maxTokenExpiration;

    public DistributedKeyManager(SecurityConfiguration securityConfiguration, Codec<KeyIdentifier> codec, ZKClient zKClient) {
        this(securityConfiguration, codec, zKClient, getACLs(securityConfiguration));
    }

    public DistributedKeyManager(SecurityConfiguration securityConfiguration, Codec<KeyIdentifier> codec, ZKClient zKClient, List<ACL> list) {
        super(securityConfiguration);
        this.leader = new AtomicBoolean();
        this.parentZNode = securityConfiguration.get(Constants.DIST_KEY_PARENT_ZNODE);
        this.keyExpirationPeriod = securityConfiguration.getLong(Constants.TOKEN_DIGEST_KEY_EXPIRATION);
        this.maxTokenExpiration = Math.max(securityConfiguration.getLong(Constants.EXTENDED_TOKEN_EXPIRATION), securityConfiguration.getLong(Constants.TOKEN_EXPIRATION));
        this.zookeeper = ZKClients.namespace(zKClient, this.parentZNode);
        if (list.isEmpty()) {
            LOG.warn("Zookeeper ACL list is empty for keys!");
            list = ZooDefs.Ids.OPEN_ACL_UNSAFE;
        }
        LOG.info("Zookeeper ACLs {} for keys", list);
        this.keyCache = new SharedResourceCache<>(zKClient, codec, "/keys", list);
    }

    @Override // co.cask.common.security.authentication.AbstractKeyManager
    protected void doInit() throws IOException {
        this.keyCache.addListener(this);
        try {
            this.keyCache.init();
            this.leaderElection = new LeaderElection(this.zookeeper, "/leader", new ElectionHandler() { // from class: co.cask.common.security.authentication.DistributedKeyManager.1
                public void leader() {
                    DistributedKeyManager.this.leader.set(true);
                    DistributedKeyManager.LOG.info("Transitioned to leader");
                    if (DistributedKeyManager.this.currentKey == null) {
                        DistributedKeyManager.this.rotateKey();
                    }
                }

                public void follower() {
                    DistributedKeyManager.this.leader.set(false);
                    DistributedKeyManager.LOG.info("Transitioned to follower");
                }
            });
            this.leaderElection.start();
            startExpirationThread();
        } catch (InterruptedException e) {
            throw Throwables.propagate(e);
        }
    }

    public void shutDown() {
        this.leaderElection.stopAndWait();
    }

    @Override // co.cask.common.security.authentication.AbstractKeyManager
    protected boolean hasKey(int i) {
        return this.keyCache.getIfPresent(Integer.toString(i)) != null;
    }

    @Override // co.cask.common.security.authentication.AbstractKeyManager
    protected KeyIdentifier getKey(int i) {
        return this.keyCache.get(Integer.toString(i));
    }

    @Override // co.cask.common.security.authentication.AbstractKeyManager
    protected void addKey(KeyIdentifier keyIdentifier) {
        this.keyCache.put(Integer.toString(keyIdentifier.getKeyId()), (String) keyIdentifier);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void rotateKey() {
        long currentTimeMillis = System.currentTimeMillis();
        generateKey();
        for (KeyIdentifier keyIdentifier : this.keyCache.getResources()) {
            if (keyIdentifier.getExpiration() < currentTimeMillis - this.maxTokenExpiration) {
                LOG.info("Removing expired key: id={}, expiration={}", Integer.valueOf(keyIdentifier.getKeyId()), Long.valueOf(keyIdentifier.getExpiration()));
                this.keyCache.remove(Integer.toString(keyIdentifier.getKeyId()));
            }
        }
        this.lastKeyUpdate = currentTimeMillis;
    }

    private void startExpirationThread() {
        this.timer = new Timer("DistributedKeyManager.key-rotator", true);
        this.timer.scheduleAtFixedRate(new TimerTask() { // from class: co.cask.common.security.authentication.DistributedKeyManager.2
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                if (DistributedKeyManager.this.leader.get()) {
                    if (DistributedKeyManager.this.lastKeyUpdate < System.currentTimeMillis() - DistributedKeyManager.this.keyExpirationPeriod) {
                        DistributedKeyManager.this.rotateKey();
                    }
                }
            }
        }, 0L, Math.min(this.keyExpirationPeriod, KEY_UPDATE_FREQUENCY));
    }

    @Override // co.cask.common.security.zookeeper.ResourceListener
    public synchronized void onUpdate() {
        LOG.debug("SharedResourceCache triggered update on key: leader={}", this.leader);
        for (KeyIdentifier keyIdentifier : this.keyCache.getResources()) {
            if (this.currentKey == null || keyIdentifier.getExpiration() > this.currentKey.getExpiration()) {
                this.currentKey = keyIdentifier;
                LOG.info("Set current key: leader={}, key={}", this.leader, Integer.valueOf(this.currentKey.getKeyId()));
            }
        }
    }

    @Override // co.cask.common.security.zookeeper.ResourceListener
    public synchronized void onResourceUpdate(String str, KeyIdentifier keyIdentifier) {
        LOG.debug("SharedResourceCache triggered update: leader={}, resource key={}", this.leader, str);
        if (this.currentKey == null || keyIdentifier.getExpiration() > this.currentKey.getExpiration()) {
            this.currentKey = keyIdentifier;
            LOG.info("Set current key: leader={}, key={}", this.leader, Integer.valueOf(this.currentKey.getKeyId()));
        }
    }

    @Override // co.cask.common.security.zookeeper.ResourceListener
    public void onResourceDelete(String str) {
        LOG.info("Removed key: leader={}, key={}", this.leader, str);
    }

    @Override // co.cask.common.security.zookeeper.ResourceListener
    public void onError(String str, Throwable th) {
    }

    static List<ACL> getACLs(SecurityConfiguration securityConfiguration) {
        if (SecurityUtil.isKerberosEnabled(securityConfiguration)) {
            return ZooDefs.Ids.CREATOR_ALL_ACL;
        }
        LOG.warn("Not adding ACLs on keys in Zookeeper as Kerberos is not enabled");
        return ZooDefs.Ids.OPEN_ACL_UNSAFE;
    }
}
