package com.hivemq.client.internal.mqtt.handler.subscribe;

import H4.d;
import U3.b;
import com.hivemq.client.internal.mqtt.MqttClientConnectionConfig;
import com.hivemq.client.internal.mqtt.datatypes.MqttVariableByteInteger;
import com.hivemq.client.internal.mqtt.exceptions.MqttClientStateExceptions;
import com.hivemq.client.internal.mqtt.handler.MqttSessionAwareHandler;
import com.hivemq.client.internal.mqtt.handler.disconnect.MqttDisconnectUtil;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingPublishFlows;
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttSubscribedPublishFlow;
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttSubOrUnsubWithFlow;
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttSubscribeWithFlow;
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttUnsubscribeWithFlow;
import com.hivemq.client.internal.mqtt.ioc.ClientScope;
import com.hivemq.client.internal.mqtt.message.MqttCommonReasonCode;
import com.hivemq.client.internal.mqtt.message.subscribe.MqttStatefulSubscribe;
import com.hivemq.client.internal.mqtt.message.subscribe.MqttSubscribe;
import com.hivemq.client.internal.mqtt.message.subscribe.suback.MqttSubAck;
import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttStatefulUnsubscribe;
import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttUnsubscribe;
import com.hivemq.client.internal.mqtt.message.unsubscribe.unsuback.MqttUnsubAck;
import com.hivemq.client.internal.mqtt.message.unsubscribe.unsuback.mqtt3.Mqtt3UnsubAckView;
import com.hivemq.client.internal.util.collections.j;
import com.hivemq.client.internal.util.collections.l;
import com.hivemq.client.internal.util.collections.m;
import com.hivemq.client.internal.util.i;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5SubAckException;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5UnsubAckException;
import io.netty.channel.InterfaceC2873m;
import io.netty.channel.N;
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToIntFunction;

@ClientScope
/* loaded from: classes.dex */
public class MqttSubscriptionHandler extends MqttSessionAwareHandler implements Runnable {
    public static final int MAX_SUB_PENDING = 10;
    public static final String NAME = "subscription";
    private MqttSubOrUnsubWithFlow.Stateful currentPending;
    private final MqttIncomingPublishFlows incomingPublishFlows;
    private MqttSubOrUnsubWithFlow.Stateful resendPending;
    private i subscriptionIdentifiers;
    private static final U3.a LOGGER = b.a(MqttSubscriptionHandler.class);
    private static final l.b<MqttSubOrUnsubWithFlow.Stateful> INDEX_SPEC = new l.b<>(new ToIntFunction() { // from class: com.hivemq.client.internal.mqtt.handler.subscribe.a
        @Override // java.util.function.ToIntFunction
        public final int applyAsInt(Object obj) {
            int lambda$static$0;
            lambda$static$0 = MqttSubscriptionHandler.lambda$static$0((MqttSubOrUnsubWithFlow.Stateful) obj);
            return lambda$static$0;
        }
    }, 4);
    private final ConcurrentLinkedQueue<MqttSubOrUnsubWithFlow> queued = new ConcurrentLinkedQueue<>();
    private final AtomicInteger queuedCounter = new AtomicInteger();
    private final l<MqttSubOrUnsubWithFlow.Stateful> pendingIndex = new l<>(INDEX_SPEC);
    private final m<MqttSubOrUnsubWithFlow.Stateful> pending = new m<>();
    private final i packetIdentifiers = new i(65526, 65535);

    /* JADX INFO: Access modifiers changed from: package-private */
    public MqttSubscriptionHandler(MqttIncomingPublishFlows mqttIncomingPublishFlows) {
        this.incomingPublishFlows = mqttIncomingPublishFlows;
    }

    private void addPending(MqttSubOrUnsubWithFlow.Stateful stateful) {
        this.pendingIndex.g(stateful);
        this.pending.add(stateful);
    }

    private void clearQueued(Throwable th) {
        int i10;
        do {
            i10 = 0;
            while (true) {
                MqttSubOrUnsubWithFlow poll = this.queued.poll();
                if (poll == null) {
                    break;
                }
                if (poll.getFlow().init()) {
                    poll.getFlow().onError(th);
                }
                i10++;
            }
        } while (this.queuedCounter.addAndGet(-i10) != 0);
    }

