package jmind.core.dubbo.callback;

import com.alibaba.dubbo.rpc.RpcException;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import jmind.core.dubbo.pojo.BusEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:jmind/core/dubbo/callback/BusServiceImpl.class */
public class BusServiceImpl implements IBusService {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final BlockingQueue<BusEvent> bq = new LinkedBlockingQueue();
    private final Multimap<String, Callback> listeners = HashMultimap.create();

    public BusServiceImpl() {
        System.out.println("init BusServiceImpl");
        Thread thread = new Thread(new Runnable() { // from class: jmind.core.dubbo.callback.BusServiceImpl.1
            @Override // java.lang.Runnable
            public void run() {
                while (true) {
                    try {
                        BusEvent busEvent = (BusEvent) BusServiceImpl.this.bq.take();
                        Collection collection = BusServiceImpl.this.listeners.get(busEvent.getTopic());
                        if (collection.size() > 0) {
                            Iterator it = collection.iterator();
                            while (it.hasNext()) {
                                try {
                                    ((Callback) it.next()).doIt(busEvent);
                                } catch (RpcException e) {
                                    BusServiceImpl.this.logger.error("doit event=", e);
                                    it.remove();
                                }
                            }
                        } else {
                            BusServiceImpl.this.bq.add(busEvent);
                        }
                    } catch (InterruptedException e2) {
                        BusServiceImpl.this.logger.error("", e2);
                    }
                }
            }
        });
        thread.setDaemon(true);
        thread.start();
    }

    @Override // jmind.core.dubbo.callback.IBusService
    public void notify(BusEvent busEvent) {
        this.logger.debug("event=" + busEvent);
        this.bq.offer(busEvent);
    }

    @Override // jmind.core.dubbo.callback.IBusService
    public void subscribe(String str, boolean z, Callback callback) {
        this.logger.debug(str + "====" + z);
        if (z || !this.listeners.containsKey(str)) {
            this.listeners.put(str, callback);
        }
    }
}
