package io.reactivex.netty.client.loadbalancer;

import io.reactivex.netty.channel.Connection;
import io.reactivex.netty.client.ConnectionProvider;
import io.reactivex.netty.client.ConnectionProviderFactory;
import io.reactivex.netty.client.HostConnector;
import io.reactivex.netty.client.loadbalancer.HostCollector;
import io.reactivex.netty.internal.VoidToAnythingCast;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Single;
import rx.functions.Action1;
import rx.functions.Func1;

/* loaded from: classes2.dex */
/**
 * A {@link ConnectionProviderFactory} whose providers delegate every connection request to a
 * provider built by the configured {@link LoadBalancingStrategy} from the most recently
 * collected list of hosts.
 *
 * <p>Instances are obtained through the static {@code create} factories.
 *
 * @param <W> first type parameter forwarded to the strategy, host holders and connectors
 * @param <R> second type parameter forwarded to the strategy, host holders and connectors
 */
public class LoadBalancerFactory<W, R> implements ConnectionProviderFactory<W, R> {
    private static final Logger logger = LoggerFactory.getLogger(LoadBalancerFactory.class);
    private final HostCollector collector;
    private final LoadBalancingStrategy<W, R> strategy;

    /**
     * Connection provider that forwards to whichever provider the strategy produced for the
     * latest host list. Until the first list arrives, every request fails with
     * {@link NoHostsAvailableException#EMPTY_INSTANCE}.
     */
    public class ConnectionProviderImpl implements ConnectionProvider<W, R> {
        // volatile: written from the host-list subscription callback and read by
        // newConnectionRequest(), which may run on a different thread.
        private volatile ConnectionProvider<W, R> currentProvider = new ConnectionProvider<W, R>() {
            @Override
            public Observable<Connection<R, W>> newConnectionRequest() {
                return Observable.error(NoHostsAvailableException.EMPTY_INSTANCE);
            }
        };

        /**
         * Subscribes to the supplied stream of host lists and swaps the delegate provider each
         * time a new list is emitted. The subscription handle is not retained, so it stays
         * active until the stream itself terminates.
         *
         * @param observable stream of host lists; an error on it is logged and ends refreshing
         */
        public ConnectionProviderImpl(Observable<List<HostHolder<W, R>>> observable) {
            observable.subscribe(new Action1<List<HostHolder<W, R>>>() {
                @Override
                public void call(List<HostHolder<W, R>> list) {
                    ConnectionProviderImpl.this.currentProvider = strategy.newStrategy(list);
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable th) {
                    // The last provider that was installed keeps serving requests.
                    logger.error("Error while listening on the host stream. Hosts will not be refreshed.", th);
                }
            });
        }

        /**
         * @return the connection request of the currently installed delegate provider
         */
        @Override
        public Observable<Connection<R, W>> newConnectionRequest() {
            return this.currentProvider.newConnectionRequest();
        }
    }

    private LoadBalancerFactory(LoadBalancingStrategy<W, R> loadBalancingStrategy, HostCollector hostCollector) {
        this.strategy = loadBalancingStrategy;
        this.collector = hostCollector;
    }

    /**
     * Creates a factory for the given strategy that uses a {@link NoBufferHostCollector}.
     *
     * @param loadBalancingStrategy strategy used to build host holders and providers
     * @return a new factory
     */
    public static <WW, RR> LoadBalancerFactory<WW, RR> create(LoadBalancingStrategy<WW, RR> loadBalancingStrategy) {
        return create(loadBalancingStrategy, new NoBufferHostCollector());
    }

    /**
     * Creates a factory for the given strategy and host collector.
     *
     * @param loadBalancingStrategy strategy used to build host holders and providers
     * @param hostCollector collector that turns host updates into host lists
     * @return a new factory
     */
    public static <WW, RR> LoadBalancerFactory<WW, RR> create(LoadBalancingStrategy<WW, RR> loadBalancingStrategy, HostCollector hostCollector) {
        return new LoadBalancerFactory<>(loadBalancingStrategy, hostCollector);
    }

