package streaming.dsl.mmlib.algs.python;

import java.io.File;
import java.io.FileWriter;
import java.util.HashMap;
import org.apache.commons.io.FileUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.datasources.json.WowJsonInferSchema$;
import org.apache.spark.sql.types.MapType$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.PythonProjectExecuteRunner;
import org.apache.spark.util.PythonProjectExecuteRunner$;
import org.slf4j.Logger;
import scala.Array$;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayBuffer;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.ScalaRunTime$;
import streaming.dsl.MLSQLExecuteContext;
import streaming.dsl.ScriptSQLExec$;
import streaming.dsl.mmlib.algs.Functions$;
import streaming.dsl.mmlib.algs.SQLPythonAlg$;
import streaming.dsl.mmlib.algs.SQLPythonFunc$;
import streaming.log.WowLog;
import tech.mlsql.common.utils.env.python.BasicCondaEnvManager$;
import tech.mlsql.common.utils.hdfs.HDFSOperator$;
import tech.mlsql.common.utils.log.Logging;

/* compiled from: BatchPredict.scala */
@ScalaSignature(bytes = "\u0006\u0001u3Aa\u0001\u0003\u0001\u001f!)A\u0006\u0001C\u0001[!)\u0001\u0007\u0001C\u0001c\ta!)\u0019;dQB\u0013X\rZ5di*\u0011QAB\u0001\u0007af$\bn\u001c8\u000b\u0005\u001dA\u0011\u0001B1mONT!!\u0003\u0006\u0002\u000b5lG.\u001b2\u000b\u0005-a\u0011a\u00013tY*\tQ\"A\u0005tiJ,\u0017-\\5oO\u000e\u00011#\u0002\u0001\u0011-\u0011J\u0003CA\t\u0015\u001b\u0005\u0011\"\"A\n\u0002\u000bM\u001c\u0017\r\\1\n\u0005U\u0011\"AB!osJ+g\r\u0005\u0002\u0018E5\t\u0001D\u0003\u0002\u001a5\u0005\u0019An\\4\u000b\u0005ma\u0012!B;uS2\u001c(BA\u000f\u001f\u0003\u0019\u0019w.\\7p]*\u0011q\u0004I\u0001\u0006[2\u001c\u0018\u000f\u001c\u0006\u0002C\u0005!A/Z2i\u0013\t\u0019\u0003DA\u0004M_\u001e<\u0017N\\4\u0011\u0005\u0015:S\"\u0001\u0014\u000b\u0005ea\u0011B\u0001\u0015'\u0005\u00199vn\u001e'pOB\u0011\u0011CK\u0005\u0003WI\u0011AbU3sS\u0006d\u0017N_1cY\u0016\fa\u0001P5oSRtD#\u0001\u0018\u0011\u0005=\u0002Q\"\u0001\u0003\u0002\u000fA\u0014X\rZ5diR!!\u0007\u0014(Y!\t\u0019\u0014J\u0004\u00025\r:\u0011Qg\u0011\b\u0003m\u0001s!aN\u001f\u000f\u0005aZT\"A\u001d\u000b\u0005ir\u0011A\u0002\u001fs_>$h(C\u0001=\u0003\ry'oZ\u0005\u0003}}\na!\u00199bG\",'\"\u0001\u001f\n\u0005\u0005\u0013\u0015!B:qCJ\\'B\u0001 @\u0013\t!U)A\u0002tc2T!!\u0011\"\n\u0005\u001dC\u0015a\u00029bG.\fw-\u001a\u0006\u0003\t\u0016K!AS&\u0003\u0013\u0011\u000bG/\u0019$sC6,'BA$I\u0011\u0015i%\u00011\u00013\u0003\t!g\rC\u0003P\u0005\u0001\u0007\u0001+A\u0003`a\u0006$\b\u000e\u0005\u0002R+:\u0011!k\u0015\t\u0003qII!\u0001\u0016\n\u0002\rA\u0013X\rZ3g\u0013\t1vK\u0001\u0004TiJLgn\u001a\u0006\u0003)JAQ!\u0017\u0002A\u0002i\u000ba\u0001]1sC6\u001c\b\u0003B)\\!BK!\u0001X,\u0003\u00075\u000b\u0007\u000f")
/* loaded from: input_file:streaming/dsl/mmlib/algs/python/BatchPredict.class */
public class BatchPredict implements Logging, WowLog, Serializable {
    private transient Logger tech$mlsql$common$utils$log$Logging$$log_;

