Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion drivers/pg/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,5 +178,11 @@ func (s *Driver) RefreshKinds(ctx context.Context) error {
}

// OptimizeStorage acquires a connection from the driver's pool and runs
// optimizeStorage on it. The connection is released when the call returns.
// If the connection cannot be acquired the error is wrapped and returned;
// otherwise the result of optimizeStorage is returned unchanged.
func (s *Driver) OptimizeStorage(ctx context.Context) error {
// NOTE(review): the next line appears to be the pre-change body rendered as
// the diff's single deletion ("7 additions & 1 deletion") — confirm it is not
// part of the new function.
return nil
conn, err := s.pool.Acquire(ctx)
if err != nil {
return fmt.Errorf("acquire connection for VACUUM: %w", err)
}
// Hand the connection back to the pool however optimizeStorage exits.
defer conn.Release()

return optimizeStorage(ctx, conn)
}
80 changes: 80 additions & 0 deletions drivers/pg/optimize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package pg

import (
"context"
"fmt"
"log/slog"
"strings"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
)

// deadTupleThreshold is the minimum fraction of dead tuples a partitioned
// parent must accumulate across its partitions before OptimizeStorage will
// vacuum it. The fraction is dead / (dead + live), and the comparison is
// inclusive: a ratio exactly equal to the threshold qualifies.
const deadTupleThreshold = 0.1

// optimizeStorageStatsQuery sums n_dead_tup and n_live_tup across every leaf
// partition of the parent relation passed as $1 (cast to regclass). It returns
// a single row of two values: dead tuples, then live tuples. COALESCE turns a
// NULL sum — no matching pg_stat_user_tables rows — into 0.
const optimizeStorageStatsQuery = `
SELECT
COALESCE(SUM(stat.n_dead_tup), 0),
COALESCE(SUM(stat.n_live_tup), 0)
FROM pg_partition_tree($1::regclass) tree
LEFT JOIN pg_stat_user_tables stat ON stat.relid = tree.relid
WHERE tree.isleaf
`

// optimizeStorageConn is the minimal connection surface optimizeStorage relies
// on: QueryRow to read tuple statistics and Exec to issue the VACUUM statement.
// Keeping it this small lets callers pass any value with these two methods.
type optimizeStorageConn interface {
Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
QueryRow(ctx context.Context, sql string, arguments ...any) pgx.Row
}

// optimizeStorage inspects the dead/live tuple statistics of the "node" and
// "edge" tables and issues a single VACUUM (ANALYZE) covering every table whose
// dead tuple ratio is at or above deadTupleThreshold.
//
// It returns nil without executing anything when no table qualifies. A failed
// statistics query aborts immediately with an error naming the table; a failed
// VACUUM returns an error prefixed with the statement that was attempted.
func optimizeStorage(ctx context.Context, conn optimizeStorageConn) error {
// targets collects the tables that need vacuuming, in the fixed order below.
var targets []string
for _, table := range []string{"node", "edge"} {
var dead, live int64
if err := conn.QueryRow(ctx, optimizeStorageStatsQuery, table).Scan(&dead, &live); err != nil {
return fmt.Errorf("query dead tuple stats for %s: %w", table, err)
}

total := dead + live
// Leave the ratio at zero for an empty table to avoid dividing by zero.
var deadTupleRatio float64
if total > 0 {
deadTupleRatio = float64(dead) / float64(total)
}

// Statistics are logged for every table, including ones that end up skipped.
slog.InfoContext(ctx, "Queried PostgreSQL table storage statistics",
slog.String("table", table),
slog.Int64("dead_tuples", dead),
slog.Int64("live_tuples", live),
slog.Int64("total_tuples", total),
slog.Float64("dead_tuple_ratio", deadTupleRatio),
slog.Float64("dead_tuple_threshold", deadTupleThreshold),
)

// A table with no recorded tuples is never a vacuum target.
if total == 0 {
continue
}
// Inclusive comparison: a ratio exactly at the threshold is vacuumed.
if deadTupleRatio >= deadTupleThreshold {
targets = append(targets, table)
}
}

// Nothing crossed the threshold, so no statement is sent.
if len(targets) == 0 {
return nil
}

// Targeting the partitioned parents cascades to every partition.
// The statement is built by concatenation; targets only ever holds the
// hard-coded table names from the loop above.
stmt := "VACUUM (ANALYZE) " + strings.Join(targets, ", ")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be safe as the logic is currently written. The only ways around it are pretty complex for what we currently need, so as long as we don't allow targets to be modified by outside input (it remains strings we pull out of table queries), this should be safe enough.

slog.InfoContext(ctx, "Executing PostgreSQL storage optimization",
slog.Any("targets", targets),
slog.String("statement", stmt),
)

// pgx.QueryExecModeSimpleProtocol is passed in the argument list; per the pgx
// documentation a leading QueryExecMode selects the execution mode rather
// than acting as a bind parameter.
if _, err := conn.Exec(ctx, stmt, pgx.QueryExecModeSimpleProtocol); err != nil {
return fmt.Errorf("%s: %w", stmt, err)
}

return nil
}
96 changes: 96 additions & 0 deletions drivers/pg/optimize_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package pg

