package io.reactivex.netty.channel;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.FileRegion;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.reactivex.netty.channel.events.ConnectionEventListener;
import io.reactivex.netty.events.Clock;
import io.reactivex.netty.events.EventPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Actions;
import rx.functions.Func1;
import rx.subscriptions.Subscriptions;

/* loaded from: classes2.dex */
public class DefaultChannelOperations<W> implements ChannelOperations<W> {
    /** Decompiler residue of javac's synthetic assertion flag; it is not read anywhere in this class. */
    static final /* synthetic */ boolean $assertionsDisabled = false;

    /**
     * Close guard. The close subscription logic compare-and-sets it from 0 to 1 through
     * {@link #CLOSE_ISSUED_UPDATER}; only the subscription that wins calls {@code Channel.close()}.
     */
    private volatile int closeIssued;

    /** Returned by {@code close(false)}: issues the close without flushing first. */
    private final Observable<Void> closeObservable;

    /** Receives the write/flush/close lifecycle callbacks published by this class. */
    private final ConnectionEventListener eventListener;

    /** Consulted through {@code publishingEnabled()} before callbacks are sent to {@link #eventListener}. */
    private final EventPublisher eventPublisher;

    /** Returned by {@code close(true)}: {@link #closeObservable} preceded by a {@code flush()} on subscribe. */
    private final Observable<Void> flushAndCloseObservable;

    /**
     * Flush selector that answers {@code true} for every item; {@code writeAndFlushOnEach} hands it
     * to the selector-based write path.
     *
     * <p>Only the typed {@code call(W)} is declared. The decompiled source additionally declared the
     * compiler-generated bridge {@code call(Object)}, which has the same erasure as {@code call(W)}
     * and cast its argument to a non-existent {@code AnonymousClass1} type, so the file could not be
     * compiled. javac regenerates the bridge on its own.
     */
    private final Func1<W, Boolean> flushOnEachSelector = new Func1<W, Boolean>() {
        @Override
        public Boolean call(W w) {
            return Boolean.TRUE;
        }
    };

    /** The Netty channel every operation of this instance acts upon. */
    private final Channel nettyChannel;

    private static final Logger logger = LoggerFactory.getLogger(DefaultChannelOperations.class);

