package org.apache.activemq.store.journal;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import org.apache.activeio.journal.RecordLocation;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:activemq-core-4.1-r424241.jar:org/apache/activemq/store/journal/JournalMessageStore.class */
public class JournalMessageStore implements MessageStore {
    private static final Log log;
    protected final JournalPersistenceAdapter peristenceAdapter;
    protected final JournalTransactionStore transactionStore;
    protected final MessageStore longTermStore;
    protected final ActiveMQDestination destination;
    protected final TransactionTemplate transactionTemplate;
    private LinkedHashMap cpAddedMessageIds;
    protected RecordLocation lastLocation;
    private UsageManager usageManager;
    static Class class$org$apache$activemq$store$journal$JournalMessageStore;
    private LinkedHashMap messages = new LinkedHashMap();
    private ArrayList messageAcks = new ArrayList();
    protected HashSet inFlightTxLocations = new HashSet();

    public JournalMessageStore(JournalPersistenceAdapter journalPersistenceAdapter, MessageStore messageStore, ActiveMQDestination activeMQDestination) {
        this.peristenceAdapter = journalPersistenceAdapter;
        this.transactionStore = journalPersistenceAdapter.getTransactionStore();
        this.longTermStore = messageStore;
        this.destination = activeMQDestination;
        this.transactionTemplate = new TransactionTemplate(journalPersistenceAdapter, new ConnectionContext());
    }

    @Override // org.apache.activemq.store.MessageStore
    public void setUsageManager(UsageManager usageManager) {
        this.usageManager = usageManager;
        this.longTermStore.setUsageManager(usageManager);
    }

