package com.microsoft.yammer.realtime.repo;

import android.os.Looper;
import com.google.gson.Gson;
import com.jakewharton.rxrelay.PublishRelay;
import com.microsoft.identity.common.internal.providers.oauth2.ResponseType;
import com.microsoft.yammer.common.exception.RealtimeConnectionException;
import com.microsoft.yammer.common.rx.ISchedulerProvider;
import com.microsoft.yammer.domain.auth.TokenService;
import com.microsoft.yammer.logger.EventLogger;
import com.microsoft.yammer.logger.Logger;
import com.microsoft.yammer.realtime.repo.network.RealtimeApiRepository;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import kotlin.TuplesKt;
import kotlin.Unit;
import kotlin.collections.MapsKt;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.text.StringsKt;
import okhttp3.OkHttpClient;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSession;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.ClientTransport;
import rx.Completable;
import rx.Observable;
import rx.lang.kotlin.SubscribersKt;
import timber.log.Timber;

/* loaded from: classes2.dex */
public final class BayeuxDataStream {
    /** Singleton instance of the nested {@link Companion} holder class. */
    public static final Companion Companion = new Companion(null);
    /** Tag handed to Timber and EventLogger calls in this class; the fully qualified class name. */
    private static final String TAG = BayeuxDataStream.class.getName();
    /** Creates the {@link BayeuxClient} from the realtime URI and {@link #okHttpClient}. */
    private final BayeuxClientFactory bayeuxClientFactory;
    /**
     * Per-channel Boolean flag keyed by channel name. {@code subscribe(..., true)} returns early
     * when the stored value is FALSE; {@code subscribe(..., false)} stores TRUE and
     * {@code unsubscribe(name, z)} stores {@code !z}.
     */
    private HashMap canResubscribe;
    /** Lazily created client; assigned in {@code connect()} and reset in {@code disconnect()}. */
    private BayeuxClient client;
    /** Used to convert incoming Bayeux messages to JSON and back into the requested data class. */
    private final Gson gson;
    /** HTTP client forwarded to {@link #bayeuxClientFactory} when a client is created. */
    private final OkHttpClient okHttpClient;
    /** Relay that receives one RealtimeChannelData per message delivered on a subscribed channel. */
    private final PublishRelay publishSubject;
    /** Source of the realtime endpoint URI. */
    private final RealtimeApiRepository realtimeApiRepository;
    /** Supplies the IO scheduler used for the delayed subscription retry timer. */
    private final ISchedulerProvider schedulerProvider;
    /** Observable view of {@link #publishSubject}, returned by {@code getStream()}. */
    private final Observable stream;
    /** Per-channel Integer retry counter keyed by channel name; cleared in {@code disconnect()}. */
    private final HashMap subscriptionRetries;
    /** Provides the token sent in the handshake "ext" payload. */
    private final TokenService tokenService;

    /* loaded from: classes2.dex */
    /**
     * Holder type for the {@code Companion} singleton exposed by the enclosing class.
     * It declares no fields and no methods.
     */
    public static final class Companion {
        /**
         * Synthetic constructor used to create the singleton; the marker argument is ignored
         * and construction is delegated to the private no-arg constructor.
         */
        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }

