/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geode.cache.client.internal;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.geode.CancelException;
import org.apache.geode.GemFireConfigException;
import org.apache.geode.GemFireException;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.NoSubscriptionServersAvailableException;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.ServerRefusedConnectionException;
import org.apache.geode.cache.client.internal.ClientUpdater;
import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.ConnectionFactory;
import org.apache.geode.cache.client.internal.ConnectionSource;
import org.apache.geode.cache.client.internal.Endpoint;
import org.apache.geode.cache.client.internal.EndpointManager;
import org.apache.geode.cache.client.internal.InternalPool;
import org.apache.geode.cache.client.internal.MakePrimaryOp;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.client.internal.QueueConnectionImpl;
import org.apache.geode.cache.client.internal.QueueManager;
import org.apache.geode.cache.client.internal.QueueState;
import org.apache.geode.cache.client.internal.QueueStateImpl;
import org.apache.geode.cache.client.internal.ReadyForEventsOp;
import org.apache.geode.cache.client.internal.RegisterInterestTracker;
import org.apache.geode.cache.client.internal.ServerDenyList;
import org.apache.geode.cache.client.internal.UserAttributes;
import org.apache.geode.cache.query.internal.CqStateImpl;
import org.apache.geode.cache.query.internal.DefaultQueryService;
import org.apache.geode.cache.query.internal.cq.ClientCQ;
import org.apache.geode.cache.query.internal.cq.CqService;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.ClientServerObserver;
import org.apache.geode.internal.cache.ClientServerObserverHolder;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.tier.InterestType;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
import org.apache.geode.internal.logging.InternalLogWriter;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.security.GemFireSecurityException;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;

