/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geode.internal.cache.snapshot;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionEvent;
import org.apache.geode.cache.RegionMembershipListener;
import org.apache.geode.cache.util.RegionMembershipListenerAdapter;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ProcessorKeeper21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;

public class FlowController {
    /**
     * Very large permit count (0x3FFFFFFF). {@code WindowImpl} hands this many permits to its
     * semaphore in {@code close()} and {@code abort()} - presumably so that every thread blocked
     * in {@code waitForOpening()} is let through.
     */
    private static final int MAX_PERMITS = 0x3FFFFFFF;
    /** The single shared controller returned by {@link #getInstance()}. */
    @MakeNotStatic
    private static final FlowController instance = new FlowController();
    /**
     * Registry of open windows. {@code put} yields the integer that becomes the window id;
     * {@code retrieve} and {@code remove} look a window up or drop it by that id.
     */
    private final ProcessorKeeper21 processors = new ProcessorKeeper21();

    /**
     * Returns the shared flow controller.
     *
     * @return the one instance held in the static {@code instance} field
     */
    public static FlowController getInstance() {
        return FlowController.instance;
    }

    /** Private so that the only instance is the one exposed through {@link #getInstance()}. */
    private FlowController() {}

    /**
     * Creates a new flow-control window and registers it with this controller.
     *
     * @param region the region handed to the window, which installs a cache listener on it
     * @param sink the member on the receiving side of the window
     * @param windowSize the initial number of permits in the window
     * @return the registered window, carrying the id assigned by the registry
     */
    public <K, V> Window create(Region<K, V> region, DistributedMember sink, int windowSize) {
        WindowImpl<K, V> window = new WindowImpl<>(region, sink, windowSize);
        // The registry hands back the key under which the window was stored; that key is the id.
        window.setWindowId(processors.put(window));
        return window;
    }

    /**
     * Acknowledges a packet to the member that owns the given window.
     *
     * <p>When {@code member} is this manager's own id the window is looked up in the local
     * registry and acknowledged directly; otherwise a {@link FlowControlAckMessage} is sent.
     *
     * @param dmgr the distribution manager used to compare ids and to send the message
     * @param member the member that owns the window
     * @param windowId the id of the window being acknowledged
     * @param packetId the id of the packet being acknowledged
     */
    public void sendAck(DistributionManager dmgr, DistributedMember member, int windowId, String packetId) {
        if (InternalDistributedSystem.getLogger().fineEnabled()) {
            InternalDistributedSystem.getLogger().fine("SNP: Sending ACK for packet " + packetId + " on window " + windowId + " to member " + member);
        }

        boolean remote = !dmgr.getDistributionManagerId().equals(member);
        if (remote) {
            FlowControlAckMessage message = new FlowControlAckMessage(windowId, packetId);
            message.setRecipient((InternalDistributedMember)member);
            dmgr.putOutgoing(message);
            return;
        }

        // Local owner: no message needed, acknowledge the registered window if there is one.
        WindowImpl<?, ?> local = (WindowImpl<?, ?>)processors.retrieve(windowId);
        if (local == null) {
            return;
        }
        local.ack(packetId);
    }

    /**
     * Tells the member that owns the given window to abort it.
     *
     * <p>When {@code member} is this manager's own id the window is looked up in the local
     * registry and aborted directly; otherwise a {@link FlowControlAbortMessage} is sent.
     *
     * @param dmgr the distribution manager used to compare ids and to send the message
     * @param windowId the id of the window to abort
     * @param member the member that owns the window
     */
    public void sendAbort(DistributionManager dmgr, int windowId, DistributedMember member) {
        if (InternalDistributedSystem.getLogger().fineEnabled()) {
            InternalDistributedSystem.getLogger().fine("SNP: Sending ABORT to member " + member + " for window " + windowId);
        }

        boolean remote = !dmgr.getDistributionManagerId().equals(member);
        if (remote) {
            FlowControlAbortMessage message = new FlowControlAbortMessage(windowId);
            message.setRecipient((InternalDistributedMember)member);
            dmgr.putOutgoing(message);
            return;
        }

        // Local owner: no message needed, abort the registered window if there is one.
        WindowImpl<?, ?> local = (WindowImpl<?, ?>)processors.retrieve(windowId);
        if (local == null) {
            return;
        }
        local.abort();
    }

