package io.moquette.broker;

import io.moquette.broker.ISessionsRepository;
import io.moquette.broker.Session;
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttVersion;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: classes5.dex */
public class SessionRegistry {

    /* renamed from: k, reason: collision with root package name */
    static final Duration f83099k = Duration.ofSeconds(1);

    /* renamed from: l, reason: collision with root package name */
    private static final Logger f83100l = LoggerFactory.i(SessionRegistry.class);

    /* renamed from: a, reason: collision with root package name */
    private final ScheduledFuture<?> f83101a;

    /* renamed from: b, reason: collision with root package name */
    private int f83102b;

    /* renamed from: c, reason: collision with root package name */
    private final SessionEventLoopGroup f83103c;

    /* renamed from: e, reason: collision with root package name */
    private final ISubscriptionsDirectory f83105e;

    /* renamed from: f, reason: collision with root package name */
    private final ISessionsRepository f83106f;

    /* renamed from: g, reason: collision with root package name */
    private final IQueueRepository f83107g;

    /* renamed from: h, reason: collision with root package name */
    private final Authorizator f83108h;

    /* renamed from: j, reason: collision with root package name */
    private final Clock f83110j;

    /* renamed from: d, reason: collision with root package name */
    private final ConcurrentMap<String, Session> f83104d = new ConcurrentHashMap();

    /* renamed from: i, reason: collision with root package name */
    private final DelayQueue<ISessionsRepository.SessionData> f83109i = new DelayQueue<>();

    /* loaded from: classes5.dex */
    public enum CreationModeEnum {
        CREATED_CLEAN_NEW,
        REOPEN_EXISTING,
        DROP_EXISTING
    }

    /* loaded from: classes5.dex */
    public static abstract class EnqueuedMessage {
        public void a() {
        }
    }

    /* loaded from: classes5.dex */
    public static final class PubRelMarker extends EnqueuedMessage {
    }

    /* loaded from: classes5.dex */
    public static class PublishedMessage extends EnqueuedMessage {

        /* renamed from: a, reason: collision with root package name */
        final Topic f83115a;

        /* renamed from: b, reason: collision with root package name */
        final MqttQoS f83116b;

        /* renamed from: c, reason: collision with root package name */
        final ByteBuf f83117c;

        /* renamed from: d, reason: collision with root package name */
        final boolean f83118d = false;

        public PublishedMessage(Topic topic, MqttQoS mqttQoS, ByteBuf byteBuf, boolean z2) {
            this.f83115a = topic;
            this.f83116b = mqttQoS;
            this.f83117c = byteBuf;
        }

        @Override // io.moquette.broker.SessionRegistry.EnqueuedMessage
        public void a() {
            this.f83117c.release();
        }

        public ByteBuf b() {
            return this.f83117c;
        }

        public MqttQoS c() {
            return this.f83116b;
        }

        public Topic d() {
            return this.f83115a;
        }

        public void e() {
            this.f83117c.retain();
        }
    }

    /* loaded from: classes5.dex */
    public static class SessionCreationResult {

        /* renamed from: a, reason: collision with root package name */
        final Session f83119a;

        /* renamed from: b, reason: collision with root package name */
        final CreationModeEnum f83120b;

        /* renamed from: c, reason: collision with root package name */
        final boolean f83121c;

