package org.apache.hadoop.hdfs.server.namenode.ha;

import java.io.Closeable;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.URI;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.ClientGSIContext;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
import org.apache.hadoop.io.retry.AtMostOnce;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcInvocationHandler;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A failover proxy provider that tries to serve read-only calls from NameNodes
 * whose cached HA state is {@code OBSERVER}, and sends every other call (and
 * every read that no observer could serve) through a wrapped
 * {@link AbstractNNFailoverProxyProvider}.
 *
 * <p>Callers obtain a single combined proxy from {@link #getProxy()}; each
 * invocation on it is routed by the inner {@code ObserverReadInvocationHandler}.
 *
 * <p>The auto-msync period is read from the configuration key
 * {@code dfs.client.failover.observer.auto-msync-period.<uri host>}:
 * <ul>
 *   <li>{@code 0}: an msync is issued through the wrapped provider before
 *       every observer read;</li>
 *   <li>positive: an msync is issued when more than that many milliseconds
 *       have elapsed since the last recorded msync time;</li>
 *   <li>negative (the default, {@code -1}): no automatic msync.</li>
 * </ul>
 *
 * @param <T> the client protocol interface implemented by the proxies
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ObserverReadProxyProvider<T extends ClientProtocol> extends AbstractNNFailoverProxyProvider<T> {
    /** Default auto-msync period; a negative value disables automatic msync. */
    static final long AUTO_MSYNC_PERIOD_DEFAULT = -1;
    /** Configuration key prefix; the URI host is appended after a {@code '.'}. */
    static final String AUTO_MSYNC_PERIOD_KEY_PREFIX = "dfs.client.failover.observer.auto-msync-period";
    static final Logger LOG = LoggerFactory.getLogger(ObserverReadProxyProvider.class);

    /** Alignment context created by this provider and handed to the proxy factory. */
    private final AlignmentContext alignmentContext;
    /** Auto-msync period in milliseconds; see the class documentation for its meaning. */
    private final long autoMsyncPeriodMs;
    /** The single dynamic proxy returned to callers; it dispatches to the real proxies. */
    private final FailoverProxyProvider.ProxyInfo<T> combinedProxy;
    /** Index into {@link #nameNodeProxies} of {@link #currentProxy}; -1 until first selected. */
    private int currentIndex = -1;
    /** The NameNode proxy currently selected for observer reads. */
    private AbstractNNFailoverProxyProvider.NNProxyInfo<T> currentProxy;
    /** Wrapped provider used for non-read calls, msync calls and fallback reads. */
    private final AbstractNNFailoverProxyProvider<T> failoverProxy;
    /** {@code Time.monotonicNow()} value recorded at the last msync / failover-proxy call. */
    private volatile long lastMsyncTimeMs = -1;
    /** Proxy that served the most recent successful invocation, or null. */
    private volatile FailoverProxyProvider.ProxyInfo<T> lastProxy = null;
    /** Whether an initial msync (or any call through the failover proxy) has completed. */
    private volatile boolean msynced = false;
    /** Proxies for all NameNodes configured for the URI. */
    private final List<AbstractNNFailoverProxyProvider.NNProxyInfo<T>> nameNodeProxies;
    /** When false, every call is sent through {@link #failoverProxy}. */
    private boolean observerReadEnabled;
    /** Used only to classify an observer failure as retriable or fatal. */
    private final RetryPolicy observerRetryPolicy;

    /**
     * Invocation handler behind the combined proxy. Read calls are attempted on
     * observer NameNodes first; everything else goes to the failover proxy.
     */
    private class ObserverReadInvocationHandler implements RpcInvocationHandler {
        private ObserverReadInvocationHandler() {
        }

        /** No-op; the underlying proxies are closed by the enclosing provider. */
        public void close() throws IOException {
        }

        /** Returns the connection id of the currently selected NameNode proxy. */
        public Client.ConnectionId getConnectionId() {
            return RPC.getConnectionIdForProxy(getCurrentProxy().proxy);
        }

        /**
         * Routes a single call.
         *
         * <p>If observer reads are enabled and {@code method} is a read (see
         * {@link ObserverReadProxyProvider#isRead(Method)}), up to
         * {@code nameNodeProxies.size()} proxies are tried in turn, skipping
         * any whose cached state is not OBSERVER. If none succeeds, or the
         * call is not a read, the call is made through the failover proxy.
         *
         * @param proxy  the proxy instance the call was made on (unused)
         * @param method the interface method being invoked
         * @param args   the call arguments
         * @return the value returned by the underlying proxy
         * @throws Throwable the cause of any failure raised by the underlying
         *         proxy that is not handled by moving on to another NameNode
         */
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            lastProxy = null;
            if (observerReadEnabled && isRead(method)) {
                if (msynced) {
                    autoMsyncIfNecessary();
                } else {
                    initializeMsync();
                }
                int failedObserverCount = 0;
                int activeCount = 0;
                int standbyCount = 0;
                for (int attempt = 0; attempt < nameNodeProxies.size(); attempt++) {
                    AbstractNNFailoverProxyProvider.NNProxyInfo<T> current = getCurrentProxy();
                    HAServiceProtocol.HAServiceState cachedState = current.getCachedState();
                    if (cachedState != HAServiceProtocol.HAServiceState.OBSERVER) {
                        if (cachedState == HAServiceProtocol.HAServiceState.ACTIVE) {
                            activeCount++;
                        } else if (cachedState == HAServiceProtocol.HAServiceState.STANDBY) {
                            standbyCount++;
                        }
                        LOG.debug("Skipping proxy {} for {} because it is in state {}", current.proxyInfo, method.getName(), cachedState);
                        changeProxy(current);
                        continue;
                    }
                    LOG.debug("Attempting to service {} using proxy {}", method.getName(), current.proxyInfo);
                    try {
                        Object result = method.invoke(current.proxy, args);
                        lastProxy = current;
                        LOG.debug("Invocation of {} using {} was successful", method.getName(), current.proxyInfo);
                        return result;
                    } catch (InvocationTargetException ite) {
                        Throwable cause = ite.getCause();
                        if (!(cause instanceof Exception)) {
                            // Errors and other non-Exception throwables are never retried.
                            throw cause;
                        }
                        Exception failure = (Exception) cause;
                        if (failure instanceof RemoteException) {
                            Exception unwrapped = ((RemoteException) failure).unwrapRemoteException(ObserverRetryOnActiveException.class);
                            if (unwrapped instanceof ObserverRetryOnActiveException) {
                                LOG.info("Encountered ObserverRetryOnActiveException from {}. Retry active namenode directly.", current.proxyInfo);
                                // Stop trying observers and fall through to the failover proxy.
                                break;
                            }
                        }
                        boolean idempotentOrAtMostOnce = method.isAnnotationPresent(Idempotent.class) || method.isAnnotationPresent(AtMostOnce.class);
                        RetryPolicy.RetryAction retryAction = observerRetryPolicy.shouldRetry(failure, 0, 0, idempotentOrAtMostOnce);
                        if (retryAction.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
                            throw failure;
                        }
                        failedObserverCount++;
                        LOG.warn("Invocation returned exception on [{}]; {} failure(s) so far", current.proxyInfo, failedObserverCount, failure);
                        changeProxy(current);
                    }
                }
                LOG.warn("{} observers have failed for read request {}; also found {} standby and {} active. Falling back to active.", failedObserverCount, method.getName(), standbyCount, activeCount);
            }
            LOG.debug("Using failoverProxy to service {}", method.getName());
            FailoverProxyProvider.ProxyInfo<T> activeProxy = failoverProxy.getProxy();
            try {
                Object result = method.invoke(activeProxy.proxy, args);
                // A successful call through the failover proxy is recorded the
                // same way as an explicit msync.
                msynced = true;
                lastMsyncTimeMs = Time.monotonicNow();
                lastProxy = activeProxy;
                return result;
            } catch (InvocationTargetException ite) {
                throw ite.getCause();
            }
        }
    }

    /**
     * Creates a provider that wraps a new {@link ConfiguredFailoverProxyProvider}
     * built from the same arguments.
     *
     * @param configuration client configuration
     * @param uri           the URI of the file system being accessed
     * @param cls           the protocol interface the proxies implement
     * @param hAProxyFactory factory used to create the NameNode proxies
     */
    public ObserverReadProxyProvider(Configuration configuration, URI uri, Class<T> cls, HAProxyFactory<T> hAProxyFactory) {
        this(configuration, uri, cls, hAProxyFactory, new ConfiguredFailoverProxyProvider(configuration, uri, cls, hAProxyFactory));
    }

    /**
     * Creates a provider that sends non-observer traffic through the given
     * failover provider.
     *
     * <p>NOTE(review): {@code hAProxyFactory} is cast to
     * {@link ClientHAProxyFactory} unconditionally, so any other factory type
     * fails with a {@link ClassCastException}.
     *
     * @param configuration client configuration
     * @param uri           the URI of the file system being accessed; its host
     *                      is used as the suffix of the auto-msync config key
     * @param cls           the protocol interface the proxies implement
     * @param hAProxyFactory factory used to create the NameNode proxies
     * @param abstractNNFailoverProxyProvider provider used for non-read calls,
     *                      msync calls and fallback reads
     */
    public ObserverReadProxyProvider(Configuration configuration, URI uri, Class<T> cls, HAProxyFactory<T> hAProxyFactory, AbstractNNFailoverProxyProvider<T> abstractNNFailoverProxyProvider) {
        super(configuration, uri, cls, hAProxyFactory);
        this.failoverProxy = abstractNNFailoverProxyProvider;
        this.alignmentContext = new ClientGSIContext();
        ((ClientHAProxyFactory<T>) hAProxyFactory).setAlignmentContext(this.alignmentContext);
        // The policy is consulted with retries=0/failovers=0 only, to decide
        // whether a given exception is fatal.
        this.observerRetryPolicy = RetryPolicies.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL, 1);
        this.nameNodeProxies = getProxyAddresses(uri, HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);

        // Human-readable description of the combined proxy: "[info1,info2,...]".
        StringBuilder combinedInfo = new StringBuilder("[");
        for (int i = 0; i < this.nameNodeProxies.size(); i++) {
            if (i > 0) {
                combinedInfo.append(",");
            }
            combinedInfo.append(this.nameNodeProxies.get(i).proxyInfo);
        }
        combinedInfo.append(']');

        // The dynamic proxy implements cls, i.e. Class<T>, so the cast to T is safe.
        @SuppressWarnings("unchecked")
        T wrappedProxy = (T) Proxy.newProxyInstance(ObserverReadInvocationHandler.class.getClassLoader(), new Class<?>[]{cls}, new ObserverReadInvocationHandler());
        this.combinedProxy = new FailoverProxyProvider.ProxyInfo<>(wrappedProxy, combinedInfo.toString());

        this.autoMsyncPeriodMs = configuration.getTimeDuration(AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + uri.getHost(), AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
        this.observerReadEnabled = true;
    }

    /**
     * Issues an msync through the failover proxy when the configured period
     * requires one. Used by the invocation handler before observer reads.
     *
     * @throws IOException if the msync call fails
     */
    public void autoMsyncIfNecessary() throws IOException {
        if (this.autoMsyncPeriodMs == 0) {
            // Period 0: msync before every read, without recording the time.
            this.failoverProxy.getProxy().proxy.msync();
            return;
        }
        if (this.autoMsyncPeriodMs < 0) {
            return;
        }
        if (Time.monotonicNow() - this.lastMsyncTimeMs > this.autoMsyncPeriodMs) {
            synchronized (this) {
                // Re-check under the lock: another thread may have msynced
                // while this one was waiting.
                if (Time.monotonicNow() - this.lastMsyncTimeMs > this.autoMsyncPeriodMs) {
                    this.failoverProxy.getProxy().proxy.msync();
                    this.lastMsyncTimeMs = Time.monotonicNow();
                }
            }
        }
    }

    /**
     * Advances to the next NameNode proxy, but only if {@code nNProxyInfo} is
     * still the current one; otherwise the current proxy is returned as-is.
     * Passing {@code null} before any proxy has been selected therefore selects
     * the first one.
     *
     * <p>When a new proxy is selected its HA state is fetched and cached on it.
     *
     * @param nNProxyInfo the proxy the caller believes is current, or null
     * @return the proxy that is current after this call
     */
    public synchronized AbstractNNFailoverProxyProvider.NNProxyInfo<T> changeProxy(AbstractNNFailoverProxyProvider.NNProxyInfo<T> nNProxyInfo) {
        if (this.currentProxy != nNProxyInfo) {
            // Someone else already moved on (or the caller only wanted the
            // current proxy); do not advance again.
            return this.currentProxy;
        }
        this.currentIndex = (this.currentIndex + 1) % this.nameNodeProxies.size();
        this.currentProxy = createProxyIfNeeded(this.nameNodeProxies.get(this.currentIndex));
        this.currentProxy.setCachedState(getHAServiceState(this.currentProxy));
        LOG.debug("Changed current proxy from {} to {}", nNProxyInfo == null ? "none" : nNProxyInfo.proxyInfo, this.currentProxy.proxyInfo);
        return this.currentProxy;
    }

    /**
     * Returns the currently selected NameNode proxy, selecting the first one
     * if none has been selected yet.
     */
    public AbstractNNFailoverProxyProvider.NNProxyInfo<T> getCurrentProxy() {
        return changeProxy(null);
    }

    /**
     * Asks the given NameNode for its HA state.
     *
     * @param nNProxyInfo the NameNode proxy to query
     * @return the reported state, or {@code STANDBY} if the call fails for any
     *         {@link IOException}
     */
    private HAServiceProtocol.HAServiceState getHAServiceState(AbstractNNFailoverProxyProvider.NNProxyInfo<T> nNProxyInfo) {
        IOException failure;
        try {
            return nNProxyInfo.proxy.getHAServiceState();
        } catch (RemoteException re) {
            // RemoteException must be caught before IOException, its supertype.
            if (re.unwrapRemoteException() instanceof StandbyException) {
                LOG.debug("NameNode {} threw StandbyException when fetching HAState", nNProxyInfo.getAddress());
                return HAServiceProtocol.HAServiceState.STANDBY;
            }
            failure = re;
        } catch (IOException ioe) {
            failure = ioe;
        }
        LOG.info("Failed to connect to {}. Assuming Standby state", nNProxyInfo.getAddress(), failure);
        return HAServiceProtocol.HAServiceState.STANDBY;
    }

    /**
     * Performs the first msync through the failover proxy if none has been
     * recorded yet, then records the time. Does nothing once {@code msynced}
     * is set.
     *
     * @throws IOException if the msync call fails
     */
    public synchronized void initializeMsync() throws IOException {
        if (this.msynced) {
            return;
        }
        this.failoverProxy.getProxy().proxy.msync();
        this.msynced = true;
        this.lastMsyncTimeMs = Time.monotonicNow();
    }

    /**
     * Tells whether a method may be served by an observer.
     *
     * @param method the protocol method being invoked
     * @return true if the method carries {@link ReadOnly} and that annotation
     *         is not marked {@code activeOnly}
     */
    public static boolean isRead(Method method) {
        ReadOnly readOnly = method.getAnnotation(ReadOnly.class);
        return readOnly != null && !readOnly.activeOnly();
    }

    /**
     * Closes every NameNode proxy that has been created, then closes the
     * wrapped failover provider.
     *
     * @throws IOException if closing a proxy or the wrapped provider fails
     */
    public synchronized void close() throws IOException {
        for (FailoverProxyProvider.ProxyInfo<T> info : this.nameNodeProxies) {
            if (info.proxy == null) {
                continue;
            }
            if (info.proxy instanceof Closeable) {
                ((Closeable) info.proxy).close();
            } else {
                RPC.stopProxy(info.proxy);
            }
            // Clear the reference so the proxy is not closed a second time.
            info.proxy = null;
        }
        this.failoverProxy.close();
    }

    /** Returns the alignment context created by this provider. */
    public AlignmentContext getAlignmentContext() {
        return this.alignmentContext;
    }

    /**
     * Returns the proxy that served the most recent successful invocation, or
     * null if the last invocation has not completed successfully.
     */
    FailoverProxyProvider.ProxyInfo<T> getLastProxy() {
        return this.lastProxy;
    }

    /** Returns the combined proxy; the same instance is returned on every call. */
    public FailoverProxyProvider.ProxyInfo<T> getProxy() {
        return this.combinedProxy;
    }

    /** Delegates failover to the wrapped failover provider. */
    public void performFailover(T t) {
        this.failoverProxy.performFailover(t);
    }

    /** Enables or disables routing of reads to observers. */
    void setObserverReadEnabled(boolean z) {
        this.observerReadEnabled = z;
    }

    /** Delegates to the wrapped failover provider. */
    @Override // org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider
    public boolean useLogicalURI() {
        return this.failoverProxy.useLogicalURI();
    }
}
