Skip to content
2 changes: 2 additions & 0 deletions builds/e2e/templates/e2e-run.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ steps:
# The tests below were disabled and marked to be re-enabled once a blocking item was resolved.
# The item was resolved, but the tests were never re-enabled. We need to re-enable them.
$filter += '&FullyQualifiedName!~Provisioning&FullyQualifiedName!~SasOutOfScope&FullyQualifiedName!~X509ManualProvision&FullyQualifiedName!~AuthorizationPolicyUpdateTest&FullyQualifiedName!~AuthorizationPolicyExplicitPolicyTest'
$filter += '&(FullyQualifiedName~Microsoft.Azure.Devices.Edge.Test.Module|FullyQualifiedName~Microsoft.Azure.Devices.Edge.Test.Metrics|FullyQualifiedName~Microsoft.Azure.Devices.Edge.Test.IoTEdgeCheck)'
}
elseif ($test_type -eq 'nestededge_amqp')
{
Expand All @@ -67,6 +68,7 @@ steps:
# The tests below were disabled and marked to be re-enabled once a blocking item was resolved.
# The item was resolved, but the tests were never re-enabled. We need to re-enable them.
$filter += '&FullyQualifiedName!~Provisioning&FullyQualifiedName!~SasOutOfScope&FullyQualifiedName!~X509ManualProvision&FullyQualifiedName!~AuthorizationPolicyUpdateTest&FullyQualifiedName!~AuthorizationPolicyExplicitPolicyTest'
$filter += '&(FullyQualifiedName~QuickstartCerts|FullyQualifiedName~RouteMessageL3LeafToL4Module)'
}
elseif ($test_type -eq 'nestededge_isa95')
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,15 @@ public Task<ulong> GetMessageCountFromOffset(string endpointId, long offset)
return sequentialStore.GetCountFromOffset(offset);
}

/// <summary>
/// Requests a cleanup pass by forwarding the call to the cleaner held in
/// <c>messagesCleaner</c>. No other work is done here.
/// </summary>
/// <remarks>
/// NOTE(review): this is wired to the cloud-connection-established event elsewhere in this
/// change; whether a cleanup pass also retries checkpoint commits is not visible from this
/// method - confirm before relying on it.
/// </remarks>
public void TriggerCleanup() => this.messagesCleaner.TriggerCleanup();

public void Dispose()
{
this.Dispose(true);
Expand Down Expand Up @@ -230,6 +239,16 @@ public void Dispose()
// Not disposing the cleanup task, in case it is not completed yet.
}

/// <summary>
/// Asks for a cleanup pass by calling <c>EnsureCleanupTask</c> and then logs the
/// <c>CleanupTriggeredByConnectionRecovery</c> event.
/// </summary>
/// <remarks>
/// NOTE(review): the visible guard in <c>EnsureCleanupTask</c> only acts when the cleanup task
/// is null or already completed, so this call presumably has no effect while a cleanup task
/// is still in flight - confirm that this is the intended behavior on connection recovery.
/// NOTE(review): the cleanup code visible in this change reads the checkpoint but does not
/// commit it; verify the claim that triggering cleanup retries checkpoint commits.
/// </remarks>
public void TriggerCleanup()
{
// EnsureCleanupTask takes an object state parameter; null is passed for it here.
this.EnsureCleanupTask(null);
// Logged unconditionally after the call above, whether or not a new task was started.
Events.CleanupTriggeredByConnectionRecovery();
}

void EnsureCleanupTask(object state)
{
if (this.cleanupTask == null || this.cleanupTask.IsCompleted)
Expand Down Expand Up @@ -306,9 +325,16 @@ private async Task CleanQueue(bool checkEntireQueueOnCleanup)
Events.CleanupTaskStarted(messageQueueId);
CheckpointData checkpointData = await this.messageStore.checkpointStore.GetCheckpointDataAsync(messageQueueId, CancellationToken.None);
ISequentialStore<MessageRef> sequentialStore = endpointSequentialStore.Value;
long queueHeadOffset = sequentialStore.GetHeadOffset(this.cancellationTokenSource.Token);
long unreadStartOffset = checkpointData.Offset + 1;
Events.TempCleanupCorrelation(messageQueueId, checkpointData.Offset, unreadStartOffset, queueHeadOffset);
Events.CleanupCheckpointState(messageQueueId, checkpointData);
int cleanupEntityStoreCount = 0;

// Track orphaned messages for observability
int orphanedMessageCount = 0;
DateTime earliestOrphanedMessageTime = DateTime.MaxValue;

async Task<bool> DeleteMessageCallback(long offset, MessageRef messageRef)
{
var expiry = messageRef.TimeStamp + messageRef.TimeToLive;
Expand All @@ -317,6 +343,16 @@ async Task<bool> DeleteMessageCallback(long offset, MessageRef messageRef)
return false;
}

// Detect orphaned messages (expired but can't clean due to offset gap)
if (offset > checkpointData.Offset && expiry <= DateTime.UtcNow)
{
orphanedMessageCount++;
if (messageRef.TimeStamp < earliestOrphanedMessageTime)
{
earliestOrphanedMessageTime = messageRef.TimeStamp;
}
}

var message = await this.TryDecrementRefCountUpdate(messageRef.EdgeMessageId, messageQueueId);

await message.ForEachAsync(async msg =>
Expand Down Expand Up @@ -378,6 +414,13 @@ await message.ForEachAsync(async msg =>
totalCleanupCount += cleanupCount;
totalCleanupStoreCount += cleanupEntityStoreCount;
Events.CleanupCompleted(messageQueueId, cleanupCount, cleanupEntityStoreCount, totalCleanupCount, totalCleanupStoreCount);

// Log orphaned messages for observability
if (orphanedMessageCount > 0)
{
TimeSpan oldestOrphanAge = DateTime.UtcNow - earliestOrphanedMessageTime;
Events.OrphanedMessagesDetected(messageQueueId, orphanedMessageCount, checkpointData.Offset, oldestOrphanAge);
}
}
catch (Exception ex)
{
Expand Down Expand Up @@ -415,10 +458,13 @@ enum EventIds
GettingNextBatch,
ObtainedNextBatch,
CleanupCheckpointState,
TempCleanupCorrelation,
MessageAdded,
ErrorGettingMessagesBatch,
CreatedCleanupProcessor,
ErrorUpdatingMessageForEndpoint
ErrorUpdatingMessageForEndpoint,
CleanupTriggeredByConnectionRecovery,
OrphanedMessagesDetected
}

public static void MessageStoreCreated()
Expand All @@ -441,6 +487,11 @@ public static void CleanupTaskInitialized()
Log.LogInformation((int)EventIds.CleanupTaskStarted, "Started task to cleanup processed and stale messages");
}

/// <summary>
/// Writes an Information-level log entry, tagged with the
/// <c>CleanupTriggeredByConnectionRecovery</c> event id, stating that cleanup is being triggered.
/// </summary>
public static void CleanupTriggeredByConnectionRecovery() =>
    Log.LogInformation(
        (int)EventIds.CleanupTriggeredByConnectionRecovery,
        "Triggering cleanup due to cloud connection recovery to retry pending checkpoint commits");

public static void ErrorCleaningMessagesForEndpoint(Exception ex, string endpointId)
{
Log.LogWarning((int)EventIds.ErrorCleaningMessagesForEndpoint, ex, Invariant($"Error cleaning up messages for endpoint {endpointId}"));
Expand All @@ -462,6 +513,11 @@ public static void CleanupCompleted(string endpointId, int queueMessagesCount, i
Log.LogDebug((int)EventIds.CleanupCompleted, Invariant($"Total messages cleaned up from queue for endpoint {endpointId} = {totalQueueMessagesCount}, and total messages cleaned up for message store = {totalStoreMessagesCount}."));
}

/// <summary>
/// Writes a Warning-level log entry reporting orphaned messages for an endpoint.
/// </summary>
/// <param name="endpointId">Endpoint (message queue) the orphaned messages belong to.</param>
/// <param name="orphanedCount">Number of orphaned messages counted by the caller.</param>
/// <param name="checkpointOffset">Checkpoint offset reported alongside the count.</param>
/// <param name="oldestAge">Age of the oldest orphaned message; rendered in seconds with one decimal.</param>
public static void OrphanedMessagesDetected(string endpointId, int orphanedCount, long checkpointOffset, TimeSpan oldestAge)
{
    // Format with the invariant culture first so the logging call itself stays short.
    string warning = Invariant($"Detected {orphanedCount} orphaned message(s) in endpoint {endpointId}. Checkpoint offset={checkpointOffset}, oldest message age={oldestAge.TotalSeconds:F1}s. Messages are stuck in store because checkpoint has not advanced. This indicates a potential message acknowledgment failure during network disruption. Checkpoint retries or the cleanup trigger on connection recovery should resolve this.");

    Log.LogWarning((int)EventIds.OrphanedMessagesDetected, warning);
}

public static void ErrorGettingMessagesBatch(string entityName, Exception ex)
{
Log.LogWarning((int)EventIds.ErrorGettingMessagesBatch, ex, $"Error getting next batch for endpoint {entityName}.");
Expand Down Expand Up @@ -502,6 +558,13 @@ internal static void CleanupCheckpointState(string endpointId, CheckpointData ch
Log.LogDebug((int)EventIds.CleanupCheckpointState, Invariant($"Checkpoint for endpoint {endpointId} is {checkpointData.Offset}"));
}

/// <summary>
/// Writes an Information-level log entry that puts the checkpoint offset, the first unread
/// offset and the queue head offset for one endpoint on a single line.
/// </summary>
/// <param name="endpointId">Endpoint (message queue) being cleaned.</param>
/// <param name="checkpointOffset">Offset read from the checkpoint data.</param>
/// <param name="unreadStartOffset">Offset the caller treats as the start of unread messages.</param>
/// <param name="queueHeadOffset">Head offset reported by the sequential store.</param>
/// <remarks>
/// NOTE(review): the "[TEMP" prefix in the message suggests temporary diagnostics - confirm
/// whether this is meant to ship at Information level.
/// </remarks>
internal static void TempCleanupCorrelation(string endpointId, long checkpointOffset, long unreadStartOffset, long queueHeadOffset)
{
    string correlationLine = Invariant($"[TEMP CleanupCorrelation] Endpoint={endpointId}, checkpointOffset={checkpointOffset}, unreadStartOffset={unreadStartOffset}, queueHeadOffset={queueHeadOffset}");

    Log.LogInformation((int)EventIds.TempCleanupCorrelation, correlationLine);
}

internal static void MessageAdded(long offset, string edgeMessageId, string endpointId)
{
// Print only after every 1000th message to avoid flooding logs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ protected override void Load(ContainerBuilder builder)
var connectionManagerTask = c.Resolve<Task<IConnectionManager>>();
var subscriptionProcessorTask = c.Resolve<Task<ISubscriptionProcessor>>();
var deviceScopeIdentitiesCacheTask = c.Resolve<Task<IDeviceScopeIdentitiesCache>>();
var messageStoreTask = this.isStoreAndForwardEnabled ? c.Resolve<Task<IMessageStore>>() : null;
Router router = await routerTask;
ITwinManager twinManager = await twinManagerTask;
IConnectionManager connectionManager = await connectionManagerTask;
Expand All @@ -539,6 +540,18 @@ protected override void Load(ContainerBuilder builder)
invokeMethodHandler,
subscriptionProcessor,
deviceScopeIdentitiesCache);

// Subscribe MessageStore to connection recovery events
// to trigger cleanup when cloud connection is restored
if (messageStoreTask != null)
{
IMessageStore messageStore = await messageStoreTask;
connectionManager.CloudConnectionEstablished += (sender, identity) =>
{
messageStore.TriggerCleanup();
};
}

return hub;
})
.As<Task<IEdgeHub>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,11 @@ public interface IMessageStore : IDisposable
/// Returns the number of messages in the store from an offset
/// </summary>
Task<ulong> GetMessageCountFromOffset(string endpointId, long offset);

/// <summary>
/// Requests a cleanup pass over the stored messages. Intended to be called when the
/// cloud connection is restored.
/// </summary>
/// <remarks>
/// NOTE(review): an earlier description said this retries checkpoint commits for messages
/// that were sent but not acknowledged; confirm that implementations actually do so rather
/// than only cleaning up messages behind the current checkpoint.
/// </remarks>
void TriggerCleanup();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,9 @@ static async Task EnterDeadCheckpointingAsync(EndpointExecutorFsm thisPtr)
{
ISinkResult<IMessage> result = thisPtr.currentCheckpointCommand.Result;
Events.Checkpoint(thisPtr, result);
Events.CheckpointCommitAttemptStart(thisPtr, 1, 1, result.Succeeded.Count, EmptyMessages.Count);
await thisPtr.Checkpointer.CommitAsync(result.Succeeded, EmptyMessages, thisPtr.lastFailedRevivalTime, thisPtr.unhealthySince, cts.Token);
Events.CheckpointCommitAttemptSucceeded(thisPtr, 1, result.Succeeded.Count, EmptyMessages.Count);
Events.CheckpointSuccess(thisPtr, result);
}

Expand All @@ -386,20 +388,49 @@ static async Task EnterCheckpointingAsync(EndpointExecutorFsm thisPtr)
try
{
Preconditions.CheckNotNull(thisPtr.currentCheckpointCommand);
using (var cts = new CancellationTokenSource(thisPtr.config.Timeout))
ISinkResult<IMessage> result = thisPtr.currentCheckpointCommand.Result;

if (result.Succeeded.Any() || result.InvalidDetailsList.Any())
{
ISinkResult<IMessage> result = thisPtr.currentCheckpointCommand.Result;
ICollection<IMessage> toCheckpoint = result.InvalidDetailsList.Count > 0
? result.Succeeded.Concat(result.InvalidDetailsList.Select(i => i.Item)).ToList()
: result.Succeeded;
ICollection<IMessage> remaining = result.Failed;

Events.Checkpoint(thisPtr, result);

if (result.Succeeded.Any() || result.InvalidDetailsList.Any())
// Attempt checkpoint commit with retry logic
const int MaxCommitRetries = 3;
Exception lastException = null;

for (int attempt = 1; attempt <= MaxCommitRetries; attempt++)
{
ICollection<IMessage> toCheckpoint = result.InvalidDetailsList.Count > 0
? result.Succeeded.Concat(result.InvalidDetailsList.Select(i => i.Item)).ToList()
: result.Succeeded;
ICollection<IMessage> remaining = result.Failed;

Events.Checkpoint(thisPtr, result);
await thisPtr.Checkpointer.CommitAsync(toCheckpoint, remaining, Option.None<DateTime>(), thisPtr.unhealthySince, cts.Token);
Events.CheckpointSuccess(thisPtr, result);
try
{
using (var cts = new CancellationTokenSource(thisPtr.config.Timeout))
{
Events.CheckpointCommitAttemptStart(thisPtr, attempt, MaxCommitRetries, toCheckpoint.Count, remaining.Count);
await thisPtr.Checkpointer.CommitAsync(toCheckpoint, remaining, Option.None<DateTime>(), thisPtr.unhealthySince, cts.Token);
Events.CheckpointCommitAttemptSucceeded(thisPtr, attempt, toCheckpoint.Count, remaining.Count);
Events.CheckpointSuccess(thisPtr, result);
break; // Success, exit retry loop
}
}
catch (Exception ex) when (attempt < MaxCommitRetries)
{
lastException = ex;
Events.CheckpointCommitRetry(thisPtr, attempt, ex);

// Exponential backoff before the next attempt: 100ms after attempt 1, 200ms after attempt 2 (no delay follows the final attempt)
int delayMs = 100 * (int)Math.Pow(2, attempt - 1);
await Task.Delay(delayMs);
}
catch (Exception ex) when (attempt == MaxCommitRetries)
{
lastException = ex;
Events.CheckpointCommitFailed(thisPtr, MaxCommitRetries, ex);
throw;
}
}
}

Expand Down Expand Up @@ -589,7 +620,11 @@ enum EventIds
UpdateEndpoint,
UpdateEndpointSuccess,
UpdateEndpointFailure,
CheckRetryInnerException
CheckRetryInnerException,
CheckpointCommitAttemptStart,
CheckpointCommitAttemptSucceeded,
CheckpointCommitRetry,
CheckpointCommitFailed
}

public static void StateEnter(EndpointExecutorFsm fsm)
Expand Down Expand Up @@ -722,6 +757,53 @@ public static void CheckpointFailure(EndpointExecutorFsm fsm, Exception ex)
GetContextString(fsm));
}

