Skip to content

Comments

Cleanup MTP async data consumer#7423

Draft
Youssef1313 wants to merge 2 commits intomainfrom
dev/ygerges/mtp-consumer
Draft

Cleanup MTP async data consumer#7423
Youssef1313 wants to merge 2 commits intomainfrom
dev/ygerges/mtp-consumer

Conversation

@Youssef1313
Copy link
Member

@Youssef1313 Youssef1313 commented Feb 17, 2026

This refactors the logic to avoid the arbitrary Task.Delay calls in DrainDataAsync, and cleans up the logic.

Copilot AI review requested due to automatic review settings February 20, 2026 12:58
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request refactors the asynchronous data consumer logic in the Microsoft Testing Platform to eliminate arbitrary Task.Delay calls during the drain operation. The changes simplify DrainDataAsync by replacing the retry-with-backoff mechanism with a more deterministic approach: completing the current channel, waiting for all messages to be consumed, then creating a fresh channel for any subsequent messages.

Changes:

  • Removed loop detection logic from DrainDataAsync that previously detected and threw exceptions for publisher/consumer loops
  • Changed DrainDataAsync from tracking message counts with delays to a simpler complete-and-restart approach
  • Removed IEnvironment dependency and TESTINGPLATFORM_MESSAGEBUS_DRAINDATA_ATTEMPTS environment variable
  • Changed channel and consume task fields from readonly to mutable to support channel recreation

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
src/Platform/Microsoft.Testing.Platform/Messages/IAsyncConsumerDataProcessor.cs Changed DrainDataAsync return type from Task<long> to Task, removing message count tracking
src/Platform/Microsoft.Testing.Platform/Messages/AsynchronousMessageBus.cs Removed IEnvironment parameter, loop detection logic, and simplified DrainDataAsync to iterate through processors
src/Platform/Microsoft.Testing.Platform/Messages/AsyncConsumerDataProcessor.net.cs Refactored DrainDataAsync to complete channel and restart with fresh channel; removed message counting and error tracking logic; made channel and task fields mutable
src/Platform/Microsoft.Testing.Platform/Messages/AsyncConsumerDataProcessor.netstandard.cs Refactored DrainDataAsync to complete channel and restart with fresh channel; removed message counting and error tracking logic; made channel and task fields mutable
src/Platform/Microsoft.Testing.Platform/Hosts/TestHostControllersTestHost.cs Removed IEnvironment parameter from AsynchronousMessageBus constructor call
src/Platform/Microsoft.Testing.Platform/Hosts/TestHostBuilder.cs Removed IEnvironment parameter from AsynchronousMessageBus constructor call
src/Platform/Microsoft.Testing.Platform/Helpers/EnvironmentVariableConstants.cs Removed TESTINGPLATFORM_MESSAGEBUS_DRAINDATA_ATTEMPTS constant as it's no longer used
test/UnitTests/Microsoft.Testing.Platform.UnitTests/Messages/AsynchronousMessageBusTests.cs Updated test constructor calls to remove IEnvironment parameter and use var for type inference

Comment on lines 37 to 58
public async Task DrainDataAsync_Loop_ShouldFail()
{
using MessageBusProxy proxy = new();
LoopConsumerA consumerA = new(proxy);
ConsumerB consumerB = new(proxy);
AsynchronousMessageBus asynchronousMessageBus = new(
var asynchronousMessageBus = new AsynchronousMessageBus(
[consumerA, consumerB],
new CTRLPlusCCancellationTokenSource(),
new SystemTask(),
new NopLoggerFactory(),
new SystemEnvironment());
new NopLoggerFactory());
await asynchronousMessageBus.InitAsync();
proxy.SetBuiltMessageBus(asynchronousMessageBus);

await proxy.PublishAsync(consumerA, new LoopDataA());

InvalidOperationException ex = await Assert.ThrowsExactlyAsync<InvalidOperationException>(asynchronousMessageBus.DrainDataAsync);
Assert.Contains("Publisher/Consumer loop detected during the drain after", ex.Message);

// Prevent loop to continue
consumerA.StopConsume();
consumerB.StopConsume();
}
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test expects DrainDataAsync to throw an InvalidOperationException when a publisher/consumer loop is detected. However, the refactored DrainDataAsync implementation no longer includes loop detection logic. The new implementation simply completes the channel, waits for the consume task, and restarts with a fresh channel. This test will now fail or hang indefinitely as the loop will continue after each drain cycle.

