Ana içeriğe geç

MassTransit

MassTransit, .NET için mesaj tabanlı uygulamaları kolaylaştıran bir framework’tür; yanlış kullanımlar mesaj kaybına ve consumer sorunlarına yol açar.


1. Consumer’da Exception Yönetimi

Yanlış Kullanım: Exception’ı sessizce yutmak.

public class OrderConsumer : IConsumer<OrderCreated>
{
    public async Task Consume(ConsumeContext<OrderCreated> context)
    {
        try
        {
            await ProcessOrderAsync(context.Message);
        }
        catch (Exception)
        {
            // Hata yutuldu, mesaj ack'lanır, veri kaybolur
        }
    }
}

İdeal Kullanım: Retry policy tanımlayıp hataları MassTransit’e bırakın.

builder.Services.AddMassTransit(x =>
{
    x.AddConsumer<OrderConsumer>(cfg =>
    {
        cfg.UseMessageRetry(r => r.Incremental(3,
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(5)));
    });
});

public class OrderConsumer : IConsumer<OrderCreated>
{
    public async Task Consume(ConsumeContext<OrderCreated> context)
    {
        await ProcessOrderAsync(context.Message);
        // Exception fırlarsa MassTransit retry yapar, sonra _error kuyruğuna taşır
    }
}

2. Publish ve Send Farkını Bilmemek

Yanlış Kullanım: Command için Publish, event için Send kullanmak.

// Event'i Send ile göndermek - sadece bir consumer alır
await sendEndpoint.Send(new OrderCreatedEvent { OrderId = 1 });

// Command'ı Publish ile göndermek - birden fazla consumer alır
await publishEndpoint.Publish(new ProcessPaymentCommand { OrderId = 1 });

İdeal Kullanım: Event’ler için Publish, command’lar için Send kullanın.

// Event: Birden fazla consumer dinleyebilir (fan-out)
await _publishEndpoint.Publish(new OrderCreatedEvent
{
    OrderId = order.Id,
    CustomerId = order.CustomerId
});

// Command: Tek bir consumer'a yönlendirilir (point-to-point)
var endpoint = await _sendEndpointProvider.GetSendEndpoint(
    new Uri("queue:payment-processing"));
await endpoint.Send(new ProcessPaymentCommand
{
    OrderId = order.Id,
    Amount = order.TotalPrice
});

3. Message Contract’ı Yanlış Tasarlamak

Yanlış Kullanım: Büyük nesneleri mesaj olarak göndermek.

public class OrderCreatedEvent
{
    public Order Order { get; set; }         // Tüm entity grafiği
    public Customer Customer { get; set; }    // İlişkili entity'ler
    public List<Product> Products { get; set; } // Büyük liste
}

İdeal Kullanım: Mesajları küçük ve self-contained tutun.

public record OrderCreatedEvent
{
    public Guid OrderId { get; init; }
    public int CustomerId { get; init; }
    public decimal TotalPrice { get; init; }
    public DateTime CreatedAt { get; init; }
}

// Consumer detaylı veriyi kendi veri kaynağından çeker
public class OrderNotificationConsumer : IConsumer<OrderCreatedEvent>
{
    public async Task Consume(ConsumeContext<OrderCreatedEvent> context)
    {
        var orderId = context.Message.OrderId;
        var orderDetails = await _orderService.GetDetailsAsync(orderId);
        await _emailService.SendOrderConfirmationAsync(orderDetails);
    }
}

4. Saga State Machine Kullanmamak

Yanlış Kullanım: Dağıtık iş akışını manuel yönetmek.

public class OrderConsumer : IConsumer<OrderCreated>
{
    public async Task Consume(ConsumeContext<OrderCreated> context)
    {
        await _paymentService.ChargeAsync(context.Message.OrderId);
        await _inventoryService.ReserveAsync(context.Message.OrderId);
        await _shippingService.CreateAsync(context.Message.OrderId);
        // Bir adım başarısız olursa tüm flow bozulur
    }
}

İdeal Kullanım: MassTransit Saga State Machine ile yönetin.

public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        InstanceState(x => x.CurrentState);

        Event(() => OrderCreated, x => x.CorrelateById(c => c.Message.OrderId));
        Event(() => PaymentCompleted, x => x.CorrelateById(c => c.Message.OrderId));
        Event(() => PaymentFailed, x => x.CorrelateById(c => c.Message.OrderId));

        Initially(
            When(OrderCreated)
                .Then(context => context.Saga.OrderId = context.Message.OrderId)
                .Publish(context => new ProcessPayment { OrderId = context.Saga.OrderId })
                .TransitionTo(AwaitingPayment));

        During(AwaitingPayment,
            When(PaymentCompleted)
                .Publish(context => new ReserveInventory { OrderId = context.Saga.OrderId })
                .TransitionTo(AwaitingInventory),
            When(PaymentFailed)
                .Publish(context => new CancelOrder { OrderId = context.Saga.OrderId })
                .TransitionTo(Cancelled));
    }
}

5. Outbox Pattern Kullanmamak

Yanlış Kullanım: DB kayıt ve mesaj gönderimini ayrı işlemde yapmak.

public async Task CreateOrderAsync(Order order)
{
    await _context.Orders.AddAsync(order);
    await _context.SaveChangesAsync();

    await _publishEndpoint.Publish(new OrderCreatedEvent { OrderId = order.Id });
    // SaveChanges başarılı ama Publish başarısız olabilir - tutarsızlık
}

İdeal Kullanım: MassTransit Outbox ile atomik mesaj gönderimi sağlayın.

builder.Services.AddMassTransit(x =>
{
    x.AddEntityFrameworkOutbox<AppDbContext>(o =>
    {
        o.UseSqlServer();
        o.UseBusOutbox();
    });
});

public class OrderService
{
    public async Task CreateOrderAsync(Order order)
    {
        await _context.Orders.AddAsync(order);
        await _publishEndpoint.Publish(new OrderCreatedEvent { OrderId = order.Id });
        await _context.SaveChangesAsync();
        // Mesaj ve DB kaydı aynı transaction'da, Outbox otomatik yönetir
    }
}