/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.LongPredicate;
import java.util.function.Supplier;
import org.neo4j.helpers.FutureAdapter;
import org.neo4j.unsafe.impl.batchimport.Parallelizable;
import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor;
import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor;
import org.neo4j.unsafe.impl.batchimport.staging.Processing;

/**
 * Runs jobs of type {@code FROM} through a {@link BiFunction} on an internal {@link TaskExecutor}
 * and hands the {@code TO} results out through {@link #next()} in ticket order.
 *
 * <p>Each submitted job carries a ticket. A finished job is only added to the result queue once the
 * job with the previous ticket has been added, so results are queued in ascending ticket order.
 * Because the "last processed" counter starts at {@code -1}, tickets are expected to start at
 * {@code 0} and increase by one per submitted job.
 *
 * @param <FROM> type of the submitted jobs
 * @param <STATE> type of the state supplied to the executor and passed to the processor
 * @param <TO> type of the processed results
 */
public class TicketedProcessing<FROM, STATE, TO>
implements Parallelizable {
    /** Park strategy (10 ms) handed to the executor and used while waiting for a ticket's turn. */
    private static final ParkStrategy park = new ParkStrategy.Park(10L, TimeUnit.MILLISECONDS);
    private final TaskExecutor<STATE> executor;
    private final BiFunction<FROM, STATE, TO> processor;
    /** Bounded queue of results, filled in ticket order and drained by {@link #next()}. */
    private final ArrayBlockingQueue<TO> processed;
    /** Incremented once per {@link #submit(long, Object)} call; starts at -1. */
    private final AtomicLong submittedTicket = new AtomicLong(-1L);
    /** Incremented once per result added to {@link #processed}; starts at -1. */
    private final AtomicLong processedTicket = new AtomicLong(-1L);
    /** True for a ticket when the ticket immediately before it was the last one queued. */
    private final LongPredicate myTurnToAddToProcessedQueue =
            ticket -> this.processedTicket.get() == ticket - 1L;
    /** Delegates to {@code executor.assertHealthy()}. */
    private final Runnable healthCheck;
    /** Set by {@link #shutdown(boolean)}; lets {@link #next()} terminate once everything is drained. */
    private volatile boolean done;

    /**
     * Creates the processing pipeline.
     *
     * @param name2 name passed on to the underlying {@link DynamicTaskExecutor}
     * @param maxProcessors passed to the executor (NOTE(review): presumably its processor limit and
     *        queue size - confirm against {@code DynamicTaskExecutor}) and used as the capacity of
     *        the result queue
     * @param processor function turning a job plus a state object into a result
     * @param threadLocalStateSupplier supplier of state objects, passed on to the executor
     */
    public TicketedProcessing(String name2, int maxProcessors, BiFunction<FROM, STATE, TO> processor, Supplier<STATE> threadLocalStateSupplier) {
        this.processor = processor;
        this.executor = new DynamicTaskExecutor<STATE>(1, maxProcessors, maxProcessors, park, name2, threadLocalStateSupplier);
        this.healthCheck = this.executor::assertHealthy;
        // Diamond instead of a raw type, so the assignment is checked by the compiler.
        this.processed = new ArrayBlockingQueue<>(maxProcessors);
    }

    /**
     * Submits a job for processing. The result becomes available from {@link #next()} after the
     * results of all lower tickets have been queued.
     *
     * @param ticket position of this job in the output order; the job's result is queued only after
     *        the result for {@code ticket - 1} has been queued
     * @param job the job to process
     */
    public void submit(long ticket, FROM job) {
        this.submittedTicket.incrementAndGet();
        this.executor.submit(threadLocalState -> {
            TO result = this.processor.apply(job, threadLocalState);

            // Hold the result back until every lower ticket has been queued.
            Processing.await(this.myTurnToAddToProcessedQueue, ticket, this.healthCheck, park);

            // The queue is bounded, so keep retrying until the consumer makes room. Run the health
            // check between attempts so that this task does not retry forever if the executor has
            // become unhealthy and nobody is draining the queue any longer.
            while (!this.processed.offer(result, 10L, TimeUnit.MILLISECONDS)) {
                this.healthCheck.run();
            }

            // Lets the task holding the next ticket proceed.
            this.processedTicket.incrementAndGet();
        });
    }

    /**
     * Submits every element of the given iterator as a job, using tickets 0, 1, 2, ... in
     * iteration order. The work is wrapped by {@code FutureAdapter.future} (presumably executed
     * asynchronously - confirm against {@code FutureAdapter}).
     *
     * <p>Since the tickets always start at 0, this is only consistent when no jobs have been
     * submitted to this instance by other means.
     *
     * @param input2 source of jobs
     * @param shutdownAfterAllSubmitted whether to call {@code shutdown(true)} once the iterator is
     *        exhausted
     * @return the future produced by {@code FutureAdapter.future}; its value is {@code null}
     */
    public Future<Void> slurp(Iterator<FROM> input2, boolean shutdownAfterAllSubmitted) {
        return FutureAdapter.future(() -> {
            for (long ticket = 0L; input2.hasNext(); ticket++) {
                this.submit(ticket, input2.next());
            }
            if (shutdownAfterAllSubmitted) {
                this.shutdown(true);
            }
            return null;
        });
    }

    /**
     * Marks this instance as done, so that {@link #next()} returns {@code null} once all submitted
     * jobs have been handed out, and shuts down the executor.
     *
     * @param awaitAllProcessed when {@code true} the executor is shut down with flag {@code 1},
     *        otherwise with {@code 0} (NOTE(review): flag 1 presumably means "await completion of
     *        all submitted tasks" - confirm against {@code TaskExecutor})
     */
    public void shutdown(boolean awaitAllProcessed) {
        this.done = true;
        this.executor.shutdown(awaitAllProcessed ? 1 : 0);
    }

    /**
     * Returns the next processed result, waiting for one to become available if necessary.
     *
     * @return the next result in ticket order, or {@code null} when {@link #shutdown(boolean)} has
     *         been called, every submitted ticket has been processed and the result queue is
     *         empty. Also returns {@code null} if the calling thread is interrupted while waiting;
     *         the interrupt flag is restored in that case.
     */
    public TO next() {
        while (!this.done || this.processedTicket.get() < this.submittedTicket.get() || !this.processed.isEmpty()) {
            try {
                TO result = this.processed.poll(10L, TimeUnit.MILLISECONDS);
                if (result != null) {
                    return result;
                }
            }
            catch (InterruptedException e) {
                // Treated as a request to abort: restore the flag and report "no more results".
                Thread.currentThread().interrupt();
                return null;
            }
            // Nothing arrived within the timeout; verify the executor is still healthy before
            // waiting again.
            this.healthCheck.run();
        }
        return null;
    }

    /**
     * Delegates to the underlying executor.
     *
     * @param delta passed unchanged to {@code executor.processors(delta)}
     * @return whatever the executor returns
     */
    @Override
    public int processors(int delta) {
        return this.executor.processors(delta);
    }
}

