package net.degols.libs.workflow.core.engine;

import akka.actor.Actor;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.package$;
import akka.pattern.AskableActorRef$;
import akka.util.Timeout;
import akka.util.Timeout$;
import net.degols.libs.cluster.ClusterTools$;
import net.degols.libs.cluster.manager.ClusterServiceLeader;
import net.degols.libs.cluster.messages.InfoFromActorRef;
import net.degols.libs.cluster.utils.PriorityStashedActor;
import net.degols.libs.workflow.core.configuration.EngineActorOrder;
import net.degols.libs.workflow.core.configuration.WorkflowConfiguration;
import net.degols.libs.workflow.core.configuration.WorkflowConfiguration$;
import net.degols.libs.workflow.core.configuration.WorkflowConfigurationApi;
import play.api.libs.json.JsObject;
import play.api.libs.json.Json$;
import play.api.libs.json.Reads$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.PartialFunction;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.duration.package;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;

/* compiled from: EngineActor.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005-c\u0001B\f\u0019\u0001\u0015B\u0001B\f\u0001\u0003\u0002\u0003\u0006Ia\f\u0005\tk\u0001\u0011\t\u0011)A\u0005m!)A\b\u0001C\u0001{!9!\t\u0001a\u0001\n\u0003\u0019\u0005bB'\u0001\u0001\u0004%\tA\u0014\u0005\u0007)\u0002\u0001\u000b\u0015\u0002#\t\u000fe\u0001!\u0019!C\u0001+\"1\u0011\f\u0001Q\u0001\nYCqA\u0017\u0001A\u0002\u0013\u00051\fC\u0004f\u0001\u0001\u0007I\u0011\u00014\t\r!\u0004\u0001\u0015)\u0003]\u0011\u0015I\u0007\u0001\"\u0011k\u0011\u0015Y\u0007\u0001\"\u0011m\u0011\u0015\u0019\b\u0001\"\u0001m\u0011\u0015!\b\u0001\"\u0001v\u0011\u001d\t)\u0001\u0001C\u0001\u0003\u000fAq!a\u0003\u0001\t\u0003\tiaB\u0004\u0002\"aA\t!a\t\u0007\r]A\u0002\u0012AA\u0013\u0011\u0019a4\u0003\"\u0001\u0002.!I\u0011qF\nC\u0002\u0013\u0005\u0011\u0011\u0007\u0005\t\u0003\u0013\u001a\u0002\u0015!\u0003\u00024\tYQI\\4j]\u0016\f5\r^8s\u0015\tI\"$\u0001\u0004f]\u001eLg.\u001a\u0006\u00037q\tAaY8sK*\u0011QDH\u0001\to>\u00148N\u001a7po*\u0011q\u0004I\u0001\u0005Y&\u00147O\u0003\u0002\"E\u00051A-Z4pYNT\u0011aI\u0001\u0004]\u0016$8\u0001A\n\u0003\u0001\u0019\u0002\"a\n\u0017\u000e\u0003!R!!\u000b\u0016\u0002\u000bU$\u0018\u000e\\:\u000b\u0005-r\u0012aB2mkN$XM]\u0005\u0003[!\u0012A\u0003\u0015:j_JLG/_*uCNDW\rZ!di>\u0014\u0018\u0001F2mkN$XM]*feZL7-\u001a'fC\u0012,'\u000f\u0005\u00021g5\t\u0011G\u0003\u00023U\u00059Q.\u00198bO\u0016\u0014\u0018B\u0001\u001b2\u0005Q\u0019E.^:uKJ\u001cVM\u001d<jG\u0016dU-\u00193fe\u0006Aro\u001c:lM2|woQ8oM&<WO]1uS>t\u0017\t]5\u0011\u0005]RT\"\u0001\u001d\u000b\u0005eR\u0012!D2p]\u001aLw-\u001e:bi&|g.\u0003\u0002<q\tArk\u001c:lM2|woQ8oM&<WO]1uS>t\u0017\t]5\u0002\rqJg.\u001b;?)\rq\u0004)\u0011\t\u0003\u007f\u0001i\u0011\u0001\u0007\u0005\u0006]\r\u0001\ra\f\u0005\u0006k\r\u0001\rAN\u0001\u0017?^|'o\u001b4m_^\u001cuN\u001c4jOV\u0014\u0018\r^5p]V\tA\tE\u0002F\u0011*k\u0011A\u0012\u0006\u0002\u000f\u0006)1oY1mC&\u0011\u0011J\u0012\u0002\u0007\u001fB$\u0018n\u001c8\u0011\u0005]Z\u0015B\u0001'9\u0005U9vN]6gY><8i\u001c8gS\u001e,(/\u0019;j_:\f!dX<pe.4Gn\\<D_:4\u0017nZ;sCRLwN\\0%KF$\"a\u0014*\u0011\u0005\u0015\u0003\u0016BA)G\u0005\u0011)f.\u001b;\t\u000fM+\u0011\u0011!a\u0001\t\u0006\u0019\u0001\u0010J\u0019\u0002/};xN]6gY><8i\u001c8gS\u001e,(/\u0019;j_:\u0004S#\u0001,\u0011\u0005}:\u0016B\u0001-\u0019\u0005\u0019)enZ5oK\u00069QM\\4j]\u0016\u0004\u0013!I0tG\",G-\u001e7fIN+g\u000e\u001a*fY>\fGmQ8oM&<WO]1uS>tW#\u0001/\u0011\u0007\u0015CU\f\u0005\u0002_G6\tqL\u0003\u0002aC\u0006)\u0011m\u0019;pe*\t!-\u0001\u0003bW.\f\u0017B\u00013`\u0005-\u0019\u0015M\\2fY2\f'\r\\3\u0002K}\u001b8\r[3ek2,GmU3oIJ+Gn\\1e\u0007>tg-[4ve\u0006$\u0018n\u001c8`I\u0015\fHCA(h\u0011\u001d\u0019&\"!AA\u0002q\u000b!eX:dQ\u0016$W\u000f\\3e'\u0016tGMU3m_\u0006$7i\u001c8gS\u001e,(/\u0019;j_:\u0004\u0013\u0001\u00039sKN#\u0018M\u001d;\u0015\u0003=\u000bqA]3dK&4X-F\u0001n!\tqw.D\u0001\u0001\u0013\t\u0001\u0018OA\u0004SK\u000e,\u0017N^3\n\u0005I|&!B!di>\u0014\u0018a\u0002:v]:LgnZ\u0001\u0005Y>\fG\rF\u0001w!\r9(\u0010`\u0007\u0002q*\u0011\u0011PR\u0001\u000bG>t7-\u001e:sK:$\u0018BA>y\u0005\u00191U\u000f^;sKB!Q) &��\u0013\tqhI\u0001\u0004UkBdWM\r\t\u0004\u000b\u0006\u0005\u0011bAA\u0002\r\n9!i\\8mK\u0006t\u0017!I:dQ\u0016$W\u000f\\3O_RLg-\u001f*fY>\fGmQ8oM&<WO]1uS>tGCAA\u0005!\r9(pT\u0001\u001a]>$\u0018NZ=SK2|\u0017\rZ\"p]\u001aLw-\u001e:bi&|g\u000e\u0006\u0003\u0002\u0010\u0005]\u0001\u0003B<{\u0003#\u00012!RA\n\u0013\r\t)B\u0012\u0002\u0004\u0003:L\bbBA\r#\u0001\u0007\u00111D\u0001#g\u0016tGMU3m_\u0006$7i\u001c8gS\u001e,(/\u0019;j_:$v.\u00138ti\u0006t7-Z:\u0011\u0007}\ni\"C\u0002\u0002 a\u0011!eU3oIJ+Gn\\1e\u0007>tg-[4ve\u0006$\u0018n\u001c8U_&s7\u000f^1oG\u0016\u001c\u0018aC#oO&tW-Q2u_J\u0004\"aP\n\u0014\u0007M\t9\u0003E\u0002F\u0003SI1!a\u000bG\u0005\u0019\te.\u001f*fMR\u0011\u00111E\u0001\u0005\u001d\u0006kU)\u0006\u0002\u00024A!\u0011QGA\"\u001d\u0011\t9$a\u0010\u0011\u0007\u0005eb)\u0004\u0002\u0002<)\u0019\u0011Q\b\u0013\u0002\rq\u0012xn\u001c;?\u0013\r\t\tER\u0001\u0007!J,G-\u001a4\n\t\u0005\u0015\u0013q\t\u0002\u0007'R\u0014\u0018N\\4\u000b\u0007\u0005\u0005c)A\u0003O\u00036+\u0005\u0005")
/* loaded from: input_file:net/degols/libs/workflow/core/engine/EngineActor.class */
public class EngineActor extends PriorityStashedActor {
    private final ClusterServiceLeader clusterServiceLeader;
    private final WorkflowConfigurationApi workflowConfigurationApi;
    private final Engine engine;
    private Option<WorkflowConfiguration> _workflowConfiguration = None$.MODULE$;
    private Option<Cancellable> _scheduledSendReloadConfiguration = None$.MODULE$;

