From aebb5c4ce2cb3be1c0f3af96105e32ca14c68b40 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Fri, 29 May 2026 12:38:45 -0700 Subject: [PATCH] feat(challenges): incremental Phase 3 + cooldown-trigger index MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related fixes for the wedge the IndexChallengesJob hit right after #842 deployed. The first post-deploy reconcile tick stalled (20+ min, no completion) because the four new Phase 3 processors (m/r/rv/rd) were full-scan over 2.3M user_events, and every is_complete=true upsert fired the on_user_challenge trigger's cooldown-window check — an unindexed scan against the 8 GB notification table that ran 19s+ per call. This change splits the fix: - Phase 3 processors go incremental (#875-style dirty-set). Each tick rescans only user_events rows whose blocknumber moved past a per-processor checkpoint, then re-derives state for the affected users. New supporting files: * jobs/challenges/user_event_challenges.go (rewrite) * ddl/migrations/0209: btree on user_events(blocknumber) so the dirty scan is an index range read instead of a 2.3M-row seq-scan * ddl/migrations/0211: seed the four checkpoints to current max user_events.blocknumber so prod starts "from now" and skips the redundant backfill (Python already populated user_challenges; the upserts are idempotent) - Partial GIN on notification(user_ids) WHERE type='reward_in_cooldown' (ddl/migrations/0210, CONCURRENTLY). Lets the trigger's cooldown-window query hit a small in-subset index instead of the full 8 GB table. Helps every cooldown_days>0 challenge, not just Phase 3. Caveat noted in user_event_challenges.go: r/rv key the dirty scan on user_events.blocknumber, so a referrer's verification flip is not picked up until the *referred* user's row changes again. Verification changes are rare and the old full-scan code only caught them on the next tick; this matches the precedent #875 set for the other incremental processors. 
Tests: existing 5 user_events processor tests still pass (checkpoint defaults to 0 in the test DB → first run catches the seeded block-101 rows); added TestMobileInstall_SkipsRowsBelowCheckpoint to explicitly cover the dirty-set skip path. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../0209_user_events_blocknumber_idx.sql | 24 +++ ...0210_notification_cooldown_partial_gin.sql | 33 +++++ ...11_seed_phase_3_user_event_checkpoints.sql | 50 +++++++ jobs/challenges/user_event_challenges.go | 139 ++++++++++++++---- jobs/challenges/user_event_challenges_test.go | 22 +++ sql/01_schema.sql | 28 ++++ sql/03_migration_tracker.sql | 3 + 7 files changed, 272 insertions(+), 27 deletions(-) create mode 100644 ddl/migrations/0209_user_events_blocknumber_idx.sql create mode 100644 ddl/migrations/0210_notification_cooldown_partial_gin.sql create mode 100644 ddl/migrations/0211_seed_phase_3_user_event_checkpoints.sql diff --git a/ddl/migrations/0209_user_events_blocknumber_idx.sql b/ddl/migrations/0209_user_events_blocknumber_idx.sql new file mode 100644 index 00000000..18fc71ea --- /dev/null +++ b/ddl/migrations/0209_user_events_blocknumber_idx.sql @@ -0,0 +1,24 @@ +-- Btree index on user_events(blocknumber) — required by the dirty-set +-- scan that drives the Phase 3 (m/r/rv/rd) incremental processors. +-- +-- Each tick the processors run: +-- SELECT user_id, blocknumber FROM user_events +-- WHERE blocknumber > $1 +-- AND is_current = true +-- AND <processor-specific filter> +-- ORDER BY blocknumber ASC LIMIT $2 +-- +-- Without this index that query is a seq-scan of 2.3M rows (the table only +-- carries a pkey and a user_id btree today). With it the scan is a small +-- index range read keyed on the per-processor checkpoint — same pattern +-- #875 established for follows/saves/reposts/tracks/playlists/users. +-- +-- NOTE: intentionally NOT wrapped in BEGIN/COMMIT so CREATE INDEX +-- CONCURRENTLY can run without holding an ACCESS EXCLUSIVE lock on +-- user_events.
IF NOT EXISTS makes the migration idempotent. + +CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_user_events_blocknumber + ON public.user_events USING btree (blocknumber); + +COMMENT ON INDEX ix_user_events_blocknumber IS + 'Range scans by blocknumber for the incremental Phase 3 challenge processors (m/r/rv/rd).'; diff --git a/ddl/migrations/0210_notification_cooldown_partial_gin.sql b/ddl/migrations/0210_notification_cooldown_partial_gin.sql new file mode 100644 index 00000000..06f09b71 --- /dev/null +++ b/ddl/migrations/0210_notification_cooldown_partial_gin.sql @@ -0,0 +1,33 @@ +-- Partial GIN on notification(user_ids) WHERE type='reward_in_cooldown'. +-- +-- The on_user_challenge trigger fires on every is_complete=true write to +-- user_challenges. For challenges with cooldown_days > 0 it does: +-- +-- SELECT id FROM notification +-- WHERE type = 'reward_in_cooldown' +-- AND new.user_id = ANY(user_ids) +-- AND timestamp >= (new.completed_at - interval '1 hour') +-- LIMIT 1; +-- +-- The full GIN on user_ids matches across all ~23.5M / 8 GB of notification +-- rows, so each trigger call became IO-bound (DataFileRead). Right after +-- #842 (which added Phase 3 m/r/rv/rd — all cooldown_days=7) deployed, +-- pg_stat_activity caught individual user_challenges upserts spending +-- 19s+ per row, wedging the IndexChallengesJob's first reconcile tick. +-- +-- A partial GIN restricted to type='reward_in_cooldown' is small (one type +-- out of ~30) and lets the planner go straight to the candidate user-id +-- matches within that slice; the timestamp filter then runs over a handful +-- of rows instead of the whole table. The benefit applies to every +-- cooldown_days>0 challenge, not just Phase 3. +-- +-- NOTE: intentionally NOT wrapped in BEGIN/COMMIT so CREATE INDEX +-- CONCURRENTLY can run without holding an ACCESS EXCLUSIVE lock on +-- notification. IF NOT EXISTS makes the migration idempotent. 
+ +CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_notification_cooldown_user_ids + ON public.notification USING gin (user_ids) + WHERE type = 'reward_in_cooldown'; + +COMMENT ON INDEX ix_notification_cooldown_user_ids IS + 'Partial GIN for the on_user_challenge trigger''s cooldown-window check; replaces a multi-second IO-bound scan against the full 8GB notification table with a tiny in-subset lookup.'; diff --git a/ddl/migrations/0211_seed_phase_3_user_event_checkpoints.sql b/ddl/migrations/0211_seed_phase_3_user_event_checkpoints.sql new file mode 100644 index 00000000..44261f52 --- /dev/null +++ b/ddl/migrations/0211_seed_phase_3_user_event_checkpoints.sql @@ -0,0 +1,50 @@ +-- Seed the incremental checkpoints for the Phase 3 user_events processors. +-- +-- mobile_install (m), referral (r), verified_referral (rv), and referred +-- (rd) are now rewritten in the same dirty-set style as #875's other +-- aggregating processors: each tick only rescans user_events rows whose +-- blocknumber moved past a per-processor checkpoint. See +-- jobs/challenges/user_event_challenges.go. +-- +-- A fresh checkpoint defaults to 0, which would re-derive every completion +-- over ~2.3M user_events rows on first run — exactly the load that wedged +-- the cd94ede (#842) deploy's first reconcile tick: each completed upsert +-- fired the handle_on_user_challenge trigger, whose cooldown_days>0 path +-- scans the 8GB notification table. Seed each checkpoint to the current +-- max(user_events.blocknumber) so prod starts "from now" and skips the +-- redundant backfill. Python already populated user_challenges and the +-- upserts are idempotent. +-- +-- ON CONFLICT DO NOTHING keeps this idempotent and never rewinds a +-- checkpoint the running job has already advanced. The max(blocknumber) +-- probe is index-only against ix_user_events_blocknumber (added in 0209). +-- +-- Checkpoint names must match the constants in user_event_challenges.go. 
+ +BEGIN; + +-- mobile_install +INSERT INTO indexing_checkpoints (tablename, last_checkpoint) +SELECT 'challenges:m:last_blocknumber', + COALESCE((SELECT max(blocknumber) FROM user_events), 0) +ON CONFLICT (tablename) DO NOTHING; + +-- referral (unverified tier) +INSERT INTO indexing_checkpoints (tablename, last_checkpoint) +SELECT 'challenges:r:last_blocknumber', + COALESCE((SELECT max(blocknumber) FROM user_events), 0) +ON CONFLICT (tablename) DO NOTHING; + +-- referral (verified tier) +INSERT INTO indexing_checkpoints (tablename, last_checkpoint) +SELECT 'challenges:rv:last_blocknumber', + COALESCE((SELECT max(blocknumber) FROM user_events), 0) +ON CONFLICT (tablename) DO NOTHING; + +-- referred (the referred user's side) +INSERT INTO indexing_checkpoints (tablename, last_checkpoint) +SELECT 'challenges:rd:last_blocknumber', + COALESCE((SELECT max(blocknumber) FROM user_events), 0) +ON CONFLICT (tablename) DO NOTHING; + +COMMIT; diff --git a/jobs/challenges/user_event_challenges.go b/jobs/challenges/user_event_challenges.go index c902d52b..c62203ae 100644 --- a/jobs/challenges/user_event_challenges.go +++ b/jobs/challenges/user_event_challenges.go @@ -3,9 +3,32 @@ // The mobile-install ("m") and referral ("r"/"rv"/"rd") challenges are // driven by the user_events table, which the indexer populates from the // `events` object in user-profile metadata (is_mobile_user, referrer) — -// see indexer/user_events_hook.go. Each processor recomputes completions -// from the current user_events state on every Reconcile (no checkpoint), -// matching the on-chain-derived Phase 1/2 processors. +// see indexer/user_events_hook.go. +// +// Incremental: like #875's other dirty-set processors, each tick only +// rescans user_events rows whose blocknumber moved past a per-processor +// checkpoint, then re-derives state for just the affected users. 
Three +// supporting migrations land alongside this file: +// +// - 0209 adds a btree on user_events(blocknumber) so the dirty scan is +// an index range read instead of a 2.3M-row seq-scan. +// - 0210 adds a partial GIN on notification(user_ids) WHERE +// type='reward_in_cooldown' — the on_user_challenge trigger's +// cooldown-window check fires on every is_complete=true upsert for +// challenges with cooldown_days>0 (which all four Phase 3 challenges +// have), and without that index it scans the full 8GB notification +// table per call. +// - 0211 seeds the four checkpoints to the current max +// user_events.blocknumber so prod starts "from now" and skips +// re-deriving completions over 2.3M historical rows on first run. +// Python already populated user_challenges; upserts are idempotent. +// +// Caveat: the dirty scan keys on user_events.blocknumber only, so a +// verification flip on an existing referrer's users row (`is_verified` +// goes true) is NOT picked up by the r/rv processors until the *referred* +// user's user_events row changes again. Verification changes are rare and +// the old full-scan code caught them only on its next tick; this matches +// #875's other processors which similarly key on a single source. package challenges import ( @@ -15,12 +38,39 @@ import ( "github.com/jackc/pgx/v5" ) -// MobileInstallProcessor implements challenge "m" — boolean reward when a -// user's profile reports is_mobile_user (events.is_mobile_user=true). +// userEventReferralDirtySQL surfaces user_events rows whose blocknumber moved +// past the checkpoint and currently carry a referrer. Shared by the r, rv, and +// rd processors — the dirty source is the same; recompute partitions the +// recipient (referrer vs referred) and the verification gate. +// +// The user_id column here is the *referred* user (the user_events row that +// names a referrer); recompute resolves the (referrer, referred) pairs. 
+const userEventReferralDirtySQL = ` + SELECT user_id, blocknumber FROM user_events + WHERE blocknumber > $1 + AND is_current = true + AND referrer IS NOT NULL + ORDER BY blocknumber ASC + LIMIT $2 +` + +// MobileInstallProcessor — challenge "m". Boolean reward when a user's +// profile reports is_mobile_user=true (events.is_mobile_user). type MobileInstallProcessor struct{} func (p *MobileInstallProcessor) ChallengeID() string { return "m" } +const mobileInstallCheckpoint = "challenges:m:last_blocknumber" + +const mobileInstallDirtySQL = ` + SELECT user_id, blocknumber FROM user_events + WHERE blocknumber > $1 + AND is_current = true + AND is_mobile_user = true + ORDER BY blocknumber ASC + LIMIT $2 +` + func (p *MobileInstallProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID()) if err != nil { @@ -31,28 +81,45 @@ func (p *MobileInstallProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error } amount := c.AmountInt() + return reconcileIncrementalUsers(ctx, tx, mobileInstallCheckpoint, mobileInstallDirtySQL, + func(ctx context.Context, tx pgx.Tx, userIDs []int64) error { + return p.recompute(ctx, tx, userIDs, amount) + }) +} + +func (p *MobileInstallProcessor) recompute(ctx context.Context, tx pgx.Tx, userIDs []int64, amount int32) error { + // Re-verify each candidate still has a mobile-flagged is_current row. + // Within the same tx this is mostly belt-and-suspenders, but the EXISTS + // pattern mirrors first_playlist.go and stays robust if the dirty scan + // ever loosens its filter. 
rows, err := tx.Query(ctx, ` - SELECT user_id FROM user_events - WHERE is_current = true AND is_mobile_user = true - `) + SELECT x.user_id + FROM unnest($1::bigint[]) AS x(user_id) + WHERE EXISTS ( + SELECT 1 FROM user_events ue + WHERE ue.user_id = x.user_id + AND ue.is_current = true + AND ue.is_mobile_user = true + ) + `, userIDs) if err != nil { return fmt.Errorf("scan user_events: %w", err) } - var userIDs []int64 + var qualifying []int64 for rows.Next() { var uid int64 if err := rows.Scan(&uid); err != nil { rows.Close() return err } - userIDs = append(userIDs, uid) + qualifying = append(qualifying, uid) } rows.Close() if err := rows.Err(); err != nil { return err } - for _, uid := range userIDs { + for _, uid := range qualifying { if err := UpsertUserChallenge(ctx, tx, p.ChallengeID(), SpecifierFromUserID(uid), uid, 1, 1, amount, ); err != nil { @@ -62,16 +129,15 @@ func (p *MobileInstallProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error return nil } -// ReferralProcessor implements challenges "r" / "rv" — the referrer earns -// when another user records them as referrer (events.referrer). The split -// gates on the referrer's verification: +// ReferralProcessor — challenges "r" / "rv". The referrer earns when +// another user records them as referrer; the split gates on the referrer's +// verification: // // r: referrer NOT verified // rv: referrer IS verified // -// (Python only ever recorded verified referrers; sourcing from -// user_events here lets api/ award the unverified "r" tier too, per the -// Phase 3 challenge catalog.) +// Python only ever recorded verified referrers; sourcing from user_events +// here lets api/ award the unverified "r" tier too per the Phase 3 catalog. 
type ReferralProcessor struct { ID string Verified bool @@ -91,15 +157,24 @@ func (p *ReferralProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { return nil } amount := c.AmountInt() + checkpoint := fmt.Sprintf("challenges:%s:last_blocknumber", p.ID) + + return reconcileIncrementalUsers(ctx, tx, checkpoint, userEventReferralDirtySQL, + func(ctx context.Context, tx pgx.Tx, referredIDs []int64) error { + return p.recompute(ctx, tx, referredIDs, amount) + }) +} +func (p *ReferralProcessor) recompute(ctx context.Context, tx pgx.Tx, referredIDs []int64, amount int32) error { rows, err := tx.Query(ctx, ` SELECT ue.user_id, ue.referrer FROM user_events ue JOIN users u ON u.user_id = ue.referrer AND u.is_current = true - WHERE ue.is_current = true + WHERE ue.user_id = ANY($1::bigint[]) + AND ue.is_current = true AND ue.referrer IS NOT NULL - AND COALESCE(u.is_verified, false) = $1 - `, p.Verified) + AND COALESCE(u.is_verified, false) = $2 + `, referredIDs, p.Verified) if err != nil { return fmt.Errorf("scan referrals: %w", err) } @@ -119,7 +194,7 @@ func (p *ReferralProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { } for _, r := range refs { - // Specifier: <referrer>:<referred> — reward target is the referrer. + // Specifier: <referrer>:<referred> — recipient is the referrer. specifier := fmt.Sprintf("%x:%x", r.referrer, r.referred) if err := UpsertUserChallenge(ctx, tx, p.ID, specifier, r.referrer, 1, 1, amount, @@ -130,13 +205,14 @@ func (p *ReferralProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { return nil } -// ReferredProcessor implements challenge "rd" — the referred user earns -// once for having a referrer recorded (regardless of referrer -// verification). Sourced from user_events. +// ReferredProcessor — challenge "rd". The referred user earns once for +// having a referrer recorded (regardless of referrer verification).
type ReferredProcessor struct{} func (p *ReferredProcessor) ChallengeID() string { return "rd" } +const referredCheckpoint = "challenges:rd:last_blocknumber" + func (p *ReferredProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID()) if err != nil { @@ -147,10 +223,19 @@ func (p *ReferredProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { } amount := c.AmountInt() + return reconcileIncrementalUsers(ctx, tx, referredCheckpoint, userEventReferralDirtySQL, + func(ctx context.Context, tx pgx.Tx, referredIDs []int64) error { + return p.recompute(ctx, tx, referredIDs, amount) + }) +} + +func (p *ReferredProcessor) recompute(ctx context.Context, tx pgx.Tx, referredIDs []int64, amount int32) error { rows, err := tx.Query(ctx, ` SELECT user_id, referrer FROM user_events - WHERE is_current = true AND referrer IS NOT NULL - `) + WHERE user_id = ANY($1::bigint[]) + AND is_current = true + AND referrer IS NOT NULL + `, referredIDs) if err != nil { return fmt.Errorf("scan referrals: %w", err) } @@ -170,7 +255,7 @@ func (p *ReferredProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error { } for _, r := range refs { - // Specifier: <referred>:<referrer> — reward target is the referred user. + // Specifier: <referred>:<referrer> — recipient is the referred user.
specifier := fmt.Sprintf("%x:%x", r.referred, r.referrer) if err := UpsertUserChallenge(ctx, tx, p.ChallengeID(), specifier, r.referred, 1, 1, amount, diff --git a/jobs/challenges/user_event_challenges_test.go b/jobs/challenges/user_event_challenges_test.go index afd197e3..92b47060 100644 --- a/jobs/challenges/user_event_challenges_test.go +++ b/jobs/challenges/user_event_challenges_test.go @@ -111,3 +111,25 @@ func TestReferred_AwardsReferredUser(t *testing.T) { assert.True(t, r.IsComplete) } } + +// Pre-seeding the checkpoint past a user_events row's blocknumber must +// cause the dirty scan to skip it — this is what makes the prod backfill +// safe on first deploy (migration 0211 sets checkpoints to current max). +func TestMobileInstall_SkipsRowsBelowCheckpoint(t *testing.T) { + pool := withChallengesDB(t) + database.Seed(pool, database.FixtureMap{}) // seeds block 101 + seedUserEvent(t, pool, 700, true, nil) // mobile user at block 101 + + // Set the checkpoint past the seeded row's blocknumber. 
+ _, err := pool.Exec(context.Background(), ` + INSERT INTO indexing_checkpoints (tablename, last_checkpoint) + VALUES ('challenges:m:last_blocknumber', 200) + ON CONFLICT (tablename) DO UPDATE SET last_checkpoint = EXCLUDED.last_checkpoint + `) + require.NoError(t, err) + + runProcessor(t, pool, &MobileInstallProcessor{}) + + _, ok := queryUserChallenge(t, pool, "m", fmt.Sprintf("%x", 700)) + assert.False(t, ok, "row below checkpoint must not be reprocessed") +} diff --git a/sql/01_schema.sql b/sql/01_schema.sql index 3797b92c..127fc782 100644 --- a/sql/01_schema.sql +++ b/sql/01_schema.sql @@ -12212,6 +12212,20 @@ CREATE INDEX ix_follows_follower_user_id ON public.follows USING btree (follower CREATE INDEX ix_notification ON public.notification USING gin (user_ids); +-- +-- Name: ix_notification_cooldown_user_ids; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX ix_notification_cooldown_user_ids ON public.notification USING gin (user_ids) WHERE ((type)::text = 'reward_in_cooldown'::text); + + +-- +-- Name: INDEX ix_notification_cooldown_user_ids; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON INDEX public.ix_notification_cooldown_user_ids IS 'Partial GIN for the on_user_challenge trigger''s cooldown-window check; replaces a multi-second IO-bound scan against the full 8GB notification table with a tiny in-subset lookup.'; + + -- -- Name: ix_playlist_trending_scores_playlist_id; Type: INDEX; Schema: public; Owner: - -- @@ -12331,6 +12345,20 @@ CREATE INDEX ix_trending_scores ON public.track_trending_scores USING btree (typ CREATE INDEX ix_user_created_at ON public.users USING btree (created_at, user_id, is_current); +-- +-- Name: ix_user_events_blocknumber; Type: INDEX; Schema: public; Owner: - +-- + +CREATE INDEX ix_user_events_blocknumber ON public.user_events USING btree (blocknumber); + + +-- +-- Name: INDEX ix_user_events_blocknumber; Type: COMMENT; Schema: public; Owner: - +-- + +COMMENT ON INDEX public.ix_user_events_blocknumber IS 'Range 
scans by blocknumber for the incremental Phase 3 challenge processors (m/r/rv/rd).'; + + -- -- Name: ix_user_tips_receiver_user_id; Type: INDEX; Schema: public; Owner: - -- diff --git a/sql/03_migration_tracker.sql b/sql/03_migration_tracker.sql index 6abd1958..039b7122 100644 --- a/sql/03_migration_tracker.sql +++ b/sql/03_migration_tracker.sql @@ -158,6 +158,9 @@ migrations/0203_seed_phase_1_challenges.sql b027784464de897b26d4b420ca51a970 202 migrations/0204_seed_phase_2_challenges.sql 168a6d57c056e2e8f7fe14c36fc1c367 2026-05-29 16:22:36.811563+00 migrations/0205_seed_phase_3_challenges.sql dc2a08647a63c0e355c6a3b2cc23a8bd 2026-05-29 16:22:37.200322+00 migrations/0208_seed_challenge_checkpoints.sql ed11876806de4dd1d80b389894b4db45 2026-05-29 16:22:38.000000+00 +migrations/0209_user_events_blocknumber_idx.sql 19ed339385266f28e83399125b6593df 2026-05-29 19:30:00.000000+00 +migrations/0210_notification_cooldown_partial_gin.sql 7156a9b6e236e17acef7d6b91cb1291b 2026-05-29 19:30:00.100000+00 +migrations/0211_seed_phase_3_user_event_checkpoints.sql e37093a8ba1d1a4a3bcca7f98b612be2 2026-05-29 19:30:00.200000+00 \.