Copilot uses AI. Check for mistakes.
Comment on lines +72 to 79
public async Task DrainDataAsync()
{
// We go volatile because we race with Interlocked.Increment in PublishAsync
long totalPayloadProcessed = Volatile.Read(ref _totalPayloadProcessed);
long totalPayloadReceived = Volatile.Read(ref _totalPayloadReceived);
const int minDelayTimeMs = 25;
int currentDelayTimeMs = minDelayTimeMs;
while (Interlocked.CompareExchange(ref _totalPayloadReceived, totalPayloadReceived, totalPayloadProcessed) != totalPayloadProcessed)
{
// When we cancel we throw inside ConsumeAsync and we won't drain anymore any data
if (_cancellationToken.IsCancellationRequested)
{
break;
}

await _task.Delay(currentDelayTimeMs).ConfigureAwait(false);
currentDelayTimeMs = Math.Min(currentDelayTimeMs + minDelayTimeMs, 200);

if (_consumerState.Task.IsFaulted)
{
// Rethrow the exception
await _consumerState.Task.ConfigureAwait(false);
}

// Wait for the consumer to complete the current enqueued items
totalPayloadProcessed = Volatile.Read(ref _totalPayloadProcessed);
totalPayloadReceived = Volatile.Read(ref _totalPayloadReceived);
}

// It' possible that we fail and we have consumed the item
if (_consumerState.Task.IsFaulted)
{
// Rethrow the exception
await _consumerState.Task.ConfigureAwait(false);
}
_channel.Complete();
await _consumeTask.ConfigureAwait(false);

return _totalPayloadReceived;
_channel = new();
_consumeTask = _task.Run(ConsumeAsync, _cancellationToken);
}
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Critical race condition: When DrainDataAsync completes the channel (line 74) and before creating a new channel (line 77), any concurrent calls to PublishAsync will throw InvalidOperationException when trying to write to the completed channel. This is problematic because DrainDataAsync is called at multiple synchronization points during normal execution (see CommonTestHost.cs lines 223, 229, 245, 249), not just during shutdown. The old implementation avoided this by not completing the channel during drain. Consider using a lock or other synchronization mechanism to atomically swap the old channel with a new one, or ensure no publishing can occur during drain.

Copilot uses AI. Check for mistakes.
Comment on lines +70 to 77
public async Task DrainDataAsync()
{
// We go volatile because we race with Interlocked.Increment in PublishAsync
long totalPayloadProcessed = Volatile.Read(ref _totalPayloadProcessed);
long totalPayloadReceived = Volatile.Read(ref _totalPayloadReceived);
const int minDelayTimeMs = 25;
int currentDelayTimeMs = minDelayTimeMs;
while (Interlocked.CompareExchange(ref _totalPayloadReceived, totalPayloadReceived, totalPayloadProcessed) != totalPayloadProcessed)
{
// When we cancel we throw inside ConsumeAsync and we won't drain anymore any data
if (_cancellationToken.IsCancellationRequested)
{
break;
}

await _task.Delay(currentDelayTimeMs).ConfigureAwait(false);
currentDelayTimeMs = Math.Min(currentDelayTimeMs + minDelayTimeMs, 200);

if (_consumerState.Task.IsFaulted)
{
// Rethrow the exception
await _consumerState.Task.ConfigureAwait(false);
}

// Wait for the consumer to complete the current enqueued items
totalPayloadProcessed = Volatile.Read(ref _totalPayloadProcessed);
totalPayloadReceived = Volatile.Read(ref _totalPayloadReceived);
}

// It' possible that we fail and we have consumed the item
if (_consumerState.Task.IsFaulted)
{
// Rethrow the exception
await _consumerState.Task.ConfigureAwait(false);
}
_channel.Writer.Complete();
await _consumeTask.ConfigureAwait(false);

return _totalPayloadReceived;
_channel = CreateChannel();
_consumeTask = _task.Run(ConsumeAsync, _cancellationToken);
}
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Critical race condition: When DrainDataAsync completes the channel writer (line 72) and before creating a new channel (line 75), any concurrent calls to PublishAsync will throw ChannelClosedException when trying to write to the completed channel. This is problematic because DrainDataAsync is called at multiple synchronization points during normal execution (see CommonTestHost.cs lines 223, 229, 245, 249), not just during shutdown. The old implementation avoided this by not completing the channel during drain. Consider using a lock or other synchronization mechanism to atomically swap the old channel with a new one, or ensure no publishing can occur during drain.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant