Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions ddl/functions/handle_tastemaker.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
-- handle_tastemaker
--
-- Emits a `tastemaker` notification when the tastemaker challenge
-- processor (challenge_id 't') mints a user_challenges row. Each row
-- corresponds to one user who reposted or saved a track that later went
-- trending. The notification tells the tastemaker user that they were
-- early to a now-trending track.
--
-- Sibling of handle_user_challenges.sql which already emits the generic
-- `challenge_reward` notification for all challenge completions. This
-- trigger is the type-specific layer that matches apps' tastemaker
-- notification (src/tasks/index_tastemaker.py).
--
-- Specifier shape from jobs/challenges/tastemaker.go is
-- "<hex_user_id>:t:<hex_track_id>" — we parse the trailing hex track_id,
-- look up its owner from `tracks`, and infer the action (repost takes
-- precedence over save, matching apps' dedupe_notifications_by_group_id).
create or replace function handle_tastemaker() returns trigger as $$
declare
track_hex text;
track_id_int bigint;
owner_id_int int;
action_str text;
begin
-- WHEN clause on the trigger gates challenge_id='t', but defend in
-- depth here too in case the trigger is invoked another way.
if new.challenge_id <> 't' then
return null;
end if;

-- Parse trailing hex segment "<user_hex>:t:<track_hex>" → track_id.
track_hex := split_part(new.specifier, ':', 3);
if track_hex !~ '^[0-9a-f]+$' then
return null;
end if;
track_id_int := ('x' || lpad(track_hex, 16, '0'))::bit(64)::bigint;
if track_id_int <= 0 then
return null;
end if;

select t.owner_id
into owner_id_int
from tracks t
where t.track_id = track_id_int
and t.is_current = true
limit 1;
if owner_id_int is null then
return null;
end if;

-- Repost takes precedence over save when a user is in both lists for
-- the same track — matches apps' dedupe_notifications_by_group_id
-- where repost_notifications win over save_notifications.
if exists (
select 1
from reposts
where user_id = new.user_id
and repost_item_id = track_id_int
and repost_type = 'track'
and is_current = true
and is_delete = false
) then
action_str := 'repost';
else
action_str := 'save';
end if;

insert into notification
(blocknumber, user_ids, timestamp, type, specifier, group_id, data)
values
(
new.completed_blocknumber,
ARRAY[new.user_id],
new.completed_at,
'tastemaker',
track_id_int::text,
'tastemaker_user_id:' || new.user_id || ':tastemaker_item_id:' || track_id_int,
jsonb_build_object(
'tastemaker_item_id', track_id_int,
'tastemaker_item_type', 'track',
'tastemaker_item_owner_id', owner_id_int,
'action', action_str,
'tastemaker_user_id', new.user_id
)
)
on conflict do nothing;

return null;

exception
when others then
raise warning 'An error occurred in %: %', tg_name, sqlerrm;
return null;
end;
$$ language plpgsql;


do $$ begin
-- Fire only on INSERT (not UPDATE) so the notification is minted
-- exactly once per (user_id, track_id) pair — UpsertUserChallenge
-- hits its ON CONFLICT DO UPDATE branch on re-runs, which does not
-- fire AFTER INSERT triggers.
create trigger on_tastemaker_user_challenge
after insert on user_challenges
for each row when (new.challenge_id = 't')
execute procedure handle_tastemaker();
exception
when others then null;
end $$;
119 changes: 119 additions & 0 deletions ddl/functions/handle_trending.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
-- handle_trending
--
-- Emits a `trending` or `trending_underground` notification when the
-- trending challenge processor mints a user_challenges row for
-- challenge_id 'tt' / 'tut'. These are the "your track is trending"
-- notifications shown to the track owner.
--
-- (Trending playlists — challenge_id 'tp' — were a product feature that
-- has since been removed, so they're intentionally not handled here.
-- handle_user_challenges.sql still excludes 'tp' from the
-- claimable_reward path on line 14 for historical rows.)
--
-- Sibling of handle_user_challenges.sql which already emits the generic
-- `challenge_reward` notification for all challenge completions. This
-- trigger is the type-specific layer that matches apps'
-- index_trending.py notifications.
--
-- Specifier shape from jobs/challenges/trending.go is "<week>:<rank>"
-- (e.g. "2026-05-22:3"). Entity id is recovered from `trending_results`,
-- which the same processor wrote earlier in this transaction.
create or replace function handle_trending() returns trigger as $$
declare
rank_int int;
week_date date;
entity_id_str text;
entity_id_int bigint;
notif_type text;
trend_type text;
ts_epoch bigint;
data_jsonb jsonb;
begin
if new.challenge_id not in ('tt', 'tut') then
return null;
end if;

case new.challenge_id
when 'tt' then notif_type := 'trending'; trend_type := 'TRACKS';
when 'tut' then notif_type := 'trending_underground'; trend_type := 'UNDERGROUND_TRACKS';
end case;

-- Specifier: "<YYYY-MM-DD>:<rank>"
begin
week_date := split_part(new.specifier, ':', 1)::date;
rank_int := split_part(new.specifier, ':', 2)::int;
exception when others then
return null;
end;

-- Recover entity id from the trending_results row the processor wrote
-- earlier in this transaction. PK is (rank, type, version, week); we
-- pin to NEW.user_id so we ignore any unrelated version rows.
select id
into entity_id_str
from trending_results
where rank = rank_int
and type = trend_type
and week = week_date
and user_id = new.user_id
limit 1;
if entity_id_str is null then
return null;
end if;
begin
entity_id_int := entity_id_str::bigint;
exception when others then
return null;
end;

-- timestamp suffix matches apps: epoch seconds of the recompute. We
-- use completed_at which is set by UpsertUserChallenge to now() on
-- the first insert — close enough to the recompute moment.
ts_epoch := extract(epoch from new.completed_at)::bigint;

data_jsonb := jsonb_build_object(
'time_range', 'week',
'genre', 'all',
'rank', rank_int,
'track_id', entity_id_int
);

insert into notification
(blocknumber, user_ids, timestamp, type, specifier, group_id, data)
values
(
new.completed_blocknumber,
ARRAY[new.user_id],
new.completed_at,
notif_type,
entity_id_int::text,
notif_type
|| ':time_range:week:genre:all:rank:' || rank_int
|| ':track_id:' || entity_id_int
|| ':timestamp:' || ts_epoch,
data_jsonb
)
on conflict do nothing;

return null;

exception
when others then
raise warning 'An error occurred in %: %', tg_name, sqlerrm;
return null;
end;
$$ language plpgsql;


do $$ begin
-- Fire only on INSERT (not UPDATE) so the notification is minted
-- exactly once per (challenge_id, week, rank) — re-runs hit
-- UpsertUserChallenge's ON CONFLICT DO UPDATE branch and do not
-- fire AFTER INSERT triggers.
create trigger on_trending_user_challenge
after insert on user_challenges
for each row when (new.challenge_id in ('tt', 'tut'))
execute procedure handle_trending();
exception
when others then null;
end $$;
81 changes: 81 additions & 0 deletions jobs/challenges/tastemaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package challenges

import (
"context"
"encoding/json"
"fmt"
"testing"

Expand Down Expand Up @@ -57,3 +58,83 @@ func TestTastemaker_EarliestRepostersAndSavers(t *testing.T) {
}
}
}

// TestTastemaker_EmitsNotification — handle_tastemaker trigger fans out
// a `tastemaker` notification when a user_challenges row is minted for
// challenge_id='t'. Repost wins over save when a user qualifies via both.
func TestTastemaker_EmitsNotification(t *testing.T) {
pool := withChallengesDB(t)
ctx := context.Background()

database.Seed(pool, database.FixtureMap{
"blocks": {{"blockhash": "blk_tmn", "number": 1}},
"users": {
{"user_id": 2200, "wallet": "0x2200"},
{"user_id": 2201, "wallet": "0x2201"},
{"user_id": 2202, "wallet": "0x2202"},
},
"tracks": {{"track_id": 220000, "owner_id": 2200, "title": "Hit", "blocknumber": 1}},
})

_, err := pool.Exec(ctx, `
INSERT INTO track_trending_scores (track_id, type, version, time_range, score, created_at)
VALUES (220000, 'TRACKS', 'pnagD', 'week', 999.0, now())
`)
require.NoError(t, err)

// 2201 reposts only; 2202 both reposts AND saves (repost should win).
_, err = pool.Exec(ctx, `
INSERT INTO reposts (user_id, repost_item_id, repost_type, is_current, is_delete, created_at, txhash)
VALUES
(2201, 220000, 'track', true, false, now() - interval '2 hours', 'tx-rp-2201'),
(2202, 220000, 'track', true, false, now() - interval '1 hour', 'tx-rp-2202')
`)
require.NoError(t, err)
_, err = pool.Exec(ctx, `
INSERT INTO saves (user_id, save_item_id, save_type, is_current, is_delete, created_at, txhash)
VALUES (2202, 220000, 'track', true, false, now() - interval '30 minutes', 'tx-sv-2202')
`)
require.NoError(t, err)

runProcessor(t, pool, &TastemakerProcessor{})

type notifRow struct {
UserIDs []int64
Specifier string
GroupID string
Data []byte
}
queryNotif := func(uid int) (notifRow, bool) {
var n notifRow
err := pool.QueryRow(ctx, `
SELECT user_ids, specifier, group_id, data
FROM notification
WHERE type = 'tastemaker'
AND group_id = $1
`, fmt.Sprintf("tastemaker_user_id:%d:tastemaker_item_id:%d", uid, 220000)).
Scan(&n.UserIDs, &n.Specifier, &n.GroupID, &n.Data)
if err != nil {
return notifRow{}, false
}
return n, true
}

n2201, ok := queryNotif(2201)
require.True(t, ok, "expected tastemaker notif for user 2201")
assert.Equal(t, []int64{2201}, n2201.UserIDs)
assert.Equal(t, "220000", n2201.Specifier)

var data2201 map[string]any
require.NoError(t, json.Unmarshal(n2201.Data, &data2201))
assert.Equal(t, "repost", data2201["action"])
assert.Equal(t, "track", data2201["tastemaker_item_type"])
assert.EqualValues(t, 220000, data2201["tastemaker_item_id"])
assert.EqualValues(t, 2200, data2201["tastemaker_item_owner_id"])
assert.EqualValues(t, 2201, data2201["tastemaker_user_id"])

n2202, ok := queryNotif(2202)
require.True(t, ok, "expected tastemaker notif for user 2202")
var data2202 map[string]any
require.NoError(t, json.Unmarshal(n2202.Data, &data2202))
assert.Equal(t, "repost", data2202["action"], "user has both repost and save — repost wins")
}
65 changes: 65 additions & 0 deletions jobs/challenges/trending_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package challenges

import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -91,3 +92,67 @@ func TestTrending_SkipsNonFriday(t *testing.T) {
"SELECT COUNT(*) FROM trending_results").Scan(&count))
assert.Equal(t, 0, count, "no rows written on non-Friday")
}

// TestTrending_EmitsNotification — handle_trending trigger fans out a
// `trending` notification when a user_challenges row is minted for 'tt'.
// Skips on non-Fridays since the underlying processor is Friday-gated.
func TestTrending_EmitsNotification(t *testing.T) {
if time.Now().UTC().Weekday() != time.Friday {
t.Skip("trending processor only runs on Fridays UTC")
}
pool := withChallengesDB(t)
ctx := context.Background()
now := time.Now()

database.Seed(pool, database.FixtureMap{
"blocks": {{"blockhash": "blk_ttn", "number": 1}},
"users": {
{"user_id": 600, "wallet": "0x600"},
{"user_id": 601, "wallet": "0x601"},
},
"tracks": {
{"track_id": 6001, "owner_id": 600, "title": "Hit1", "blocknumber": 1, "created_at": now},
{"track_id": 6011, "owner_id": 601, "title": "Hit2", "blocknumber": 1, "created_at": now},
},
})

for i, tid := range []int{6001, 6011} {
_, err := pool.Exec(ctx, `
INSERT INTO track_trending_scores (track_id, type, version, time_range, score, created_at)
VALUES ($1, 'TRACKS', 'pnagD', 'week', $2, now())
`, tid, float64(100-i))
require.NoError(t, err)
}

runProcessor(t, pool, NewTrendingTrackProcessor())

weekDate := time.Date(now.UTC().Year(), now.UTC().Month(), now.UTC().Day(), 0, 0, 0, 0, time.UTC).Format("2006-01-02")

// Find the rank-1 row.
var ownerID int64
var specifier string
require.NoError(t, pool.QueryRow(ctx, `
SELECT user_id, specifier FROM user_challenges
WHERE challenge_id = 'tt' AND specifier = $1
`, fmt.Sprintf("%s:1", weekDate)).Scan(&ownerID, &specifier))

// Find the corresponding notification.
var nUserIDs []int64
var nSpecifier, nGroupID string
var nData []byte
err := pool.QueryRow(ctx, `
SELECT user_ids, specifier, group_id, data
FROM notification
WHERE type = 'trending' AND user_ids = ARRAY[$1::int]
`, ownerID).Scan(&nUserIDs, &nSpecifier, &nGroupID, &nData)
require.NoError(t, err, "expected trending notif for owner %d", ownerID)

var data map[string]any
require.NoError(t, json.Unmarshal(nData, &data))
assert.Equal(t, "week", data["time_range"])
assert.Equal(t, "all", data["genre"])
assert.EqualValues(t, 1, data["rank"])
assert.Contains(t, data, "track_id")
assert.Contains(t, nGroupID, ":rank:1:track_id:")
assert.Contains(t, nGroupID, "trending:time_range:week:genre:all")
}
Loading
Loading