package io.gridgo.extras.flink;

import io.gridgo.connector.Connector;
import io.gridgo.connector.ConnectorFactory;
import io.gridgo.connector.Consumer;
import io.gridgo.connector.impl.factories.DefaultConnectorFactory;
import io.gridgo.framework.support.Message;
import java.util.concurrent.locks.LockSupport;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/* loaded from: input_file:io/gridgo/extras/flink/GridgoConnectorSource.class */
public class GridgoConnectorSource implements SourceFunction<Message> {
    private static final ConnectorFactory DEFAULT_FACTORY = new DefaultConnectorFactory();
    private static final long serialVersionUID = -7568626695827640831L;
    private transient Connector connector;
    private String endpoint;
    private transient boolean running;

    public GridgoConnectorSource(String str) {
        this.endpoint = str;
    }

    public void run(SourceFunction.SourceContext<Message> sourceContext) throws Exception {
        this.connector = DEFAULT_FACTORY.createConnector(this.endpoint);
        ((Consumer) this.connector.getConsumer().orElseThrow()).subscribe((message, deferred) -> {
            deferred.resolve(Message.ofEmpty());
            sourceContext.collect(message);
        });
        this.connector.start();
        idleSpin();
    }

    private void idleSpin() {
        this.running = true;
        while (this.running) {
            LockSupport.parkNanos(100L);
        }
    }

    public void cancel() {
        this.running = false;
        if (this.connector != null) {
            this.connector.stop();
        }
    }
}
