From 2b785c9870b46a18be5a829ead1f11a1591c164c Mon Sep 17 00:00:00 2001 From: Brandon Shearin Date: Mon, 15 Jun 2026 15:06:28 -0400 Subject: [PATCH 1/5] initial --- drivers/pg/driver.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/drivers/pg/driver.go b/drivers/pg/driver.go index 6f303521..d41c6e38 100644 --- a/drivers/pg/driver.go +++ b/drivers/pg/driver.go @@ -178,5 +178,18 @@ func (s *Driver) RefreshKinds(ctx context.Context) error { } func (s *Driver) OptimizeStorage(ctx context.Context) error { + conn, err := s.pool.Acquire(ctx) + if err != nil { + return fmt.Errorf("acquire connection for VACUUM: %w", err) + } + defer conn.Release() + + // VACUUM cannot run inside a transaction block, so it must be issued + // through the simple query protocol; pgx's default extended protocol + // wraps statements in an implicit transaction the server rejects. + if _, err := conn.Exec(ctx, "VACUUM (ANALYZE)", pgx.QueryExecModeSimpleProtocol); err != nil { + return fmt.Errorf("VACUUM (ANALYZE): %w", err) + } + return nil } From ac6d2a71bbe0f497f45097268a573848ae61e889 Mon Sep 17 00:00:00 2001 From: Brandon Shearin Date: Mon, 15 Jun 2026 15:16:29 -0400 Subject: [PATCH 2/5] specify node and edge parent tables --- drivers/pg/driver.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/drivers/pg/driver.go b/drivers/pg/driver.go index d41c6e38..f103d5ca 100644 --- a/drivers/pg/driver.go +++ b/drivers/pg/driver.go @@ -184,11 +184,9 @@ func (s *Driver) OptimizeStorage(ctx context.Context) error { } defer conn.Release() - // VACUUM cannot run inside a transaction block, so it must be issued - // through the simple query protocol; pgx's default extended protocol - // wraps statements in an implicit transaction the server rejects. - if _, err := conn.Exec(ctx, "VACUUM (ANALYZE)", pgx.QueryExecModeSimpleProtocol); err != nil { - return fmt.Errorf("VACUUM (ANALYZE): %w", err) + // Targeting the partitioned parent tables cascades to every partition; + if _, err := conn.Exec(ctx, "VACUUM (ANALYZE) node, edge", pgx.QueryExecModeSimpleProtocol); err != nil { + return fmt.Errorf("VACUUM (ANALYZE) node, edge: %w", err) } return nil From 73d98998cba96177ac618b0fd2de33649ec72c4b Mon Sep 17 00:00:00 2001 From: Brandon Shearin Date: Tue, 16 Jun 2026 09:24:04 -0700 Subject: [PATCH 3/5] gate the vacuum behind n_dead_tup --- drivers/pg/driver.go | 45 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/drivers/pg/driver.go b/drivers/pg/driver.go index f103d5ca..9c1ee0c7 100644 --- a/drivers/pg/driver.go +++ b/drivers/pg/driver.go @@ -3,6 +3,7 @@ package pg import ( "context" "fmt" + "strings" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" @@ -177,6 +178,11 @@ func (s *Driver) RefreshKinds(ctx context.Context) error { return s.SchemaManager.Fetch(ctx) } +// deadTupleThreshold is the minimum fraction of dead tuples a partitioned +// parent must accumulate across its partitions before OptimizeStorage will +// vacuum it. +const deadTupleThreshold = 0.1 + func (s *Driver) OptimizeStorage(ctx context.Context) error { conn, err := s.pool.Acquire(ctx) if err != nil { @@ -184,9 +190,42 @@ func (s *Driver) OptimizeStorage(ctx context.Context) error { } defer conn.Release() - // Targeting the partitioned parent tables cascades to every partition; - if _, err := conn.Exec(ctx, "VACUUM (ANALYZE) node, edge", pgx.QueryExecModeSimpleProtocol); err != nil { - return fmt.Errorf("VACUUM (ANALYZE) node, edge: %w", err) + // Sum n_dead_tup and n_live_tup across every leaf partition of the parent; + // partitioned parents hold no heap rows, so dead/live tuple stats relevant + // to vacuum decisions come from the leaf partitions. + const statsQuery = ` + SELECT + COALESCE(SUM(stat.n_dead_tup), 0), + COALESCE(SUM(stat.n_live_tup), 0) + FROM pg_partition_tree($1::regclass) tree + LEFT JOIN pg_stat_user_tables stat ON stat.relid = tree.relid + WHERE tree.isleaf + ` + + var targets []string + for _, table := range []string{"node", "edge"} { + var dead, live int64 + if err := conn.QueryRow(ctx, statsQuery, table).Scan(&dead, &live); err != nil { + return fmt.Errorf("query dead tuple stats for %s: %w", table, err) + } + + total := dead + live + if total == 0 { + continue + } + if float64(dead)/float64(total) >= deadTupleThreshold { + targets = append(targets, table) + } + } + + if len(targets) == 0 { + return nil + } + + // Targeting the partitioned parents cascades to every partition. + stmt := "VACUUM (ANALYZE) " + strings.Join(targets, ", ") + if _, err := conn.Exec(ctx, stmt, pgx.QueryExecModeSimpleProtocol); err != nil { + return fmt.Errorf("%s: %w", stmt, err) } return nil From 80d8d4736369a4e48b17e824e76b7b2e531dc8cf Mon Sep 17 00:00:00 2001 From: Brandon Shearin Date: Tue, 16 Jun 2026 11:44:28 -0700 Subject: [PATCH 4/5] unit test vacuum analyze --- drivers/pg/driver.go | 46 +----------------- drivers/pg/optimize.go | 60 +++++++++++++++++++++++ drivers/pg/optimize_test.go | 96 +++++++++++++++++++++++++++++++++++++ go.mod | 1 + go.sum | 2 + 5 files changed, 160 insertions(+), 45 deletions(-) create mode 100644 drivers/pg/optimize.go create mode 100644 drivers/pg/optimize_test.go diff --git a/drivers/pg/driver.go b/drivers/pg/driver.go index 9c1ee0c7..5b8adb1b 100644 --- a/drivers/pg/driver.go +++ b/drivers/pg/driver.go @@ -3,7 +3,6 @@ package pg import ( "context" "fmt" - "strings" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" @@ -178,11 +177,6 @@ func (s *Driver) RefreshKinds(ctx context.Context) error { return s.SchemaManager.Fetch(ctx) } -// deadTupleThreshold is the minimum fraction of dead tuples a partitioned -// parent must accumulate across its partitions before OptimizeStorage will -// vacuum it. -const deadTupleThreshold = 0.1 - func (s *Driver) OptimizeStorage(ctx context.Context) error { conn, err := s.pool.Acquire(ctx) if err != nil { @@ -190,43 +184,5 @@ func (s *Driver) OptimizeStorage(ctx context.Context) error { } defer conn.Release() - // Sum n_dead_tup and n_live_tup across every leaf partition of the parent; - // partitioned parents hold no heap rows, so dead/live tuple stats relevant - // to vacuum decisions come from the leaf partitions. - const statsQuery = ` - SELECT - COALESCE(SUM(stat.n_dead_tup), 0), - COALESCE(SUM(stat.n_live_tup), 0) - FROM pg_partition_tree($1::regclass) tree - LEFT JOIN pg_stat_user_tables stat ON stat.relid = tree.relid - WHERE tree.isleaf - ` - - var targets []string - for _, table := range []string{"node", "edge"} { - var dead, live int64 - if err := conn.QueryRow(ctx, statsQuery, table).Scan(&dead, &live); err != nil { - return fmt.Errorf("query dead tuple stats for %s: %w", table, err) - } - - total := dead + live - if total == 0 { - continue - } - if float64(dead)/float64(total) >= deadTupleThreshold { - targets = append(targets, table) - } - } - - if len(targets) == 0 { - return nil - } - - // Targeting the partitioned parents cascades to every partition. - stmt := "VACUUM (ANALYZE) " + strings.Join(targets, ", ") - if _, err := conn.Exec(ctx, stmt, pgx.QueryExecModeSimpleProtocol); err != nil { - return fmt.Errorf("%s: %w", stmt, err) - } - - return nil + return optimizeStorage(ctx, conn) } diff --git a/drivers/pg/optimize.go b/drivers/pg/optimize.go new file mode 100644 index 00000000..2e322357 --- /dev/null +++ b/drivers/pg/optimize.go @@ -0,0 +1,60 @@ +package pg + +import ( + "context" + "fmt" + "strings" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" +) + +// deadTupleThreshold is the minimum fraction of dead tuples a partitioned +// parent must accumulate across its partitions before OptimizeStorage will +// vacuum it. +const deadTupleThreshold = 0.1 + +// Sum n_dead_tup and n_live_tup across every leaf partition of the parent; +const optimizeStorageStatsQuery = ` + SELECT + COALESCE(SUM(stat.n_dead_tup), 0), + COALESCE(SUM(stat.n_live_tup), 0) + FROM pg_partition_tree($1::regclass) tree + LEFT JOIN pg_stat_user_tables stat ON stat.relid = tree.relid + WHERE tree.isleaf +` + +type optimizeStorageConn interface { + Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) + QueryRow(ctx context.Context, sql string, arguments ...any) pgx.Row +} + +func optimizeStorage(ctx context.Context, conn optimizeStorageConn) error { + var targets []string + for _, table := range []string{"node", "edge"} { + var dead, live int64 + if err := conn.QueryRow(ctx, optimizeStorageStatsQuery, table).Scan(&dead, &live); err != nil { + return fmt.Errorf("query dead tuple stats for %s: %w", table, err) + } + + total := dead + live + if total == 0 { + continue + } + if float64(dead)/float64(total) >= deadTupleThreshold { + targets = append(targets, table) + } + } + + if len(targets) == 0 { + return nil + } + + // Targeting the partitioned parents cascades to every partition. + stmt := "VACUUM (ANALYZE) " + strings.Join(targets, ", ") + if _, err := conn.Exec(ctx, stmt, pgx.QueryExecModeSimpleProtocol); err != nil { + return fmt.Errorf("%s: %w", stmt, err) + } + + return nil +} diff --git a/drivers/pg/optimize_test.go b/drivers/pg/optimize_test.go new file mode 100644 index 00000000..8fd0f6c5 --- /dev/null +++ b/drivers/pg/optimize_test.go @@ -0,0 +1,96 @@ +package pg + +import ( + "context" + "errors" + "testing" + + "github.com/jackc/pgx/v5" + "github.com/pashagolub/pgxmock/v5" + "github.com/stretchr/testify/require" +) + +func TestOptimizeStorage(t *testing.T) { + t.Run("skips vacuum when dead tuple ratios are below threshold", func(t *testing.T) { + ctx := context.Background() + conn := newOptimizeStorageMockConn(t) + + expectOptimizeStorageStats(conn, "node", 9, 91) + expectOptimizeStorageStats(conn, "edge", 0, 0) + + require.NoError(t, optimizeStorage(ctx, conn)) + require.NoError(t, conn.ExpectationsWereMet()) + }) + + t.Run("vacuums node only", func(t *testing.T) { + ctx := context.Background() + conn := newOptimizeStorageMockConn(t) + + expectOptimizeStorageStats(conn, "node", 10, 90) + expectOptimizeStorageStats(conn, "edge", 9, 91) + expectOptimizeStorageVacuum(conn, "VACUUM (ANALYZE) node") + + require.NoError(t, optimizeStorage(ctx, conn)) + require.NoError(t, conn.ExpectationsWereMet()) + }) + + t.Run("vacuums edge only", func(t *testing.T) { + ctx := context.Background() + conn := newOptimizeStorageMockConn(t) + + expectOptimizeStorageStats(conn, "node", 9, 91) + expectOptimizeStorageStats(conn, "edge", 10, 90) + expectOptimizeStorageVacuum(conn, "VACUUM (ANALYZE) edge") + + require.NoError(t, optimizeStorage(ctx, conn)) + require.NoError(t, conn.ExpectationsWereMet()) + }) + + t.Run("vacuums node and edge", func(t *testing.T) { + ctx := context.Background() + conn := newOptimizeStorageMockConn(t) + + expectOptimizeStorageStats(conn, "node", 10, 90) + expectOptimizeStorageStats(conn, "edge", 10, 90) + expectOptimizeStorageVacuum(conn, "VACUUM (ANALYZE) node, edge") + + require.NoError(t, optimizeStorage(ctx, conn)) + require.NoError(t, conn.ExpectationsWereMet()) + }) + + t.Run("returns query error", func(t *testing.T) { + ctx := context.Background() + conn := newOptimizeStorageMockConn(t) + expectedErr := errors.New("stats unavailable") + + conn.ExpectQuery(optimizeStorageStatsQuery). + WithArgs("node"). + WillReturnError(expectedErr) + + err := optimizeStorage(ctx, conn) + require.ErrorIs(t, err, expectedErr) + require.ErrorContains(t, err, "query dead tuple stats for node") + require.NoError(t, conn.ExpectationsWereMet()) + }) +} + +func newOptimizeStorageMockConn(t *testing.T) pgxmock.PgxConnIface { + t.Helper() + + conn, err := pgxmock.NewConn(pgxmock.QueryMatcherOption(pgxmock.QueryMatcherEqual)) + require.NoError(t, err) + + return conn +} + +func expectOptimizeStorageStats(conn pgxmock.PgxConnIface, table string, dead, live int64) { + conn.ExpectQuery(optimizeStorageStatsQuery). + WithArgs(table). + WillReturnRows(pgxmock.NewRows([]string{"dead", "live"}).AddRow(dead, live)) +} + +func expectOptimizeStorageVacuum(conn pgxmock.PgxConnIface, stmt string) { + conn.ExpectExec(stmt). + WithArgs(pgx.QueryExecModeSimpleProtocol). + WillReturnResult(pgxmock.NewResult("VACUUM", 0)) +} diff --git a/go.mod b/go.mod index 8d3a5014..1cd56f36 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/jackc/pgtype v1.14.4 github.com/jackc/pgx/v5 v5.9.2 github.com/neo4j/neo4j-go-driver/v5 v5.28.4 + github.com/pashagolub/pgxmock/v5 v5.1.0 github.com/stretchr/testify v1.11.1 golang.org/x/tools v0.44.0 ) diff --git a/go.sum b/go.sum index 40995d1e..229f79e1 100644 --- a/go.sum +++ b/go.sum @@ -435,6 +435,8 @@ github.com/otiai10/curr v0.0.0-20150429015615-9b4961190c95/go.mod h1:9qAhocn7zKJ github.com/otiai10/curr v1.0.0/go.mod h1:LskTG5wDwr8Rs+nNQ+1LlxRjAtTZZjtJW4rMXl6j4vs= github.com/otiai10/mint v1.3.0/go.mod h1:F5AjcsTsWUqX+Na9fpHb52P8pcRX2CI6A3ctIT91xUo= github.com/otiai10/mint v1.3.1/go.mod h1:/yxELlJQ0ufhjUwhshSj+wFjZ78CnZ48/1wtmBH1OTc= +github.com/pashagolub/pgxmock/v5 v5.1.0 h1:NZ4pl82b335sEGIbD/+tk2fVIgVs3yNWr1R42ukpUvU= +github.com/pashagolub/pgxmock/v5 v5.1.0/go.mod h1:8IJct22b7+EuqecVmYb9aKiENJLLqTsbjFHXH/znAEg= github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= From 25c716147d350a76a809d23c9c44cbd435b92645 Mon Sep 17 00:00:00 2001 From: Brandon Shearin Date: Tue, 16 Jun 2026 13:15:01 -0700 Subject: [PATCH 5/5] add slog to optimizeStorage; add dawgrun support --- drivers/pg/optimize.go | 22 +++++++++++++++++++++- tools/dawgrun/pkg/commands/db.go | 24 ++++++++++++++++++++++++ tools/dawgrun/pkg/commands/registry.go | 1 + 3 files changed, 46 insertions(+), 1 deletion(-) diff --git a/drivers/pg/optimize.go b/drivers/pg/optimize.go index 2e322357..8da0cfb8 100644 --- a/drivers/pg/optimize.go +++ b/drivers/pg/optimize.go @@ -3,6 +3,7 @@ package pg import ( "context" "fmt" + "log/slog" "strings" "github.com/jackc/pgx/v5" @@ -38,10 +39,24 @@ func optimizeStorage(ctx context.Context, conn optimizeStorageConn) error { } total := dead + live + var deadTupleRatio float64 + if total > 0 { + deadTupleRatio = float64(dead) / float64(total) + } + + slog.InfoContext(ctx, "Queried PostgreSQL table storage statistics", + slog.String("table", table), + slog.Int64("dead_tuples", dead), + slog.Int64("live_tuples", live), + slog.Int64("total_tuples", total), + slog.Float64("dead_tuple_ratio", deadTupleRatio), + slog.Float64("dead_tuple_threshold", deadTupleThreshold), + ) + if total == 0 { continue } - if float64(dead)/float64(total) >= deadTupleThreshold { + if deadTupleRatio >= deadTupleThreshold { targets = append(targets, table) } } @@ -52,6 +67,11 @@ func optimizeStorage(ctx context.Context, conn optimizeStorageConn) error { // Targeting the partitioned parents cascades to every partition. stmt := "VACUUM (ANALYZE) " + strings.Join(targets, ", ") + slog.InfoContext(ctx, "Executing PostgreSQL storage optimization", + slog.Any("targets", targets), + slog.String("statement", stmt), + ) + if _, err := conn.Exec(ctx, stmt, pgx.QueryExecModeSimpleProtocol); err != nil { return fmt.Errorf("%s: %w", stmt, err) } diff --git a/tools/dawgrun/pkg/commands/db.go b/tools/dawgrun/pkg/commands/db.go index 99a4f136..36bd7e1d 100644 --- a/tools/dawgrun/pkg/commands/db.go +++ b/tools/dawgrun/pkg/commands/db.go @@ -368,3 +368,27 @@ func lookupKindIDCmd() CommandDesc { }, } } + +func optimizeStorageCmd() CommandDesc { + return CommandDesc{ + args: []string{""}, + help: "Optimize the storage on an opened connection", + desc: "Perform routine maintenance operations on the underlying storage, such as VACUUM, ANALYZE, etc.", + Fn: func(ctx *CommandContext, fields []string) error { + if len(fields) != 1 { + return fmt.Errorf("invalid usage: optimize-storage ") + } + + connName := fields[0] + if db, ok := ctx.scope.GetConnection(connName); ok { + if err := db.OptimizeStorage(ctx); err != nil { + return fmt.Errorf("failed to optimize storage: %w", err) + } + } else { + return fmt.Errorf("connection not found: %s", connName) + } + + return nil + }, + } +} diff --git a/tools/dawgrun/pkg/commands/registry.go b/tools/dawgrun/pkg/commands/registry.go index 183a30fc..fe07b05a 100644 --- a/tools/dawgrun/pkg/commands/registry.go +++ b/tools/dawgrun/pkg/commands/registry.go @@ -24,6 +24,7 @@ var cmdRegistry map[string]CommandDesc = map[string]CommandDesc{ "save-connections": saveConnectionsCmd(), "save-opengraph": saveOpenGraphCmd(), "translate-psql": translateToPsqlCmd(), + "optimize-storage": optimizeStorageCmd(), } func init() {