Ana içeriğe geç

Channel-Based Queue

System.Threading.Channels, producer-consumer senaryoları için yüksek performanslı asenkron kuyruk sağlar; yanlış kullanımlar bellek sorunlarına ve veri kaybına yol açar.


1. Unbounded Channel ile Bellek Sorunu

Yanlış Kullanım: Sınırsız kapasite ile bellek tükenmesi riski oluşturmak.

var channel = Channel.CreateUnbounded<WorkItem>();

// Producer hızlı, consumer yavaşsa kuyruk sınırsız büyür
while (true)
{
    var item = await GetNextItemAsync();
    await channel.Writer.WriteAsync(item); // Bellek tükenene kadar yazılır
}

İdeal Kullanım: Bounded Channel ile kapasite limiti ve backpressure uygulayın.

var channel = Channel.CreateBounded<WorkItem>(new BoundedChannelOptions(1000)
{
    FullMode = BoundedChannelFullMode.Wait, // Kuyruk dolunca producer bekler
    SingleReader = false,
    SingleWriter = false
});

// DI kaydı
builder.Services.AddSingleton(Channel.CreateBounded<WorkItem>(new BoundedChannelOptions(1000)
{
    FullMode = BoundedChannelFullMode.Wait
}));

2. Background Worker ile Channel Entegrasyonu

Yanlış Kullanım: Controller’da uzun süren işlemi senkron yapmak.

[HttpPost("api/reports")]
public async Task<IActionResult> GenerateReport(ReportRequest request)
{
    var report = await _reportService.GenerateAsync(request); // 30 saniye sürer
    return Ok(report); // İstemci bekler, timeout olabilir
}

İdeal Kullanım: Channel ile isteği kuyruğa atıp hemen yanıt dönün.

[HttpPost("api/reports")]
public async Task<IActionResult> GenerateReport(ReportRequest request)
{
    var jobId = Guid.NewGuid();
    await _channel.Writer.WriteAsync(new ReportJob(jobId, request));
    return Accepted(new { JobId = jobId, StatusUrl = $"/api/reports/status/{jobId}" });
}

public class ReportWorker : BackgroundService
{
    private readonly Channel<ReportJob> _channel;
    private readonly IServiceScopeFactory _scopeFactory;

    public ReportWorker(Channel<ReportJob> channel, IServiceScopeFactory scopeFactory)
    {
        _channel = channel;
        _scopeFactory = scopeFactory;
    }

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        await foreach (var job in _channel.Reader.ReadAllAsync(ct))
        {
            using var scope = _scopeFactory.CreateScope();
            var service = scope.ServiceProvider.GetRequiredService<IReportService>();
            await service.GenerateAsync(job.Request, ct);
        }
    }
}

3. Tek Consumer ile Düşük Throughput

Yanlış Kullanım: Tek consumer ile kuyruğu yavaş işlemek.

protected override async Task ExecuteAsync(CancellationToken ct)
{
    await foreach (var item in _channel.Reader.ReadAllAsync(ct))
    {
        await ProcessAsync(item); // Tek seferde bir item işlenir
    }
}

İdeal Kullanım: Birden fazla consumer ile paralel işleme yapın.

protected override async Task ExecuteAsync(CancellationToken ct)
{
    var workerCount = Environment.ProcessorCount;
    var tasks = Enumerable.Range(0, workerCount)
        .Select(_ => ProcessChannelAsync(ct));

    await Task.WhenAll(tasks);
}

private async Task ProcessChannelAsync(CancellationToken ct)
{
    await foreach (var item in _channel.Reader.ReadAllAsync(ct))
    {
        try
        {
            using var scope = _scopeFactory.CreateScope();
            var processor = scope.ServiceProvider.GetRequiredService<IItemProcessor>();
            await processor.ProcessAsync(item, ct);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Item işlenirken hata: {ItemId}", item.Id);
        }
    }
}

4. Channel Completion’ı Yönetmemek

Yanlış Kullanım: Channel’ı kapatmadan uygulamayı sonlandırmak.

public class ImportService
{
    public async Task ImportAsync(IEnumerable<Record> records)
    {
        foreach (var record in records)
        {
            await _channel.Writer.WriteAsync(record);
        }
        // Writer tamamlandı ama consumer hala bekliyor
    }
}

İdeal Kullanım: Writer tamamlandığında Complete ile bildirin.

public class ImportService
{
    public async Task ImportAsync(IEnumerable<Record> records, CancellationToken ct)
    {
        try
        {
            foreach (var record in records)
            {
                await _channel.Writer.WriteAsync(record, ct);
            }
        }
        finally
        {
            _channel.Writer.Complete();
        }
    }
}

// Consumer
protected override async Task ExecuteAsync(CancellationToken ct)
{
    await foreach (var item in _channel.Reader.ReadAllAsync(ct))
    {
        await ProcessAsync(item, ct);
    }
    // Writer Complete() çağrıldığında döngü otomatik biter
    _logger.LogInformation("Tüm kayıtlar işlendi");
}

5. FullMode Stratejisini Yanlış Seçmek

Yanlış Kullanım: Önemli mesajları DropOldest ile kaybetmek.

var channel = Channel.CreateBounded<PaymentEvent>(new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.DropOldest // Ödeme event'leri kaybolabilir!
});

İdeal Kullanım: İş gereksinimine göre uygun FullMode seçin.

// Kritik mesajlar için: Wait (backpressure uygula)
var paymentChannel = Channel.CreateBounded<PaymentEvent>(new BoundedChannelOptions(1000)
{
    FullMode = BoundedChannelFullMode.Wait // Producer bekler, mesaj kaybolmaz
});

// Telemetri/metrik gibi kayıp tolere edilebilir veriler için: DropOldest
var metricsChannel = Channel.CreateBounded<MetricEvent>(new BoundedChannelOptions(500)
{
    FullMode = BoundedChannelFullMode.DropOldest // Eski metrikler atılabilir
});