diff --git a/CHANGES.md b/CHANGES.md index 2bc850d72..ace4583ef 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -110,6 +110,22 @@ To be released. operators distinguish a slow-draining queue from a queue that sees less traffic. [[#316], [#740], [#759]] + - Added an outbound delivery circuit breaker for queued outbox delivery. + Fedify now tracks consecutive network and HTTP 5xx delivery failures + per remote host (including any non-default port), stores the state in + the configured `KvStore`, and requeues messages held by an open circuit + instead of repeatedly sending to an unreachable server. The circuit + breaker is enabled by default for queued outbox delivery and can be + disabled with + `circuitBreaker: false`; applications can customize the failure policy, + recovery delay, held activity TTL, release interval, and state/drop + callbacks. HTTP 429 responses do not count as circuit failures and + `Retry-After` is respected when present. State changes are exposed + through `activitypub.circuit_breaker.state_change` metrics and + `activitypub.circuit_breaker.state_change` span events, and expired + held activities call the outbox permanent failure handler with + `reason: "circuit-breaker-ttl"`. [[#620], [#778]] + - Added OpenTelemetry metrics for ActivityPub fanout and activity lifecycle events, complementing the per-recipient `activitypub.delivery.*` counters and the per-task @@ -155,10 +171,11 @@ To be released. Instruments share an `activitypub.lookup.kind` and (where applicable) `activitypub.lookup.result` attribute drawn from small, spec-bounded enumerations. `activitypub.remote.host` records the - URL hostname only; `http.response.status_code` is recorded when an - HTTP response was observed; `activitypub.cache.enabled` is - recorded on the key and document fetch metrics whenever Fedify can - confidently report the cache layer's presence. 
Key IDs, actor + URL host, including any non-default port; `http.response.status_code` + is recorded when an HTTP response was observed; + `activitypub.cache.enabled` is recorded on the key and document + fetch metrics whenever Fedify can confidently report the cache + layer's presence. Key IDs, actor IDs, object IDs, JSON-LD context URLs, full URLs, and fediverse handles are deliberately excluded so attacker-controlled remotes cannot inflate metric cardinality. The existing @@ -193,8 +210,9 @@ To be released. `webfinger.resource.scheme` is bucketed to a small allow list (`acct`, `http`, `https`, `mailto`, or `other`) so an attacker-controlled query string cannot inflate metric - cardinality; `activitypub.remote.host` records the URL hostname - only. Full resource URIs, lookup URLs, and handle strings are + cardinality; `activitypub.remote.host` records the URL host, + including any non-default port. Full resource URIs, lookup URLs, + and handle strings are deliberately excluded; they remain on the corresponding spans (`webfinger.lookup`, `webfinger.handle`, `activitypub.get_actor_handle`) for trace-level investigation. @@ -221,6 +239,7 @@ To be released. [#316]: https://github.com/fedify-dev/fedify/issues/316 [#418]: https://github.com/fedify-dev/fedify/issues/418 [#619]: https://github.com/fedify-dev/fedify/issues/619 +[#620]: https://github.com/fedify-dev/fedify/issues/620 [#735]: https://github.com/fedify-dev/fedify/issues/735 [#736]: https://github.com/fedify-dev/fedify/issues/736 [#737]: https://github.com/fedify-dev/fedify/issues/737 @@ -241,6 +260,7 @@ To be released. 
[#771]: https://github.com/fedify-dev/fedify/pull/771 [#772]: https://github.com/fedify-dev/fedify/pull/772 [#777]: https://github.com/fedify-dev/fedify/pull/777 +[#778]: https://github.com/fedify-dev/fedify/pull/778 ### @fedify/fixture diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index 5c2a39a32..7fa2d3978 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -145,6 +145,7 @@ const MANUAL = { { text: "Pragmatics", link: "/manual/pragmatics.md" }, { text: "Key–value store", link: "/manual/kv.md" }, { text: "Message queue", link: "/manual/mq.md" }, + { text: "Circuit breaker", link: "/manual/circuit-breaker.md" }, { text: "Integration", link: "/manual/integration.md" }, { text: "Migration", link: "/manual/migrate.md" }, { text: "Relay", link: "/manual/relay.md" }, diff --git a/docs/manual/circuit-breaker.md b/docs/manual/circuit-breaker.md new file mode 100644 index 000000000..f0dd0be3c --- /dev/null +++ b/docs/manual/circuit-breaker.md @@ -0,0 +1,175 @@ +Circuit breaker +=============== + +*This API is available since Fedify 2.3.0.* + +Fedify's outbound delivery circuit breaker protects queued ActivityPub +delivery from repeatedly hammering a remote server that is down or returning +server errors. It applies to queued outbox delivery: activities delivered +through a configured `MessageQueue` are tracked per remote inbox host, and +deliveries to an unhealthy host can be temporarily held until a recovery +probe is due. + + +Enabling and disabling +---------------------- + +The circuit breaker is enabled by default for queued outbox delivery. 
To +disable it, pass `circuitBreaker: false` to `createFederation()`: + +~~~~ typescript +import { createFederation } from "@fedify/fedify"; + +const federation = createFederation({ + kv, + queue, + circuitBreaker: false, +}); +~~~~ + +To customize the defaults, pass a `CircuitBreakerOptions` object: + +~~~~ typescript +import { createFederation } from "@fedify/fedify"; + +const federation = createFederation({ + kv, + queue, + circuitBreaker: { + failureThreshold: 5, + failureWindow: { minutes: 10 }, + recoveryDelay: { minutes: 30 }, + heldActivityTtl: { days: 7 }, + releaseInterval: { seconds: 1 }, + }, +}); +~~~~ + +The default policy opens a remote host's circuit after five consecutive +counted failures within ten minutes. When the circuit is open, Fedify +requeues affected outbox messages instead of sending them. After the +`recoveryDelay`, one message is allowed through as a half-open probe. If it +succeeds, the circuit closes; if it fails, the circuit opens again. +While the probe is in flight, other held messages continue to be requeued at +`releaseInterval`. If the worker running the probe stops before recording a +success or failure, Fedify treats the half-open probe as stale after another +`recoveryDelay` and allows a replacement probe. + + +What counts as a failure +------------------------ + +Fedify counts these delivery failures toward the circuit: + + - network errors, including failed `fetch()` calls + - HTTP 5xx responses from the remote inbox + +Fedify does not count these responses as circuit failures: + + - HTTP 429 responses; the `Retry-After` header is respected when present + - HTTP 4xx responses that are not configured as permanent delivery failures + - configured permanent delivery failures, such as `404` or `410` by default + +Any reachable HTTP 4xx response clears the consecutive failure history for +that host because it proves the remote server can be reached. 
+ + +Custom failure policy +--------------------- + +You can replace the numeric threshold/window policy with a callback. The +callback receives the full consecutive failure timestamp list for the remote +host and returns whether the circuit should open: + +~~~~ typescript +const federation = createFederation({ + kv, + queue, + circuitBreaker: { + failure(timestamps) { + return timestamps.length >= 10; + }, + }, +}); +~~~~ + +The callback form is mutually exclusive with `failureThreshold` and +`failureWindow`. + + +Held activity expiry +-------------------- + +Activities held by an open circuit are requeued until the remote host recovers +or the held activity exceeds `heldActivityTtl`, which defaults to seven days. +When a held activity expires, Fedify drops it, records it as an abandoned +outbox activity, calls `circuitBreaker.onActivityDrop` when configured, and +calls the outbox permanent failure handler with +`reason: "circuit-breaker-ttl"`. + +~~~~ typescript +const federation = createFederation({ + kv, + queue, + circuitBreaker: { + onActivityDrop(remoteHost, details) { + console.warn("Dropped held activity", { + remoteHost, + inbox: details.inbox.href, + activityId: details.activityId, + heldSince: details.heldSince.toString(), + }); + }, + }, +}); + +federation.setOutboxPermanentFailureHandler((_ctx, failure) => { + if (failure.reason === "circuit-breaker-ttl") { + // The remote host did not recover before the held activity expired. + return; + } + + // Existing HTTP permanent-failure handling, such as 404 or 410 cleanup. +}); +~~~~ + + +Storage and concurrency +----------------------- + +Circuit state is stored in the configured `KvStore` under the +`["_fedify", "circuit", remoteHost]` key prefix by default. 
The stored value +has this shape: + +~~~~ typescript +{ + state: "closed" | "open" | "half-open", + failures: string[], + opened?: string, +} +~~~~ + +For multi-worker deployments, use a `KvStore` implementation that supports +`cas()` so competing workers do not overwrite each other's state transitions. +Fedify still works without CAS, but it logs a warning because concurrent +workers can race when opening or closing the same host's circuit. + + +Observability +------------- + +State changes are emitted through the `onStateChange` callback and through +OpenTelemetry: + + - `activitypub.circuit_breaker.state_change` counter with + `activitypub.remote.host` and `activitypub.circuit_breaker.state` + - `activitypub.circuit_breaker.state_change` span event on the queued + outbox worker span with the previous and new state + - `activitypub.circuit_breaker.held` span event on the queued outbox worker + span when an open circuit holds a delivery + +The circuit breaker deliberately records only the remote host, not full inbox +URLs, actor IDs, or activity IDs, to keep metric cardinality bounded. For the +full metric and span attribute lists, see the [OpenTelemetry] manual. + +[OpenTelemetry]: ./opentelemetry.md diff --git a/docs/manual/opentelemetry.md b/docs/manual/opentelemetry.md index 7f6bb3051..f2f51b931 100644 --- a/docs/manual/opentelemetry.md +++ b/docs/manual/opentelemetry.md @@ -249,12 +249,14 @@ that wouldn't fit in span attributes (which are limited to primitive values). The following span events are recorded: -| Event name | Recorded on span | Description | -| ------------------------------- | --------------------------- | -------------------------------------------------------------------------------- | -| `activitypub.activity.received` | `activitypub.inbox` | Records full activity JSON and verification status when an activity is received. | -| `activitypub.activity.sent` | `activitypub.send_activity` | Records delivery details when an activity is sent. 
| -| `activitypub.delivery.failed` | `activitypub.outbox` | Records queued outbox delivery failure details before retry or abandonment. | -| `activitypub.object.fetched` | `activitypub.lookup_object` | Records full object JSON when successfully fetched. | +| Event name | Recorded on span | Description | +| ------------------------------------------ | --------------------------- | -------------------------------------------------------------------------------- | +| `activitypub.activity.received` | `activitypub.inbox` | Records full activity JSON and verification status when an activity is received. | +| `activitypub.activity.sent` | `activitypub.send_activity` | Records delivery details when an activity is sent. | +| `activitypub.circuit_breaker.held` | `activitypub.outbox` | Records queued outbox deliveries held by an open circuit. | +| `activitypub.circuit_breaker.state_change` | `activitypub.outbox` | Records queued outbox circuit breaker state changes. | +| `activitypub.delivery.failed` | `activitypub.outbox` | Records queued outbox delivery failure details before retry or abandonment. | +| `activitypub.object.fetched` | `activitypub.lookup_object` | Records full object JSON when successfully fetched. 
| [span events]: https://opentelemetry.io/docs/concepts/signals/traces/#span-events @@ -295,13 +297,29 @@ auditing, store it in your application before delivery and correlate it with **`activitypub.delivery.failed` event attributes:** - - `activitypub.remote.host`: The remote inbox host + - `activitypub.remote.host`: The remote inbox host, including any + non-default port - `activitypub.delivery.attempt`: The zero-based queue delivery attempt - `activitypub.delivery.permanent_failure`: Whether Fedify will abandon the delivery instead of retrying - `http.response.status_code` (optional): The HTTP response status code returned by the remote inbox +**`activitypub.circuit_breaker.state_change` event attributes:** + + - `activitypub.remote.host`: The remote inbox host, including any + non-default port + - `activitypub.circuit_breaker.previous_state`: The previous circuit state + (`closed`, `open`, or `half_open`) + - `activitypub.circuit_breaker.state`: The new circuit state (`closed`, + `open`, or `half_open`) + +**`activitypub.circuit_breaker.held` event attributes:** + + - `activitypub.remote.host`: The remote inbox host, including any + non-default port + - `activitypub.circuit_breaker.state`: The circuit state (`open`) + **`activitypub.object.fetched` event attributes:** - `activitypub.object.type`: The type URI of the fetched object @@ -320,6 +338,7 @@ Fedify records the following OpenTelemetry metrics: | `activitypub.delivery.sent` | Counter | `{attempt}` | Counts outgoing ActivityPub delivery attempts. | | `activitypub.delivery.permanent_failure` | Counter | `{failure}` | Counts outgoing deliveries abandoned as permanent failures. | | `activitypub.delivery.duration` | Histogram | `ms` | Measures outgoing ActivityPub delivery attempt duration. | +| `activitypub.circuit_breaker.state_change` | Counter | `{change}` | Counts queued outbox circuit breaker state changes per remote host. 
| | `activitypub.inbox.activity` | Counter | `{activity}` | Classifies inbound activities by lifecycle outcome. | | `activitypub.inbox.processing_duration` | Histogram | `ms` | Measures inbox listener processing duration. | | `activitypub.outbox.activity` | Counter | `{activity}` | Classifies outbound activities by lifecycle outcome. | @@ -365,6 +384,10 @@ Fedify records the following OpenTelemetry metrics: : `activitypub.remote.host`, `activitypub.delivery.success`, and `activitypub.activity.type` when Fedify knows the activity type. +`activitypub.circuit_breaker.state_change` +: `activitypub.remote.host` and `activitypub.circuit_breaker.state`. + The state value is one of `closed`, `open`, or `half_open`. + `activitypub.inbox.activity` : `activitypub.processing.result` is always present, and is one of: @@ -576,9 +599,10 @@ Fedify records the following OpenTelemetry metrics: `activitypub.cache.enabled` is always present and is `true` when the caller passed a `KeyCache`, `false` otherwise. `activitypub.remote.host` - is the hostname of the key URL. `http.response.status_code` is - present only when an HTTP response was observed. Key IDs, full key - URLs, and actor IDs are deliberately excluded from these metrics; + is the URL host of the key URL, including any non-default port. + `http.response.status_code` is present only when an HTTP response was + observed. Key IDs, full key URLs, and actor IDs are deliberately + excluded from these metrics; they remain on the `activitypub.fetch_key` span for trace-level investigation. @@ -608,8 +632,9 @@ Fedify records the following OpenTelemetry metrics: surfaces these four values at the loader boundary; `invalid` is reserved for the key lookup metrics, where the parser can decide that a successful HTTP response still does not contain a usable - key. `activitypub.remote.host` records the hostname of the - fetched URL when the URL parses; otherwise it is omitted. + key. 
`activitypub.remote.host` records the URL host of the + fetched URL, including any non-default port, when the URL parses; + otherwise it is omitted. `activitypub.cache.enabled` is `true` for Fedify's built-in `kvCache()`-backed document and context loaders and `false` for the authenticated document loader; for user-supplied factories Fedify @@ -630,7 +655,8 @@ Fedify records the following OpenTelemetry metrics: when it did not. Cache lookups that bypass the KV cache entirely (preloaded JSON-LD contexts and call sites without a matching cache rule) emit no measurement. `activitypub.remote.host` records the - hostname of the looked-up URL when it parses. + URL host of the looked-up URL, including any non-default port, when + it parses. `activitypub.object.lookup` : `activitypub.lookup.kind` is always present and is one of: @@ -646,13 +672,15 @@ Fedify records the following OpenTelemetry metrics: in a `finally` block, so a thrown error is still counted with `kind=other`. - `activitypub.remote.host` is the hostname extracted from the + `activitypub.remote.host` is the host extracted from the identifier: a parsed `URL`, an `acct:user@host` URI, or a bare - `@user@host` / `user@host` handle. Inputs that do not reduce - cleanly to an authority (paths, query strings, fragments, or - whitespace mixed in with the handle suffix) result in the - attribute being omitted, rather than recording a high-cardinality - value. This counter has no companion histogram: `lookupObject()` + `@user@host` / `user@host` handle. For URL identifiers and + handle authorities, non-default ports are included. Inputs that + do not reduce cleanly to an authority (paths, query strings, + fragments, or whitespace mixed in with the handle suffix) result + in the attribute being omitted, rather than recording a + high-cardinality value. 
This counter has no companion histogram: + `lookupObject()` drives `activitypub.document.fetch.duration` through the document loader, and emitting another duration here would double-count latency. Use `activitypub.object.lookup` for the parsed-result @@ -670,8 +698,9 @@ Fedify records the following OpenTelemetry metrics: discovery (including `TypeError`s from a malformed alias URL or an invalid `preferredUsername`). - `activitypub.remote.host` records `actor.id.hostname` when known - and is omitted otherwise. Actor IDs and handle strings are + `activitypub.remote.host` records `actor.id.host`, including any + non-default port, when known and is omitted otherwise. Actor IDs + and handle strings are deliberately excluded so attacker-controlled actor data cannot inflate metric cardinality. Per-WebFinger-call failure detail (HTTP status, parse failure, network failure, etc.) lives on @@ -710,10 +739,11 @@ Fedify records the following OpenTelemetry metrics: redirecting to an unusual scheme. The corresponding span attribute (`webfinger.resource.scheme` on the `webfinger.lookup` span) still records the raw scheme for trace-level investigation. - `activitypub.remote.host` records the hostname of the latest URL - Fedify attempted, so an operator can see who actually returned a - failure even after one or more redirects; it is omitted only when - the resource itself was malformed before any URL could be built. + `activitypub.remote.host` records the URL host of the latest URL + Fedify attempted, including any non-default port, so an operator + can see who actually returned a failure even after one or more + redirects; it is omitted only when the resource itself was + malformed before any URL could be built. `http.response.status_code` is recorded only when an HTTP response was observed (including non-2xx errors and redirects that exceeded `maxRedirection`). 
Full resource URIs, lookup URLs, and remote @@ -831,8 +861,11 @@ or processed-task throughput) remain available on `fedify.queue.task.enqueued` `fedify.queue.task.completed`; the activity-level counters are intentionally not a queue-mechanism replacement. -Fedify records `activitypub.remote.host` as the URL hostname only; ports, paths, -and query strings are deliberately excluded to keep metric cardinality bounded. +Fedify records `activitypub.remote.host` as the URL host: the hostname plus +any non-default port. Paths and query strings are deliberately excluded to +keep metric cardinality bounded, but ports are preserved so distinct services +on the same hostname do not collapse into one metric series or circuit +breaker key. Activity types use the same qualified URI form as Fedify's trace attributes, for example `https://www.w3.org/ns/activitystreams#Create`. @@ -881,74 +914,76 @@ for ActivityPub as of November 2024. However, Fedify provides a set of semantic [attributes] for ActivityPub. The following table shows the semantic attributes for ActivityPub: -| Attribute | Type | Description | Example | -| ---------------------------------------- | -------- | ---------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------- | -| `activitypub.activity.id` | string | The URI of the activity object. | `"https://example.com/activity/1"` | -| `activitypub.activity.type` | string[] | The qualified URI(s) of the activity type(s). | `["https://www.w3.org/ns/activitystreams#Create"]` | -| `activitypub.activity.to` | string[] | The URI(s) of the recipient collections/actors of the activity. | `["https://example.com/1/followers/2"]` | -| `activitypub.activity.cc` | string[] | The URI(s) of the carbon-copied recipient collections/actors of the activity. 
| `["https://www.w3.org/ns/activitystreams#Public"]` | -| `activitypub.activity.bto` | string[] | The URI(s) of the blind recipient collections/actors of the activity. | `["https://example.com/1/followers/2"]` | -| `activitypub.activity.bcc` | string[] | The URI(s) of the blind carbon-copied recipient collections/actors of the activity. | `["https://www.w3.org/ns/activitystreams#Public"]` | -| `activitypub.activity.retries` | int | The ordinal number of activity resending attempt (if and only if it's retried). | `3` | -| `activitypub.delivery.attempt` | int | The zero-based delivery attempt number for a queued outgoing activity. | `0` | -| `activitypub.delivery.permanent_failure` | boolean | Whether an outgoing delivery failure will be abandoned instead of retried. | `true` | -| `activitypub.processing.result` | string | Lifecycle outcome of an inbox or outbox activity: `queued`, `processed`, `retried`, `rejected`, or `abandoned`. | `"retried"` | -| `activitypub.actor.discovery.result` | string | Terminal outcome of `getActorHandle()`: `resolved`, `not_found`, or `error`. | `"resolved"` | -| `activitypub.actor.id` | string | The URI of the actor object. | `"https://example.com/actor/1"` | -| `activitypub.actor.key.cached` | boolean | Whether the actor's public keys are cached. | `true` | -| `activitypub.actor.type` | string[] | The qualified URI(s) of the actor type(s). | `["https://www.w3.org/ns/activitystreams#Person"]` | -| `activitypub.key.id` | string | The URI of the cryptographic key being verified. | `"https://example.com/actor/1#main-key"` | -| `activitypub.key_ownership.method` | string | The method used to verify key ownership (`owner_id` or `actor_fetch`). | `"actor_fetch"` | -| `activitypub.key_ownership.verified` | boolean | Whether the key ownership was successfully verified. | `true` | -| `activitypub.collection.id` | string | The URI of the collection object. 
| `"https://example.com/collection/1"` | -| `activitypub.collection.kind` | string | The bounded collection kind: `inbox`, `outbox`, `following`, `followers`, `liked`, `featured`, `featured_tags`, or `custom`. | `"followers"` | -| `activitypub.collection.page` | boolean | Whether the collection request targets a cursor page rather than the collection object. | `false` | -| `activitypub.collection.result` | string | Terminal collection request outcome: `served`, `not_found`, `not_acceptable`, `unauthorized`, or `error`. | `"served"` | -| `activitypub.collection.type` | string[] | The qualified URI(s) of the collection type(s). | `["https://www.w3.org/ns/activitystreams#OrderedCollection"]` | -| `activitypub.collection.total_items` | int | The total number of items in the collection. | `42` | -| `activitypub.object.id` | string | The URI of the object or the object enclosed by the activity. | `"https://example.com/object/1"` | -| `activitypub.object.type` | string[] | The qualified URI(s) of the object type(s). | `["https://www.w3.org/ns/activitystreams#Note"]` | -| `activitypub.object.in_reply_to` | string[] | The URI(s) of the original object to which the object reply. | `["https://example.com/object/1"]` | -| `activitypub.inboxes` | int | The number of inboxes the activity is sent to. | `12` | -| `activitypub.remote.host` | string | The hostname of the remote ActivityPub server. | `"example.com"` | -| `activitypub.shared_inbox` | boolean | Whether the activity is sent to the shared inbox. | `true` | -| `docloader.context_url` | string | The URL of the JSON-LD context document (if provided via Link header). | `"https://www.w3.org/ns/activitystreams"` | -| `docloader.document_url` | string | The final URL of the fetched document (after following redirects). | `"https://example.com/object/1"` | -| `fedify.actor.identifier` | string | The identifier of the actor. 
| `"1"` | -| `fedify.endpoint` | string | The bounded endpoint category that classified an inbound HTTP request handled by `Federation.fetch()`. | `"actor"` | -| `fedify.route.template` | string | The matched URI Template, with parameter names (not values). | `"/users/{identifier}"` | -| `fedify.inbox.recipient` | string | The identifier of the inbox recipient. | `"1"` | -| `fedify.object.type` | string | The URI of the object type. | `"https://www.w3.org/ns/activitystreams#Note"` | -| `fedify.object.values.{parameter}` | string[] | The argument values of the object dispatcher. | `["1", "2"]` | -| `fedify.collection.dispatcher` | string | The collection dispatcher family: `built_in` or `custom`. | `"built_in"` | -| `fedify.collection.cursor` | string | The cursor of the collection. | `"eyJpZCI6IjEiLCJ0eXBlIjoiT3JkZXJlZENvbGxlY3Rpb24ifQ=="` | -| `fedify.collection.items` | number | The number of materialized items in the collection response or page. It can be less than the total items. | `10` | -| `fedify.queue.role` | string | The Fedify queue role for the task: `inbox`, `outbox`, or `fanout`. | `"outbox"` | -| `fedify.queue.backend` | string | The queue implementation's constructor name (best-effort backend identifier). | `"RedisMessageQueue"` | -| `fedify.queue.native_retrial` | boolean | Whether the queue backend declares `nativeRetrial`, meaning Fedify defers retry handling to the backend. | `true` | -| `fedify.queue.task.attempt` | int | The zero-based attempt number recorded on `fedify.queue.task.enqueued`; non-zero for retry re-enqueues. | `1` | -| `fedify.queue.task.result` | string | The terminal outcome of queue task processing: `completed`, `failed`, or `aborted`. | `"failed"` | -| `http.redirect.url` | string | The redirect URL when a document fetch results in a redirect. | `"https://example.com/new-location"` | -| `http.response.status_code` | int | The HTTP response status code. 
| `200` | -| `http_signatures.signature` | string | The signature of the HTTP request in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | -| `http_signatures.algorithm` | string | The algorithm of the HTTP request signature. | `"rsa-sha256"` | -| `http_signatures.key_id` | string | The public key ID of the HTTP request signature. | `"https://example.com/actor/1#main-key"` | -| `http_signatures.verified` | boolean | Whether the HTTP request signature was verified successfully. | `false` | -| `http_signatures.failure_reason` | string | Why HTTP signature verification failed (`noSignature`, `invalidSignature`, or `keyFetchError`). | `"keyFetchError"` | -| `http_signatures.key_fetch_status` | int | The HTTP status code from a failed signing-key fetch, when available. | `410` | -| `http_signatures.key_fetch_error` | string | The error type from a non-HTTP signing-key fetch failure, when available. | `"TypeError"` | -| `http_signatures.digest.{algorithm}` | string | The digest of the HTTP request body in hexadecimal. The `{algorithm}` is the digest algorithm (e.g., `sha`, `sha-256`). | `"d41d8cd98f00b204e9800998ecf8427e"` | -| `ld_signatures.key_id` | string | The public key ID of the Linked Data signature. | `"https://example.com/actor/1#main-key"` | -| `ld_signatures.signature` | string | The signature of the Linked Data in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | -| `ld_signatures.type` | string | The algorithm of the Linked Data signature. | `"RsaSignature2017"` | -| `object_integrity_proofs.cryptosuite` | string | The cryptographic suite of the object integrity proof. | `"eddsa-jcs-2022"` | -| `object_integrity_proofs.key_id` | string | The public key ID of the object integrity proof. | `"https://example.com/actor/1#main-key"` | -| `object_integrity_proofs.signature` | string | The integrity proof of the object in hexadecimal. 
| `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | -| `url.full` | string | The full URL being fetched by the document loader. | `"https://example.com/actor/1"` | -| `webfinger.handle.result` | string | Terminal outcome of an incoming WebFinger request: `resolved`, `invalid`, `not_found`, `tombstoned`, or `error`. | `"resolved"` | -| `webfinger.lookup.result` | string | Terminal outcome of an outgoing WebFinger lookup: `found`, `not_found`, `invalid`, `network_error`, or `error`. | `"found"` | -| `webfinger.resource` | string | The queried resource URI. | `"acct:fedify@hollo.social"` | -| `webfinger.resource.scheme` | string | The scheme of the queried resource URI. Metric attribute is bucketed to `acct`, `http`, `https`, `mailto`, or `other`. | `"acct"` | +| Attribute | Type | Description | Example | +| -------------------------------------------- | -------- | ---------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------- | +| `activitypub.activity.id` | string | The URI of the activity object. | `"https://example.com/activity/1"` | +| `activitypub.activity.type` | string[] | The qualified URI(s) of the activity type(s). | `["https://www.w3.org/ns/activitystreams#Create"]` | +| `activitypub.activity.to` | string[] | The URI(s) of the recipient collections/actors of the activity. | `["https://example.com/1/followers/2"]` | +| `activitypub.activity.cc` | string[] | The URI(s) of the carbon-copied recipient collections/actors of the activity. | `["https://www.w3.org/ns/activitystreams#Public"]` | +| `activitypub.activity.bto` | string[] | The URI(s) of the blind recipient collections/actors of the activity. | `["https://example.com/1/followers/2"]` | +| `activitypub.activity.bcc` | string[] | The URI(s) of the blind carbon-copied recipient collections/actors of the activity. 
| `["https://www.w3.org/ns/activitystreams#Public"]` | +| `activitypub.activity.retries` | int | The ordinal number of activity resending attempt (if and only if it's retried). | `3` | +| `activitypub.delivery.attempt` | int | The zero-based delivery attempt number for a queued outgoing activity. | `0` | +| `activitypub.delivery.permanent_failure` | boolean | Whether an outgoing delivery failure will be abandoned instead of retried. | `true` | +| `activitypub.circuit_breaker.previous_state` | string | Previous queued outbox circuit breaker state: `closed`, `open`, or `half_open`. | `"closed"` | +| `activitypub.circuit_breaker.state` | string | Current queued outbox circuit breaker state: `closed`, `open`, or `half_open`. | `"open"` | +| `activitypub.processing.result` | string | Lifecycle outcome of an inbox or outbox activity: `queued`, `processed`, `retried`, `rejected`, or `abandoned`. | `"retried"` | +| `activitypub.actor.discovery.result` | string | Terminal outcome of `getActorHandle()`: `resolved`, `not_found`, or `error`. | `"resolved"` | +| `activitypub.actor.id` | string | The URI of the actor object. | `"https://example.com/actor/1"` | +| `activitypub.actor.key.cached` | boolean | Whether the actor's public keys are cached. | `true` | +| `activitypub.actor.type` | string[] | The qualified URI(s) of the actor type(s). | `["https://www.w3.org/ns/activitystreams#Person"]` | +| `activitypub.key.id` | string | The URI of the cryptographic key being verified. | `"https://example.com/actor/1#main-key"` | +| `activitypub.key_ownership.method` | string | The method used to verify key ownership (`owner_id` or `actor_fetch`). | `"actor_fetch"` | +| `activitypub.key_ownership.verified` | boolean | Whether the key ownership was successfully verified. | `true` | +| `activitypub.collection.id` | string | The URI of the collection object. 
| `"https://example.com/collection/1"` | +| `activitypub.collection.kind` | string | The bounded collection kind: `inbox`, `outbox`, `following`, `followers`, `liked`, `featured`, `featured_tags`, or `custom`. | `"followers"` | +| `activitypub.collection.page` | boolean | Whether the collection request targets a cursor page rather than the collection object. | `false` | +| `activitypub.collection.result` | string | Terminal collection request outcome: `served`, `not_found`, `not_acceptable`, `unauthorized`, or `error`. | `"served"` | +| `activitypub.collection.type` | string[] | The qualified URI(s) of the collection type(s). | `["https://www.w3.org/ns/activitystreams#OrderedCollection"]` | +| `activitypub.collection.total_items` | int | The total number of items in the collection. | `42` | +| `activitypub.object.id` | string | The URI of the object or the object enclosed by the activity. | `"https://example.com/object/1"` | +| `activitypub.object.type` | string[] | The qualified URI(s) of the object type(s). | `["https://www.w3.org/ns/activitystreams#Note"]` | +| `activitypub.object.in_reply_to` | string[] | The URI(s) of the original object to which the object replies. | `["https://example.com/object/1"]` | +| `activitypub.inboxes` | int | The number of inboxes the activity is sent to. | `12` | +| `activitypub.remote.host` | string | The host of the remote ActivityPub server, including any non-default port. | `"example.com:8443"` | +| `activitypub.shared_inbox` | boolean | Whether the activity is sent to the shared inbox. | `true` | +| `docloader.context_url` | string | The URL of the JSON-LD context document (if provided via Link header). | `"https://www.w3.org/ns/activitystreams"` | +| `docloader.document_url` | string | The final URL of the fetched document (after following redirects). | `"https://example.com/object/1"` | +| `fedify.actor.identifier` | string | The identifier of the actor. 
| `"1"` | +| `fedify.endpoint` | string | The bounded endpoint category that classified an inbound HTTP request handled by `Federation.fetch()`. | `"actor"` | +| `fedify.route.template` | string | The matched URI Template, with parameter names (not values). | `"/users/{identifier}"` | +| `fedify.inbox.recipient` | string | The identifier of the inbox recipient. | `"1"` | +| `fedify.object.type` | string | The URI of the object type. | `"https://www.w3.org/ns/activitystreams#Note"` | +| `fedify.object.values.{parameter}` | string[] | The argument values of the object dispatcher. | `["1", "2"]` | +| `fedify.collection.dispatcher` | string | The collection dispatcher family: `built_in` or `custom`. | `"built_in"` | +| `fedify.collection.cursor` | string | The cursor of the collection. | `"eyJpZCI6IjEiLCJ0eXBlIjoiT3JkZXJlZENvbGxlY3Rpb24ifQ=="` | +| `fedify.collection.items` | number | The number of materialized items in the collection response or page. It can be less than the total items. | `10` | +| `fedify.queue.role` | string | The Fedify queue role for the task: `inbox`, `outbox`, or `fanout`. | `"outbox"` | +| `fedify.queue.backend` | string | The queue implementation's constructor name (best-effort backend identifier). | `"RedisMessageQueue"` | +| `fedify.queue.native_retrial` | boolean | Whether the queue backend declares `nativeRetrial`, meaning Fedify defers retry handling to the backend. | `true` | +| `fedify.queue.task.attempt` | int | The zero-based attempt number recorded on `fedify.queue.task.enqueued`; non-zero for retry re-enqueues. | `1` | +| `fedify.queue.task.result` | string | The terminal outcome of queue task processing: `completed`, `failed`, or `aborted`. | `"failed"` | +| `http.redirect.url` | string | The redirect URL when a document fetch results in a redirect. | `"https://example.com/new-location"` | +| `http.response.status_code` | int | The HTTP response status code. 
| `200` | +| `http_signatures.signature` | string | The signature of the HTTP request in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | +| `http_signatures.algorithm` | string | The algorithm of the HTTP request signature. | `"rsa-sha256"` | +| `http_signatures.key_id` | string | The public key ID of the HTTP request signature. | `"https://example.com/actor/1#main-key"` | +| `http_signatures.verified` | boolean | Whether the HTTP request signature was verified successfully. | `false` | +| `http_signatures.failure_reason` | string | Why HTTP signature verification failed (`noSignature`, `invalidSignature`, or `keyFetchError`). | `"keyFetchError"` | +| `http_signatures.key_fetch_status` | int | The HTTP status code from a failed signing-key fetch, when available. | `410` | +| `http_signatures.key_fetch_error` | string | The error type from a non-HTTP signing-key fetch failure, when available. | `"TypeError"` | +| `http_signatures.digest.{algorithm}` | string | The digest of the HTTP request body in hexadecimal. The `{algorithm}` is the digest algorithm (e.g., `sha`, `sha-256`). | `"d41d8cd98f00b204e9800998ecf8427e"` | +| `ld_signatures.key_id` | string | The public key ID of the Linked Data signature. | `"https://example.com/actor/1#main-key"` | +| `ld_signatures.signature` | string | The signature of the Linked Data in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | +| `ld_signatures.type` | string | The algorithm of the Linked Data signature. | `"RsaSignature2017"` | +| `object_integrity_proofs.cryptosuite` | string | The cryptographic suite of the object integrity proof. | `"eddsa-jcs-2022"` | +| `object_integrity_proofs.key_id` | string | The public key ID of the object integrity proof. | `"https://example.com/actor/1#main-key"` | +| `object_integrity_proofs.signature` | string | The integrity proof of the object in hexadecimal. 
| `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | +| `url.full` | string | The full URL being fetched by the document loader. | `"https://example.com/actor/1"` | +| `webfinger.handle.result` | string | Terminal outcome of an incoming WebFinger request: `resolved`, `invalid`, `not_found`, `tombstoned`, or `error`. | `"resolved"` | +| `webfinger.lookup.result` | string | Terminal outcome of an outgoing WebFinger lookup: `found`, `not_found`, `invalid`, `network_error`, or `error`. | `"found"` | +| `webfinger.resource` | string | The queried resource URI. | `"acct:fedify@hollo.social"` | +| `webfinger.resource.scheme` | string | The scheme of the queried resource URI. Metric attribute is bucketed to `acct`, `http`, `https`, `mailto`, or `other`. | `"acct"` | [attributes]: https://opentelemetry.io/docs/specs/otel/common/#attribute [OpenTelemetry Semantic Conventions]: https://opentelemetry.io/docs/specs/semconv/ diff --git a/packages/fedify/src/federation/callback.ts b/packages/fedify/src/federation/callback.ts index 1643532e5..f0c767acd 100644 --- a/packages/fedify/src/federation/callback.ts +++ b/packages/fedify/src/federation/callback.ts @@ -315,6 +315,16 @@ export type OutboxErrorHandler = ( export type OutboxPermanentFailureHandler = ( context: Context, values: { + /** + * Why Fedify is giving up on delivery. + * + * `"http"` means the inbox returned a configured permanent-failure HTTP + * status. `"circuit-breaker-ttl"` means the outbound circuit breaker held + * the activity until its retention period expired. + * + * @since 2.3.0 + */ + readonly reason: "http" | "circuit-breaker-ttl"; /** The inbox URL that failed. */ readonly inbox: URL; /** The activity that failed to deliver. */ @@ -323,6 +333,13 @@ export type OutboxPermanentFailureHandler = ( readonly error: SendActivityError; /** The HTTP status code returned by the inbox. 
*/ readonly statusCode: number; + /** + * The time when the circuit breaker first held the activity, if + * {@link reason} is `"circuit-breaker-ttl"`. + * + * @since 2.3.0 + */ + readonly circuitHeldSince?: Temporal.Instant; /** * The actor IDs that were supposed to receive the activity at this inbox. */ diff --git a/packages/fedify/src/federation/circuit-breaker.test.ts b/packages/fedify/src/federation/circuit-breaker.test.ts new file mode 100644 index 000000000..f1d8cc3cd --- /dev/null +++ b/packages/fedify/src/federation/circuit-breaker.test.ts @@ -0,0 +1,542 @@ +import { test } from "@fedify/fixture"; +import { assertEquals, assertThrows } from "@std/assert"; +import { + CircuitBreaker, + normalizeCircuitBreakerOptions, + parseCircuitBreakerKvState, +} from "./circuit-breaker.ts"; +import { type KvKey, type KvStoreSetOptions, MemoryKvStore } from "./kv.ts"; + +class AlwaysConflictingKvStore extends MemoryKvStore { + attempts = 0; + + override cas( + _key: KvKey, + _expectedValue: unknown, + _newValue: unknown, + _options?: KvStoreSetOptions, + ): Promise { + this.attempts++; + if (this.attempts > 10) { + throw new Error("beforeSend did not stop retrying CAS misses"); + } + return Promise.resolve(false); + } +} + +class CountingCasKvStore extends MemoryKvStore { + attempts = 0; + + override cas( + key: KvKey, + expectedValue: unknown, + newValue: unknown, + options?: KvStoreSetOptions, + ): Promise { + this.attempts++; + return super.cas(key, expectedValue, newValue, options); + } +} + +test("normalizeCircuitBreakerOptions() uses numeric failure policy", () => { + const options = normalizeCircuitBreakerOptions({ + failureThreshold: 3, + failureWindow: { minutes: 10 }, + }); + const failures = [ + Temporal.Instant.from("2026-05-25T00:00:00Z"), + Temporal.Instant.from("2026-05-25T00:05:00Z"), + Temporal.Instant.from("2026-05-25T00:10:00Z"), + ]; + assertEquals(options.failure(failures.slice(0, 2)), false); + assertEquals(options.failure(failures), true); + 
assertEquals( + options.failure([ + Temporal.Instant.from("2026-05-25T00:00:00Z"), + Temporal.Instant.from("2026-05-25T00:11:00Z"), + Temporal.Instant.from("2026-05-25T00:12:00Z"), + ]), + false, + ); + assertEquals( + options.pruneFailures( + [ + Temporal.Instant.from("2026-05-25T00:00:00Z"), + Temporal.Instant.from("2026-05-25T00:09:00Z"), + Temporal.Instant.from("2026-05-25T00:10:00Z"), + Temporal.Instant.from("2026-05-25T00:11:00Z"), + Temporal.Instant.from("2026-05-25T00:12:00Z"), + ], + Temporal.Instant.from("2026-05-25T00:12:00Z"), + ).map((t) => t.toString()), + [ + "2026-05-25T00:10:00Z", + "2026-05-25T00:11:00Z", + "2026-05-25T00:12:00Z", + ], + ); +}); + +test("normalizeCircuitBreakerOptions() validates numeric failure policy", () => { + assertThrows( + () => normalizeCircuitBreakerOptions({ failureThreshold: 0 }), + TypeError, + "failureThreshold", + ); + assertThrows( + () => normalizeCircuitBreakerOptions({ failureThreshold: 1.5 }), + TypeError, + "failureThreshold", + ); +}); + +test("normalizeCircuitBreakerOptions() truncates sub-millisecond durations", () => { + const options = normalizeCircuitBreakerOptions({ + recoveryDelay: { milliseconds: 1, nanoseconds: 500_000 }, + }); + assertEquals( + options.recoveryDelay, + Temporal.Duration.from({ milliseconds: 1 }), + ); +}); + +test("normalizeCircuitBreakerOptions() validates positive durations", () => { + assertThrows( + () => normalizeCircuitBreakerOptions({ recoveryDelay: { seconds: 0 } }), + RangeError, + "recoveryDelay", + ); + assertThrows( + () => normalizeCircuitBreakerOptions({ heldActivityTtl: { seconds: 0 } }), + RangeError, + "heldActivityTtl", + ); + assertThrows( + () => normalizeCircuitBreakerOptions({ failureWindow: { seconds: 0 } }), + RangeError, + "failureWindow", + ); + assertThrows( + () => normalizeCircuitBreakerOptions({ releaseInterval: { seconds: 0 } }), + RangeError, + "releaseInterval", + ); + assertThrows( + () => + normalizeCircuitBreakerOptions({ + releaseInterval: { 
nanoseconds: 500_000 }, + }), + RangeError, + "releaseInterval", + ); +}); + +test("normalizeCircuitBreakerOptions() accepts callback failure policy", () => { + const options = normalizeCircuitBreakerOptions({ + failure: (timestamps) => timestamps.length >= 2, + }); + const base = Temporal.Instant.from("2026-05-25T00:00:00Z"); + const failures = Array.from( + { length: 105 }, + (_, i) => base.add({ minutes: i }), + ); + assertEquals( + options.failure([Temporal.Instant.from("2026-05-25T00:00:00Z")]), + false, + ); + assertEquals( + options.failure([ + Temporal.Instant.from("2026-05-25T00:00:00Z"), + Temporal.Instant.from("2026-05-25T00:01:00Z"), + ]), + true, + ); + assertEquals( + options.pruneFailures( + failures, + base.add({ minutes: 105 }), + ).map((t) => t.toString()), + failures.slice(-100).map((t) => t.toString()), + ); +}); + +test("parseCircuitBreakerKvState() validates stored shape", () => { + assertEquals( + parseCircuitBreakerKvState({ + state: "open", + failures: ["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + halfOpened: "2026-05-25T00:00:00Z", + }), + { + state: "open", + failures: ["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + halfOpened: "2026-05-25T00:00:00Z", + }, + ); + assertEquals(parseCircuitBreakerKvState({ state: "open" }), undefined); + assertEquals( + parseCircuitBreakerKvState({ state: "other", failures: [] }), + undefined, + ); + assertEquals( + parseCircuitBreakerKvState({ state: "open", failures: [], opened: 1 }), + undefined, + ); + assertEquals( + parseCircuitBreakerKvState({ + state: "open", + failures: ["not an instant"], + }), + undefined, + ); + assertEquals( + parseCircuitBreakerKvState({ + state: "open", + failures: [], + halfOpened: "not an instant", + }), + undefined, + ); +}); + +test("CircuitBreaker opens, probes, closes, and drops held activities", async () => { + const kv = new MemoryKvStore(); + let now = Temporal.Instant.from("2026-05-25T00:00:00Z"); + const transitions: string[] = []; + 
const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => now, + options: { + failureThreshold: 2, + failureWindow: { minutes: 10 }, + recoveryDelay: { minutes: 30 }, + heldActivityTtl: { days: 7 }, + onStateChange(host, previousState, newState) { + transitions.push(`${host}:${previousState}->${newState}`); + }, + }, + }); + + await circuit.recordFailure("remote.example"); + assertEquals(await circuit.getState("remote.example"), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + + now = Temporal.Instant.from("2026-05-25T00:05:00Z"); + await circuit.recordFailure("remote.example"); + assertEquals(await circuit.getState("remote.example"), { + state: "open", + failures: [ + "2026-05-25T00:00:00Z", + "2026-05-25T00:05:00Z", + ], + opened: "2026-05-25T00:05:00Z", + }); + assertEquals(transitions, ["remote.example:closed->open"]); + + let decision = await circuit.beforeSend("remote.example", {}); + assertEquals(decision, { + type: "hold", + delay: Temporal.Duration.from({ minutes: 30 }), + heldSince: now, + state: "open", + }); + + now = Temporal.Instant.from("2026-05-25T00:35:00Z"); + decision = await circuit.beforeSend("remote.example", {}); + assertEquals(decision, { + type: "send", + probe: true, + stateChange: { previousState: "open", newState: "half-open" }, + }); + assertEquals(await circuit.getState("remote.example"), { + state: "half-open", + failures: [ + "2026-05-25T00:00:00Z", + "2026-05-25T00:05:00Z", + ], + opened: "2026-05-25T00:05:00Z", + halfOpened: "2026-05-25T00:35:00Z", + }); + + await circuit.recordSuccess("remote.example"); + assertEquals(await circuit.getState("remote.example"), undefined); + assertEquals(transitions, [ + "remote.example:closed->open", + "remote.example:open->half-open", + "remote.example:half-open->closed", + ]); + + decision = await circuit.beforeSend("remote.example", { + circuitHeldSince: "2026-05-17T00:00:00Z", + }); + assertEquals(decision, { + type: "drop", + heldSince: 
Temporal.Instant.from("2026-05-17T00:00:00Z"), + }); + + await kv.set(["_fedify", "circuit", "remote.example"], { + state: "open", + failures: [ + "2026-05-25T00:00:00Z", + "2026-05-25T00:05:00Z", + ], + opened: "2026-05-25T00:05:00Z", + }); + decision = await circuit.beforeSend("remote.example", { + circuitHeldSince: "2026-05-17T00:00:00Z", + }); + assertEquals(decision, { + type: "drop", + heldSince: Temporal.Instant.from("2026-05-17T00:00:00Z"), + }); +}); + +test("CircuitBreaker recovers stale half-open probes", async () => { + const kv = new MemoryKvStore(); + let now = Temporal.Instant.from("2026-05-25T00:00:00Z"); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => now, + options: { + recoveryDelay: { seconds: 30 }, + releaseInterval: { seconds: 5 }, + }, + }); + + await kv.set(["_fedify", "circuit", "remote.example"], { + state: "half-open", + failures: ["2026-05-24T23:00:00Z"], + opened: "2026-05-24T23:00:00Z", + halfOpened: "2026-05-24T23:59:54Z", + }); + + let decision = await circuit.beforeSend("remote.example", {}); + assertEquals(decision, { + type: "hold", + state: "half-open", + delay: Temporal.Duration.from({ seconds: 5 }), + heldSince: now, + }); + assertEquals(await circuit.getState("remote.example"), { + state: "half-open", + failures: ["2026-05-24T23:00:00Z"], + opened: "2026-05-24T23:00:00Z", + halfOpened: "2026-05-24T23:59:54Z", + }); + + now = Temporal.Instant.from("2026-05-25T00:00:30Z"); + decision = await circuit.beforeSend("remote.example", {}); + assertEquals(decision, { type: "send", probe: true }); + assertEquals(await circuit.getState("remote.example"), { + state: "half-open", + failures: ["2026-05-24T23:00:00Z"], + opened: "2026-05-24T23:00:00Z", + halfOpened: "2026-05-25T00:00:30Z", + }); +}); + +test("CircuitBreaker caps held delays at activity TTL", async () => { + const kv = new MemoryKvStore(); + const now = Temporal.Instant.from("2026-05-25T00:05:00Z"); + const circuit = new 
CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => now, + options: { + recoveryDelay: { minutes: 30 }, + heldActivityTtl: { minutes: 10 }, + releaseInterval: { minutes: 10 }, + }, + }); + + await kv.set(["_fedify", "circuit", "new-open.example"], { + state: "open", + failures: ["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + }); + let decision = await circuit.beforeSend("new-open.example", {}); + assertEquals(decision.type, "hold"); + if (decision.type === "hold") { + assertEquals(decision.state, "open"); + assertEquals(decision.delay.total({ unit: "minute" }), 10); + assertEquals(decision.heldSince.toString(), "2026-05-25T00:05:00Z"); + } + + await kv.set(["_fedify", "circuit", "open.example"], { + state: "open", + failures: ["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + }); + decision = await circuit.beforeSend("open.example", { + circuitHeldSince: "2026-05-25T00:00:00Z", + }); + assertEquals(decision.type, "hold"); + if (decision.type === "hold") { + assertEquals(decision.state, "open"); + assertEquals(decision.delay.total({ unit: "minute" }), 5); + assertEquals(decision.heldSince.toString(), "2026-05-25T00:00:00Z"); + } + + await kv.set(["_fedify", "circuit", "half-open.example"], { + state: "half-open", + failures: ["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + halfOpened: "2026-05-25T00:00:00Z", + }); + decision = await circuit.beforeSend("half-open.example", { + circuitHeldSince: "2026-05-25T00:00:00Z", + }); + assertEquals(decision.type, "hold"); + if (decision.type === "hold") { + assertEquals(decision.state, "half-open"); + assertEquals(decision.delay.total({ unit: "minute" }), 5); + assertEquals(decision.heldSince.toString(), "2026-05-25T00:00:00Z"); + } +}); + +test("CircuitBreaker ignores malformed held timestamps", async () => { + const kv = new MemoryKvStore(); + const now = Temporal.Instant.from("2026-05-25T00:05:00Z"); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", 
"circuit"], + now: () => now, + options: { recoveryDelay: { minutes: 30 } }, + }); + + await kv.set(["_fedify", "circuit", "malformed-held.example"], { + state: "open", + failures: ["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + }); + + const decision = await circuit.beforeSend("malformed-held.example", { + circuitHeldSince: "not an instant", + }); + + assertEquals(decision, { + type: "hold", + state: "open", + delay: Temporal.Duration.from({ minutes: 25 }), + heldSince: now, + }); +}); + +test("CircuitBreaker bounds beforeSend CAS retries", async () => { + let kv = new AlwaysConflictingKvStore(); + const now = Temporal.Instant.from("2026-05-25T00:30:00Z"); + let circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => now, + options: { + recoveryDelay: { minutes: 30 }, + releaseInterval: { seconds: 5 }, + }, + }); + await kv.set(["_fedify", "circuit", "open.example"], { + state: "open", + failures: ["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + }); + + let decision = await circuit.beforeSend("open.example", {}); + assertEquals(kv.attempts, 10); + assertEquals(decision, { + type: "hold", + state: "open", + delay: Temporal.Duration.from({ seconds: 5 }), + heldSince: now, + }); + + kv = new AlwaysConflictingKvStore(); + circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => now, + options: { + recoveryDelay: { minutes: 30 }, + releaseInterval: { seconds: 5 }, + }, + }); + await kv.set(["_fedify", "circuit", "half-open.example"], { + state: "half-open", + failures: ["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + halfOpened: "2026-05-25T00:00:00Z", + }); + + decision = await circuit.beforeSend("half-open.example", {}); + assertEquals(kv.attempts, 10); + assertEquals(decision, { + type: "hold", + state: "half-open", + delay: Temporal.Duration.from({ seconds: 5 }), + heldSince: now, + }); +}); + +test("CircuitBreaker skips recording failures for open circuits", async () => 
{ + const kv = new CountingCasKvStore(); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => Temporal.Instant.from("2026-05-25T00:01:00Z"), + }); + await kv.set(["_fedify", "circuit", "open.example"], { + state: "open", + failures: ["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + }); + + assertEquals(await circuit.recordFailure("open.example"), undefined); + assertEquals(kv.attempts, 0); + assertEquals( + await kv.get(["_fedify", "circuit", "open.example"]), + { + state: "open", + failures: ["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + }, + ); +}); + +test("CircuitBreaker prunes stale closed failure history", async () => { + const kv = new MemoryKvStore(); + let now = Temporal.Instant.from("2026-05-25T00:00:00Z"); + const circuit = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => now, + options: { + failureThreshold: 2, + failureWindow: { minutes: 10 }, + }, + }); + + await circuit.recordFailure("sporadic.example"); + assertEquals(await circuit.getState("sporadic.example"), { + state: "closed", + failures: ["2026-05-25T00:00:00Z"], + }); + + now = Temporal.Instant.from("2026-05-25T00:20:00Z"); + await circuit.recordFailure("sporadic.example"); + assertEquals(await circuit.getState("sporadic.example"), { + state: "closed", + failures: ["2026-05-25T00:20:00Z"], + }); + + now = Temporal.Instant.from("2026-05-25T00:40:00Z"); + await circuit.recordFailure("sporadic.example"); + assertEquals(await circuit.getState("sporadic.example"), { + state: "closed", + failures: ["2026-05-25T00:40:00Z"], + }); +}); diff --git a/packages/fedify/src/federation/circuit-breaker.ts b/packages/fedify/src/federation/circuit-breaker.ts new file mode 100644 index 000000000..4fd13959a --- /dev/null +++ b/packages/fedify/src/federation/circuit-breaker.ts @@ -0,0 +1,619 @@ +import { getLogger } from "@logtape/logtape"; +import type { Activity } from "@fedify/vocab"; +import type { KvKey, KvStore } from 
"./kv.ts"; + +/** + * The state of a remote host circuit breaker. + * @since 2.3.0 + */ +export type CircuitBreakerState = "closed" | "open" | "half-open"; + +/** + * The JSON-serializable state stored in the configured {@link KvStore}. + * @since 2.3.0 + */ +export interface CircuitBreakerKvState { + readonly state: CircuitBreakerState; + readonly failures: readonly string[]; + readonly opened?: string; + readonly halfOpened?: string; +} + +/** + * Details passed to {@link CircuitBreakerOptions.onActivityDrop} when a held + * activity expires before the remote host recovers. + * @since 2.3.0 + */ +export interface CircuitBreakerActivityDrop { + /** The inbox URL that would have received the activity. */ + readonly inbox: URL; + /** The activity that was dropped. */ + readonly activity: Activity; + /** The activity ID, when known. */ + readonly activityId?: string; + /** The activity type. */ + readonly activityType: string; + /** The actor IDs represented by this inbox. */ + readonly actorIds: readonly URL[]; + /** The time when Fedify first held this activity. */ + readonly heldSince: Temporal.Instant; +} + +/** + * Configures how a remote host circuit opens after repeated delivery + * failures. + * @since 2.3.0 + */ +export type CircuitBreakerFailurePolicy = + | { + failure(timestamps: readonly Temporal.Instant[]): boolean; + readonly failureThreshold?: never; + readonly failureWindow?: never; + } + | { + readonly failure?: never; + readonly failureThreshold?: number; + readonly failureWindow?: Temporal.Duration | Temporal.DurationLike; + }; + +/** + * Options for Fedify's outbound activity circuit breaker. + * @since 2.3.0 + */ +export type CircuitBreakerOptions = CircuitBreakerFailurePolicy & { + /** + * How long an open circuit waits before allowing a half-open recovery probe. 
+ * @default `{ minutes: 30 }` + */ + readonly recoveryDelay?: Temporal.Duration | Temporal.DurationLike; + + /** + * How long Fedify keeps requeueing activities held by an open circuit before + * dropping them. + * @default `{ days: 7 }` + */ + readonly heldActivityTtl?: Temporal.Duration | Temporal.DurationLike; + + /** + * How often other held activities retry while a half-open probe is in + * flight. The probe is treated as stale after the recovery delay. + * @default `{ seconds: 1 }` + */ + readonly releaseInterval?: Temporal.Duration | Temporal.DurationLike; + + /** + * Called whenever the circuit state changes. + */ + readonly onStateChange?: ( + remoteHost: string, + previousState: CircuitBreakerState, + newState: CircuitBreakerState, + ) => void | Promise; + + /** + * Called when an activity held by the circuit breaker expires. + */ + readonly onActivityDrop?: ( + remoteHost: string, + details: CircuitBreakerActivityDrop, + ) => void | Promise; +}; + +/** + * Normalized circuit breaker options used internally by Fedify. + * @internal + */ +export interface NormalizedCircuitBreakerOptions { + readonly failure: (timestamps: readonly Temporal.Instant[]) => boolean; + readonly pruneFailures: ( + timestamps: readonly Temporal.Instant[], + now: Temporal.Instant, + ) => readonly Temporal.Instant[]; + readonly recoveryDelay: Temporal.Duration; + readonly heldActivityTtl: Temporal.Duration; + readonly releaseInterval: Temporal.Duration; + readonly onStateChange?: CircuitBreakerOptions["onStateChange"]; + readonly onActivityDrop?: CircuitBreakerOptions["onActivityDrop"]; +} + +const MAX_CUSTOM_FAILURE_HISTORY = 100; + +/** + * Constructor options for {@link CircuitBreaker}. + * @internal + */ +export interface CircuitBreakerCreateOptions { + readonly kv: KvStore; + readonly prefix: KvKey; + readonly options?: CircuitBreakerOptions; + readonly now?: () => Temporal.Instant; + /** + * Observes state changes after user callbacks have run. 
+ * @internal + */ + readonly stateChangeObserver?: ( + remoteHost: string, + previousState: CircuitBreakerState, + newState: CircuitBreakerState, + ) => void | Promise; +} + +/** + * The delivery decision returned by {@link CircuitBreaker.beforeSend}. + * @internal + */ +export type CircuitBreakerBeforeSendDecision = + | { + readonly type: "send"; + readonly probe: boolean; + readonly stateChange?: CircuitBreakerStateChange; + } + | { + readonly type: "hold"; + readonly state: "open" | "half-open"; + readonly delay: Temporal.Duration; + readonly heldSince: Temporal.Instant; + } + | { readonly type: "drop"; readonly heldSince: Temporal.Instant }; + +/** + * A circuit breaker state transition. + * @since 2.3.0 + */ +export interface CircuitBreakerStateChange { + readonly previousState: CircuitBreakerState; + readonly newState: CircuitBreakerState; +} + +/** + * Tracks reachability state for remote outbox delivery hosts. + * @since 2.3.0 + */ +export class CircuitBreaker { + readonly #kv: KvStore; + readonly #prefix: KvKey; + readonly #options: NormalizedCircuitBreakerOptions; + readonly #now: () => Temporal.Instant; + readonly #stateChangeObserver: + | CircuitBreakerCreateOptions["stateChangeObserver"] + | undefined; + + constructor(options: CircuitBreakerCreateOptions) { + this.#kv = options.kv; + this.#prefix = options.prefix; + this.#options = normalizeCircuitBreakerOptions(options.options ?? {}); + this.#now = options.now ?? 
(() => Temporal.Now.instant()); + this.#stateChangeObserver = options.stateChangeObserver; + } + + get options(): NormalizedCircuitBreakerOptions { + return this.#options; + } + + capHeldDelay( + heldSince: Temporal.Instant, + delay: Temporal.Duration, + ): Temporal.Duration { + const now = this.#now(); + return now.until( + this.#capHeldRetryAt(now, heldSince, now.add(delay)), + ); + } + + async beforeSend( + remoteHost: string, + message: { readonly circuitHeldSince?: string }, + ): Promise { + const heldSince = parseHeldSince(message.circuitHeldSince); + const now = this.#now(); + if ( + heldSince != null && + Temporal.Instant.compare( + heldSince.add(this.#options.heldActivityTtl), + now, + ) <= + 0 + ) { + return { type: "drop", heldSince }; + } + let lastConflictingState: "open" | "half-open" | undefined; + + for (let attempt = 0; attempt < 10; attempt++) { + const oldState = await this.#get(remoteHost); + if (oldState == null || oldState.state === "closed") { + return { type: "send", probe: false }; + } + if (oldState.state === "half-open") { + const halfOpened = oldState.halfOpened == null + ? undefined + : Temporal.Instant.from(oldState.halfOpened); + if (halfOpened != null) { + const staleAt = halfOpened.add(this.#options.recoveryDelay); + if (Temporal.Instant.compare(now, staleAt) < 0) { + const releaseAt = now.add(this.#options.releaseInterval); + const retryAt = Temporal.Instant.compare(releaseAt, staleAt) < 0 + ? releaseAt + : staleAt; + const cappedRetryAt = this.#capHeldRetryAt( + now, + heldSince, + retryAt, + ); + return { + type: "hold", + state: "half-open", + delay: now.until(cappedRetryAt), + heldSince: heldSince ?? 
now, + }; + } + } + const newState = { + ...oldState, + state: "half-open", + halfOpened: now.toString(), + } satisfies CircuitBreakerKvState; + if (await this.#replace(remoteHost, oldState, newState)) { + return { type: "send", probe: true }; + } + lastConflictingState = "half-open"; + continue; + } + + const opened = oldState.opened == null + ? now + : Temporal.Instant.from(oldState.opened); + const probeAt = opened.add(this.#options.recoveryDelay); + if (Temporal.Instant.compare(now, probeAt) < 0) { + const retryAt = this.#capHeldRetryAt(now, heldSince, probeAt); + return { + type: "hold", + state: "open", + delay: now.until(retryAt), + heldSince: heldSince ?? now, + }; + } + + const newState = { + ...oldState, + state: "half-open", + halfOpened: now.toString(), + } satisfies CircuitBreakerKvState; + if (await this.#replace(remoteHost, oldState, newState)) { + await this.#notifyStateChange(remoteHost, "open", "half-open"); + return { + type: "send", + probe: true, + stateChange: { previousState: "open", newState: "half-open" }, + }; + } + lastConflictingState = "open"; + } + if (lastConflictingState != null) { + const retryAt = this.#capHeldRetryAt( + now, + heldSince, + now.add(this.#options.releaseInterval), + ); + return { + type: "hold", + state: lastConflictingState, + delay: now.until(retryAt), + heldSince: heldSince ?? 
now, + }; + } + throw new Error(`Failed to update circuit breaker state for ${remoteHost}`); + } + + async recordSuccess( + remoteHost: string, + ): Promise { + for (let attempt = 0; attempt < 10; attempt++) { + const oldState = await this.#get(remoteHost); + if (oldState == null) return undefined; + if (await this.#replace(remoteHost, oldState, undefined)) { + if (oldState.state !== "closed") { + await this.#notifyStateChange(remoteHost, oldState.state, "closed"); + return { + previousState: oldState.state, + newState: "closed", + }; + } + return undefined; + } + } + throw new Error(`Failed to update circuit breaker state for ${remoteHost}`); + } + + async recordReachableFailure( + remoteHost: string, + ): Promise { + return await this.recordSuccess(remoteHost); + } + + async recordFailure( + remoteHost: string, + ): Promise { + const now = this.#now(); + for (let attempt = 0; attempt < 10; attempt++) { + const oldState = await this.#get(remoteHost); + if (oldState?.state === "open") return undefined; + const oldFailures = oldState?.failures.map(Temporal.Instant.from) ?? []; + const failures = this.#options.pruneFailures( + [...oldFailures, now], + now, + ); + let newState: CircuitBreakerKvState; + let transition: [CircuitBreakerState, CircuitBreakerState] | undefined; + if ( + oldState?.state === "half-open" || this.#options.failure(failures) + ) { + newState = { + state: "open", + failures: failures.map((t) => t.toString()), + opened: now.toString(), + }; + transition = [oldState?.state ?? 
"closed", "open"]; + } else { + newState = { + state: "closed", + failures: failures.map((t) => t.toString()), + }; + } + if (await this.#replace(remoteHost, oldState, newState)) { + if (transition != null) { + await this.#notifyStateChange( + remoteHost, + transition[0], + transition[1], + ); + return { + previousState: transition[0], + newState: transition[1], + }; + } + return undefined; + } + } + throw new Error(`Failed to update circuit breaker state for ${remoteHost}`); + } + + async dropActivity( + remoteHost: string, + details: CircuitBreakerActivityDrop, + ): Promise { + try { + await this.#options.onActivityDrop?.(remoteHost, details); + } catch (error) { + getLogger(["fedify", "federation", "circuit"]).error( + "An unexpected error occurred in circuit breaker activity drop " + + "handler:\n{error}", + { remoteHost, error }, + ); + } + } + + async getState( + remoteHost: string, + ): Promise { + return await this.#get(remoteHost); + } + + #key(remoteHost: string): KvKey { + return [...this.#prefix, remoteHost] as KvKey; + } + + #capHeldRetryAt( + now: Temporal.Instant, + heldSince: Temporal.Instant | undefined, + retryAt: Temporal.Instant, + ): Temporal.Instant { + const heldFrom = heldSince ?? now; + const expiresAt = heldFrom.add(this.#options.heldActivityTtl); + return Temporal.Instant.compare(expiresAt, retryAt) < 0 + ? 
expiresAt + : retryAt; + } + + async #get(remoteHost: string): Promise { + return parseCircuitBreakerKvState( + await this.#kv.get(this.#key(remoteHost)), + ); + } + + async #replace( + remoteHost: string, + oldState: CircuitBreakerKvState | undefined, + newState: CircuitBreakerKvState | undefined, + ): Promise { + const key = this.#key(remoteHost); + if (this.#kv.cas == null) { + if (newState == null) { + await this.#kv.delete(key); + } else { + await this.#kv.set(key, newState); + } + return true; + } + return await this.#kv.cas(key, oldState, newState); + } + + async #notifyStateChange( + remoteHost: string, + previousState: CircuitBreakerState, + newState: CircuitBreakerState, + ): Promise { + try { + await this.#options.onStateChange?.(remoteHost, previousState, newState); + } catch (error) { + getLogger(["fedify", "federation", "circuit"]).error( + "An unexpected error occurred in circuit breaker state change " + + "handler:\n{error}", + { remoteHost, previousState, newState, error }, + ); + } + try { + await this.#stateChangeObserver?.(remoteHost, previousState, newState); + } catch (error) { + getLogger(["fedify", "federation", "circuit"]).error( + "An unexpected error occurred in circuit breaker state change " + + "observer:\n{error}", + { remoteHost, previousState, newState, error }, + ); + } + } +} + +/** + * Normalizes user-provided circuit breaker options into the internal policy + * shape used while processing queued outbox deliveries. + * + * @param options The public circuit breaker options supplied to Fedify. + * @returns The normalized failure predicate, failure pruning function, + * duration values, and optional callbacks with defaults applied. + * @throws {RangeError} If any configured duration is not positive. + * @throws {TypeError} If `failureThreshold` is not a positive integer. 
+ */ +export function normalizeCircuitBreakerOptions( + options: CircuitBreakerOptions, +): NormalizedCircuitBreakerOptions { + const recoveryDelay = toInstantDuration( + options.recoveryDelay ?? { minutes: 30 }, + ); + const heldActivityTtl = toInstantDuration( + options.heldActivityTtl ?? { hours: 24 * 7 }, + ); + const releaseInterval = toInstantDuration( + options.releaseInterval ?? { seconds: 1 }, + ); + assertPositiveDuration(recoveryDelay, "recoveryDelay"); + assertPositiveDuration(heldActivityTtl, "heldActivityTtl"); + assertPositiveDuration(releaseInterval, "releaseInterval"); + let failure: (timestamps: readonly Temporal.Instant[]) => boolean; + let pruneFailures: ( + timestamps: readonly Temporal.Instant[], + now: Temporal.Instant, + ) => readonly Temporal.Instant[]; + if (options.failure == null) { + const failureThreshold = options.failureThreshold ?? 5; + if (!Number.isInteger(failureThreshold) || failureThreshold <= 0) { + throw new TypeError("failureThreshold must be a positive integer."); + } + const failureWindow = toInstantDuration( + options.failureWindow ?? 
{ minutes: 10 }, + ); + assertPositiveDuration(failureWindow, "failureWindow"); + pruneFailures = (timestamps, now) => { + const earliest = now.subtract(failureWindow); + return timestamps + .filter((timestamp) => + Temporal.Instant.compare(timestamp, earliest) >= 0 + ) + .slice(-failureThreshold); + }; + failure = (timestamps) => { + if (timestamps.length < failureThreshold) return false; + const first = timestamps[timestamps.length - failureThreshold]; + const last = timestamps[timestamps.length - 1]; + return Temporal.Duration.compare(first.until(last), failureWindow) <= 0; + }; + } else { + failure = options.failure; + pruneFailures = (timestamps) => + timestamps.slice(-MAX_CUSTOM_FAILURE_HISTORY); + } + return { + failure, + pruneFailures, + recoveryDelay, + heldActivityTtl, + releaseInterval, + onStateChange: options.onStateChange, + onActivityDrop: options.onActivityDrop, + }; +} + +function toInstantDuration( + duration: Temporal.Duration | Temporal.DurationLike, +): Temporal.Duration { + const parsed = Temporal.Duration.from(duration); + return Temporal.Duration.from({ + milliseconds: Math.trunc( + parsed.total({ + unit: "millisecond", + relativeTo: Temporal.PlainDateTime.from("2026-01-01T00:00:00"), + }), + ), + }); +} + +function assertPositiveDuration( + duration: Temporal.Duration, + name: string, +): void { + if (Temporal.Duration.compare(duration, { seconds: 0 }) <= 0) { + throw new RangeError(`${name} must be a positive duration.`); + } +} + +function parseHeldSince( + value: string | undefined, +): Temporal.Instant | undefined { + if (value == null) return undefined; + try { + return Temporal.Instant.from(value); + } catch (error) { + getLogger(["fedify", "federation", "circuit"]).warn( + "Invalid circuitHeldSince value in queued outbox message: {value}", + { value, error }, + ); + return undefined; + } +} + +/** + * Parses a value loaded from the circuit breaker KV store. + * + * @param value The raw KV value to validate. 
+ * @returns A circuit breaker state when `value` has a recognized state and + * valid instant strings, or `undefined` when the stored value is malformed. + */ +export function parseCircuitBreakerKvState( + value: unknown, +): CircuitBreakerKvState | undefined { + const isInstantString = (v: unknown): v is string => { + if (typeof v !== "string") return false; + try { + Temporal.Instant.from(v); + return true; + } catch { + return false; + } + }; + if (typeof value !== "object" || value == null) return undefined; + const record = value as Record; + if ( + record.state !== "closed" && + record.state !== "open" && + record.state !== "half-open" + ) { + return undefined; + } + if ( + !Array.isArray(record.failures) || + !record.failures.every((failure) => isInstantString(failure)) + ) { + return undefined; + } + if (record.opened != null && !isInstantString(record.opened)) { + return undefined; + } + if (record.halfOpened != null && !isInstantString(record.halfOpened)) { + return undefined; + } + return { + state: record.state, + failures: record.failures, + ...(record.opened == null ? {} : { opened: record.opened }), + ...(record.halfOpened == null ? {} : { halfOpened: record.halfOpened }), + }; +} diff --git a/packages/fedify/src/federation/federation.ts b/packages/fedify/src/federation/federation.ts index 483d0b764..7d650650c 100644 --- a/packages/fedify/src/federation/federation.ts +++ b/packages/fedify/src/federation/federation.ts @@ -13,6 +13,7 @@ import type { import type { MeterProvider, TracerProvider } from "@opentelemetry/api"; import type { ActivityTransformer } from "../compat/types.ts"; import type { HttpMessageSignaturesSpec } from "../sig/http.ts"; +import type { CircuitBreakerOptions } from "./circuit-breaker.ts"; import type { ActorAliasMapper, ActorDispatcher, @@ -1020,6 +1021,17 @@ export interface FederationOptions { */ outboxRetryPolicy?: RetryPolicy; + /** + * The circuit breaker for queued outbound activity delivery. 
When enabled, + * Fedify tracks repeated failures per remote host and temporarily holds + * queued activities instead of repeatedly hammering an unreachable server. + * + * Passing `false` disables the circuit breaker. + * + * @since 2.3.0 + */ + circuitBreaker?: false | CircuitBreakerOptions; + /** * The retry policy for processing incoming activities. By default, this * uses an exponential backoff strategy with a maximum of 10 attempts and a diff --git a/packages/fedify/src/federation/metrics.test.ts b/packages/fedify/src/federation/metrics.test.ts index 541f1ea17..0392fb7d5 100644 --- a/packages/fedify/src/federation/metrics.test.ts +++ b/packages/fedify/src/federation/metrics.test.ts @@ -5,7 +5,9 @@ import { FetchError } from "@fedify/vocab-runtime"; import type { MessageQueue } from "./mq.ts"; import { classifyFetchError, + getRemoteHost, instrumentDocumentLoader, + recordCircuitBreakerStateChange, recordCollectionDispatchDuration, recordCollectionPageItems, recordCollectionRequest, @@ -29,6 +31,21 @@ const noopQueue: MessageQueue = { }, }; +test("getRemoteHost() includes non-default ports", () => { + assertEquals( + getRemoteHost(new URL("https://example.com/inbox")), + "example.com", + ); + assertEquals( + getRemoteHost(new URL("https://example.com:8443/inbox")), + "example.com:8443", + ); + assertEquals( + getRemoteHost(new URL("https://example.com:443/inbox")), + "example.com", + ); +}); + test("recordFanoutRecipients() records the recipient count with activity type", () => { const [meterProvider, recorder] = createTestMeterProvider(); recordFanoutRecipients( @@ -166,6 +183,29 @@ test("recordOutboxActivity() records counter with result and activity type", () ); }); +test("recordCircuitBreakerStateChange() records counter with bounded attributes", () => { + const [meterProvider, recorder] = createTestMeterProvider(); + recordCircuitBreakerStateChange( + meterProvider, + "remote.example", + "half_open", + ); + const measurements = recorder.getMeasurements( + 
"activitypub.circuit_breaker.state_change", + ); + assertEquals(measurements.length, 1); + assertEquals(measurements[0].type, "counter"); + assertEquals(measurements[0].value, 1); + assertEquals( + measurements[0].attributes["activitypub.remote.host"], + "remote.example", + ); + assertEquals( + measurements[0].attributes["activitypub.circuit_breaker.state"], + "half_open", + ); +}); + test("recordKeyLookup() records counter and duration with all attributes", () => { const [meterProvider, recorder] = createTestMeterProvider(); recordKeyLookup(meterProvider, { diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index f13c64cb9..b69b32ac7 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -75,6 +75,13 @@ export type InboxActivityResult = */ export type OutboxActivityResult = "queued" | "retried" | "abandoned"; +/** + * The bounded circuit breaker state value recorded on + * `activitypub.circuit_breaker.state_change`. + * @since 2.3.0 + */ +export type CircuitBreakerMetricState = "closed" | "open" | "half_open"; + /** * Common attributes shared by all queue task metrics. * @since 2.3.0 @@ -278,9 +285,10 @@ export type KeyLookupResult = Exclude; /** * Attributes accepted by {@link recordKeyLookup}. `remoteUrl` is taken as - * a `URL` so that the helper can derive the hostname-only - * `activitypub.remote.host` attribute internally and refuse to record - * high-cardinality values such as full key IDs or actor URLs. + * a `URL` so that the helper can derive the URL host, including any + * non-default port, for the `activitypub.remote.host` attribute internally + * and refuse to record high-cardinality values such as full key IDs or actor + * URLs. 
* @since 2.3.0 */ export interface KeyLookupAttributes { @@ -473,6 +481,7 @@ class FederationMetrics { readonly fanoutRecipients: Histogram; readonly inboxActivity: Counter; readonly outboxActivity: Counter; + readonly circuitBreakerStateChange: Counter; readonly keyLookup: Counter; readonly keyLookupDuration: Histogram; readonly documentFetch: Counter; @@ -648,6 +657,13 @@ class FederationMetrics { "live on `activitypub.delivery.*`.", unit: "{activity}", }); + this.circuitBreakerStateChange = meter.createCounter( + "activitypub.circuit_breaker.state_change", + { + description: "Outbound ActivityPub delivery circuit breaker changes.", + unit: "{change}", + }, + ); this.keyLookup = meter.createCounter("activitypub.key.lookup", { description: "Public-key lookup attempts performed by Fedify, including both " + @@ -976,6 +992,16 @@ class FederationMetrics { ); } + recordCircuitBreakerStateChange( + remoteHost: string, + state: CircuitBreakerMetricState, + ): void { + this.circuitBreakerStateChange.add(1, { + "activitypub.remote.host": remoteHost, + "activitypub.circuit_breaker.state": state, + }); + } + recordKeyLookup(attrs: KeyLookupAttributes): void { const attributes: Attributes = { "activitypub.lookup.kind": "public_key", @@ -1225,6 +1251,21 @@ export function recordOutboxActivity( ); } +/** + * Records one outbound delivery circuit breaker state transition. + * @since 2.3.0 + */ +export function recordCircuitBreakerStateChange( + meterProvider: MeterProvider | undefined, + remoteHost: string, + state: CircuitBreakerMetricState, +): void { + getFederationMetrics(meterProvider).recordCircuitBreakerStateChange( + remoteHost, + state, + ); +} + /** * Records one measurement on `activitypub.key.lookup` (counter) and * `activitypub.key.lookup.duration` (histogram) for a public-key lookup. @@ -1423,9 +1464,10 @@ export interface InstrumentDocumentLoaderOptions { * and as `fetched` on success. 
The wrapper rethrows whatever the * wrapped loader throws so caller behavior is unchanged. * - * The wrapper records the hostname of the requested URL on - * `activitypub.remote.host` when the URL parses; full URLs, paths, and - * query strings are deliberately excluded to keep cardinality bounded. + * The wrapper records the host of the requested URL, including any + * non-default port, on `activitypub.remote.host` when the URL parses; full + * URLs, paths, and query strings are deliberately excluded to keep + * cardinality bounded. * HTTP status codes are recorded only when the failure carries a * `Response` (currently, when the wrapped loader throws a * {@link FetchError} with a non-`null` `response`). @@ -1562,7 +1604,7 @@ export function getFederationMetrics( * @since 2.3.0 */ export function getRemoteHost(url: URL): string { - return url.hostname; + return url.host; } /** diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 7d10e6304..d669daefd 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -52,6 +52,7 @@ import { import { FetchError, getDocumentLoader } from "@fedify/vocab-runtime"; import { SpanStatusCode } from "@opentelemetry/api"; import { getAuthenticatedDocumentLoader } from "../utils/docloader.ts"; +import { CircuitBreaker } from "./circuit-breaker.ts"; const documentLoader = getDocumentLoader(); import type { Context, GetActorOptions } from "./context.ts"; @@ -6571,6 +6572,1142 @@ test("FederationImpl.processQueuedTask() permanent failure", async (t) => { fetchMock.hardReset(); }); +test("FederationImpl.processQueuedTask() circuit breaker", async (t) => { + fetchMock.spyGlobal(); + + interface Queued { + message: Message; + options: Parameters[1]; + } + + interface CircuitBreakerSetup { + federation: FederationImpl; + kv: MemoryKvStore; + queued: Queued[]; + } + + function setup( + options: 
ConstructorParameters>[0][ + "circuitBreaker" + ], + federationOptions: Pick< + ConstructorParameters>[0], + | "meterProvider" + | "tracerProvider" + | "permanentFailureStatusCodes" + | "outboxRetryPolicy" + > = {}, + queueOptions: Pick = {}, + ): CircuitBreakerSetup { + const kv = new MemoryKvStore(); + const queued: Queued[] = []; + const queue: MessageQueue = { + nativeRetrial: queueOptions.nativeRetrial, + enqueue(message, options) { + queued.push({ message, options }); + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + queue, + circuitBreaker: options, + ...federationOptions, + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox"); + return { federation, kv, queued }; + } + + function createOutboxMessage( + inbox: string, + overrides: Partial = {}, + ): OutboxMessage { + return { + type: "outbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + keys: [], + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://example.com/activity/circuit", + actor: "https://example.com/users/alice", + object: { type: "Note", content: "test" }, + }, + activityId: "https://example.com/activity/circuit", + activityType: "https://www.w3.org/ns/activitystreams#Create", + inbox, + sharedInbox: false, + actorIds: ["https://breaker.example/users/bob"], + started: new Date().toISOString(), + attempt: 0, + headers: {}, + traceContext: {}, + ...overrides, + }; + } + + await t.step("is not created without an outbox queue", () => { + const federation = new FederationImpl({ + kv: new MemoryKvStore(), + }); + assertEquals(federation.circuitBreaker, undefined); + }); + + await t.step("5xx opens circuit and holds the failed message", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://breaker.example/inbox", { + status: 500, + body: "server error", + }); + const { federation, 
queued, kv } = setup({ + failureThreshold: 1, + failureWindow: { minutes: 10 }, + recoveryDelay: { minutes: 30 }, + }); + const orderingKey = "https://example.com/object/breaker"; + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://breaker.example/inbox", { orderingKey }), + ); + + assertEquals(queued.length, 1); + const held = queued[0].message as OutboxMessage; + assertEquals(held.attempt, 0); + assertEquals(held.orderingKey, orderingKey); + assertEquals(held.circuitHeld, true); + assertExists(held.circuitHeldSince); + assertEquals(queued[0].options?.orderingKey, orderingKey); + assertEquals( + queued[0].options?.delay, + Temporal.Duration.from({ minutes: 30 }), + ); + const state = await kv.get>([ + "_fedify", + "circuit", + "breaker.example", + ]); + assertEquals(state?.state, "open"); + assertEquals(Array.isArray(state?.failures), true); + assertEquals((state?.failures as unknown[]).length, 1); + assertExists(state?.opened); + }); + + await t.step("open circuit requeues without sending", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + let requests = 0; + fetchMock.post("https://open.example/inbox", () => { + requests++; + return { status: 500, body: "server error" }; + }); + const { federation, queued } = setup({ + failureThreshold: 1, + recoveryDelay: { hours: 1 }, + }); + const orderingKey = "https://example.com/object/open"; + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://open.example/inbox", { orderingKey }), + ); + const held = queued[0].message as OutboxMessage; + queued.length = 0; + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://open.example/inbox", { + circuitHeld: true, + circuitHeldSince: held.circuitHeldSince, + orderingKey, + }), + ); + + assertEquals(requests, 1); + assertEquals(queued.length, 1); + const requeued = queued[0].message as OutboxMessage; + assertEquals(requeued.attempt, 0); + assertEquals(requeued.orderingKey, 
orderingKey); + assertEquals(requeued.circuitHeld, true); + assertEquals(requeued.circuitHeldSince, held.circuitHeldSince); + assertEquals(queued[0].options?.orderingKey, orderingKey); + assertEquals( + queued[0].options?.delay, + Temporal.Duration.from({ hours: 1 }), + ); + }); + + await t.step("circuit keys include non-default ports", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + let defaultPortRequests = 0; + fetchMock.post("https://ports.example:8443/inbox", { + status: 500, + body: "server error", + }); + fetchMock.post("https://ports.example/inbox", () => { + defaultPortRequests++; + return { status: 202, body: "" }; + }); + const { federation, queued, kv } = setup({ + failureThreshold: 1, + recoveryDelay: { hours: 1 }, + }); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://ports.example:8443/inbox"), + ); + assertEquals( + (await kv.get>([ + "_fedify", + "circuit", + "ports.example:8443", + ]))?.state, + "open", + ); + assertEquals( + await kv.get(["_fedify", "circuit", "ports.example"]), + undefined, + ); + + queued.length = 0; + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://ports.example/inbox"), + ); + + assertEquals(defaultPortRequests, 1); + assertEquals(queued, []); + }); + + await t.step("post-send circuit errors do not retry delivery", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://success-bookkeeping.example/inbox", { + status: 202, + body: "", + }); + const { federation, queued, kv } = setup({ + failureThreshold: 1, + }); + await kv.set(["_fedify", "circuit", "success-bookkeeping.example"], { + state: "closed", + failures: [], + }); + kv.cas = () => Promise.resolve(false); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://success-bookkeeping.example/inbox"), + ); + + assertEquals(queued, []); + }); + + await t.step("pre-send circuit errors do not block delivery", async () => { + 
fetchMock.hardReset(); + fetchMock.spyGlobal(); + let requests = 0; + fetchMock.post("https://presend-bookkeeping.example/inbox", () => { + requests++; + return { status: 202, body: "" }; + }); + const { federation, queued, kv } = setup({ failureThreshold: 1 }); + kv.get = () => Promise.reject(new Error("kv get failed")); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://presend-bookkeeping.example/inbox"), + ); + + assertEquals(requests, 1); + assertEquals(queued, []); + }); + + await t.step("circuit failure errors fall back to retry", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://failure-bookkeeping.example/inbox", { + status: 500, + body: "server error", + }); + const { federation, queued, kv } = setup( + { + failureThreshold: 1, + }, + { outboxRetryPolicy: () => Temporal.Duration.from({ seconds: 3 }) }, + ); + kv.cas = () => Promise.resolve(false); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://failure-bookkeeping.example/inbox"), + ); + + assertEquals(queued.length, 1); + const retry = queued[0].message as OutboxMessage; + assertEquals(retry.attempt, 1); + assertEquals(retry.circuitHeld, undefined); + assertEquals( + queued[0].options?.delay, + Temporal.Duration.from({ seconds: 3 }), + ); + }); + + await t.step("local delivery errors do not open circuit", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + const { federation, queued, kv } = setup( + { failureThreshold: 1 }, + { outboxRetryPolicy: () => Temporal.Duration.from({ seconds: 3 }) }, + ); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://local-error.example/inbox", { + headers: { "Invalid Header": "x" }, + }), + ); + + assertEquals(queued.length, 1); + const retry = queued[0].message as OutboxMessage; + assertEquals(retry.attempt, 1); + assertEquals(retry.circuitHeld, undefined); + assertEquals( + queued[0].options?.delay, + 
Temporal.Duration.from({ seconds: 3 }), + ); + assertEquals( + await kv.get(["_fedify", "circuit", "local-error.example"]), + undefined, + ); + }); + + await t.step("calendar retry delays are enqueued", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://calendar-delay.example/inbox", { + status: 500, + body: "server error", + }); + const { federation, queued } = setup( + { + failureThreshold: 5, + }, + { outboxRetryPolicy: () => Temporal.Duration.from({ days: 1 }) }, + ); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://calendar-delay.example/inbox"), + ); + + assertEquals(queued.length, 1); + assertEquals( + queued[0].options?.delay, + Temporal.Duration.from({ days: 1 }), + ); + }); + + await t.step("negative calendar retry delays are clamped", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://negative-calendar-delay.example/inbox", { + status: 500, + body: "server error", + }); + const { federation, queued } = setup( + { + failureThreshold: 5, + }, + { outboxRetryPolicy: () => Temporal.Duration.from({ days: -1 }) }, + ); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://negative-calendar-delay.example/inbox"), + ); + + assertEquals(queued.length, 1); + assertEquals( + queued[0].options?.delay, + Temporal.Duration.from({ seconds: 0 }), + ); + }); + + await t.step("circuit hold respects retry give-up", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://hold-give-up.example/inbox", { + status: 500, + body: "server error", + }); + const { federation, queued, kv } = setup( + { + failureThreshold: 1, + recoveryDelay: { minutes: 30 }, + }, + { outboxRetryPolicy: () => null }, + ); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://hold-give-up.example/inbox"), + ); + + assertEquals(queued, []); + assertEquals( + (await kv.get>([ + "_fedify", + "circuit", 
+ "hold-give-up.example", + ]))?.state, + "open", + ); + }); + + await t.step("circuit decision errors fall back to retry", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://decision-bookkeeping.example/inbox", { + status: 500, + body: "server error", + }); + const { federation, queued, kv } = setup( + { + failureThreshold: 1, + }, + { outboxRetryPolicy: () => Temporal.Duration.from({ seconds: 4 }) }, + ); + const originalGet = kv.get.bind(kv); + let getCalls = 0; + kv.get = (...args) => { + getCalls++; + return getCalls === 1 + ? originalGet(...args) + : Promise.reject(new Error("kv get failed")); + }; + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://decision-bookkeeping.example/inbox"), + ); + + assertEquals(queued.length, 1); + const retry = queued[0].message as OutboxMessage; + assertEquals(retry.attempt, 1); + assertEquals(retry.circuitHeld, undefined); + assertEquals( + queued[0].options?.delay, + Temporal.Duration.from({ seconds: 4 }), + ); + }); + + await t.step("circuit reachable errors keep permanent failure", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://permanent-bookkeeping.example/inbox", { + status: 500, + body: "server error", + }); + const { federation, queued, kv } = setup( + { + failureThreshold: 1, + }, + { permanentFailureStatusCodes: [500] }, + ); + await kv.set(["_fedify", "circuit", "permanent-bookkeeping.example"], { + state: "half-open", + failures: ["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + halfOpened: "2026-05-25T00:00:00Z", + }); + const originalCas = kv.cas.bind(kv); + let casCalls = 0; + kv.cas = (...args) => { + casCalls++; + return casCalls === 1 ? 
originalCas(...args) : Promise.resolve(false); + }; + let permanentFailureStatusCode: unknown; + federation.setOutboxPermanentFailureHandler((_ctx, values) => { + permanentFailureStatusCode = values.statusCode; + }); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://permanent-bookkeeping.example/inbox"), + ); + + assertEquals(queued, []); + assertEquals(permanentFailureStatusCode, 500); + }); + + await t.step("429 respects Retry-After without opening circuit", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://rate.example/inbox", { + status: 429, + headers: { "Retry-After": "120" }, + body: "rate limited", + }); + const { federation, queued, kv } = setup({ + failureThreshold: 1, + recoveryDelay: { minutes: 30 }, + }); + const orderingKey = "https://example.com/object/rate"; + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://rate.example/inbox", { orderingKey }), + ); + + assertEquals(queued.length, 1); + const retry = queued[0].message as OutboxMessage; + assertEquals(retry.attempt, 1); + assertEquals(retry.orderingKey, orderingKey); + assertEquals(retry.circuitHeld, undefined); + assertEquals(queued[0].options?.orderingKey, orderingKey); + assertEquals( + queued[0].options?.delay, + Temporal.Duration.from({ seconds: 120 }), + ); + assertEquals( + await kv.get(["_fedify", "circuit", "rate.example"]), + undefined, + ); + }); + + await t.step( + "429 respects Retry-After with circuit breaker disabled", + async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://rate-disabled.example/inbox", { + status: 429, + headers: { "Retry-After": "120" }, + body: "rate limited", + }); + const { federation, queued, kv } = setup(false, {}, { + nativeRetrial: true, + }); + assertEquals(federation.circuitBreaker, undefined); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://rate-disabled.example/inbox", { + 
orderingKey: "https://example.com/object/rate-limited", + }), + ); + + assertEquals(queued.length, 1); + const retry = queued[0].message as OutboxMessage; + assertEquals(retry.attempt, 1); + assertEquals(retry.circuitHeld, undefined); + assertEquals( + queued[0].options?.delay, + Temporal.Duration.from({ seconds: 120 }), + ); + assertEquals( + queued[0].options?.orderingKey, + "https://example.com/object/rate-limited", + ); + assertEquals( + await kv.get(["_fedify", "circuit", "rate-disabled.example"]), + undefined, + ); + }, + ); + + await t.step("429 Retry-After still respects retry give-up", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://give-up.example/inbox", { + status: 429, + headers: { "Retry-After": "120" }, + body: "rate limited", + }); + const { federation, queued, kv } = setup( + { failureThreshold: 1 }, + { outboxRetryPolicy: () => null }, + ); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://give-up.example/inbox"), + ); + + assertEquals(queued, []); + assertEquals( + await kv.get(["_fedify", "circuit", "give-up.example"]), + undefined, + ); + }); + + await t.step("503 respects Retry-After while counting failure", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://unavailable.example/inbox", { + status: 503, + headers: { "Retry-After": "120" }, + body: "temporarily unavailable", + }); + const { federation, queued, kv } = setup( + { + failureThreshold: 5, + failureWindow: { minutes: 10 }, + }, + { outboxRetryPolicy: () => Temporal.Duration.from({ seconds: 3 }) }, + ); + const orderingKey = "https://example.com/object/unavailable"; + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://unavailable.example/inbox", { + orderingKey, + }), + ); + + assertEquals(queued.length, 1); + const retry = queued[0].message as OutboxMessage; + assertEquals(retry.attempt, 1); + assertEquals(retry.circuitHeld, undefined); + 
assertEquals(retry.orderingKey, orderingKey); + assertEquals( + queued[0].options?.delay, + Temporal.Duration.from({ seconds: 120 }), + ); + assertEquals(queued[0].options?.orderingKey, orderingKey); + const state = await kv.get>([ + "_fedify", + "circuit", + "unavailable.example", + ]); + assertEquals(state?.state, "closed"); + assertEquals((state?.failures as unknown[]).length, 1); + }); + + await t.step("503 Retry-After delays newly opened circuit hold", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://open-retry-after.example/inbox", { + status: 503, + headers: { "Retry-After": "3600" }, + body: "temporarily unavailable", + }); + const { federation, queued, kv } = setup( + { + failureThreshold: 1, + recoveryDelay: { seconds: 30 }, + }, + { outboxRetryPolicy: () => Temporal.Duration.from({ seconds: 3 }) }, + ); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://open-retry-after.example/inbox"), + ); + + assertEquals(queued.length, 1); + const held = queued[0].message as OutboxMessage; + assertEquals(held.attempt, 0); + assertEquals(held.circuitHeld, true); + assertEquals(queued[0].options?.delay?.toString(), "PT3600S"); + const state = await kv.get>([ + "_fedify", + "circuit", + "open-retry-after.example", + ]); + assertEquals(state?.state, "open"); + }); + + await t.step("malformed Retry-After falls back to retry policy", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://huge-retry-after.example/inbox", { + status: 429, + headers: { "Retry-After": "999999999999999999999999999999" }, + body: "rate limited", + }); + const { federation, queued, kv } = setup( + { failureThreshold: 1 }, + { outboxRetryPolicy: () => Temporal.Duration.from({ seconds: 3 }) }, + ); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://huge-retry-after.example/inbox"), + ); + + assertEquals(queued.length, 1); + assertEquals( + 
queued[0].options?.delay?.total({ unit: "second" }), + 3, + ); + assertEquals( + await kv.get(["_fedify", "circuit", "huge-retry-after.example"]), + undefined, + ); + }); + + await t.step( + "invalid Retry-After date falls back to retry policy", + async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://invalid-retry-after.example/inbox", { + status: 429, + headers: { "Retry-After": "1.5" }, + body: "rate limited", + }); + const { federation, queued, kv } = setup( + { failureThreshold: 1 }, + { outboxRetryPolicy: () => Temporal.Duration.from({ seconds: 3 }) }, + ); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://invalid-retry-after.example/inbox"), + ); + + assertEquals(queued.length, 1); + assertEquals( + queued[0].options?.delay?.total({ unit: "second" }), + 3, + ); + assertEquals( + await kv.get(["_fedify", "circuit", "invalid-retry-after.example"]), + undefined, + ); + }, + ); + + await t.step("asctime Retry-After date is interpreted as UTC", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + const retryAfter = "Wed Dec 31 23:59:59 2036"; + fetchMock.post("https://asctime-retry-after.example/inbox", { + status: 429, + headers: { "Retry-After": retryAfter }, + body: "rate limited", + }); + const { federation, queued } = setup( + { failureThreshold: 1 }, + { outboxRetryPolicy: () => Temporal.Duration.from({ seconds: 3 }) }, + ); + const before = Temporal.Now.instant(); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://asctime-retry-after.example/inbox"), + ); + + const after = Temporal.Now.instant(); + const retryAtMs = Date.parse(`${retryAfter} GMT`); + assertEquals(queued.length, 1); + const delayMs = queued[0].options?.delay?.total({ unit: "millisecond" }); + assertExists(delayMs); + assertEquals(delayMs <= retryAtMs - before.epochMilliseconds, true); + assertEquals(delayMs >= retryAtMs - after.epochMilliseconds, true); + }); + + await 
t.step("permanent 5xx does not open circuit", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://permanent-500.example/inbox", { + status: 500, + body: "server error", + }); + const { federation, queued, kv } = setup( + { failureThreshold: 1 }, + { permanentFailureStatusCodes: [500] }, + ); + let permanentFailureStatusCode: unknown; + federation.setOutboxPermanentFailureHandler((_ctx, values) => { + permanentFailureStatusCode = values.statusCode; + }); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://permanent-500.example/inbox"), + ); + + assertEquals(queued, []); + assertEquals(permanentFailureStatusCode, 500); + assertEquals( + await kv.get(["_fedify", "circuit", "permanent-500.example"]), + undefined, + ); + }); + + await t.step("permanent 5xx closes half-open circuit", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://permanent-probe.example/inbox", { + status: 500, + body: "server error", + }); + const { federation, queued, kv } = setup( + { + failureThreshold: 1, + releaseInterval: { seconds: 1 }, + }, + { permanentFailureStatusCodes: [500] }, + ); + await kv.set(["_fedify", "circuit", "permanent-probe.example"], { + state: "half-open", + failures: ["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + halfOpened: "2026-05-25T00:00:00Z", + }); + let permanentFailureStatusCode: unknown; + federation.setOutboxPermanentFailureHandler((_ctx, values) => { + permanentFailureStatusCode = values.statusCode; + }); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://permanent-probe.example/inbox"), + ); + + assertEquals(queued, []); + assertEquals(permanentFailureStatusCode, 500); + assertEquals( + await kv.get(["_fedify", "circuit", "permanent-probe.example"]), + undefined, + ); + }); + + await t.step("permanent 4xx closes half-open circuit", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + 
fetchMock.post("https://gone.example/inbox", { + status: 410, + body: "gone", + }); + const { federation, queued, kv } = setup({ + failureThreshold: 1, + releaseInterval: { seconds: 1 }, + }); + await kv.set(["_fedify", "circuit", "gone.example"], { + state: "half-open", + failures: ["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + halfOpened: "2026-05-25T00:00:00Z", + }); + let permanentFailureStatusCode: unknown; + federation.setOutboxPermanentFailureHandler((_ctx, values) => { + permanentFailureStatusCode = values.statusCode; + }); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://gone.example/inbox"), + ); + + assertEquals(queued, []); + assertEquals(permanentFailureStatusCode, 410); + assertEquals( + await kv.get(["_fedify", "circuit", "gone.example"]), + undefined, + ); + }); + + await t.step("false disables circuit handling", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://disabled.example/inbox", { + status: 500, + body: "server error", + }); + const { federation, queued, kv } = setup(false); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://disabled.example/inbox"), + ); + + assertEquals(queued.length, 1); + const retry = queued[0].message as OutboxMessage; + assertEquals(retry.attempt, 1); + assertEquals(retry.circuitHeld, undefined); + assertEquals( + await kv.get(["_fedify", "circuit", "disabled.example"]), + undefined, + ); + }); + + await t.step("state changes are recorded in metrics and spans", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://telemetry.example/inbox", { + status: 500, + body: "server error", + }); + const [meterProvider, recorder] = createTestMeterProvider(); + const [tracerProvider, exporter] = createTestTracerProvider(); + const { federation, queued } = setup( + { failureThreshold: 1 }, + { meterProvider, tracerProvider }, + ); + + await federation.processQueuedTask( + 
undefined, + createOutboxMessage("https://telemetry.example/inbox"), + ); + + assertEquals(queued.length, 1); + const measurements = recorder.getMeasurements( + "activitypub.circuit_breaker.state_change", + ); + assertEquals(measurements.length, 1); + assertEquals( + measurements[0].attributes["activitypub.remote.host"], + "telemetry.example", + ); + assertEquals( + measurements[0].attributes["activitypub.circuit_breaker.state"], + "open", + ); + const events = exporter.getEvents( + "activitypub.outbox", + "activitypub.circuit_breaker.state_change", + ); + assertEquals(events.length, 1); + assertEquals( + events[0].attributes?.["activitypub.remote.host"], + "telemetry.example", + ); + assertEquals( + events[0].attributes?.["activitypub.circuit_breaker.previous_state"], + "closed", + ); + assertEquals( + events[0].attributes?.["activitypub.circuit_breaker.state"], + "open", + ); + const heldEvents = exporter.getEvents( + "activitypub.outbox", + "activitypub.circuit_breaker.held", + ); + assertEquals(heldEvents.length, 1); + assertEquals( + heldEvents[0].attributes?.["activitypub.remote.host"], + "telemetry.example", + ); + assertEquals( + heldEvents[0].attributes?.["activitypub.circuit_breaker.state"], + "open", + ); + }); + + await t.step("held half-open circuit is recorded in spans", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + const now = Temporal.Instant.from("2026-05-25T00:00:30Z"); + const [tracerProvider, exporter] = createTestTracerProvider(); + const { federation, queued, kv } = setup( + { + failureThreshold: 1, + recoveryDelay: { minutes: 5 }, + releaseInterval: { minutes: 1 }, + }, + { tracerProvider }, + ); + federation.circuitBreaker = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => now, + options: { + failureThreshold: 1, + recoveryDelay: { minutes: 5 }, + releaseInterval: { minutes: 1 }, + }, + }); + await kv.set(["_fedify", "circuit", "half-open-telemetry.example"], { + state: "half-open", + failures: 
["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + halfOpened: "2026-05-25T00:00:00Z", + }); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://half-open-telemetry.example/inbox", { + circuitHeld: true, + circuitHeldSince: "2026-05-25T00:00:00Z", + }), + ); + + assertEquals(queued.length, 1); + const events = exporter.getEvents( + "activitypub.outbox", + "activitypub.circuit_breaker.held", + ); + assertEquals(events.length, 1); + assertEquals( + events[0].attributes?.["activitypub.remote.host"], + "half-open-telemetry.example", + ); + assertEquals( + events[0].attributes?.["activitypub.circuit_breaker.state"], + "half_open", + ); + }); + + await t.step( + "stale half-open probe does not record open transition", + async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + fetchMock.post("https://stale-probe-telemetry.example/inbox", { + status: 202, + body: "", + }); + const now = Temporal.Instant.from("2026-05-25T00:00:02Z"); + const [tracerProvider, exporter] = createTestTracerProvider(); + const { federation, kv } = setup( + { + failureThreshold: 1, + recoveryDelay: { seconds: 1 }, + }, + { tracerProvider }, + ); + federation.circuitBreaker = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => now, + options: { + failureThreshold: 1, + recoveryDelay: { seconds: 1 }, + }, + }); + await kv.set(["_fedify", "circuit", "stale-probe-telemetry.example"], { + state: "half-open", + failures: ["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + halfOpened: "2026-05-25T00:00:00Z", + }); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://stale-probe-telemetry.example/inbox"), + ); + + const events = exporter.getEvents( + "activitypub.outbox", + "activitypub.circuit_breaker.state_change", + ); + assertEquals(events.length, 1); + assertEquals( + events[0].attributes?.["activitypub.circuit_breaker.previous_state"], + "half_open", + ); + assertEquals( + 
events[0].attributes?.["activitypub.circuit_breaker.state"], + "closed", + ); + }, + ); + + await t.step("expired held activity is dropped", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + let dropped: { remoteHost: string; heldSince: Temporal.Instant } | null = + null; + const { federation, queued } = setup({ + failureThreshold: 1, + heldActivityTtl: { seconds: 1 }, + onActivityDrop(remoteHost, details) { + dropped = { remoteHost, heldSince: details.heldSince }; + }, + }); + let permanentFailureReason: unknown; + federation.setOutboxPermanentFailureHandler((_ctx, values) => { + permanentFailureReason = values.reason; + }); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://ttl.example/inbox", { + circuitHeld: true, + circuitHeldSince: "2026-05-25T00:00:00Z", + }), + ); + + assertEquals(queued, []); + assertEquals(dropped, { + remoteHost: "ttl.example", + heldSince: Temporal.Instant.from("2026-05-25T00:00:00Z"), + }); + assertEquals(permanentFailureReason, "circuit-breaker-ttl"); + }); + + await t.step("expired held probe is dropped after failed send", async () => { + fetchMock.hardReset(); + fetchMock.spyGlobal(); + let now = Temporal.Instant.from("2026-05-25T00:00:01Z"); + const heldSince = Temporal.Instant.from("2026-05-25T00:00:00Z"); + fetchMock.post("https://expired-probe.example/inbox", () => { + now = Temporal.Instant.from("2026-05-25T00:00:03Z"); + return { status: 500, body: "server error" }; + }); + let dropped: { remoteHost: string; heldSince: Temporal.Instant } | null = + null; + const { federation, queued, kv } = setup({ + failureThreshold: 1, + recoveryDelay: { seconds: 1 }, + heldActivityTtl: { seconds: 2 }, + releaseInterval: { seconds: 1 }, + }); + federation.circuitBreaker = new CircuitBreaker({ + kv, + prefix: ["_fedify", "circuit"], + now: () => now, + options: { + failureThreshold: 1, + recoveryDelay: { seconds: 1 }, + heldActivityTtl: { seconds: 2 }, + releaseInterval: { seconds: 1 }, + 
onActivityDrop(remoteHost, details) { + dropped = { remoteHost, heldSince: details.heldSince }; + }, + }, + }); + await kv.set(["_fedify", "circuit", "expired-probe.example"], { + state: "half-open", + failures: ["2026-05-25T00:00:00Z"], + opened: "2026-05-25T00:00:00Z", + halfOpened: "2026-05-25T00:00:00Z", + }); + let permanentFailureReason: unknown; + federation.setOutboxPermanentFailureHandler((_ctx, values) => { + permanentFailureReason = values.reason; + }); + + await federation.processQueuedTask( + undefined, + createOutboxMessage("https://expired-probe.example/inbox", { + circuitHeld: true, + circuitHeldSince: heldSince.toString(), + }), + ); + + assertEquals(queued, []); + assertEquals(dropped, { + remoteHost: "expired-probe.example", + heldSince, + }); + assertEquals(permanentFailureReason, "circuit-breaker-ttl"); + }); + + fetchMock.hardReset(); +}); + test("FederationImpl.processQueuedTask() queue task metrics", async (t) => { await t.step( "records failed result when worker re-throws (nativeRetrial)", diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index fb31b9a07..105608188 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -82,6 +82,12 @@ import { getAuthenticatedDocumentLoader } from "../utils/docloader.ts"; import { kvCache } from "../utils/kv-cache.ts"; import { ACTOR_ALIAS_PREFIX, FederationBuilderImpl } from "./builder.ts"; import type { OutboxErrorHandler } from "./callback.ts"; +import { + CircuitBreaker, + type CircuitBreakerBeforeSendDecision, + type CircuitBreakerState, + type CircuitBreakerStateChange, +} from "./circuit-breaker.ts"; import { buildCollectionSynchronizationHeader } from "./collection.ts"; import type { ActorKeyPair, @@ -127,6 +133,7 @@ import { isAbortError, type QueueTaskCommonAttributes, type QueueTaskResult, + recordCircuitBreakerStateChange, recordCollectionRequest, recordFanoutRecipients, 
recordInboxActivity, @@ -152,6 +159,104 @@ import { import { handleWebFinger } from "./webfinger.ts"; import { hasMalformedKnownTemporalLiteral } from "./temporal.ts"; +const circuitBreakerCasWarningKvStores = new WeakSet(); +const retryAfterHttpDate = new RegExp( + "^(?:" + + "(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun), \\d{2} " + + "(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec) " + + "\\d{4} \\d{2}:\\d{2}:\\d{2} GMT" + + "|" + + "(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday), " + + "\\d{2}-(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)-" + + "\\d{2} \\d{2}:\\d{2}:\\d{2} GMT" + + "|" + + "(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun) " + + "(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec) " + + "(?: \\d|\\d{2}) \\d{2}:\\d{2}:\\d{2} \\d{4}" + + ")$", +); + +function parseRetryAfter( + headers: Headers, + now: Temporal.Instant = Temporal.Now.instant(), +): Temporal.Duration | undefined { + const value = headers.get("Retry-After"); + if (value == null) return undefined; + const trimmed = value.trim(); + if (/^\d+$/.test(trimmed)) { + const seconds = Number(trimmed); + if (!Number.isFinite(seconds)) return undefined; + return parseRetryAfterDuration({ seconds }); + } + if (!retryAfterHttpDate.test(trimmed)) return undefined; + const httpDate = trimmed.endsWith("GMT") ? trimmed : `${trimmed} GMT`; + const retryAtMs = Date.parse(httpDate); + if (Number.isNaN(retryAtMs)) return undefined; + const nowMs = Number(now.epochMilliseconds); + return parseRetryAfterDuration({ + milliseconds: Math.max(0, retryAtMs - nowMs), + }); +} + +function parseRetryAfterDuration( + durationLike: Temporal.DurationLike, +): Temporal.Duration | undefined { + try { + return Temporal.Duration.from(durationLike); + } catch (error) { + if (error instanceof RangeError) return undefined; + throw error; + } +} + +function clampNegativeDelay(delay: Temporal.Duration): Temporal.Duration { + return delay.sign < 0 ? 
Temporal.Duration.from({ seconds: 0 }) : delay; +} + +function maxDelay( + first: Temporal.Duration, + second: Temporal.Duration, +): Temporal.Duration { + return Temporal.Duration.compare(first, second) >= 0 ? first : second; +} + +function isTransportDeliveryError(error: unknown): boolean { + return error instanceof FetchError || isAbortError(error); +} + +function toCircuitBreakerMetricState( + state: CircuitBreakerState, +): "closed" | "open" | "half_open" { + return state === "half-open" ? "half_open" : state; +} + +function recordCircuitBreakerSpanEvent( + span: Span, + remoteHost: string, + change: CircuitBreakerStateChange, +): void { + span.addEvent("activitypub.circuit_breaker.state_change", { + "activitypub.remote.host": remoteHost, + "activitypub.circuit_breaker.previous_state": toCircuitBreakerMetricState( + change.previousState, + ), + "activitypub.circuit_breaker.state": toCircuitBreakerMetricState( + change.newState, + ), + }); +} + +function recordCircuitBreakerHeldSpanEvent( + span: Span, + remoteHost: string, + state: "open" | "half-open", +): void { + span.addEvent("activitypub.circuit_breaker.held", { + "activitypub.remote.host": remoteHost, + "activitypub.circuit_breaker.state": toCircuitBreakerMetricState(state), + }); +} + function isRemoteContextLoadingFailure(error: unknown): boolean { return error instanceof Error && typeof (error as Error & { details?: { code?: unknown } }).details === @@ -281,6 +386,13 @@ export interface FederationKvPrefixes { * @since 2.1.0 */ readonly acceptSignatureNonce: KvKey; + + /** + * The key prefix used for storing outbound delivery circuit breaker state. 
+ * @default `["_fedify", "circuit"]` + * @since 2.3.0 + */ + readonly circuitBreaker: KvKey; } /** @@ -339,6 +451,7 @@ export class FederationImpl skipSignatureVerification: boolean; outboxRetryPolicy: RetryPolicy; inboxRetryPolicy: RetryPolicy; + circuitBreaker?: CircuitBreaker; activityTransformers: readonly ActivityTransformer[]; _tracerProvider: TracerProvider | undefined; _meterProvider: MeterProvider | undefined; @@ -355,6 +468,7 @@ export class FederationImpl publicKey: ["_fedify", "publicKey"], httpMessageSignaturesSpec: ["_fedify", "httpMessageSignaturesSpec"], acceptSignatureNonce: ["_fedify", "acceptSignatureNonce"], + circuitBreaker: ["_fedify", "circuit"], } satisfies FederationKvPrefixes), ...(options.kvPrefixes ?? {}), }; @@ -371,6 +485,32 @@ export class FederationImpl this.outboxQueue = options.queue.outbox; this.fanoutQueue = options.queue.fanout; } + if (options.circuitBreaker !== false && this.outboxQueue != null) { + this.circuitBreaker = new CircuitBreaker({ + kv: options.kv, + prefix: this.kvPrefixes.circuitBreaker, + options: options.circuitBreaker, + stateChangeObserver: (remoteHost, _previousState, newState) => { + const metricState = toCircuitBreakerMetricState(newState); + recordCircuitBreakerStateChange( + this.meterProvider, + remoteHost, + metricState, + ); + }, + }); + if ( + options.kv.cas == null && + !circuitBreakerCasWarningKvStores.has(options.kv) + ) { + circuitBreakerCasWarningKvStores.add(options.kv); + getLogger(["fedify", "federation", "circuit"]).warn( + "The configured key-value store does not support CAS; outbound " + + "delivery circuit breaker updates may race under concurrent " + + "workers.", + ); + } + } this.inboxQueueStarted = false; this.outboxQueueStarted = false; this.fanoutQueueStarted = false; @@ -888,13 +1028,155 @@ export class FederationImpl } keys.push(pair); } + const loaderOptions = this.#getLoaderOptions(message.baseUrl); + let parsedActorIds: URL[] | undefined; + const getActorIds = () => { + 
parsedActorIds ??= (message.actorIds ?? []).flatMap((id) => { + try { + return [new URL(id)]; + } catch { + logger.warn( + "Invalid actorId URL in OutboxMessage: {id}", + { id }, + ); + return []; + } + }); + return parsedActorIds; + }; + const parseActivity = () => + Activity.fromJsonLd(message.activity, { + contextLoader: this.contextLoaderFactory(loaderOptions), + documentLoader: rsaKeyPair == null + ? this.documentLoaderFactory(loaderOptions) + : this.authenticatedDocumentLoaderFactory(rsaKeyPair, loaderOptions), + tracerProvider: this.tracerProvider, + }); + const enqueueHeldOutboxMessage = async ( + delay: Temporal.Duration, + heldSince: Temporal.Instant, + ) => { + const { outboxQueue } = this; + if (outboxQueue == null) return; + const heldMessage = { + ...message, + circuitHeld: true, + circuitHeldSince: heldSince.toString(), + } satisfies OutboxMessage; + await outboxQueue.enqueue(heldMessage, { + delay: clampNegativeDelay(delay), + orderingKey: message.orderingKey, + }); + getFederationMetrics(this.meterProvider).recordQueueTaskEnqueued( + { + role: "outbox", + queue: outboxQueue, + activityType: heldMessage.activityType, + }, + heldMessage.attempt, + ); + }; + const dropHeldOutboxMessage = async ( + circuit: CircuitBreaker, + remoteHost: string, + inbox: URL, + heldSince: Temporal.Instant, + activity: Awaited>, + ) => { + await circuit.dropActivity(remoteHost, { + inbox, + activity, + activityId: message.activityId, + activityType: message.activityType, + actorIds: getActorIds(), + heldSince, + }); + if (this.outboxPermanentFailureHandler != null) { + const ctx = this.#createContext( + new URL(message.baseUrl), + _, + { + documentLoader: this.documentLoaderFactory(loaderOptions), + }, + ); + try { + await this.outboxPermanentFailureHandler(ctx, { + reason: "circuit-breaker-ttl", + inbox, + activity, + error: new SendActivityError( + inbox, + 0, + "Circuit breaker held activity expired.", + "", + ), + statusCode: 0, + circuitHeldSince: heldSince, + 
actorIds: getActorIds(), + }); + } catch (handlerError) { + logger.error( + "An unexpected error occurred in " + + "outboxPermanentFailureHandler:\n{error}", + { ...logData, error: handlerError }, + ); + } + } + recordOutboxActivity( + this.meterProvider, + "abandoned", + message.activityType, + ); + }; try { + const inbox = new URL(message.inbox); + const circuit = this.outboxQueue == null + ? undefined + : this.circuitBreaker; + const remoteHost = getRemoteHost(inbox); + let decision: CircuitBreakerBeforeSendDecision | undefined; + if (circuit != null) { + try { + decision = await circuit.beforeSend(remoteHost, message); + } catch (circuitError) { + getLogger(["fedify", "federation", "circuit"]).error( + "Failed to check circuit breaker state before sending; " + + "proceeding with delivery:\n{error}", + { ...logData, remoteHost, error: circuitError }, + ); + } + } + if (decision != null && circuit != null) { + if (decision.type === "hold") { + recordCircuitBreakerHeldSpanEvent(span, remoteHost, decision.state); + await enqueueHeldOutboxMessage(decision.delay, decision.heldSince); + return; + } + if (decision.type === "drop") { + const activity = await parseActivity(); + await dropHeldOutboxMessage( + circuit, + remoteHost, + inbox, + decision.heldSince, + activity, + ); + return; + } + if (decision.stateChange != null) { + recordCircuitBreakerSpanEvent( + span, + remoteHost, + decision.stateChange, + ); + } + } await sendActivity({ keys, activity: message.activity, activityId: message.activityId, activityType: message.activityType, - inbox: new URL(message.inbox), + inbox, sharedInbox: message.sharedInbox, headers: new Headers(message.headers), specDeterminer: new KvSpecDeterminer( @@ -905,6 +1187,20 @@ export class FederationImpl meterProvider: this.meterProvider, tracerProvider: this.tracerProvider, }); + if (circuit != null) { + try { + const stateChange = await circuit.recordSuccess(remoteHost); + if (stateChange != null) { + 
recordCircuitBreakerSpanEvent(span, remoteHost, stateChange); + } + } catch (error) { + getLogger(["fedify", "federation", "circuit"]).error( + "Failed to record successful delivery in circuit breaker state; " + + "the activity was already delivered:\n{error}", + { ...logData, remoteHost, error }, + ); + } + } } catch (error) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(error) }); const remoteHost = (() => { @@ -921,26 +1217,111 @@ export class FederationImpl return undefined; } })(); + let retryAfterDelay: Temporal.Duration | undefined; + let circuitHold: + | { + delay: Temporal.Duration; + heldSince: Temporal.Instant; + remoteHost: string; + state: "open" | "half-open"; + } + | undefined; + let circuitDrop: + | { + circuit: CircuitBreaker; + remoteHost: string; + inbox: URL; + heldSince: Temporal.Instant; + } + | undefined; + let retryPolicyDelay: Temporal.Duration | null | undefined; + let policyDelayCalculated = false; + const getPolicyDelay = () => { + if (!policyDelayCalculated) { + retryPolicyDelay = this.outboxRetryPolicy({ + elapsedTime: Temporal.Instant.from(message.started).until( + Temporal.Now.instant(), + ), + attempts: message.attempt, + }); + policyDelayCalculated = true; + } + return retryPolicyDelay; + }; + const isPermanentFailure = error instanceof SendActivityError && + this.permanentFailureStatusCodes.includes(error.statusCode); + if ( + !isPermanentFailure && + error instanceof SendActivityError && + (error.statusCode === 429 || error.statusCode === 503) + ) { + retryAfterDelay = parseRetryAfter(error.responseHeaders); + } + if ( + remoteHost != null && + this.outboxQueue != null && + this.circuitBreaker != null + ) { + try { + if (error instanceof SendActivityError) { + const { statusCode } = error; + const stateChange = isPermanentFailure || statusCode === 429 || + (statusCode >= 400 && statusCode < 500) + ? await this.circuitBreaker.recordReachableFailure(remoteHost) + : statusCode >= 500 + ? 
await this.circuitBreaker.recordFailure(remoteHost) + : undefined; + if (stateChange != null) { + recordCircuitBreakerSpanEvent(span, remoteHost, stateChange); + } + } else if (isTransportDeliveryError(error)) { + const stateChange = await this.circuitBreaker.recordFailure( + remoteHost, + ); + if (stateChange != null) { + recordCircuitBreakerSpanEvent(span, remoteHost, stateChange); + } + } + if (!isPermanentFailure) { + const circuitDecision = await this.circuitBreaker.beforeSend( + remoteHost, + message, + ); + if (circuitDecision.type === "hold") { + circuitHold = { + delay: circuitDecision.delay, + heldSince: circuitDecision.heldSince, + remoteHost, + state: circuitDecision.state, + }; + } else if (circuitDecision.type === "drop") { + circuitDrop = { + circuit: this.circuitBreaker, + remoteHost, + inbox: new URL(message.inbox), + heldSince: circuitDecision.heldSince, + }; + } + } + } catch (circuitError) { + getLogger(["fedify", "federation", "circuit"]).error( + "Failed to update circuit breaker state after delivery failure; " + + "falling back to normal failure handling:\n{error}", + { ...logData, remoteHost, error: circuitError }, + ); + } + } span.addEvent("activitypub.delivery.failed", { ...(remoteHost == null ? {} : { "activitypub.remote.host": remoteHost }), "activitypub.delivery.attempt": message.attempt, - "activitypub.delivery.permanent_failure": - error instanceof SendActivityError && - this.permanentFailureStatusCodes.includes(error.statusCode), + "activitypub.delivery.permanent_failure": isPermanentFailure, ...(error instanceof SendActivityError ? { "http.response.status_code": error.statusCode } : {}), }); - const loaderOptions = this.#getLoaderOptions(message.baseUrl); - const activity = await Activity.fromJsonLd(message.activity, { - contextLoader: this.contextLoaderFactory(loaderOptions), - documentLoader: rsaKeyPair == null - ? 
this.documentLoaderFactory(loaderOptions) - : this.authenticatedDocumentLoaderFactory(rsaKeyPair, loaderOptions), - tracerProvider: this.tracerProvider, - }); + const activity = await parseActivity(); try { await this.onOutboxError?.(error as Error, activity); } catch (error) { @@ -950,10 +1331,20 @@ export class FederationImpl ); } + if (circuitDrop != null) { + await dropHeldOutboxMessage( + circuitDrop.circuit, + circuitDrop.remoteHost, + circuitDrop.inbox, + circuitDrop.heldSince, + activity, + ); + return; + } + // Check if the error is a permanent delivery failure if ( - error instanceof SendActivityError && - this.permanentFailureStatusCodes.includes(error.statusCode) + isPermanentFailure ) { getFederationMetrics(this.meterProvider).recordPermanentFailure( error.inbox, @@ -977,21 +1368,12 @@ export class FederationImpl ); try { await this.outboxPermanentFailureHandler(ctx, { + reason: "http", inbox: new URL(message.inbox), activity, error, statusCode: error.statusCode, - actorIds: (message.actorIds ?? []).flatMap((id) => { - try { - return [new URL(id)]; - } catch { - logger.warn( - "Invalid actorId URL in OutboxMessage: {id}", - { id }, - ); - return []; - } - }), + actorIds: getActorIds(), }); } catch (handlerError) { logger.error( @@ -1009,8 +1391,33 @@ export class FederationImpl return; } + if (circuitHold != null && getPolicyDelay() != null) { + logger.error( + "Failed to send activity {activityId} to {inbox}; holding because " + + "the remote host circuit is open:\n{error}", + { ...logData, error }, + ); + recordCircuitBreakerHeldSpanEvent( + span, + circuitHold.remoteHost, + circuitHold.state, + ); + const circuit = this.circuitBreaker; + const holdDelay = retryAfterDelay == null || circuit == null + ? 
circuitHold.delay + : circuit.capHeldDelay( + circuitHold.heldSince, + maxDelay(circuitHold.delay, retryAfterDelay), + ); + await enqueueHeldOutboxMessage( + holdDelay, + circuitHold.heldSince, + ); + return; + } + // Skip retry logic if the message queue backend handles retries automatically - if (this.outboxQueue?.nativeRetrial) { + if (this.outboxQueue?.nativeRetrial && retryAfterDelay == null) { logger.error( "Failed to send activity {activityId} to {inbox}; backend will handle retry:\n{error}", { ...logData, error }, @@ -1018,12 +1425,8 @@ export class FederationImpl throw error; } - const delay = this.outboxRetryPolicy({ - elapsedTime: Temporal.Instant.from(message.started).until( - Temporal.Now.instant(), - ), - attempts: message.attempt, - }); + const policyDelay = getPolicyDelay(); + const delay = policyDelay == null ? null : retryAfterDelay ?? policyDelay; if (delay != null) { logger.error( "Failed to send activity {activityId} to {inbox} (attempt " + @@ -1039,9 +1442,8 @@ export class FederationImpl await outboxQueue.enqueue( retryMessage, { - delay: Temporal.Duration.compare(delay, { seconds: 0 }) < 0 - ? Temporal.Duration.from({ seconds: 0 }) - : delay, + delay: clampNegativeDelay(delay), + orderingKey: message.orderingKey, }, ); getFederationMetrics(this.meterProvider).recordQueueTaskEnqueued( @@ -1184,9 +1586,7 @@ export class FederationImpl await this.inboxQueue.enqueue( retryMessage, { - delay: Temporal.Duration.compare(delay, { seconds: 0 }) < 0 - ? 
Temporal.Duration.from({ seconds: 0 }) - : delay, + delay: clampNegativeDelay(delay), }, ); if (activityType != null) { diff --git a/packages/fedify/src/federation/mod.ts b/packages/fedify/src/federation/mod.ts index b490e2a44..5a87fdf0f 100644 --- a/packages/fedify/src/federation/mod.ts +++ b/packages/fedify/src/federation/mod.ts @@ -5,6 +5,7 @@ */ export { createFederationBuilder } from "./builder.ts"; export * from "./callback.ts"; +export * from "./circuit-breaker.ts"; export * from "./collection.ts"; export * from "./context.ts"; export * from "./federation.ts"; diff --git a/packages/fedify/src/federation/queue.ts b/packages/fedify/src/federation/queue.ts index de152f941..36f35ad02 100644 --- a/packages/fedify/src/federation/queue.ts +++ b/packages/fedify/src/federation/queue.ts @@ -57,6 +57,17 @@ export interface OutboxMessage { readonly attempt: number; readonly headers: Readonly>; readonly orderingKey?: string; + /** + * Whether this message is currently held by the outbound circuit breaker. + * @internal + */ + readonly circuitHeld?: true; + /** + * When Fedify first held this message because the remote host circuit was + * open. 
+ * @internal + */ + readonly circuitHeldSince?: string; readonly traceContext: Readonly>; } diff --git a/packages/fedify/src/federation/send.test.ts b/packages/fedify/src/federation/send.test.ts index f6ae684d1..915bfdcdb 100644 --- a/packages/fedify/src/federation/send.test.ts +++ b/packages/fedify/src/federation/send.test.ts @@ -13,6 +13,7 @@ import { Person, Service, } from "@fedify/vocab"; +import { FetchError } from "@fedify/vocab-runtime"; import { assert, assertEquals, @@ -243,6 +244,7 @@ test("sendActivity()", async (t) => { fetchMock.post("https://example.com/inbox2", { status: 500, + headers: { "Retry-After": "120" }, body: "something went wrong", }); @@ -288,9 +290,57 @@ test("sendActivity()", async (t) => { assertEquals(e.statusCode, 500); assertEquals(e.inbox, new URL("https://example.com/inbox2")); assertEquals(e.responseBody, "something went wrong"); + assertEquals(e.responseHeaders.get("Retry-After"), "120"); } }); + await t.step( + "signed challenge retry transport errors throw FetchError", + async () => { + const activity: unknown = { + "@context": "https://www.w3.org/ns/activitystreams", + "type": "Create", + "id": "https://example.com/activity", + "actor": "https://example.com/person", + }; + const failure = new TypeError("challenge retry connection reset"); + let requestCount = 0; + fetchMock.post("https://example.com/inbox-challenge-reset", () => { + requestCount++; + if (requestCount === 1) { + return new Response("Unauthorized", { + status: 401, + headers: { + "Accept-Signature": + 'sig1=("@method" "@target-uri" "@authority" ' + + '"content-digest");created;nonce="retry-nonce"', + }, + }); + } + throw failure; + }); + + const error = await assertRejects( + () => + sendActivity({ + activity, + activityId: "https://example.com/activity", + keys: [{ privateKey: rsaPrivateKey2, keyId: rsaPublicKey2.id! 
}], + inbox: new URL("https://example.com/inbox-challenge-reset"), + }), + FetchError, + "challenge retry connection reset", + ); + + assertEquals( + error.url.href, + "https://example.com/inbox-challenge-reset", + ); + assertEquals(error.cause, failure); + assertEquals(requestCount, 2); + }, + ); + fetchMock.post("https://example.com/inbox-gone", { status: 410, body: "Gone", @@ -544,7 +594,7 @@ test("sendActivity() records OpenTelemetry delivery metrics", async (t) => { assertEquals(sent[0].value, 1); assertEquals( sent[0].attributes["activitypub.remote.host"], - "metrics.example", + "metrics.example:8443", ); assertEquals( sent[0].attributes["activitypub.activity.type"], @@ -560,7 +610,7 @@ test("sendActivity() records OpenTelemetry delivery metrics", async (t) => { assertGreaterOrEqual(durations[0].value, 0); assertEquals( durations[0].attributes["activitypub.remote.host"], - "metrics.example", + "metrics.example:8443", ); assertEquals( durations[0].attributes["activitypub.activity.type"], diff --git a/packages/fedify/src/federation/send.ts b/packages/fedify/src/federation/send.ts index 263af1de1..b0b5923d9 100644 --- a/packages/fedify/src/federation/send.ts +++ b/packages/fedify/src/federation/send.ts @@ -1,4 +1,5 @@ import type { Recipient } from "@fedify/vocab"; +import { FetchError } from "@fedify/vocab-runtime"; import { getLogger } from "@logtape/logtape"; import { type Attributes, @@ -314,12 +315,15 @@ async function sendActivityInternal( ? await fetch(request) : await doubleKnock(request, rsaKey, { tracerProvider, specDeterminer }); } catch (error) { + const transportError = error instanceof FetchError + ? 
error + : createFetchError(inbox.href, error); logger.error( "Failed to send activity {activityId} to {inbox}:\n{error}", { activityId, inbox: inbox.href, - error, + error: transportError, }, ); federationMetrics.recordDelivery( @@ -328,7 +332,7 @@ async function sendActivityInternal( false, activityType, ); - throw error; + throw transportError; } try { if (!response.ok) { @@ -358,6 +362,7 @@ async function sendActivityInternal( `Failed to send activity ${activityId} to ${inbox.href} ` + `(${response.status} ${response.statusText}):\n${error}`, error, + response.headers, ); } @@ -386,6 +391,13 @@ async function sendActivityInternal( } } +function createFetchError(url: string, cause: unknown): FetchError { + const message = cause instanceof Error ? cause.message : String(cause); + const error = new FetchError(url, message); + error.cause = cause; + return error; +} + /** * An error that is thrown when an activity fails to send to a remote inbox. * It contains structured information about the failure, including the HTTP @@ -411,23 +423,32 @@ export class SendActivityError extends Error { */ readonly responseBody: string; + /** + * The response headers from the inbox. + * @since 2.3.0 + */ + readonly responseHeaders: Headers; + /** * Creates a new {@link SendActivityError}. * @param inbox The inbox URL. * @param statusCode The HTTP status code. * @param message The error message. * @param responseBody The response body. + * @param responseHeaders The response headers. 
*/ constructor( inbox: URL, statusCode: number, message: string, responseBody: string, + responseHeaders?: HeadersInit, ) { super(message); this.name = "SendActivityError"; this.inbox = inbox; this.statusCode = statusCode; this.responseBody = responseBody; + this.responseHeaders = new Headers(responseHeaders); } } diff --git a/packages/vocab/src/actor.test.ts b/packages/vocab/src/actor.test.ts index ef986637a..aca485daa 100644 --- a/packages/vocab/src/actor.test.ts +++ b/packages/vocab/src/actor.test.ts @@ -297,6 +297,31 @@ test("getActorHandle() records activitypub.actor.discovery counter", { }, ); + await t.step( + "records non-default ports for actor IDs", + async () => { + fetchMock.removeRoutes(); + fetchMock.get( + "begin:https://foo.example.com:8443/.well-known/webfinger?", + { status: 404 }, + ); + const [meterProvider, recorder] = createTestMeterProvider(); + await rejects( + () => + getActorHandle(new URL("https://foo.example.com:8443/@john"), { + meterProvider, + }), + TypeError, + ); + const counter = recorder.getMeasurement("activitypub.actor.discovery"); + ok(counter != null); + deepStrictEqual( + counter.attributes["activitypub.remote.host"], + "foo.example.com:8443", + ); + }, + ); + await t.step( "records result=error when a malformed WebFinger alias throws TypeError", async () => { diff --git a/packages/vocab/src/actor.ts b/packages/vocab/src/actor.ts index 0bc9ad4f4..7abf0185d 100644 --- a/packages/vocab/src/actor.ts +++ b/packages/vocab/src/actor.ts @@ -93,7 +93,7 @@ function getActorDiscoveryRemoteHost( ): string | undefined { const id = actor instanceof URL ? actor : actor.id; if (id == null) return undefined; - return id.hostname === "" ? undefined : id.hostname; + return id.host === "" ? 
undefined : id.host; } // Subclass of TypeError that preserves the documented `throws {TypeError}` diff --git a/packages/vocab/src/lookup.test.ts b/packages/vocab/src/lookup.test.ts index 1acf4624c..60aac697e 100644 --- a/packages/vocab/src/lookup.test.ts +++ b/packages/vocab/src/lookup.test.ts @@ -765,6 +765,33 @@ test("lookupObject() records activitypub.object.lookup counter", { ); }); + await t.step( + "records non-default ports for URL identifiers", + async () => { + const [meterProvider, recorder] = createTestMeterProvider(); + const object = await lookupObject("https://example.com:8443/object", { + documentLoader: (url) => + Promise.resolve({ + contextUrl: null, + documentUrl: url, + document: { + "@context": "https://www.w3.org/ns/activitystreams", + id: url, + type: "Note", + }, + }), + contextLoader: mockDocumentLoader, + meterProvider, + }); + assertInstanceOf(object, Object); + const counter = recorder.getMeasurement("activitypub.object.lookup"); + deepStrictEqual( + counter?.attributes["activitypub.remote.host"], + "example.com:8443", + ); + }, + ); + await t.step("records kind=other on null result", async () => { fetchMock.removeRoutes(); fetchMock.get("begin:https://example.com/.well-known/webfinger", { diff --git a/packages/vocab/src/lookup.ts b/packages/vocab/src/lookup.ts index e648eff02..d2164ec38 100644 --- a/packages/vocab/src/lookup.ts +++ b/packages/vocab/src/lookup.ts @@ -67,13 +67,13 @@ function getLookupRemoteHost(identifier: string | URL): string | undefined { return extractHandleHost(stripped); } } - if (url.hostname !== "") return url.hostname; - // `acct:` URIs are opaque (no `//host` form), so the URL hostname is - // empty. The user and authority live in `url.pathname` as + if (url.host !== "") return url.host; + // `acct:` URIs are opaque (no `//host` form), so the URL host is empty. 
+ // The user and authority live in `url.pathname` as // `user@host`; reuse the same handle-extraction logic, which both // takes only the substring after the last `@` and refuses to record // anything that looks like a path / query / fragment rather than a - // bare hostname. + // bare host. if (url.protocol === "acct:") return extractHandleHost(url.pathname); return undefined; } @@ -87,9 +87,9 @@ function extractHandleHost(handle: string): string | undefined { // the metric attribute, so we drop the host entirely in those cases. if (/[/?#\s]/.test(candidate)) return undefined; // Round-trip through `URL` so the parser validates the authority and - // strips any port/userinfo before we record it. + // strips any userinfo before we record it. try { - return new URL(`https://${candidate}`).hostname || undefined; + return new URL(`https://${candidate}`).host || undefined; } catch { return undefined; } diff --git a/packages/webfinger/src/lookup.test.ts b/packages/webfinger/src/lookup.test.ts index 21cdfe59f..34641e4ed 100644 --- a/packages/webfinger/src/lookup.test.ts +++ b/packages/webfinger/src/lookup.test.ts @@ -557,6 +557,27 @@ test("lookupWebFinger() records webfinger.lookup counter and duration", { }, ); + await t.step( + "records non-default ports for URL resources", + async () => { + fetchMock.removeRoutes(); + fetchMock.get( + "https://example.com:8443/.well-known/webfinger?resource=https%3A%2F%2Fexample.com%3A8443%2Ffoo", + { body: { subject: "https://example.com:8443/foo", links: [] } }, + ); + const [meterProvider, recorder] = createTestMeterProvider(); + await lookupWebFinger("https://example.com:8443/foo", { + meterProvider, + }); + const counter = recorder.getMeasurement("webfinger.lookup"); + ok(counter != null); + deepStrictEqual( + counter.attributes["activitypub.remote.host"], + "example.com:8443", + ); + }, + ); + await t.step("records result=not_found with status 404", async () => { fetchMock.removeRoutes(); fetchMock.get( diff --git 
a/packages/webfinger/src/lookup.ts b/packages/webfinger/src/lookup.ts index ac5238c50..5b35c4704 100644 --- a/packages/webfinger/src/lookup.ts +++ b/packages/webfinger/src/lookup.ts @@ -310,7 +310,7 @@ async function lookupWebFingerInternal( url.searchParams.set("resource", resource.href); let redirected = 0; while (true) { - const remoteHost = url.hostname; + const remoteHost = url.host; logger.debug( "Fetching WebFinger resource descriptor from {url}...", { url: url.href },