Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions ddl/migrations/0209_user_events_blocknumber_idx.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-- Btree index on user_events(blocknumber) — required by the dirty-set
-- scan that drives the Phase 3 (m/r/rv/rd) incremental processors.
--
-- Each tick the processors run:
-- SELECT user_id, blocknumber FROM user_events
-- WHERE blocknumber > $1
-- AND is_current = true
-- AND <type-specific filter>
-- ORDER BY blocknumber ASC LIMIT $2
--
-- Without this index that query is a seq-scan of 2.3M rows (the table only
-- carries a pkey and a user_id btree today). With it the scan is a small
-- index range read keyed on the per-processor checkpoint — same pattern
-- #875 established for follows/saves/reposts/tracks/playlists/users.
--
-- NOTE: intentionally NOT wrapped in BEGIN/COMMIT so CREATE INDEX
-- CONCURRENTLY can run without holding an ACCESS EXCLUSIVE lock on
-- user_events. IF NOT EXISTS makes the migration idempotent.

CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_user_events_blocknumber
ON public.user_events USING btree (blocknumber);

COMMENT ON INDEX ix_user_events_blocknumber IS
'Range scans by blocknumber for the incremental Phase 3 challenge processors (m/r/rv/rd).';
33 changes: 33 additions & 0 deletions ddl/migrations/0210_notification_cooldown_partial_gin.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
-- Partial GIN on notification(user_ids) WHERE type='reward_in_cooldown'.
--
-- The on_user_challenge trigger fires on every is_complete=true write to
-- user_challenges. For challenges with cooldown_days > 0 it does:
--
-- SELECT id FROM notification
-- WHERE type = 'reward_in_cooldown'
-- AND new.user_id = ANY(user_ids)
-- AND timestamp >= (new.completed_at - interval '1 hour')
-- LIMIT 1;
--
-- The full GIN on user_ids matches across all ~23.5M / 8 GB of notification
-- rows, so each trigger call became IO-bound (DataFileRead). Right after
-- #842 (which added Phase 3 m/r/rv/rd — all cooldown_days=7) deployed,
-- pg_stat_activity caught individual user_challenges upserts spending
-- 19s+ per row, wedging the IndexChallengesJob's first reconcile tick.
--
-- A partial GIN restricted to type='reward_in_cooldown' is small (one type
-- out of ~30) and lets the planner go straight to the candidate user-id
-- matches within that slice; the timestamp filter then runs over a handful
-- of rows instead of the whole table. The benefit applies to every
-- cooldown_days>0 challenge, not just Phase 3.
--
-- NOTE: intentionally NOT wrapped in BEGIN/COMMIT so CREATE INDEX
-- CONCURRENTLY can run without holding an ACCESS EXCLUSIVE lock on
-- notification. IF NOT EXISTS makes the migration idempotent.

CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_notification_cooldown_user_ids
ON public.notification USING gin (user_ids)
WHERE type = 'reward_in_cooldown';

