package com.taobao.tao.messagekit.base;

import a.a.a.a.c.f$$ExternalSyntheticOutline0;
import android.content.Context;
import android.content.Intent;
import android.text.TextUtils;
import androidx.annotation.Nullable;
import androidx.appcompat.widget.Toolbar$$ExternalSyntheticOutline0;
import androidx.collection.ArrayMap;
import anet.channel.util.HttpUrl$$ExternalSyntheticOutline0;
import com.taobao.accs.ACCSManager;
import com.taobao.tao.messagekit.base.monitor.MonitorManager;
import com.taobao.tao.messagekit.base.monitor.monitorthread.MonitorThreadPool;
import com.taobao.tao.messagekit.base.mtop.MtopBusinessManager;
import com.taobao.tao.messagekit.core.Contants.Constant;
import com.taobao.tao.messagekit.core.MsgEnvironment;
import com.taobao.tao.messagekit.core.model.BaseMessage;
import com.taobao.tao.messagekit.core.model.Package;
import com.taobao.tao.messagekit.core.model.PausableBuffer;
import com.taobao.tao.messagekit.core.model.Pipe;
import com.taobao.tao.messagekit.core.utils.MsgLog;
import com.taobao.tao.messagekit.core.utils.MsgMonitor;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

/* loaded from: classes12.dex */
/**
 * Central router of the message kit.
 *
 * <p>The router owns a set of {@link Pipe}s (up, down, control and error streams) and the manager
 * objects that work with them. Its constructor wires two pipelines:
 * <ul>
 *   <li>packages pushed into the up stream are filtered (command handling and optional MTOP
 *       routing), buffered by {@link PausableBuffer} and then packed and sent through
 *       {@link ACCSManager#sendData};</li>
 *   <li>packages pushed into the error stream are recorded, replied to and reported as
 *       failures to {@link MsgMonitor}.</li>
 * </ul>
 *
 * <p>A shared instance is available through {@link #getInstance()}.
 */
public class MsgRouter {
    // NOTE(review): neither MODE_* constant is referenced inside this class; their meaning has to
    // be taken from the callers.
    public static final int MODE_CACHE = 10001;
    public static final int MODE_NOTIFY = 10000;

    /** Tag used for every log statement emitted by this class. */
    private static final String TAG = "MsgRouter";

    private static final MsgRouter instance = new MsgRouter();

    private final MtopBusinessManager mtopBusinessManager;
    private final Pipe<Package> mUpStream = new Pipe<>();
    private final PausableBuffer<Package> sender = new PausableBuffer<>();
    private final Pipe<Package> mDownStream = new Pipe<>();
    private final Pipe<Package> controlStream = new Pipe<>();
    private final Pipe<Package> errorStream = new Pipe<>();
    private final SubscribeManager subscribeManager = new SubscribeManager();
    private final ResponseManager responseManager = new ResponseManager();
    private final CallbackManager callbackManager = new CallbackManager();
    private final CommandManager commandManager = new CommandManager();

    /**
     * Accumulates the serialized bytes of all messages of one batch that share the same
     * {@link #key(BaseMessage) grouping key}, so that they can be sent as a single ACCS request.
     */
    private static class DataPackage {
        /** Router id of the messages in this group; may be {@code null} or empty. */
        final String ip;
        /** Concatenation of the serialized messages of this group. */
        final ByteArrayOutputStream stream = new ByteArrayOutputStream();
        /** System code used to look up the ACCS service id in {@code MsgEnvironment.serviceMap}. */
        final int sys;
        /** Topic of the messages in this group. */
        final String topic;

        public DataPackage(@Nullable String ip, int sys, String topic) {
            this.ip = ip;
            this.sys = sys;
            this.topic = topic;
        }

        /**
         * Builds the grouping key of a message from its system code, business code, topic and,
         * when present, its router id.
         *
         * <p>The two {@code $$ExternalSyntheticOutline0} helpers are compiler-generated; they
         * presumably just concatenate their arguments (TODO confirm against the helper classes).
         */
        static String key(BaseMessage baseMessage) {
            String routerId = baseMessage.routerId;
            int sysCode = baseMessage.sysCode;
            int bizCode = baseMessage.bizCode;
            String topic = baseMessage.header.topic;
            StringBuilder builder = HttpUrl$$ExternalSyntheticOutline0.m("sys:", sysCode, "biz:", bizCode, "t:");
            builder.append(topic);
            String base = builder.toString();
            if (TextUtils.isEmpty(routerId)) {
                return base;
            }
            return f$$ExternalSyntheticOutline0.m("ip:", routerId, base);
        }