/// <summary>
/// Writes a Warning-level log entry for a checkpoint commit attempt that failed and
/// will be retried.
/// </summary>
/// <param name="fsm">State machine whose checkpointer offset and context string are logged.</param>
/// <param name="attempt">Number of the attempt that just failed.</param>
/// <param name="ex">Exception thrown by the failed attempt; attached to the log entry.</param>
public static void CheckpointCommitRetry(EndpointExecutorFsm fsm, int attempt, Exception ex)
{
    // Read the values in the same order the log template consumes them.
    var currentOffset = fsm.Status.CheckpointerStatus.Offset;
    var context = GetContextString(fsm);

    Log.LogWarning(
        (int)EventIds.CheckpointCommitRetry,
        ex,
        "[CheckpointCommitRetry] Checkpoint commit attempt {0} failed, retrying. CheckpointOffset: {1}, {2}",
        attempt,
        currentOffset,
        context);
}

/// <summary>
/// Writes an Information-level log entry announcing the start of a checkpoint commit attempt.
/// </summary>
/// <param name="fsm">State machine whose checkpointer offset and context string are logged.</param>
/// <param name="attempt">Number of the attempt being started.</param>
/// <param name="maxAttempts">Total number of attempts, logged as "attempt/maxAttempts".</param>
/// <param name="toCheckpointCount">Size of the collection about to be checkpointed.</param>
/// <param name="remainingCount">Size of the remaining collection passed to the commit.</param>
/// <remarks>
/// NOTE(review): the "[TEMP" prefix in the message suggests temporary diagnostics - confirm
/// whether this is meant to ship at Information level.
/// </remarks>
public static void CheckpointCommitAttemptStart(EndpointExecutorFsm fsm, int attempt, int maxAttempts, int toCheckpointCount, int remainingCount)
{
    // Template arguments, in placeholder order {0}..{5}.
    object[] templateArgs =
    {
        attempt,
        maxAttempts,
        toCheckpointCount,
        remainingCount,
        fsm.Status.CheckpointerStatus.Offset,
        GetContextString(fsm)
    };

    Log.LogInformation(
        (int)EventIds.CheckpointCommitAttemptStart,
        "[TEMP CheckpointCommitAttemptStart] Starting checkpoint commit attempt {0}/{1}. ToCheckpointSize: {2}, RemainingSize: {3}, CheckpointOffset: {4}, {5}",
        templateArgs);
}

