package org.springframework.integration.aggregator;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.springframework.core.log.LogMessage;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.messaging.Message;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-core-5.5.3.jar:org/springframework/integration/aggregator/CorrelatingMessageBarrier.class */
public class CorrelatingMessageBarrier extends AbstractMessageHandler implements MessageSource<Object> {
    private final ConcurrentMap<Object, Object> correlationLocks;
    private final MessageGroupStore store;
    private CorrelationStrategy correlationStrategy;
    private ReleaseStrategy releaseStrategy;

    public CorrelatingMessageBarrier() {
        this(new SimpleMessageStore(0));
    }

    public CorrelatingMessageBarrier(MessageGroupStore messageGroupStore) {
        this.correlationLocks = new ConcurrentHashMap();
        this.store = messageGroupStore;
    }

    public void setCorrelationStrategy(CorrelationStrategy correlationStrategy) {
        this.correlationStrategy = correlationStrategy;
    }

    public void setReleaseStrategy(ReleaseStrategy releaseStrategy) {
        this.releaseStrategy = releaseStrategy;
    }

    @Override // org.springframework.integration.handler.AbstractMessageHandler
    protected void handleMessageInternal(Message<?> message) {
        Object correlationKey = this.correlationStrategy.getCorrelationKey(message);
        synchronized (getLock(correlationKey)) {
            this.store.addMessagesToGroup(correlationKey, message);
        }
        this.logger.debug(LogMessage.format("Handled message for key [%s]: %s.", correlationKey, message));
    }

    private Object getLock(Object obj) {
        Object putIfAbsent = this.correlationLocks.putIfAbsent(obj, obj);
        return putIfAbsent == null ? obj : putIfAbsent;
    }

    @Override // org.springframework.integration.core.MessageSource
    public Message<Object> receive() {
        for (Object obj : this.correlationLocks.keySet()) {
            synchronized (getLock(obj)) {
                MessageGroup messageGroup = this.store.getMessageGroup(obj);
                if (messageGroup != null && this.releaseStrategy.canRelease(messageGroup)) {
                    Message<?> message = null;
                    Iterator<Message<?>> it = messageGroup.getMessages().iterator();
                    if (it.hasNext()) {
                        message = it.next();
                        this.store.removeMessagesFromGroup(obj, message);
                        this.logger.debug(LogMessage.format("Released message for key [%s]: %s.", obj, message));
                    } else {
                        remove(obj);
                    }
                    return message;
                }
            }
        }
        return null;
    }

    private void remove(Object obj) {
        this.correlationLocks.remove(obj);
        this.store.removeMessageGroup(obj);
    }
}