        /**
         * Returns the ACCS target string for this group: the router id (if any), followed by
         * {@code ":T_" + topic} when a topic is set.
         */
        public final String getTarget() {
            String prefix = TextUtils.isEmpty(this.ip) ? "" : this.ip;
            if (TextUtils.isEmpty(this.topic)) {
                return prefix;
            }
            StringBuilder builder = Toolbar$$ExternalSyntheticOutline0.m26m(prefix, ":T_");
            builder.append(this.topic);
            return builder.toString();
        }
    }

    /**
     * Creates the managers and subscribes the up-stream and error-stream pipelines.
     */
    public MsgRouter() {
        // The instance is intentionally discarded; presumably the constructor registers itself
        // somewhere (TODO confirm in MonitorManager).
        new MonitorManager();
        this.mtopBusinessManager = new MtopBusinessManager();
        this.callbackManager.inject(this);
        this.commandManager.inject(this);

        Observable<Package> outgoing = this.mUpStream.getObservable()
                .observeOn(Schedulers.io())
                .filter(new Func1<Package, Boolean>() {
                    @Override
                    public final Boolean call(Package pkg) {
                        MsgLog.d(TAG, "UpStream >");
                        MsgLog.d(TAG, pkg);
                        MsgRouter router = MsgRouter.this;
                        // A package continues to the sender only if neither command (303, 301)
                        // consumed it and the MTOP routing step did not take it over.
                        return Boolean.valueOf(!router.commandManager.internalExecute(303, pkg)
                                && !router.commandManager.internalExecute(301, pkg)
                                && MsgRouter.access$100(router, pkg));
                    }
                });
        this.sender.from(outgoing);
        this.sender.buffer();
        this.sender.subscribe(new Action1<List<Package>>() {
            @Override
            public final void call(List<Package> batch) {
                dispatchBatch(batch);
            }
        });

        this.errorStream.getObservable()
                .subscribeOn(Schedulers.computation())
                .subscribe(new Action1<Package>() {
                    @Override
                    public final void call(Package pkg) {
                        MsgLog.d(TAG, "Error Result >");
                        MsgLog.d(TAG, pkg);
                        MonitorThreadPool.record(pkg.msg.header.statusCode, pkg);
                        ReplyManager.send(pkg);
                        MsgMonitor.commitFail(Constant.Monitor.MODULE, Constant.Monitor.MSG_CONSUME_RATE, "" + pkg.msg.header.statusCode, "");
                    }
                });
    }

    /**
     * Packs a buffered batch of packages into one payload per grouping key and sends each
     * payload through ACCS. The message id of the first package is used as the data id of the
     * whole batch.
     *
     * @param batch packages emitted by the sender buffer; {@code null} or empty batches are ignored
     */
    private void dispatchBatch(List<Package> batch) {
        if (batch == null || batch.isEmpty()) {
            return;
        }
        ArrayMap<String, DataPackage> groups = new ArrayMap<>(5);
        String dataId = batch.get(0).msg.header.messageId;
        for (Package pkg : batch) {
            packInto(groups, pkg, dataId);
        }
        long sendStart = System.currentTimeMillis();
        for (Package pkg : batch) {
            pkg.netTime = sendStart;
        }
        sendGroups(groups, dataId, batch.size());
    }

    /**
     * Serializes one package into the buffer of its group (creating the group on first use),
     * stores the packing duration on the package and records it with the response manager.
     * Any failure is logged and the package is left out of the payload.
     */
    private void packInto(ArrayMap<String, DataPackage> groups, Package pkg, String dataId) {
        try {
            String key = DataPackage.key(pkg.msg);
            DataPackage group = groups.get(key);
            if (group == null) {
                BaseMessage message = pkg.msg;
                group = new DataPackage(message.routerId, pkg.sysCode, message.header.topic);
                groups.put(key, group);
            }
            // Keep the start time in a local so that a failed serialization does not leave an
            // absolute timestamp in packTime.
            long packStart = System.currentTimeMillis();
            group.stream.write(pkg.msg.toProtocol());
            pkg.packTime = System.currentTimeMillis() - packStart;
            pkg.dataId = dataId;
            this.responseManager.record(dataId, pkg);
        } catch (Exception e) {
            MsgLog.e(TAG, new Object[]{"protocol packet error"}, e);
            // NOTE(review): kept because it is not visible here whether MsgLog.e prints the
            // stack trace; drop it if it does.
            e.printStackTrace();
        }
    }

    /**
     * Sends one ACCS request per non-empty group.
     *
     * @param groups     payloads keyed by grouping key
     * @param dataId     data id attached to every request
     * @param batchCount number of packages in the originating batch, used for logging only
     */
    private void sendGroups(ArrayMap<String, DataPackage> groups, String dataId, int batchCount) {
        for (Map.Entry<String, DataPackage> entry : groups.entrySet()) {
            DataPackage group = entry.getValue();
            if (group.stream.size() == 0) {
                // Every message of this group failed to serialize; there is nothing to send.
                MsgLog.d(TAG, "skip empty payload from:", entry.getKey());
                continue;
            }
            String target = group.getTarget();
            ACCSManager.AccsRequest request = new ACCSManager.AccsRequest("" + MsgEnvironment.getUserId(), MsgEnvironment.serviceMap.get(Integer.valueOf(group.sys)), group.stream.toByteArray(), dataId);
            request.setTarget(target);
            ACCSManager.sendData(MsgEnvironment.application, request);
            MsgLog.d(TAG, "send msgs:", Integer.valueOf(batchCount), "from:", entry.getKey(), target);
        }
    }

