/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geode.internal.cache.partitioned;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.geode.CancelException;
import org.apache.geode.GemFireRethrowable;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.streaming.StreamingOperation;
import org.apache.geode.internal.CopyOnWriteHashSet;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PrimaryBucketException;
import org.apache.geode.internal.cache.Token;
import org.apache.geode.internal.cache.partitioned.PartitionMessage;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.internal.serialization.Versioning;
import org.apache.geode.internal.util.BlobHelper;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.logging.log4j.Logger;

public abstract class StreamingPartitionOperation
extends StreamingOperation {
    private static final Logger logger = LogService.getLogger();
    protected final int regionId;

    public StreamingPartitionOperation(InternalDistributedSystem sys, int regionId) {
        super(sys);
        this.regionId = regionId;
    }

    @Override
    public void getDataFromAll(Set recipients) {
        throw new UnsupportedOperationException("call getPartitionedDataFrom instead");
    }

    public Set<InternalDistributedMember> getPartitionedDataFrom(Set recipients) throws TimeoutException, InterruptedException, QueryException, ForceReattemptException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        if (recipients.isEmpty()) {
            return Collections.emptySet();
        }
        StreamingPartitionResponse processor = new StreamingPartitionResponse(this.sys, recipients);
        DistributionMessage m = this.createRequestMessage(recipients, processor);
        this.sys.getDistributionManager().putOutgoing(m);
        Set<InternalDistributedMember> failedMembers = processor.waitForCacheOrQueryException();
        return failedMembers;
    }

    @Override
    protected abstract DistributionMessage createRequestMessage(Set var1, ReplyProcessor21 var2);

    protected class StreamingPartitionResponse
    extends ReplyProcessor21 {
        protected volatile boolean abort;
        protected final Map statusMap;
        protected final AtomicInteger msgsBeingProcessed;
        private volatile String memberDepartedMessage;
        private final Set<InternalDistributedMember> failedMembers;
        private volatile boolean finishedWaiting;

        public StreamingPartitionResponse(InternalDistributedSystem system, Set members) {
            super(system, (Collection)members);
            this.abort = false;
            this.statusMap = new HashMap();
            this.msgsBeingProcessed = new AtomicInteger();
            this.memberDepartedMessage = null;
            this.failedMembers = new CopyOnWriteHashSet<InternalDistributedMember>();
            this.finishedWaiting = false;
        }

        @Override
        protected boolean stopBecauseOfExceptions() {
            return false;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void process(DistributionMessage msg) {
            if (!this.waitingOnMember(msg.getSender())) {
                return;
            }
            this.msgsBeingProcessed.incrementAndGet();
            try {
                StreamingOperation.StreamingReplyMessage m = (StreamingOperation.StreamingReplyMessage)msg;
                boolean isLast = true;
                List objects = m.getObjects();
                if (objects != null) {
                    boolean isAborted = this.abort;
                    if (!isAborted) {
                        boolean bl = isAborted = !StreamingPartitionOperation.this.processChunk(objects, m.getSender(), m.getMessageNumber(), m.isLastMessage());
                        if (isAborted) {
                            this.abort = true;
                        }
                    }
                    isLast = isAborted || this.trackMessage(m);
                } else {
                    isLast = true;
                }
                if (isLast) {
                    super.process(msg, false);
                }
            }
            finally {
                this.msgsBeingProcessed.decrementAndGet();
                this.checkIfDone();
            }
        }

        @Override
        protected synchronized void processException(DistributionMessage msg, ReplyException ex) {
            Throwable t = ex.getCause();
            if (t instanceof ForceReattemptException || t instanceof CacheClosedException) {
                if (logger.isDebugEnabled()) {
                    logger.debug("StreamingPartitionResponse received exception {} for member {} query retry required.", (Object)t, (Object)msg.getSender());
                }
                this.failedMembers.add(msg.getSender());
            } else {
                super.processException(msg, ex);
            }
        }

        @Override
        public void memberDeparted(DistributionManager distributionManager, InternalDistributedMember id, boolean crashed) {
            if (id != null && this.waitingOnMember(id)) {
                this.failedMembers.add(id);
                this.memberDepartedMessage = String.format("Streaming reply processor got memberDeparted event for < %s > crashed, %s", id, crashed);
            }
            super.memberDeparted(distributionManager, id, crashed);
        }

        public Set<InternalDistributedMember> waitForCacheOrQueryException() throws CacheException, QueryException {
            try {
                this.waitForRepliesUninterruptibly();
                return this.failedMembers;
            }
            catch (ReplyException e) {
                Throwable t = e.getCause();
                if (t instanceof CacheException) {
                    throw (CacheException)t;
                }
                if (t instanceof RegionDestroyedException) {
                    throw (RegionDestroyedException)t;
                }
                if (t instanceof QueryException) {
                    throw (QueryException)t;
                }
                if (t instanceof PrimaryBucketException) {
                    throw new PrimaryBucketException("Peer failed primary test", t);
                }
                e.handleCause();
                throw e;
            }
        }

        @Override
        protected boolean stillWaiting() {
            if (this.finishedWaiting) {
                return false;
            }
            if (this.msgsBeingProcessed.get() > 0 && this.numMembers() > 0) {
                return true;
            }
            this.finishedWaiting = this.finishedWaiting || this.abort || !super.stillWaiting();
            return !this.finishedWaiting;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("<");
            sb.append(this.getClass().getName());
            sb.append(" ");
            sb.append(this.getProcessorId());
            if (this.members == null) {
                sb.append(" (null memebrs)");
            } else {
                sb.append(" waiting for ");
                sb.append(this.numMembers());
                sb.append(" replies");
                sb.append((String)(this.exception == null ? "" : " exception: " + String.valueOf(this.exception)));
                sb.append(" from ");
                sb.append(this.membersToString());
            }
            sb.append("; waiting for ");
            sb.append(this.msgsBeingProcessed.get());
            sb.append(" messages in the process of being processed>");
            return sb.toString();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected boolean trackMessage(StreamingOperation.StreamingReplyMessage m) {
            Status status;
            StreamingPartitionResponse streamingPartitionResponse = this;
            synchronized (streamingPartitionResponse) {
                status = (Status)this.statusMap.get(m.getSender());
                if (status == null) {
                    status = new Status();
                    this.statusMap.put(m.getSender(), status);
                }
            }
            return status.trackMessage(m);
        }

        public void removeFailedSenders(Set notReceivedMembers) {
            for (Object notReceivedMember : notReceivedMembers) {
                this.removeMember((InternalDistributedMember)notReceivedMember, true);
            }
        }

        class Status {
            int msgsProcessed = 0;
            int numMsgs = 0;

            Status() {
            }

            protected synchronized boolean trackMessage(StreamingOperation.StreamingReplyMessage m) {
                ++this.msgsProcessed;
                if (m.isLastMessage()) {
                    this.numMsgs = m.getMessageNumber() + 1;
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Streaming Message Tracking Status: Processor id: {}; Sender: {}; Messages Processed: {}; NumMsgs: {}", (Object)StreamingPartitionResponse.this.getProcessorId(), (Object)m.getSender(), (Object)this.msgsProcessed, (Object)this.numMsgs);
                }
                return this.msgsProcessed == this.numMsgs;
            }
        }
    }

    public static abstract class StreamingPartitionMessage
    extends PartitionMessage {
        transient HeapDataOutputStream outStream = null;
        transient int replyMsgNum = 0;
        transient boolean replyLastMsg = true;
        transient int numObjectsInChunk = 0;

        public StreamingPartitionMessage() {
        }

        public StreamingPartitionMessage(Set recipients, int regionId, ReplyProcessor21 processor) {
            super(recipients, regionId, processor);
        }

        public StreamingPartitionMessage(InternalDistributedMember recipient, int regionId, ReplyProcessor21 processor) {
            super(recipient, regionId, processor);
        }

        @Override
        protected void sendReply(InternalDistributedMember member, int procId, DistributionManager dm, ReplyException ex, PartitionedRegion pr, long startTime) {
            if (ex != null) {
                this.outStream = null;
                this.replyMsgNum = 0;
                this.replyLastMsg = true;
            }
            if (this.replyLastMsg && pr != null && startTime > 0L) {
                pr.getPrStats().endPartitionMessagesProcessing(startTime);
            }
            StreamingOperation.StreamingReplyMessage.send(member, procId, ex, dm, this.outStream, this.numObjectsInChunk, this.replyMsgNum, this.replyLastMsg);
        }

        @Override
        protected boolean operateOnPartitionedRegion(ClusterDistributionManager dm, PartitionedRegion pr, long startTime) throws CacheException, QueryException, ForceReattemptException, InterruptedException {
            boolean isTraceEnabled = logger.isTraceEnabled();
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            Object failedObject = null;
            int socketBufferSize = dm.getSystem().getConfig().getSocketBufferSize();
            int chunkSize = socketBufferSize - 200;
            boolean sentFinalMessage = false;
            boolean receiverCacheClosed = false;
            this.outStream = new HeapDataOutputStream(chunkSize, Versioning.getKnownVersionOrDefault((Version)this.getSender().getVersion(), (KnownVersion)KnownVersion.CURRENT));
            try {
                do {
                    block15: {
                        Object nextObject;
                        if (failedObject == null) {
                            nextObject = this.getNextReplyObject(pr);
                            this.replyLastMsg = nextObject == Token.END_OF_STREAM;
                        } else {
                            nextObject = failedObject;
                            failedObject = null;
                        }
                        if (!this.replyLastMsg) {
                            this.numObjectsInChunk = 1;
                            if (isTraceEnabled) {
                                logger.trace("Writing this object to StreamingPartitionMessage outStream: '{}'", nextObject);
                            }
                            BlobHelper.serializeTo(nextObject, this.outStream);
                            do {
                                this.outStream.disallowExpansion(StreamingOperation.CHUNK_FULL);
                                nextObject = this.getNextReplyObject(pr);
                                boolean bl = this.replyLastMsg = nextObject == Token.END_OF_STREAM;
                                if (this.replyLastMsg) continue;
                                try {
                                    if (isTraceEnabled) {
                                        logger.trace("Writing this object to StreamingPartitionMessage outStream: '{}'", nextObject);
                                    }
                                    BlobHelper.serializeTo(nextObject, this.outStream);
                                    ++this.numObjectsInChunk;
                                }
                                catch (GemFireRethrowable e) {
                                    failedObject = nextObject;
                                    break;
                                }
                            } while (nextObject != Token.END_OF_STREAM);
                        }
                        try {
                            this.sendReply(this.getSender(), this.processorId, dm, null, pr, startTime);
                            ++this.replyMsgNum;
                            if (!this.replyLastMsg) break block15;
                            sentFinalMessage = true;
                        }
                        catch (CancelException e) {
                            receiverCacheClosed = true;
                            break;
                        }
                    }
                    this.outStream.reset();
                    this.numObjectsInChunk = 0;
                } while (!this.replyLastMsg);
            }
            catch (IOException ioe) {
                throw new InternalGemFireException(ioe);
            }
            if (!sentFinalMessage && !receiverCacheClosed) {
                throw new InternalGemFireError("unexpected condition");
            }
            return false;
        }

        protected abstract Object getNextReplyObject(PartitionedRegion var1) throws CacheException, ForceReattemptException, InterruptedException;

        protected Object getNextReplyObject() {
            throw new UnsupportedOperationException("use getNextReplyObject(PartitionedRegion) instead");
        }
    }
}