    /** Atomic accessor for {@link #closeIssued}; raw-typed because the updater is keyed by the class literal. */
    private static final AtomicIntegerFieldUpdater<DefaultChannelOperations> CLOSE_ISSUED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultChannelOperations.class, "closeIssued");

    /**
     * Subscription logic behind the close observables of the enclosing instance.
     *
     * <p>Each subscription attaches a listener to the channel's close future. Only the first
     * subscription (the one that flips {@code closeIssued} from 0 to 1) actually calls
     * {@code Channel.close()} and reports close events; later subscriptions merely observe the
     * outcome of the close future.
     */
    private class OnSubscribeForClose implements Observable.OnSubscribe<Void> {
        private final Channel nettyChannel;

        public OnSubscribeForClose(Channel channel) {
            nettyChannel = channel;
        }

        /**
         * Issues the close if nobody has done so yet, then bridges the close future to
         * {@code subscriber}.
         */
        @Override
        public void call(Subscriber<? super Void> subscriber) {
            // Captured up front; only the subscription that wins the compare-and-set passes it on.
            final long closeStartNanos = Clock.newStartTimeNanos();
            final DefaultChannelOperations<W> owner = DefaultChannelOperations.this;
            final boolean firstClose = CLOSE_ISSUED_UPDATER.compareAndSet(owner, 0, 1);

            final ChannelCloseListener closeListener;
            if (!firstClose) {
                // Close already issued elsewhere: observe the result, publish no events.
                closeListener = new ChannelCloseListener(this, subscriber);
            } else {
                if (owner.eventPublisher.publishingEnabled()) {
                    owner.eventListener.onConnectionCloseStart();
                }
                nettyChannel.close();
                closeListener = new ChannelCloseListener(
                        owner.eventListener, owner.eventPublisher, closeStartNanos, subscriber);
            }
            closeListener.bridge(nettyChannel.closeFuture(), subscriber);
        }

        /**
         * Relays the outcome of the close future to the subscriber and, when constructed with a
         * non-null event listener, to that listener as well.
         */
        public class ChannelCloseListener extends SubscriberToChannelFutureBridge {
            private final long closeStartTimeNanos;
            private final ConnectionEventListener eventListener;
            private final EventPublisher eventPublisher;
            private final Subscriber<? super Void> subscriber;

            /**
             * @param listener   receiver of close events, or {@code null} to publish none
             * @param publisher  queried for {@code publishingEnabled()}; only dereferenced when
             *                   {@code listener} is non-null
             * @param startNanos start time handed to {@code Clock.onEndNanos} when reporting
             * @param downstream subscriber to complete or fail
             */
            public ChannelCloseListener(ConnectionEventListener listener, EventPublisher publisher, long startNanos, Subscriber<? super Void> downstream) {
                eventListener = listener;
                eventPublisher = publisher;
                closeStartTimeNanos = startNanos;
                subscriber = downstream;
            }

            /**
             * Creates a listener that publishes no events.
             *
             * @param onSubscribeForClose unused; appears to be a decompiled outer-instance argument
             * @param subscriber          subscriber to complete or fail
             */
            public ChannelCloseListener(OnSubscribeForClose onSubscribeForClose, Subscriber<? super Void> subscriber) {
                this(null, null, -1L, subscriber);
            }

            /** True when a listener was supplied and the publisher currently has publishing enabled. */
            private boolean closeEventsEnabled() {
                return eventListener != null && eventPublisher.publishingEnabled();
            }

            @Override
            protected void doOnSuccess(ChannelFuture channelFuture) {
                if (closeEventsEnabled()) {
                    eventListener.onConnectionCloseSuccess(Clock.onEndNanos(closeStartTimeNanos), TimeUnit.NANOSECONDS);
                }
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            }

            // The error is read from the future itself; the cause argument is not used.
            @Override
            protected void doOnFailure(ChannelFuture channelFuture, Throwable cause) {
                if (closeEventsEnabled()) {
                    eventListener.onConnectionCloseFailed(Clock.onEndNanos(closeStartTimeNanos), TimeUnit.NANOSECONDS, channelFuture.cause());
                }
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onError(channelFuture.cause());
                }
            }
        }
    }

    /**
     * Creates channel operations for {@code channel} and prepares the two close observables.
     *
     * @param channel                 the Netty channel to operate on
     * @param connectionEventListener receiver of write/flush/close events
     * @param eventPublisher          queried for {@code publishingEnabled()} before events are sent
     */
    public DefaultChannelOperations(Channel channel, ConnectionEventListener connectionEventListener, EventPublisher eventPublisher) {
        nettyChannel = channel;
        eventListener = connectionEventListener;
        this.eventPublisher = eventPublisher;

        final Observable<Void> plainClose = Observable.create(new OnSubscribeForClose(channel));
        closeObservable = plainClose;

        // Same close logic, but every subscription flushes the channel first.
        flushAndCloseObservable = plainClose.doOnSubscribe(new Action0() {
            @Override
            public void call() {
                flush();
            }
        });
    }

    /**
     * Flushes the channel and then notifies the event listener that the flush completed.
     * Within this class it is only reached from {@code flush()} when publishing is enabled,
     * either directly (already on the event loop) or from a task submitted to the event loop.
     *
     * @param j start time of the flush; passed to {@code Clock.onEndNanos} (presumably yielding
     *          the elapsed nanoseconds - confirm against {@code Clock})
     */
    public void _flushInEventloop(long j) {
        nettyChannel.flush();
        final long flushNanos = Clock.onEndNanos(j);
        eventListener.onFlushComplete(flushNanos, TimeUnit.NANOSECONDS);
    }

    /**
     * Builds an observable that, on each subscription, writes {@code observable} to the channel
     * as a single message and mirrors the resulting write future to the subscriber.
     *
     * <p>Per subscription: a write-start event is published (if publishing is enabled), the write
     * is performed on the channel's event loop, the channel is flushed when the source completes
     * unless the {@code FLUSH_ONLY_ON_READ_COMPLETE} channel attribute is {@code true}, and
     * unsubscribing cancels the pending write future.
     *
     * @param observable the stream handed to {@code Channel.write}
     * @return an observable that completes or fails with the write future
     */
    private Observable<Void> _write(final Observable<?> observable) {
        return Observable.create(new Observable.OnSubscribe<Void>() {
            @Override
            public void call(final Subscriber<? super Void> subscriber) {
                final long writeStartNanos = Clock.newStartTimeNanos();
                if (eventPublisher.publishingEnabled()) {
                    eventListener.onWriteStart();
                }
                if (!nettyChannel.eventLoop().inEventLoop()) {
                    // Hop onto the event loop before touching the channel.
                    nettyChannel.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            writeOnEventLoop(subscriber, writeStartNanos);
                        }
                    });
                    return;
                }
                writeOnEventLoop(subscriber, writeStartNanos);
            }

            /** Performs the actual channel write and wires cancellation and completion handling. */
            private void writeOnEventLoop(final Subscriber<? super Void> subscriber, final long writeStartNanos) {
                final Observable<?> flushingSource = observable.doOnCompleted(new Action0() {
                    @Override
                    public void call() {
                        final Boolean flushOnReadCompleteOnly =
                                (Boolean) nettyChannel.attr(ChannelOperations.FLUSH_ONLY_ON_READ_COMPLETE).get();
                        // Absent or false attribute: flush right away once the source completes.
                        if (!Boolean.TRUE.equals(flushOnReadCompleteOnly)) {
                            DefaultChannelOperations.this.flush();
                        }
                    }
                });
                final ChannelFuture pendingWrite = nettyChannel.write(flushingSource);

                // Unsubscribing cancels the write (without interrupting).
                subscriber.add(Subscriptions.create(new Action0() {
                    @Override
                    public void call() {
                        pendingWrite.cancel(false);
                    }
                }));

                pendingWrite.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (subscriber.isUnsubscribed()) {
                            // Nobody is listening any more: no events, no notifications.
                            return;
                        }
                        if (!channelFuture.isSuccess()) {
                            if (eventPublisher.publishingEnabled()) {
                                eventListener.onWriteFailed(Clock.onEndNanos(writeStartNanos), TimeUnit.NANOSECONDS, channelFuture.cause());
                            }
                            subscriber.onError(channelFuture.cause());
                            return;
                        }
                        if (eventPublisher.publishingEnabled()) {
                            eventListener.onWriteSuccess(Clock.onEndNanos(writeStartNanos), TimeUnit.NANOSECONDS);
                        }
                        subscriber.onCompleted();
                    }
                });
            }
        });
    }

    /**
     * Selector-based write: lifts a {@code FlushSelectorOperator} built from {@code func1} and this
     * instance onto {@code observable}, then writes the lifted stream via the single-argument
     * {@code _write}. (Presumably the operator flushes whenever the selector returns true -
     * confirm against {@code FlushSelectorOperator}.)
     *
     * @param observable the items to write
     * @param func1      flush selector evaluated per item
     * @return an observable representing the result of the write
     */
    private <X> Observable<Void> _write(Observable<X> observable, Func1<X, Boolean> func1) {
        final Observable<?> selectivelyFlushed = observable.lift(new FlushSelectorOperator(func1, this));
        return _write(selectivelyFlushed);
    }

    /**
     * Closes the connection, flushing first; shorthand for {@code close(true)}.
     *
     * @return the observable returned by {@code close(true)}
     */
    @Override
    public Observable<Void> close() {
        final boolean flushFirst = true;
        return close(flushFirst);
    }

    /**
     * Selects one of the two close observables prepared in the constructor.
     *
     * @param z {@code true} to flush the channel on subscription before closing,
     *          {@code false} to close without flushing
     * @return the flush-and-close observable when {@code z} is true, else the plain close observable
     */
    @Override
    public Observable<Void> close(boolean z) {
        if (z) {
            return flushAndCloseObservable;
        }
        return closeObservable;
    }

    /**
     * Returns an observable that watches the channel's close future without issuing a close.
     * Both outcomes handled here - success and failure of the close future - are delivered to
     * the subscriber as {@code onCompleted}.
     *
     * @return an observable bridged to {@code nettyChannel.closeFuture()} on each subscription
     */
    @Override
    public Observable<Void> closeListener() {
        final Observable.OnSubscribe<Void> watchCloseFuture = new Observable.OnSubscribe<Void>() {
            @Override
            public void call(final Subscriber<? super Void> subscriber) {
                final SubscriberToChannelFutureBridge completeOnClose = new SubscriberToChannelFutureBridge() {
                    @Override
                    protected void doOnSuccess(ChannelFuture channelFuture) {
                        subscriber.onCompleted();
                    }

                    // A failed close future is deliberately reported as completion, not as an error.
                    @Override
                    protected void doOnFailure(ChannelFuture channelFuture, Throwable th) {
                        subscriber.onCompleted();
                    }
                };
                completeOnClose.bridge(nettyChannel.closeFuture(), subscriber);
            }
        };
        return Observable.create(watchCloseFuture);
    }

    /**
     * Subscribes to {@code close()} immediately, ignoring emitted items and logging any error
     * instead of propagating it.
     */
    @Override
    public void closeNow() {
        final Action1<Throwable> logCloseFailure = new Action1<Throwable>() {
            @Override
            public void call(Throwable th) {
                logger.error("Error closing connection.", th);
            }
        };
        close().subscribe(Actions.empty(), logCloseFailure);
    }

    /**
     * Flushes the channel.
     *
     * <p>With publishing disabled this is a plain {@code Channel.flush()}. With publishing enabled
     * a flush-start event is sent, and the flush plus its completion event run on the channel's
     * event loop - inline if the caller is already on it, otherwise as a submitted task.
     */
    @Override
    public void flush() {
        if (eventPublisher.publishingEnabled()) {
            final long flushStartNanos = Clock.newStartTimeNanos();
            eventListener.onFlushStart();
            if (!nettyChannel.eventLoop().inEventLoop()) {
                nettyChannel.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        _flushInEventloop(flushStartNanos);
                    }
                });
                return;
            }
            _flushInEventloop(flushStartNanos);
        } else {
            nettyChannel.flush();
        }
    }

    /**
     * Fires an {@code AppendTransformerEvent} carrying {@code allocatingTransformer} through the
     * channel pipeline and returns a new operations object over the same channel, event listener
     * and event publisher, typed for the transformed write type.
     *
     * <p>The result is created as {@code DefaultChannelOperations<WW>}; the decompiled code
     * instantiated the raw type, which relied on an unchecked conversion to the declared return type.
     *
     * @param allocatingTransformer transformer announced to the pipeline via the user event
     * @param <WW>                  write type accepted by the returned operations
     * @return a fresh {@code DefaultChannelOperations<WW>} sharing this instance's channel
     */
    @Override
    public <WW> ChannelOperations<WW> transformWrite(AllocatingTransformer<WW, W> allocatingTransformer) {
        nettyChannel.pipeline().fireUserEventTriggered(new AppendTransformerEvent(allocatingTransformer));
        return new DefaultChannelOperations<WW>(nettyChannel, eventListener, eventPublisher);
    }

    /**
     * Writes the stream to the channel through the private single-argument {@code _write}.
     *
     * @param observable the items to write
     * @return an observable representing the result of the write
     */
    @Override
    public Observable<Void> write(Observable<W> observable) {
        final Observable<Void> writeResult = _write(observable);
        return writeResult;
    }

    /**
     * Writes the stream to the channel through the selector-based {@code _write}.
     *
     * @param observable the items to write
     * @param func1      flush selector evaluated per item
     * @return an observable representing the result of the write
     */
    @Override
    public Observable<Void> write(Observable<W> observable, Func1<W, Boolean> func1) {
        final Observable<Void> writeResult = _write(observable, func1);
        return writeResult;
    }

    /**
     * Writes the stream using this instance's always-true flush selector.
     *
     * @param observable the items to write
     * @return an observable representing the result of the write
     */
    @Override
    public Observable<Void> writeAndFlushOnEach(Observable<W> observable) {
        final Func1<W, Boolean> alwaysFlush = flushOnEachSelector;
        return _write(observable, alwaysFlush);
    }

    /**
     * Writes a stream of byte arrays through the private single-argument {@code _write}.
     *
     * @param observable the byte arrays to write
     * @return an observable representing the result of the write
     */
    @Override
    public Observable<Void> writeBytes(Observable<byte[]> observable) {
        final Observable<Void> writeResult = _write(observable);
        return writeResult;
    }

    /**
     * Writes a stream of byte arrays through the selector-based {@code _write}.
     *
     * @param observable the byte arrays to write
     * @param func1      flush selector evaluated per byte array
     * @return an observable representing the result of the write
     */
    @Override
    public Observable<Void> writeBytes(Observable<byte[]> observable, Func1<byte[], Boolean> func1) {
        final Observable<Void> writeResult = _write(observable, func1);
        return writeResult;
    }

    /**
     * Writes a stream of byte arrays with the inherited {@code FLUSH_ON_EACH_BYTES} selector
     * (presumably one that requests a flush after every item - confirm where it is declared).
     *
     * @param observable the byte arrays to write
     * @return an observable representing the result of the write
     */
    @Override
    public Observable<Void> writeBytesAndFlushOnEach(Observable<byte[]> observable) {
        final Observable<Void> writeResult = _write(observable, FLUSH_ON_EACH_BYTES);
        return writeResult;
    }

    /**
     * Writes a stream of file regions through the private single-argument {@code _write}.
     *
     * @param observable the file regions to write
     * @return an observable representing the result of the write
     */
    @Override
    public Observable<Void> writeFileRegion(Observable<FileRegion> observable) {
        final Observable<Void> writeResult = _write(observable);
        return writeResult;
    }

    /**
     * Writes a stream of file regions through the selector-based {@code _write}.
     *
     * @param observable the file regions to write
     * @param func1      flush selector evaluated per file region
     * @return an observable representing the result of the write
     */
    @Override
    public Observable<Void> writeFileRegion(Observable<FileRegion> observable, Func1<FileRegion, Boolean> func1) {
        final Observable<Void> writeResult = _write(observable, func1);
        return writeResult;
    }

    /**
     * Delegates to {@code writeFileRegion(Observable, Func1)} with the inherited
     * {@code FLUSH_ON_EACH_FILE_REGION} selector (presumably one that requests a flush after
     * every item - confirm where it is declared).
     *
     * @param observable the file regions to write
     * @return an observable representing the result of the write
     */
    @Override
    public Observable<Void> writeFileRegionAndFlushOnEach(Observable<FileRegion> observable) {
        final Func1<FileRegion, Boolean> flushEveryRegion = FLUSH_ON_EACH_FILE_REGION;
        return writeFileRegion(observable, flushEveryRegion);
    }

    /**
     * Writes a stream of strings through the private single-argument {@code _write}.
     *
     * @param observable the strings to write
     * @return an observable representing the result of the write
     */
    @Override
    public Observable<Void> writeString(Observable<String> observable) {
        final Observable<Void> writeResult = _write(observable);
        return writeResult;
    }

    /**
     * Writes a stream of strings through the selector-based {@code _write}.
     *
     * @param observable the strings to write
     * @param func1      flush selector evaluated per string
     * @return an observable representing the result of the write
     */
    @Override
    public Observable<Void> writeString(Observable<String> observable, Func1<String, Boolean> func1) {
        final Observable<Void> writeResult = _write(observable, func1);
        return writeResult;
    }

    /**
     * Delegates to {@code writeString(Observable, Func1)} with the inherited
     * {@code FLUSH_ON_EACH_STRING} selector (presumably one that requests a flush after every
     * item - confirm where it is declared).
     *
     * @param observable the strings to write
     * @return an observable representing the result of the write
     */
    @Override
    public Observable<Void> writeStringAndFlushOnEach(Observable<String> observable) {
        final Func1<String, Boolean> flushEveryString = FLUSH_ON_EACH_STRING;
        return writeString(observable, flushEveryString);
    }
}