public class QueueManagerImpl
implements QueueManager {
    /** Logger shared by every instance of this class. */
    private static final Logger logger = LogService.getLogger();
    /**
     * Comparator used to sort the {@link ServerQueueStatus} of servers that already host a queue
     * for this client during initial connection setup. NOTE(review): presumably orders by queue
     * size, judging by the name; confirm against {@code QSizeComparator}.
     */
    @Immutable
    private static final Comparator<ServerQueueStatus> QSIZE_COMPARATOR = new QSizeComparator();
    /** Delay handed to the redundancy satisfier scheduling and to the {@link ServerDenyList}. */
    private final long redundancyRetryInterval;
    private final EndpointManager endpointManager;
    /** Forwards endpoint-crash notifications from the endpoint manager to this queue manager. */
    private final EndpointManager.EndpointListenerAdapter endpointListener;
    /** Used to look up candidate servers for hosting queues. */
    private final ConnectionSource source;
    /** Requested number of backup queues; {@code -1} is handled as a special case throughout. */
    private final int redundancyLevel;
    protected final ConnectionFactory factory;
    private final InternalLogWriter securityLogger;
    private final ClientProxyMembershipID proxyId;
    protected final InternalPool pool;
    private final QueueStateImpl state;
    // The four print* flags below gate log messages so the same condition is not reported on
    // every recovery pass; they are cleared after logging and re-armed once recovery succeeds.
    private boolean printPrimaryNotFoundError = true;
    private boolean printRedundancyNotSatisfiedError = true;
    private boolean printRecoveringPrimary = true;
    private boolean printRecoveringRedundant = true;
    private final ServerDenyList denyList;
    /** Monitor guarding replacement of {@link #queueConnections}; also used for wait/notify. */
    protected final Object lock = new Object();
    /** Counted down when {@code start} finishes, whether it succeeded or not. */
    private final CountDownLatch initializedLatch = new CountDownLatch(1);
    /** Created in {@code start}; shut down in {@code close}. */
    private ScheduledExecutorService recoveryThread;
    /** Set once {@code readyForEvents} has been invoked. */
    private volatile boolean sentClientReady;
    /** Current connection snapshot; replaced as a whole rather than mutated in place. */
    private volatile ConnectionList queueConnections = new ConnectionList();
    private volatile RedundancySatisfierTask redundancySatisfierTask = null;
    /** Set by {@code close} and {@code emergencyClose}; wakes up and stops waiters. */
    private volatile boolean shuttingDown;

    /**
     * Drops the current connection snapshot by publishing a freshly constructed
     * {@link ConnectionList} in its place.
     */
    void clearQueueConnections() {
        ConnectionList replacement = new ConnectionList();
        queueConnections = replacement;
    }

    /**
     * Creates a queue manager. No connections are made here; that happens in
     * {@link #start(ScheduledExecutorService)}.
     *
     * @param pool the pool this manager belongs to
     * @param endpointManager manager whose endpoint-crash events are forwarded to this instance
     * @param source used to find servers that can host queues
     * @param factory used to open connections to servers
     * @param redundancyLevel requested number of backup queues ({@code -1} is treated specially)
     * @param redundancyRetryInterval delay used for recovery scheduling and for the deny list
     * @param securityLogger logger returned by {@link #getSecurityLogger()}
     * @param proxyId identity passed to the connection source when looking up queue servers
     */
    public QueueManagerImpl(InternalPool pool, EndpointManager endpointManager, ConnectionSource source, ConnectionFactory factory, int redundancyLevel, long redundancyRetryInterval, InternalLogWriter securityLogger, ClientProxyMembershipID proxyId) {
        // Collaborators.
        this.pool = pool;
        this.source = source;
        this.factory = factory;
        this.endpointManager = endpointManager;
        this.proxyId = proxyId;
        this.securityLogger = securityLogger;

        // Redundancy settings.
        this.redundancyLevel = redundancyLevel;
        this.redundancyRetryInterval = redundancyRetryInterval;
        this.denyList = new ServerDenyList(redundancyRetryInterval);

        // Route crash notifications from the endpoint manager into this manager.
        this.endpointListener = new EndpointManager.EndpointListenerAdapter() {
            @Override
            public void endpointCrashed(Endpoint crashed) {
                QueueManagerImpl.this.endpointCrashed(crashed);
            }
        };
        this.state = new QueueStateImpl(this);
    }

    /**
     * @return the pool this queue manager was created for
     */
    @Override
    public InternalPool getPool() {
        return pool;
    }

    /**
     * Reports whether the updater attached to the current primary connection is alive.
     *
     * @return {@code true} only when a primary exists, it has an updater, and that updater
     *         reports itself alive
     */
    boolean isPrimaryUpdaterAlive() {
        QueueConnectionImpl primaryConnection = (QueueConnectionImpl) queueConnections.getPrimary();
        if (primaryConnection == null) {
            return false;
        }
        ClientUpdater updater = primaryConnection.getUpdater();
        if (updater == null) {
            return false;
        }
        return updater.isAlive();
    }

    /**
     * Returns the current connection snapshot immediately, without waiting for a primary.
     *
     * @return the connection list as it is right now
     */
    @Override
    public QueueManager.QueueConnections getAllConnectionsNoWait() {
        return queueConnections;
    }

    /**
     * Returns the connection snapshot, blocking on {@link #lock} until a primary is present,
     * primary discovery has failed, this manager is shutting down, or the pool/cache is being
     * cancelled. An interrupt restores the thread's interrupt flag and ends the wait.
     *
     * @return a snapshot that has a primary connection
     * @throws NoSubscriptionServersAvailableException if there is no primary and the recorded
     *         discovery failure is absent or is itself of that type
     * @throws ServerConnectivityException if there is no primary and some other discovery
     *         failure was recorded
     */
    @Override
    public QueueManager.QueueConnections getAllConnections() {
        ConnectionList current = queueConnections;
        if (current.getPrimary() == null) {
            synchronized (lock) {
                current = queueConnections;
                while (current.getPrimary() == null
                        && !current.primaryDiscoveryFailed()
                        && !shuttingDown
                        && pool.getPoolOrCacheCancelInProgress() == null) {
                    try {
                        lock.wait();
                    } catch (InterruptedException ignore) {
                        // Keep the interrupt visible to the caller and stop waiting.
                        Thread.currentThread().interrupt();
                        break;
                    }
                    // Re-read: the list is replaced, never mutated, by whoever notified us.
                    current = queueConnections;
                }
            }
        }

        if (current.getPrimary() != null) {
            return current;
        }

        // No primary: a pending cancellation takes precedence over the discovery failure.
        pool.getCancelCriterion().checkCancelInProgress(null);
        GemFireException discoveryFailure = current.getPrimaryDiscoveryException();
        if (discoveryFailure == null || discoveryFailure instanceof NoSubscriptionServersAvailableException) {
            throw new NoSubscriptionServersAvailableException(discoveryFailure);
        }
        throw new ServerConnectivityException(discoveryFailure.getMessage(), discoveryFailure);
    }

    /**
     * @return the security log writer supplied at construction time
     */
    @Override
    @Deprecated
    public InternalLogWriter getSecurityLogger() {
        return securityLogger;
    }

    /**
     * Shuts this manager down: unregisters the endpoint listener, flags shutdown and wakes any
     * waiters, cancels the pending redundancy task, stops the recovery executor (waiting up to
     * {@code PoolImpl.SHUTDOWN_TIMEOUT} ms), then closes the primary and every backup connection.
     * Failures while closing an individual connection are logged and do not stop the others.
     *
     * @param keepAlive passed through to each connection's {@code internalClose}
     */
    @Override
    public void close(boolean keepAlive) {
        endpointManager.removeListener(endpointListener);

        synchronized (lock) {
            shuttingDown = true;
            RedundancySatisfierTask pendingTask = redundancySatisfierTask;
            if (pendingTask != null) {
                pendingTask.cancel();
            }
            lock.notifyAll();
        }

        ScheduledExecutorService recovery = recoveryThread;
        if (recovery != null) {
            recovery.shutdown();
            try {
                boolean terminated = recovery.awaitTermination(PoolImpl.SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
                if (!terminated) {
                    logger.warn("Timeout waiting for recovery thread to complete");
                }
            } catch (InterruptedException ignore) {
                Thread.currentThread().interrupt();
                logger.debug("Interrupted waiting for recovery thread termination");
            }
        }

        final boolean debugEnabled = logger.isDebugEnabled();
        QueueConnectionImpl primary = (QueueConnectionImpl) queueConnections.getPrimary();
        if (debugEnabled) {
            logger.debug("QueueManagerImpl - closing connections with keepAlive={}", (Object) keepAlive);
        }
        if (primary != null) {
            try {
                if (logger.isDebugEnabled()) {
                    logger.debug("QueueManagerImpl - closing primary {}", primary);
                }
                primary.internalClose(keepAlive);
            } catch (Exception e) {
                logger.warn("Error closing primary connection to " + primary.getEndpoint(), e);
            }
        }

        for (Connection candidate : queueConnections.getBackups()) {
            QueueConnectionImpl backup = (QueueConnectionImpl) candidate;
            if (backup != null) {
                try {
                    if (logger.isDebugEnabled()) {
                        logger.debug("QueueManagerImpl - closing backup {}", backup);
                    }
                    backup.internalClose(keepAlive);
                } catch (Exception e) {
                    logger.warn("Error closing backup connection to " + backup.getEndpoint(), e);
                }
            }
        }
    }

    /**
     * Closes the primary and all backup connections via their own {@code emergencyClose}, after
     * flagging this manager as shutting down.
     *
     * <p>The primary may legitimately be absent (for example before initialization completes or
     * after the primary has crashed), so it is null-checked here just as in {@link #close}.
     * Without the check a missing primary raised a {@link NullPointerException} and left every
     * backup connection open.
     */
    @Override
    public void emergencyClose() {
        this.shuttingDown = true;
        // Read the volatile snapshot once so primary and backups come from the same list.
        ConnectionList snapshot = this.queueConnections;
        Connection primary = snapshot.getPrimary();
        if (primary != null) {
            primary.emergencyClose();
        }
        for (Connection backup : snapshot.getBackups()) {
            if (backup != null) {
                backup.emergencyClose();
            }
        }
    }

    /**
     * Starts the manager: starts the deny list, registers the endpoint listener, creates the
     * single-threaded recovery executor, starts the queue state, builds the initial connections
     * and schedules the redundancy satisfier. It also registers a listener on both deny lists
     * that triggers an immediate recovery pass whenever a server is removed from a deny list.
     * The initialization latch is released even if any of these steps throws.
     *
     * @param background executor handed to the deny list and to the queue state
     */
    @Override
    public void start(ScheduledExecutorService background) {
        try {
            denyList.start(background);
            endpointManager.addListener(endpointListener);

            recoveryThread = LoggingExecutors.newScheduledThreadPool(1, "queueTimer-" + pool.getName(), false);
            getState().start(background, getPool().getSubscriptionAckInterval());

            initializeConnections();
            scheduleRedundancySatisfierIfNeeded(redundancyRetryInterval);

            // A server leaving a deny list may be usable again, so retry recovery right away.
            ServerDenyList.DenyListListenerAdapter retryOnServerRemoved = new ServerDenyList.DenyListListenerAdapter() {
                @Override
                public void serverRemoved(ServerLocation location) {
                    QueueManagerImpl.this.scheduleRedundancySatisfierIfNeeded(0L);
                }
            };
            denyList.addListener(retryOnServerRemoved);
            factory.getDenyList().addListener(retryOnServerRemoved);
        } finally {
            initializedLatch.countDown();
        }
    }

    /**
     * Test hook: marks ready-for-events as already sent without contacting any server.
     */
    @VisibleForTesting
    public void setSendClientReadyInTestOnly() {
        synchronized (lock) {
            sentClientReady = true;
        }
    }

    /**
     * Records that the client is ready for events and sends the ready-for-events message to the
     * primary. If sending fails, the primary is destroyed and the next available primary is
     * tried. The method gives up quietly when no subscription servers are available.
     *
     * @param system not used by this implementation
     */
    @Override
    public void readyForEvents(InternalDistributedSystem system) {
        synchronized (lock) {
            sentClientReady = true;
        }

        QueueConnectionImpl target = null;
        while (target == null) {
            try {
                target = (QueueConnectionImpl) getAllConnections().getPrimary();
            } catch (NoSubscriptionServersAvailableException ignore) {
                // Nobody to notify; nothing more to do.
                return;
            }
            if (target.sendClientReady()) {
                try {
                    logger.info("Sending ready for events to primary: {}", target);
                    ReadyForEventsOp.execute(pool, target);
                } catch (Exception e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Error sending ready for events to {}", target, e);
                    }
                    target.destroy();
                    // Loop again to pick up whichever primary replaces the destroyed one.
                    target = null;
                }
            }
        }
    }

    /**
     * Sends ready-for-events to the given primary; on any failure the connection is destroyed.
     *
     * @param primary the connection that should receive the message
     */
    private void readyForEventsAfterFailover(QueueConnectionImpl primary) {
        try {
            logger.info("Sending ready for events to primary: {}", primary);
            ReadyForEventsOp.execute(pool, primary);
        } catch (Exception failure) {
            if (logger.isDebugEnabled()) {
                logger.debug("Error sending ready for events to {}", primary, failure);
            }
            primary.destroy();
        }
    }

    /**
     * Handles a crashed connection by treating its endpoint as crashed.
     *
     * @param con the connection that failed
     */
    void connectionCrashed(Connection con) {
        Endpoint crashedEndpoint = con.getEndpoint();
        endpointCrashed(crashedEndpoint);
    }

    /**
     * Reacts to a crashed endpoint. If a queue connection exists for it, the connection is
     * removed from the list (under {@link #lock}), destroyed, and an immediate recovery pass is
     * scheduled. Endpoints without a queue connection are ignored.
     *
     * @param endpoint the endpoint reported as crashed
     */
    @VisibleForTesting
    void endpointCrashed(Endpoint endpoint) {
        QueueConnectionImpl deadConnection;
        synchronized (lock) {
            deadConnection = queueConnections.getConnection(endpoint);
            if (deadConnection != null) {
                queueConnections = queueConnections.removeConnection(deadConnection);
            }
        }

        if (deadConnection == null) {
            if (logger.isDebugEnabled()) {
                logger.debug("Ignoring crashed endpoint {} it does not have a queue.", endpoint);
            }
            return;
        }

        String role;
        if (deadConnection.getUpdater() == null) {
            role = "Queue";
        } else if (deadConnection.getUpdater().isPrimary()) {
            role = "Primary";
        } else {
            role = "Redundant";
        }
        logger.info("{} subscription endpoint {} crashed. Scheduling recovery.", role, endpoint);
        deadConnection.internalDestroy();
        scheduleRedundancySatisfierIfNeeded(0L);
    }

    /**
     * Called when a client updater exits. Does nothing while shutting down. Otherwise, if the
     * connection registered for the endpoint still belongs to this updater, it is removed and
     * closed; a recovery pass is then scheduled in every case.
     *
     * @param ccu the updater that is exiting
     * @param endpoint the endpoint the updater was attached to
     */
    @Override
    public void checkEndpoint(ClientUpdater ccu, Endpoint endpoint) {
        QueueConnectionImpl deadConnection;
        synchronized (lock) {
            if (shuttingDown) {
                return;
            }
            deadConnection = queueConnections.getConnection(endpoint);
            // Only tear down the connection when it is still owned by the exiting updater.
            boolean ownedByExitingUpdater = deadConnection != null && ccu.equals(deadConnection.getUpdater());
            if (ownedByExitingUpdater) {
                queueConnections = queueConnections.removeConnection(deadConnection);
                try {
                    deadConnection.internalClose(pool.getKeepAlive());
                } catch (Exception e) {
                    logger.warn("Error destroying client to server connection to {}", deadConnection.getEndpoint(), e);
                }
            }
        }

        String role;
        if (deadConnection == null || deadConnection.getUpdater() == null) {
            role = "Queue";
        } else if (deadConnection.getUpdater().isPrimary()) {
            role = "Primary";
        } else {
            role = "Redundant";
        }
        logger.info("Cache client updater for {} on endpoint {} exiting. Scheduling recovery.", role, endpoint);
        scheduleRedundancySatisfierIfNeeded(0L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - void declaration
     */
    private void initializeConnections() {
        Object object;
        void var8_19;
        void var8_16;
        void var8_15;
        boolean isDebugEnabled = logger.isDebugEnabled();
        if (isDebugEnabled) {
            logger.debug("SubscriptionManager - initializing connections");
        }
        int queuesNeeded = this.redundancyLevel == -1 ? -1 : this.redundancyLevel + 1;
        HashSet<ServerLocation> excludedServers = new HashSet<ServerLocation>(this.denyList.getBadServers());
        List<ServerLocation> servers = this.findQueueServers(excludedServers, queuesNeeded, true, false, null);
        if (servers == null || servers.isEmpty()) {
            logger.warn("Could not create a queue. No queue servers available.");
            this.scheduleRedundancySatisfierIfNeeded(this.redundancyRetryInterval);
            Object object2 = this.lock;
            synchronized (object2) {
                this.queueConnections = this.queueConnections.setPrimaryDiscoveryFailed(null);
                this.lock.notifyAll();
            }
            return;
        }
        if (isDebugEnabled) {
            logger.debug("SubscriptionManager - discovered subscription servers {}", servers);
        }
        TreeMap<ServerQueueStatus, Connection> oldQueueServers = new TreeMap<ServerQueueStatus, Connection>(QSIZE_COMPARATOR);
        ArrayList<Connection> nonRedundantServers = new ArrayList<Connection>();
        for (ServerLocation serverLocation : servers) {
            Connection connection;
            block36: {
                connection = null;
                try {
                    connection = this.factory.createClientToServerConnection(serverLocation, true);
                }
                catch (GemFireConfigException | ServerRefusedConnectionException | GemFireSecurityException e) {
                    throw e;
                }
                catch (Exception e) {
                    if (!isDebugEnabled) break block36;
                    logger.debug("SubscriptionManager - Error connected to server: {}", (Object)serverLocation, (Object)e);
                }
            }
            if (connection == null) continue;
            ServerQueueStatus status = connection.getQueueStatus();
            if (status.isRedundant() || status.isPrimary()) {
                oldQueueServers.put(status, connection);
                continue;
            }
            nonRedundantServers.add(connection);
        }
        Connection newPrimary = null;
        if (!oldQueueServers.isEmpty()) {
            newPrimary = (Connection)oldQueueServers.remove(oldQueueServers.lastKey());
        } else if (!nonRedundantServers.isEmpty()) {
            newPrimary = (Connection)nonRedundantServers.remove(0);
        }
        nonRedundantServers.addAll(0, oldQueueServers.values());
        for (Connection connection : nonRedundantServers) {
            QueueConnectionImpl queueConnection = this.initializeQueueConnection(connection, false, null);
            if (queueConnection == null) continue;
            this.addToConnectionList(queueConnection, false);
        }
        Object var8_12 = null;
        if (newPrimary != null) {
            QueueConnectionImpl queueConnectionImpl = this.initializeQueueConnection(newPrimary, true, null);
            if (queueConnectionImpl == null) {
                newPrimary.destroy();
            } else if (!this.addToConnectionList(queueConnectionImpl, true)) {
                Object var8_14 = null;
            }
        }
        excludedServers.addAll(servers);
        if (this.redundancyLevel != -1 && this.getCurrentRedundancy() < this.redundancyLevel) {
            if (isDebugEnabled) {
                logger.debug("SubscriptionManager - Some initial connections failed. Trying to create redundant queues");
            }
            this.recoverRedundancy(excludedServers, false);
        }
        if (this.redundancyLevel != -1 && var8_15 == null) {
            QueueConnectionImpl queueConnectionImpl;
            if (isDebugEnabled) {
                logger.debug("SubscriptionManager - Intial primary creation failed. Trying to create a new primary");
            }
            while (var8_16 == null && (queueConnectionImpl = this.createNewPrimary(excludedServers)) != null) {
                this.markQueueAsReadyForEvents(queueConnectionImpl);
                if (this.addToConnectionList(queueConnectionImpl, true)) continue;
                excludedServers.add(queueConnectionImpl.getServer());
                Object var8_18 = null;
            }
        }
        if (var8_16 == null) {
            QueueConnectionImpl queueConnectionImpl;
            if (isDebugEnabled) {
                logger.debug("SubscriptionManager - Unable to create a new primary queue, using one of the redundant queues");
            }
            while (var8_19 == null && (queueConnectionImpl = this.promoteBackupToPrimary(this.queueConnections.getBackups())) != null) {
                if (this.addToConnectionList(queueConnectionImpl, true)) continue;
                object = this.lock;
                synchronized (object) {
                    this.queueConnections = this.queueConnections.removeConnection(queueConnectionImpl);
                }
                Object var8_21 = null;
            }
        }
        if (var8_19 == null) {
            logger.error("Could not initialize a primary queue on startup. No queue servers available.");
            object = this.lock;
            synchronized (object) {
                this.queueConnections = this.queueConnections.setPrimaryDiscoveryFailed(new NoSubscriptionServersAvailableException("Could not initialize a primary queue on startup. No queue servers available."));
                this.lock.notifyAll();
            }
            this.cqsDisconnected();
        } else {
            this.cqsConnected();
        }
        if (this.getCurrentRedundancy() < this.redundancyLevel) {
            logger.warn("Unable to initialize enough redundant queues on startup. The redundancy count is currently {}.", (Object)this.getCurrentRedundancy());
        }
    }

    /**
     * @return whatever {@code GemFireCacheImpl.getInstance()} currently returns; callers in this
     *         class treat {@code null} as "no cache"
     */
    private InternalCache getInternalCache() {
        InternalCache current = GemFireCacheImpl.getInstance();
        return current;
    }

    /**
     * Tells the cache's CQ service that this pool's CQs are connected; a no-op when no cache is
     * available.
     */
    private void cqsConnected() {
        InternalCache cache = getInternalCache();
        if (cache == null) {
            return;
        }
        cache.getCqService().cqsConnected(pool);
    }

    /**
     * Tells the cache's CQ service that this pool's CQs are disconnected; a no-op when no cache
     * is available.
     */
    private void cqsDisconnected() {
        InternalCache cache = getInternalCache();
        if (cache == null) {
            return;
        }
        cache.getCqService().cqsDisconnected(pool);
    }

    /**
     * @return the number of backup connections in the current snapshot
     */
    private int getCurrentRedundancy() {
        List<Connection> backups = queueConnections.getBackups();
        return backups.size();
    }

    /**
     * Creates backup queues until the redundancy level is met, no further servers can be found,
     * or the pool/cache is being cancelled. With a redundancy level of {@code -1} it keeps going
     * until the server lookup returns nothing.
     *
     * <p>When {@code recoverInterest} is set and there is currently neither a primary nor any
     * backup, the first new connection is promoted to primary before being added.
     *
     * @param excludedServers servers to skip; every server tried here is added to this set
     * @param recoverInterest whether interest should be recovered on each newly added queue
     */
    private void recoverRedundancy(Set<ServerLocation> excludedServers, boolean recoverInterest) {
        if (pool.getPoolOrCacheCancelInProgress() != null) {
            return;
        }
        final boolean unlimited = redundancyLevel == -1;

        while (pool.getPoolOrCacheCancelInProgress() == null) {
            int additionalBackups = redundancyLevel - getCurrentRedundancy();
            if (additionalBackups <= 0 && !unlimited) {
                break;
            }

            if (!unlimited && printRecoveringRedundant) {
                logger.info("SubscriptionManager redundancy satisfier - redundant endpoint has been lost. Attempting to recover.");
                printRecoveringRedundant = false;
            }

            int wanted = unlimited ? -1 : additionalBackups;
            boolean reportLookupFailure = !unlimited && printRedundancyNotSatisfiedError;
            List<ServerLocation> servers = findQueueServers(excludedServers, wanted, false, reportLookupFailure, "Could not find any server to host redundant client queue. Number of excluded servers is %s and exception is %s");
            if (servers == null || servers.isEmpty()) {
                if (!unlimited && printRedundancyNotSatisfiedError) {
                    logger.info("Redundancy level {} is not satisfied, but there are no more servers available. Redundancy is currently {}.", redundancyLevel, getCurrentRedundancy());
                }
                printRedundancyNotSatisfiedError = false;
                return;
            }

            excludedServers.addAll(servers);
            final boolean debugEnabled = logger.isDebugEnabled();
            for (ServerLocation server : servers) {
                Connection connection = null;
                try {
                    connection = factory.createClientToServerConnection(server, true);
                } catch (GemFireSecurityException e) {
                    throw e;
                } catch (Exception e) {
                    if (debugEnabled) {
                        logger.debug("SubscriptionManager - Error connecting to server: {}", server, e);
                    }
                }
                if (connection == null) {
                    continue;
                }

                QueueConnectionImpl queueConnection = initializeQueueConnection(connection, false, null);
                if (queueConnection == null) {
                    continue;
                }

                // With nothing connected at all, the first new queue has to act as the primary.
                boolean isFirstNewConnection;
                synchronized (lock) {
                    isFirstNewConnection = recoverInterest
                            && queueConnections.getPrimary() == null
                            && queueConnections.getBackups().isEmpty();
                }
                if (isFirstNewConnection && !promoteBackupCnxToPrimary(queueConnection)) {
                    continue;
                }
                if (!addToConnectionList(queueConnection, isFirstNewConnection)) {
                    continue;
                }

                // Success: re-arm the throttled log messages for the next failure.
                printRedundancyNotSatisfiedError = true;
                printRecoveringRedundant = true;
                if (logger.isDebugEnabled()) {
                    logger.debug("SubscriptionManager redundancy satisfier - created a queue on server {}", queueConnection.getEndpoint());
                }
                if (recoverInterest) {
                    this.recoverInterest(queueConnection, isFirstNewConnection);
                }
            }
        }
    }

    /**
     * Tries each backup in list order and returns the first one that is successfully promoted.
     *
     * @param backups candidate connections, tried front to back
     * @return the promoted connection, or {@code null} if none could be promoted
     */
    @VisibleForTesting
    public QueueConnectionImpl promoteBackupToPrimary(List<Connection> backups) {
        int index = 0;
        while (index < backups.size()) {
            QueueConnectionImpl candidate = (QueueConnectionImpl) backups.get(index);
            if (promoteBackupCnxToPrimary(candidate)) {
                return candidate;
            }
            index++;
        }
        return null;
    }

    /**
     * Asks the server behind {@code cnx} to make this client's queue the primary.
     *
     * <p>Observer callbacks are invoked before the attempt and after a successful one when the
     * corresponding {@code PoolImpl} flags are set. A connection whose updater is already gone
     * is asserted to be destroyed and is not promoted. Exceptions are swallowed; they are logged
     * at debug level unless the pool/cache is being cancelled.
     *
     * @param cnx the backup connection to promote
     * @return {@code true} if the make-primary operation completed, {@code false} otherwise
     */
    private boolean promoteBackupCnxToPrimary(QueueConnectionImpl cnx) {
        if (PoolImpl.BEFORE_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG) {
            ClientServerObserverHolder.getInstance().beforePrimaryIdentificationFromBackup();
        }

        boolean promoted = false;
        try {
            // Read the flag once so the same value drives both calls below.
            final boolean haveSentClientReady = sentClientReady;
            if (haveSentClientReady) {
                cnx.sendClientReady();
            }

            ClientUpdater updater = cnx.getUpdater();
            if (updater != null) {
                updater.setFailedUpdater(queueConnections.getFailedUpdater());
                MakePrimaryOp.execute(pool, cnx, haveSentClientReady);
                promoted = true;
                if (PoolImpl.AFTER_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG) {
                    ClientServerObserverHolder.getInstance().afterPrimaryIdentificationFromBackup(cnx.getServer());
                }
            } else {
                if (logger.isDebugEnabled()) {
                    logger.debug("backup connection was destroyed before it could become the primary.");
                }
                Assert.assertTrue(cnx.isDestroyed());
            }
        } catch (Exception e) {
            if (pool.getPoolOrCacheCancelInProgress() == null && logger.isDebugEnabled()) {
                logger.debug("Error making a backup server the primary server for client subscriptions", e);
            }
        }
        return promoted;
    }

    /**
     * Tries to create a primary queue on a server that is not in {@code excludedServers}.
     *
     * <p>Servers are requested one at a time. Each server tried is added to
     * {@code excludedServers}, so a failed attempt moves on to a different server. The loop ends
     * when a primary is created, no more servers are offered, or the pool/cache is being
     * cancelled.
     *
     * <p>A failed connection attempt is logged at debug level together with the exception that
     * caused it, matching how the other connection attempts in this class log their failures.
     *
     * @param excludedServers servers to skip; updated with every server tried here
     * @return the new primary queue connection, or {@code null} if none could be created
     * @throws GemFireSecurityException if connecting fails for security reasons
     */
    private QueueConnectionImpl createNewPrimary(Set<ServerLocation> excludedServers) {
        QueueConnectionImpl primary = null;
        while (primary == null && this.pool.getPoolOrCacheCancelInProgress() == null) {
            List<ServerLocation> servers = this.findQueueServers(excludedServers, 1, false, this.printPrimaryNotFoundError, "Could not find any server to host primary client queue. Number of excluded servers is %s and exception is %s");
            // Report the "not found" error at most once until the flag is re-armed elsewhere.
            this.printPrimaryNotFoundError = false;
            if (servers == null || servers.isEmpty()) {
                break;
            }

            ServerLocation candidate = servers.get(0);
            Connection connection = null;
            try {
                connection = this.factory.createClientToServerConnection(candidate, true);
            } catch (GemFireSecurityException e) {
                throw e;
            } catch (Exception e) {
                if (logger.isDebugEnabled()) {
                    // Pass the exception along so the cause of the failure is not lost.
                    logger.debug("SubscriptionManagerImpl - error creating a connection to server {}", candidate, e);
                }
            }
            if (connection != null) {
                primary = this.initializeQueueConnection(connection, true, this.queueConnections.getFailedUpdater());
            }
            excludedServers.addAll(servers);
        }
        return primary;
    }

    /**
     * Sends ready-for-events to {@code primary}, but only if this client has already declared
     * itself ready and the connection's {@code sendClientReady()} returns {@code true}.
     *
     * @param primary the connection to notify
     */
    public void markQueueAsReadyForEvents(@NotNull QueueConnectionImpl primary) {
        if (!sentClientReady) {
            return;
        }
        if (primary.sendClientReady()) {
            readyForEventsAfterFailover(primary);
        }
    }

    /**
     * Asks the connection source for servers that can host a queue for this client.
     *
     * @param excludedServers servers the source should not return
     * @param count number of servers requested, passed through to the source
     * @param findDurable passed through to the source
     * @param printErrorMessage whether to log an error when nothing was found
     * @param msg format string for that error; it receives the excluded-server count and the
     *        failure message (or {@code "no exception"})
     * @return the servers found; {@code null} when the pool/cache is being cancelled or the
     *         lookup failed
     * @throws GemFireSecurityException if the lookup fails for security reasons
     */
    private List<ServerLocation> findQueueServers(Set<ServerLocation> excludedServers, int count, boolean findDurable, boolean printErrorMessage, String msg) {
        List<ServerLocation> found = null;
        Exception lookupFailure = null;
        try {
            if (pool.getPoolOrCacheCancelInProgress() != null) {
                return null;
            }
            found = source.findServersForQueue(excludedServers, count, proxyId, findDurable);
        } catch (GemFireSecurityException e) {
            throw e;
        } catch (Exception e) {
            lookupFailure = e;
            if (logger.isDebugEnabled()) {
                logger.debug("SubscriptionManager - Error getting the list of servers: {}", e.getMessage());
            }
        }

        boolean nothingFound = found == null || found.isEmpty();
        if (printErrorMessage && nothingFound) {
            int excludedCount = excludedServers == null ? 0 : excludedServers.size();
            String reason = lookupFailure == null ? "no exception" : lookupFailure.getMessage();
            logger.error(String.format(msg, excludedCount, reason));
        }
        return found;
    }

    /**
     * Tries to re-establish a primary subscription queue connection.
     *
     * <p>Returns immediately when the pool or cache is being cancelled or when
     * {@link #isPrimaryRecoveryNeeded} reports that the current primary is still usable.
     * Otherwise it first tries to promote one of the existing backups; if none can be
     * promoted it creates a brand-new primary on a server outside {@code excludedServers},
     * recovers interest on it and marks it ready for events. If neither works, CQs are told
     * they are disconnected and the connection list is flagged as "primary discovery failed".
     *
     * @param excludedServers servers that must not be chosen; servers that fail during this
     *        call are added to it
     */
    @VisibleForTesting
    public void recoverPrimary(Set<ServerLocation> excludedServers) {
        if (this.pool.getPoolOrCacheCancelInProgress() != null) {
            return;
        }
        boolean isDebugEnabled = logger.isDebugEnabled();
        if (!QueueManagerImpl.isPrimaryRecoveryNeeded(this.queueConnections)) {
            if (isDebugEnabled) {
                logger.debug("Primary recovery not needed");
            }
            return;
        }
        if (isDebugEnabled) {
            logger.debug("SubscriptionManager redundancy satisfier - primary endpoint has been lost. Attempting to recover");
        }
        if (this.printRecoveringPrimary) {
            // Log at info level only once per outage; re-armed after a successful recovery.
            logger.info("SubscriptionManager redundancy satisfier - primary endpoint has been lost. Attempting to recover.");
            this.printRecoveringPrimary = false;
        }

        // Phase 1: promote an existing backup, retrying until one sticks or none is left.
        QueueConnectionImpl newPrimary = null;
        while (newPrimary == null && this.pool.getPoolOrCacheCancelInProgress() == null) {
            newPrimary = this.promoteBackupToPrimary(this.queueConnections.getBackups());
            if (newPrimary == null) {
                // No backup could be promoted.
                break;
            }
            if (!this.addToConnectionList(newPrimary, true)) {
                synchronized (this.lock) {
                    // Drop the rejected backup so the next iteration does not pick it again.
                    this.queueConnections = this.queueConnections.removeConnection(newPrimary);
                }
                newPrimary = null;
            }
        }
        if (newPrimary != null) {
            if (isDebugEnabled) {
                logger.debug("SubscriptionManager redundancy satisfier - Switched backup server to primary: {}", (Object)newPrimary.getEndpoint());
            }
            if (PoolImpl.AFTER_PRIMARY_RECOVERED_CALLBACK_FLAG) {
                ClientServerObserver bo = ClientServerObserverHolder.getInstance();
                bo.afterPrimaryRecovered(newPrimary.getServer());
            }
            this.cqsConnected();
            this.printPrimaryNotFoundError = true;
            this.printRecoveringPrimary = true;
            return;
        }

        // Phase 2: no backup available, so create a primary on a server that had no queue.
        newPrimary = this.createNewPrimary(excludedServers);
        if (newPrimary != null) {
            if (!this.addToConnectionList(newPrimary, true)) {
                excludedServers.add(newPrimary.getServer());
                newPrimary = null;
            }
            if (newPrimary != null) {
                if (isDebugEnabled) {
                    logger.debug("SubscriptionManager redundancy satisfier - Non backup server was made primary. Recovering interest {}", (Object)newPrimary.getEndpoint());
                }
                if (!this.recoverInterest(newPrimary, true)) {
                    excludedServers.add(newPrimary.getServer());
                    newPrimary = null;
                }
                // Only a connection that survived interest recovery may be marked ready;
                // passing null here would dereference it inside markQueueAsReadyForEvents.
                if (newPrimary != null) {
                    this.markQueueAsReadyForEvents(newPrimary);
                }
                this.cqsConnected();
            }
            if (newPrimary != null && PoolImpl.AFTER_PRIMARY_RECOVERED_CALLBACK_FLAG) {
                ClientServerObserver bo = ClientServerObserverHolder.getInstance();
                bo.afterPrimaryRecovered(newPrimary.getServer());
            }
            this.printPrimaryNotFoundError = true;
            this.printRecoveringPrimary = true;
            return;
        }

        // Phase 3: nothing worked. Record the failure and wake any thread waiting on the lock.
        this.cqsDisconnected();
        if (isDebugEnabled) {
            logger.debug("SubscriptionManager redundancy satisfier - Could not recover a new primary");
        }
        synchronized (this.lock) {
            this.queueConnections = this.queueConnections.setPrimaryDiscoveryFailed(null);
            this.lock.notifyAll();
        }
    }

    /**
     * Decides whether a new primary has to be found.
     *
     * @param queueConnectionList the current connection snapshot; may be {@code null}
     * @return {@code true} when there is no snapshot, the snapshot has no primary, or the
     *         primary reports itself as destroyed; {@code false} otherwise
     */
    static boolean isPrimaryRecoveryNeeded(ConnectionList queueConnectionList) {
        if (queueConnectionList == null) {
            return true;
        }
        Connection currentPrimary = queueConnectionList.getPrimary();
        return currentPrimary == null || currentPrimary.isDestroyed();
    }

    /**
     * Wraps a client-to-server connection in a queue connection by creating the matching
     * server-to-client updater.
     *
     * <p>If the updater cannot be created (the factory returns {@code null} or throws), a
     * failure is recorded on the server's failure tracker and {@code connection} is destroyed.
     *
     * @param connection the already-established client-to-server connection
     * @param isPrimary passed to the factory when creating the updater
     * @param failedUpdater updater of a previously failed primary, passed to the factory;
     *        may be {@code null}
     * @return the new queue connection, or {@code null} when the updater could not be created
     */
    private QueueConnectionImpl initializeQueueConnection(Connection connection, boolean isPrimary, ClientUpdater failedUpdater) {
        ServerDenyList.FailureTracker tracker = this.denyList.getFailureTracker(connection.getServer());
        QueueConnectionImpl created = null;
        try {
            ClientUpdater updater = this.factory.createServerToClientConnection(connection.getEndpoint(), this, isPrimary, failedUpdater);
            if (updater == null) {
                logger.warn("unable to create a subscription connection to server {}", (Object)connection.getEndpoint());
            } else {
                created = new QueueConnectionImpl(this, connection, updater, tracker);
            }
        } catch (Exception e) {
            if (logger.isDebugEnabled()) {
                logger.debug("error creating subscription connection to server {}", (Object)connection.getEndpoint(), (Object)e);
            }
        }
        if (created != null) {
            return created;
        }
        // No updater: count it against the server and release the underlying connection.
        tracker.addFailure();
        connection.destroy();
        return null;
    }

    /**
     * Registers a queue connection as primary or backup in the current connection list.
     *
     * <p>The check and the list update happen while holding {@code lock}. A connection whose
     * updater is missing, not alive or not processing is rejected without being closed. A
     * connection whose endpoint is closed, that is destroyed, or that arrives while this
     * manager is shutting down or the pool/cache is being cancelled is rejected and closed
     * (outside the lock).
     *
     * @param connection the queue connection to register
     * @param isPrimary {@code true} to install it as primary (waiters on the lock are
     *        notified), {@code false} to append it to the backups
     * @return {@code true} if the connection was added, {@code false} if it was rejected
     */
    @VisibleForTesting
    public boolean addToConnectionList(QueueConnectionImpl connection, boolean isPrimary) {
        boolean unusable;
        synchronized (this.lock) {
            ClientUpdater updater = connection.getUpdater();
            boolean updaterRunning = updater != null && updater.isAlive() && updater.isProcessing();
            if (!updaterRunning) {
                return false;
            }
            unusable = connection.getEndpoint().isClosed()
                    || connection.isDestroyed()
                    || this.shuttingDown
                    || this.pool.getPoolOrCacheCancelInProgress() != null;
            if (!unusable) {
                if (isPrimary) {
                    this.queueConnections = this.queueConnections.setPrimary(connection);
                    this.lock.notifyAll();
                } else {
                    this.queueConnections = this.queueConnections.addBackup(connection);
                }
            }
        }
        if (!unusable) {
            return true;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Endpoint {} crashed while creating a connection. The connection will be destroyed", (Object)connection.getEndpoint());
        }
        try {
            connection.internalClose(true);
        } catch (Exception e) {
            // Closing is best effort; the connection is being discarded anyway.
            if (logger.isDebugEnabled()) {
                logger.debug("Error destroying client to server connection to {}", (Object)connection.getEndpoint(), (Object)e);
            }
        }
        return false;
    }

    /**
     * Schedules a {@link RedundancySatisfierTask} if the queue connections need attention.
     *
     * <p>A task is needed when there is no primary, the current redundancy is below the
     * configured level, the redundancy level is {@code -1}, or primary discovery has been
     * flagged as failed. If a task is already pending it is kept when it would fire no later
     * than the requested delay; otherwise it is cancelled and replaced. Nothing is scheduled
     * once this manager is shutting down.
     *
     * @param delay delay in milliseconds before the task should run
     */
    @VisibleForTesting
    public void scheduleRedundancySatisfierIfNeeded(long delay) {
        if (this.shuttingDown) {
            return;
        }
        synchronized (this.lock) {
            // Re-check under the lock: shutdown may have started since the unlocked check.
            if (this.shuttingDown) {
                return;
            }
            boolean recoveryNeeded = this.queueConnections.getPrimary() == null
                    || this.getCurrentRedundancy() < this.redundancyLevel
                    || this.redundancyLevel == -1
                    || this.queueConnections.primaryDiscoveryFailed();
            if (!recoveryNeeded) {
                return;
            }
            if (this.redundancySatisfierTask != null) {
                if (this.redundancySatisfierTask.getRemainingDelay() <= delay) {
                    // The pending task already fires soon enough.
                    return;
                }
                this.redundancySatisfierTask.cancel();
            }
            RedundancySatisfierTask task = new RedundancySatisfierTask();
            this.redundancySatisfierTask = task;
            try {
                ScheduledFuture<?> future = this.recoveryThread.schedule(task, delay, TimeUnit.MILLISECONDS);
                task.setFuture(future);
            } catch (RejectedExecutionException ignored) {
                // The executor refused the task, so it will never run and never receives a
                // future. Drop the reference; otherwise a later getRemainingDelay() or
                // cancel() on it would dereference the missing future.
                if (this.redundancySatisfierTask == task) {
                    this.redundancySatisfierTask = null;
                }
            }
        }
    }

    /**
     * Re-registers all tracked interest on a newly established queue connection.
     *
     * <p>On success the connection's failure tracker is reset. On an unexpected failure the
     * error is logged, a failure is recorded on the tracker and the connection is destroyed.
     * Cancellation (either detected up front or signalled by a {@code CancelException}) is
     * reported as success.
     *
     * @param newConnection the connection to recover interest on
     * @param isFirstNewConnection forwarded to the interest recovery routines
     * @return {@code false} only when recovery failed and the connection was destroyed
     */
    private boolean recoverInterest(QueueConnectionImpl newConnection, boolean isFirstNewConnection) {
        boolean cancelling = this.pool.getPoolOrCacheCancelInProgress() != null;
        if (!cancelling) {
            try {
                this.recoverAllInterestTypes(newConnection, isFirstNewConnection);
                newConnection.getFailureTracker().reset();
            } catch (CancelException cancelled) {
                // Treated like a successful recovery; falls through to return true.
            } catch (VirtualMachineError fatal) {
                SystemFailure.initiateFailure(fatal);
                throw fatal;
            } catch (Throwable failure) {
                SystemFailure.checkFailure();
                this.pool.getCancelCriterion().checkCancelInProgress(failure);
                logger.warn("QueueManagerImpl failed to recover interest to server " + newConnection.getServer(), failure);
                newConnection.getFailureTracker().addFailure();
                newConnection.destroy();
                return false;
            }
        }
        return true;
    }

    /**
     * Returns the queue state object held by this manager.
     *
     * @return the value of the {@code state} field
     */
    @Override
    public QueueState getState() {
        return state;
    }

    /**
     * Recovers every region's registered interest of one interest type on the given
     * connection.
     *
     * <p>The regions and their interests are read from the pool's register-interest tracker
     * for the given type, durability and (negated) {@code receiveValues} flag.
     *
     * @param interestType2 the kind of interest to recover
     * @param recoveredConnection the connection to register interest on
     * @param isDurable whether durable or non-durable interest is recovered
     * @param receiveValues whether the interest was registered to receive values
     * @param isFirstNewConnection forwarded to {@code recoverSingleRegion}
     */
    private void recoverSingleList(@NotNull InterestType interestType2, Connection recoveredConnection, boolean isDurable, boolean receiveValues, boolean isFirstNewConnection) {
        RegisterInterestTracker tracker = this.getPool().getRITracker();
        boolean valuesNotReceived = !receiveValues;
        for (RegisterInterestTracker.RegionInterestEntry regionEntry : tracker.getRegionToInterestsMap(interestType2, isDurable, valuesNotReceived).values()) {
            this.recoverSingleRegion(regionEntry.getRegion(), regionEntry.getInterests(), interestType2, recoveredConnection, isDurable, receiveValues, isFirstNewConnection);
        }
    }

    /**
     * Re-creates tracked continuous queries on the given connection.
     *
     * <p>A CQ is created on the connection only if its state code is not {@code 4}
     * (NOTE(review): presumably the "closed" state - confirm against {@code CqStateImpl})
     * and its durability matches {@code isDurable}. With multi-user authentication enabled,
     * the CQ's user attributes are installed in the {@code UserAttributes} thread-local
     * first; the thread-local is cleared after every CQ regardless of outcome.
     *
     * @param recoveredConnection the connection to create the CQs on
     * @param isDurable selects durable or non-durable CQs
     */
    private void recoverCqs(Connection recoveredConnection, boolean isDurable) {
        Map<ClientCQ, Boolean> trackedCqs = this.getPool().getRITracker().getCqsMap();
        for (ClientCQ cq : trackedCqs.keySet()) {
            String cqName = cq.getName();
            if (this.pool.getMultiuserAuthentication()) {
                DefaultQueryService queryService = (DefaultQueryService)this.pool.getQueryService();
                UserAttributes.userAttributes.set(queryService.getUserAttributes(cqName));
            }
            try {
                boolean inStateFour = ((CqStateImpl)cq.getState()).getState() == 4;
                if (!inStateFour && cq.isDurable() == isDurable) {
                    cq.createOn(recoveredConnection, isDurable);
                }
            } finally {
                UserAttributes.userAttributes.set(null);
            }
        }
    }

    /**
     * Recovers one region's registered interest of a single interest type.
     *
     * <p>For {@code InterestType.KEY} the keys are first grouped by result policy so that
     * each policy is recovered with one call carrying the whole key list. Every other
     * interest type is recovered entry by entry.
     *
     * @param r the region whose interest is recovered
     * @param keys registered interest mapped to its result policy
     * @param interestType2 the kind of interest being recovered
     * @param recoveredConnection the connection to register interest on
     * @param isDurable whether the interest is durable
     * @param receiveValues whether the interest was registered to receive values
     * @param isFirstNewConnection forwarded to {@code recoverSingleKey}
     */
    private void recoverSingleRegion(LocalRegion r, Map<Object, InterestResultPolicy> keys, @NotNull InterestType interestType2, Connection recoveredConnection, boolean isDurable, boolean receiveValues, boolean isFirstNewConnection) {
        if (logger.isDebugEnabled()) {
            logger.debug("{}.recoverSingleRegion starting kind={} region={}: {}", (Object)this, (Object)InterestType.getString(interestType2), (Object)r.getFullPath(), keys);
        }
        // Result policy -> keys registered with that policy (filled for InterestType.KEY only).
        Map<InterestResultPolicy, LinkedList<Object>> policyMap = new HashMap<InterestResultPolicy, LinkedList<Object>>();
        for (Map.Entry<Object, InterestResultPolicy> entry : keys.entrySet()) {
            Object key = entry.getKey();
            InterestResultPolicy pol = entry.getValue();
            if (interestType2 != InterestType.KEY) {
                this.recoverSingleKey(r, key, pol, interestType2, recoveredConnection, isDurable, receiveValues, isFirstNewConnection);
                continue;
            }
            LinkedList<Object> keyList = policyMap.get(pol);
            if (keyList == null) {
                keyList = new LinkedList<Object>();
                policyMap.put(pol, keyList);
            }
            keyList.add(key);
        }
        // Each grouped key list is sent as one registration per policy.
        for (Map.Entry<InterestResultPolicy, LinkedList<Object>> grouped : policyMap.entrySet()) {
            this.recoverSingleKey(r, grouped.getValue(), grouped.getKey(), interestType2, recoveredConnection, isDurable, receiveValues, isFirstNewConnection);
        }
    }

    /**
     * Registers one interest (a single key/expression or a list of keys) on the recovered
     * connection, bracketed by {@code startRegisterInterest}/{@code finishRegisterInterest}
     * on the region.
     *
     * <p>On the first new connection the region's keys of interest are cleared first, the
     * interest is registered with the original policy and the region's entries are refreshed
     * from the keys the server returned. On any later connection the interest is only
     * registered; a {@code KEYS_VALUES} policy is then replaced by {@code NONE}.
     *
     * @param r the region the interest belongs to
     * @param keys the key, expression or key list to register
     * @param policy the result policy the interest was registered with
     * @param interestType2 the kind of interest
     * @param recoveredConnection the connection to register on
     * @param isDurable whether the interest is durable
     * @param receiveValues whether the interest was registered to receive values
     * @param isFirstNewConnection whether this is the first new connection being recovered
     */
    private void recoverSingleKey(LocalRegion r, @NotNull Object keys, InterestResultPolicy policy, @NotNull InterestType interestType2, Connection recoveredConnection, boolean isDurable, boolean receiveValues, boolean isFirstNewConnection) {
        r.startRegisterInterest();
        try {
            if (isFirstNewConnection) {
                r.clearKeysOfInterest(keys, interestType2, policy);
                if (logger.isDebugEnabled()) {
                    logger.debug("{}.recoverSingleRegion :Endpoint recovered is primary so clearing the keys of interest starting kind={} region={}: {}", (Object)this, (Object)InterestType.getString(interestType2), (Object)r.getFullPath(), keys);
                }
            }
            boolean registerWithoutResults = policy == InterestResultPolicy.KEYS_VALUES && !isFirstNewConnection;
            if (registerWithoutResults) {
                r.getServerProxy().registerInterestOn(recoveredConnection, keys, interestType2, InterestResultPolicy.NONE, isDurable, !receiveValues, r.getAttributes().getDataPolicy());
                return;
            }
            List<Object> serverKeys = r.getServerProxy().registerInterestOn(recoveredConnection, keys, interestType2, policy, isDurable, !receiveValues, r.getAttributes().getDataPolicy());
            if (isFirstNewConnection) {
                r.refreshEntriesFromServerKeys(recoveredConnection, serverKeys, policy);
            }
        } finally {
            r.finishRegisterInterest();
        }
    }

    /**
     * Recovers registered interest of every interest type, in the fixed order KEY,
     * REGULAR_EXPRESSION, FILTER_CLASS, OQL_QUERY.
     *
     * @param recoveredConnection the connection to register interest on
     * @param durable whether durable or non-durable interest is recovered
     * @param receiveValues whether the interest was registered to receive values
     * @param isFirstNewConnection forwarded to {@code recoverSingleList}
     */
    private void recoverInterestList(Connection recoveredConnection, boolean durable, boolean receiveValues, boolean isFirstNewConnection) {
        InterestType[] recoveryOrder = new InterestType[]{
            InterestType.KEY,
            InterestType.REGULAR_EXPRESSION,
            InterestType.FILTER_CLASS,
            InterestType.OQL_QUERY,
        };
        for (InterestType kind : recoveryOrder) {
            this.recoverSingleList(kind, recoveredConnection, durable, receiveValues, isFirstNewConnection);
        }
    }

    /**
     * Recovers all registered interest and CQs on the given connection.
     *
     * <p>Non-durable registrations are always recovered: interest that receives values, then
     * interest that does not, then CQs. The same three steps are repeated for durable
     * registrations only when the pool reports a durable client. If the
     * {@code BEFORE_RECOVER_INTEREST_CALLBACK_FLAG} test hook is set, the client/server
     * observer is notified first.
     *
     * @param recoveredConnection the connection to recover on
     * @param isFirstNewConnection forwarded to the interest recovery routines
     */
    private void recoverAllInterestTypes(Connection recoveredConnection, boolean isFirstNewConnection) {
        if (PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG) {
            ClientServerObserverHolder.getInstance().beforeInterestRecovery();
        }
        for (boolean durable : new boolean[]{false, true}) {
            // The durable pass is only made for durable clients; checked after the
            // non-durable pass has completed.
            if (durable && !this.getPool().isDurableClient()) {
                break;
            }
            this.recoverInterestList(recoveredConnection, durable, true, isFirstNewConnection);
            this.recoverInterestList(recoveredConnection, durable, false, isFirstNewConnection);
            this.recoverCqs(recoveredConnection, durable);
        }
    }

    /**
     * Logs an error, routing security exceptions to the security logger and everything else
     * to the regular logger.
     *
     * @param message the message to log
     * @param t the failure being reported
     */
    private void logError(String message, Throwable t) {
        if (!(t instanceof GemFireSecurityException)) {
            logger.error(message, t);
            return;
        }
        this.securityLogger.error(message, t);
    }

    /**
     * Background task that recovers the primary queue connection and then the configured
     * redundancy, and afterwards asks the manager to reschedule itself if still needed.
     *
     * <p>{@code isCancelled} is read and written only while holding the enclosing manager's
     * {@code lock} in this class. {@code future} is supplied via {@link #setFuture} by
     * whoever scheduled the task.
     */
    protected class RedundancySatisfierTask
    extends PoolImpl.PoolTask {
        // Set by cancel(); checked by run2() before doing any work.
        private boolean isCancelled;
        // Handle returned by the executor; null until setFuture() has been called.
        private ScheduledFuture<?> future;

        protected RedundancySatisfierTask() {
        }

        /**
         * Stores the scheduling handle used by {@link #getRemainingDelay()} and
         * {@link #cancel()}.
         */
        public void setFuture(ScheduledFuture<?> future) {
            this.future = future;
        }

        /**
         * Returns the remaining delay of the scheduled future in milliseconds.
         * Dereferences {@code future} without a null check, so {@link #setFuture} must have
         * been called first.
         */
        long getRemainingDelay() {
            return this.future.getDelay(TimeUnit.MILLISECONDS);
        }

        /**
         * Runs one recovery pass.
         *
         * <p>Waits for the manager's {@code initializedLatch}, then, under the manager's
         * lock, returns if this task was cancelled and otherwise clears the manager's
         * {@code redundancySatisfierTask} field. If the pool or cache is being cancelled,
         * waiters on the lock are notified and the task returns. Otherwise the servers that
         * already hold a queue connection plus both deny lists are excluded, and
         * {@code recoverPrimary} followed by {@code recoverRedundancy} is invoked.
         *
         * <p>{@code VirtualMachineError} and {@code CancelException} are rethrown. Any other
         * throwable marks primary discovery as failed (carrying the exception if it is a
         * {@code GemFireSecurityException}), notifies waiters and is logged. Unless the
         * method returned early or rethrew, it finishes by calling
         * {@code scheduleRedundancySatisfierIfNeeded} with the retry interval.
         */
        @Override
        public void run2() {
            try {
                // NOTE(review): if initializedLatch is a CountDownLatch, an interrupt here is
                // handled by the generic Throwable branch below and the interrupt flag is
                // not restored - confirm this is intended.
                QueueManagerImpl.this.initializedLatch.await();
                Object object = QueueManagerImpl.this.lock;
                synchronized (object) {
                    if (this.isCancelled) {
                        return;
                    }
                    // This task is now running, so it is no longer the pending one.
                    QueueManagerImpl.this.redundancySatisfierTask = null;
                    if (QueueManagerImpl.this.pool.getPoolOrCacheCancelInProgress() != null) {
                        // Wake up waiters so they can observe the cancellation.
                        QueueManagerImpl.this.lock.notifyAll();
                        return;
                    }
                }
                // Start from the servers that already host one of our queue connections and
                // add everything on the two deny lists.
                Set<ServerLocation> excludedServers = QueueManagerImpl.this.queueConnections.getAllLocations();
                excludedServers.addAll(QueueManagerImpl.this.denyList.getBadServers());
                excludedServers.addAll(QueueManagerImpl.this.factory.getDenyList().getBadServers());
                QueueManagerImpl.this.recoverPrimary(excludedServers);
                QueueManagerImpl.this.recoverRedundancy(excludedServers, true);
            }
            catch (VirtualMachineError err) {
                SystemFailure.initiateFailure(err);
                throw err;
            }
            catch (CancelException e) {
                throw e;
            }
            catch (Throwable t) {
                SystemFailure.checkFailure();
                Object object = QueueManagerImpl.this.lock;
                synchronized (object) {
                    // Record the failure in the connection list; a security exception is
                    // stored as-is, anything else is replaced by the default exception.
                    if (t instanceof GemFireSecurityException) {
                        QueueManagerImpl.this.queueConnections = QueueManagerImpl.this.queueConnections.setPrimaryDiscoveryFailed((GemFireSecurityException)t);
                    } else {
                        QueueManagerImpl.this.queueConnections = QueueManagerImpl.this.queueConnections.setPrimaryDiscoveryFailed(null);
                    }
                    QueueManagerImpl.this.lock.notifyAll();
                    QueueManagerImpl.this.pool.getCancelCriterion().checkCancelInProgress(t);
                    QueueManagerImpl.this.logError("Error in redundancy satisfier", t);
                }
            }
            QueueManagerImpl.this.scheduleRedundancySatisfierIfNeeded(QueueManagerImpl.this.redundancyRetryInterval);
        }

        /**
         * Cancels this task under the manager's lock.
         *
         * <p>Marks the task cancelled, cancels the future without interrupting a running
         * execution, and clears the manager's {@code redundancySatisfierTask} field.
         * Dereferences {@code future} without a null check.
         *
         * @return {@code false} if the task had already been cancelled, {@code true} otherwise
         */
        public boolean cancel() {
            Object object = QueueManagerImpl.this.lock;
            synchronized (object) {
                if (this.isCancelled) {
                    return false;
                }
                this.isCancelled = true;
                this.future.cancel(false);
                QueueManagerImpl.this.redundancySatisfierTask = null;
                return true;
            }
        }
    }

    /**
     * Immutable snapshot of the manager's queue connections: the primary, the backups, an
     * endpoint lookup over both, and bookkeeping about a failed primary discovery.
     *
     * <p>All fields are final and the collections are unmodifiable; every "mutator" returns
     * a new snapshot. Building a non-empty snapshot also publishes the number of connections
     * to the pool statistics.
     */
    public class ConnectionList
    implements QueueManager.QueueConnections {
        private final QueueConnectionImpl primary;
        private final Map<Endpoint, Connection> connectionMap;
        private final List<Connection> backups;
        private final GemFireException primaryDiscoveryException;
        private final QueueConnectionImpl failedPrimary;

        /** Creates the empty snapshot: no primary, no backups, no recorded failure. */
        ConnectionList() {
            this.primary = null;
            this.failedPrimary = null;
            this.primaryDiscoveryException = null;
            this.backups = Collections.emptyList();
            this.connectionMap = Collections.emptyMap();
        }

        /**
         * Creates a snapshot from the given parts and reports the resulting connection count
         * to the pool statistics. The backup list is copied.
         */
        private ConnectionList(QueueConnectionImpl primary, List<Connection> backups, GemFireException discoveryException, QueueConnectionImpl failedPrimary) {
            Map<Endpoint, Connection> byEndpoint = new HashMap<Endpoint, Connection>();
            for (Connection backup : backups) {
                byEndpoint.put(backup.getEndpoint(), backup);
            }
            // The primary is added last, so it wins if a backup shares its endpoint.
            if (primary != null) {
                byEndpoint.put(primary.getEndpoint(), primary);
            }
            this.primary = primary;
            this.failedPrimary = failedPrimary;
            this.primaryDiscoveryException = discoveryException;
            this.backups = Collections.unmodifiableList(new ArrayList<Connection>(backups));
            this.connectionMap = Collections.unmodifiableMap(byEndpoint);
            QueueManagerImpl.this.pool.getStats().setSubscriptionCount(this.connectionMap.size());
        }

        /**
         * Returns a snapshot with {@code newPrimary} as primary. If it was one of the
         * backups it is removed from them; any recorded discovery failure and failed primary
         * are cleared.
         */
        public ConnectionList setPrimary(QueueConnectionImpl newPrimary) {
            if (!this.backups.contains(newPrimary)) {
                return new ConnectionList(newPrimary, this.backups, null, null);
            }
            List<Connection> remainingBackups = new ArrayList<Connection>(this.backups);
            remainingBackups.remove(newPrimary);
            return new ConnectionList(newPrimary, remainingBackups, null, null);
        }

        /**
         * Returns a snapshot flagged as "primary discovery failed". When no exception is
         * supplied, a {@code NoSubscriptionServersAvailableException} is recorded instead.
         */
        ConnectionList setPrimaryDiscoveryFailed(GemFireException p_discoveryException) {
            GemFireException recorded = p_discoveryException != null
                    ? p_discoveryException
                    : new NoSubscriptionServersAvailableException("Primary discovery failed.");
            return new ConnectionList(this.primary, this.backups, recorded, this.failedPrimary);
        }

        /** Returns a snapshot with {@code queueConnection} appended to the backups. */
        ConnectionList addBackup(QueueConnectionImpl queueConnection) {
            List<Connection> extendedBackups = new ArrayList<Connection>(this.backups);
            extendedBackups.add(queueConnection);
            return new ConnectionList(this.primary, extendedBackups, this.primaryDiscoveryException, this.failedPrimary);
        }

        /**
         * Returns a snapshot without {@code connection}. Removing the primary (matched by
         * identity) leaves the snapshot without a primary and remembers the removed one as
         * the failed primary; otherwise the connection is removed from the backups.
         */
        ConnectionList removeConnection(QueueConnectionImpl connection) {
            if (this.primary != connection) {
                List<Connection> remainingBackups = new ArrayList<Connection>(this.backups);
                remainingBackups.remove(connection);
                return new ConnectionList(this.primary, remainingBackups, this.primaryDiscoveryException, this.failedPrimary);
            }
            return new ConnectionList(null, this.backups, this.primaryDiscoveryException, this.primary);
        }

        /** Returns the primary connection, or {@code null} if there is none. */
        @Override
        public Connection getPrimary() {
            return this.primary;
        }

        /** Returns the unmodifiable list of backup connections. */
        @Override
        public List<Connection> getBackups() {
            return this.backups;
        }

        /** Returns the updater of the remembered failed primary, or {@code null} if none. */
        ClientUpdater getFailedUpdater() {
            return this.failedPrimary == null ? null : this.failedPrimary.getUpdater();
        }

        /** Tells whether a primary discovery failure has been recorded. */
        boolean primaryDiscoveryFailed() {
            return this.primaryDiscoveryException != null;
        }

        /** Returns the recorded discovery failure, or {@code null} if none. */
        GemFireException getPrimaryDiscoveryException() {
            return this.primaryDiscoveryException;
        }

        /** Looks up the connection (primary or backup) for an endpoint; {@code null} if absent. */
        @Override
        public QueueConnectionImpl getConnection(Endpoint endpoint) {
            Connection match = this.connectionMap.get(endpoint);
            return (QueueConnectionImpl)match;
        }

        /** Returns a new, modifiable set with the location of every connected endpoint. */
        Set<ServerLocation> getAllLocations() {
            Set<ServerLocation> result = new HashSet<ServerLocation>();
            for (Endpoint connected : this.connectionMap.keySet()) {
                result.add(connected.getLocation());
            }
            return result;
        }
    }

    /**
     * Orders server queue statuses: primaries first, then by ascending server queue size,
     * then by member id.
     */
    @Immutable
    protected static class QSizeComparator
    implements Comparator<ServerQueueStatus> {
        protected QSizeComparator() {
        }

        /**
         * Compares two queue statuses.
         *
         * @return a negative value if {@code s1} sorts before {@code s2}, a positive value if
         *         it sorts after, and the member-id comparison when primary flag and queue
         *         size are both equal
         */
        @Override
        public int compare(ServerQueueStatus s1, ServerQueueStatus s2) {
            boolean firstIsPrimary = s1.isPrimary();
            boolean secondIsPrimary = s2.isPrimary();
            if (firstIsPrimary != secondIsPrimary) {
                return firstIsPrimary ? -1 : 1;
            }
            // Integer.compare instead of subtraction: a difference of two ints can overflow
            // and flip the sign, which would break the ordering.
            int bySize = Integer.compare(s1.getServerQueueSize(), s2.getServerQueueSize());
            if (bySize != 0) {
                return bySize;
            }
            return s1.getMemberId().compareTo(s2.getMemberId());
        }
    }
}