    /**
     * Adapts a collector function returning a {@link Single} into one returning an
     * {@link Observable}, so that it can be used with {@code flatMap}.
     *
     * @param func1 collector function to adapt
     * @return a function that calls {@code func1} and converts its result with {@code toObservable()}
     */
    private Func1<? super HostCollector.HostUpdate<W, R>, ? extends Observable<List<HostHolder<W, R>>>> newCollector(final Func1<HostCollector.HostUpdate<W, R>, Single<List<HostHolder<W, R>>>> func1) {
        return new Func1<HostCollector.HostUpdate<W, R>, Observable<List<HostHolder<W, R>>>>() {
            @Override
            public Observable<List<HostHolder<W, R>>> call(HostCollector.HostUpdate<W, R> hostUpdate) {
                // func1 is fully parameterised, so no cast is needed before toObservable().
                return func1.call(hostUpdate).toObservable();
            }
        };
    }

    /**
     * Builds the add/remove updates for a single host: an {@code Add} update, plus a
     * {@code Remove} update once the host's close notifier terminates.
     *
     * @param hostHolder holder of the host to track
     * @return stream of updates for that host
     */
    private Observable<HostCollector.HostUpdate<W, R>> lifecycleUpdates(HostHolder<W, R> hostHolder) {
        // Elements of the close notifier are dropped and an error is treated like completion,
        // so only its termination matters: that is when the Remove update is appended.
        Observable<HostCollector.HostUpdate<W, R>> removeOnClose = hostHolder.getConnector()
                .getHost()
                .getCloseNotifier()
                .map(new VoidToAnythingCast<HostCollector.HostUpdate<W, R>>())
                .ignoreElements()
                .onErrorResumeNext(Observable.<HostCollector.HostUpdate<W, R>>empty())
                .concatWith(Observable.just(
                        new HostCollector.HostUpdate<W, R>(HostCollector.HostUpdate.Action.Remove, hostHolder)));

        return removeOnClose.mergeWith(Observable.just(
                new HostCollector.HostUpdate<W, R>(HostCollector.HostUpdate.Action.Add, hostHolder)));
    }

    /**
     * Creates a provider backed by the given stream of hosts.
     *
     * <p>Each connector is wrapped in a holder by the strategy and the holder's event listener
     * is subscribed to the connector. Every holder yields add/remove updates, which the
     * configured collector folds into host lists (with a {@code flatMap} max-concurrency of 1).
     * Consecutive duplicate lists are suppressed before they reach the provider.
     *
     * @param observable stream of host connectors
     * @return a provider that load balances over the hosts seen so far
     */
    @Override
    public ConnectionProvider<W, R> newProvider(Observable<HostConnector<W, R>> observable) {
        Observable<HostHolder<W, R>> holders = observable.map(new Func1<HostConnector<W, R>, HostHolder<W, R>>() {
            @Override
            public HostHolder<W, R> call(HostConnector<W, R> hostConnector) {
                HostHolder<W, R> holder = strategy.toHolder(hostConnector);
                hostConnector.subscribe(holder.getEventListener());
                return holder;
            }
        });

        Observable<HostCollector.HostUpdate<W, R>> updates = holders.flatMap(new Func1<HostHolder<W, R>, Observable<HostCollector.HostUpdate<W, R>>>() {
            @Override
            public Observable<HostCollector.HostUpdate<W, R>> call(HostHolder<W, R> hostHolder) {
                return lifecycleUpdates(hostHolder);
            }
        });

        // The explicit type witness replaces the unchecked cast to an unrelated Func1 type.
        Observable<List<HostHolder<W, R>>> hostLists = updates
                .flatMap(newCollector(this.collector.<W, R>newCollector()), 1)
                .distinctUntilChanged();

        return new ConnectionProviderImpl(hostLists);
    }
}
