Service catalog: announce foundation, external catalog, consumer-side resolver#170
Service catalog: announce foundation, external catalog, consumer-side resolver#170kaisoz wants to merge 29 commits into
Conversation
Wire cmd/sam-catalog/main.go: cobra flags, catalog store, MCP HTTP server, tail-before-bootstrap ordering, registerSelf, sweeper, rewalk. Add runRewalk helper in ingest.go. Add sam-catalog to Makefile build target. Add TestCatalogEndToEnd integration test verifying github-tools appears in query_catalog with correct PeerID.
TestDiscoveryUsesCatalog: hub + provider P (static github-tools) + sam-catalog + consumer C; polls discover_remote_services (type=mcp) until catalog path returns github-tools; asserts peer_id == P. TestDiscoveryFallsBackWithoutCatalog: no catalog; consumer C resolves github-tools via name-based DHT lookup (discoverServicesByName).
…dial) When a catalog is registered on the same node that runs discovery, queryCatalog now queries it directly over HTTP MCP via queryLocalCatalog, bypassing the libp2p self-dial that silently failed before.
Replace local-vs-mesh split with catalogEndpoint{url,peer} + resolveCatalogEndpoints
(hosted URL first, mesh peer fallback) + callCatalog dispatch. Drops 'local' naming;
a failed hosted-catalog query now falls through to a mesh peer then DHT.
…g param resolveCatalogEndpoints now guards the hosted-URL append behind !forceRefresh so the second (retry) pass re-resolves only the mesh peer, avoiding a redundant re-dial of the co-located catalog. queryCatalog's serviceType parameter was never read (only typeStr was used); removed to tighten the signature and both call sites updated accordingly.
Upstream (MCP SDK v1.6.1) consolidated the MCP HTTP mount to /mcp. Update the catalog's MCP server, the node->catalog client, and their tests to match.
…ange) Upstream moved /mcp from NewSSEHandler to NewStreamableHTTPHandler; update the catalog MCP server, the node->catalog client, and their tests to the Streamable transport to match.
There was a problem hiding this comment.
Code Review
This pull request introduces a new service catalog component (sam-catalog) that aggregates and exposes service discoveries via MCP, along with gossip-based service announcements in sam-node. Feedback on the changes focuses on robustness and reliability, including wrapping startup network calls with timeout contexts to prevent blocking, adding defensive nil checks to avoid nil pointer dereference panics, using bufio.Reader instead of bufio.Scanner to handle large SSE lines, and validating/capping TTL values to prevent integer overflow.
| // Initial snapshot. | ||
| if err := nc.bootstrap(ctx, store, []api.ServiceType{ | ||
| api.ServiceType_SERVICE_TYPE_MCP, | ||
| api.ServiceType_SERVICE_TYPE_INFERENCE, | ||
| }); err != nil { | ||
| log.Printf("bootstrap warning: %v", err) | ||
| } |
There was a problem hiding this comment.
The initial bootstrap request is performed using the global signal context ctx, which is only cancelled on process termination (SIGINT/SIGTERM). If the sidecar node is slow or unresponsive during startup, this call will block the startup of sam-catalog indefinitely. Wrapping the bootstrap call with a dedicated timeout context ensures the catalog can start up or fail fast.
| // Initial snapshot. | |
| if err := nc.bootstrap(ctx, store, []api.ServiceType{ | |
| api.ServiceType_SERVICE_TYPE_MCP, | |
| api.ServiceType_SERVICE_TYPE_INFERENCE, | |
| }); err != nil { | |
| log.Printf("bootstrap warning: %v", err) | |
| } | |
| // Initial snapshot. | |
| bootstrapCtx, bootstrapCancel := context.WithTimeout(ctx, 15*time.Second) | |
| if err := nc.bootstrap(bootstrapCtx, store, []api.ServiceType{ | |
| api.ServiceType_SERVICE_TYPE_MCP, | |
| api.ServiceType_SERVICE_TYPE_INFERENCE, | |
| }); err != nil { | |
| log.Printf("bootstrap warning: %v", err) | |
| } | |
| bootstrapCancel() |
| // Register this catalog as a service so others can discover it. | ||
| if err := registerSelf(ctx, nodeURL, nodeToken, ownURL); err != nil { | ||
| log.Printf("registerSelf warning: %v", err) | ||
| } |
There was a problem hiding this comment.
The self-registration request is performed using the global signal context ctx. If the sidecar node is unresponsive, this call will block the startup of sam-catalog indefinitely. Wrapping the registration call with a dedicated timeout context ensures the catalog startup is bounded.
| // Register this catalog as a service so others can discover it. | |
| if err := registerSelf(ctx, nodeURL, nodeToken, ownURL); err != nil { | |
| log.Printf("registerSelf warning: %v", err) | |
| } | |
| // Register this catalog as a service so others can discover it. | |
| regCtx, regCancel := context.WithTimeout(ctx, 10*time.Second) | |
| if err := registerSelf(regCtx, nodeURL, nodeToken, ownURL); err != nil { | |
| log.Printf("registerSelf warning: %v", err) | |
| } | |
| regCancel() |
| func (n *SamNode) joinTopic(name string) (*pubsub.Topic, error) { | ||
| n.mu.Lock() | ||
| defer n.mu.Unlock() | ||
| if t, ok := n.topics[name]; ok { |
There was a problem hiding this comment.
If n.PubSub is nil (e.g., if the node is running in a mode where PubSub is not initialized, or during certain test scenarios), calling n.PubSub.Join will cause a nil pointer dereference panic. Adding a defensive nil check for n.PubSub prevents this. Note that the "errors" package should be added to the imports list.
func (n *SamNode) joinTopic(name string) (*pubsub.Topic, error) {
n.mu.Lock()
defer n.mu.Unlock()
if n.PubSub == nil {
return nil, errors.New("pubsub not initialized")
}
if t, ok := n.topics[name]; ok {| scanner := bufio.NewScanner(resp.Body) | ||
| for scanner.Scan() { | ||
| line := scanner.Text() | ||
| if !strings.HasPrefix(line, "data: ") { | ||
| continue // skip event:/blank lines | ||
| } | ||
| payload := strings.TrimPrefix(line, "data: ") | ||
| var ann api.ServiceAnnounce | ||
| if err := protojson.Unmarshal([]byte(payload), &ann); err != nil { | ||
| continue | ||
| } | ||
| store.Upsert(&ann, time.Now()) | ||
| } | ||
| return scanner.Err() |
There was a problem hiding this comment.
Using bufio.Scanner with the default buffer size (64KB) can cause the stream processing to fail with bufio.ErrTooLong if any single SSE line (e.g., a large ServiceAnnounce message) exceeds 64KB. It is safer and more robust to use bufio.Reader with ReadString('\n') which dynamically resizes and has no arbitrary line length limit.
reader := bufio.NewReader(resp.Body)
for {
line, err := reader.ReadString('\n')
if err != nil {
if err == io.EOF {
break
}
return err
}
line = strings.TrimSuffix(line, "\n")
line = strings.TrimSuffix(line, "\r")
if !strings.HasPrefix(line, "data: ") {
continue
}
payload := strings.TrimPrefix(line, "data: ")
var ann api.ServiceAnnounce
if err := protojson.Unmarshal([]byte(payload), &ann); err != nil {
continue
}
store.Upsert(&ann, time.Now())
}
return nil| func (n *SamNode) resolveCatalogEndpoints(ctx context.Context, forceRefresh bool) []catalogEndpoint { | ||
| var eps []catalogEndpoint | ||
| if !forceRefresh { | ||
| if url, ok := n.services.hostedCatalogURL(); ok { | ||
| eps = append(eps, catalogEndpoint{url: url}) | ||
| } | ||
| } |
There was a problem hiding this comment.
If n.services is nil (e.g., in a zero-value SamNode or during unit tests), calling n.services.hostedCatalogURL() will cause a nil pointer dereference panic. Adding a defensive nil check for n.services ensures robustness.
| func (n *SamNode) resolveCatalogEndpoints(ctx context.Context, forceRefresh bool) []catalogEndpoint { | |
| var eps []catalogEndpoint | |
| if !forceRefresh { | |
| if url, ok := n.services.hostedCatalogURL(); ok { | |
| eps = append(eps, catalogEndpoint{url: url}) | |
| } | |
| } | |
| func (n *SamNode) resolveCatalogEndpoints(ctx context.Context, forceRefresh bool) []catalogEndpoint { | |
| var eps []catalogEndpoint | |
| if !forceRefresh && n.services != nil { | |
| if url, ok := n.services.hostedCatalogURL(); ok { | |
| eps = append(eps, catalogEndpoint{url: url}) | |
| } | |
| } |
| // Upsert inserts or refreshes an entry, setting Expiry from the announce TTL. | ||
| func (s *Store) Upsert(a *api.ServiceAnnounce, now time.Time) { | ||
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
| s.entries[key(a.Type, a.Name, a.PeerId)] = Entry{ | ||
| Type: a.Type, Name: a.Name, PeerID: a.PeerId, Addrs: a.Addrs, | ||
| Expiry: now.Add(time.Duration(a.TtlMs) * time.Millisecond), | ||
| } | ||
| } |
There was a problem hiding this comment.
The TtlMs value provided in the announcement is converted directly to a time.Duration without validation. An extremely large TtlMs value can cause integer overflow of time.Duration (which wraps around to a negative value or undefined behavior), and can also lead to memory exhaustion if entries are kept in the store indefinitely. Capping the TTL to a maximum reasonable value (e.g., 24 hours) and handling negative values prevents these issues.
// Upsert inserts or refreshes an entry, setting Expiry from the announce TTL.
func (s *Store) Upsert(a *api.ServiceAnnounce, now time.Time) {
s.mu.Lock()
defer s.mu.Unlock()
ttl := time.Duration(a.TtlMs) * time.Millisecond
const maxTTL = 24 * time.Hour
if ttl > maxTTL {
ttl = maxTTL
} else if ttl < 0 {
ttl = 0
}
s.entries[key(a.Type, a.Name, a.PeerId)] = Entry{
Type: a.Type, Name: a.Name, PeerID: a.PeerId, Addrs: a.Addrs,
Expiry: now.Add(ttl),
}
}…ose/write; drop unused type)
Summary
Adds a service catalog that accelerates mesh discovery, in three layers:
ServiceAnnouncefor its local services on the gossip topic/sam/service/announce/v1, on the existing reprovide tick. Self-signed (verified via the announcer'speer_id), no trusted-key list.cmd/sam-catalog): a standalone process that connects to a node's sidecar, bootstraps via/sam/service/discover, tails a new binary-safe/sam/service/announce/streamSSE endpoint, maintains an in-memory store with TTL sweep + periodic re-walk, serves aquery_catalogMCP tool, and self-registers as aCATALOGservice.Tests
tests/e2e/catalog.bats(real hub): three cases — retrieval via catalog, on-the-fly service pickup, DHT fallback.Notes
cmd/sam-catalog's--own-urldefaults to the bind address; set a node-reachable address when binding to a non-loopback interface.🤖 Generated with Claude Code