    public String format(String str, boolean z) {
        return WowLog.format$(this, str, z);
    }

    public boolean format$default$2() {
        return WowLog.format$default$2$(this);
    }

    public String wow_format(String str) {
        return WowLog.wow_format$(this, str);
    }

    public String format_exception(Exception exc) {
        return WowLog.format_exception$(this, exc);
    }

    public String format_throwable(Throwable th, boolean z) {
        return WowLog.format_throwable$(this, th, z);
    }

    public boolean format_throwable$default$2() {
        return WowLog.format_throwable$default$2$(this);
    }

    public String format_cause(Exception exc) {
        return WowLog.format_cause$(this, exc);
    }

    public void format_full_exception(ArrayBuffer<String> arrayBuffer, Exception exc, boolean z) {
        WowLog.format_full_exception$(this, arrayBuffer, exc, z);
    }

    public boolean format_full_exception$default$3() {
        return WowLog.format_full_exception$default$3$(this);
    }

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public Logger tech$mlsql$common$utils$log$Logging$$log_() {
        return this.tech$mlsql$common$utils$log$Logging$$log_;
    }

    public void tech$mlsql$common$utils$log$Logging$$log__$eq(Logger logger) {
        this.tech$mlsql$common$utils$log$Logging$$log_ = logger;
    }

    public Dataset<Row> predict(Dataset<Row> dataset, String str, Map<String, String> map) {
        Option<PythonScript> loadProject;
        SparkSession sparkSession = dataset.sparkSession();
        boolean z = new StringOps(Predef$.MODULE$.augmentString((String) map.getOrElse("keepLocalDirectory", () -> {
            return "false";
        }))).toBoolean();
        ModelMeta loadMetaAndModel = new ModelMetaManager(sparkSession, str, map).loadMetaAndModel(null, (Map) Predef$.MODULE$.Map().apply(Nil$.MODULE$));
        Tuple3<Map<String, String>, Map<String, String>, Map<String, String>> loadResourceInRegister = new ResourceManager(map).loadResourceInRegister(sparkSession, loadMetaAndModel);
        if (loadResourceInRegister == null) {
            throw new MatchError(loadResourceInRegister);
        }
        Tuple3 tuple3 = new Tuple3((Map) loadResourceInRegister._1(), (Map) loadResourceInRegister._2(), (Map) loadResourceInRegister._3());
        Map<String, Object> map2 = (Map) tuple3._1();
        loadMetaAndModel.copy(loadMetaAndModel.copy$default$1(), loadMetaAndModel.copy$default$2(), loadMetaAndModel.copy$default$3(), map2, loadMetaAndModel.copy$default$5(), (Map) tuple3._3(), loadMetaAndModel.copy$default$7());
        Map<String, Object> resources = loadMetaAndModel.resources();
        Option<String> pythonScriptPath = PythonAlgProject$.MODULE$.getPythonScriptPath(map);
        if (pythonScriptPath instanceof Some) {
            loadProject = PythonAlgProject$.MODULE$.loadProject(map, dataset.sparkSession());
        } else {
            if (!None$.MODULE$.equals(pythonScriptPath)) {
                throw new MatchError(pythonScriptPath);
            }
            loadProject = PythonAlgProject$.MODULE$.loadProject(loadMetaAndModel.trainParams(), dataset.sparkSession());
        }
        Option<PythonScript> option = loadProject;
        String projectName = ((PythonScript) option.get()).projectName();
        Map<String, String> mapParams = Functions$.MODULE$.mapParams("systemParam", loadMetaAndModel.trainParams());
        MLSQLExecuteContext contextGetOrForTest = ScriptSQLExec$.MODULE$.contextGetOrForTest();
        StructType schema = dataset.schema();
        String sessionLocalTimeZone = dataset.sparkSession().sessionState().conf().sessionLocalTimeZone();
        String str2 = (String) loadMetaAndModel.modelEntityPaths().head();
        String sb = new StringBuilder(7).append(SQLPythonFunc$.MODULE$.getAlgTmpPath(str)).append("/output").toString();
        HDFSOperator$.MODULE$.deleteDir(sb);
        HDFSOperator$.MODULE$.createDir(sb);
        Map<String, String> trainParams = loadMetaAndModel.trainParams();
        String str3 = dataset.sparkSession().sparkContext().getConf().get("spark.app.name");
        dataset.rdd().mapPartitionsWithIndex((obj, iterator) -> {
            return $anonfun$predict$2(this, contextGetOrForTest, mapParams, str3, option, str, schema, sessionLocalTimeZone, str2, loadMetaAndModel, resources, trainParams, map, projectName, z, sb, BoxesRunTime.unboxToInt(obj), iterator);
        }, dataset.rdd().mapPartitionsWithIndex$default$2(), ClassTag$.MODULE$.apply(String.class)).count();
        return sparkSession.read().json(sb);
    }

