diff --git a/src/cmd/faktory.go b/src/cmd/faktory.go index 888193c..f909cd6 100644 --- a/src/cmd/faktory.go +++ b/src/cmd/faktory.go @@ -164,6 +164,7 @@ func runJob(ctx context.Context, helper worker.Helper, job opslevel.RunnerJob) p logPrefix := func() string { return fmt.Sprintf("%s [%d] ", time.Now().UTC().Format(time.RFC3339), 0) } streamer := pkg.NewLogStreamer( logger, + logMaxBytes, // per-stream buffer cap tracks the ship-batch size pkg.NewFaktorySetOutcomeProcessor(helper, logger, job.Id), pkg.NewSanitizeLogProcessor(job.Variables), pkg.NewPrefixLogProcessor(logPrefix), diff --git a/src/cmd/root.go b/src/cmd/root.go index 006794f..99c64e1 100644 --- a/src/cmd/root.go +++ b/src/cmd/root.go @@ -49,7 +49,7 @@ func init() { rootCmd.PersistentFlags().String("job-pod-shell", "/bin/sh", "The job pod shell to use for commands run inside the pod.") rootCmd.PersistentFlags().String("job-pod-workdir", "/jobs", "The job pod working directory.") rootCmd.PersistentFlags().Int("job-pod-log-max-interval", 30, "The max amount of time between when pod logs are shipped to OpsLevel. Works in tandem with 'job-pod-log-max-size'") - rootCmd.PersistentFlags().Int("job-pod-log-max-size", 1000000, "The max amount in bytes to buffer before pod logs are shipped to OpsLevel. Works in tandem with 'job-pod-log-max-interval'") + rootCmd.PersistentFlags().Int("job-pod-log-max-size", 262144, "The max bytes of pod logs buffered before a batch is shipped to OpsLevel. Works in tandem with 'job-pod-log-max-interval'. Also the dominant per-job memory term: peak runner memory is roughly 'job-concurrency' * 6 * this value, so lower it when running high concurrency.") rootCmd.PersistentFlags().Bool("job-agent-mode", false, "Enable agent mode with privileged security context for Container-in-Container support. WARNING: This grants elevated privileges and should only be enabled for trusted workloads.") rootCmd.PersistentFlags().String("job-pod-helper-image", "", "Override the helper init container image. Defaults to the published ECR image matching the runner version. Useful for local development with kind.") rootCmd.PersistentFlags().String("queue", "", "The queue this runner should process jobs from. Empty means the default queue.") diff --git a/src/cmd/run.go b/src/cmd/run.go index 392fc18..fbd14b1 100644 --- a/src/cmd/run.go +++ b/src/cmd/run.go @@ -121,6 +121,7 @@ func jobWorker(ctx context.Context, wg *sync.WaitGroup, index int, runnerId opsl streamer := pkg.NewLogStreamer( logger, + logMaxBytes, // per-stream buffer cap tracks the ship-batch size pkg.NewSetOutcomeVarLogProcessor(client, logger, runnerId, jobId, jobNumber), pkg.NewSanitizeLogProcessor(job.Variables), pkg.NewPrefixLogProcessor(logPrefix), diff --git a/src/cmd/test.go b/src/cmd/test.go index 12f57fb..216d9fa 100644 --- a/src/cmd/test.go +++ b/src/cmd/test.go @@ -40,6 +40,7 @@ func doTest(cmd *cobra.Command, args []string) error { } streamer := pkg.NewLogStreamer( log.Logger, + 0, // unbounded buffers for local testing pkg.NewSetOutcomeVarLogProcessor(nil, log.Logger, "1", "1", "1"), pkg.NewSanitizeLogProcessor(job.Variables), pkg.NewLoggerLogProcessor(log.Logger), diff --git a/src/pkg/buffer.go b/src/pkg/buffer.go index 11b05ee..3bb6cdf 100644 --- a/src/pkg/buffer.go +++ b/src/pkg/buffer.go @@ -5,17 +5,50 @@ import ( "sync" ) -// SafeBuffer is a goroutine safe bytes.Buffer +// SafeBuffer is a goroutine safe bytes.Buffer with an optional size cap. +// +// The cap exists to protect the runner from OOM: pod stdout/stderr is written +// into this buffer by client-go's exec stream, while the LogStreamer drains it +// on a ticker. If the drain side stalls (e.g. a slow log-shipping API call) or a +// job emits data faster than it can be drained, an unbounded buffer would grow +// until the process is killed. When maxSize is exceeded, writes beyond the cap +// are dropped and the dropped byte count is recorded so the streamer can emit a +// visible marker into the log stream. type SafeBuffer struct { - buffer bytes.Buffer - mutex sync.Mutex + buffer bytes.Buffer + mutex sync.Mutex + maxSize int + dropped int +} + +// NewSafeBuffer returns a SafeBuffer that drops writes once it holds maxSize +// bytes of unread data. A maxSize <= 0 means unbounded. +func NewSafeBuffer(maxSize int) *SafeBuffer { + return &SafeBuffer{maxSize: maxSize} } // Write appends the contents of p to the buffer, growing the buffer as needed. It returns // the number of bytes written. +// +// To the caller (client-go's exec stream copier) the write always "succeeds" — +// returning a short write or error would tear down the exec stream. When the cap +// is reached we accept as much as fits, drop the rest, and track how much was +// dropped. func (s *SafeBuffer) Write(p []byte) (n int, err error) { s.mutex.Lock() defer s.mutex.Unlock() + if s.maxSize > 0 { + room := s.maxSize - s.buffer.Len() + if room <= 0 { + s.dropped += len(p) + return len(p), nil + } + if len(p) > room { + s.buffer.Write(p[:room]) + s.dropped += len(p) - room + return len(p), nil + } + } return s.buffer.Write(p) } @@ -27,6 +60,23 @@ func (s *SafeBuffer) String() string { return s.buffer.String() } +// Len returns the number of bytes of the unread portion of the buffer. +func (s *SafeBuffer) Len() int { + s.mutex.Lock() + defer s.mutex.Unlock() + return s.buffer.Len() +} + +// DroppedBytes returns the number of bytes dropped since the last call and +// resets the counter. +func (s *SafeBuffer) DroppedBytes() int { + s.mutex.Lock() + defer s.mutex.Unlock() + n := s.dropped + s.dropped = 0 + return n +} + // ReadString reads until the first occurrence of delim in the input, // returning a string containing the data up to and including the delimiter. // If ReadString encounters an error before finding a delimiter, diff --git a/src/pkg/faktoryRunnerAppendJobLogProcessor.go b/src/pkg/faktoryRunnerAppendJobLogProcessor.go index 33038f8..8e55293 100644 --- a/src/pkg/faktoryRunnerAppendJobLogProcessor.go +++ b/src/pkg/faktoryRunnerAppendJobLogProcessor.go @@ -21,10 +21,13 @@ type FaktoryAppendJobLogProcessor struct { firstLine bool lastTime time.Time elapsed time.Duration + batches chan []string + done chan struct{} + droppedBatches int } func NewFaktoryAppendJobLogProcessor(helper faktoryWorker.Helper, logger zerolog.Logger, jobId opslevel.ID, maxBytes int, maxTime time.Duration) *FaktoryAppendJobLogProcessor { - return &FaktoryAppendJobLogProcessor{ + s := &FaktoryAppendJobLogProcessor{ helper: helper, logger: logger, jobId: jobId, @@ -34,7 +37,11 @@ func NewFaktoryAppendJobLogProcessor(helper faktoryWorker.Helper, logger zerolog logLinesBytesSize: 0, firstLine: false, lastTime: time.Now(), + batches: make(chan []string, shipQueueDepth), + done: make(chan struct{}), } + go s.ship() + return s } func (s *FaktoryAppendJobLogProcessor) Process(line string) string { @@ -57,7 +64,7 @@ func (s *FaktoryAppendJobLogProcessor) Process(line string) string { s.elapsed += time.Since(s.lastTime) if s.elapsed > s.maxTime { s.logger.Trace().Msg("Shipping logs because of maxTime ...") - s.elapsed = time.Since(time.Now()) + s.elapsed = 0 s.submit() } s.lastTime = time.Now() @@ -74,21 +81,58 @@ func (s *FaktoryAppendJobLogProcessor) ProcessStderr(line string) string { } func (s *FaktoryAppendJobLogProcessor) Flush(outcome JobOutcome) { - if len(s.logLines) > 0 { - s.logger.Trace().Msg("Sleeping before append job logs ...") - time.Sleep(1 * time.Second) - s.submit() - s.logger.Trace().Msg("Finished append job logs ...") + // The pod is done producing, so the final batch must not be dropped: enqueue + // it with a blocking send (the shipper is still draining) before closing. + if batch := s.takeBatch(); batch != nil { + s.batches <- batch + } + close(s.batches) + <-s.done // wait for in-flight batches to finish enqueuing + if s.droppedBatches > 0 { + s.logger.Warn().Msgf("dropped %d log batch(es) for job '%s' due to enqueue backpressure", s.droppedBatches, s.jobId) + } +} + +// takeBatch detaches the accumulated lines into a standalone batch and starts a +// fresh buffer; see OpsLevelAppendLogProcessor.takeBatch. Returns nil when there +// is nothing buffered. +func (s *FaktoryAppendJobLogProcessor) takeBatch() []string { + if len(s.logLines) == 0 { + return nil } + batch := s.logLines + s.logLines = make([]string, 0, len(batch)) + s.logLinesBytesSize = 0 + return batch } +// submit hands the current batch off to the background shipper. It never blocks; +// see OpsLevelAppendLogProcessor.submit for the rationale. func (s *FaktoryAppendJobLogProcessor) submit() { - if len(s.logLines) > 0 { + batch := s.takeBatch() + if batch == nil { + return + } + select { + case s.batches <- batch: + default: + s.droppedBatches++ + } +} + +// ship runs on its own goroutine, enqueuing batches to Faktory so the +// LogStreamer drain loop never blocks on the (network) enqueue call. +func (s *FaktoryAppendJobLogProcessor) ship() { + defer close(s.done) + for logLines := range s.batches { + if len(logLines) == 0 { + continue + } job := faktory.NewJob("Runners::Faktory::AppendJobLog", opslevel.RunnerAppendJobLogInput{ RunnerId: "faktory", RunnerJobId: s.jobId, SentAt: opslevel.NewISO8601DateNow(), - Logs: s.logLines, + Logs: logLines, }) job.Queue = "app" batch := s.helper.Bid() @@ -98,7 +142,7 @@ func (s *FaktoryAppendJobLogProcessor) submit() { }) if err != nil { MetricEnqueueBatchFailed.Inc() - s.logger.Error().Err(err).Msgf("error while enqueuing append logs for '%d' log line(s) for job '%s'", len(s.logLines), s.jobId) + s.logger.Error().Err(err).Msgf("error while enqueuing append logs for '%d' log line(s) for job '%s'", len(logLines), s.jobId) } } else { err := s.helper.With(func(cl *faktory.Client) error { @@ -106,11 +150,8 @@ func (s *FaktoryAppendJobLogProcessor) submit() { }) if err != nil { MetricEnqueueFailed.Inc() - s.logger.Error().Err(err).Msgf("error while enqueuing append logs for '%d' log line(s) for job '%s'", len(s.logLines), s.jobId) + s.logger.Error().Err(err).Msgf("error while enqueuing append logs for '%d' log line(s) for job '%s'", len(logLines), s.jobId) } } } - s.logLinesBytesSize = 0 - s.logLines = nil - s.logLines = []string{} } diff --git a/src/pkg/logs.go b/src/pkg/logs.go index 98df7a8..4350ab7 100644 --- a/src/pkg/logs.go +++ b/src/pkg/logs.go @@ -24,11 +24,15 @@ type LogStreamer struct { logBuffer *ring.Ring } -func NewLogStreamer(logger zerolog.Logger, processors ...LogProcessor) LogStreamer { +// NewLogStreamer builds a streamer whose stdout/stderr buffers are each capped +// at bufferMaxBytes. A bufferMaxBytes <= 0 leaves the buffers unbounded (used by +// tests and local `test` runs). Bytes dropped once a buffer is full are counted +// in the MetricLogBytesDropped metric. +func NewLogStreamer(logger zerolog.Logger, bufferMaxBytes int, processors ...LogProcessor) LogStreamer { quit := make(chan bool) return LogStreamer{ - Stdout: &SafeBuffer{}, - Stderr: &SafeBuffer{}, + Stdout: NewSafeBuffer(bufferMaxBytes), + Stderr: NewSafeBuffer(bufferMaxBytes), processors: processors, logger: logger, quit: quit, @@ -65,6 +69,15 @@ func (s *LogStreamer) processLine(stream logStream) { s.logBuffer = s.logBuffer.Next() } +// recordDroppedBytes folds any bytes a capped buffer had to drop into the +// dropped-bytes metric so log loss is observable rather than silent. +func (s *LogStreamer) recordDroppedBytes() { + dropped := s.Stdout.DroppedBytes() + s.Stderr.DroppedBytes() + if dropped > 0 && MetricLogBytesDropped != nil { + MetricLogBytesDropped.Add(float64(dropped)) + } +} + func (s *LogStreamer) GetLogBuffer() []string { output := make([]string, 0) s.logBuffer.Do(func(line any) { @@ -93,6 +106,7 @@ func (s *LogStreamer) Run(ctx context.Context) { s.processLine(stream) } } + s.recordDroppedBytes() } } } @@ -119,6 +133,7 @@ done: for _, stream := range s.streams() { s.processLine(stream) } + s.recordDroppedBytes() s.logger.Trace().Msg("Flushing log processors ...") for i := len(s.processors) - 1; i >= 0; i-- { s.processors[i].Flush(outcome) diff --git a/src/pkg/logs_test.go b/src/pkg/logs_test.go index 8025235..2820887 100644 --- a/src/pkg/logs_test.go +++ b/src/pkg/logs_test.go @@ -2,6 +2,7 @@ package pkg import ( "context" + "strings" "testing" "time" @@ -27,7 +28,7 @@ func (c *captureProcessor) Flush(_ JobOutcome) {} func TestLogStreamerPartialLineStdout(t *testing.T) { cap := &captureProcessor{} - s := NewLogStreamer(zerolog.Nop(), cap) + s := NewLogStreamer(zerolog.Nop(), 0, cap) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -46,9 +47,37 @@ func TestLogStreamerPartialLineStdout(t *testing.T) { autopilot.Equals(t, []string{"partial", "trailing-no-newline"}, cap.lines) } +// Data with no newline that exceeds the buffer cap must be bounded to the cap +// (excess dropped) and the capped portion flushed at job end. +func TestLogStreamerCapsOversizedLine(t *testing.T) { + cap := &captureProcessor{} + s := NewLogStreamer(zerolog.Nop(), 64, cap) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go s.Run(ctx) + + // 200 bytes, no newline: 64 are buffered, 136 dropped. + _, _ = s.Stdout.Write([]byte(strings.Repeat("x", 200))) + time.Sleep(150 * time.Millisecond) + s.Flush(JobOutcome{}) + + autopilot.Equals(t, []string{strings.Repeat("x", 64)}, cap.lines) +} + +// SafeBuffer must cap resident memory and report what it dropped. +func TestSafeBufferCapsAndReportsDrops(t *testing.T) { + b := NewSafeBuffer(10) + n, _ := b.Write([]byte("0123456789ABCDEF")) // 16 bytes into a 10-byte cap + autopilot.Equals(t, 16, n) // caller always sees a full write + autopilot.Equals(t, 10, b.Len()) + autopilot.Equals(t, 6, b.DroppedBytes()) + autopilot.Equals(t, 0, b.DroppedBytes()) // counter resets after read +} + func TestLogStreamerPartialLineStderr(t *testing.T) { cap := &captureProcessor{} - s := NewLogStreamer(zerolog.Nop(), cap) + s := NewLogStreamer(zerolog.Nop(), 0, cap) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/src/pkg/metrics.go b/src/pkg/metrics.go index d305957..423541c 100644 --- a/src/pkg/metrics.go +++ b/src/pkg/metrics.go @@ -20,6 +20,7 @@ var ( MetricJobsProcessing prometheus.Gauge MetricEnqueueFailed prometheus.Counter MetricEnqueueBatchFailed prometheus.Counter + MetricLogBytesDropped prometheus.Counter ) func initMetrics(id string) { @@ -61,6 +62,12 @@ func initMetrics(id string) { Help: "The count of jobs that failed to enqueue to faktory for a batch.", ConstLabels: prometheus.Labels{"runner": id}, }) + MetricLogBytesDropped = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: metricNamespace, + Name: "log_bytes_dropped", + Help: "The count of pod log bytes dropped because a per-stream buffer hit its size cap.", + ConstLabels: prometheus.Labels{"runner": id}, + }) } func StartMetricsServer(id string, port int) { diff --git a/src/pkg/opslevelAppendLogProcessor.go b/src/pkg/opslevelAppendLogProcessor.go index 33b5e6b..06cde71 100644 --- a/src/pkg/opslevelAppendLogProcessor.go +++ b/src/pkg/opslevelAppendLogProcessor.go @@ -8,6 +8,14 @@ import ( "github.com/rs/zerolog" ) +// shipQueueDepth bounds how many log batches may wait to ship at once. A slow +// API call backs up into this queue instead of stalling the LogStreamer drain +// loop (which would let the pod stdout/stderr buffers grow unbounded). Memory +// held by the shipper is therefore bounded by shipQueueDepth * maxBytes (times +// ~1.34 for base64), plus the batch currently being assembled. Kept small and +// fixed so per-job memory tracks the single 'job-pod-log-max-size' lever. +const shipQueueDepth = 2 + type OpsLevelAppendLogProcessor struct { client *opslevel.Client logger zerolog.Logger @@ -21,10 +29,13 @@ type OpsLevelAppendLogProcessor struct { firstLine bool lastTime time.Time elapsed time.Duration + batches chan []string + done chan struct{} + droppedBatches int } func NewOpsLevelAppendLogProcessor(client *opslevel.Client, logger zerolog.Logger, runnerId opslevel.ID, jobId opslevel.ID, jobNumber string, maxBytes int, maxTime time.Duration) *OpsLevelAppendLogProcessor { - return &OpsLevelAppendLogProcessor{ + s := &OpsLevelAppendLogProcessor{ client: client, logger: logger, runnerId: runnerId, @@ -36,7 +47,11 @@ func NewOpsLevelAppendLogProcessor(client *opslevel.Client, logger zerolog.Logge logLinesBytesSize: 0, firstLine: false, lastTime: time.Now(), + batches: make(chan []string, shipQueueDepth), + done: make(chan struct{}), } + go s.ship() + return s } func (s *OpsLevelAppendLogProcessor) Process(line string) string { @@ -76,26 +91,64 @@ func (s *OpsLevelAppendLogProcessor) ProcessStderr(line string) string { } func (s *OpsLevelAppendLogProcessor) Flush(outcome JobOutcome) { - if len(s.logLines) > 0 { - s.logger.Trace().Msg("Sleeping before append job logs ...") - time.Sleep(1 * time.Second) - s.submit() - s.logger.Trace().Msg("Finished append job logs ...") + // The pod is done producing, so the final batch must not be dropped: send it + // with a blocking enqueue (the shipper is still draining) before closing. + if batch := s.takeBatch(); batch != nil { + s.batches <- batch } + close(s.batches) + <-s.done // wait for in-flight batches to finish shipping + if s.droppedBatches > 0 { + s.logger.Warn().Msgf("dropped %d log batch(es) for job '%s' due to shipping backpressure", s.droppedBatches, s.jobNumber) + } +} + +// takeBatch detaches the accumulated lines into a standalone batch and starts a +// fresh buffer. A new backing slice is allocated because the returned slice is +// handed to the shipper goroutine. Returns nil when there is nothing buffered. +func (s *OpsLevelAppendLogProcessor) takeBatch() []string { + if len(s.logLines) == 0 { + return nil + } + batch := s.logLines + s.logLines = make([]string, 0, len(batch)) + s.logLinesBytesSize = 0 + return batch } +// submit hands the current batch off to the background shipper. It never blocks: +// if the shipper is behind and the queue is full, the batch is dropped (and +// counted) rather than stalling the drain loop, which would let the pod +// stdout/stderr buffers grow unbounded. func (s *OpsLevelAppendLogProcessor) submit() { - if s.client != nil && len(s.logLines) > 0 { + batch := s.takeBatch() + if batch == nil { + return + } + select { + case s.batches <- batch: + default: + s.droppedBatches++ + } +} + +// ship runs on its own goroutine for the lifetime of the processor, performing +// the (blocking) network calls so the LogStreamer drain loop never waits on the +// API. +func (s *OpsLevelAppendLogProcessor) ship() { + defer close(s.done) + for batch := range s.batches { + if s.client == nil || len(batch) == 0 { + continue + } err := s.client.RunnerAppendJobLog(opslevel.RunnerAppendJobLogInput{ RunnerId: s.runnerId, RunnerJobId: s.jobId, SentAt: opslevel.NewISO8601DateNow(), - Logs: s.logLines, + Logs: batch, }) if err != nil { - s.logger.Error().Err(err).Msgf("error while appending '%d' log line(s) for job '%s'", len(s.logLines), s.jobNumber) + s.logger.Error().Err(err).Msgf("error while appending '%d' log line(s) for job '%s'", len(batch), s.jobNumber) } } - s.logLinesBytesSize = 0 - s.logLines = s.logLines[:0] }