package org.apache.activemq.store.kahadaptor;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StringMarshaller;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;

/* loaded from: input_file:activemq-core-4.1-r424241.jar:org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.class */
public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore {
    private Map ackContainer;
    private Map subscriberContainer;
    private Store store;
    private Map subscriberAcks;
    static Class class$java$lang$String;

    public KahaTopicMessageStore(Store store, MapContainer mapContainer, MapContainer mapContainer2, MapContainer mapContainer3, ActiveMQDestination activeMQDestination) throws IOException {
        super(mapContainer, activeMQDestination);
        this.subscriberAcks = new ConcurrentHashMap();
        this.store = store;
        this.ackContainer = mapContainer2;
        this.subscriberContainer = mapContainer3;
        Iterator it = this.subscriberContainer.keySet().iterator();
        while (it.hasNext()) {
            addSubscriberAckContainer(it.next());
        }
    }

    @Override // org.apache.activemq.store.kahadaptor.KahaMessageStore, org.apache.activemq.store.MessageStore
    public synchronized void addMessage(ConnectionContext connectionContext, Message message) throws IOException {
        int size = this.subscriberAcks.size();
        if (size > 0) {
            String messageId = message.getMessageId().toString();
            this.ackContainer.put(messageId, new AtomicInteger(size));
            Iterator it = this.subscriberAcks.keySet().iterator();
            while (it.hasNext()) {
                this.store.getListContainer(it.next(), "durable-subs").add(messageId);
            }
            super.addMessage(connectionContext, message);
        }
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public synchronized void acknowledge(ConnectionContext connectionContext, String str, String str2, MessageId messageId) throws IOException {
        String subscriptionKey = getSubscriptionKey(str, str2);
        String messageId2 = messageId.toString();
        ListContainer listContainer = (ListContainer) this.subscriberAcks.get(subscriptionKey);
        if (listContainer != null) {
            listContainer.removeFirst();
            AtomicInteger atomicInteger = (AtomicInteger) this.ackContainer.remove(messageId2);
            if (atomicInteger != null) {
                if (atomicInteger.decrementAndGet() > 0) {
                    this.ackContainer.put(messageId2, atomicInteger);
                } else {
                    super.removeMessage(messageId);
                }
            }
        }
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public SubscriptionInfo lookupSubscription(String str, String str2) throws IOException {
        return (SubscriptionInfo) this.subscriberContainer.get(getSubscriptionKey(str, str2));
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public synchronized void addSubsciption(String str, String str2, String str3, boolean z) throws IOException {
        SubscriptionInfo subscriptionInfo = new SubscriptionInfo();
        subscriptionInfo.setDestination(this.destination);
        subscriptionInfo.setClientId(str);
        subscriptionInfo.setSelector(str3);
        subscriptionInfo.setSubcriptionName(str2);
        String subscriptionKey = getSubscriptionKey(str, str2);
        if (!this.subscriberContainer.containsKey(subscriptionKey)) {
            this.subscriberContainer.put(subscriptionKey, subscriptionInfo);
        }
        addSubscriberAckContainer(subscriptionKey);
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public synchronized void deleteSubscription(String str, String str2) {
        String subscriptionKey = getSubscriptionKey(str, str2);
        this.subscriberContainer.remove(subscriptionKey);
        Iterator it = ((ListContainer) this.subscriberAcks.get(subscriptionKey)).iterator();
        while (it.hasNext()) {
            String obj = it.next().toString();
            AtomicInteger atomicInteger = (AtomicInteger) this.ackContainer.remove(obj);
            if (atomicInteger != null) {
                if (atomicInteger.decrementAndGet() > 0) {
                    this.ackContainer.put(obj, atomicInteger);
                } else {
                    this.messageContainer.remove(obj);
                }
            }
        }
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public void recoverSubscription(String str, String str2, MessageRecoveryListener messageRecoveryListener) throws Exception {
        Class<?> cls;
        ListContainer listContainer = (ListContainer) this.subscriberAcks.get(getSubscriptionKey(str, str2));
        if (listContainer == null) {
            messageRecoveryListener.finished();
            return;
        }
        Iterator it = listContainer.iterator();
        while (it.hasNext()) {
            Object obj = this.messageContainer.get(it.next());
            if (obj != null) {
                Class<?> cls2 = obj.getClass();
                if (class$java$lang$String == null) {
                    cls = class$("java.lang.String");
                    class$java$lang$String = cls;
                } else {
                    cls = class$java$lang$String;
                }
                if (cls2 == cls) {
                    messageRecoveryListener.recoverMessageReference((String) obj);
                } else {
                    messageRecoveryListener.recoverMessage((Message) obj);
                }
            }
            messageRecoveryListener.finished();
        }
    }

    @Override // org.apache.activemq.store.kahadaptor.KahaMessageStore
    public void delete() {
        super.delete();
        this.ackContainer.clear();
        this.subscriberContainer.clear();
    }

    @Override // org.apache.activemq.store.TopicMessageStore
    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
        return (SubscriptionInfo[]) this.subscriberContainer.values().toArray(new SubscriptionInfo[this.subscriberContainer.size()]);
    }

    protected String getSubscriptionKey(String str, String str2) {
        return new StringBuffer().append(new StringBuffer().append(str).append(":").toString()).append(str2 != null ? str2 : "NOT_SET").toString();
    }

    protected void addSubscriberAckContainer(Object obj) throws IOException {
        ListContainer listContainer = this.store.getListContainer(obj, "topic-subs");
        listContainer.setMarshaller(new StringMarshaller());
        this.subscriberAcks.put(obj, listContainer);
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }
}
