package com.alibaba.ai.sdk.biz.sse;

import android.alibaba.support.util.LogUtil;
import android.alibaba.track.base.MonitorTrackInterface;
import android.alibaba.track.base.model.TrackMap;
import android.os.Handler;
import android.os.Looper;
import android.text.TextUtils;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import androidx.annotation.VisibleForTesting;
import com.alibaba.ai.base.AiInterface;
import com.alibaba.ai.base.pojo.AiParams;
import com.alibaba.ai.sdk.biz.strategy.RequestStrategy;
import com.alibaba.ai.sdk.pojo.AiResponse4SSE;
import com.alibaba.ai.sdk.pojo.AiResponseError;
import com.alibaba.ai.sdk.pojo.SSEAiResponseError;
import com.alibaba.ai.utils.AiRequestUtils;
import com.alibaba.android.powermsgbridge.IDataDispatcher;
import com.alibaba.android.powermsgbridge.IcbuMsgWrapper;
import com.alibaba.android.powermsgbridge.PowerMsgInitializer;
import com.alibaba.android.powermsgbridge.constant.Constant;
import com.alibaba.android.sourcingbase.SourcingBase;
import com.alibaba.fastjson.JSON;
import com.taobao.orange.OrangeConfig;
import com.taobao.trtc.api.TrtcConstants;
import j$.lang.Iterable$EL;
import j$.util.Map;
import j$.util.concurrent.ConcurrentHashMap;
import j$.util.function.BiConsumer$CC;
import j$.util.function.Consumer$CC;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/* loaded from: classes3.dex */
/**
 * Bridges PowerMsg push messages to in-flight AI stream requests.
 *
 * <p>The adapter registers a dispatcher for a request's topic, routes each incoming notice
 * message to the matching {@link AIStreamRequestCache}, and schedules delayed checks on the main
 * looper that report request/stream timeouts through the request callback.
 *
 * <p>Several methods keep decompiler-generated names ({@code lambda$...}); the original names are
 * recorded in the comment above each of them.
 */
public class SSEAdapter {
    private static final String ORANGE_KEY_TIMEOUT = "streamTimeoutInterval";
    private static final String ORANGE_NAME_AI_CONFIG = "getAIConfig";
    private static final String SSE_APPKEY_BUYER = "50004";
    private static final String SSE_APPKEY_SELLER = "50003";
    private static final String TAG = "SSEAdapter";

    /** Timeout, in seconds, used when the remote config has no usable value. */
    private static final long DEFAULT_TIMEOUT_SECONDS = 30L;

    /** Unique keys of requests that are still subject to timeout checks. */
    private final Set<String> activeRequests = new ConcurrentSkipListSet<>();

    /** Request state keyed by the strategy-provided unique key. */
    private final java.util.Map<String, AIStreamRequestCache> requestCache = new ConcurrentHashMap<>();

