/*
 * Decompiled with CFR 0.152.
 */
package io.basestar.stream;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.basestar.auth.Caller;
import io.basestar.expression.Context;
import io.basestar.expression.Expression;
import io.basestar.expression.constant.Constant;
import io.basestar.expression.constant.NameConstant;
import io.basestar.expression.function.In;
import io.basestar.expression.logical.Or;
import io.basestar.schema.Consistency;
import io.basestar.schema.Index;
import io.basestar.schema.Instance;
import io.basestar.schema.Namespace;
import io.basestar.schema.ObjectSchema;
import io.basestar.schema.Property;
import io.basestar.schema.Schema;
import io.basestar.schema.use.Use;
import io.basestar.schema.use.UseArray;
import io.basestar.schema.use.UseBinary;
import io.basestar.schema.use.UseString;
import io.basestar.storage.Storage;
import io.basestar.storage.Versioning;
import io.basestar.stream.Subscription;
import io.basestar.stream.SubscriptionInfo;
import io.basestar.stream.Subscriptions;
import io.basestar.util.Name;
import io.basestar.util.Page;
import io.basestar.util.Pager;
import io.basestar.util.Sort;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class StorageSubscriptions
implements Subscriptions {
    private static final int UNSUBSCRIBE_PAGE_SIZE = 50;
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final String SCHEMA_NAME = "Subscription";
    private static final Namespace NAMESPACE;
    private static final ObjectSchema SCHEMA;
    private final Storage storage;

    public StorageSubscriptions(Storage storage) {
        this.storage = storage;
        storage.validate(SCHEMA);
    }

    @Override
    public CompletableFuture<?> subscribe(Caller caller, String sub, String channel, Set<Subscription.Key> keys, Expression expression, SubscriptionInfo info) {
        try {
            Instant now = Instant.now();
            HashMap<String, Object> object = new HashMap<String, Object>();
            String id = StorageSubscriptions.id(sub, channel);
            Instance.setSchema(object, (Name)Name.of((String[])new String[]{SCHEMA_NAME}));
            Instance.setId(object, (String)id);
            Instance.setCreated(object, (Instant)now);
            Instance.setUpdated(object, (Instant)now);
            Instance.setVersion(object, (Long)1L);
            object.put("sub", sub);
            object.put("channel", channel);
            object.put("caller", OBJECT_MAPPER.writeValueAsString((Object)caller));
            object.put("expression", expression.toString());
            object.put("info", info == null ? null : OBJECT_MAPPER.writeValueAsString((Object)info));
            object.put("keys", keys.stream().map(StorageSubscriptions::binaryKey).collect(Collectors.toList()));
            Instance instance = (Instance)SCHEMA.create(object);
            Storage.WriteTransaction write = this.storage.write(Consistency.ATOMIC, Versioning.CHECKED).createObject(SCHEMA, id, (Map)instance);
            return write.write();
        }
        catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    private static byte[] binaryKey(Subscription.Key key) {
        ArrayList<Object> keys = new ArrayList<Object>();
        keys.add(key.getSchema().toString());
        keys.add(key.getIndex());
        keys.addAll(key.getPartition());
        return UseBinary.binaryKey(keys);
    }

    private static Expression keyExpression(Subscription.Key key) {
        return new In((Expression)new Constant((Object)StorageSubscriptions.binaryKey(key)), (Expression)new NameConstant(Name.of((String[])new String[]{"keys"})));
    }

    private static String id(String sub, String channel) {
        return sub + "_" + channel;
    }

    private static List<Sort> sort() {
        return ImmutableList.of((Object)Sort.asc((Name)ObjectSchema.ID_NAME));
    }

    @Override
    public List<Pager.Source<Subscription>> query(Set<Subscription.Key> keys) {
        Or expression = new Or((Expression[])keys.stream().map(StorageSubscriptions::keyExpression).toArray(Expression[]::new));
        return Pager.map((List)this.storage.query(SCHEMA, (Expression)expression, StorageSubscriptions.sort(), Collections.emptySet()), this::fromMap);
    }

    private Subscription fromMap(Map<String, Object> object) {
        try {
            Subscription subscription = new Subscription();
            subscription.setSub((String)object.get("sub"));
            subscription.setChannel((String)object.get("channel"));
            subscription.setExpression(Expression.parse((String)((String)object.get("expression"))));
            subscription.setCaller((Caller)OBJECT_MAPPER.readValue((String)object.get("caller"), Caller.class));
            String info = (String)object.get("info");
            if (info != null) {
                subscription.setInfo((SubscriptionInfo)OBJECT_MAPPER.readValue(info, SubscriptionInfo.class));
            }
            return subscription;
        }
        catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    @Override
    public CompletableFuture<?> unsubscribe(String sub, String channel) {
        String id = StorageSubscriptions.id(sub, channel);
        return this.storage.readObject(SCHEMA, id, Collections.emptySet()).thenCompose(before -> {
            Storage.WriteTransaction write = this.storage.write(Consistency.ATOMIC, Versioning.CHECKED);
            write.deleteObject(SCHEMA, StorageSubscriptions.id(sub, channel), before);
            return write.write();
        });
    }

    @Override
    public CompletableFuture<?> unsubscribeAll(String sub) {
        Context context = Context.init((Map)ImmutableMap.of((Object)"s", (Object)sub));
        Expression expression = Expression.parseAndBind((Context)context, (String)"sub == s");
        return this.unsubscribeAll(this.storage.query(SCHEMA, expression, StorageSubscriptions.sort(), Collections.emptySet()), null);
    }

    private CompletableFuture<?> unsubscribeAll(List<Pager.Source<Map<String, Object>>> sources, Page.Token token) {
        List<Sort> sort = StorageSubscriptions.sort();
        Comparator comparator = Instance.comparator(sort);
        return new Pager(comparator, sources, token).page(50).thenCompose(page -> {
            Storage.WriteTransaction write = this.storage.write(Consistency.NONE, Versioning.CHECKED);
            page.forEach(object -> write.deleteObject(SCHEMA, Instance.getId((Map)object), object));
            if (page.hasMore()) {
                return ((CompletableFuture)write.write().thenCompose(ignored -> this.unsubscribeAll(sources, page.getPaging()))).thenApply(ignored -> null);
            }
            return write.write();
        });
    }

    static {
        Index.Builder subIndex = Index.builder().setConsistency(Consistency.ATOMIC).setPartition((List)ImmutableList.of((Object)Name.of((String[])new String[]{"sub"}))).setSort((List)ImmutableList.of((Object)Sort.asc((Name)Name.of((String[])new String[]{"channel"}))));
        Index.Builder keysIndex = Index.builder().setConsistency(Consistency.ATOMIC).setOver((Map)ImmutableMap.of((Object)"key", (Object)Name.of((String[])new String[]{"keys"}))).setPartition((List)ImmutableList.of((Object)Name.of((String[])new String[]{"key"})));
        ObjectSchema.Builder schema = ObjectSchema.builder().setProperty("sub", (Property.Descriptor)Property.builder().setType((Use)UseString.DEFAULT)).setProperty("channel", (Property.Descriptor)Property.builder().setType((Use)UseString.DEFAULT)).setProperty("caller", (Property.Descriptor)Property.builder().setType((Use)UseString.DEFAULT)).setProperty("expression", (Property.Descriptor)Property.builder().setType((Use)UseString.DEFAULT)).setProperty("info", (Property.Descriptor)Property.builder().setType((Use)UseString.DEFAULT)).setProperty("keys", (Property.Descriptor)Property.builder().setType((Use)new UseArray((Use)UseBinary.DEFAULT))).setIndex("sub", (Index.Descriptor)subIndex).setIndex("keys", (Index.Descriptor)keysIndex);
        NAMESPACE = Namespace.builder().setSchema(Name.of((String[])new String[]{SCHEMA_NAME}), (Schema.Descriptor)schema).build();
        SCHEMA = NAMESPACE.requireObjectSchema(SCHEMA_NAME);
    }
}

