/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geode.internal.cache.wan.parallel;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.wan.GatewayEventFilter;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.PooledDistributionMessage;
import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.logging.log4j.Logger;

public class ParallelQueueRemovalMessage
extends PooledDistributionMessage {
    private static final Logger logger = LogService.getLogger();
    private Map<String, Map<Integer, List<Object>>> regionToDispatchedKeysMap;

    public ParallelQueueRemovalMessage() {
    }

    public ParallelQueueRemovalMessage(Map<String, Map<Integer, List<Object>>> rgnToDispatchedKeysMap) {
        this.regionToDispatchedKeysMap = rgnToDispatchedKeysMap;
    }

    public int getDSFID() {
        return 2161;
    }

    @Override
    public String toString() {
        String cname = this.getShortClassName();
        return cname + "regionToDispatchedKeysMap=" + String.valueOf(this.regionToDispatchedKeysMap) + " sender=" + String.valueOf(this.getSender());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void process(ClusterDistributionManager dm) {
        boolean isDebugEnabled = logger.isDebugEnabled();
        InternalCache cache = dm.getCache();
        if (cache != null) {
            LocalRegion.InitializationLevel oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE);
            try {
                for (String name : this.regionToDispatchedKeysMap.keySet()) {
                    String regionName = name;
                    PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
                    if (region == null) continue;
                    AbstractGatewaySender abstractSender = region.getParallelGatewaySender();
                    Map<Integer, List<Object>> bucketIdToDispatchedKeys = this.regionToDispatchedKeysMap.get(regionName);
                    for (Integer bId : bucketIdToDispatchedKeys.keySet()) {
                        List<Object> dispatchedKeys;
                        String bucketFullPath = "/__PR/" + region.getBucketName(bId);
                        AbstractBucketRegionQueue brq = (AbstractBucketRegionQueue)cache.getInternalRegionByPath(bucketFullPath);
                        if (isDebugEnabled) {
                            logger.debug("ParallelQueueRemovalMessage : The bucket in the cache is bucketRegionName : {} bucket: {}", (Object)bucketFullPath, (Object)brq);
                        }
                        if ((dispatchedKeys = bucketIdToDispatchedKeys.get(bId)) == null) continue;
                        for (Object key : dispatchedKeys) {
                            abstractSender.removeFromTempQueueEvents(key);
                            if (brq != null) {
                                if (brq.isInitialized()) {
                                    if (isDebugEnabled) {
                                        logger.debug("ParallelQueueRemovalMessage : The bucket {} is initialized. Destroying the key {} from BucketRegionQueue.", (Object)bucketFullPath, key);
                                    }
                                    this.afterAckForSecondary_EventInBucket(abstractSender, brq, key);
                                    this.destroyKeyFromBucketQueue(brq, key, region);
                                    continue;
                                }
                                boolean isDestroyed = false;
                                if (isDebugEnabled) {
                                    logger.debug("ParallelQueueRemovalMessage : The bucket {} is not yet initialized.", (Object)bucketFullPath);
                                }
                                brq.getInitializationLock().readLock().lock();
                                try {
                                    if (brq.containsKey(key)) {
                                        this.afterAckForSecondary_EventInBucket(abstractSender, brq, key);
                                        this.destroyKeyFromBucketQueue(brq, key, region);
                                        isDestroyed = true;
                                    }
                                    this.destroyFromTempQueue(brq.getPartitionedRegion(), bId, key);
                                    brq.addToFailedBatchRemovalMessageKeys(key);
                                    continue;
                                }
                                finally {
                                    brq.getInitializationLock().readLock().unlock();
                                    continue;
                                }
                            }
                            this.destroyFromTempQueue(region, bId, key);
                        }
                    }
                }
            }
            finally {
                LocalRegion.setThreadInitLevelRequirement(oldLevel);
            }
        }
    }

    private void afterAckForSecondary_EventInBucket(AbstractGatewaySender abstractSender, AbstractBucketRegionQueue brq, Object key) {
        for (GatewayEventFilter filter : abstractSender.getGatewayEventFilters()) {
            GatewayQueueEvent eventForFilter = (GatewayQueueEvent)brq.get(key);
            try {
                if (eventForFilter == null) continue;
                filter.afterAcknowledgement(eventForFilter);
            }
            catch (Exception e) {
                logger.fatal(String.format("Exception occurred while handling call to %s.afterAcknowledgement for event %s:", filter.toString(), eventForFilter), (Throwable)e);
            }
        }
    }

    void destroyKeyFromBucketQueue(AbstractBucketRegionQueue brq, Object key, PartitionedRegion prQ) {
        boolean isDebugEnabled = logger.isDebugEnabled();
        try {
            brq.destroyKey(key);
            if (!brq.getBucketAdvisor().isPrimary()) {
                prQ.getParallelGatewaySender().getStatistics().decSecondaryQueueSize();
                prQ.getParallelGatewaySender().getStatistics().incEventsProcessedByPQRM(1);
            }
            if (isDebugEnabled) {
                logger.debug("Destroyed the key {} for shadowPR {} for bucket {}", key, (Object)prQ.getName(), (Object)brq.getId());
            }
        }
        catch (EntryNotFoundException e) {
            if (isDebugEnabled) {
                logger.debug("Got EntryNotFoundException while destroying the key {} for bucket {}", key, (Object)brq.getId());
            }
            if (!brq.isFailedBatchRemovalMessageKeysClearedFlag()) {
                brq.addToFailedBatchRemovalMessageKeys(key);
            }
        }
        catch (ForceReattemptException fe) {
            if (isDebugEnabled) {
                logger.debug("Got ForceReattemptException while getting bucket {} to destroyLocally the keys.", (Object)brq.getId());
            }
        }
        catch (CancelException e) {
            return;
        }
        catch (CacheException e) {
            logger.error(String.format("ParallelQueueRemovalMessage::process:Exception in processing the last disptached key for a ParallelGatewaySenderQueue's shadowPR. The problem is with key,%s for shadowPR with name=%s", key, prQ.getName()), (Throwable)e);
        }
    }

    private boolean destroyFromTempQueue(PartitionedRegion qPR, int bId, Object key) {
        ConcurrentParallelGatewaySenderQueue prq;
        BlockingQueue<GatewaySenderEventImpl> tempQueue;
        boolean isDestroyed = false;
        Set<RegionQueue> queues = qPR.getParallelGatewaySender().getQueues();
        if (queues != null && (tempQueue = (prq = (ConcurrentParallelGatewaySenderQueue)queues.toArray()[0]).getBucketTmpQueue(bId)) != null) {
            Iterator itr = tempQueue.iterator();
            while (itr.hasNext()) {
                GatewaySenderEventImpl eventForFilter = (GatewaySenderEventImpl)itr.next();
                this.afterAckForSecondary_EventInTempQueue(qPR.getParallelGatewaySender(), eventForFilter);
                if (!eventForFilter.getShadowKey().equals(key)) continue;
                itr.remove();
                eventForFilter.release();
                isDestroyed = true;
            }
        }
        return isDestroyed;
    }

    private void afterAckForSecondary_EventInTempQueue(AbstractGatewaySender parallelGatewaySenderImpl, GatewaySenderEventImpl eventForFilter) {
        for (GatewayEventFilter filter : parallelGatewaySenderImpl.getGatewayEventFilters()) {
            try {
                if (eventForFilter == null) continue;
                filter.afterAcknowledgement(eventForFilter);
            }
            catch (Exception e) {
                logger.fatal(String.format("Exception occurred while handling call to %s.afterAcknowledgement for event %s:", filter.toString(), eventForFilter), (Throwable)e);
            }
        }
    }

    @Override
    public void toData(DataOutput out, SerializationContext context) throws IOException {
        super.toData(out, context);
        DataSerializer.writeHashMap(this.regionToDispatchedKeysMap, out);
    }

    @Override
    public void fromData(DataInput in, DeserializationContext context) throws IOException, ClassNotFoundException {
        super.fromData(in, context);
        this.regionToDispatchedKeysMap = DataSerializer.readHashMap(in);
    }
}