    public static String NAME() {
        return EngineActor$.MODULE$.NAME();
    }

    public Option<WorkflowConfiguration> _workflowConfiguration() {
        return this._workflowConfiguration;
    }

    public void _workflowConfiguration_$eq(Option<WorkflowConfiguration> option) {
        this._workflowConfiguration = option;
    }

    public Engine engine() {
        return this.engine;
    }

    public Option<Cancellable> _scheduledSendReloadConfiguration() {
        return this._scheduledSendReloadConfiguration;
    }

    public void _scheduledSendReloadConfiguration_$eq(Option<Cancellable> option) {
        this._scheduledSendReloadConfiguration = option;
    }

    public void preStart() {
        Actor.preStart$(this);
        package$.MODULE$.actorRef2Scala(self()).$bang(FirstLoading$.MODULE$, self());
    }

    public PartialFunction<Object, BoxedUnit> receive() {
        return new EngineActor$$anonfun$receive$1(this);
    }

    public PartialFunction<Object, BoxedUnit> running() {
        return new EngineActor$$anonfun$running$1(this);
    }

    public Future<Tuple2<WorkflowConfiguration, Object>> load() {
        return WorkflowConfiguration$.MODULE$.from(this.workflowConfigurationApi, ec()).flatMap(workflowConfiguration -> {
            return this.clusterServiceLeader.communication().infoFromActorRef(this.self(), this.context()).map(option -> {
                return (InfoFromActorRef) option.get();
            }, this.ec()).map(infoFromActorRef -> {
                return new EngineActorOrder((JsObject) Json$.MODULE$.parse(infoFromActorRef.workerTypeOrder().metadata()).as(Reads$.MODULE$.JsObjectReads()));
            }, this.ec()).map(engineActorOrder -> {
                if (!this._workflowConfiguration().isEmpty() && !WorkflowConfiguration$.MODULE$.hasChanged((WorkflowConfiguration) this._workflowConfiguration().get(), workflowConfiguration)) {
                    return Future$.MODULE$.successful(new Tuple2(workflowConfiguration, BoxesRunTime.boxToBoolean(false)));
                }
                this.info(() -> {
                    return "WorkflowConfiguration first loading, or it has changed, apply it for the Engine";
                });
                this._workflowConfiguration_$eq(Option$.MODULE$.apply(workflowConfiguration));
                this.engine().setEngineActorOrder(engineActorOrder);
                this.engine().setWorkflowConfiguration(workflowConfiguration);
                return this.engine().sendWorkerOrders().map(obj -> {
                    return new Tuple2(workflowConfiguration, BoxesRunTime.boxToBoolean(true));
                }, this.ec());
            }, this.ec());
        }, ec()).flatten(Predef$.MODULE$.$conforms());
    }

