RabbitMQ MassTransit Kullanarak Producer Consumer Yapısı

Daha önceki RabbitMQ Nedir yazımızda genel hatlarıyla rabbitmq dan bahsedip windows makina üzerinde kurulumunu anlatıp kısaca; erişilebilirliği, güvenilirliği yüksek ve ölçeklenebilen loosly-coupled asenkron iletişim sağlayan uygulamalar geliştirmektir diye tanımlamıştık.

Bu yazımızda ise Masstransit kullanarak bir Producer Consumer projesi oluşturacağız ancak makinamızda rabbitmq kurulu çalışır vaziyette olduğundan emin olalım.

Enterprise Service Bus (ESB) Nedir ?

Enterprise service bus diğer ismiyle message broker; rabbitmq gibi messaging katmanlarının üzerinde bulunan ve distributed synchronous yada asynchronous communication sağlayabilen bir middleware' dir. Bize birbirinden tamamen farklı olabilen uygulamalar arasında message-based communication yapabilmemizi sağlayan bir transport gateway'i dir. Diğer bir deyişle; message'ları provider ve consumer arasında transport eden bir çeşit middleware tool'u dur. 

 

 

Şimdi ise masstransit kullanarak bir örnek üzerinden ilerleyelim. Örneğimiz şu şekilde olsun; günlük şirket bültenini tüm çalışanlara email olarak gönderen bir producer/consumer yapısı tasarlayalım.

VS da sırasıyla 3 proje oluşturacağız.

  1. RMQMessage(class lib.)
  2. RMQProducer(console app.)
  3. RMQConsumer(console app.)

1-) RMQMessage

İlk projemiz olan RMQMessage ile başlayalım. Vs da RMQMessage adından bir class-library oluşturalım. Masstransit'in message-base communication sağladığını söylemiştik ve hem producer hemde consumer tarafından kullanılacak bu message yada contract diyede adlandırdığımız model ve onun sub-model'lerini oluşturacağız. 

İlk olarak abstract olan BaseMessage.cs'i oluşturalım ve bu model içerisinde ortak olmasını istediğimiz property'ler yer alsın.

    public abstract class BaseMessage
    {
        public DateTime PublishedTime { get; set; }
        public DateTime ConsumedTime { get; set; }
        public Guid QueueId { get; set; }
    }

Sonrasında yukarıda bahsettiğimiz producer ve consumer'ın kullanacağı NewsletterMailMessage adında contract'ımızı aşağıdaki oluşturalım.

    public class NewsletterMailMessage: BaseMessage
    {
        public List<string> AddressList { get; set; }
        public NewsletterModel NewsLetter { get; set; }
    }
    public class NewsletterModel
    {
        public string MailSubject{ get; set; }
        public string HtmlContent { get; set; }
    }

Yukarıda modellerimizi oluşturduk ve RMQMessage projesindeki işimiz bitti. Şimdi sırada 2. projeyi oluşturmak var.

2-) RMQProducer

Bunun için aynı solution'da RMQProducer adında bir console app oluşturalım. Bu proje queue'ya ilgili message'ı publish etmeden sorumlu olacak. Projeyi oluşturduktan sonra nuget üzerinden package manager console kullanarak aşağıdaki masstransit.rabbitmq paketini yükleyelim.

Masstransit.RabbitMQ

 Install-Package MassTransit.RabbitMQ

Bu paket ile birlikte rabbitmq için kullanılan masstransit kütüphanesi onun dependency'leri projemize kurulmuş olacak.

Kurulum tamamlandıktan sonra RMQMessage projesini RMQProducer projesine reference olarak ekleyelim. Ekleme nedenimiz NewsletterMailMessage.cs modelini kullanabilmek.

Program.cs içerisine aşağıdaki gibi InitializeBus adında bir metot tanımlayalım ve bu metot IBusControl arayüzünü initialize etmekten sorumlu olsun.

    static IBusControl InitializeBus()
    {
        return Bus.Factory.CreateUsingRabbitMq(cfg =>
         {
             cfg.Host(new Uri("rabbitmq://localhost/"), h =>
             {
                 h.Username("guest");
                 h.Password("guest");
             });
         });
    }

local makinada kurulu olan rabbitmq url'i rabbitmq://localhost/ dir ve default username password ise guest olarak tanımlanmıştır.

RabbitMQ ya bağlantı kısmını halletik şimdi ise message objemizi initialize edelim.

    static NewsletterMailMessage InitializeMessage()
    {
        var message = new NewsletterMailMessage
        {
            AddressList = _customerService.GetAllMailAddresses(), //assume more than 2000
            NewsLetter = new NewsletterModel
            {
                MailSubject = "Daily Newsletter of Contoso Corp.",
                HtmlContent = "Lorem ipsum dolor sit amet, et nam mucius docendi hendrerit, an usu decore mandamus. Ei qui quod decore, cum nulla nostrud erroribus ut, est eu aperiri interesset. Legere mentitum per an. Hinc legimus nostrum cu vix."
            },
            PublishedTime = DateTime.Now,
            QueueId = Guid.NewGuid()
        };

        Console.WriteLine("Message published !\nSubject : " + message.NewsLetter.MailSubject + "\nPublished at : " + message.PublishedTime + "\nQueueId : " + message.QueueId);
        return message;
    }

Son adım olarak main fonksiyonu içerisinde bu iki metodu call ederek message'ı queue'ya push edip Producer projemizdeki işlemleri bitirelim.

   static void Main(string[] args)
   {
       var busControl = InitializeBus();
       busControl.Start();
       Console.WriteLine("Started publishing.");

       var message = InitializeMessage();

       busControl.Publish(message);
       Console.ReadLine();
   }

 

