package com.bigdata.service.ndx.pipeline;

import com.bigdata.relation.accesspath.BlockingBuffer;
import com.bigdata.relation.accesspath.BufferClosedException;
import com.bigdata.relation.accesspath.IAsynchronousIterator;
import com.bigdata.resources.StaleLocatorException;
import com.bigdata.service.ndx.pipeline.AbstractMasterStats;
import com.bigdata.service.ndx.pipeline.AbstractSubtask;
import com.bigdata.util.concurrent.AbstractHaltableProcess;
import java.lang.reflect.Array;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.jena.atlas.json.io.JSWriter;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/bigdata/service/ndx/pipeline/AbstractMasterTask.class */
public abstract class AbstractMasterTask<H extends AbstractMasterStats<L, ? extends AbstractSubtaskStats>, E, S extends AbstractSubtask, L> extends AbstractHaltableProcess implements Callable<H>, IMasterTask<E, H> {
    protected static final transient Logger log;
    protected final BlockingBuffer<E[]> buffer;
    protected final IAsynchronousIterator<E[]> src;
    private final ConcurrentHashMap<L, S> sinks;
    public final H stats;
    protected final long sinkIdleTimeoutNanos;
    protected final long sinkPollTimeoutNanos;
    private static final long offerWarningTimeoutNanos;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final BlockingQueue<E[]> redirectQueue = new LinkedBlockingQueue();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition subtaskDone = this.lock.newCondition();
    private final BlockingQueue<S> finishedSubtaskQueue = new LinkedBlockingQueue();

