diff --git a/bin/opslevel-runner-coding-agent b/bin/opslevel-runner-coding-agent new file mode 100755 index 0000000..5424f2e --- /dev/null +++ b/bin/opslevel-runner-coding-agent @@ -0,0 +1,19 @@ +#!/usr/bin/env bash + +set -eu +SCRIPT_DIR="${BASH_SOURCE[0]%/*}" + +# source for KUBECONFIG and k8s context to be set +source "$SCRIPT_DIR/setup-kind.sh" opslevel-runner + +exec watchexec --watch "$SCRIPT_DIR/../src" --exts go,mod,sum --restart \ + -- go run -C "$SCRIPT_DIR/../src" . --log-level TRACE run \ + --mode=faktory \ + --queues=coding-agent \ + --queue=coding-agent \ + --job-pod-max-wait=900 \ + --runner-pod-namespace=default \ + --job-agent-mode=true --metrics-port=10355 \ + --job-pod-helper-image=localhost/opslevel-runner:dev \ + --job-pod-requests-cpu="${OPSLEVEL_JOB_POD_REQUESTS_CPU:-50}" \ + --job-pod-requests-memory="${OPSLEVEL_JOB_POD_REQUESTS_MEMORY:-32}" diff --git a/src/Procfile b/src/Procfile index 81387ce..7adde19 100644 --- a/src/Procfile +++ b/src/Procfile @@ -1,3 +1,4 @@ faktory: faktory runner: ../bin/opslevel-runner-runner +coding-agent: ../bin/opslevel-runner-coding-agent image-builder: watchexec --watch cmd/enqueue.go --watch cmd/root.go --watch main.go --watch go.mod --watch go.sum --watch ../Dockerfile -- HELPER_IMAGE=localhost/opslevel-runner:dev ../bin/build-helper-image.sh opslevel-runner diff --git a/src/cmd/enqueue.go b/src/cmd/enqueue.go index c1ed6fb..ad98078 100644 --- a/src/cmd/enqueue.go +++ b/src/cmd/enqueue.go @@ -1,11 +1,13 @@ package cmd import ( + "encoding/json" "fmt" "os" "time" faktory "github.com/contribsys/faktory/client" + "github.com/rs/zerolog/log" "github.com/spf13/cobra" "gopkg.in/yaml.v3" ) @@ -32,6 +34,7 @@ var enqueueCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { job, err := readFaktoryJobInput() cobra.CheckErr(err) + client, err := faktory.Open() cobra.CheckErr(err) @@ -58,6 +61,12 @@ var enqueueCmd = &cobra.Command{ } } + if log.Debug().Enabled() { + if b, merr := json.Marshal(j); merr == nil { + log.Debug().RawJSON("payload", b).Msg("Submitting Faktory job") + } + } + if job.Batch != "" { batch, err := client.BatchOpen(job.Batch) cobra.CheckErr(err) diff --git a/src/cmd/faktory.go b/src/cmd/faktory.go index 888193c..c11d5bc 100644 --- a/src/cmd/faktory.go +++ b/src/cmd/faktory.go @@ -172,7 +172,7 @@ func runJob(ctx context.Context, helper worker.Helper, job opslevel.RunnerJob) p go streamer.Run(ctx) pkg.MetricJobsProcessing.Inc() - logger.Info().Msgf("Starting job '%s'", job.Id) + logger.Info().Str("job_id", string(job.Id)).Msg("job started") runner := pkg.NewJobRunner("faktory", cfgFile) outcome := runner.Run(ctx, job, streamer.Stdout, streamer.Stderr) streamer.Flush(outcome) @@ -186,7 +186,12 @@ func emitJobStartedMetrics() time.Time { func emitJobCompleteMetrics(jobStart time.Time, job opslevel.RunnerJob, outcome pkg.JobOutcome) { jobDuration := time.Since(jobStart) - log.Info().Str("outcome", outcome.Message).Msgf("Finished job '%s' took '%s' and had outcome '%s'", job.Id, jobDuration, outcome.Outcome) + log.Info(). + Str("job_id", string(job.Id)). + Str("outcome", string(outcome.Outcome)). + Int64("duration_ms", jobDuration.Milliseconds()). + Str("outcome_message", outcome.Message). + Msg("job finished") pkg.MetricJobsDuration.Observe(jobDuration.Seconds()) pkg.MetricJobsFinished.WithLabelValues(string(outcome.Outcome)).Inc() pkg.MetricJobsProcessing.Dec() diff --git a/src/pkg/k8s.go b/src/pkg/k8s.go index e7a9e25..5c42d83 100644 --- a/src/pkg/k8s.go +++ b/src/pkg/k8s.go @@ -16,6 +16,7 @@ import ( corev1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" @@ -30,8 +31,8 @@ import ( ) const ( - ContainerNameHelper = "helper" - ContainerNameJob = "job" + SquidProxyImage = "ubuntu/squid:latest@sha256:6a097f68bae708cedbabd6188d68c7e2e7a38cedd05a176e1cc0ba29e3bbe029" + SquidConfigMapName = "squid-config" ) var ( @@ -92,7 +93,7 @@ func LoadK8SClient() { func NewJobRunner(runnerId string, path string) *JobRunner { if !k8sValidated { - // It's ok if this function panics because we wouldn't beable to run jobs anyway + // It's ok if this function panics because we wouldn't be able to run jobs anyway LoadK8SClient() } // kubernetes.Clientset is thread-safe and designed to be shared across goroutines @@ -121,6 +122,15 @@ func (s *JobRunner) getPodEnv(configs []opslevel.RunnerJobVariable) []corev1.Env return output } +func extractJobVariable(vars []opslevel.RunnerJobVariable, key string) string { + for _, v := range vars { + if v.Key == key { + return v.Value + } + } + return "" +} + func (s *JobRunner) getConfigMapObject(identifier string, job opslevel.RunnerJob) *corev1.ConfigMap { data := map[string]string{} for _, file := range job.Files { @@ -171,6 +181,14 @@ func executable() *int32 { return &value } +func getContainerNames(containers []corev1.Container) []string { + names := make([]string, 0, len(containers)) + for _, container := range containers { + names = append(names, container.Name) + } + return names +} + func (s *JobRunner) getPodObject(identifier string, labels map[string]string, job opslevel.RunnerJob) *corev1.Pod { // TODO: Allow configuration of Labels // TODO: Allow configuration of Pod Command @@ -195,6 +213,162 @@ func (s *JobRunner) getPodObject(identifier string, labels map[string]string, jo } } + containers := []corev1.Container{ + { + Name: "job", + Image: job.Image, + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{ + "/bin/sh", + "-c", + fmt.Sprintf("sleep %d", s.podConfig.Lifetime), + }, + Resources: s.podConfig.Resources, + Env: s.getPodEnv(job.Variables), + SecurityContext: containerSecurityContext, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "scripts", + ReadOnly: true, + MountPath: "/opslevel", + }, + { + Name: "shared", + ReadOnly: true, + MountPath: "/mount", + }, + }, + }, + } + + volumes := []corev1.Volume{ + { + Name: "scripts", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: identifier, + }, + DefaultMode: executable(), + }, + }, + }, + { + Name: "shared", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + } + + // helperContainer copies the runner binary into the shared volume. It runs + // to completion before the main and sidecar containers start. + helperContainer := corev1.Container{ + Name: "helper", + Image: s.podConfig.helperImage(), + ImagePullPolicy: s.podConfig.PullPolicy, + Command: []string{ + "cp", + "/opslevel-runner", + "/mount", + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "shared", + ReadOnly: false, + MountPath: "/mount", + }, + }, + } + initContainers := []corev1.Container{helperContainer} + + // working with the queue name; can't use agentMode until agentMode stops + // implying privileged mode + if s.podConfig.Queue == "coding-agent" { + proxyAllowedDomains := extractJobVariable(job.Variables, "PROXY_ALLOWED_DOMAINS") + squidUID := int64(13) + alwaysPolicy := corev1.ContainerRestartPolicyAlways + // squid runs as a native sidecar (init container with RestartPolicy=Always). + // This gates the job container's start on the startupProbe succeeding and + // lets kubelet restart squid independently without touching the pod-level + // RestartPolicy=Never that the runner relies on for job outcome reporting. + initContainers = append(initContainers, corev1.Container{ + Name: "squid", + Image: SquidProxyImage, + ImagePullPolicy: corev1.PullIfNotPresent, + RestartPolicy: &alwaysPolicy, + Command: []string{"/bin/sh", "-c"}, + Args: []string{`set -eu +: > /srv/squid/custom-allowed-domains.conf +if [ -n "${PROXY_ALLOWED_DOMAINS:-}" ]; then + echo "$PROXY_ALLOWED_DOMAINS" | tr ',' '\n' > /srv/squid/custom-allowed-domains.conf +fi +printf 'include /etc/squid/conf.d/squid.conf\npid_filename /srv/squid/squid.pid\n' > /srv/squid/squid.conf +exec squid -N -f /srv/squid/squid.conf +`}, + SecurityContext: &corev1.SecurityContext{ + RunAsUser: &squidUID, + RunAsGroup: &squidUID, + }, + Ports: []corev1.ContainerPort{ + {Name: "proxy", ContainerPort: 3128, Protocol: corev1.ProtocolTCP}, + }, + Env: []corev1.EnvVar{ + {Name: "PROXY_ALLOWED_DOMAINS", Value: proxyAllowedDomains}, + }, + VolumeMounts: []corev1.VolumeMount{ + {Name: "squid-config", ReadOnly: true, MountPath: "/etc/squid/conf.d"}, + {Name: "squid-runtime", ReadOnly: false, MountPath: "/srv/squid"}, + }, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("50m"), + corev1.ResourceMemory: resource.MustParse("128Mi"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("500m"), + corev1.ResourceMemory: resource.MustParse("512Mi"), + }, + }, + StartupProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + TCPSocket: &corev1.TCPSocketAction{ + Port: intstr.FromInt(3128), + }, + }, + InitialDelaySeconds: 0, + PeriodSeconds: 1, + TimeoutSeconds: 1, + FailureThreshold: 5, + }, + }) + + volumes = append(volumes, + corev1.Volume{ + Name: "squid-config", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: SquidConfigMapName, + }, + }, + }, + }, + corev1.Volume{ + Name: "squid-runtime", + VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}, + }, + ) + + // set env vars to route job container traffic through the sidecar proxy. + proxyURL := "http://localhost:3128" + containers[0].Env = append(containers[0].Env, + corev1.EnvVar{Name: "http_proxy", Value: proxyURL}, + corev1.EnvVar{Name: "https_proxy", Value: proxyURL}, + corev1.EnvVar{Name: "no_proxy", Value: "localhost,127.0.0.1,::1"}, + ) + } + return &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: identifier, @@ -208,71 +382,9 @@ func (s *JobRunner) getPodObject(identifier string, labels map[string]string, jo SecurityContext: &podSecurityContext, ServiceAccountName: s.podConfig.ServiceAccountName, NodeSelector: s.podConfig.NodeSelector, - InitContainers: []corev1.Container{ - { - Name: ContainerNameHelper, - Image: s.podConfig.helperImage(), - ImagePullPolicy: s.podConfig.PullPolicy, - Command: []string{ - "cp", - "/opslevel-runner", - "/mount", - }, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "shared", - ReadOnly: false, - MountPath: "/mount", - }, - }, - }, - }, - Containers: []corev1.Container{ - { - Name: ContainerNameJob, - Image: job.Image, - ImagePullPolicy: corev1.PullIfNotPresent, - Command: []string{ - "/bin/sh", - "-c", - fmt.Sprintf("sleep %d", s.podConfig.Lifetime), - }, - Resources: s.podConfig.Resources, - Env: s.getPodEnv(job.Variables), - SecurityContext: containerSecurityContext, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "scripts", - ReadOnly: true, - MountPath: "/opslevel", - }, - { - Name: "shared", - ReadOnly: true, - MountPath: "/mount", - }, - }, - }, - }, - Volumes: []corev1.Volume{ - { - Name: "scripts", - VolumeSource: corev1.VolumeSource{ - ConfigMap: &corev1.ConfigMapVolumeSource{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: identifier, - }, - DefaultMode: executable(), - }, - }, - }, - { - Name: "shared", - VolumeSource: corev1.VolumeSource{ - EmptyDir: &corev1.EmptyDirVolumeSource{}, - }, - }, - }, + InitContainers: initContainers, + Containers: containers, + Volumes: volumes, }, } } @@ -294,6 +406,20 @@ func (s *JobRunner) Run(ctx context.Context, job opslevel.RunnerJob, stdout, std "app.kubernetes.io/instance": identifier, "app.kubernetes.io/managed-by": runnerIdentifier, } + + jobLogger := s.logger.With(). + Str("job_id", string(job.Id)). + Str("namespace", s.podConfig.Namespace). + Logger() + ctx = jobLogger.WithContext(ctx) + + jobLogger.Debug(). + Str("image", job.Image). + Strs("commands", job.Commands). + Int("files", len(job.Files)). + Int("variables", len(job.Variables)). + Msg("job input received") + labelSelector, err := CreateLabelSelector(labels) if err != nil { return JobOutcome{ @@ -309,7 +435,7 @@ func (s *JobRunner) Run(ctx context.Context, job opslevel.RunnerJob, stdout, std Outcome: opslevel.RunnerJobOutcomeEnumFailed, } } - defer s.DeleteConfigMap(context.Background(), cfgMap) // Use Background for cleanup to ensure it completes + defer s.DeleteConfigMap(jobLogger.WithContext(context.Background()), cfgMap) // Use Background for cleanup to ensure it completes pdb, err := s.CreatePDB(ctx, s.getPBDObject(identifier, labelSelector)) if err != nil { @@ -318,7 +444,7 @@ func (s *JobRunner) Run(ctx context.Context, job opslevel.RunnerJob, stdout, std Outcome: opslevel.RunnerJobOutcomeEnumFailed, } } - defer s.DeletePDB(context.Background(), pdb) // Use Background for cleanup to ensure it completes + defer s.DeletePDB(jobLogger.WithContext(context.Background()), pdb) // Use Background for cleanup to ensure it completes pod, err := s.CreatePod(ctx, s.getPodObject(identifier, labels, job)) if err != nil { @@ -327,7 +453,7 @@ func (s *JobRunner) Run(ctx context.Context, job opslevel.RunnerJob, stdout, std Outcome: opslevel.RunnerJobOutcomeEnumFailed, } } - defer s.DeletePod(context.Background(), pod) // Use Background for cleanup to ensure it completes + defer s.DeletePod(jobLogger.WithContext(context.Background()), pod) // Use Background for cleanup to ensure it completes timeout := time.Second * time.Duration(viper.GetInt("job-pod-max-wait")) waitErr := s.WaitForPod(ctx, pod, timeout) @@ -378,6 +504,7 @@ func GetKubernetesConfig() (*rest.Config, error) { } func (s *JobRunner) ExecWithConfig(ctx context.Context, config JobConfig) error { + log := zerolog.Ctx(ctx) req := s.clientset.CoreV1().RESTClient().Post(). Resource("pods"). Name(config.PodName). @@ -392,18 +519,41 @@ func (s *JobRunner) ExecWithConfig(ctx context.Context, config JobConfig) error Stderr: config.Stderr != nil, TTY: false, }, scheme.ParameterCodec) - s.logger.Debug().Msgf("Execing pod %s/%s ...", config.Namespace, config.PodName) - s.logger.Trace().Msgf("ExecWithOptions: execute(POST %s)", req.URL()) + log.Debug(). + Str("kind", "Pod"). + Str("name", config.PodName). + Str("container", config.ContainerName). + Msg("execing pod") + log.Trace().Str("url", req.URL().String()).Msg("exec request") exec, err := remotecommand.NewSPDYExecutor(s.config, "POST", req.URL()) if err != nil { + log.Error().Err(err). + Str("kind", "Pod"). + Str("name", config.PodName). + Msg("pod exec failed") return err } - return exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + start := time.Now() + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ Stdin: config.Stdin, Stdout: config.Stdout, Stderr: config.Stderr, Tty: false, }) + if err != nil { + log.Error().Err(err). + Str("kind", "Pod"). + Str("name", config.PodName). + Int64("duration_ms", time.Since(start).Milliseconds()). + Msg("pod exec failed") + return err + } + log.Debug(). + Str("kind", "Pod"). + Str("name", config.PodName). + Int64("duration_ms", time.Since(start).Milliseconds()). + Msg("pod exec complete") + return nil } func (s *JobRunner) Exec(ctx context.Context, stdout, stderr *SafeBuffer, pod *corev1.Pod, containerName string, cmd ...string) error { @@ -419,18 +569,105 @@ func (s *JobRunner) Exec(ctx context.Context, stdout, stderr *SafeBuffer, pod *c } func (s *JobRunner) CreateConfigMap(ctx context.Context, config *corev1.ConfigMap) (*corev1.ConfigMap, error) { - s.logger.Trace().Msgf("Creating configmap %s/%s ...", config.Namespace, config.Name) - return s.clientset.CoreV1().ConfigMaps(config.Namespace).Create(ctx, config, metav1.CreateOptions{}) + log := zerolog.Ctx(ctx) + keys := make([]string, 0, len(config.Data)) + for k := range config.Data { + keys = append(keys, k) + } + log.Debug(). + Str("kind", "ConfigMap"). + Str("name", config.Name). + Bool("immutable", config.Immutable != nil && *config.Immutable). + Strs("data_keys", keys). + Msg("creating resource") + + out, err := s.clientset.CoreV1().ConfigMaps(config.Namespace).Create(ctx, config, metav1.CreateOptions{}) + if err != nil { + log.Error().Err(err). + Str("kind", "ConfigMap"). + Str("name", config.Name). + Msg("create resource failed") + return out, err + } + log.Debug(). + Str("kind", "ConfigMap"). + Str("name", config.Name). + Msg("created resource") + return out, nil } func (s *JobRunner) CreatePDB(ctx context.Context, config *policyv1.PodDisruptionBudget) (*policyv1.PodDisruptionBudget, error) { - s.logger.Trace().Msgf("Creating pod disruption budget %s/%s ...", config.Namespace, config.Name) - return s.clientset.PolicyV1().PodDisruptionBudgets(config.Namespace).Create(ctx, config, metav1.CreateOptions{}) + log := zerolog.Ctx(ctx) + maxUnavailable := "" + if config.Spec.MaxUnavailable != nil { + maxUnavailable = config.Spec.MaxUnavailable.String() + } + selector := "" + if config.Spec.Selector != nil { + parts := make([]string, 0, len(config.Spec.Selector.MatchLabels)) + for k, v := range config.Spec.Selector.MatchLabels { + parts = append(parts, fmt.Sprintf("%s=%s", k, v)) + } + selector = strings.Join(parts, ",") + } + log.Debug(). + Str("kind", "PodDisruptionBudget"). + Str("name", config.Name). + Str("max_unavailable", maxUnavailable). + Str("selector", selector). + Msg("creating resource") + + out, err := s.clientset.PolicyV1().PodDisruptionBudgets(config.Namespace).Create(ctx, config, metav1.CreateOptions{}) + if err != nil { + log.Error().Err(err). + Str("kind", "PodDisruptionBudget"). + Str("name", config.Name). + Msg("create resource failed") + return out, err + } + log.Debug(). + Str("kind", "PodDisruptionBudget"). + Str("name", config.Name). + Msg("created resource") + return out, nil } func (s *JobRunner) CreatePod(ctx context.Context, config *corev1.Pod) (*corev1.Pod, error) { - s.logger.Trace().Msgf("Creating pod %s/%s ...", config.Namespace, config.Name) - return s.clientset.CoreV1().Pods(config.Namespace).Create(ctx, config, metav1.CreateOptions{}) + log := zerolog.Ctx(ctx) + + // main job container is index 0 by design + c := config.Spec.Containers[0] + + log.Debug(). + Str("kind", "Pod"). + Str("name", config.Name). + Str("image", c.Image). + Int("containers", len(config.Spec.Containers)). + Int("init_containers", len(config.Spec.InitContainers)). + Strs("init_container_names", getContainerNames(config.Spec.InitContainers)). + Str("cpu_request", c.Resources.Requests.Cpu().String()). + Str("mem_request", c.Resources.Requests.Memory().String()). + Str("cpu_limit", c.Resources.Limits.Cpu().String()). + Str("mem_limit", c.Resources.Limits.Memory().String()). + Bool("privileged", c.SecurityContext != nil && c.SecurityContext.Privileged != nil && *c.SecurityContext.Privileged). + Int("env_count", len(c.Env)). + Int("volume_count", len(config.Spec.Volumes)). + Str("restart_policy", string(config.Spec.RestartPolicy)). + Msg("creating resource") + + out, err := s.clientset.CoreV1().Pods(config.Namespace).Create(ctx, config, metav1.CreateOptions{}) + if err != nil { + log.Error().Err(err). + Str("kind", "Pod"). + Str("name", config.Name). + Msg("create resource failed") + return out, err + } + log.Debug(). + Str("kind", "Pod"). + Str("name", config.Name). + Msg("created resource") + return out, nil } func (s *JobRunner) isPodInDesiredState(podConfig *corev1.Pod) wait.ConditionWithContextFunc { @@ -450,41 +687,97 @@ func (s *JobRunner) isPodInDesiredState(podConfig *corev1.Pod) wait.ConditionWit } func (s *JobRunner) WaitForPod(ctx context.Context, podConfig *corev1.Pod, timeout time.Duration) error { - s.logger.Debug().Msgf("Waiting for pod %s/%s to be ready in %s ...", podConfig.Namespace, podConfig.Name, timeout) + log := zerolog.Ctx(ctx) + log.Debug(). + Str("kind", "Pod"). + Str("name", podConfig.Name). + Int64("timeout_seconds", int64(timeout.Seconds())). + Msg("waiting for pod") waitCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - return wait.PollUntilContextTimeout(waitCtx, time.Second, timeout, false, s.isPodInDesiredState(podConfig)) + start := time.Now() + err := wait.PollUntilContextTimeout(waitCtx, time.Second, timeout, false, s.isPodInDesiredState(podConfig)) + if err != nil { + log.Error().Err(err). + Str("kind", "Pod"). + Str("name", podConfig.Name). + Int64("duration_ms", time.Since(start).Milliseconds()). + Msg("pod not ready") + return err + } + log.Debug(). + Str("kind", "Pod"). + Str("name", podConfig.Name). + Int64("duration_ms", time.Since(start).Milliseconds()). + Msg("pod ready") + return nil } func (s *JobRunner) DeleteConfigMap(ctx context.Context, config *corev1.ConfigMap) { if config == nil { return } - s.logger.Trace().Msgf("Deleting configmap %s/%s ...", config.Namespace, config.Name) + log := zerolog.Ctx(ctx) + log.Debug(). + Str("kind", "ConfigMap"). + Str("name", config.Name). + Msg("deleting resource") err := s.clientset.CoreV1().ConfigMaps(config.Namespace).Delete(ctx, config.Name, metav1.DeleteOptions{}) if err != nil { - s.logger.Error().Err(err).Msgf("received error on ConfigMap deletion") + log.Error().Err(err). + Str("kind", "ConfigMap"). + Str("name", config.Name). + Msg("delete resource failed") + return } + log.Debug(). + Str("kind", "ConfigMap"). + Str("name", config.Name). + Msg("deleted resource") } func (s *JobRunner) DeletePDB(ctx context.Context, config *policyv1.PodDisruptionBudget) { if config == nil { return } - s.logger.Trace().Msgf("Deleting pod disruption budget %s/%s ...", config.Namespace, config.Name) + log := zerolog.Ctx(ctx) + log.Debug(). + Str("kind", "PodDisruptionBudget"). + Str("name", config.Name). + Msg("deleting resource") err := s.clientset.PolicyV1().PodDisruptionBudgets(config.Namespace).Delete(ctx, config.Name, metav1.DeleteOptions{}) if err != nil { - s.logger.Error().Err(err).Msgf("received error on PDB deletion") + log.Error().Err(err). + Str("kind", "PodDisruptionBudget"). + Str("name", config.Name). + Msg("delete resource failed") + return } + log.Debug(). + Str("kind", "PodDisruptionBudget"). + Str("name", config.Name). + Msg("deleted resource") } func (s *JobRunner) DeletePod(ctx context.Context, config *corev1.Pod) { if config == nil { return } - s.logger.Trace().Msgf("Deleting pod %s/%s ...", config.Namespace, config.Name) + log := zerolog.Ctx(ctx) + log.Debug(). + Str("kind", "Pod"). + Str("name", config.Name). + Msg("deleting resource") err := s.clientset.CoreV1().Pods(config.Namespace).Delete(ctx, config.Name, metav1.DeleteOptions{}) if err != nil { - s.logger.Error().Err(err).Msgf("received error on Pod deletion") + log.Error().Err(err). + Str("kind", "Pod"). + Str("name", config.Name). + Msg("delete resource failed") + return } + log.Debug(). + Str("kind", "Pod"). + Str("name", config.Name). + Msg("deleted resource") } diff --git a/src/pkg/k8s_config.go b/src/pkg/k8s_config.go index e532c31..61940f6 100644 --- a/src/pkg/k8s_config.go +++ b/src/pkg/k8s_config.go @@ -28,6 +28,7 @@ type K8SPodConfig struct { SecurityContext corev1.PodSecurityContext `yaml:"securityContext"` NodeSelector map[string]string `yaml:"nodeSelector"` AgentMode bool `yaml:"agentMode"` + Queue string `yaml:"queue"` HelperImage string `yaml:"helperImage"` } @@ -50,6 +51,7 @@ func ReadPodConfig(path string) (*K8SPodConfig, error) { }, TerminationGracePeriodSeconds: 5, AgentMode: viper.GetBool("job-agent-mode"), + Queue: viper.GetString("queue"), HelperImage: viper.GetString("job-pod-helper-image"), }, } diff --git a/tests/enqueue-coding-agent-job.sh b/tests/enqueue-coding-agent-job.sh new file mode 100755 index 0000000..68895f2 --- /dev/null +++ b/tests/enqueue-coding-agent-job.sh @@ -0,0 +1,267 @@ +#!/bin/bash +# +# Enqueue a coding-agent job that just sleeps, then run curl-based proxy +# probes against the squid egress sidecar from outside the pod via +# `kubectl exec`. Prints PASS/FAIL markers and curl -v traces directly +# to this script's stdout (no dependency on runner-side log visibility). +# +# The job is placed on the 'coding-agent' Faktory queue, which is consumed +# exclusively by the coding-agent worker as per src/Procfile. +# +# Usage: ./tests/enqueue-coding-agent-job.sh +# +# Prerequisites: +# - kind k8s cluster up and helper image loaded +# - Faktory and opslevel-runner running from the Procfile +# faktory - Faktory server +# coding-agent - sidecar worker, --queues=coding-agent --queue=coding-agent +# +# This can be done running 'task run' in your terminal beforehand. +# +# Flow: +# 1. Apply the squid-config ConfigMap (idempotent). +# 2. Delete any pre-existing coding-agent pods (identified by the +# squid initContainer, which only coding-agent pods have). +# 3. Enqueue a `sleep 41` job on the coding-agent queue. Job variables +# include http_proxy/https_proxy so curl inside the pod uses the +# squid sidecar automatically, plus PROXY_ALLOWED_DOMAINS to seed +# the runtime allowlist. +# 4. Wait for the new pod to become Ready (kubectl wait --timeout=45s). +# 5. kubectl exec into the job container and run curl probes against +# three targets, printing PASS/FAIL markers. +# +# Probe targets (with PROXY_ALLOWED_DOMAINS=httpbin.org,www.amazon.ca): +# - httpbin.org -> ALLOW (via runtime allowlist / extra_allow ACL) +# - www.amazon.ca -> ALLOW (via runtime allowlist / extra_allow ACL) +# - example.com -> DENY (control: not in any allowlist) +# +# After probes complete, the pod remains alive for the runner's main +# container lifetime (default 3600s) for interactive follow-up. +# +# Cleanup stale coding-agent pods manually: +# kubectl get pods -n default -l app.kubernetes.io/managed-by=runner-faktory -o json \ +# | jq -r ' +# .items[] +# | select(any(.spec.initContainers[]?; .name == "squid")) +# | .metadata.name +# ' \ +# | while IFS= read -r pod; do kubectl delete pod -n default "$pod"; done +# + +set -e + +# load KUBECONFIG (.env.local) + set $cmd / KIND_EXPERIMENTAL_PROVIDER for k8s context +SCRIPT_DIR="${BASH_SOURCE[0]%/*}/../bin" +source "$SCRIPT_DIR/kind-env.sh" + +echo "Applying squid-config ConfigMap..." +kubectl apply -f - <<'EOF' +apiVersion: v1 +kind: ConfigMap +metadata: + name: squid-config + namespace: default + annotations: + kubernetes.io/description: | + Coding Agent Squid Egress Sidecar Proxy Configuration used in pods. + allowed-domains.txt is the globally shared domain allowlist mounted + into the squid-proxy sidecar. +data: + squid.conf: | + # Egress proxy for coding agent LLM sandbox + # Allows only explicitly listed domains; denies all private/loopback ranges + # to prevent sandbox from reaching internal cluster services via the proxy. + + http_port 3128 + + # ACL: private and loopback address ranges + acl to_private dst 10.0.0.0/8 + acl to_private dst 172.16.0.0/12 + acl to_private dst 192.168.0.0/16 + acl to_loopback dst 127.0.0.0/8 + acl to_loopback dst ::1 + acl to_cloud_metadata dst 169.254.0.0/16 + + # ACL: allowed destination domains (shared + customer-specific, resolved at startup) + acl allowed_domains dstdomain "/etc/squid/conf.d/allowed-domains.txt" + # ACL: runtime per-job allowlist written from PROXY_ALLOWED_DOMAINS by the sidecar entrypoint + acl extra_allow dstdomain "/srv/squid/custom-allowed-domains.conf" + + # block access to private networks. + http_access deny to_private + # block any pod-local services it shouldn't access + http_access deny to_loopback + # block cloud metadata endpoint (169.254.169.254) + http_access deny to_cloud_metadata + + # Allow CONNECT tunnels (HTTPS) to allowed domains only + http_access allow CONNECT allowed_domains + http_access allow CONNECT extra_allow + + # Allow plain HTTP to allowed domains only + http_access allow allowed_domains + http_access allow extra_allow + + # Deny everything else + http_access deny all + + # Logs + access_log stdio:/dev/stdout + cache_log stdio:/dev/stderr + cache_store_log none + + # Disable cache, pure forward proxy usage + cache deny all + allowed-domains.txt: | + # Claude API + api.anthropic.com + + # Git providers + github.com + api.github.com + gitlab.com + bitbucket.org + + # Package registries - Node + registry.npmjs.org + npmjs.com + yarnpkg.com + registry.yarnpkg.com + + # Package registries - Python + pypi.org + files.pythonhosted.org + pythonhosted.org + + # Package registries - Go + proxy.golang.org + sum.golang.org + + # Package registries - Ruby + rubygems.org + + # Package registries - Rust + crates.io + static.crates.io + index.crates.io + + # OS packages + archive.ubuntu.com + security.ubuntu.com +EOF + +# Delete any pre-existing coding-agent pods so the post-enqueue pod lookup +# is unambiguous. Coding-agent pods are identified by the squid +# initContainer, which the runner only adds when queue == "coding-agent". +# jq is used because kubectl's jsonpath filter with wildcard is broken. +echo "Deleting dangling coding-agent pods..." +kubectl get pods -n default -l app.kubernetes.io/managed-by=runner-faktory -o json 2>/dev/null \ + | jq -r ' + .items[] + | select(any(.spec.initContainers[]?; .name == "squid")) + | .metadata.name + ' \ + | while IFS= read -r pod; do + kubectl delete pod -n default "$pod" + done + +src="${BASH_SOURCE[0]%/*}/../src" +JOB_ID="coding-agent-proxy-test-$(date +%s)" + +echo "Enqueuing coding-agent proxy test job (ID: ${JOB_ID}) ..." + +JOB_FILE=$(mktemp) +cat > "$JOB_FILE" </dev/null \ + | jq -r ' + .items[] + | select(any(.spec.initContainers[]?; .name == "squid")) + | .metadata.name + ' \ + | tail -n1) + if [ -n "$POD" ]; then + break + fi + sleep 1 +done +if [ -z "$POD" ]; then + echo "FATAL: coding-agent job pod not running!" >&2 + exit 1 +fi +echo "Pod created: $POD" + +echo "Waiting for the coding-agent job pod to become Ready..." +kubectl wait --for=condition=Ready -n default "pod/$POD" --timeout=45s + +echo "" +echo "Testing egress proxy inside pod $POD ..." +echo "" +kubectl exec -n default -c job "$POD" -- bash -c ' +probe() { + local url="$1" expected="$2" label actual + label=${url#*://} + label=${label%%/*} + echo "===================================================================" + echo "PROBE: $label url=$url expected=$expected" + echo "===================================================================" + # -sS: hide progress meter but show errors; --max-time 15; + # -o /dev/null: drop body; -w prints status line. + # curl honors http_proxy/https_proxy env vars (set via job variables). + if curl -sS --max-time 15 -o /dev/null -w "HTTP %{http_code}\n" "$url"; then + actual=allow + else + actual=deny + fi + if [ "$actual" = "$expected" ]; then + echo "RESULT: PASS $label -> $actual (expected $expected)" + else + echo "RESULT: FAIL $label -> $actual (expected $expected)" + fi + echo "" +} +probe https://httpbin.org/get allow +probe https://www.amazon.ca/ allow +probe https://github.com/ allow +probe https://bitbucket.org/ allow +probe https://xkcd.com/2347/ deny +' + +echo "" +echo "Probes complete." +echo "" +echo "Pod remains alive for interactive follow-up:" +echo " kubectl exec -it -n default -c job $POD -- bash" +echo "" +echo "Squid access log (proxy-level ALLOW/DENY audit):" +echo " kubectl logs -n default -c squid $POD" +echo "" +echo "Faktory: http://localhost:7420"