        public SessionCreationResult(Session session, CreationModeEnum creationModeEnum, boolean z2) {
            this.f83119a = session;
            this.f83120b = creationModeEnum;
            this.f83121c = z2;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SessionRegistry(ISubscriptionsDirectory iSubscriptionsDirectory, ISessionsRepository iSessionsRepository, IQueueRepository iQueueRepository, Authorizator authorizator, ScheduledExecutorService scheduledExecutorService, Clock clock, int i2, SessionEventLoopGroup sessionEventLoopGroup) {
        this.f83105e = iSubscriptionsDirectory;
        this.f83106f = iSessionsRepository;
        this.f83107g = iQueueRepository;
        this.f83108h = authorizator;
        Runnable runnable = new Runnable() { // from class: io.moquette.broker.T
            @Override // java.lang.Runnable
            public final void run() {
                SessionRegistry.this.e();
            }
        };
        Duration duration = f83099k;
        this.f83101a = scheduledExecutorService.scheduleWithFixedDelay(runnable, duration.getSeconds(), duration.getSeconds(), TimeUnit.SECONDS);
        this.f83110j = clock;
        this.f83102b = i2;
        this.f83103c = sessionEventLoopGroup;
        q();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void e() {
        ArrayList<ISessionsRepository.SessionData> arrayList = new ArrayList();
        f83100l.debug("Retrieved {} expired sessions or {}", Integer.valueOf(this.f83109i.drainTo(arrayList)), Integer.valueOf(this.f83109i.size()));
        for (ISessionsRepository.SessionData sessionData : arrayList) {
            f83100l.debug("Removing session {}, expired on {}", sessionData.f(), (String) sessionData.h().map(new Function() { // from class: io.moquette.broker.U
                @Override // java.util.function.Function
                public final Object apply(Object obj) {
                    return ((Instant) obj).toString();
                }
            }).orElse("UNDEFINED"));
            r(sessionData.f());
            this.f83106f.c(sessionData);
        }
    }

    private void h(MqttConnectMessage mqttConnectMessage, Session session) {
        session.L(mqttConnectMessage.variableHeader().isCleanSession(), mqttConnectMessage.variableHeader().isWillFlag() ? k(mqttConnectMessage) : null);
    }

    private Session i(MqttConnectMessage mqttConnectMessage, String str) {
        boolean isCleanSession = mqttConnectMessage.variableHeader().isCleanSession();
        SessionMessageQueue<EnqueuedMessage> a2 = !isCleanSession ? this.f83107g.a(str) : new InMemoryQueue();
        ISessionsRepository.SessionData sessionData = new ISessionsRepository.SessionData(str, MqttVersion.MQTT_3_1_1, isCleanSession ? 0 : this.f83102b, this.f83110j);
        Session session = mqttConnectMessage.variableHeader().isWillFlag() ? new Session(sessionData, isCleanSession, k(mqttConnectMessage), a2) : new Session(sessionData, isCleanSession, a2);
        session.w();
        this.f83106f.a(sessionData);
        return session;
    }

    private Session.Will k(MqttConnectMessage mqttConnectMessage) {
        ByteBuf copiedBuffer = Unpooled.copiedBuffer(mqttConnectMessage.payload().willMessageInBytes());
        return new Session.Will(mqttConnectMessage.payload().willTopic(), copiedBuffer, MqttQoS.valueOf(mqttConnectMessage.variableHeader().willQos()), mqttConnectMessage.variableHeader().isWillRetain());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static /* synthetic */ String l(Session session) {
        session.e();
        return null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static /* synthetic */ boolean m(Session session) {
        return !session.v();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static /* synthetic */ boolean n(ISessionsRepository.SessionData sessionData) {
        return !sessionData.h().isPresent();
    }

    private void o(Session session) {
        f83100l.debug("Remove session state for client {}", session.o());
        if (session.b(Session.SessionStatus.DISCONNECTED, Session.SessionStatus.DESTROYED)) {
            v(session);
            r(session.o());
        } else {
            throw new SessionCorruptedException("Session has already changed state: " + session);
        }
    }

    private void p(Session session, String str) {
        for (Subscription subscription : session.q()) {
            if (!this.f83108h.a(subscription.j(), str, session.o())) {
                this.f83105e.a(subscription.j(), session.o());
            }
        }
    }

    private void q() {
        Set<String> c2 = this.f83107g.c();
        for (ISessionsRepository.SessionData sessionData : this.f83106f.b()) {
            if (this.f83107g.b(sessionData.f())) {
                SessionMessageQueue<EnqueuedMessage> a2 = this.f83107g.a(sessionData.f());
                c2.remove(sessionData.f());
                this.f83104d.put(sessionData.f(), new Session(sessionData, false, a2));
                u(sessionData);
            }
        }
        if (c2.isEmpty()) {
            return;
        }
        f83100l.error("Recreating sessions left {} unused queues. This is probably a bug. Session IDs: {}", Integer.valueOf(c2.size()), Arrays.toString(c2.toArray()));
    }

    private SessionCreationResult s(MqttConnectMessage mqttConnectMessage, String str, Session session, String str2) {
        SessionCreationResult sessionCreationResult;
        boolean isCleanSession = mqttConnectMessage.variableHeader().isCleanSession();
        if (!session.k()) {
            session.f();
        }
        if (isCleanSession) {
            o(session);
            Session i2 = i(mqttConnectMessage, str);
            this.f83104d.put(str, i2);
            f83100l.trace("case 2, oldSession with same CId {} disconnected", str);
            sessionCreationResult = new SessionCreationResult(i2, CreationModeEnum.CREATED_CLEAN_NEW, true);
        } else {
            if (!session.b(Session.SessionStatus.DISCONNECTED, Session.SessionStatus.CONNECTING)) {
                throw new SessionCorruptedException("old session moved in connected state by other thread");
            }
            h(mqttConnectMessage, session);
            p(session, str2);
            f83100l.trace("case 3, oldSession with same CId {} disconnected", str);
            sessionCreationResult = new SessionCreationResult(session, CreationModeEnum.REOPEN_EXISTING, true);
        }
        w(sessionCreationResult.f83119a.p());
        return sessionCreationResult;
    }

    private void u(ISessionsRepository.SessionData sessionData) {
        if (sessionData.h().isPresent()) {
            f83100l.debug("start tracking the session {} for removal", sessionData.f());
            this.f83109i.add((DelayQueue<ISessionsRepository.SessionData>) sessionData);
        } else {
            throw new RuntimeException("Can't track for expiration a session without expiry instant, client_id: " + sessionData.f());
        }
    }

    private void v(Session session) {
        Iterator<Subscription> it = session.q().iterator();
        while (it.hasNext()) {
            this.f83105e.a(it.next().j(), session.o());
        }
    }

    private void w(ISessionsRepository.SessionData sessionData) {
        this.f83109i.remove(sessionData);
    }

    private void x() {
        Stream map = this.f83104d.values().stream().filter(new Predicate() { // from class: io.moquette.broker.M
            @Override // java.util.function.Predicate
            public final boolean test(Object obj) {
                boolean m2;
                m2 = SessionRegistry.m((Session) obj);
                return m2;
            }
        }).map(new Function() { // from class: io.moquette.broker.N
            @Override // java.util.function.Function
            public final Object apply(Object obj) {
                return ((Session) obj).p();
            }
        }).filter(new Predicate() { // from class: io.moquette.broker.O
            @Override // java.util.function.Predicate
            public final boolean test(Object obj) {
                boolean n2;
                n2 = SessionRegistry.n((ISessionsRepository.SessionData) obj);
                return n2;
            }
        }).map(new Function() { // from class: io.moquette.broker.P
            @Override // java.util.function.Function
            public final Object apply(Object obj) {
                return ((ISessionsRepository.SessionData) obj).l();
            }
        });
        final ISessionsRepository iSessionsRepository = this.f83106f;
        Objects.requireNonNull(iSessionsRepository);
        map.forEach(new Consumer() { // from class: io.moquette.broker.Q
            @Override // java.util.function.Consumer
            public final void accept(Object obj) {
                ISessionsRepository.this.a((ISessionsRepository.SessionData) obj);
            }
        });
    }

    public void f() {
        if (this.f83101a.cancel(false)) {
            f83100l.info("Successfully cancelled expired sessions task");
        } else {
            f83100l.warn("Can't cancel the execution of expired sessions task, was already cancelled? {}, was done? {}", Boolean.valueOf(this.f83101a.isCancelled()), Boolean.valueOf(this.f83101a.isDone()));
        }
        x();
        this.f83107g.close();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void g(Session session) {
        session.j();
        if (session.m()) {
            o(session);
        } else {
            u(session.p().l());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SessionCreationResult j(MqttConnectMessage mqttConnectMessage, String str, String str2) {
        Session t2 = t(str);
        if (t2 != null) {
            return s(mqttConnectMessage, str, t2, str2);
        }
        Session i2 = i(mqttConnectMessage, str);
        SessionCreationResult sessionCreationResult = new SessionCreationResult(i2, CreationModeEnum.CREATED_CLEAN_NEW, false);
        if (this.f83104d.put(str, i2) != null) {
            f83100l.error("Another thread added a Session for our clientId {}, this is a bug!", str);
        }
        f83100l.trace("case 1, not existing session with CId {}", str);
        return sessionCreationResult;
    }

    void r(String str) {
        final Session remove = this.f83104d.remove(str);
        if (remove != null) {
            this.f83109i.remove(remove.p());
            this.f83103c.f(str, "Clean up removed session", new Callable() { // from class: io.moquette.broker.S
                @Override // java.util.concurrent.Callable
                public final Object call() {
                    String l2;
                    l2 = SessionRegistry.l(Session.this);
                    return l2;
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Session t(String str) {
        return this.f83104d.get(str);
    }
}