    public final int getRedirectQueueSize() {
        return this.redirectQueue.size();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void redirectChunk(E[] eArr) throws InterruptedException {
        this.lock.lockInterruptibly();
        try {
            this.redirectQueue.put(eArr);
            this.lock.unlock();
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    @Override // com.bigdata.service.ndx.pipeline.IMasterTask
    public BlockingBuffer<E[]> getBuffer() {
        return this.buffer;
    }

    public void mapOperationOverSubtasks(SubtaskOp<S> subtaskOp) throws InterruptedException, ExecutionException {
        Iterator<S> it2 = this.sinks.values().iterator();
        while (it2.hasNext()) {
            try {
                subtaskOp.call(it2.next());
            } catch (Exception e) {
                throw new ExecutionException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void notifySubtaskDone(AbstractSubtask abstractSubtask) throws InterruptedException {
        if (abstractSubtask == null) {
            throw new IllegalArgumentException();
        }
        if (abstractSubtask.buffer.isOpen()) {
            throw new IllegalStateException();
        }
        this.lock.lockInterruptibly();
        try {
            moveSinkToFinishedQueueAtomically(abstractSubtask.locator, abstractSubtask);
            this.subtaskDone.signalAll();
            this.lock.unlock();
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    @Override // com.bigdata.service.ndx.pipeline.IMasterTask
    public H getStats() {
        return this.stats;
    }

    public AbstractMasterTask(H h, BlockingBuffer<E[]> blockingBuffer, long j, long j2) {
        if (h == null) {
            throw new IllegalArgumentException();
        }
        if (blockingBuffer == null) {
            throw new IllegalArgumentException();
        }
        if (j <= 0) {
            throw new IllegalArgumentException();
        }
        if (j2 <= 0) {
            throw new IllegalArgumentException();
        }
        this.stats = h;
        this.buffer = blockingBuffer;
        this.sinkIdleTimeoutNanos = j;
        this.sinkPollTimeoutNanos = j2;
        this.src = blockingBuffer.iterator();
        this.sinks = new ConcurrentHashMap<>();
        h.addMaster(this);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v41, types: [java.lang.Object[]] */
    @Override // java.util.concurrent.Callable
    public H call() throws Exception {
        boolean z = this.sinkIdleTimeoutNanos != 0;
        while (true) {
            try {
                halted();
                drainFutures();
                E[] poll = this.redirectQueue.poll();
                if (poll == null) {
                    if (this.src.hasNext(this.buffer.getChunkTimeout(), TimeUnit.NANOSECONDS)) {
                        poll = (Object[]) this.src.next();
                    } else if (!this.buffer.isOpen() && this.buffer.isEmpty()) {
                        awaitAll();
                        return this.stats;
                    }
                } else if (log.isInfoEnabled()) {
                    log.info("Read chunk from redirectQueue");
                }
                if (poll.length != 0) {
                    synchronized (this.stats) {
                        this.stats.chunksIn.incrementAndGet();
                        this.stats.elementsIn.addAndGet(poll.length);
                    }
                    handleChunk(poll, z);
                }
            } catch (Throwable th) {
                log.error("Cancelling: job=" + this + ", cause=" + th, th);
                try {
                    cancelAll(true);
                } catch (Throwable th2) {
                    log.error(th2);
                }
                throw new RuntimeException(th);
            }
        }
    }

    protected abstract void handleChunk(E[] eArr, boolean z) throws InterruptedException;

    protected boolean nothingPending() {
        return true;
    }

    protected void willShutdown() throws InterruptedException {
    }

    private void awaitAll() throws InterruptedException, ExecutionException {
        if (this.buffer.isOpen()) {
            throw new IllegalStateException();
        }
        willShutdown();
        while (true) {
            halted();
            this.lock.lockInterruptibly();
            try {
                E[] poll = this.redirectQueue.poll();
                if (poll == null) {
                    if (this.finishedSubtaskQueue.isEmpty() && this.sinks.isEmpty() && this.redirectQueue.isEmpty() && nothingPending()) {
                        break;
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("Waiting for " + this.sinks.size() + " subtasks : " + this);
                    }
                    drainFutures();
                    if (!this.finishedSubtaskQueue.isEmpty()) {
                        this.subtaskDone.await(50L, TimeUnit.MILLISECONDS);
                    }
                }
                this.lock.unlock();
                if (poll != null) {
                    handleChunk(poll, true);
                }
            } finally {
                this.lock.unlock();
            }
        }
        if (log.isInfoEnabled()) {
            log.info("All subtasks are done: " + this);
        }
    }

    private void cancelAll(boolean z) throws InterruptedException {
        log.warn("Cancelling job: " + this);
        this.buffer.close();
        this.buffer.clear();
        Iterator<S> it2 = this.sinks.values().iterator();
        while (it2.hasNext()) {
            Future future = it2.next().buffer.getFuture();
            if (!future.isDone()) {
                future.cancel(z);
            }
        }
        for (S s : this.sinks.values()) {
            try {
                s.buffer.getFuture().get();
            } catch (InterruptedException e) {
                throw e;
            } catch (ExecutionException e2) {
                log.warn("sink=" + s + JSWriter.ObjectPairSep + e2);
            }
        }
        this.sinks.clear();
        this.finishedSubtaskQueue.clear();
        this.redirectQueue.clear();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public S getSink(L l, boolean z) throws InterruptedException {
        if (l == null) {
            throw new IllegalArgumentException();
        }
        halted();
        S s = this.sinks.get(l);
        if (s != null && s.buffer.isOpen()) {
            return s;
        }
        this.lock.lockInterruptibly();
        if (z && s != null) {
            try {
                if (!s.buffer.isOpen()) {
                    if (log.isInfoEnabled()) {
                        log.info("Reopening sink (was closed): " + this + ", locator=" + l);
                    }
                    moveSinkToFinishedQueueAtomically(l, s);
                    s = null;
                }
            } finally {
                this.lock.unlock();
            }
        }
        if (s == null) {
            if (log.isInfoEnabled()) {
                log.info("Creating output buffer: " + this + ", locator=" + l);
            }
            BlockingBuffer<E[]> newSubtaskBuffer = newSubtaskBuffer();
            s = newSubtask(l, newSubtaskBuffer);
            S put = this.sinks.put(l, s);
            if (!$assertionsDisabled && put != null) {
                throw new AssertionError("locator=" + l);
            }
            FutureTask<? extends AbstractSubtaskStats> futureTask = new FutureTask<>(s);
            newSubtaskBuffer.setFuture(futureTask);
            submitSubtask(futureTask);
            this.stats.subtaskStartCount.incrementAndGet();
        }
        return s;
    }

    protected abstract BlockingBuffer<E[]> newSubtaskBuffer();

    protected abstract S newSubtask(L l, BlockingBuffer<E[]> blockingBuffer);

    protected abstract void submitSubtask(FutureTask<? extends AbstractSubtaskStats> futureTask);

    /* JADX WARN: Code restructure failed: missing block: B:20:0x005f, code lost:
    
        throw new java.lang.AssertionError();
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void drainFutures() throws java.lang.InterruptedException, java.util.concurrent.ExecutionException {
        /*
            r4 = this;
        L0:
            r0 = r4
            r0.halted()
            r0 = r4
            java.util.concurrent.BlockingQueue<S extends com.bigdata.service.ndx.pipeline.AbstractSubtask> r0 = r0.finishedSubtaskQueue
            java.lang.Object r0 = r0.peek()
            com.bigdata.service.ndx.pipeline.AbstractSubtask r0 = (com.bigdata.service.ndx.pipeline.AbstractSubtask) r0
            r5 = r0
            r0 = r5
            if (r0 != 0) goto L16
            return
        L16:
            r0 = r5
            com.bigdata.relation.accesspath.BlockingBuffer<E[]> r0 = r0.buffer
            boolean r0 = r0.isOpen()
            if (r0 == 0) goto L2c
            java.lang.IllegalStateException r0 = new java.lang.IllegalStateException
            r1 = r0
            r2 = r5
            java.lang.String r2 = r2.toString()
            r1.<init>(r2)
            throw r0
        L2c:
            r0 = r5
            com.bigdata.relation.accesspath.BlockingBuffer<E[]> r0 = r0.buffer
            java.util.concurrent.Future r0 = r0.getFuture()
            r6 = r0
            r0 = r5
            com.bigdata.relation.accesspath.BlockingBuffer<E[]> r0 = r0.buffer
            java.util.concurrent.Future r0 = r0.getFuture()
            boolean r0 = r0.isDone()
            if (r0 != 0) goto L44
            return
        L44:
            r0 = r6
            java.lang.Object r0 = r0.get()     // Catch: java.util.concurrent.ExecutionException -> L93 java.lang.Throwable -> L9d
            r0 = r5
            r1 = r4
            java.util.concurrent.BlockingQueue<S extends com.bigdata.service.ndx.pipeline.AbstractSubtask> r1 = r1.finishedSubtaskQueue     // Catch: java.util.concurrent.ExecutionException -> L93 java.lang.Throwable -> L9d
            java.lang.Object r1 = r1.remove()     // Catch: java.util.concurrent.ExecutionException -> L93 java.lang.Throwable -> L9d
            if (r0 == r1) goto L60
            java.lang.AssertionError r0 = new java.lang.AssertionError     // Catch: java.util.concurrent.ExecutionException -> L93 java.lang.Throwable -> L9d
            r1 = r0
            r1.<init>()     // Catch: java.util.concurrent.ExecutionException -> L93 java.lang.Throwable -> L9d
            throw r0     // Catch: java.util.concurrent.ExecutionException -> L93 java.lang.Throwable -> L9d
        L60:
            r0 = r4
            H extends com.bigdata.service.ndx.pipeline.AbstractMasterStats<L, ? extends com.bigdata.service.ndx.pipeline.AbstractSubtaskStats> r0 = r0.stats
            java.util.concurrent.atomic.AtomicLong r0 = r0.subtaskEndCount
            long r0 = r0.incrementAndGet()
            org.apache.log4j.Logger r0 = com.bigdata.service.ndx.pipeline.AbstractMasterTask.log
            boolean r0 = r0.isDebugEnabled()
            if (r0 == 0) goto Ld2
            org.apache.log4j.Logger r0 = com.bigdata.service.ndx.pipeline.AbstractMasterTask.log
            java.lang.StringBuilder r1 = new java.lang.StringBuilder
            r2 = r1
            r2.<init>()
            java.lang.String r2 = "subtaskEndCount incremented: "
            java.lang.StringBuilder r1 = r1.append(r2)
            r2 = r5
            L r2 = r2.locator
            java.lang.StringBuilder r1 = r1.append(r2)
            java.lang.String r1 = r1.toString()
            r0.debug(r1)
            goto Ld2
        L93:
            r7 = move-exception
            r0 = r4
            r1 = r7
            java.lang.Throwable r0 = r0.halt(r1)     // Catch: java.lang.Throwable -> L9d
            java.util.concurrent.ExecutionException r0 = (java.util.concurrent.ExecutionException) r0     // Catch: java.lang.Throwable -> L9d
            throw r0     // Catch: java.lang.Throwable -> L9d
        L9d:
            r8 = move-exception
            r0 = r4
            H extends com.bigdata.service.ndx.pipeline.AbstractMasterStats<L, ? extends com.bigdata.service.ndx.pipeline.AbstractSubtaskStats> r0 = r0.stats
            java.util.concurrent.atomic.AtomicLong r0 = r0.subtaskEndCount
            long r0 = r0.incrementAndGet()
            org.apache.log4j.Logger r0 = com.bigdata.service.ndx.pipeline.AbstractMasterTask.log
            boolean r0 = r0.isDebugEnabled()
            if (r0 == 0) goto Lcf
            org.apache.log4j.Logger r0 = com.bigdata.service.ndx.pipeline.AbstractMasterTask.log
            java.lang.StringBuilder r1 = new java.lang.StringBuilder
            r2 = r1
            r2.<init>()
            java.lang.String r2 = "subtaskEndCount incremented: "
            java.lang.StringBuilder r1 = r1.append(r2)
            r2 = r5
            L r2 = r2.locator
            java.lang.StringBuilder r1 = r1.append(r2)
            java.lang.String r1 = r1.toString()
            r0.debug(r1)
        Lcf:
            r0 = r8
            throw r0
        Ld2:
            goto L0
        */
        throw new UnsupportedOperationException("Method not decompiled: com.bigdata.service.ndx.pipeline.AbstractMasterTask.drainFutures():void");
    }

    protected void moveSinkToFinishedQueueAtomically(L l, AbstractSubtask abstractSubtask) throws InterruptedException {
        if (l == null) {
            throw new IllegalArgumentException();
        }
        if (abstractSubtask == null) {
            throw new IllegalArgumentException();
        }
        this.lock.lockInterruptibly();
        try {
            this.finishedSubtaskQueue.put(abstractSubtask);
            if (this.sinks.remove(l, abstractSubtask) && log.isDebugEnabled()) {
                log.debug("Removed output buffer: " + l);
            }
        } finally {
            this.lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    public void addToOutputBuffer(L l, E[] eArr, int i, int i2, boolean z) throws InterruptedException {
        int i3 = i2 - i;
        if (i3 == 0) {
            return;
        }
        Object[] objArr = (Object[]) Array.newInstance(eArr.getClass().getComponentType(), i3);
        System.arraycopy(eArr, i, objArr, 0, i3);
        long nanoTime = System.nanoTime();
        boolean z2 = false;
        while (!z2) {
            halted();
            AbstractSubtask sink = getSink(l, z);
            try {
                z2 = sink.buffer.add(objArr, offerWarningTimeoutNanos, TimeUnit.NANOSECONDS);
                long nanoTime2 = System.nanoTime();
                if (z2) {
                    sink.lastChunkNanos = nanoTime2;
                } else {
                    log.warn("Sink is slow: elapsed=" + TimeUnit.NANOSECONDS.toMillis(nanoTime2 - nanoTime) + "ms, sink=" + sink);
                }
            } catch (BufferClosedException e) {
                if (e.getCause() instanceof StaleLocatorException) {
                    if (log.isInfoEnabled()) {
                        log.info("Sink closed asynchronously by stale locator exception: " + sink);
                    }
                    redirectChunk(objArr);
                    z2 = true;
                } else {
                    if (!(e.getCause() instanceof IdleTimeoutException) && !(e.getCause() instanceof MasterExhaustedException)) {
                        throw e;
                    }
                    if (log.isInfoEnabled()) {
                        log.info("Sink closed asynchronously: cause=" + e.getCause() + ", sink=" + sink);
                    }
                    z = true;
                }
            }
        }
        synchronized (this.stats) {
            this.stats.chunksTransferred.incrementAndGet();
            this.stats.elementsTransferred.addAndGet(objArr.length);
            this.stats.elementsOnSinkQueues.addAndGet(objArr.length);
            this.stats.elapsedSinkOfferNanos += System.nanoTime() - nanoTime;
        }
    }

    static {
        $assertionsDisabled = !AbstractMasterTask.class.desiredAssertionStatus();
        log = Logger.getLogger((Class<?>) AbstractMasterTask.class);
        offerWarningTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(5000L);
    }
}
