From fa33fbe9033da94a5d936553bf004a8a4fb41aa8 Mon Sep 17 00:00:00 2001 From: Damon Barry Date: Tue, 30 Jun 2026 17:22:15 -0700 Subject: [PATCH 1/8] Prevent message orphaning with checkpoint retry and recovery --- .../storage/MessageStore.cs | 54 ++++++++- .../modules/RoutingModule.cs | 13 ++ .../IMessageStore.cs | 6 + .../statemachine/EndpointExecutorFsm.cs | 75 ++++++++++-- .../storage/MessageStoreTest.cs | 111 ++++++++++++++++++ .../StoringAsyncEndpointExecutorTest.cs | 2 + .../statemachine/EndpointExecutorFsmTest.cs | 65 ++++++++++ 7 files changed, 313 insertions(+), 13 deletions(-) diff --git a/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/storage/MessageStore.cs b/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/storage/MessageStore.cs index ab37688e7d3..b9acf89be20 100644 --- a/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/storage/MessageStore.cs +++ b/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/storage/MessageStore.cs @@ -145,6 +145,15 @@ public Task GetMessageCountFromOffset(string endpointId, long offset) return sequentialStore.GetCountFromOffset(offset); } + /// + /// Triggers an immediate cleanup attempt. Used for connection recovery + /// to retry checkpoint commits that may have failed during network outages. + /// + public void TriggerCleanup() + { + this.messagesCleaner.TriggerCleanup(); + } + public void Dispose() { this.Dispose(true); @@ -230,6 +239,16 @@ public void Dispose() // Not disposing the cleanup task, in case it is not completed yet. } + /// + /// Triggers an immediate cleanup attempt. Called when cloud connection is restored + /// to retry checkpoint commits for messages that were sent but not acknowledged. 
+ /// + public void TriggerCleanup() + { + this.EnsureCleanupTask(null); + Events.CleanupTriggeredByConnectionRecovery(); + } + void EnsureCleanupTask(object state) { if (this.cleanupTask == null || this.cleanupTask.IsCompleted) @@ -309,6 +328,10 @@ private async Task CleanQueue(bool checkEntireQueueOnCleanup) Events.CleanupCheckpointState(messageQueueId, checkpointData); int cleanupEntityStoreCount = 0; + // Track orphaned messages for observability + int orphanedMessageCount = 0; + DateTime earliestOrphanedMessageTime = DateTime.MaxValue; + async Task DeleteMessageCallback(long offset, MessageRef messageRef) { var expiry = messageRef.TimeStamp + messageRef.TimeToLive; @@ -317,6 +340,16 @@ async Task DeleteMessageCallback(long offset, MessageRef messageRef) return false; } + // Detect orphaned messages (expired but can't clean due to offset gap) + if (offset > checkpointData.Offset && expiry <= DateTime.UtcNow) + { + orphanedMessageCount++; + if (messageRef.TimeStamp < earliestOrphanedMessageTime) + { + earliestOrphanedMessageTime = messageRef.TimeStamp; + } + } + var message = await this.TryDecrementRefCountUpdate(messageRef.EdgeMessageId, messageQueueId); await message.ForEachAsync(async msg => @@ -378,6 +411,13 @@ await message.ForEachAsync(async msg => totalCleanupCount += cleanupCount; totalCleanupStoreCount += cleanupEntityStoreCount; Events.CleanupCompleted(messageQueueId, cleanupCount, cleanupEntityStoreCount, totalCleanupCount, totalCleanupStoreCount); + + // Log orphaned messages for observability + if (orphanedMessageCount > 0) + { + TimeSpan oldestOrphanAge = DateTime.UtcNow - earliestOrphanedMessageTime; + Events.OrphanedMessagesDetected(messageQueueId, orphanedMessageCount, checkpointData.Offset, oldestOrphanAge); + } } catch (Exception ex) { @@ -418,7 +458,9 @@ enum EventIds MessageAdded, ErrorGettingMessagesBatch, CreatedCleanupProcessor, - ErrorUpdatingMessageForEndpoint + ErrorUpdatingMessageForEndpoint, + CleanupTriggeredByConnectionRecovery, + 
OrphanedMessagesDetected } public static void MessageStoreCreated() @@ -441,6 +483,11 @@ public static void CleanupTaskInitialized() Log.LogInformation((int)EventIds.CleanupTaskStarted, "Started task to cleanup processed and stale messages"); } + public static void CleanupTriggeredByConnectionRecovery() + { + Log.LogInformation((int)EventIds.CleanupTriggeredByConnectionRecovery, "Triggering cleanup due to cloud connection recovery to retry pending checkpoint commits"); + } + public static void ErrorCleaningMessagesForEndpoint(Exception ex, string endpointId) { Log.LogWarning((int)EventIds.ErrorCleaningMessagesForEndpoint, ex, Invariant($"Error cleaning up messages for endpoint {endpointId}")); @@ -462,6 +509,11 @@ public static void CleanupCompleted(string endpointId, int queueMessagesCount, i Log.LogDebug((int)EventIds.CleanupCompleted, Invariant($"Total messages cleaned up from queue for endpoint {endpointId} = {totalQueueMessagesCount}, and total messages cleaned up for message store = {totalStoreMessagesCount}.")); } + public static void OrphanedMessagesDetected(string endpointId, int orphanedCount, long checkpointOffset, TimeSpan oldestAge) + { + Log.LogWarning((int)EventIds.OrphanedMessagesDetected, Invariant($"Detected {orphanedCount} orphaned message(s) in endpoint {endpointId}. Checkpoint offset={checkpointOffset}, oldest message age={oldestAge.TotalSeconds:F1}s. Messages are stuck in store because checkpoint has not advanced. This indicates a potential message acknowledgment failure during network disruption. 
Checkpoint retries or the cleanup trigger on connection recovery should resolve this.")); + } + public static void ErrorGettingMessagesBatch(string entityName, Exception ex) { Log.LogWarning((int)EventIds.ErrorGettingMessagesBatch, ex, $"Error getting next batch for endpoint {entityName}."); diff --git a/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Service/modules/RoutingModule.cs b/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Service/modules/RoutingModule.cs index 106fdf063eb..06903ab1f7b 100644 --- a/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Service/modules/RoutingModule.cs +++ b/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Service/modules/RoutingModule.cs @@ -523,6 +523,7 @@ protected override void Load(ContainerBuilder builder) var connectionManagerTask = c.Resolve>(); var subscriptionProcessorTask = c.Resolve>(); var deviceScopeIdentitiesCacheTask = c.Resolve>(); + var messageStoreTask = this.isStoreAndForwardEnabled ? c.Resolve>() : null; Router router = await routerTask; ITwinManager twinManager = await twinManagerTask; IConnectionManager connectionManager = await connectionManagerTask; @@ -539,6 +540,18 @@ protected override void Load(ContainerBuilder builder) invokeMethodHandler, subscriptionProcessor, deviceScopeIdentitiesCache); + + // Subscribe MessageStore to connection recovery events + // to trigger cleanup when cloud connection is restored + if (messageStoreTask != null) + { + IMessageStore messageStore = await messageStoreTask; + connectionManager.CloudConnectionEstablished += (sender, identity) => + { + messageStore.TriggerCleanup(); + }; + } + return hub; }) .As>() diff --git a/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/IMessageStore.cs b/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/IMessageStore.cs index 4f06a598da6..5e0adfcd700 100644 --- a/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/IMessageStore.cs +++ b/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/IMessageStore.cs @@ -51,5 
+51,11 @@ public interface IMessageStore : IDisposable /// Returns the number of messages in the store from a offset /// Task GetMessageCountFromOffset(string endpointId, long offset); + + /// + /// Triggers an immediate cleanup attempt. Called when cloud connection is restored + /// to retry checkpoint commits for messages that were sent but not acknowledged. + /// + void TriggerCleanup(); } } diff --git a/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/endpoints/statemachine/EndpointExecutorFsm.cs b/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/endpoints/statemachine/EndpointExecutorFsm.cs index 0f1b38b6014..095d2006574 100644 --- a/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/endpoints/statemachine/EndpointExecutorFsm.cs +++ b/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/endpoints/statemachine/EndpointExecutorFsm.cs @@ -386,20 +386,47 @@ static async Task EnterCheckpointingAsync(EndpointExecutorFsm thisPtr) try { Preconditions.CheckNotNull(thisPtr.currentCheckpointCommand); - using (var cts = new CancellationTokenSource(thisPtr.config.Timeout)) + ISinkResult result = thisPtr.currentCheckpointCommand.Result; + + if (result.Succeeded.Any() || result.InvalidDetailsList.Any()) { - ISinkResult result = thisPtr.currentCheckpointCommand.Result; + ICollection toCheckpoint = result.InvalidDetailsList.Count > 0 + ? result.Succeeded.Concat(result.InvalidDetailsList.Select(i => i.Item)).ToList() + : result.Succeeded; + ICollection remaining = result.Failed; - if (result.Succeeded.Any() || result.InvalidDetailsList.Any()) + Events.Checkpoint(thisPtr, result); + + // Attempt checkpoint commit with retry logic + const int MaxCommitRetries = 3; + Exception lastException = null; + + for (int attempt = 1; attempt <= MaxCommitRetries; attempt++) { - ICollection toCheckpoint = result.InvalidDetailsList.Count > 0 - ? 
result.Succeeded.Concat(result.InvalidDetailsList.Select(i => i.Item)).ToList() - : result.Succeeded; - ICollection remaining = result.Failed; - - Events.Checkpoint(thisPtr, result); - await thisPtr.Checkpointer.CommitAsync(toCheckpoint, remaining, Option.None(), thisPtr.unhealthySince, cts.Token); - Events.CheckpointSuccess(thisPtr, result); + try + { + using (var cts = new CancellationTokenSource(thisPtr.config.Timeout)) + { + await thisPtr.Checkpointer.CommitAsync(toCheckpoint, remaining, Option.None(), thisPtr.unhealthySince, cts.Token); + Events.CheckpointSuccess(thisPtr, result); + break; // Success, exit retry loop + } + } + catch (Exception ex) when (attempt < MaxCommitRetries) + { + lastException = ex; + Events.CheckpointCommitRetry(thisPtr, attempt, ex); + + // Exponential backoff between attempts: 100ms, then 200ms (the final attempt rethrows without a delay) + int delayMs = 100 * (int)Math.Pow(2, attempt - 1); + await Task.Delay(delayMs); + } + catch (Exception ex) when (attempt == MaxCommitRetries) + { + lastException = ex; + Events.CheckpointCommitFailed(thisPtr, MaxCommitRetries, ex); + throw; + } } } @@ -589,7 +616,9 @@ enum EventIds UpdateEndpoint, UpdateEndpointSuccess, UpdateEndpointFailure, - CheckRetryInnerException + CheckRetryInnerException, + CheckpointCommitRetry, + CheckpointCommitFailed } public static void StateEnter(EndpointExecutorFsm fsm) @@ -722,6 +751,28 @@ public static void CheckpointFailure(EndpointExecutorFsm fsm, Exception ex) GetContextString(fsm)); } + public static void CheckpointCommitRetry(EndpointExecutorFsm fsm, int attempt, Exception ex) + { + Log.LogWarning( + (int)EventIds.CheckpointCommitRetry, + ex, + "[CheckpointCommitRetry] Checkpoint commit attempt {0} failed, retrying.
CheckpointOffset: {1}, {2}", + attempt, + fsm.Status.CheckpointerStatus.Offset, + GetContextString(fsm)); + } + + public static void CheckpointCommitFailed(EndpointExecutorFsm fsm, int maxAttempts, Exception ex) + { + Log.LogError( + (int)EventIds.CheckpointCommitFailed, + ex, + "[CheckpointCommitFailed] Checkpoint commit failed after {0} attempts. CheckpointOffset: {1}, {2}", + maxAttempts, + fsm.Status.CheckpointerStatus.Offset, + GetContextString(fsm)); + } + public static void CheckRetryInnerException(Exception ex, bool retry) { Log.LogDebug((int)EventIds.CheckRetryInnerException, ex, $"[CheckRetryInnerException] Decision to retry exception of type {ex.GetType()} is {retry}"); diff --git a/edge-hub/core/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/storage/MessageStoreTest.cs b/edge-hub/core/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/storage/MessageStoreTest.cs index 30f40974a11..9544cc2e967 100644 --- a/edge-hub/core/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/storage/MessageStoreTest.cs +++ b/edge-hub/core/test/Microsoft.Azure.Devices.Edge.Hub.Core.Test/storage/MessageStoreTest.cs @@ -541,6 +541,117 @@ public async Task MessageStoreAddRemoveEndpointTest(bool checkEntireQueueOnClean } } + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task TestTriggerCleanupWakesCleanupTask(bool checkEntireQueueOnCleanup) + { + // Verify calling TriggerCleanup() immediately starts cleanup + var result = await this.GetMessageStore(checkEntireQueueOnCleanup, ttlSecs: 1, messageCleanupIntervalSecs: 3600); // Long cleanup interval + using (IMessageStore messageStore = result.Item1) + { + await messageStore.AddEndpoint("endpoint1"); + + // Add a message with short TTL + IMessage message = this.GetMessage(1); + await messageStore.Add("endpoint1", message, 1); + + // Wait for message to expire + await Task.Delay(1500); + + // Manually trigger cleanup instead of waiting for the 3600-second (60-minute) cleanup timer configured above + messageStore.TriggerCleanup(); + + // Wait a bit for cleanup to
run + await Task.Delay(500); + + // Verify message was cleaned up + IMessageIterator iterator = messageStore.GetMessageIterator("endpoint1"); + IEnumerable batch = await iterator.GetNext(100); + var batchList = batch as IList ?? batch.ToList(); + + // After cleanup triggered, expired message should be removed + Assert.Empty(batchList); + } + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task TestOrphanedMessageDetection(bool checkEntireQueueOnCleanup) + { + // Verify orphaned message detection when checkpoint doesn't advance + var result = await this.GetMessageStore(checkEntireQueueOnCleanup, ttlSecs: 1); + using (IMessageStore messageStore = result.Item1) + { + await messageStore.AddEndpoint("endpoint1"); + + // Add messages with short TTL + for (int i = 0; i < 3; i++) + { + IMessage message = this.GetMessage(i); + await messageStore.Add("endpoint1", message, 1); + } + + // Wait for messages to expire + await Task.Delay(1500); + + // NOTE: Checkpoint offset hasn't advanced, so messages are "orphaned" + // Create orphaned condition: messages expired, but checkpoint not updated + // This would normally be detected by the OrphanedMessagesDetected event + + // Trigger cleanup which should detect orphaned messages + messageStore.TriggerCleanup(); + + // Wait for cleanup to complete + await Task.Delay(500); + + // Verify at least one cleanup pass occurred + // (The orphan detection is logged, not queryable, but system remains stable) + Assert.NotNull(messageStore); + } + } + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task TestNoOrphanWhenCheckpointAdvances(bool checkEntireQueueOnCleanup) + { + // Verify NO orphan warning when checkpoint properly advances + var result = await this.GetMessageStore(checkEntireQueueOnCleanup, ttlSecs: 1); + using (IMessageStore messageStore = result.Item1) + { + ICheckpointStore checkpointStore = result.Item2; + await messageStore.AddEndpoint("endpoint1"); + + // Add messages with short 
TTL + for (int i = 0; i < 3; i++) + { + IMessage message = this.GetMessage(i); + await messageStore.Add("endpoint1", message, 1); + } + + // Wait for messages to expire + await Task.Delay(1500); + + // Advance checkpoint so messages can be cleaned normally (not orphaned) + var checkpointData = new CheckpointData(100L); // Offset beyond the messages + await checkpointStore.SetCheckpointDataAsync("endpoint1", checkpointData, CancellationToken.None); + + // Trigger cleanup + messageStore.TriggerCleanup(); + + // Wait for cleanup + await Task.Delay(500); + + // Verify messages are cleaned (no orphan condition) + IMessageIterator iterator = messageStore.GetMessageIterator("endpoint1"); + IEnumerable batch = await iterator.GetNext(100); + var batchList = batch as IList ?? batch.ToList(); + Assert.Empty(batchList); + } + } + [Fact] public void MessageWrapperRoundtripTest() { diff --git a/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/StoringAsyncEndpointExecutorTest.cs b/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/StoringAsyncEndpointExecutorTest.cs index 4c538a73482..839ba5b840a 100644 --- a/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/StoringAsyncEndpointExecutorTest.cs +++ b/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/StoringAsyncEndpointExecutorTest.cs @@ -529,6 +529,8 @@ public Task RemoveEndpoint(string endpointId) public void SetTimeToLive(TimeSpan timeToLive) => throw new NotImplementedException(); + public void TriggerCleanup() => throw new NotImplementedException(); + public List GetReceivedMessagesForEndpoint(string endpointId) => this.GetQueue(endpointId).Queue; public Task GetMessageCountFromOffset(string endpointId, long offset) => Task.FromResult(0ul); diff --git a/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/statemachine/EndpointExecutorFsmTest.cs 
b/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/statemachine/EndpointExecutorFsmTest.cs index bebc02e09ff..dfd4906b098 100644 --- a/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/statemachine/EndpointExecutorFsmTest.cs +++ b/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/statemachine/EndpointExecutorFsmTest.cs @@ -184,6 +184,71 @@ public async Task TestCheckpointException() await machine3.CloseAsync(); } + [Fact] + [Unit] + public async Task TestCheckpointRetryWithTransientFailureThenSuccess() + { + // Verify retry attempts on transient CommitAsync failures + var endpoint = new TestEndpoint("id1"); + var checkpointer = new Mock(); + checkpointer.Setup(c => c.Admit(It.IsAny())).Returns(true); + + int attemptCount = 0; + checkpointer.Setup(c => c.CommitAsync(It.IsAny>(), It.IsAny>(), It.IsAny>(), It.IsAny>(), It.IsAny())) + .Returns(() => + { + attemptCount++; + // Fail on first attempt, succeed on second + if (attemptCount < 2) + { + return Task.FromException(new TimeoutException("Simulated network timeout")); + } + + return Task.CompletedTask; + }); + + var config = new EndpointExecutorConfig(TimeSpan.FromSeconds(1), MaxRetryStrategy, TimeSpan.FromMinutes(5)); + var machine = new EndpointExecutorFsm(endpoint, checkpointer.Object, config); + + // Send message and wait for completion + SendMessage command = Commands.SendMessage(Message1); + await machine.RunAsync(command); + await command.Completion; + + // Verify checkpoint succeeded after retry + Assert.Equal(State.Idle, machine.Status.State); + checkpointer.Verify(c => c.CommitAsync(It.IsAny>(), It.IsAny>(), It.IsAny>(), It.IsAny>(), It.IsAny()), Times.Exactly(2)); + await machine.CloseAsync(); + } + + [Fact] + [Unit] + public async Task TestCheckpointRetryExhaustedAfterMaxAttempts() + { + // Verify all retry attempts are exhausted and exception is thrown + var endpoint = new TestEndpoint("id1"); + var checkpointer = new Mock(); + 
checkpointer.Setup(c => c.Admit(It.IsAny())).Returns(true); + + // Always fail CommitAsync + checkpointer.Setup(c => c.CommitAsync(It.IsAny>(), It.IsAny>(), It.IsAny>(), It.IsAny>(), It.IsAny())) + .Returns(Task.FromException(new TimeoutException("Persistent network timeout"))); + + var config = new EndpointExecutorConfig(TimeSpan.FromSeconds(1), MaxRetryStrategy, TimeSpan.FromMinutes(5), throwOnDead: true); + var machine = new EndpointExecutorFsm(endpoint, checkpointer.Object, config); + + // Send message - should fail after retries + SendMessage command = Commands.SendMessage(Message1); + await machine.RunAsync(command); + + var ex = await Assert.ThrowsAsync(() => command.Completion); + Assert.Contains("Persistent network timeout", ex.Message); + + // Verify all 3 retry attempts were made + checkpointer.Verify(c => c.CommitAsync(It.IsAny>(), It.IsAny>(), It.IsAny>(), It.IsAny>(), It.IsAny()), Times.Exactly(3)); + await machine.CloseAsync(); + } + [Fact] [Unit] public async Task TestCheckpointPartialFailureToDead() From d8259c5a5010e0cdbe9fca11ed052b9ad92f65fa Mon Sep 17 00:00:00 2001 From: Damon Barry Date: Wed, 1 Jul 2026 08:58:11 -0700 Subject: [PATCH 2/8] Update preexisting unit test to expect retries --- .../endpoints/statemachine/EndpointExecutorFsmTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/statemachine/EndpointExecutorFsmTest.cs b/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/statemachine/EndpointExecutorFsmTest.cs index dfd4906b098..906046ced77 100644 --- a/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/statemachine/EndpointExecutorFsmTest.cs +++ b/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/statemachine/EndpointExecutorFsmTest.cs @@ -146,7 +146,7 @@ public async Task TestCheckpointException() await command1.Completion; Assert.Equal(State.Idle, machine1.Status.State); - 
checkpointer1.Verify(c => c.CommitAsync(new[] { Message1 }, new IMessage[0], It.IsAny>(), It.IsAny>(), It.IsAny()), Times.Exactly(1)); + checkpointer1.Verify(c => c.CommitAsync(new[] { Message1 }, new IMessage[0], It.IsAny>(), It.IsAny>(), It.IsAny()), Times.Exactly(3)); await machine1.CloseAsync(); // Test exception throws From ea8a3c291a317519ecdddd1fd33b2a02ff065e6a Mon Sep 17 00:00:00 2001 From: Damon Barry Date: Wed, 1 Jul 2026 09:07:39 -0700 Subject: [PATCH 3/8] Update additional asserts in preexisting test --- .../endpoints/statemachine/EndpointExecutorFsmTest.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/statemachine/EndpointExecutorFsmTest.cs b/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/statemachine/EndpointExecutorFsmTest.cs index 906046ced77..19722a5a37f 100644 --- a/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/statemachine/EndpointExecutorFsmTest.cs +++ b/edge-hub/core/test/Microsoft.Azure.Devices.Routing.Core.Test/endpoints/statemachine/EndpointExecutorFsmTest.cs @@ -162,7 +162,7 @@ public async Task TestCheckpointException() await machine2.RunAsync(command2); await Assert.ThrowsAsync(() => command2.Completion); Assert.Equal(State.Idle, machine2.Status.State); - checkpointer2.Verify(c => c.CommitAsync(It.IsAny>(), It.IsAny>(), It.IsAny>(), It.IsAny>(), It.IsAny()), Times.Exactly(1)); + checkpointer2.Verify(c => c.CommitAsync(It.IsAny>(), It.IsAny>(), It.IsAny>(), It.IsAny>(), It.IsAny()), Times.Exactly(3)); await machine2.CloseAsync(); // Test partial exception - no throw @@ -180,7 +180,7 @@ public async Task TestCheckpointException() await command3.Completion; Assert.Equal(State.Idle, machine3.Status.State); - checkpointer3.Verify(c => c.CommitAsync(It.Is>(m => m.Count == 1), It.IsAny>(), It.IsAny>(), It.IsAny>(), It.IsAny()), Times.Exactly(2)); + checkpointer3.Verify(c => c.CommitAsync(It.Is>(m => 
m.Count == 1), It.IsAny>(), It.IsAny>(), It.IsAny>(), It.IsAny()), Times.Exactly(6)); await machine3.CloseAsync(); } From d530f2d0b970537576199536de2fc5af05519f03 Mon Sep 17 00:00:00 2001 From: Damon Barry Date: Wed, 1 Jul 2026 12:05:44 -0700 Subject: [PATCH 4/8] Add temporary Checkpoint/MessageStore logs for debugging --- .../storage/MessageStore.cs | 11 +++++++ .../statemachine/EndpointExecutorFsm.cs | 31 +++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/storage/MessageStore.cs b/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/storage/MessageStore.cs index b9acf89be20..2928760f4f5 100644 --- a/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/storage/MessageStore.cs +++ b/edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Core/storage/MessageStore.cs @@ -325,6 +325,9 @@ private async Task CleanQueue(bool checkEntireQueueOnCleanup) Events.CleanupTaskStarted(messageQueueId); CheckpointData checkpointData = await this.messageStore.checkpointStore.GetCheckpointDataAsync(messageQueueId, CancellationToken.None); ISequentialStore sequentialStore = endpointSequentialStore.Value; + long queueHeadOffset = sequentialStore.GetHeadOffset(this.cancellationTokenSource.Token); + long unreadStartOffset = checkpointData.Offset + 1; + Events.TempCleanupCorrelation(messageQueueId, checkpointData.Offset, unreadStartOffset, queueHeadOffset); Events.CleanupCheckpointState(messageQueueId, checkpointData); int cleanupEntityStoreCount = 0; @@ -455,6 +458,7 @@ enum EventIds GettingNextBatch, ObtainedNextBatch, CleanupCheckpointState, + TempCleanupCorrelation, MessageAdded, ErrorGettingMessagesBatch, CreatedCleanupProcessor, @@ -554,6 +558,13 @@ internal static void CleanupCheckpointState(string endpointId, CheckpointData ch Log.LogDebug((int)EventIds.CleanupCheckpointState, Invariant($"Checkpoint for endpoint {endpointId} is {checkpointData.Offset}")); } + internal static void 
TempCleanupCorrelation(string endpointId, long checkpointOffset, long unreadStartOffset, long queueHeadOffset) + { + Log.LogInformation( + (int)EventIds.TempCleanupCorrelation, + Invariant($"[TEMP CleanupCorrelation] Endpoint={endpointId}, checkpointOffset={checkpointOffset}, unreadStartOffset={unreadStartOffset}, queueHeadOffset={queueHeadOffset}")); + } + internal static void MessageAdded(long offset, string edgeMessageId, string endpointId) { // Print only after every 1000th message to avoid flooding logs. diff --git a/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/endpoints/statemachine/EndpointExecutorFsm.cs b/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/endpoints/statemachine/EndpointExecutorFsm.cs index 095d2006574..fb903219a29 100644 --- a/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/endpoints/statemachine/EndpointExecutorFsm.cs +++ b/edge-hub/core/src/Microsoft.Azure.Devices.Routing.Core/endpoints/statemachine/EndpointExecutorFsm.cs @@ -362,7 +362,9 @@ static async Task EnterDeadCheckpointingAsync(EndpointExecutorFsm thisPtr) { ISinkResult result = thisPtr.currentCheckpointCommand.Result; Events.Checkpoint(thisPtr, result); + Events.CheckpointCommitAttemptStart(thisPtr, 1, 1, result.Succeeded.Count, EmptyMessages.Count); await thisPtr.Checkpointer.CommitAsync(result.Succeeded, EmptyMessages, thisPtr.lastFailedRevivalTime, thisPtr.unhealthySince, cts.Token); + Events.CheckpointCommitAttemptSucceeded(thisPtr, 1, result.Succeeded.Count, EmptyMessages.Count); Events.CheckpointSuccess(thisPtr, result); } @@ -407,7 +409,9 @@ static async Task EnterCheckpointingAsync(EndpointExecutorFsm thisPtr) { using (var cts = new CancellationTokenSource(thisPtr.config.Timeout)) { + Events.CheckpointCommitAttemptStart(thisPtr, attempt, MaxCommitRetries, toCheckpoint.Count, remaining.Count); await thisPtr.Checkpointer.CommitAsync(toCheckpoint, remaining, Option.None(), thisPtr.unhealthySince, cts.Token); + 
Events.CheckpointCommitAttemptSucceeded(thisPtr, attempt, toCheckpoint.Count, remaining.Count); Events.CheckpointSuccess(thisPtr, result); break; // Success, exit retry loop } @@ -617,6 +621,8 @@ enum EventIds UpdateEndpointSuccess, UpdateEndpointFailure, CheckRetryInnerException, + CheckpointCommitAttemptStart, + CheckpointCommitAttemptSucceeded, CheckpointCommitRetry, CheckpointCommitFailed } @@ -762,6 +768,31 @@ public static void CheckpointCommitRetry(EndpointExecutorFsm fsm, int attempt, E GetContextString(fsm)); } + public static void CheckpointCommitAttemptStart(EndpointExecutorFsm fsm, int attempt, int maxAttempts, int toCheckpointCount, int remainingCount) + { + Log.LogInformation( + (int)EventIds.CheckpointCommitAttemptStart, + "[TEMP CheckpointCommitAttemptStart] Starting checkpoint commit attempt {0}/{1}. ToCheckpointSize: {2}, RemainingSize: {3}, CheckpointOffset: {4}, {5}", + attempt, + maxAttempts, + toCheckpointCount, + remainingCount, + fsm.Status.CheckpointerStatus.Offset, + GetContextString(fsm)); + } + + public static void CheckpointCommitAttemptSucceeded(EndpointExecutorFsm fsm, int attempt, int toCheckpointCount, int remainingCount) + { + Log.LogInformation( + (int)EventIds.CheckpointCommitAttemptSucceeded, + "[TEMP CheckpointCommitAttemptSucceeded] Checkpoint commit attempt {0} succeeded. 
ToCheckpointSize: {1}, RemainingSize: {2}, CheckpointOffset: {3}, {4}", + attempt, + toCheckpointCount, + remainingCount, + fsm.Status.CheckpointerStatus.Offset, + GetContextString(fsm)); + } + public static void CheckpointCommitFailed(EndpointExecutorFsm fsm, int maxAttempts, Exception ex) { Log.LogError( From 5c852eeaedae76e575c0a8cdd1382344f80baf37 Mon Sep 17 00:00:00 2001 From: Damon Barry Date: Wed, 1 Jul 2026 15:00:42 -0700 Subject: [PATCH 5/8] Temporarily narrow focus of nested end-to-end tests --- builds/e2e/nested-e2e.yaml | 8 -------- builds/e2e/templates/e2e-run.yaml | 1 + 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/builds/e2e/nested-e2e.yaml b/builds/e2e/nested-e2e.yaml index eb82081e697..a9029d669b0 100644 --- a/builds/e2e/nested-e2e.yaml +++ b/builds/e2e/nested-e2e.yaml @@ -109,14 +109,6 @@ stages: rootCaCertificate: '$(rootCaCertificate)' rootCaKey: '$(rootCaKey)' - template: templates/e2e-clear-docker-cached-images.yaml - - template: templates/e2e-run.yaml - parameters: - azureSubscription: '$(az.subscription)' - containerRegistryPassword: '$(containerRegistryPassword)' - dpsGroupKeySymmetric: '$(dpsGroupKeySymmetric)' - rootCaPassword: '$(rootCaPassword)' - sas_uri: $(sas_uri) - test_type: nestededge_mqtt - template: templates/nested-deploy-config.yaml parameters: containerRegistryServer: $(containerRegistryServer) diff --git a/builds/e2e/templates/e2e-run.yaml b/builds/e2e/templates/e2e-run.yaml index 7426f3f347c..4a46df9a731 100644 --- a/builds/e2e/templates/e2e-run.yaml +++ b/builds/e2e/templates/e2e-run.yaml @@ -67,6 +67,7 @@ steps: # Below tests were disabled and marked for re-enable when a blocking item was resolved. # When it was resolved the tests were never enabled. We need to re-enable these. 
$filter += '&FullyQualifiedName!~Provisioning&FullyQualifiedName!~SasOutOfScope&FullyQualifiedName!~X509ManualProvision&FullyQualifiedName!~AuthorizationPolicyUpdateTest&FullyQualifiedName!~AuthorizationPolicyExplicitPolicyTest' + $filter += '&FullyQualifiedName~RouteMessageL3LeafToL4Module' } elseif ($test_type -eq 'nestededge_isa95') { From 54d63b8b3f46adfc71f87902d28d92bc575b4cd3 Mon Sep 17 00:00:00 2001 From: Damon Barry Date: Wed, 1 Jul 2026 15:37:13 -0700 Subject: [PATCH 6/8] [TEMP] Add back a little bit more of the surrounding tests --- builds/e2e/templates/e2e-run.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/builds/e2e/templates/e2e-run.yaml b/builds/e2e/templates/e2e-run.yaml index 4a46df9a731..0fcaaf3bb4e 100644 --- a/builds/e2e/templates/e2e-run.yaml +++ b/builds/e2e/templates/e2e-run.yaml @@ -67,7 +67,7 @@ steps: # Below tests were disabled and marked for re-enable when a blocking item was resolved. # When it was resolved the tests were never enabled. We need to re-enable these. 
$filter += '&FullyQualifiedName!~Provisioning&FullyQualifiedName!~SasOutOfScope&FullyQualifiedName!~X509ManualProvision&FullyQualifiedName!~AuthorizationPolicyUpdateTest&FullyQualifiedName!~AuthorizationPolicyExplicitPolicyTest' - $filter += '&FullyQualifiedName~RouteMessageL3LeafToL4Module' + $filter += '&(FullyQualifiedName~QuickstartCerts|FullyQualifiedName~RouteMessageL3LeafToL4Module)' } elseif ($test_type -eq 'nestededge_isa95') { From 47483ee3248d2e0ad2e7dfad1c3827aa7e04ec79 Mon Sep 17 00:00:00 2001 From: Damon Barry Date: Wed, 1 Jul 2026 16:01:32 -0700 Subject: [PATCH 7/8] [TEMP] Add back the last set of tests from the MQTT run --- builds/e2e/nested-e2e.yaml | 8 ++++++++ builds/e2e/templates/e2e-run.yaml | 1 + 2 files changed, 9 insertions(+) diff --git a/builds/e2e/nested-e2e.yaml b/builds/e2e/nested-e2e.yaml index a9029d669b0..eb82081e697 100644 --- a/builds/e2e/nested-e2e.yaml +++ b/builds/e2e/nested-e2e.yaml @@ -109,6 +109,14 @@ stages: rootCaCertificate: '$(rootCaCertificate)' rootCaKey: '$(rootCaKey)' - template: templates/e2e-clear-docker-cached-images.yaml + - template: templates/e2e-run.yaml + parameters: + azureSubscription: '$(az.subscription)' + containerRegistryPassword: '$(containerRegistryPassword)' + dpsGroupKeySymmetric: '$(dpsGroupKeySymmetric)' + rootCaPassword: '$(rootCaPassword)' + sas_uri: $(sas_uri) + test_type: nestededge_mqtt - template: templates/nested-deploy-config.yaml parameters: containerRegistryServer: $(containerRegistryServer) diff --git a/builds/e2e/templates/e2e-run.yaml b/builds/e2e/templates/e2e-run.yaml index 0fcaaf3bb4e..b78205fb172 100644 --- a/builds/e2e/templates/e2e-run.yaml +++ b/builds/e2e/templates/e2e-run.yaml @@ -59,6 +59,7 @@ steps: # Below tests were disabled and marked for re-enable when a blocking item was resolved. # When it was resolved the tests were never enabled. We need to re-enable these. 
$filter += '&FullyQualifiedName!~Provisioning&FullyQualifiedName!~SasOutOfScope&FullyQualifiedName!~X509ManualProvision&FullyQualifiedName!~AuthorizationPolicyUpdateTest&FullyQualifiedName!~AuthorizationPolicyExplicitPolicyTest' + $filter += '&FullyQualifiedName~Microsoft.Azure.Devices.Edge.Test.Module' } elseif ($test_type -eq 'nestededge_amqp') { From 0f503bd87c509be6e6739fbc9fd0cbd6353d4c8e Mon Sep 17 00:00:00 2001 From: Damon Barry Date: Wed, 1 Jul 2026 16:45:29 -0700 Subject: [PATCH 8/8] [TEMP] Add a few more tests back to the MQTT run --- builds/e2e/templates/e2e-run.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/builds/e2e/templates/e2e-run.yaml b/builds/e2e/templates/e2e-run.yaml index b78205fb172..de507f9df5a 100644 --- a/builds/e2e/templates/e2e-run.yaml +++ b/builds/e2e/templates/e2e-run.yaml @@ -59,7 +59,7 @@ steps: # Below tests were disabled and marked for re-enable when a blocking item was resolved. # When it was resolved the tests were never enabled. We need to re-enable these. $filter += '&FullyQualifiedName!~Provisioning&FullyQualifiedName!~SasOutOfScope&FullyQualifiedName!~X509ManualProvision&FullyQualifiedName!~AuthorizationPolicyUpdateTest&FullyQualifiedName!~AuthorizationPolicyExplicitPolicyTest' - $filter += '&FullyQualifiedName~Microsoft.Azure.Devices.Edge.Test.Module' + $filter += '&(FullyQualifiedName~Microsoft.Azure.Devices.Edge.Test.Module|FullyQualifiedName~Microsoft.Azure.Devices.Edge.Test.Metrics|FullyQualifiedName~Microsoft.Azure.Devices.Edge.Test.IoTEdgeCheck)' } elseif ($test_type -eq 'nestededge_amqp') {