    /**
     * {@link Window} backed by a counting semaphore.
     *
     * <p>{@link #waitForOpening()} takes one permit and {@code ack} gives one back. Aborting or
     * closing the window floods the semaphore with permits so that blocked callers are let
     * through. While the window exists it listens on the region for a crash of the sink member
     * and aborts itself when that happens.
     */
    private static class WindowImpl<K, V> implements Window {
        /** Available permits; the window counts as open while at least one is available. */
        private final Semaphore permits;
        /** Set to {@code true} once the window has been aborted. */
        private final AtomicBoolean abort;
        /** Guards the bulk permit release so that it is performed at most once. */
        private final AtomicBoolean flooded;
        private final Region<K, V> region;
        /** Listener installed on the region that aborts the window when the sink crashes. */
        private final RegionMembershipListener<K, V> crash;
        /** Registry id; assigned through {@code setWindowId} after construction. */
        private volatile int windowId;

        /**
         * Creates the window and installs the crash listener on the region.
         *
         * @param region the region on which the crash listener is registered
         * @param sink the member whose crash aborts this window
         * @param size the initial number of permits
         */
        public WindowImpl(Region<K, V> region, final DistributedMember sink, int size) {
            this.permits = new Semaphore(size);
            this.abort = new AtomicBoolean(false);
            this.flooded = new AtomicBoolean(false);
            this.region = region;
            this.crash = new RegionMembershipListenerAdapter<K, V>(){

                @Override
                public void afterRemoteRegionCrash(RegionEvent<K, V> event) {
                    if (event.getDistributedMember().equals(sink)) {
                        if (InternalDistributedSystem.getLogger().fineEnabled()) {
                            InternalDistributedSystem.getLogger().fine("SNP: " + String.valueOf(sink) + " has crashed, closing window");
                        }
                        // Inside the listener a bare "this" is the listener itself, which has no
                        // abort(); the call has to be qualified with the enclosing window.
                        WindowImpl.this.abort();
                    }
                }
            };
            region.getAttributesMutator().addCacheListener(this.crash);
        }

        /**
         * Unregisters the window from the controller, removes the crash listener from the region
         * and lets any blocked callers through.
         */
        @Override
        public void close() {
            FlowController.instance.processors.remove(this.windowId);
            this.region.getAttributesMutator().removeCacheListener(this.crash);
            this.releaseAllWaiters();
        }

        @Override
        public int getWindowId() {
            return this.windowId;
        }

        @Override
        public boolean isAborted() {
            return this.abort.get();
        }

        @Override
        public boolean isOpen() {
            return this.permits.availablePermits() > 0;
        }

        /**
         * Blocks until a permit is available and takes it.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        @Override
        public void waitForOpening() throws InterruptedException {
            this.permits.acquire();
        }

        /**
         * Returns one permit to the window.
         *
         * @param packetId id of the acknowledged packet; not used by this implementation
         */
        private void ack(String packetId) {
            this.permits.release();
        }

        /** Marks the window as aborted and lets any blocked callers through. */
        private void abort() {
            this.abort.set(true);
            this.releaseAllWaiters();
        }

        /**
         * Floods the semaphore with {@code MAX_PERMITS} permits, at most once per window.
         *
         * <p>Two unconditional bulk releases (abort followed by close, or a repeated abort) add
         * 0x7FFFFFFE permits; on top of two or more already-available permits that overflows the
         * semaphore's int counter, which the JDK reports by throwing an {@link Error}. A single
         * release is enough to open the window for every waiter.
         */
        private void releaseAllWaiters() {
            if (this.flooded.compareAndSet(false, true)) {
                this.permits.release(FlowController.MAX_PERMITS);
            }
        }

        private void setWindowId(int id) {
            this.windowId = id;
        }
    }

    /**
     * Message that acknowledges one packet on a flow-control window. On receipt the window
     * registered under the carried id, if any, is acknowledged.
     */
    public static class FlowControlAckMessage extends DistributionMessage {
        /** Id of the window being acknowledged. */
        private int windowId;
        /** Id of the packet being acknowledged. */
        private String packetId;

        /**
         * @param windowId id of the window being acknowledged
         * @param packetId id of the packet being acknowledged
         */
        public FlowControlAckMessage(int windowId, String packetId) {
            this.windowId = windowId;
            this.packetId = packetId;
        }

