ActiveMQ NMS enlisted in TransactionScope

How to enlist ActiveMQ session in the ambient transaction scope? I believe that code below is self explanatory.

Why to do so? Imagine a situation (likely to occur in SOA  + EDA scenario):

  • Service A handles a “PostOrderRequest”;
  • Service A starts a transaction;
  • Service A creates an order in its internal data storage;
  • Service A commits the transaction;
  • Service A publishes a “OrderPosted” event to the ActiveMQ bus – which fails;
  • Service B can not consume the message

or

  • Service A creates the order in the DB;
  • Service A publishes the event to the ActiveMQ – with success;
  • Service A commits the transaction – which fails (no power, CPU explodes – you name it);
  • Service A restarts;
  • Service B consumes the message (but the order is not there!);

The solution is to enlist the ActiveMQ publisher session in the transaction – the same being used for database access. Please mind that this will make the transaction promoted to distributed transaction! There are other options to introduce consistency in messaging scenarios (and to live with eventual inconsistency) but let’s assume that 2PC is our only acceptable solution (which is a very strong assumption).

using Microsoft.VisualStudio.TestTools.UnitTesting;
using Apache.NMS.ActiveMQ;
using System.Transactions;
using Test.DBAccess;

namespace ActiveMQTranScope
{
    [TestClass]
    public class UnitTest1
    {
        [TestMethod]
        public void TestMethod1()
        {
            var esbConnFactory = new NetTxConnectionFactory("failover:(tcp://localhost:61616)?transport.timeout=5000");
            using (var esbConn = esbConnFactory.CreateNetTxConnection("user", "passwrod"))
            {
                esbConn.ClientId = "unit-test";
                esbConn.Start();

                using (var session = esbConn.CreateNetTxSession())
                using (var destination = session.GetQueue("TestTransactionalQueue"))
                using (var publisher = session.CreateProducer(destination))
                using (var db = new MyDbContext("MyConnectionString"))
                {
                    using (var ts = new TransactionScope(TransactionScopeOption.Required))
                    {
                        db.MyEntities.Add(new MyEntity());

                        publisher.Send(session.CreateTextMessage("Message1"));
                        publisher.Send(session.CreateTextMessage("Message2"));
                        publisher.Send(session.CreateTextMessage("Message3"));

                        db.SaveChanges();
                        ts.Complete();
                    }
                }
            }

        }
    }
}
Advertisements