/*
 * Decompiled with CFR 0.152.
 */
package com.github.mizosoft.methanol.internal.extensions;

import com.github.mizosoft.methanol.internal.Utils;
import com.github.mizosoft.methanol.internal.flow.FlowSupport;
import com.github.mizosoft.methanol.internal.flow.Prefetcher;
import com.github.mizosoft.methanol.internal.flow.Upstream;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.spi.AbstractInterruptibleChannel;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * A {@code BodySubscriber} that exposes the response body as a {@link ReadableByteChannel}.
 *
 * <p>The channel is handed out immediately by {@link #getBody()}. Buffer lists received from
 * upstream are placed in a bounded queue and drained by {@code read} calls on the channel. The end
 * of the stream (normal completion, upstream error, or channel closure) is signalled to readers by
 * enqueueing a sentinel "tombstone" buffer.
 */
public final class ByteChannelBodySubscriber
        implements HttpResponse.BodySubscriber<ReadableByteChannel> {
    /** Sentinel buffer marking the end of the stream; always compared by identity. */
    private static final ByteBuffer TOMBSTONE = ByteBuffer.allocate(0);

    /** The queue element that carries {@link #TOMBSTONE}. */
    private static final List<ByteBuffer> TOMBSTONE_LIST = List.of(TOMBSTONE);

    /** Atomic access to {@code Channel.pendingException}. */
    private static final VarHandle PENDING_EXCEPTION;

    static {
        try {
            PENDING_EXCEPTION =
                    MethodHandles.lookup()
                            .findVarHandle(Channel.class, "pendingException", Throwable.class);
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private final Upstream upstream = new Upstream();
    private final Prefetcher prefetcher = new Prefetcher();
    private final Channel channel = new Channel(FlowSupport.prefetch());

    /** Returns an already-completed stage holding the channel; the body is read lazily from it. */
    @Override
    public CompletionStage<ReadableByteChannel> getBody() {
        return CompletableFuture.completedFuture(this.channel);
    }

    /**
     * Registers the subscription with {@code upstream} and, if {@code setOrCancel} reports success,
     * initializes the prefetcher (presumably issuing the initial demand; see {@code Prefetcher}).
     *
     * @throws NullPointerException if {@code subscription} is null
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        Objects.requireNonNull(subscription);
        if (this.upstream.setOrCancel(subscription)) {
            this.prefetcher.initialize(this.upstream);
        }
    }

    /**
     * Hands the received buffers to the channel unless upstream is already cancelled. If the channel
     * reports an overflow, upstream is cancelled and the channel is completed with an
     * {@link IllegalStateException}.
     *
     * @throws NullPointerException if {@code item} is null
     */
    @Override
    public void onNext(List<ByteBuffer> item) {
        Objects.requireNonNull(item);
        if (this.upstream.isCancelled()) {
            return;
        }
        if (this.channel.receive(item) == ReceiveResult.OVERFLOWED) {
            this.upstream.cancel();
            this.channel.receiveCompletion(
                    new IllegalStateException("Getting more items than requested"));
        }
    }

    /**
     * Clears upstream and completes the channel exceptionally.
     *
     * @throws NullPointerException if {@code throwable} is null
     */
    @Override
    public void onError(Throwable throwable) {
        Objects.requireNonNull(throwable);
        this.upstream.clear();
        this.channel.receiveCompletion(throwable);
    }

    /** Clears upstream and completes the channel normally. */
    @Override
    public void onComplete() {
        this.upstream.clear();
        this.channel.receiveCompletion(null);
    }

    /**
     * The channel handed to the consumer. Readers are serialized by {@code readLock}; the producer
     * side ({@code receive}/{@code receiveCompletion}) only touches the blocking queue and the
     * pending-exception field.
     */
    private final class Channel extends AbstractInterruptibleChannel
            implements ReadableByteChannel {
        private final Lock readLock = new ReentrantLock();

        /** Buffers already taken off {@code readQueue} but not yet fully copied to a reader. */
        private final Deque<ByteBuffer> polledBuffers = new ArrayDeque<>();

        private final BlockingQueue<List<ByteBuffer>> readQueue;

        /**
         * Upstream error awaiting delivery to a reader, or {@code ConsumedPendingException.INSTANCE}
         * once the channel has been closed. Accessed atomically through {@code PENDING_EXCEPTION},
         * which looks the field up by name.
         */
        private volatile @MonotonicNonNull Throwable pendingException;

        Channel(int capacity) {
            // One slot beyond the requested capacity; receive() never fills the last slot.
            this.readQueue = new ArrayBlockingQueue<>(capacity + 1);
        }

        /**
         * Enqueues a buffer list for readers.
         *
         * @return {@code CLOSED} if the channel is not open, {@code OVERFLOWED} if at most one free
         *     slot remains in the queue, otherwise {@code RECEIVED}
         */
        ReceiveResult receive(List<ByteBuffer> buffers) {
            if (!this.isOpen()) {
                return ReceiveResult.CLOSED;
            }
            if (this.readQueue.remainingCapacity() <= 1) {
                return ReceiveResult.OVERFLOWED;
            }
            this.readQueue.add(buffers);
            return ReceiveResult.RECEIVED;
        }

        /**
         * Signals the end of the stream by enqueueing the tombstone.
         *
         * @param exception the upstream failure, or {@code null} on normal completion. The first
         *     exception recorded wins and discards queued buffers; if one is already recorded, the
         *     new one is passed to {@code FlowSupport.onDroppedException}.
         */
        void receiveCompletion(@Nullable Throwable exception) {
            if (exception != null) {
                if (PENDING_EXCEPTION.compareAndSet(this, null, exception)) {
                    this.readQueue.clear();
                } else {
                    FlowSupport.onDroppedException(exception);
                }
            }
            this.readQueue.add(TOMBSTONE_LIST);
        }

        /**
         * Reads available bytes into {@code dst}, blocking until at least one buffer (or the end
         * of the stream) is available.
         *
         * @return the number of bytes read, or {@code -1} at the end of the stream
         * @throws ClosedChannelException if the channel is closed on entry
         * @throws IOException if an upstream error is pending, or as thrown by the interruptible
         *     channel machinery
         */
        @Override
        public int read(ByteBuffer dst) throws IOException {
            this.requireOpen();
            this.throwIfPending();
            this.readLock.lock();
            try {
                return this.readBytes(dst);
            } finally {
                this.readLock.unlock();
            }
        }

        private void requireOpen() throws IOException {
            if (!this.isOpen()) {
                throw new ClosedChannelException();
            }
        }

        /** Throws the recorded upstream error wrapped in an {@code IOException}, if there is one. */
        private void throwIfPending() throws IOException {
            Throwable exception = this.pendingException;
            // The "consumed" marker is set on close and must not be reported as an upstream error.
            if (exception != null && exception != ConsumedPendingException.INSTANCE) {
                throw new IOException("Upstream error", exception);
            }
        }

        /**
         * Copies queued buffers into {@code dst}. Blocks only while nothing has been read yet;
         * once some bytes were copied, it only drains what is immediately available.
         *
         * @return the number of bytes copied, or {@code -1} if the tombstone was reached before
         *     any byte was copied
         */
        @GuardedBy("readLock")
        private int readBytes(ByteBuffer dst) throws IOException {
            int read = 0;
            try {
                this.begin();
                while (dst.hasRemaining() && this.isOpen()) {
                    ByteBuffer next;
                    if (read <= 0) {
                        next = this.takeNext();
                    } else {
                        next = this.pollNext();
                        if (next == null) {
                            break;
                        }
                    }
                    this.throwIfPending();
                    if (next == TOMBSTONE) {
                        if (read == 0) {
                            read = -1;
                        }
                        break;
                    }
                    read += Utils.copyRemaining(next, dst);
                }
            } finally {
                // end() must run exactly once per begin(). Calling it from a catch-all handler
                // that also covers a first end() call would invoke it twice and could replace
                // the exception thrown by the first call.
                this.end(read > 0);
            }
            return read;
        }

        /**
         * Returns the next readable buffer without blocking, or {@code null} if neither the local
         * deque nor the queue has one.
         */
        @GuardedBy("readLock")
        private @Nullable ByteBuffer pollNext() {
            ByteBuffer next;
            while ((next = this.nextPolled()) == null) {
                List<ByteBuffer> buffers = this.readQueue.poll();
                if (buffers == null) {
                    return null;
                }
                this.polledBuffers.addAll(buffers);
                this.updatePrefetcher();
            }
            return next;
        }

        /**
         * Returns the next readable buffer, blocking on the queue if necessary.
         *
         * @throws ClosedByInterruptException if the thread is interrupted while waiting; the
         *     thread's interrupt status is restored first
         */
        @GuardedBy("readLock")
        private ByteBuffer takeNext() throws ClosedByInterruptException {
            ByteBuffer next;
            while ((next = this.nextPolled()) == null) {
                try {
                    List<ByteBuffer> buffers = this.readQueue.take();
                    this.polledBuffers.addAll(buffers);
                    this.updatePrefetcher();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new ClosedByInterruptException();
                }
            }
            return next;
        }

        /**
         * Drops exhausted buffers from the head of the local deque and returns the first one that
         * still has bytes, or the tombstone. The returned buffer is left in the deque, so the
         * tombstone keeps being returned on later calls.
         */
        @GuardedBy("readLock")
        private @Nullable ByteBuffer nextPolled() {
            ByteBuffer next;
            while ((next = this.polledBuffers.peek()) != null) {
                if (next.hasRemaining() || next == TOMBSTONE) {
                    return next;
                }
                this.polledBuffers.poll();
            }
            return null;
        }

        /** Notifies the prefetcher that a queue element was consumed. */
        @GuardedBy("readLock")
        void updatePrefetcher() {
            ByteChannelBodySubscriber.this.prefetcher.update(
                    ByteChannelBodySubscriber.this.upstream);
        }

        /**
         * Cancels upstream, discards queued buffers, marks any pending exception as consumed
         * (reporting a real one as dropped), and enqueues the tombstone.
         */
        @Override
        protected void implCloseChannel() {
            ByteChannelBodySubscriber.this.upstream.cancel();
            this.readQueue.clear();
            // getAndSet is signature-polymorphic and declared to return Object; the cast is
            // required for this assignment to compile.
            Throwable exception =
                    (Throwable)
                            PENDING_EXCEPTION.getAndSet(this, ConsumedPendingException.INSTANCE);
            if (exception != null && exception != ConsumedPendingException.INSTANCE) {
                FlowSupport.onDroppedException(exception);
            }
            this.readQueue.add(TOMBSTONE_LIST);
        }
    }

    /**
     * Marker stored in {@code Channel.pendingException} once the channel is closed. Created with
     * suppression and writable stack trace disabled since it is never thrown.
     */
    private static final class ConsumedPendingException extends Exception {
        static final ConsumedPendingException INSTANCE = new ConsumedPendingException();

        private ConsumedPendingException() {
            super("", null, false, false);
        }
    }

    /** Outcome of {@code Channel.receive}. */
    enum ReceiveResult {
        CLOSED,
        OVERFLOWED,
        RECEIVED
    }
}