    /**
     * Routing step of the up-stream filter (compiler-generated accessor name kept as-is).
     *
     * <p>Messages with {@code msgType} 8 are handed to {@code MtopBusinessManager.subscribe} and
     * those with {@code msgType} 10 to {@code unSubscribe}; in both cases the package continues
     * to the ACCS sender only when the MTOP call returned {@code false}. For other messages that
     * can switch to MTOP while the remote config {@code PM_UP_CHANNEL_CONFIG_KEY} equals 2,
     * {@code count} (type 2) or {@code sendMsg} (type 1) is invoked.
     *
     * @return {@code true} if the package should continue to the ACCS sender; {@code false} for
     *         a {@code null} package or when subscribe/unSubscribe reported {@code true}
     */
    static boolean access$100(MsgRouter msgRouter, Package r5) {
        // Implicit null check on the router, retained from the compiled code.
        msgRouter.getClass();
        if (r5 == null) {
            return false;
        }
        BaseMessage message = r5.msg;
        int msgType = message.msgType;
        if (msgType == 8) {
            return !msgRouter.mtopBusinessManager.subscribe(r5);
        }
        if (msgType == 10) {
            return !msgRouter.mtopBusinessManager.unSubscribe(r5);
        }
        if (message.canSwitchToMtop() && ConfigManager.getRemoteInt(1, Constant.PM_UP_CHANNEL_CONFIG_KEY) == 2) {
            // NOTE(review): the results of count()/sendMsg() are discarded and the package still
            // continues to the ACCS sender. This matches the code as found, but it may be a
            // decompilation artefact (the original possibly returned the negated result) -
            // confirm against the original sources before relying on it.
            int type = r5.msg.type;
            if (type == 2) {
                msgRouter.mtopBusinessManager.count(r5);
            } else if (type == 1) {
                msgRouter.mtopBusinessManager.sendMsg(r5);
            }
        }
        return true;
    }

    /** Returns the shared router instance created when this class is loaded. */
    public static MsgRouter getInstance() {
        return instance;
    }

    /** Returns the callback manager owned by this router. */
    public final CallbackManager getCallbackManager() {
        return this.callbackManager;
    }

    /** Returns the control stream pipe. */
    public final Pipe<Package> getControlStream() {
        return this.controlStream;
    }

    /** Returns the down stream pipe. */
    public final Pipe<Package> getDownStream() {
        return this.mDownStream;
    }

    /** Returns the error stream pipe; packages pushed into it are recorded and replied to. */
    public final Pipe<Package> getErrorStream() {
        return this.errorStream;
    }

    /** Returns the response manager that records every package handed to ACCS. */
    public final ResponseManager getResponseManager() {
        return this.responseManager;
    }

    /** Returns the subscribe manager owned by this router. */
    public final SubscribeManager getSubscribeManager() {
        return this.subscribeManager;
    }

    /** Returns the up stream pipe; packages pushed into it are filtered, buffered and sent. */
    public final Pipe<Package> getUpStream() {
        return this.mUpStream;
    }

    /**
     * Initializes the message environment, broadcasts {@code Constant.ACTION_RECEIVE} and
     * registers the duration monitor point with its dimensions and measures.
     *
     * @param context context used to send the broadcast
     */
    public final void init(Context context) {
        MsgEnvironment.init();
        context.sendBroadcast(new Intent(Constant.ACTION_RECEIVE));

        // Plain lists instead of double-brace initialization, which would create anonymous
        // ArrayList subclasses holding a reference to this router.
        ArrayList<String> dimensions = new ArrayList<>(6);
        dimensions.add(Constant.Monitor.D_BIZ);
        dimensions.add(Constant.Monitor.D_DUP);
        dimensions.add(Constant.Monitor.D_MQTT);
        dimensions.add(Constant.Monitor.D_TYPE);
        dimensions.add(Constant.Monitor.D_SUB);
        dimensions.add(Constant.Monitor.D_TOPIC);

        ArrayList<String> measures = new ArrayList<>(3);
        measures.add(Constant.Monitor.M_FLOW);
        measures.add(Constant.Monitor.M_NET);
        measures.add(Constant.Monitor.M_PACK);

        MsgMonitor.register(Constant.Monitor.MODULE, Constant.Monitor.MSG_DURATION, dimensions, measures);
    }
}