    /** Closes every request currently held in {@link #requestCache}. */
    private void closeAllRequest() {
        // NOTE(review): closeRequest() never removes entries from requestCache, so this also
        // revisits requests that were closed earlier — confirm that is intended.
        for (java.util.Map.Entry<String, AIStreamRequestCache> entry : this.requestCache.entrySet()) {
            lambda$closeAllRequest$0(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Returns the channel id of the request, which is used as the PowerMsg topic.
     *
     * @param aIStreamRequestCache the cached request, may be {@code null}
     * @return the channel id, or {@code null} when no request is given
     */
    private String getTopic(AIStreamRequestCache aIStreamRequestCache) {
        if (aIStreamRequestCache == null) {
            return null;
        }
        return aIStreamRequestCache.requestParams.getChannelId();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Closes one cached request, unregistering its topic if the request's strategy asks for it. */
    public /* synthetic */ void lambda$closeAllRequest$0(String str, AIStreamRequestCache aIStreamRequestCache) {
        closeRequest(str, AiRequestUtils.getStrategyByService(aIStreamRequestCache.requestParams.getService()).autoUnregisterSSE());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Re-activates the topic of one cached request. */
    public /* synthetic */ void lambda$reactivateALlAIStream$4(AIStreamRequestCache aIStreamRequestCache) {
        reactivateAIStream(getTopic(aIStreamRequestCache));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles one pushed stream packet: validates it, drops stale or post-end packets, forwards
     * the data through the strategy, and schedules the follow-up timeout check.
     *
     * @param aiResponse4SSE  the parsed packet
     * @param map             the raw message payload, only used for tracking invalid packets
     * @param requestStrategy the strategy that maps the packet to its request
     */
    public void processStreamResponse(AiResponse4SSE aiResponse4SSE, java.util.Map<?, ?> map, RequestStrategy requestStrategy) {
        // The cache is a ConcurrentHashMap and rejects null keys, so resolve the key defensively.
        final String requestUniqueKey = (aiResponse4SSE == null || requestStrategy == null)
                ? null
                : requestStrategy.getRequestUniqueKey(aiResponse4SSE);
        final AIStreamRequestCache aIStreamRequestCache = requestUniqueKey == null ? null : this.requestCache.get(requestUniqueKey);

        if (!isResponseValid(aiResponse4SSE)) {
            LogUtil.d(TAG, "sse回复无效");
            TrackMap trackMap = new TrackMap();
            trackMap.addMap("sseData", JSON.toJSONString(map));
            MonitorTrackInterface.getInstance().sendCustomEvent("ASCAIStreamResponseNotValid", trackMap);
            // Only an AI-side failure for a known request with a listener is reported back.
            if (aiResponse4SSE == null || aiResponse4SSE.aiSuccess() || aIStreamRequestCache == null || aIStreamRequestCache.callback == null) {
                return;
            }
            SSEAiResponseError sSEAiResponseError = new SSEAiResponseError("", aiResponse4SSE.responseCode(), "AI响应失败");
            sSEAiResponseError.streamCount = aIStreamRequestCache.getCntOfStreams();
            sSEAiResponseError.requestId = aIStreamRequestCache.requestParams.getRequestId();
            sSEAiResponseError.streamId = aiResponse4SSE.streamId();
            aIStreamRequestCache.callback.callback(sSEAiResponseError);
            return;
        }

        final String streamId = aiResponse4SSE.streamId();
        if (aIStreamRequestCache == null) {
            LogUtil.d(TAG, "该response找不到对应的request");
            MonitorTrackInterface.getInstance().sendCustomEvent("AiStreamResponseBeforeMtop", new TrackMap("requestId", aiResponse4SSE.requestId()).addMap(TrtcConstants.TRTC_PARAMS_STREAM_ID, aiResponse4SSE.streamId()));
            return;
        }

        // The per-stream lookup can be null even when the request already has a latest response
        // (first packet of another stream), so it needs its own null check.
        if (aIStreamRequestCache.getLatestResponse() != null && aIStreamRequestCache.getLatestResponse(streamId) != null) {
            LogUtil.d(TAG, "response:" + aiResponse4SSE.msgId() + " old:" + aIStreamRequestCache.getLatestResponse(streamId).msgId);
            if (aiResponse4SSE.success() && aiResponse4SSE.streamDataTimestamp() < aIStreamRequestCache.getLatestResponse(streamId).streamDataTimestamp()) {
                LogUtil.d(TAG, "该response 比上一个包的时间戳小，丢弃");
                return;
            }
            if (aIStreamRequestCache.getLatestResponse(streamId).isStreamEnd()) {
                LogUtil.d(TAG, "流已经结束，此次数据丢弃");
                return;
            }
        }

        LogUtil.d(TAG, "更新回复tool数量:" + aiResponse4SSE.intentionCount());
        aIStreamRequestCache.setCntOfStreams(aiResponse4SSE.intentionCount());
        aIStreamRequestCache.setLatestResponse(aiResponse4SSE);
        requestStrategy.postData(aIStreamRequestCache, aiResponse4SSE, new Runnable() { // from class: com.alibaba.ai.sdk.biz.sse.SSEAdapter.3
            @Override // java.lang.Runnable
            public void run() {
                if (aIStreamRequestCache.end(streamId)) {
                    LogUtil.d(SSEAdapter.TAG, "流结束:" + streamId);
                    if (aIStreamRequestCache.isRequestEnd()) {
                        SSEAdapter.this.requestCache.remove(requestUniqueKey);
                        LogUtil.d(SSEAdapter.TAG, "请求结束:" + requestUniqueKey);
                    }
                }
            }
        });

        if (aiResponse4SSE.isStreamEnd() && aIStreamRequestCache.isRequestEnd()) {
            LogUtil.d(TAG, "请求已结束，关闭请求");
            closeRequest(requestUniqueKey, requestStrategy.autoUnregisterSSE());
        } else if (aiResponse4SSE.isStreamEnd()) {
            // This stream finished but the request has not: watch for streams that never arrive.
            startUnArrivedStreamTimeoutCheck(requestUniqueKey);
        } else {
            startStreamTimeoutCheck(requestUniqueKey, streamId, aiResponse4SSE.streamDataTimestamp());
        }
    }

    /** Returns {@link #SSE_APPKEY_BUYER} when the runtime app type is 0, otherwise {@link #SSE_APPKEY_SELLER}. */
    private String sseAppKey() {
        return SourcingBase.getInstance().getRuntimeContext().getAppType() == 0 ? SSE_APPKEY_BUYER : SSE_APPKEY_SELLER;
    }

    /** Posts {@code task} to the main looper after the configured timeout (seconds converted to milliseconds). */
    private void scheduleTimeoutCheck(Runnable task) {
        new Handler(Looper.getMainLooper()).postDelayed(task, getTimeoutCheckDelay() * 1000);
    }

    /** Schedules a timeout check for one stream, remembering the timestamp of its latest packet. */
    private void startStreamTimeoutCheck(final String str, final String str2, final long j3) {
        scheduleTimeoutCheck(new Runnable() { // from class: com.alibaba.ai.sdk.biz.sse.g
            @Override // java.lang.Runnable
            public final void run() {
                SSEAdapter.this.lambda$startStreamTimeoutCheck$2(str, str2, j3);
            }
        });
    }

    /** Schedules a check for streams of the request that have not arrived yet. */
    private void startUnArrivedStreamTimeoutCheck(final String str) {
        scheduleTimeoutCheck(new Runnable() { // from class: com.alibaba.ai.sdk.biz.sse.i
            @Override // java.lang.Runnable
            public final void run() {
                SSEAdapter.this.lambda$startUnArrivedStreamTimeoutCheck$3(str);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* renamed from: timeoutCheck, reason: merged with bridge method [inline-methods] */
    /**
     * Reports error 5004 and closes the request when nothing has changed since the check was
     * scheduled.
     *
     * @param str the request's unique key
     * @param j3  the latest packet timestamp at scheduling time, or 0 when no packet had arrived
     */
    public void lambda$startTimeoutCheck$1(String str, long j3) {
        AIStreamRequestCache aIStreamRequestCache = this.requestCache.get(str);
        if (!isNeedCheckTimeout(str) || aIStreamRequestCache == null) {
            return;
        }
        RequestStrategy strategyByService = AiRequestUtils.getStrategyByService(aIStreamRequestCache.requestParams.getService());
        if (!TextUtils.equals(PowerMsgInitializer.getCurrentTopic(), getTopic(aIStreamRequestCache))) {
            return;
        }
        // Timed out when either no packet ever arrived, or the latest packet is still the one
        // that was current when this check was scheduled.
        boolean nothingReceived = j3 == 0 && aIStreamRequestCache.getLatestResponse() == null;
        boolean noProgress = j3 != 0
                && aIStreamRequestCache.getLatestResponse() != null
                && j3 == aIStreamRequestCache.getLatestResponse().streamDataTimestamp();
        if (!nothingReceived && !noProgress) {
            return;
        }
        String requestUniqueKey = strategyByService.getRequestUniqueKey(aIStreamRequestCache);
        LogUtil.d(TAG, "请求超时：" + requestUniqueKey);
        if (aIStreamRequestCache.callback != null) {
            SSEAiResponseError sSEAiResponseError = new SSEAiResponseError(5004, "请求超时");
            sSEAiResponseError.streamCount = aIStreamRequestCache.getCntOfStreams();
            sSEAiResponseError.requestId = aIStreamRequestCache.requestParams.getRequestId();
            aIStreamRequestCache.callback.callback(sSEAiResponseError);
        }
        closeRequest(requestUniqueKey, strategyByService.autoUnregisterSSE());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* renamed from: unArrivedStreamTimeoutCheck, reason: merged with bridge method [inline-methods] */
    /**
     * Reports error 5005 when the request is still not finished by the time the check runs.
     *
     * @param str the request's unique key
     */
    public void lambda$startUnArrivedStreamTimeoutCheck$3(String str) {
        AIStreamRequestCache aIStreamRequestCache = this.requestCache.get(str);
        if (!isNeedCheckTimeout(str) || aIStreamRequestCache == null) {
            return;
        }
        RequestStrategy strategyByService = AiRequestUtils.getStrategyByService(aIStreamRequestCache.requestParams.getService());
        if (aIStreamRequestCache.isRequestEnd()) {
            return;
        }
        if (aIStreamRequestCache.callback != null) {
            SSEAiResponseError sSEAiResponseError = new SSEAiResponseError("", 5005, "流缺失");
            sSEAiResponseError.streamCount = aIStreamRequestCache.getCntOfStreams();
            sSEAiResponseError.requestId = aIStreamRequestCache.requestParams.getRequestId();
            aIStreamRequestCache.callback.callback(sSEAiResponseError);
            LogUtil.d(TAG, "流缺失：");
        }
        // NOTE(review): isRequestEnd() was false just above, so this only fires if the callback
        // changed the request state. Confirm whether the request should be closed unconditionally.
        if (aIStreamRequestCache.isRequestEnd()) {
            closeRequest(strategyByService.getRequestUniqueKey(aIStreamRequestCache), strategyByService.autoUnregisterSSE());
        }
    }

    /**
     * Registers a PowerMsg dispatcher for the topic. Notice messages whose {@code noticeBizType}
     * is the integer 1 are parsed and passed to {@link #processStreamResponse}.
     *
     * @param str the topic to register
     */
    public void activeSSE(String str) {
        PowerMsgInitializer.registerDispatcher(str, sseAppKey(), new IDataDispatcher() { // from class: com.alibaba.ai.sdk.biz.sse.SSEAdapter.1
            @Override // com.alibaba.android.powermsgbridge.IDataDispatcher
            public void onDispatch(IcbuMsgWrapper icbuMsgWrapper) {
                LogUtil.d(SSEAdapter.TAG, "sse消息：" + JSON.toJSONString(icbuMsgWrapper));
                if (icbuMsgWrapper == null) {
                    return;
                }
                String messageName = icbuMsgWrapper.name;
                if (messageName == null || !TextUtils.equals(messageName, Constant.NOTICE_MSG)) {
                    return;
                }
                java.util.Map<String, Object> map = icbuMsgWrapper.msg;
                if (map == null) {
                    // A notice without a payload carries nothing to process.
                    return;
                }
                Object noticeBizType = map.get("noticeBizType");
                if (!(noticeBizType instanceof Integer) || ((Integer) noticeBizType).intValue() != 1) {
                    return;
                }
                AiResponse4SSE aiResponse4SSE = new AiResponse4SSE(map);
                SSEAdapter.this.processStreamResponse(aiResponse4SSE, map, aiResponse4SSE.requestStrategy());
            }

            @Override // com.alibaba.android.powermsgbridge.IDataDispatcher
            public void onRawDataDispatch(String str2) {
            }
        }, new PowerMsgInitializer.OnRegisterListener() { // from class: com.alibaba.ai.sdk.biz.sse.SSEAdapter.2
            @Override // com.alibaba.android.powermsgbridge.PowerMsgInitializer.OnRegisterListener
            public void onRegister(int i3, String str2) {
                // The decompiled code built this text and discarded it; emit it as a debug log.
                LogUtil.d(SSEAdapter.TAG, "code:" + i3 + " topic:" + str2);
            }
        });
    }

    /**
     * Marks the request active and stores it in the cache. If an entry with the same key already
     * exists, its params and callback are replaced and its latest response, if any, is replayed
     * to the new callback.
     *
     * @param aIStreamRequestCache the request to add
     */
    @VisibleForTesting
    public void addToRequestList(AIStreamRequestCache aIStreamRequestCache) {
        RequestStrategy strategyByService = AiRequestUtils.getStrategyByService(aIStreamRequestCache.requestParams.getService());
        String requestUniqueKey = strategyByService.getRequestUniqueKey(aIStreamRequestCache);
        this.activeRequests.add(requestUniqueKey);

        AIStreamRequestCache existing = this.requestCache.get(requestUniqueKey);
        if (existing == null) {
            this.requestCache.put(requestUniqueKey, aIStreamRequestCache);
            LogUtil.d(TAG, "加入请求队列成功,key:" + requestUniqueKey);
            return;
        }

        existing.requestParams = aIStreamRequestCache.requestParams;
        existing.callback = aIStreamRequestCache.callback;
        if (existing.getLatestResponse() == null) {
            return;
        }
        AiInterface.AiCallback aiCallback = aIStreamRequestCache.callback;
        if (aiCallback != null) {
            aiCallback.callback(existing.getLatestResponse());
        }
        if (existing.getLatestResponse().isStreamEnd()) {
            closeRequest(requestUniqueKey, strategyByService.autoUnregisterSSE());
        }
    }

    /**
     * Stops timeout tracking for the request and optionally unregisters its topic.
     *
     * @param str the request's unique key
     * @param z3  whether to unregister the request's topic as well
     */
    @VisibleForTesting
    public void closeRequest(String str, boolean z3) {
        this.activeRequests.remove(str);
        if (!z3) {
            return;
        }
        AIStreamRequestCache aIStreamRequestCache = this.requestCache.get(str);
        if (aIStreamRequestCache != null) {
            deactiveSSE(getTopic(aIStreamRequestCache));
        }
    }

    /**
     * Unregisters the topic from PowerMsg; empty topics are ignored.
     *
     * @param str the topic to unregister
     */
    @VisibleForTesting
    public void deactiveSSE(String str) {
        if (TextUtils.isEmpty(str)) {
            return;
        }
        PowerMsgInitializer.unRegister(str, sseAppKey());
    }

    /**
     * Reads the timeout from the remote config.
     *
     * @return the configured timeout in seconds, or 30 when it is missing or not a valid number
     */
    @VisibleForTesting
    public long getTimeoutCheckDelay() {
        java.util.Map<String, String> configs = OrangeConfig.getInstance().getConfigs(ORANGE_NAME_AI_CONFIG);
        String configured = configs == null ? null : configs.get(ORANGE_KEY_TIMEOUT);
        if (configured == null) {
            return DEFAULT_TIMEOUT_SECONDS;
        }
        try {
            return Long.parseLong(configured.trim());
        } catch (NumberFormatException e) {
            // A malformed remote value must not crash the request path; use the default instead.
            LogUtil.d(TAG, "invalid " + ORANGE_KEY_TIMEOUT + ": " + configured);
            return DEFAULT_TIMEOUT_SECONDS;
        }
    }

    /**
     * Tells whether a timeout check for the request is still meaningful.
     *
     * @param str the request's unique key
     * @return {@code true} when the request is active, cached, and its topic is the current
     *         PowerMsg topic
     */
    public boolean isNeedCheckTimeout(String str) {
        if (!this.activeRequests.contains(str)) {
            return false;
        }
        AIStreamRequestCache aIStreamRequestCache = this.requestCache.get(str);
        return aIStreamRequestCache != null
                && TextUtils.equals(PowerMsgInitializer.getCurrentTopic(), getTopic(aIStreamRequestCache));
    }

    /**
     * Validates a new request. A request whose key is already cached with a callback is rejected
     * and its callback receives error 5002.
     *
     * @param aIStreamRequestCache the request to validate
     * @return {@code true} when the request may be started
     */
    @VisibleForTesting
    public boolean isRequestValid(AIStreamRequestCache aIStreamRequestCache) {
        if (aIStreamRequestCache.callback == null) {
            return false;
        }
        RequestStrategy strategyByService = AiRequestUtils.getStrategyByService(aIStreamRequestCache.requestParams.getService());
        if (!strategyByService.isRequestValid(aIStreamRequestCache)) {
            return false;
        }
        AIStreamRequestCache existing = this.requestCache.get(strategyByService.getRequestUniqueKey(aIStreamRequestCache));
        if (existing == null || existing.callback == null) {
            return true;
        }
        aIStreamRequestCache.callback.callback(new AiResponseError(5002, "重复请求"));
        return false;
    }

    /**
     * Validates a pushed packet.
     *
     * @param aiResponse4SSE the packet, may be {@code null}
     * @return {@code false} when the packet is {@code null}, has no stream id, or is a successful
     *         non-final packet with a zero timestamp; {@code true} otherwise
     */
    @VisibleForTesting
    public boolean isResponseValid(AiResponse4SSE aiResponse4SSE) {
        if (aiResponse4SSE == null) {
            return false;
        }
        String streamId = aiResponse4SSE.streamId();
        if (streamId == null || streamId.length() == 0) {
            return false;
        }
        boolean dataPacketWithoutTimestamp = aiResponse4SSE.success()
                && !aiResponse4SSE.isStreamEnd()
                && aiResponse4SSE.streamDataTimestamp() == 0;
        return !dataPacketWithoutTimestamp;
    }

    /**
     * Registers the topic again unless it is already the current PowerMsg topic, in which case
     * only a reuse event is tracked. Does nothing when there are no active requests.
     *
     * @param str the topic to re-activate
     */
    public void reactivateAIStream(String str) {
        if (this.activeRequests.isEmpty() || TextUtils.isEmpty(str)) {
            return;
        }
        if (TextUtils.equals(PowerMsgInitializer.getCurrentTopic(), str)) {
            MonitorTrackInterface.getInstance().sendCustomEvent("ASCAIStreamReuse", null);
        } else {
            activeSSE(str);
        }
    }

    /** Re-activates the topic of every cached request. */
    public void reactivateALlAIStream() {
        for (AIStreamRequestCache aIStreamRequestCache : this.requestCache.values()) {
            lambda$reactivateALlAIStream$4(aIStreamRequestCache);
        }
    }

    /**
     * Schedules the request-level timeout check.
     *
     * @param str the request's unique key
     * @param j3  the latest packet timestamp, or 0 when no packet has arrived yet
     */
    @VisibleForTesting
    public void startTimeoutCheck(final String str, final long j3) {
        scheduleTimeoutCheck(new Runnable() { // from class: com.alibaba.ai.sdk.biz.sse.f
            @Override // java.lang.Runnable
            public final void run() {
                SSEAdapter.this.lambda$startTimeoutCheck$1(str, j3);
            }
        });
    }

    /**
     * Starts a streamed AI request: closes all cached requests, caches the new one, activates its
     * topic and schedules the initial timeout check. Invalid requests are ignored.
     *
     * @param str        passed through to {@link AIStreamRequestCache}
     * @param aiParams   the request parameters
     * @param aiCallback receives stream data and errors
     */
    public void streamRequestAi(@Nullable String str, @NonNull AiParams aiParams, @NonNull AiInterface.AiCallback aiCallback) {
        AIStreamRequestCache aIStreamRequestCache = new AIStreamRequestCache(aiParams, str, aiCallback);
        if (!isRequestValid(aIStreamRequestCache)) {
            return;
        }
        closeAllRequest();
        addToRequestList(aIStreamRequestCache);
        reactivateAIStream(getTopic(aIStreamRequestCache));
        startTimeoutCheck(AiRequestUtils.getStrategyByService(aiParams.getService()).getRequestUniqueKey(aIStreamRequestCache), 0L);
    }

    /* renamed from: streamTimeoutCheck, reason: merged with bridge method [inline-methods] */
    /**
     * Reports error 5003 for a stream whose latest packet is still the one that was current when
     * the check was scheduled, and closes the request if that ends it.
     *
     * @param str  the request's unique key
     * @param str2 the stream id
     * @param j3   the stream's latest packet timestamp at scheduling time
     */
    @VisibleForTesting
    public void lambda$startStreamTimeoutCheck$2(String str, String str2, long j3) {
        AIStreamRequestCache aIStreamRequestCache = this.requestCache.get(str);
        if (!isNeedCheckTimeout(str) || aIStreamRequestCache == null) {
            return;
        }
        RequestStrategy strategyByService = AiRequestUtils.getStrategyByService(aIStreamRequestCache.requestParams.getService());
        // Compare against this stream's own latest packet: it is the object that was null-checked,
        // and a packet from another stream must not mask a stalled one.
        boolean streamStalled = j3 != 0
                && aIStreamRequestCache.getLatestResponse(str2) != null
                && j3 == aIStreamRequestCache.getLatestResponse(str2).streamDataTimestamp();
        if (!streamStalled) {
            return;
        }
        aIStreamRequestCache.recordFailStream(str2);
        aIStreamRequestCache.recordTimeoutStream(str2);
        if (aIStreamRequestCache.callback != null) {
            SSEAiResponseError sSEAiResponseError = new SSEAiResponseError(str2, 5003, "流超时");
            sSEAiResponseError.streamCount = aIStreamRequestCache.getCntOfStreams();
            sSEAiResponseError.requestId = aIStreamRequestCache.requestParams.getRequestId();
            aIStreamRequestCache.callback.callback(sSEAiResponseError);
            LogUtil.d(TAG, "流超时：" + str2);
        }
        if (aIStreamRequestCache.isRequestEnd()) {
            closeRequest(strategyByService.getRequestUniqueKey(aIStreamRequestCache), strategyByService.autoUnregisterSSE());
        }
    }
}