        /** Not instantiable from outside except through the synthetic constructor. */
        private Companion() {
        }
    }

    /**
     * Creates a data stream with no client yet; the client is built on the first {@code connect()}.
     *
     * <p>Every argument is checked for null in declaration order before any field is assigned.
     *
     * @param bayeuxClientFactory factory used to build the Bayeux client
     * @param realtimeApiRepository provides the realtime URI
     * @param tokenService provides the token sent during the handshake
     * @param schedulerProvider provides the scheduler for delayed subscription retries
     * @param gson JSON converter for incoming messages
     * @param okHttpClient HTTP client passed to the client factory
     */
    public BayeuxDataStream(BayeuxClientFactory bayeuxClientFactory, RealtimeApiRepository realtimeApiRepository, TokenService tokenService, ISchedulerProvider schedulerProvider, Gson gson, OkHttpClient okHttpClient) {
        Intrinsics.checkNotNullParameter(bayeuxClientFactory, "bayeuxClientFactory");
        Intrinsics.checkNotNullParameter(realtimeApiRepository, "realtimeApiRepository");
        Intrinsics.checkNotNullParameter(tokenService, "tokenService");
        Intrinsics.checkNotNullParameter(schedulerProvider, "schedulerProvider");
        Intrinsics.checkNotNullParameter(gson, "gson");
        Intrinsics.checkNotNullParameter(okHttpClient, "okHttpClient");

        // Build the relay and its read-only view first, then wire up all fields.
        PublishRelay relay = PublishRelay.create();
        Observable relayView = relay.asObservable();
        Intrinsics.checkNotNullExpressionValue(relayView, "asObservable(...)");

        this.bayeuxClientFactory = bayeuxClientFactory;
        this.realtimeApiRepository = realtimeApiRepository;
        this.tokenService = tokenService;
        this.schedulerProvider = schedulerProvider;
        this.gson = gson;
        this.okHttpClient = okHttpClient;
        this.publishSubject = relay;
        this.stream = relayView;
        this.subscriptionRetries = new HashMap();
        this.canResubscribe = new HashMap();
    }

    /**
     * Builds a message handler that reports the outcome of a Bayeux operation through
     * {@code logEvent} and then forgets the retry counter of the affected channel.
     *
     * <p>The channel key is {@code String.valueOf(message.get("subscription"))}, so a message
     * without a "subscription" entry is keyed by the literal string {@code "null"}.
     *
     * @param str event name forwarded to {@code logEvent} as the success event
     * @param str2 event name forwarded to {@code logEvent} as the failure event
     * @return a function taking the {@link Message} and returning {@link Unit}
     */
    private final Function1 callback(final String str, final String str2) {
        // Function1 is an interface: the anonymous class needs no constructor call, so the
        // decompiler-emitted "super(1)" initializer (invalid Java) is intentionally omitted.
        return new Function1<Message, Unit>() {
            @Override // kotlin.jvm.functions.Function1
            public Unit invoke(Message message) {
                Intrinsics.checkNotNullParameter(message, "message");
                String channelKey = String.valueOf(message.get("subscription"));
                Integer recorded = (Integer) BayeuxDataStream.this.subscriptionRetries.get(channelKey);
                // A channel that never failed has no entry; report zero retries for it.
                int retryCount = recorded == null ? 0 : recorded.intValue();
                BayeuxDataStream.this.logEvent(str, str2, message, channelKey, retryCount);
                BayeuxDataStream.this.subscriptionRetries.remove(channelKey);
                return Unit.INSTANCE;
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handshake listener body: reports the handshake reply using the
     * "realtime_handshake_succeeded" / "realtime_handshake_failed" event names.
     *
     * @param this$0 the owning stream
     * @param message the handshake reply; must not be null
     */
    public static final void connect$lambda$1$lambda$0(BayeuxDataStream this$0, Message message) {
        Intrinsics.checkNotNullParameter(this$0, "this$0");
        final Function1 handshakeReporter = this$0.callback("realtime_handshake_succeeded", "realtime_handshake_failed");
        // The reporter is created first; the null check on the message happens just before use.
        Intrinsics.checkNotNull(message);
        handshakeReporter.invoke(message);
    }

    /**
     * Asks the repository for the realtime URI and builds a client for it.
     *
     * @return a new client, or {@code null} when the URI lookup throws a
     *         {@link RuntimeException} (which is logged) or yields a null/blank URI
     */
    private final BayeuxClient createBayeuxClient() {
        final String realtimeUri;
        try {
            realtimeUri = this.realtimeApiRepository.getRealtimeUri();
        } catch (RuntimeException lookupFailure) {
            // Retained from the original: touches the Logger singleton; the value is not used.
            Logger loggerInstance = Logger.INSTANCE;
            String tag = TAG;
            Intrinsics.checkNotNullExpressionValue(tag, "TAG");
            Timber.Forest timber = Timber.Forest;
            if (timber.treeCount() > 0) {
                timber.tag(tag).e(lookupFailure, "Error getting realtime URL", new Object[0]);
            }
            return null;
        }
        boolean unusable = realtimeUri == null || StringsKt.isBlank(realtimeUri);
        if (unusable) {
            return null;
        }
        return this.bayeuxClientFactory.create(realtimeUri, this.okHttpClient);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends one event to {@code EventLogger} describing the outcome carried by {@code message}.
     *
     * <p>The event properties always contain "ClientId", "channel", "channelId" and
     * "retry_count". For an unsuccessful message an "error" property is added: the message's
     * "error" entry when that key is present, otherwise the entry of the "failure" map stored
     * under {@code SemanticAttributes.EXCEPTION_EVENT_NAME}, otherwise "Unknown exception".
     *
     * @param str event name used when the message is successful
     * @param str2 event name used when the message is not successful
     * @param message the Bayeux reply being reported
     * @param str3 value stored under "channelId"
     * @param i value stored (as text) under "retry_count"
     */
    public final void logEvent(String str, String str2, Message message, String str3, int i) {
        HashMap properties = MapsKt.hashMapOf(
                TuplesKt.to("ClientId", String.valueOf(message.get("clientId"))),
                TuplesKt.to("channel", String.valueOf(message.get("channel"))),
                TuplesKt.to("channelId", str3),
                TuplesKt.to("retry_count", String.valueOf(i)));

        final String eventName;
        if (message.isSuccessful()) {
            eventName = str;
        } else {
            Object cause;
            if (message.containsKey("error")) {
                // Taken verbatim, even when the stored value is null.
                cause = message.get("error");
            } else {
                Object failure = message.get("failure");
                cause = failure instanceof Map ? ((Map) failure).get(SemanticAttributes.EXCEPTION_EVENT_NAME) : null;
                if (cause == null) {
                    cause = "Unknown exception";
                }
            }
            properties.put("error", String.valueOf(cause));
            eventName = str2;
        }

        String tag = TAG;
        Intrinsics.checkNotNullExpressionValue(tag, "TAG");
        EventLogger.event(tag, eventName, properties);
    }

    /**
     * Synthetic default-argument bridge for {@code subscribe}: when bit 4 of {@code i} is set,
     * the boolean argument is replaced by {@code false}; otherwise {@code z} is passed through.
     *
     * @param bayeuxDataStream receiver of the call
     * @param str channel name
     * @param cls data class for incoming messages
     * @param z explicit boolean argument, used only when bit 4 of {@code i} is clear
     * @param i default-argument bit mask
     * @param obj unused
     */
    public static /* synthetic */ void subscribe$default(BayeuxDataStream bayeuxDataStream, String str, Class cls, boolean z, int i, Object obj) {
        final boolean flagIsDefaulted = (i & 4) != 0;
        bayeuxDataStream.subscribe(str, cls, flagIsDefaulted ? false : z);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles the server's reply to a channel subscribe request.
     *
     * <ul>
     *   <li>Success: logs it and hands the message to {@code subscribeCallback}.</li>
     *   <li>Failure with 10 or more recorded retries: logs it and hands the message to
     *       {@code subscribeCallback}; no further retry is scheduled.</li>
     *   <li>Failure otherwise: increments the channel's retry counter, unsubscribes with
     *       {@code unsubscribe(channelName, false)} and schedules
     *       {@code subscribe(channelName, dataClass, true)} on the IO scheduler after
     *       {@code retries * 2000} ms, where {@code retries} is the counter value before the
     *       increment (so the first retry is scheduled with a zero delay).</li>
     * </ul>
     *
     * @param this$0 the owning stream
     * @param channelName channel the subscribe request was issued for
     * @param subscribeCallback invoked with the message on success or when retries are exhausted
     * @param dataClass data class passed on to the retried subscribe call
     * @param message the subscribe reply
     */
    public static final void subscribe$lambda$13(final BayeuxDataStream this$0, final String channelName, Function1 subscribeCallback, final Class dataClass, Message message) {
        Intrinsics.checkNotNullParameter(this$0, "this$0");
        Intrinsics.checkNotNullParameter(channelName, "$channelName");
        Intrinsics.checkNotNullParameter(subscribeCallback, "$subscribeCallback");
        Intrinsics.checkNotNullParameter(dataClass, "$dataClass");

        // Make sure a counter exists so the lookup below cannot miss.
        if (!this$0.subscriptionRetries.containsKey(channelName)) {
            this$0.subscriptionRetries.put(channelName, 0);
        }

        final String tag = TAG;
        Intrinsics.checkNotNullExpressionValue(tag, "TAG");
        final Timber.Forest timber = Timber.Forest;

        if (message.isSuccessful()) {
            if (timber.treeCount() > 0) {
                timber.tag(tag).d("Subscription to " + channelName + " succeeded", new Object[0]);
            }
            Intrinsics.checkNotNull(message);
            subscribeCallback.invoke(message);
            return;
        }

        final int retriesSoFar = ((Number) MapsKt.getValue(this$0.subscriptionRetries, channelName)).intValue();
        if (retriesSoFar >= 10) {
            if (timber.treeCount() > 0) {
                timber.tag(tag).d("Failed to subscribe to " + channelName, new Object[0]);
            }
            Intrinsics.checkNotNull(message);
            subscribeCallback.invoke(message);
            return;
        }

        if (timber.treeCount() > 0) {
            timber.tag(tag).d("Retrying subscription to " + channelName, new Object[0]);
        }
        this$0.subscriptionRetries.put(channelName, Integer.valueOf(retriesSoFar + 1));
        this$0.unsubscribe(channelName, false);

        Completable timer = Completable.timer(retriesSoFar * 2000L, TimeUnit.MILLISECONDS, this$0.schedulerProvider.getIOScheduler());
        Intrinsics.checkNotNullExpressionValue(timer, "timer(...)");
        // Function1/Function0 are interfaces: the anonymous classes need no constructor call, so
        // the decompiler-emitted "super(0)" initializer (invalid Java) is intentionally omitted.
        SubscribersKt.subscribeBy(timer, new Function1<Throwable, Unit>() {
            @Override // kotlin.jvm.functions.Function1
            public Unit invoke(Throwable it) {
                Intrinsics.checkNotNullParameter(it, "it");
                String errorTag = BayeuxDataStream.TAG;
                Intrinsics.checkNotNullExpressionValue(errorTag, "access$getTAG$cp(...)");
                Timber.Forest errorTimber = Timber.Forest;
                if (errorTimber.treeCount() > 0) {
                    errorTimber.tag(errorTag).d(it, "Realtime subscription failed", new Object[0]);
                }
                return Unit.INSTANCE;
            }
        }, new Function0<Unit>() {
            @Override // kotlin.jvm.functions.Function0
            public Unit invoke() {
                // Retry as a resubscribe, which subscribe() skips when resubscription is disabled.
                this$0.subscribe(channelName, dataClass, true);
                return Unit.INSTANCE;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Channel message listener body: serialises the incoming message to JSON, parses that JSON
     * as {@code dataClass}, and pushes the result to the relay wrapped in a RealtimeChannelData
     * together with the channel name.
     *
     * @param this$0 the owning stream
     * @param channelName name attached to the published data
     * @param dataClass target class for the JSON round trip
     * @param clientSessionChannel channel the message arrived on (unused)
     * @param message the incoming message
     */
    public static final void subscribe$lambda$9(BayeuxDataStream this$0, String channelName, Class dataClass, ClientSessionChannel clientSessionChannel, Message message) {
        Intrinsics.checkNotNullParameter(this$0, "this$0");
        Intrinsics.checkNotNullParameter(channelName, "$channelName");
        Intrinsics.checkNotNullParameter(dataClass, "$dataClass");
        final Gson converter = this$0.gson;
        final String messageJson = converter.toJson(message);
        final Object payload = converter.fromJson(messageJson, dataClass);
        this$0.publishSubject.call(new RealtimeChannelData(channelName, payload));
    }

    /**
     * Synthetic default-argument bridge for {@code unsubscribe}: when bit 2 of {@code i} is set,
     * the boolean argument is replaced by {@code true}; otherwise {@code z} is passed through.
     *
     * @param bayeuxDataStream receiver of the call
     * @param str channel name
     * @param z explicit boolean argument, used only when bit 2 of {@code i} is clear
     * @param i default-argument bit mask
     * @param obj unused
     */
    public static /* synthetic */ void unsubscribe$default(BayeuxDataStream bayeuxDataStream, String str, boolean z, int i, Object obj) {
        final boolean flagIsDefaulted = (i & 2) != 0;
        bayeuxDataStream.unsubscribe(str, flagIsDefaulted ? true : z);
    }

    /**
     * Ensures there is a connected Bayeux client, creating and handshaking one if necessary.
     *
     * <p>The handshake sends an "ext" map with {@code push_message_bodies=false},
     * {@code auth="oauth"} and the current network's token under {@code ResponseType.TOKEN}.
     * The handshake reply is reported through the handshake success/failure events.
     *
     * <p>After the handshake this method calls
     * {@code waitFor(7000, CONNECTED)} up to 10 times and stops at the first call that
     * returns {@code true}. The previous loop advanced its counter by two per failed wait
     * while testing it against the same bound of 10, so it gave up after 6 waits.
     *
     * @return {@code true} if the client is (or becomes) connected; {@code false} if no client
     *         could be created or none of the waits observed the CONNECTED state
     */
    public final boolean connect() {
        BayeuxClient bayeuxClient = this.client;
        if (bayeuxClient == null) {
            bayeuxClient = createBayeuxClient();
        }
        this.client = bayeuxClient;
        if (bayeuxClient == null) {
            return false;
        }
        if (bayeuxClient.isConnected()) {
            return true;
        }
        ClientTransport transport = bayeuxClient.getTransport();
        if (transport != null) {
            transport.init();
        }
        bayeuxClient.handshake(MapsKt.mapOf(TuplesKt.to("ext", MapsKt.mapOf(TuplesKt.to("push_message_bodies", Boolean.FALSE), TuplesKt.to("auth", "oauth"), TuplesKt.to(ResponseType.TOKEN, this.tokenService.getTokenForCurrentNetwork())))), new ClientSession.MessageListener() { // from class: com.microsoft.yammer.realtime.repo.BayeuxDataStream$$ExternalSyntheticLambda0
            @Override // org.cometd.bayeux.client.ClientSession.MessageListener
            public final void onMessage(Message message) {
                BayeuxDataStream.connect$lambda$1$lambda$0(BayeuxDataStream.this, message);
            }
        });
        // One counter, one increment per attempt: exactly 10 waits at most.
        for (int attempt = 0; attempt < 10; attempt++) {
            if (bayeuxClient.waitFor(7000L, BayeuxClient.State.CONNECTED, new BayeuxClient.State[0])) {
                return true;
            }
        }
        return false;
    }

    /**
     * Disconnects the client, terminates its transport, drops the client reference and clears
     * all subscription retry counters.
     *
     * <p>Stage 1 calls {@code disconnect(7000)} on the client; an {@link Exception} is logged
     * and swallowed, and in every case the resulting state is logged afterwards. Any other
     * {@link Throwable} propagates to the caller after that state log.
     *
     * <p>Stage 2 terminates the transport; any {@link Throwable} is logged and swallowed. The
     * client reference is cleared whether or not termination succeeded. Previously it was only
     * cleared when termination threw, so a cleanly terminated client stayed in the field and
     * was picked up again by the next {@code connect()}.
     */
    public final synchronized void disconnect() {
        try {
            BayeuxClient current = this.client;
            if (current != null) {
                current.disconnect(7000L);
            }
        } catch (Exception e) {
            String tag = TAG;
            Intrinsics.checkNotNullExpressionValue(tag, "TAG");
            Timber.Forest timber = Timber.Forest;
            if (timber.treeCount() > 0) {
                timber.tag(tag).e(e, "Error while disconnecting", new Object[0]);
            }
        } finally {
            logDisconnectOutcome();
        }

        try {
            BayeuxClient current = this.client;
            ClientTransport transport = current != null ? current.getTransport() : null;
            if (transport != null) {
                transport.terminate();
            }
        } catch (Throwable th) {
            String tag = TAG;
            Intrinsics.checkNotNullExpressionValue(tag, "TAG");
            Timber.Forest timber = Timber.Forest;
            if (timber.treeCount() > 0) {
                timber.tag(tag).e(th, "Error while stopping httpClient", new Object[0]);
            }
        } finally {
            // Always forget the client so the next connect() builds a fresh one.
            this.client = null;
        }

        this.subscriptionRetries.clear();
    }

    /** Logs whether the client is gone/disconnected or is still not disconnected. */
    private void logDisconnectOutcome() {
        BayeuxClient current = this.client;
        boolean disconnected = current == null || current.isDisconnected();
        String tag = TAG;
        Intrinsics.checkNotNullExpressionValue(tag, "TAG");
        Timber.Forest timber = Timber.Forest;
        if (timber.treeCount() <= 0) {
            return;
        }
        if (disconnected) {
            timber.tag(tag).d("Realtime stream DISCONNECTED", new Object[0]);
        } else {
            timber.tag(tag).e("Error disconnecting (Timeout)", new Object[0]);
        }
    }

    /**
     * @return the observable view of the relay that channel messages are published to
     */
    public final Observable getStream() {
        final Observable channelData = this.stream;
        return channelData;
    }

    /**
     * @return whatever the underlying relay's {@code hasObservers()} reports
     */
    public final boolean hasObservers() {
        final PublishRelay relay = this.publishSubject;
        return relay.hasObservers();
    }

    /**
     * Subscribes to a Bayeux channel, connecting first if needed.
     *
     * <p>When {@code z} is {@code true} and the channel's resubscribe flag is FALSE, the call
     * returns without doing anything. When {@code z} is {@code false}, the channel's
     * resubscribe flag is set to TRUE before connecting.
     *
     * <p>A listener pair is registered only when the channel currently has no subscribers:
     * one listener publishes incoming messages to the stream, the other handles the
     * subscribe reply (including scheduling retries).
     *
     * @param channelName channel to subscribe to; must not be null
     * @param dataClass class incoming messages are converted to; must not be null
     * @param z {@code true} when this call is a retry-driven resubscribe
     * @throws RealtimeConnectionException if {@code connect()} returns {@code false}
     * @throws IllegalStateException if the connected client yields no channel for the name
     *         (previously a raw {@code java.lang.Exception}, which is a checked type and
     *         cannot be thrown from this signature)
     */
    public final synchronized void subscribe(final String channelName, final Class dataClass, boolean z) {
        Intrinsics.checkNotNullParameter(channelName, "channelName");
        Intrinsics.checkNotNullParameter(dataClass, "dataClass");
        if (z && Intrinsics.areEqual(this.canResubscribe.get(channelName), Boolean.FALSE)) {
            return;
        }
        if (!z) {
            this.canResubscribe.put(channelName, Boolean.TRUE);
        }

        if (!connect()) {
            String tag = TAG;
            Intrinsics.checkNotNullExpressionValue(tag, "TAG");
            Timber.Forest timber = Timber.Forest;
            if (timber.treeCount() > 0) {
                timber.tag(tag).d("Failed to connect to realtime stream for channel " + channelName, new Object[0]);
            }
            throw new RealtimeConnectionException();
        }

        BayeuxClient bayeuxClient = this.client;
        ClientSessionChannel channel = bayeuxClient != null ? bayeuxClient.getChannel(channelName) : null;
        if (channel == null) {
            throw new IllegalStateException("Could not get channel " + channelName);
        }

        final Function1 callback = callback("realtime_subscription_succeeded", "realtime_subscription_failed");
        if (!channel.getSubscribers().isEmpty()) {
            // Already subscribed on this channel; do not register a second listener pair.
            return;
        }
        channel.subscribe(new ClientSessionChannel.MessageListener() { // from class: com.microsoft.yammer.realtime.repo.BayeuxDataStream$$ExternalSyntheticLambda1
            @Override // org.cometd.bayeux.client.ClientSessionChannel.MessageListener
            public final void onMessage(ClientSessionChannel clientSessionChannel, Message message) {
                BayeuxDataStream.subscribe$lambda$9(BayeuxDataStream.this, channelName, dataClass, clientSessionChannel, message);
            }
        }, new ClientSession.MessageListener() { // from class: com.microsoft.yammer.realtime.repo.BayeuxDataStream$$ExternalSyntheticLambda2
            @Override // org.cometd.bayeux.client.ClientSession.MessageListener
            public final void onMessage(Message message) {
                BayeuxDataStream.subscribe$lambda$13(BayeuxDataStream.this, channelName, callback, dataClass, message);
            }
        });
    }

    /**
     * Unsubscribes from and releases the named channel, if a client and channel exist.
     *
     * <p>The channel's resubscribe flag is set to {@code !z}. When {@code z} is {@code true}
     * the channel's retry counter is removed as well. A call made on the main looper's thread
     * is logged as an error but still carried out.
     *
     * @param channelName channel to unsubscribe from; must not be null
     * @param z {@code true} to disable resubscription and drop the retry counter
     */
    public final synchronized void unsubscribe(String channelName, boolean z) {
        Intrinsics.checkNotNullParameter(channelName, "channelName");
        this.canResubscribe.put(channelName, Boolean.valueOf(!z));

        final boolean onMainThread = Intrinsics.areEqual(Looper.myLooper(), Looper.getMainLooper());
        if (onMainThread) {
            // Retained from the original: touches the Logger singleton; the value is not used.
            Logger loggerInstance = Logger.INSTANCE;
            String tag = TAG;
            Intrinsics.checkNotNullExpressionValue(tag, "TAG");
            Timber.Forest timber = Timber.Forest;
            if (timber.treeCount() > 0) {
                timber.tag(tag).e("Attempting to realtime unsubscribe from main thread", new Object[0]);
            }
        }

        final BayeuxClient current = this.client;
        if (current != null) {
            final ClientSessionChannel channel = current.getChannel(channelName);
            if (channel != null) {
                channel.unsubscribe();
                channel.release();
            }
        }

        if (z) {
            this.subscriptionRetries.remove(channelName);
        }
    }
}
