Conversation
Pull request overview
This pull request refactors the asynchronous data consumer logic in the Microsoft Testing Platform to eliminate arbitrary Task.Delay calls during the drain operation. The changes simplify DrainDataAsync by replacing the retry-with-backoff mechanism with a more deterministic approach: completing the current channel, waiting for all messages to be consumed, then creating a fresh channel for any subsequent messages.
Changes:
- Removed loop detection logic from DrainDataAsync that previously detected and threw exceptions for publisher/consumer loops
- Changed DrainDataAsync from tracking message counts with delays to a simpler complete-and-restart approach (a simplified sketch of this pattern follows the list)
- Removed the IEnvironment dependency and the TESTINGPLATFORM_MESSAGEBUS_DRAINDATA_ATTEMPTS environment variable
- Changed channel and consume task fields from readonly to mutable to support channel recreation
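
To make the new approach concrete, here is a minimal sketch of the complete-and-restart drain pattern, written against System.Threading.Channels. The DrainableProcessor type and all of its members are illustrative assumptions for this review, not the platform's actual AsyncConsumerDataProcessor.

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical, simplified processor illustrating the complete-and-restart drain
// approach; names are illustrative and not the platform's actual types.
internal sealed class DrainableProcessor
{
    private Channel<object> _channel = Channel.CreateUnbounded<object>();
    private Task _consumeTask;

    public DrainableProcessor() => _consumeTask = Task.Run(ConsumeAsync);

    public async Task PublishAsync(object data) =>
        await _channel.Writer.WriteAsync(data).ConfigureAwait(false);

    private async Task ConsumeAsync()
    {
        // Runs until the current channel's writer is completed.
        await foreach (object data in _channel.Reader.ReadAllAsync().ConfigureAwait(false))
        {
            // Process data...
        }
    }

    public async Task DrainDataAsync()
    {
        // Complete the current channel so the consumer runs to the end of the
        // pending items, then wait for it to finish.
        _channel.Writer.Complete();
        await _consumeTask.ConfigureAwait(false);

        // Start over with a fresh channel for messages published afterwards.
        _channel = Channel.CreateUnbounded<object>();
        _consumeTask = Task.Run(ConsumeAsync);
    }
}
```

In this sketch, a publish issued after a drain simply lands in the fresh channel and is picked up by the restarted consumer.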
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| src/Platform/Microsoft.Testing.Platform/Messages/IAsyncConsumerDataProcessor.cs | Changed DrainDataAsync return type from Task<long> to Task, removing message count tracking |
| src/Platform/Microsoft.Testing.Platform/Messages/AsynchronousMessageBus.cs | Removed IEnvironment parameter, loop detection logic, and simplified DrainDataAsync to iterate through processors |
| src/Platform/Microsoft.Testing.Platform/Messages/AsyncConsumerDataProcessor.net.cs | Refactored DrainDataAsync to complete channel and restart with fresh channel; removed message counting and error tracking logic; made channel and task fields mutable |
| src/Platform/Microsoft.Testing.Platform/Messages/AsyncConsumerDataProcessor.netstandard.cs | Refactored DrainDataAsync to complete channel and restart with fresh channel; removed message counting and error tracking logic; made channel and task fields mutable |
| src/Platform/Microsoft.Testing.Platform/Hosts/TestHostControllersTestHost.cs | Removed IEnvironment parameter from AsynchronousMessageBus constructor call |
| src/Platform/Microsoft.Testing.Platform/Hosts/TestHostBuilder.cs | Removed IEnvironment parameter from AsynchronousMessageBus constructor call |
| src/Platform/Microsoft.Testing.Platform/Helpers/EnvironmentVariableConstants.cs | Removed TESTINGPLATFORM_MESSAGEBUS_DRAINDATA_ATTEMPTS constant as it's no longer used |
| test/UnitTests/Microsoft.Testing.Platform.UnitTests/Messages/AsynchronousMessageBusTests.cs | Updated test constructor calls to remove IEnvironment parameter and use var for type inference |
```diff
 public async Task DrainDataAsync_Loop_ShouldFail()
 {
     using MessageBusProxy proxy = new();
     LoopConsumerA consumerA = new(proxy);
     ConsumerB consumerB = new(proxy);
-    AsynchronousMessageBus asynchronousMessageBus = new(
+    var asynchronousMessageBus = new AsynchronousMessageBus(
         [consumerA, consumerB],
         new CTRLPlusCCancellationTokenSource(),
         new SystemTask(),
-        new NopLoggerFactory(),
-        new SystemEnvironment());
+        new NopLoggerFactory());
     await asynchronousMessageBus.InitAsync();
     proxy.SetBuiltMessageBus(asynchronousMessageBus);

     await proxy.PublishAsync(consumerA, new LoopDataA());

     InvalidOperationException ex = await Assert.ThrowsExactlyAsync<InvalidOperationException>(asynchronousMessageBus.DrainDataAsync);
     Assert.Contains("Publisher/Consumer loop detected during the drain after", ex.Message);

     // Prevent loop to continue
     consumerA.StopConsume();
     consumerB.StopConsume();
 }
```
This test expects DrainDataAsync to throw an InvalidOperationException when a publisher/consumer loop is detected. However, the refactored DrainDataAsync implementation no longer includes loop detection logic. The new implementation simply completes the channel, waits for the consume task, and restarts with a fresh channel. This test will now fail or hang indefinitely as the loop will continue after each drain cycle.
```diff
 public async Task DrainDataAsync()
 {
-    // We go volatile because we race with Interlocked.Increment in PublishAsync
-    long totalPayloadProcessed = Volatile.Read(ref _totalPayloadProcessed);
-    long totalPayloadReceived = Volatile.Read(ref _totalPayloadReceived);
-    const int minDelayTimeMs = 25;
-    int currentDelayTimeMs = minDelayTimeMs;
-    while (Interlocked.CompareExchange(ref _totalPayloadReceived, totalPayloadReceived, totalPayloadProcessed) != totalPayloadProcessed)
-    {
-        // When we cancel we throw inside ConsumeAsync and we won't drain anymore any data
-        if (_cancellationToken.IsCancellationRequested)
-        {
-            break;
-        }
-
-        await _task.Delay(currentDelayTimeMs).ConfigureAwait(false);
-        currentDelayTimeMs = Math.Min(currentDelayTimeMs + minDelayTimeMs, 200);
-
-        if (_consumerState.Task.IsFaulted)
-        {
-            // Rethrow the exception
-            await _consumerState.Task.ConfigureAwait(false);
-        }
-
-        // Wait for the consumer to complete the current enqueued items
-        totalPayloadProcessed = Volatile.Read(ref _totalPayloadProcessed);
-        totalPayloadReceived = Volatile.Read(ref _totalPayloadReceived);
-    }
-
-    // It' possible that we fail and we have consumed the item
-    if (_consumerState.Task.IsFaulted)
-    {
-        // Rethrow the exception
-        await _consumerState.Task.ConfigureAwait(false);
-    }
-
-    return _totalPayloadReceived;
+    _channel.Complete();
+    await _consumeTask.ConfigureAwait(false);
+
+    _channel = new();
+    _consumeTask = _task.Run(ConsumeAsync, _cancellationToken);
 }
```
Critical race condition: When DrainDataAsync completes the channel (line 74) and before creating a new channel (line 77), any concurrent calls to PublishAsync will throw InvalidOperationException when trying to write to the completed channel. This is problematic because DrainDataAsync is called at multiple synchronization points during normal execution (see CommonTestHost.cs lines 223, 229, 245, 249), not just during shutdown. The old implementation avoided this by not completing the channel during drain. Consider using a lock or other synchronization mechanism to atomically swap the old channel with a new one, or ensure no publishing can occur during drain.
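
To make the flagged window concrete, here is a hypothetical repro against the simplified DrainableProcessor sketch from the overview above, not the real processor. Since that sketch is built on System.Threading.Channels, the failure surfaces as ChannelClosedException; per this comment, the custom channel in the .net.cs variant throws InvalidOperationException instead.

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical repro of the window flagged above, using the simplified
// DrainableProcessor sketch from the overview (not the platform's real types).
var processor = new DrainableProcessor();

Task drain = processor.DrainDataAsync();
Task publish = Task.Run(async () =>
{
    try
    {
        // If this publish lands after Writer.Complete() but before the new
        // channel is installed, the write faults.
        await processor.PublishAsync(new object());
    }
    catch (ChannelClosedException)
    {
        // The drain completed the old writer before swapping in a fresh channel.
    }
});

await Task.WhenAll(drain, publish);
```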
```diff
 public async Task DrainDataAsync()
 {
-    // We go volatile because we race with Interlocked.Increment in PublishAsync
-    long totalPayloadProcessed = Volatile.Read(ref _totalPayloadProcessed);
-    long totalPayloadReceived = Volatile.Read(ref _totalPayloadReceived);
-    const int minDelayTimeMs = 25;
-    int currentDelayTimeMs = minDelayTimeMs;
-    while (Interlocked.CompareExchange(ref _totalPayloadReceived, totalPayloadReceived, totalPayloadProcessed) != totalPayloadProcessed)
-    {
-        // When we cancel we throw inside ConsumeAsync and we won't drain anymore any data
-        if (_cancellationToken.IsCancellationRequested)
-        {
-            break;
-        }
-
-        await _task.Delay(currentDelayTimeMs).ConfigureAwait(false);
-        currentDelayTimeMs = Math.Min(currentDelayTimeMs + minDelayTimeMs, 200);
-
-        if (_consumerState.Task.IsFaulted)
-        {
-            // Rethrow the exception
-            await _consumerState.Task.ConfigureAwait(false);
-        }
-
-        // Wait for the consumer to complete the current enqueued items
-        totalPayloadProcessed = Volatile.Read(ref _totalPayloadProcessed);
-        totalPayloadReceived = Volatile.Read(ref _totalPayloadReceived);
-    }
-
-    // It' possible that we fail and we have consumed the item
-    if (_consumerState.Task.IsFaulted)
-    {
-        // Rethrow the exception
-        await _consumerState.Task.ConfigureAwait(false);
-    }
-
-    return _totalPayloadReceived;
+    _channel.Writer.Complete();
+    await _consumeTask.ConfigureAwait(false);
+
+    _channel = CreateChannel();
+    _consumeTask = _task.Run(ConsumeAsync, _cancellationToken);
 }
```
Critical race condition: When DrainDataAsync completes the channel writer (line 72) and before creating a new channel (line 75), any concurrent calls to PublishAsync will throw ChannelClosedException when trying to write to the completed channel. This is problematic because DrainDataAsync is called at multiple synchronization points during normal execution (see CommonTestHost.cs lines 223, 229, 245, 249), not just during shutdown. The old implementation avoided this by not completing the channel during drain. Consider using a lock or other synchronization mechanism to atomically swap the old channel with a new one, or ensure no publishing can occur during drain.
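
One possible shape of the mitigation suggested above, again sketched against the simplified DrainableProcessor rather than the platform's real code: serialize publishing and draining with a SemaphoreSlim so a publish can never observe a completed writer. All names here are illustrative assumptions.

```csharp
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical variant of the earlier DrainableProcessor sketch that serializes
// publishing and draining, so a publish can never hit a completed writer.
internal sealed class GatedDrainableProcessor
{
    private readonly SemaphoreSlim _gate = new(1, 1);
    private Channel<object> _channel = Channel.CreateUnbounded<object>();
    private Task _consumeTask;

    public GatedDrainableProcessor() => _consumeTask = Task.Run(ConsumeAsync);

    public async Task PublishAsync(object data)
    {
        await _gate.WaitAsync().ConfigureAwait(false);
        try
        {
            // Unbounded channel: the write completes synchronously while the gate is held.
            await _channel.Writer.WriteAsync(data).ConfigureAwait(false);
        }
        finally
        {
            _gate.Release();
        }
    }

    public async Task DrainDataAsync()
    {
        await _gate.WaitAsync().ConfigureAwait(false);
        try
        {
            // No publish can run here, so completing the writer is safe.
            _channel.Writer.Complete();
            await _consumeTask.ConfigureAwait(false);

            _channel = Channel.CreateUnbounded<object>();
            _consumeTask = Task.Run(ConsumeAsync);
        }
        finally
        {
            _gate.Release();
        }
    }

    private async Task ConsumeAsync()
    {
        // Runs until the current channel's writer is completed.
        await foreach (object data in _channel.Reader.ReadAllAsync().ConfigureAwait(false))
        {
            // Process data...
        }
    }
}
```

The trade-off is that publishes block for the duration of a drain, and a consumer that publishes back to the bus while it is being drained would now deadlock rather than throw, which is essentially the scenario the removed loop-detection test covered.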
This refactors DrainDataAsync to avoid the arbitrary Task.Delay calls and cleans up the surrounding logic.