diff --git a/docs/cluster/ges-grant.md b/docs/cluster/ges-grant.md index 1817034191..9bedb9c255 100644 --- a/docs/cluster/ges-grant.md +++ b/docs/cluster/ges-grant.md @@ -16,6 +16,8 @@ cross-node grants will land in spec-2.17 (BAST + deadlock + 4-node). |---|---|---|---|---| | `cluster.ges_request_timeout_ms` | `60000` | `[1, 600000]` | `USERSET` | Timeout (ms) for cross-node GES grant request. Backend rolls back via GES_RELEASE on expiry. PG `lock_timeout=0` (disabled) does NOT short-circuit — falls back to this GUC. | | `cluster.grd_max_entries` | `0` | `[0, 1048576]` | `POSTMASTER` | Size of GRD entry HTAB (per spec-2.15). `0` = skeleton mode. | +| `cluster.grd_entry_reclaim` | `on` | `bool` | `SIGHUP` | Enable safe cold reclaim of holderless GRD entries after the lookup pin drops to zero. | +| `cluster.grd_entry_reclaim_max_per_sweep` | `256` | `[0, 65536]` | `SIGHUP` | Maximum cold entries LMON attempts to reclaim per sweep. | ### Effective timeout @@ -62,6 +64,19 @@ Four cap counters (per spec-2.16 D1) surface entry-level saturation: - `converts_full_count` - `ngranted_promoted_count` +spec-6.3a adds GRD entry-lifecycle counters in the `grd` category: + +| Key | Meaning | +|---|---| +| `grd_entries_reclaimed_count` | Cold holderless entries removed from the GRD HTAB | +| `grd_reclaim_skipped_pinned_count` | Reclaim attempts skipped because a lookup pin was still held | +| `grd_pin_high_water` | Highest observed lookup-pin count on a single entry | +| `grd_sweep_runs` | LMON cold-reclaim sweep invocations | + +Implementation details for the pin/release discipline, shard-local scan +model, and ERROR cleanup classification are in +`docs/cluster/grd-entry-lifecycle.md`. 
+ ## Wire Format Payload bytes follow the 36-byte `ClusterICEnvelope`: diff --git a/docs/cluster/grd-entry-lifecycle.md b/docs/cluster/grd-entry-lifecycle.md new file mode 100644 index 0000000000..112e6b7ea0 --- /dev/null +++ b/docs/cluster/grd-entry-lifecycle.md @@ -0,0 +1,55 @@ +# GRD Entry Lifecycle And Cold Reclaim + +Status: spec-6.3a implementation note. + +## Lock Model + +GRD entries live in the partitioned GRD HTAB and are protected in this order: + +1. shard LWLock +2. entry spinlock + +There is no table-wide GRD entry lock in the hot create or reclaim path. +Create, lookup, and `HASH_REMOVE` take only the target shard LWLock. + +Scan paths do not use `hash_seq_search` over the live HTAB. Each entry is +linked into a shard-local intrusive list while the shard LWLock is held. A +scanner takes one shard LWLock in shared mode, copies `ClusterResId` keys, +releases the shard lock, and later looks up each key again through the normal +pin/release API. This keeps scan safety local to the shard and avoids +serializing unrelated entry churn. + +## Pin Discipline + +`cluster_grd_entry_lookup_or_create()` pins an entry before releasing the +shard LWLock. Callers must pair every successful lookup with +`cluster_grd_entry_release()`. + +`cluster_grd_entry_release()` copies the `ClusterResId` before decrementing +the pin. After the decrement publishes `pin == 0`, release does not +dereference the old entry pointer. Last-unpin reclaim re-enters by copied +resource id, takes the shard LWLock in exclusive mode, revalidates +cold state under the entry spinlock, sets `RECLAIMING`, unlinks the entry +from the shard list, and then calls `HASH_REMOVE`. + +Cold means `pin == 0` and no holders, waiters, converts, or reservations. +Entries with live state are never reclaimed. + +## ERROR Cleanup Audit + +Pinned windows are intentionally short. 
spec-6.3a classifies lookup sites as: + +| Class | Sites | Cleanup rule | +|---|---|---| +| F | Snapshot walkers, cleanup sweeps, normal grant/release/convert mutators | The pinned window contains only fixed-size copies, spinlock-protected array mutation, atomics, and no allocation or visitor callback. External WFG refresh and SQL row visitors run after release. | +| T | Starvation fairness grant-barrier LMD submit/cancel while a pin is held | Wrapped by `grd_lmd_submit_wait_edge_pinned()` / `grd_lmd_cancel_wait_edge_pinned()`. `PG_CATCH` releases the entry pin and rethrows. | +| R | none in spec-6.3a | No long-lived GRD entry pin is registered in `ResourceOwner`. Future paths that keep a pin across arbitrary backend code must add ResourceOwner tracking or a local `PG_TRY` guard. | + +The unit case `test_grd_pin_cleanup_on_lmd_submit_error` injects an ERROR +through the pinned LMD submit path and verifies the pin is not leaked. + +## Tests + +The cluster unit lifecycle suite covers paired pin/release, last-unpin cold +reclaim, periodic sweep reclaim, live-state exclusion, large sweep batches, +over-release fail-safe behavior, and the pinned LMD ERROR cleanup path. diff --git a/docs/reference/ges-lock-modes.md b/docs/reference/ges-lock-modes.md index c7ea2127cd..908df50377 100644 --- a/docs/reference/ges-lock-modes.md +++ b/docs/reference/ges-lock-modes.md @@ -77,7 +77,10 @@ Ordinary lock acquisition, waiting, and release across nodes are unaffected. The `nconverts` column of `pg_cluster_grd_entries` reports the number of pending conversion requests queued on a resource; it is `0` in normal -operation. +operation. The diagnostic view snapshots entry keys by walking shard-local +entry lists under each shard lock and then re-looks up each entry through the +normal pin/release API before taking the per-entry snapshot, so cold reclaim +can safely remove holderless entries while diagnostics are active. 
## Blocking notifications diff --git a/docs/user-guide/configuration.md b/docs/user-guide/configuration.md index a65391a233..b01ce1fac5 100644 --- a/docs/user-guide/configuration.md +++ b/docs/user-guide/configuration.md @@ -3,14 +3,16 @@ linkdb uses two configuration mechanisms layered on top of standard PostgreSQL configuration: -1. **`postgresql.conf`** — standard PG config plus three new - `cluster.*` GUCs added by linkdb's cluster subsystem. +1. **`postgresql.conf`** — standard PG config plus the `cluster.*` + GUCs added by linkdb's cluster subsystem. 2. **`pgrac.conf`** — INI-style file describing the cluster topology (the list of nodes that participate in the cluster). ## cluster.* GUCs -All three GUCs require server restart to change (PGC_POSTMASTER). +Most bootstrap and storage-routing GUCs require server restart to +change (PGC_POSTMASTER). Runtime maintenance knobs are marked with +their own context below. ### `cluster.node_id` @@ -138,6 +140,47 @@ Reserved for future timeout enforcement on `tier1` peer `recv(2)`. Currently in Upper bound on the payload size accepted by the chunked-send API. A caller asking to send a larger payload is rejected outright with `ERRCODE_PROGRAM_LIMIT_EXCEEDED` rather than silently truncating. Increase this when the workload expects larger cross-node messages; the hard cap is 256 MB. +### `cluster.grd_max_entries` + +| | | +|---|---| +| Type | integer | +| Default | `0` | +| Range | `0` – `1048576` | +| Context | postmaster | + +Capacity of the GRD resource-entry hash table. `0` keeps the entry +table disabled for skeleton-mode deployments. Values above zero enable +GES resource tracking; empty entries are eligible for lifecycle reclaim +once they have no holders, waiters, converts, reservations, or lookup +pins. + +### `cluster.grd_entry_reclaim` + +| | | +|---|---| +| Type | bool | +| Default | `on` | +| Context | sighup | + +Enables safe cold reclaim of holderless GRD entries. 
The lookup/release +pin discipline remains active even when this is off; disabling reclaim +only prevents `HASH_REMOVE` so operators can preserve entries for +diagnostics during investigation. + +### `cluster.grd_entry_reclaim_max_per_sweep` + +| | | +|---|---| +| Type | integer | +| Default | `256` | +| Range | `0` – `65536` | +| Context | sighup | + +Maximum number of cold entries LMON attempts to reclaim during one +sweep. `0` disables periodic sweeps, while last-pin release can still +reclaim a cold entry when `cluster.grd_entry_reclaim = on`. + ### `cluster.interconnect_chunk_reassembly_timeout_ms` | | | diff --git a/src/backend/cluster/cluster_debug.c b/src/backend/cluster/cluster_debug.c index 8bb3ca7c3a..8bf200141b 100644 --- a/src/backend/cluster/cluster_debug.c +++ b/src/backend/cluster/cluster_debug.c @@ -968,8 +968,8 @@ dump_scn(ReturnSetInfo *rsinfo) /* * dump_grd -- spec-2.14 D6 GRD routing substrate observability. * - * Emits 14 rows under category='grd' (8 from spec-2.14 + 6 from - * spec-2.15 entry-table infrastructure): + * Emits core routing rows plus entry lifecycle counters under + * category='grd': * - grd_shard_count: 4096 (constant) * - grd_local_master_count: shards mastered by self node * - grd_remote_master_count: 4096 - local (SQL-friendly though derivable) @@ -984,6 +984,10 @@ dump_scn(ReturnSetInfo *rsinfo) * - grd_entry_create_count: lifetime created entries * - grd_entry_lookup_hit_count: lifetime OK lookups * - grd_entry_full_count: lifetime FULL returns + * - grd_entries_reclaimed_count: lifetime cold entry removes + * - grd_reclaim_skipped_pinned_count: reclaim skipped because pin>0 + * - grd_pin_high_water: max observed per-entry pin count + * - grd_sweep_runs: LMON reclaim sweep invocations * * Counter invariant (v0.4 P1.2): * grd_shard_lookup_count >= @@ -1025,6 +1029,12 @@ dump_grd(ReturnSetInfo *rsinfo) fmt_int64((int64)cluster_grd_entry_lookup_hit_count())); emit_row(rsinfo, "grd", "grd_entry_full_count", 
fmt_int64((int64)cluster_grd_entry_full_count())); + emit_row(rsinfo, "grd", "grd_entries_reclaimed_count", + fmt_int64((int64)cluster_grd_entries_reclaimed_count())); + emit_row(rsinfo, "grd", "grd_reclaim_skipped_pinned_count", + fmt_int64((int64)cluster_grd_reclaim_skipped_pinned_count())); + emit_row(rsinfo, "grd", "grd_pin_high_water", fmt_int64((int64)cluster_grd_pin_high_water())); + emit_row(rsinfo, "grd", "grd_sweep_runs", fmt_int64((int64)cluster_grd_sweep_runs())); emit_row(rsinfo, "grd", "grd_holders_full_count", fmt_int64((int64)cluster_grd_holders_full_count())); diff --git a/src/backend/cluster/cluster_grd.c b/src/backend/cluster/cluster_grd.c index 4c01f9e42c..f35a453083 100644 --- a/src/backend/cluster/cluster_grd.c +++ b/src/backend/cluster/cluster_grd.c @@ -82,6 +82,10 @@ static LWLockPadded *cluster_grd_shard_locks = NULL; * = 0 (skeleton mode → NOT_READY sentinel) or non-cluster builds. */ static HTAB *cluster_grd_entry_htab = NULL; +#define CLUSTER_GRD_ENTRY_FLAG_RECLAIMING ((uint32)0x00000001) + +static int cluster_grd_snapshot_entry_resids(ClusterResId **out_resids); + /* spec-2.15 v0.4 P1.1: HTAB init size = Max(GUC, PGRAC_GRD_SHARD_COUNT) * — HASH_PARTITION=4096 forces dynahash nbuckets >= 4096 (nbuckets = * max(next_pow2(n), num_partitions)). naive max_size=GUC=16 would let @@ -156,7 +160,8 @@ typedef struct ClusterGrdWaiter { struct ClusterGrdEntry { ClusterResId resid; /* hash key (16B) */ - slock_t lock; /* entry-level spinlock (Q11 + P1.3 minor) */ + dlist_node shard_link; + slock_t lock; /* entry-level spinlock (Q11 + P1.3 minor) */ int ngranted; ClusterGrdHolder holders[PGRAC_GRD_MAX_HOLDERS]; int nwaiters; @@ -186,6 +191,7 @@ struct ClusterGrdEntry { * (P1c) — never for a grant or a scan-on-grant pass. 
*/ uint64 fair_queue_next; + pg_atomic_uint32 pin; /* spec-6.3a: lookup pin gating safe cold reclaim */ }; @@ -273,6 +279,41 @@ grd_wfg_make_vertex(int32 node_id, uint32 procno, uint64 cluster_epoch, uint64 r out->wait_seq = wait_seq; } +static bool +grd_lmd_submit_wait_edge_pinned(ClusterGrdEntry *entry, ClusterLmdVertex *waiter, + ClusterLmdVertex *blocker, uint64 request_id) +{ + bool published = false; + + PG_TRY(); + { + published = cluster_lmd_submit_wait_edge_real(waiter, blocker, request_id); + } + PG_CATCH(); + { + cluster_grd_entry_release(entry); + PG_RE_THROW(); + } + PG_END_TRY(); + + return published; +} + +static void +grd_lmd_cancel_wait_edge_pinned(ClusterGrdEntry *entry, ClusterLmdVertex *waiter) +{ + PG_TRY(); + { + cluster_lmd_cancel_wait_edge_real(waiter); + } + PG_CATCH(); + { + cluster_grd_entry_release(entry); + PG_RE_THROW(); + } + PG_END_TRY(); +} + /* Caller holds entry->lock. Copy holders + queued waiters + pending converts * into *snap (identities + modes only — no LMD graph lock taken here). 
*/ static void @@ -515,6 +556,77 @@ grd_entries_estimate_bytes(void) return hash_estimate_size(init_max_size, sizeof(ClusterGrdEntry)); } +static void +cluster_grd_update_pin_high_water(uint32 value) +{ + uint64 observed; + + if (cluster_grd_state == NULL) + return; + + observed = pg_atomic_read_u64(&cluster_grd_state->pin_high_water); + while ((uint64)value > observed) { + uint64 expected = observed; + + if (pg_atomic_compare_exchange_u64(&cluster_grd_state->pin_high_water, &expected, + (uint64)value)) + break; + observed = expected; + } +} + +static bool +cluster_grd_entry_pin_locked(ClusterGrdEntry *entry) +{ + uint32 old; + bool reclaiming; + + Assert(entry != NULL); + if (entry == NULL) + return false; + + SpinLockAcquire(&entry->lock); + reclaiming = ((entry->state_flags & CLUSTER_GRD_ENTRY_FLAG_RECLAIMING) != 0); + SpinLockRelease(&entry->lock); + if (reclaiming) { + ereport(WARNING, (errmsg("cluster_grd: refusing lookup of reclaiming entry"))); + return false; + } + + old = pg_atomic_read_u32(&entry->pin); + for (;;) { + uint32 expected = old; + + if (old == PG_UINT32_MAX) { + ereport(WARNING, (errmsg("cluster_grd: entry pin count overflow; refusing lookup"))); + return false; + } + if (pg_atomic_compare_exchange_u32(&entry->pin, &expected, old + 1)) { + cluster_grd_update_pin_high_water(old + 1); + return true; + } + old = expected; + } +} + +uint32 +cluster_grd_entry_pin_count(ClusterGrdEntry *entry) +{ + if (entry == NULL) + return 0; + return pg_atomic_read_u32(&entry->pin); +} + +bool +cluster_grd_entry_is_reclaimable(ClusterGrdEntry *entry) +{ + if (entry == NULL) + return false; + return pg_atomic_read_u32(&entry->pin) == 0 && entry->ngranted == 0 && entry->nwaiters == 0 + && entry->nconverts == 0 && entry->nreservations == 0 + && (entry->state_flags & CLUSTER_GRD_ENTRY_FLAG_RECLAIMING) == 0; +} + Size cluster_grd_shmem_size(void) { @@ -540,6 +652,7 @@ cluster_grd_shmem_init(void) /* spec-4.6 D2/D1: remaster generation + recovery phase. 
*/ pg_atomic_init_u32(&cluster_grd_state->master_generation[i], 0); pg_atomic_init_u32(&cluster_grd_state->shard_phase[i], (uint32)GRD_SHARD_NORMAL); + dlist_init(&cluster_grd_state->entry_shard_lists[i]); } pg_atomic_init_u32(&cluster_grd_state->master_map_initialized, 0); pg_atomic_init_u64(&cluster_grd_state->resid_encode_count, 0); @@ -555,6 +668,10 @@ cluster_grd_shmem_init(void) pg_atomic_init_u64(&cluster_grd_state->entry_create_count, 0); pg_atomic_init_u64(&cluster_grd_state->entry_lookup_hit_count, 0); pg_atomic_init_u64(&cluster_grd_state->entry_full_count, 0); + pg_atomic_init_u64(&cluster_grd_state->entries_reclaimed_count, 0); + pg_atomic_init_u64(&cluster_grd_state->reclaim_skipped_pinned_count, 0); + pg_atomic_init_u64(&cluster_grd_state->pin_high_water, 0); + pg_atomic_init_u64(&cluster_grd_state->sweep_runs, 0); /* spec-2.16 D1: 4 cap counter + 5 nofail counter * (skeleton-init; mutator + nofail path 真激活在 Step 2-4). */ @@ -1593,18 +1710,23 @@ cluster_grd_recovery_counters_snapshot(ClusterGrdRecoveryCounters *out) void cluster_grd_cleanup_stale_epoch_scoped(uint64 current_epoch, const uint64 *affected_shards) { - HASH_SEQ_STATUS status; - ClusterGrdEntry *entry; + ClusterResId *resids = NULL; int swept = 0; + int nresids; + int r; if (cluster_grd_entry_htab == NULL || affected_shards == NULL) return; - hash_seq_init(&status, cluster_grd_entry_htab); - while ((entry = (ClusterGrdEntry *)hash_seq_search(&status)) != NULL) { + nresids = cluster_grd_snapshot_entry_resids(&resids); + for (r = 0; r < nresids; r++) { + ClusterGrdEntry *entry = NULL; int i; - if (!grd_shard_bitmap_test(affected_shards, cluster_grd_shard_for_resource(&entry->resid))) + if (!grd_shard_bitmap_test(affected_shards, cluster_grd_shard_for_resource(&resids[r]))) + continue; + if (cluster_grd_entry_lookup_or_create(&resids[r], false, &entry) != CLUSTER_GRD_ENTRY_OK + || entry == NULL) continue; SpinLockAcquire(&entry->lock); @@ -1646,7 +1768,10 @@ 
cluster_grd_cleanup_stale_epoch_scoped(uint64 current_epoch, const uint64 *affec i++; } SpinLockRelease(&entry->lock); + cluster_grd_entry_release(entry); } + if (resids != NULL) + pfree(resids); if (swept > 0) ereport(DEBUG1, (errmsg_internal("cluster_grd_cleanup_stale_epoch_scoped(" UINT64_FORMAT @@ -1670,18 +1795,24 @@ cluster_grd_cleanup_stale_epoch_scoped(uint64 current_epoch, const uint64 *affec uint32 cluster_grd_cleanup_stale_epoch_postbarrier(uint64 current_epoch) { - HASH_SEQ_STATUS status; - ClusterGrdEntry *entry; + ClusterResId *resids = NULL; uint32 swept = 0; uint32 waiters_dropped = 0; + int nresids; + int r; if (cluster_grd_entry_htab == NULL) return 0; - hash_seq_init(&status, cluster_grd_entry_htab); - while ((entry = (ClusterGrdEntry *)hash_seq_search(&status)) != NULL) { + nresids = cluster_grd_snapshot_entry_resids(&resids); + for (r = 0; r < nresids; r++) { + ClusterGrdEntry *entry = NULL; int i; + if (cluster_grd_entry_lookup_or_create(&resids[r], false, &entry) != CLUSTER_GRD_ENTRY_OK + || entry == NULL) + continue; + SpinLockAcquire(&entry->lock); for (i = 0; i < entry->ngranted;) { if (entry->holders[i].cluster_epoch < current_epoch) { @@ -1706,7 +1837,10 @@ cluster_grd_cleanup_stale_epoch_postbarrier(uint64 current_epoch) i++; } SpinLockRelease(&entry->lock); + cluster_grd_entry_release(entry); } + if (resids != NULL) + pfree(resids); if (swept > 0) pg_atomic_fetch_add_u64(&cluster_grd_state->stale_holder_swept_count, swept); @@ -2474,6 +2608,7 @@ cluster_grd_entry_rebind_or_insert_holder(const ClusterResId *resid, * deleted by the scoped sweep and stays operable. 
*/ if (cluster_grd_shard_phase(rb_shard) == GRD_SHARD_NORMAL) pg_atomic_fetch_add_u64(&cluster_grd_state->unaffected_holder_survived_count, 1); + cluster_grd_entry_release(entry); return CLUSTER_GRD_ENTRY_OK; } } @@ -2485,6 +2620,7 @@ cluster_grd_entry_rebind_or_insert_holder(const ClusterResId *resid, && !ges_modes_compatible(entry->holders[i].mode, (LOCKMODE)lockmode)) { /* spec-5.1b D1: frozen matrix */ SpinLockRelease(&entry->lock); + cluster_grd_entry_release(entry); return CLUSTER_GRD_ENTRY_ERROR; } } @@ -2492,6 +2628,7 @@ cluster_grd_entry_rebind_or_insert_holder(const ClusterResId *resid, if (entry->ngranted >= PGRAC_GRD_MAX_HOLDERS) { pg_atomic_fetch_add_u64(&cluster_grd_state->holders_full_count, 1); SpinLockRelease(&entry->lock); + cluster_grd_entry_release(entry); return CLUSTER_GRD_ENTRY_FULL; } @@ -2506,6 +2643,7 @@ cluster_grd_entry_rebind_or_insert_holder(const ClusterResId *resid, (void)source_node_id; /* identity already carried by new_holder */ pg_atomic_fetch_add_u64(&cluster_grd_state->holders_redeclared_count, 1); + cluster_grd_entry_release(entry); return CLUSTER_GRD_ENTRY_OK; } @@ -2634,6 +2772,10 @@ cluster_grd_entry_lookup_or_create(const ClusterResId *resid, bool create, Clust entry = hash_search_with_hash_value(cluster_grd_entry_htab, resid, hashvalue, HASH_FIND, &found); if (entry != NULL) { + if (!cluster_grd_entry_pin_locked(entry)) { + LWLockRelease(&cluster_grd_shard_locks[shard_id].lock); + return CLUSTER_GRD_ENTRY_ERROR; + } pg_atomic_fetch_add_u64(&cluster_grd_state->entry_lookup_hit_count, 1); LWLockRelease(&cluster_grd_shard_locks[shard_id].lock); *out = entry; @@ -2677,20 +2819,30 @@ cluster_grd_entry_lookup_or_create(const ClusterResId *resid, bool create, Clust if (!found) { /* New entry — init slock + body zero. 
*/ + dlist_node_init(&entry->shard_link); SpinLockInit(&entry->lock); entry->ngranted = 0; entry->nwaiters = 0; entry->nconverts = 0; entry->last_modified_scn = 0; entry->state_flags = 0; + entry->generation = 0; + entry->nreservations = 0; + entry->fair_queue_next = 0; + pg_atomic_init_u32(&entry->pin, 0); + dlist_push_tail(&cluster_grd_state->entry_shard_lists[shard_id], &entry->shard_link); /* holders / waiters / converts arrays left uninitialized; * spec-2.16 mutator path initializes per-slot on add. */ pg_atomic_fetch_add_u64(&cluster_grd_state->entry_current_count, 1); pg_atomic_fetch_add_u64(&cluster_grd_state->entry_create_count, 1); } + if (!cluster_grd_entry_pin_locked(entry)) { + LWLockRelease(&cluster_grd_shard_locks[shard_id].lock); + return CLUSTER_GRD_ENTRY_ERROR; + } pg_atomic_fetch_add_u64(&cluster_grd_state->entry_lookup_hit_count, 1); - /* Step 8: release shard partition LWLock — caller holds entry handle. */ + /* Step 8: release shard partition LWLock — caller holds a lookup pin. */ LWLockRelease(&cluster_grd_shard_locks[shard_id].lock); *out = entry; @@ -2700,15 +2852,38 @@ cluster_grd_entry_lookup_or_create(const ClusterResId *resid, bool create, Clust void cluster_grd_entry_release(ClusterGrdEntry *entry) { - /* spec-2.15 RESERVED no-op (v0.3 P1.3 contract unified — header doc - * + impl 一致). 本 spec 不保证任何 side effect:不 decrement - * refcount,不 remove entry,不改 holders/waiters/converts 状态. - * - * spec-2.16 caller-side 集成时真实装 logic (API signature 不变,body - * 加):decrement refcount + 若 ngranted == 0 && nwaiters == 0 && - * nconverts == 0 → HASH_REMOVE + DRM reclaim path (Stage 6). + ClusterResId rid; + uint32 old; + bool last; + + if (entry == NULL) + return; + + /* + * spec-6.3a: copy the hash key before dropping the last pin. Once the + * CAS publishes pin==0, a different reclaimer may remove and recycle the + * slot, so this function must not dereference entry after the decrement. 
*/ - (void)entry; + rid = entry->resid; + + old = pg_atomic_read_u32(&entry->pin); + for (;;) { + uint32 expected = old; + + if (old == 0) { + ereport(WARNING, (errmsg("cluster_grd: over-release of unpinned entry ignored"))); + return; + } + if (pg_atomic_compare_exchange_u32(&entry->pin, &expected, old - 1)) + break; + old = expected; + } + + last = (old == 1); + if (!last || !cluster_grd_entry_reclaim) + return; + + (void)cluster_grd_reclaim_if_cold(&rid); } @@ -2764,23 +2939,51 @@ cluster_grd_entry_full_count(void) return pg_atomic_read_u64(&cluster_grd_state->entry_full_count); } +uint64 +cluster_grd_entries_reclaimed_count(void) +{ + Assert(cluster_grd_state != NULL); + return pg_atomic_read_u64(&cluster_grd_state->entries_reclaimed_count); +} + +uint64 +cluster_grd_reclaim_skipped_pinned_count(void) +{ + Assert(cluster_grd_state != NULL); + return pg_atomic_read_u64(&cluster_grd_state->reclaim_skipped_pinned_count); +} + +uint64 +cluster_grd_pin_high_water(void) +{ + Assert(cluster_grd_state != NULL); + return pg_atomic_read_u64(&cluster_grd_state->pin_high_water); +} + +uint64 +cluster_grd_sweep_runs(void) +{ + Assert(cluster_grd_state != NULL); + return pg_atomic_read_u64(&cluster_grd_state->sweep_runs); +} + /* ============================================================ - * spec-2.15 D8: SRF row visitor — hash_seq_search the entry HTAB - * and emit one row per entry under per-entry slock_t snapshot. + * spec-2.15 D8 + spec-6.3a: SRF row visitor. * - * **spec-2.16 forward-link TODO (P2.4 + I14)**: - * Wrap hash_seq_search in full 4096-shard LW_SHARED acquire OR - * chunked snapshot to defend concurrent HASH_ENTER_NULL writers - * once caller-side LockAcquire integration lands. 本 spec 0 - * caller → 0 row → 无并发问题 (本 walker safe). 
+ * Snapshot resource ids from shard-local entry lists while holding one shard + * lock at a time, then re-lookup each row through cluster_grd_entry_lookup_ + * or_create() so the per-entry slock_t snapshot is protected by a real lookup + * pin. This keeps observability safe while cold reclaim can HASH_REMOVE + * holderless entries. * ============================================================ */ void cluster_grd_entries_walk(ClusterGrdEntryRowVisitor visitor, void *ctx) { - HASH_SEQ_STATUS status; - ClusterGrdEntry *entry; + ClusterResId *resids = NULL; + int nresids; + int r; Assert(visitor != NULL); @@ -2789,10 +2992,15 @@ cluster_grd_entries_walk(ClusterGrdEntryRowVisitor visitor, void *ctx) if (cluster_grd_entry_htab == NULL) return; - hash_seq_init(&status, cluster_grd_entry_htab); - while ((entry = (ClusterGrdEntry *)hash_seq_search(&status)) != NULL) { + nresids = cluster_grd_snapshot_entry_resids(&resids); + for (r = 0; r < nresids; r++) { + ClusterGrdEntry *entry = NULL; int32 fields[11]; + if (cluster_grd_entry_lookup_or_create(&resids[r], false, &entry) != CLUSTER_GRD_ENTRY_OK + || entry == NULL) + continue; + /* per-entry slock_t snapshot — short critical section (memcpy * fixed-size struct fields). spec-2.16 mutator writers also * acquire entry->lock so snapshot is consistent. 
*/ @@ -2809,9 +3017,12 @@ cluster_grd_entries_walk(ClusterGrdEntryRowVisitor visitor, void *ctx) fields[9] = entry->nconverts; fields[10] = (int32)entry->state_flags; SpinLockRelease(&entry->lock); + cluster_grd_entry_release(entry); visitor(ctx, fields); } + if (resids != NULL) + pfree(resids); } @@ -3265,19 +3476,25 @@ cluster_grd_starvation_max_skip_observed(void) uint32 cluster_grd_clear_all_boosted(void) { - HASH_SEQ_STATUS status; - ClusterGrdEntry *entry; + ClusterResId *resids = NULL; uint32 cleared = 0; + int nresids; + int r; if (cluster_grd_entry_htab == NULL) return 0; - hash_seq_init(&status, cluster_grd_entry_htab); - while ((entry = (ClusterGrdEntry *)hash_seq_search(&status)) != NULL) { + nresids = cluster_grd_snapshot_entry_resids(&resids); + for (r = 0; r < nresids; r++) { + ClusterGrdEntry *entry = NULL; GrdWfgSnapshot snap; bool changed = false; int i; + if (cluster_grd_entry_lookup_or_create(&resids[r], false, &entry) != CLUSTER_GRD_ENTRY_OK + || entry == NULL) + continue; + SpinLockAcquire(&entry->lock); for (i = 0; i < entry->nwaiters; i++) { if (entry->waiters[i].boosted) { @@ -3296,12 +3513,15 @@ cluster_grd_clear_all_boosted(void) entry->generation++; grd_wfg_snapshot_locked(entry, &snap); SpinLockRelease(&entry->lock); + cluster_grd_entry_release(entry); if (changed) { for (i = 0; i < snap.n_waiters; i++) grd_wfg_refresh_waiter_edges(&snap, &snap.waiters[i]); } } + if (resids != NULL) + pfree(resids); return cleared; } @@ -3316,19 +3536,25 @@ cluster_grd_clear_all_boosted(void) uint32 cluster_grd_clear_boosted_for_node(int32 dead_node) { - HASH_SEQ_STATUS status; - ClusterGrdEntry *entry; + ClusterResId *resids = NULL; uint32 cleared = 0; + int nresids; + int r; if (cluster_grd_entry_htab == NULL) return 0; - hash_seq_init(&status, cluster_grd_entry_htab); - while ((entry = (ClusterGrdEntry *)hash_seq_search(&status)) != NULL) { + nresids = cluster_grd_snapshot_entry_resids(&resids); + for (r = 0; r < nresids; r++) { + ClusterGrdEntry 
*entry = NULL; GrdWfgSnapshot snap; bool changed = false; int i; + if (cluster_grd_entry_lookup_or_create(&resids[r], false, &entry) != CLUSTER_GRD_ENTRY_OK + || entry == NULL) + continue; + SpinLockAcquire(&entry->lock); for (i = 0; i < entry->nwaiters; i++) { if (entry->waiters[i].boosted && entry->waiters[i].node_id == dead_node) { @@ -3347,12 +3573,15 @@ cluster_grd_clear_boosted_for_node(int32 dead_node) entry->generation++; grd_wfg_snapshot_locked(entry, &snap); SpinLockRelease(&entry->lock); + cluster_grd_entry_release(entry); if (changed) { for (i = 0; i < snap.n_waiters; i++) grd_wfg_refresh_waiter_edges(&snap, &snap.waiters[i]); } } + if (resids != NULL) + pfree(resids); return cleared; } @@ -3722,7 +3951,8 @@ cluster_grd_entry_enqueue_or_grant_impl(const ClusterResId *resid, const Cluster request_id, meta.xid, meta.wait_seq, &r_vertex); SpinLockRelease(&entry->lock); - published = cluster_lmd_submit_wait_edge_real(&r_vertex, &w_vertex, request_id); + published + = grd_lmd_submit_wait_edge_pinned(entry, &r_vertex, &w_vertex, request_id); SpinLockAcquire(&entry->lock); /* @@ -3742,7 +3972,7 @@ cluster_grd_entry_enqueue_or_grant_impl(const ClusterResId *resid, const Cluster (LOCKMODE)lockmode, meta) < 0) { SpinLockRelease(&entry->lock); - cluster_lmd_cancel_wait_edge_real(&r_vertex); + grd_lmd_cancel_wait_edge_pinned(entry, &r_vertex); cluster_grd_entry_release(entry); cluster_grd_inc_ges_work_queue_full(); return CLUSTER_GRD_WAIT_QUEUE_FULL; @@ -3759,7 +3989,7 @@ cluster_grd_entry_enqueue_or_grant_impl(const ClusterResId *resid, const Cluster * the CURRENT holders (never false-grant). 
*/ SpinLockRelease(&entry->lock); if (published) - cluster_lmd_cancel_wait_edge_real(&r_vertex); + grd_lmd_cancel_wait_edge_pinned(entry, &r_vertex); else pg_atomic_fetch_add_u64( &cluster_grd_state->starvation_barrier_publish_fail_count, 1); @@ -4711,6 +4941,7 @@ cluster_grd_convert_grant_by_backend(const ClusterResId *resid, int32 node_id, u /* spec-5.3 — forward decl for the post-release empty-entry reclaim (definition * lives further down with the D10 cleanup primitives). */ +static bool cluster_grd_hashremove_if_still_empty_locked(ClusterGrdEntry *entry); static bool cluster_grd_hashremove_if_still_empty(const ClusterResId *resid); /* @@ -4787,25 +5018,10 @@ cluster_grd_release_and_drain(const ClusterResId *resid, const ClusterGrdHolderI cluster_grd_entry_release(entry); /* spec-5.8 D1b — granted waiters/converts departed; holders changed. - * Runs BEFORE the spec-5.3 empty-entry reclaim below: resync re-looks-up - * the entry by resid to refresh WFG edges, so it must see the entry before - * the reclaim may HASH_REMOVE it. */ + * cluster_grd_entry_release() may already have reclaimed a now-empty entry; + * the resync path first removes departed edges by identity and then treats a + * missing entry as "nothing left to refresh". */ grd_wfg_resync_after_grants(resid, granted_out, n); - - /* - * PGRAC: spec-5.3 — reclaim the entry if this release left it completely - * empty (no holders/waiters/converts/reservations). cluster_grd_entry_release - * is a documented no-op and the D8 periodic sweep is not wired, so without - * this a long-lived backend never frees GRD HTAB slots for finished lock - * resources — most visibly LOCKTAG_TRANSACTION, where every write txn has a - * unique xid (unique entry) and the HTAB fills (53R71 FAIL_RESERVATION_FULL). 
- * The helper re-acquires the shard partition lock, re-looks up by resid, and - * HASH_REMOVEs at-most-once ONLY when all occupancy counters are 0 — so a - * concurrent S3 reservation (nreservations > 0) or a freshly granted waiter - * (ngranted > 0) is safe (no-op). Done after the spinlock is dropped to keep - * the partition-lock-then-entry-spinlock order. - */ - (void)cluster_grd_hashremove_if_still_empty(resid); return n; } @@ -4982,33 +5198,93 @@ cluster_grd_reservation_promote(ClusterGrdEntry *entry, const ClusterGrdHolderId * ============================================================ */ /* - * spec-2.16 Step 4 D11: CSSD DEAD master sweep — traverses entry HTAB - * and per-entry filters holders[] / waiters[] / converts[] by - * node_id == dead_node_id (I48 — NO epoch filter). - * - * Step 4 implementation: uses cluster_grd_entry_htab via existing - * hash_seq_search pattern (mirror cluster_grd_entries_walk). For - * each matching slot, decrement entry->ngranted / nwaiters / nconverts - * under entry->lock and zero the slot. Idempotent re-entry safe. + * spec-2.16 Step 4 D11 + spec-6.3a: CSSD DEAD master sweep. * - * Counters per cleanup invocation tracked via existing entry mutator - * counter family (spec-2.15 entry_current_count when ngranted hits 0). - * 本 Step 0 真 mutator caller (spec-2.16 ships caller-side hooks - * stub only — full LockAcquireExtended 6-step integration in spec- - * 2.17), so sweep is a no-op until cluster_unit Step 6 inject test - * exercises mutator + sweep round-trip. + * Snapshot entry keys from shard-local lists, then re-lookup each resource + * through the 6.3a pin discipline before filtering holders[] / waiters[] / + * converts[] / reservations by node_id == dead_node_id (I48 — NO epoch + * filter). Empty entries are reclaimed by the last-pin release path, not by + * an escaped raw HTAB pointer. */ /* spec-2.24 D10 forward decl (definition later in same TU). 
*/ extern int cluster_grd_entry_cleanup_guarded(ClusterGrdEntry *entry, int dead_procno, int32 dead_node_id); +static int +cluster_grd_snapshot_entry_resids(ClusterResId **out_resids) +{ + ClusterResId *resids; + int capacity; + int n = 0; + bool overflow = false; + uint32 shard_id; + + Assert(out_resids != NULL); + *out_resids = NULL; + + if (cluster_grd_entry_htab == NULL) + return 0; + + capacity = Max(cluster_grd_entry_count(), 1); + do { + resids = (ClusterResId *)palloc0((Size)capacity * sizeof(ClusterResId)); + n = 0; + overflow = false; + + for (shard_id = 0; shard_id < PGRAC_GRD_SHARD_COUNT; shard_id++) { + dlist_iter iter; + + LWLockAcquire(&cluster_grd_shard_locks[shard_id].lock, LW_SHARED); + dlist_foreach(iter, &cluster_grd_state->entry_shard_lists[shard_id]) + { + ClusterGrdEntry *entry; + + if (n >= capacity) { + overflow = true; + break; + } + entry = dlist_container(ClusterGrdEntry, shard_link, iter.cur); + resids[n++] = entry->resid; + } + LWLockRelease(&cluster_grd_shard_locks[shard_id].lock); + if (overflow) + break; + } + + if (overflow) { + pfree(resids); + capacity = capacity <= INT_MAX / 2 ? 
capacity * 2 : INT_MAX; + } + } while (overflow); + + *out_resids = resids; + return n; +} + +static int +cluster_grd_entry_cleanup_guarded_by_resid(const ClusterResId *resid, int dead_procno, + int32 dead_node_id) +{ + ClusterGrdEntry *entry = NULL; + int removed; + + if (cluster_grd_entry_lookup_or_create(resid, false, &entry) != CLUSTER_GRD_ENTRY_OK + || entry == NULL) + return 0; + + removed = cluster_grd_entry_cleanup_guarded(entry, dead_procno, dead_node_id); + cluster_grd_entry_release(entry); + return removed; +} + void cluster_grd_cleanup_on_node_dead(int32 dead_node_id) { - HASH_SEQ_STATUS status; - ClusterGrdEntry *entry; + ClusterResId *resids = NULL; int swept = 0; + int nresids; uint64 pending_x_cleared = 0; + int i; pending_x_cleared = cluster_pcm_lock_clear_pending_x_for_node(dead_node_id); if (pending_x_cleared > 0) @@ -5029,10 +5305,11 @@ cluster_grd_cleanup_on_node_dead(int32 dead_node_id) * sweep loop here). D10 enforces HC25-26 / I-cleanup-1..4 — HASH_REMOVE * at-most-once + concurrent cleanup safety + RELEASE enqueue. 
*/ - hash_seq_init(&status, cluster_grd_entry_htab); - while ((entry = (ClusterGrdEntry *)hash_seq_search(&status)) != NULL) { - swept += cluster_grd_entry_cleanup_guarded(entry, -1, dead_node_id); - } + nresids = cluster_grd_snapshot_entry_resids(&resids); + for (i = 0; i < nresids; i++) + swept += cluster_grd_entry_cleanup_guarded_by_resid(&resids[i], -1, dead_node_id); + if (resids != NULL) + pfree(resids); if (swept > 0) ereport(DEBUG1, (errmsg_internal("cluster_grd_cleanup_on_node_dead(%d): " @@ -5087,9 +5364,11 @@ cluster_grd_clean_leave_drain_self(int32 leaving_node, uint64 leave_epoch) bool cluster_grd_clean_leave_verify_no_leftover(int32 leaving_node) { - HASH_SEQ_STATUS status; - ClusterGrdEntry *entry; + ClusterResId *resids = NULL; uint32 i; + int nresids; + int r; + bool ok = true; if (leaving_node < 0 || leaving_node >= CLUSTER_MAX_NODES) return true; @@ -5106,30 +5385,42 @@ cluster_grd_clean_leave_verify_no_leftover(int32 leaving_node) if (cluster_grd_entry_htab == NULL) return true; - hash_seq_init(&status, cluster_grd_entry_htab); - while ((entry = (ClusterGrdEntry *)hash_seq_search(&status)) != NULL) { + nresids = cluster_grd_snapshot_entry_resids(&resids); + for (r = 0; r < nresids; r++) { + ClusterGrdEntry *entry = NULL; int k; + if (cluster_grd_entry_lookup_or_create(&resids[r], false, &entry) != CLUSTER_GRD_ENTRY_OK + || entry == NULL) + continue; + + SpinLockAcquire(&entry->lock); for (k = 0; k < entry->ngranted; k++) { if (entry->holders[k].node_id == leaving_node) { - hash_seq_term(&status); - return false; + ok = false; + break; } } - for (k = 0; k < entry->nwaiters; k++) { + for (k = 0; ok && k < entry->nwaiters; k++) { if (entry->waiters[k].node_id == leaving_node) { - hash_seq_term(&status); - return false; + ok = false; + break; } } - for (k = 0; k < entry->nconverts; k++) { + for (k = 0; ok && k < entry->nconverts; k++) { if (entry->converts[k].node_id == leaving_node) { - hash_seq_term(&status); - return false; + ok = false; + break; 
} } + SpinLockRelease(&entry->lock); + cluster_grd_entry_release(entry); + if (!ok) + break; } - return true; + if (resids != NULL) + pfree(resids); + return ok; } /* @@ -5144,9 +5435,10 @@ cluster_grd_clean_leave_verify_no_leftover(int32 leaving_node) void cluster_grd_cleanup_stale_epoch(uint64 current_epoch) { - HASH_SEQ_STATUS status; - ClusterGrdEntry *entry; + ClusterResId *resids = NULL; int swept = 0; + int nresids; + int r; if (cluster_grd_entry_htab == NULL) { ereport(DEBUG2, (errmsg_internal("cluster_grd_cleanup_stale_epoch(%lu): " @@ -5155,10 +5447,15 @@ cluster_grd_cleanup_stale_epoch(uint64 current_epoch) return; } - hash_seq_init(&status, cluster_grd_entry_htab); - while ((entry = (ClusterGrdEntry *)hash_seq_search(&status)) != NULL) { + nresids = cluster_grd_snapshot_entry_resids(&resids); + for (r = 0; r < nresids; r++) { + ClusterGrdEntry *entry = NULL; int i; + if (cluster_grd_entry_lookup_or_create(&resids[r], false, &entry) != CLUSTER_GRD_ENTRY_OK + || entry == NULL) + continue; + SpinLockAcquire(&entry->lock); for (i = 0; i < entry->ngranted;) { if (entry->holders[i].cluster_epoch < current_epoch) { @@ -5172,7 +5469,10 @@ cluster_grd_cleanup_stale_epoch(uint64 current_epoch) i++; } SpinLockRelease(&entry->lock); + cluster_grd_entry_release(entry); } + if (resids != NULL) + pfree(resids); if (swept > 0) ereport(DEBUG1, (errmsg_internal("cluster_grd_cleanup_stale_epoch(%lu): " @@ -5429,17 +5729,56 @@ DEFINE_BAST_COUNTER(deadlock_chunk_oo_buffer_overflow, deadlock_chunk_oo_buffer_ * ============================================================ */ /* - * spec-2.24 D10 helper — HASH_REMOVE guarded by shard partition LWLock - * + re-lookup + verify-still-empty. Returns true if removed (we are - * the at-most-once winner per HC26 I-cleanup-4); false if another - * cleanup path already removed the entry or the entry is no longer - * empty (race lost — caller bumps skip counter). 
+ * spec-2.24 D10 helper — HASH_REMOVE guarded by the shard partition LWLock, + * with re-lookup and verify-still-empty. Returns true if removed (we are the + * at-most-once winner per HC26 I-cleanup-4); false if another cleanup path + * already removed the entry or the entry is no longer empty. */ +static bool +cluster_grd_hashremove_if_still_empty_locked(ClusterGrdEntry *entry) +{ + ClusterResId rid; + uint64 hash64; + uint32 hashvalue; + bool found = false; + bool pinned = false; + bool removed = false; + + if (entry == NULL || cluster_grd_entry_htab == NULL) + return false; + + SpinLockAcquire(&entry->lock); + pinned = (pg_atomic_read_u32(&entry->pin) != 0); + if (cluster_grd_entry_is_reclaimable(entry)) { + rid = entry->resid; + hash64 = cluster_grd_hash_resource(&rid); + hashvalue = (uint32)hash64; + entry->state_flags |= CLUSTER_GRD_ENTRY_FLAG_RECLAIMING; + SpinLockRelease(&entry->lock); + + dlist_delete_thoroughly(&entry->shard_link); + (void)hash_search_with_hash_value(cluster_grd_entry_htab, &rid, hashvalue, HASH_REMOVE, + &found); + if (found) { + pg_atomic_fetch_sub_u64(&cluster_grd_state->entry_current_count, 1); + pg_atomic_fetch_add_u64(&cluster_grd_state->entries_reclaimed_count, 1); + removed = true; + } + } else { + SpinLockRelease(&entry->lock); + if (pinned) + pg_atomic_fetch_add_u64(&cluster_grd_state->reclaim_skipped_pinned_count, 1); + } + + return removed; +} + static bool cluster_grd_hashremove_if_still_empty(const ClusterResId *resid) { + uint64 hash64; uint32 hashvalue; - int shard_id; + uint32 shard_id; bool found = false; ClusterGrdEntry *entry; bool removed = false; @@ -5447,28 +5786,74 @@ cluster_grd_hashremove_if_still_empty(const ClusterResId *resid) if (cluster_grd_entry_htab == NULL) return false; - hashvalue = cluster_grd_hash_resource(resid); - shard_id = cluster_grd_shard_for_hash(hashvalue); + hash64 = cluster_grd_hash_resource(resid); + hashvalue = (uint32)hash64; + shard_id = cluster_grd_shard_for_hash(hash64); 
LWLockAcquire(&cluster_grd_shard_locks[shard_id].lock, LW_EXCLUSIVE); entry = hash_search_with_hash_value(cluster_grd_entry_htab, resid, hashvalue, HASH_FIND, &found); - if (entry != NULL) { - SpinLockAcquire(&entry->lock); - if (entry->ngranted == 0 && entry->nwaiters == 0 && entry->nconverts == 0 - && entry->nreservations == 0) { - SpinLockRelease(&entry->lock); - (void)hash_search_with_hash_value(cluster_grd_entry_htab, resid, hashvalue, HASH_REMOVE, - &found); - pg_atomic_fetch_sub_u64(&cluster_grd_state->entry_current_count, 1); - removed = true; - } else { + if (entry != NULL) + removed = cluster_grd_hashremove_if_still_empty_locked(entry); + + LWLockRelease(&cluster_grd_shard_locks[shard_id].lock); + return removed; +} + +bool +cluster_grd_reclaim_if_cold(const ClusterResId *resid) +{ + if (resid == NULL) + return false; + return cluster_grd_hashremove_if_still_empty(resid); +} + +int +cluster_grd_reclaim_sweep(void) +{ + ClusterResId *batch; + int budget; + int n = 0; + int removed = 0; + int i; + uint32 shard_id; + + if (!cluster_grd_entry_reclaim || cluster_grd_entry_htab == NULL) + return 0; + if (cluster_grd_entry_reclaim_max_per_sweep <= 0) + return 0; + + budget = cluster_grd_entry_reclaim_max_per_sweep; + batch = (ClusterResId *)palloc0((Size)budget * sizeof(ClusterResId)); + pg_atomic_fetch_add_u64(&cluster_grd_state->sweep_runs, 1); + + for (shard_id = 0; shard_id < PGRAC_GRD_SHARD_COUNT && n < budget; shard_id++) { + dlist_iter iter; + + LWLockAcquire(&cluster_grd_shard_locks[shard_id].lock, LW_SHARED); + dlist_foreach(iter, &cluster_grd_state->entry_shard_lists[shard_id]) + { + ClusterGrdEntry *entry; + + if (n >= budget) + break; + entry = dlist_container(ClusterGrdEntry, shard_link, iter.cur); + SpinLockAcquire(&entry->lock); + if (cluster_grd_entry_is_reclaimable(entry)) + batch[n++] = entry->resid; + else if (pg_atomic_read_u32(&entry->pin) != 0) + pg_atomic_fetch_add_u64(&cluster_grd_state->reclaim_skipped_pinned_count, 1); 
SpinLockRelease(&entry->lock); } + LWLockRelease(&cluster_grd_shard_locks[shard_id].lock); } - LWLockRelease(&cluster_grd_shard_locks[shard_id].lock); + for (i = 0; i < n; i++) + if (cluster_grd_hashremove_if_still_empty(&batch[i])) + removed++; + + pfree(batch); return removed; } @@ -5506,7 +5891,6 @@ int cluster_grd_entry_cleanup_guarded(ClusterGrdEntry *entry, int dead_procno, int32 dead_node_id) { int removed = 0; - bool became_empty = false; GesRequestPayload release_payloads[PGRAC_GRD_MAX_HOLDERS]; int n_release = 0; ClusterResId entry_resid; @@ -5617,9 +6001,6 @@ cluster_grd_entry_cleanup_guarded(ClusterGrdEntry *entry, int dead_procno, int32 if (removed > 0) entry->generation++; - became_empty = (entry->ngranted == 0 && entry->nwaiters == 0 && entry->nconverts == 0 - && entry->nreservations == 0); - memcpy(&entry_resid, &entry->resid, sizeof(entry_resid)); SpinLockRelease(&entry->lock); @@ -5636,37 +6017,30 @@ cluster_grd_entry_cleanup_guarded(ClusterGrdEntry *entry, int dead_procno, int32 } } - /* HC26 I-cleanup-4 — HASH_REMOVE at-most-once per entry lifetime. - * Re-acquire shard partition LWLock + re-lookup resid + verify still - * empty; the losing path (race with another cleanup that already - * removed) increments skip counter. */ - if (became_empty) { - bool removed_ok = cluster_grd_hashremove_if_still_empty(&entry_resid); - if (!removed_ok) - cluster_lmd_cleanup_skip_other_owner_count_inc(1); - } - return removed; } /* - * Entry-by-procno sweep. Iterates GRD HTAB; for each entry, invokes - * D10 guarded primitive. Returns total slots removed. + * Entry-by-procno sweep. Snapshots GRD entry keys, then re-lookups each entry + * with a pin before invoking the D10 guarded primitive. Returns total slots + * removed. 
*/ static int cluster_grd_entries_cleanup_by_procno_guarded(int procno) { - HASH_SEQ_STATUS status; - ClusterGrdEntry *entry; + ClusterResId *resids = NULL; int total = 0; + int nresids; + int i; if (cluster_grd_entry_htab == NULL || procno < 0) return 0; - hash_seq_init(&status, cluster_grd_entry_htab); - while ((entry = (ClusterGrdEntry *)hash_seq_search(&status)) != NULL) { - total += cluster_grd_entry_cleanup_guarded(entry, procno, -1); - } + nresids = cluster_grd_snapshot_entry_resids(&resids); + for (i = 0; i < nresids; i++) + total += cluster_grd_entry_cleanup_guarded_by_resid(&resids[i], procno, -1); + if (resids != NULL) + pfree(resids); return total; } @@ -5705,11 +6079,12 @@ cluster_grd_cleanup_on_backend_exit(int procno) int cluster_grd_sweep_local_stale_procnos(void) { - HASH_SEQ_STATUS status; - ClusterGrdEntry *entry; + ClusterResId *resids = NULL; int total = 0; uint8 *alive; int n_alive_max = MaxBackends; + int nresids; + int r; if (cluster_grd_entry_htab == NULL) return 0; @@ -5728,11 +6103,16 @@ cluster_grd_sweep_local_stale_procnos(void) } LWLockRelease(ProcArrayLock); - hash_seq_init(&status, cluster_grd_entry_htab); - while ((entry = (ClusterGrdEntry *)hash_seq_search(&status)) != NULL) { + nresids = cluster_grd_snapshot_entry_resids(&resids); + for (r = 0; r < nresids; r++) { + ClusterGrdEntry *entry = NULL; uint32 stale_procno = (uint32)-1; uint32 i; + if (cluster_grd_entry_lookup_or_create(&resids[r], false, &entry) != CLUSTER_GRD_ENTRY_OK + || entry == NULL) + continue; + SpinLockAcquire(&entry->lock); for (i = 0; i < (uint32)entry->ngranted; i++) { if (entry->holders[i].node_id != (int32)cluster_node_id) @@ -5747,8 +6127,11 @@ cluster_grd_sweep_local_stale_procnos(void) if (stale_procno != (uint32)-1) total += cluster_grd_entry_cleanup_guarded(entry, (int)stale_procno, -1); + cluster_grd_entry_release(entry); } + if (resids != NULL) + pfree(resids); pfree(alive); return total; } @@ -5801,6 +6184,7 @@ cluster_grd_try_reserve(const 
ClusterResId *resid, const ClusterGrdHolderId *hol if (fast_path_out) *fast_path_out = fast_path && (er == CLUSTER_GRD_ENTRY_OK); + cluster_grd_entry_release(entry); return er; } @@ -5858,10 +6242,12 @@ cluster_grd_revalidate_and_promote(const ClusterResId *resid, const ClusterGrdHo if (!revalidate_ok) { (void)cluster_grd_reservation_cancel(entry, holder); SpinLockRelease(&entry->lock); + cluster_grd_entry_release(entry); return CLUSTER_GRD_ENTRY_NOT_FOUND; } er = cluster_grd_reservation_promote(entry, holder); SpinLockRelease(&entry->lock); + cluster_grd_entry_release(entry); return er; } @@ -5880,6 +6266,7 @@ cluster_grd_release_holder_by_id(const ClusterResId *resid, const ClusterGrdHold SpinLockAcquire(&entry->lock); er = cluster_grd_entry_release_holder(entry, holder); SpinLockRelease(&entry->lock); + cluster_grd_entry_release(entry); /* spec-5.8 D1b — holder removed; refresh queued waiters' edges. */ grd_wfg_resync_entry(resid, NULL, 0); return er; @@ -5900,6 +6287,7 @@ cluster_grd_cancel_reservation_by_id(const ClusterResId *resid, const ClusterGrd SpinLockAcquire(&entry->lock); er = cluster_grd_reservation_cancel(entry, holder); SpinLockRelease(&entry->lock); + cluster_grd_entry_release(entry); return er; } diff --git a/src/backend/cluster/cluster_guc.c b/src/backend/cluster/cluster_guc.c index d2d78f1575..fca70c883e 100644 --- a/src/backend/cluster/cluster_guc.c +++ b/src/backend/cluster/cluster_guc.c @@ -124,6 +124,8 @@ static const struct config_enum_entry cluster_undo_writeback_boundary_check_opti { NULL, 0, false } }; int cluster_grd_max_entries = 1024; +bool cluster_grd_entry_reclaim = true; +int cluster_grd_entry_reclaim_max_per_sweep = 256; int cluster_ges_request_timeout_ms = 60000; /* spec-2.16 D12 + v0.5 P1.5 */ int cluster_cf_enqueue_timeout_ms = 30000; /* spec-5.6 Dc4b — CF X/S grant wait */ int cluster_ges_starvation_max_skips = 8; /* spec-5.10 — boost after N skips */ @@ -1441,6 +1443,23 @@ cluster_init_guc(void) NULL, /* assign_hook */ 
NULL); /* show_hook */ + DefineCustomBoolVariable( + "cluster.grd_entry_reclaim", + gettext_noop("Enable safe cold reclaim for GRD resource entries."), + gettext_noop( + "When on, lookup pins are released with a copy-resid last-ref protocol and " + "cold holderless entries are removed after shard-LWLock and entry-spinlock " + "recheck. Turning this off keeps the pin discipline but disables HASH_REMOVE, " + "matching the legacy cap-only entry table shape."), + &cluster_grd_entry_reclaim, true, PGC_SIGHUP, 0, NULL, NULL, NULL); + + DefineCustomIntVariable( + "cluster.grd_entry_reclaim_max_per_sweep", + gettext_noop("Maximum GRD cold entries reclaimed by one LMON sweep."), + gettext_noop("Bounds the background cold-entry reclaim pass. 0 disables the sweep while " + "leaving eager last-unpin reclaim enabled."), + &cluster_grd_entry_reclaim_max_per_sweep, 256, 0, 65536, PGC_SIGHUP, 0, NULL, NULL, NULL); + /* * spec-5.10: cluster.ges_starvation_max_skips — bounded fairness threshold * for GES enqueue lock-starvation protection. A waiter jumped this many diff --git a/src/backend/cluster/cluster_lmon.c b/src/backend/cluster/cluster_lmon.c index 727e46ae89..1c4dae9ba3 100644 --- a/src/backend/cluster/cluster_lmon.c +++ b/src/backend/cluster/cluster_lmon.c @@ -944,6 +944,7 @@ LmonMain(void) * reconfig epoch bump (S1 → S2 order writ). bitmap diff * per v0.5 P1.2; no-op when dead_generation unchanged. */ cluster_grd_lmon_tick_dead_sweep(); + (void)cluster_grd_reclaim_sweep(); /* spec-5.10 fix-forward — runtime-off starvation sweep (no-op * unless cluster.ges_starvation_protection was just turned off). */ (void)cluster_grd_lmon_tick_starvation_sweep(); @@ -1542,6 +1543,7 @@ LmonMain(void) * reconfig epoch bump (S1 → S2 order writ). bitmap diff * per v0.5 P1.2; no-op when dead_generation unchanged. */ cluster_grd_lmon_tick_dead_sweep(); + (void)cluster_grd_reclaim_sweep(); /* spec-5.10 fix-forward — runtime-off starvation sweep. 
*/ (void)cluster_grd_lmon_tick_starvation_sweep(); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index b183b93a3f..5653c11367 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -12375,9 +12375,10 @@ # spec-2.15 D6 (2026-05-13) -- pg_cluster_grd_entries SRF (oid 8922). # Diagnostic/observability only -- NOT for production hot-path queries -# (P1.5). hash_seq_search() walks the entire entry HTAB; per-entry -# slock_t snapshot. Returns 0 rows when cluster.grd_max_entries=0 -# (skeleton mode) or when the entry table is empty (this spec has 0 callers). +# (P1.5). The implementation snapshots keys from the shard-local entry lists +# under each shard LWLock, then re-looks up each key through the GRD entry +# pin/release API before taking the per-entry slock_t snapshot. Returns 0 rows +# when cluster.grd_max_entries=0 (skeleton mode) or when the entry table is empty. { oid => '8922', descr => 'list cluster_grd entry table snapshot (diagnostic only; not for production hot-path queries) (spec-2.15)', proname => 'cluster_get_grd_entries', prorows => '1024', proretset => 't', provolatile => 's', proparallel => 'r', diff --git a/src/include/cluster/cluster_grd.h b/src/include/cluster/cluster_grd.h index 840611bdcf..7015600fec 100644 --- a/src/include/cluster/cluster_grd.h +++ b/src/include/cluster/cluster_grd.h @@ -67,6 +67,7 @@ #include "cluster/cluster_conf.h" /* CLUSTER_MAX_NODES (spec-4.6 D1 bitmap sizing) */ #include "datatype/timestamp.h" /* TimestampTz (spec-5.1b ClusterGrdConvert) */ +#include "lib/ilist.h" #include "port/atomics.h" #include "storage/lock.h" /* LOCKTAG */ @@ -190,10 +191,16 @@ typedef struct ClusterGrdShared { * holder/waiter/convert counters removed; deferred to spec-2.16 together with the mutator API). * entry_current_count is an internal current-size source for soft-cap * and observability; it is not a separate pg_cluster_state row.
*/ - pg_atomic_uint64 entry_current_count; /* current live HTAB entries */ - pg_atomic_uint64 entry_create_count; /* lifetime ++ on HASH_ENTER_NULL OK + new */ - pg_atomic_uint64 entry_lookup_hit_count; /* lifetime ++ on OK return (hit semantics; P2.5) */ - pg_atomic_uint64 entry_full_count; /* lifetime ++ on FULL */ + pg_atomic_uint64 entry_current_count; /* current live HTAB entries */ + pg_atomic_uint64 entry_create_count; /* lifetime ++ on HASH_ENTER_NULL OK + new */ + pg_atomic_uint64 entry_lookup_hit_count; /* lifetime ++ on OK return (hit semantics; P2.5) */ + pg_atomic_uint64 entry_full_count; /* lifetime ++ on FULL */ + pg_atomic_uint64 entries_reclaimed_count; /* spec-6.3a: cold entry removes */ + pg_atomic_uint64 reclaim_skipped_pinned_count; /* spec-6.3a: pin>0 reclaim skips */ + pg_atomic_uint64 pin_high_water; /* spec-6.3a: max observed entry pin */ + pg_atomic_uint64 sweep_runs; /* spec-6.3a: LMON reclaim sweeps */ + dlist_head + entry_shard_lists[PGRAC_GRD_SHARD_COUNT]; /* spec-6.3a: scan-safe per-shard entry lists */ /* spec-2.16 D1: 4 cap counter + 5 nofail counter (skeleton-init; * mutator bodies + nofail paths land Step 2-4). */ @@ -759,17 +766,12 @@ extern ClusterGrdEntryResult cluster_grd_entry_lookup_or_create(const ClusterRes bool create, ClusterGrdEntry **out); /* - * spec-2.15: Entry release — RESERVED no-op (v0.3 P1.3 contract unified). - * - * This spec guarantees no side effects: - * - does not decrement refcount - * - does not remove entry from HTAB - * - does not change holders/waiters/converts state - * - * Calling release is a stub safe-call (this spec has 0 callers; future spec-2.16+ - * caller-side integration calls it under the reserved no-op contract). Real refcount + HASH_REMOVE - * + reclaim logic is implemented caller-side in spec-2.16 (API signature unchanged; the body - * gains logic). + * spec-6.3a: Entry release — decrements the lookup pin acquired by + * cluster_grd_entry_lookup_or_create(). Every successful lookup returning + * CLUSTER_GRD_ENTRY_OK must be paired with exactly one release.
The release + * path copies resid before decrement; after pin reaches zero it never + * dereferences the old pointer and instead reclaims by value under the shard + * LWLock + entry spinlock cold recheck. */ extern void cluster_grd_entry_release(ClusterGrdEntry *entry); @@ -788,12 +790,21 @@ cluster_grd_entry_create_count(void); /* lifetime ++ on HASH_ENTER_NULL OK + new extern uint64 cluster_grd_entry_lookup_hit_count(void); /* lifetime ++ on OK return (atomic; P2.5 hit semantics) */ extern uint64 cluster_grd_entry_full_count(void); /* lifetime ++ on FULL (atomic) */ +extern uint64 cluster_grd_entries_reclaimed_count(void); +extern uint64 cluster_grd_reclaim_skipped_pinned_count(void); +extern uint64 cluster_grd_pin_high_water(void); +extern uint64 cluster_grd_sweep_runs(void); +extern uint32 cluster_grd_entry_pin_count(ClusterGrdEntry *entry); +extern bool cluster_grd_entry_is_reclaimable(ClusterGrdEntry *entry); +extern bool cluster_grd_reclaim_if_cold(const ClusterResId *resid); +extern int cluster_grd_reclaim_sweep(void); /* - * spec-2.15 D8: SRF row visitor. Iterates the entry HTAB (when - * allocated) and invokes `visitor(ctx, row_fields)` per entry under - * per-entry slock_t snapshot. The 11 row_fields columns are + * spec-2.15 D8 + spec-6.3a: SRF row visitor. Snapshots entry keys from + * shard-local entry lists, re-lookups each key through the lookup-pin API, + * and invokes `visitor(ctx, row_fields)` per entry under per-entry + * slock_t snapshot. The 11 row_fields columns are * (shard_id, field1, field2, field3, field4, type, lockmethodid, * ngranted, nwaiters, nconverts, state_flags) — all stored as int32. * @@ -801,12 +812,6 @@ extern uint64 cluster_grd_entry_full_count(void); /* lifetime ++ on FULL (atomic * srf.c) emit rows without exposing the private ClusterGrdEntry layout. * GUC=0 / htab==NULL → visitor invoked zero times (caller sees empty * result set, matching the NOT_READY sentinel surface).
- * - * **spec-2.16 forward-link (P2.4 + I14)**: before caller-side - * LockAcquire integration ships, this visitor must amend locking — wrap - * hash_seq_search in full 4096-shard LW_SHARED acquire or chunked - * snapshot to defend concurrent HASH_ENTER_NULL writers. This spec has - * 0 callers → 0 rows → no concurrency issue. */ typedef void (*ClusterGrdEntryRowVisitor)(void *ctx, const int32 row_fields[11]); @@ -1450,8 +1455,9 @@ extern ClusterGrdEntryResult cluster_grd_rollback_convert(const ClusterResId *re * diff). Sweeps all GRD entries: holder.node_id == dead_node_id * → release (independent of epoch per I48). * - * Idempotent (safe re-entry on bitmap re-sync). Skeleton (Step 1): - * no-op + DEBUG2 log. Body activation Step 4 D11. + * Idempotent (safe re-entry on bitmap re-sync). Since spec-6.3a, the + * sweep snapshots keys and re-lookups through the pin/release lifecycle + * before mutating entry state. */ extern void cluster_grd_cleanup_on_node_dead(int32 dead_node_id); diff --git a/src/include/cluster/cluster_guc.h b/src/include/cluster/cluster_guc.h index c32059522d..03ea74a49e 100644 --- a/src/include/cluster/cluster_guc.h +++ b/src/include/cluster/cluster_guc.h @@ -262,6 +262,8 @@ extern int cluster_shmem_max_regions; * ---------- */ extern int cluster_grd_max_entries; +extern bool cluster_grd_entry_reclaim; +extern int cluster_grd_entry_reclaim_max_per_sweep; /* spec-2.16 D12 + v0.5 P1.5: cluster.ges_request_timeout_ms + * effective_timeout helper. range [1, 600000]; default 60000. */ diff --git a/src/test/cluster_tap/t/104_grd_entry_shmem_alloc_smoke.pl b/src/test/cluster_tap/t/104_grd_entry_shmem_alloc_smoke.pl index a983eb48a7..6e17211e5a 100644 --- a/src/test/cluster_tap/t/104_grd_entry_shmem_alloc_smoke.pl +++ b/src/test/cluster_tap/t/104_grd_entry_shmem_alloc_smoke.pl @@ -14,7 +14,7 @@ # so hash_estimate_size(4096,...)
≈ 3-5MB, not naive 12KB) + # pg_cluster_grd_entries still 0 row (0 caller) # L4 SET cluster.grd_max_entries session-level fail (PGC_POSTMASTER) -# + dump_grd 37 row baseline + GRD atomic counters all 0 +# + dump_grd row baseline + GRD atomic counters all 0 # # Spec authority: pgrac:specs/spec-2.15-grd-entry-table-holders- # waiters.md (frozen v0.4 Q1-Q15 + 4 rounds of code-review corrections). @@ -129,13 +129,14 @@ # 1 cleanup_skip_stale_cancel_count counter, spec-2.25 adds # 1 relation_object_cluster_path_count counter, spec-2.26 adds # 1 transaction_cluster_path_count counter, spec-5.1b D9 adds 3 -# convert verdict counters (granted_inplace / enqueued / illegal), and +# convert verdict counters (granted_inplace / enqueued / illegal), +# spec-6.3a adds 4 entry lifecycle reclaim counters, and # spec-5.10 D7 adds 4 starvation-fairness counters (boost / barrier_enqueued # / barrier_publish_fail / max_skip_observed). is($node->safe_psql('postgres', q{SELECT count(*)::int FROM pg_cluster_state WHERE category='grd'}), - '47', - 'L4b dump_grd category="grd" emits 47 rows (spec-2.14 8 + spec-2.15 6 + spec-2.16 14 + spec-2.17 9 + spec-2.24 1 + spec-2.25 1 + spec-2.26 1 + spec-5.1b 3 + spec-5.10 4)'); + '51', + 'L4b dump_grd category="grd" emits 51 rows (adds spec-6.3a entry lifecycle counters)'); # All 3 NEW atomic counter baseline 0 (this spec has 0 callers invoking # cluster_grd_entry_lookup_or_create).
@@ -151,6 +152,23 @@ '0|0|0', 'L4c grd_entry_{create,lookup_hit,full}_count all 0 baseline (0 caller)'); +is($node->safe_psql('postgres', + q{SELECT (SELECT value FROM pg_cluster_state + WHERE category='grd' AND key='grd_entries_reclaimed_count') + || '|' || + (SELECT value FROM pg_cluster_state + WHERE category='grd' AND key='grd_reclaim_skipped_pinned_count') + || '|' || + (SELECT value FROM pg_cluster_state + WHERE category='grd' AND key='grd_pin_high_water')}), + '0|0|0', + 'L4c2 grd entry lifecycle reclaim counters all 0 baseline except sweep tick count'); + +ok($node->safe_psql('postgres', + q{SELECT (SELECT value::bigint FROM pg_cluster_state + WHERE category='grd' AND key='grd_sweep_runs') >= 0}) eq 't', + 'L4c3 grd_sweep_runs is present and non-negative'); + is($node->safe_psql('postgres', q{SELECT count(*)::int FROM pg_cluster_state WHERE category='grd' diff --git a/src/test/cluster_tap/t/296_grd_entry_lifecycle_reclaim_2node.pl b/src/test/cluster_tap/t/296_grd_entry_lifecycle_reclaim_2node.pl new file mode 100644 index 0000000000..56339d9a35 --- /dev/null +++ b/src/test/cluster_tap/t/296_grd_entry_lifecycle_reclaim_2node.pl @@ -0,0 +1,141 @@ +#!/usr/bin/env perl +#------------------------------------------------------------------------- +# +# 296_grd_entry_lifecycle_reclaim_2node.pl +# spec-6.3a -- GRD/GES entry lifecycle reclaim under cross-node churn. +# +# The workload uses a small GRD entry cap and more distinct cross-node +# advisory resources than the cap. Each resource is held on node0, observed +# as unavailable from node1, released, and then reacquired from node1. True +# cold reclaim keeps pg_cluster_grd_entries bounded and prevents FULL/53R71 +# style exhaustion under holderless churn. 
+# +# IDENTIFICATION +# src/test/cluster_tap/t/296_grd_entry_lifecycle_reclaim_2node.pl +# +# Portions Copyright (c) 2026, pgrac contributors +# +# Author: SqlRush +# +#------------------------------------------------------------------------- + +use strict; +use warnings; + +use FindBin; +use lib "$FindBin::RealBin/../../perl"; + +use PostgreSQL::Test::ClusterPair; +use Test::More; +use Time::HiRes qw(usleep); + +my ($pair, $n0, $n1); + +sub grd_sum +{ + my ($key) = @_; + my $sum = 0; + for my $node ($n0, $n1) + { + my $v = $node->safe_psql( + 'postgres', qq{ + SELECT coalesce(( + SELECT value::bigint FROM pg_cluster_state + WHERE category = 'grd' AND key = '$key'), 0)}); + $sum += int($v // 0); + } + return $sum; +} + +sub grd_entry_rows +{ + my $sum = 0; + for my $node ($n0, $n1) + { + my $v = $node->safe_psql('postgres', + q{SELECT count(*)::int FROM pg_cluster_grd_entries}); + $sum += int($v // 0); + } + return $sum; +} + +sub try_lock +{ + my ($node, $key) = @_; + return $node->safe_psql('postgres', "SELECT pg_try_advisory_lock($key)"); +} + +sub wait_until_acquirable +{ + my ($node, $key, $secs) = @_; + my $deadline = time() + $secs; + + while (time() < $deadline) + { + my $v = try_lock($node, $key); + return 1 if defined $v && $v =~ /^t/; + usleep(100_000); + } + return 0; +} + +$pair = PostgreSQL::Test::ClusterPair->new_pair( + 'grd_lifecycle', + quorum_voting_disks => 3, + shared_data => 1, + extra_conf => [ + 'autovacuum = off', + 'cluster.grd_max_entries = 32', + 'cluster.grd_entry_reclaim = on', + 'cluster.grd_entry_reclaim_max_per_sweep = 512', + 'cluster.cssd_heartbeat_interval_ms = 2000', + 'cluster.cssd_dead_deadband_factor = 10', + ]); +$pair->start_pair; +usleep(3_000_000); + +$n0 = $pair->node0; +$n1 = $pair->node1; + +my $alive0 = $n0->safe_psql('postgres', 'SELECT 1'); +my $alive1 = $n1->safe_psql('postgres', 'SELECT 1'); +if (($alive0 // '') ne '1' || ($alive1 // '') ne '1') +{ + $pair->stop_pair; + plan skip_all => "cluster pair prerequisites 
not met (alive0=$alive0 alive1=$alive1)"; +} + +ok($pair->wait_for_peer_state(0, 1, 'connected', 30), 'node0 sees node1 connected'); +ok($pair->wait_for_peer_state(1, 0, 'connected', 30), 'node1 sees node0 connected'); + +is(grd_entry_rows(), 0, 'initial GRD entry views are empty'); + +my $reclaimed_before = grd_sum('grd_entries_reclaimed_count'); +my $full_before = grd_sum('grd_entry_full_count'); +my $iters = 48; +my $blocked_ok = 1; +my $release_ok = 1; +my $holder = $n0->background_psql('postgres', on_error_die => 1); + +for my $i (1 .. $iters) +{ + my $key = 2960000 + $i; + + $holder->query_safe("SELECT pg_advisory_lock($key)"); + $blocked_ok &&= ((try_lock($n1, $key) // '') =~ /^f/); + $holder->query_safe("SELECT pg_advisory_unlock($key)"); + $release_ok &&= wait_until_acquirable($n1, $key, 10); +} +$holder->quit; + +ok($blocked_ok, "node1 try-locks were blocked for all $iters held resources"); +ok($release_ok, "node1 reacquired all $iters resources after release"); +is(grd_entry_rows(), 0, 'cross-node cold churn leaves no live GRD entries'); +ok(grd_sum('grd_entries_reclaimed_count') - $reclaimed_before >= $iters, + 'cross-node cold churn increments reclaimed counter'); +is(grd_sum('grd_entry_full_count') - $full_before, 0, + 'cross-node cold churn does not hit GRD entry FULL at cap 32'); +ok(grd_sum('grd_pin_high_water') >= 1, 'cross-node cold churn observes lookup pins'); + +$pair->stop_pair; +done_testing(); diff --git a/src/test/cluster_unit/test_cluster_debug.c b/src/test/cluster_unit/test_cluster_debug.c index 5ff80b0801..22aaffccc8 100644 --- a/src/test/cluster_unit/test_cluster_debug.c +++ b/src/test/cluster_unit/test_cluster_debug.c @@ -2767,6 +2767,30 @@ cluster_grd_entry_full_count(void) return 0; } +uint64 +cluster_grd_entries_reclaimed_count(void) +{ + return 0; +} + +uint64 +cluster_grd_reclaim_skipped_pinned_count(void) +{ + return 0; +} + +uint64 +cluster_grd_pin_high_water(void) +{ + return 0; +} + +uint64 +cluster_grd_sweep_runs(void) +{ + 
return 0; +} + uint64 cluster_grd_holders_full_count(void) { diff --git a/src/test/cluster_unit/test_cluster_grd.c b/src/test/cluster_unit/test_cluster_grd.c index 4dc7482871..49bbdb276c 100644 --- a/src/test/cluster_unit/test_cluster_grd.c +++ b/src/test/cluster_unit/test_cluster_grd.c @@ -45,6 +45,7 @@ #include "postgres.h" #include +#include #include #include "cluster/cluster_conf.h" @@ -147,6 +148,17 @@ void elog_finish(int e pg_attribute_unused(), const char *f pg_attribute_unused(), ...) {} +sigjmp_buf *PG_exception_stack = NULL; +ErrorContextCallback *error_context_stack = NULL; + +void +pg_re_throw(void) +{ + if (PG_exception_stack != NULL) + siglongjmp(*PG_exception_stack, 1); + abort(); +} + /* * spec-5.16 — test-only flag to force the GRD shmem stub to re-run its full * init (so the join-fence suite gets a clean recovery_done_epoch / fence / @@ -173,9 +185,7 @@ ShmemInitStruct(const char *name, Size size, bool *foundPtr) * standalone shmem stub's alignment to at least 8 bytes for * pg_atomic_uint64 fields inside ClusterGrdShared. */ uint64 force_align; - char data[131072]; /* 3×4096 atomic uint32 arrays + counters < 50KB - * (spec-4.6 adds master_generation[] + shard_phase[]); - * a 128KB buffer is sufficient */ + char data[262144]; /* ClusterGrdShared includes 4096 shard lists. */ } grd_buf; static bool grd_initialized = false; @@ -251,6 +261,8 @@ cluster_cssd_get_peer_state(int32 peer_id pg_attribute_unused()) * → skeleton mode → lookup_or_create returns NOT_READY; the soft-cap * regression test sets 1 and drives a tiny fake HTAB path. */ int cluster_grd_max_entries = 0; +bool cluster_grd_entry_reclaim = true; +int cluster_grd_entry_reclaim_max_per_sweep = 256; /* spec-5.10 — GES starvation-fairness GUC stub (cluster_grd.o references it).
*/ int cluster_ges_starvation_max_skips = 8; @@ -422,13 +434,14 @@ cluster_grd_outbound_enqueue_backend_request(uint32 dest_node_id pg_attribute_un return true; } -#define FAKE_GRD_HTAB_MAX_ENTRIES 4 +#define FAKE_GRD_HTAB_MAX_ENTRIES 320 #define FAKE_GRD_HTAB_ENTRY_BYTES 4096 static int fake_grd_htab_token; static int fake_grd_htab_count; static int fake_grd_htab_seq_index; static Size fake_grd_entrysize; +static bool fake_grd_htab_used[FAKE_GRD_HTAB_MAX_ENTRIES]; static union { uint64 force_align; char data[FAKE_GRD_HTAB_MAX_ENTRIES][FAKE_GRD_HTAB_ENTRY_BYTES]; @@ -437,9 +450,11 @@ static union { static void reset_fake_grd_htab(void) { + ut_reset_grd_shmem(); fake_grd_htab_count = 0; fake_grd_htab_seq_index = 0; fake_grd_entrysize = 0; + memset(fake_grd_htab_used, 0, sizeof(fake_grd_htab_used)); memset(&fake_grd_htab_entries, 0, sizeof(fake_grd_htab_entries)); } @@ -463,6 +478,7 @@ ShmemInitHash(const char *name pg_attribute_unused(), long init_size pg_attribut fake_grd_entrysize = infoP->entrysize; fake_grd_htab_count = 0; + memset(fake_grd_htab_used, 0, sizeof(fake_grd_htab_used)); memset(&fake_grd_htab_entries, 0, sizeof(fake_grd_htab_entries)); return (HTAB *)&fake_grd_htab_token; } @@ -484,17 +500,17 @@ hash_search_with_hash_value(HTAB *hashp pg_attribute_unused(), Assert(keyPtr != NULL); Assert(fake_grd_entrysize > 0); - for (i = 0; i < fake_grd_htab_count; i++) { + for (i = 0; i < FAKE_GRD_HTAB_MAX_ENTRIES; i++) { char *entry = fake_grd_htab_entries.data[i]; + if (!fake_grd_htab_used[i]) + continue; if (memcmp(entry, keyPtr, sizeof(ClusterResId)) == 0) { if (action == HASH_REMOVE) { if (foundPtr != NULL) *foundPtr = true; - if (i < fake_grd_htab_count - 1) - memcpy(fake_grd_htab_entries.data[i], - fake_grd_htab_entries.data[fake_grd_htab_count - 1], fake_grd_entrysize); - memset(fake_grd_htab_entries.data[fake_grd_htab_count - 1], 0, fake_grd_entrysize); + fake_grd_htab_used[i] = false; + memset(entry, 0, fake_grd_entrysize); fake_grd_htab_count--; return 
entry; } @@ -516,7 +532,16 @@ hash_search_with_hash_value(HTAB *hashp pg_attribute_unused(), if (fake_grd_htab_count >= FAKE_GRD_HTAB_MAX_ENTRIES) return NULL; - entry = fake_grd_htab_entries.data[fake_grd_htab_count++]; + for (i = 0; i < FAKE_GRD_HTAB_MAX_ENTRIES; i++) + if (!fake_grd_htab_used[i]) + break; + Assert(i < FAKE_GRD_HTAB_MAX_ENTRIES); + if (i >= FAKE_GRD_HTAB_MAX_ENTRIES) + return NULL; + + fake_grd_htab_used[i] = true; + fake_grd_htab_count++; + entry = fake_grd_htab_entries.data[i]; memset(entry, 0, fake_grd_entrysize); memcpy(entry, keyPtr, sizeof(ClusterResId)); return entry; @@ -534,9 +559,13 @@ hash_seq_init(HASH_SEQ_STATUS *status pg_attribute_unused(), HTAB *hashp pg_attr void * hash_seq_search(HASH_SEQ_STATUS *status pg_attribute_unused()) { - if (fake_grd_htab_seq_index >= fake_grd_htab_count) - return NULL; - return fake_grd_htab_entries.data[fake_grd_htab_seq_index++]; + while (fake_grd_htab_seq_index < FAKE_GRD_HTAB_MAX_ENTRIES) { + int i = fake_grd_htab_seq_index++; + + if (fake_grd_htab_used[i]) + return fake_grd_htab_entries.data[i]; + } + return NULL; } /* spec-5.13 S3/D4 stub: cluster_grd_clean_leave_verify_no_leftover early- @@ -638,11 +667,15 @@ typedef struct UtWfgEdge { static UtWfgEdge ut_wfg[UT_WFG_MAX]; static int ut_wfg_n = 0; +static bool ut_wfg_throw_on_submit_once = false; +static bool ut_wfg_throw_on_cancel_once = false; static void ut_wfg_reset(void) { ut_wfg_n = 0; + ut_wfg_throw_on_submit_once = false; + ut_wfg_throw_on_cancel_once = false; memset(ut_wfg, 0, sizeof(ut_wfg)); } @@ -661,6 +694,12 @@ cluster_lmd_submit_wait_edge_real(const ClusterLmdVertex *waiter, const ClusterL { int i; + if (ut_wfg_throw_on_submit_once) { + ut_wfg_throw_on_submit_once = false; + if (PG_exception_stack != NULL) + siglongjmp(*PG_exception_stack, 1); + abort(); + } if (waiter == NULL || blocker == NULL) return false; if (ut_vtx_eq(waiter, blocker)) /* self-edge rejected (graph defensive) */ @@ -682,6 +721,12 @@ 
cluster_lmd_cancel_wait_edge_real(const ClusterLmdVertex *waiter) { int i; + if (ut_wfg_throw_on_cancel_once) { + ut_wfg_throw_on_cancel_once = false; + if (PG_exception_stack != NULL) + siglongjmp(*PG_exception_stack, 1); + abort(); + } if (waiter == NULL) return; for (i = 0; i < ut_wfg_n;) { @@ -1156,8 +1201,7 @@ UT_TEST(test_grd_named_tranche_describe_only) UT_TEST(test_grd_entry_release_no_op_safe) { - /* RESERVED no-op contract (P1.3): cluster_grd_entry_release(NULL) - * must not crash; no side effect promised. */ + /* spec-6.3a: NULL release must remain safe for cleanup-style callers. */ cluster_grd_entry_release(NULL); UT_ASSERT_EQ(1, 1); /* reaching here suffices */ } @@ -1194,6 +1238,283 @@ UT_TEST(test_grd_hash_source_unification) UT_ASSERT_EQ(shard_a, shard_b); } +static void +grd_lifecycle_reset(int max_entries) +{ + ut_reset_grd_shmem(); + reset_fake_grd_htab(); + cluster_grd_max_entries = max_entries; + cluster_grd_entry_reclaim = true; + cluster_grd_entry_reclaim_max_per_sweep = 256; + cluster_node_id = 0; + cluster_grd_shmem_init(); +} + +static void +grd_lifecycle_resid(uint32 field1, ClusterResId *resid) +{ + LOCKTAG src; + + memset(&src, 0, sizeof(src)); + src.locktag_field1 = field1; + src.locktag_type = LOCKTAG_RELATION; + src.locktag_lockmethodid = 1; + cluster_grd_resid_encode(&src, resid); +} + +static ClusterGrdHolderId +grd_lifecycle_holder(int32 node_id, uint32 procno, uint64 request_id) +{ + ClusterGrdHolderId holder; + + memset(&holder, 0, sizeof(holder)); + holder.node_id = node_id; + holder.procno = procno; + holder.cluster_epoch = 1; + holder.request_id = request_id; + return holder; +} + +static ClusterGrdConvert +grd_lifecycle_convert_req(int32 node_id, uint32 procno, LOCKMODE current_mode, + LOCKMODE requested_mode, uint64 request_id) +{ + ClusterGrdConvert req; + + memset(&req, 0, sizeof(req)); + req.node_id = node_id; + req.source_node_id = node_id; + req.procno = procno; + req.cluster_epoch = 1; + req.current_mode = 
current_mode; + req.requested_mode = requested_mode; + req.convert_request_id = request_id; + req.request_opcode = 2; /* mirror GES_REQ_OPCODE_CONVERT without cluster_ges.h */ + return req; +} + +UT_TEST(test_grd_entry_pin_release_reclaims_cold) +{ + ClusterResId resid; + ClusterGrdEntry *first = NULL; + ClusterGrdEntry *second = NULL; + ClusterGrdEntry *again = NULL; + ClusterGrdEntry *after = (ClusterGrdEntry *)0xdeadbeef; + uint64 skipped_before; + uint64 reclaimed_before; + + grd_lifecycle_reset(4); + grd_lifecycle_resid(6301, &resid); + + skipped_before = cluster_grd_reclaim_skipped_pinned_count(); + reclaimed_before = cluster_grd_entries_reclaimed_count(); + + UT_ASSERT_EQ((int)cluster_grd_entry_lookup_or_create(&resid, true, &first), + (int)CLUSTER_GRD_ENTRY_OK); + UT_ASSERT_EQ(cluster_grd_entry_pin_count(first), 1); + UT_ASSERT_EQ(cluster_grd_entry_count(), 1); + + UT_ASSERT_EQ((int)cluster_grd_entry_lookup_or_create(&resid, true, &second), + (int)CLUSTER_GRD_ENTRY_OK); + UT_ASSERT_EQ((void *)second, (void *)first); + UT_ASSERT_EQ(cluster_grd_entry_pin_count(first), 2); + UT_ASSERT(cluster_grd_pin_high_water() >= 2); + + UT_ASSERT(!cluster_grd_reclaim_if_cold(&resid)); + UT_ASSERT_EQ(cluster_grd_entry_count(), 1); + UT_ASSERT_EQ(cluster_grd_reclaim_skipped_pinned_count(), skipped_before + 1); + + cluster_grd_entry_release(second); + UT_ASSERT_EQ(cluster_grd_entry_pin_count(first), 1); + UT_ASSERT_EQ(cluster_grd_entry_count(), 1); + + cluster_grd_entry_release(first); + UT_ASSERT_EQ(cluster_grd_entry_count(), 0); + UT_ASSERT_EQ(cluster_grd_entries_reclaimed_count(), reclaimed_before + 1); + + UT_ASSERT_EQ((int)cluster_grd_entry_lookup_or_create(&resid, false, &after), + (int)CLUSTER_GRD_ENTRY_NOT_FOUND); + UT_ASSERT_EQ((void *)after, (void *)NULL); + + /* Reusing the same resource after reclaim must start from a clean pin/state. 
*/ + UT_ASSERT_EQ((int)cluster_grd_entry_lookup_or_create(&resid, true, &again), + (int)CLUSTER_GRD_ENTRY_OK); + UT_ASSERT_EQ(cluster_grd_entry_pin_count(again), 1); + UT_ASSERT(!cluster_grd_entry_is_reclaimable(again)); + cluster_grd_entry_release(again); + UT_ASSERT_EQ(cluster_grd_entry_count(), 0); + + cluster_grd_max_entries = 0; + reset_fake_grd_htab(); +} + +UT_TEST(test_grd_reclaim_sweep_reclaims_legacy_cold_entries) +{ + ClusterGrdEntry *entry = NULL; + uint64 runs_before; + int i; + + grd_lifecycle_reset(4); + cluster_grd_entry_reclaim = false; + + for (i = 0; i < 3; i++) { + ClusterResId resid; + + grd_lifecycle_resid((uint32)(6310 + i), &resid); + UT_ASSERT_EQ((int)cluster_grd_entry_lookup_or_create(&resid, true, &entry), + (int)CLUSTER_GRD_ENTRY_OK); + cluster_grd_entry_release(entry); + UT_ASSERT_EQ(cluster_grd_entry_count(), i + 1); + } + + runs_before = cluster_grd_sweep_runs(); + UT_ASSERT_EQ(cluster_grd_reclaim_sweep(), 0); + UT_ASSERT_EQ(cluster_grd_sweep_runs(), runs_before); + + cluster_grd_entry_reclaim = true; + cluster_grd_entry_reclaim_max_per_sweep = 2; + UT_ASSERT_EQ(cluster_grd_reclaim_sweep(), 2); + UT_ASSERT_EQ(cluster_grd_sweep_runs(), runs_before + 1); + UT_ASSERT_EQ(cluster_grd_entry_count(), 1); + + cluster_grd_entry_reclaim_max_per_sweep = 256; + UT_ASSERT_EQ(cluster_grd_reclaim_sweep(), 1); + UT_ASSERT_EQ(cluster_grd_entry_count(), 0); + + cluster_grd_max_entries = 0; + reset_fake_grd_htab(); +} + +UT_TEST(test_grd_reclaim_sweep_honors_large_batch_guc) +{ + ClusterGrdEntry *entry = NULL; + int i; + + grd_lifecycle_reset(320); + cluster_grd_entry_reclaim = false; + + for (i = 0; i < 300; i++) { + ClusterResId resid; + + grd_lifecycle_resid((uint32)(6340 + i), &resid); + UT_ASSERT_EQ((int)cluster_grd_entry_lookup_or_create(&resid, true, &entry), + (int)CLUSTER_GRD_ENTRY_OK); + cluster_grd_entry_release(entry); + } + UT_ASSERT_EQ(cluster_grd_entry_count(), 300); + + cluster_grd_entry_reclaim = true; + 
cluster_grd_entry_reclaim_max_per_sweep = 300; + UT_ASSERT_EQ(cluster_grd_reclaim_sweep(), 300); + UT_ASSERT_EQ(cluster_grd_entry_count(), 0); + + cluster_grd_max_entries = 0; + reset_fake_grd_htab(); +} + +UT_TEST(test_grd_reclaim_excludes_live_state) +{ + ClusterResId resid; + ClusterGrdEntry *entry = NULL; + ClusterGrdHolderId h1; + ClusterGrdHolderId h2; + ClusterGrdConvert creq; + bool drain = false; + + grd_lifecycle_reset(4); + + grd_lifecycle_resid(6320, &resid); + h1 = grd_lifecycle_holder(1, 10, 100); + UT_ASSERT_EQ((int)cluster_grd_entry_lookup_or_create(&resid, true, &entry), + (int)CLUSTER_GRD_ENTRY_OK); + UT_ASSERT_EQ((int)cluster_grd_entry_grant_holder(entry, &h1, ShareLock), + (int)CLUSTER_GRD_ENTRY_OK); + cluster_grd_entry_release(entry); + UT_ASSERT(!cluster_grd_reclaim_if_cold(&resid)); + UT_ASSERT_EQ(cluster_grd_entry_count(), 1); + UT_ASSERT_EQ((int)cluster_grd_release_holder_by_id(&resid, &h1), (int)CLUSTER_GRD_ENTRY_OK); + UT_ASSERT_EQ(cluster_grd_entry_count(), 0); + + grd_lifecycle_resid(6321, &resid); + h1 = grd_lifecycle_holder(1, 11, 101); + UT_ASSERT_EQ((int)cluster_grd_entry_lookup_or_create(&resid, true, &entry), + (int)CLUSTER_GRD_ENTRY_OK); + UT_ASSERT_EQ((int)cluster_grd_entry_add_waiter(entry, &h1, ExclusiveLock), + (int)CLUSTER_GRD_ENTRY_OK); + cluster_grd_entry_release(entry); + UT_ASSERT(!cluster_grd_reclaim_if_cold(&resid)); + UT_ASSERT_EQ(cluster_grd_entry_count(), 1); + UT_ASSERT_EQ((int)cluster_grd_cancel_waiter_by_id(&resid, &h1), (int)CLUSTER_GRD_ENTRY_OK); + UT_ASSERT_EQ(cluster_grd_entry_count(), 0); + + grd_lifecycle_resid(6322, &resid); + h1 = grd_lifecycle_holder(1, 12, 102); + UT_ASSERT_EQ((int)cluster_grd_entry_lookup_or_create(&resid, true, &entry), + (int)CLUSTER_GRD_ENTRY_OK); + UT_ASSERT_EQ((int)cluster_grd_reservation_create(entry, &h1, ShareLock), + (int)CLUSTER_GRD_ENTRY_OK); + cluster_grd_entry_release(entry); + UT_ASSERT(!cluster_grd_reclaim_if_cold(&resid)); + UT_ASSERT_EQ(cluster_grd_entry_count(), 1); + 
UT_ASSERT_EQ((int)cluster_grd_cancel_reservation_by_id(&resid, &h1), (int)CLUSTER_GRD_ENTRY_OK); + UT_ASSERT_EQ(cluster_grd_entry_count(), 0); + + grd_lifecycle_resid(6323, &resid); + h1 = grd_lifecycle_holder(1, 13, 103); + h2 = grd_lifecycle_holder(2, 14, 104); + UT_ASSERT_EQ((int)cluster_grd_entry_lookup_or_create(&resid, true, &entry), + (int)CLUSTER_GRD_ENTRY_OK); + UT_ASSERT_EQ((int)cluster_grd_entry_grant_holder(entry, &h1, ShareLock), + (int)CLUSTER_GRD_ENTRY_OK); + UT_ASSERT_EQ((int)cluster_grd_entry_grant_holder(entry, &h2, ShareLock), + (int)CLUSTER_GRD_ENTRY_OK); + creq = grd_lifecycle_convert_req(1, 13, ShareLock, AccessExclusiveLock, 203); + UT_ASSERT_EQ((int)cluster_grd_entry_request_convert(entry, &creq, &drain), + (int)CLUSTER_GRD_CONVERT_ENQUEUED); + UT_ASSERT_EQ(cluster_grd_entry_nconverts(entry), 1); + cluster_grd_entry_release(entry); + UT_ASSERT(!cluster_grd_reclaim_if_cold(&resid)); + UT_ASSERT_EQ(cluster_grd_entry_count(), 1); + h1.request_id = 203; + UT_ASSERT_EQ((int)cluster_grd_cancel_convert_by_id(&resid, &h1, 0), (int)CLUSTER_GRD_ENTRY_OK); + h1.request_id = 103; + UT_ASSERT_EQ((int)cluster_grd_release_holder_by_id(&resid, &h2), (int)CLUSTER_GRD_ENTRY_OK); + UT_ASSERT_EQ((int)cluster_grd_release_holder_by_id(&resid, &h1), (int)CLUSTER_GRD_ENTRY_OK); + UT_ASSERT_EQ(cluster_grd_entry_count(), 0); + + cluster_grd_max_entries = 0; + reset_fake_grd_htab(); +} + +UT_TEST(test_grd_entry_release_overrelease_fail_safe) +{ + ClusterResId resid; + ClusterGrdEntry *entry = NULL; + ClusterGrdHolderId holder; + + grd_lifecycle_reset(4); + grd_lifecycle_resid(6330, &resid); + holder = grd_lifecycle_holder(1, 20, 200); + + UT_ASSERT_EQ((int)cluster_grd_entry_lookup_or_create(&resid, true, &entry), + (int)CLUSTER_GRD_ENTRY_OK); + UT_ASSERT_EQ((int)cluster_grd_entry_grant_holder(entry, &holder, ShareLock), + (int)CLUSTER_GRD_ENTRY_OK); + cluster_grd_entry_release(entry); + UT_ASSERT_EQ(cluster_grd_entry_pin_count(entry), 0); + 
UT_ASSERT_EQ(cluster_grd_entry_count(), 1); + + cluster_grd_entry_release(entry); + UT_ASSERT_EQ(cluster_grd_entry_pin_count(entry), 0); + UT_ASSERT_EQ(cluster_grd_entry_count(), 1); + + UT_ASSERT_EQ((int)cluster_grd_release_holder_by_id(&resid, &holder), (int)CLUSTER_GRD_ENTRY_OK); + UT_ASSERT_EQ(cluster_grd_entry_count(), 0); + + cluster_grd_max_entries = 0; + reset_fake_grd_htab(); +} + /* ============================================================ * spec-2.26 T-grd-N..N+2 — LOCKTAG_TRANSACTION ClusterResId wrapper + * cleanup tests. @@ -1334,6 +1655,7 @@ UT_TEST(test_grd_transaction_cleanup_on_node_dead_removes_entry) (int)CLUSTER_GRD_ENTRY_OK); UT_ASSERT_EQ(cluster_grd_entry_has_remote_holder(entry, 0), true); UT_ASSERT_EQ(cluster_grd_entry_has_pending_waiter(entry), true); + cluster_grd_entry_release(entry); cluster_grd_cleanup_on_node_dead(5); @@ -1382,6 +1704,7 @@ UT_TEST(test_grd_release_and_drain_reclaims_empty_entry) UT_ASSERT_EQ((int)cluster_grd_entry_grant_holder(entry, &h1, ExclusiveLock), (int)CLUSTER_GRD_ENTRY_OK); UT_ASSERT_EQ(cluster_grd_entry_count(), 1); + cluster_grd_entry_release(entry); (void)cluster_grd_release_and_drain(&resid, &h1, granted, lengthof(granted)); UT_ASSERT_EQ(cluster_grd_entry_count(), 0); /* empty -> reclaimed */ @@ -1403,6 +1726,7 @@ UT_TEST(test_grd_release_and_drain_reclaims_empty_entry) UT_ASSERT_EQ((int)cluster_grd_entry_grant_holder(entry, &h2, AccessShareLock), (int)CLUSTER_GRD_ENTRY_OK); UT_ASSERT_EQ(cluster_grd_entry_count(), 1); + cluster_grd_entry_release(entry); (void)cluster_grd_release_and_drain(&resid, &h1, granted, lengthof(granted)); UT_ASSERT_EQ(cluster_grd_entry_count(), 1); /* h2 still holds -> NOT reclaimed */ (void)cluster_grd_release_and_drain(&resid, &h2, granted, lengthof(granted)); @@ -1430,6 +1754,7 @@ UT_TEST(test_grd_release_and_drain_reclaims_empty_entry) hx.request_id = (uint64)(200 + i); UT_ASSERT_EQ((int)cluster_grd_entry_grant_holder(e2, &hx, ExclusiveLock), (int)CLUSTER_GRD_ENTRY_OK); + 
cluster_grd_entry_release(e2); (void)cluster_grd_release_and_drain(&r2, &hx, granted, lengthof(granted)); } UT_ASSERT_EQ(cluster_grd_entry_count(), 0); /* no accumulation */ @@ -1479,6 +1804,8 @@ UT_TEST(test_grd_entry_existing_hit_survives_soft_cap) UT_ASSERT_EQ((void *)third, (void *)NULL); UT_ASSERT_EQ(cluster_grd_entry_count(), 1); + cluster_grd_entry_release(second); + cluster_grd_entry_release(first); cluster_grd_max_entries = 0; reset_fake_grd_htab(); } @@ -1742,8 +2069,7 @@ convert_reset(void) * reset_fake_grd_htab() only clears the fake HTAB storage. The convert * suite is not exercising the entry cap, so set a max far above the * total entries the suite ever creates to keep lookup_or_create out of - * the FULL path. At most ~2 entries are live per fake-HTAB reset, so - * the 4-slot fake storage never overflows. + * the FULL path. The fake storage is larger than the per-test live set. */ reset_fake_grd_htab(); cluster_grd_max_entries = 1000000; @@ -2723,6 +3049,7 @@ UT_TEST(test_ul_grant_conditional_no_waiter_enqueued) conflicts, &n_conflict), (int)CLUSTER_GRD_CONFLICT_NOWAIT); UT_ASSERT_EQ(cluster_grd_entry_ngranted(e), 1); /* node2 NOT added as a holder */ + cluster_grd_entry_release(e); /* Release node1: zero compatible waiters pop (grant_conditional enqueued none). 
*/ h = bast_holder(1, 100, 1); @@ -3101,6 +3428,65 @@ UT_TEST(test_5_8_d1b_u2d_cancel_removes_waiter_edges) convert_teardown(); } +UT_TEST(test_grd_pin_cleanup_on_lmd_submit_error) +{ + int saved_node = cluster_node_id; + int saved_max_skips = cluster_ges_starvation_max_skips; + ClusterResId resid; + ClusterGrdHolderId h; + ClusterGrdEntry *entry = NULL; + ClusterGrdConflictHolder conflicts[PGRAC_GRD_MAX_HOLDERS_PUBLIC]; + volatile bool caught = false; + int nc = -1; + + cluster_node_id = 0; + convert_reset(); + ut_wfg_reset(); + cluster_grd_set_starvation_protection(true); + cluster_ges_starvation_max_skips = 1; + bast_resid(5804, &resid); + + h = bast_holder(1, 100, 1); + UT_ASSERT_EQ((int)cluster_grd_entry_enqueue_or_grant(&resid, &h, 1, 1, 0, UT_GES_OPCODE_REQUEST, + ShareLock, conflicts, &nc), + (int)CLUSTER_GRD_GRANT_NOW); + + h = bast_holder(2, 200, 2); + UT_ASSERT_EQ((int)cluster_grd_entry_enqueue_or_grant(&resid, &h, 2, 2, 0, UT_GES_OPCODE_REQUEST, + ExclusiveLock, conflicts, &nc), + (int)CLUSTER_GRD_ENQUEUED_WAITER); + + ut_wfg_throw_on_submit_once = true; + h = bast_holder(3, 300, 3); + PG_TRY(); + { + (void)cluster_grd_entry_enqueue_or_grant(&resid, &h, 3, 3, 0, UT_GES_OPCODE_REQUEST, + ShareLock, conflicts, &nc); + } + PG_CATCH(); + { + caught = true; + } + PG_END_TRY(); + + UT_ASSERT(caught); + UT_ASSERT(!ut_wfg_throw_on_submit_once); + UT_ASSERT_EQ((int)cluster_grd_entry_lookup_or_create(&resid, false, &entry), + (int)CLUSTER_GRD_ENTRY_OK); + UT_ASSERT_EQ(cluster_grd_entry_pin_count(entry), 1); + cluster_grd_entry_release(entry); + + h = bast_holder(2, 200, 2); + UT_ASSERT_EQ((int)cluster_grd_cancel_waiter_by_id(&resid, &h), (int)CLUSTER_GRD_ENTRY_OK); + h = bast_holder(1, 100, 1); + UT_ASSERT_EQ((int)cluster_grd_release_holder_by_id(&resid, &h), (int)CLUSTER_GRD_ENTRY_OK); + UT_ASSERT_EQ(cluster_grd_entry_count(), 0); + + cluster_ges_starvation_max_skips = saved_max_skips; + cluster_node_id = saved_node; + convert_teardown(); +} + /* 
============================================================ * spec-5.8 D1c U3 — the master-side WFG vertex carries the real waiter xid @@ -3645,8 +4031,12 @@ int * other cluster_unit binaries; argv is intentionally unused. */ main(int argc pg_attribute_unused(), char *argv[] pg_attribute_unused()) { - UT_PLAN( - 74); /* prior 42; spec-5.3:+6; 5.5:+3; +1 release reclaim; 5.8 D1b:+4 (U2a-d); D1c:+2 (U3a-b); D1e:+2 (U4a-b); 5.9 Hardening:+1 (convert ABA); spec-5.16:+12 (join-remaster U1-U5/U10-U16); +1 (U17 cross-episode fence Hardening) */ + /* prior 42; spec-5.3:+6; 5.5:+3; +1 release reclaim; + * spec-6.3a:+6 lifecycle; 5.8 D1b:+4 (U2a-d); D1c:+2 (U3a-b); + * D1e:+2 (U4a-b); 5.9 Hardening:+1 (convert ABA); + * spec-5.16:+12 (join-remaster U1-U5/U10-U16); + * +1 (U17 cross-episode fence Hardening). */ + UT_PLAN(80); UT_RUN(test_grd_clusterresid_size_16); UT_RUN(test_grd_resid_encode_decode_roundtrip); @@ -3662,6 +4052,11 @@ main(int argc pg_attribute_unused(), char *argv[] pg_attribute_unused()) UT_RUN(test_grd_named_tranche_describe_only); UT_RUN(test_grd_entry_release_no_op_safe); UT_RUN(test_grd_hash_source_unification); + UT_RUN(test_grd_entry_pin_release_reclaims_cold); + UT_RUN(test_grd_reclaim_sweep_reclaims_legacy_cold_entries); + UT_RUN(test_grd_reclaim_sweep_honors_large_batch_guc); + UT_RUN(test_grd_reclaim_excludes_live_state); + UT_RUN(test_grd_entry_release_overrelease_fail_safe); /* spec-2.26 T-grd-N..N+2 */ UT_RUN(test_grd_resid_encode_transaction_roundtrip); @@ -3720,6 +4115,7 @@ main(int argc pg_attribute_unused(), char *argv[] pg_attribute_unused()) UT_RUN(test_5_8_d1b_u2b_refresh_follows_current_holders); UT_RUN(test_5_8_d1b_u2c_convert_enqueue_registers_edge); UT_RUN(test_5_8_d1b_u2d_cancel_removes_waiter_edges); + UT_RUN(test_grd_pin_cleanup_on_lmd_submit_error); /* spec-5.8 D1c — waiter xid threaded into the WFG vertex (U3a-b). 
*/ UT_RUN(test_5_8_d1c_u3a_request_waiter_carries_xid); diff --git a/src/test/cluster_unit/test_cluster_grd_starvation.c b/src/test/cluster_unit/test_cluster_grd_starvation.c index 4323419956..b4d306d0df 100644 --- a/src/test/cluster_unit/test_cluster_grd_starvation.c +++ b/src/test/cluster_unit/test_cluster_grd_starvation.c @@ -51,6 +51,7 @@ #include "postgres.h" #include +#include #include #include "cluster/cluster_conf.h" @@ -153,6 +154,30 @@ void elog_finish(int e pg_attribute_unused(), const char *f pg_attribute_unused(), ...) {} +sigjmp_buf *PG_exception_stack = NULL; +ErrorContextCallback *error_context_stack = NULL; + +void +pg_re_throw(void) +{ + if (PG_exception_stack != NULL) + siglongjmp(*PG_exception_stack, 1); + abort(); +} + +/* + * spec-5.16 — test-only flag to force the GRD shmem stub to re-run its full + * init between scenarios. The stub allocates the buffer once, so without this + * a repeat cluster_grd_shmem_init() would see found=true and skip shared-state + * field init, including the spec-6.3a shard entry lists. + */ +static bool ut_grd_force_reinit = false; +static void +ut_reset_grd_shmem(void) +{ + ut_grd_force_reinit = true; +} + /* * L105 union force-align shmem stub. */ @@ -166,12 +191,14 @@ ShmemInitStruct(const char *name, Size size, bool *foundPtr) * standalone shmem stub's alignment to at least 8 bytes for * pg_atomic_uint64 fields inside ClusterGrdShared. */ uint64 force_align; - char data[131072]; /* 3×4096 atomic uint32 arrays + counters < 50KB - * (spec-4.6 adds master_generation[] + shard_phase[]); - * buffer 128KB 充足 */ + char data[262144]; /* ClusterGrdShared includes 4096 shard lists. 
*/ } grd_buf; static bool grd_initialized = false; + if (ut_grd_force_reinit) { + grd_initialized = false; + ut_grd_force_reinit = false; + } Assert(size <= sizeof(grd_buf.data)); *foundPtr = grd_initialized; grd_initialized = true; @@ -240,6 +267,8 @@ cluster_cssd_get_peer_state(int32 peer_id pg_attribute_unused()) * → skeleton mode → lookup_or_create returns NOT_READY; the soft-cap * regression test sets 1 and drives a tiny fake HTAB path. */ int cluster_grd_max_entries = 0; +bool cluster_grd_entry_reclaim = true; +int cluster_grd_entry_reclaim_max_per_sweep = 256; /* spec-4.6 D2 stub: cluster_grd_lookup_master_gen forwards the LMS * wire routing token verbatim (Q3-C). Settable so the unit test can @@ -395,13 +424,14 @@ cluster_grd_outbound_enqueue_backend_request(uint32 dest_node_id pg_attribute_un return true; } -#define FAKE_GRD_HTAB_MAX_ENTRIES 4 +#define FAKE_GRD_HTAB_MAX_ENTRIES 320 #define FAKE_GRD_HTAB_ENTRY_BYTES 4096 static int fake_grd_htab_token; static int fake_grd_htab_count; static int fake_grd_htab_seq_index; static Size fake_grd_entrysize; +static bool fake_grd_htab_used[FAKE_GRD_HTAB_MAX_ENTRIES]; static union { uint64 force_align; char data[FAKE_GRD_HTAB_MAX_ENTRIES][FAKE_GRD_HTAB_ENTRY_BYTES]; @@ -410,9 +440,11 @@ static union { static void reset_fake_grd_htab(void) { + ut_reset_grd_shmem(); fake_grd_htab_count = 0; fake_grd_htab_seq_index = 0; fake_grd_entrysize = 0; + memset(fake_grd_htab_used, 0, sizeof(fake_grd_htab_used)); memset(&fake_grd_htab_entries, 0, sizeof(fake_grd_htab_entries)); } @@ -436,6 +468,7 @@ ShmemInitHash(const char *name pg_attribute_unused(), long init_size pg_attribut fake_grd_entrysize = infoP->entrysize; fake_grd_htab_count = 0; + memset(fake_grd_htab_used, 0, sizeof(fake_grd_htab_used)); memset(&fake_grd_htab_entries, 0, sizeof(fake_grd_htab_entries)); return (HTAB *)&fake_grd_htab_token; } @@ -457,17 +490,17 @@ hash_search_with_hash_value(HTAB *hashp pg_attribute_unused(), Assert(keyPtr != NULL); 
Assert(fake_grd_entrysize > 0); - for (i = 0; i < fake_grd_htab_count; i++) { + for (i = 0; i < FAKE_GRD_HTAB_MAX_ENTRIES; i++) { char *entry = fake_grd_htab_entries.data[i]; + if (!fake_grd_htab_used[i]) + continue; if (memcmp(entry, keyPtr, sizeof(ClusterResId)) == 0) { if (action == HASH_REMOVE) { if (foundPtr != NULL) *foundPtr = true; - if (i < fake_grd_htab_count - 1) - memcpy(fake_grd_htab_entries.data[i], - fake_grd_htab_entries.data[fake_grd_htab_count - 1], fake_grd_entrysize); - memset(fake_grd_htab_entries.data[fake_grd_htab_count - 1], 0, fake_grd_entrysize); + fake_grd_htab_used[i] = false; + memset(entry, 0, fake_grd_entrysize); fake_grd_htab_count--; return entry; } @@ -489,7 +522,16 @@ hash_search_with_hash_value(HTAB *hashp pg_attribute_unused(), if (fake_grd_htab_count >= FAKE_GRD_HTAB_MAX_ENTRIES) return NULL; - entry = fake_grd_htab_entries.data[fake_grd_htab_count++]; + for (i = 0; i < FAKE_GRD_HTAB_MAX_ENTRIES; i++) + if (!fake_grd_htab_used[i]) + break; + Assert(i < FAKE_GRD_HTAB_MAX_ENTRIES); + if (i >= FAKE_GRD_HTAB_MAX_ENTRIES) + return NULL; + + fake_grd_htab_used[i] = true; + fake_grd_htab_count++; + entry = fake_grd_htab_entries.data[i]; memset(entry, 0, fake_grd_entrysize); memcpy(entry, keyPtr, sizeof(ClusterResId)); return entry; @@ -507,9 +549,13 @@ hash_seq_init(HASH_SEQ_STATUS *status pg_attribute_unused(), HTAB *hashp pg_attr void * hash_seq_search(HASH_SEQ_STATUS *status pg_attribute_unused()) { - if (fake_grd_htab_seq_index >= fake_grd_htab_count) - return NULL; - return fake_grd_htab_entries.data[fake_grd_htab_seq_index++]; + while (fake_grd_htab_seq_index < FAKE_GRD_HTAB_MAX_ENTRIES) { + int i = fake_grd_htab_seq_index++; + + if (fake_grd_htab_used[i]) + return fake_grd_htab_entries.data[i]; + } + return NULL; } /* spec-5.13 S3/D4 stub: cluster_grd_clean_leave_verify_no_leftover (in diff --git a/src/test/cluster_unit/test_cluster_lmon.c b/src/test/cluster_unit/test_cluster_lmon.c index 4d6bcc6cb8..3b36fae5b2 100644 --- 
a/src/test/cluster_unit/test_cluster_lmon.c +++ b/src/test/cluster_unit/test_cluster_lmon.c @@ -565,6 +565,13 @@ void cluster_grd_lmon_tick_dead_sweep(void) {} +int cluster_grd_reclaim_sweep(void); +int +cluster_grd_reclaim_sweep(void) +{ + return 0; +} + /* spec-5.10 fix-forward — cluster_lmon.c calls the runtime-off starvation sweep. */ uint32 cluster_grd_lmon_tick_starvation_sweep(void); uint32