    private void completePending(InterfaceC2873m interfaceC2873m, MqttSubOrUnsubWithFlow.Stateful stateful) {
        this.pending.remove(stateful);
        int packetIdentifier = stateful.getMessage().getPacketIdentifier();
        MqttSubOrUnsubWithFlow poll = this.queued.poll();
        if (poll == null) {
            this.packetIdentifiers.d(packetIdentifier);
        } else {
            this.queuedCounter.getAndDecrement();
            writeSubscribeOrUnsubscribe(interfaceC2873m, poll, packetIdentifier);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static /* synthetic */ int lambda$static$0(MqttSubOrUnsubWithFlow.Stateful stateful) {
        return stateful.getMessage().getPacketIdentifier();
    }

    private void queue(MqttSubOrUnsubWithFlow mqttSubOrUnsubWithFlow) {
        this.queued.offer(mqttSubOrUnsubWithFlow);
        if (this.queuedCounter.getAndIncrement() == 0) {
            mqttSubOrUnsubWithFlow.getFlow().getEventLoop().execute(this);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void readSubAck(InterfaceC2873m interfaceC2873m, MqttSubAck mqttSubAck) {
        MqttSubOrUnsubWithFlow.Stateful j10 = this.pendingIndex.j(mqttSubAck.getPacketIdentifier());
        if (j10 == null) {
            MqttDisconnectUtil.disconnect(interfaceC2873m.channel(), d.PROTOCOL_ERROR, "Unknown packet identifier for SUBACK");
            return;
        }
        if (!(j10 instanceof MqttSubscribeWithFlow.Stateful)) {
            MqttDisconnectUtil.disconnect(interfaceC2873m.channel(), d.PROTOCOL_ERROR, "SUBACK received for an UNSUBSCRIBE");
            return;
        }
        MqttSubscribeWithFlow.Stateful stateful = (MqttSubscribeWithFlow.Stateful) j10;
        MqttStatefulSubscribe message = stateful.getMessage();
        MqttSubscriptionFlow<MqttSubAck> flow = stateful.getFlow();
        boolean z10 = ((MqttSubscribe) message.stateless()).m141getSubscriptions().size() != mqttSubAck.getReasonCodes().size();
        boolean allErrors = MqttCommonReasonCode.allErrors(mqttSubAck.getReasonCodes());
        this.incomingPublishFlows.subAck(message, mqttSubAck, stateful.getPublishFlow());
        if (z10 || allErrors) {
            String str = z10 ? "Count of Reason Codes in SUBACK does not match count of subscriptions in SUBSCRIBE" : "SUBACK contains only Error Codes";
            if (flow.isCancelled()) {
                LOGGER.warn(str + " but the SubAck flow has been cancelled");
            } else {
                flow.onError(new Mqtt5SubAckException(mqttSubAck, str));
            }
        } else if (flow.isCancelled()) {
            LOGGER.warn("Subscribe was successful but the SubAck flow has been cancelled");
        } else {
            flow.onSuccess(mqttSubAck);
        }
        completePending(interfaceC2873m, stateful);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void readUnsubAck(InterfaceC2873m interfaceC2873m, MqttUnsubAck mqttUnsubAck) {
        MqttSubOrUnsubWithFlow.Stateful j10 = this.pendingIndex.j(mqttUnsubAck.getPacketIdentifier());
        if (j10 == null) {
            MqttDisconnectUtil.disconnect(interfaceC2873m.channel(), d.PROTOCOL_ERROR, "Unknown packet identifier for UNSUBACK");
            return;
        }
        if (!(j10 instanceof MqttUnsubscribeWithFlow.Stateful)) {
            MqttDisconnectUtil.disconnect(interfaceC2873m.channel(), d.PROTOCOL_ERROR, "UNSUBACK received for a SUBSCRIBE");
            return;
        }
        MqttUnsubscribeWithFlow.Stateful stateful = (MqttUnsubscribeWithFlow.Stateful) j10;
        MqttStatefulUnsubscribe message = stateful.getMessage();
        MqttSubOrUnsubAckFlow<MqttUnsubAck> flow = stateful.getFlow();
        j<R4.b> reasonCodes = mqttUnsubAck.getReasonCodes();
        boolean z10 = ((MqttUnsubscribe) message.stateless()).m149getTopicFilters().size() != reasonCodes.size();
        boolean allErrors = MqttCommonReasonCode.allErrors(mqttUnsubAck.getReasonCodes());
        if (reasonCodes == Mqtt3UnsubAckView.REASON_CODES_ALL_SUCCESS || !(z10 || allErrors)) {
            this.incomingPublishFlows.unsubscribe(message, mqttUnsubAck);
            if (flow.isCancelled()) {
                LOGGER.warn("Unsubscribe was successful but the UnsubAck flow has been cancelled");
            } else {
                flow.onSuccess(mqttUnsubAck);
            }
        } else {
            String str = z10 ? "Count of Reason Codes in UNSUBACK does not match count of Topic Filters in UNSUBSCRIBE" : "UNSUBACK contains only Error Codes";
            if (flow.isCancelled()) {
                LOGGER.warn(str + " but the UnsubAck flow has been cancelled");
            } else {
                flow.onError(new Mqtt5UnsubAckException(mqttUnsubAck, str));
            }
        }
        completePending(interfaceC2873m, stateful);
    }

    private boolean writeSubscribe(InterfaceC2873m interfaceC2873m, MqttSubscribeWithFlow.Stateful stateful) {
        MqttStatefulSubscribe message = stateful.getMessage();
        this.currentPending = stateful;
        interfaceC2873m.writeAndFlush(message, interfaceC2873m.voidPromise());
        if (this.currentPending == null) {
            return false;
        }
        this.currentPending = null;
        return true;
    }

    private void writeSubscribeOrUnsubscribe(InterfaceC2873m interfaceC2873m, MqttSubOrUnsubWithFlow mqttSubOrUnsubWithFlow, int i10) {
        if (mqttSubOrUnsubWithFlow.getFlow().init()) {
            if (!(mqttSubOrUnsubWithFlow instanceof MqttSubscribeWithFlow)) {
                MqttUnsubscribeWithFlow mqttUnsubscribeWithFlow = (MqttUnsubscribeWithFlow) mqttSubOrUnsubWithFlow;
                MqttUnsubscribeWithFlow.Stateful stateful = new MqttUnsubscribeWithFlow.Stateful(mqttUnsubscribeWithFlow.getMessage().createStateful(i10), mqttUnsubscribeWithFlow.getFlow());
                addPending(stateful);
                writeUnsubscribe(interfaceC2873m, stateful);
                return;
            }
            MqttSubscribeWithFlow mqttSubscribeWithFlow = (MqttSubscribeWithFlow) mqttSubOrUnsubWithFlow;
            i iVar = this.subscriptionIdentifiers;
            MqttStatefulSubscribe createStateful = mqttSubscribeWithFlow.getMessage().createStateful(i10, iVar != null ? iVar.a() : -1);
            MqttSubscribeWithFlow.Stateful stateful2 = new MqttSubscribeWithFlow.Stateful(createStateful, mqttSubscribeWithFlow.getFlow());
            addPending(stateful2);
            if (writeSubscribe(interfaceC2873m, stateful2)) {
                this.incomingPublishFlows.subscribe(createStateful, stateful2.getPublishFlow());
            }
        }
    }

    private void writeUnsubscribe(InterfaceC2873m interfaceC2873m, MqttUnsubscribeWithFlow.Stateful stateful) {
        this.currentPending = stateful;
        interfaceC2873m.writeAndFlush(stateful.getMessage(), interfaceC2873m.voidPromise());
        this.currentPending = null;
    }

    @Override // io.netty.channel.C2876p, io.netty.channel.InterfaceC2875o
    public void channelRead(InterfaceC2873m interfaceC2873m, Object obj) {
        if (obj instanceof MqttSubAck) {
            readSubAck(interfaceC2873m, (MqttSubAck) obj);
        } else if (obj instanceof MqttUnsubAck) {
            readUnsubAck(interfaceC2873m, (MqttUnsubAck) obj);
        } else {
            interfaceC2873m.fireChannelRead(obj);
        }
    }

    @Override // io.netty.channel.C2876p, io.netty.channel.AbstractC2872l, io.netty.channel.InterfaceC2871k, io.netty.channel.InterfaceC2875o
    public void exceptionCaught(InterfaceC2873m interfaceC2873m, Throwable th) {
        MqttSubOrUnsubWithFlow.Stateful stateful;
        if ((th instanceof IOException) || (stateful = this.currentPending) == null) {
            interfaceC2873m.fireExceptionCaught(th);
            return;
        }
        this.pendingIndex.j(stateful.getMessage().getPacketIdentifier());
        this.currentPending.getFlow().onError(th);
        completePending(interfaceC2873m, this.currentPending);
        this.currentPending = null;
    }

    @Override // com.hivemq.client.internal.mqtt.handler.MqttSessionAwareHandler
    public void onSessionEnd(Throwable th) {
        super.onSessionEnd(th);
        for (MqttSubOrUnsubWithFlow.Stateful first = this.pending.getFirst(); first != null; first = first.getNext()) {
            this.packetIdentifiers.d(first.getMessage().getPacketIdentifier());
            if (!(first.getFlow() instanceof MqttSubscribedPublishFlow)) {
                first.getFlow().onError(th);
            }
        }
        this.pendingIndex.e();
        this.pending.clear();
        this.resendPending = null;
        this.subscriptionIdentifiers = null;
        clearQueued(th);
    }

    @Override // com.hivemq.client.internal.mqtt.handler.MqttSessionAwareHandler
    public void onSessionStartOrResume(MqttClientConnectionConfig mqttClientConnectionConfig, N n10) {
        super.onSessionStartOrResume(mqttClientConnectionConfig, n10);
        if (mqttClientConnectionConfig.areSubscriptionIdentifiersAvailable() && this.subscriptionIdentifiers == null) {
            this.subscriptionIdentifiers = new i(1, MqttVariableByteInteger.FOUR_BYTES_MAX_VALUE);
        }
        if (this.pending.getFirst() != null || this.queuedCounter.get() > 0) {
            this.resendPending = this.pending.getFirst();
            n10.execute(this);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        int i10;
        if (!this.hasSession) {
            clearQueued(MqttClientStateExceptions.notConnected());
            return;
        }
        InterfaceC2873m interfaceC2873m = this.ctx;
        if (interfaceC2873m == null) {
            return;
        }
        while (true) {
            MqttSubOrUnsubWithFlow.Stateful stateful = this.resendPending;
            if (stateful == null) {
                break;
            }
            if (stateful instanceof MqttSubscribeWithFlow.Stateful) {
                writeSubscribe(interfaceC2873m, (MqttSubscribeWithFlow.Stateful) stateful);
            } else {
                writeUnsubscribe(interfaceC2873m, (MqttUnsubscribeWithFlow.Stateful) stateful);
            }
            this.resendPending = this.resendPending.getNext();
        }
        do {
            i10 = 0;
            while (this.pendingIndex.m() != 10) {
                MqttSubOrUnsubWithFlow poll = this.queued.poll();
                if (poll != null) {
                    int a10 = this.packetIdentifiers.a();
                    if (a10 == -1) {
                        LOGGER.error("No Packet Identifier available for (UN)SUBSCRIBE. This must not happen and is a bug.");
                        return;
                    } else {
                        writeSubscribeOrUnsubscribe(interfaceC2873m, poll, a10);
                        i10++;
                    }
                }
            }
            this.queuedCounter.getAndAdd(-i10);
            return;
        } while (this.queuedCounter.addAndGet(-i10) != 0);
    }

    public void subscribe(MqttSubscribe mqttSubscribe, MqttSubscriptionFlow<MqttSubAck> mqttSubscriptionFlow) {
        queue(new MqttSubscribeWithFlow(mqttSubscribe, mqttSubscriptionFlow));
    }

    public void unsubscribe(MqttUnsubscribe mqttUnsubscribe, MqttSubOrUnsubAckFlow<MqttUnsubAck> mqttSubOrUnsubAckFlow) {
        queue(new MqttUnsubscribeWithFlow(mqttUnsubscribe, mqttSubOrUnsubAckFlow));
    }
}
