diff --git a/indexer/indexer.go b/indexer/indexer.go index 58930466..86627692 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -89,6 +89,17 @@ func NewIndexer(cfg config.Config) *CoreIndexer { etlIndexer.SetUserCreatedHook(userEventsHook) etlIndexer.RegisterPostHook(em.EntityTypeUser, em.ActionUpdate, userEventsHook) + // Decode hashid-encoded track IDs in playlist_contents back to integers. + // ETL's playlist Create/Update handlers decode hashids only for the + // playlist_tracks junction; the JSONB column is written verbatim, so + // SDK-supplied string IDs ("1aV5byE") leak into the column and break + // the downstream `::int` cast in v1_playlist_tracks and the int64 Scan + // in dbv1.PlaylistContents. The hook re-reads the JSONB and rewrites + // any string `track`/`track_id` values to ints in the same tx. + playlistContentsHook := newPlaylistContentsHook(logger) + etlIndexer.RegisterPostHook(em.EntityTypePlaylist, em.ActionCreate, playlistContentsHook) + etlIndexer.RegisterPostHook(em.EntityTypePlaylist, em.ActionUpdate, playlistContentsHook) + return &CoreIndexer{ aggregatesCalculator: aggregatesCalculator, etlIndexer: etlIndexer, diff --git a/indexer/playlist_contents_hook.go b/indexer/playlist_contents_hook.go new file mode 100644 index 00000000..48e8b6e1 --- /dev/null +++ b/indexer/playlist_contents_hook.go @@ -0,0 +1,250 @@ +package indexer + +import ( + "context" + "encoding/json" + + "api.audius.co/trashid" + em "github.com/OpenAudio/go-openaudio/pkg/etl/processors/entity_manager" + "go.uber.org/zap" +) + +// newPlaylistContentsHook returns a pkg/etl post-hook that normalizes the +// playlists.playlist_contents JSONB column so it matches the canonical +// shape API readers expect, regardless of which field names and value +// types the SDK uploaded. +// +// Why this exists: the TypeScript SDK uploads playlist contents in its +// own camelCase shape and snakecase-keys converts that to snake_case +// before signing. 
The on-chain metadata therefore carries entries like:
+//
+//	{"track_id": "1aV5byE", "timestamp": 1700000000, "metadata_timestamp": ...}
+//
+// (see audius-protocol packages/sdk/.../playlists/types.ts — the schema
+// is `{ trackId: HashId, timestamp, metadataTimestamp? }`). ETL's
+// `normalizePlaylistContentsJSON` writes those entries to the JSONB
+// verbatim. The downstream API readers in this repo, however, expect the
+// canonical shape that the legacy Python apps indexer always wrote:
+//
+//	{"track": <int>, "time": <unix_ts>, "metadata_time": <unix_ts>}
+//
+//   - api/v1_playlist_tracks.go casts (e.value->>'track')::int — returns
+//     NULL when the column has `track_id` instead of `track`, which then
+//     fails the JOIN to tracks and the endpoint returns no rows.
+//   - api/dbv1/jsonb_types.go's PlaylistContents.TrackIDs.Track is
+//     int64, so JSON Scan returns 0 for any missing `track` field. Zero
+//     never matches a real track ID, so the playlist's tracks slice
+//     comes back empty.
+//
+// Either path produces "playlist is empty after a refresh" — which is
+// exactly the prod report this hook is fixing.
+//
+// What it does: after a successful Playlist Create or Update, re-read
+// the JSONB inside the same transaction and, per entry:
+//
+//   - Resolve a `track` integer (from existing `track`/`track_id`,
+//     decoding via trashid.DecodeHashId for hashid strings).
+//   - Copy `timestamp` to `time` if `time` is missing.
+//   - Copy `metadata_timestamp` (falling back to `timestamp`) to
+//     `metadata_time` if missing.
+//   - Drop the SDK-style `track_id`/`timestamp`/`metadata_timestamp`
+//     keys after copying their values, matching the Python indexer's
+//     historical canonical output.
+//
+// Rows that already have the canonical shape (every entry has integer
+// `track` + `time` + `metadata_time`, no SDK keys) are left byte-
+// identical — no spurious UPDATE.
+//
+// Errors are logged and swallowed per the upstream PostHook contract;
+// failure to normalize must not roll back the playlist row.
+//
+// This belongs upstream in normalizePlaylistContentsJSON itself
+// (open follow-up to go-openaudio). Patching here lands the fix on
+// this repo's CI/CD now without waiting for an ETL bump.
+func newPlaylistContentsHook(logger *zap.Logger) em.PostHook {
+	hookLogger := logger.Named("PlaylistContentsHook")
+
+	return func(ctx context.Context, params *em.Params) error {
+		return normalizePlaylistContentsRow(ctx, hookLogger, params)
+	}
+}
+
+// normalizePlaylistContentsRow re-reads playlist_contents for the current
+// row of params.EntityID, runs normalizeEntry over every element of
+// "track_ids", and writes the column back only when at least one entry
+// changed.
+//
+// It always returns nil: read, decode, marshal and write failures are
+// logged and swallowed so the hook never reports an error to its caller.
+//
+// NOTE(review): the rewritten document contains only the "track_ids" key;
+// any other top-level key present in the stored JSONB would be dropped.
+// The upstream normalizer is expected to emit only {"track_ids":[...]} —
+// confirm.
+//
+// NOTE(review): if params.DBTX is a PostgreSQL transaction, a statement
+// that fails leaves that transaction aborted even though the error is
+// swallowed here — confirm the dispatcher isolates hooks (e.g. savepoint).
+func normalizePlaylistContentsRow(ctx context.Context, logger *zap.Logger, params *em.Params) error {
+	// Only run when the dispatched tx actually carried playlist_contents.
+	// Updates that touch only the name/description don't rewrite the JSONB
+	// column, so there's nothing for us to fix.
+	if params.Metadata == nil {
+		return nil
+	}
+	if _, ok := params.Metadata["playlist_contents"]; !ok {
+		return nil
+	}
+
+	var contentsRaw []byte
+	err := params.DBTX.QueryRow(ctx,
+		`SELECT playlist_contents FROM playlists
+		 WHERE playlist_id = $1 AND is_current = true`,
+		params.EntityID,
+	).Scan(&contentsRaw)
+	if err != nil {
+		logger.Debug("playlist_contents read missed; skipping normalization",
+			zap.Int64("playlist_id", params.EntityID),
+			zap.Error(err))
+		return nil
+	}
+	if len(contentsRaw) == 0 {
+		return nil
+	}
+
+	var contents struct {
+		TrackIDs []map[string]any `json:"track_ids"`
+	}
+	if err := json.Unmarshal(contentsRaw, &contents); err != nil {
+		// The upstream normalizer should always produce
+		// {"track_ids":[...]}; if it didn't, this is an unfamiliar shape
+		// and we shouldn't try to second-guess it.
+		logger.Warn("playlist_contents JSONB not in {track_ids:[...]} shape; skipping",
+			zap.Int64("playlist_id", params.EntityID),
+			zap.Error(err))
+		return nil
+	}
+
+	changed := false
+	for _, entry := range contents.TrackIDs {
+		if normalizeEntry(entry, logger, params.EntityID) {
+			changed = true
+		}
+	}
+
+	if !changed {
+		return nil
+	}
+
+	rewritten, err := json.Marshal(map[string]any{"track_ids": contents.TrackIDs})
+	if err != nil {
+		logger.Warn("failed to remarshal normalized playlist_contents",
+			zap.Int64("playlist_id", params.EntityID),
+			zap.Error(err))
+		return nil
+	}
+
+	if _, err := params.DBTX.Exec(ctx,
+		`UPDATE playlists SET playlist_contents = $1
+		 WHERE playlist_id = $2 AND is_current = true`,
+		rewritten, params.EntityID,
+	); err != nil {
+		logger.Warn("failed to write normalized playlist_contents",
+			zap.Int64("playlist_id", params.EntityID),
+			zap.Error(err))
+		return nil
+	}
+
+	logger.Info("normalized playlist_contents to canonical shape",
+		zap.Int64("playlist_id", params.EntityID))
+	return nil
+}
+
+// normalizeEntry mutates a single playlist_contents entry in place,
+// returning true if it made any change. Canonical entries (integer
+// `track`, numeric `time` + `metadata_time`, no SDK keys) pass through
+// untouched.
+//
+// Field-rename policy matches what apps' Python process_playlist_contents
+// emits: `track` is the integer ID, `time` is the block-indexed time
+// (we use whatever the SDK gave as `timestamp`/`metadata_timestamp` —
+// the legacy block_integer_time isn't available in a post-hook), and
+// `metadata_time` is the SDK-supplied metadata timestamp.
+//
+// The SDK-style `timestamp`/`metadata_timestamp` keys are removed only
+// once both canonical fields exist on the entry. If a canonical field is
+// still missing (for example the SDK value was a string rather than a
+// JSON number and could not be copied), the SDK keys are kept so the
+// uploaded value is not silently discarded.
+//
+// `track_id` is removed unconditionally, even when no integer `track`
+// could be resolved; TestPlaylistContentsHook_UndecodableTrackIDLeftAlone
+// pins that behavior.
+func normalizeEntry(entry map[string]any, logger *zap.Logger, playlistID int64) (changed bool) {
+	// Track ID: target is integer `track`.
+	if trackVal, ok := resolveTrackID(entry, logger, playlistID); ok {
+		if !isInt64Value(entry["track"], trackVal) {
+			entry["track"] = trackVal
+			changed = true
+		}
+	}
+	if _, has := entry["track_id"]; has {
+		delete(entry, "track_id")
+		changed = true
+	}
+
+	// time and metadata_time: copy from SDK-style fields if the canonical
+	// fields are absent. Prefer `timestamp` for `time`; prefer
+	// `metadata_timestamp` (with `timestamp` as fallback) for
+	// `metadata_time`. This mirrors the apps Python fallback chain.
+	if _, has := entry["time"]; !has {
+		if v, ok := firstFloat64(entry, "timestamp", "metadata_timestamp"); ok {
+			entry["time"] = v
+			changed = true
+		}
+	}
+	if _, has := entry["metadata_time"]; !has {
+		if v, ok := firstFloat64(entry, "metadata_timestamp", "timestamp"); ok {
+			entry["metadata_time"] = v
+			changed = true
+		}
+	}
+
+	// Only drop the SDK-style keys when the canonical fields are in place;
+	// otherwise deleting them would lose a value that was never copied.
+	_, hasTime := entry["time"]
+	_, hasMetadataTime := entry["metadata_time"]
+	if !hasTime || !hasMetadataTime {
+		return changed
+	}
+	for _, sdkKey := range []string{"timestamp", "metadata_timestamp"} {
+		if _, has := entry[sdkKey]; has {
+			delete(entry, sdkKey)
+			changed = true
+		}
+	}
+
+	return changed
+}
+
+// firstFloat64 returns the value of the first key in keys that entry
+// holds as a float64, which is how encoding/json decodes a JSON number
+// into an `any`. The bool is false when none of the keys qualifies.
+func firstFloat64(entry map[string]any, keys ...string) (float64, bool) {
+	for _, key := range keys {
+		if v, ok := entry[key].(float64); ok {
+			return v, true
+		}
+	}
+	return 0, false
+}
+
+// resolveTrackID returns the canonical integer track ID for an entry,
+// looking first at `track` and falling back to `track_id`. String values
+// are decoded via trashid.DecodeHashId (same Hashids salt + min length
+// as the upstream junction-table decoder).
+func resolveTrackID(entry map[string]any, logger *zap.Logger, playlistID int64) (int64, bool) {
+	for _, key := range []string{"track", "track_id"} {
+		raw, ok := entry[key]
+		if !ok {
+			continue
+		}
+		// Already numeric (and integral): use it as-is.
+		if v, ok := asInt64(raw); ok {
+			return v, true
+		}
+		strVal, isString := raw.(string)
+		if !isString || strVal == "" {
+			continue
+		}
+		decoded, err := trashid.DecodeHashId(strVal)
+		if err != nil {
+			// Log the raw value and try the next candidate key.
+			logger.Warn("failed to decode track ID in playlist_contents; leaving entry alone",
+				zap.Int64("playlist_id", playlistID),
+				zap.String("key", key),
+				zap.String("raw", strVal),
+				zap.Error(err))
+			continue
+		}
+		return int64(decoded), true
+	}
+	return 0, false
+}
+
+// asInt64 converts a decoded JSON value to int64. It accepts int64, int,
+// and float64 values that are integral and within the int64 range.
+//
+// A float64 with a fractional part, NaN, or a magnitude outside int64 is
+// rejected rather than truncated: truncating would let 200.7 pass as
+// track 200, and converting an out-of-range float to an integer has an
+// implementation-specific result in Go.
+func asInt64(raw any) (int64, bool) {
+	switch v := raw.(type) {
+	case float64:
+		// -2^63 is representable as float64; 2^63 is the first value
+		// past the int64 range.
+		if v < -9223372036854775808.0 || v >= 9223372036854775808.0 {
+			return 0, false
+		}
+		n := int64(v)
+		// Round-trip check rejects fractional values and NaN.
+		if float64(n) != v {
+			return 0, false
+		}
+		return n, true
+	case int64:
+		return v, true
+	case int:
+		return int64(v), true
+	}
+	return 0, false
+}
+
+// isInt64Value reports whether raw is an integer value (per asInt64)
+// equal to target.
+func isInt64Value(raw any, target int64) bool {
+	v, ok := asInt64(raw)
+	return ok && v == target
+}
diff --git a/indexer/playlist_contents_hook_test.go b/indexer/playlist_contents_hook_test.go
new file mode 100644
index 00000000..16ba76cf
--- /dev/null
+++ b/indexer/playlist_contents_hook_test.go
@@ -0,0 +1,304 @@
+package indexer
+
+import (
+	"context"
+	"encoding/json"
+	"testing"
+
+	"api.audius.co/database"
+	"api.audius.co/trashid"
+	em "github.com/OpenAudio/go-openaudio/pkg/etl/processors/entity_manager"
+	"github.com/jackc/pgx/v5/pgxpool"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
+)
+
+// seedPlaylist seeds a single playlists row with the given contents JSON.
+// Mirrors the column defaults in database/seed.go's playlists block.
+func seedPlaylist(t *testing.T, playlistID int, ownerID int, contents string) database.FixtureMap {
+	t.Helper()
+
+	// One playlists row; columns not listed here are left to the seeder.
+	fixtures := database.FixtureMap{
+		"playlists": {
+			{
+				"playlist_id":       playlistID,
+				"playlist_owner_id": ownerID,
+				"playlist_name":     "test",
+				"playlist_contents": contents,
+			},
+		},
+	}
+	return fixtures
+}
+
+// runHook invokes the playlist-contents post-hook once, as a Playlist
+// Update for entityID, using the pool directly as the DBTX and a no-op
+// logger. It asserts the hook returns no error.
+func runHook(t *testing.T, pool *pgxpool.Pool, entityID int64) {
+	t.Helper()
+
+	params := &em.Params{
+		EntityID:   entityID,
+		EntityType: em.EntityTypePlaylist,
+		Action:     em.ActionUpdate,
+		DBTX:       pool,
+		// The key only has to be present for the hook's metadata gate.
+		Metadata: map[string]any{"playlist_contents": []any{}},
+	}
+
+	hook := newPlaylistContentsHook(zap.NewNop())
+	assert.NoError(t, hook(context.Background(), params))
+}
+
+// TestPlaylistContentsHook_RenamesSDKShape covers the production regression:
+// the TS SDK uploads `{"track_id": <hashid>, "timestamp": <unix_ts>}` (after
+// snakecase-keys), ETL writes those entries to JSONB verbatim, and the API
+// readers see no `track` field — so the playlist looks empty even though
+// the row exists. The hook must rewrite the entry into the canonical
+// `{"track": <int>, "time": <unix_ts>, "metadata_time": <unix_ts>}` shape.
+func TestPlaylistContentsHook_RenamesSDKShape(t *testing.T) {
+	ctx := context.Background()
+	pool := database.CreateTestDatabase(t, "test_indexer")
+
+	encoded, err := trashid.EncodeHashId(28622946)
+	require.NoError(t, err)
+
+	// Seed a single SDK-shaped entry: hashid under track_id, timestamp only.
+	sdkEntry := map[string]any{"track_id": encoded, "timestamp": 1700000000}
+	payload, err := json.Marshal(map[string]any{
+		"track_ids": []map[string]any{sdkEntry},
+	})
+	require.NoError(t, err)
+	database.Seed(pool, seedPlaylist(t, 2001, 1, string(payload)))
+
+	runHook(t, pool, 2001)
+
+	var stored string
+	row := pool.QueryRow(ctx,
+		`SELECT playlist_contents::text FROM playlists WHERE playlist_id = 2001`,
+	)
+	require.NoError(t, row.Scan(&stored))
+
+	var decoded struct {
+		TrackIDs []map[string]any `json:"track_ids"`
+	}
+	require.NoError(t, json.Unmarshal([]byte(stored), &decoded))
+	require.Len(t, decoded.TrackIDs, 1)
+
+	got := decoded.TrackIDs[0]
+	assert.Equal(t, float64(28622946), got["track"], "track must be the decoded integer")
+	assert.Equal(t, float64(1700000000), got["time"], "time must be copied from timestamp")
+	assert.Equal(t, float64(1700000000), got["metadata_time"], "metadata_time must be copied from timestamp when metadata_timestamp absent")
+	assert.NotContains(t, got, "track_id", "SDK-shape track_id must be dropped")
+	assert.NotContains(t, got, "timestamp", "SDK-shape timestamp must be dropped")
+	assert.NotContains(t, got, "metadata_timestamp", "SDK-shape metadata_timestamp must be dropped")
+}
+
+// TestPlaylistContentsHook_RenamesSDKShape_WithMetadataTimestamp covers
+// the Create path where the SDK supplies both timestamp and
+// metadata_timestamp. metadata_time must come from metadata_timestamp
+// (not timestamp), matching the Python apps precedence.
+func TestPlaylistContentsHook_RenamesSDKShape_WithMetadataTimestamp(t *testing.T) {
+	ctx := context.Background()
+	pool := database.CreateTestDatabase(t, "test_indexer")
+
+	encoded, err := trashid.EncodeHashId(123456)
+	require.NoError(t, err)
+
+	// timestamp and metadata_timestamp differ so their targets can be told apart.
+	seeded := `{"track_ids":[{"track_id":"` + encoded + `","timestamp":1700000000,"metadata_timestamp":1699000000}]}`
+	database.Seed(pool, seedPlaylist(t, 2002, 1, seeded))
+
+	runHook(t, pool, 2002)
+
+	var stored string
+	row := pool.QueryRow(ctx,
+		`SELECT playlist_contents::text FROM playlists WHERE playlist_id = 2002`,
+	)
+	require.NoError(t, row.Scan(&stored))
+
+	var decoded struct {
+		TrackIDs []map[string]any `json:"track_ids"`
+	}
+	require.NoError(t, json.Unmarshal([]byte(stored), &decoded))
+	require.Len(t, decoded.TrackIDs, 1)
+
+	got := decoded.TrackIDs[0]
+	assert.Equal(t, float64(123456), got["track"])
+	assert.Equal(t, float64(1700000000), got["time"])
+	assert.Equal(t, float64(1699000000), got["metadata_time"])
+}
+
+// TestPlaylistContentsHook_NoChangeForCanonicalEntries seeds an entry that
+// is already in the canonical {track, time, metadata_time} shape and
+// checks the stored JSON is equivalent after the hook runs.
+func TestPlaylistContentsHook_NoChangeForCanonicalEntries(t *testing.T) {
+	const canonical = `{"track_ids":[{"track":200,"time":1722451644,"metadata_time":1722451644}]}`
+
+	pool := database.CreateTestDatabase(t, "test_indexer")
+	database.Seed(pool, seedPlaylist(t, 2003, 1, canonical))
+
+	runHook(t, pool, 2003)
+
+	var stored string
+	row := pool.QueryRow(context.Background(),
+		`SELECT playlist_contents::text FROM playlists WHERE playlist_id = 2003`,
+	)
+	require.NoError(t, row.Scan(&stored))
+	assert.JSONEq(t, canonical, stored)
+}
+
+// TestPlaylistContentsHook_SkipsWhenMetadataLacksPlaylistContents gates the
+// hook on the same condition the upstream junction-table updater uses:
+// only run when the SDK actually carried playlist_contents. A name-only
+// update must not touch the column even if the column has fixable data.
+func TestPlaylistContentsHook_SkipsWhenMetadataLacksPlaylistContents(t *testing.T) {
+	ctx := context.Background()
+	pool := database.CreateTestDatabase(t, "test_indexer")
+
+	encoded, err := trashid.EncodeHashId(28622946)
+	require.NoError(t, err)
+
+	// SDK-shaped content the hook would rewrite if it ran.
+	stale := `{"track_ids":[{"track_id":"` + encoded + `","timestamp":1}]}`
+	database.Seed(pool, seedPlaylist(t, 2004, 1, stale))
+
+	// Metadata carries a rename only — no playlist_contents key — so the
+	// hook must return before touching the row.
+	params := &em.Params{
+		EntityID:   2004,
+		EntityType: em.EntityTypePlaylist,
+		Action:     em.ActionUpdate,
+		DBTX:       pool,
+		Metadata:   map[string]any{"playlist_name": "renamed"},
+	}
+	hook := newPlaylistContentsHook(zap.NewNop())
+	err = hook(ctx, params)
+	assert.NoError(t, err)
+
+	var stored string
+	row := pool.QueryRow(ctx,
+		`SELECT playlist_contents::text FROM playlists WHERE playlist_id = 2004`,
+	)
+	require.NoError(t, row.Scan(&stored))
+	assert.JSONEq(t, stale, stored)
+}
+
+// TestPlaylistContentsHook_MixedEntries proves the hook adapts entries
+// individually, leaving canonical ones byte-identical while rewriting
+// SDK-shape ones.
+func TestPlaylistContentsHook_MixedEntries(t *testing.T) {
+	ctx := context.Background()
+	pool := database.CreateTestDatabase(t, "test_indexer")
+
+	encoded, err := trashid.EncodeHashId(28622946)
+	require.NoError(t, err)
+
+	// Entry 0 is already canonical; entry 1 is SDK-shaped.
+	canonicalEntry := `{"track":200,"time":1,"metadata_time":1}`
+	sdkEntry := `{"track_id":"` + encoded + `","timestamp":2}`
+	seeded := `{"track_ids":[` + canonicalEntry + `,` + sdkEntry + `]}`
+	database.Seed(pool, seedPlaylist(t, 2005, 1, seeded))
+
+	runHook(t, pool, 2005)
+
+	var stored string
+	row := pool.QueryRow(ctx,
+		`SELECT playlist_contents::text FROM playlists WHERE playlist_id = 2005`,
+	)
+	require.NoError(t, row.Scan(&stored))
+
+	var decoded struct {
+		TrackIDs []map[string]any `json:"track_ids"`
+	}
+	require.NoError(t, json.Unmarshal([]byte(stored), &decoded))
+	require.Len(t, decoded.TrackIDs, 2)
+
+	untouched, rewritten := decoded.TrackIDs[0], decoded.TrackIDs[1]
+
+	assert.Equal(t, float64(200), untouched["track"])
+	assert.Equal(t, float64(1), untouched["time"])
+	assert.Equal(t, float64(1), untouched["metadata_time"])
+
+	assert.Equal(t, float64(28622946), rewritten["track"])
+	assert.Equal(t, float64(2), rewritten["time"])
+	assert.Equal(t, float64(2), rewritten["metadata_time"])
+	assert.NotContains(t, rewritten, "track_id")
+	assert.NotContains(t, rewritten, "timestamp")
+}
+
+// TestPlaylistContentsHook_NumericStringTrackID covers the case where the
+// SDK didn't encode the ID but did stringify it. trashid.DecodeHashId
+// falls back to strconv.Atoi for pure numeric strings — same behavior
+// the upstream junction-table decoder relies on.
+func TestPlaylistContentsHook_NumericStringTrackID(t *testing.T) {
+	const seeded = `{"track_ids":[{"track_id":"12345","timestamp":1}]}`
+
+	ctx := context.Background()
+	pool := database.CreateTestDatabase(t, "test_indexer")
+	database.Seed(pool, seedPlaylist(t, 2006, 1, seeded))
+
+	runHook(t, pool, 2006)
+
+	var stored string
+	row := pool.QueryRow(ctx,
+		`SELECT playlist_contents::text FROM playlists WHERE playlist_id = 2006`,
+	)
+	require.NoError(t, row.Scan(&stored))
+
+	var decoded struct {
+		TrackIDs []map[string]any `json:"track_ids"`
+	}
+	require.NoError(t, json.Unmarshal([]byte(stored), &decoded))
+	require.Len(t, decoded.TrackIDs, 1)
+	assert.Equal(t, float64(12345), decoded.TrackIDs[0]["track"])
+}
+
+// TestPlaylistContentsHook_EmptyContents seeds {"track_ids":[]} (a
+// playlist with no tracks left) and checks the hook leaves it as-is
+// without returning an error.
+func TestPlaylistContentsHook_EmptyContents(t *testing.T) {
+	const empty = `{"track_ids":[]}`
+
+	pool := database.CreateTestDatabase(t, "test_indexer")
+	database.Seed(pool, seedPlaylist(t, 2007, 1, empty))
+
+	runHook(t, pool, 2007)
+
+	var stored string
+	row := pool.QueryRow(context.Background(),
+		`SELECT playlist_contents::text FROM playlists WHERE playlist_id = 2007`,
+	)
+	require.NoError(t, row.Scan(&stored))
+	assert.JSONEq(t, `{"track_ids":[]}`, stored)
+}
+
+// TestPlaylistContentsHook_UndecodableTrackIDLeftAlone pins what happens
+// to a malformed track_id: the hook does not fail and does not invent a
+// `track` value for the entry. Note that the SDK-style `track_id` key is
+// still removed from the stored JSON; the raw string is only reported
+// through the hook's warning log.
+func TestPlaylistContentsHook_UndecodableTrackIDLeftAlone(t *testing.T) {
+	const seeded = `{"track_ids":[{"track_id":"not-a-hashid!@#","timestamp":1}]}`
+
+	ctx := context.Background()
+	pool := database.CreateTestDatabase(t, "test_indexer")
+	database.Seed(pool, seedPlaylist(t, 2008, 1, seeded))
+
+	runHook(t, pool, 2008)
+
+	var stored string
+	row := pool.QueryRow(ctx,
+		`SELECT playlist_contents::text FROM playlists WHERE playlist_id = 2008`,
+	)
+	require.NoError(t, row.Scan(&stored))
+
+	var decoded struct {
+		TrackIDs []map[string]any `json:"track_ids"`
+	}
+	require.NoError(t, json.Unmarshal([]byte(stored), &decoded))
+	require.Len(t, decoded.TrackIDs, 1)
+
+	got := decoded.TrackIDs[0]
+	// The value could not be decoded, so no `track` field is synthesized.
+	assert.NotContains(t, got, "track")
+	// The hook removes track_id unconditionally, decodable or not.
+	assert.NotContains(t, got, "track_id")
+}
+
+// TestPlaylistContentsHook_HashidOnTrackField covers an entry that uses
+// the canonical `track` key but carries a hashid string as its value; the
+// hook must replace the string with the decoded integer.
+func TestPlaylistContentsHook_HashidOnTrackField(t *testing.T) {
+	ctx := context.Background()
+	pool := database.CreateTestDatabase(t, "test_indexer")
+
+	encoded, err := trashid.EncodeHashId(28622946)
+	require.NoError(t, err)
+
+	seeded := `{"track_ids":[{"track":"` + encoded + `","time":1,"metadata_time":1}]}`
+	database.Seed(pool, seedPlaylist(t, 2009, 1, seeded))
+
+	runHook(t, pool, 2009)
+
+	var stored string
+	row := pool.QueryRow(ctx,
+		`SELECT playlist_contents::text FROM playlists WHERE playlist_id = 2009`,
+	)
+	require.NoError(t, row.Scan(&stored))
+
+	var decoded struct {
+		TrackIDs []map[string]any `json:"track_ids"`
+	}
+	require.NoError(t, json.Unmarshal([]byte(stored), &decoded))
+	require.Len(t, decoded.TrackIDs, 1)
+	assert.Equal(t, float64(28622946), decoded.TrackIDs[0]["track"])
+}