From aab017ea4b230ac07247c2b6333f7baff19d5f9a Mon Sep 17 00:00:00 2001 From: Joshua Mouch Date: Thu, 18 Jun 2026 00:13:49 -0400 Subject: [PATCH] Add a cross-client self-instrumentation telemetry-names contract A single source of truth (types/telemetry/registry.ts) for the self- instrumentation span / metric / attribute names every AHP client emits about its own operation, consumed by the existing scripts/generate-*.ts codegen to emit an idiomatic generated name-holder per client (Go / Rust / Swift / Kotlin / TypeScript), plus a generated-output freshness gate so the holders can't drift. The Rust client is the reference adopter: it emits the metrics through the metrics facade (a no-op until the host installs a recorder), named entirely from the generated holder, with a recorder-based emission test. Client self- instrumentation is distinct from the protocol's OpenTelemetry-over-AHP channel; see docs/specification/self-instrumentation.md. --- clients/go/CHANGELOG.md | 6 + clients/go/ahptypes/telemetry.generated.go | 100 ++++++ clients/kotlin/CHANGELOG.md | 6 + .../generated/Telemetry.generated.kt | 101 ++++++ clients/rust/CHANGELOG.md | 26 ++ clients/rust/Cargo.lock | 277 ++++++++++++++- clients/rust/Cargo.toml | 1 + clients/rust/crates/ahp-types/src/lib.rs | 1 + .../rust/crates/ahp-types/src/telemetry.rs | 101 ++++++ clients/rust/crates/ahp/Cargo.toml | 8 + .../rust/crates/ahp/examples/otel_export.rs | 124 +++++++ clients/rust/crates/ahp/src/client.rs | 136 ++++++-- clients/rust/crates/ahp/src/lib.rs | 1 + clients/rust/crates/ahp/src/telemetry.rs | 152 +++++++++ .../crates/ahp/tests/telemetry_emission.rs | 316 ++++++++++++++++++ .../Generated/Telemetry.generated.swift | 102 ++++++ clients/swift/CHANGELOG.md | 6 + clients/typescript/CHANGELOG.md | 6 + docs/.vitepress/config.mts | 1 + docs/specification/self-instrumentation.md | 105 ++++++ docs/specification/telemetry-channel.md | 2 + package.json | 3 +- scripts/generate-go.ts | 52 +++ scripts/generate-kotlin.ts | 69 
++++ scripts/generate-rust.ts | 63 ++++ scripts/generate-swift.ts | 77 +++++ scripts/read-telemetry.test.ts | 175 ++++++++++ scripts/read-telemetry.ts | 139 ++++++++ scripts/verify-generated.ts | 142 ++++++++ types/index.ts | 13 + types/telemetry/registry.test.ts | 103 ++++++ types/telemetry/registry.ts | 137 ++++++++ 32 files changed, 2515 insertions(+), 36 deletions(-) create mode 100644 clients/go/ahptypes/telemetry.generated.go create mode 100644 clients/kotlin/src/main/kotlin/com/microsoft/agenthostprotocol/generated/Telemetry.generated.kt create mode 100644 clients/rust/crates/ahp-types/src/telemetry.rs create mode 100644 clients/rust/crates/ahp/examples/otel_export.rs create mode 100644 clients/rust/crates/ahp/src/telemetry.rs create mode 100644 clients/rust/crates/ahp/tests/telemetry_emission.rs create mode 100644 clients/swift/AgentHostProtocol/Sources/AgentHostProtocol/Generated/Telemetry.generated.swift create mode 100644 docs/specification/self-instrumentation.md create mode 100644 scripts/read-telemetry.test.ts create mode 100644 scripts/read-telemetry.ts create mode 100644 scripts/verify-generated.ts create mode 100644 types/telemetry/registry.test.ts create mode 100644 types/telemetry/registry.ts diff --git a/clients/go/CHANGELOG.md b/clients/go/CHANGELOG.md index 1c45d413..81b49f00 100644 --- a/clients/go/CHANGELOG.md +++ b/clients/go/CHANGELOG.md @@ -16,6 +16,12 @@ tag whose matching `## [X.Y.Z]` heading is missing from this file. ### Added +- Generated telemetry-name constants (`telemetry.generated.go`) — the shared + cross-client self-instrumentation span / metric / attribute names, generated + from the `types/telemetry/registry.ts` contract so they stay identical across + clients. Includes the `host-event` / `host-subscription` / `host-resource` / + `host-snapshot` / `host-summaries` `ahp.stream` values for multi-host + dropped-event accounting. 
- `ChangesetOperationStatusDisabled` — new `ChangesetOperationStatus` value for operations that are currently unavailable and cannot be invoked. - `ChangesetOperation.Group` — optional identifier for grouping related diff --git a/clients/go/ahptypes/telemetry.generated.go b/clients/go/ahptypes/telemetry.generated.go new file mode 100644 index 00000000..9fc1252e --- /dev/null +++ b/clients/go/ahptypes/telemetry.generated.go @@ -0,0 +1,100 @@ +// Generated from types/*.ts — do not edit. +// +// Regenerate with: npm run generate:go + +package ahptypes + +// Cross-client telemetry NAMES — the self-instrumentation contract shared by every +// AHP client, generated from types/telemetry/registry.ts. Only the names are shared; +// the tracer / meter wiring is hand-written per language. + +const ( + // Instrumentation-scope name used for every AHP self-instrumentation span and metric. + TelemetrySource = "Microsoft.AgentHostProtocol" + + // Span names. + // Span covering a single JSON-RPC request, from send until it settles. + RequestSpan = "ahp.request" + + // Metric names. + // Messages sent to the host, tagged by ahp.message.kind (request|notification). + MessagesSentMetric = "ahp.client.messages.sent" + // Messages received from the host. + MessagesReceivedMetric = "ahp.client.messages.received" + // Round-trip duration of a JSON-RPC request, tagged by rpc.method and ahp.outcome (ok|error|cancelled|timeout). + RequestDurationMetric = "ahp.client.request.duration" + // Requests awaiting a response. + RequestsInFlightMetric = "ahp.client.requests.in_flight" + // Subscriptions registered with the client (decremented on unsubscribe or shutdown). + SubscriptionsActiveMetric = "ahp.client.subscriptions.active" + // Reconnect operations, tagged by outcome. + ReconnectsMetric = "ahp.client.reconnects" + // Buffered events evicted under back-pressure (drop-oldest), tagged by stream. 
+ EventsDroppedMetric = "ahp.client.events.dropped" + // Inbound frames that failed to decode and were skipped (protocol resync is the host’s responsibility). + FramesMalformedMetric = "ahp.client.frames.malformed" + + // Metric units. + // Unit for the ahp.client.messages.sent metric. + MessagesSentUnit = "{message}" + // Unit for the ahp.client.messages.received metric. + MessagesReceivedUnit = "{message}" + // Unit for the ahp.client.request.duration metric. + RequestDurationUnit = "ms" + // Unit for the ahp.client.requests.in_flight metric. + RequestsInFlightUnit = "{request}" + // Unit for the ahp.client.subscriptions.active metric. + SubscriptionsActiveUnit = "{subscription}" + // Unit for the ahp.client.reconnects metric. + ReconnectsUnit = "{reconnect}" + // Unit for the ahp.client.events.dropped metric. + EventsDroppedUnit = "{event}" + // Unit for the ahp.client.frames.malformed metric. + FramesMalformedUnit = "{frame}" + + // Attribute keys. + // RPC system identifier (OTel rpc.system); always "jsonrpc" for AHP. + AttrRpcSystem = "rpc.system" + // JSON-RPC method name the span/metric is scoped to (OTel rpc.method). + AttrRpcMethod = "rpc.method" + // Client-assigned JSON-RPC request id. + AttrRequestId = "ahp.request.id" + // Terminal outcome of a request or reconnect (ok|error|cancelled|timeout). + AttrOutcome = "ahp.outcome" + // Whether a sent message was a request or a notification. + AttrMessageKind = "ahp.message.kind" + // Which event stream a dropped or observed event belongs to. + AttrStream = "ahp.stream" + + // Attribute values. + // JSON-RPC — the only RPC system AHP uses. + RpcSystemJsonrpc = "jsonrpc" + // The request or reconnect completed successfully. + OutcomeOk = "ok" + // The request or reconnect failed with an error response. + OutcomeError = "error" + // The request was cancelled before it settled. + OutcomeCancelled = "cancelled" + // The request exceeded its configured timeout. 
+ OutcomeTimeout = "timeout" + // A JSON-RPC request (expects a response). + MessageKindRequest = "request" + // A JSON-RPC notification (fire-and-forget). + MessageKindNotification = "notification" + // A per-resource subscription stream. + StreamSubscription = "subscription" + // The client-wide event stream. + StreamEvent = "event" + // A state-snapshot stream. + StreamState = "state" + // A multi-host client's host-event delivery stream. + StreamHostEvent = "host-event" + // A multi-host client's host-subscription delivery stream. + StreamHostSubscription = "host-subscription" + // A multi-host client's host-resource delivery stream. + StreamHostResource = "host-resource" + // A multi-host client's host-snapshot delivery stream. + StreamHostSnapshot = "host-snapshot" + // A multi-host client's host-summaries delivery stream. + StreamHostSummaries = "host-summaries" +) diff --git a/clients/kotlin/CHANGELOG.md b/clients/kotlin/CHANGELOG.md index d36b5759..bc1f34b2 100644 --- a/clients/kotlin/CHANGELOG.md +++ b/clients/kotlin/CHANGELOG.md @@ -17,6 +17,12 @@ versions (`*-SNAPSHOT`) are explicitly rejected by the publish pipeline; bump ### Added +- Generated `AhpTelemetryNames` object — the shared cross-client + self-instrumentation span / metric / attribute name constants, generated from + the `types/telemetry/registry.ts` contract so they stay identical across + clients. Includes the `host-event` / `host-subscription` / `host-resource` / + `host-snapshot` / `host-summaries` `ahp.stream` values for multi-host + dropped-event accounting. - `ChangesetOperationStatus.Disabled` — new enum value for changeset operations that are currently unavailable and cannot be invoked. 
- `ChangesetOperation.group` — optional identifier for grouping related diff --git a/clients/kotlin/src/main/kotlin/com/microsoft/agenthostprotocol/generated/Telemetry.generated.kt b/clients/kotlin/src/main/kotlin/com/microsoft/agenthostprotocol/generated/Telemetry.generated.kt new file mode 100644 index 00000000..6eb4052b --- /dev/null +++ b/clients/kotlin/src/main/kotlin/com/microsoft/agenthostprotocol/generated/Telemetry.generated.kt @@ -0,0 +1,101 @@ +// Generated from types/telemetry/registry.ts — do not edit + +package com.microsoft.agenthostprotocol.generated + +/** + * Cross-client telemetry NAMES — the self-instrumentation contract shared by every + * AHP client, generated from `types/telemetry/registry.ts`. Only the names are shared; + * the OpenTelemetry tracer / meter wiring is hand-written per language. + */ +object AhpTelemetryNames { + + // ── Instrumentation scope ── + /** Instrumentation-scope name used for every AHP self-instrumentation span and metric. */ + const val SOURCE: String = "Microsoft.AgentHostProtocol" + + // ── Span names ── + /** Span covering a single JSON-RPC request, from send until it settles. */ + const val REQUEST_SPAN: String = "ahp.request" + + // ── Metric names ── + /** Messages sent to the host, tagged by ahp.message.kind (request|notification). */ + const val MESSAGES_SENT: String = "ahp.client.messages.sent" + /** Messages received from the host. */ + const val MESSAGES_RECEIVED: String = "ahp.client.messages.received" + /** Round-trip duration of a JSON-RPC request, tagged by rpc.method and ahp.outcome (ok|error|cancelled|timeout). */ + const val REQUEST_DURATION: String = "ahp.client.request.duration" + /** Requests awaiting a response. */ + const val REQUESTS_IN_FLIGHT: String = "ahp.client.requests.in_flight" + /** Subscriptions registered with the client (decremented on unsubscribe or shutdown). 
*/ + const val SUBSCRIPTIONS_ACTIVE: String = "ahp.client.subscriptions.active" + /** Reconnect operations, tagged by outcome. */ + const val RECONNECTS: String = "ahp.client.reconnects" + /** Buffered events evicted under back-pressure (drop-oldest), tagged by stream. */ + const val EVENTS_DROPPED: String = "ahp.client.events.dropped" + /** Inbound frames that failed to decode and were skipped (protocol resync is the host’s responsibility). */ + const val FRAMES_MALFORMED: String = "ahp.client.frames.malformed" + + // ── Metric units ── + /** Unit for the `ahp.client.messages.sent` metric. */ + const val MESSAGES_SENT_UNIT: String = "{message}" + /** Unit for the `ahp.client.messages.received` metric. */ + const val MESSAGES_RECEIVED_UNIT: String = "{message}" + /** Unit for the `ahp.client.request.duration` metric. */ + const val REQUEST_DURATION_UNIT: String = "ms" + /** Unit for the `ahp.client.requests.in_flight` metric. */ + const val REQUESTS_IN_FLIGHT_UNIT: String = "{request}" + /** Unit for the `ahp.client.subscriptions.active` metric. */ + const val SUBSCRIPTIONS_ACTIVE_UNIT: String = "{subscription}" + /** Unit for the `ahp.client.reconnects` metric. */ + const val RECONNECTS_UNIT: String = "{reconnect}" + /** Unit for the `ahp.client.events.dropped` metric. */ + const val EVENTS_DROPPED_UNIT: String = "{event}" + /** Unit for the `ahp.client.frames.malformed` metric. */ + const val FRAMES_MALFORMED_UNIT: String = "{frame}" + + // ── Attribute keys ── + /** RPC system identifier (OTel rpc.system); always "jsonrpc" for AHP. */ + const val ATTR_RPC_SYSTEM: String = "rpc.system" + /** JSON-RPC method name the span/metric is scoped to (OTel rpc.method). */ + const val ATTR_RPC_METHOD: String = "rpc.method" + /** Client-assigned JSON-RPC request id. */ + const val ATTR_REQUEST_ID: String = "ahp.request.id" + /** Terminal outcome of a request or reconnect (ok|error|cancelled|timeout). 
*/ + const val ATTR_OUTCOME: String = "ahp.outcome" + /** Whether a sent message was a request or a notification. */ + const val ATTR_MESSAGE_KIND: String = "ahp.message.kind" + /** Which event stream a dropped or observed event belongs to. */ + const val ATTR_STREAM: String = "ahp.stream" + + // ── Attribute values ── + /** JSON-RPC — the only RPC system AHP uses. */ + const val RPC_SYSTEM_JSONRPC: String = "jsonrpc" + /** The request or reconnect completed successfully. */ + const val OUTCOME_OK: String = "ok" + /** The request or reconnect failed with an error response. */ + const val OUTCOME_ERROR: String = "error" + /** The request was cancelled before it settled. */ + const val OUTCOME_CANCELLED: String = "cancelled" + /** The request exceeded its configured timeout. */ + const val OUTCOME_TIMEOUT: String = "timeout" + /** A JSON-RPC request (expects a response). */ + const val MESSAGE_KIND_REQUEST: String = "request" + /** A JSON-RPC notification (fire-and-forget). */ + const val MESSAGE_KIND_NOTIFICATION: String = "notification" + /** A per-resource subscription stream. */ + const val STREAM_SUBSCRIPTION: String = "subscription" + /** The client-wide event stream. */ + const val STREAM_EVENT: String = "event" + /** A state-snapshot stream. */ + const val STREAM_STATE: String = "state" + /** A multi-host client's host-event delivery stream. */ + const val STREAM_HOST_EVENT: String = "host-event" + /** A multi-host client's host-subscription delivery stream. */ + const val STREAM_HOST_SUBSCRIPTION: String = "host-subscription" + /** A multi-host client's host-resource delivery stream. */ + const val STREAM_HOST_RESOURCE: String = "host-resource" + /** A multi-host client's host-snapshot delivery stream. */ + const val STREAM_HOST_SNAPSHOT: String = "host-snapshot" + /** A multi-host client's host-summaries delivery stream. 
*/ + const val STREAM_HOST_SUMMARIES: String = "host-summaries" +} diff --git a/clients/rust/CHANGELOG.md b/clients/rust/CHANGELOG.md index 8f655a68..25e975ed 100644 --- a/clients/rust/CHANGELOG.md +++ b/clients/rust/CHANGELOG.md @@ -19,6 +19,24 @@ matching `## [X.Y.Z]` heading is missing from this file. - `ahp_error_codes::CONFLICT` constant (`-32011`) added to `ahp-types`; covers ETag-conflict failures from `ResourceWriteParams.if_match` checks. - `apply_action_to_changeset`, `apply_action_to_annotations`, and `apply_action_to_resource_watch` reducers in `ahp`; all previously-skipped conformance fixtures for the `changeset`, `annotations`, and `resourceWatch` reducer families now pass. +- Generated `telemetry` module (`ahp_types::telemetry`) — the shared + cross-client self-instrumentation span / metric / attribute name constants, + generated from the `types/telemetry/registry.ts` contract so they stay + identical across clients. Includes the `host-event` / `host-subscription` / + `host-resource` / `host-snapshot` / `host-summaries` `ahp.stream` values for + multi-host dropped-event accounting. +- The `ahp` client now emits self-instrumentation **metrics** named by + `ahp_types::telemetry` through the [`metrics`](https://docs.rs/metrics) + facade: messages sent/received, request duration + in-flight, reconnects, + dropped events (per stream), and malformed frames, with `rpc.*` / `ahp.*` + attributes. The facade is a no-op until the host installs a recorder, so it + is zero-cost when unobserved. Only the names are shared; the instrumentation + is hand-written and idiomatic to Rust. Covered by a + `tests/telemetry_emission.rs` integration test that installs a `metrics-util` + recorder and asserts the metrics actually emit (names, the in-flight gauge + going `+1` → `0`, and the `rpc.method` / `ahp.outcome` / `ahp.message.kind` + attributes), plus an `examples/otel_export.rs` showing a consumer installing + a recorder to observe the client's self-instrumentation. 
- `ChangesetOperationStatus::Disabled` — new variant for changeset operations that are currently unavailable and cannot be invoked. - `ChangesetOperation.group` — optional identifier for grouping related @@ -44,6 +62,14 @@ matching `## [X.Y.Z]` heading is missing from this file. as `null`. - Session reducers now apply `_meta` (`meta`) updates from every tool-call-scoped action, not only `session/toolCallStart`. +- Client telemetry: a cancelled request is now tagged + `ahp.outcome=cancelled` instead of `ahp.outcome=timeout` on the + `ahp.client.request.duration` metric. Cancellation (the caller dropping the + request future) and the in-client `default_request_timeout` deadline are now + distinct outcomes. Cancellation is recorded via an RAII span guard, so a + cancelled request still emits a `request.duration` sample and the + `ahp.client.requests.in_flight` gauge is decremented (previously a dropped + request future emitted nothing and leaked the in-flight gauge at `+1`). ### Added diff --git a/clients/rust/Cargo.lock b/clients/rust/Cargo.lock index b4ce821e..18b2193a 100644 --- a/clients/rust/Cargo.lock +++ b/clients/rust/Cargo.lock @@ -2,12 +2,23 @@ # It is not intended for manual editing. 
version = 3 +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + [[package]] name = "ahp" version = "0.3.0" dependencies = [ "ahp-types", "ahp-ws", + "metrics", + "metrics-util", "serde", "serde_json", "thiserror", @@ -44,6 +55,12 @@ version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" +[[package]] +name = "autocfg" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2032f911046de80f0a198e0901378627c33f59ea0ac00e363d481118bd70a53" + [[package]] name = "bitflags" version = "2.11.1" @@ -59,6 +76,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bumpalo" +version = "3.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72f5acc6cb2ba439de613abc23857ec3d78374d8ed5ac84e9d11336e87da8649" + [[package]] name = "bytes" version = "1.11.1" @@ -106,6 +129,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "crypto-common" version = "0.1.7" @@ -143,6 +181,12 @@ dependencies = [ "syn", ] +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + [[package]] name = "equivalent" version = "1.0.2" @@ -177,6 +221,12 @@ 
version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "foreign-types" version = "0.3.2" @@ -285,7 +335,16 @@ version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ - "foldhash", + "foldhash 0.1.5", +] + +[[package]] +name = "hashbrown" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "foldhash 0.2.0", ] [[package]] @@ -443,6 +502,17 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "js-sys" +version = "0.3.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03d04c30968dffe80775bd4d7fb676131cd04a1fb46d2686dbffbaec2d9dfd31" +dependencies = [ + "cfg-if", + "futures-util", + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -494,6 +564,37 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "metrics" +version = "0.24.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89550ee9f79e88fef3119de263694973a8adb26c21d75322164fb8c493039fe2" +dependencies = [ + "portable-atomic", + "rapidhash", +] + +[[package]] +name = "metrics-util" +version = "0.20.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"96f8722f8562635f92f8ed992f26df0532266eb03d5202607c20c0d7e9745e13" +dependencies = [ + "aho-corasick", + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.16.1", + "indexmap", + "metrics", + "ordered-float", + "quanta", + "radix_trie", + "rand", + "rand_xoshiro", + "rapidhash", + "sketches-ddsketch", +] + [[package]] name = "mio" version = "1.2.0" @@ -522,6 +623,15 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -531,6 +641,15 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "once_cell" version = "1.21.4" @@ -580,6 +699,15 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ordered-float" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7d950ca161dc355eaf28f82b11345ed76c6e1f6eb1f4f4479e0323b9e2fbd0e" +dependencies = [ + "num-traits", +] + [[package]] name = "parking_lot" version = "0.12.5" @@ -621,6 +749,12 @@ version = "0.3.33" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" +[[package]] +name = "portable-atomic" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" + [[package]] name = "potential_utf" version = "0.1.5" @@ -658,6 +792,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quanta" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quote" version = "1.0.45" @@ -679,6 +828,16 @@ version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.9.4" @@ -708,6 +867,33 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "rand_xoshiro" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f703f4665700daf5512dcca5f43afa6af89f09db47fb56be587f80636bda2d41" +dependencies = [ + "rand_core", +] + +[[package]] +name = "rapidhash" +version = "4.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5e48930979c155e2f33aa36ab3119b5ee81332beb6482199a8ecd6029b80b59" +dependencies = [ + "rustversion", +] + +[[package]] +name = "raw-cpuid" +version = "11.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +dependencies = [ + "bitflags", +] + [[package]] name = "redox_syscall" version = "0.5.18" @@ -730,6 +916,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "rustversion" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + [[package]] name = "schannel" version = "0.1.29" @@ -864,6 +1056,12 @@ dependencies = [ "libc", ] +[[package]] +name = "sketches-ddsketch" +version = "0.3.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6f73aeb92d671e0cc4dca167e59b2deb6387c375391bc99ee743f326994a2b" + [[package]] name = "slab" version = "0.4.12" @@ -1170,6 +1368,51 @@ dependencies = [ "wit-bindgen 0.51.0", ] +[[package]] +name = "wasm-bindgen" +version = "0.2.125" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ddb3f79143bced6de84270411622a2699cee572fc0875aeaf1e7867cf9fca1a" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.125" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e21a184b13fb19e157296e2c46056aec9092264fab83e4ba59e68c61b323c3d" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.125" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fecefd9c35bd935a20fc3fc344b5f29138961e4f47fb03297d88f2587afb5ebd" +dependencies = [ + "bumpalo", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.125" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23939e44bb9a5d7576fa2b563dc2e136628f1224e88a8deed09e04858b77871f" +dependencies = [ + "unicode-ident", +] + [[package]] name = "wasm-encoder" version = "0.244.0" @@ -1204,6 +1447,38 @@ dependencies = [ "semver", ] +[[package]] +name = "web-sys" +version = "0.3.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6430a72df5eb332242960fe84b3002a241163998241eb596d4f739b9757061d" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + 
"winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-link" version = "0.2.1" diff --git a/clients/rust/Cargo.toml b/clients/rust/Cargo.toml index a9517f0c..83a3a939 100644 --- a/clients/rust/Cargo.toml +++ b/clients/rust/Cargo.toml @@ -22,5 +22,6 @@ tokio = { version = "1", features = ["sync", "rt", "macros", "time"] } tokio-util = "0.7" futures-util = "0.3" tracing = "0.1" +metrics = "0.24" ahp-types = { path = "crates/ahp-types", version = "0.3.0" } ahp = { path = "crates/ahp", version = "0.3.0" } diff --git a/clients/rust/crates/ahp-types/src/lib.rs b/clients/rust/crates/ahp-types/src/lib.rs index e16ddb03..c02e52d1 100644 --- a/clients/rust/crates/ahp-types/src/lib.rs +++ b/clients/rust/crates/ahp-types/src/lib.rs @@ -124,6 +124,7 @@ pub mod errors; pub mod messages; pub mod notifications; pub mod state; +pub mod telemetry; pub mod version; pub use actions::{ActionEnvelope, ActionOrigin, ActionType, StateAction}; diff --git a/clients/rust/crates/ahp-types/src/telemetry.rs b/clients/rust/crates/ahp-types/src/telemetry.rs new file mode 100644 index 00000000..52ed991a --- /dev/null +++ b/clients/rust/crates/ahp-types/src/telemetry.rs @@ -0,0 +1,101 @@ +// Generated from types/*.ts — do not edit. +// +// Regenerate with: npm run generate:rust + +#![allow(missing_docs)] + +// ─── Instrumentation scope ─────────────────────────────────────────────── + +/// Instrumentation-scope name used for every AHP self-instrumentation span and metric. 
+pub const TELEMETRY_SOURCE: &str = "Microsoft.AgentHostProtocol"; + +// ─── Span names ────────────────────────────────────────────────────────── + +/// Span covering a single JSON-RPC request, from send until it settles. +pub const REQUEST_SPAN: &str = "ahp.request"; + +// ─── Metric names ──────────────────────────────────────────────────────── + +/// Messages sent to the host, tagged by ahp.message.kind (request|notification). +pub const MESSAGES_SENT: &str = "ahp.client.messages.sent"; +/// Messages received from the host. +pub const MESSAGES_RECEIVED: &str = "ahp.client.messages.received"; +/// Round-trip duration of a JSON-RPC request, tagged by rpc.method and ahp.outcome (ok|error|cancelled|timeout). +pub const REQUEST_DURATION: &str = "ahp.client.request.duration"; +/// Requests awaiting a response. +pub const REQUESTS_IN_FLIGHT: &str = "ahp.client.requests.in_flight"; +/// Subscriptions registered with the client (decremented on unsubscribe or shutdown). +pub const SUBSCRIPTIONS_ACTIVE: &str = "ahp.client.subscriptions.active"; +/// Reconnect operations, tagged by outcome. +pub const RECONNECTS: &str = "ahp.client.reconnects"; +/// Buffered events evicted under back-pressure (drop-oldest), tagged by stream. +pub const EVENTS_DROPPED: &str = "ahp.client.events.dropped"; +/// Inbound frames that failed to decode and were skipped (protocol resync is the host’s responsibility). +pub const FRAMES_MALFORMED: &str = "ahp.client.frames.malformed"; + +// ─── Metric units ──────────────────────────────────────────────────────── + +/// Unit for the `ahp.client.messages.sent` metric. +pub const MESSAGES_SENT_UNIT: &str = "{message}"; +/// Unit for the `ahp.client.messages.received` metric. +pub const MESSAGES_RECEIVED_UNIT: &str = "{message}"; +/// Unit for the `ahp.client.request.duration` metric. +pub const REQUEST_DURATION_UNIT: &str = "ms"; +/// Unit for the `ahp.client.requests.in_flight` metric. 
+pub const REQUESTS_IN_FLIGHT_UNIT: &str = "{request}"; +/// Unit for the `ahp.client.subscriptions.active` metric. +pub const SUBSCRIPTIONS_ACTIVE_UNIT: &str = "{subscription}"; +/// Unit for the `ahp.client.reconnects` metric. +pub const RECONNECTS_UNIT: &str = "{reconnect}"; +/// Unit for the `ahp.client.events.dropped` metric. +pub const EVENTS_DROPPED_UNIT: &str = "{event}"; +/// Unit for the `ahp.client.frames.malformed` metric. +pub const FRAMES_MALFORMED_UNIT: &str = "{frame}"; + +// ─── Attribute keys ────────────────────────────────────────────────────── + +/// RPC system identifier (OTel rpc.system); always "jsonrpc" for AHP. +pub const ATTR_RPC_SYSTEM: &str = "rpc.system"; +/// JSON-RPC method name the span/metric is scoped to (OTel rpc.method). +pub const ATTR_RPC_METHOD: &str = "rpc.method"; +/// Client-assigned JSON-RPC request id. +pub const ATTR_REQUEST_ID: &str = "ahp.request.id"; +/// Terminal outcome of a request or reconnect (ok|error|cancelled|timeout). +pub const ATTR_OUTCOME: &str = "ahp.outcome"; +/// Whether a sent message was a request or a notification. +pub const ATTR_MESSAGE_KIND: &str = "ahp.message.kind"; +/// Which event stream a dropped or observed event belongs to. +pub const ATTR_STREAM: &str = "ahp.stream"; + +// ─── Attribute values ──────────────────────────────────────────────────── + +/// JSON-RPC — the only RPC system AHP uses. +pub const RPC_SYSTEM_JSONRPC: &str = "jsonrpc"; +/// The request or reconnect completed successfully. +pub const OUTCOME_OK: &str = "ok"; +/// The request or reconnect failed with an error response. +pub const OUTCOME_ERROR: &str = "error"; +/// The request was cancelled before it settled. +pub const OUTCOME_CANCELLED: &str = "cancelled"; +/// The request exceeded its configured timeout. +pub const OUTCOME_TIMEOUT: &str = "timeout"; +/// A JSON-RPC request (expects a response). +pub const MESSAGE_KIND_REQUEST: &str = "request"; +/// A JSON-RPC notification (fire-and-forget). 
+pub const MESSAGE_KIND_NOTIFICATION: &str = "notification"; +/// A per-resource subscription stream. +pub const STREAM_SUBSCRIPTION: &str = "subscription"; +/// The client-wide event stream. +pub const STREAM_EVENT: &str = "event"; +/// A state-snapshot stream. +pub const STREAM_STATE: &str = "state"; +/// A multi-host client's host-event delivery stream. +pub const STREAM_HOST_EVENT: &str = "host-event"; +/// A multi-host client's host-subscription delivery stream. +pub const STREAM_HOST_SUBSCRIPTION: &str = "host-subscription"; +/// A multi-host client's host-resource delivery stream. +pub const STREAM_HOST_RESOURCE: &str = "host-resource"; +/// A multi-host client's host-snapshot delivery stream. +pub const STREAM_HOST_SNAPSHOT: &str = "host-snapshot"; +/// A multi-host client's host-summaries delivery stream. +pub const STREAM_HOST_SUMMARIES: &str = "host-summaries"; diff --git a/clients/rust/crates/ahp/Cargo.toml b/clients/rust/crates/ahp/Cargo.toml index 4608dc74..c3d50da1 100644 --- a/clients/rust/crates/ahp/Cargo.toml +++ b/clients/rust/crates/ahp/Cargo.toml @@ -23,12 +23,16 @@ serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +metrics = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["full"] } serde_json = { workspace = true } ahp-ws = { path = "../ahp-ws" } tracing-subscriber = "0.3" +# In-process metrics recorder used by the telemetry emission tests/example to +# observe the client's self-instrumentation. Compatible with `metrics = 0.24`. 
+metrics-util = "0.20" [[example]] name = "connect_ws" @@ -37,3 +41,7 @@ path = "examples/connect_ws.rs" [[example]] name = "reducers_demo" path = "examples/reducers_demo.rs" + +[[example]] +name = "otel_export" +path = "examples/otel_export.rs" diff --git a/clients/rust/crates/ahp/examples/otel_export.rs b/clients/rust/crates/ahp/examples/otel_export.rs new file mode 100644 index 00000000..fc7c6b7f --- /dev/null +++ b/clients/rust/crates/ahp/examples/otel_export.rs @@ -0,0 +1,124 @@ +//! Minimal example: install a metrics recorder and observe the AHP client's +//! own self-instrumentation. +//! +//! The `ahp` client emits its self-instrumentation through the [`metrics`] +//! facade (counters, gauges, histograms named by `ahp_types::telemetry`). The +//! facade is a no-op until a *consumer* installs a recorder — at which point +//! the client's metrics flow into whatever backend the consumer wires up. +//! +//! In production a consumer would install an OpenTelemetry-backed recorder +//! (e.g. `metrics-exporter-opentelemetry`, or a Prometheus exporter) so the +//! `ahp.client.*` metrics land in their existing telemetry pipeline. This +//! example keeps the dependency surface tiny by installing the in-process +//! [`metrics_util::debugging::DebuggingRecorder`] and printing the captured +//! values — the same observation point an OTel exporter would tap, minus the +//! network. +//! +//! Run with: +//! +//! ```sh +//! cargo run --example otel_export +//! ``` + +use ahp::{Client, ClientConfig, Transport, TransportError, TransportMessage}; +use ahp_types::messages::{JsonRpcMessage, JsonRpcSuccessResponse, JsonRpcVersion}; +use metrics_util::debugging::{DebugValue, DebuggingRecorder}; +use metrics_util::MetricKind; +use tokio::sync::mpsc; + +/// A bidirectional in-memory transport pair (same shape the tests use), so the +/// example is self-contained and needs no running server. 
+struct MemTransport { + tx: mpsc::Sender<TransportMessage>, + rx: mpsc::Receiver<TransportMessage>, +} + +fn pair() -> (MemTransport, MemTransport) { + let (a_tx, b_rx) = mpsc::channel(16); + let (b_tx, a_rx) = mpsc::channel(16); + ( + MemTransport { tx: a_tx, rx: a_rx }, + MemTransport { tx: b_tx, rx: b_rx }, + ) +} + +impl Transport for MemTransport { + async fn send(&mut self, msg: TransportMessage) -> Result<(), TransportError> { + self.tx.send(msg).await.map_err(|_| TransportError::Closed) + } + + async fn recv(&mut self) -> Result<Option<TransportMessage>, TransportError> { + Ok(self.rx.recv().await) + } +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + // 1. A consumer installs a recorder. Swap `DebuggingRecorder` for an + // OpenTelemetry/Prometheus exporter recorder in a real deployment. + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + let _guard = metrics::set_default_local_recorder(&recorder); + + // 2. Drive one real request through the client so it emits metrics. + let (client_side, mut server_side) = pair(); + let client = Client::connect(client_side, ClientConfig::default()).await?; + + let server = tokio::spawn(async move { + let msg = server_side.recv().await.unwrap().unwrap(); + let JsonRpcMessage::Request(req) = msg.into_parsed().unwrap() else { + panic!("expected a Request"); + }; + let result = serde_json::json!({ + "protocolVersion": "0.1.0", + "serverSeq": 0, + "snapshots": [], + }); + let resp = JsonRpcMessage::SuccessResponse(JsonRpcSuccessResponse { + jsonrpc: JsonRpcVersion::V2, + id: req.id, + result: ahp_types::common::AnyValue::from(result), + }); + server_side + .send(TransportMessage::encode(&resp).unwrap()) + .await + .unwrap(); + }); + + let init = client + .initialize("otel-example".into(), vec!["0.1.0".into()], vec![]) + .await?; + println!("connected (protocolVersion={})", init.protocol_version); + server.await?; + client.shutdown().await; + + // 3.
Observe what the client reported — exactly the data an OTel exporter + // would forward to a collector. + println!("\n--- ahp.client.* metrics observed by the consumer ---"); + for (composite, _unit, _desc, value) in snapshotter.snapshot().into_vec() { + let kind = match composite.kind() { + MetricKind::Counter => "counter", + MetricKind::Gauge => "gauge", + MetricKind::Histogram => "histogram", + }; + let key = composite.key(); + let labels: Vec<String> = key + .labels() + .map(|l| format!("{}={}", l.key(), l.value())) + .collect(); + let rendered = match value { + DebugValue::Counter(n) => format!("{n}"), + DebugValue::Gauge(g) => format!("{}", g.into_inner()), + DebugValue::Histogram(samples) => { + format!("{} sample(s)", samples.len()) + } + }; + println!( + "{kind:9} {:40} {{{}}} = {rendered}", + key.name(), + labels.join(", ") + ); + } + + Ok(()) +} diff --git a/clients/rust/crates/ahp/src/client.rs b/clients/rust/crates/ahp/src/client.rs index cddd0044..a4ee27fa 100644 --- a/clients/rust/crates/ahp/src/client.rs +++ b/clients/rust/crates/ahp/src/client.rs @@ -47,6 +47,7 @@ use tokio::sync::{broadcast, mpsc, oneshot, Mutex}; use tokio::task::JoinHandle; use crate::error::ClientError; +use crate::telemetry; use crate::transport::{Transport, TransportMessage}; /// Default size of a per-subscription broadcast channel. Consumers that @@ -128,7 +129,10 @@ impl ClientEventStream { match self.rx.recv().await { Ok(ev) => return Some(ev), Err(broadcast::error::RecvError::Closed) => return None, - Err(broadcast::error::RecvError::Lagged(_)) => continue, + Err(broadcast::error::RecvError::Lagged(n)) => { + crate::telemetry::events_dropped(ahp_types::telemetry::STREAM_EVENT, n); + continue; + } } } } @@ -158,7 +162,10 @@ impl SessionSubscription { // Slow consumer: skip the gap and keep going. Callers // that need strict ordering should use a tighter buffer // or their own backpressure.
- Err(broadcast::error::RecvError::Lagged(_)) => continue, + Err(broadcast::error::RecvError::Lagged(n)) => { + crate::telemetry::events_dropped(ahp_types::telemetry::STREAM_SUBSCRIPTION, n); + continue; + } } } } @@ -294,33 +301,77 @@ impl Client { pending.insert(id, tx); } - if self - .shared - .outbound - .send(Outbound::Message(req)) - .await - .is_err() - { - self.shared.pending.lock().await.remove(&id); - return Err(ClientError::Shutdown); - } - - let result = match self.shared.config.default_request_timeout { - Some(dur) => match tokio::time::timeout(dur, rx).await { - Ok(r) => r, - Err(_) => { - self.shared.pending.lock().await.remove(&id); - return Err(ClientError::Cancelled); - } - }, - None => rx.await, + // RAII span: increments the in-flight gauge now and owns the matching + // decrement + `request.duration` record exactly once. The two ways it + // settles map to the two distinct unsuccessful outcomes the contract + // separates: + // + // * Caller CANCELLATION — the caller drops this future before it + // resolves (e.g. `tokio::time::timeout(_, client.request(..))` + // elapses, or a `select!` arm wins). The function body simply stops + // at its current `.await`; the span's `Drop` then records the + // request as [`OUTCOME_CANCELLED`]. Without the guard a dropped + // future would emit no duration sample and leak the in-flight gauge. + // + // * In-client TIMEOUT — our own `default_request_timeout` deadline + // elapses. That is *not* a cancellation: we settle the span as + // [`OUTCOME_TIMEOUT`] explicitly below. + // + // Any normal settle (ok / rpc-error / shutdown) is recorded via + // `span.settle(..)` after the body, after which `Drop` is a no-op. 
+ let mut span = telemetry::RequestSpan::started(method); + let mut timed_out = false; + + let result: Result<_, ClientError> = 'req: { + if self + .shared + .outbound + .send(Outbound::Message(req)) + .await + .is_err() + { + self.shared.pending.lock().await.remove(&id); + break 'req Err(ClientError::Shutdown); + } + crate::telemetry::message_sent(method, ahp_types::telemetry::MESSAGE_KIND_REQUEST); + + let received = match self.shared.config.default_request_timeout { + Some(dur) => match tokio::time::timeout(dur, rx).await { + Ok(r) => r, + Err(_) => { + // The in-client deadline elapsed: a genuine timeout, + // distinct from a caller-driven cancellation. Flag it so + // the outcome stage tags `timeout` (not `error`, and not + // `cancelled` — the bug this fixes). + self.shared.pending.lock().await.remove(&id); + timed_out = true; + break 'req Err(ClientError::Cancelled); + } + }, + None => rx.await, + }; + + match received { + Ok(Ok(value)) => serde_json::from_value(value).map_err(ClientError::from), + Ok(Err(e)) => Err(ClientError::Rpc(e)), + Err(_) => Err(ClientError::Shutdown), + } + }; + + // We reached the end of the body, so this was NOT a caller cancellation + // (a cancellation drops the future before here, leaving `span` to record + // CANCELLED in `Drop`). Settle with the real outcome: the in-client + // deadline reports `timeout`; everything else maps from `result`. + let outcome = if timed_out { + ahp_types::telemetry::OUTCOME_TIMEOUT + } else { + match &result { + Ok(_) => ahp_types::telemetry::OUTCOME_OK, + Err(_) => ahp_types::telemetry::OUTCOME_ERROR, + } + }; + span.settle(outcome); + result } /// Send a JSON-RPC notification (fire-and-forget).
@@ -382,7 +433,12 @@ impl Client { last_seen_server_seq, subscriptions, }; - self.request("reconnect", params).await + let result = self.request("reconnect", params).await; + crate::telemetry::reconnect(match &result { + Ok(_) => ahp_types::telemetry::OUTCOME_OK, + Err(_) => ahp_types::telemetry::OUTCOME_ERROR, + }); + result } /// Subscribe to a URI and obtain a handle that streams @@ -403,9 +459,10 @@ impl Client { /// `initialSubscriptions` during [`Client::initialize`]. pub async fn attach_subscription(&self, uri: &str) -> SessionSubscription { let mut subs = self.shared.subscriptions.lock().await; - let tx = subs - .entry(uri.to_string()) - .or_insert_with(|| broadcast::channel(self.shared.config.subscription_buffer).0); + let tx = subs.entry(uri.to_string()).or_insert_with(|| { + crate::telemetry::subscription_opened(); + broadcast::channel(self.shared.config.subscription_buffer).0 + }); SessionSubscription { rx: tx.subscribe(), uri: uri.to_string(), @@ -418,7 +475,9 @@ impl Client { pub async fn unsubscribe(&self, uri: String) -> Result<(), ClientError> { { let mut subs = self.shared.subscriptions.lock().await; - subs.remove(&uri); + if subs.remove(&uri).is_some() { + crate::telemetry::subscription_closed(); + } } self.notify("unsubscribe", UnsubscribeParams { channel: uri }) .await @@ -504,8 +563,14 @@ async fn drive_transport( match inbound { Ok(Some(wire)) => { match wire.into_parsed() { - Ok(msg) => dispatch_inbound(&shared, msg).await, - Err(err) => tracing::warn!(?err, "malformed frame"), + Ok(msg) => { + crate::telemetry::message_received(); + dispatch_inbound(&shared, msg).await + } + Err(err) => { + crate::telemetry::frame_malformed(); + tracing::warn!(?err, "malformed frame") + } } } Ok(None) => break, @@ -528,6 +593,9 @@ async fn drive_transport( })); } let mut subs = shared.subscriptions.lock().await; + for _ in 0..subs.len() { + crate::telemetry::subscription_closed(); + } subs.clear(); // Drop the top-level fan-out sender so any active // 
`ClientEventStream::recv()` resolves with `None` rather than diff --git a/clients/rust/crates/ahp/src/lib.rs b/clients/rust/crates/ahp/src/lib.rs index 0eb51888..2d2ddbd3 100644 --- a/clients/rust/crates/ahp/src/lib.rs +++ b/clients/rust/crates/ahp/src/lib.rs @@ -150,6 +150,7 @@ pub mod error; pub mod hosts; pub mod multi_host_state_mirror; pub mod reducers; +mod telemetry; pub mod transport; pub use ahp_types; diff --git a/clients/rust/crates/ahp/src/telemetry.rs b/clients/rust/crates/ahp/src/telemetry.rs new file mode 100644 index 00000000..d64517b9 --- /dev/null +++ b/clients/rust/crates/ahp/src/telemetry.rs @@ -0,0 +1,152 @@ +//! Client self-instrumentation metrics. +//! +//! Emits the cross-client telemetry metrics named by [`ahp_types::telemetry`] +//! through the [`metrics`] facade. The facade is a no-op until the host +//! application installs a recorder, so instrumentation is effectively +//! zero-cost when unobserved — the Rust analog of the .NET client's `Meter` +//! gated on `HasListeners()`. +//! +//! Only the metric / attribute *names* are shared across language clients (they +//! are generated from `types/telemetry/registry.ts` into [`ahp_types::telemetry`]); the +//! wiring below is hand-written and idiomatic to Rust, exactly as the telemetry +//! contract intends ("only the NAMES are shared"). +//! +//! This is client SELF-instrumentation (how the client reports on its own +//! operation) and is distinct from the protocol's "OpenTelemetry over AHP" +//! channel (server → client OTLP delivery). + +use ahp_types::telemetry as names; +use metrics::{counter, gauge, histogram}; +use std::time::{Duration, Instant}; + +/// One JSON-RPC message (`kind`: [`names::MESSAGE_KIND_REQUEST`] or +/// [`names::MESSAGE_KIND_NOTIFICATION`]) was written to the transport. 
+pub(crate) fn message_sent(method: &str, kind: &'static str) { + counter!( + names::MESSAGES_SENT, + names::ATTR_RPC_SYSTEM => names::RPC_SYSTEM_JSONRPC, + names::ATTR_MESSAGE_KIND => kind, + names::ATTR_RPC_METHOD => method.to_owned(), + ) + .increment(1); +} + +/// One JSON-RPC message was read and parsed from the transport. +pub(crate) fn message_received() { + counter!(names::MESSAGES_RECEIVED).increment(1); +} + +/// A frame arrived that could not be parsed as a JSON-RPC message. +pub(crate) fn frame_malformed() { + counter!(names::FRAMES_MALFORMED).increment(1); +} + +/// A request settled; decrements the in-flight gauge and records its duration +/// against the given `outcome` (one of the [`names`] `OUTCOME_*` values). +/// +/// Paired with the gauge increment in [`RequestSpan::started`]; callers should +/// prefer the [`RequestSpan`] guard so cancellation is accounted for. +fn request_finished(method: &str, outcome: &'static str, elapsed: Duration) { + gauge!(names::REQUESTS_IN_FLIGHT).decrement(1.0); + histogram!( + names::REQUEST_DURATION, + names::ATTR_RPC_METHOD => method.to_owned(), + names::ATTR_OUTCOME => outcome, + ) + .record(elapsed.as_secs_f64() * 1_000.0); +} + +/// RAII guard spanning one in-flight request. +/// +/// Construction ([`RequestSpan::started`]) increments the in-flight gauge; the +/// matching decrement + duration record ([`request_finished`]) happens exactly +/// once — either via [`RequestSpan::settle`] with the real outcome, or, if the +/// span is dropped without settling (the caller cancelled the request future), +/// via `Drop` tagged [`names::OUTCOME_CANCELLED`]. +/// +/// This guarantees the in-flight gauge is balanced and a `request.duration` +/// sample is recorded even on cancellation, which a plain +/// increment/`request_finished` pair around an `.await` cannot do (a dropped +/// future runs no further statements, leaking the gauge and emitting no +/// duration). 
+pub(crate) struct RequestSpan { + method: String, + started: Instant, + settled: bool, +} + +impl RequestSpan { + /// Begin a span: increments the in-flight gauge and starts the clock. + pub(crate) fn started(method: &str) -> Self { + gauge!(names::REQUESTS_IN_FLIGHT).increment(1.0); + Self { + method: method.to_owned(), + started: Instant::now(), + settled: false, + } + } + + /// Record the request as finished with `outcome` (one of the `OUTCOME_*` + /// values). Idempotent guard: subsequent calls and the eventual `Drop` are + /// no-ops so the duration is recorded exactly once. + pub(crate) fn settle(&mut self, outcome: &'static str) { + if self.settled { + return; + } + self.settled = true; + request_finished(&self.method, outcome, self.started.elapsed()); + } +} + +impl Drop for RequestSpan { + fn drop(&mut self) { + // Unsettled at drop ⇒ the request future was cancelled before + // producing an outcome. Record it as such so the gauge stays balanced + // and a `cancelled` duration sample is emitted. + self.settle(names::OUTCOME_CANCELLED); + } +} + +/// A reconnect attempt settled with the given `outcome`. +pub(crate) fn reconnect(outcome: &'static str) { + counter!(names::RECONNECTS, names::ATTR_OUTCOME => outcome).increment(1); +} + +/// A subscription fan-out was opened; increments the active-subscriptions gauge. +pub(crate) fn subscription_opened() { + gauge!(names::SUBSCRIPTIONS_ACTIVE).increment(1.0); +} + +/// A subscription fan-out was closed; decrements the active-subscriptions gauge. +pub(crate) fn subscription_closed() { + gauge!(names::SUBSCRIPTIONS_ACTIVE).decrement(1.0); +} + +/// `dropped` events were skipped on the given `stream` (one of the [`names`] +/// `STREAM_*` values) because a consumer fell behind the broadcast buffer. 
+pub(crate) fn events_dropped(stream: &'static str, dropped: u64) { + counter!(names::EVENTS_DROPPED, names::ATTR_STREAM => stream).increment(dropped); +} + +#[cfg(test)] +mod tests { + use super::names; + + /// The metric/attribute name *constants* this module emits come straight + /// from the generated contract — assert a representative sample matches the + /// canonical `types/telemetry/registry.ts` values so a drift in either is + /// caught here. This pins the NAME constants only; that the metrics + /// actually *emit* (and carry the right attributes) is proven by the + /// `tests/telemetry_emission.rs` integration test. + #[test] + fn contract_name_constants_match() { + assert_eq!(names::MESSAGES_SENT, "ahp.client.messages.sent"); + assert_eq!(names::REQUEST_DURATION, "ahp.client.request.duration"); + assert_eq!(names::REQUESTS_IN_FLIGHT, "ahp.client.requests.in_flight"); + assert_eq!(names::FRAMES_MALFORMED, "ahp.client.frames.malformed"); + assert_eq!(names::ATTR_RPC_METHOD, "rpc.method"); + assert_eq!(names::ATTR_OUTCOME, "ahp.outcome"); + assert_eq!(names::OUTCOME_OK, "ok"); + assert_eq!(names::MESSAGE_KIND_REQUEST, "request"); + } +} diff --git a/clients/rust/crates/ahp/tests/telemetry_emission.rs b/clients/rust/crates/ahp/tests/telemetry_emission.rs new file mode 100644 index 00000000..0b7f66a4 --- /dev/null +++ b/clients/rust/crates/ahp/tests/telemetry_emission.rs @@ -0,0 +1,316 @@ +//! Integration test: prove the client's self-instrumentation metrics actually +//! *emit* — not merely that their name constants match the contract. +//! +//! Installs an in-process [`metrics_util::debugging::DebuggingRecorder`] as a +//! thread-local recorder, drives real requests through the in-memory transport +//! pair (the same `MemTransport` shape as `client_roundtrip.rs`), and asserts +//! on the captured snapshot: +//! +//! * the metric **names** fire (`ahp.client.messages.sent`, +//! `ahp.client.request.duration`), +//! 
* the in-flight gauge (`ahp.client.requests.in_flight`) goes `+1` while a +//! request is pending and back to `0` once it settles, and +//! * the attribute keys/values are present +//! (`rpc.method`, `ahp.outcome=ok`, `ahp.message.kind=request`). +//! +//! The recorder is installed thread-locally for the test's duration +//! ([`metrics::set_default_local_recorder`]) and the tests run on a +//! single-threaded Tokio runtime, so the client's spawned reader task and every +//! `.await` point stay on the recording thread — the documented +//! single-threaded-runtime use case for thread-local recorders. + +use ahp::{Client, ClientConfig, Transport, TransportError, TransportMessage}; +use ahp_types::messages::{JsonRpcMessage, JsonRpcSuccessResponse, JsonRpcVersion}; +use ahp_types::telemetry as names; +use metrics_util::debugging::{DebugValue, DebuggingRecorder, Snapshotter}; +use metrics_util::MetricKind; +use std::time::Duration; +use tokio::sync::mpsc; + +// ─── In-memory transport (mirrors client_roundtrip.rs) ─────────────────────── + +struct MemTransport { + tx: mpsc::Sender<TransportMessage>, + rx: mpsc::Receiver<TransportMessage>, +} + +fn pair() -> (MemTransport, MemTransport) { + let (a_tx, b_rx) = mpsc::channel(16); + let (b_tx, a_rx) = mpsc::channel(16); + ( + MemTransport { tx: a_tx, rx: a_rx }, + MemTransport { tx: b_tx, rx: b_rx }, + ) +} + +impl Transport for MemTransport { + async fn send(&mut self, msg: TransportMessage) -> Result<(), TransportError> { + self.tx.send(msg).await.map_err(|_| TransportError::Closed) + } + + async fn recv(&mut self) -> Result<Option<TransportMessage>, TransportError> { + Ok(self.rx.recv().await) + } +} + +// ─── Snapshot helpers ──────────────────────────────────────────────────────── +// +// NOTE: `Snapshotter::snapshot()` is DESTRUCTIVE — it swaps counters/gauges to +// 0 and drains histograms on each call. So every test takes a *single* +// snapshot at a coordinated point and reads all metrics out of that one +// `Vec<Row>`, never re-snapshotting.
+ +type Row = (MetricKind, metrics::Key, DebugValue); + +/// Snapshot the recorder once and return `(kind, key, value)` rows. Destructive +/// per `metrics-util`: drains counters/gauges/histograms. +fn drain(snapshotter: &Snapshotter) -> Vec<Row> { + snapshotter + .snapshot() + .into_vec() + .into_iter() + .map(|(composite, _u, _d, value)| { + let (kind, key) = composite.into_parts(); + (kind, key, value) + }) + .collect() +} + +/// Borrow the first row in an already-taken snapshot matching `kind` + `name` +/// (`DebugValue` is not `Clone`, so callers work with references). +fn pick<'a>( + rows: &'a [Row], + kind: MetricKind, + name: &str, +) -> Option<(&'a metrics::Key, &'a DebugValue)> { + rows.iter() + .find(|(k, key, _v)| *k == kind && key.name() == name) + .map(|(_k, key, value)| (key, value)) +} + +/// Read the gauge value for `name` from an already-taken snapshot, or `0.0` if +/// absent (gauge untouched this interval). +fn gauge_value(rows: &[Row], name: &str) -> f64 { + match pick(rows, MetricKind::Gauge, name) { + Some((_key, DebugValue::Gauge(g))) => g.into_inner(), + _ => 0.0, + } +} + +/// Assert that `key`'s labels contain `(label_key, label_value)`.
+fn assert_label(key: &metrics::Key, label_key: &str, label_value: &str) { + let found = key + .labels() + .any(|l| l.key() == label_key && l.value() == label_value); + assert!( + found, + "expected label {label_key}={label_value} on {}; labels were [{}]", + key.name(), + key.labels() + .map(|l| format!("{}={}", l.key(), l.value())) + .collect::<Vec<_>>() + .join(", ") + ); +} + +fn init_result() -> serde_json::Value { + serde_json::json!({ + "protocolVersion": "0.1.0", + "serverSeq": 0, + "snapshots": [], + }) +} + +// ─── Tests ─────────────────────────────────────────────────────────────────── + +#[tokio::test(flavor = "current_thread")] +async fn emits_request_metrics() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + let guard = metrics::set_default_local_recorder(&recorder); + let _ = &guard; // dropped at end of scope, restoring the prior recorder + + let (client_side, mut server_side) = pair(); + let client = Client::connect(client_side, ClientConfig::default()) + .await + .expect("connect"); + + // Deterministic coordination so we can snapshot the in-flight gauge while + // the request is genuinely pending (no polling — `snapshot()` is + // destructive, so we take exactly two snapshots at known points): + // * `received_tx`: server signals it has READ the request → the client's + // `request()` has already incremented the in-flight gauge and emitted + // `messages.sent`, but has not yet received a response. + // * `release_rx`: test tells the server to send its (held) response. + // The server task records no client metrics, so its thread-local recorder + // scope is irrelevant — only the request future (driven inline below) + // records through `recorder`.
+ let (received_tx, received_rx) = tokio::sync::oneshot::channel::<()>(); + let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>(); + + let server = tokio::spawn(async move { + let msg = server_side.recv().await.unwrap().unwrap(); + let JsonRpcMessage::Request(req) = msg.into_parsed().unwrap() else { + panic!("expected a Request"); + }; + assert_eq!(req.method, "initialize"); + received_tx.send(()).unwrap(); + release_rx.await.ok(); + let resp = JsonRpcMessage::SuccessResponse(JsonRpcSuccessResponse { + jsonrpc: JsonRpcVersion::V2, + id: req.id, + result: ahp_types::common::AnyValue::from(init_result()), + }); + server_side + .send(TransportMessage::encode(&resp).unwrap()) + .await + .unwrap(); + }); + + // Drive the request future inline (on THIS task, sharing its thread-local + // recorder scope) concurrently with a coordinator that snapshots mid-flight. + let request_fut = client.initialize("test-client".into(), vec!["0.1.0".into()], vec![]); + let coordinator = async { + // Wait until the server has read the request → it is in flight. + received_rx.await.unwrap(); + // MID-FLIGHT SNAPSHOT (single, destructive): gauge should be +1 and + // `messages.sent` should already carry the request attributes. + let mid = drain(&snapshotter); + release_tx.send(()).unwrap(); + mid + }; + + let (init, mid) = tokio::join!( + async { request_fut.await.expect("initialize") }, + coordinator + ); + assert_eq!(init.protocol_version, "0.1.0"); + server.await.unwrap(); + + // ── Assert the MID-FLIGHT snapshot ── + let mid_in_flight = gauge_value(&mid, names::REQUESTS_IN_FLIGHT); + assert_eq!( + mid_in_flight, 1.0, + "in-flight gauge should read +1 while the request is pending" + ); + + // The `messages.sent` counter fired for the request, with the request kind, + // rpc system, and method attributes. 
+ let (sent_key, sent_val) = pick(&mid, MetricKind::Counter, names::MESSAGES_SENT) + .expect("messages.sent counter should have emitted"); + assert!( + matches!(sent_val, DebugValue::Counter(n) if *n >= 1), + "messages.sent should have incremented", + ); + assert_label(sent_key, names::ATTR_RPC_METHOD, "initialize"); + assert_label( + sent_key, + names::ATTR_MESSAGE_KIND, + names::MESSAGE_KIND_REQUEST, + ); + assert_label(sent_key, names::ATTR_RPC_SYSTEM, names::RPC_SYSTEM_JSONRPC); + + // ── Assert the FINAL snapshot (after the request settled) ── + let fin = drain(&snapshotter); + + // The mid-flight snapshot consumed (reset) the gauge atomic to 0, so the + // settle's `decrement(1.0)` leaves it at -1.0 here. mid + final == 0 proves + // the increment and decrement balanced — i.e. the gauge went +1 then back + // down by exactly 1. + let final_in_flight = gauge_value(&fin, names::REQUESTS_IN_FLIGHT); + assert_eq!( + final_in_flight, -1.0, + "settle should decrement the gauge by 1 (consumed-snapshot accounting)" + ); + assert_eq!( + mid_in_flight + final_in_flight, + 0.0, + "in-flight gauge increment and decrement should balance to 0" + ); + + // The `request.duration` histogram fired with the OK outcome + method. 
+ let (dur_key, dur_val) = pick(&fin, MetricKind::Histogram, names::REQUEST_DURATION) + .expect("request.duration histogram should have emitted"); + assert!( + matches!(dur_val, DebugValue::Histogram(samples) if !samples.is_empty()), + "request.duration should have recorded at least one sample", + ); + assert_label(dur_key, names::ATTR_RPC_METHOD, "initialize"); + assert_label(dur_key, names::ATTR_OUTCOME, names::OUTCOME_OK); + + client.shutdown().await; +} + +#[tokio::test(flavor = "current_thread")] +async fn cancelled_request_tags_outcome_cancelled() { + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + let guard = metrics::set_default_local_recorder(&recorder); + let _ = &guard; + + let (client_side, mut server_side) = pair(); + // Generous default timeout: we want the *caller* to cancel, not the + // in-client deadline to fire (which would be a `timeout`, not a + // `cancelled`, outcome). + let config = ClientConfig { + default_request_timeout: Some(Duration::from_secs(30)), + ..ClientConfig::default() + }; + let client = Client::connect(client_side, config).await.expect("connect"); + + // Server reads the request but never replies, so it stays in flight until + // the caller drops the future. + let server = tokio::spawn(async move { + let msg = server_side.recv().await.unwrap().unwrap(); + let JsonRpcMessage::Request(req) = msg.into_parsed().unwrap() else { + panic!("expected a Request"); + }; + assert_eq!(req.method, "initialize"); + std::future::pending::<()>().await; // hold open; never respond + }); + + // Cancel the request by dropping its future via a caller-side timeout — + // the codebase's established cancellation idiom (see tests/hosts.rs). 
+ let cancelled = tokio::time::timeout( + Duration::from_millis(150), + client.initialize("test-client".into(), vec!["0.1.0".into()], vec![]), + ) + .await; + assert!( + cancelled.is_err(), + "the request future should have been cancelled by the caller-side timeout", + ); + + // Single snapshot after the cancellation; read both metrics from it. + let snap = drain(&snapshotter); + + // The duration metric must have been recorded with the CANCELLED outcome + // (not timeout, not ok) — proves the drop-guard cancellation path. + let (dur_key, dur_val) = pick(&snap, MetricKind::Histogram, names::REQUEST_DURATION) + .expect("request.duration should emit even on cancellation"); + assert!( + matches!(dur_val, DebugValue::Histogram(samples) if !samples.is_empty()), + "request.duration should have recorded a sample on cancellation", + ); + assert_label(dur_key, names::ATTR_OUTCOME, names::OUTCOME_CANCELLED); + // Falsifiability: a regression to the old `OUTCOME_TIMEOUT` mapping would + // put a `timeout` label here, failing this assert. + assert!( + dur_key + .labels() + .all(|l| !(l.key() == names::ATTR_OUTCOME && l.value() == names::OUTCOME_TIMEOUT)), + "a cancelled request must not be tagged ahp.outcome=timeout", + ); + + // The in-flight gauge must balance back to 0 after cancellation: the + // drop-guard's decrement(1.0) cancels the started increment(1.0). A leak + // (no drop-guard) would read +1 here. 
+ assert_eq!( + gauge_value(&snap, names::REQUESTS_IN_FLIGHT), + 0.0, + "in-flight gauge should return to 0 after a cancelled request", + ); + + server.abort(); + client.shutdown().await; +} diff --git a/clients/swift/AgentHostProtocol/Sources/AgentHostProtocol/Generated/Telemetry.generated.swift b/clients/swift/AgentHostProtocol/Sources/AgentHostProtocol/Generated/Telemetry.generated.swift new file mode 100644 index 00000000..c2afa8d2 --- /dev/null +++ b/clients/swift/AgentHostProtocol/Sources/AgentHostProtocol/Generated/Telemetry.generated.swift @@ -0,0 +1,102 @@ +// Generated from types/*.ts — do not edit + +import Foundation + +// MARK: - Telemetry Names + +/// Cross-client telemetry names — the self-instrumentation contract shared by every +/// AHP client, generated from `types/telemetry/registry.ts`. Only the names are shared; +/// the tracer / meter wiring is hand-written per platform. +public enum AhpTelemetryNames { + + // MARK: - Source + /// Instrumentation-scope name used for every AHP self-instrumentation span and metric. + public static let source = "Microsoft.AgentHostProtocol" + + // MARK: - Span Names + /// Span covering a single JSON-RPC request, from send until it settles. + public static let requestSpan = "ahp.request" + + // MARK: - Metric Names + /// Messages sent to the host, tagged by ahp.message.kind (request|notification). + public static let messagesSent = "ahp.client.messages.sent" + /// Messages received from the host. + public static let messagesReceived = "ahp.client.messages.received" + /// Round-trip duration of a JSON-RPC request, tagged by rpc.method and ahp.outcome (ok|error|cancelled|timeout). + public static let requestDuration = "ahp.client.request.duration" + /// Requests awaiting a response. + public static let requestsInFlight = "ahp.client.requests.in_flight" + /// Subscriptions registered with the client (decremented on unsubscribe or shutdown). 
+ public static let subscriptionsActive = "ahp.client.subscriptions.active" + /// Reconnect operations, tagged by outcome. + public static let reconnects = "ahp.client.reconnects" + /// Buffered events evicted under back-pressure (drop-oldest), tagged by stream. + public static let eventsDropped = "ahp.client.events.dropped" + /// Inbound frames that failed to decode and were skipped (protocol resync is the host’s responsibility). + public static let framesMalformed = "ahp.client.frames.malformed" + + // MARK: - Metric Units + /// Unit for the `ahp.client.messages.sent` metric. + public static let messagesSentUnit = "{message}" + /// Unit for the `ahp.client.messages.received` metric. + public static let messagesReceivedUnit = "{message}" + /// Unit for the `ahp.client.request.duration` metric. + public static let requestDurationUnit = "ms" + /// Unit for the `ahp.client.requests.in_flight` metric. + public static let requestsInFlightUnit = "{request}" + /// Unit for the `ahp.client.subscriptions.active` metric. + public static let subscriptionsActiveUnit = "{subscription}" + /// Unit for the `ahp.client.reconnects` metric. + public static let reconnectsUnit = "{reconnect}" + /// Unit for the `ahp.client.events.dropped` metric. + public static let eventsDroppedUnit = "{event}" + /// Unit for the `ahp.client.frames.malformed` metric. + public static let framesMalformedUnit = "{frame}" + + // MARK: - Attribute Keys + /// RPC system identifier (OTel rpc.system); always "jsonrpc" for AHP. + public static let attrRpcSystem = "rpc.system" + /// JSON-RPC method name the span/metric is scoped to (OTel rpc.method). + public static let attrRpcMethod = "rpc.method" + /// Client-assigned JSON-RPC request id. + public static let attrRequestId = "ahp.request.id" + /// Terminal outcome of a request or reconnect (ok|error|cancelled|timeout). + public static let attrOutcome = "ahp.outcome" + /// Whether a sent message was a request or a notification. 
+ public static let attrMessageKind = "ahp.message.kind" + /// Which event stream a dropped or observed event belongs to. + public static let attrStream = "ahp.stream" + + // MARK: - Attribute Values + /// JSON-RPC — the only RPC system AHP uses. + public static let rpcSystemJsonrpc = "jsonrpc" + /// The request or reconnect completed successfully. + public static let outcomeOk = "ok" + /// The request or reconnect failed with an error response. + public static let outcomeError = "error" + /// The request was cancelled before it settled. + public static let outcomeCancelled = "cancelled" + /// The request exceeded its configured timeout. + public static let outcomeTimeout = "timeout" + /// A JSON-RPC request (expects a response). + public static let messageKindRequest = "request" + /// A JSON-RPC notification (fire-and-forget). + public static let messageKindNotification = "notification" + /// A per-resource subscription stream. + public static let streamSubscription = "subscription" + /// The client-wide event stream. + public static let streamEvent = "event" + /// A state-snapshot stream. + public static let streamState = "state" + /// A multi-host client's host-event delivery stream. + public static let streamHostEvent = "host-event" + /// A multi-host client's host-subscription delivery stream. + public static let streamHostSubscription = "host-subscription" + /// A multi-host client's host-resource delivery stream. + public static let streamHostResource = "host-resource" + /// A multi-host client's host-snapshot delivery stream. + public static let streamHostSnapshot = "host-snapshot" + /// A multi-host client's host-summaries delivery stream. + public static let streamHostSummaries = "host-summaries" + +} diff --git a/clients/swift/CHANGELOG.md b/clients/swift/CHANGELOG.md index dd3a101e..89174454 100644 --- a/clients/swift/CHANGELOG.md +++ b/clients/swift/CHANGELOG.md @@ -19,6 +19,12 @@ the tag matches the version pinned in [`VERSION`](VERSION). 
### Added +- Generated `AhpTelemetryNames` — the shared cross-client self-instrumentation + span / metric / attribute names, generated from the + `types/telemetry/registry.ts` contract so they stay identical across clients. + Includes the `host-event` / `host-subscription` / `host-resource` / + `host-snapshot` / `host-summaries` `ahp.stream` values for multi-host + dropped-event accounting. - `ChangesetOperationStatus.disabled` — new case for changeset operations that are currently unavailable and cannot be invoked. - `ChangesetOperation.group` — optional identifier for grouping related diff --git a/clients/typescript/CHANGELOG.md b/clients/typescript/CHANGELOG.md index f602219a..9f473994 100644 --- a/clients/typescript/CHANGELOG.md +++ b/clients/typescript/CHANGELOG.md @@ -22,6 +22,12 @@ hotfix escape hatch. ### Added +- Telemetry-name constants (`TELEMETRY_SOURCE`, `TELEMETRY_SPANS`, + `TELEMETRY_METRICS`, `TELEMETRY_ATTRIBUTES`, `TELEMETRY_VALUES`) — the shared + cross-client self-instrumentation names, re-exported from the package root. + Includes the `host-event` / `host-subscription` / `host-resource` / + `host-snapshot` / `host-summaries` `ahp.stream` values for multi-host + dropped-event accounting. - `ChangesetOperationStatus.Disabled` — new enum value for changeset operations that are currently unavailable and cannot be invoked. 
- `ChangesetOperation.group` — optional identifier for grouping related diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index 6de549bd..86c0f634 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -71,6 +71,7 @@ export default withMermaid(defineConfig({ { text: 'Terminal Channel', link: '/specification/terminal-channel' }, { text: 'Resource Watch Channel', link: '/specification/resource-watch-channel' }, { text: 'Telemetry Channel', link: '/specification/telemetry-channel' }, + { text: 'Client Self-Instrumentation', link: '/specification/self-instrumentation' }, ], }, ], diff --git a/docs/specification/self-instrumentation.md b/docs/specification/self-instrumentation.md new file mode 100644 index 00000000..b1c0172d --- /dev/null +++ b/docs/specification/self-instrumentation.md @@ -0,0 +1,105 @@ +# Client Self-Instrumentation + +Self-instrumentation is how an AHP **client** reports on *its own* operation — +the spans, metrics, and attributes it emits about the requests it sends, the +frames it receives, the reconnects it performs, and the events it drops. + +It is distinct from the [telemetry channel](./telemetry-channel.md), which +carries OpenTelemetry data the **host** emits *to* clients (server → client +OTLP delivery). This page is about the client observing itself, not the host +observing the agent. + +> Self-instrumentation is **optional** and **non-normative** for interop: a +> client that emits no telemetry is fully conformant. What *is* normative when a +> client chooses to instrument is the **names** it uses — see below. + +## The names are the contract + +Every client's self-instrumentation flows into **OpenTelemetry**: + +- The **.NET** client uses `System.Diagnostics.ActivitySource` + `Meter`, which + the OpenTelemetry .NET SDK consumes natively — no shim. 
+- The **Rust** client emits through the [`metrics`](https://docs.rs/metrics) + facade (and `tracing` for diagnostics), both of which convert to OTel through + the standard exporters (`tracing-opentelemetry`, an OTel metrics exporter). +- Other clients adopt the same names when they add instrumentation + (OTel-Go / swift-otel / OTel-Java / OTel-JS). + +In OpenTelemetry the **span / metric / attribute names *are* the contract**: the +dashboards, alerts, and queries an operator builds are keyed off those names. If +the .NET client emits `ahp.client.request.duration` while another client emits a +differently-spelled name for the same thing, a single operator query can no +longer span both clients and cross-client observability silently breaks. + +So for the OTel contract to stay consistent across clients, these names **must be +identical by construction**, not by convention. + +## How the names stay identical: one generated source + +The names live in a single TypeScript source — `types/telemetry/registry.ts` — +as **string enums** (`TelemetrySpan`, `TelemetryMetric`, `TelemetryAttribute`, +`TelemetryOutcome`, …), the same shape as the protocol enums (e.g. +`ChangesetOperationStatus`). Each client SDK gets an idiomatic, **generated** +holder compiled from them: + +| Client | Generated holder | +|---|---| +| .NET | `AhpTelemetryNames` (static class) | +| Rust | `ahp_types::telemetry` (`pub const &str`) | +| Swift | `AhpTelemetryNames` (caseless `enum`) | +| Kotlin | `AhpTelemetryNames` (`object`) | +| Go | `telemetry.generated.go` (package consts) | +| TypeScript | the telemetry enums re-exported from the package root | + +One edit to `types/telemetry/registry.ts` propagates to every client via +`npm run generate`; a divergent hand-typed name is impossible. (The generated +holders are flat constant holders, not language enums — telemetry names are +consumed as raw strings by `Meter` / `metrics`.) 
+ +Each name's description is a JSDoc comment on its enum member, extracted by the +generators with `member.getJsDocs()` — exactly the way descriptions are +extracted for every other protocol enum — so the description authored once, +right next to the name, surfaces as a doc comment in every SDK. + +Only the **names** are shared. The instrumentation *logic* — `ActivitySource` / +`Meter` wiring, `HasListeners()`-style gating, the `metrics` facade calls — stays +hand-written and idiomatic per language. + +## Names + +The current contract (see `types/telemetry/registry.ts` for the authoritative +list and per-name descriptions): + +- **Scope:** `Microsoft.AgentHostProtocol` +- **Span:** `ahp.request` +- **Metrics:** `ahp.client.messages.sent` / `.messages.received` / + `.request.duration` / `.requests.in_flight` / `.subscriptions.active` / + `.reconnects` / `.events.dropped` / `.frames.malformed` +- **Attributes:** `rpc.system`, `rpc.method`, `ahp.request.id`, `ahp.outcome`, + `ahp.message.kind`, `ahp.stream` +- **Attribute values:** `ahp.outcome` ∈ `{ok, error, cancelled, timeout}`; + `ahp.message.kind` ∈ `{request, notification}`; `ahp.stream` ∈ + `{subscription, event, state, host-event, host-subscription, host-resource, + host-snapshot, host-summaries}`; `rpc.system` = `jsonrpc`. + +`rpc.*` attributes follow the OpenTelemetry RPC semantic conventions. The +`host-*` `ahp.stream` values name the per-stream dropped-event channels a +multi-host client (e.g. the .NET client) fans the host's own notifications +across; they are enumerated attribute *values*, not OTel instrument names, so +the hyphenated spelling is intentional. + +## Consuming the telemetry + +The SDKs emit through each platform's native OpenTelemetry primitives, so any +standard exporter picks the signals up. 
Two reference examples wire a real OTel +exporter to the generated names end-to-end: + +- **Rust:** `clients/rust/crates/ahp/examples/otel_export.rs` — installs a + `metrics` recorder and exports the `ahp.client.*` metrics. +- **.NET:** the `OtelExport` example under `clients/dotnet/examples/` — subscribes + an OTel `MeterListener` / `ActivityListener` to `Microsoft.AgentHostProtocol` + and exports the spans + metrics. + +In both, the instrumentation is zero-cost until a recorder / listener is +installed, so importing the SDK does not force a telemetry dependency on hosts +that don't want one. diff --git a/docs/specification/telemetry-channel.md b/docs/specification/telemetry-channel.md index 8f4d0e6d..a968771f 100644 --- a/docs/specification/telemetry-channel.md +++ b/docs/specification/telemetry-channel.md @@ -2,6 +2,8 @@ The telemetry channel is the way an agent host emits OpenTelemetry (OTel) data — logs, traces, and metrics — to AHP clients. It is a thin pass-through: payloads on the wire are [OTLP/JSON](https://github.com/open-telemetry/opentelemetry-proto) values verbatim. AHP only adds the routing envelope. +> **Not to be confused with [client self-instrumentation](./self-instrumentation.md).** This channel is how the *host* delivers OTel data *to* clients. Self-instrumentation is how a client reports on *its own* operation (the RPC spans/metrics it emits) — a separate concern with its own shared name contract. + This page is normative. The OTel data model itself is defined by [opentelemetry-proto](https://github.com/open-telemetry/opentelemetry-proto); AHP does not redeclare it.
## URI Scheme diff --git a/package.json b/package.json index dfd3842d..4a4fb0bf 100644 --- a/package.json +++ b/package.json @@ -16,12 +16,13 @@ "generate:metadata": "tsx scripts/generate.ts --metadata", "verify:release-metadata": "tsx scripts/verify-release-metadata.ts", "verify:changelog": "tsx scripts/verify-changelog.ts", + "verify:generated": "tsx scripts/verify-generated.ts", "docs:dev": "vitepress dev docs", "docs:build": "npm run generate && vitepress build docs", "docs:preview": "vitepress preview docs", "typecheck": "tsc --noEmit -p types/tsconfig.json", "lint": "eslint", - "test": "npm run typecheck && npm run lint && npm run verify:release-metadata && npm run verify:changelog && npx c8 --include types/reducers.ts --check-coverage --branches 100 tsx --test types/*.test.ts types/version/*.test.ts" + "test": "npm run typecheck && npm run lint && npm run verify:release-metadata && npm run verify:changelog && npm run verify:generated && npx c8 --include types/reducers.ts --check-coverage --branches 100 tsx --test types/*.test.ts types/version/*.test.ts types/telemetry/*.test.ts scripts/*.test.ts" }, "repository": { "type": "git", diff --git a/scripts/generate-go.ts b/scripts/generate-go.ts index f5b1ba8c..44a02db4 100644 --- a/scripts/generate-go.ts +++ b/scripts/generate-go.ts @@ -41,6 +41,7 @@ import fs from 'fs'; import path from 'path'; import { findProtocolSourceFiles } from './find-protocol-sources.js'; import { readProtocolVersions } from './read-protocol-versions.js'; +import { readTelemetry } from './read-telemetry.js'; const GENERATED_BANNER = '// Generated from types/*.ts — do not edit.\n' + @@ -1953,6 +1954,56 @@ function resolveGoFmt(allowMissingFormatter: boolean): string | undefined { ); } +// ─── Telemetry Names File Generator ────────────────────────────────────────── + +function generateGoTelemetryFile(project: Project): string { + const t = readTelemetry(project); + const lines: string[] = [GENERATED_BANNER]; + + lines.push('// 
Cross-client telemetry NAMES — the self-instrumentation contract shared by every'); + lines.push('// AHP client, generated from types/telemetry/registry.ts. Only the names are shared;'); + lines.push('// the tracer / meter wiring is hand-written per language.'); + lines.push(''); + lines.push('const ('); + if (t.source.doc) lines.push(`\t// ${t.source.doc}`); + lines.push(`\tTelemetrySource = ${JSON.stringify(t.source.value)}`); + lines.push(''); + lines.push('\t// Span names.'); + for (const span of t.spans) { + if (span.doc) lines.push(`\t// ${span.doc}`); + lines.push(`\t${span.id}Span = ${JSON.stringify(span.value)}`); + } + lines.push(''); + lines.push('\t// Metric names.'); + for (const metric of t.metrics) { + if (metric.doc) lines.push(`\t// ${metric.doc}`); + lines.push(`\t${metric.id}Metric = ${JSON.stringify(metric.value)}`); + } + lines.push(''); + lines.push('\t// Metric units.'); + for (const metric of t.metrics) { + lines.push(`\t// Unit for the ${metric.value} metric.`); + lines.push(`\t${metric.id}Unit = ${JSON.stringify(metric.unit)}`); + } + lines.push(''); + lines.push('\t// Attribute keys.'); + for (const attr of t.attributes) { + if (attr.doc) lines.push(`\t// ${attr.doc}`); + lines.push(`\tAttr${attr.id} = ${JSON.stringify(attr.value)}`); + } + lines.push(''); + lines.push('\t// Attribute values.'); + for (const group of t.values) { + for (const member of group.members) { + if (member.doc) lines.push(`\t// ${member.doc}`); + lines.push(`\t${group.group}${member.id} = ${JSON.stringify(member.value)}`); + } + } + lines.push(')'); + lines.push(''); + return lines.join('\n'); +} + // ─── Main Entry Point ──────────────────────────────────────────────────────── export function generateGoModule(project: Project, outputDir: string, options: GenerateGoModuleOptions = {}): void { @@ -1970,6 +2021,7 @@ export function generateGoModule(project: Project, outputDir: string, options: G fs.writeFileSync(path.join(srcDir, 'errors.generated.go'), 
generateErrorsFile()); fs.writeFileSync(path.join(srcDir, 'messages.generated.go'), generateMessagesFile()); fs.writeFileSync(path.join(srcDir, 'version.generated.go'), generateVersionFile(project)); + fs.writeFileSync(path.join(srcDir, 'telemetry.generated.go'), generateGoTelemetryFile(project)); if (gofmt) { try { diff --git a/scripts/generate-kotlin.ts b/scripts/generate-kotlin.ts index 4b1993df..925655b3 100644 --- a/scripts/generate-kotlin.ts +++ b/scripts/generate-kotlin.ts @@ -35,6 +35,7 @@ import fs from 'fs'; import path from 'path'; import { findProtocolSourceFiles } from './find-protocol-sources.js'; import { readProtocolVersions } from './read-protocol-versions.js'; +import { readTelemetry } from './read-telemetry.js'; const GENERATED_HEADER = '// Generated from types/*.ts — do not edit\n\n' + @@ -1894,6 +1895,73 @@ function checkExhaustiveness(project: Project): void { } } +// ─── Telemetry Names File Generator ────────────────────────────────────────── + +function generateKotlinTelemetryFile(project: Project): string { + const t = readTelemetry(project); + /** PascalCase id → SCREAMING_SNAKE_CASE, matching the existing object-constant style. */ + const scream = (s: string): string => toScreamingSnake(s); + + const lines: string[] = [ + '// Generated from types/telemetry/registry.ts — do not edit', + '', + `package ${PACKAGE}`, + '', + '/**', + ' * Cross-client telemetry NAMES — the self-instrumentation contract shared by every', + ' * AHP client, generated from `types/telemetry/registry.ts`.
Only the names are shared;', + ' * the OpenTelemetry tracer / meter wiring is hand-written per language.', + ' */', + 'object AhpTelemetryNames {', + '', + ' // ── Instrumentation scope ──', + ...(t.source.doc ? [` /** ${t.source.doc} */`] : []), /* KDoc line only when the scope const is documented, matching the other generators */ + ` const val SOURCE: String = ${JSON.stringify(t.source.value)}`, + '', + ' // ── Span names ──', + ]; + + for (const span of t.spans) { + if (span.doc) lines.push(` /** ${span.doc} */`); + lines.push(` const val ${scream(span.id)}_SPAN: String = ${JSON.stringify(span.value)}`); + } + + lines.push(''); + lines.push(' // ── Metric names ──'); + for (const metric of t.metrics) { + if (metric.doc) lines.push(` /** ${metric.doc} */`); + lines.push(` const val ${scream(metric.id)}: String = ${JSON.stringify(metric.value)}`); + } + + lines.push(''); + lines.push(' // ── Metric units ──'); + for (const metric of t.metrics) { + lines.push(` /** Unit for the \`${metric.value}\` metric. */`); + lines.push(` const val ${scream(metric.id)}_UNIT: String = ${JSON.stringify(metric.unit)}`); + } + + lines.push(''); + lines.push(' // ── Attribute keys ──'); + for (const attr of t.attributes) { + if (attr.doc) lines.push(` /** ${attr.doc} */`); + lines.push(` const val ATTR_${scream(attr.id)}: String = ${JSON.stringify(attr.value)}`); + } + + lines.push(''); + lines.push(' // ── Attribute values ──'); + for (const group of t.values) { + for (const member of group.members) { + if (member.doc) lines.push(` /** ${member.doc} */`); + lines.push(` const val ${scream(group.group)}_${scream(member.id)}: String = ${JSON.stringify(member.value)}`); + } + } + + lines.push('}'); + lines.push(''); + + return lines.join('\n'); +} + +// ─── Main Entry Point ──────────────────────────────────────────────────────── + function generateVersionFile(project: Project): string { @@ -1942,4 +2010,5 @@ export function generateKotlinPackage(project: Project, outputDir: string): void fs.writeFileSync(path.join(generatedDir, 'Errors.generated.kt'), generateErrorsFile(project));
fs.writeFileSync(path.join(generatedDir, 'Messages.generated.kt'), generateMessagesFile()); fs.writeFileSync(path.join(generatedDir, 'Version.generated.kt'), generateVersionFile(project)); + fs.writeFileSync(path.join(generatedDir, 'Telemetry.generated.kt'), generateKotlinTelemetryFile(project)); } diff --git a/scripts/generate-rust.ts b/scripts/generate-rust.ts index a167652c..f1ec70df 100644 --- a/scripts/generate-rust.ts +++ b/scripts/generate-rust.ts @@ -22,6 +22,7 @@ import fs from 'fs'; import path from 'path'; import { findProtocolSourceFiles } from './find-protocol-sources.js'; import { readProtocolVersions } from './read-protocol-versions.js'; +import { readTelemetry } from './read-telemetry.js'; const GENERATED_BANNER = '// Generated from types/*.ts — do not edit.\n//\n// Regenerate with: npm run generate:rust\n\n#![allow(missing_docs)]\n'; @@ -1767,6 +1768,67 @@ function checkExhaustiveness(project: Project): void { } } +// ─── Telemetry Names File Generator ────────────────────────────────────────── + +/** Convert a PascalCase (or camelCase) identifier to SCREAMING_SNAKE_CASE. 
*/ +function toScreamingSnake(name: string): string { + return name.replace(/([a-z0-9])([A-Z])/g, '$1_$2').toUpperCase(); +} + +function generateRustTelemetryFile(project: Project): string { + const t = readTelemetry(project); + const lines: string[] = [GENERATED_BANNER]; + + lines.push('// ─── Instrumentation scope ───────────────────────────────────────────────\n'); + if (t.source.doc) lines.push(`/// ${t.source.doc}`); + lines.push(`pub const TELEMETRY_SOURCE: &str = ${JSON.stringify(t.source.value)};\n`); + + lines.push('// ─── Span names ──────────────────────────────────────────────────────────\n'); + for (const span of t.spans) { + const constName = toScreamingSnake(span.id) + '_SPAN'; + if (span.doc) lines.push(`/// ${span.doc}`); + lines.push(`pub const ${constName}: &str = ${JSON.stringify(span.value)};`); + } + lines.push(''); + + lines.push('// ─── Metric names ────────────────────────────────────────────────────────\n'); + for (const metric of t.metrics) { + const constName = toScreamingSnake(metric.id); + if (metric.doc) lines.push(`/// ${metric.doc}`); + lines.push(`pub const ${constName}: &str = ${JSON.stringify(metric.value)};`); + } + lines.push(''); + + lines.push('// ─── Metric units ────────────────────────────────────────────────────────\n'); + for (const metric of t.metrics) { + const constName = toScreamingSnake(metric.id) + '_UNIT'; + lines.push(`/// Unit for the \`${metric.value}\` metric.`); + lines.push(`pub const ${constName}: &str = ${JSON.stringify(metric.unit)};`); + } + lines.push(''); + + lines.push('// ─── Attribute keys ──────────────────────────────────────────────────────\n'); + for (const attr of t.attributes) { + const constName = 'ATTR_' + toScreamingSnake(attr.id); + if (attr.doc) lines.push(`/// ${attr.doc}`); + lines.push(`pub const ${constName}: &str = ${JSON.stringify(attr.value)};`); + } + lines.push(''); + + lines.push('// ─── Attribute values ────────────────────────────────────────────────────\n'); + for (const 
group of t.values) { + const groupPrefix = toScreamingSnake(group.group) + '_'; + for (const member of group.members) { + const constName = groupPrefix + toScreamingSnake(member.id); + if (member.doc) lines.push(`/// ${member.doc}`); + lines.push(`pub const ${constName}: &str = ${JSON.stringify(member.value)};`); + } + } + lines.push(''); + + return lines.join('\n'); +} + // ─── Main Entry Point ──────────────────────────────────────────────────────── export function generateRustCrate(project: Project, outputDir: string, options: GenerateRustCrateOptions = {}): void { @@ -1791,6 +1853,7 @@ export function generateRustCrate(project: Project, outputDir: string, options: fs.writeFileSync(path.join(srcDir, 'errors.rs'), generateErrorsFile()); fs.writeFileSync(path.join(srcDir, 'messages.rs'), generateMessagesFile()); fs.writeFileSync(path.join(srcDir, 'version.rs'), generateVersionFile(project)); + fs.writeFileSync(path.join(srcDir, 'telemetry.rs'), generateRustTelemetryFile(project)); try { execFileSync('cargo', ['fmt', '-p', 'ahp-types'], { cwd: outputDir, stdio: 'inherit' }); diff --git a/scripts/generate-swift.ts b/scripts/generate-swift.ts index b28ed1da..1fdc585d 100644 --- a/scripts/generate-swift.ts +++ b/scripts/generate-swift.ts @@ -22,6 +22,7 @@ import fs from 'fs'; import path from 'path'; import { findProtocolSourceFiles } from './find-protocol-sources.js'; import { readProtocolVersions } from './read-protocol-versions.js'; +import { readTelemetry } from './read-telemetry.js'; const GENERATED_HEADER = '// Generated from types/*.ts — do not edit\n\nimport Foundation\n'; @@ -1871,6 +1872,81 @@ function checkExhaustiveness(project: Project): void { } } +// ─── Telemetry Names File Generator ────────────────────────────────────────── + +function generateSwiftTelemetryFile(project: Project): string { + const camel = (s: string): string => s.charAt(0).toLowerCase() + s.slice(1); + const t = readTelemetry(project); + const lines: string[] = [GENERATED_HEADER]; + 
+ lines.push('// MARK: - Telemetry Names'); + lines.push(''); + lines.push('/// Cross-client telemetry names — the self-instrumentation contract shared by every'); + lines.push('/// AHP client, generated from `types/telemetry/registry.ts`. Only the names are shared;'); + lines.push('/// the tracer / meter wiring is hand-written per platform.'); + lines.push('public enum AhpTelemetryNames {'); + lines.push(''); + + // Source + lines.push(' // MARK: - Source'); + if (t.source.doc) lines.push(` /// ${t.source.doc}`); + lines.push(` public static let source = ${JSON.stringify(t.source.value)}`); + lines.push(''); + + // Span names + // Swift member name: camel(id) + "Span" (e.g. id="Request" → "requestSpan") + lines.push(' // MARK: - Span Names'); + for (const span of t.spans) { + if (span.doc) lines.push(` /// ${span.doc}`); + lines.push(` public static let ${camel(span.id)}Span = ${JSON.stringify(span.value)}`); + } + lines.push(''); + + // Metric names + // Swift member name: camel(id) (e.g. id="MessagesSent" → "messagesSent") + lines.push(' // MARK: - Metric Names'); + for (const metric of t.metrics) { + if (metric.doc) lines.push(` /// ${metric.doc}`); + lines.push(` public static let ${camel(metric.id)} = ${JSON.stringify(metric.value)}`); + } + lines.push(''); + + // Metric units + // Swift member name: camel(id) + "Unit" (e.g. id="MessagesSent" → "messagesSentUnit") + lines.push(' // MARK: - Metric Units'); + for (const metric of t.metrics) { + lines.push(` /// Unit for the \`${metric.value}\` metric.`); + lines.push(` public static let ${camel(metric.id)}Unit = ${JSON.stringify(metric.unit)}`); + } + lines.push(''); + + // Attribute keys + // Swift member name: "attr" + id (id is PascalCase, e.g. 
id="RpcSystem" → "attrRpcSystem") + lines.push(' // MARK: - Attribute Keys'); + for (const attr of t.attributes) { + if (attr.doc) lines.push(` /// ${attr.doc}`); + lines.push(` public static let attr${attr.id} = ${JSON.stringify(attr.value)}`); + } + lines.push(''); + + // Attribute values + // Swift member name: camel(group) + id (group and id are PascalCase, + // e.g. group="Outcome", id="Ok" → "outcomeOk") + lines.push(' // MARK: - Attribute Values'); + for (const group of t.values) { + for (const member of group.members) { + if (member.doc) lines.push(` /// ${member.doc}`); + lines.push(` public static let ${camel(group.group)}${member.id} = ${JSON.stringify(member.value)}`); + } + } + lines.push(''); + + lines.push('}'); + lines.push(''); + + return lines.join('\n'); +} + // ─── Main Entry Point ──────────────────────────────────────────────────────── function generateVersionFile(project: Project): string { @@ -1921,4 +1997,5 @@ export function generateSwiftPackage(project: Project, outputDir: string): void fs.writeFileSync(path.join(generatedDir, 'Errors.generated.swift'), generateErrorsFile(project)); fs.writeFileSync(path.join(generatedDir, 'Messages.generated.swift'), generateMessagesFile()); fs.writeFileSync(path.join(generatedDir, 'Version.generated.swift'), generateVersionFile(project)); + fs.writeFileSync(path.join(generatedDir, 'Telemetry.generated.swift'), generateSwiftTelemetryFile(project)); } diff --git a/scripts/read-telemetry.test.ts b/scripts/read-telemetry.test.ts new file mode 100644 index 00000000..71d86d93 --- /dev/null +++ b/scripts/read-telemetry.test.ts @@ -0,0 +1,175 @@ +/** + * Tests for `read-telemetry.ts` — the shared ts-morph reader the per-language + * generators use to extract the telemetry-name enums + their descriptions. 
+ * + * These run over a small IN-MEMORY fixture registry (built with + * `project.createSourceFile`) rather than the real `types/telemetry/registry.ts` + * so the assertions pin the reader's behavior independently of the live + * contract: unit resolution from the computed `[TelemetryMetric.X]` keys, + * `getJsDocs()` member-description extraction (including the empty-doc case), + * and the module-doc-vs-const-doc heuristic that picks the LAST leading JSDoc + * for `TELEMETRY_SOURCE`. + */ + +import { test } from 'node:test'; +import assert from 'node:assert/strict'; + +import { Project } from 'ts-morph'; + +import { readTelemetry } from './read-telemetry.js'; + +/** + * Build an in-memory project whose only source file is a telemetry registry at + * a path that ends in `/telemetry/registry.ts` (which is how `readTelemetry` + * locates it). The fixture mirrors the real registry's SHAPE — a module-level + * JSDoc, a `TELEMETRY_SOURCE` const with its own doc, a `TELEMETRY_METRIC_UNITS` + * record keyed by computed `[TelemetryMetric.X]` members, and the span / metric + * / attribute / value enums — but with trimmed, fixture-only contents. + */ +function fixtureProject(source: string): Project { + const project = new Project({ useInMemoryFileSystem: true }); + project.createSourceFile('types/telemetry/registry.ts', source); + return project; +} + +const FIXTURE = ` +/** + * Module-level doc that must NOT be mistaken for the TELEMETRY_SOURCE doc. + * @module telemetry/registry + */ + +/** The instrumentation-scope name. */ +export const TELEMETRY_SOURCE = 'Fixture.Scope'; + +export enum TelemetrySpan { + /** A request span. */ + Request = 'ahp.request', +} + +export enum TelemetryMetric { + /** Messages sent. */ + MessagesSent = 'ahp.client.messages.sent', + /** Request duration. 
*/ + RequestDuration = 'ahp.client.request.duration', +} + +export const TELEMETRY_METRIC_UNITS: Record<TelemetryMetric, string> = { + [TelemetryMetric.MessagesSent]: '{message}', + [TelemetryMetric.RequestDuration]: 'ms', +}; + +export enum TelemetryAttribute { + /** rpc.method tag. */ + RpcMethod = 'rpc.method', + // intentionally undocumented to exercise the empty-doc branch + Stream = 'ahp.stream', +} + +export enum TelemetryRpcSystem { + /** jsonrpc. */ + Jsonrpc = 'jsonrpc', +} + +export enum TelemetryOutcome { + /** ok. */ + Ok = 'ok', +} + +export enum TelemetryMessageKind { + /** request. */ + Request = 'request', +} + +export enum TelemetryStream { + /** subscription stream. */ + Subscription = 'subscription', + /** multi-host event stream. */ + HostEvent = 'host-event', +} +`; + +test('reads source value and the CONST doc (not the module doc)', () => { + const data = readTelemetry(fixtureProject(FIXTURE)); + assert.equal(data.source.value, 'Fixture.Scope'); + // The module-level JSDoc precedes the const's own JSDoc; the reader picks the + // LAST leading doc, which is the const's own. + assert.equal(data.source.doc, 'The instrumentation-scope name.'); + assert.doesNotMatch(data.source.doc, /Module-level doc/); +}); + +test('extracts span name + value + member JSDoc', () => { + const data = readTelemetry(fixtureProject(FIXTURE)); + assert.deepEqual(data.spans, [ + { id: 'Request', value: 'ahp.request', doc: 'A request span.'
}, + ]); +}); + +test('resolves each metric to its unit via the computed [TelemetryMetric.X] keys', () => { + const data = readTelemetry(fixtureProject(FIXTURE)); + const byId = Object.fromEntries(data.metrics.map((m) => [m.id, m])); + assert.equal(byId.MessagesSent.unit, '{message}'); + assert.equal(byId.MessagesSent.value, 'ahp.client.messages.sent'); + assert.equal(byId.RequestDuration.unit, 'ms'); +}); + +test('an undocumented member yields an empty doc string', () => { + const data = readTelemetry(fixtureProject(FIXTURE)); + const stream = data.attributes.find((a) => a.id === 'Stream'); + assert.ok(stream, 'expected a Stream attribute'); + assert.equal(stream.doc, ''); +}); + +test('groups attribute VALUE enums (minus the Telemetry prefix), including hyphenated values', () => { + const data = readTelemetry(fixtureProject(FIXTURE)); + const groupNames = data.values.map((g) => g.group); + assert.deepEqual(groupNames, ['RpcSystem', 'Outcome', 'MessageKind', 'Stream']); + + const streamGroup = data.values.find((g) => g.group === 'Stream'); + assert.ok(streamGroup, 'expected a Stream value group'); + const hostEvent = streamGroup.members.find((m) => m.id === 'HostEvent'); + assert.ok(hostEvent, 'expected the HostEvent value'); + assert.equal(hostEvent.value, 'host-event'); +}); + +test('throws loudly when a required enum is missing', () => { + const broken = ` + /** doc */ + export const TELEMETRY_SOURCE = 'X'; + export const TELEMETRY_METRIC_UNITS: Record<string, string> = {}; + export enum TelemetrySpan { Request = 'ahp.request' } + `; + assert.throws( + () => readTelemetry(fixtureProject(broken)), + /enum TelemetryMetric not found/, + ); +}); + +test('throws when the registry source file is not in the project', () => { + const project = new Project({ useInMemoryFileSystem: true }); + project.createSourceFile('types/unrelated.ts', 'export const x = 1;'); + assert.throws(() => readTelemetry(project), /could not locate types\/telemetry\/registry\.ts/); +}); + +test('throws when an
enum member is not string-valued', () => { + const numeric = ` + /** doc */ + export const TELEMETRY_SOURCE = 'X'; + export enum TelemetrySpan { + /** numeric */ + Request = 1, + } + export enum TelemetryMetric { MessagesSent = 'ahp.client.messages.sent' } + export const TELEMETRY_METRIC_UNITS: Record = { + [TelemetryMetric.MessagesSent]: '{message}', + }; + export enum TelemetryAttribute { RpcMethod = 'rpc.method' } + export enum TelemetryRpcSystem { Jsonrpc = 'jsonrpc' } + export enum TelemetryOutcome { Ok = 'ok' } + export enum TelemetryMessageKind { Request = 'request' } + export enum TelemetryStream { Subscription = 'subscription' } + `; + assert.throws( + () => readTelemetry(fixtureProject(numeric)), + /TelemetrySpan\.Request is not a string-valued enum member/, + ); +}); diff --git a/scripts/read-telemetry.ts b/scripts/read-telemetry.ts new file mode 100644 index 00000000..0e32bd23 --- /dev/null +++ b/scripts/read-telemetry.ts @@ -0,0 +1,139 @@ +/** + * Shared helper for the protocol type generators: read the telemetry name + * enums from `types/telemetry/registry.ts` via ts-morph, returning each name's + * identifier, wire value, and `getJsDocs()`-extracted description. + * + * This is the SAME extraction mechanism the generators use for every protocol + * enum (`enumDecl.getMembers()` + `member.getJsDocs()`) — telemetry doesn't get + * a second comment mechanism. The generators consume the returned data and emit + * a flat per-language constant holder (telemetry names are used as raw strings, + * not as language enums, so the output shape stays a flat holder). + * + * Throws loudly if the registry's expected enums/const are missing or + * malformed, so a refactor of `registry.ts` fails the generator rather than + * silently producing stale output. + */ + +import { EnumDeclaration, Project, SyntaxKind } from 'ts-morph'; + +/** A single telemetry name: its identifier, wire value, and description. 
*/
+export interface TelemetryName {
+  /** Enum member identifier, e.g. `MessagesSent`. */
+  readonly id: string;
+  /** Wire value, e.g. `ahp.client.messages.sent`. */
+  readonly value: string;
+  /** `getJsDocs()`-extracted description (empty if the member has none). */
+  readonly doc: string;
+}
+
+/** A telemetry metric: a name plus its OTel unit annotation. */
+export interface TelemetryMetricName extends TelemetryName {
+  /** OTel unit, e.g. `{message}` or `ms`. */
+  readonly unit: string;
+}
+
+/** A group of attribute values, e.g. the `outcome` values `{ok, error, ...}`. */
+export interface TelemetryValueGroup {
+  /** Group identifier (enum name minus the `Telemetry` prefix), e.g. `Outcome`. */
+  readonly group: string;
+  /** The values in the group. */
+  readonly members: readonly TelemetryName[];
+}
+
+/** Everything the generators need to emit a telemetry-names holder. */
+export interface TelemetryData {
+  readonly source: { readonly value: string; readonly doc: string };
+  readonly spans: readonly TelemetryName[];
+  readonly metrics: readonly TelemetryMetricName[];
+  readonly attributes: readonly TelemetryName[];
+  readonly values: readonly TelemetryValueGroup[];
+}
+
+/** Enums that are attribute-VALUE groups (everything except spans/metrics/attributes). */
+const VALUE_ENUMS = [
+  'TelemetryRpcSystem',
+  'TelemetryOutcome',
+  'TelemetryMessageKind',
+  'TelemetryStream',
+] as const;
+
+/**
+ * Convert every member of a registry enum into a {@link TelemetryName}.
+ *
+ * @param enumDecl Enum declaration whose members are read.
+ * @returns One entry per member: identifier, string value, and the trimmed
+ *   description of the member's first JSDoc block (empty when it has none).
+ * @throws If a member's value is not a string.
+ */
+function members(enumDecl: EnumDeclaration): TelemetryName[] {
+  return enumDecl.getMembers().map((m) => {
+    const value = m.getValue();
+    if (typeof value !== 'string') {
+      throw new Error(
+        `readTelemetry: ${enumDecl.getName()}.${m.getName()} is not a string-valued enum member`,
+      );
+    }
+    return {
+      id: m.getName(),
+      value,
+      doc: m.getJsDocs()[0]?.getDescription().trim() ?? '',
+    };
+  });
+}
+
+/**
+ * Read the telemetry registry. The project must include `types/telemetry/registry.ts`.
+ *
+ * @param project ts-morph project that is searched for the registry source file.
+ * @returns The source name, spans, metrics (each with its unit), attribute keys
+ *   and attribute-value groups declared by the registry.
+ * @throws If the registry file, `TELEMETRY_SOURCE`, `TELEMETRY_METRIC_UNITS` or
+ *   an expected enum is missing, if an enum member is not string-valued, or if
+ *   a metric has no unit entry.
+ */
+export function readTelemetry(project: Project): TelemetryData {
+  const sf = project
+    .getSourceFiles()
+    .find((f) => f.getFilePath().endsWith('/telemetry/registry.ts'));
+  if (!sf) {
+    throw new Error('readTelemetry: could not locate types/telemetry/registry.ts in project');
+  }
+  const getEnum = (name: string): EnumDeclaration => {
+    const decl = sf.getEnum(name);
+    if (!decl) throw new Error(`readTelemetry: enum ${name} not found in registry.ts`);
+    return decl;
+  };
+
+  // TELEMETRY_SOURCE — a top-level const; getJsDocs() reads the statement-level doc.
+  const sourceDecl = sf.getVariableDeclaration('TELEMETRY_SOURCE');
+  if (!sourceDecl) throw new Error('readTelemetry: TELEMETRY_SOURCE not found');
+  const sourceValue = sourceDecl.getInitializerOrThrow().asKind(SyntaxKind.StringLiteral)?.getLiteralValue();
+  if (sourceValue === undefined) throw new Error('readTelemetry: TELEMETRY_SOURCE is not a string literal');
+  // The first declaration in the file inherits the module-level JSDoc as its
+  // first leading doc; the const's OWN doc is the last one immediately above it.
+  const sourceJsDocs = sourceDecl.getVariableStatementOrThrow().getJsDocs();
+  const sourceDoc = sourceJsDocs.at(-1)?.getDescription().trim() ?? '';
+
+  // TELEMETRY_METRIC_UNITS — Record<TelemetryMetric, string>; map metric VALUE -> unit.
+  const unitsDecl = sf.getVariableDeclarationOrThrow('TELEMETRY_METRIC_UNITS');
+  const unitsObj = unitsDecl.getInitializerIfKindOrThrow(SyntaxKind.ObjectLiteralExpression);
+  const unitByMetricValue = new Map<string, string>();
+  const unitProps = unitsObj.getProperties();
+  // Resolve the metric enum once, and only when there is at least one entry to
+  // resolve, so a registry with an empty units object still reports the
+  // missing enum from the metrics pass below.
+  const metricEnum = unitProps.length > 0 ? getEnum('TelemetryMetric') : undefined;
+  for (const prop of unitProps) {
+    const pa = prop.asKind(SyntaxKind.PropertyAssignment);
+    if (!pa) continue;
+    // key is a computed [TelemetryMetric.X] -> resolve to the enum member's value
+    const nameNode = pa.getNameNode();
+    const computed = nameNode.asKind(SyntaxKind.ComputedPropertyName);
+    const access = computed?.getExpression().asKind(SyntaxKind.PropertyAccessExpression);
+    const memberId = access?.getName();
+    const metricValue = memberId
+      ? metricEnum?.getMemberOrThrow(memberId).getValue()
+      : undefined;
+    const unit = pa.getInitializerOrThrow().asKind(SyntaxKind.StringLiteral)?.getLiteralValue();
+    // Entries whose key or unit has another shape are skipped here; a metric
+    // left without a unit is reported by the check below.
+    if (typeof metricValue === 'string' && unit !== undefined) {
+      unitByMetricValue.set(metricValue, unit);
+    }
+  }
+
+  const metrics: TelemetryMetricName[] = members(getEnum('TelemetryMetric')).map((m) => {
+    const unit = unitByMetricValue.get(m.value);
+    if (unit === undefined) throw new Error(`readTelemetry: no unit for metric ${m.value}`);
+    return { ...m, unit };
+  });
+
+  const values: TelemetryValueGroup[] = VALUE_ENUMS.map((enumName) => ({
+    group: enumName.replace(/^Telemetry/, ''),
+    members: members(getEnum(enumName)),
+  }));
+
+  return {
+    source: { value: sourceValue, doc: sourceDoc },
+    spans: members(getEnum('TelemetrySpan')),
+    metrics,
+    attributes: members(getEnum('TelemetryAttribute')),
+    values,
+  };
+}
diff --git a/scripts/verify-generated.ts b/scripts/verify-generated.ts
new file mode 100644
index 00000000..974675fc
--- /dev/null
+++ b/scripts/verify-generated.ts
@@ -0,0 +1,142 @@
+/**
+ * Verify generated output freshness — mechanically enforces that the committed
+ * per-language telemetry-name holders are "identical by construction" to what
+ * the generators emit RIGHT NOW from 
`types/telemetry/registry.ts`.
+ *
+ * Why a runtime gate on top of the per-language `git diff` drift checks in
+ * `ci.yml`: the git-diff checks only fire inside the CI job that runs the
+ * matching `generate:<lang>` step, and they depend on a clean git tree. This
+ * script regenerates the holders and compares the committed bytes against the
+ * freshly-generated bytes WITHOUT depending on git, so the same "did you forget
+ * to regenerate?" guard runs from a plain `npm test` on any contributor's
+ * machine. It pairs with `verify-changelog.ts` / `verify-release-metadata.ts`
+ * as a third "no stale committed artifact" gate.
+ *
+ * Mechanism: the generators write in place. For each holder this script checks,
+ * it snapshots the committed bytes, runs the owning `generate:<lang>` entry
+ * point, reads the regenerated bytes, then ALWAYS restores the snapshot (so the
+ * gate is non-destructive even when it finds drift — the working tree is left
+ * exactly as it was found). Regeneration is deterministic and the rest of the
+ * tree is already in sync (CI's drift checks guarantee that), so regenerating
+ * to compare a holder leaves every other file byte-identical.
+ *
+ * Run via `npm run verify:generated` (also wired into `npm test`).
+ */
+
+import { execFileSync, execSync } from 'child_process';
+import fs from 'fs';
+import path from 'path';
+import { fileURLToPath } from 'url';
+
+const __dirname = path.dirname(fileURLToPath(import.meta.url));
+const ROOT = path.resolve(__dirname, '..');
+
+/** A committed generated holder + the generate script that produces it. */
+interface GeneratedHolder {
+  /** Human label for failure output. */
+  readonly label: string;
+  /** Path (relative to ROOT) of the committed generated file. */
+  readonly file: string;
+  /** `package.json` script (`npm run <script>`) that regenerates it. */
+  readonly generate: string;
+}
+
+/**
+ * The committed cross-client telemetry-name holders. Each is generated from
+ * `types/telemetry/registry.ts`; the TypeScript holder is intentionally absent
+ * because it is a gitignored build artifact (regenerated at publish), not a
+ * committed source.
+ */
+const HOLDERS: readonly GeneratedHolder[] = [
+  {
+    label: 'go',
+    file: 'clients/go/ahptypes/telemetry.generated.go',
+    generate: 'generate:go',
+  },
+  {
+    label: 'rust',
+    file: 'clients/rust/crates/ahp-types/src/telemetry.rs',
+    generate: 'generate:rust',
+  },
+  {
+    label: 'kotlin',
+    file: 'clients/kotlin/src/main/kotlin/com/microsoft/agenthostprotocol/generated/Telemetry.generated.kt',
+    generate: 'generate:kotlin',
+  },
+  {
+    label: 'swift',
+    file: 'clients/swift/AgentHostProtocol/Sources/AgentHostProtocol/Generated/Telemetry.generated.swift',
+    generate: 'generate:swift',
+  },
+];
+
+/**
+ * Run one `package.json` generate script from the repository root, capturing
+ * its output instead of echoing it.
+ *
+ * @param script Name of the npm script to run, e.g. `generate:go`.
+ * @throws If the npm process cannot be started or exits with a non-zero status.
+ */
+function runGenerate(script: string): void {
+  // `--allow-missing-formatter` keeps the Rust/Go generators from failing when
+  // rustfmt/gofmt is absent; the committed holders are already formatted, so a
+  // missing formatter must not turn this freshness gate into a spurious failure.
+  const args = ['run', script, '--', '--allow-missing-formatter'];
+  const options = { cwd: ROOT, stdio: 'pipe' } as const;
+  if (process.platform === 'win32') {
+    // On Windows `npm` is the batch file `npm.cmd`, which Node can only launch
+    // through a shell. `script` always comes from the hard-coded HOLDERS table,
+    // so joining the arguments into a command line is safe.
+    execSync(`npm ${args.join(' ')}`, options);
+  } else {
+    execFileSync('npm', args, options);
+  }
+}
+
+/**
+ * Regenerate every holder in HOLDERS, compare the result with the committed
+ * bytes, restore the committed bytes, and exit with status 1 (after listing the
+ * stale files) when any holder differs.
+ *
+ * @throws If a committed holder file is missing or a generate script fails.
+ */
+function main(): void {
+  const drifted: GeneratedHolder[] = [];
+
+  // Group holders by the generate script so we run each generator once.
+  const byScript = new Map<string, GeneratedHolder[]>();
+  for (const holder of HOLDERS) {
+    const list = byScript.get(holder.generate) ?? [];
+    list.push(holder);
+    byScript.set(holder.generate, list);
+  }
+
+  for (const [script, holders] of byScript) {
+    // Snapshot the committed bytes for every holder this generator owns.
+    const snapshots = holders.map((holder) => {
+      const abs = path.join(ROOT, holder.file);
+      if (!fs.existsSync(abs)) {
+        throw new Error(`verify-generated: committed holder is missing: ${holder.file}`);
+      }
+      return { holder, abs, committed: fs.readFileSync(abs) };
+    });
+
+    try {
+      runGenerate(script);
+      for (const { holder, abs, committed } of snapshots) {
+        const regenerated = fs.readFileSync(abs);
+        if (!committed.equals(regenerated)) {
+          drifted.push(holder);
+        }
+      }
+    } finally {
+      // Always restore the committed bytes — the gate never mutates the tree,
+      // whether it passed or found drift.
+      for (const { abs, committed } of snapshots) {
+        fs.writeFileSync(abs, committed);
+      }
+    }
+  }
+
+  if (drifted.length > 0) {
+    console.error('❌ Generated-output freshness check failed:');
+    for (const holder of drifted) {
+      console.error(
+        ` [${holder.label}] ${holder.file} is stale — it does not match the ` +
+          `output of 'npm run ${holder.generate}'.`,
+      );
+    }
+    console.error(
+      " hint: run 'npm run generate' and commit the regenerated holders. " +
+        'The telemetry holders are generated from types/telemetry/registry.ts; ' +
+        'edit the registry, not the holders.',
+    );
+    process.exit(1);
+  }
+
+  console.log(
+    `✅ Generated-output freshness check passed for: ${HOLDERS.map((h) => h.label).join(', ')}`,
+  );
+}
+
+main();
diff --git a/types/index.ts b/types/index.ts
index a2459162..b8d789a0 100644
--- a/types/index.ts
+++ b/types/index.ts
@@ -375,3 +375,16 @@ export {
   isNotificationKnownToVersion,
   compareProtocolVersions,
 } from './version/registry.js';
+
+// Telemetry names — shared self-instrumentation contract
+export {
+  TELEMETRY_SOURCE,
+  TELEMETRY_METRIC_UNITS,
+  TelemetrySpan,
+  TelemetryMetric,
+  TelemetryAttribute,
+  TelemetryRpcSystem,
+  TelemetryOutcome,
+  TelemetryMessageKind,
+  TelemetryStream,
+} from './telemetry/registry.js';
diff --git a/types/telemetry/registry.test.ts b/types/telemetry/registry.test.ts
new file mode 100644
index 00000000..872c1541
--- /dev/null
+++ b/types/telemetry/registry.test.ts
@@ -0,0 +1,103 @@
+/**
+ * Tests for the telemetry registry — guards the invariants the per-language
+ * generators rely on:
+ *
+ * - `TELEMETRY_SOURCE` is non-empty.
+ * - Every span / metric / attribute NAME is lowercase-dotted (these become
+ *   OTel instrument / attribute-key names, which OTel constrains to dotted
+ *   lowercase).
+ * - Every enumerated attribute VALUE is a lowercase token, hyphens allowed —
+ *   attribute values are NOT OTel instrument names (cf. OTel attribute values
+ *   such as `http.request.method=GET`), so the multi-host `host-*` stream
+ *   values are legitimately hyphenated.
+ * - Span / metric / attribute names are globally unique (no collisions).
+ * - `TELEMETRY_METRIC_UNITS` has a non-empty unit for every metric. 
+ */
+
+import { test } from 'node:test';
+import assert from 'node:assert/strict';
+
+import {
+  TELEMETRY_SOURCE,
+  TELEMETRY_METRIC_UNITS,
+  TelemetrySpan,
+  TelemetryMetric,
+  TelemetryAttribute,
+  TelemetryRpcSystem,
+  TelemetryOutcome,
+  TelemetryMessageKind,
+  TelemetryStream,
+} from './registry.js';
+
+/**
+ * Shape required of every NAME (span, metric, attribute key): lowercase
+ * segments joined by dots, with no hyphens, matching how OTel instrument and
+ * attribute-key names are written.
+ */
+const DOTTED_RE = /^[a-z][a-z0-9_]*(\.[a-z0-9_]+)*$/;
+
+/**
+ * Shape required of every enumerated attribute VALUE: a lowercase token whose
+ * parts may be joined by hyphens. Values are not instrument names, so
+ * `host-event` is acceptable here although it could never be a metric or span
+ * name; the charset stays tight so an uppercase letter or whitespace is caught.
+ */
+const VALUE_RE = /^[a-z][a-z0-9_]*(-[a-z0-9_]+)*$/;
+
+/** The enums that enumerate attribute VALUES, shared by the value tests below. */
+const VALUE_ENUMS = [
+  TelemetryRpcSystem,
+  TelemetryOutcome,
+  TelemetryMessageKind,
+  TelemetryStream,
+];
+
+test('TELEMETRY_SOURCE is non-empty', () => {
+  assert.ok(TELEMETRY_SOURCE.length > 0);
+});
+
+test('every span / metric / attribute NAME is lowercase-dotted', () => {
+  Object.values(TelemetrySpan).forEach((name) => {
+    assert.match(name, DOTTED_RE, `span not dotted: ${name}`);
+  });
+  Object.values(TelemetryMetric).forEach((name) => {
+    assert.match(name, DOTTED_RE, `metric not dotted: ${name}`);
+  });
+  Object.values(TelemetryAttribute).forEach((name) => {
+    assert.match(name, DOTTED_RE, `attribute not dotted: ${name}`);
+  });
+});
+
+test('every enumerated attribute VALUE is a lowercase token (hyphens allowed)', () => {
+  VALUE_ENUMS.forEach((group) => {
+    Object.values(group).forEach((token) => {
+      assert.match(token, VALUE_RE, `attribute value not a lowercase token: ${token}`);
+    });
+  });
+});
+
+test('span / metric / attribute names do not collide', () => {
+  const allNames: string[] = [];
+  allNames.push(...Object.values(TelemetrySpan));
+  allNames.push(...Object.values(TelemetryMetric));
+  allNames.push(...Object.values(TelemetryAttribute));
+  const distinct = new Set(allNames);
+  assert.equal(distinct.size, allNames.length, 'duplicate telemetry name');
+});
+
+test('enumerated values within a group do not collide', () => {
+  VALUE_ENUMS.forEach((group) => {
+    const tokens = Object.values(group);
+    const distinct = new Set(tokens);
+    assert.equal(distinct.size, tokens.length, 'duplicate telemetry value');
+  });
+});
+
+test('every metric has a non-empty unit', () => {
+  Object.values(TelemetryMetric).forEach((metric) => {
+    const unit = TELEMETRY_METRIC_UNITS[metric];
+    assert.ok(unit !== undefined && unit.length > 0, `missing unit for metric: ${metric}`);
+  });
+});
diff --git a/types/telemetry/registry.ts b/types/telemetry/registry.ts
new file mode 100644
index 00000000..45f16e87
--- /dev/null
+++ b/types/telemetry/registry.ts
@@ -0,0 +1,137 @@
+/**
+ * Telemetry Registry — the SINGLE SOURCE OF TRUTH for the self-instrumentation
+ * span / metric / attribute names every AHP client emits about its own
+ * operation, so the names stay identical across languages BY CONSTRUCTION.
+ *
+ * Shape follows the protocol enums (e.g. `ChangesetOperationStatus`): each name
+ * is a **string enum member**, so its value and its `/** ... *\/` description
+ * live together and the per-language generators extract the description the
+ * same way they do for every other enum (`enumDecl.getMembers()` +
+ * `member.getJsDocs()`). Per-language constant holders are codegen'd from these
+ * enums (e.g. `AhpTelemetryNames.cs`, `ahp_types::telemetry`); only the NAMES
+ * are shared — the instrumentation LOGIC stays hand-written per language.
+ *
+ * This is client SELF-instrumentation; it is distinct from the protocol's
+ * "OpenTelemetry over AHP" channel (server -> client OTLP delivery) — see
+ * [self-instrumentation](../../docs/specification/self-instrumentation.md). 
+ *
+ * @module telemetry/registry
+ */
+
+/** Instrumentation-scope name used for every AHP self-instrumentation span and metric. */
+export const TELEMETRY_SOURCE = 'Microsoft.AgentHostProtocol';
+
+/** Span names. One span per JSON-RPC request, named `${request} {method}`. */
+export enum TelemetrySpan {
+  /** Span covering a single JSON-RPC request, from send until it settles. */
+  Request = 'ahp.request',
+}
+
+/**
+ * Metric instrument names (lowercase-dotted per OTel convention). Units are
+ * carried separately in {@link TELEMETRY_METRIC_UNITS}.
+ */
+export enum TelemetryMetric {
+  /** Messages sent to the host, tagged by ahp.message.kind (request|notification). */
+  MessagesSent = 'ahp.client.messages.sent',
+  /** Messages received from the host. */
+  MessagesReceived = 'ahp.client.messages.received',
+  /** Round-trip duration of a JSON-RPC request, tagged by rpc.method and ahp.outcome (ok|error|cancelled|timeout). */
+  RequestDuration = 'ahp.client.request.duration',
+  /** Requests awaiting a response. */
+  RequestsInFlight = 'ahp.client.requests.in_flight',
+  /** Subscriptions registered with the client (decremented on unsubscribe or shutdown). */
+  SubscriptionsActive = 'ahp.client.subscriptions.active',
+  /** Reconnect operations, tagged by outcome. */
+  Reconnects = 'ahp.client.reconnects',
+  /** Buffered events evicted under back-pressure (drop-oldest), tagged by stream. */
+  EventsDropped = 'ahp.client.events.dropped',
+  /** Inbound frames that failed to decode and were skipped (protocol resync is the host’s responsibility). */
+  FramesMalformed = 'ahp.client.frames.malformed',
+}
+
+/**
+ * OTel unit annotation for each metric. Trivial metadata, keyed by metric (no
+ * per-entry doc needed).
+ *
+ * `scripts/read-telemetry.ts` reads this initializer syntactically: it must
+ * stay a plain object literal whose keys are computed `[TelemetryMetric.X]`
+ * entries and whose values are string literals, otherwise `readTelemetry`
+ * throws.
+ */
+export const TELEMETRY_METRIC_UNITS: Record<TelemetryMetric, string> = {
+  [TelemetryMetric.MessagesSent]: '{message}',
+  [TelemetryMetric.MessagesReceived]: '{message}',
+  [TelemetryMetric.RequestDuration]: 'ms',
+  [TelemetryMetric.RequestsInFlight]: '{request}',
+  [TelemetryMetric.SubscriptionsActive]: '{subscription}',
+  [TelemetryMetric.Reconnects]: '{reconnect}',
+  [TelemetryMetric.EventsDropped]: '{event}',
+  [TelemetryMetric.FramesMalformed]: '{frame}',
+};
+
+/** Attribute (tag) keys. `rpc.*` follow the OTel RPC semantic conventions. */
+export enum TelemetryAttribute {
+  /** RPC system identifier (OTel rpc.system); always "jsonrpc" for AHP. */
+  RpcSystem = 'rpc.system',
+  /** JSON-RPC method name the span/metric is scoped to (OTel rpc.method). */
+  RpcMethod = 'rpc.method',
+  /** Client-assigned JSON-RPC request id. */
+  RequestId = 'ahp.request.id',
+  /** Terminal outcome of a request or reconnect (ok|error|cancelled|timeout). */
+  Outcome = 'ahp.outcome',
+  /** Whether a sent message was a request or a notification. */
+  MessageKind = 'ahp.message.kind',
+  /** Which event stream a dropped or observed event belongs to. */
+  Stream = 'ahp.stream',
+}
+
+/** `rpc.system` values. */
+export enum TelemetryRpcSystem {
+  /** JSON-RPC — the only RPC system AHP uses. */
+  Jsonrpc = 'jsonrpc',
+}
+
+/**
+ * `ahp.outcome` values. NOTE: this is the SINGLE outcome vocabulary — requests
+ * AND reconnects both use it. The protocol models a reconnect as
+ * success-or-rejected (a `ReconnectResult` or a rejected JSON-RPC request), the
+ * same ok/error dichotomy as a request, so a reconnect tags `ok`/`error`, not a
+ * separate `success`/`failure` set.
+ */
+export enum TelemetryOutcome {
+  /** The request or reconnect completed successfully. */
+  Ok = 'ok',
+  /** The request or reconnect failed with an error response. */
+  Error = 'error',
+  /** The request was cancelled before it settled. */
+  Cancelled = 'cancelled',
+  /** The request exceeded its configured timeout. */
+  Timeout = 'timeout',
+}
+
+/** `ahp.message.kind` values. */
+export enum TelemetryMessageKind {
+  /** A JSON-RPC request (expects a response). */
+  Request = 'request',
+  /** A JSON-RPC notification (fire-and-forget). */
+  Notification = 'notification',
+}
+
+/**
+ * `ahp.stream` values. The `host-*` members identify the per-stream
+ * dropped-event channels a multi-host client (e.g. the .NET client) fans the
+ * host's own notifications across; they are enumerated attribute VALUES, not
+ * OTel instrument names, so the hyphenated spelling is intentional and
+ * idiomatic (cf. OTel attribute values like `http.request.method=GET`).
+ */
+export enum TelemetryStream {
+  /** A per-resource subscription stream. */
+  Subscription = 'subscription',
+  /** The client-wide event stream. */
+  Event = 'event',
+  /** A state-snapshot stream. */
+  State = 'state',
+  /** A multi-host client's host-event delivery stream. */
+  HostEvent = 'host-event',
+  /** A multi-host client's host-subscription delivery stream. */
+  HostSubscription = 'host-subscription',
+  /** A multi-host client's host-resource delivery stream. */
+  HostResource = 'host-resource',
+  /** A multi-host client's host-snapshot delivery stream. */
+  HostSnapshot = 'host-snapshot',
+  /** A multi-host client's host-summaries delivery stream. */
+  HostSummaries = 'host-summaries',
+}