    public static final /* synthetic */ void $anonfun$predict$3(FileWriter fileWriter, String str) {
        fileWriter.write(new StringBuilder(1).append(str).append("\n").toString());
    }

    public static final /* synthetic */ void $anonfun$predict$5(BatchPredict batchPredict, MLSQLExecuteContext mLSQLExecuteContext, ObjectRef objectRef, String str) {
        ScriptSQLExec$.MODULE$.setContextIfNotPresent(mLSQLExecuteContext);
        String format = batchPredict.format(str, batchPredict.format$default$2());
        batchPredict.logInfo(() -> {
            return format;
        });
        objectRef.elem = new StringBuilder(1).append((String) objectRef.elem).append(format).append("\n").toString();
    }

    public static final /* synthetic */ void $anonfun$predict$7(BatchPredict batchPredict, String str) {
        batchPredict.logInfo(() -> {
            return batchPredict.format(str, batchPredict.format$default$2());
        });
    }

    public static final /* synthetic */ Iterator $anonfun$predict$2(BatchPredict batchPredict, MLSQLExecuteContext mLSQLExecuteContext, Map map, String str, Option option, String str2, StructType structType, String str3, String str4, ModelMeta modelMeta, Map map2, Map map3, Map map4, String str5, boolean z, String str6, int i, Iterator iterator) {
        ScriptSQLExec$.MODULE$.setContext(mLSQLExecuteContext);
        MLFlowConfig buildFromSystemParam = MLFlowConfig$.MODULE$.buildFromSystemParam(map);
        PythonConfig buildFromSystemParam2 = PythonConfig$.MODULE$.buildFromSystemParam(map);
        Map $plus$plus = EnvConfig$.MODULE$.buildFromSystemParam(map).$plus$plus(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BasicCondaEnvManager$.MODULE$.MLSQL_INSTNANCE_NAME_KEY()), str)})));
        PythonAlgExecCommand pythonAlgExecCommand = new PythonAlgExecCommand((PythonScript) option.get(), Option$.MODULE$.apply(buildFromSystemParam), Option$.MODULE$.apply(buildFromSystemParam2), $plus$plus);
        Seq<String> generateCommand = pythonAlgExecCommand.generateCommand(MLProject$.MODULE$.batch_predict_command(), pythonAlgExecCommand.generateCommand$default$2());
        LocalPathConfig buildFromParams = LocalPathConfig$.MODULE$.buildFromParams(str2);
        File file = new File(buildFromParams.localDataPath());
        if (file.exists()) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxesRunTime.boxToBoolean(file.mkdirs());
        }
        FileWriter fileWriter = new FileWriter(new File(new StringBuilder(6).append(buildFromParams.localDataPath()).append("/").append(i).append(".json").toString()));
        try {
            try {
                WowJsonInferSchema$.MODULE$.toJson(iterator, structType, str3, str7 -> {
                    $anonfun$predict$3(fileWriter, str7);
                    return BoxedUnit.UNIT;
                });
            } catch (Exception e) {
                batchPredict.logError(() -> {
                    return batchPredict.format_exception(e);
                });
            }
            fileWriter.close();
            HashMap hashMap = new HashMap();
            String sb = new StringBuilder(1).append(buildFromParams.localModelPath()).append("/").append(i).toString();
            HDFSOperator$.MODULE$.copyToLocalFile(sb, str4, true);
            String sb2 = new StringBuilder(13).append(buildFromParams.localOutputPath()).append("/output-").append(i).append(".json").toString();
            Map $plus$plus2 = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("tempDataLocalPath"), new StringBuilder(6).append(buildFromParams.localDataPath()).append("/").append(i).append(".json").toString()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("tempOutputLocalPath"), sb2), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("tempModelLocalPath"), sb)})).$plus$plus(modelMeta.resources());
            File file2 = new File(buildFromParams.localOutputPath());
            if (file2.exists()) {
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            } else {
                BoxesRunTime.boxToBoolean(file2.mkdirs());
            }
            hashMap.put(RunPythonConfig$.MODULE$.internalSystemParam(), JavaConverters$.MODULE$.mapAsJavaMapConverter(map2.$plus$plus($plus$plus2)).asJava());
            hashMap.put(RunPythonConfig$.MODULE$.systemParam(), JavaConverters$.MODULE$.mapAsJavaMapConverter(map).asJava());
            hashMap.put("trainParams", JavaConverters$.MODULE$.mapAsJavaMapConverter(map3).asJava());
            hashMap.put("fitParams", JavaConverters$.MODULE$.mapAsJavaMapConverter(map4).asJava());
            String sb3 = new StringBuilder(1).append(buildFromParams.localRunPath()).append("/").append(str5).toString();
            SQLPythonAlg$.MODULE$.downloadPythonProject(sb3, Option$.MODULE$.apply(((PythonScript) option.get()).filePath()));
            ObjectRef create = ObjectRef.create("");
            try {
                try {
                    new PythonProjectExecuteRunner(sb3, z, $plus$plus, str8 -> {
                        $anonfun$predict$5(batchPredict, mLSQLExecuteContext, create, str8);
                        return BoxedUnit.UNIT;
                    }, PythonProjectExecuteRunner$.MODULE$.$lessinit$greater$default$5(), PythonProjectExecuteRunner$.MODULE$.$lessinit$greater$default$6(), PythonProjectExecuteRunner$.MODULE$.$lessinit$greater$default$7()).run(generateCommand, hashMap, MapType$.MODULE$.apply(StringType$.MODULE$, MapType$.MODULE$.apply(StringType$.MODULE$, StringType$.MODULE$)), ((PythonScript) option.get()).fileContent(), ((PythonScript) option.get()).fileName(), (byte[][]) Array$.MODULE$.apply(Nil$.MODULE$, ClassTag$.MODULE$.apply(ScalaRunTime$.MODULE$.arrayClass(Byte.TYPE)))).foreach(str9 -> {
                        $anonfun$predict$7(batchPredict, str9);
                        return BoxedUnit.UNIT;
                    });
                    HDFSOperator$.MODULE$.copyToHDFS(sb2, str6, false, false);
                } catch (Exception e2) {
                    String format_cause = batchPredict.format_cause(e2);
                    batchPredict.logError(() -> {
                        return format_cause;
                    });
                }
                FileUtils.deleteDirectory(new File(sb));
                FileUtils.deleteDirectory(new File(buildFromParams.localDataPath()));
                FileUtils.deleteDirectory(new File(buildFromParams.localOutputPath()));
                return Nil$.MODULE$.toIterator();
            } catch (Throwable th) {
                FileUtils.deleteDirectory(new File(sb));
                FileUtils.deleteDirectory(new File(buildFromParams.localDataPath()));
                FileUtils.deleteDirectory(new File(buildFromParams.localOutputPath()));
                throw th;
            }
        } catch (Throwable th2) {
            fileWriter.close();
            throw th2;
        }
    }

    public BatchPredict() {
        Logging.$init$(this);
        WowLog.$init$(this);
    }
}
