Skip to content

Service catalog: announce foundation, external catalog, consumer-side resolver#170

Open
kaisoz wants to merge 29 commits into
google:mainfrom
kaisoz:sam-service-catalog
Open

Service catalog: announce foundation, external catalog, consumer-side resolver#170
kaisoz wants to merge 29 commits into
google:mainfrom
kaisoz:sam-service-catalog

Conversation

@kaisoz

@kaisoz kaisoz commented Jun 30, 2026

Copy link
Copy Markdown
Collaborator

Summary

Adds a service catalog that accelerates mesh discovery, in three layers:

  1. Announce foundation (node-side): every node publishes a signed ServiceAnnounce for its local services on the gossip topic /sam/service/announce/v1, on the existing reprovide tick. Self-signed (verified via the announcer's peer_id), no trusted-key list.
  2. External catalog (cmd/sam-catalog): a standalone process that connects to a node's sidecar, bootstraps via /sam/service/discover, tails a new binary-safe /sam/service/announce/stream SSE endpoint, maintains an in-memory store with TTL sweep + periodic re-walk, serves a query_catalog MCP tool, and self-registers as a CATALOG service.
  3. Consumer-side resolver (node-side): discovery resolves via a reachable catalog first (transport-agnostic endpoint resolution — a catalog this node hosts is queried directly over HTTP; a remote one over libp2p), falling back to the DHT on miss/error/empty.

Tests

  • Unit tests for announce sign/verify, the catalog store (TTL), the queryCatalog client + entry→provider mapping.
  • Integration tests (mock hub): announce propagation, catalog end-to-end, discovery + DHT fallback. (The catalog-accelerated path over biscuit-authed libp2p is skipped under the mock hub, which can't issue real biscuits.)
  • New tests/e2e/catalog.bats (real hub): three cases — retrieval via catalog, on-the-fly service pickup, DHT fallback.

Notes

  • DHT stays source-of-truth; the catalog is an accelerator (empty/unreachable → DHT fallback).
  • cmd/sam-catalog's --own-url defaults to the bind address; set a node-reachable address when binding to a non-loopback interface.

🤖 Generated with Claude Code

kaisoz added 27 commits June 30, 2026 10:56
Wire cmd/sam-catalog/main.go: cobra flags, catalog store, MCP HTTP
server, tail-before-bootstrap ordering, registerSelf, sweeper, rewalk.
Add runRewalk helper in ingest.go. Add sam-catalog to Makefile build
target. Add TestCatalogEndToEnd integration test verifying github-tools
appears in query_catalog with correct PeerID.
TestDiscoveryUsesCatalog: hub + provider P (static github-tools) +
sam-catalog + consumer C; polls discover_remote_services (type=mcp)
until catalog path returns github-tools; asserts peer_id == P.

TestDiscoveryFallsBackWithoutCatalog: no catalog; consumer C resolves
github-tools via name-based DHT lookup (discoverServicesByName).
…dial)

When a catalog is registered on the same node that runs discovery,
queryCatalog now queries it directly over HTTP MCP via queryLocalCatalog,
bypassing the libp2p self-dial that silently failed before.
Replace local-vs-mesh split with catalogEndpoint{url,peer} + resolveCatalogEndpoints
(hosted URL first, mesh peer fallback) + callCatalog dispatch. Drops 'local' naming;
a failed hosted-catalog query now falls through to a mesh peer then DHT.
…g param

resolveCatalogEndpoints now guards the hosted-URL append behind !forceRefresh
so the second (retry) pass re-resolves only the mesh peer, avoiding a
redundant re-dial of the co-located catalog.  queryCatalog's serviceType
parameter was never read (only typeStr was used); removed to tighten the
signature and both call sites updated accordingly.
Upstream (MCP SDK v1.6.1) consolidated the MCP HTTP mount to /mcp. Update the
catalog's MCP server, the node->catalog client, and their tests to match.
…ange)

