package com.bigdata.ha.pipeline;

import com.bigdata.ha.msg.HAMessageWrapper;
import com.bigdata.ha.msg.HASendState;
import com.bigdata.ha.msg.IHAWriteMessageBase;
import com.bigdata.util.BytesUtil;
import com.bigdata.util.ChecksumError;
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.Adler32;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/bigdata/ha/pipeline/HAReceiveService.class */
public class HAReceiveService<M extends HAMessageWrapper> extends Thread {
    private static final Logger log;
    private static final long selectorTimeout = 500;
    private static final long logTimeout = 10000;
    private final InetSocketAddress addrSelf;
    private final IHAReceiveCallback<M> callback;
    private final HASendService sendService;
    private final ExecutorService executor;
    private final Lock lock;
    private final Condition futureReady;
    private final Condition messageReady;
    private RunState runState;
    private M message;
    private ByteBuffer localBuffer;
    private FutureTask<Void> readFuture;
    private FutureTask<Void> waitFuture;
    private final AtomicReference<InetSocketAddress> addrNextRef;
    private final byte[] heapBuffer;
    private final AtomicReference<Client> clientRef;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bigdata/ha/pipeline/HAReceiveService$Client.class */
    public static class Client {
        private final SocketChannel client;
        private final Selector clientSelector;
        private final SelectionKey clientKey;
        private final AtomicReference<Throwable> firstCause = new AtomicReference<>();

        public Client(ServerSocketChannel serverSocketChannel) throws IOException {
            try {
                this.client = serverSocketChannel.accept();
                this.client.configureBlocking(false);
                if (!this.client.finishConnect()) {
                    throw new IOException("Upstream client not connected");
                }
                this.clientSelector = Selector.open();
                this.clientKey = this.client.register(this.clientSelector, 1);
                if (HAReceiveService.log.isInfoEnabled()) {
                    HAReceiveService.log.info("Accepted new connection");
                }
            } catch (IOException e) {
                close();
                throw e;
            }
        }

