From ec9ce0de477c5eb7bb822abe77eabb51e51e0b44 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Fri, 22 May 2026 14:17:54 -0700 Subject: [PATCH 1/3] feat(notifications): tastemaker + trending notification triggers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the notification side of the tastemaker (t) and trending (tt/tut/tp) challenge ports. Phase 1+2 processors mint user_challenges rows but only handle_user_challenges.sql's generic claimable_reward / challenge_reward notifications fired — the type-specific tastemaker / trending / trending_underground / trending_playlist notifications that apps' index_tastemaker.py and index_trending.py created had no Go equivalent. Both new triggers fire AFTER INSERT on user_challenges with a WHEN clause filtered to their challenge_id. Re-runs hit UpsertUserChallenge's ON CONFLICT DO UPDATE branch (no AFTER INSERT) so each row's notification mints exactly once. handle_tastemaker.sql (challenge_id='t') - Parses track_id from the processor's specifier ":t:" - Looks up tracks.owner_id and infers repost-vs-save action (repost wins, matching apps' dedupe_notifications_by_group_id) - Emits notification with group_id "tastemaker_user_id::tastemaker_item_id:", specifier=tid, and data { tastemaker_item_id, tastemaker_item_type:'track', tastemaker_item_owner_id, action, tastemaker_user_id } — verbatim apps' Notification(type='tastemaker') shape handle_trending.sql (challenge_id in 'tt','tut','tp') - Parses week + rank from ":" - Looks up entity_id from trending_results (same processor wrote it earlier in the same transaction) - Routes to 'trending' / 'trending_underground' / 'trending_playlist', swapping the track_id/playlist_id label in group_id and data — matches apps' index_trending_notifications, ditto for underground + playlist variants - Idempotency comes from the AFTER INSERT (not UPDATE) gate plus the unique (group_id, specifier) constraint on notification Schema dump regeneration follows in a separate commit (cf. 4da78ab for the handle_comment_remix_contest_update precedent). Tests: - TestTastemaker_EmitsNotification — verifies notification row shape; asserts repost wins when a user has both a repost and a save - TestTrending_EmitsNotification — Friday-gated; asserts the rank-1 notification carries track_id, rank=1, and the expected group_id - TestTrendingPlaylist_EmitsNotification — playlist variant carries playlist_id (not track_id) in data and group_id Co-Authored-By: Claude Opus 4.7 --- ddl/functions/handle_tastemaker.sql | 109 ++++++++++++++++++++++++ ddl/functions/handle_trending.sql | 126 ++++++++++++++++++++++++++++ jobs/challenges/tastemaker_test.go | 81 ++++++++++++++++++ jobs/challenges/trending_test.go | 113 +++++++++++++++++++++++++ 4 files changed, 429 insertions(+) create mode 100644 ddl/functions/handle_tastemaker.sql create mode 100644 ddl/functions/handle_trending.sql diff --git a/ddl/functions/handle_tastemaker.sql b/ddl/functions/handle_tastemaker.sql new file mode 100644 index 00000000..4fc20367 --- /dev/null +++ b/ddl/functions/handle_tastemaker.sql @@ -0,0 +1,109 @@ +-- handle_tastemaker +-- +-- Emits a `tastemaker` notification when the tastemaker challenge +-- processor (challenge_id 't') mints a user_challenges row. Each row +-- corresponds to one user who reposted or saved a track that later went +-- trending. The notification tells the tastemaker user that they were +-- early to a now-trending track. +-- +-- Sibling of handle_user_challenges.sql which already emits the generic +-- `challenge_reward` notification for all challenge completions. This +-- trigger is the type-specific layer that matches apps' tastemaker +-- notification (src/tasks/index_tastemaker.py). +-- +-- Specifier shape from jobs/challenges/tastemaker.go is +-- ":t:" — we parse the trailing hex track_id, +-- look up its owner from `tracks`, and infer the action (repost takes +-- precedence over save, matching apps' dedupe_notifications_by_group_id). +create or replace function handle_tastemaker() returns trigger as $$ +declare + track_hex text; + track_id_int bigint; + owner_id_int int; + action_str text; +begin + -- WHEN clause on the trigger gates challenge_id='t', but defend in + -- depth here too in case the trigger is invoked another way. + if new.challenge_id <> 't' then + return null; + end if; + + -- Parse trailing hex segment ":t:" → track_id. + track_hex := split_part(new.specifier, ':', 3); + if track_hex !~ '^[0-9a-f]+$' then + return null; + end if; + track_id_int := ('x' || lpad(track_hex, 16, '0'))::bit(64)::bigint; + if track_id_int <= 0 then + return null; + end if; + + select t.owner_id + into owner_id_int + from tracks t + where t.track_id = track_id_int + and t.is_current = true + limit 1; + if owner_id_int is null then + return null; + end if; + + -- Repost takes precedence over save when a user is in both lists for + -- the same track — matches apps' dedupe_notifications_by_group_id + -- where repost_notifications win over save_notifications. + if exists ( + select 1 + from reposts + where user_id = new.user_id + and repost_item_id = track_id_int + and repost_type = 'track' + and is_current = true + and is_delete = false + ) then + action_str := 'repost'; + else + action_str := 'save'; + end if; + + insert into notification + (blocknumber, user_ids, timestamp, type, specifier, group_id, data) + values + ( + new.completed_blocknumber, + ARRAY[new.user_id], + new.completed_at, + 'tastemaker', + track_id_int::text, + 'tastemaker_user_id:' || new.user_id || ':tastemaker_item_id:' || track_id_int, + jsonb_build_object( + 'tastemaker_item_id', track_id_int, + 'tastemaker_item_type', 'track', + 'tastemaker_item_owner_id', owner_id_int, + 'action', action_str, + 'tastemaker_user_id', new.user_id + ) + ) + on conflict do nothing; + + return null; + +exception + when others then + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + return null; +end; +$$ language plpgsql; + + +do $$ begin + -- Fire only on INSERT (not UPDATE) so the notification is minted + -- exactly once per (user_id, track_id) pair — UpsertUserChallenge + -- hits its ON CONFLICT DO UPDATE branch on re-runs, which does not + -- fire AFTER INSERT triggers. + create trigger on_tastemaker_user_challenge + after insert on user_challenges + for each row when (new.challenge_id = 't') + execute procedure handle_tastemaker(); +exception + when others then null; +end $$; diff --git a/ddl/functions/handle_trending.sql b/ddl/functions/handle_trending.sql new file mode 100644 index 00000000..51e0f40e --- /dev/null +++ b/ddl/functions/handle_trending.sql @@ -0,0 +1,126 @@ +-- handle_trending +-- +-- Emits one of `trending`, `trending_underground`, or `trending_playlist` +-- when the trending challenge processor mints a user_challenges row for +-- challenge_id 'tt' / 'tut' / 'tp'. These are the "your track is +-- trending" notifications shown to the entity (track/playlist) owner. +-- +-- Sibling of handle_user_challenges.sql which already emits the generic +-- `challenge_reward` notification for all challenge completions (and +-- skips the legacy `claimable_reward` for these three ids on line 14). +-- This trigger is the type-specific layer that matches apps' +-- index_trending.py notifications. +-- +-- Specifier shape from jobs/challenges/trending.go is ":" +-- (e.g. "2026-05-22:3"). Entity id is recovered from `trending_results`, +-- which the same processor wrote earlier in this transaction. +create or replace function handle_trending() returns trigger as $$ +declare + rank_int int; + week_date date; + entity_id_str text; + entity_id_int bigint; + notif_type text; + trend_type text; + entity_label text; + ts_epoch bigint; + data_jsonb jsonb; +begin + if new.challenge_id not in ('tt', 'tut', 'tp') then + return null; + end if; + + case new.challenge_id + when 'tt' then notif_type := 'trending'; trend_type := 'TRACKS'; entity_label := 'track_id'; + when 'tut' then notif_type := 'trending_underground'; trend_type := 'UNDERGROUND_TRACKS'; entity_label := 'track_id'; + when 'tp' then notif_type := 'trending_playlist'; trend_type := 'PLAYLISTS'; entity_label := 'playlist_id'; + end case; + + -- Specifier: ":" + begin + week_date := split_part(new.specifier, ':', 1)::date; + rank_int := split_part(new.specifier, ':', 2)::int; + exception when others then + return null; + end; + + -- Recover entity id from the trending_results row the processor wrote + -- earlier in this transaction. PK is (rank, type, version, week); we + -- pin to NEW.user_id so we ignore any unrelated version rows. + select id + into entity_id_str + from trending_results + where rank = rank_int + and type = trend_type + and week = week_date + and user_id = new.user_id + limit 1; + if entity_id_str is null then + return null; + end if; + begin + entity_id_int := entity_id_str::bigint; + exception when others then + return null; + end; + + -- timestamp suffix matches apps: epoch seconds of the recompute. We + -- use completed_at which is set by UpsertUserChallenge to now() on + -- the first insert — close enough to the recompute moment. + ts_epoch := extract(epoch from new.completed_at)::bigint; + + if new.challenge_id = 'tp' then + data_jsonb := jsonb_build_object( + 'time_range', 'week', + 'genre', 'all', + 'rank', rank_int, + 'playlist_id', entity_id_int + ); + else + data_jsonb := jsonb_build_object( + 'time_range', 'week', + 'genre', 'all', + 'rank', rank_int, + 'track_id', entity_id_int + ); + end if; + + insert into notification + (blocknumber, user_ids, timestamp, type, specifier, group_id, data) + values + ( + new.completed_blocknumber, + ARRAY[new.user_id], + new.completed_at, + notif_type, + entity_id_int::text, + notif_type + || ':time_range:week:genre:all:rank:' || rank_int + || ':' || entity_label || ':' || entity_id_int + || ':timestamp:' || ts_epoch, + data_jsonb + ) + on conflict do nothing; + + return null; + +exception + when others then + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + return null; +end; +$$ language plpgsql; + + +do $$ begin + -- Fire only on INSERT (not UPDATE) so the notification is minted + -- exactly once per (challenge_id, week, rank) — re-runs hit + -- UpsertUserChallenge's ON CONFLICT DO UPDATE branch and do not + -- fire AFTER INSERT triggers. + create trigger on_trending_user_challenge + after insert on user_challenges + for each row when (new.challenge_id in ('tt', 'tut', 'tp')) + execute procedure handle_trending(); +exception + when others then null; +end $$; diff --git a/jobs/challenges/tastemaker_test.go b/jobs/challenges/tastemaker_test.go index d0cac1a2..e5967a6b 100644 --- a/jobs/challenges/tastemaker_test.go +++ b/jobs/challenges/tastemaker_test.go @@ -2,6 +2,7 @@ package challenges import ( "context" + "encoding/json" "fmt" "testing" @@ -57,3 +58,83 @@ func TestTastemaker_EarliestRepostersAndSavers(t *testing.T) { } } } + +// TestTastemaker_EmitsNotification — handle_tastemaker trigger fans out +// a `tastemaker` notification when a user_challenges row is minted for +// challenge_id='t'. Repost wins over save when a user qualifies via both. +func TestTastemaker_EmitsNotification(t *testing.T) { + pool := withChallengesDB(t) + ctx := context.Background() + + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_tmn", "number": 1}}, + "users": { + {"user_id": 2200, "wallet": "0x2200"}, + {"user_id": 2201, "wallet": "0x2201"}, + {"user_id": 2202, "wallet": "0x2202"}, + }, + "tracks": {{"track_id": 220000, "owner_id": 2200, "title": "Hit", "blocknumber": 1}}, + }) + + _, err := pool.Exec(ctx, ` + INSERT INTO track_trending_scores (track_id, type, version, time_range, score, created_at) + VALUES (220000, 'TRACKS', 'pnagD', 'week', 999.0, now()) + `) + require.NoError(t, err) + + // 2201 reposts only; 2202 both reposts AND saves (repost should win). + _, err = pool.Exec(ctx, ` + INSERT INTO reposts (user_id, repost_item_id, repost_type, is_current, is_delete, created_at, txhash) + VALUES + (2201, 220000, 'track', true, false, now() - interval '2 hours', 'tx-rp-2201'), + (2202, 220000, 'track', true, false, now() - interval '1 hour', 'tx-rp-2202') + `) + require.NoError(t, err) + _, err = pool.Exec(ctx, ` + INSERT INTO saves (user_id, save_item_id, save_type, is_current, is_delete, created_at, txhash) + VALUES (2202, 220000, 'track', true, false, now() - interval '30 minutes', 'tx-sv-2202') + `) + require.NoError(t, err) + + runProcessor(t, pool, &TastemakerProcessor{}) + + type notifRow struct { + UserIDs []int64 + Specifier string + GroupID string + Data []byte + } + queryNotif := func(uid int) (notifRow, bool) { + var n notifRow + err := pool.QueryRow(ctx, ` + SELECT user_ids, specifier, group_id, data + FROM notification + WHERE type = 'tastemaker' + AND group_id = $1 + `, fmt.Sprintf("tastemaker_user_id:%d:tastemaker_item_id:%d", uid, 220000)). + Scan(&n.UserIDs, &n.Specifier, &n.GroupID, &n.Data) + if err != nil { + return notifRow{}, false + } + return n, true + } + + n2201, ok := queryNotif(2201) + require.True(t, ok, "expected tastemaker notif for user 2201") + assert.Equal(t, []int64{2201}, n2201.UserIDs) + assert.Equal(t, "220000", n2201.Specifier) + + var data2201 map[string]any + require.NoError(t, json.Unmarshal(n2201.Data, &data2201)) + assert.Equal(t, "repost", data2201["action"]) + assert.Equal(t, "track", data2201["tastemaker_item_type"]) + assert.EqualValues(t, 220000, data2201["tastemaker_item_id"]) + assert.EqualValues(t, 2200, data2201["tastemaker_item_owner_id"]) + assert.EqualValues(t, 2201, data2201["tastemaker_user_id"]) + + n2202, ok := queryNotif(2202) + require.True(t, ok, "expected tastemaker notif for user 2202") + var data2202 map[string]any + require.NoError(t, json.Unmarshal(n2202.Data, &data2202)) + assert.Equal(t, "repost", data2202["action"], "user has both repost and save — repost wins") +} diff --git a/jobs/challenges/trending_test.go b/jobs/challenges/trending_test.go index bde6b337..745a5fca 100644 --- a/jobs/challenges/trending_test.go +++ b/jobs/challenges/trending_test.go @@ -2,6 +2,7 @@ package challenges import ( "context" + "encoding/json" "fmt" "testing" "time" @@ -91,3 +92,115 @@ func TestTrending_SkipsNonFriday(t *testing.T) { "SELECT COUNT(*) FROM trending_results").Scan(&count)) assert.Equal(t, 0, count, "no rows written on non-Friday") } + +// TestTrending_EmitsNotification — handle_trending trigger fans out a +// `trending` notification when a user_challenges row is minted for 'tt'. +// Skips on non-Fridays since the underlying processor is Friday-gated. +func TestTrending_EmitsNotification(t *testing.T) { + if time.Now().UTC().Weekday() != time.Friday { + t.Skip("trending processor only runs on Fridays UTC") + } + pool := withChallengesDB(t) + ctx := context.Background() + now := time.Now() + + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_ttn", "number": 1}}, + "users": { + {"user_id": 600, "wallet": "0x600"}, + {"user_id": 601, "wallet": "0x601"}, + }, + "tracks": { + {"track_id": 6001, "owner_id": 600, "title": "Hit1", "blocknumber": 1, "created_at": now}, + {"track_id": 6011, "owner_id": 601, "title": "Hit2", "blocknumber": 1, "created_at": now}, + }, + }) + + for i, tid := range []int{6001, 6011} { + _, err := pool.Exec(ctx, ` + INSERT INTO track_trending_scores (track_id, type, version, time_range, score, created_at) + VALUES ($1, 'TRACKS', 'pnagD', 'week', $2, now()) + `, tid, float64(100-i)) + require.NoError(t, err) + } + + runProcessor(t, pool, NewTrendingTrackProcessor()) + + weekDate := time.Date(now.UTC().Year(), now.UTC().Month(), now.UTC().Day(), 0, 0, 0, 0, time.UTC).Format("2006-01-02") + + // Find the rank-1 row. + var ownerID int64 + var specifier string + require.NoError(t, pool.QueryRow(ctx, ` + SELECT user_id, specifier FROM user_challenges + WHERE challenge_id = 'tt' AND specifier = $1 + `, fmt.Sprintf("%s:1", weekDate)).Scan(&ownerID, &specifier)) + + // Find the corresponding notification. + var nUserIDs []int64 + var nSpecifier, nGroupID string + var nData []byte + err := pool.QueryRow(ctx, ` + SELECT user_ids, specifier, group_id, data + FROM notification + WHERE type = 'trending' AND user_ids = ARRAY[$1::int] + `, ownerID).Scan(&nUserIDs, &nSpecifier, &nGroupID, &nData) + require.NoError(t, err, "expected trending notif for owner %d", ownerID) + + var data map[string]any + require.NoError(t, json.Unmarshal(nData, &data)) + assert.Equal(t, "week", data["time_range"]) + assert.Equal(t, "all", data["genre"]) + assert.EqualValues(t, 1, data["rank"]) + assert.Contains(t, data, "track_id") + assert.Contains(t, nGroupID, ":rank:1:track_id:") + assert.Contains(t, nGroupID, "trending:time_range:week:genre:all") +} + +// TestTrendingPlaylist_EmitsNotification — handle_trending fans out a +// `trending_playlist` notification with playlist_id (not track_id) in +// the data payload and group_id. +func TestTrendingPlaylist_EmitsNotification(t *testing.T) { + if time.Now().UTC().Weekday() != time.Friday { + t.Skip("trending processor only runs on Fridays UTC") + } + pool := withChallengesDB(t) + ctx := context.Background() + now := time.Now() + + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_tpn", "number": 1}}, + "users": {{"user_id": 700, "wallet": "0x700"}}, + "playlists": {{ + "playlist_id": 7001, + "playlist_owner_id": 700, + "playlist_name": "P", + "blocknumber": 1, + "created_at": now, + }}, + }) + + _, err := pool.Exec(ctx, ` + INSERT INTO playlist_trending_scores (playlist_id, type, version, time_range, score, created_at) + VALUES (7001, 'PLAYLISTS', 'pnagD', 'week', 999.0, now()) + `) + require.NoError(t, err) + + runProcessor(t, pool, NewTrendingPlaylistProcessor()) + + var nGroupID string + var nData []byte + err = pool.QueryRow(ctx, ` + SELECT group_id, data + FROM notification + WHERE type = 'trending_playlist' AND user_ids = ARRAY[700] + `).Scan(&nGroupID, &nData) + require.NoError(t, err, "expected trending_playlist notif") + + var data map[string]any + require.NoError(t, json.Unmarshal(nData, &data)) + assert.EqualValues(t, 7001, data["playlist_id"]) + assert.NotContains(t, data, "track_id") + assert.Contains(t, nGroupID, ":playlist_id:7001:") + assert.Contains(t, nGroupID, "trending_playlist:time_range:week:genre:all") +} From eea3fe75c63ac1e2b9e4409b7641712a1f37def2 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Fri, 22 May 2026 16:29:08 -0700 Subject: [PATCH 2/3] fixup(notifications): drop trending_playlist from handle_trending MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The trending playlist reward (challenge_id 'tp') is being removed as a product feature, so handle_trending no longer needs to emit `trending_playlist` notifications. Scope: just this PR's trigger. PR #835's NewTrendingPlaylistProcessor + 'tp' catalog seed are still in place — harmless if the upstream feature stays inactive, can be torn out separately if desired. Changes: - handle_trending.sql: drop the 'tp' case from the type switch, WHEN clause, and data_jsonb branch. Trigger now only handles 'tt' and 'tut' (both tracks), so the entity_label variable goes away too. - trending_test.go: remove TestTrendingPlaylist_EmitsNotification. Co-Authored-By: Claude Opus 4.7 --- ddl/functions/handle_trending.sql | 51 +++++++++++++------------------ jobs/challenges/trending_test.go | 48 ----------------------------- 2 files changed, 22 insertions(+), 77 deletions(-) diff --git a/ddl/functions/handle_trending.sql b/ddl/functions/handle_trending.sql index 51e0f40e..20216e5b 100644 --- a/ddl/functions/handle_trending.sql +++ b/ddl/functions/handle_trending.sql @@ -1,14 +1,18 @@ -- handle_trending -- --- Emits one of `trending`, `trending_underground`, or `trending_playlist` --- when the trending challenge processor mints a user_challenges row for --- challenge_id 'tt' / 'tut' / 'tp'. These are the "your track is --- trending" notifications shown to the entity (track/playlist) owner. +-- Emits a `trending` or `trending_underground` notification when the +-- trending challenge processor mints a user_challenges row for +-- challenge_id 'tt' / 'tut'. These are the "your track is trending" +-- notifications shown to the track owner. +-- +-- (Trending playlists — challenge_id 'tp' — were a product feature that +-- has since been removed, so they're intentionally not handled here. +-- handle_user_challenges.sql still excludes 'tp' from the +-- claimable_reward path on line 14 for historical rows.) -- -- Sibling of handle_user_challenges.sql which already emits the generic --- `challenge_reward` notification for all challenge completions (and --- skips the legacy `claimable_reward` for these three ids on line 14). --- This trigger is the type-specific layer that matches apps' +-- `challenge_reward` notification for all challenge completions. This +-- trigger is the type-specific layer that matches apps' -- index_trending.py notifications. -- -- Specifier shape from jobs/challenges/trending.go is ":" @@ -22,18 +26,16 @@ declare entity_id_int bigint; notif_type text; trend_type text; - entity_label text; ts_epoch bigint; data_jsonb jsonb; begin - if new.challenge_id not in ('tt', 'tut', 'tp') then + if new.challenge_id not in ('tt', 'tut') then return null; end if; case new.challenge_id - when 'tt' then notif_type := 'trending'; trend_type := 'TRACKS'; entity_label := 'track_id'; - when 'tut' then notif_type := 'trending_underground'; trend_type := 'UNDERGROUND_TRACKS'; entity_label := 'track_id'; - when 'tp' then notif_type := 'trending_playlist'; trend_type := 'PLAYLISTS'; entity_label := 'playlist_id'; + when 'tt' then notif_type := 'trending'; trend_type := 'TRACKS'; + when 'tut' then notif_type := 'trending_underground'; trend_type := 'UNDERGROUND_TRACKS'; end case; -- Specifier: ":" @@ -69,21 +71,12 @@ begin -- the first insert — close enough to the recompute moment. ts_epoch := extract(epoch from new.completed_at)::bigint; - if new.challenge_id = 'tp' then - data_jsonb := jsonb_build_object( - 'time_range', 'week', - 'genre', 'all', - 'rank', rank_int, - 'playlist_id', entity_id_int - ); - else - data_jsonb := jsonb_build_object( - 'time_range', 'week', - 'genre', 'all', - 'rank', rank_int, - 'track_id', entity_id_int - ); - end if; + data_jsonb := jsonb_build_object( + 'time_range', 'week', + 'genre', 'all', + 'rank', rank_int, + 'track_id', entity_id_int + ); insert into notification (blocknumber, user_ids, timestamp, type, specifier, group_id, data) @@ -96,7 +89,7 @@ begin entity_id_int::text, notif_type || ':time_range:week:genre:all:rank:' || rank_int - || ':' || entity_label || ':' || entity_id_int + || ':track_id:' || entity_id_int || ':timestamp:' || ts_epoch, data_jsonb ) @@ -119,7 +112,7 @@ do $$ begin -- fire AFTER INSERT triggers. create trigger on_trending_user_challenge after insert on user_challenges - for each row when (new.challenge_id in ('tt', 'tut', 'tp')) + for each row when (new.challenge_id in ('tt', 'tut')) execute procedure handle_trending(); exception when others then null; diff --git a/jobs/challenges/trending_test.go b/jobs/challenges/trending_test.go index 745a5fca..aecfa704 100644 --- a/jobs/challenges/trending_test.go +++ b/jobs/challenges/trending_test.go @@ -156,51 +156,3 @@ func TestTrending_EmitsNotification(t *testing.T) { assert.Contains(t, nGroupID, ":rank:1:track_id:") assert.Contains(t, nGroupID, "trending:time_range:week:genre:all") } - -// TestTrendingPlaylist_EmitsNotification — handle_trending fans out a -// `trending_playlist` notification with playlist_id (not track_id) in -// the data payload and group_id. -func TestTrendingPlaylist_EmitsNotification(t *testing.T) { - if time.Now().UTC().Weekday() != time.Friday { - t.Skip("trending processor only runs on Fridays UTC") - } - pool := withChallengesDB(t) - ctx := context.Background() - now := time.Now() - - database.Seed(pool, database.FixtureMap{ - "blocks": {{"blockhash": "blk_tpn", "number": 1}}, - "users": {{"user_id": 700, "wallet": "0x700"}}, - "playlists": {{ - "playlist_id": 7001, - "playlist_owner_id": 700, - "playlist_name": "P", - "blocknumber": 1, - "created_at": now, - }}, - }) - - _, err := pool.Exec(ctx, ` - INSERT INTO playlist_trending_scores (playlist_id, type, version, time_range, score, created_at) - VALUES (7001, 'PLAYLISTS', 'pnagD', 'week', 999.0, now()) - `) - require.NoError(t, err) - - runProcessor(t, pool, NewTrendingPlaylistProcessor()) - - var nGroupID string - var nData []byte - err = pool.QueryRow(ctx, ` - SELECT group_id, data - FROM notification - WHERE type = 'trending_playlist' AND user_ids = ARRAY[700] - `).Scan(&nGroupID, &nData) - require.NoError(t, err, "expected trending_playlist notif") - - var data map[string]any - require.NoError(t, json.Unmarshal(nData, &data)) - assert.EqualValues(t, 7001, data["playlist_id"]) - assert.NotContains(t, data, "track_id") - assert.Contains(t, nGroupID, ":playlist_id:7001:") - assert.Contains(t, nGroupID, "trending_playlist:time_range:week:genre:all") -} From fead757a950d028fa665e120308d2add27797615 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Fri, 29 May 2026 12:27:37 -0700 Subject: [PATCH 3/3] chore(schema): regenerate dump for trending + tastemaker triggers Adds handle_trending and handle_tastemaker functions and their on_*_user_challenge triggers to sql/01_schema.sql so the test-schema template includes them and the trending/tastemaker notification tests pass. Stacked on the comment-notifications dump regen. Co-Authored-By: Claude Opus 4.7 --- sql/01_schema.sql | 193 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 193 insertions(+) diff --git a/sql/01_schema.sql b/sql/01_schema.sql index 54944a6e..06abf11a 100644 --- a/sql/01_schema.sql +++ b/sql/01_schema.sql @@ -4703,6 +4703,92 @@ end; $$; +-- +-- Name: handle_tastemaker(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.handle_tastemaker() RETURNS trigger + LANGUAGE plpgsql + AS $_$ +declare + track_hex text; + track_id_int bigint; + owner_id_int int; + action_str text; +begin + -- WHEN clause on the trigger gates challenge_id='t', but defend in + -- depth here too in case the trigger is invoked another way. + if new.challenge_id <> 't' then + return null; + end if; + + -- Parse trailing hex segment ":t:" → track_id. + track_hex := split_part(new.specifier, ':', 3); + if track_hex !~ '^[0-9a-f]+$' then + return null; + end if; + track_id_int := ('x' || lpad(track_hex, 16, '0'))::bit(64)::bigint; + if track_id_int <= 0 then + return null; + end if; + + select t.owner_id + into owner_id_int + from tracks t + where t.track_id = track_id_int + and t.is_current = true + limit 1; + if owner_id_int is null then + return null; + end if; + + -- Repost takes precedence over save when a user is in both lists for + -- the same track — matches apps' dedupe_notifications_by_group_id + -- where repost_notifications win over save_notifications. + if exists ( + select 1 + from reposts + where user_id = new.user_id + and repost_item_id = track_id_int + and repost_type = 'track' + and is_current = true + and is_delete = false + ) then + action_str := 'repost'; + else + action_str := 'save'; + end if; + + insert into notification + (blocknumber, user_ids, timestamp, type, specifier, group_id, data) + values + ( + new.completed_blocknumber, + ARRAY[new.user_id], + new.completed_at, + 'tastemaker', + track_id_int::text, + 'tastemaker_user_id:' || new.user_id || ':tastemaker_item_id:' || track_id_int, + jsonb_build_object( + 'tastemaker_item_id', track_id_int, + 'tastemaker_item_type', 'track', + 'tastemaker_item_owner_id', owner_id_int, + 'action', action_str, + 'tastemaker_user_id', new.user_id + ) + ) + on conflict do nothing; + + return null; + +exception + when others then + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + return null; +end; +$_$; + + -- -- Name: handle_track(); Type: FUNCTION; Schema: public; Owner: - -- @@ -4954,6 +5040,99 @@ end; $$; +-- +-- Name: handle_trending(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.handle_trending() RETURNS trigger + LANGUAGE plpgsql + AS $$ +declare + rank_int int; + week_date date; + entity_id_str text; + entity_id_int bigint; + notif_type text; + trend_type text; + ts_epoch bigint; + data_jsonb jsonb; +begin + if new.challenge_id not in ('tt', 'tut') then + return null; + end if; + + case new.challenge_id + when 'tt' then notif_type := 'trending'; trend_type := 'TRACKS'; + when 'tut' then notif_type := 'trending_underground'; trend_type := 'UNDERGROUND_TRACKS'; + end case; + + -- Specifier: ":" + begin + week_date := split_part(new.specifier, ':', 1)::date; + rank_int := split_part(new.specifier, ':', 2)::int; + exception when others then + return null; + end; + + -- Recover entity id from the trending_results row the processor wrote + -- earlier in this transaction. PK is (rank, type, version, week); we + -- pin to NEW.user_id so we ignore any unrelated version rows. + select id + into entity_id_str + from trending_results + where rank = rank_int + and type = trend_type + and week = week_date + and user_id = new.user_id + limit 1; + if entity_id_str is null then + return null; + end if; + begin + entity_id_int := entity_id_str::bigint; + exception when others then + return null; + end; + + -- timestamp suffix matches apps: epoch seconds of the recompute. We + -- use completed_at which is set by UpsertUserChallenge to now() on + -- the first insert — close enough to the recompute moment. + ts_epoch := extract(epoch from new.completed_at)::bigint; + + data_jsonb := jsonb_build_object( + 'time_range', 'week', + 'genre', 'all', + 'rank', rank_int, + 'track_id', entity_id_int + ); + + insert into notification + (blocknumber, user_ids, timestamp, type, specifier, group_id, data) + values + ( + new.completed_blocknumber, + ARRAY[new.user_id], + new.completed_at, + notif_type, + entity_id_int::text, + notif_type + || ':time_range:week:genre:all:rank:' || rank_int + || ':track_id:' || entity_id_int + || ':timestamp:' || ts_epoch, + data_jsonb + ) + on conflict do nothing; + + return null; + +exception + when others then + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + return null; +end; +$$; + + -- -- Name: handle_usdc_purchase(); Type: FUNCTION; Schema: public; Owner: - -- @@ -13952,6 +14131,13 @@ CREATE TRIGGER on_sol_usdc_withdrawal AFTER INSERT ON public.sol_claimable_accou CREATE TRIGGER on_supporter_rank_up AFTER INSERT ON public.supporter_rank_ups FOR EACH ROW EXECUTE FUNCTION public.handle_supporter_rank_up(); +-- +-- Name: user_challenges on_tastemaker_user_challenge; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER on_tastemaker_user_challenge AFTER INSERT ON public.user_challenges FOR EACH ROW WHEN (((new.challenge_id)::text = 't'::text)) EXECUTE FUNCTION public.handle_tastemaker(); + + -- -- Name: tracks on_track; Type: TRIGGER; Schema: public; Owner: - -- @@ -13966,6 +14152,13 @@ CREATE TRIGGER on_track AFTER INSERT OR UPDATE ON public.tracks FOR EACH ROW EXE CREATE TRIGGER on_track_notify_pending_purchase_revalidation AFTER INSERT OR UPDATE OF blocknumber ON public.tracks FOR EACH ROW EXECUTE FUNCTION public.notify_pending_purchase_revalidation(); +-- +-- Name: user_challenges on_trending_user_challenge; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER on_trending_user_challenge AFTER INSERT ON public.user_challenges FOR EACH ROW WHEN (((new.challenge_id)::text = ANY ((ARRAY['tt'::character varying, 'tut'::character varying])::text[]))) EXECUTE FUNCTION public.handle_trending(); + + -- -- Name: usdc_purchases on_usdc_purchase; Type: TRIGGER; Schema: public; Owner: - --