/*
 * Decompiled with CFR 0.152.
 */
package com.github.mizosoft.methanol.decoder;

import com.github.mizosoft.methanol.BodyDecoder;
import com.github.mizosoft.methanol.decoder.AsyncDecoder;
import com.github.mizosoft.methanol.internal.Utils;
import com.github.mizosoft.methanol.internal.Validate;
import com.github.mizosoft.methanol.internal.flow.AbstractQueueSubscription;
import com.github.mizosoft.methanol.internal.flow.FlowSupport;
import com.github.mizosoft.methanol.internal.flow.Prefetcher;
import com.github.mizosoft.methanol.internal.flow.Upstream;
import java.io.IOException;
import java.net.http.HttpResponse;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * A {@link BodyDecoder} that feeds the buffers it receives from upstream through an
 * {@link AsyncDecoder} and forwards the decoded buffers to a downstream
 * {@link HttpResponse.BodySubscriber}.
 *
 * <p>Incoming buffers are queued in a {@link QueueByteSource}; decoded bytes are accumulated in a
 * {@link StackByteSink} made of fixed-size buffers. After every decode round, the sink buffers that
 * are ready are sliced off and submitted to the downstream subscription.
 *
 * @param <T> the body type produced by the downstream subscriber
 */
public final class AsyncBodyDecoder<T>
implements BodyDecoder<T> {
    private final AsyncDecoder decoder;
    private final HttpResponse.BodySubscriber<T> downstream;
    private final Executor executor;

    /**
     * {@code true} when the executor was passed in by the caller, {@code false} when the built-in
     * {@link FlowSupport#SYNC_EXECUTOR} is used. Only caller-supplied executors are exposed
     * through {@link #executor()}.
     */
    private final boolean userExecutor;

    private final Upstream upstream = new Upstream();
    private final Prefetcher prefetcher = new Prefetcher();
    private final QueueByteSource source = new QueueByteSource();
    private final StackByteSink sink;

    /** Created in {@link #onSubscribe}; {@code null} until then. */
    private @MonotonicNonNull SubscriptionImpl downstreamSubscription;

    /** Set once a terminal signal ({@code onError}/{@code onComplete}) has been handled. */
    private boolean completed;

    /**
     * Creates a decoder that signals downstream through {@link FlowSupport#SYNC_EXECUTOR} and uses
     * {@link Utils#BUFFER_SIZE} for sink buffers.
     */
    public AsyncBodyDecoder(AsyncDecoder decoder, HttpResponse.BodySubscriber<T> downstream) {
        this(decoder, downstream, FlowSupport.SYNC_EXECUTOR, false, Utils.BUFFER_SIZE);
    }

    /**
     * Creates a decoder that signals downstream through the given executor and uses
     * {@link Utils#BUFFER_SIZE} for sink buffers.
     */
    public AsyncBodyDecoder(AsyncDecoder decoder, HttpResponse.BodySubscriber<T> downstream, Executor executor) {
        this(decoder, downstream, executor, true, Utils.BUFFER_SIZE);
    }

    /**
     * Creates a decoder that signals downstream through the given executor and allocates sink
     * buffers of {@code bufferSize} bytes.
     *
     * @param bufferSize capacity of each sink buffer; must be positive
     */
    public AsyncBodyDecoder(AsyncDecoder decoder, HttpResponse.BodySubscriber<T> downstream, Executor executor, int bufferSize) {
        this(decoder, downstream, executor, true, bufferSize);
    }

    private AsyncBodyDecoder(AsyncDecoder decoder, HttpResponse.BodySubscriber<T> downstream, Executor executor, boolean userExecutor, int bufferSize) {
        this.decoder = Objects.requireNonNull(decoder);
        this.downstream = Objects.requireNonNull(downstream);
        this.executor = Objects.requireNonNull(executor);
        this.userExecutor = userExecutor;
        // Reject a bad size before building anything that depends on it.
        Validate.requireArgument(bufferSize > 0, "Expected a positive buffer size: %d", bufferSize);
        this.sink = new StackByteSink(bufferSize);
    }

    /** Returns the underlying {@link AsyncDecoder}. */
    public AsyncDecoder asyncDecoder() {
        return this.decoder;
    }

    /** Returns the encoding reported by the underlying {@link AsyncDecoder}. */
    @Override
    public String encoding() {
        return this.decoder.encoding();
    }

    /**
     * Returns the executor passed to one of the executor-accepting constructors, or an empty
     * {@code Optional} if this decoder was created without one.
     */
    @Override
    public Optional<Executor> executor() {
        return this.userExecutor ? Optional.of(this.executor) : Optional.empty();
    }

    /** Returns the subscriber that receives the decoded buffers. */
    @Override
    public HttpResponse.BodySubscriber<T> downstream() {
        return this.downstream;
    }

    /**
     * Registers the upstream subscription and, if it was accepted, creates the downstream
     * subscription and starts it.
     *
     * @throws NullPointerException if {@code upstreamSubscription} is {@code null}
     */
    @Override
    public void onSubscribe(Flow.Subscription upstreamSubscription) {
        Objects.requireNonNull(upstreamSubscription);
        // NOTE(review): setOrCancel presumably returns false (after cancelling the argument) when an
        // upstream is already set -- confirm against Upstream.
        if (!this.upstream.setOrCancel(upstreamSubscription)) {
            return;
        }
        SubscriptionImpl subscription = new SubscriptionImpl();
        this.downstreamSubscription = subscription;
        this.prefetcher.initialize(this.upstream);
        subscription.fireOrKeepAlive();
    }

    /**
     * Queues the received buffers as decoder input, runs a decode round and submits whatever sink
     * buffers became ready. Any failure thrown by the decoder cancels upstream and is forwarded
     * downstream as an error.
     *
     * @throws NullPointerException if {@code buffers} is {@code null}
     */
    @Override
    public void onNext(List<ByteBuffer> buffers) {
        Objects.requireNonNull(buffers);
        if (this.completed) {
            return;
        }
        this.source.push(buffers);
        try {
            this.decoder.decode(this.source, this.sink);
        }
        catch (Throwable t) {
            this.upstream.cancel();
            this.onError(t);
            return;
        }
        // The slice may be empty; SubscriptionImpl.poll() skips empty items.
        this.subscription().submit(this.sink.slice(false));
    }

    /**
     * Marks this decoder as completed and forwards the error to the downstream subscription.
     * Ignored if a terminal signal was already handled.
     *
     * @throws NullPointerException if {@code throwable} is {@code null}
     */
    @Override
    public void onError(Throwable throwable) {
        Objects.requireNonNull(throwable);
        if (this.completed) {
            return;
        }
        this.completed = true;
        this.upstream.clear();
        this.subscription().fireOrKeepAliveOnError(throwable);
    }

    /**
     * Runs the final decode round with the source marked as final, closes the decoder, and then
     * either completes downstream or submits the last decoded buffers followed by completion.
     * Ignored if a terminal signal was already handled.
     *
     * <p>Any failure raised while decoding or closing, including unchecked exceptions, is
     * forwarded downstream as an error rather than thrown to the caller, mirroring
     * {@link #onNext}.
     */
    @Override
    public void onComplete() {
        if (this.completed) {
            return;
        }
        this.completed = true;
        this.upstream.clear();
        // try-with-resources is used only to guarantee the decoder is closed.
        try (AsyncDecoder ignored = this.decoder) {
            this.source.complete();
            this.decoder.decode(this.source, this.sink);
            if (this.source.hasRemaining()) {
                throw new IOException("un-exhausted bytes after final source: " + this.source.remaining());
            }
        }
        catch (Throwable t) {
            // Catch Throwable (not just IOException) so that a misbehaving decoder cannot make
            // this subscriber method throw; onNext handles decoder failures the same way.
            this.subscription().fireOrKeepAliveOnError(t);
            return;
        }
        List<ByteBuffer> remaining = this.sink.slice(true);
        if (remaining.isEmpty()) {
            this.subscription().complete();
        } else {
            this.subscription().submitAndComplete(remaining);
        }
    }

    /**
     * Returns the downstream subscription.
     *
     * <p>Fails through {@code Validate.requireState} if {@link #onSubscribe} has not created it
     * yet.
     */
    private SubscriptionImpl subscription() {
        SubscriptionImpl subscription = this.downstreamSubscription;
        Validate.requireState(subscription != null, "onSubscribe() expected");
        return subscription;
    }

    /**
     * The subscription handed to the downstream subscriber. Items are lists of decoded, read-only
     * buffers.
     */
    private final class SubscriptionImpl
    extends AbstractQueueSubscription<List<ByteBuffer>> {
        SubscriptionImpl() {
            super(AsyncBodyDecoder.this.downstream, AsyncBodyDecoder.this.executor);
        }

        /**
         * Polls the next non-empty item, or returns {@code null} if none is queued. The prefetcher
         * is updated for every item taken from the queue, including empty ones that are skipped.
         */
        @Override
        protected @Nullable List<ByteBuffer> poll() {
            List<ByteBuffer> next;
            while ((next = super.poll()) != null) {
                // NOTE(review): presumably lets the prefetcher request more from upstream -- confirm.
                AsyncBodyDecoder.this.prefetcher.update(AsyncBodyDecoder.this.upstream);
                if (!next.isEmpty()) {
                    return next;
                }
            }
            return null;
        }

        // The three overrides below only delegate to super. NOTE(review): they are presumably
        // redeclared here so the enclosing class can call these protected methods -- confirm.

        @Override
        protected void submit(List<ByteBuffer> item) {
            super.submit(item);
        }

        @Override
        protected void submitAndComplete(List<ByteBuffer> lastItem) {
            super.submitAndComplete(lastItem);
        }

        @Override
        protected void complete() {
            super.complete();
        }

        /** Aborts this subscription, cancels upstream and then closes the decoder. */
        @Override
        protected void abort(boolean flowInterrupted) {
            super.abort(flowInterrupted);
            // try-with-resources closes the decoder even if cancelling upstream throws.
            try (AsyncDecoder ignored = AsyncBodyDecoder.this.decoder) {
                AsyncBodyDecoder.this.upstream.cancel(flowInterrupted);
            }
        }
    }

    /**
     * A sink backed by a growing list of fixed-capacity buffers. The decoder always writes into
     * the last buffer; a new one is appended once the last is full.
     */
    private static final class StackByteSink
    implements AsyncDecoder.ByteSink {
        private final List<ByteBuffer> sinkBuffers = new ArrayList<>();
        private final int bufferSize;

        StackByteSink(int bufferSize) {
            this.bufferSize = bufferSize;
        }

        /**
         * Returns the buffer to write into: the last buffer if it still has room, otherwise a
         * freshly allocated one that is appended to the list.
         */
        @Override
        public ByteBuffer currentSink() {
            int size = this.sinkBuffers.size();
            ByteBuffer last = size > 0 ? this.sinkBuffers.get(size - 1) : null;
            if (last == null || !last.hasRemaining()) {
                last = ByteBuffer.allocate(this.bufferSize);
                this.sinkBuffers.add(last);
            }
            return last;
        }

        /**
         * Removes and returns the buffers that are ready to be handed downstream, as read-only
         * views flipped for reading.
         *
         * <p>Every full buffer is included. The last buffer, if it still has room, is held back
         * unless {@code finished} is {@code true} and it contains at least one byte.
         *
         * @param finished whether no more bytes will be written to this sink
         * @return an unmodifiable list of readable buffers; possibly empty
         */
        List<ByteBuffer> slice(boolean finished) {
            if (this.sinkBuffers.isEmpty()) {
                return List.of();
            }
            int size = this.sinkBuffers.size();
            int sliceSize = size;
            ByteBuffer last = this.sinkBuffers.get(size - 1);
            // Keep a partially-filled tail for further writes; drop it from the final slice only
            // if nothing was ever written to it.
            if (last.hasRemaining() && (!finished || last.position() == 0)) {
                --sliceSize;
            }
            List<ByteBuffer> slice = this.sinkBuffers.subList(0, sliceSize);
            List<ByteBuffer> snapshot = slice.stream().map(ByteBuffer::asReadOnlyBuffer).collect(Collectors.toUnmodifiableList());
            snapshot.forEach(ByteBuffer::flip);
            // Clearing the sub-list view removes the sliced buffers from sinkBuffers.
            slice.clear();
            return snapshot;
        }
    }

    /** A source backed by a FIFO queue of the buffers received from upstream. */
    private static final class QueueByteSource
    implements AsyncDecoder.ByteSource {
        /** Sentinel returned by {@link #currentSource()} when no input is available. */
        private static final ByteBuffer NO_INPUT = ByteBuffer.allocate(0);
        private final Queue<ByteBuffer> sourceBuffers = new ArrayDeque<>();
        private boolean complete;

        QueueByteSource() {
        }

        /**
         * Discards exhausted buffers from the head of the queue and returns the first buffer that
         * still has bytes, or the empty {@code NO_INPUT} sentinel if there is none.
         */
        @Override
        public ByteBuffer currentSource() {
            ByteBuffer head;
            while ((head = this.sourceBuffers.peek()) != null && !head.hasRemaining()) {
                this.sourceBuffers.remove();
            }
            return head != null ? head : NO_INPUT;
        }

        /** Returns the total number of unread bytes across all queued buffers. */
        @Override
        public long remaining() {
            long sum = 0L;
            for (ByteBuffer b : this.sourceBuffers) {
                sum += (long)b.remaining();
            }
            return sum;
        }

        /** Returns whether any queued buffer still has unread bytes. */
        @Override
        public boolean hasRemaining() {
            // Identity comparison is safe: currentSource() never returns an empty queued buffer.
            return this.currentSource() != NO_INPUT;
        }

        /** Returns whether {@link #complete()} has been called, i.e. no more input will arrive. */
        @Override
        public boolean finalSource() {
            return this.complete;
        }

        /** Appends the given buffers to the end of the queue. */
        void push(List<ByteBuffer> buffers) {
            this.sourceBuffers.addAll(buffers);
        }

        /** Marks this source as final. */
        void complete() {
            this.complete = true;
        }
    }
}