3-) RMQConsumer

Sırada son adım olan consumer projesini oluşturma var. Consumer Producer'ın gönderdiği message'ları dinleyerek ilgili queue için consume etmeden sorumlu. Projeyi oluşturduktan sonra nuget üzerinden package manager console kullanarak aşağıdaki masstransit.rabbitmq paketini yükleyelim.

Masstransit.RabbitMQ

 Install-Package MassTransit.RabbitMQ

Bu paket ile birlikte rabbitmq için kullanılan masstransit kütüphanesi onun dependency'leri projemize kurulmuş olacak.

Kurulum tamamlandıktan sonra RMQMessage projesini RMQConsumer projesine reference olarak ekleyelim. Ekleme nedenimiz NewsletterMailMessage.cs modelini kullanabilmek.

İlk olarak rabbitmq ile connection sağlama ve hangi queue kullanılacak gibi konfigurasyonları Pragram.cs içerisine tanımlayalım.

    static void Main(string[] args)
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });

            cfg.ReceiveEndpoint(host, "DailyNewsletterMail", e =>
            e.Consumer<NewsletterMailMessageConsumer>());
        });

        busControl.Start();
        Console.WriteLine("Started consuming.");
        Console.ReadLine();
    }

Yukarıdaki kod bloğu şunu söylüyor; eğer rabbitmq local makinamızda kurulu ise adresi rabbitmq://localhost/ şeklindedir ve default userName-password guest olarak gelmektedir. Queue ismi olarak DailyNewsletterMail verdik ve bu queue için consume edilecek olan message da NewsletterMailMessageConsumer şeklinde belirttik.

Şimdi ise NewsletterMailMessageConsumer.cs adında message objemizi consume edecek olan kısmı geliştirelim.

public class NewsletterMailMessageConsumer : IConsumer<NewsletterMailMessage>
{
    public Task Consume(ConsumeContext<NewsletterMailMessage> context)
    {
        var message = context.Message;
        message.ConsumedTime = DateTime.Now;

        Console.WriteLine("Message consumed !\nSubject : " + message.NewsLetter.MailSubject + "\nConsumed at: " + message.ConsumedTime + "\nQueueId : " + message.QueueId);

        //todo send mail impr.

        return context.CompleteTask;
    }
}

Yukarıdaki kod blou şunu anlatıyor; NewsletterMailMessageConsumer adında IConsumer interface'inin implement eden ve bu implementasyon sonucunda Consume metoduna sahip olan bir consumer'ı mız var. Bu consumer DailyNewsletterMail queue'suna gelen NewsletterMailMessage modelini consume eder.

Sırasıyla Consumer'ı ve Producer'ı run ettiğimizde ilk olarak rmq management-console da DailyNewsletterMail queue'su aşağıdaki gibi listelenecektir.

 Producer ve Consumer projelerinin ayrı ayrı console çıktıları ise aşağıdaki gibi olacaktır.

Aynı QueueId ye ait message'ımız Producer dan çıkıp consumer'a ulaştığı bilgisini yukarıdaki gibi görüyoruz.

Queue yapıları özellikle async mesajlaşma gerektiren yerlerde oldukça önemlidir ve hayat kurtarır. Bu yazıda service bus'lar dan Masstransit kullanarak basit bir consume/producer uygulaması geliştirdik ve daha sonraki yazılarımızda diğer queue yapıları ile ilgilide konuları ele almaya devam edeceğiz.

Yorumlar (2) -

  • Umit

    9.10.2017 13:28:52 | Yanıtla

    Merhaba,
    Öncelikle makale için teşekkürler.
    "Kurulum tamamlandıktan sonra RMQConsumer projesini RMQProducer projesine reference olarak ekleyelim. Ekleme nedenimiz NewsletterMailMessage.cs modelini kullanabilmek."

    demişsiniz RMQConsumer projesine RMQMessage projesin referans olarak eklememiz gerekmez miydi?
    Diğer bir nokta Receiverın  olmadığını sayayarak Producer DailyNewsletterMail kuyruğunu gönderim yapacağını nasıl algılıyor?

    • caner

      10.10.2017 06:15:54 | Yanıtla

      Merhaba,
      Değerli yorumun için teşekkürler. Seninde söylediğin gibi proje isimlerinde ufak bir hata olmuş yazarken. NewsletterMailMessage.cs RMQMessage projesinde olduğundan ve bu class'ı kullanmak istediğimiz projede RMQConsumer olduğundan doğrusu "Kurulum tamamlandıktan sonra RMQMessage projesini RMQConsumer projesine reference olarak ekleyelim" olacak.

      İkinci soruya gelecek olursak; Masstransit'in message-based communication sağlayan bir gateway olduğunu söylemiştik. RMQPublisher sadece queue'ya NewsletterMailMessage tipindeki sınıfı publish etmekten sorumlu. Yani RMQPublisher hangi kuyruğa push yapacağını bilmiyor.

      Eğer receiver projesi 1 kere dahi çalıştırılıp DailyNewsletterMail kuyruğu rabbitmq'ya register olup rabbitmq arayüz sayfasında görebiliyorsak queue'ya publish edilen message'lar DailyNewsletterMail kuyruğuna iletilir.
        * Eğer receiver ayaktaysa yani çalışıyorsa rabbitmq'dan bu message'ı consume eder.
        * Eğer çalışmıyorsa bu message'ları rabbitmq arayüzünde de görebileceğimiz iletilmek üzere Ready state'inde tutulur ve receiver tekrardan çalıştığında consume etmeye devam eder.

      Tekrar teşekkürler güzel yorumunuz için.

Yorum ekle

Loading