COMMENT ON INDEX ix_notification_cooldown_user_ids IS
'Partial GIN for the on_user_challenge trigger''s cooldown-window check; replaces a multi-second IO-bound scan against the full 8GB notification table with a tiny in-subset lookup.';
50 changes: 50 additions & 0 deletions ddl/migrations/0211_seed_phase_3_user_event_checkpoints.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
-- Seed the incremental checkpoints for the Phase 3 user_events processors.
--
-- mobile_install (m), referral (r), verified_referral (rv), and referred
-- (rd) are now rewritten in the same dirty-set style as #875's other
-- aggregating processors: each tick only rescans user_events rows whose
-- blocknumber moved past a per-processor checkpoint. See
-- jobs/challenges/user_event_challenges.go.
--
-- A fresh checkpoint defaults to 0, which would re-derive every completion
-- over ~2.3M user_events rows on first run — exactly the load that wedged
-- the cd94ede (#842) deploy's first reconcile tick: each completed upsert
-- fired the handle_on_user_challenge trigger, whose cooldown_days>0 path
-- scans the 8GB notification table. Seed each checkpoint to the current
-- max(user_events.blocknumber) so prod starts "from now" and skips the
-- redundant backfill. Python already populated user_challenges and the
-- upserts are idempotent.
--
-- ON CONFLICT DO NOTHING keeps this idempotent and never rewinds a
-- checkpoint the running job has already advanced. The max(blocknumber)
-- probe is index-only against ix_user_events_blocknumber (added in 0209).
--
-- Checkpoint names must match the constants in user_event_challenges.go.

BEGIN;

-- mobile_install
INSERT INTO indexing_checkpoints (tablename, last_checkpoint)
SELECT 'challenges:m:last_blocknumber',
COALESCE((SELECT max(blocknumber) FROM user_events), 0)
ON CONFLICT (tablename) DO NOTHING;

-- referral (unverified tier)
INSERT INTO indexing_checkpoints (tablename, last_checkpoint)
SELECT 'challenges:r:last_blocknumber',
COALESCE((SELECT max(blocknumber) FROM user_events), 0)
ON CONFLICT (tablename) DO NOTHING;

-- referral (verified tier)
INSERT INTO indexing_checkpoints (tablename, last_checkpoint)
SELECT 'challenges:rv:last_blocknumber',
COALESCE((SELECT max(blocknumber) FROM user_events), 0)
ON CONFLICT (tablename) DO NOTHING;

-- referred (the referred user's side)
INSERT INTO indexing_checkpoints (tablename, last_checkpoint)
SELECT 'challenges:rd:last_blocknumber',
COALESCE((SELECT max(blocknumber) FROM user_events), 0)
ON CONFLICT (tablename) DO NOTHING;

COMMIT;
139 changes: 112 additions & 27 deletions jobs/challenges/user_event_challenges.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,32 @@
// The mobile-install ("m") and referral ("r"/"rv"/"rd") challenges are
// driven by the user_events table, which the indexer populates from the
// `events` object in user-profile metadata (is_mobile_user, referrer) —
// see indexer/user_events_hook.go. Each processor recomputes completions
// from the current user_events state on every Reconcile (no checkpoint),
// matching the on-chain-derived Phase 1/2 processors.
// see indexer/user_events_hook.go.
//
// Incremental: like #875's other dirty-set processors, each tick only
// rescans user_events rows whose blocknumber moved past a per-processor
// checkpoint, then re-derives state for just the affected users. Three
// supporting migrations land alongside this file:
//
// - 0209 adds a btree on user_events(blocknumber) so the dirty scan is
// an index range read instead of a 2.3M-row seq-scan.
// - 0210 adds a partial GIN on notification(user_ids) WHERE
// type='reward_in_cooldown' — the on_user_challenge trigger's
// cooldown-window check fires on every is_complete=true upsert for
// challenges with cooldown_days>0 (which all four Phase 3 challenges
// have), and without that index it scans the full 8GB notification
// table per call.
// - 0211 seeds the four checkpoints to the current max
// user_events.blocknumber so prod starts "from now" and skips
// re-deriving completions over 2.3M historical rows on first run.
// Python already populated user_challenges; upserts are idempotent.
//
// Caveat: the dirty scan keys on user_events.blocknumber only, so a
// verification flip on an existing referrer's users row (`is_verified`
// goes true) is NOT picked up by the r/rv processors until the *referred*
// user's user_events row changes again. Verification changes are rare and
// the old full-scan code caught them only on its next tick; this matches
// #875's other processors which similarly key on a single source.
package challenges

import (
Expand All @@ -15,12 +38,39 @@ import (
"github.com/jackc/pgx/v5"
)

// MobileInstallProcessor implements challenge "m" — boolean reward when a
// user's profile reports is_mobile_user (events.is_mobile_user=true).
// userEventReferralDirtySQL surfaces user_events rows whose blocknumber moved
// past the checkpoint and currently carry a referrer. Shared by the r, rv, and
// rd processors — the dirty source is the same; recompute partitions the
// recipient (referrer vs referred) and the verification gate.
//
// The user_id column here is the *referred* user (the user_events row that
// names a referrer); recompute resolves the (referrer, referred) pairs.
const userEventReferralDirtySQL = `
SELECT user_id, blocknumber FROM user_events
WHERE blocknumber > $1
AND is_current = true
AND referrer IS NOT NULL
ORDER BY blocknumber ASC
LIMIT $2
`

// MobileInstallProcessor — challenge "m". Boolean reward when a user's
// profile reports is_mobile_user=true (events.is_mobile_user).
type MobileInstallProcessor struct{}

func (p *MobileInstallProcessor) ChallengeID() string { return "m" }

const mobileInstallCheckpoint = "challenges:m:last_blocknumber"

const mobileInstallDirtySQL = `
SELECT user_id, blocknumber FROM user_events
WHERE blocknumber > $1
AND is_current = true
AND is_mobile_user = true
ORDER BY blocknumber ASC
LIMIT $2
`

func (p *MobileInstallProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error {
c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID())
if err != nil {
Expand All @@ -31,28 +81,45 @@ func (p *MobileInstallProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error
}
amount := c.AmountInt()

return reconcileIncrementalUsers(ctx, tx, mobileInstallCheckpoint, mobileInstallDirtySQL,
func(ctx context.Context, tx pgx.Tx, userIDs []int64) error {
return p.recompute(ctx, tx, userIDs, amount)
})
}

func (p *MobileInstallProcessor) recompute(ctx context.Context, tx pgx.Tx, userIDs []int64, amount int32) error {
// Re-verify each candidate still has a mobile-flagged is_current row.
// Within the same tx this is mostly belt-and-suspenders, but the EXISTS
// pattern mirrors first_playlist.go and stays robust if the dirty scan
// ever loosens its filter.
rows, err := tx.Query(ctx, `
SELECT user_id FROM user_events
WHERE is_current = true AND is_mobile_user = true
`)
SELECT x.user_id
FROM unnest($1::bigint[]) AS x(user_id)
WHERE EXISTS (
SELECT 1 FROM user_events ue
WHERE ue.user_id = x.user_id
AND ue.is_current = true
AND ue.is_mobile_user = true
)
`, userIDs)
if err != nil {
return fmt.Errorf("scan user_events: %w", err)
}
var userIDs []int64
var qualifying []int64
for rows.Next() {
var uid int64
if err := rows.Scan(&uid); err != nil {
rows.Close()
return err
}
userIDs = append(userIDs, uid)
qualifying = append(qualifying, uid)
}
rows.Close()
if err := rows.Err(); err != nil {
return err
}

for _, uid := range userIDs {
for _, uid := range qualifying {
if err := UpsertUserChallenge(ctx, tx,
p.ChallengeID(), SpecifierFromUserID(uid), uid, 1, 1, amount,
); err != nil {
Expand All @@ -62,16 +129,15 @@ func (p *MobileInstallProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error
return nil
}

// ReferralProcessor implements challenges "r" / "rv" — the referrer earns
// when another user records them as referrer (events.referrer). The split
// gates on the referrer's verification:
// ReferralProcessor challenges "r" / "rv". The referrer earns when
// another user records them as referrer; the split gates on the referrer's
// verification:
//
// r: referrer NOT verified
// rv: referrer IS verified
//
// (Python only ever recorded verified referrers; sourcing from
// user_events here lets api/ award the unverified "r" tier too, per the
// Phase 3 challenge catalog.)
// Python only ever recorded verified referrers; sourcing from user_events
// here lets api/ award the unverified "r" tier too per the Phase 3 catalog.
type ReferralProcessor struct {
ID string
Verified bool
Expand All @@ -91,15 +157,24 @@ func (p *ReferralProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error {
return nil
}
amount := c.AmountInt()
checkpoint := fmt.Sprintf("challenges:%s:last_blocknumber", p.ID)

return reconcileIncrementalUsers(ctx, tx, checkpoint, userEventReferralDirtySQL,
func(ctx context.Context, tx pgx.Tx, referredIDs []int64) error {
return p.recompute(ctx, tx, referredIDs, amount)
})
}

func (p *ReferralProcessor) recompute(ctx context.Context, tx pgx.Tx, referredIDs []int64, amount int32) error {
rows, err := tx.Query(ctx, `
SELECT ue.user_id, ue.referrer
FROM user_events ue
JOIN users u ON u.user_id = ue.referrer AND u.is_current = true
WHERE ue.is_current = true
WHERE ue.user_id = ANY($1::bigint[])
AND ue.is_current = true
AND ue.referrer IS NOT NULL
AND COALESCE(u.is_verified, false) = $1
`, p.Verified)
AND COALESCE(u.is_verified, false) = $2
`, referredIDs, p.Verified)
if err != nil {
return fmt.Errorf("scan referrals: %w", err)
}
Expand All @@ -119,7 +194,7 @@ func (p *ReferralProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error {
}

for _, r := range refs {
// Specifier: <hex_referrer>:<hex_referred> — reward target is the referrer.
// Specifier: <hex_referrer>:<hex_referred> — recipient is the referrer.
specifier := fmt.Sprintf("%x:%x", r.referrer, r.referred)
if err := UpsertUserChallenge(ctx, tx,
p.ID, specifier, r.referrer, 1, 1, amount,
Expand All @@ -130,13 +205,14 @@ func (p *ReferralProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error {
return nil
}

// ReferredProcessor implements challenge "rd" — the referred user earns
// once for having a referrer recorded (regardless of referrer
// verification). Sourced from user_events.
// ReferredProcessor — challenge "rd". The referred user earns once for
// having a referrer recorded (regardless of referrer verification).
type ReferredProcessor struct{}

func (p *ReferredProcessor) ChallengeID() string { return "rd" }

const referredCheckpoint = "challenges:rd:last_blocknumber"

func (p *ReferredProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error {
c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID())
if err != nil {
Expand All @@ -147,10 +223,19 @@ func (p *ReferredProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error {
}
amount := c.AmountInt()

return reconcileIncrementalUsers(ctx, tx, referredCheckpoint, userEventReferralDirtySQL,
func(ctx context.Context, tx pgx.Tx, referredIDs []int64) error {
return p.recompute(ctx, tx, referredIDs, amount)
})
}

func (p *ReferredProcessor) recompute(ctx context.Context, tx pgx.Tx, referredIDs []int64, amount int32) error {
rows, err := tx.Query(ctx, `
SELECT user_id, referrer FROM user_events
WHERE is_current = true AND referrer IS NOT NULL
`)
WHERE user_id = ANY($1::bigint[])
AND is_current = true
AND referrer IS NOT NULL
`, referredIDs)
if err != nil {
return fmt.Errorf("scan referrals: %w", err)
}
Expand All @@ -170,7 +255,7 @@ func (p *ReferredProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error {
}

for _, r := range refs {
// Specifier: <hex_referred>:<hex_referrer> — reward target is the referred user.
// Specifier: <hex_referred>:<hex_referrer> — recipient is the referred user.
specifier := fmt.Sprintf("%x:%x", r.referred, r.referrer)
if err := UpsertUserChallenge(ctx, tx,
p.ChallengeID(), specifier, r.referred, 1, 1, amount,
Expand Down
22 changes: 22 additions & 0 deletions jobs/challenges/user_event_challenges_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,25 @@ func TestReferred_AwardsReferredUser(t *testing.T) {
assert.True(t, r.IsComplete)
}
}

// Pre-seeding the checkpoint past a user_events row's blocknumber must
// cause the dirty scan to skip it — this is what makes the prod backfill
// safe on first deploy (migration 0211 sets checkpoints to current max).
func TestMobileInstall_SkipsRowsBelowCheckpoint(t *testing.T) {
pool := withChallengesDB(t)
database.Seed(pool, database.FixtureMap{}) // seeds block 101
seedUserEvent(t, pool, 700, true, nil) // mobile user at block 101

// Set the checkpoint past the seeded row's blocknumber.
_, err := pool.Exec(context.Background(), `
INSERT INTO indexing_checkpoints (tablename, last_checkpoint)
VALUES ('challenges:m:last_blocknumber', 200)
ON CONFLICT (tablename) DO UPDATE SET last_checkpoint = EXCLUDED.last_checkpoint
`)
require.NoError(t, err)

runProcessor(t, pool, &MobileInstallProcessor{})

_, ok := queryUserChallenge(t, pool, "m", fmt.Sprintf("%x", 700))
assert.False(t, ok, "row below checkpoint must not be reprocessed")
}
Loading
Loading