    public Future<BoxedUnit> scheduleNotifyReloadConfiguration() {
        _scheduledSendReloadConfiguration().foreach(cancellable -> {
            return BoxesRunTime.boxToBoolean($anonfun$scheduleNotifyReloadConfiguration$1(this, cancellable));
        });
        return engine().instanceActorRefs().map(seq -> {
            $anonfun$scheduleNotifyReloadConfiguration$3(this, seq);
            return BoxedUnit.UNIT;
        }, ec());
    }

    public Future<Object> notifyReloadConfiguration(SendReloadConfigurationToInstances sendReloadConfigurationToInstances) {
        Timeout durationToTimeout = Timeout$.MODULE$.durationToTimeout(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(10)).seconds());
        return Future$.MODULE$.sequence((TraversableOnce) ((Seq) sendReloadConfigurationToInstances.remainingActorRefs().map(actorRef -> {
            return (ClusterTools$.MODULE$.actorsFromSameJVM(this.self(), actorRef) ? AskableActorRef$.MODULE$.$qmark$extension1(akka.pattern.package$.MODULE$.ask(actorRef), this._workflowConfiguration().get(), durationToTimeout, this.self()) : AskableActorRef$.MODULE$.$qmark$extension1(akka.pattern.package$.MODULE$.ask(actorRef), ReloadConfiguration$.MODULE$, durationToTimeout, this.self())).transformWith(r8 -> {
                Future recoverWith;
                if (r8 instanceof Success) {
                    this.debug(() -> {
                        return new StringBuilder(68).append("Successfully notified pipelineInstance ").append(actorRef).append(" to reload its configuration.").toString();
                    });
                    recoverWith = Future$.MODULE$.apply(() -> {
                        return new Tuple2(actorRef, BoxesRunTime.boxToBoolean(true));
                    }, this.ec());
                } else {
                    if (!(r8 instanceof Failure)) {
                        throw new MatchError(r8);
                    }
                    Throwable exception = ((Failure) r8).exception();
                    this.error(() -> {
                        return new StringBuilder(64).append("Failure to contact pipelineInstance ").append(actorRef).append(" to reload its configuration").toString();
                    }, () -> {
                        return exception;
                    });
                    recoverWith = this.clusterServiceLeader.communication().infoFromActorRef(actorRef, this.context()).flatMap(option -> {
                        Future apply;
                        if (option instanceof Some) {
                            apply = Future$.MODULE$.apply(() -> {
                                return new Tuple2(actorRef, BoxesRunTime.boxToBoolean(false));
                            }, this.ec());
                        } else {
                            if (!None$.MODULE$.equals(option)) {
                                throw new MatchError(option);
                            }
                            this.error(() -> {
                                return new StringBuilder(69).append("Seems like the pipelineInstance ").append(actorRef).append(" does not exist anymore, false alert.").toString();
                            });
                            apply = Future$.MODULE$.apply(() -> {
                                return new Tuple2(actorRef, BoxesRunTime.boxToBoolean(true));
                            }, this.ec());
                        }
                        return apply;
                    }, this.ec()).recoverWith(new EngineActor$$anonfun$$nestedInanonfun$notifyReloadConfiguration$2$1(this, actorRef), this.ec());
                }
                return recoverWith;
            }, this.ec());
        }, Seq$.MODULE$.canBuildFrom())).map(future -> {
            return ClusterTools$.MODULE$.futureToFutureTry(future, this.ec());
        }, Seq$.MODULE$.canBuildFrom()), Seq$.MODULE$.canBuildFrom(), ec()).map(seq -> {
            return (Seq) ((TraversableLike) ((TraversableLike) seq.flatMap(r5 -> {
                if (r5.isFailure()) {
                    this.error(() -> {
                        return "Seems like the developer made a mistake, every future to connect to the pipelineInstance should be proper recovered to not have any failure";
                    }, () -> {
                        return (Throwable) r5.failed().get();
                    });
                }
                return Option$.MODULE$.option2Iterable(r5.toOption());
            }, Seq$.MODULE$.canBuildFrom())).filterNot(tuple2 -> {
                return BoxesRunTime.boxToBoolean(tuple2._2$mcZ$sp());
            })).map(tuple22 -> {
                return (ActorRef) tuple22._1();
            }, Seq$.MODULE$.canBuildFrom());
        }, ec()).map(seq2 -> {
            if (seq2.isEmpty()) {
                this.debug(() -> {
                    return "All PipelineInstances were correctly notified with a ReloadConfiguration.";
                });
                this._scheduledSendReloadConfiguration_$eq(None$.MODULE$);
                return BoxedUnit.UNIT;
            }
            this.warn(() -> {
                return new StringBuilder(90).append("Some PipelineInstances could not be notified of a ReloadConfiguration: ").append(((TraversableOnce) seq2.map(actorRef2 -> {
                    return actorRef2.toString();
                }, Seq$.MODULE$.canBuildFrom())).mkString(", ")).append(", we will try again").toString();
            });
            this._scheduledSendReloadConfiguration_$eq(Option$.MODULE$.apply(this.context().system().scheduler().scheduleOnce(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(20)).seconds(), this.self(), new SendReloadConfigurationToInstances(seq2), this.ec(), this.self())));
            return BoxedUnit.UNIT;
        }, ec());
    }

    public static final /* synthetic */ boolean $anonfun$scheduleNotifyReloadConfiguration$1(EngineActor engineActor, Cancellable cancellable) {
        engineActor.debug(() -> {
            return "A previous attempt to send ReloadConfiguration to pipelineInstances has been canceled as the config has just changed";
        });
        return cancellable.cancel();
    }

    public static final /* synthetic */ void $anonfun$scheduleNotifyReloadConfiguration$3(EngineActor engineActor, Seq seq) {
        engineActor.debug(() -> {
            return new StringBuilder(46).append("Send ReloadConfiguration to ").append(seq.size()).append(" PipelineInstances").toString();
        });
        engineActor._scheduledSendReloadConfiguration_$eq(Option$.MODULE$.apply(engineActor.context().system().scheduler().scheduleOnce(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(20)).seconds(), engineActor.self(), new SendReloadConfigurationToInstances(seq), engineActor.ec(), engineActor.self())));
    }

    public EngineActor(ClusterServiceLeader clusterServiceLeader, WorkflowConfigurationApi workflowConfigurationApi) {
        this.clusterServiceLeader = clusterServiceLeader;
        this.workflowConfigurationApi = workflowConfigurationApi;
        this.engine = new Engine(clusterServiceLeader, workflowConfigurationApi.executionContext(), context());
    }
}