/// <summary>
/// Writes an Information-level log entry recording that a checkpoint commit attempt succeeded.
/// </summary>
/// <param name="fsm">State machine whose checkpointer offset and context string are logged.</param>
/// <param name="attempt">Number of the attempt that succeeded.</param>
/// <param name="toCheckpointCount">Size of the collection that was checkpointed.</param>
/// <param name="remainingCount">Size of the remaining collection passed to the commit.</param>
/// <remarks>
/// NOTE(review): the "[TEMP" prefix in the message suggests temporary diagnostics - confirm
/// whether this is meant to ship at Information level.
/// </remarks>
public static void CheckpointCommitAttemptSucceeded(EndpointExecutorFsm fsm, int attempt, int toCheckpointCount, int remainingCount)
{
    // Template arguments, in placeholder order {0}..{4}.
    object[] templateArgs =
    {
        attempt,
        toCheckpointCount,
        remainingCount,
        fsm.Status.CheckpointerStatus.Offset,
        GetContextString(fsm)
    };

    Log.LogInformation(
        (int)EventIds.CheckpointCommitAttemptSucceeded,
        "[TEMP CheckpointCommitAttemptSucceeded] Checkpoint commit attempt {0} succeeded. ToCheckpointSize: {1}, RemainingSize: {2}, CheckpointOffset: {3}, {4}",
        templateArgs);
}

/// <summary>
/// Writes an Error-level log entry recording that the checkpoint commit failed after the
/// given number of attempts.
/// </summary>
/// <param name="fsm">State machine whose checkpointer offset and context string are logged.</param>
/// <param name="maxAttempts">Number of attempts reported in the message.</param>
/// <param name="ex">Exception from the failure; attached to the log entry.</param>
public static void CheckpointCommitFailed(EndpointExecutorFsm fsm, int maxAttempts, Exception ex)
{
    // Read the values in the same order the log template consumes them.
    var offsetAtFailure = fsm.Status.CheckpointerStatus.Offset;
    var context = GetContextString(fsm);

    Log.LogError(
        (int)EventIds.CheckpointCommitFailed,
        ex,
        "[CheckpointCommitFailed] Checkpoint commit failed after {0} attempts. CheckpointOffset: {1}, {2}",
        maxAttempts,
        offsetAtFailure,
        context);
}

public static void CheckRetryInnerException(Exception ex, bool retry)
{
Log.LogDebug((int)EventIds.CheckRetryInnerException, ex, $"[CheckRetryInnerException] Decision to retry exception of type {ex.GetType()} is {retry}");
Expand Down
Loading