package blm;

import com.uber.streaming.ramen.Msg;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;

/* loaded from: classes12.dex */
/**
 * {@link al} subclass that builds the per-message-type stream from the shared {@code Msg}
 * stream held in {@code f35550d}, optionally followed by the entry stored for that type in
 * {@code f35548b}.
 *
 * <p>NOTE(review): this class is decompiler output with obfuscated identifiers; the inherited
 * helpers ({@code a(...)}, {@code b(...)}) and fields are declared in {@code al}, which is not
 * visible here, so their exact semantics should be confirmed against that class.
 */
public class bc extends al {

    /**
     * Forwards all arguments to the {@link al} constructor. Note the reordering: {@code i2} is
     * passed first, followed by {@code eVar}, {@code aVar}, {@code cVar} and {@code aVar2}.
     */
    public bc(blq.e eVar, bdr.a aVar, blp.c cVar, int i2, bln.a aVar2) {
        super(i2, eVar, aVar, cVar, aVar2);
    }

    /**
     * Builds the stream of converted messages for the message type reported by {@code mVar}.
     *
     * <p>The stream is assembled in stages:
     * <ol>
     *   <li>keep only messages from {@code f35550d} whose type equals
     *       {@code mVar.getMessageType()};</li>
     *   <li>if {@code f35547a} is non-null, move delivery onto
     *       {@code Schedulers.a(f35547a)};</li>
     *   <li>call {@code a(type, str)} on subscribe and {@code b(type, str)} on dispose;</li>
     *   <li>for each message call {@code a(msg, type)} and
     *       {@code f35555i.a(msg, b(type))}, then emit {@code a(msg, mVar)}.</li>
     * </ol>
     * If {@code f35548b} holds a message for the type, {@code a(type)} is invoked and the
     * converted stored message is merged into the returned stream as a single extra item.
     *
     * @param mVar supplies the message type to match and is passed to the conversion helper
     * @param str passed unchanged to the subscribe/dispose helpers
     * @return the live stream, merged with the stored message when one exists
     */
    @Override // blm.al
    public <T> Observable<ajp.b<T>> a(final ajk.m<T> mVar, final String str) {
        // Stage 1: restrict the shared stream to the requested message type.
        Observable<Msg> ofType = this.f35550d.filter(new Predicate<Msg>() {
            @Override
            public boolean test(Msg candidate) {
                return candidate.getType().equals(mVar.getMessageType());
            }
        });

        // Stage 2: hop to the configured scheduler only when f35547a is set.
        // NOTE(review): Schedulers.a is an obfuscated name; presumably Schedulers.from(Executor) — confirm.
        Observable<Msg> delivered = this.f35547a == null
                ? ofType
                : ofType.observeOn(Schedulers.a(this.f35547a));

        // Stage 3: notify the base class when a subscriber attaches or disposes.
        Observable<Msg> tracked = delivered
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable subscription) throws Exception {
                        bc.this.a(mVar.getMessageType(), str);
                    }
                })
                .doOnDispose(new Action() {
                    @Override
                    public void run() throws Exception {
                        bc.this.b(mVar.getMessageType(), str);
                    }
                });

        // Stage 4: run the per-message side effects, then emit the converted message.
        Observable<ajp.b<T>> live = tracked.flatMap(new Function<Msg, ObservableSource<ajp.b<T>>>() {
            @Override
            public ObservableSource<ajp.b<T>> apply(Msg incoming) {
                bc.this.a(incoming, mVar.getMessageType());
                bc.this.f35555i.a(incoming, bc.this.b(mVar.getMessageType()));
                return Observable.just(bc.this.a(incoming, mVar));
            }
        });

        // A message already stored for this type is converted now and merged in as one extra item.
        Msg stored = this.f35548b.get(mVar.getMessageType());
        if (stored != null) {
            a(mVar.getMessageType());
            return Observable.merge(live, Observable.just(a(stored, mVar)));
        }
        return live;
    }
}
