diff --git a/internal/temporalcli/commands.server_test.go b/internal/temporalcli/commands.server_test.go index 26b92cf60..98438875b 100644 --- a/internal/temporalcli/commands.server_test.go +++ b/internal/temporalcli/commands.server_test.go @@ -2,6 +2,7 @@ package temporalcli_test import ( "context" + "fmt" "net" "os" "path/filepath" @@ -139,58 +140,104 @@ func startDevServerAndRunSimpleTest(t *testing.T, args []string, dialAddress str } func TestServer_StartDev_ConcurrentStarts(t *testing.T) { - startOne := func() { - h := NewCommandHarness(t) - defer h.Close() + h := NewCommandHarness(t) + + startOne := func() error { + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() // Start in background, then wait for client to be able to connect port := strconv.Itoa(devserver.MustGetFreePort("127.0.0.1")) httpPort := strconv.Itoa(devserver.MustGetFreePort("127.0.0.1")) resCh := make(chan *CommandResult, 1) go func() { - resCh <- h.Execute("server", "start-dev", "-p", port, "--http-port", httpPort, "--headless", "--log-level", "never") + resCh <- h.ExecuteWithContext(ctx, "server", "start-dev", "-p", port, "--http-port", httpPort, "--headless", "--log-level", "never") }() // Try to connect for a bit while checking for error var cl client.Client - h.EventuallyWithT(func(t *assert.CollectT) { + var lastDialErr error + ticker := time.NewTicker(200 * time.Millisecond) + defer ticker.Stop() + timeout := time.NewTimer(3 * time.Second) + defer timeout.Stop() + + waitForServer: + for { select { case res := <-resCh: - require.NoError(t, res.Err) - require.Fail(t, "got early server result") - default: + return concurrentStartCommandResultError("got early server result", res) + case <-ticker.C: + var err error + cl, err = client.Dial(client.Options{HostPort: "127.0.0.1:" + port}) + if err == nil { + break waitForServer + } + lastDialErr = err + case <-timeout.C: + break waitForServer } - var err error - cl, err = client.Dial(client.Options{HostPort: "127.0.0.1:" + port, Logger: testLogger{t: h.t}}) - assert.NoError(t, err) - }, 3*time.Second, 200*time.Millisecond) + } + if cl == nil { + cancel() + select { + case <-time.After(20 * time.Second): + return fmt.Errorf("server was not reachable after 3 seconds: %w; also did not clean up within 20 seconds", lastDialErr) + case res := <-resCh: + if res.Err != nil { + return fmt.Errorf( + "server was not reachable after 3 seconds: %w; cleanup failed: %w", + lastDialErr, + res.Err, + ) + } + return fmt.Errorf("server was not reachable after 3 seconds: %w", lastDialErr) + } + } defer cl.Close() // Send an interrupt by cancelling context - h.CancelContext() + cancel() select { case <-time.After(20 * time.Second): - h.Fail("didn't cleanup after 20 seconds") + return fmt.Errorf("didn't cleanup after 20 seconds") case res := <-resCh: - h.NoError(res.Err) + if res.Err != nil { + return concurrentStartCommandResultError("server returned error", res) + } } + return nil } - // Start 40 dev server instances, with 8 concurrent executions + // Start 40 dev server instances, with 6 concurrent executions. instanceCounter := atomic.Int32{} instanceCounter.Store(40) + errCh := make(chan error, 40) wg := &sync.WaitGroup{} for i := 0; i < 6; i++ { wg.Add(1) go func() { + defer wg.Done() for instanceCounter.Add(-1) >= 0 { - startOne() + if err := startOne(); err != nil { + errCh <- err + } } - wg.Done() }() } wg.Wait() + close(errCh) + for err := range errCh { + require.NoError(t, err) + } +} + +func concurrentStartCommandResultError(msg string, res *CommandResult) error { + if res.Err != nil { + return fmt.Errorf("%s: %w (stdout: %q, stderr: %q)", msg, res.Err, res.Stdout.String(), res.Stderr.String()) + } + return fmt.Errorf("%s (stdout: %q, stderr: %q)", msg, res.Stdout.String(), res.Stderr.String()) } func TestServer_StartDev_WithSearchAttributes(t *testing.T) { diff --git a/internal/temporalcli/commands_test.go b/internal/temporalcli/commands_test.go index 1b1a18e51..8e9bb1f2f 100644 --- a/internal/temporalcli/commands_test.go +++ b/internal/temporalcli/commands_test.go @@ -151,11 +151,23 @@ type CommandResult struct { } func (h *CommandHarness) Execute(args ...string) *CommandResult { + ctx, cancel := context.WithCancel(h.Context) + h.t.Cleanup(cancel) + defer cancel() + return h.execute(ctx, &h.Stdin, args...) +} + +func (h *CommandHarness) ExecuteWithContext(ctx context.Context, args ...string) *CommandResult { + var stdin bytes.Buffer + return h.execute(ctx, &stdin, args...) +} + +func (h *CommandHarness) execute(ctx context.Context, stdin io.Reader, args ...string) *CommandResult { // Copy options, update as needed res := &CommandResult{} options := h.Options // Set stdio - options.Stdin = &h.Stdin + options.Stdin = stdin options.Stdout = &res.Stdout options.Stderr = &res.Stderr // Set args @@ -175,10 +187,6 @@ func (h *CommandHarness) Execute(args ...string) *CommandResult { res.Err = err } - // Run - ctx, cancel := context.WithCancel(h.Context) - h.t.Cleanup(cancel) - defer cancel() h.t.Logf("Calling: %v", strings.Join(args, " ")) temporalcli.Execute(ctx, options) if res.Stdout.Len() > 0 {