        public String toString() {
            Socket socket = this.client.socket();
            return super.toString() + "{client.isOpen()=" + this.client.isOpen() + ",client.isConnected()=" + this.client.isConnected() + ",socket.isInputShutdown()=" + (socket == null ? "N/A" : Boolean.valueOf(socket.isInputShutdown())) + ",clientSelector.isOpen=" + this.clientSelector.isOpen() + "}";
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void close() throws IOException {
            if (HAReceiveService.log.isInfoEnabled()) {
                HAReceiveService.log.info("Closing client connection: " + this);
            }
            this.clientKey.cancel();
            try {
                this.client.close();
                this.clientSelector.close();
            } catch (Throwable th) {
                this.clientSelector.close();
                throw th;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int read(ByteBuffer byteBuffer) throws IOException {
            int read = this.client.read(byteBuffer);
            if (read == -1) {
                close();
            }
            return read;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void checkFirstCause() throws RuntimeException {
            Throwable andSet = this.firstCause.getAndSet(null);
            if (andSet != null) {
                try {
                    close();
                } catch (IOException e) {
                    HAReceiveService.log.warn(e, e);
                }
                throw new RuntimeException(andSet);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bigdata/ha/pipeline/HAReceiveService$DrainToMarkerUtil.class */
    public static class DrainToMarkerUtil {
        private final byte[] marker;
        private final byte[] markerBuffer;
        private final ByteBuffer markerBB;
        private final Client client;
        private boolean foundMarkerInInitialPosition = true;
        private int markerIndex = 0;
        private int nreads = 0;
        private int nmarkerbytematches = 0;
        private long bytesRead = 0;

        DrainToMarkerUtil(byte[] bArr, Client client) {
            this.marker = bArr;
            this.markerBuffer = bArr == null ? null : new byte[bArr.length];
            this.markerBB = bArr == null ? null : ByteBuffer.wrap(this.markerBuffer);
            this.client = client;
            if (HAReceiveService.log.isDebugEnabled()) {
                HAReceiveService.log.debug("Receive token: " + BytesUtil.toHexString(bArr));
            }
        }

        boolean findMarker() throws IOException {
            if (this.markerIndex == this.marker.length) {
                return true;
            }
            if (HAReceiveService.log.isDebugEnabled()) {
                HAReceiveService.log.debug("Looking for token, " + BytesUtil.toHexString(this.marker) + ", reads: " + this.nreads);
            }
            while (this.markerIndex < this.marker.length) {
                this.markerBB.limit(this.marker.length - this.markerIndex);
                this.markerBB.position(0);
                int read = this.client.read(this.markerBB);
                if (read == -1) {
                    throw new IOException("EOF: nreads=" + this.nreads + ", bytesRead=" + this.bytesRead);
                }
                this.nreads++;
                this.bytesRead += read;
                for (int i = 0; i < read; i++) {
                    if (this.markerBuffer[i] != this.marker[this.markerIndex]) {
                        if (this.foundMarkerInInitialPosition) {
                            this.foundMarkerInInitialPosition = false;
                            HAReceiveService.log.error("Marker not found: skipping");
                        }
                        this.markerIndex = 0;
                        if (this.markerBuffer[i] == this.marker[this.markerIndex]) {
                            this.markerIndex++;
                        }
                    } else {
                        this.markerIndex++;
                        this.nmarkerbytematches++;
                    }
                }
                if (this.nreads % 10000 == 0 && HAReceiveService.log.isDebugEnabled()) {
                    HAReceiveService.log.debug("...still looking: reads=" + this.nreads + ", bytesRead=" + this.bytesRead);
                }
            }
            if (this.markerIndex != this.marker.length) {
                if (!HAReceiveService.log.isDebugEnabled()) {
                    return false;
                }
                HAReceiveService.log.debug("Not found token yet!");
                return false;
            }
            if (!HAReceiveService.log.isDebugEnabled()) {
                return true;
            }
            HAReceiveService.log.debug("Found token after " + this.nreads + " token reads and " + this.nmarkerbytematches + " byte matches");
            return true;
        }
    }

    /* loaded from: input_file:com/bigdata/ha/pipeline/HAReceiveService$IHAReceiveCallback.class */
    public interface IHAReceiveCallback<M extends IHAWriteMessageBase> {
        void incReceive(M m, int i, int i2, int i3) throws Exception;

        void callback(M m, ByteBuffer byteBuffer) throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bigdata/ha/pipeline/HAReceiveService$ReadTask.class */
    public static class ReadTask<M extends HAMessageWrapper> implements Callable<Void> {
        private final ServerSocketChannel server;
        private final AtomicReference<Client> clientRef;
        private final M message;
        private final ByteBuffer localBuffer;
        private final HASendService sendService;
        private final AtomicReference<InetSocketAddress> addrNextRef;
        private final IHAReceiveCallback<M> callback;
        private final Adler32 chk = new Adler32();
        private final byte[] heapBuffer;

        public ReadTask(ServerSocketChannel serverSocketChannel, AtomicReference<Client> atomicReference, M m, ByteBuffer byteBuffer, byte[] bArr, HASendService hASendService, AtomicReference<InetSocketAddress> atomicReference2, IHAReceiveCallback<M> iHAReceiveCallback) {
            if (serverSocketChannel == null) {
                throw new IllegalArgumentException();
            }
            if (atomicReference == null) {
                throw new IllegalArgumentException();
            }
            if (m == null) {
                throw new IllegalArgumentException();
            }
            if (bArr == null) {
                throw new IllegalArgumentException();
            }
            if (byteBuffer == null) {
                throw new IllegalArgumentException();
            }
            if (hASendService == null) {
                throw new IllegalArgumentException();
            }
            this.server = serverSocketChannel;
            this.clientRef = atomicReference;
            this.message = m;
            this.localBuffer = byteBuffer;
            this.heapBuffer = bArr;
            this.sendService = hASendService;
            this.addrNextRef = atomicReference2;
            this.callback = iHAReceiveCallback;
        }

        /* JADX WARN: Finally extract failed */
        protected void awaitAccept() throws IOException {
            Selector open = Selector.open();
            try {
                SelectionKey register = this.server.register(open, 16);
                try {
                    open.select();
                    Iterator<SelectionKey> it2 = open.selectedKeys().iterator();
                    if (it2.hasNext()) {
                        SelectionKey next = it2.next();
                        it2.remove();
                        if (next != register) {
                            throw new AssertionError();
                        }
                    }
                    register.cancel();
                } catch (Throwable th) {
                    register.cancel();
                    throw th;
                }
            } finally {
                open.close();
            }
        }

        private void updateChk(int i) {
            ByteBuffer asReadOnlyBuffer = this.localBuffer.asReadOnlyBuffer();
            int position = asReadOnlyBuffer.position();
            asReadOnlyBuffer.position(position - i);
            int i2 = position - i;
            while (true) {
                int i3 = i2;
                if (i3 >= position) {
                    return;
                }
                int min = Math.min(position - i3, this.heapBuffer.length);
                asReadOnlyBuffer.get(this.heapBuffer, 0, min);
                this.chk.update(this.heapBuffer, 0, min);
                i2 = i3 + this.heapBuffer.length;
            }
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            try {
                return doInnerCall();
            } catch (Throwable th) {
                HAReceiveService.log.error("client=" + this.clientRef.get() + ", msg=" + this.message + ", marker=" + HASendState.decode(this.message.getHASendState().getMarker()) + ", cause=" + th, th);
                if (th instanceof Exception) {
                    throw ((Exception) th);
                }
                if (th instanceof RuntimeException) {
                    throw ((RuntimeException) th);
                }
                throw new RuntimeException(th);
            }
        }

        private Void doInnerCall() throws Exception {
            Client client = this.clientRef.get();
            if (client == null || !client.client.isOpen() || !client.clientSelector.isOpen()) {
                HAReceiveService.log.warn("Re-opening upstream client connection");
                Client andSet = this.clientRef.getAndSet(null);
                if (andSet != null) {
                    andSet.close();
                }
                awaitAccept();
                client = new Client(this.server);
                this.clientRef.set(client);
            }
            doReceiveAndReplicate(client);
            return null;
        }

        private void doReceiveAndReplicate(Client client) throws Exception {
            long currentTimeMillis = System.currentTimeMillis();
            int size = this.message.getSize();
            boolean z = false;
            int i = 0;
            DrainToMarkerUtil drainToMarkerUtil = this.message.getHASendState() != null ? new DrainToMarkerUtil(this.message.getHASendState().getMarker(), client) : null;
            while (size > 0 && !z) {
                int select = client.clientSelector.select(500L);
                client.checkFirstCause();
                if (select == 0) {
                    long currentTimeMillis2 = System.currentTimeMillis();
                    if (currentTimeMillis2 - currentTimeMillis > 10000) {
                        HAReceiveService.log.warn("Blocked: awaiting " + size + " out of " + this.message.getSize() + " bytes.");
                        currentTimeMillis = currentTimeMillis2;
                    }
                    if (!client.client.isOpen() || !client.clientSelector.isOpen()) {
                        throw new AsynchronousCloseException();
                    }
                } else {
                    Iterator<SelectionKey> it2 = client.clientSelector.selectedKeys().iterator();
                    while (true) {
                        if (it2.hasNext()) {
                            client.checkFirstCause();
                            it2.next();
                            it2.remove();
                            if (drainToMarkerUtil.findMarker()) {
                                int read = client.read(this.localBuffer);
                                if (HAReceiveService.log.isTraceEnabled()) {
                                    HAReceiveService.log.trace("Read " + read + " bytes with " + (read > 0 ? size - read : size) + " bytes remaining.");
                                }
                                if (read > 0) {
                                    i++;
                                    updateChk(read);
                                }
                                if (read == -1) {
                                    z = true;
                                    break;
                                }
                                size -= read;
                                if (this.callback != null) {
                                    this.callback.incReceive(this.message, i, read, size);
                                }
                                forwardReceivedBytes(client, read);
                            }
                        }
                    }
                }
            }
            if (this.localBuffer.position() != this.message.getSize()) {
                throw new IOException("Receive length error: rem=" + size + ", EOS=" + z + ", localBuffer.pos=" + this.localBuffer.position() + ", message.size=" + this.message.getSize());
            }
            this.localBuffer.flip();
            if (HAReceiveService.log.isTraceEnabled()) {
                HAReceiveService.log.trace("Prior check checksum: " + this.chk.getValue() + " for position: " + this.localBuffer.position() + ", limit: " + this.localBuffer.limit() + ", number of reads: " + i + ", buffer: " + this.localBuffer);
            }
            if (this.message.getChk() != ((int) this.chk.getValue())) {
                throw new ChecksumError("msg=" + this.message.toString() + ", actual=" + ((int) this.chk.getValue()));
            }
            client.checkFirstCause();
            if (this.callback != null) {
                this.callback.callback(this.message, this.localBuffer);
            }
        }

        private void forwardReceivedBytes(Client client, int i) throws InterruptedException, ExecutionException, ImmediateDownstreamReplicationException {
            while (i != 0 && this.addrNextRef.get() != null) {
                if (HAReceiveService.log.isTraceEnabled()) {
                    HAReceiveService.log.trace("Incremental send of " + i + " bytes");
                }
                ByteBuffer asReadOnlyBuffer = this.localBuffer.asReadOnlyBuffer();
                asReadOnlyBuffer.position(this.localBuffer.position() - i);
                asReadOnlyBuffer.limit(this.localBuffer.position());
                synchronized (this.sendService) {
                    if (!this.sendService.isRunning()) {
                        client.checkFirstCause();
                        this.sendService.start(this.addrNextRef.get());
                    }
                }
                client.checkFirstCause();
                this.sendService.send(asReadOnlyBuffer, (asReadOnlyBuffer.position() != 0 || this.message.getHASendState() == null) ? null : this.message.getHASendState().getMarker()).get();
                return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bigdata/ha/pipeline/HAReceiveService$RunState.class */
    public enum RunState {
        Start(0),
        Running(1),
        ShuttingDown(2),
        Shutdown(3);

        private final int level;

        RunState(int i) {
            this.level = i;
        }
    }

    public HASendService getSendService() {
        return this.sendService;
    }

    @Override // java.lang.Thread
    public String toString() {
        return super.toString() + "{addrSelf=" + this.addrSelf + ", addrNext=" + this.addrNextRef.get() + "}";
    }

    public InetSocketAddress getAddrSelf() {
        return this.addrSelf;
    }

    public InetSocketAddress getAddrNext() {
        return this.addrNextRef.get();
    }

    public HAReceiveService(InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2) {
        this(inetSocketAddress, inetSocketAddress2, null);
    }

    public HAReceiveService(InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2, IHAReceiveCallback<M> iHAReceiveCallback) {
        this.executor = Executors.newSingleThreadExecutor();
        this.lock = new ReentrantLock();
        this.futureReady = this.lock.newCondition();
        this.messageReady = this.lock.newCondition();
        this.runState = RunState.Start;
        this.heapBuffer = new byte[512];
        this.clientRef = new AtomicReference<>(null);
        if (inetSocketAddress == null) {
            throw new IllegalArgumentException();
        }
        this.addrSelf = inetSocketAddress;
        this.addrNextRef = new AtomicReference<>(inetSocketAddress2);
        this.callback = iHAReceiveCallback;
        this.sendService = new HASendService();
        setDaemon(true);
        setName(HAReceiveService.class.getName() + "@" + hashCode() + "{addrSelf=" + inetSocketAddress + "}");
        if (log.isInfoEnabled()) {
            log.info("Created: " + this);
        }
    }

    protected void finalize() throws Throwable {
        terminate();
        super.finalize();
    }

    public void terminate() {
        this.lock.lock();
        try {
            switch (this.runState) {
                case ShuttingDown:
                case Shutdown:
                    return;
                default:
                    this.runState = RunState.ShuttingDown;
                    interrupt();
                    this.lock.unlock();
                    if (this.sendService != null) {
                        this.sendService.terminate();
                    }
                    this.executor.shutdownNow();
                    return;
            }
        } finally {
            this.lock.unlock();
        }
        this.lock.unlock();
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:4:0x0014. Please report as an issue. */
    public void awaitShutdown() throws InterruptedException {
        this.lock.lockInterruptibly();
        while (true) {
            try {
                switch (this.runState) {
                    case ShuttingDown:
                    case Start:
                    case Running:
                        this.futureReady.await();
                    case Shutdown:
                        return;
                    default:
                        throw new AssertionError();
                }
            } finally {
                this.lock.unlock();
            }
        }
    }

    @Override // java.lang.Thread
    public void start() {
        super.start();
        this.lock.lock();
        while (this.runState == RunState.Start) {
            try {
                try {
                    this.futureReady.await();
                } catch (InterruptedException e) {
                }
            } finally {
                this.lock.unlock();
            }
        }
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        this.lock.lock();
        try {
            this.runState = RunState.Running;
            this.futureReady.signalAll();
            this.messageReady.signalAll();
            this.lock.unlock();
            ServerSocketChannel serverSocketChannel = null;
            try {
                try {
                    serverSocketChannel = ServerSocketChannel.open();
                    boolean z = false;
                    for (int i = 0; i < 3; i++) {
                        try {
                            serverSocketChannel.socket().bind(this.addrSelf);
                            z = true;
                            break;
                        } catch (BindException e) {
                            log.warn("Sleeping to retry: " + e);
                            Thread.sleep(100L);
                        }
                    }
                    if (!z) {
                        serverSocketChannel.socket().bind(this.addrSelf);
                    }
                    serverSocketChannel.configureBlocking(false);
                    if (log.isInfoEnabled()) {
                        log.info("Listening on: " + this.addrSelf);
                    }
                    runNoBlock(serverSocketChannel);
                    if (serverSocketChannel != null) {
                        try {
                            serverSocketChannel.close();
                        } catch (IOException e2) {
                            log.error(e2, e2);
                        }
                    }
                    this.lock.lock();
                    try {
                        this.runState = RunState.Shutdown;
                        this.messageReady.signalAll();
                        this.futureReady.signalAll();
                        this.lock.unlock();
                    } finally {
                    }
                } catch (Throwable th) {
                    if (serverSocketChannel != null) {
                        try {
                            serverSocketChannel.close();
                        } catch (IOException e3) {
                            log.error(e3, e3);
                        }
                    }
                    this.lock.lock();
                    try {
                        this.runState = RunState.Shutdown;
                        this.messageReady.signalAll();
                        this.futureReady.signalAll();
                        this.lock.unlock();
                        throw th;
                    } finally {
                    }
                }
            } catch (InterruptedException e4) {
                log.info("Shutdown");
                if (serverSocketChannel != null) {
                    try {
                        serverSocketChannel.close();
                    } catch (IOException e5) {
                        log.error(e5, e5);
                    }
                }
                this.lock.lock();
                try {
                    this.runState = RunState.Shutdown;
                    this.messageReady.signalAll();
                    this.futureReady.signalAll();
                    this.lock.unlock();
                } finally {
                    this.lock.unlock();
                }
            } catch (Throwable th2) {
                log.error(th2, th2);
                throw new RuntimeException(th2);
            }
        } finally {
            this.lock.unlock();
        }
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:7:0x001b. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:54:0x0142  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void runNoBlock(java.nio.channels.ServerSocketChannel r16) throws java.io.IOException, java.lang.InterruptedException, java.util.concurrent.ExecutionException {
        /*
            Method dump skipped, instructions count: 330
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.bigdata.ha.pipeline.HAReceiveService.runNoBlock(java.nio.channels.ServerSocketChannel):void");
    }

    public Future<Void> receiveData(M m, ByteBuffer byteBuffer) throws InterruptedException {
        if (m == null) {
            throw new IllegalArgumentException();
        }
        if (byteBuffer == null) {
            throw new IllegalArgumentException();
        }
        this.lock.lockInterruptibly();
        try {
            if (!$assertionsDisabled && this.message != null) {
                throw new AssertionError();
            }
            this.message = m;
            this.localBuffer = byteBuffer;
            this.localBuffer.limit(this.message.getSize());
            this.localBuffer.position(0);
            this.messageReady.signalAll();
            if (log.isTraceEnabled()) {
                log.trace("Will accept data for message: msg=" + m);
            }
            while (this.waitFuture == null) {
                switch (this.runState) {
                    case ShuttingDown:
                    case Shutdown:
                        throw new RuntimeException("Service closed.");
                    case Start:
                    case Running:
                        this.futureReady.await();
                    default:
                        throw new AssertionError();
                }
            }
            if (!$assertionsDisabled && this.waitFuture == null) {
                throw new AssertionError();
            }
            FutureTask<Void> futureTask = this.waitFuture;
            this.waitFuture = null;
            this.lock.unlock();
            return futureTask;
        } catch (Throwable th) {
            this.waitFuture = null;
            this.lock.unlock();
            throw th;
        }
    }

    public void changeDownStream(InetSocketAddress inetSocketAddress) {
        this.lock.lock();
        try {
            if (log.isInfoEnabled()) {
                log.info("addrNext(old)=" + this.addrNextRef.get() + ", addrNext(new)=" + inetSocketAddress + ", readFuture=" + this.readFuture);
            }
            Client client = this.clientRef.get();
            if (client != null && this.readFuture != null) {
                client.firstCause.set(new PipelineDownstreamChange());
            }
            synchronized (this.sendService) {
                if (this.sendService.isRunning()) {
                    this.sendService.terminate();
                }
                this.addrNextRef.set(inetSocketAddress);
            }
        } finally {
            this.lock.unlock();
        }
    }

    public void changeUpStream() {
        this.lock.lock();
        try {
            if (log.isInfoEnabled()) {
                log.info("");
            }
            Client andSet = this.clientRef.getAndSet(null);
            if (andSet != null) {
                log.warn("Cleared Client reference.");
            }
            if (andSet != null && this.readFuture != null) {
                andSet.firstCause.set(new PipelineUpstreamChange());
            }
            if (andSet != null) {
                if (log.isInfoEnabled()) {
                    log.info("Closing client connection");
                }
                try {
                    andSet.client.close();
                } catch (IOException e) {
                    log.warn(e, e);
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

    static {
        $assertionsDisabled = !HAReceiveService.class.desiredAssertionStatus();
        log = Logger.getLogger((Class<?>) HAReceiveService.class);
    }
}