        /** No-argument constructor; the fields are then filled in by {@link #fromData}. */
        public FlowControlAckMessage() {}

        /** Returns the fixed id 2136 for this message type. */
        public int getDSFID() {
            return 2136;
        }

        /**
         * Returns the constant 73 (presumably an executor/processor type code - confirm against
         * the distribution manager's constants).
         */
        @Override
        public int getProcessorType() {
            return 73;
        }

        /** Logs the receipt at fine level and acknowledges the addressed window if registered. */
        @Override
        protected void process(ClusterDistributionManager dm) {
            if (InternalDistributedSystem.getLogger().fineEnabled()) {
                InternalDistributedSystem.getLogger().fine("SNP: Received ACK for packet " + packetId + " on window " + windowId + " from member " + getSender());
            }

            WindowImpl<?, ?> target = (WindowImpl<?, ?>)FlowController.getInstance().processors.retrieve(windowId);
            if (target == null) {
                // Nothing is registered under this id, so there is nothing to acknowledge.
                return;
            }
            target.ack(packetId);
        }

        /** Reads the superclass state, then the window id and the packet id, in that order. */
        @Override
        public void fromData(DataInput in, DeserializationContext context) throws IOException, ClassNotFoundException {
            super.fromData(in, context);
            windowId = in.readInt();
            packetId = InternalDataSerializer.readString(in);
        }

        /** Writes the superclass state, then the window id and the packet id, in that order. */
        @Override
        public void toData(DataOutput out, SerializationContext context) throws IOException {
            super.toData(out, context);
            out.writeInt(windowId);
            InternalDataSerializer.writeString(packetId, out);
        }
    }

    /**
     * Message that aborts a flow-control window. On receipt the window registered under the
     * carried id, if any, is aborted.
     */
    public static class FlowControlAbortMessage extends DistributionMessage {
        /** Id of the window to abort. */
        private int windowId;

        /**
         * @param windowId id of the window to abort
         */
        public FlowControlAbortMessage(int windowId) {
            this.windowId = windowId;
        }

        /** No-argument constructor; the field is then filled in by {@link #fromData}. */
        public FlowControlAbortMessage() {}

        /**
         * Returns the fixed id 2136.
         *
         * <p>NOTE(review): this is the same value that {@code FlowControlAckMessage.getDSFID()}
         * returns. If these ids are meant to identify the message class on the wire, the abort
         * message needs its own id - confirm against the fixed-id table before changing it.
         */
        public int getDSFID() {
            return 2136;
        }

        /**
         * Returns the constant 73 (presumably an executor/processor type code - confirm against
         * the distribution manager's constants).
         */
        @Override
        public int getProcessorType() {
            return 73;
        }

        /** Logs the receipt at fine level and aborts the addressed window if registered. */
        @Override
        protected void process(ClusterDistributionManager dm) {
            if (InternalDistributedSystem.getLogger().fineEnabled()) {
                InternalDistributedSystem.getLogger().fine("SNP: Received ABORT on window " + windowId + " from member " + getSender());
            }

            WindowImpl<?, ?> target = (WindowImpl<?, ?>)FlowController.getInstance().processors.retrieve(windowId);
            if (target == null) {
                // Nothing is registered under this id, so there is nothing to abort.
                return;
            }
            target.abort();
        }

        /** Reads the superclass state followed by the window id. */
        @Override
        public void fromData(DataInput in, DeserializationContext context) throws IOException, ClassNotFoundException {
            super.fromData(in, context);
            windowId = in.readInt();
        }

        /** Writes the superclass state followed by the window id. */
        @Override
        public void toData(DataOutput out, SerializationContext context) throws IOException {
            super.toData(out, context);
            out.writeInt(windowId);
        }
    }

    /** Handle on a flow-control window obtained from {@code create}. */
    public interface Window {
        /** Returns the id under which this window is registered. */
        int getWindowId();

        /** Returns {@code true} once the window has been aborted. */
        boolean isAborted();

        /** Returns {@code true} while the window is open. */
        boolean isOpen();

        /**
         * Blocks until the window is open.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        void waitForOpening() throws InterruptedException;

        /** Closes the window. */
        void close();
    }
}

