Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions internal/cli/system/core/hubsync/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
// writes them to disk via the connection render layer.
// It returns a formatted status message with the count
// of synced entries, or an empty string when nothing
// was fetched. Every error is silently swallowed so the
// hook never blocks the session start.
// was fetched. Every error is surfaced as a stderr
// warning via the warn sink, but never propagates: the
// hook must not block the session start. Only a genuine
// zero-entry result stays silent.
//
// The data flow is:
//
Expand Down
10 changes: 9 additions & 1 deletion internal/cli/system/core/hubsync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ func Connected(ctxDir string) (bool, error) {
func Sync(_ string) string {
cfg, loadErr := connectCfg.Load()
if loadErr != nil {
logWarn.Warn(cfgWarn.HubSyncLoadConfig, loadErr)
return ""
}

client, dialErr := hub.NewClient(
cfg.HubAddr, cfg.Token,
)
if dialErr != nil {
logWarn.Warn(cfgWarn.HubSyncDial, cfg.HubAddr, dialErr)
return ""
}
defer func() {
Expand All @@ -84,11 +86,17 @@ func Sync(_ string) string {
entries, syncErr := client.Sync(
context.Background(), cfg.Types, 0,
)
if syncErr != nil || len(entries) == 0 {
if syncErr != nil {
logWarn.Warn(cfgWarn.HubSyncPull, cfg.HubAddr, syncErr)
return ""
}
if len(entries) == 0 {
// Genuine empty result: not an error, no warning.
return ""
}

if writeErr := render.WriteEntries(entries); writeErr != nil {
logWarn.Warn(cfgWarn.HubSyncWrite, len(entries), writeErr)
return ""
}

Expand Down
191 changes: 191 additions & 0 deletions internal/cli/system/core/hubsync/sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// / ctx: https://ctx.ist
// ,'`./ do you remember?
// `.,'\
// \ Copyright 2026-present Context contributors.
// SPDX-License-Identifier: Apache-2.0

package hubsync_test

import (
"bytes"
"net"
"os"
"path/filepath"
"strings"
"testing"

"github.com/ActiveMemory/ctx/internal/assets/read/lookup"
connectCfg "github.com/ActiveMemory/ctx/internal/cli/connection/core/config"
"github.com/ActiveMemory/ctx/internal/cli/system/core/hubsync"
"github.com/ActiveMemory/ctx/internal/config/fs"
"github.com/ActiveMemory/ctx/internal/crypto"
"github.com/ActiveMemory/ctx/internal/hub"
logWarn "github.com/ActiveMemory/ctx/internal/log/warn"
"github.com/ActiveMemory/ctx/internal/testutil/testctx"
)

// TestMain initializes the embedded text-asset lookup so error
// strings rendered into warnings resolve their DescKey text.
func TestMain(m *testing.M) {
lookup.Init()
os.Exit(m.Run())
}

// declareContext positions the test in a temp project with a
// materialized .context/ directory and returns its path.
func declareContext(t *testing.T) string {
t.Helper()
dir := t.TempDir()
ctxDir := testctx.Declare(t, dir)
if mkErr := os.MkdirAll(ctxDir, fs.PermExec); mkErr != nil {
t.Fatal(mkErr)
}
return ctxDir
}

// captureWarnings redirects the warn sink to a buffer for the
// duration of the test.
func captureWarnings(t *testing.T) *bytes.Buffer {
t.Helper()
var buf bytes.Buffer
restore := logWarn.SetSink(&buf)
t.Cleanup(restore)
return &buf
}

// saveConnectConfig generates a global encryption key under the
// test HOME and persists a connection config pointing at addr.
func saveConnectConfig(t *testing.T, addr, token string) {
t.Helper()
key, keyErr := crypto.GenerateKey()
if keyErr != nil {
t.Fatal(keyErr)
}
keyPath := crypto.GlobalKeyPath()
if mkErr := os.MkdirAll(
filepath.Dir(keyPath), fs.PermKeyDir,
); mkErr != nil {
t.Fatal(mkErr)
}
if saveErr := crypto.SaveKey(keyPath, key); saveErr != nil {
t.Fatal(saveErr)
}
if cfgErr := connectCfg.Save(connectCfg.Config{
HubAddr: addr,
Token: token,
}); cfgErr != nil {
t.Fatal(cfgErr)
}
}

// startHub serves a hub with the given store on a random port
// and returns its address and a registered client token.
func startHub(t *testing.T, store *hub.Store) (string, string) {
t.Helper()
adminTok, tokErr := hub.GenerateAdminToken()
if tokErr != nil {
t.Fatal(tokErr)
}
srv := hub.NewServer(store, adminTok)
lis, lisErr := net.Listen("tcp", "127.0.0.1:0")
if lisErr != nil {
t.Fatal(lisErr)
}
go func() { _ = srv.Serve(lis) }()
t.Cleanup(srv.GracefulStop)

addr := lis.Addr().String()
client, dialErr := hub.NewClient(addr, "")
if dialErr != nil {
t.Fatal(dialErr)
}
defer func() {
if cerr := client.Close(); cerr != nil {
t.Log(cerr)
}
}()
reg, regErr := client.Register(
t.Context(), adminTok, "hubsync-test",
)
if regErr != nil {
t.Fatal(regErr)
}
return addr, reg.ClientToken
}

func TestSync_WarnsOnLoadError(t *testing.T) {
declareContext(t)
buf := captureWarnings(t)

if got := hubsync.Sync(""); got != "" {
t.Errorf("Sync = %q, want empty on load failure", got)
}
if !strings.Contains(
buf.String(), "hubsync: load connection config:",
) {
t.Errorf("missing load warning, got: %q", buf.String())
}
}

func TestSync_WarnsOnDialError(t *testing.T) {
declareContext(t)
// grpc.NewClient is lazy for almost every bad target, but
// a control character fails URL parsing at construction —
// the one eager failure mode, and exactly what a corrupted
// connect config would produce.
saveConnectConfig(t, "\x00", "tok")
buf := captureWarnings(t)

if got := hubsync.Sync(""); got != "" {
t.Errorf("Sync = %q, want empty on dial failure", got)
}
if !strings.Contains(buf.String(), "hubsync: dial") {
t.Errorf("missing dial warning, got: %q", buf.String())
}
}

func TestSync_WarnsOnPullError(t *testing.T) {
declareContext(t)
// A well-formed address nobody listens on: client
// construction is lazy, so the failure surfaces at the
// Sync RPC.
lis, lisErr := net.Listen("tcp", "127.0.0.1:0")
if lisErr != nil {
t.Fatal(lisErr)
}
addr := lis.Addr().String()
if closeErr := lis.Close(); closeErr != nil {
t.Fatal(closeErr)
}
saveConnectConfig(t, addr, "tok")
buf := captureWarnings(t)

if got := hubsync.Sync(""); got != "" {
t.Errorf("Sync = %q, want empty on pull failure", got)
}
if !strings.Contains(buf.String(), "hubsync: sync from") {
t.Errorf("missing pull warning, got: %q", buf.String())
}
}

func TestSync_NoWarnOnEmptyResult(t *testing.T) {
ctxDir := declareContext(t)
store, storeErr := hub.NewStore(
filepath.Join(ctxDir, "hub-data"),
)
if storeErr != nil {
t.Fatal(storeErr)
}
addr, token := startHub(t, store)
saveConnectConfig(t, addr, token)
buf := captureWarnings(t)

if got := hubsync.Sync(""); got != "" {
t.Errorf("Sync = %q, want empty for zero entries", got)
}
if buf.Len() != 0 {
t.Errorf(
"empty result must not warn, got: %q", buf.String(),
)
}
}
51 changes: 51 additions & 0 deletions internal/config/warn/warn.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,33 @@ const (
// the loss visible.
HubReplicateAppend = "hub replicate append: %v"

// HubReplicateDial is the stderr format for a failed gRPC
// client construction toward the master. Takes (masterAddr,
// error). Like every replication warning, it fires once per
// attempt; the loop retries on its own interval.
HubReplicateDial = "hub replicate dial %s: %v"

// HubReplicateStream is the stderr format for a failed sync
// stream open toward the master. Takes (masterAddr, error).
HubReplicateStream = "hub replicate open stream %s: %v"

// HubReplicateSend is the stderr format for a failed sync
// request send on the replication stream. Takes (masterAddr,
// error).
HubReplicateSend = "hub replicate send request %s: %v"

// HubReplicateCloseSend is the stderr format for a failed
// half-close of the replication stream. Takes (masterAddr,
// error).
HubReplicateCloseSend = "hub replicate close send %s: %v"

// HubReplicateRecv is the stderr format for a transport
// failure while receiving replicated entries. Takes
// (masterAddr, error). io.EOF (the normal end of a sync
// stream) and caller shutdown are deliberately not warned;
// see replicateOnce.
HubReplicateRecv = "hub replicate recv %s: %v"

// StateInitializedProbe is the stderr format for failures
// inside [state.Initialized] beyond "no context dir declared."
// Hooks bail on false either way, but a visible warning shows
Expand Down Expand Up @@ -187,6 +214,30 @@ const (
NotifyWebhookPost = "notify: webhook POST failed: %v"
)

// Hubsync hook warning formats. The session-start hubsync hook
// must never block or fail the session, so [hubsync.Sync] keeps
// returning a (possibly empty) nudge string — these warnings are
// the only signal that a configured hub sync went wrong instead
// of merely finding nothing new.
const (
// HubSyncLoadConfig is the format for a failed connection
// config load. Takes (error).
HubSyncLoadConfig = "hubsync: load connection config: %v"

// HubSyncDial is the format for a rejected hub address at
// client construction. Takes (addr, error).
HubSyncDial = "hubsync: dial %s: %v"

// HubSyncPull is the format for a failed Sync RPC. Takes
// (addr, error). A genuine zero-entry result is not an
// error and is never warned.
HubSyncPull = "hubsync: sync from %s: %v"

// HubSyncWrite is the format for a failed entry write after
// a successful pull. Takes (count, error).
HubSyncWrite = "hubsync: write %d entries: %v"
)

// Warn context identifiers for index generation.
const (
// IndexHeader is the context label for index header write errors.
Expand Down
14 changes: 14 additions & 0 deletions internal/hub/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func replicateOnce(
),
)
if dialErr != nil {
logWarn.Warn(cfgWarn.HubReplicateDial, masterAddr, dialErr)
return
}
defer func() {
Expand All @@ -98,21 +99,34 @@ func replicateOnce(
cfgHub.PathSync,
)
if streamErr != nil {
logWarn.Warn(cfgWarn.HubReplicateStream, masterAddr, streamErr)
return
}

if sendErr := stream.SendMsg(&SyncRequest{
SinceSequence: lastSeq,
}); sendErr != nil {
logWarn.Warn(cfgWarn.HubReplicateSend, masterAddr, sendErr)
return
}
if closeErr := stream.CloseSend(); closeErr != nil {
logWarn.Warn(cfgWarn.HubReplicateCloseSend, masterAddr, closeErr)
return
}

for {
msg := &EntryMsg{}
if recvErr := stream.RecvMsg(msg); recvErr != nil {
// io.EOF is the normal end of every sync stream
// and a done caller context is routine shutdown;
// warning on either would spam stderr once per
// replication cycle. Anything else is a transport
// failure worth surfacing.
if !eof(recvErr) && ctx.Err() == nil {
logWarn.Warn(
cfgWarn.HubReplicateRecv, masterAddr, recvErr,
)
}
return
}
entry := Entry{
Expand Down
Loading
Loading