package group.insyde.statefun.tsukuyomi.core.capture;

import group.insyde.statefun.tsukuyomi.core.capture.Envelope;
import java.util.Base64;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.statefun.sdk.java.Address;
import org.apache.flink.statefun.sdk.java.Context;
import org.apache.flink.statefun.sdk.java.StatefulFunction;
import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
import org.apache.flink.statefun.sdk.java.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:group/insyde/statefun/tsukuyomi/core/capture/MessageCaptureFunction.class */
/**
 * Stateful function that records every message delivered to it.
 *
 * <p>For each incoming message the function logs the message type together with the
 * caller and its own address, then emits an {@link Envelope} describing the message to
 * the {@code Egresses.CAPTURED_MESSAGES} egress. The raw message bytes are carried in the
 * envelope as a Base64 string.
 */
public class MessageCaptureFunction implements StatefulFunction {
    private static final Logger log = LoggerFactory.getLogger(MessageCaptureFunction.class);

    /** Shared instance of this function. */
    public static final MessageCaptureFunction INSTANCE = new MessageCaptureFunction();

    /**
     * Logs the incoming message and forwards an {@link Envelope} describing it to the
     * captured-messages egress.
     *
     * @param context invocation context; supplies the caller address, this function's
     *     own address, and the means to send the egress message
     * @param message the message being captured
     * @return the future obtained from {@code context.done()}
     * @throws IllegalStateException if the context reports no caller
     */
    public CompletableFuture<Void> apply(Context context, Message message) {
        Address caller = context.caller()
                .orElseThrow(() -> new IllegalStateException("Caller is missing"));
        Address target = context.self();

        log.info(
                "Captured outgoing message {} from function {} to function {}",
                message.valueTypeName(),
                caller,
                target);

        String typeName = message.valueTypeName().asTypeNameString();
        // The raw value is Base64-encoded so the envelope carries it as text.
        String encodedValue = Base64.getEncoder().encodeToString(message.rawValue().toByteArray());

        context.send(
                EgressMessageBuilder.forEgress(Egresses.CAPTURED_MESSAGES)
                        .withCustomType(
                                Envelope.TYPE,
                                Envelope.builder()
                                        .from(caller.type(), caller.id())
                                        .toFunction(target.type(), target.id())
                                        .data(Envelope.Data.of(typeName, encodedValue))
                                        .build())
                        .build());

        return context.done();
    }
}