    @Override // org.apache.activemq.store.MessageStore
    public void addMessage(ConnectionContext connectionContext, Message message) throws IOException {
        MessageId messageId = message.getMessageId();
        boolean isDebugEnabled = log.isDebugEnabled();
        message.incrementReferenceCount();
        RecordLocation writeCommand = this.peristenceAdapter.writeCommand(message, message.isResponseRequired());
        if (!connectionContext.isInTransaction()) {
            if (isDebugEnabled) {
                log.debug(new StringBuffer().append("Journalled message add for: ").append(messageId).append(", at: ").append(writeCommand).toString());
            }
            addMessage(message, writeCommand);
        } else {
            if (isDebugEnabled) {
                log.debug(new StringBuffer().append("Journalled transacted message add for: ").append(messageId).append(", at: ").append(writeCommand).toString());
            }
            synchronized (this) {
                this.inFlightTxLocations.add(writeCommand);
            }
            this.transactionStore.addMessage(this, message, writeCommand);
            connectionContext.getTransaction().addSynchronization(new Synchronization(this, isDebugEnabled, messageId, writeCommand, message) { // from class: org.apache.activemq.store.journal.JournalMessageStore.1
                private final boolean val$debug;
                private final MessageId val$id;
                private final RecordLocation val$location;
                private final Message val$message;
                private final JournalMessageStore this$0;

                {
                    this.this$0 = this;
                    this.val$debug = isDebugEnabled;
                    this.val$id = messageId;
                    this.val$location = writeCommand;
                    this.val$message = message;
                }

                @Override // org.apache.activemq.transaction.Synchronization
                public void afterCommit() throws Exception {
                    if (this.val$debug) {
                        JournalMessageStore.log.debug(new StringBuffer().append("Transacted message add commit for: ").append(this.val$id).append(", at: ").append(this.val$location).toString());
                    }
                    synchronized (this.this$0) {
                        this.this$0.inFlightTxLocations.remove(this.val$location);
                        this.this$0.addMessage(this.val$message, this.val$location);
                    }
                }

                @Override // org.apache.activemq.transaction.Synchronization
                public void afterRollback() throws Exception {
                    if (this.val$debug) {
                        JournalMessageStore.log.debug(new StringBuffer().append("Transacted message add rollback for: ").append(this.val$id).append(", at: ").append(this.val$location).toString());
                    }
                    synchronized (this.this$0) {
                        this.this$0.inFlightTxLocations.remove(this.val$location);
                    }
                    this.val$message.decrementReferenceCount();
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addMessage(Message message, RecordLocation recordLocation) {
        synchronized (this) {
            this.lastLocation = recordLocation;
            this.messages.put(message.getMessageId(), message);
        }
    }

    public void replayAddMessage(ConnectionContext connectionContext, Message message) {
        try {
            if (this.longTermStore.getMessage(message.getMessageId()) == null) {
                this.longTermStore.addMessage(connectionContext, message);
            }
        } catch (Throwable th) {
            log.warn(new StringBuffer().append("Could not replay add for message '").append(message.getMessageId()).append("'.  Message may have already been added. reason: ").append(th).toString());
        }
    }

    @Override // org.apache.activemq.store.MessageStore
    public void removeMessage(ConnectionContext connectionContext, MessageAck messageAck) throws IOException {
        boolean isDebugEnabled = log.isDebugEnabled();
        JournalQueueAck journalQueueAck = new JournalQueueAck();
        journalQueueAck.setDestination(this.destination);
        journalQueueAck.setMessageAck(messageAck);
        RecordLocation writeCommand = this.peristenceAdapter.writeCommand(journalQueueAck, messageAck.isResponseRequired());
        if (!connectionContext.isInTransaction()) {
            if (isDebugEnabled) {
                log.debug(new StringBuffer().append("Journalled message remove for: ").append(messageAck.getLastMessageId()).append(", at: ").append(writeCommand).toString());
            }
            removeMessage(messageAck, writeCommand);
        } else {
            if (isDebugEnabled) {
                log.debug(new StringBuffer().append("Journalled transacted message remove for: ").append(messageAck.getLastMessageId()).append(", at: ").append(writeCommand).toString());
            }
            synchronized (this) {
                this.inFlightTxLocations.add(writeCommand);
            }
            this.transactionStore.removeMessage(this, messageAck, writeCommand);
            connectionContext.getTransaction().addSynchronization(new Synchronization(this, isDebugEnabled, messageAck, writeCommand) { // from class: org.apache.activemq.store.journal.JournalMessageStore.2
                private final boolean val$debug;
                private final MessageAck val$ack;
                private final RecordLocation val$location;
                private final JournalMessageStore this$0;

                {
                    this.this$0 = this;
                    this.val$debug = isDebugEnabled;
                    this.val$ack = messageAck;
                    this.val$location = writeCommand;
                }

                @Override // org.apache.activemq.transaction.Synchronization
                public void afterCommit() throws Exception {
                    if (this.val$debug) {
                        JournalMessageStore.log.debug(new StringBuffer().append("Transacted message remove commit for: ").append(this.val$ack.getLastMessageId()).append(", at: ").append(this.val$location).toString());
                    }
                    synchronized (this.this$0) {
                        this.this$0.inFlightTxLocations.remove(this.val$location);
                        this.this$0.removeMessage(this.val$ack, this.val$location);
                    }
                }

                @Override // org.apache.activemq.transaction.Synchronization
                public void afterRollback() throws Exception {
                    if (this.val$debug) {
                        JournalMessageStore.log.debug(new StringBuffer().append("Transacted message remove rollback for: ").append(this.val$ack.getLastMessageId()).append(", at: ").append(this.val$location).toString());
                    }
                    synchronized (this.this$0) {
                        this.this$0.inFlightTxLocations.remove(this.val$location);
                    }
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void removeMessage(MessageAck messageAck, RecordLocation recordLocation) {
        synchronized (this) {
            this.lastLocation = recordLocation;
            Message message = (Message) this.messages.remove(messageAck.getLastMessageId());
            if (message == null) {
                this.messageAcks.add(messageAck);
            } else {
                message.decrementReferenceCount();
            }
        }
    }

    public void replayRemoveMessage(ConnectionContext connectionContext, MessageAck messageAck) {
        try {
            if (this.longTermStore.getMessage(messageAck.getLastMessageId()) != null) {
                this.longTermStore.removeMessage(connectionContext, messageAck);
            }
        } catch (Throwable th) {
            log.warn(new StringBuffer().append("Could not replay acknowledge for message '").append(messageAck.getLastMessageId()).append("'.  Message may have already been acknowledged. reason: ").append(th).toString());
        }
    }

    public RecordLocation checkpoint() throws IOException {
        return checkpoint(null);
    }

    public RecordLocation checkpoint(Callback callback) throws IOException {
        ArrayList arrayList;
        ArrayList arrayList2;
        int maxCheckpointMessageAddSize = this.peristenceAdapter.getMaxCheckpointMessageAddSize();
        synchronized (this) {
            this.cpAddedMessageIds = this.messages;
            arrayList = this.messageAcks;
            arrayList2 = new ArrayList(this.inFlightTxLocations);
            this.messages = new LinkedHashMap();
            this.messageAcks = new ArrayList();
        }
        this.transactionTemplate.run(new Callback(this, maxCheckpointMessageAddSize, arrayList, callback) { // from class: org.apache.activemq.store.journal.JournalMessageStore.3
            private final int val$maxCheckpointMessageAddSize;
            private final ArrayList val$cpRemovedMessageLocations;
            private final Callback val$postCheckpointTest;
            private final JournalMessageStore this$0;

            {
                this.this$0 = this;
                this.val$maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
                this.val$cpRemovedMessageLocations = arrayList;
                this.val$postCheckpointTest = callback;
            }

            @Override // org.apache.activemq.util.Callback
            public void execute() throws Exception {
                int i = 0;
                PersistenceAdapter persistenceAdapter = this.this$0.transactionTemplate.getPersistenceAdapter();
                ConnectionContext context = this.this$0.transactionTemplate.getContext();
                for (Message message : this.this$0.cpAddedMessageIds.values()) {
                    try {
                        this.this$0.longTermStore.addMessage(context, message);
                    } catch (Throwable th) {
                        JournalMessageStore.log.warn(new StringBuffer().append("Message could not be added to long term store: ").append(th.getMessage()).toString(), th);
                    }
                    i += message.getSize();
                    message.decrementReferenceCount();
                    if (i >= this.val$maxCheckpointMessageAddSize) {
                        persistenceAdapter.commitTransaction(context);
                        persistenceAdapter.beginTransaction(context);
                        i = 0;
                    }
                }
                persistenceAdapter.commitTransaction(context);
                persistenceAdapter.beginTransaction(context);
                Iterator it = this.val$cpRemovedMessageLocations.iterator();
                while (it.hasNext()) {
                    try {
                        this.this$0.longTermStore.removeMessage(this.this$0.transactionTemplate.getContext(), (MessageAck) it.next());
                    } catch (Throwable th2) {
                        JournalMessageStore.log.debug(new StringBuffer().append("Message could not be removed from long term store: ").append(th2.getMessage()).toString(), th2);
                    }
                }
                if (this.val$postCheckpointTest != null) {
                    this.val$postCheckpointTest.execute();
                }
            }
        });
        synchronized (this) {
            this.cpAddedMessageIds = null;
        }
        if (arrayList2.size() <= 0) {
            return this.lastLocation;
        }
        Collections.sort(arrayList2);
        return (RecordLocation) arrayList2.get(0);
    }

    @Override // org.apache.activemq.store.MessageStore
    public Message getMessage(MessageId messageId) throws IOException {
        Message message;
        synchronized (this) {
            message = (Message) this.messages.get(messageId);
            if (message == null && this.cpAddedMessageIds != null) {
                message = (Message) this.cpAddedMessageIds.get(messageId);
            }
        }
        return message != null ? message : this.longTermStore.getMessage(messageId);
    }

    @Override // org.apache.activemq.store.MessageStore
    public void recover(MessageRecoveryListener messageRecoveryListener) throws Exception {
        this.peristenceAdapter.checkpoint(true, true);
        this.longTermStore.recover(messageRecoveryListener);
    }

    @Override // org.apache.activemq.Service
    public void start() throws Exception {
        if (this.usageManager != null) {
            this.usageManager.addUsageListener(this.peristenceAdapter);
        }
        this.longTermStore.start();
    }

    @Override // org.apache.activemq.Service
    public void stop() throws Exception {
        this.longTermStore.stop();
        if (this.usageManager != null) {
            this.usageManager.removeUsageListener(this.peristenceAdapter);
        }
    }

    public MessageStore getLongTermMessageStore() {
        return this.longTermStore;
    }

    @Override // org.apache.activemq.store.MessageStore
    public void removeAllMessages(ConnectionContext connectionContext) throws IOException {
        this.peristenceAdapter.checkpoint(true, true);
        this.longTermStore.removeAllMessages(connectionContext);
    }

    @Override // org.apache.activemq.store.MessageStore
    public ActiveMQDestination getDestination() {
        return this.destination;
    }

    @Override // org.apache.activemq.store.MessageStore
    public void addMessageReference(ConnectionContext connectionContext, MessageId messageId, long j, String str) throws IOException {
        throw new IOException("The journal does not support message references.");
    }

    @Override // org.apache.activemq.store.MessageStore
    public String getMessageReference(MessageId messageId) throws IOException {
        throw new IOException("The journal does not support message references.");
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$org$apache$activemq$store$journal$JournalMessageStore == null) {
            cls = class$("org.apache.activemq.store.journal.JournalMessageStore");
            class$org$apache$activemq$store$journal$JournalMessageStore = cls;
        } else {
            cls = class$org$apache$activemq$store$journal$JournalMessageStore;
        }
        log = LogFactory.getLog(cls);
    }
}
