From b942b4166642928edb2b38e6f6a158a90aa547c9 Mon Sep 17 00:00:00 2001 From: Nanook Date: Sun, 21 Jun 2026 04:25:53 +0000 Subject: [PATCH 1/2] test: move concurrent start assertions to test goroutine --- internal/temporalcli/commands.server_test.go | 97 ++++++++++++++++---- 1 file changed, 81 insertions(+), 16 deletions(-) diff --git a/internal/temporalcli/commands.server_test.go b/internal/temporalcli/commands.server_test.go index 26b92cf60..9f0270eb9 100644 --- a/internal/temporalcli/commands.server_test.go +++ b/internal/temporalcli/commands.server_test.go @@ -1,7 +1,9 @@ package temporalcli_test import ( + "bytes" "context" + "fmt" "net" "os" "path/filepath" @@ -14,6 +16,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/temporalio/cli/internal/devserver" + "github.com/temporalio/cli/internal/temporalcli" "go.temporal.io/api/operatorservice/v1" "go.temporal.io/sdk/client" "go.temporal.io/sdk/temporal" @@ -139,58 +142,120 @@ func startDevServerAndRunSimpleTest(t *testing.T, args []string, dialAddress str } func TestServer_StartDev_ConcurrentStarts(t *testing.T) { - startOne := func() { - h := NewCommandHarness(t) - defer h.Close() + startOne := func() error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // Start in background, then wait for client to be able to connect port := strconv.Itoa(devserver.MustGetFreePort("127.0.0.1")) httpPort := strconv.Itoa(devserver.MustGetFreePort("127.0.0.1")) resCh := make(chan *CommandResult, 1) go func() { - resCh <- h.Execute("server", "start-dev", "-p", port, "--http-port", httpPort, "--headless", "--log-level", "never") + resCh <- executeConcurrentStartCommand(ctx, "server", "start-dev", "-p", port, "--http-port", httpPort, "--headless", "--log-level", "never") }() // Try to connect for a bit while checking for error var cl client.Client - h.EventuallyWithT(func(t *assert.CollectT) { + var lastDialErr error + for start := time.Now(); time.Since(start) < 3*time.Second; { select { case res := <-resCh: - require.NoError(t, res.Err) - require.Fail(t, "got early server result") + return concurrentStartCommandResultError("got early server result", res) default: } var err error - cl, err = client.Dial(client.Options{HostPort: "127.0.0.1:" + port, Logger: testLogger{t: h.t}}) - assert.NoError(t, err) - }, 3*time.Second, 200*time.Millisecond) + cl, err = client.Dial(client.Options{HostPort: "127.0.0.1:" + port}) + if err == nil { + break + } + lastDialErr = err + time.Sleep(200 * time.Millisecond) + } + if cl == nil { + cancel() + select { + case <-time.After(20 * time.Second): + return fmt.Errorf("server was not reachable after 3 seconds: %w; also did not clean up within 20 seconds", lastDialErr) + case res := <-resCh: + if res.Err != nil { + return fmt.Errorf( + "server was not reachable after 3 seconds: %w; cleanup failed: %w", + lastDialErr, + res.Err, + ) + } + return fmt.Errorf("server was not reachable after 3 seconds: %w", lastDialErr) + } + } defer cl.Close() // Send an interrupt by cancelling context - h.CancelContext() + cancel() select { case <-time.After(20 * time.Second): - h.Fail("didn't cleanup after 20 seconds") + return fmt.Errorf("didn't cleanup after 20 seconds") case res := <-resCh: - h.NoError(res.Err) + if res.Err != nil { + return concurrentStartCommandResultError("server returned error", res) + } } + return nil } - // Start 40 dev server instances, with 8 concurrent executions + // Start 40 dev server instances, with 6 concurrent executions. instanceCounter := atomic.Int32{} instanceCounter.Store(40) + errCh := make(chan error, 40) wg := &sync.WaitGroup{} for i := 0; i < 6; i++ { wg.Add(1) go func() { + defer wg.Done() for instanceCounter.Add(-1) >= 0 { - startOne() + if err := startOne(); err != nil { + errCh <- err + } } - wg.Done() }() } wg.Wait() + close(errCh) + for err := range errCh { + require.NoError(t, err) + } +} + +func executeConcurrentStartCommand(ctx context.Context, args ...string) *CommandResult { + res := &CommandResult{} + var stdin bytes.Buffer + options := temporalcli.CommandOptions{ + IOStreams: temporalcli.IOStreams{ + Stdin: &stdin, + Stdout: &res.Stdout, + Stderr: &res.Stderr, + }, + Args: args, + DeprecatedEnvConfig: temporalcli.DeprecatedEnvConfig{ + DisableEnvConfig: true, + EnvConfigName: "default", + }, + } + options.Fail = func(err error) { + if res.Err == nil { + res.Err = err + } + } + + temporalcli.Execute(ctx, options) + return res +} + +func concurrentStartCommandResultError(msg string, res *CommandResult) error { + if res.Err != nil { + return fmt.Errorf("%s: %w (stdout: %q, stderr: %q)", msg, res.Err, res.Stdout.String(), res.Stderr.String()) + } + return fmt.Errorf("%s (stdout: %q, stderr: %q)", msg, res.Stdout.String(), res.Stderr.String()) } func TestServer_StartDev_WithSearchAttributes(t *testing.T) { From ce9c1a5b228b1e389771bf3d1e851ff52d744819 Mon Sep 17 00:00:00 2001 From: Nanook Date: Mon, 22 Jun 2026 16:24:00 +0000 Subject: [PATCH 2/2] test: use command harness for concurrent starts --- internal/temporalcli/commands.server_test.go | 58 +++++++------------- internal/temporalcli/commands_test.go | 18 ++++-- 2 files changed, 33 insertions(+), 43 deletions(-) diff --git a/internal/temporalcli/commands.server_test.go b/internal/temporalcli/commands.server_test.go index 9f0270eb9..98438875b 100644 --- a/internal/temporalcli/commands.server_test.go +++ b/internal/temporalcli/commands.server_test.go @@ -1,7 +1,6 @@ package temporalcli_test import ( - "bytes" "context" "fmt" "net" @@ -16,7 +15,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/temporalio/cli/internal/devserver" - "github.com/temporalio/cli/internal/temporalcli" "go.temporal.io/api/operatorservice/v1" "go.temporal.io/sdk/client" "go.temporal.io/sdk/temporal" @@ -142,8 +140,10 @@ func startDevServerAndRunSimpleTest(t *testing.T, args []string, dialAddress str } func TestServer_StartDev_ConcurrentStarts(t *testing.T) { + h := NewCommandHarness(t) + startOne := func() error { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(t.Context()) defer cancel() // Start in background, then wait for client to be able to connect @@ -151,25 +151,32 @@ func TestServer_StartDev_ConcurrentStarts(t *testing.T) { httpPort := strconv.Itoa(devserver.MustGetFreePort("127.0.0.1")) resCh := make(chan *CommandResult, 1) go func() { - resCh <- executeConcurrentStartCommand(ctx, "server", "start-dev", "-p", port, "--http-port", httpPort, "--headless", "--log-level", "never") + resCh <- h.ExecuteWithContext(ctx, "server", "start-dev", "-p", port, "--http-port", httpPort, "--headless", "--log-level", "never") }() // Try to connect for a bit while checking for error var cl client.Client var lastDialErr error - for start := time.Now(); time.Since(start) < 3*time.Second; { + ticker := time.NewTicker(200 * time.Millisecond) + defer ticker.Stop() + timeout := time.NewTimer(3 * time.Second) + defer timeout.Stop() + + waitForServer: + for { select { case res := <-resCh: return concurrentStartCommandResultError("got early server result", res) - default: - } - var err error - cl, err = client.Dial(client.Options{HostPort: "127.0.0.1:" + port}) - if err == nil { - break + case <-ticker.C: + var err error + cl, err = client.Dial(client.Options{HostPort: "127.0.0.1:" + port}) + if err == nil { + break waitForServer + } + lastDialErr = err + case <-timeout.C: + break waitForServer } - lastDialErr = err - time.Sleep(200 * time.Millisecond) } if cl == nil { cancel() @@ -226,31 +233,6 @@ func TestServer_StartDev_ConcurrentStarts(t *testing.T) { } } -func executeConcurrentStartCommand(ctx context.Context, args ...string) *CommandResult { - res := &CommandResult{} - var stdin bytes.Buffer - options := temporalcli.CommandOptions{ - IOStreams: temporalcli.IOStreams{ - Stdin: &stdin, - Stdout: &res.Stdout, - Stderr: &res.Stderr, - }, - Args: args, - DeprecatedEnvConfig: temporalcli.DeprecatedEnvConfig{ - DisableEnvConfig: true, - EnvConfigName: "default", - }, - } - options.Fail = func(err error) { - if res.Err == nil { - res.Err = err - } - } - - temporalcli.Execute(ctx, options) - return res -} - func concurrentStartCommandResultError(msg string, res *CommandResult) error { if res.Err != nil { return fmt.Errorf("%s: %w (stdout: %q, stderr: %q)", msg, res.Err, res.Stdout.String(), res.Stderr.String()) diff --git a/internal/temporalcli/commands_test.go b/internal/temporalcli/commands_test.go index 1b1a18e51..8e9bb1f2f 100644 --- a/internal/temporalcli/commands_test.go +++ b/internal/temporalcli/commands_test.go @@ -151,11 +151,23 @@ type CommandResult struct { } func (h *CommandHarness) Execute(args ...string) *CommandResult { + ctx, cancel := context.WithCancel(h.Context) + h.t.Cleanup(cancel) + defer cancel() + return h.execute(ctx, &h.Stdin, args...) +} + +func (h *CommandHarness) ExecuteWithContext(ctx context.Context, args ...string) *CommandResult { + var stdin bytes.Buffer + return h.execute(ctx, &stdin, args...) +} + +func (h *CommandHarness) execute(ctx context.Context, stdin io.Reader, args ...string) *CommandResult { // Copy options, update as needed res := &CommandResult{} options := h.Options // Set stdio - options.Stdin = &h.Stdin + options.Stdin = stdin options.Stdout = &res.Stdout options.Stderr = &res.Stderr // Set args @@ -175,10 +187,6 @@ func (h *CommandHarness) Execute(args ...string) *CommandResult { res.Err = err } - // Run - ctx, cancel := context.WithCancel(h.Context) - h.t.Cleanup(cancel) - defer cancel() h.t.Logf("Calling: %v", strings.Join(args, " ")) temporalcli.Execute(ctx, options) if res.Stdout.Len() > 0 {