package org.codehaus.activemq.store.journal;

import EDU.oswego.cs.dl.util.concurrent.Channel;
import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import org.activeio.adapter.PacketByteArrayOutputStream;
import org.activeio.adapter.PacketInputStream;
import org.activeio.journal.InvalidRecordLocationException;
import org.activeio.journal.Journal;
import org.activeio.journal.JournalEventListener;
import org.activeio.journal.RecordLocation;
import org.activeio.journal.active.JournalImpl;
import org.activeio.journal.howl.HowlJournal;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.io.WireFormat;
import org.codehaus.activemq.io.impl.DefaultWireFormat;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ActiveMQXid;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.impl.PersistenceAdapterSupport;
import org.codehaus.activemq.store.MessageStore;
import org.codehaus.activemq.store.PersistenceAdapter;
import org.codehaus.activemq.store.TopicMessageStore;
import org.codehaus.activemq.store.TransactionStore;
import org.codehaus.activemq.util.JMSExceptionHelper;
import org.codehaus.activemq.util.TransactionTemplate;
import org.objectweb.howl.log.Configuration;

/* loaded from: input_file:activemq-2.0.jar:org/codehaus/activemq/store/journal/JournalPersistenceAdapter.class */
public class JournalPersistenceAdapter extends PersistenceAdapterSupport implements JournalEventListener {
    // Class logger; assigned in the static initializer at the bottom of the class.
    private static final Log log;
    // Accepted values for journalType (see createJournal()).
    public static final String DEFAULT_JOURNAL_TYPE = "default";
    public static final String HOWL_JOURNAL_TYPE = "howl";
    // The open journal; created in start() and set back to null by stop().
    private Journal journal;
    // Selects which Journal implementation createJournal() instantiates.
    private String journalType;
    // Adapter that most calls in this class delegate to (destinations, stores, transactions).
    private PersistenceAdapter longTermPersistence;
    // Directory handed to the journal implementation.
    private File directory;
    // Used to serialise and deserialise packets inside journal records.
    private WireFormat wireFormat;
    // NOTE(review): not referenced anywhere in the visible part of this class.
    private TransactionTemplate transactionTemplate;
    // Passed to every JournalMessageStore / JournalTopicMessageStore this adapter creates.
    private boolean sync;
    // Destination name -> JournalMessageStore, populated by createQueueMessageStore().
    private final ConcurrentHashMap messageStores;
    // Destination name -> JournalTopicMessageStore, populated by createTopicMessageStore().
    private final ConcurrentHashMap topicMessageStores;
    // NOTE(review): never read or written in the visible part of this class.
    private boolean performingRecovery;
    // First byte of every journal record written by this class; identifies the record layout.
    private static final int PACKET_RECORD_TYPE = 0;
    private static final int COMMAND_RECORD_TYPE = 1;
    private static final int TX_COMMAND_RECORD_TYPE = 2;
    private static final int ACK_RECORD_TYPE = 3;
    // One token is queued per checkpoint() call; the worker drains them before doing any work.
    private Channel checkpointRequests;
    // Executor that runs the checkpoint work on the "Checkpoint Worker" thread.
    private QueuedExecutor checkpointExecutor;
    // Timer used by start() to schedule periodic checkpoints; created lazily by getClockDaemon().
    ClockDaemon clockDaemon;
    // Handle returned by ClockDaemon.executePeriodically(); used by stop() to cancel the timer.
    private Object clockTicket;
    // Created on first call to createTransactionStore(); used by recover() to replay tx commands.
    private JournalTransactionStore transactionStore;
    // Compiler-generated cache for the class literal used by the static initializer.
    static Class class$org$codehaus$activemq$store$journal$JournalPersistenceAdapter;