Upstream moved /mcp from NewSSEHandler to NewStreamableHTTPHandler; update the
catalog MCP server, the node->catalog client, and their tests to the Streamable
transport to match.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new service catalog component (sam-catalog) that aggregates and exposes service discoveries via MCP, along with gossip-based service announcements in sam-node. Feedback on the changes focuses on robustness and reliability, including wrapping startup network calls with timeout contexts to prevent blocking, adding defensive nil checks to avoid nil pointer dereference panics, using bufio.Reader instead of bufio.Scanner to handle large SSE lines, and validating/capping TTL values to prevent integer overflow.

Comment thread cmd/sam-catalog/main.go
Comment on lines +101 to +107
// Initial snapshot.
if err := nc.bootstrap(ctx, store, []api.ServiceType{
api.ServiceType_SERVICE_TYPE_MCP,
api.ServiceType_SERVICE_TYPE_INFERENCE,
}); err != nil {
log.Printf("bootstrap warning: %v", err)
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The initial bootstrap request is performed using the global signal context ctx, which is only cancelled on process termination (SIGINT/SIGTERM). If the sidecar node is slow or unresponsive during startup, this call will block the startup of sam-catalog indefinitely. Wrapping the bootstrap call with a dedicated timeout context ensures the catalog can start up or fail fast.

Suggested change
// Initial snapshot.
if err := nc.bootstrap(ctx, store, []api.ServiceType{
api.ServiceType_SERVICE_TYPE_MCP,
api.ServiceType_SERVICE_TYPE_INFERENCE,
}); err != nil {
log.Printf("bootstrap warning: %v", err)
}
// Initial snapshot.
bootstrapCtx, bootstrapCancel := context.WithTimeout(ctx, 15*time.Second)
if err := nc.bootstrap(bootstrapCtx, store, []api.ServiceType{
api.ServiceType_SERVICE_TYPE_MCP,
api.ServiceType_SERVICE_TYPE_INFERENCE,
}); err != nil {
log.Printf("bootstrap warning: %v", err)
}
bootstrapCancel()

Comment thread cmd/sam-catalog/main.go
Comment on lines +109 to +112
// Register this catalog as a service so others can discover it.
if err := registerSelf(ctx, nodeURL, nodeToken, ownURL); err != nil {
log.Printf("registerSelf warning: %v", err)
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The self-registration request is performed using the global signal context ctx. If the sidecar node is unresponsive, this call will block the startup of sam-catalog indefinitely. Wrapping the registration call with a dedicated timeout context ensures the catalog startup is bounded.

Suggested change
// Register this catalog as a service so others can discover it.
if err := registerSelf(ctx, nodeURL, nodeToken, ownURL); err != nil {
log.Printf("registerSelf warning: %v", err)
}
// Register this catalog as a service so others can discover it.
regCtx, regCancel := context.WithTimeout(ctx, 10*time.Second)
if err := registerSelf(regCtx, nodeURL, nodeToken, ownURL); err != nil {
log.Printf("registerSelf warning: %v", err)
}
regCancel()

Comment thread cmd/sam-node/announce.go
Comment on lines +28 to +31
func (n *SamNode) joinTopic(name string) (*pubsub.Topic, error) {
n.mu.Lock()
defer n.mu.Unlock()
if t, ok := n.topics[name]; ok {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

If n.PubSub is nil (e.g., if the node is running in a mode where PubSub is not initialized, or during certain test scenarios), calling n.PubSub.Join will cause a nil pointer dereference panic. Adding a defensive nil check for n.PubSub prevents this. Note that the "errors" package should be added to the imports list.

func (n *SamNode) joinTopic(name string) (*pubsub.Topic, error) {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.PubSub == nil {
		return nil, errors.New("pubsub not initialized")
	}
	if t, ok := n.topics[name]; ok {

Comment thread cmd/sam-catalog/ingest.go
Comment on lines +150 to +163
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
if !strings.HasPrefix(line, "data: ") {
continue // skip event:/blank lines
}
payload := strings.TrimPrefix(line, "data: ")
var ann api.ServiceAnnounce
if err := protojson.Unmarshal([]byte(payload), &ann); err != nil {
continue
}
store.Upsert(&ann, time.Now())
}
return scanner.Err()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using bufio.Scanner with the default buffer size (64KB) can cause the stream processing to fail with bufio.ErrTooLong if any single SSE line (e.g., a large ServiceAnnounce message) exceeds 64KB. It is safer and more robust to use bufio.Reader with ReadString('\n') which dynamically resizes and has no arbitrary line length limit.

	reader := bufio.NewReader(resp.Body)
	for {
		line, err := reader.ReadString('\n')
		if err != nil {
			if err == io.EOF {
				break
			}
			return err
		}
		line = strings.TrimSuffix(line, "\n")
		line = strings.TrimSuffix(line, "\r")
		if !strings.HasPrefix(line, "data: ") {
			continue
		}
		payload := strings.TrimPrefix(line, "data: ")
		var ann api.ServiceAnnounce
		if err := protojson.Unmarshal([]byte(payload), &ann); err != nil {
			continue
		}
		store.Upsert(&ann, time.Now())
	}
	return nil

Comment on lines +94 to +100
func (n *SamNode) resolveCatalogEndpoints(ctx context.Context, forceRefresh bool) []catalogEndpoint {
var eps []catalogEndpoint
if !forceRefresh {
if url, ok := n.services.hostedCatalogURL(); ok {
eps = append(eps, catalogEndpoint{url: url})
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If n.services is nil (e.g., in a zero-value SamNode or during unit tests), calling n.services.hostedCatalogURL() will cause a nil pointer dereference panic. Adding a defensive nil check for n.services ensures robustness.

Suggested change
func (n *SamNode) resolveCatalogEndpoints(ctx context.Context, forceRefresh bool) []catalogEndpoint {
var eps []catalogEndpoint
if !forceRefresh {
if url, ok := n.services.hostedCatalogURL(); ok {
eps = append(eps, catalogEndpoint{url: url})
}
}
func (n *SamNode) resolveCatalogEndpoints(ctx context.Context, forceRefresh bool) []catalogEndpoint {
var eps []catalogEndpoint
if !forceRefresh && n.services != nil {
if url, ok := n.services.hostedCatalogURL(); ok {
eps = append(eps, catalogEndpoint{url: url})
}
}

Comment thread internal/catalog/store.go
Comment on lines +43 to +51
// Upsert inserts or refreshes an entry, setting Expiry from the announce TTL.
func (s *Store) Upsert(a *api.ServiceAnnounce, now time.Time) {
s.mu.Lock()
defer s.mu.Unlock()
s.entries[key(a.Type, a.Name, a.PeerId)] = Entry{
Type: a.Type, Name: a.Name, PeerID: a.PeerId, Addrs: a.Addrs,
Expiry: now.Add(time.Duration(a.TtlMs) * time.Millisecond),
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The TtlMs value provided in the announcement is converted directly to a time.Duration without validation. An extremely large TtlMs value can cause integer overflow of time.Duration (which wraps around to a negative value or undefined behavior), and can also lead to memory exhaustion if entries are kept in the store indefinitely. Capping the TTL to a maximum reasonable value (e.g., 24 hours) and handling negative values prevents these issues.

// Upsert inserts or refreshes an entry, setting Expiry from the announce TTL.
func (s *Store) Upsert(a *api.ServiceAnnounce, now time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()
	ttl := time.Duration(a.TtlMs) * time.Millisecond
	const maxTTL = 24 * time.Hour
	if ttl > maxTTL {
		ttl = maxTTL
	} else if ttl < 0 {
		ttl = 0
	}
	s.entries[key(a.Type, a.Name, a.PeerId)] = Entry{
		Type: a.Type, Name: a.Name, PeerID: a.PeerId, Addrs: a.Addrs,
		Expiry: now.Add(ttl),
	}
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant