diff --git a/ddl/functions/handle_tastemaker.sql b/ddl/functions/handle_tastemaker.sql new file mode 100644 index 00000000..4fc20367 --- /dev/null +++ b/ddl/functions/handle_tastemaker.sql @@ -0,0 +1,109 @@ +-- handle_tastemaker +-- +-- Emits a `tastemaker` notification when the tastemaker challenge +-- processor (challenge_id 't') mints a user_challenges row. Each row +-- corresponds to one user who reposted or saved a track that later went +-- trending. The notification tells the tastemaker user that they were +-- early to a now-trending track. +-- +-- Sibling of handle_user_challenges.sql which already emits the generic +-- `challenge_reward` notification for all challenge completions. This +-- trigger is the type-specific layer that matches apps' tastemaker +-- notification (src/tasks/index_tastemaker.py). +-- +-- Specifier shape from jobs/challenges/tastemaker.go is +-- ":t:" — we parse the trailing hex track_id, +-- look up its owner from `tracks`, and infer the action (repost takes +-- precedence over save, matching apps' dedupe_notifications_by_group_id). +create or replace function handle_tastemaker() returns trigger as $$ +declare + track_hex text; + track_id_int bigint; + owner_id_int int; + action_str text; +begin + -- WHEN clause on the trigger gates challenge_id='t', but defend in + -- depth here too in case the trigger is invoked another way. + if new.challenge_id <> 't' then + return null; + end if; + + -- Parse trailing hex segment ":t:" → track_id. + track_hex := split_part(new.specifier, ':', 3); + if track_hex !~ '^[0-9a-f]+$' then + return null; + end if; + track_id_int := ('x' || lpad(track_hex, 16, '0'))::bit(64)::bigint; + if track_id_int <= 0 then + return null; + end if; + + select t.owner_id + into owner_id_int + from tracks t + where t.track_id = track_id_int + and t.is_current = true + limit 1; + if owner_id_int is null then + return null; + end if; + + -- Repost takes precedence over save when a user is in both lists for + -- the same track — matches apps' dedupe_notifications_by_group_id + -- where repost_notifications win over save_notifications. + if exists ( + select 1 + from reposts + where user_id = new.user_id + and repost_item_id = track_id_int + and repost_type = 'track' + and is_current = true + and is_delete = false + ) then + action_str := 'repost'; + else + action_str := 'save'; + end if; + + insert into notification + (blocknumber, user_ids, timestamp, type, specifier, group_id, data) + values + ( + new.completed_blocknumber, + ARRAY[new.user_id], + new.completed_at, + 'tastemaker', + track_id_int::text, + 'tastemaker_user_id:' || new.user_id || ':tastemaker_item_id:' || track_id_int, + jsonb_build_object( + 'tastemaker_item_id', track_id_int, + 'tastemaker_item_type', 'track', + 'tastemaker_item_owner_id', owner_id_int, + 'action', action_str, + 'tastemaker_user_id', new.user_id + ) + ) + on conflict do nothing; + + return null; + +exception + when others then + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + return null; +end; +$$ language plpgsql; + + +do $$ begin + -- Fire only on INSERT (not UPDATE) so the notification is minted + -- exactly once per (user_id, track_id) pair — UpsertUserChallenge + -- hits its ON CONFLICT DO UPDATE branch on re-runs, which does not + -- fire AFTER INSERT triggers. + create trigger on_tastemaker_user_challenge + after insert on user_challenges + for each row when (new.challenge_id = 't') + execute procedure handle_tastemaker(); +exception + when others then null; +end $$; diff --git a/ddl/functions/handle_trending.sql b/ddl/functions/handle_trending.sql new file mode 100644 index 00000000..20216e5b --- /dev/null +++ b/ddl/functions/handle_trending.sql @@ -0,0 +1,119 @@ +-- handle_trending +-- +-- Emits a `trending` or `trending_underground` notification when the +-- trending challenge processor mints a user_challenges row for +-- challenge_id 'tt' / 'tut'. These are the "your track is trending" +-- notifications shown to the track owner. +-- +-- (Trending playlists — challenge_id 'tp' — were a product feature that +-- has since been removed, so they're intentionally not handled here. +-- handle_user_challenges.sql still excludes 'tp' from the +-- claimable_reward path on line 14 for historical rows.) +-- +-- Sibling of handle_user_challenges.sql which already emits the generic +-- `challenge_reward` notification for all challenge completions. This +-- trigger is the type-specific layer that matches apps' +-- index_trending.py notifications. +-- +-- Specifier shape from jobs/challenges/trending.go is ":" +-- (e.g. "2026-05-22:3"). Entity id is recovered from `trending_results`, +-- which the same processor wrote earlier in this transaction. +create or replace function handle_trending() returns trigger as $$ +declare + rank_int int; + week_date date; + entity_id_str text; + entity_id_int bigint; + notif_type text; + trend_type text; + ts_epoch bigint; + data_jsonb jsonb; +begin + if new.challenge_id not in ('tt', 'tut') then + return null; + end if; + + case new.challenge_id + when 'tt' then notif_type := 'trending'; trend_type := 'TRACKS'; + when 'tut' then notif_type := 'trending_underground'; trend_type := 'UNDERGROUND_TRACKS'; + end case; + + -- Specifier: ":" + begin + week_date := split_part(new.specifier, ':', 1)::date; + rank_int := split_part(new.specifier, ':', 2)::int; + exception when others then + return null; + end; + + -- Recover entity id from the trending_results row the processor wrote + -- earlier in this transaction. PK is (rank, type, version, week); we + -- pin to NEW.user_id so we ignore any unrelated version rows. + select id + into entity_id_str + from trending_results + where rank = rank_int + and type = trend_type + and week = week_date + and user_id = new.user_id + limit 1; + if entity_id_str is null then + return null; + end if; + begin + entity_id_int := entity_id_str::bigint; + exception when others then + return null; + end; + + -- timestamp suffix matches apps: epoch seconds of the recompute. We + -- use completed_at which is set by UpsertUserChallenge to now() on + -- the first insert — close enough to the recompute moment. + ts_epoch := extract(epoch from new.completed_at)::bigint; + + data_jsonb := jsonb_build_object( + 'time_range', 'week', + 'genre', 'all', + 'rank', rank_int, + 'track_id', entity_id_int + ); + + insert into notification + (blocknumber, user_ids, timestamp, type, specifier, group_id, data) + values + ( + new.completed_blocknumber, + ARRAY[new.user_id], + new.completed_at, + notif_type, + entity_id_int::text, + notif_type + || ':time_range:week:genre:all:rank:' || rank_int + || ':track_id:' || entity_id_int + || ':timestamp:' || ts_epoch, + data_jsonb + ) + on conflict do nothing; + + return null; + +exception + when others then + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + return null; +end; +$$ language plpgsql; + + +do $$ begin + -- Fire only on INSERT (not UPDATE) so the notification is minted + -- exactly once per (challenge_id, week, rank) — re-runs hit + -- UpsertUserChallenge's ON CONFLICT DO UPDATE branch and do not + -- fire AFTER INSERT triggers. + create trigger on_trending_user_challenge + after insert on user_challenges + for each row when (new.challenge_id in ('tt', 'tut')) + execute procedure handle_trending(); +exception + when others then null; +end $$; diff --git a/jobs/challenges/tastemaker_test.go b/jobs/challenges/tastemaker_test.go index d0cac1a2..e5967a6b 100644 --- a/jobs/challenges/tastemaker_test.go +++ b/jobs/challenges/tastemaker_test.go @@ -2,6 +2,7 @@ package challenges import ( "context" + "encoding/json" "fmt" "testing" @@ -57,3 +58,83 @@ func TestTastemaker_EarliestRepostersAndSavers(t *testing.T) { } } } + +// TestTastemaker_EmitsNotification — handle_tastemaker trigger fans out +// a `tastemaker` notification when a user_challenges row is minted for +// challenge_id='t'. Repost wins over save when a user qualifies via both. +func TestTastemaker_EmitsNotification(t *testing.T) { + pool := withChallengesDB(t) + ctx := context.Background() + + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_tmn", "number": 1}}, + "users": { + {"user_id": 2200, "wallet": "0x2200"}, + {"user_id": 2201, "wallet": "0x2201"}, + {"user_id": 2202, "wallet": "0x2202"}, + }, + "tracks": {{"track_id": 220000, "owner_id": 2200, "title": "Hit", "blocknumber": 1}}, + }) + + _, err := pool.Exec(ctx, ` + INSERT INTO track_trending_scores (track_id, type, version, time_range, score, created_at) + VALUES (220000, 'TRACKS', 'pnagD', 'week', 999.0, now()) + `) + require.NoError(t, err) + + // 2201 reposts only; 2202 both reposts AND saves (repost should win). + _, err = pool.Exec(ctx, ` + INSERT INTO reposts (user_id, repost_item_id, repost_type, is_current, is_delete, created_at, txhash) + VALUES + (2201, 220000, 'track', true, false, now() - interval '2 hours', 'tx-rp-2201'), + (2202, 220000, 'track', true, false, now() - interval '1 hour', 'tx-rp-2202') + `) + require.NoError(t, err) + _, err = pool.Exec(ctx, ` + INSERT INTO saves (user_id, save_item_id, save_type, is_current, is_delete, created_at, txhash) + VALUES (2202, 220000, 'track', true, false, now() - interval '30 minutes', 'tx-sv-2202') + `) + require.NoError(t, err) + + runProcessor(t, pool, &TastemakerProcessor{}) + + type notifRow struct { + UserIDs []int64 + Specifier string + GroupID string + Data []byte + } + queryNotif := func(uid int) (notifRow, bool) { + var n notifRow + err := pool.QueryRow(ctx, ` + SELECT user_ids, specifier, group_id, data + FROM notification + WHERE type = 'tastemaker' + AND group_id = $1 + `, fmt.Sprintf("tastemaker_user_id:%d:tastemaker_item_id:%d", uid, 220000)). + Scan(&n.UserIDs, &n.Specifier, &n.GroupID, &n.Data) + if err != nil { + return notifRow{}, false + } + return n, true + } + + n2201, ok := queryNotif(2201) + require.True(t, ok, "expected tastemaker notif for user 2201") + assert.Equal(t, []int64{2201}, n2201.UserIDs) + assert.Equal(t, "220000", n2201.Specifier) + + var data2201 map[string]any + require.NoError(t, json.Unmarshal(n2201.Data, &data2201)) + assert.Equal(t, "repost", data2201["action"]) + assert.Equal(t, "track", data2201["tastemaker_item_type"]) + assert.EqualValues(t, 220000, data2201["tastemaker_item_id"]) + assert.EqualValues(t, 2200, data2201["tastemaker_item_owner_id"]) + assert.EqualValues(t, 2201, data2201["tastemaker_user_id"]) + + n2202, ok := queryNotif(2202) + require.True(t, ok, "expected tastemaker notif for user 2202") + var data2202 map[string]any + require.NoError(t, json.Unmarshal(n2202.Data, &data2202)) + assert.Equal(t, "repost", data2202["action"], "user has both repost and save — repost wins") +} diff --git a/jobs/challenges/trending_test.go b/jobs/challenges/trending_test.go index bde6b337..aecfa704 100644 --- a/jobs/challenges/trending_test.go +++ b/jobs/challenges/trending_test.go @@ -2,6 +2,7 @@ package challenges import ( "context" + "encoding/json" "fmt" "testing" "time" @@ -91,3 +92,67 @@ func TestTrending_SkipsNonFriday(t *testing.T) { "SELECT COUNT(*) FROM trending_results").Scan(&count)) assert.Equal(t, 0, count, "no rows written on non-Friday") } + +// TestTrending_EmitsNotification — handle_trending trigger fans out a +// `trending` notification when a user_challenges row is minted for 'tt'. +// Skips on non-Fridays since the underlying processor is Friday-gated. +func TestTrending_EmitsNotification(t *testing.T) { + if time.Now().UTC().Weekday() != time.Friday { + t.Skip("trending processor only runs on Fridays UTC") + } + pool := withChallengesDB(t) + ctx := context.Background() + now := time.Now() + + database.Seed(pool, database.FixtureMap{ + "blocks": {{"blockhash": "blk_ttn", "number": 1}}, + "users": { + {"user_id": 600, "wallet": "0x600"}, + {"user_id": 601, "wallet": "0x601"}, + }, + "tracks": { + {"track_id": 6001, "owner_id": 600, "title": "Hit1", "blocknumber": 1, "created_at": now}, + {"track_id": 6011, "owner_id": 601, "title": "Hit2", "blocknumber": 1, "created_at": now}, + }, + }) + + for i, tid := range []int{6001, 6011} { + _, err := pool.Exec(ctx, ` + INSERT INTO track_trending_scores (track_id, type, version, time_range, score, created_at) + VALUES ($1, 'TRACKS', 'pnagD', 'week', $2, now()) + `, tid, float64(100-i)) + require.NoError(t, err) + } + + runProcessor(t, pool, NewTrendingTrackProcessor()) + + weekDate := time.Date(now.UTC().Year(), now.UTC().Month(), now.UTC().Day(), 0, 0, 0, 0, time.UTC).Format("2006-01-02") + + // Find the rank-1 row. + var ownerID int64 + var specifier string + require.NoError(t, pool.QueryRow(ctx, ` + SELECT user_id, specifier FROM user_challenges + WHERE challenge_id = 'tt' AND specifier = $1 + `, fmt.Sprintf("%s:1", weekDate)).Scan(&ownerID, &specifier)) + + // Find the corresponding notification. + var nUserIDs []int64 + var nSpecifier, nGroupID string + var nData []byte + err := pool.QueryRow(ctx, ` + SELECT user_ids, specifier, group_id, data + FROM notification + WHERE type = 'trending' AND user_ids = ARRAY[$1::int] + `, ownerID).Scan(&nUserIDs, &nSpecifier, &nGroupID, &nData) + require.NoError(t, err, "expected trending notif for owner %d", ownerID) + + var data map[string]any + require.NoError(t, json.Unmarshal(nData, &data)) + assert.Equal(t, "week", data["time_range"]) + assert.Equal(t, "all", data["genre"]) + assert.EqualValues(t, 1, data["rank"]) + assert.Contains(t, data, "track_id") + assert.Contains(t, nGroupID, ":rank:1:track_id:") + assert.Contains(t, nGroupID, "trending:time_range:week:genre:all") +} diff --git a/sql/01_schema.sql b/sql/01_schema.sql index 54944a6e..06abf11a 100644 --- a/sql/01_schema.sql +++ b/sql/01_schema.sql @@ -4703,6 +4703,92 @@ end; $$; +-- +-- Name: handle_tastemaker(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.handle_tastemaker() RETURNS trigger + LANGUAGE plpgsql + AS $_$ +declare + track_hex text; + track_id_int bigint; + owner_id_int int; + action_str text; +begin + -- WHEN clause on the trigger gates challenge_id='t', but defend in + -- depth here too in case the trigger is invoked another way. + if new.challenge_id <> 't' then + return null; + end if; + + -- Parse trailing hex segment ":t:" → track_id. + track_hex := split_part(new.specifier, ':', 3); + if track_hex !~ '^[0-9a-f]+$' then + return null; + end if; + track_id_int := ('x' || lpad(track_hex, 16, '0'))::bit(64)::bigint; + if track_id_int <= 0 then + return null; + end if; + + select t.owner_id + into owner_id_int + from tracks t + where t.track_id = track_id_int + and t.is_current = true + limit 1; + if owner_id_int is null then + return null; + end if; + + -- Repost takes precedence over save when a user is in both lists for + -- the same track — matches apps' dedupe_notifications_by_group_id + -- where repost_notifications win over save_notifications. + if exists ( + select 1 + from reposts + where user_id = new.user_id + and repost_item_id = track_id_int + and repost_type = 'track' + and is_current = true + and is_delete = false + ) then + action_str := 'repost'; + else + action_str := 'save'; + end if; + + insert into notification + (blocknumber, user_ids, timestamp, type, specifier, group_id, data) + values + ( + new.completed_blocknumber, + ARRAY[new.user_id], + new.completed_at, + 'tastemaker', + track_id_int::text, + 'tastemaker_user_id:' || new.user_id || ':tastemaker_item_id:' || track_id_int, + jsonb_build_object( + 'tastemaker_item_id', track_id_int, + 'tastemaker_item_type', 'track', + 'tastemaker_item_owner_id', owner_id_int, + 'action', action_str, + 'tastemaker_user_id', new.user_id + ) + ) + on conflict do nothing; + + return null; + +exception + when others then + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + return null; +end; +$_$; + + -- -- Name: handle_track(); Type: FUNCTION; Schema: public; Owner: - -- @@ -4954,6 +5040,99 @@ end; $$; +-- +-- Name: handle_trending(); Type: FUNCTION; Schema: public; Owner: - +-- + +CREATE FUNCTION public.handle_trending() RETURNS trigger + LANGUAGE plpgsql + AS $$ +declare + rank_int int; + week_date date; + entity_id_str text; + entity_id_int bigint; + notif_type text; + trend_type text; + ts_epoch bigint; + data_jsonb jsonb; +begin + if new.challenge_id not in ('tt', 'tut') then + return null; + end if; + + case new.challenge_id + when 'tt' then notif_type := 'trending'; trend_type := 'TRACKS'; + when 'tut' then notif_type := 'trending_underground'; trend_type := 'UNDERGROUND_TRACKS'; + end case; + + -- Specifier: ":" + begin + week_date := split_part(new.specifier, ':', 1)::date; + rank_int := split_part(new.specifier, ':', 2)::int; + exception when others then + return null; + end; + + -- Recover entity id from the trending_results row the processor wrote + -- earlier in this transaction. PK is (rank, type, version, week); we + -- pin to NEW.user_id so we ignore any unrelated version rows. + select id + into entity_id_str + from trending_results + where rank = rank_int + and type = trend_type + and week = week_date + and user_id = new.user_id + limit 1; + if entity_id_str is null then + return null; + end if; + begin + entity_id_int := entity_id_str::bigint; + exception when others then + return null; + end; + + -- timestamp suffix matches apps: epoch seconds of the recompute. We + -- use completed_at which is set by UpsertUserChallenge to now() on + -- the first insert — close enough to the recompute moment. + ts_epoch := extract(epoch from new.completed_at)::bigint; + + data_jsonb := jsonb_build_object( + 'time_range', 'week', + 'genre', 'all', + 'rank', rank_int, + 'track_id', entity_id_int + ); + + insert into notification + (blocknumber, user_ids, timestamp, type, specifier, group_id, data) + values + ( + new.completed_blocknumber, + ARRAY[new.user_id], + new.completed_at, + notif_type, + entity_id_int::text, + notif_type + || ':time_range:week:genre:all:rank:' || rank_int + || ':track_id:' || entity_id_int + || ':timestamp:' || ts_epoch, + data_jsonb + ) + on conflict do nothing; + + return null; + +exception + when others then + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + return null; +end; +$$; + + -- -- Name: handle_usdc_purchase(); Type: FUNCTION; Schema: public; Owner: - -- @@ -13952,6 +14131,13 @@ CREATE TRIGGER on_sol_usdc_withdrawal AFTER INSERT ON public.sol_claimable_accou CREATE TRIGGER on_supporter_rank_up AFTER INSERT ON public.supporter_rank_ups FOR EACH ROW EXECUTE FUNCTION public.handle_supporter_rank_up(); +-- +-- Name: user_challenges on_tastemaker_user_challenge; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER on_tastemaker_user_challenge AFTER INSERT ON public.user_challenges FOR EACH ROW WHEN (((new.challenge_id)::text = 't'::text)) EXECUTE FUNCTION public.handle_tastemaker(); + + -- -- Name: tracks on_track; Type: TRIGGER; Schema: public; Owner: - -- @@ -13966,6 +14152,13 @@ CREATE TRIGGER on_track AFTER INSERT OR UPDATE ON public.tracks FOR EACH ROW EXE CREATE TRIGGER on_track_notify_pending_purchase_revalidation AFTER INSERT OR UPDATE OF blocknumber ON public.tracks FOR EACH ROW EXECUTE FUNCTION public.notify_pending_purchase_revalidation(); +-- +-- Name: user_challenges on_trending_user_challenge; Type: TRIGGER; Schema: public; Owner: - +-- + +CREATE TRIGGER on_trending_user_challenge AFTER INSERT ON public.user_challenges FOR EACH ROW WHEN (((new.challenge_id)::text = ANY ((ARRAY['tt'::character varying, 'tut'::character varying])::text[]))) EXECUTE FUNCTION public.handle_trending(); + + -- -- Name: usdc_purchases on_usdc_purchase; Type: TRIGGER; Schema: public; Owner: - --