diff --git a/internal/cli/system/core/hubsync/doc.go b/internal/cli/system/core/hubsync/doc.go index 07f82107..ae6067b0 100644 --- a/internal/cli/system/core/hubsync/doc.go +++ b/internal/cli/system/core/hubsync/doc.go @@ -23,8 +23,10 @@ // writes them to disk via the connection render layer. // It returns a formatted status message with the count // of synced entries, or an empty string when nothing -// was fetched. Every error is silently swallowed so the -// hook never blocks the session start. +// was fetched. Every error is surfaced as a stderr +// warning via the warn sink, but never propagates: the +// hook must not block the session start. Only a genuine +// zero-entry result stays silent. // // The data flow is: // diff --git a/internal/cli/system/core/hubsync/sync.go b/internal/cli/system/core/hubsync/sync.go index 1d918377..11362a57 100644 --- a/internal/cli/system/core/hubsync/sync.go +++ b/internal/cli/system/core/hubsync/sync.go @@ -66,6 +66,7 @@ func Connected(ctxDir string) (bool, error) { func Sync(_ string) string { cfg, loadErr := connectCfg.Load() if loadErr != nil { + logWarn.Warn(cfgWarn.HubSyncLoadConfig, loadErr) return "" } @@ -73,6 +74,7 @@ func Sync(_ string) string { cfg.HubAddr, cfg.Token, ) if dialErr != nil { + logWarn.Warn(cfgWarn.HubSyncDial, cfg.HubAddr, dialErr) return "" } defer func() { @@ -84,11 +86,17 @@ func Sync(_ string) string { entries, syncErr := client.Sync( context.Background(), cfg.Types, 0, ) - if syncErr != nil || len(entries) == 0 { + if syncErr != nil { + logWarn.Warn(cfgWarn.HubSyncPull, cfg.HubAddr, syncErr) + return "" + } + if len(entries) == 0 { + // Genuine empty result: not an error, no warning. 
return "" } if writeErr := render.WriteEntries(entries); writeErr != nil { + logWarn.Warn(cfgWarn.HubSyncWrite, len(entries), writeErr) return "" } diff --git a/internal/cli/system/core/hubsync/sync_test.go b/internal/cli/system/core/hubsync/sync_test.go new file mode 100644 index 00000000..944eaccf --- /dev/null +++ b/internal/cli/system/core/hubsync/sync_test.go @@ -0,0 +1,191 @@ +// / ctx: https://ctx.ist +// ,'`./ do you remember? +// `.,'\ +// \ Copyright 2026-present Context contributors. +// SPDX-License-Identifier: Apache-2.0 + +package hubsync_test + +import ( + "bytes" + "net" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/ActiveMemory/ctx/internal/assets/read/lookup" + connectCfg "github.com/ActiveMemory/ctx/internal/cli/connection/core/config" + "github.com/ActiveMemory/ctx/internal/cli/system/core/hubsync" + "github.com/ActiveMemory/ctx/internal/config/fs" + "github.com/ActiveMemory/ctx/internal/crypto" + "github.com/ActiveMemory/ctx/internal/hub" + logWarn "github.com/ActiveMemory/ctx/internal/log/warn" + "github.com/ActiveMemory/ctx/internal/testutil/testctx" +) + +// TestMain initializes the embedded text-asset lookup so error +// strings rendered into warnings resolve their DescKey text. +func TestMain(m *testing.M) { + lookup.Init() + os.Exit(m.Run()) +} + +// declareContext positions the test in a temp project with a +// materialized .context/ directory and returns its path. +func declareContext(t *testing.T) string { + t.Helper() + dir := t.TempDir() + ctxDir := testctx.Declare(t, dir) + if mkErr := os.MkdirAll(ctxDir, fs.PermExec); mkErr != nil { + t.Fatal(mkErr) + } + return ctxDir +} + +// captureWarnings redirects the warn sink to a buffer for the +// duration of the test. 
+func captureWarnings(t *testing.T) *bytes.Buffer { + t.Helper() + var buf bytes.Buffer + restore := logWarn.SetSink(&buf) + t.Cleanup(restore) + return &buf +} + +// saveConnectConfig generates a global encryption key under the +// test HOME and persists a connection config pointing at addr. +func saveConnectConfig(t *testing.T, addr, token string) { + t.Helper() + key, keyErr := crypto.GenerateKey() + if keyErr != nil { + t.Fatal(keyErr) + } + keyPath := crypto.GlobalKeyPath() + if mkErr := os.MkdirAll( + filepath.Dir(keyPath), fs.PermKeyDir, + ); mkErr != nil { + t.Fatal(mkErr) + } + if saveErr := crypto.SaveKey(keyPath, key); saveErr != nil { + t.Fatal(saveErr) + } + if cfgErr := connectCfg.Save(connectCfg.Config{ + HubAddr: addr, + Token: token, + }); cfgErr != nil { + t.Fatal(cfgErr) + } +} + +// startHub serves a hub with the given store on a random port +// and returns its address and a registered client token. +func startHub(t *testing.T, store *hub.Store) (string, string) { + t.Helper() + adminTok, tokErr := hub.GenerateAdminToken() + if tokErr != nil { + t.Fatal(tokErr) + } + srv := hub.NewServer(store, adminTok) + lis, lisErr := net.Listen("tcp", "127.0.0.1:0") + if lisErr != nil { + t.Fatal(lisErr) + } + go func() { _ = srv.Serve(lis) }() + t.Cleanup(srv.GracefulStop) + + addr := lis.Addr().String() + client, dialErr := hub.NewClient(addr, "") + if dialErr != nil { + t.Fatal(dialErr) + } + defer func() { + if cerr := client.Close(); cerr != nil { + t.Log(cerr) + } + }() + reg, regErr := client.Register( + t.Context(), adminTok, "hubsync-test", + ) + if regErr != nil { + t.Fatal(regErr) + } + return addr, reg.ClientToken +} + +func TestSync_WarnsOnLoadError(t *testing.T) { + declareContext(t) + buf := captureWarnings(t) + + if got := hubsync.Sync(""); got != "" { + t.Errorf("Sync = %q, want empty on load failure", got) + } + if !strings.Contains( + buf.String(), "hubsync: load connection config:", + ) { + t.Errorf("missing load warning, got: %q", 
buf.String()) + } +} + +func TestSync_WarnsOnDialError(t *testing.T) { + declareContext(t) + // grpc.NewClient is lazy for almost every bad target, but + // a control character fails URL parsing at construction — + // the one eager failure mode, and exactly what a corrupted + // connect config would produce. + saveConnectConfig(t, "\x00", "tok") + buf := captureWarnings(t) + + if got := hubsync.Sync(""); got != "" { + t.Errorf("Sync = %q, want empty on dial failure", got) + } + if !strings.Contains(buf.String(), "hubsync: dial") { + t.Errorf("missing dial warning, got: %q", buf.String()) + } +} + +func TestSync_WarnsOnPullError(t *testing.T) { + declareContext(t) + // A well-formed address nobody listens on: client + // construction is lazy, so the failure surfaces at the + // Sync RPC. + lis, lisErr := net.Listen("tcp", "127.0.0.1:0") + if lisErr != nil { + t.Fatal(lisErr) + } + addr := lis.Addr().String() + if closeErr := lis.Close(); closeErr != nil { + t.Fatal(closeErr) + } + saveConnectConfig(t, addr, "tok") + buf := captureWarnings(t) + + if got := hubsync.Sync(""); got != "" { + t.Errorf("Sync = %q, want empty on pull failure", got) + } + if !strings.Contains(buf.String(), "hubsync: sync from") { + t.Errorf("missing pull warning, got: %q", buf.String()) + } +} + +func TestSync_NoWarnOnEmptyResult(t *testing.T) { + ctxDir := declareContext(t) + store, storeErr := hub.NewStore( + filepath.Join(ctxDir, "hub-data"), + ) + if storeErr != nil { + t.Fatal(storeErr) + } + addr, token := startHub(t, store) + saveConnectConfig(t, addr, token) + buf := captureWarnings(t) + + if got := hubsync.Sync(""); got != "" { + t.Errorf("Sync = %q, want empty for zero entries", got) + } + if buf.Len() != 0 { + t.Errorf( + "empty result must not warn, got: %q", buf.String(), + ) + } +} diff --git a/internal/config/warn/warn.go b/internal/config/warn/warn.go index daa9d999..f183aaf8 100644 --- a/internal/config/warn/warn.go +++ b/internal/config/warn/warn.go @@ -120,6 +120,33 @@ 
const ( // the loss visible. HubReplicateAppend = "hub replicate append: %v" + // HubReplicateDial is the stderr format for a failed gRPC + // client construction toward the master. Takes (masterAddr, + // error). Like every replication warning, it fires once per + // attempt; the loop retries on its own interval. + HubReplicateDial = "hub replicate dial %s: %v" + + // HubReplicateStream is the stderr format for a failed sync + // stream open toward the master. Takes (masterAddr, error). + HubReplicateStream = "hub replicate open stream %s: %v" + + // HubReplicateSend is the stderr format for a failed sync + // request send on the replication stream. Takes (masterAddr, + // error). + HubReplicateSend = "hub replicate send request %s: %v" + + // HubReplicateCloseSend is the stderr format for a failed + // half-close of the replication stream. Takes (masterAddr, + // error). + HubReplicateCloseSend = "hub replicate close send %s: %v" + + // HubReplicateRecv is the stderr format for a transport + // failure while receiving replicated entries. Takes + // (masterAddr, error). io.EOF (the normal end of a sync + // stream) and caller shutdown are deliberately not warned; + // see replicateOnce. + HubReplicateRecv = "hub replicate recv %s: %v" + // StateInitializedProbe is the stderr format for failures // inside [state.Initialized] beyond "no context dir declared." // Hooks bail on false either way, but a visible warning shows @@ -187,6 +214,30 @@ const ( NotifyWebhookPost = "notify: webhook POST failed: %v" ) +// Hubsync hook warning formats. The session-start hubsync hook +// must never block or fail the session, so [hubsync.Sync] keeps +// returning a (possibly empty) nudge string — these warnings are +// the only signal that a configured hub sync went wrong instead +// of merely finding nothing new. +const ( + // HubSyncLoadConfig is the format for a failed connection + // config load. Takes (error). 
+ HubSyncLoadConfig = "hubsync: load connection config: %v" + + // HubSyncDial is the format for a rejected hub address at + // client construction. Takes (addr, error). + HubSyncDial = "hubsync: dial %s: %v" + + // HubSyncPull is the format for a failed Sync RPC. Takes + // (addr, error). A genuine zero-entry result is not an + // error and is never warned. + HubSyncPull = "hubsync: sync from %s: %v" + + // HubSyncWrite is the format for a failed entry write after + // a successful pull. Takes (count, error). + HubSyncWrite = "hubsync: write %d entries: %v" +) + // Warn context identifiers for index generation. const ( // IndexHeader is the context label for index header write errors. diff --git a/internal/hub/replicate.go b/internal/hub/replicate.go index d8475fda..de50728b 100644 --- a/internal/hub/replicate.go +++ b/internal/hub/replicate.go @@ -81,6 +81,7 @@ func replicateOnce( ), ) if dialErr != nil { + logWarn.Warn(cfgWarn.HubReplicateDial, masterAddr, dialErr) return } defer func() { @@ -98,21 +99,34 @@ func replicateOnce( cfgHub.PathSync, ) if streamErr != nil { + logWarn.Warn(cfgWarn.HubReplicateStream, masterAddr, streamErr) return } if sendErr := stream.SendMsg(&SyncRequest{ SinceSequence: lastSeq, }); sendErr != nil { + logWarn.Warn(cfgWarn.HubReplicateSend, masterAddr, sendErr) return } if closeErr := stream.CloseSend(); closeErr != nil { + logWarn.Warn(cfgWarn.HubReplicateCloseSend, masterAddr, closeErr) return } for { msg := &EntryMsg{} if recvErr := stream.RecvMsg(msg); recvErr != nil { + // io.EOF is the normal end of every sync stream + // and a done caller context is routine shutdown; + // warning on either would spam stderr once per + // replication cycle. Anything else is a transport + // failure worth surfacing. 
+ if !eof(recvErr) && ctx.Err() == nil { + logWarn.Warn( + cfgWarn.HubReplicateRecv, masterAddr, recvErr, + ) + } return } entry := Entry{ diff --git a/internal/hub/replicate_test.go b/internal/hub/replicate_test.go new file mode 100644 index 00000000..dda3d0ed --- /dev/null +++ b/internal/hub/replicate_test.go @@ -0,0 +1,203 @@ +// / ctx: https://ctx.ist +// ,'`./ do you remember? +// `.,'\ +// \ Copyright 2026-present Context contributors. +// SPDX-License-Identifier: Apache-2.0 + +package hub + +import ( + "bytes" + "net" + "os" + "runtime" + "strings" + "testing" + "time" + + "github.com/ActiveMemory/ctx/internal/config/fs" + logWarn "github.com/ActiveMemory/ctx/internal/log/warn" +) + +// captureWarnings redirects the warn sink to a buffer for the +// duration of the test. +func captureWarnings(t *testing.T) *bytes.Buffer { + t.Helper() + var buf bytes.Buffer + restore := logWarn.SetSink(&buf) + t.Cleanup(restore) + return &buf +} + +// startMaster serves a hub over a fresh store on a random port +// and returns the store, the address, and the admin token. +func startMaster(t *testing.T) (*Store, string, string) { + t.Helper() + store, storeErr := NewStore(t.TempDir()) + if storeErr != nil { + t.Fatal(storeErr) + } + adminTok, tokErr := GenerateAdminToken() + if tokErr != nil { + t.Fatal(tokErr) + } + srv := NewServer(store, adminTok) + lis := listenRandom(t) + go func() { _ = srv.Serve(lis) }() + t.Cleanup(srv.GracefulStop) + return store, lis.Addr().String(), adminTok +} + +// registerClient registers a project and returns its token. 
+func registerClient(t *testing.T, addr, adminTok string) string { + t.Helper() + client, dialErr := NewClient(addr, "") + if dialErr != nil { + t.Fatal(dialErr) + } + defer func() { + if cerr := client.Close(); cerr != nil { + t.Log(cerr) + } + }() + reg, regErr := client.Register( + testCtx(), adminTok, "replicate-test", + ) + if regErr != nil { + t.Fatal(regErr) + } + return reg.ClientToken +} + +// seedEntries appends n entries to the master store. +func seedEntries(t *testing.T, store *Store, n int) { + t.Helper() + entries := make([]Entry, n) + for i := range entries { + entries[i] = Entry{ + ID: "e" + string(rune('a'+i)), + Type: "decision", + Content: "replicated content", + Origin: "replicate-test", + Timestamp: time.Now(), + } + } + if _, appendErr := store.Append(entries); appendErr != nil { + t.Fatal(appendErr) + } +} + +func TestReplicateOnce_WarnsOnDialError(t *testing.T) { + buf := captureWarnings(t) + follower, storeErr := NewStore(t.TempDir()) + if storeErr != nil { + t.Fatal(storeErr) + } + + // grpc.NewClient is lazy for almost every bad target, but + // a control character fails URL parsing at construction — + // the one eager failure mode, and exactly what a corrupted + // peer config would produce. + replicateOnce(testCtx(), "\x00", follower, "tok") + + if !strings.Contains(buf.String(), "hub replicate dial") { + t.Errorf("missing dial warning, got: %q", buf.String()) + } +} + +func TestReplicateOnce_WarnsOnTransportError(t *testing.T) { + buf := captureWarnings(t) + follower, storeErr := NewStore(t.TempDir()) + if storeErr != nil { + t.Fatal(storeErr) + } + + // A well-formed address nobody listens on. Client and + // stream construction are lazy, so the connection failure + // surfaces at whichever stage first touches the wire — + // any of the replication warnings is correct. 
+ lis, lisErr := net.Listen("tcp", "127.0.0.1:0") + if lisErr != nil { + t.Fatal(lisErr) + } + addr := lis.Addr().String() + if closeErr := lis.Close(); closeErr != nil { + t.Fatal(closeErr) + } + + replicateOnce(testCtx(), addr, follower, "tok") + + if !strings.Contains(buf.String(), "hub replicate") { + t.Errorf( + "missing transport warning, got: %q", buf.String(), + ) + } +} + +func TestReplicateOnce_CleanReplicationDoesNotWarn(t *testing.T) { + master, addr, adminTok := startMaster(t) + token := registerClient(t, addr, adminTok) + seedEntries(t, master, 2) + + follower, storeErr := NewStore(t.TempDir()) + if storeErr != nil { + t.Fatal(storeErr) + } + buf := captureWarnings(t) + + replicateOnce(testCtx(), addr, follower, token) + + if buf.Len() != 0 { + t.Errorf( + "clean replication must not warn, got: %q", + buf.String(), + ) + } + if _, lastSeq := follower.lastSequence(); lastSeq != 2 { + t.Errorf("follower lastSeq = %d, want 2", lastSeq) + } +} + +func TestReplicateOnce_KeepsConsumingAfterAppendError(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("permission semantics differ on windows") + } + if os.Geteuid() == 0 { + t.Skip("root bypasses permission checks") + } + master, addr, adminTok := startMaster(t) + token := registerClient(t, addr, adminTok) + seedEntries(t, master, 2) + + followerDir := t.TempDir() + follower, storeErr := NewStore(followerDir) + if storeErr != nil { + t.Fatal(storeErr) + } + // Make every append fail: the store opens its files per + // call, so an access-denied directory rejects each write. + if chmodErr := os.Chmod(followerDir, 0); chmodErr != nil { + t.Fatal(chmodErr) + } + t.Cleanup(func() { + if chmodErr := os.Chmod( + followerDir, fs.PermExec, + ); chmodErr != nil { + t.Log(chmodErr) + } + }) + buf := captureWarnings(t) + + replicateOnce(testCtx(), addr, follower, token) + + // Both entries must have been attempted: an append failure + // is warned per entry and must not abort the stream. 
+ got := strings.Count(buf.String(), "hub replicate append") + if got != 2 { + t.Errorf( + "append warnings = %d, want 2 (loop must keep "+ + "consuming); output: %q", + got, buf.String(), + ) + } +} diff --git a/specs/fix-hub-silent-error-suppression.md b/specs/fix-hub-silent-error-suppression.md new file mode 100644 index 00000000..bed152b5 --- /dev/null +++ b/specs/fix-hub-silent-error-suppression.md @@ -0,0 +1,124 @@ +# Fix Hub Silent Error Suppression + +The session-start hubsync hook and the cluster replication +loop swallowed errors with no logging surface: operators could +not tell whether a sync succeeded, partially failed, or never +reached the network. Upstream issue: +[ActiveMemory/ctx#100](https://github.com/ActiveMemory/ctx/issues/100). + +## Problem + +### Hubsync hook — `internal/cli/system/core/hubsync/sync.go` + +`Sync` returned `""` silently on config-load failure, dial +failure, sync-RPC failure, and entry-write failure. Worse, the +sync-error check was conflated with the empty-result check: + +```go +entries, syncErr := client.Sync( + context.Background(), cfg.Types, 0, +) +if syncErr != nil || len(entries) == 0 { + return "" +} +``` + +A real network error was indistinguishable from "nothing new." +The package doc codified the behavior ("Every error is +silently swallowed so the hook never blocks the session +start"). The never-block constraint is correct; the silence is +the bug. + +### Replication loop — `internal/hub/replicate.go` + +`replicateOnce` returned silently on dial, stream-open, send, +and close-send failures, and on every receive error — including +real transport failures. (The `conn.Close` defer and the +`store.Append` failure path already warn, and append already +keeps consuming the stream; those two sub-items of #100 landed +upstream before this change.) 
+ +## Solution + +Wire every silent return through the established +`internal/log/warn` sink with format constants in +`internal/config/warn`, preserving both functions' signatures +and non-blocking contracts. Logging is the only behavior +change, plus one un-conflation: + +1. `internal/config/warn/warn.go` — nine new format + constants: `HubSyncLoadConfig`, `HubSyncDial`, + `HubSyncPull`, `HubSyncWrite` (hubsync hook; `hubsync:` + prefix per the `notify:` precedent) and + `HubReplicateDial`, `HubReplicateStream`, + `HubReplicateSend`, `HubReplicateCloseSend`, + `HubReplicateRecv` (extending the existing + `HubReplicateAppend` family). +2. `internal/cli/system/core/hubsync/sync.go` — warn at all + four silent sites; split `syncErr` from the + `len(entries) == 0` check so only the error case warns. A + genuine empty result stays silent. +3. `internal/cli/system/core/hubsync/doc.go` — the contract + sentence becomes "Every error is surfaced as a stderr + warning via the warn sink, but never propagates: the hook + must not block the session start." +4. `internal/hub/replicate.go` — warn at dial, stream-open, + send, and close-send failures. The receive site + distinguishes three cases: `io.EOF` is the normal end of + every sync stream (returns silently — warning here would + spam stderr once per `ReplicateInterval`); a done caller + context is routine shutdown noise (silent); anything else + is a transport failure and warns. Issue #100's proposed + code warns on every receive error and would have made + clean replication cycles noisy; this is the one deliberate + deviation. + +## Tests + +`warn.SetSink` (existing test seam) captures output in all of +them. + +- `internal/cli/system/core/hubsync/sync_test.go` (new; the + package had no tests): + - `TestSync_WarnsOnLoadError` — no connect config present; + warns `hubsync: load connection config:`. + - `TestSync_WarnsOnDialError` — `HubAddr` containing a + control character. 
Empirically the only eager + `grpc.NewClient` failure mode: almost every malformed + target (`://invalid`, `unix://not-abs`) is deferred to + first use by the lazy resolver, but a control character + fails URL parsing at construction. Warns `hubsync: dial`. + - `TestSync_WarnsOnPullError` — well-formed but closed + address; `grpc.NewClient` is lazy, so the failure + surfaces at the Sync RPC; warns `hubsync: sync from`. + - `TestSync_NoWarnOnEmptyResult` — real in-process hub + with zero entries; no warning, empty return (pins the + un-conflation). +- `internal/hub/replicate_test.go` (new; `replicateOnce` had + no direct coverage): + - `TestReplicateOnce_WarnsOnDialError` — master target + with a control character (same eager-failure rationale + as the hubsync dial test). + - `TestReplicateOnce_WarnsOnTransportError` — closed port; + asserts a `hub replicate` warning from whichever lazy + stage surfaces the failure. + - `TestReplicateOnce_CleanReplicationDoesNotWarn` — real + master with two entries, writable follower; entries + replicate, `io.EOF` ends the cycle, zero warnings (pins + the EOF deviation). + - `TestReplicateOnce_KeepsConsumingAfterAppendError` — + access-denied (mode `0`) follower store directory; both appends fail, + two `hub replicate append` warnings, loop reaches EOF + (pins continue-on-append-failure). + +## Out of Scope + +- Structured (JSON) event-log emission; stderr via + `warn.Warn` is the established pattern (issue's own + out-of-scope list). +- Making `Sync` return an error — the non-blocking hook + contract is a hard constraint. +- Wiring `startReplication` into `Server.Start` (#96 + territory) and the hubsync hook's hardcoded + `sinceSequence=0` full-refetch (a separate latent issue, + noted during review of #93).