package org.apache.activemq.transport.stomp;

import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.ProtocolException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.jms.JMSException;
import org.apache.activeio.adapter.PacketInputStream;
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
import org.apache.activeio.util.ByteArrayOutputStream;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.FlushCommand;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.transport.stomp.AsyncHelper;
import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;

/* loaded from: input_file:org/apache/activemq/transport/stomp/StompWireFormat.class */
public class StompWireFormat implements WireFormat {
    private static final IdGenerator connectionIdGenerator;
    private static int transactionIdCounter;
    private short lastCommandId;
    static final boolean $assertionsDisabled;
    static Class class$org$apache$activemq$transport$stomp$StompWireFormat;
    private int version = 1;
    private final CommandParser commandParser = new CommandParser(this);
    private final HeaderParser headerParser = new HeaderParser();
    private final BlockingQueue pendingReadCommands = new LinkedBlockingQueue();
    private final BlockingQueue pendingWriteFrames = new LinkedBlockingQueue();
    private final List receiptListeners = new CopyOnWriteArrayList();
    private final Map subscriptionsByConsumerId = new ConcurrentHashMap();
    private final Map subscriptionsByName = new ConcurrentHashMap();
    private final DestinationMap subscriptionsByDestination = new DestinationMap();
    private final Map transactions = new ConcurrentHashMap();
    private final Map dispachedMap = new ConcurrentHashMap();
    private final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
    private final SessionId sessionId = new SessionId(this.connectionId, -1);
    private final ProducerId producerId = new ProducerId(this.sessionId, 1);
    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
    boolean connected = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addResponseListener(ResponseListener responseListener) {
        this.receiptListeners.add(responseListener);
    }

