diff --git a/.github/scripts/e2e-prepare-env.sh b/.github/scripts/e2e-prepare-env.sh new file mode 100755 index 0000000..0e49dd3 --- /dev/null +++ b/.github/scripts/e2e-prepare-env.sh @@ -0,0 +1,81 @@ +#!/usr/bin/env bash +# Writes non-secret env to $RUNNER_TEMP/e2e-env.sh and materializes secrets into temp files. +# Never echoes secret values. + +set -euo pipefail + +ENV_FILE="${RUNNER_TEMP}/e2e-env.sh" +: >"$ENV_FILE" + +write_env() { + printf 'export %s=%q\n' "$1" "$2" >>"$ENV_FILE" +} + +if [[ -n "${E2E_SSH_PRIVATE_KEY:-}" ]]; then + KEY_FILE="${RUNNER_TEMP}/e2e_ssh_key" + printf '%s\n' "$E2E_SSH_PRIVATE_KEY" >"$KEY_FILE" + chmod 600 "$KEY_FILE" + write_env SSH_PRIVATE_KEY "$KEY_FILE" +fi + +if [[ -n "${E2E_SSH_PUBLIC_KEY:-}" ]]; then + PUB_FILE="${RUNNER_TEMP}/e2e_ssh_pub" + printf '%s\n' "$E2E_SSH_PUBLIC_KEY" >"$PUB_FILE" + chmod 644 "$PUB_FILE" + write_env SSH_PUBLIC_KEY "$PUB_FILE" +fi + +if [[ -n "${E2E_CLUSTER_KUBECONFIG:-}" ]]; then + KC_FILE="${RUNNER_TEMP}/e2e_kubeconfig" + printf '%s' "$E2E_CLUSTER_KUBECONFIG" | base64 -d >"$KC_FILE" + chmod 600 "$KC_FILE" + write_env KUBE_CONFIG_PATH "$KC_FILE" + write_env E2E_BASE_KUBE_CONFIG_PATH "$KC_FILE" +fi + +if [[ -n "${SSH_HOST:-}" ]]; then + write_env SSH_HOST "$SSH_HOST" + write_env E2E_BASE_SSH_HOST "$SSH_HOST" +fi +if [[ -n "${SSH_USER:-}" ]]; then + write_env SSH_USER "$SSH_USER" + write_env E2E_BASE_SSH_USER "$SSH_USER" +fi +if [[ -n "${SSH_VM_USER:-}" ]]; then + write_env SSH_VM_USER "$SSH_VM_USER" +fi +JUMP_HOST="${SSH_JUMP_HOST:-${E2E_TUNNEL_SSH_JUMP_HOST:-}}" +JUMP_USER="${SSH_JUMP_USER:-${E2E_TUNNEL_SSH_JUMP_USER:-}}" +if [[ -n "${JUMP_HOST}" ]]; then + write_env SSH_JUMP_HOST "$JUMP_HOST" + write_env E2E_TUNNEL_SSH_JUMP_HOST "$JUMP_HOST" +fi +if [[ -n "${JUMP_USER}" ]]; then + write_env SSH_JUMP_USER "$JUMP_USER" + write_env E2E_TUNNEL_SSH_JUMP_USER "$JUMP_USER" +fi +if [[ -n "${LOG_LEVEL:-}" ]]; then + write_env LOG_LEVEL "$LOG_LEVEL" +fi +if [[ -n "${E2E_GINKGO_LABEL_FILTER:-}" ]]; then + write_env E2E_GINKGO_LABEL_FILTER "$E2E_GINKGO_LABEL_FILTER" +fi +if [[ -n "${E2E_TEST_TIMEOUT:-}" ]]; then + write_env E2E_TEST_TIMEOUT "$E2E_TEST_TIMEOUT" +fi +if [[ -n "${TEST_CLUSTER_STORAGE_CLASS:-}" ]]; then + write_env TEST_CLUSTER_STORAGE_CLASS "$TEST_CLUSTER_STORAGE_CLASS" +fi +if [[ -n "${TEST_CLUSTER_NAMESPACE:-}" ]]; then + write_env TEST_CLUSTER_NAMESPACE "$TEST_CLUSTER_NAMESPACE" +fi +if [[ -n "${TEST_CLUSTER_CREATE_MODE:-}" ]]; then + write_env TEST_CLUSTER_CREATE_MODE "$TEST_CLUSTER_CREATE_MODE" +fi + +write_env GOMODCACHE "${GOMODCACHE:-${RUNNER_TEMP}/e2e-gomodcache}" +write_env GOCACHE "${GOCACHE:-${RUNNER_TEMP}/e2e-gocache}" +write_env E2E_ARTIFACT_DIR "${E2E_ARTIFACT_DIR:-${RUNNER_TEMP}/e2e-artifacts}" +write_env E2E_TEMP_DIR "${E2E_TEMP_DIR:-${E2E_ARTIFACT_DIR:-${RUNNER_TEMP}/e2e-artifacts}}" + +echo "e2e-env.sh prepared (secrets written to temp files only)" diff --git a/.github/scripts/e2e-prepare-workspace.sh b/.github/scripts/e2e-prepare-workspace.sh new file mode 100755 index 0000000..f0510b3 --- /dev/null +++ b/.github/scripts/e2e-prepare-workspace.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash +# Self-hosted runners: E2E may leave read-only Go cache trees in the workspace; +# actions/checkout@v4 then fails with EACCES when wiping the directory. + +set -euo pipefail + +WS="${GITHUB_WORKSPACE:?GITHUB_WORKSPACE is not set}" + +prune_dir() { + local p="$1" + [ -e "$p" ] || return 0 + chmod -R u+w "$p" 2>/dev/null || true + if rm -rf "$p" 2>/dev/null; then + return 0 + fi + if command -v sudo >/dev/null 2>&1; then + sudo chmod -R u+w "$p" 2>/dev/null || true + sudo rm -rf "$p" 2>/dev/null || true + fi +} + +for d in \ + .e2e-gomodcache .e2e-gocache .e2e-artifacts \ + e2e/.gomodcache e2e/.gocache e2e/temp; do + prune_dir "${WS}/${d}" +done diff --git a/.github/workflows/e2e-reusable.yml b/.github/workflows/e2e-reusable.yml new file mode 100644 index 0000000..236a35b --- /dev/null +++ b/.github/workflows/e2e-reusable.yml @@ -0,0 +1,590 @@ +# Reusable E2E pipeline with PR-scoped cluster persistence (actions/cache + TEST_CLUSTER_RESUME). +# +# pipeline_mode: +# create-and-test — create-cluster (reuse/resume if possible) + run-tests; no teardown +# teardown-only — teardown-cluster only (trigger when PR label is removed) + +name: Storage E2E (reusable) + +permissions: + contents: read + checks: write + pull-requests: read + +on: + workflow_call: + inputs: + pipeline_mode: + description: "create-and-test | teardown-only" + type: string + required: true + pr_number: + description: "Pull request number (stable session/namespace key)" + type: string + required: true + module_slug: + description: "Module slug for TEST_CLUSTER_NAMESPACE (e.g. sds-node-configurator)" + type: string + required: true + module_path: + description: "Path to the module e2e Go module root" + type: string + required: true + cluster_provider: + description: "Cluster provider: alwaysCreateNew | alwaysUseExisting | commander" + type: string + required: true + cluster_config: + description: "Path to cluster_config.yml relative to repository root" + type: string + required: false + default: "" + test_package: + description: "Go package for run-tests" + type: string + required: false + default: "./tests/" + label_filter: + description: "Ginkgo label filter; empty runs all tests" + type: string + required: false + default: "" + test_timeout: + description: "go test / ginkgo timeout (E2E_TEST_TIMEOUT)" + type: string + required: false + default: "3h30m" + storage_e2e_ref: + description: "Git ref of storage-e2e for checkout" + type: string + required: false + default: "main" + runner_labels: + description: "JSON array of runner labels" + type: string + required: false + default: '["self-hosted","regular"]' + skip_create_cluster: + description: "Deprecated — ignored; create-cluster always runs and uses --skip-if-ready" + type: boolean + required: false + default: false + outputs: + run_tests_result: + description: "Result of run-tests job" + value: ${{ jobs.run-tests.result }} + secrets: + E2E_SSH_PRIVATE_KEY: + required: false + E2E_SSH_PUBLIC_KEY: + required: false + E2E_SSH_HOST: + required: false + E2E_SSH_USER: + required: false + E2E_SSH_JUMP_HOST: + required: false + E2E_SSH_JUMP_USER: + required: false + E2E_CLUSTER_KUBECONFIG: + required: false + E2E_TEST_CLUSTER_CREATE_MODE: + required: false + E2E_TEST_CLUSTER_STORAGE_CLASS: + required: false + E2E_TEST_CLUSTER_CLEANUP: + required: false + E2E_DECKHOUSE_LICENSE: + required: false + E2E_REGISTRY_DOCKER_CFG: + required: false + SSH_VM_USER: + required: false + GOPROXY: + required: false + +defaults: + run: + shell: bash + +env: + E2E_PR_NUMBER: ${{ inputs.pr_number }} + # Stable PR-scoped session cache (one entry per PR; concurrency group prevents parallel E2E on same PR). + E2E_CACHE_KEY: e2e-pr-${{ inputs.pr_number }}-session + # Stable PR namespace (no run_id) — required for cluster resume across re-runs. + TEST_CLUSTER_NAMESPACE: e2e-${{ inputs.module_slug }}-pr${{ inputs.pr_number }} + TEST_CLUSTER_CREATE_MODE: ${{ secrets.E2E_TEST_CLUSTER_CREATE_MODE || inputs.cluster_provider }} + TEST_CLUSTER_STORAGE_CLASS: ${{ secrets.E2E_TEST_CLUSTER_STORAGE_CLASS }} + DKP_LICENSE_KEY: ${{ secrets.E2E_DECKHOUSE_LICENSE }} + REGISTRY_DOCKER_CFG: ${{ secrets.E2E_REGISTRY_DOCKER_CFG }} + SSH_HOST: ${{ secrets.E2E_SSH_HOST }} + SSH_USER: ${{ secrets.E2E_SSH_USER }} + SSH_VM_USER: ${{ secrets.SSH_VM_USER }} + # Jump host for test cluster nodes (10.10.10.x). Without it the runner connects directly → timeout. + SSH_JUMP_HOST: ${{ secrets.E2E_SSH_JUMP_HOST }} + SSH_JUMP_USER: ${{ secrets.E2E_SSH_JUMP_USER }} + # API SSH tunnel: ProxyJump only when jump secret is set (do not fall back to SSH_HOST). + E2E_TUNNEL_SSH_JUMP_HOST: ${{ secrets.E2E_SSH_JUMP_HOST }} + E2E_TUNNEL_SSH_JUMP_USER: ${{ secrets.E2E_SSH_JUMP_USER }} + LOG_LEVEL: ${{ vars.E2E_LOG_LEVEL || 'info' }} + E2E_GINKGO_LABEL_FILTER: ${{ inputs.label_filter }} + E2E_LABEL_FILTER: ${{ inputs.label_filter }} + E2E_TEST_TIMEOUT: ${{ inputs.test_timeout }} + # Do not set KUBE_CONFIG_PATH here — e2e-prepare-env.sh writes it from E2E_CLUSTER_KUBECONFIG. + +jobs: + create-cluster: + if: inputs.pipeline_mode == 'create-and-test' + name: create-cluster + runs-on: ${{ fromJSON(inputs.runner_labels) }} + env: + TEST_CLUSTER_CLEANUP: "false" + E2E_SKIP_IF_READY: "true" + steps: + - name: Set E2E temp paths + run: | + echo "E2E_ARTIFACT_DIR=${RUNNER_TEMP}/e2e-artifacts" >> "$GITHUB_ENV" + echo "E2E_TEMP_DIR=${RUNNER_TEMP}/e2e-artifacts" >> "$GITHUB_ENV" + echo "GOMODCACHE=${RUNNER_TEMP}/e2e-gomodcache" >> "$GITHUB_ENV" + echo "GOCACHE=${RUNNER_TEMP}/e2e-gocache" >> "$GITHUB_ENV" + mkdir -p "${RUNNER_TEMP}/e2e-artifacts" "${RUNNER_TEMP}/e2e-gomodcache" "${RUNNER_TEMP}/e2e-gocache" + + - name: Restore PR session cache + id: session_cache + uses: actions/cache/restore@v4 + with: + path: ${{ env.E2E_ARTIFACT_DIR }} + key: ${{ env.E2E_CACHE_KEY }} + restore-keys: | + ${{ env.E2E_CACHE_KEY }} + + - name: Verify session cache + run: | + echo "cache_hit=${{ steps.session_cache.outputs.cache-hit }}" + ls -la "${E2E_ARTIFACT_DIR}" 2>/dev/null || true + if [ -f "${E2E_ARTIFACT_DIR}/session.json" ]; then + echo "session.json present" + else + echo "session.json missing (will provision or resume from cluster-state)" + fi + + # Inline: script is not in workspace yet; previous runs may leave read-only .e2e-gomodcache (EACCES on checkout). + - name: Prepare workspace for checkout (self-hosted) + run: | + set -euo pipefail + WS="${GITHUB_WORKSPACE}" + prune_dir() { + local p="$1" + [ -e "$p" ] || return 0 + chmod -R u+w "$p" 2>/dev/null || true + rm -rf "$p" 2>/dev/null || { command -v sudo >/dev/null && sudo chmod -R u+w "$p" && sudo rm -rf "$p"; } || true + } + for d in .e2e-gomodcache .e2e-gocache .e2e-artifacts e2e/.gomodcache e2e/.gocache e2e/temp; do + prune_dir "${WS}/${d}" + done + + - name: Checkout module repository + uses: actions/checkout@v4 + with: + clean: false + + - name: Checkout storage-e2e + uses: actions/checkout@v4 + with: + repository: deckhouse/storage-e2e + ref: ${{ inputs.storage_e2e_ref }} + path: storage-e2e + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version-file: storage-e2e/go.mod + cache-dependency-path: | + storage-e2e/go.sum + ${{ inputs.module_path }}/go.sum + + - name: Prepare SSH and kubeconfig + run: storage-e2e/.github/scripts/e2e-prepare-env.sh + env: + E2E_SSH_PRIVATE_KEY: ${{ secrets.E2E_SSH_PRIVATE_KEY }} + E2E_SSH_PUBLIC_KEY: ${{ secrets.E2E_SSH_PUBLIC_KEY }} + E2E_CLUSTER_KUBECONFIG: ${{ secrets.E2E_CLUSTER_KUBECONFIG }} + + - name: Build storage-e2e CLI + run: go build -o "${{ runner.temp }}/storage-e2e" ./cmd/e2e + working-directory: storage-e2e + + - name: create-cluster (reuse / resume / provision) + working-directory: ${{ inputs.module_path }} + run: | + set -euo pipefail + source "${{ runner.temp }}/e2e-env.sh" + CONFIG_ARG="" + if [ -n "${{ inputs.cluster_config }}" ]; then + CONFIG_ARG="--config ${{ github.workspace }}/${{ inputs.cluster_config }}" + fi + "${{ runner.temp }}/storage-e2e" create-cluster \ + --provider "${{ inputs.cluster_provider }}" \ + --pr-number "${{ inputs.pr_number }}" \ + --skip-if-ready \ + ${CONFIG_ARG} \ + --artifact-dir "${E2E_ARTIFACT_DIR}" + + - name: Save PR session cache + if: always() + uses: actions/cache/save@v4 + with: + path: ${{ env.E2E_ARTIFACT_DIR }} + key: ${{ env.E2E_CACHE_KEY }} + + - name: Upload session artifact (same workflow run) + if: always() + uses: actions/upload-artifact@v4 + with: + name: e2e-session-pr-${{ inputs.pr_number }} + path: ${{ env.E2E_ARTIFACT_DIR }} + retention-days: 14 + if-no-files-found: ignore + + run-tests: + if: inputs.pipeline_mode == 'create-and-test' && needs.create-cluster.result == 'success' + name: run-tests + needs: create-cluster + timeout-minutes: 90 + runs-on: ${{ fromJSON(inputs.runner_labels) }} + env: + TEST_CLUSTER_CLEANUP: "false" + CI: "true" + E2E_CLUSTER_PHASE: run + permissions: + checks: write + contents: read + steps: + - name: Set E2E temp paths + run: | + echo "E2E_ARTIFACT_DIR=${RUNNER_TEMP}/e2e-artifacts" >> "$GITHUB_ENV" + echo "E2E_TEMP_DIR=${RUNNER_TEMP}/e2e-artifacts" >> "$GITHUB_ENV" + echo "GOMODCACHE=${RUNNER_TEMP}/e2e-gomodcache" >> "$GITHUB_ENV" + echo "GOCACHE=${RUNNER_TEMP}/e2e-gocache" >> "$GITHUB_ENV" + mkdir -p "${RUNNER_TEMP}/e2e-artifacts" "${RUNNER_TEMP}/e2e-gomodcache" "${RUNNER_TEMP}/e2e-gocache" + + - name: Restore PR session cache + id: session_cache + uses: actions/cache/restore@v4 + with: + path: ${{ env.E2E_ARTIFACT_DIR }} + key: ${{ env.E2E_CACHE_KEY }} + restore-keys: | + ${{ env.E2E_CACHE_KEY }} + + - name: Prepare workspace for checkout (self-hosted) + run: | + set -euo pipefail + WS="${GITHUB_WORKSPACE}" + prune_dir() { + local p="$1" + [ -e "$p" ] || return 0 + chmod -R u+w "$p" 2>/dev/null || true + rm -rf "$p" 2>/dev/null || { command -v sudo >/dev/null && sudo chmod -R u+w "$p" && sudo rm -rf "$p"; } || true + } + for d in .e2e-gomodcache .e2e-gocache .e2e-artifacts e2e/.gomodcache e2e/.gocache e2e/temp; do + prune_dir "${WS}/${d}" + done + + - name: Checkout module repository + uses: actions/checkout@v4 + with: + clean: false + + - name: Checkout storage-e2e + uses: actions/checkout@v4 + with: + repository: deckhouse/storage-e2e + ref: ${{ inputs.storage_e2e_ref }} + path: storage-e2e + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version-file: ${{ inputs.module_path }}/go.mod + cache-dependency-path: ${{ inputs.module_path }}/go.sum + + - name: Download session artifact from create-cluster + uses: actions/download-artifact@v4 + with: + name: e2e-session-pr-${{ inputs.pr_number }} + path: ${{ env.E2E_ARTIFACT_DIR }} + + - name: Verify session from create-cluster + run: | + echo "cache_hit=${{ steps.session_cache.outputs.cache-hit }}" + ls -la "${E2E_ARTIFACT_DIR}" 2>/dev/null || true + if [ ! -f "${E2E_ARTIFACT_DIR}/session.json" ]; then + echo "::error::create-cluster did not produce session.json for PR ${{ inputs.pr_number }}" + exit 1 + fi + cat "${E2E_ARTIFACT_DIR}/session.json" + + - name: Prepare SSH + run: storage-e2e/.github/scripts/e2e-prepare-env.sh + env: + E2E_SSH_PRIVATE_KEY: ${{ secrets.E2E_SSH_PRIVATE_KEY }} + E2E_SSH_PUBLIC_KEY: ${{ secrets.E2E_SSH_PUBLIC_KEY }} + E2E_CLUSTER_KUBECONFIG: ${{ secrets.E2E_CLUSTER_KUBECONFIG }} + + - name: Build storage-e2e CLI + run: go build -o "${{ runner.temp }}/storage-e2e" ./cmd/e2e + working-directory: storage-e2e + + - name: Pin storage-e2e module + working-directory: ${{ inputs.module_path }} + run: go mod edit -replace=github.com/deckhouse/storage-e2e=${{ github.workspace }}/storage-e2e + + - name: run-tests + working-directory: ${{ inputs.module_path }} + run: | + set -euo pipefail + source "${{ runner.temp }}/e2e-env.sh" + mkdir -p "${GOMODCACHE}" "${GOCACHE}" + go mod download + go mod tidy + LABEL_FILTER="${E2E_GINKGO_LABEL_FILTER:-${E2E_LABEL_FILTER:-}}" + LABEL_ARGS=() + if [ -n "${LABEL_FILTER}" ]; then + LABEL_ARGS=(--label-filter "${LABEL_FILTER}") + fi + "${{ runner.temp }}/storage-e2e" run-tests \ + --package "${{ inputs.test_package }}" \ + --timeout "${E2E_TEST_TIMEOUT:-3h30m}" \ + --artifact-dir "${E2E_ARTIFACT_DIR}" \ + "${LABEL_ARGS[@]}" + + - name: Release cluster lock after tests + if: always() + working-directory: ${{ inputs.module_path }} + run: | + set -euo pipefail + source "${{ runner.temp }}/e2e-env.sh" + if [ ! -f "${E2E_ARTIFACT_DIR}/session.json" ]; then + echo "No session.json — skip lock release" + exit 0 + fi + "${{ runner.temp }}/storage-e2e" release-cluster-lock \ + --artifact-dir "${E2E_ARTIFACT_DIR}" + + - name: Stage JUnit in workspace + if: always() + run: | + mkdir -p junit-report + if [ -f "${E2E_ARTIFACT_DIR}/junit.xml" ]; then + cp "${E2E_ARTIFACT_DIR}/junit.xml" junit-report/junit.xml + fi + + - name: Upload JUnit artifact + uses: actions/upload-artifact@v4 + if: always() + with: + name: e2e-junit-pr-${{ inputs.pr_number }}-${{ github.run_id }} + path: junit-report/junit.xml + retention-days: 14 + if-no-files-found: ignore + + - name: Publish JUnit to PR checks + uses: dorny/test-reporter@v1.9.1 + if: always() + with: + name: E2E (${{ inputs.test_package }}) + path: junit-report/junit.xml + reporter: java-junit + fail-on-error: "false" + + teardown-cluster: + if: inputs.pipeline_mode == 'teardown-only' + name: teardown-cluster + runs-on: ${{ fromJSON(inputs.runner_labels) }} + env: + TEST_CLUSTER_CLEANUP: ${{ secrets.E2E_TEST_CLUSTER_CLEANUP || 'true' }} + steps: + - name: Set E2E temp paths + run: | + echo "E2E_ARTIFACT_DIR=${RUNNER_TEMP}/e2e-artifacts" >> "$GITHUB_ENV" + echo "E2E_TEMP_DIR=${RUNNER_TEMP}/e2e-artifacts" >> "$GITHUB_ENV" + echo "GOMODCACHE=${RUNNER_TEMP}/e2e-gomodcache" >> "$GITHUB_ENV" + echo "GOCACHE=${RUNNER_TEMP}/e2e-gocache" >> "$GITHUB_ENV" + mkdir -p "${RUNNER_TEMP}/e2e-artifacts" "${RUNNER_TEMP}/e2e-gomodcache" "${RUNNER_TEMP}/e2e-gocache" + + - name: Restore PR session cache + uses: actions/cache/restore@v4 + with: + path: ${{ env.E2E_ARTIFACT_DIR }} + key: ${{ env.E2E_CACHE_KEY }} + restore-keys: | + ${{ env.E2E_CACHE_KEY }} + + - name: Prepare workspace for checkout (self-hosted) + run: | + set -euo pipefail + WS="${GITHUB_WORKSPACE}" + prune_dir() { + local p="$1" + [ -e "$p" ] || return 0 + chmod -R u+w "$p" 2>/dev/null || true + rm -rf "$p" 2>/dev/null || { command -v sudo >/dev/null && sudo chmod -R u+w "$p" && sudo rm -rf "$p"; } || true + } + for d in .e2e-gomodcache .e2e-gocache .e2e-artifacts e2e/.gomodcache e2e/.gocache e2e/temp; do + prune_dir "${WS}/${d}" + done + + - name: Checkout storage-e2e + uses: actions/checkout@v4 + with: + repository: deckhouse/storage-e2e + ref: ${{ inputs.storage_e2e_ref }} + path: storage-e2e + clean: false + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version-file: storage-e2e/go.mod + + - name: Prepare SSH + run: storage-e2e/.github/scripts/e2e-prepare-env.sh + env: + E2E_SSH_PRIVATE_KEY: ${{ secrets.E2E_SSH_PRIVATE_KEY }} + E2E_SSH_PUBLIC_KEY: ${{ secrets.E2E_SSH_PUBLIC_KEY }} + + - name: Build storage-e2e CLI + run: go build -o "${{ runner.temp }}/storage-e2e" ./cmd/e2e + working-directory: storage-e2e + + - name: teardown-cluster + run: | + set -euo pipefail + source "${{ runner.temp }}/e2e-env.sh" + "${{ runner.temp }}/storage-e2e" teardown-cluster --artifact-dir "${E2E_ARTIFACT_DIR}" + + - name: Save tombstone session cache + if: always() + uses: actions/cache/save@v4 + with: + path: ${{ env.E2E_ARTIFACT_DIR }} + key: ${{ env.E2E_CACHE_KEY }} + + - name: Cleanup temp credentials + if: always() + run: rm -f "${{ runner.temp }}/e2e-env.sh" "${{ runner.temp }}/e2e_ssh_key" "${{ runner.temp }}/e2e_kubeconfig" 2>/dev/null || true + + e2e-complete: + if: always() && inputs.pipeline_mode == 'create-and-test' && needs.create-cluster.result == 'success' + name: e2e-complete + needs: [create-cluster, run-tests] + runs-on: ${{ fromJSON(inputs.runner_labels) }} + env: + TEST_CLUSTER_CLEANUP: "false" + E2E_CLUSTER_PHASE: cleanup + steps: + - name: Set E2E temp paths + run: | + echo "E2E_ARTIFACT_DIR=${RUNNER_TEMP}/e2e-artifacts" >> "$GITHUB_ENV" + echo "E2E_TEMP_DIR=${RUNNER_TEMP}/e2e-artifacts" >> "$GITHUB_ENV" + echo "GOMODCACHE=${RUNNER_TEMP}/e2e-gomodcache" >> "$GITHUB_ENV" + echo "GOCACHE=${RUNNER_TEMP}/e2e-gocache" >> "$GITHUB_ENV" + mkdir -p "${RUNNER_TEMP}/e2e-artifacts" "${RUNNER_TEMP}/e2e-gomodcache" "${RUNNER_TEMP}/e2e-gocache" + + - name: Restore PR session cache + id: session_cache + uses: actions/cache/restore@v4 + with: + path: ${{ env.E2E_ARTIFACT_DIR }} + key: ${{ env.E2E_CACHE_KEY }} + restore-keys: | + ${{ env.E2E_CACHE_KEY }} + + - name: Prepare workspace for checkout (self-hosted) + run: | + set -euo pipefail + WS="${GITHUB_WORKSPACE}" + prune_dir() { + local p="$1" + [ -e "$p" ] || return 0 + chmod -R u+w "$p" 2>/dev/null || true + rm -rf "$p" 2>/dev/null || { command -v sudo >/dev/null && sudo chmod -R u+w "$p" && sudo rm -rf "$p"; } || true + } + for d in .e2e-gomodcache .e2e-gocache .e2e-artifacts e2e/.gomodcache e2e/.gocache e2e/temp; do + prune_dir "${WS}/${d}" + done + + - name: Checkout module repository + uses: actions/checkout@v4 + with: + clean: false + + - name: Checkout storage-e2e + uses: actions/checkout@v4 + with: + repository: deckhouse/storage-e2e + ref: ${{ inputs.storage_e2e_ref }} + path: storage-e2e + + - name: Setup Go + uses: actions/setup-go@v5 + with: + go-version-file: ${{ inputs.module_path }}/go.mod + cache-dependency-path: ${{ inputs.module_path }}/go.sum + + - name: Download session artifact from create-cluster + uses: actions/download-artifact@v4 + with: + name: e2e-session-pr-${{ inputs.pr_number }} + path: ${{ env.E2E_ARTIFACT_DIR }} + + - name: Verify session for cleanup + run: | + ls -la "${E2E_ARTIFACT_DIR}" 2>/dev/null || true + if [ ! -f "${E2E_ARTIFACT_DIR}/session.json" ]; then + echo "::error::session.json missing for PR ${{ inputs.pr_number }} (cannot cleanup workloads)" + exit 1 + fi + + - name: Prepare SSH + run: storage-e2e/.github/scripts/e2e-prepare-env.sh + env: + E2E_SSH_PRIVATE_KEY: ${{ secrets.E2E_SSH_PRIVATE_KEY }} + E2E_SSH_PUBLIC_KEY: ${{ secrets.E2E_SSH_PUBLIC_KEY }} + E2E_CLUSTER_KUBECONFIG: ${{ secrets.E2E_CLUSTER_KUBECONFIG }} + + - name: Build storage-e2e CLI + run: go build -o "${{ runner.temp }}/storage-e2e" ./cmd/e2e + working-directory: storage-e2e + + - name: Pin storage-e2e module + working-directory: ${{ inputs.module_path }} + run: go mod edit -replace=github.com/deckhouse/storage-e2e=${{ github.workspace }}/storage-e2e + + - name: cleanup-workloads (always after run-tests) + if: always() + working-directory: ${{ inputs.module_path }} + run: | + set -euo pipefail + source "${{ runner.temp }}/e2e-env.sh" + mkdir -p "${GOMODCACHE}" "${GOCACHE}" + go mod download + go mod tidy + "${{ runner.temp }}/storage-e2e" cleanup-workloads \ + --package "${{ inputs.test_package }}" \ + --timeout "${E2E_CLEANUP_TIMEOUT:-45m}" \ + --artifact-dir "${E2E_ARTIFACT_DIR}" + + - name: Cleanup temp credentials + if: always() + run: rm -f "${{ runner.temp }}/e2e-env.sh" "${{ runner.temp }}/e2e_ssh_key" "${{ runner.temp }}/e2e_kubeconfig" 2>/dev/null || true + + - name: Report run-tests result + if: always() + run: | + echo "create-cluster=${{ needs.create-cluster.result }} run-tests=${{ needs.run-tests.result }}" + if [ "${{ needs.run-tests.result }}" != "success" ]; then + echo "::error::run-tests must succeed for create-and-test (result=${{ needs.run-tests.result }})" + exit 1 + fi diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..b691b1e --- /dev/null +++ b/Makefile @@ -0,0 +1,36 @@ +# storage-e2e — local and CI entrypoints share cmd/e2e + +GO ?= go +E2E_BIN ?= bin/e2e +E2E_PROVIDER ?= alwaysCreateNew +E2E_CONFIG ?= +E2E_PACKAGE ?= ./tests/... +E2E_LABEL_FILTER ?= +E2E_ARTIFACT_DIR ?= /tmp/e2e +E2E_TIMEOUT ?= 60m + +.PHONY: build-e2e e2e e2e-create e2e-run e2e-teardown help + +help: + @echo "Targets:" + @echo " build-e2e Build bin/e2e CLI" + @echo " e2e create-cluster + run-tests + teardown-cluster (full cycle)" + @echo " e2e-create create-cluster only" + @echo " e2e-run run-tests only (expects prior create; uses E2E_ARTIFACT_DIR)" + @echo " e2e-teardown teardown-cluster only" + +build-e2e: + $(GO) build -o $(E2E_BIN) ./cmd/e2e + +e2e: build-e2e e2e-create e2e-run e2e-teardown + +e2e-create: build-e2e + @test -n "$(E2E_CONFIG)" || (echo "E2E_CONFIG must point to cluster_config.yml for alwaysCreateNew"; exit 1) + $(E2E_BIN) create-cluster --provider $(E2E_PROVIDER) --config $(E2E_CONFIG) --artifact-dir $(E2E_ARTIFACT_DIR) + +e2e-run: build-e2e + $(E2E_BIN) run-tests --package $(E2E_PACKAGE) --artifact-dir $(E2E_ARTIFACT_DIR) \ + $(if $(E2E_LABEL_FILTER),--label-filter "$(E2E_LABEL_FILTER)",) + +e2e-teardown: build-e2e + $(E2E_BIN) teardown-cluster --artifact-dir $(E2E_ARTIFACT_DIR) diff --git a/README.md b/README.md index e776e98..80a24d7 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,10 @@ End-to-end tests for Deckhouse storage components. +## CI (reusable workflow) + +See [docs/CI.md](docs/CI.md) for the three-stage pipeline (`create-cluster` → `run-tests` → `teardown-cluster`), `cmd/e2e`, and module integration. + ## Quick Start 1. Create test with script: `cd tests && ./create-test.sh ` diff --git a/cmd/e2e/main.go b/cmd/e2e/main.go new file mode 100644 index 0000000..be5a79b --- /dev/null +++ b/cmd/e2e/main.go @@ -0,0 +1,476 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Command e2e is the shared entrypoint for local runs (make e2e) and CI (create-cluster / run-tests / teardown-cluster). +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "os" + "os/exec" + "os/signal" + "path/filepath" + "strings" + "syscall" + "time" + + "github.com/deckhouse/storage-e2e/internal/config" + "github.com/deckhouse/storage-e2e/internal/logger" + "github.com/deckhouse/storage-e2e/pkg/cluster" + "github.com/deckhouse/storage-e2e/pkg/provider" + storage_e2e "github.com/deckhouse/storage-e2e/pkg/storage-e2e" +) + +func main() { + if len(os.Args) < 2 { + printUsage() + os.Exit(2) + } + if err := run(os.Args[1], os.Args[2:]); err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } +} + +func printUsage() { + fmt.Fprintf(os.Stderr, `storage-e2e CLI — cluster lifecycle and test runner + +Usage: + e2e create-cluster --provider --config [--artifact-dir ] + e2e run-tests --package [--label-filter ] [--timeout ] [--artifact-dir ] + e2e cleanup-workloads --package [--timeout ] [--artifact-dir ] + e2e release-cluster-lock [--artifact-dir ] + e2e teardown-cluster [--artifact-dir ] + +Providers: alwaysCreateNew, alwaysUseExisting, commander + +Environment: same as storage-e2e tests (SSH_*, TEST_CLUSTER_*, DKP_LICENSE_KEY, REGISTRY_DOCKER_CFG, …). +Secrets must come from the environment; the CLI never prints secret values. + +`) +} + +func run(command string, args []string) error { + switch command { + case "create-cluster": + return runCreateCluster(args) + case "run-tests": + return runTests(args) + case "cleanup-workloads": + return runCleanupWorkloads(args) + case "release-cluster-lock": + return runReleaseClusterLock(args) + case "teardown-cluster": + return runTeardown(args) + case "-h", "--help", "help": + printUsage() + return nil + default: + return fmt.Errorf("unknown command %q", command) + } +} + +type commonFlags struct { + artifactDir string +} + +func parseCommon(fs *flag.FlagSet, flags *commonFlags) { + fs.StringVar(&flags.artifactDir, "artifact-dir", envOrDefault("E2E_ARTIFACT_DIR", config.E2ETempDir), "directory for session.json, junit, and kubeconfig copies") +} + +func runCreateCluster(args []string) error { + fs := flag.NewFlagSet("create-cluster", flag.ExitOnError) + var ( + providerName string + configPath string + common commonFlags + ) + fs.StringVar(&providerName, "provider", os.Getenv("TEST_CLUSTER_CREATE_MODE"), "cluster provider (alwaysCreateNew | alwaysUseExisting | commander)") + fs.StringVar(&configPath, "config", "", "absolute path to cluster_config.yml (required for alwaysCreateNew)") + skipIfReady := fs.Bool("skip-if-ready", os.Getenv("E2E_SKIP_IF_READY") == "true", "reuse cluster when session is healthy") + reuseOnly := fs.Bool("reuse-only", os.Getenv("E2E_REUSE_ONLY") == "true", "do not provision; fail if cluster cannot be reused") + prNumber := fs.String("pr-number", os.Getenv("E2E_PR_NUMBER"), "PR number for stable namespace/session (CI)") + parseCommon(fs, &common) + if err := fs.Parse(args); err != nil { + return err + } + + name, err := provider.ParseName(providerName) + if err != nil { + return err + } + if err := initFramework(); err != nil { + return err + } + + _ = os.Setenv("E2E_CLUSTER_PHASE", "create") + _ = os.Setenv("TEST_CLUSTER_CREATE_MODE", string(name)) + + p, err := provider.New(name) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Minute) + defer cancel() + + session, err := provider.EnsureCluster(ctx, p, provider.SetupOptions{ + ProviderName: name, + ClusterConfigPath: configPath, + ArtifactDir: common.artifactDir, + PRNumber: *prNumber, + SkipIfReady: *skipIfReady, + ReuseOnly: *reuseOnly, + }) + if err != nil { + return err + } + if err := provider.ValidateClusterReady(ctx, session); err != nil { + return fmt.Errorf("create-cluster: cluster not ready: %w", err) + } + logger.Info("Cluster session ready (status=%s); saved to %s", session.Status, provider.SessionPath(common.artifactDir)) + return nil +} + +func runTests(args []string) error { + fs := flag.NewFlagSet("run-tests", flag.ExitOnError) + var ( + pkg string + labelFilter string + timeout string + common commonFlags + ) + fs.StringVar(&pkg, "package", "./...", "Go package path to test (module-relative)") + fs.StringVar(&labelFilter, "label-filter", firstNonEmptyEnv("E2E_GINKGO_LABEL_FILTER", "E2E_LABEL_FILTER"), "Ginkgo label filter (empty = all tests)") + fs.StringVar(&timeout, "timeout", envOrDefault("E2E_TEST_TIMEOUT", "3h30m"), "go test timeout") + parseCommon(fs, &common) + if err := fs.Parse(args); err != nil { + return err + } + + var session *provider.Session + if loaded, err := provider.LoadSession(common.artifactDir); err == nil { + session = loaded + provider.RestoreKubeconfigFromArtifact(common.artifactDir, session) + } else { + return fmt.Errorf("no session.json in %s (run create-cluster first): %w", common.artifactDir, err) + } + // Prepare while SSH_HOST still points at the base cluster (e2e-env.sh). run-env.sh sets test master SSH_HOST. + prepCtx, prepCancel := context.WithTimeout(context.Background(), 15*time.Minute) + defer prepCancel() + if err := provider.PrepareSessionForRunTests(prepCtx, common.artifactDir, session); err != nil { + return err + } + if err := applyRunEnv(common.artifactDir); err != nil { + return err + } + if err := initFramework(); err != nil { + return err + } + _ = os.Setenv("E2E_CLUSTER_PHASE", "run") + _ = os.Setenv("CI", "true") + + junitPath := filepath.Join(common.artifactDir, "junit.xml") + if err := os.MkdirAll(common.artifactDir, 0o755); err != nil { + return err + } + + config.ReloadFromEnv() + logger.Info("run-tests env: SSH_HOST=%s SSH_JUMP_HOST=%s TEST_CLUSTER_CREATE_MODE=%s", + config.SSHHost, config.SSHJumpHost, config.TestClusterCreateMode) + + return runGinkgo(pkg, labelFilter, timeout, junitPath) +} + +func runCleanupWorkloads(args []string) error { + fs := flag.NewFlagSet("cleanup-workloads", flag.ExitOnError) + var ( + pkg string + timeout string + common commonFlags + ) + fs.StringVar(&pkg, "package", "./tests/", "Go package path to test (module-relative)") + fs.StringVar(&timeout, "timeout", envOrDefault("E2E_CLEANUP_TIMEOUT", "45m"), "go test timeout") + parseCommon(fs, &common) + if err := fs.Parse(args); err != nil { + return err + } + + var session *provider.Session + if loaded, err := provider.LoadSession(common.artifactDir); err == nil { + session = loaded + provider.RestoreKubeconfigFromArtifact(common.artifactDir, session) + } else { + return fmt.Errorf("no session.json in %s (run create-cluster first): %w", common.artifactDir, err) + } + + prepCtx, prepCancel := context.WithTimeout(context.Background(), 15*time.Minute) + defer prepCancel() + if err := provider.PrepareSessionForRunTests(prepCtx, common.artifactDir, session); err != nil { + return err + } + if err := applyRunEnv(common.artifactDir); err != nil { + return err + } + if err := initFramework(); err != nil { + return err + } + _ = os.Setenv("E2E_CLUSTER_PHASE", "cleanup") + _ = os.Setenv("TEST_CLUSTER_CLEANUP", "false") + _ = os.Setenv("CI", "true") + + config.ReloadFromEnv() + logger.Info("cleanup-workloads env: SSH_HOST=%s namespace=%s", config.SSHHost, session.Namespace) + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + testArgs := []string{"test", pkg, "-count=1", "-timeout=" + timeout, "-v", "-run", "TestE2EWorkloadCleanup"} + cmd := exec.CommandContext(ctx, "go", testArgs...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Env = os.Environ() + logger.Info("Running: go %s", strings.Join(testArgs, " ")) + if err := cmd.Run(); err != nil { + if ctx.Err() != nil { + return fmt.Errorf("cleanup-workloads cancelled: %w", ctx.Err()) + } + return err + } + return nil +} + +func runReleaseClusterLock(args []string) error { + fs := flag.NewFlagSet("release-cluster-lock", flag.ExitOnError) + var common commonFlags + parseCommon(fs, &common) + if err := fs.Parse(args); err != nil { + return err + } + + session, err := provider.LoadSession(common.artifactDir) + if err != nil { + return fmt.Errorf("no session.json in %s: %w", common.artifactDir, err) + } + provider.RestoreKubeconfigFromArtifact(common.artifactDir, session) + + prepCtx, prepCancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer prepCancel() + if err := provider.PrepareSessionForRunTests(prepCtx, common.artifactDir, session); err != nil { + return err + } + if err := applyRunEnv(common.artifactDir); err != nil { + return err + } + if err := initFramework(); err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + return cluster.ReleaseStaleClusterLock(ctx) +} + +func runTeardown(args []string) error { + fs := flag.NewFlagSet("teardown-cluster", flag.ExitOnError) + var common commonFlags + parseCommon(fs, &common) + if err := fs.Parse(args); err != nil { + return err + } + if err := initFramework(); err != nil { + return err + } + _ = os.Setenv("E2E_CLUSTER_PHASE", "teardown") + + session, err := provider.LoadSession(common.artifactDir) + if err != nil { + return err + } + _ = os.Setenv("TEST_CLUSTER_CREATE_MODE", session.Provider) + + p, err := provider.New(provider.Name(session.Provider)) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), config.ClusterCleanupTimeout) + defer cancel() + + if err := p.Teardown(ctx, session, nil); err != nil { + return err + } + return provider.MarkSessionTornDown(common.artifactDir, session) +} + +func initFramework() error { + if err := storage_e2e.Initialize(); err != nil { + return err + } + return nil +} + +func applyRunEnv(artifactDir string) error { + runEnv := filepath.Join(artifactDir, "run-env.sh") + if _, err := os.Stat(runEnv); err != nil { + return fmt.Errorf("run-env.sh not found in %s (run create-cluster first): %w", artifactDir, err) + } + data, err := os.ReadFile(runEnv) + if err != nil { + return err + } + for _, line := range strings.Split(string(data), "\n") { + line = strings.TrimSpace(line) + if line == "" || strings.HasPrefix(line, "#") { + continue + } + if !strings.HasPrefix(line, "export ") { + continue + } + line = strings.TrimPrefix(line, "export ") + k, v, ok := strings.Cut(line, "=") + if !ok { + continue + } + v = strings.Trim(v, `"`) + if err := os.Setenv(k, v); err != nil { + return err + } + } + return nil +} + +func runGinkgo(pkg, labelFilter, timeout, junitPath string) error { + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + ginkgoArgs := []string{ + "run", + "-r", + "--keep-going=false", + "--timeout=" + timeout, + "--junit-report=" + junitPath, + } + if labelFilter != "" { + ginkgoArgs = append(ginkgoArgs, "--label-filter="+labelFilter) + } + ginkgoArgs = append(ginkgoArgs, pkg) + + runCmd := func(name string, args ...string) error { + cmd := exec.CommandContext(ctx, name, args...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Env = os.Environ() + if err := cmd.Run(); err != nil { + if ctx.Err() != nil { + return fmt.Errorf("test run cancelled: %w", ctx.Err()) + } + return err + } + return nil + } + + // Prefer module-pinned Ginkgo (avoids runner-global CLI version skew). + if wd, err := os.Getwd(); err == nil { + if _, err := os.Stat(filepath.Join(wd, "go.mod")); err == nil { + goArgs := append([]string{"run", "github.com/onsi/ginkgo/v2/ginkgo"}, ginkgoArgs...) + logger.Info("Running: go %s (from %s)", strings.Join(goArgs, " "), wd) + cmd := exec.CommandContext(ctx, "go", goArgs...) + cmd.Dir = wd + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Env = os.Environ() + if err := cmd.Run(); err != nil { + if ctx.Err() != nil { + return fmt.Errorf("test run cancelled: %w", ctx.Err()) + } + var exitErr *exec.ExitError + if errors.As(err, &exitErr) { + // Ginkgo ran (pass or fail) — never fall back to another runner. + return err + } + logger.Warn("go run ginkgo unavailable (%v); trying ginkgo on PATH", err) + } else { + return nil + } + } + } + + if ginkgo, err := exec.LookPath("ginkgo"); err == nil { + logger.Info("Running: %s %s", ginkgo, strings.Join(ginkgoArgs, " ")) + return runCmd(ginkgo, ginkgoArgs...) + } + + return runGoTest(ctx, pkg, labelFilter, timeout, junitPath) +} + +func runGoTest(ctx context.Context, pkg, labelFilter, timeout, junitPath string) error { + args := []string{"test", pkg, "-count=1", "-timeout=" + timeout, "-v"} + if labelFilter != "" { + args = append(args, "-ginkgo.label-filter="+labelFilter) + } + if junitPath != "" { + args = append(args, "-ginkgo.junit-report="+junitPath) + } + cmd := exec.CommandContext(ctx, "go", args...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Env = os.Environ() + logger.Info("Running: go %s", strings.Join(args, " ")) + if err := cmd.Run(); err != nil { + if ctx.Err() != nil { + return fmt.Errorf("test run cancelled: %w", ctx.Err()) + } + return err + } + return nil +} + +func closeResources(res interface{}) { + // Connections are closed by OS when the process exits; explicit close is optional for create-cluster job. + _ = res +} + +func copyKubeconfigArtifact(src, artifactDir string) error { + if src == "" { + return nil + } + data, err := os.ReadFile(src) + if err != nil { + return fmt.Errorf("read kubeconfig: %w", err) + } + dst := filepath.Join(artifactDir, filepath.Base(src)) + return os.WriteFile(dst, data, 0o600) +} + +func envOrDefault(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} + +func firstNonEmptyEnv(keys ...string) string { + for _, key := range keys { + if v := os.Getenv(key); v != "" { + return v + } + } + return "" +} diff --git a/docs/CI.md b/docs/CI.md new file mode 100644 index 0000000..ce9527b --- /dev/null +++ b/docs/CI.md @@ -0,0 +1,91 @@ +# Reusable CI pipeline (storage-e2e) + +Three jobs, one SDK entrypoint (`cmd/e2e`), no bash/YAML cluster provisioning. + +## Stages + +| Job | CLI command | Purpose | +|-----|-------------|---------| +| `create-cluster` | `e2e create-cluster --skip-if-ready …` | Provision, resume, or reuse (`EnsureCluster`) | +| `run-tests` | `e2e run-tests --package …` | Ginkgo/`go test`, optional `--label-filter` | +| `teardown-cluster` | `e2e teardown-cluster` | Destroy cluster (`pipeline_mode: teardown-only`) | + +## PR-scoped persistence + +- `inputs.pr_number` + `inputs.module_slug` → stable `TEST_CLUSTER_NAMESPACE=e2e--pr`. +- Session (`session.json`, `cluster-state.json`, kubeconfig) stored in **actions/cache** under stable key `e2e-pr--session` (one entry per PR; caller concurrency group serializes runs). +- `create-cluster` uses `--skip-if-ready` only when **live VMs** exist in `TEST_CLUSTER_NAMESPACE` (Running `master-*` via virt API) **and** the cluster API is healthy; otherwise stale `session.json` / `cluster-state.json` from actions/cache is cleared and the cluster is provisioned anew. +- On re-run after a mid-provision failure: reuses existing VMs in `TEST_CLUSTER_NAMESPACE` (no new hostnames). If cache is empty, discovers VMs in the PR namespace via the API. +- Failed provisioning does **not** tear down the cluster; cache is saved with `if: always()` for the next run. +- Normal test runs use `pipeline_mode: create-and-test` (no teardown). Teardown is a separate caller invocation. +- `create-cluster` always runs for `create-and-test`; `--skip-if-ready` reuses healthy session cache or discovers existing VMs in `TEST_CLUSTER_NAMESPACE` before provisioning. + +## Workflow + +Module repos call: + +```yaml +jobs: + e2e: + uses: deckhouse/storage-e2e/.github/workflows/e2e-reusable.yml@ + secrets: inherit + with: + pipeline_mode: create-and-test # or teardown-only + pr_number: "123" + module_slug: sds-node-configurator + module_path: e2e + cluster_provider: alwaysCreateNew + cluster_config: e2e/tests/cluster_config.yml + test_package: ./tests/ + label_filter: "" # empty = all specs + test_timeout: 60m + storage_e2e_ref: main +``` + +Required secrets (inherited from the module repo, same names as `build_dev` smoke): + +- `E2E_SSH_PRIVATE_KEY`, `E2E_SSH_PUBLIC_KEY`, `E2E_SSH_HOST`, `E2E_SSH_USER`, `SSH_VM_USER` +- Optional: `E2E_TEST_CLUSTER_CREATE_MODE` (overrides `cluster_provider` input), `E2E_TEST_CLUSTER_CLEANUP` +- Workflow env also sets `E2E_TUNNEL_SSH_JUMP_*`, `E2E_GINKGO_LABEL_FILTER`, `E2E_TEST_TIMEOUT` (default `3h30m`) +- `E2E_CLUSTER_KUBECONFIG` (base64 kubeconfig for the virtualization/base cluster) +- `E2E_TEST_CLUSTER_STORAGE_CLASS`, `E2E_DECKHOUSE_LICENSE`, `E2E_REGISTRY_DOCKER_CFG` +- Optional: `E2E_SSH_JUMP_*`, `E2E_TEST_CLUSTER_CLEANUP`, Commander secrets for `commander` provider + +Secrets are written to temp files by `.github/scripts/e2e-prepare-env.sh`; values are never printed. + +## JUnit / PR checks + +`run-tests` uploads `junit.xml` and publishes PR Checks via [`dorny/test-reporter`](https://github.com/dorny/test-reporter) (Node.js, no Docker — works on self-hosted runners without `ghcr.io` access). + +## Local run (same entrypoint) + +```bash +export TEST_CLUSTER_CREATE_MODE=alwaysCreateNew +# … SSH_*, DKP_LICENSE_KEY, REGISTRY_DOCKER_CFG, TEST_CLUSTER_STORAGE_CLASS, etc. + +make e2e-create E2E_CONFIG=/path/to/cluster_config.yml E2E_PROVIDER=alwaysCreateNew +make e2e-run E2E_PACKAGE=./tests/ E2E_LABEL_FILTER='Smoke' +make e2e-teardown +# or: make e2e +``` + +Direct CLI: + +```bash +go build -o bin/e2e ./cmd/e2e +bin/e2e create-cluster --provider alwaysCreateNew --config "$(pwd)/cluster_config.yml" +bin/e2e run-tests --package ./tests/ --label-filter Smoke +bin/e2e teardown-cluster +``` + +## Multi-job test suites + +`create-cluster` saves `session.json` and `run-env.sh`. The `run-tests` job sets `E2E_CLUSTER_PHASE=run` and `TEST_CLUSTER_CREATE_MODE=alwaysUseExisting`; `run-env.sh` points `SSH_HOST` / `KUBE_CONFIG_PATH` at the test cluster while `E2E_BASE_SSH_HOST` / `E2E_BASE_KUBE_CONFIG_PATH` keep the virtualization/base cluster credentials for VM discovery and health checks. Module suites should release the cluster lock in `AfterSuite` when `E2E_CLUSTER_PHASE=run` and leave VM teardown to `teardown-cluster`. + +## Cluster providers + +Single input `cluster_provider` — same values as `TEST_CLUSTER_CREATE_MODE`: + +- `alwaysCreateNew` +- `alwaysUseExisting` +- `commander` diff --git a/internal/cluster/cluster.go b/internal/cluster/cluster.go index caee1c9..a529663 100644 --- a/internal/cluster/cluster.go +++ b/internal/cluster/cluster.go @@ -211,7 +211,7 @@ const getKubeconfigRemoteShell = "sudo -n /bin/cat /etc/kubernetes/super-admin.c // If sshClient is provided, it will be used instead of creating a new connection. // If sshClient is nil, a new connection will be created and closed automatically. // If kubeconfigOutputDir is non-empty, the kubeconfig file is written there; otherwise /tmp/e2e/ is used. -func GetKubeconfig(ctx context.Context, masterIP, user, keyPath string, sshClient ssh.SSHClient, kubeconfigOutputDir string) (*rest.Config, string, error) { +func GetKubeconfig(ctx context.Context, masterIP, user, keyPath string, sshClient ssh.SSHClient, kubeconfigOutputDir, kubeconfigFallbackPath string) (*rest.Config, string, error) { // Create SSH client if not provided shouldClose := false if sshClient == nil { @@ -255,29 +255,31 @@ func GetKubeconfig(ctx context.Context, masterIP, user, keyPath string, sshClien kubeconfigContent = []byte(kubeconfigContentStr) kubeconfigSource = fmt.Sprintf("SSH(%s@%s:/etc/kubernetes/{super-admin,admin}.conf)", user, masterIP) - case config.KubeConfigPath != "": - // SSH retrieval failed (likely due to sudo password requirement) and the - // caller pointed us at a specific kubeconfig file via KUBE_CONFIG_PATH. - resolvedPath, expandErr := expandPath(config.KubeConfigPath) - if expandErr != nil { - return nil, "", fmt.Errorf("failed to expand KUBE_CONFIG_PATH (%s): %w", config.KubeConfigPath, expandErr) + default: + fallbackPath := strings.TrimSpace(kubeconfigFallbackPath) + if fallbackPath == "" { + fallbackPath = config.EffectiveBaseKubeConfigPath() } - readContent, readErr := os.ReadFile(resolvedPath) - if readErr != nil { - return nil, "", fmt.Errorf("failed to read kubeconfig from KUBE_CONFIG_PATH (%s): %w", resolvedPath, readErr) + if fallbackPath == "" { + fallbackPath = strings.TrimSpace(config.KubeConfigPath) + } + if fallbackPath != "" { + resolvedPath, expandErr := expandPath(fallbackPath) + if expandErr != nil { + return nil, "", fmt.Errorf("failed to expand kubeconfig fallback (%s): %w", fallbackPath, expandErr) + } + readContent, readErr := os.ReadFile(resolvedPath) + if readErr != nil { + return nil, "", fmt.Errorf("failed to read kubeconfig fallback (%s): %w", resolvedPath, readErr) + } + kubeconfigContent = readContent + kubeconfigSource = fmt.Sprintf("kubeconfig_fallback=%s", resolvedPath) + } else { + // SSH failed and no kubeconfig fallback. Fail fast rather than silently picking up + // ~/.kube/config / $KUBECONFIG (wrong-cluster bugs). + cause := classifyKubeconfigFetchFailure(ctx, sshErr, kubeconfigStderr) + return nil, "", buildKubeconfigFetchError(user, masterIP, sshErr, cause) } - kubeconfigContent = readContent - kubeconfigSource = fmt.Sprintf("KUBE_CONFIG_PATH=%s", resolvedPath) - - default: - // SSH failed and the caller did not opt into a specific kubeconfig via - // KUBE_CONFIG_PATH. Fail fast rather than silently picking up the - // developer's ~/.kube/config / $KUBECONFIG, which has historically - // caused tests to acquire stale locks on unrelated SAN clusters or - // deploy modules against the wrong stand. Classify the failed attempt's - // stderr so the returned error tells the operator which knob to turn. - cause := classifyKubeconfigFetchFailure(ctx, sshErr, kubeconfigStderr) - return nil, "", buildKubeconfigFetchError(user, masterIP, sshErr, cause) } // Always stamp the kubeconfig source + the resulting current-context/server diff --git a/internal/config/config.go b/internal/config/config.go index 9b19b18..9fa7152 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -73,5 +73,6 @@ const ( SSHRetryCount = 10 // Number of retry attempts for SSH operations SSHRetryInitialDelay = 2 * time.Second // Initial delay before first retry (doubles with each retry) SSHRetryMaxDelay = 60 * time.Second // Maximum delay between retries + SSHDialTimeout = 20 * time.Second // TCP dial timeout (avoids ~2m kernel timeout blocking workflow cancel) SSHKeepaliveInterval = 60 * time.Second // Interval for SSH keepalive requests ) diff --git a/internal/config/env.go b/internal/config/env.go index 23b3a0a..6be58c8 100644 --- a/internal/config/env.go +++ b/internal/config/env.go @@ -37,10 +37,114 @@ const ( // LogLevelError indicates error log level LogLevelError = "error" - // E2ETempDir is the directory for temporary e2e artifacts (kubeconfig, cluster-state, bootstrap config/log) + // E2ETempDir is the default directory for temporary e2e artifacts (kubeconfig, cluster-state, bootstrap config/log). E2ETempDir = "/tmp/e2e" ) +// EffectiveBaseSSHHost returns the SSH host for the virtualization/base cluster. +// run-env.sh points SSH_HOST at the test cluster master; E2E_BASE_SSH_HOST keeps the base host. +func EffectiveBaseSSHHost() string { + if h := strings.TrimSpace(os.Getenv("E2E_BASE_SSH_HOST")); h != "" { + return h + } + return strings.TrimSpace(SSHHost) +} + +// EffectiveBaseSSHUser returns the SSH user for the virtualization/base cluster. +func EffectiveBaseSSHUser() string { + if u := strings.TrimSpace(os.Getenv("E2E_BASE_SSH_USER")); u != "" { + return u + } + return strings.TrimSpace(SSHUser) +} + +// EffectiveBaseKubeConfigPath returns kubeconfig for the virtualization/base cluster. +// run-env.sh points KUBE_CONFIG_PATH at the test cluster; E2E_BASE_KUBE_CONFIG_PATH keeps the base file. +func EffectiveBaseKubeConfigPath() string { + if p := strings.TrimSpace(os.Getenv("E2E_BASE_KUBE_CONFIG_PATH")); p != "" { + return p + } + return strings.TrimSpace(KubeConfigPath) +} + +// EffectiveSSHJumpHostForTest returns the bastion/jump host for reaching test cluster nodes (e.g. 10.10.10.x). +func EffectiveSSHJumpHostForTest(testTargetHost string) string { + if h := strings.TrimSpace(os.Getenv("SSH_JUMP_HOST")); h != "" { + return h + } + if h := strings.TrimSpace(os.Getenv("E2E_TUNNEL_SSH_JUMP_HOST")); h != "" { + return h + } + if NeedsBastionJump(testTargetHost) { + return EffectiveBaseSSHHost() + } + return "" +} + +// NeedsBastionJump is true when the test node is on the internal lab network (10.10.10.x). +func NeedsBastionJump(testTargetHost string) bool { + return needsBastionJump(testTargetHost) +} + +// EffectiveSSHJumpUser returns the SSH user for the jump/bastion host. +func EffectiveSSHJumpUser() string { + if u := strings.TrimSpace(os.Getenv("SSH_JUMP_USER")); u != "" { + return u + } + if u := strings.TrimSpace(os.Getenv("E2E_TUNNEL_SSH_JUMP_USER")); u != "" { + return u + } + return EffectiveBaseSSHUser() +} + +func needsBastionJump(testTargetHost string) bool { + host := strings.TrimSpace(testTargetHost) + if host == "" { + return false + } + // Lab test clusters live on the internal 10.10.10.x network behind the base SSH bastion. + return strings.HasPrefix(host, "10.10.10.") +} + +// EnsureBaseClusterEnv restores base-cluster SSH/kubeconfig after test-cluster env overrides. +func EnsureBaseClusterEnv() { + if baseKC := os.Getenv("E2E_BASE_KUBE_CONFIG_PATH"); baseKC != "" { + _ = os.Setenv("KUBE_CONFIG_PATH", baseKC) + } + if bh := EffectiveBaseSSHHost(); bh != "" { + _ = os.Setenv("SSH_HOST", bh) + } +} + +// SyncTestSSHJumpEnv sets SSH_JUMP_* when missing (CI bastion fallback + tunnel aliases). +func SyncTestSSHJumpEnv(testTargetHost string) { + if strings.TrimSpace(os.Getenv("SSH_JUMP_HOST")) != "" { + return + } + jump := EffectiveSSHJumpHostForTest(testTargetHost) + if jump == "" { + return + } + _ = os.Setenv("SSH_JUMP_HOST", jump) + if strings.TrimSpace(os.Getenv("SSH_JUMP_USER")) == "" { + if u := EffectiveSSHJumpUser(); u != "" { + _ = os.Setenv("SSH_JUMP_USER", u) + } + } + ReloadFromEnv() +} + +// E2ETempDirEffective returns the runtime temp dir (E2E_TEMP_DIR, then E2E_ARTIFACT_DIR, else E2ETempDir). +func E2ETempDirEffective() string { + if d := strings.TrimSpace(os.Getenv("E2E_TEMP_DIR")); d != "" { + return d + } + if d := strings.TrimSpace(os.Getenv("E2E_ARTIFACT_DIR")); d != "" { + return d + } + return E2ETempDir +} + var ( // ENVIRONMENT VARIABLES DEFINITIONS @@ -242,7 +346,60 @@ func EffectiveVirtualMachineClassName() string { return n } +// ReloadFromEnv refreshes package-level settings from the current process environment. +// Call before ValidateEnvironment when env is applied after process start (e.g. run-env.sh in run-tests). +func ReloadFromEnv() { + YAMLConfigFilename = os.Getenv("YAML_CONFIG_FILENAME") + SSHPassphrase = os.Getenv("SSH_PASSPHRASE") + SSHUser = os.Getenv("SSH_USER") + SSHPrivateKey = os.Getenv("SSH_PRIVATE_KEY") + SSHPublicKey = os.Getenv("SSH_PUBLIC_KEY") + SSHHost = os.Getenv("SSH_HOST") + SSHJumpHost = os.Getenv("SSH_JUMP_HOST") + SSHJumpUser = os.Getenv("SSH_JUMP_USER") + SSHJumpKeyPath = os.Getenv("SSH_JUMP_KEY_PATH") + VMSSHUser = os.Getenv("SSH_VM_USER") + VMSSHPassword = os.Getenv("SSH_VM_PASSWORD") + KubeConfigPath = os.Getenv("KUBE_CONFIG_PATH") + TestClusterCreateMode = os.Getenv("TEST_CLUSTER_CREATE_MODE") + TestClusterCleanup = os.Getenv("TEST_CLUSTER_CLEANUP") + TestClusterNamespace = os.Getenv("TEST_CLUSTER_NAMESPACE") + TestClusterResume = os.Getenv("TEST_CLUSTER_RESUME") + TestClusterStorageClass = os.Getenv("TEST_CLUSTER_STORAGE_CLASS") + TestClusterVirtualMachineClassName = os.Getenv("TEST_CLUSTER_VIRTUAL_MACHINE_CLASS_NAME") + DKPLicenseKey = os.Getenv("DKP_LICENSE_KEY") + RegistryDockerCfg = os.Getenv("REGISTRY_DOCKER_CFG") + ImagePullPolicy = os.Getenv("IMAGE_PULL_POLICY") + LogLevel = os.Getenv("LOG_LEVEL") + LogFilePath = os.Getenv("LOG_FILE_PATH") + CommanderURL = os.Getenv("COMMANDER_URL") + CommanderToken = os.Getenv("COMMANDER_TOKEN") + CommanderClusterName = os.Getenv("COMMANDER_CLUSTER_NAME") + CommanderTemplateName = os.Getenv("COMMANDER_TEMPLATE_NAME") + CommanderTemplateVersion = os.Getenv("COMMANDER_TEMPLATE_VERSION") + CommanderRegistryName = os.Getenv("COMMANDER_REGISTRY_NAME") + CommanderCreateIfNotExists = os.Getenv("COMMANDER_CREATE_IF_NOT_EXISTS") + CommanderWaitTimeout = os.Getenv("COMMANDER_WAIT_TIMEOUT") + CommanderInsecureSkipTLSVerify = os.Getenv("COMMANDER_INSECURE_SKIP_TLS_VERIFY") + CommanderCACert = os.Getenv("COMMANDER_CA_CERT") + CommanderAuthMethod = os.Getenv("COMMANDER_AUTH_METHOD") + CommanderAuthUser = os.Getenv("COMMANDER_AUTH_USER") + CommanderAPIPrefix = os.Getenv("COMMANDER_API_PREFIX") + CommanderInputValues = os.Getenv("COMMANDER_VALUES") + StressTestPVCSize = os.Getenv("STRESS_TEST_PVC_SIZE") + StressTestPodsCount = os.Getenv("STRESS_TEST_PODS_COUNT") + StressTestPVCSizeAfterResize = os.Getenv("STRESS_TEST_PVC_SIZE_AFTER_RESIZE") + StressTestPVCSizeAfterResizeStage2 = os.Getenv("STRESS_TEST_PVC_SIZE_AFTER_RESIZE_STAGE2") + StressTestSnapshotsPerPVC = os.Getenv("STRESS_TEST_SNAPSHOTS_PER_PVC") + StressTestMaxAttempts = os.Getenv("STRESS_TEST_MAX_ATTEMPTS") + StressTestInterval = os.Getenv("STRESS_TEST_INTERVAL") + StressTestCleanup = os.Getenv("STRESS_TEST_CLEANUP") + LogTimetampsEnabled = os.Getenv("LOG_TIMESTAMPS_ENABLED") +} + func ValidateEnvironment() error { + ReloadFromEnv() + // Default values for environment variables if YAMLConfigFilename == "" { YAMLConfigFilename = YAMLConfigFilenameDefaultValue diff --git a/internal/infrastructure/ssh/client.go b/internal/infrastructure/ssh/client.go index b72254f..fdb2cb6 100644 --- a/internal/infrastructure/ssh/client.go +++ b/internal/infrastructure/ssh/client.go @@ -258,7 +258,7 @@ func (c *client) Create(user, host, keyPath string) (SSHClient, error) { addr = addr + ":22" } - sshClient, err := ssh.Dial("tcp", addr, sshConfig) + sshClient, err := dialSSH(addr, sshConfig) if err != nil { return nil, fmt.Errorf("failed to connect to %s@%s: %w\n Key used: %s (algorithm: %s, fingerprint: %s)\n Hint: verify this key's public part is in authorized_keys on the server", user, addr, err, keyInfo.Path, keyInfo.Algorithm, keyInfo.Fingerprint) @@ -626,6 +626,19 @@ func (c *client) Close() error { } // NewClient creates a new SSH client +func dialSSH(addr string, sshConfig *ssh.ClientConfig) (*ssh.Client, error) { + conn, err := net.DialTimeout("tcp", addr, config.SSHDialTimeout) + if err != nil { + return nil, err + } + sshConn, chans, reqs, err := ssh.NewClientConn(conn, addr, sshConfig) + if err != nil { + _ = conn.Close() + return nil, err + } + return ssh.NewClient(sshConn, chans, reqs), nil +} + func NewClient(user, host, keyPath string) (SSHClient, error) { var c client return c.Create(user, host, keyPath) @@ -647,7 +660,7 @@ func NewClientWithJumpHost(jumpUser, jumpHost, jumpKeyPath, targetUser, targetHo } // Connect to jump host - jumpClient, err := ssh.Dial("tcp", jumpAddr, jumpConfig) + jumpClient, err := dialSSH(jumpAddr, jumpConfig) if err != nil { return nil, fmt.Errorf("failed to connect to jump host %s@%s: %w\n Key used: %s (algorithm: %s, fingerprint: %s)\n Hint: verify this key's public part is in authorized_keys on the server", jumpUser, jumpAddr, err, jumpKeyInfo.Path, jumpKeyInfo.Algorithm, jumpKeyInfo.Fingerprint) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index a7ccb69..5188f7d 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -26,7 +26,6 @@ import ( "path/filepath" "runtime" "strings" - "sync" "time" . "github.com/onsi/ginkgo/v2" @@ -48,7 +47,6 @@ import ( "github.com/deckhouse/storage-e2e/internal/logger" "github.com/deckhouse/storage-e2e/pkg/kubernetes" "github.com/deckhouse/storage-e2e/pkg/testkit" - "github.com/deckhouse/virtualization/api/core/v1alpha2" ) // extraCommanderValues stores additional values to be passed to Commander cluster creation @@ -58,7 +56,9 @@ var extraCommanderValues map[string]interface{} // commanderResources stores Commander cluster resources for cleanup var commanderResources *CommanderClusterResources -var clusterStatePath = filepath.Join(config.E2ETempDir, "cluster-state.json") +func clusterStateFilePath() string { + return filepath.Join(config.E2ETempDirEffective(), "cluster-state.json") +} // SetExtraCommanderValues sets additional values to be passed when creating a cluster via Commander. // These values are merged with COMMANDER_VALUES env var (extra values take precedence over env, but prefix is always set). @@ -108,7 +108,7 @@ type resumeState struct { } func saveClusterState(state *resumeState) error { - path := clusterStatePath + path := clusterStateFilePath() if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { return err } @@ -116,11 +116,15 @@ func saveClusterState(state *resumeState) error { if err != nil { return err } - return os.WriteFile(path, data, 0600) + if err := os.WriteFile(path, data, 0600); err != nil { + return err + } + logger.Info("Saved cluster resume state to %s", path) + return nil } func loadClusterState() (*resumeState, error) { - path := clusterStatePath + path := clusterStateFilePath() data, err := os.ReadFile(path) if err != nil { return nil, err @@ -208,25 +212,29 @@ func CreateTestCluster( logger.Step(1, "Loading cluster configuration from %s", yamlConfigFilename) - // Find the test package directory by walking the call stack. - // CreateTestCluster is called from CreateOrConnectToTestCluster in cluster.go, so Caller(1) is not the test file. - var testDir string - for skip := 1; skip <= 10; skip++ { - _, callerFile, _, ok := runtime.Caller(skip) - if !ok { - break + var yamlConfigPath string + if filepath.IsAbs(yamlConfigFilename) { + yamlConfigPath = yamlConfigFilename + } else { + // Find the test package directory by walking the call stack. + // CreateTestCluster is called from CreateOrConnectToTestCluster in cluster.go, so Caller(1) is not the test file. + var testDir string + for skip := 1; skip <= 10; skip++ { + _, callerFile, _, ok := runtime.Caller(skip) + if !ok { + break + } + if strings.Contains(filepath.ToSlash(callerFile), "/tests/") { + testDir = filepath.Dir(callerFile) + break + } } - if strings.Contains(filepath.ToSlash(callerFile), "/tests/") { - testDir = filepath.Dir(callerFile) - break + if testDir == "" { + return nil, fmt.Errorf("failed to determine test directory (no caller under tests/)") } + yamlConfigPath = filepath.Join(testDir, yamlConfigFilename) } - if testDir == "" { - return nil, fmt.Errorf("failed to determine test directory (no caller under tests/)") - } - yamlConfigPath := filepath.Join(testDir, yamlConfigFilename) - logger.Debug("Test directory: %s", testDir) logger.Debug("Config file path: %s", yamlConfigPath) // Step 1: Load cluster configuration from YAML (from test directory, e.g. tests/sds-node-configurator/cluster_config.yml) @@ -253,7 +261,7 @@ func CreateTestCluster( } } } else { - logger.Info("TEST_CLUSTER_RESUME is set but cluster state not found or invalid: %v (tried %s)", loadErr, clusterStatePath) + logger.Info("TEST_CLUSTER_RESUME is set but cluster state not found or invalid: %v (tried %s)", loadErr, clusterStateFilePath()) } } if !resumeMode { @@ -345,144 +353,84 @@ func CreateTestCluster( } logger.StepComplete(4, "Test namespace created") + virtCtx, virtCancel := context.WithTimeout(ctx, config.VMCreationTimeout) + virtClient, err := virtualization.NewClient(virtCtx, baseClusterResources.Kubeconfig) + if err != nil { + virtCancel() + baseClusterResources.SSHClient.Close() + baseClusterResources.TunnelInfo.StopFunc() + return nil, fmt.Errorf("failed to create virtualization client: %w", err) + } + + var vmNames []string + var vmResources *VMResources + if resumeMode { - // Resume path: cluster is already fully created and ready. Only connect to it and return resources. - // No GatherVMInfo, NodeGroup, AddNodes, EnableModules — assume "should create test cluster" already completed. state, _ := loadClusterState() - namespace = state.Namespace - baseKubeconfig := baseClusterResources.Kubeconfig - baseKubeconfigPath := baseClusterResources.KubeconfigPath - baseTunnelInfo := baseClusterResources.TunnelInfo - vmResources := &VMResources{VMNames: state.VMNames, SetupVMName: state.SetupVMName} - - if len(clusterDefinition.Masters) > 0 { - clusterDefinition.Masters[0].IPAddress = state.FirstMasterIP + if state.Namespace != "" { + namespace = state.Namespace } - firstMasterIP := state.FirstMasterIP - if firstMasterIP == "" { + vmNames = state.VMNames + vmResources = &VMResources{ + VirtClient: virtClient, + Namespace: namespace, + VMNames: vmNames, + SetupVMName: state.SetupVMName, + } + + logger.Step(5, "Resume: reusing %d existing virtual machines in %s", len(vmNames), namespace) + if err := waitForVMsRunning(ctx, virtClient, namespace, vmNames); err != nil { + virtCancel() baseClusterResources.SSHClient.Close() baseClusterResources.TunnelInfo.StopFunc() - return nil, fmt.Errorf("resume: first_master_ip missing in cluster state") + return nil, fmt.Errorf("resume: wait for VMs: %w", err) } + logger.StepComplete(5, "Resume: all VMs are Running") - logger.Step(1, "Resume: connecting to existing test cluster master %s", firstMasterIP) - testConnectOpts := ConnectClusterOptions{ - SSHUser: sshUser, SSHHost: sshHost, SSHKeyPath: sshKeyPath, - UseJumpHost: true, TargetUser: config.VMSSHUser, TargetHost: firstMasterIP, TargetKeyPath: sshKeyPath, - KubeconfigOutputDir: config.E2ETempDir, - } - if useJumpHost { - testConnectOpts = ConnectClusterOptions{ - SSHUser: jumpUser, SSHHost: jumpHost, SSHKeyPath: jumpKeyPath, - UseJumpHost: true, TargetUser: config.VMSSHUser, TargetHost: firstMasterIP, TargetKeyPath: sshKeyPath, - KubeconfigOutputDir: config.E2ETempDir, - } + logger.Step(6, "Resume: gathering VM information from existing VMs") + gatherCtx, gatherCancel := context.WithTimeout(ctx, config.VMInfoTimeout) + err = GatherVMInfo(gatherCtx, virtClient, namespace, clusterDefinition, vmResources, nil) + gatherCancel() + if err != nil { + virtCancel() + baseClusterResources.SSHClient.Close() + baseClusterResources.TunnelInfo.StopFunc() + return nil, fmt.Errorf("resume: gather VM info: %w", err) } - testClusterResources, err := ConnectToCluster(ctx, testConnectOpts) + logger.StepComplete(6, "Resume: VM information gathered") + } else { + logger.Step(5, "Creating virtual machines (this may take up to %v)", config.VMCreationTimeout) + vmNames, vmResources, err = CreateVirtualMachines(virtCtx, virtClient, clusterDefinition) if err != nil { + virtCancel() baseClusterResources.SSHClient.Close() baseClusterResources.TunnelInfo.StopFunc() - return nil, fmt.Errorf("resume: connect to test cluster: %w", err) + return nil, fmt.Errorf("failed to create virtual machines: %w", err) } - logger.StepComplete(1, "Resume: connected to test cluster") + logger.StepComplete(5, "Created %d virtual machines: %v", len(vmNames), vmNames) - testClusterResources.ClusterDefinition = clusterDefinition - testClusterResources.VMResources = vmResources - testClusterResources.BaseClusterClient = baseClusterResources.SSHClient - testClusterResources.BaseKubeconfig = baseKubeconfig - testClusterResources.BaseKubeconfigPath = baseKubeconfigPath - testClusterResources.BaseTunnelInfo = baseTunnelInfo - testClusterResources.SetupSSHClient = nil - return testClusterResources, nil - } - - logger.Step(5, "Creating virtual machines (this may take up to %v)", config.VMCreationTimeout) - // Step 5: Create virtualization client and virtual machines - virtCtx, cancel := context.WithTimeout(ctx, config.VMCreationTimeout) - virtClient, err := virtualization.NewClient(virtCtx, baseClusterResources.Kubeconfig) - if err != nil { - cancel() - baseClusterResources.SSHClient.Close() - baseClusterResources.TunnelInfo.StopFunc() - return nil, fmt.Errorf("failed to create virtualization client: %w", err) - } - - vmNames, vmResources, err := CreateVirtualMachines(virtCtx, virtClient, clusterDefinition) - cancel() - if err != nil { - baseClusterResources.SSHClient.Close() - baseClusterResources.TunnelInfo.StopFunc() - return nil, fmt.Errorf("failed to create virtual machines: %w", err) - } - logger.StepComplete(5, "Created %d virtual machines: %v", len(vmNames), vmNames) - - logger.Info("Waiting for all VMs to become Running (this may take up to %v)", config.VMsRunningTimeout) - // Wait for all VMs to become Running in parallel - vmWaitCtx, cancel := context.WithTimeout(ctx, config.VMsRunningTimeout) - defer cancel() - - var wg sync.WaitGroup - var mu sync.Mutex // for thread-safe printing - errChan := make(chan error, len(vmNames)) - - for i, vmName := range vmNames { - wg.Add(1) - go func(index int, name string) { - defer wg.Done() - - mu.Lock() - logger.Progress("Waiting for VM %d/%d: %s", index+1, len(vmNames), name) - mu.Unlock() - - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - - for { - select { - case <-vmWaitCtx.Done(): - errChan <- fmt.Errorf("timeout waiting for VM %s to become Running", name) - return - case <-ticker.C: - vm, err := virtClient.VirtualMachines().Get(vmWaitCtx, namespace, name) - if err != nil { - continue - } - if vm.Status.Phase == v1alpha2.MachineRunning { - mu.Lock() - logger.Success("VM %s is Running", name) - mu.Unlock() - return - } - } - } - }(i, vmName) - } - - // Wait for all goroutines to complete - wg.Wait() - close(errChan) - - // Check if any errors occurred - if len(errChan) > 0 { - err := <-errChan - baseClusterResources.SSHClient.Close() - baseClusterResources.TunnelInfo.StopFunc() - return nil, err - } - - logger.StepComplete(5, "All VMs are Running") + logger.Info("Waiting for all VMs to become Running (this may take up to %v)", config.VMsRunningTimeout) + if err := waitForVMsRunning(ctx, virtClient, namespace, vmNames); err != nil { + virtCancel() + baseClusterResources.SSHClient.Close() + baseClusterResources.TunnelInfo.StopFunc() + return nil, err + } + logger.StepComplete(5, "All VMs are Running") - logger.Step(6, "Gathering VM information") - // Step 6: Gather VM information - gatherCtx, cancel := context.WithTimeout(ctx, config.VMInfoTimeout) - err = GatherVMInfo(gatherCtx, virtClient, namespace, clusterDefinition, vmResources, nil) - cancel() - if err != nil { - baseClusterResources.SSHClient.Close() - baseClusterResources.TunnelInfo.StopFunc() - return nil, fmt.Errorf("failed to gather VM information: %w", err) + logger.Step(6, "Gathering VM information") + gatherCtx, gatherCancel := context.WithTimeout(ctx, config.VMInfoTimeout) + err = GatherVMInfo(gatherCtx, virtClient, namespace, clusterDefinition, vmResources, nil) + gatherCancel() + if err != nil { + virtCancel() + baseClusterResources.SSHClient.Close() + baseClusterResources.TunnelInfo.StopFunc() + return nil, fmt.Errorf("failed to gather VM information: %w", err) + } + logger.StepComplete(6, "VM information gathered") } - logger.StepComplete(6, "VM information gathered") + virtCancel() // Save resume state so a failed run can be continued with TEST_CLUSTER_RESUME=true masterHostnames := make([]string, len(clusterDefinition.Masters)) @@ -762,6 +710,44 @@ func CreateTestCluster( // If the cluster is already locked by another test, this function will return an error. // The lock is automatically released when CleanupExistingCluster is called. func UseExistingCluster(ctx context.Context) (*TestClusterResources, error) { + return connectExistingCluster(ctx, existingClusterConnectOpts{acquireLock: true}) +} + +// ConnectExistingClusterForMaintenance connects to the test cluster for post-test cleanup. +// It clears any cluster lock left by run-tests in the same CI workflow and does not acquire a new lock. +func ConnectExistingClusterForMaintenance(ctx context.Context) (*TestClusterResources, error) { + return connectExistingCluster(ctx, existingClusterConnectOpts{clearStaleLock: true}) +} + +// ReleaseStaleClusterLock connects to the test cluster and removes the lock ConfigMap if present. +// Used by run-tests CI after ginkgo when AfterSuite did not run (timeout/kill) or failed to release. +func ReleaseStaleClusterLock(ctx context.Context) error { + res, err := connectExistingCluster(ctx, existingClusterConnectOpts{clearStaleLock: true, skipHealthAndBase: true}) + if res != nil { + if res.BaseTunnelInfo != nil && res.BaseTunnelInfo.StopFunc != nil { + _ = res.BaseTunnelInfo.StopFunc() + } + if res.BaseClusterClient != nil { + _ = res.BaseClusterClient.Close() + } + if res.TunnelInfo != nil && res.TunnelInfo.StopFunc != nil { + _ = res.TunnelInfo.StopFunc() + } + if res.SSHClient != nil { + _ = res.SSHClient.Close() + } + } + return err +} + +type existingClusterConnectOpts struct { + acquireLock bool + clearStaleLock bool + skipHealthAndBase bool +} + +func connectExistingCluster(ctx context.Context, opts existingClusterConnectOpts) (*TestClusterResources, error) { + config.ReloadFromEnv() logger.Step(1, "Connecting to existing cluster") // Get SSH credentials from environment variables @@ -772,6 +758,20 @@ func UseExistingCluster(ctx context.Context) (*TestClusterResources, error) { return nil, fmt.Errorf("failed to get SSH private key path: %w", err) } + abort := func(releaseLock bool, res *TestClusterResources) { + if releaseLock && res != nil && res.Kubeconfig != nil { + _ = ReleaseClusterLock(ctx, res.Kubeconfig) + } + if res != nil { + if res.TunnelInfo != nil && res.TunnelInfo.StopFunc != nil { + res.TunnelInfo.StopFunc() + } + if res.SSHClient != nil { + res.SSHClient.Close() + } + } + } + // Check if jump host is configured var clusterResources *TestClusterResources if config.SSHJumpHost != "" { @@ -811,26 +811,44 @@ func UseExistingCluster(ctx context.Context) (*TestClusterResources, error) { } logger.StepComplete(1, "Connected to existing cluster successfully") - logger.Step(2, "Acquiring cluster lock") - // Acquire cluster lock to ensure exclusive access - // Use a descriptive test name from environment or generate one - testName := config.TestClusterNamespace - if testName == "" { - testName = fmt.Sprintf("e2e-test-%d", time.Now().UnixNano()) + if opts.clearStaleLock { + logger.Step(2, "Clearing stale cluster lock (if any)") + if locked, lockErr := IsClusterLocked(ctx, clusterResources.Kubeconfig); lockErr != nil { + abort(false, clusterResources) + return nil, fmt.Errorf("check cluster lock before cleanup: %w", lockErr) + } else if locked { + if info, infoErr := GetClusterLockInfo(ctx, clusterResources.Kubeconfig); infoErr == nil { + logger.Warn("Clearing cluster lock held by %s (locked_at=%s)", info.LockedBy, info.LockedAt) + } + if err := ForceReleaseClusterLock(ctx, clusterResources.Kubeconfig); err != nil { + abort(false, clusterResources) + return nil, fmt.Errorf("clear stale cluster lock: %w", err) + } + } + logger.StepComplete(2, "Cluster lock cleared (maintenance mode)") } - err = AcquireClusterLock(ctx, clusterResources.Kubeconfig, testName) - if err != nil { - // Cleanup resources if we can't acquire the lock - if clusterResources.TunnelInfo != nil && clusterResources.TunnelInfo.StopFunc != nil { - clusterResources.TunnelInfo.StopFunc() + if opts.acquireLock { + logger.Step(2, "Acquiring cluster lock") + // Acquire cluster lock to ensure exclusive access + // Use a descriptive test name from environment or generate one + testName := config.TestClusterNamespace + if testName == "" { + testName = fmt.Sprintf("e2e-test-%d", time.Now().UnixNano()) } - if clusterResources.SSHClient != nil { - clusterResources.SSHClient.Close() + + err = AcquireClusterLock(ctx, clusterResources.Kubeconfig, testName) + if err != nil { + abort(false, clusterResources) + return nil, fmt.Errorf("failed to acquire cluster lock: %w", err) } - return nil, fmt.Errorf("failed to acquire cluster lock: %w", err) + logger.StepComplete(2, "Cluster lock acquired") + } + + if opts.skipHealthAndBase { + logger.Success("Cluster connected (lock maintenance only)") + return clusterResources, nil } - logger.StepComplete(2, "Cluster lock acquired") logger.Step(3, "Verifying cluster health (with retries)") // Verify cluster health with retries - cluster may need time to stabilize @@ -861,14 +879,7 @@ func UseExistingCluster(ctx context.Context) (*TestClusterResources, error) { } if healthErr != nil { - // Release the lock and cleanup on failure - _ = ReleaseClusterLock(ctx, clusterResources.Kubeconfig) - if clusterResources.TunnelInfo != nil && clusterResources.TunnelInfo.StopFunc != nil { - clusterResources.TunnelInfo.StopFunc() - } - if clusterResources.SSHClient != nil { - clusterResources.SSHClient.Close() - } + abort(opts.acquireLock, clusterResources) return nil, fmt.Errorf("cluster health check failed after %d attempts: %w", maxHealthRetries, healthErr) } logger.StepComplete(3, "Cluster is healthy") @@ -887,53 +898,33 @@ func UseExistingCluster(ctx context.Context) (*TestClusterResources, error) { } baseSSHClient, connErr := ssh.NewClient(jumpUser, config.SSHJumpHost, jumpKeyPath) if connErr != nil { - _ = ReleaseClusterLock(ctx, clusterResources.Kubeconfig) - if clusterResources.TunnelInfo != nil && clusterResources.TunnelInfo.StopFunc != nil { - clusterResources.TunnelInfo.StopFunc() - } - clusterResources.SSHClient.Close() + abort(opts.acquireLock, clusterResources) return nil, fmt.Errorf("failed to connect to base cluster (jump host %s): %w", config.SSHJumpHost, connErr) } baseTunnel, tunnelErr := ssh.EstablishSSHTunnel(context.Background(), baseSSHClient, "6445") if tunnelErr != nil { baseSSHClient.Close() - _ = ReleaseClusterLock(ctx, clusterResources.Kubeconfig) - if clusterResources.TunnelInfo != nil && clusterResources.TunnelInfo.StopFunc != nil { - clusterResources.TunnelInfo.StopFunc() - } - clusterResources.SSHClient.Close() + abort(opts.acquireLock, clusterResources) return nil, fmt.Errorf("failed to establish base cluster tunnel: %w", tunnelErr) } - _, baseKubeconfigPath, kubeErr := internalcluster.GetKubeconfig(ctx, config.SSHJumpHost, jumpUser, jumpKeyPath, baseSSHClient, config.E2ETempDir) + _, baseKubeconfigPath, kubeErr := internalcluster.GetKubeconfig(ctx, config.SSHJumpHost, jumpUser, jumpKeyPath, baseSSHClient, config.E2ETempDir, config.EffectiveBaseKubeConfigPath()) if kubeErr != nil { baseTunnel.StopFunc() baseSSHClient.Close() - _ = ReleaseClusterLock(ctx, clusterResources.Kubeconfig) - if clusterResources.TunnelInfo != nil && clusterResources.TunnelInfo.StopFunc != nil { - clusterResources.TunnelInfo.StopFunc() - } - clusterResources.SSHClient.Close() + abort(opts.acquireLock, clusterResources) return nil, fmt.Errorf("failed to get base cluster kubeconfig from jump host: %w", kubeErr) } if updateErr := internalcluster.UpdateKubeconfigPort(baseKubeconfigPath, baseTunnel.LocalPort); updateErr != nil { baseTunnel.StopFunc() baseSSHClient.Close() - _ = ReleaseClusterLock(ctx, clusterResources.Kubeconfig) - if clusterResources.TunnelInfo != nil && clusterResources.TunnelInfo.StopFunc != nil { - clusterResources.TunnelInfo.StopFunc() - } - clusterResources.SSHClient.Close() + abort(opts.acquireLock, clusterResources) return nil, fmt.Errorf("failed to update base cluster kubeconfig port: %w", updateErr) } baseKubeconfig, buildErr := clientcmd.BuildConfigFromFlags("", baseKubeconfigPath) if buildErr != nil { baseTunnel.StopFunc() baseSSHClient.Close() - _ = ReleaseClusterLock(ctx, clusterResources.Kubeconfig) - if clusterResources.TunnelInfo != nil && clusterResources.TunnelInfo.StopFunc != nil { - clusterResources.TunnelInfo.StopFunc() - } - clusterResources.SSHClient.Close() + abort(opts.acquireLock, clusterResources) return nil, fmt.Errorf("failed to build base cluster rest config: %w", buildErr) } configureExtendedTimeouts(baseKubeconfig) @@ -944,6 +935,11 @@ func UseExistingCluster(ctx context.Context) (*TestClusterResources, error) { logger.StepComplete(4, "Base cluster kubeconfig ready (VirtualDisk operations)") } + if err := PopulateVMResourcesForExistingCluster(ctx, clusterResources); err != nil { + abort(opts.acquireLock, clusterResources) + return nil, err + } + logger.Success("Existing cluster is ready for use") return clusterResources, nil } @@ -1969,11 +1965,22 @@ type ConnectClusterOptions struct { // KubeconfigOutputDir if set saves kubeconfig to this dir instead of /tmp/e2e/ KubeconfigOutputDir string + + // KubeconfigFallbackPath is used when SSH kubeconfig fetch fails (base cluster secret file in CI). + KubeconfigFallbackPath string } // ConnectToCluster establishes SSH connection to a cluster (base or test), // retrieves kubeconfig, and sets up port forwarding tunnel. +func sshRetryCount() int { + if os.Getenv("CI") != "" && os.Getenv("E2E_CLUSTER_PHASE") == "run" { + return 3 + } + return config.SSHRetryCount +} + func ConnectToCluster(ctx context.Context, opts ConnectClusterOptions) (*TestClusterResources, error) { + config.ReloadFromEnv() // Validate required parameters if opts.SSHUser == "" { return nil, fmt.Errorf("SSHUser cannot be empty") @@ -2017,7 +2024,7 @@ func ConnectToCluster(ctx context.Context, opts ConnectClusterOptions) (*TestClu } // Create SSH client with jump host (retry with exponential backoff) - maxRetries := config.SSHRetryCount + maxRetries := sshRetryCount() retryDelay := config.SSHRetryInitialDelay var lastErr error for attempt := 0; attempt < maxRetries; attempt++ { @@ -2051,7 +2058,7 @@ func ConnectToCluster(ctx context.Context, opts ConnectClusterOptions) (*TestClu masterUser = opts.TargetUser } else { // Direct connection (no jump host) with retry logic - maxRetries := config.SSHRetryCount + maxRetries := sshRetryCount() retryDelay := config.SSHRetryInitialDelay var lastErr error for attempt := 0; attempt < maxRetries; attempt++ { @@ -2092,7 +2099,7 @@ func ConnectToCluster(ctx context.Context, opts ConnectClusterOptions) (*TestClu } // Step 3: Get kubeconfig from cluster master - _, kubeconfigPath, err := internalcluster.GetKubeconfig(ctx, masterHost, masterUser, opts.SSHKeyPath, sshClient, opts.KubeconfigOutputDir) + _, kubeconfigPath, err := internalcluster.GetKubeconfig(ctx, masterHost, masterUser, opts.SSHKeyPath, sshClient, opts.KubeconfigOutputDir, opts.KubeconfigFallbackPath) if err != nil { tunnelInfo.StopFunc() sshClient.Close() diff --git a/pkg/cluster/lifecycle.go b/pkg/cluster/lifecycle.go new file mode 100644 index 0000000..2a3c20b --- /dev/null +++ b/pkg/cluster/lifecycle.go @@ -0,0 +1,126 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + + "github.com/deckhouse/storage-e2e/internal/config" +) + +// CreateTestClusterFromConfig provisions a cluster using an absolute path to cluster_config.yml. +// Used by the storage-e2e CLI and CI create-cluster job (no runtime.Caller test directory lookup). +func CreateTestClusterFromConfig(ctx context.Context, configPath string) (*TestClusterResources, error) { + if configPath == "" { + return nil, fmt.Errorf("config path is required") + } + abs, err := filepath.Abs(configPath) + if err != nil { + return nil, err + } + return CreateTestCluster(ctx, abs) +} + +// ClusterStatePath returns the cluster-state.json path used by CreateTestCluster. +func ClusterStatePath() string { + return clusterStateFilePath() +} + +// ClusterStateSnapshot is the on-disk resume state after VM creation. +type ClusterStateSnapshot struct { + FirstMasterIP string `json:"first_master_ip"` + Namespace string `json:"namespace"` + VMNames []string `json:"vm_names"` + SetupVMName string `json:"setup_vm_name"` + MasterHostnames []string `json:"master_hostnames"` + WorkerHostnames []string `json:"worker_hostnames"` +} + +// LoadClusterStateFile reads cluster-state.json from the default e2e temp directory. +func LoadClusterStateFile() (*ClusterStateSnapshot, error) { + data, err := os.ReadFile(clusterStateFilePath()) + if err != nil { + return nil, err + } + var state ClusterStateSnapshot + if err := json.Unmarshal(data, &state); err != nil { + return nil, err + } + return &state, nil +} + +// BaseClusterConnectOptionsFromEnv builds ConnectClusterOptions for the virtualization/base cluster from env. +func BaseClusterConnectOptionsFromEnv(kubeconfigOutputDir string) (ConnectClusterOptions, error) { + sshUser := config.EffectiveBaseSSHUser() + sshHost := config.EffectiveBaseSSHHost() + if sshUser == "" || sshHost == "" { + return ConnectClusterOptions{}, fmt.Errorf("E2E_BASE_SSH_HOST/E2E_BASE_SSH_USER (or SSH_HOST/SSH_USER) are required for base cluster access") + } + keyPath, err := expandPathLocal(config.SSHPrivateKey) + if err != nil { + return ConnectClusterOptions{}, err + } + if kubeconfigOutputDir == "" { + kubeconfigOutputDir = config.E2ETempDirEffective() + } + useJump := config.SSHJumpHost != "" + opts := ConnectClusterOptions{ + SSHUser: sshUser, SSHHost: sshHost, SSHKeyPath: keyPath, + UseJumpHost: useJump, KubeconfigOutputDir: kubeconfigOutputDir, + KubeconfigFallbackPath: config.EffectiveBaseKubeConfigPath(), + } + if useJump { + jumpUser := config.SSHJumpUser + if jumpUser == "" { + jumpUser = sshUser + } + jumpKey := config.SSHJumpKeyPath + if jumpKey == "" { + jumpKey = keyPath + } else { + jumpKey, err = expandPathLocal(jumpKey) + if err != nil { + return ConnectClusterOptions{}, err + } + } + opts = ConnectClusterOptions{ + SSHUser: jumpUser, SSHHost: config.SSHJumpHost, SSHKeyPath: jumpKey, + UseJumpHost: true, TargetUser: sshUser, TargetHost: sshHost, TargetKeyPath: keyPath, + KubeconfigOutputDir: kubeconfigOutputDir, + KubeconfigFallbackPath: config.EffectiveBaseKubeConfigPath(), + } + } + return opts, nil +} + +func expandPathLocal(path string) (string, error) { + if path == "" { + path = config.SSHPrivateKeyDefaultValue + } + if len(path) >= 2 && path[:2] == "~/" { + home, err := os.UserHomeDir() + if err != nil { + return "", err + } + path = filepath.Join(home, path[2:]) + } + return filepath.Clean(path), nil +} diff --git a/pkg/cluster/namespace_verify.go b/pkg/cluster/namespace_verify.go new file mode 100644 index 0000000..6a136bd --- /dev/null +++ b/pkg/cluster/namespace_verify.go @@ -0,0 +1,189 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "context" + "fmt" + "os" + "strings" + "time" + + "github.com/deckhouse/storage-e2e/internal/config" + "github.com/deckhouse/storage-e2e/internal/kubernetes/virtualization" + "github.com/deckhouse/storage-e2e/internal/logger" + v1alpha2 "github.com/deckhouse/virtualization/api/core/v1alpha2" +) + +// ClearPersistedClusterState removes runtime cluster-state.json (resume marker). +func ClearPersistedClusterState() error { + path := clusterStateFilePath() + if err := os.Remove(path); err != nil && !os.IsNotExist(err) { + return err + } + return nil +} + +// ListNamespaceVMs returns VirtualMachines in the given namespace on the base cluster. +func ListNamespaceVMs(ctx context.Context, namespace string) ([]v1alpha2.VirtualMachine, error) { + if namespace == "" { + return nil, fmt.Errorf("namespace is required") + } + config.EnsureBaseClusterEnv() + config.ReloadFromEnv() + connectOpts, err := BaseClusterConnectOptionsFromEnv(config.E2ETempDirEffective()) + if err != nil { + return nil, err + } + base, err := ConnectToCluster(ctx, connectOpts) + if err != nil { + return nil, fmt.Errorf("list VMs: connect to base cluster: %w", err) + } + defer base.SSHClient.Close() + if base.TunnelInfo != nil && base.TunnelInfo.StopFunc != nil { + defer base.TunnelInfo.StopFunc() + } + + virtCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + virtClient, err := virtualization.NewClient(virtCtx, base.Kubeconfig) + if err != nil { + return nil, err + } + return virtClient.VirtualMachines().List(virtCtx, namespace) +} + +// WaitNamespaceHasActiveCluster polls until a Running master-* VM exists or timeout. +func WaitNamespaceHasActiveCluster(ctx context.Context, namespace string, timeout time.Duration) error { + if namespace == "" { + return fmt.Errorf("namespace is required") + } + deadline := time.Now().Add(timeout) + var lastErr error + for { + config.EnsureBaseClusterEnv() + config.ReloadFromEnv() + active, err := namespaceHasActiveClusterOnce(ctx, namespace) + if err != nil { + lastErr = err + logger.Warn("VM check in %s: %v", namespace, err) + } else if active { + return nil + } else { + lastErr = fmt.Errorf("no running master VM in namespace %s", namespace) + } + if time.Now().After(deadline) { + if lastErr != nil { + return lastErr + } + return fmt.Errorf("timeout waiting for running master VM in %s", namespace) + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(10 * time.Second): + } + } +} + +// WaitVMNamesRunning polls until every named VM is Running or timeout. +func WaitVMNamesRunning(ctx context.Context, namespace string, vmNames []string, timeout time.Duration) error { + if len(vmNames) == 0 { + return nil + } + deadline := time.Now().Add(timeout) + var lastErr error + for { + config.EnsureBaseClusterEnv() + config.ReloadFromEnv() + lastErr = VerifyVMNamesInNamespace(ctx, namespace, vmNames) + if lastErr == nil { + return nil + } + if time.Now().After(deadline) { + return lastErr + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(10 * time.Second): + } + } +} + +// NamespaceHasActiveCluster is true when the PR namespace has at least one Running master-* VM. +func NamespaceHasActiveCluster(ctx context.Context, namespace string) (bool, error) { + config.EnsureBaseClusterEnv() + config.ReloadFromEnv() + return namespaceHasActiveClusterOnce(ctx, namespace) +} + +func namespaceHasActiveClusterOnce(ctx context.Context, namespace string) (bool, error) { + vms, err := ListNamespaceVMs(ctx, namespace) + if err != nil { + return false, err + } + if len(vms) == 0 { + return false, nil + } + for _, vm := range vms { + if strings.HasPrefix(vm.Name, "master-") && vm.Status.Phase == v1alpha2.MachineRunning { + return true, nil + } + } + return false, nil +} + +// ClusterStateHasVMs is true when cluster-state.json lists provisioned VM names. +func ClusterStateHasVMs() bool { + state, err := LoadClusterStateFile() + return err == nil && len(state.VMNames) > 0 +} + +// VerifyVMNamesInNamespace checks that every expected VM name exists and is Running. +func VerifyVMNamesInNamespace(ctx context.Context, namespace string, vmNames []string) error { + if len(vmNames) == 0 { + return nil + } + vms, err := ListNamespaceVMs(ctx, namespace) + if err != nil { + return err + } + byName := make(map[string]v1alpha2.VirtualMachine, len(vms)) + for _, vm := range vms { + byName[vm.Name] = vm + } + var missing []string + var notRunning []string + for _, name := range vmNames { + vm, ok := byName[name] + if !ok { + missing = append(missing, name) + continue + } + if vm.Status.Phase != v1alpha2.MachineRunning { + notRunning = append(notRunning, fmt.Sprintf("%s(%s)", name, vm.Status.Phase)) + } + } + if len(missing) > 0 { + return fmt.Errorf("VMs missing in namespace %s: %v", namespace, missing) + } + if len(notRunning) > 0 { + return fmt.Errorf("VMs not Running in namespace %s: %v", namespace, notRunning) + } + return nil +} diff --git a/pkg/cluster/resume_discover.go b/pkg/cluster/resume_discover.go new file mode 100644 index 0000000..078e269 --- /dev/null +++ b/pkg/cluster/resume_discover.go @@ -0,0 +1,219 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "context" + "fmt" + "sort" + "strings" + "time" + + "github.com/deckhouse/storage-e2e/internal/config" + "github.com/deckhouse/storage-e2e/internal/kubernetes/virtualization" + "github.com/deckhouse/storage-e2e/internal/logger" + v1alpha2 "github.com/deckhouse/virtualization/api/core/v1alpha2" +) + +// TryDiscoverClusterState lists VMs in the PR namespace and writes cluster-state.json when a prior +// partial provision left VMs behind but the CI cache was lost. +func TryDiscoverClusterState(ctx context.Context, namespace, configPath string) error { + if namespace == "" { + return fmt.Errorf("namespace is required") + } + if _, err := loadClusterState(); err == nil { + return nil + } + + clusterDef, err := loadClusterConfigFromPath(configPath) + if err != nil { + return err + } + + connectOpts, err := BaseClusterConnectOptionsFromEnv(config.E2ETempDirEffective()) + if err != nil { + return err + } + base, err := ConnectToCluster(ctx, connectOpts) + if err != nil { + return fmt.Errorf("discover: connect to base cluster: %w", err) + } + defer base.SSHClient.Close() + if base.TunnelInfo != nil && base.TunnelInfo.StopFunc != nil { + defer base.TunnelInfo.StopFunc() + } + + virtCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + virtClient, err := virtualization.NewClient(virtCtx, base.Kubeconfig) + if err != nil { + return err + } + + vms, err := virtClient.VirtualMachines().List(virtCtx, namespace) + if err != nil { + return err + } + if len(vms) == 0 { + return fmt.Errorf("no VMs in namespace %s", namespace) + } + + state, err := buildResumeStateFromVMs(vms, namespace, clusterDef) + if err != nil { + return err + } + for i, h := range state.MasterHostnames { + if i < len(clusterDef.Masters) { + clusterDef.Masters[i].Hostname = h + } + } + for i, h := range state.WorkerHostnames { + if i < len(clusterDef.Workers) { + clusterDef.Workers[i].Hostname = h + } + } + + if err := saveClusterState(state); err != nil { + return err + } + logger.Info("Discovered %d existing VMs in %s; wrote resume state (setup=%s)", + len(state.VMNames), namespace, state.SetupVMName) + return nil +} + +func buildResumeStateFromVMs(vms []v1alpha2.VirtualMachine, namespace string, clusterDef *config.ClusterDefinition) (*resumeState, error) { + setupPrefix := strings.TrimSuffix(config.DefaultSetupVM.Hostname, "-") + if setupPrefix == "" { + setupPrefix = "bootstrap-node-" + } + setupName, _, ok := pickNewestVMByPrefix(vms, setupPrefix) + if !ok { + return nil, fmt.Errorf("no bootstrap VM with prefix %q", setupPrefix) + } + + masterHostnames := make([]string, 0, len(clusterDef.Masters)) + for _, m := range clusterDef.Masters { + name, _, found := pickNewestVMByPrefix(vms, roleNamePrefix(m.Hostname)) + if !found { + return nil, fmt.Errorf("no VM with prefix %q in namespace", roleNamePrefix(m.Hostname)) + } + masterHostnames = append(masterHostnames, name) + } + + workerHostnames := make([]string, 0, len(clusterDef.Workers)) + for _, w := range clusterDef.Workers { + name, _, found := pickNewestVMByPrefix(vms, roleNamePrefix(w.Hostname)) + if !found { + return nil, fmt.Errorf("no VM with prefix %q in namespace", roleNamePrefix(w.Hostname)) + } + workerHostnames = append(workerHostnames, name) + } + + vmNames := append(append([]string{}, masterHostnames...), workerHostnames...) + vmNames = append(vmNames, setupName) + sort.Strings(vmNames) + + firstMasterIP := "" + if len(masterHostnames) > 0 { + for _, vm := range vms { + if vm.Name == masterHostnames[0] { + firstMasterIP = vm.Status.IPAddress + break + } + } + } + + return &resumeState{ + FirstMasterIP: firstMasterIP, + Namespace: namespace, + VMNames: vmNames, + SetupVMName: setupName, + MasterHostnames: masterHostnames, + WorkerHostnames: workerHostnames, + }, nil +} + +// ResolveNewestMasterIP lists VMs in namespace and returns the IP of the newest Running master VM +// (name prefix master-1- by default). Used to fix stale session.test_ssh_host after cache restore. +func ResolveNewestMasterIP(ctx context.Context, namespace, masterRolePrefix string) (ip, vmName string, err error) { + if namespace == "" { + return "", "", fmt.Errorf("namespace is required") + } + if masterRolePrefix == "" { + masterRolePrefix = "master-1-" + } + connectOpts, err := BaseClusterConnectOptionsFromEnv(config.E2ETempDirEffective()) + if err != nil { + return "", "", err + } + base, err := ConnectToCluster(ctx, connectOpts) + if err != nil { + return "", "", fmt.Errorf("resolve master: connect to base cluster: %w", err) + } + defer base.SSHClient.Close() + if base.TunnelInfo != nil && base.TunnelInfo.StopFunc != nil { + defer base.TunnelInfo.StopFunc() + } + + virtCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + virtClient, err := virtualization.NewClient(virtCtx, base.Kubeconfig) + if err != nil { + return "", "", err + } + vms, err := virtClient.VirtualMachines().List(virtCtx, namespace) + if err != nil { + return "", "", err + } + name, _, ok := pickNewestVMByPrefix(vms, masterRolePrefix) + if !ok { + return "", "", fmt.Errorf("no master VM with prefix %q in namespace %s", masterRolePrefix, namespace) + } + for _, vm := range vms { + if vm.Name != name { + continue + } + if vm.Status.Phase != v1alpha2.MachineRunning { + return "", name, fmt.Errorf("master VM %s is not Running (phase=%s)", name, vm.Status.Phase) + } + if vm.Status.IPAddress == "" { + return "", name, fmt.Errorf("master VM %s has no IPAddress", name) + } + return vm.Status.IPAddress, name, nil + } + return "", name, fmt.Errorf("master VM %s not found after list", name) +} + +func pickNewestVMByPrefix(vms []v1alpha2.VirtualMachine, prefix string) (string, time.Time, bool) { + var name string + var newest time.Time + for _, vm := range vms { + if !strings.HasPrefix(vm.Name, prefix) { + continue + } + created := vm.CreationTimestamp.Time + if name == "" || created.After(newest) { + name = vm.Name + newest = created + } + } + return name, newest, name != "" +} + +func roleNamePrefix(hostname string) string { + return hostname + "-" +} diff --git a/pkg/cluster/resume_helpers.go b/pkg/cluster/resume_helpers.go new file mode 100644 index 0000000..b1572c1 --- /dev/null +++ b/pkg/cluster/resume_helpers.go @@ -0,0 +1,78 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/deckhouse/storage-e2e/internal/config" + "github.com/deckhouse/storage-e2e/internal/kubernetes/virtualization" + "github.com/deckhouse/storage-e2e/internal/logger" + v1alpha2 "github.com/deckhouse/virtualization/api/core/v1alpha2" +) + +func waitForVMsRunning(ctx context.Context, virtClient *virtualization.Client, namespace string, vmNames []string) error { + vmWaitCtx, cancel := context.WithTimeout(ctx, config.VMsRunningTimeout) + defer cancel() + + var wg sync.WaitGroup + var mu sync.Mutex + errChan := make(chan error, len(vmNames)) + + for i, vmName := range vmNames { + wg.Add(1) + go func(index int, name string) { + defer wg.Done() + + mu.Lock() + logger.Progress("Waiting for VM %d/%d: %s", index+1, len(vmNames), name) + mu.Unlock() + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-vmWaitCtx.Done(): + errChan <- fmt.Errorf("timeout waiting for VM %s to become Running", name) + return + case <-ticker.C: + vm, err := virtClient.VirtualMachines().Get(vmWaitCtx, namespace, name) + if err != nil { + continue + } + if vm.Status.Phase == v1alpha2.MachineRunning { + mu.Lock() + logger.Success("VM %s is Running", name) + mu.Unlock() + return + } + } + } + }(i, vmName) + } + + wg.Wait() + close(errChan) + if len(errChan) > 0 { + return <-errChan + } + return nil +} diff --git a/pkg/cluster/vm_resources.go b/pkg/cluster/vm_resources.go new file mode 100644 index 0000000..d6e6aa1 --- /dev/null +++ b/pkg/cluster/vm_resources.go @@ -0,0 +1,67 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "context" + "fmt" + + "github.com/deckhouse/storage-e2e/internal/config" + "github.com/deckhouse/storage-e2e/internal/logger" + "github.com/deckhouse/storage-e2e/pkg/kubernetes" +) + +// PopulateVMResourcesForExistingCluster sets VMResources on a connected test cluster. +// Uses cluster-state.json when present; otherwise intersects Running guest VMs with K8s nodes. +func PopulateVMResourcesForExistingCluster(ctx context.Context, res *TestClusterResources) error { + if res == nil || res.Kubeconfig == nil { + return nil + } + if res.VMResources != nil && len(res.VMResources.VMNames) > 0 { + return nil + } + + namespace := config.TestClusterNamespace + var vmNames []string + var setupVMName string + + if state, err := LoadClusterStateFile(); err == nil && len(state.VMNames) > 0 { + namespace = state.Namespace + if namespace == "" { + namespace = config.TestClusterNamespace + } + vmNames = state.VMNames + setupVMName = state.SetupVMName + logger.Info("VMResources from cluster-state: %d VMs in %s", len(vmNames), namespace) + } else if res.BaseKubeconfig != nil && namespace != "" { + names, err := kubernetes.ListVirtualMachineNamesOnClusterNodes(ctx, res.BaseKubeconfig, res.Kubeconfig, namespace) + if err != nil { + return fmt.Errorf("discover cluster node VMs in %s: %w", namespace, err) + } + vmNames = names + logger.Info("VMResources from K8s nodes ∩ Running VMs: %v in %s", vmNames, namespace) + } else { + return nil + } + + res.VMResources = &VMResources{ + Namespace: namespace, + VMNames: vmNames, + SetupVMName: setupVMName, + } + return nil +} diff --git a/pkg/kubernetes/virtualdisk.go b/pkg/kubernetes/virtualdisk.go index d298578..9cf63b8 100644 --- a/pkg/kubernetes/virtualdisk.go +++ b/pkg/kubernetes/virtualdisk.go @@ -19,6 +19,7 @@ package kubernetes import ( "context" "fmt" + "strings" "time" "k8s.io/apimachinery/pkg/api/resource" @@ -238,8 +239,8 @@ func WaitForVirtualDiskAttached(ctx context.Context, kubeconfig *rest.Config, na } } -// ListVirtualMachineNames returns names of VirtualMachines in the given namespace. -// Used to pick a VM when attaching a VirtualDisk (e.g. in alwaysUseExisting mode). +// ListVirtualMachineNames returns names of all VirtualMachines in the given namespace. +// For e2e disk attach prefer ListVirtualMachineNamesOnClusterNodes so orphan guest VMs are excluded. func ListVirtualMachineNames(ctx context.Context, kubeconfig *rest.Config, namespace string) ([]string, error) { virtClient, err := virtualization.NewClient(ctx, kubeconfig) if err != nil { @@ -256,6 +257,54 @@ func ListVirtualMachineNames(ctx context.Context, kubeconfig *rest.Config, names return names, nil } +// ListVirtualMachineNamesOnClusterNodes returns Running non-bootstrap guest VMs that are registered Kubernetes nodes. +// PR namespaces may contain orphan VMs from failed reprovisions; attaching disks to those VMs never creates BlockDevices. +func ListVirtualMachineNamesOnClusterNodes(ctx context.Context, baseKube, testKube *rest.Config, namespace string) ([]string, error) { + if baseKube == nil || testKube == nil { + return nil, fmt.Errorf("base and test kubeconfig are required") + } + if namespace == "" { + return nil, fmt.Errorf("namespace is required") + } + + virtClient, err := virtualization.NewClient(ctx, baseKube) + if err != nil { + return nil, fmt.Errorf("create virtualization client: %w", err) + } + vmList, err := virtClient.VirtualMachines().List(ctx, namespace) + if err != nil { + return nil, fmt.Errorf("list VirtualMachines in %s: %w", namespace, err) + } + + nodes, err := GetNodes(ctx, testKube) + if err != nil { + return nil, fmt.Errorf("list Kubernetes nodes: %w", err) + } + nodeNames := make(map[string]struct{}, len(nodes)) + for i := range nodes { + nodeNames[nodes[i].Name] = struct{}{} + } + + var names []string + for i := range vmList { + vm := vmList[i] + if vm.Status.Phase != v1alpha2.MachineRunning { + continue + } + if strings.HasPrefix(vm.Name, "bootstrap-node-") { + continue + } + if _, ok := nodeNames[vm.Name]; !ok { + continue + } + names = append(names, vm.Name) + } + if len(names) == 0 { + return nil, fmt.Errorf("no Running guest VMs in %s match Kubernetes nodes (clean up orphan VMs?)", namespace) + } + return names, nil +} + // GetVMIPFromBaseCluster returns the IP address of a VirtualMachine in the base cluster (namespace). // Used to SSH to the VM (e.g. cloud@ip) from the jump host to run lsblk on nested nodes. func GetVMIPFromBaseCluster(ctx context.Context, baseKubeconfig *rest.Config, namespace, vmName string) (string, error) { diff --git a/pkg/provider/artifact.go b/pkg/provider/artifact.go new file mode 100644 index 0000000..9c97ca8 --- /dev/null +++ b/pkg/provider/artifact.go @@ -0,0 +1,140 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provider + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/deckhouse/storage-e2e/internal/config" + "github.com/deckhouse/storage-e2e/pkg/cluster" +) + +const clusterStateFileName = "cluster-state.json" + +// CacheKey returns the actions/cache key for a PR-scoped E2E session. +func CacheKey(prNumber string) string { + return fmt.Sprintf("e2e-pr-%s-session", prNumber) +} + +// KubeconfigArtifactPath returns the path to a kubeconfig file stored in the artifact dir. +func KubeconfigArtifactPath(artifactDir, originalPath string) string { + if originalPath == "" { + return "" + } + return filepath.Join(artifactDir, filepath.Base(originalPath)) +} + +// ClusterStateArtifactPath returns cluster-state.json inside the artifact dir. +func ClusterStateArtifactPath(artifactDir string) string { + return filepath.Join(artifactDir, clusterStateFileName) +} + +// RestoreClusterStateToRuntime copies cached cluster-state.json into /tmp/e2e for resume. +func RestoreClusterStateToRuntime(artifactDir string) error { + src := ClusterStateArtifactPath(artifactDir) + data, err := os.ReadFile(src) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + runtimePath := cluster.ClusterStatePath() + if err := os.MkdirAll(filepath.Dir(runtimePath), 0o755); err != nil { + return err + } + return os.WriteFile(runtimePath, data, 0o600) +} + +// SyncClusterStateToArtifact copies runtime cluster-state.json into the artifact dir. +func SyncClusterStateToArtifact(artifactDir string) error { + src := cluster.ClusterStatePath() + data, err := os.ReadFile(src) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + if err := os.MkdirAll(artifactDir, 0o755); err != nil { + return err + } + return os.WriteFile(ClusterStateArtifactPath(artifactDir), data, 0o600) +} + +// MarkSessionTornDown writes a tombstone session after successful teardown. +func MarkSessionTornDown(artifactDir string, session *Session) error { + if session == nil { + session = &Session{} + } + session.Status = SessionStatusTornDown + session.KubeconfigPath = "" + session.VMNames = nil + return SaveSession(artifactDir, session) +} + +// RestoreKubeconfigFromArtifact updates session kubeconfig path after cache restore. +func RestoreKubeconfigFromArtifact(artifactDir string, session *Session) { + if session == nil || session.KubeconfigPath == "" { + return + } + candidate := KubeconfigArtifactPath(artifactDir, session.KubeconfigPath) + if _, err := os.Stat(candidate); err == nil { + session.KubeconfigPath = candidate + } +} + +// NamespaceForPR builds a stable test namespace for a pull request. +func NamespaceForPR(moduleSlug, prNumber string) string { + if moduleSlug == "" { + moduleSlug = "module" + } + return fmt.Sprintf("e2e-%s-pr%s", moduleSlug, prNumber) +} + +func copyFileToArtifact(src, artifactDir string) error { + data, err := os.ReadFile(src) + if err != nil { + return err + } + dst := filepath.Join(artifactDir, filepath.Base(src)) + return os.WriteFile(dst, data, 0o600) +} + +func sessionFromPersistedStateName(name Name, opts SetupOptions) (*Session, error) { + state, err := cluster.LoadClusterStateFile() + if err != nil { + return nil, err + } + s := &Session{ + Provider: string(name), + PRNumber: opts.PRNumber, + Namespace: state.Namespace, + StorageClass: config.TestClusterStorageClass, + TestSSHHost: state.FirstMasterIP, + TestSSHUser: config.VMSSHUserDefaultValue, + VMNames: state.VMNames, + SetupVMName: state.SetupVMName, + Status: SessionStatusPartial, + } + if config.VMSSHUser != "" { + s.TestSSHUser = config.VMSSHUser + } + return s, nil +} diff --git a/pkg/provider/ensure.go b/pkg/provider/ensure.go new file mode 100644 index 0000000..c38a37a --- /dev/null +++ b/pkg/provider/ensure.go @@ -0,0 +1,119 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provider + +import ( + "context" + "fmt" + "os" + + "github.com/deckhouse/storage-e2e/internal/config" + "github.com/deckhouse/storage-e2e/internal/logger" + "github.com/deckhouse/storage-e2e/pkg/cluster" +) + +// EnsureCluster provisions or reuses a cluster for a stable PR-scoped session. +// It never tears down on failure — partial state is persisted for TEST_CLUSTER_RESUME. +func EnsureCluster(ctx context.Context, p ClusterProvider, opts SetupOptions) (*Session, error) { + if opts.ArtifactDir == "" { + return nil, fmt.Errorf("artifact dir is required") + } + // Drop actions/cache artifacts when the PR namespace has no live cluster VMs (must run before resume). + purgeStaleCacheIfNamespaceEmpty(ctx, opts.ArtifactDir) + if err := prepareResume(ctx, opts); err != nil { + return nil, err + } + + if opts.SkipIfReady { + if session, err := LoadSession(opts.ArtifactDir); err == nil { + RestoreKubeconfigFromArtifact(opts.ArtifactDir, session) + if session.Status == SessionStatusTornDown { + logger.Info("Previous session was torn down; provisioning a new cluster") + ClearStaleArtifacts(opts.ArtifactDir) + } else if err := VerifySessionReusable(ctx, session); err == nil { + if session.Status == SessionStatusPartial { + logger.Info("Partial session is healthy; upgrading to ready and reusing cluster") + } else { + logger.Info("Reusing existing cluster (session status=%s)", session.Status) + } + session.Status = SessionStatusReady + if err := SaveSession(opts.ArtifactDir, session); err != nil { + return nil, err + } + return session, nil + } else if shouldResumeNotReprovision(opts.ArtifactDir) { + logger.Info("Session not reusable (%v) but cluster-state has VMs — resuming, not reprovisioning", err) + } else { + logger.Info("Session not reusable (%v); clearing stale cache and provisioning", err) + ClearStaleArtifacts(opts.ArtifactDir) + if opts.ReuseOnly { + return nil, fmt.Errorf("reuse-only: session not reusable for PR %s: %w", + opts.PRNumber, err) + } + } + } else if _, err := cluster.LoadClusterStateFile(); err == nil { + logger.Info("No session cache for PR %s but cluster-state/VMs found in %s; will resume or reconnect", + opts.PRNumber, config.TestClusterNamespace) + } else if opts.ReuseOnly { + return nil, fmt.Errorf("reuse-only: no session in %s for PR %s", opts.ArtifactDir, opts.PRNumber) + } + } + + if opts.ReuseOnly { + return nil, fmt.Errorf("reuse-only: no reusable session for PR %s", opts.PRNumber) + } + + if _, err := cluster.LoadClusterStateFile(); err == nil { + _ = os.Setenv("TEST_CLUSTER_RESUME", "true") + logger.Info("Cluster state found on disk; enabling TEST_CLUSTER_RESUME") + } + + session, resources, setupErr := p.Setup(ctx, opts) + if session != nil { + session.PRNumber = opts.PRNumber + if setupErr == nil { + session.Status = SessionStatusReady + } else if session.Status == "" { + session.Status = SessionStatusPartial + } + if saveErr := persistSession(opts.ArtifactDir, session, resources); saveErr != nil { + return session, fmt.Errorf("save session: %w (setup err: %v)", saveErr, setupErr) + } + } else if setupErr != nil { + if partial, partialErr := sessionFromPersistedStateName(opts.ProviderName, opts); partialErr == nil { + partial.PRNumber = opts.PRNumber + _ = persistSession(opts.ArtifactDir, partial, nil) + } + return nil, setupErr + } + + return session, setupErr +} + +func persistSession(artifactDir string, session *Session, resources *cluster.TestClusterResources) error { + if err := SyncClusterStateToArtifact(artifactDir); err != nil { + logger.Warn("Failed to sync cluster-state to artifact dir: %v", err) + } + if session.KubeconfigPath != "" { + if err := copyFileToArtifact(session.KubeconfigPath, artifactDir); err != nil { + logger.Warn("Failed to copy kubeconfig to artifact dir: %v", err) + } + session.KubeconfigPath = KubeconfigArtifactPath(artifactDir, session.KubeconfigPath) + } + _ = resources + return SaveSession(artifactDir, session) +} diff --git a/pkg/provider/envfile.go b/pkg/provider/envfile.go new file mode 100644 index 0000000..e1a44fe --- /dev/null +++ b/pkg/provider/envfile.go @@ -0,0 +1,75 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provider + +import ( + "os" + "strings" +) + +// ApplyEnvFileKeys loads export KEY=VALUE lines from path for the given keys when unset in the process env. +func ApplyEnvFileKeys(path string, keys ...string) error { + if len(keys) == 0 { + return nil + } + want := make(map[string]struct{}, len(keys)) + for _, k := range keys { + if k == "" { + continue + } + if strings.TrimSpace(os.Getenv(k)) != "" { + continue + } + want[k] = struct{}{} + } + if len(want) == 0 { + return nil + } + data, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + for _, line := range strings.Split(string(data), "\n") { + line = strings.TrimSpace(line) + if line == "" || strings.HasPrefix(line, "#") || !strings.HasPrefix(line, "export ") { + continue + } + line = strings.TrimPrefix(line, "export ") + k, v, ok := strings.Cut(line, "=") + if !ok { + continue + } + if _, ok := want[k]; !ok { + continue + } + v = strings.Trim(v, `"`) + if v == "" { + continue + } + if err := os.Setenv(k, v); err != nil { + return err + } + delete(want, k) + if len(want) == 0 { + break + } + } + return nil +} diff --git a/pkg/provider/health.go b/pkg/provider/health.go new file mode 100644 index 0000000..d7e2bbd --- /dev/null +++ b/pkg/provider/health.go @@ -0,0 +1,58 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provider + +import ( + "context" + "fmt" + "os" + + "github.com/deckhouse/storage-e2e/pkg/cluster" +) + +const ( + SessionStatusReady = "ready" + SessionStatusPartial = "partial" + SessionStatusTornDown = "torn_down" +) + +// VerifySessionHealth reconnects using the session and checks cluster API health. +func VerifySessionHealth(ctx context.Context, session *Session) error { + if session == nil { + return fmt.Errorf("session is nil") + } + if session.Status == SessionStatusTornDown { + return fmt.Errorf("session was torn down") + } + if session.KubeconfigPath != "" { + if _, err := os.Stat(session.KubeconfigPath); err != nil { + return fmt.Errorf("kubeconfig missing: %w", err) + } + } + resources, err := ReconnectFromSession(ctx, session) + if err != nil { + return fmt.Errorf("reconnect: %w", err) + } + defer func() { + if resources != nil { + _ = cluster.CleanupExistingCluster(ctx, resources) + } + }() + return cluster.CheckClusterHealth(ctx, resources.Kubeconfig, cluster.CheckClusterHealthOptions{ + CheckBootstrapSecrets: false, + }) +} diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go new file mode 100644 index 0000000..7e3f3df --- /dev/null +++ b/pkg/provider/provider.go @@ -0,0 +1,176 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package provider implements cluster lifecycle for CI and local runs via a single SDK entrypoint. +package provider + +import ( + "context" + "fmt" + + "github.com/deckhouse/storage-e2e/internal/config" + "github.com/deckhouse/storage-e2e/pkg/cluster" +) + +// Name selects how the test cluster is provisioned. Matches TEST_CLUSTER_CREATE_MODE values. +type Name string + +const ( + NameAlwaysCreateNew Name = config.ClusterCreateModeAlwaysCreateNew + NameAlwaysUseExisting Name = config.ClusterCreateModeAlwaysUseExisting + NameCommander Name = config.ClusterCreateModeCommander +) + +// ClusterProvider provisions and tears down a test cluster without shell/YAML orchestration. +type ClusterProvider interface { + Setup(ctx context.Context, opts SetupOptions) (*Session, *cluster.TestClusterResources, error) + Teardown(ctx context.Context, session *Session, resources *cluster.TestClusterResources) error +} + +// SetupOptions configures cluster provisioning. +type SetupOptions struct { + ProviderName Name + ClusterConfigPath string + ArtifactDir string + PRNumber string + SkipIfReady bool + ReuseOnly bool +} + +// New returns a ClusterProvider for the given name. +func New(name Name) (ClusterProvider, error) { + switch name { + case NameAlwaysCreateNew, NameAlwaysUseExisting, NameCommander: + return &provider{name: name}, nil + default: + return nil, fmt.Errorf("unknown cluster provider %q (expected %q, %q, or %q)", + name, NameAlwaysCreateNew, NameAlwaysUseExisting, NameCommander) + } +} + +// ParseName validates and parses a provider name from workflow input or CLI flag. +func ParseName(s string) (Name, error) { + n := Name(s) + switch n { + case NameAlwaysCreateNew, NameAlwaysUseExisting, NameCommander: + return n, nil + default: + return "", fmt.Errorf("invalid cluster provider %q", s) + } +} + +type provider struct { + name Name +} + +func (p *provider) Setup(ctx context.Context, opts SetupOptions) (*Session, *cluster.TestClusterResources, error) { + switch p.name { + case NameAlwaysCreateNew: + if opts.ClusterConfigPath == "" { + return nil, nil, fmt.Errorf("cluster config path is required for provider %s", p.name) + } + resources, err := cluster.CreateTestClusterFromConfig(ctx, opts.ClusterConfigPath) + if err != nil { + session, _ := sessionFromPersistedStateName(p.name, opts) + if session != nil { + return session, nil, err + } + return nil, nil, err + } + if err := cluster.WaitForTestClusterReady(ctx, resources); err != nil { + session, sErr := sessionFromResources(p.name, resources) + if sErr == nil && session != nil { + session.Status = SessionStatusPartial + } + return session, resources, err + } + session, err := sessionFromResources(p.name, resources) + if err != nil { + return nil, nil, err + } + return session, resources, nil + + case NameAlwaysUseExisting: + resources, err := cluster.UseExistingCluster(ctx) + if err != nil { + return nil, nil, err + } + session, err := sessionFromResources(p.name, resources) + if err != nil { + _ = cluster.CleanupExistingCluster(ctx, resources) + return nil, nil, err + } + return session, resources, nil + + case NameCommander: + cmdRes, err := cluster.UseCommanderCluster(ctx) + if err != nil { + return nil, nil, err + } + cluster.SetCommanderResources(cmdRes) + resources := cmdRes.TestClusterResources + if err := cluster.WaitForTestClusterReady(ctx, resources); err != nil { + _ = cluster.CleanupCommanderCluster(ctx, cmdRes) + cluster.ClearCommanderResources() + return nil, nil, err + } + session, err := sessionFromCommander(p.name, cmdRes, resources) + if err != nil { + _ = cluster.CleanupCommanderCluster(ctx, cmdRes) + cluster.ClearCommanderResources() + return nil, nil, err + } + return session, resources, nil + + default: + return nil, nil, fmt.Errorf("unsupported provider %q", p.name) + } +} + +func (p *provider) Teardown(ctx context.Context, session *Session, resources *cluster.TestClusterResources) error { + if session == nil { + return fmt.Errorf("session is nil") + } + if resources == nil { + var err error + resources, err = ReconnectFromSession(ctx, session) + if err != nil { + return err + } + } + + switch Name(session.Provider) { + case NameAlwaysUseExisting: + return cluster.CleanupExistingCluster(ctx, resources) + case NameCommander: + cmdRes := cluster.GetCommanderResources() + if cmdRes == nil && session.CommanderClusterName != "" { + cmdRes = &cluster.CommanderClusterResources{ + ClusterName: session.CommanderClusterName, + CreatedByUs: session.CommanderCreatedByUs, + TestClusterResources: resources, + } + } + if cmdRes != nil { + err := cluster.CleanupCommanderCluster(ctx, cmdRes) + cluster.ClearCommanderResources() + return err + } + return cluster.CleanupExistingCluster(ctx, resources) + default: + return cluster.CleanupTestCluster(ctx, resources) + } +} diff --git a/pkg/provider/reconnect.go b/pkg/provider/reconnect.go new file mode 100644 index 0000000..078f2fd --- /dev/null +++ b/pkg/provider/reconnect.go @@ -0,0 +1,142 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provider + +import ( + "context" + "fmt" + "os" + "path/filepath" + + "github.com/deckhouse/storage-e2e/internal/config" + "github.com/deckhouse/storage-e2e/pkg/cluster" +) + +// ReconnectFromSession rebuilds TestClusterResources for teardown after CI artifacts are restored. +func ReconnectFromSession(ctx context.Context, session *Session) (*cluster.TestClusterResources, error) { + if session == nil { + return nil, fmt.Errorf("session is nil") + } + + switch Name(session.Provider) { + case NameAlwaysUseExisting, NameCommander: + applyRunSSH(session) + return cluster.UseExistingCluster(ctx) + default: + return reconnectCreatedCluster(ctx, session) + } +} + +func applyRunSSH(session *Session) { + if session.TestSSHHost != "" { + _ = os.Setenv("SSH_HOST", session.TestSSHHost) + } + if session.TestSSHUser != "" { + _ = os.Setenv("SSH_USER", session.TestSSHUser) + } + if session.KubeconfigPath != "" { + _ = os.Setenv("KUBE_CONFIG_PATH", session.KubeconfigPath) + } +} + +func reconnectCreatedCluster(ctx context.Context, session *Session) (*cluster.TestClusterResources, error) { + baseOpts, err := cluster.BaseClusterConnectOptionsFromEnv(config.E2ETempDir) + if err != nil { + return nil, err + } + baseRes, err := cluster.ConnectToCluster(ctx, baseOpts) + if err != nil { + return nil, fmt.Errorf("reconnect base cluster: %w", err) + } + + testHost := session.TestSSHHost + if testHost == "" { + return nil, fmt.Errorf("session missing test_ssh_host for teardown") + } + testUser := session.TestSSHUser + if testUser == "" { + testUser = config.VMSSHUserDefaultValue + } + + sshKeyPath, err := expandPath(config.SSHPrivateKey) + if err != nil { + baseRes.SSHClient.Close() + return nil, err + } + + testOpts := cluster.ConnectClusterOptions{ + SSHUser: baseOpts.SSHUser, + SSHHost: baseOpts.SSHHost, + SSHKeyPath: baseOpts.SSHKeyPath, + UseJumpHost: baseOpts.UseJumpHost, + TargetUser: testUser, + TargetHost: testHost, + TargetKeyPath: sshKeyPath, + KubeconfigOutputDir: config.E2ETempDir, + } + if baseOpts.UseJumpHost { + testOpts = cluster.ConnectClusterOptions{ + SSHUser: baseOpts.SSHUser, + SSHHost: baseOpts.SSHHost, + SSHKeyPath: baseOpts.SSHKeyPath, + UseJumpHost: true, + TargetUser: testUser, + TargetHost: testHost, + TargetKeyPath: sshKeyPath, + KubeconfigOutputDir: config.E2ETempDir, + } + } + + testRes, err := cluster.ConnectToCluster(ctx, testOpts) + if err != nil { + baseRes.SSHClient.Close() + return nil, fmt.Errorf("reconnect test cluster: %w", err) + } + + testRes.BaseClusterClient = baseRes.SSHClient + testRes.BaseKubeconfig = baseRes.Kubeconfig + testRes.BaseKubeconfigPath = baseRes.KubeconfigPath + testRes.BaseTunnelInfo = baseRes.TunnelInfo + + if session.KubeconfigPath != "" { + testRes.KubeconfigPath = session.KubeconfigPath + } + + if len(session.VMNames) > 0 { + testRes.VMResources = &cluster.VMResources{ + Namespace: session.Namespace, + VMNames: session.VMNames, + SetupVMName: session.SetupVMName, + } + } + + return testRes, nil +} + +func expandPath(path string) (string, error) { + if path == "" { + path = config.SSHPrivateKeyDefaultValue + } + if len(path) >= 2 && path[:2] == "~/" { + home, err := os.UserHomeDir() + if err != nil { + return "", err + } + path = filepath.Join(home, path[2:]) + } + return filepath.Clean(path), nil +} diff --git a/pkg/provider/resume.go b/pkg/provider/resume.go new file mode 100644 index 0000000..2f1edfa --- /dev/null +++ b/pkg/provider/resume.go @@ -0,0 +1,133 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provider + +import ( + "context" + "encoding/json" + "os" + "strings" + + "github.com/deckhouse/storage-e2e/internal/config" + "github.com/deckhouse/storage-e2e/internal/logger" + "github.com/deckhouse/storage-e2e/pkg/cluster" +) + +func prepareResume(ctx context.Context, opts SetupOptions) error { + if err := RestoreClusterStateToRuntime(opts.ArtifactDir); err != nil { + return err + } + if state, err := cluster.LoadClusterStateFile(); err == nil { + ns := state.Namespace + if ns == "" { + ns = config.TestClusterNamespace + } + active, checkErr := cluster.NamespaceHasActiveCluster(ctx, ns) + if checkErr != nil { + logger.Warn("Could not verify VMs for cached cluster-state in %s: %v", ns, checkErr) + } else if !active { + if len(state.VMNames) > 0 { + logger.Info("cluster-state has VMs in %s; keeping resume state (virt API check inconclusive)", ns) + return nil + } + logger.Info("Cached cluster-state for %s but no running VMs — clearing resume state", ns) + ClearStaleArtifacts(opts.ArtifactDir) + } else { + return nil + } + } + + if session, err := LoadSession(opts.ArtifactDir); err == nil { + ns := session.Namespace + if ns == "" { + ns = config.TestClusterNamespace + } + active, checkErr := cluster.NamespaceHasActiveCluster(ctx, ns) + if checkErr != nil { + logger.Warn("Could not verify VMs for cached session in %s: %v", ns, checkErr) + } else if !active { + if len(session.VMNames) > 0 { + logger.Info("session lists VMs in %s; keeping cache (virt API check inconclusive)", ns) + if restored := writeClusterStateFromSession(opts.ArtifactDir, session); restored { + _ = RestoreClusterStateToRuntime(opts.ArtifactDir) + return nil + } + } else { + logger.Info("Cached session for %s but no running VMs — clearing stale session", ns) + ClearStaleArtifacts(opts.ArtifactDir) + } + } else if restored := writeClusterStateFromSession(opts.ArtifactDir, session); restored { + logger.Info("Restored cluster-state.json from session VM names") + _ = RestoreClusterStateToRuntime(opts.ArtifactDir) + return nil + } + } + + if opts.ClusterConfigPath == "" || config.TestClusterNamespace == "" { + return nil + } + if err := cluster.TryDiscoverClusterState(ctx, config.TestClusterNamespace, opts.ClusterConfigPath); err != nil { + logger.Info("Could not discover existing VMs in %s: %v", config.TestClusterNamespace, err) + return nil + } + _ = RestoreClusterStateToRuntime(opts.ArtifactDir) + return nil +} + +func writeClusterStateFromSession(artifactDir string, session *Session) bool { + if session == nil || len(session.VMNames) == 0 || session.SetupVMName == "" { + return false + } + masterHostnames, workerHostnames := splitVMNamesByRole(session.VMNames, session.SetupVMName) + state := cluster.ClusterStateSnapshot{ + Namespace: session.Namespace, + VMNames: session.VMNames, + SetupVMName: session.SetupVMName, + FirstMasterIP: session.TestSSHHost, + MasterHostnames: masterHostnames, + WorkerHostnames: workerHostnames, + } + if state.Namespace == "" { + state.Namespace = config.TestClusterNamespace + } + data, err := json.MarshalIndent(state, "", " ") + if err != nil { + return false + } + path := ClusterStateArtifactPath(artifactDir) + if err := os.MkdirAll(artifactDir, 0o755); err != nil { + return false + } + if err := os.WriteFile(path, data, 0o600); err != nil { + return false + } + return true +} + +func splitVMNamesByRole(vmNames []string, setupVMName string) (masters, workers []string) { + for _, name := range vmNames { + if name == setupVMName || strings.HasPrefix(name, "bootstrap-node-") { + continue + } + if strings.HasPrefix(name, "master-") { + masters = append(masters, name) + } else if strings.HasPrefix(name, "worker-") { + workers = append(workers, name) + } + } + return masters, workers +} diff --git a/pkg/provider/run_session.go b/pkg/provider/run_session.go new file mode 100644 index 0000000..f5780a4 --- /dev/null +++ b/pkg/provider/run_session.go @@ -0,0 +1,98 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provider + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/deckhouse/storage-e2e/internal/config" + "github.com/deckhouse/storage-e2e/internal/logger" + "github.com/deckhouse/storage-e2e/pkg/cluster" +) + +// PrepareSessionForRunTests refreshes SSH target to the newest master VM and verifies the cluster is reachable. +// Call after cache restore in run-tests (skip_create_cluster path may carry a stale or wrong test_ssh_host). +func PrepareSessionForRunTests(ctx context.Context, artifactDir string, session *Session) error { + if session == nil { + return fmt.Errorf("session is nil") + } + config.EnsureBaseClusterEnv() + config.ReloadFromEnv() + + namespace := session.Namespace + if namespace == "" { + namespace = os.Getenv("TEST_CLUSTER_NAMESPACE") + } + + masterPrefix := "master-1-" + if len(session.VMNames) > 0 { + for _, name := range session.VMNames { + if strings.HasPrefix(name, "master-") { + if i := strings.LastIndex(name, "-"); i > 0 { + masterPrefix = name[:i+1] + break + } + } + } + } + + resolveCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + ip, vmName, err := cluster.ResolveNewestMasterIP(resolveCtx, namespace, masterPrefix) + if err != nil { + return fmt.Errorf("resolve master IP in namespace %s: %w (stale cache? re-run with labeled event)", namespace, err) + } + if session.TestSSHHost != ip { + logger.Info("Correcting SSH target: %s → master %s@%s (was %s)", + namespace, vmName, ip, session.TestSSHHost) + } + session.TestSSHHost = ip + + RestoreKubeconfigFromArtifact(artifactDir, session) + _ = ApplyEnvFileKeys(filepath.Join(artifactDir, "run-env.sh"), + "SSH_JUMP_HOST", "SSH_JUMP_USER", "E2E_TUNNEL_SSH_JUMP_HOST", "E2E_TUNNEL_SSH_JUMP_USER", + "E2E_BASE_SSH_HOST", "E2E_BASE_SSH_USER", "E2E_BASE_KUBE_CONFIG_PATH") + ApplySessionEnv(session) + + if session.TestSSHHost == "" { + return fmt.Errorf("session has no test_ssh_host for namespace %s", namespace) + } + config.SyncTestSSHJumpEnv(session.TestSSHHost) + jumpHost := config.EffectiveSSHJumpHostForTest(session.TestSSHHost) + if jumpHost == "" && os.Getenv("CI") != "" && config.NeedsBastionJump(session.TestSSHHost) { + return fmt.Errorf("SSH_JUMP_HOST is required in CI to reach test cluster nodes (SSH_HOST=%s, E2E_BASE_SSH_HOST=%s)", + session.TestSSHHost, config.EffectiveBaseSSHHost()) + } + + logger.Info("run-tests SSH target: %s@%s (jump=%s, namespace=%s)", + session.TestSSHUser, session.TestSSHHost, jumpHost, namespace) + + healthCtx, healthCancel := context.WithTimeout(ctx, 3*time.Minute) + defer healthCancel() + if err := VerifySessionReusable(healthCtx, session); err != nil { + return fmt.Errorf("cluster not reusable at SSH_HOST=%s (namespace=%s): %w", + session.TestSSHHost, namespace, err) + } + + session.Status = SessionStatusReady + return SaveSession(artifactDir, session) +} diff --git a/pkg/provider/session.go b/pkg/provider/session.go new file mode 100644 index 0000000..56dfc3e --- /dev/null +++ b/pkg/provider/session.go @@ -0,0 +1,259 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provider + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "time" + + "github.com/deckhouse/storage-e2e/internal/config" + "github.com/deckhouse/storage-e2e/pkg/cluster" +) + +const sessionFileName = "session.json" + +// Session is persisted between CI jobs (create-cluster → run-tests → teardown-cluster). +type Session struct { + Provider string `json:"provider"` + Status string `json:"status,omitempty"` + PRNumber string `json:"pr_number,omitempty"` + CreatedAt time.Time `json:"created_at"` + ArtifactDir string `json:"artifact_dir"` + + KubeconfigPath string `json:"kubeconfig_path"` + ClusterStatePath string `json:"cluster_state_path,omitempty"` + TestSSHHost string `json:"test_ssh_host,omitempty"` + TestSSHUser string `json:"test_ssh_user,omitempty"` + UseJumpHost bool `json:"use_jump_host,omitempty"` + Namespace string `json:"namespace,omitempty"` + StorageClass string `json:"storage_class,omitempty"` + SetupVMName string `json:"setup_vm_name,omitempty"` + VMNames []string `json:"vm_names,omitempty"` + + CommanderClusterName string `json:"commander_cluster_name,omitempty"` + CommanderCreatedByUs bool `json:"commander_created_by_us,omitempty"` +} + +// SessionPath returns the path to session.json inside artifactDir. +func SessionPath(artifactDir string) string { + return filepath.Join(artifactDir, sessionFileName) +} + +// SaveSession writes session.json under artifactDir. +func SaveSession(artifactDir string, session *Session) error { + if session == nil { + return fmt.Errorf("session is nil") + } + normalizeSession(session) + session.ArtifactDir = artifactDir + if err := os.MkdirAll(artifactDir, 0o755); err != nil { + return fmt.Errorf("create artifact dir: %w", err) + } + data, err := json.MarshalIndent(session, "", " ") + if err != nil { + return err + } + path := SessionPath(artifactDir) + if err := os.WriteFile(path, data, 0o600); err != nil { + return fmt.Errorf("write %s: %w", path, err) + } + return WriteRunEnv(artifactDir, session) +} + +// LoadSession reads session.json from artifactDir. +func LoadSession(artifactDir string) (*Session, error) { + data, err := os.ReadFile(SessionPath(artifactDir)) + if err != nil { + return nil, fmt.Errorf("read session: %w", err) + } + var session Session + if err := json.Unmarshal(data, &session); err != nil { + return nil, fmt.Errorf("parse session: %w", err) + } + return &session, nil +} + +// WriteRunEnv emits run-env.sh for the run-tests CI job (non-secret variables only). +func WriteRunEnv(artifactDir string, session *Session) error { + normalizeSession(session) + path := filepath.Join(artifactDir, "run-env.sh") + f, err := os.Create(path) + if err != nil { + return err + } + defer f.Close() + + write := func(k, v string) { + if v != "" { + _, _ = fmt.Fprintf(f, "export %s=%q\n", k, v) + } + } + + _, _ = fmt.Fprintln(f, "# Generated by storage-e2e — do not commit") + write("E2E_CLUSTER_PHASE", "run") + write("TEST_CLUSTER_CREATE_MODE", config.ClusterCreateModeAlwaysUseExisting) + baseHost := os.Getenv("E2E_BASE_SSH_HOST") + if baseHost == "" { + baseHost = os.Getenv("SSH_HOST") + } + baseUser := os.Getenv("E2E_BASE_SSH_USER") + if baseUser == "" { + baseUser = os.Getenv("SSH_USER") + } + write("E2E_BASE_SSH_HOST", baseHost) + write("E2E_BASE_SSH_USER", baseUser) + baseKubeconfig := os.Getenv("E2E_BASE_KUBE_CONFIG_PATH") + if baseKubeconfig == "" { + baseKubeconfig = os.Getenv("KUBE_CONFIG_PATH") + } + write("E2E_BASE_KUBE_CONFIG_PATH", baseKubeconfig) + kubeconfig := session.KubeconfigPath + if kubeconfig != "" { + if candidate := KubeconfigArtifactPath(artifactDir, kubeconfig); fileExists(candidate) { + kubeconfig = candidate + } + } + write("KUBE_CONFIG_PATH", kubeconfig) + write("SSH_HOST", session.TestSSHHost) + write("SSH_USER", session.TestSSHUser) + write("SSH_VM_USER", session.TestSSHUser) + write("SSH_JUMP_HOST", os.Getenv("SSH_JUMP_HOST")) + write("SSH_JUMP_USER", os.Getenv("SSH_JUMP_USER")) + write("E2E_TUNNEL_SSH_JUMP_HOST", os.Getenv("E2E_TUNNEL_SSH_JUMP_HOST")) + write("E2E_TUNNEL_SSH_JUMP_USER", os.Getenv("E2E_TUNNEL_SSH_JUMP_USER")) + if session.Namespace != "" { + write("TEST_CLUSTER_NAMESPACE", session.Namespace) + } + write("TEST_CLUSTER_STORAGE_CLASS", session.StorageClass) + return nil +} + +// ApplySessionEnv sets process env from persisted session fields (run-tests after cache restore). +func ApplySessionEnv(session *Session) { + if session == nil { + return + } + normalizeSession(session) + setenv := func(k, v string) { + if v != "" { + _ = os.Setenv(k, v) + } + } + if os.Getenv("E2E_BASE_SSH_HOST") == "" { + if base := config.EffectiveBaseSSHHost(); base != "" && base != session.TestSSHHost { + setenv("E2E_BASE_SSH_HOST", base) + } + } + if os.Getenv("E2E_BASE_SSH_USER") == "" { + if user := config.EffectiveBaseSSHUser(); user != "" { + setenv("E2E_BASE_SSH_USER", user) + } + } + if os.Getenv("E2E_BASE_KUBE_CONFIG_PATH") == "" { + if baseKC := config.EffectiveBaseKubeConfigPath(); baseKC != "" && baseKC != session.KubeconfigPath { + setenv("E2E_BASE_KUBE_CONFIG_PATH", baseKC) + } + } + setenv("TEST_CLUSTER_STORAGE_CLASS", session.StorageClass) + setenv("TEST_CLUSTER_NAMESPACE", session.Namespace) + setenv("SSH_HOST", session.TestSSHHost) + setenv("SSH_USER", session.TestSSHUser) + setenv("SSH_VM_USER", session.TestSSHUser) + config.SyncTestSSHJumpEnv(session.TestSSHHost) + setenv("TEST_CLUSTER_CREATE_MODE", config.ClusterCreateModeAlwaysUseExisting) +} + +func normalizeSession(session *Session) { + if session == nil { + return + } + if session.TestSSHUser == "" { + session.TestSSHUser = config.VMSSHUser + if session.TestSSHUser == "" { + session.TestSSHUser = config.VMSSHUserDefaultValue + } + } + if session.TestSSHHost == "" { + if state, err := cluster.LoadClusterStateFile(); err == nil && state.FirstMasterIP != "" { + session.TestSSHHost = state.FirstMasterIP + } + } + if session.StorageClass == "" { + session.StorageClass = os.Getenv("TEST_CLUSTER_STORAGE_CLASS") + } + if session.Namespace == "" { + session.Namespace = os.Getenv("TEST_CLUSTER_NAMESPACE") + } +} + +func fileExists(path string) bool { + _, err := os.Stat(path) + return err == nil +} + +func sessionFromResources(name Name, res *cluster.TestClusterResources) (*Session, error) { + if res == nil || res.KubeconfigPath == "" { + return nil, fmt.Errorf("cluster resources missing kubeconfig path") + } + s := &Session{ + Provider: string(name), + Status: SessionStatusReady, + CreatedAt: time.Now().UTC(), + KubeconfigPath: res.KubeconfigPath, + Namespace: config.TestClusterNamespace, + StorageClass: config.TestClusterStorageClass, + } + if res.ClusterDefinition != nil && len(res.ClusterDefinition.Masters) > 0 { + s.TestSSHHost = res.ClusterDefinition.Masters[0].IPAddress + } + if s.TestSSHHost == "" { + s.TestSSHHost = config.SSHHost + } + s.TestSSHUser = config.VMSSHUser + if s.TestSSHUser == "" { + s.TestSSHUser = config.VMSSHUserDefaultValue + } + s.UseJumpHost = config.SSHJumpHost != "" || config.SSHHost != s.TestSSHHost + + if state, err := cluster.LoadClusterStateFile(); err == nil && state != nil { + s.ClusterStatePath = cluster.ClusterStatePath() + s.SetupVMName = state.SetupVMName + s.VMNames = state.VMNames + if state.FirstMasterIP != "" { + s.TestSSHHost = state.FirstMasterIP + } + if state.Namespace != "" { + s.Namespace = state.Namespace + } + } + return s, nil +} + +func sessionFromCommander(name Name, cmd *cluster.CommanderClusterResources, res *cluster.TestClusterResources) (*Session, error) { + s, err := sessionFromResources(name, res) + if err != nil { + return nil, err + } + if cmd != nil { + s.CommanderClusterName = cmd.ClusterName + s.CommanderCreatedByUs = cmd.CreatedByUs + } + return s, nil +} diff --git a/pkg/provider/verify.go b/pkg/provider/verify.go new file mode 100644 index 0000000..4360a20 --- /dev/null +++ b/pkg/provider/verify.go @@ -0,0 +1,176 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package provider + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/deckhouse/storage-e2e/internal/config" + "github.com/deckhouse/storage-e2e/internal/logger" + "github.com/deckhouse/storage-e2e/pkg/cluster" +) + +// ClearStaleArtifacts removes cached session/state that does not match live cluster VMs. +func ClearStaleArtifacts(artifactDir string) { + for _, name := range []string{sessionFileName, clusterStateFileName, "run-env.sh"} { + _ = os.Remove(filepath.Join(artifactDir, name)) + } + entries, err := os.ReadDir(artifactDir) + if err != nil { + return + } + for _, e := range entries { + if e.IsDir() { + continue + } + if strings.HasPrefix(e.Name(), "kubeconfig") { + _ = os.Remove(filepath.Join(artifactDir, e.Name())) + } + } + _ = cluster.ClearPersistedClusterState() +} + +func sessionNamespace(session *Session) string { + if session != nil && session.Namespace != "" { + return session.Namespace + } + return config.TestClusterNamespace +} + +// VerifySessionReusable checks live VMs in the PR namespace and cluster API health before reuse. +func VerifySessionReusable(ctx context.Context, session *Session) error { + if session == nil { + return fmt.Errorf("session is nil") + } + if session.Status == SessionStatusTornDown { + return fmt.Errorf("session was torn down") + } + ns := sessionNamespace(session) + config.EnsureBaseClusterEnv() + config.ReloadFromEnv() + active, err := cluster.NamespaceHasActiveCluster(ctx, ns) + if err != nil { + if len(session.VMNames) > 0 { + if healthErr := VerifySessionHealth(ctx, session); healthErr == nil { + logger.Warn("Virt VM check failed for %s (%v) but test cluster API is healthy; reusing session", ns, err) + return nil + } + } + return fmt.Errorf("namespace %s VM check: %w", ns, err) + } + if !active { + if len(session.VMNames) > 0 { + if healthErr := VerifySessionHealth(ctx, session); healthErr == nil { + logger.Warn("No Running master VM visible in %s via virt API but test cluster API is healthy; reusing session", ns) + return nil + } + } + return fmt.Errorf("no running master VM in namespace %s", ns) + } + if len(session.VMNames) > 0 { + if err := cluster.VerifyVMNamesInNamespace(ctx, ns, session.VMNames); err != nil { + if healthErr := VerifySessionHealth(ctx, session); healthErr == nil { + logger.Warn("VM name check failed in %s (%v) but test cluster API is healthy; reusing session", ns, err) + return nil + } + return err + } + } + return VerifySessionHealth(ctx, session) +} + +// ValidateClusterReady ensures create-cluster finished with live VMs and a healthy API. +func ValidateClusterReady(ctx context.Context, session *Session) error { + if session == nil { + return fmt.Errorf("session is nil after create-cluster") + } + if session.Status != SessionStatusReady { + return fmt.Errorf("session status is %q, expected %q", session.Status, SessionStatusReady) + } + config.EnsureBaseClusterEnv() + config.ReloadFromEnv() + ns := sessionNamespace(session) + + waitCtx, cancel := context.WithTimeout(ctx, 3*time.Minute) + defer cancel() + + if len(session.VMNames) > 0 { + if err := cluster.WaitVMNamesRunning(waitCtx, ns, session.VMNames, 3*time.Minute); err == nil { + return VerifySessionHealth(ctx, session) + } else { + logger.Warn("VM name wait in %s: %v", ns, err) + } + } + if err := cluster.WaitNamespaceHasActiveCluster(waitCtx, ns, 3*time.Minute); err != nil { + if healthErr := VerifySessionHealth(ctx, session); healthErr == nil { + logger.Warn("Virt VM check failed (%v) but test cluster API is healthy; accepting session", err) + return nil + } + return err + } + return VerifySessionHealth(ctx, session) +} + +func artifactClusterStateHasVMs(artifactDir string) bool { + data, err := os.ReadFile(ClusterStateArtifactPath(artifactDir)) + if err != nil { + return false + } + var state cluster.ClusterStateSnapshot + if err := json.Unmarshal(data, &state); err != nil { + return false + } + return len(state.VMNames) > 0 +} + +func shouldResumeNotReprovision(artifactDir string) bool { + return cluster.ClusterStateHasVMs() || artifactClusterStateHasVMs(artifactDir) +} + +func purgeStaleCacheIfNamespaceEmpty(ctx context.Context, artifactDir string) { + ns := config.TestClusterNamespace + if ns == "" { + return + } + if shouldResumeNotReprovision(artifactDir) { + logger.Info("cluster-state lists VMs in %s — keeping session cache", ns) + return + } + config.EnsureBaseClusterEnv() + config.ReloadFromEnv() + active, err := cluster.NamespaceHasActiveCluster(ctx, ns) + if err != nil { + logger.Warn("Could not verify VMs in %s: %v (keeping cache)", ns, err) + return + } + if active { + return + } + if _, err := LoadSession(artifactDir); err == nil { + logger.Info("Namespace %s has no cluster VMs — clearing stale actions/cache session", ns) + ClearStaleArtifacts(artifactDir) + } else if _, err := os.Stat(ClusterStateArtifactPath(artifactDir)); err == nil { + logger.Info("Namespace %s has no cluster VMs — clearing stale cluster-state cache", ns) + ClearStaleArtifacts(artifactDir) + } +}