import (
"context"
"errors"
"testing"

"github.com/jackc/pgx/v5"
"github.com/pashagolub/pgxmock/v5"
"github.com/stretchr/testify/require"
)

// TestOptimizeStorage drives optimizeStorage against a mocked connection. The
// table-driven scenarios cover every combination of node/edge crossing the dead
// tuple threshold; a final subtest checks that a failing statistics query is
// wrapped and returned.
func TestOptimizeStorage(t *testing.T) {
	// tupleCounts is the (dead, live) pair the mocked stats query reports.
	type tupleCounts struct {
		dead, live int64
	}

	scenarios := []struct {
		name       string
		node, edge tupleCounts
		// vacuum is the exact statement expected; empty means no VACUUM at all.
		vacuum string
	}{
		{
			name: "skips vacuum when dead tuple ratios are below threshold",
			node: tupleCounts{dead: 9, live: 91},
			edge: tupleCounts{dead: 0, live: 0},
		},
		{
			name:   "vacuums node only",
			node:   tupleCounts{dead: 10, live: 90},
			edge:   tupleCounts{dead: 9, live: 91},
			vacuum: "VACUUM (ANALYZE) node",
		},
		{
			name:   "vacuums edge only",
			node:   tupleCounts{dead: 9, live: 91},
			edge:   tupleCounts{dead: 10, live: 90},
			vacuum: "VACUUM (ANALYZE) edge",
		},
		{
			name:   "vacuums node and edge",
			node:   tupleCounts{dead: 10, live: 90},
			edge:   tupleCounts{dead: 10, live: 90},
			vacuum: "VACUUM (ANALYZE) node, edge",
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			ctx := context.Background()
			mock := newOptimizeStorageMockConn(t)

			// Expectations are registered in the order optimizeStorage issues them.
			expectOptimizeStorageStats(mock, "node", sc.node.dead, sc.node.live)
			expectOptimizeStorageStats(mock, "edge", sc.edge.dead, sc.edge.live)
			if sc.vacuum != "" {
				expectOptimizeStorageVacuum(mock, sc.vacuum)
			}

			require.NoError(t, optimizeStorage(ctx, mock))
			require.NoError(t, mock.ExpectationsWereMet())
		})
	}

	t.Run("returns query error", func(t *testing.T) {
		ctx := context.Background()
		mock := newOptimizeStorageMockConn(t)
		queryErr := errors.New("stats unavailable")

		// Only the first (node) query is expected; the failure must stop the loop.
		mock.ExpectQuery(optimizeStorageStatsQuery).
			WithArgs("node").
			WillReturnError(queryErr)

		got := optimizeStorage(ctx, mock)
		require.ErrorIs(t, got, queryErr)
		require.ErrorContains(t, got, "query dead tuple stats for node")
		require.NoError(t, mock.ExpectationsWereMet())
	})
}

// newOptimizeStorageMockConn creates a pgxmock connection configured with
// pgxmock's equality query matcher, failing the test immediately if the mock
// cannot be constructed.
func newOptimizeStorageMockConn(t *testing.T) pgxmock.PgxConnIface {
	t.Helper()

	equalityMatcher := pgxmock.QueryMatcherOption(pgxmock.QueryMatcherEqual)
	mock, err := pgxmock.NewConn(equalityMatcher)
	require.NoError(t, err)

	return mock
}