    /**
     * Creates an adapter with its defaults: journal type {@link #DEFAULT_JOURNAL_TYPE},
     * journal directory {@code logs}, a {@link DefaultWireFormat} and the
     * {@code sync} flag switched on.
     *
     * <p>The long term persistence adapter is not set here; it must be supplied
     * through {@link #setLongTermPersistence(PersistenceAdapter)} before
     * {@link #start()} is called, because {@code start()} dereferences it.
     */
    public JournalPersistenceAdapter() {
        this.journalType = DEFAULT_JOURNAL_TYPE;
        this.directory = new File("logs");
        this.wireFormat = new DefaultWireFormat();
        this.sync = true;
        this.messageStores = new ConcurrentHashMap();
        this.topicMessageStores = new ConcurrentHashMap();
        this.checkpointRequests = new LinkedQueue();
        this.checkpointExecutor = new QueuedExecutor(new LinkedQueue());
        // Checkpoints run on a dedicated daemon thread at maximum priority.
        this.checkpointExecutor.setThreadFactory(new ThreadFactory() {
            @Override // EDU.oswego.cs.dl.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable, "Checkpoint Worker");
                thread.setDaemon(true);
                thread.setPriority(Thread.MAX_PRIORITY);
                return thread;
            }
        });
    }

    /**
     * Creates an adapter with the default settings and then overrides the journal
     * directory, the long term persistence adapter and the wire format.
     *
     * @param file journal directory
     * @param persistenceAdapter adapter that this one delegates to
     * @param defaultWireFormat wire format used for packets stored in the journal
     * @throws IOException declared by the signature; nothing in this body throws it
     */
    public JournalPersistenceAdapter(File file, PersistenceAdapter persistenceAdapter, DefaultWireFormat defaultWireFormat) throws IOException {
        this();
        wireFormat = defaultWireFormat;
        longTermPersistence = persistenceAdapter;
        directory = file;
    }

    /**
     * Delegates to the long term persistence adapter.
     *
     * @return whatever the long term adapter reports as its initial destinations
     */
    @Override // org.codehaus.activemq.store.PersistenceAdapter
    public Map getInitialDestinations() {
        PersistenceAdapter delegate = longTermPersistence;
        return delegate.getInitialDestinations();
    }

    /**
     * Returns the journal backed store for a destination, creating it if needed.
     *
     * @param str destination name
     * @param z {@code true} for a queue store, {@code false} for a topic store
     * @return the queue or topic message store for {@code str}
     * @throws JMSException if the underlying store cannot be created
     */
    private MessageStore createMessageStore(String str, boolean z) throws JMSException {
        if (z) {
            return createQueueMessageStore(str);
        }
        return createTopicMessageStore(str);
    }

    /**
     * Returns the journal backed queue store registered under {@code str},
     * creating and registering one around the long term queue store on first use.
     *
     * @param str destination name, used as the key in the store map
     * @return the cached or newly created store
     * @throws JMSException if the long term adapter fails to create its store
     */
    @Override // org.codehaus.activemq.store.PersistenceAdapter
    public MessageStore createQueueMessageStore(String str) throws JMSException {
        JournalMessageStore cached = (JournalMessageStore) messageStores.get(str);
        if (cached != null) {
            return cached;
        }
        JournalMessageStore created = new JournalMessageStore(this, longTermPersistence.createQueueMessageStore(str), str, sync);
        messageStores.put(str, created);
        return created;
    }

    /**
     * Returns the journal backed topic store registered under {@code str},
     * creating and registering one around the long term topic store on first use.
     *
     * @param str destination name, used as the key in the topic store map
     * @return the cached or newly created store
     * @throws JMSException if the long term adapter fails to create its store
     */
    @Override // org.codehaus.activemq.store.PersistenceAdapter
    public TopicMessageStore createTopicMessageStore(String str) throws JMSException {
        JournalTopicMessageStore cached = (JournalTopicMessageStore) topicMessageStores.get(str);
        if (cached != null) {
            return cached;
        }
        JournalTopicMessageStore created = new JournalTopicMessageStore(this, longTermPersistence.createTopicMessageStore(str), str, sync);
        topicMessageStores.put(str, created);
        return created;
    }

    /**
     * Returns the single journal transaction store of this adapter, building it
     * around the long term transaction store on the first call.
     *
     * @return the shared {@link JournalTransactionStore}
     * @throws JMSException if the long term adapter fails to create its store
     */
    @Override // org.codehaus.activemq.store.PersistenceAdapter
    public TransactionStore createTransactionStore() throws JMSException {
        if (transactionStore != null) {
            return transactionStore;
        }
        transactionStore = new JournalTransactionStore(this, longTermPersistence.createTransactionStore());
        return transactionStore;
    }

    /**
     * Delegates to the long term persistence adapter.
     *
     * @throws JMSException if the delegate fails
     */
    @Override // org.codehaus.activemq.store.PersistenceAdapter
    public void beginTransaction() throws JMSException {
        PersistenceAdapter delegate = longTermPersistence;
        delegate.beginTransaction();
    }

    /**
     * Delegates to the long term persistence adapter.
     *
     * @throws JMSException if the delegate fails
     */
    @Override // org.codehaus.activemq.store.PersistenceAdapter
    public void commitTransaction() throws JMSException {
        PersistenceAdapter delegate = longTermPersistence;
        delegate.commitTransaction();
    }

    /** Delegates to the long term persistence adapter. */
    @Override // org.codehaus.activemq.store.PersistenceAdapter
    public void rollbackTransaction() {
        PersistenceAdapter delegate = longTermPersistence;
        delegate.rollbackTransaction();
    }

    /**
     * Starts the long term adapter, opens the journal if it is not already open,
     * replays the journal and schedules a checkpoint every 60 seconds.
     *
     * <p>Opening and recovery are wrapped separately so that a recovery failure is
     * reported as such rather than being re-wrapped as a failure to open the
     * journal.
     *
     * @throws JMSException if the long term adapter fails to start, the journal
     *         cannot be opened, or recovery fails
     */
    @Override // org.codehaus.activemq.service.Service
    public synchronized void start() throws JMSException {
        this.longTermPersistence.start();
        // recover() replays transaction commands through the transaction store, so it must exist first.
        createTransactionStore();
        if (this.journal == null) {
            try {
                log.info("Opening journal.");
                this.journal = createJournal();
                log.info("Opened journal: " + this.journal);
                this.journal.setJournalEventListener(this);
            } catch (Exception e) {
                throw JMSExceptionHelper.newJMSException("Failed to open transaction journal: " + e, e);
            }
            try {
                recover();
            } catch (Exception e) {
                throw JMSExceptionHelper.newJMSException("Failed to recover transactions from journal: " + e, e);
            }
        }
        // Keep the ticket so stop() can cancel the periodic task.
        this.clockTicket = getClockDaemon().executePeriodically(60000L, new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                JournalPersistenceAdapter.this.checkpoint();
            }
        }, false);
    }

    /**
     * Cancels the periodic checkpoint, requests one final checkpoint, shuts the
     * checkpoint executor down, closes the journal and stops the long term adapter.
     *
     * <p>A failure to close the journal is remembered and thrown only after the
     * long term adapter has been stopped.
     *
     * <p>NOTE(review): {@link #checkpoint()} only queues the work; this method does
     * not wait for the worker before closing the journal, so the final checkpoint
     * may run after {@code journal} has been closed and set to {@code null}.
     *
     * @throws JMSException if the journal cannot be closed or the long term
     *         adapter fails to stop
     */
    @Override // org.codehaus.activemq.service.Service
    public synchronized void stop() throws JMSException {
        if (this.clockTicket != null) {
            ClockDaemon.cancel(this.clockTicket);
            // Forget the cancelled ticket so a repeated stop() does not cancel it again.
            this.clockTicket = null;
        }
        checkpoint();
        this.checkpointExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
        JMSException closeFailure = null;
        if (this.journal != null) {
            try {
                this.journal.close();
                this.journal = null;
            } catch (Exception e) {
                closeFailure = JMSExceptionHelper.newJMSException("Failed to close Howl transaction log due to: " + e, e);
            }
        }
        this.longTermPersistence.stop();
        if (closeFailure != null) {
            throw closeFailure;
        }
    }

    /** @return the adapter this journal adapter delegates to */
    public PersistenceAdapter getLongTermPersistence() {
        return longTermPersistence;
    }

    /** @param persistenceAdapter the adapter this journal adapter delegates to */
    public void setLongTermPersistence(PersistenceAdapter persistenceAdapter) {
        longTermPersistence = persistenceAdapter;
    }

    /** @return the directory handed to the journal implementation */
    public File getDirectory() {
        return directory;
    }

    /** @param file the directory handed to the journal implementation */
    public void setDirectory(File file) {
        directory = file;
    }

    /** @return the sync flag passed to message stores created by this adapter */
    public boolean isSync() {
        return sync;
    }

    /** @param z the sync flag passed to message stores created after this call */
    public void setSync(boolean z) {
        sync = z;
    }

    /** @return the wire format used to encode packets in journal records */
    public WireFormat getWireFormat() {
        return wireFormat;
    }

    /** @param wireFormat the wire format used to encode packets in journal records */
    public void setWireFormat(WireFormat wireFormat) {
        WireFormat replacement = wireFormat;
        this.wireFormat = replacement;
    }

    /** @return the configured journal type, see {@link #createJournal()} */
    public String getJournalType() {
        return journalType;
    }

    /**
     * @param str journal type; {@link #createJournal()} accepts
     *        {@link #DEFAULT_JOURNAL_TYPE} and {@link #HOWL_JOURNAL_TYPE}
     */
    public void setJournalType(String str) {
        journalType = str;
    }

    /**
     * Instantiates the journal implementation selected by {@code journalType},
     * rooted at the configured directory.
     *
     * @return a {@link JournalImpl} for the default type, a {@link HowlJournal}
     *         for the HOWL type
     * @throws IOException if the journal cannot be opened; non-I/O failures from
     *         the HOWL journal are wrapped with the original as the cause
     * @throws IllegalStateException if {@code journalType} is neither supported value
     */
    protected Journal createJournal() throws IOException {
        if (DEFAULT_JOURNAL_TYPE.equals(journalType)) {
            return new JournalImpl(directory);
        }
        if (HOWL_JOURNAL_TYPE.equals(journalType)) {
            try {
                Configuration howlConfig = new Configuration();
                howlConfig.setLogFileDir(directory.getCanonicalPath());
                return new HowlJournal(howlConfig);
            } catch (IOException ioFailure) {
                // Already the declared type; pass it through untouched.
                throw ioFailure;
            } catch (Exception other) {
                IOException wrapped = new IOException("Could not open HOWL journal: " + other.getMessage());
                wrapped.initCause(other);
                throw wrapped;
            }
        }
        throw new IllegalStateException("Unsupported valued for journalType attribute: " + journalType);
    }

    /**
     * Journal listener callback; responds by requesting a checkpoint.
     *
     * @param recordLocation location supplied by the journal; not used here
     */
    @Override // org.activeio.journal.JournalEventListener
    public void overflowNotification(RecordLocation recordLocation) {
        this.checkpoint();
    }

    /**
     * Requests an asynchronous checkpoint.
     *
     * <p>A token is queued on {@code checkpointRequests} and a task is handed to the
     * checkpoint executor. The task drains every pending token before working, so
     * several requests queued back to back result in a single checkpoint pass.
     *
     * <p>If the calling thread is interrupted while queueing, the request is
     * dropped, a warning is logged and the interrupt status is restored.
     */
    public void checkpoint() {
        try {
            this.checkpointRequests.put(Boolean.TRUE);
            this.checkpointExecutor.execute(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    performCheckpoint();
                }
            });
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("Request to start checkpoint failed: " + e, e);
        }
    }

    /**
     * Runs on the checkpoint worker: checkpoints every queue and topic store and
     * marks the journal at the greatest location the stores reported.
     */
    private void performCheckpoint() {
        boolean requested = false;
        try {
            // Collapse all pending requests into one pass.
            while (this.checkpointRequests.poll(0L) != null) {
                requested = true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        if (!requested) {
            return;
        }
        log.info("Checkpoint started.");
        RecordLocation newMark = null;
        for (Iterator it = this.messageStores.values().iterator(); it.hasNext();) {
            try {
                newMark = laterOf(newMark, ((JournalMessageStore) it.next()).checkpoint());
            } catch (Exception e) {
                // One failing store must not prevent the others from being checkpointed.
                log.error("Failed to checkpoint a message store: " + e, e);
            }
        }
        for (Iterator it = this.topicMessageStores.values().iterator(); it.hasNext();) {
            try {
                newMark = laterOf(newMark, ((JournalTopicMessageStore) it.next()).checkpoint());
            } catch (Exception e) {
                log.error("Failed to checkpoint a message store: " + e, e);
            }
        }
        if (newMark != null) {
            try {
                this.journal.setMark(newMark, true);
            } catch (Exception e) {
                log.error("Failed to mark the Journal: " + e, e);
            }
        }
        log.info("Checkpoint done.");
    }

    /** Returns {@code candidate} when it is non-null and compares greater than {@code current}, else {@code current}. */
    private static RecordLocation laterOf(RecordLocation current, RecordLocation candidate) {
        if (candidate != null && (current == null || current.compareTo(candidate) < 0)) {
            return candidate;
        }
        return current;
    }

    /**
     * Appends a packet record to the journal: the packet record type byte, the
     * string {@code str} and the packet encoded with the wire format.
     *
     * @param str string stored ahead of the packet; {@code recover()} reads it
     *        back as the destination name
     * @param packet packet to store
     * @param z passed unchanged as the second argument of {@code Journal.write}
     * @return the location of the written record
     * @throws JMSException if encoding or writing fails
     */
    public RecordLocation writePacket(String str, Packet packet, boolean z) throws JMSException {
        PacketByteArrayOutputStream buffer = new PacketByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        try {
            out.writeByte(PACKET_RECORD_TYPE);
            out.writeUTF(str);
            wireFormat.writePacket(packet, out);
            out.close();
            return journal.write(buffer.getPacket(), z);
        } catch (IOException ioFailure) {
            throw createWriteException(packet, ioFailure);
        }
    }

    /**
     * Appends a command record to the journal: the command record type byte
     * followed by the command string.
     *
     * @param str command text
     * @param z passed unchanged as the second argument of {@code Journal.write}
     * @return the location of the written record
     * @throws JMSException if encoding or writing fails
     */
    public RecordLocation writeCommand(String str, boolean z) throws JMSException {
        PacketByteArrayOutputStream buffer = new PacketByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        try {
            out.writeByte(COMMAND_RECORD_TYPE);
            out.writeUTF(str);
            out.close();
            return journal.write(buffer.getPacket(), z);
        } catch (IOException ioFailure) {
            throw createWriteException(str, ioFailure);
        }
    }

    /**
     * Reads the packet stored in the journal record at the given location.
     *
     * <p>The record stream is closed on every path, including when the record has
     * the wrong type or decoding fails.
     *
     * @param recordLocation location of a record written by
     *        {@link #writePacket(String, Packet, boolean)}
     * @return the decoded packet
     * @throws JMSException if the location is invalid, the record is not a packet
     *         record, or decoding fails
     */
    public Packet readPacket(RecordLocation recordLocation) throws JMSException {
        try {
            DataInputStream in = new DataInputStream(new PacketInputStream(this.journal.read(recordLocation)));
            try {
                if (in.readByte() != PACKET_RECORD_TYPE) {
                    throw new IOException("Record is not a packet type.");
                }
                // Skip the string stored ahead of the packet; only the packet is returned.
                in.readUTF();
                return this.wireFormat.readPacket(in);
            } finally {
                in.close();
            }
        } catch (IOException e) {
            throw createReadException(recordLocation, e);
        } catch (InvalidRecordLocationException e2) {
            throw createReadException(recordLocation, e2);
        }
    }

    /**
     * Replays every record the journal returns from
     * {@code getNextRecordLocation}, starting from a {@code null} location, into
     * the long term stores. It then writes a {@code RECOVERED} command record and
     * marks the journal at that record.
     *
     * <p>Each record is dispatched on its leading type byte. Every case handles
     * exactly one record layout; command records carry nothing to replay and are
     * skipped. The record stream is always closed.
     *
     * <p>Failures while applying an individual record to a long term store are
     * logged and recovery continues with the next record.
     *
     * @throws IllegalStateException propagated from the journal
     * @throws InvalidRecordLocationException propagated from the journal
     * @throws IOException if a record cannot be read or decoded
     * @throws JMSException if the {@code RECOVERED} command cannot be written
     */
    private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, JMSException {
        log.info("Journal Recovery Started.");
        int replayed = 0;
        RecordLocation location = null;
        while ((location = this.journal.getNextRecordLocation(location)) != null) {
            DataInputStream in = new DataInputStream(new PacketInputStream(this.journal.read(location)));
            try {
                byte recordType = in.readByte();
                switch (recordType) {
                    case PACKET_RECORD_TYPE:
                        if (recoverPacket(in)) {
                            replayed++;
                        }
                        break;
                    case TX_COMMAND_RECORD_TYPE:
                        recoverTxCommand(in);
                        break;
                    case ACK_RECORD_TYPE:
                        recoverTopicAck(in);
                        break;
                    case COMMAND_RECORD_TYPE:
                        // Command records (e.g. "RECOVERED") are markers only; nothing to replay.
                        break;
                    default:
                        log.error("Unknown type of record in transaction log which will be discarded: " + recordType);
                        break;
                }
            } finally {
                in.close();
            }
        }
        this.journal.setMark(writeCommand("RECOVERED", true), true);
        log.info("Journal Recovered: " + replayed + " message(s) in transactions recovered.");
    }

    /**
     * Replays one packet record (message add or message ack) into the long term
     * store of its destination.
     *
     * @return {@code true} if the packet was applied to the long term store
     */
    private boolean recoverPacket(DataInputStream in) throws IOException {
        String destinationName = in.readUTF();
        Packet packet = this.wireFormat.readPacket(in);
        if (packet instanceof ActiveMQMessage) {
            ActiveMQMessage message = (ActiveMQMessage) packet;
            try {
                JournalMessageStore store = (JournalMessageStore) createMessageStore(destinationName, message.getJMSActiveMQDestination().isQueue());
                store.getLongTermMessageStore().addMessage(message);
                return true;
            } catch (Throwable t) {
                log.error("Recovery Failure: Could not add message: " + message.getJMSMessageIdentity().getMessageID() + ", reason: " + t, t);
            }
        } else if (packet instanceof MessageAck) {
            MessageAck ack = (MessageAck) packet;
            try {
                JournalMessageStore store = (JournalMessageStore) createMessageStore(destinationName, ack.getDestination().isQueue());
                store.getLongTermMessageStore().removeMessage(ack);
                return true;
            } catch (Throwable t) {
                log.error("Recovery Failure: Could not remove message: " + ack.getMessageIdentity().getMessageID() + ", reason: " + t, t);
            }
        } else {
            log.error("Unknown type of packet in transaction log which will be discarded: " + packet);
        }
        return false;
    }

    /**
     * Replays one transaction command record, as laid out by
     * {@link #writeTxCommand(TxCommand, boolean)}, against the checkpoint store
     * of the transaction store.
     */
    private void recoverTxCommand(DataInputStream in) throws IOException {
        TxCommand command = new TxCommand();
        command.setType(in.readByte());
        command.setWasPrepared(in.readBoolean());
        // Types 4 and 5 carry a String transaction id; every other type carries an ActiveMQXid.
        switch (command.getType()) {
            case 4:
            case 5:
                command.setTransactionId(in.readUTF());
                break;
            default:
                command.setTransactionId(ActiveMQXid.read(in));
                break;
        }
        try {
            // NOTE(review): numeric types presumably mirror constants on TxCommand - confirm.
            switch (command.getType()) {
                case 1:
                    this.transactionStore.checkpointStore.prepare(command.getTransactionId());
                    break;
                case 2:
                case 4:
                    this.transactionStore.checkpointStore.commit(command.getTransactionId(), command.getWasPrepared());
                    break;
                case 3:
                case 5:
                    this.transactionStore.checkpointStore.rollback(command.getTransactionId());
                    break;
                default:
                    break;
            }
        } catch (XAException e) {
            log.error("Recovery Failure: Could not replay: " + command + ", reason: " + e, e);
        }
    }

    /**
     * Replays one ack record, as laid out by
     * {@link #writePacket(String, String, MessageIdentity, boolean)}: three
     * strings, of which the second and third are the subscription and message id.
     */
    private void recoverTopicAck(DataInputStream in) throws IOException {
        // The first string is what the writer stored first; presumably the destination name.
        String destinationName = in.readUTF();
        String subscription = in.readUTF();
        String messageId = in.readUTF();
        try {
            JournalTopicMessageStore store = (JournalTopicMessageStore) createMessageStore(destinationName, false);
            store.getLongTermTopicMessageStore().setLastAcknowledgedMessageIdentity(subscription, new MessageIdentity(messageId));
        } catch (Throwable t) {
            log.error("Recovery Failure: Could not ack message: " + messageId + ", reason: " + t, t);
        }
    }

    /**
     * Builds the exception thrown when a journal record cannot be read.
     *
     * @param recordLocation location that was being read
     * @param exc underlying failure, kept as the cause
     * @return a JMSException describing the failed read
     */
    private JMSException createReadException(RecordLocation recordLocation, Exception exc) {
        String message = "Failed to read to journal for: " + recordLocation + ". Reason: " + exc;
        return JMSExceptionHelper.newJMSException(message, exc);
    }

    /**
     * Builds the exception thrown when a packet cannot be written to the journal.
     *
     * @param packet packet that was being written
     * @param exc underlying failure, kept as the cause
     * @return a JMSException describing the failed write
     */
    protected JMSException createWriteException(Packet packet, Exception exc) {
        String message = "Failed to write to journal for: " + packet + ". Reason: " + exc;
        return JMSExceptionHelper.newJMSException(message, exc);
    }

    /**
     * Builds the exception thrown when a transaction command cannot be written to
     * the journal.
     *
     * @param txCommand command that was being written
     * @param exc underlying failure, attached as the cause
     * @return an XAException describing the failed write
     */
    private XAException createWriteException(TxCommand txCommand, Exception exc) {
        XAException failure = new XAException("Failed to write to journal for: " + txCommand + ". Reason: " + exc);
        // initCause() returns Throwable, so attach the cause first and return the typed exception.
        failure.initCause(exc);
        return failure;
    }

    /**
     * Builds the exception thrown when a command cannot be written to the journal.
     *
     * @param str description of the command that was being written
     * @param exc underlying failure, kept as the cause
     * @return a JMSException describing the failed write
     */
    protected JMSException createWriteException(String str, Exception exc) {
        String message = "Failed to write to journal for command: " + str + ". Reason: " + exc;
        return JMSExceptionHelper.newJMSException(message, exc);
    }

    /**
     * Builds the exception describing a failed journal recovery.
     *
     * @param exc underlying failure, kept as the cause
     * @return a JMSException describing the failed recovery
     */
    protected JMSException createRecoveryFailedException(Exception exc) {
        String message = "Failed to recover from journal. Reason: " + exc;
        return JMSExceptionHelper.newJMSException(message, exc);
    }

    /**
     * Returns the clock daemon used to schedule periodic checkpoints, creating one
     * on first use unless one was supplied through
     * {@link #setClockDaemon(ClockDaemon)}.
     *
     * <p>The lazily created daemon runs its work on a daemon thread.
     *
     * @return the clock daemon, never {@code null}
     */
    public ClockDaemon getClockDaemon() {
        if (this.clockDaemon == null) {
            this.clockDaemon = new ClockDaemon();
            this.clockDaemon.setThreadFactory(new ThreadFactory() {
                @Override // EDU.oswego.cs.dl.util.concurrent.ThreadFactory
                public Thread newThread(Runnable runnable) {
                    Thread thread = new Thread(runnable, "Checkpoint Timmer");
                    thread.setDaemon(true);
                    return thread;
                }
            });
        }
        return this.clockDaemon;
    }

    /** @param clockDaemon daemon to use for periodic checkpoints instead of the lazily created one */
    public void setClockDaemon(ClockDaemon clockDaemon) {
        ClockDaemon replacement = clockDaemon;
        this.clockDaemon = replacement;
    }

    /**
     * Appends a transaction command record to the journal: the transaction record
     * type byte, the command type, the was-prepared flag and the transaction id.
     *
     * <p>For command types 4 and 5 the id is written as a UTF string; for every
     * other type it is written through {@code ActiveMQXid.write}.
     *
     * @param txCommand command to store
     * @param z passed unchanged as the second argument of {@code Journal.write}
     * @return the location of the written record
     * @throws XAException if encoding or writing fails
     */
    public RecordLocation writeTxCommand(TxCommand txCommand, boolean z) throws XAException {
        PacketByteArrayOutputStream buffer = new PacketByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        try {
            out.writeByte(TX_COMMAND_RECORD_TYPE);
            out.writeByte(txCommand.getType());
            out.writeBoolean(txCommand.getWasPrepared());
            switch (txCommand.getType()) {
                case 4:
                case 5:
                    out.writeUTF((String) txCommand.getTransactionId());
                    break;
                default:
                    ActiveMQXid xid = (ActiveMQXid) txCommand.getTransactionId();
                    xid.write(out);
                    break;
            }
            out.close();
            return journal.write(buffer.getPacket(), z);
        } catch (IOException ioFailure) {
            throw createWriteException(txCommand, ioFailure);
        }
    }

    /**
     * Appends an ack record to the journal: the ack record type byte followed by
     * {@code str}, {@code str2} and the message id, each as a UTF string.
     *
     * @param str first string of the record
     * @param str2 second string of the record; {@code recover()} reads it back as
     *        the subscription
     * @param messageIdentity identity whose message id is stored
     * @param z passed unchanged as the second argument of {@code Journal.write}
     * @return the location of the written record
     * @throws JMSException if encoding or writing fails
     */
    public RecordLocation writePacket(String str, String str2, MessageIdentity messageIdentity, boolean z) throws JMSException {
        PacketByteArrayOutputStream buffer = new PacketByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        try {
            out.writeByte(ACK_RECORD_TYPE);
            out.writeUTF(str);
            out.writeUTF(str2);
            out.writeUTF(messageIdentity.getMessageID());
            out.close();
            return journal.write(buffer.getPacket(), z);
        } catch (IOException ioFailure) {
            String what = "Ack for message: " + messageIdentity;
            throw createWriteException(what, ioFailure);
        }
    }

    /**
     * Compiler-generated helper that resolves a class literal by name.
     *
     * @param str fully qualified class name
     * @return the loaded class
     * @throws NoClassDefFoundError if the class cannot be found; the
     *         {@link ClassNotFoundException} is attached as the cause
     */
    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            NoClassDefFoundError error = new NoClassDefFoundError();
            // initCause() returns Throwable, which cannot be thrown from here; attach, then throw the typed error.
            error.initCause(e);
            throw error;
        }
    }

    // Resolves (and caches) this class's literal, then creates the class logger from it.
    static {
        if (class$org$codehaus$activemq$store$journal$JournalPersistenceAdapter == null) {
            class$org$codehaus$activemq$store$journal$JournalPersistenceAdapter = class$("org.codehaus.activemq.store.journal.JournalPersistenceAdapter");
        }
        log = LogFactory.getLog(class$org$codehaus$activemq$store$journal$JournalPersistenceAdapter);
    }
}
