ActiveMQ NMS enlisted in TransactionScope

How do you enlist an ActiveMQ session in the ambient transaction scope? I believe the code below is self-explanatory.

Why do so? Imagine a situation that is likely to occur in an SOA + EDA scenario:

  • Service A handles a “PostOrderRequest”;
  • Service A starts a transaction;
  • Service A creates an order in its internal data storage;
  • Service A commits the transaction;
  • Service A publishes an “OrderPosted” event to the ActiveMQ bus, which fails;
  • Service B never receives the message, although the order exists.

or

  • Service A creates the order in the DB;
  • Service A publishes the event to ActiveMQ, successfully;
  • Service A commits the transaction, which fails (no power, CPU explodes, you name it);
  • Service A restarts;
  • Service B consumes the message (but the order is not there!).

The solution is to enlist the ActiveMQ publisher session in the same transaction that is used for database access. Be aware that this promotes the transaction to a distributed transaction! There are other ways to achieve consistency in messaging scenarios (and to live with eventual inconsistency), but let’s assume that 2PC is our only acceptable solution (which is a very strong assumption).

using Microsoft.VisualStudio.TestTools.UnitTesting;
using Apache.NMS.ActiveMQ;
using System.Transactions;
using Test.DBAccess;

namespace ActiveMQTranScope
{
    [TestClass]
    public class UnitTest1
    {
        [TestMethod]
        public void TestMethod1()
        {
            // The NetTx* types are the NMS variants that can take part in a System.Transactions transaction.
            var esbConnFactory = new NetTxConnectionFactory("failover:(tcp://localhost:61616)?transport.timeout=5000");
            using (var esbConn = esbConnFactory.CreateNetTxConnection("user", "password"))
            {
                esbConn.ClientId = "unit-test";
                esbConn.Start();

                using (var session = esbConn.CreateNetTxSession())
                using (var destination = session.GetQueue("TestTransactionalQueue"))
                using (var publisher = session.CreateProducer(destination))
                using (var db = new MyDbContext("MyConnectionString"))
                {
                    using (var ts = new TransactionScope(TransactionScopeOption.Required))
                    {
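                        db.MyEntities.Add(new MyEntity());

                        // Sent inside the scope, so the messages are held back until the transaction commits.
                        publisher.Send(session.CreateTextMessage("Message1"));
                        publisher.Send(session.CreateTextMessage("Message2"));
                        publisher.Send(session.CreateTextMessage("Message3"));

                        // The database write joins the same ambient transaction.
                        db.SaveChanges();
                        // Vote to commit; the actual commit happens when the scope is disposed.
                        ts.Complete();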
                    }
                }
            }

        }
    }
}
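
If you want to see what "enlisting" means mechanically without a broker or a database at hand, here is a minimal sketch that uses nothing but System.Transactions. BufferingResource and Demo are hypothetical names made up for this illustration; they are not part of NMS or of the test above. Each resource buffers its writes and only makes them visible when the transaction manager tells it to commit. Please mind that these are volatile, in-memory enlistments, so nothing here gets promoted to a distributed transaction; it only shows the all-or-nothing behaviour that TransactionScope gives to everything enlisted in it.

```csharp
using System;
using System.Collections.Generic;
using System.Transactions;

// Hypothetical stand-in for a transactional resource (think: DB connection or NetTx session).
public sealed class BufferingResource : IEnlistmentNotification
{
    private readonly List<string> _pending = new List<string>();
    private bool _enlisted;

    // Items that became visible, either after a commit or when written outside a transaction.
    public List<string> Committed { get; } = new List<string>();

    public void Write(string item)
    {
        if (Transaction.Current == null)
        {
            // No ambient transaction: the write takes effect immediately.
            Committed.Add(item);
            return;
        }
        if (!_enlisted)
        {
            // Join the ambient transaction on first use.
            Transaction.Current.EnlistVolatile(this, EnlistmentOptions.None);
            _enlisted = true;
        }
        _pending.Add(item);
    }

    // Phase 1: vote "yes".
    public void Prepare(PreparingEnlistment preparingEnlistment)
    {
        preparingEnlistment.Prepared();
    }

    // Phase 2: make the buffered work visible.
    public void Commit(Enlistment enlistment)
    {
        Committed.AddRange(_pending);
        Reset();
        enlistment.Done();
    }

    // The transaction was aborted: throw the buffered work away.
    public void Rollback(Enlistment enlistment)
    {
        Reset();
        enlistment.Done();
    }

    public void InDoubt(Enlistment enlistment)
    {
        Reset();
        enlistment.Done();
    }

    private void Reset()
    {
        _pending.Clear();
        _enlisted = false;
    }
}

public static class Demo
{
    // Returns how many items each resource ended up with.
    public static (int Db, int Bus) Run(bool complete)
    {
        var db = new BufferingResource();
        var bus = new BufferingResource();

        using (var ts = new TransactionScope(TransactionScopeOption.Required))
        {
            db.Write("order-1");
            bus.Write("OrderPosted:order-1");
            if (complete)
            {
                ts.Complete();
            }
        }
        return (db.Committed.Count, bus.Committed.Count);
    }

    public static void Main()
    {
        Console.WriteLine(Run(complete: true));   // both writes become visible
        Console.WriteLine(Run(complete: false));  // neither write becomes visible
    }
}
```

Whatever happens inside the scope, the "order" and the "event" share one outcome, which is exactly what both failure scenarios above are missing.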