// expectOptimizeStorageStats registers an expectation that the statistics query
// is run for table and answers it with a single (dead, live) row.
func expectOptimizeStorageStats(conn pgxmock.PgxConnIface, table string, dead, live int64) {
	statsRow := pgxmock.NewRows([]string{"dead", "live"}).AddRow(dead, live)

	expectation := conn.ExpectQuery(optimizeStorageStatsQuery)
	expectation.WithArgs(table).WillReturnRows(statsRow)
}

// expectOptimizeStorageVacuum registers an expectation that stmt is executed
// with pgx.QueryExecModeSimpleProtocol as its only argument, and answers it
// with a "VACUUM" command result affecting zero rows.
func expectOptimizeStorageVacuum(conn pgxmock.PgxConnIface, stmt string) {
	vacuumResult := pgxmock.NewResult("VACUUM", 0)

	expectation := conn.ExpectExec(stmt)
	expectation.WithArgs(pgx.QueryExecModeSimpleProtocol).WillReturnResult(vacuumResult)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/jackc/pgtype v1.14.4
github.com/jackc/pgx/v5 v5.9.2
github.com/neo4j/neo4j-go-driver/v5 v5.28.4
github.com/pashagolub/pgxmock/v5 v5.1.0
github.com/stretchr/testify v1.11.1
golang.org/x/tools v0.44.0
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,8 @@ github.com/otiai10/curr v0.0.0-20150429015615-9b4961190c95/go.mod h1:9qAhocn7zKJ
github.com/otiai10/curr v1.0.0/go.mod h1:LskTG5wDwr8Rs+nNQ+1LlxRjAtTZZjtJW4rMXl6j4vs=
github.com/otiai10/mint v1.3.0/go.mod h1:F5AjcsTsWUqX+Na9fpHb52P8pcRX2CI6A3ctIT91xUo=
github.com/otiai10/mint v1.3.1/go.mod h1:/yxELlJQ0ufhjUwhshSj+wFjZ78CnZ48/1wtmBH1OTc=
github.com/pashagolub/pgxmock/v5 v5.1.0 h1:NZ4pl82b335sEGIbD/+tk2fVIgVs3yNWr1R42ukpUvU=
github.com/pashagolub/pgxmock/v5 v5.1.0/go.mod h1:8IJct22b7+EuqecVmYb9aKiENJLLqTsbjFHXH/znAEg=
github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=
github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
Expand Down
24 changes: 24 additions & 0 deletions tools/dawgrun/pkg/commands/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,3 +368,27 @@ func lookupKindIDCmd() CommandDesc {
},
}
}

// optimizeStorageCmd describes the command that runs storage maintenance on a
// previously opened connection. The command takes exactly one argument, the
// connection name, and fails if the argument count is wrong, the connection is
// unknown, or the underlying OptimizeStorage call returns an error.
func optimizeStorageCmd() CommandDesc {
	run := func(ctx *CommandContext, fields []string) error {
		if len(fields) != 1 {
			return fmt.Errorf("invalid usage: optimize-storage <connection name>")
		}

		// Resolve the named connection from the command scope before doing any work.
		connName := fields[0]
		db, found := ctx.scope.GetConnection(connName)
		if !found {
			return fmt.Errorf("connection not found: %s", connName)
		}

		if err := db.OptimizeStorage(ctx); err != nil {
			return fmt.Errorf("failed to optimize storage: %w", err)
		}

		return nil
	}

	return CommandDesc{
		args: []string{"<connection name>"},
		help: "Optimize the storage on an opened connection",
		desc: "Perform routine maintenance operations on the underlying storage, such as VACUUM, ANALYZE, etc.",
		Fn:   run,
	}
}
1 change: 1 addition & 0 deletions tools/dawgrun/pkg/commands/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var cmdRegistry map[string]CommandDesc = map[string]CommandDesc{
"save-connections": saveConnectionsCmd(),
"save-opengraph": saveOpenGraphCmd(),
"translate-psql": translateToPsqlCmd(),
"optimize-storage": optimizeStorageCmd(),
}

func init() {
Expand Down
Loading