    public org.apache.activemq.command.Command readCommand(DataInput dataInput) throws IOException, JMSException {
        org.apache.activemq.command.Command command = (org.apache.activemq.command.Command) AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.HelperWithReturn(this) { // from class: org.apache.activemq.transport.stomp.StompWireFormat.1
            private final StompWireFormat this$0;

            {
                this.this$0 = this;
            }

            @Override // org.apache.activemq.transport.stomp.AsyncHelper.HelperWithReturn
            public Object cycle() throws InterruptedException {
                return this.this$0.pendingReadCommands.poll(0L, TimeUnit.MILLISECONDS);
            }
        });
        if (command != null) {
            return command;
        }
        try {
            addToPendingReadCommands(this.commandParser.parse(dataInput));
            org.apache.activemq.command.Command command2 = (org.apache.activemq.command.Command) AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.HelperWithReturn(this) { // from class: org.apache.activemq.transport.stomp.StompWireFormat.2
                private final StompWireFormat this$0;

                {
                    this.this$0 = this;
                }

                @Override // org.apache.activemq.transport.stomp.AsyncHelper.HelperWithReturn
                public Object cycle() throws InterruptedException {
                    return this.this$0.pendingReadCommands.poll(0L, TimeUnit.MILLISECONDS);
                }
            });
            if (this.connected || command2.getDataStructureType() == 3) {
                return command2;
            }
            throw new IOException("Not yet connected.");
        } catch (ProtocolException e) {
            sendError(e.getMessage());
            return FlushCommand.COMMAND;
        }
    }

    public org.apache.activemq.command.Command writeCommand(org.apache.activemq.command.Command command, DataOutput dataOutput) throws IOException, JMSException {
        flushPendingFrames(dataOutput);
        if (command == null) {
            return null;
        }
        if (command.getDataStructureType() == 30) {
            if (!$assertionsDisabled && !(command instanceof Response)) {
                throw new AssertionError();
            }
            Response response = (Response) command;
            for (int i = 0; i < this.receiptListeners.size(); i++) {
                ResponseListener responseListener = (ResponseListener) this.receiptListeners.get(i);
                if (responseListener.onResponse(response, dataOutput)) {
                    this.receiptListeners.remove(responseListener);
                    return null;
                }
            }
        }
        if (!command.isMessageDispatch()) {
            return null;
        }
        MessageDispatch messageDispatch = (MessageDispatch) command;
        messageDispatch.getMessage();
        Subscription subscription = (Subscription) this.subscriptionsByConsumerId.get(messageDispatch.getConsumerId());
        if (subscription == null) {
            return null;
        }
        subscription.receive(messageDispatch, dataOutput);
        return null;
    }

    private void flushPendingFrames(DataOutput dataOutput) throws IOException {
        byte[] bArr;
        boolean z = false;
        do {
            try {
                bArr = (byte[]) this.pendingWriteFrames.poll(0L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                z = true;
            }
            if (bArr == null) {
                return;
            } else {
                dataOutput.write(bArr);
            }
        } while (z);
    }

    private void sendError(String str) {
        AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper(this, str) { // from class: org.apache.activemq.transport.stomp.StompWireFormat.3
            private final String val$message;
            private final StompWireFormat this$0;

            {
                this.this$0 = this;
                this.val$message = str;
            }

            @Override // org.apache.activemq.transport.stomp.AsyncHelper.Helper
            public void cycle() throws InterruptedException {
                this.this$0.pendingWriteFrames.put(new FrameBuilder(Stomp.Responses.ERROR).addHeader(Stomp.Headers.Error.MESSAGE, this.val$message).toFrame());
            }
        });
    }

    public void onFullyConnected() {
        this.connected = true;
    }

    public void addToPendingReadCommands(org.apache.activemq.command.Command command) {
        AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper(this, command) { // from class: org.apache.activemq.transport.stomp.StompWireFormat.4
            private final org.apache.activemq.command.Command val$info;
            private final StompWireFormat this$0;

            {
                this.this$0 = this;
                this.val$info = command;
            }

            @Override // org.apache.activemq.transport.stomp.AsyncHelper.Helper
            public void cycle() throws InterruptedException {
                this.this$0.pendingReadCommands.put(this.val$info);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void clearTransactionId(String str) {
        this.transactions.remove(str);
    }

    public SessionId getSessionId() {
        return this.sessionId;
    }

    public ProducerId getProducerId() {
        return this.producerId;
    }

    public Subscription getSubcription(ConsumerId consumerId) {
        return (Subscription) this.subscriptionsByConsumerId.get(consumerId);
    }

    public Set getSubcriptions(ActiveMQDestination activeMQDestination) {
        return this.subscriptionsByDestination.get(activeMQDestination);
    }

    public Subscription getSubcription(String str) {
        return (Subscription) this.subscriptionsByName.get(str);
    }

    public void addSubscription(Subscription subscription) {
        if (subscription.getSubscriptionId() != null && this.subscriptionsByName.containsKey(subscription.getSubscriptionId())) {
            Subscription subscription2 = (Subscription) this.subscriptionsByName.get(subscription.getSubscriptionId());
            removeSubscription(subscription2);
            enqueueCommand(subscription2.close());
        }
        if (subscription.getSubscriptionId() != null) {
            this.subscriptionsByName.put(subscription.getSubscriptionId(), subscription);
        }
        this.subscriptionsByConsumerId.put(subscription.getConsumerInfo().getConsumerId(), subscription);
        this.subscriptionsByDestination.put(subscription.getConsumerInfo().getDestination(), subscription);
    }

    public void removeSubscription(Subscription subscription) {
        if (subscription.getSubscriptionId() != null) {
            this.subscriptionsByName.remove(subscription.getSubscriptionId());
        }
        this.subscriptionsByConsumerId.remove(subscription.getConsumerInfo().getConsumerId());
        this.subscriptionsByDestination.remove(subscription.getConsumerInfo().getDestination(), subscription);
    }

    public void enqueueCommand(org.apache.activemq.command.Command command) {
        AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper(this, command) { // from class: org.apache.activemq.transport.stomp.StompWireFormat.5
            private final org.apache.activemq.command.Command val$ack;
            private final StompWireFormat this$0;

            {
                this.this$0 = this;
                this.val$ack = command;
            }

            @Override // org.apache.activemq.transport.stomp.AsyncHelper.Helper
            public void cycle() throws InterruptedException {
                this.this$0.pendingReadCommands.put(this.val$ack);
            }
        });
    }

    public TransactionId getTransactionId(String str) {
        return (TransactionId) this.transactions.get(str);
    }

    public TransactionId registerTransactionId(String str, int i) {
        LocalTransactionId localTransactionId = new LocalTransactionId(getConnectionId(), i);
        this.transactions.put(str, localTransactionId);
        return localTransactionId;
    }

    public int getVersion() {
        return this.version;
    }

    public void setVersion(int i) {
        this.version = i;
    }

    public ConnectionId getConnectionId() {
        return this.connectionId;
    }

    public static synchronized int generateTransactionId() {
        int i = transactionIdCounter + 1;
        transactionIdCounter = i;
        return i;
    }

    public ConsumerId createConsumerId() {
        return new ConsumerId(this.sessionId, this.consumerIdGenerator.getNextSequenceId());
    }

    public MessageId createMessageId() {
        return new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId());
    }

    public synchronized short generateCommandId() {
        short s = this.lastCommandId;
        this.lastCommandId = (short) (s + 1);
        return s;
    }

    public SessionId generateSessionId() {
        throw new RuntimeException("TODO!!");
    }

    public Packet marshal(Object obj) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
        marshal(obj, dataOutputStream);
        dataOutputStream.close();
        return new ByteArrayPacket(byteArrayOutputStream.toByteSequence());
    }

    public Object unmarshal(Packet packet) throws IOException {
        return unmarshal(new DataInputStream(new PacketInputStream(packet)));
    }

    public void marshal(Object obj, DataOutputStream dataOutputStream) throws IOException {
        try {
            writeCommand((org.apache.activemq.command.Command) obj, dataOutputStream);
        } catch (JMSException e) {
            throw IOExceptionSupport.create((Exception) e);
        } catch (IOException e2) {
            throw e2;
        }
    }

    public Object unmarshal(DataInputStream dataInputStream) throws IOException {
        try {
            return readCommand(dataInputStream);
        } catch (IOException e) {
            throw e;
        } catch (JMSException e2) {
            throw IOExceptionSupport.create((Exception) e2);
        }
    }

    public Map getDispachedMap() {
        return this.dispachedMap;
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$org$apache$activemq$transport$stomp$StompWireFormat == null) {
            cls = class$("org.apache.activemq.transport.stomp.StompWireFormat");
            class$org$apache$activemq$transport$stomp$StompWireFormat = cls;
        } else {
            cls = class$org$apache$activemq$transport$stomp$StompWireFormat;
        }
        $assertionsDisabled = !cls.desiredAssertionStatus();
        connectionIdGenerator = new IdGenerator();
    }
}
