From ae373a7090a924ef6a7b36244d9b2c9818fd3c00 Mon Sep 17 00:00:00 2001 From: safierinx-a Date: Fri, 27 Mar 2026 03:29:33 +0530 Subject: [PATCH] Add bulk import optimization: track_lookup cache, batch inserts, BulkSubmitter --- db/migrations/000006_track_lookup.sql | 15 + db/queries/track_lookup.sql | 21 + internal/catalog/associate_album.go | 49 +- internal/catalog/associate_artists.go | 52 +- internal/catalog/catalog.go | 25 + internal/catalog/lookup_key.go | 9 + internal/db/db.go | 10 + internal/db/opts.go | 19 + internal/db/psql/album.go | 1 + internal/db/psql/artist.go | 1 + internal/db/psql/listen.go | 62 ++ internal/db/psql/merge.go | 6 + internal/db/psql/track.go | 3 + internal/db/psql/track_lookup.go | 52 ++ internal/importer/bulk.go | 157 ++++ internal/importer/lastfm.go | 32 +- internal/importer/listenbrainz.go | 31 +- internal/importer/maloja.go | 36 +- internal/importer/spotify.go | 36 +- internal/repository/track_lookup.sql.go | 82 ++ .../20260325_bulk_import_optimization.md | 722 ++++++++++++++++++ 21 files changed, 1296 insertions(+), 125 deletions(-) create mode 100644 db/migrations/000006_track_lookup.sql create mode 100644 db/queries/track_lookup.sql create mode 100644 internal/catalog/lookup_key.go create mode 100644 internal/db/psql/track_lookup.go create mode 100644 internal/importer/bulk.go create mode 100644 internal/repository/track_lookup.sql.go create mode 100644 thoughts/plans/20260325_bulk_import_optimization.md diff --git a/db/migrations/000006_track_lookup.sql b/db/migrations/000006_track_lookup.sql new file mode 100644 index 0000000..00308ae --- /dev/null +++ b/db/migrations/000006_track_lookup.sql @@ -0,0 +1,15 @@ +-- +goose Up +CREATE TABLE track_lookup ( + lookup_key TEXT NOT NULL PRIMARY KEY, + artist_id INT NOT NULL REFERENCES artists(id) ON DELETE CASCADE, + album_id INT NOT NULL REFERENCES releases(id) ON DELETE CASCADE, + track_id INT NOT NULL REFERENCES tracks(id) ON DELETE CASCADE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_track_lookup_track_id ON track_lookup(track_id); +CREATE INDEX idx_track_lookup_artist_id ON track_lookup(artist_id); +CREATE INDEX idx_track_lookup_album_id ON track_lookup(album_id); + +-- +goose Down +DROP TABLE IF EXISTS track_lookup; diff --git a/db/queries/track_lookup.sql b/db/queries/track_lookup.sql new file mode 100644 index 0000000..ed8bd1c --- /dev/null +++ b/db/queries/track_lookup.sql @@ -0,0 +1,21 @@ +-- name: GetTrackLookup :one +SELECT artist_id, album_id, track_id +FROM track_lookup +WHERE lookup_key = $1; + +-- name: InsertTrackLookup :exec +INSERT INTO track_lookup (lookup_key, artist_id, album_id, track_id) +VALUES ($1, $2, $3, $4) +ON CONFLICT (lookup_key) DO UPDATE SET + artist_id = EXCLUDED.artist_id, + album_id = EXCLUDED.album_id, + track_id = EXCLUDED.track_id; + +-- name: DeleteTrackLookupByArtist :exec +DELETE FROM track_lookup WHERE artist_id = $1; + +-- name: DeleteTrackLookupByAlbum :exec +DELETE FROM track_lookup WHERE album_id = $1; + +-- name: DeleteTrackLookupByTrack :exec +DELETE FROM track_lookup WHERE track_id = $1; diff --git a/internal/catalog/associate_album.go b/internal/catalog/associate_album.go index 3a63c58..6696218 100644 --- a/internal/catalog/associate_album.go +++ b/internal/catalog/associate_album.go @@ -122,17 +122,18 @@ func createOrUpdateAlbumWithMbzReleaseID(ctx context.Context, d db.DB, opts Asso } } - l.Debug().Msg("Searching for album images...") var imgid uuid.UUID - imgUrl, err := images.GetAlbumImage(ctx, images.AlbumImageOpts{ - Artists: utils.UniqueIgnoringCase(slices.Concat(utils.FlattenMbzArtistCreditNames(release.ArtistCredit), utils.FlattenArtistNames(opts.Artists))), - Album: release.Title, - ReleaseMbzID: &opts.ReleaseMbzID, - }) + var imgUrl string + if !opts.SkipCacheImage { + l.Debug().Msg("Searching for album images...") + imgUrl, err = images.GetAlbumImage(ctx, images.AlbumImageOpts{ + Artists: utils.UniqueIgnoringCase(slices.Concat(utils.FlattenMbzArtistCreditNames(release.ArtistCredit), utils.FlattenArtistNames(opts.Artists))), + Album: release.Title, + ReleaseMbzID: &opts.ReleaseMbzID, + }) - if err == nil && imgUrl != "" { - imgid = uuid.New() - if !opts.SkipCacheImage { + if err == nil && imgUrl != "" { + imgid = uuid.New() var size ImageSize if cfg.FullImageCacheEnabled() { size = ImageSizeFull @@ -144,13 +145,11 @@ func createOrUpdateAlbumWithMbzReleaseID(ctx context.Context, d db.DB, opts Asso if err != nil { l.Err(err).Msg("createOrUpdateAlbumWithMbzReleaseID: failed to cache image") } + } else if err != nil { + l.Debug().Msgf("createOrUpdateAlbumWithMbzReleaseID: failed to get album images for %s: %s", release.Title, err.Error()) } } - if err != nil { - l.Debug().Msgf("createOrUpdateAlbumWithMbzReleaseID: failed to get album images for %s: %s", release.Title, err.Error()) - } - album, err = d.SaveAlbum(ctx, db.SaveAlbumOpts{ Title: release.Title, MusicBrainzID: opts.ReleaseMbzID, @@ -217,14 +216,15 @@ func matchAlbumByTitle(ctx context.Context, d db.DB, opts AssociateAlbumOpts) (* return nil, fmt.Errorf("matchAlbumByTitle: %w", err) } else { var imgid uuid.UUID - imgUrl, err := images.GetAlbumImage(ctx, images.AlbumImageOpts{ - Artists: utils.FlattenArtistNames(opts.Artists), - Album: opts.ReleaseName, - ReleaseMbzID: &opts.ReleaseMbzID, - }) - if err == nil && imgUrl != "" { - imgid = uuid.New() - if !opts.SkipCacheImage { + var imgUrl string + if !opts.SkipCacheImage { + imgUrl, err = images.GetAlbumImage(ctx, images.AlbumImageOpts{ + Artists: utils.FlattenArtistNames(opts.Artists), + Album: opts.ReleaseName, + ReleaseMbzID: &opts.ReleaseMbzID, + }) + if err == nil && imgUrl != "" { + imgid = uuid.New() var size ImageSize if cfg.FullImageCacheEnabled() { size = ImageSizeFull @@ -234,13 +234,12 @@ func matchAlbumByTitle(ctx context.Context, d db.DB, opts AssociateAlbumOpts) (* l.Debug().Msg("Downloading album image from source...") err = DownloadAndCacheImage(ctx, imgid, imgUrl, size) if err != nil { - l.Err(err).Msg("createOrUpdateAlbumWithMbzReleaseID: failed to cache image") + l.Err(err).Msg("matchAlbumByTitle: failed to cache image") } + } else if err != nil { + l.Debug().AnErr("error", err).Msgf("matchAlbumByTitle: failed to get album images for %s", opts.ReleaseName) } } - if err != nil { - l.Debug().AnErr("error", err).Msgf("matchAlbumByTitle: failed to get album images for %s", opts.ReleaseName) - } a, err = d.SaveAlbum(ctx, db.SaveAlbumOpts{ Title: releaseName, diff --git a/internal/catalog/associate_artists.go b/internal/catalog/associate_artists.go index 15b91c9..a79b690 100644 --- a/internal/catalog/associate_artists.go +++ b/internal/catalog/associate_artists.go @@ -127,12 +127,14 @@ func matchArtistsByMBIDMappings(ctx context.Context, d db.DB, opts AssociateArti l.Warn().AnErr("error", err).Msg("matchArtistsByMBIDMappings: MusicBrainz unreachable, creating new artist with provided MusicBrainz ID mapping") var imgid uuid.UUID - imgUrl, imgErr := images.GetArtistImage(ctx, images.ArtistImageOpts{ - Aliases: []string{a.Artist}, - }) - if imgErr == nil && imgUrl != "" { - imgid = uuid.New() - if !opts.SkipCacheImage { + var imgUrl string + if !opts.SkipCacheImage { + var imgErr error + imgUrl, imgErr = images.GetArtistImage(ctx, images.ArtistImageOpts{ + Aliases: []string{a.Artist}, + }) + if imgErr == nil && imgUrl != "" { + imgid = uuid.New() var size ImageSize if cfg.FullImageCacheEnabled() { size = ImageSizeFull @@ -144,9 +146,9 @@ func matchArtistsByMBIDMappings(ctx context.Context, d db.DB, opts AssociateArti if err != nil { l.Err(err).Msg("Failed to cache image") } + } else if imgErr != nil { + l.Err(imgErr).Msgf("matchArtistsByMBIDMappings: Failed to get artist image for artist '%s'", a.Artist) } - } else { - l.Err(imgErr).Msgf("matchArtistsByMBIDMappings: Failed to get artist image for artist '%s'", a.Artist) } artist, err = d.SaveArtist(ctx, db.SaveArtistOpts{ @@ -246,12 +248,13 @@ func resolveAliasOrCreateArtist(ctx context.Context, mbzID uuid.UUID, names []st } var imgid uuid.UUID - imgUrl, err := images.GetArtistImage(ctx, images.ArtistImageOpts{ - Aliases: aliases, - }) - if err == nil && imgUrl != "" { - imgid = uuid.New() - if !opts.SkipCacheImage { + var imgUrl string + if !opts.SkipCacheImage { + imgUrl, err = images.GetArtistImage(ctx, images.ArtistImageOpts{ + Aliases: aliases, + }) + if err == nil && imgUrl != "" { + imgid = uuid.New() var size ImageSize if cfg.FullImageCacheEnabled() { size = ImageSizeFull @@ -263,9 +266,9 @@ func resolveAliasOrCreateArtist(ctx context.Context, mbzID uuid.UUID, names []st if err != nil { l.Err(err).Msg("Failed to cache image") } + } else if err != nil { + l.Warn().AnErr("error", err).Msg("Failed to get artist image from ImageSrc") } - } else if err != nil { - l.Warn().AnErr("error", err).Msg("Failed to get artist image from ImageSrc") } u, err := d.SaveArtist(ctx, db.SaveArtistOpts{ @@ -301,12 +304,13 @@ func matchArtistsByNames(ctx context.Context, names []string, existing []*models } if errors.Is(err, pgx.ErrNoRows) { var imgid uuid.UUID - imgUrl, err := images.GetArtistImage(ctx, images.ArtistImageOpts{ - Aliases: []string{name}, - }) - if err == nil && imgUrl != "" { - imgid = uuid.New() - if !opts.SkipCacheImage { + var imgUrl string + if !opts.SkipCacheImage { + imgUrl, err = images.GetArtistImage(ctx, images.ArtistImageOpts{ + Aliases: []string{name}, + }) + if err == nil && imgUrl != "" { + imgid = uuid.New() var size ImageSize if cfg.FullImageCacheEnabled() { size = ImageSizeFull @@ -318,9 +322,9 @@ func matchArtistsByNames(ctx context.Context, names []string, existing []*models if err != nil { l.Err(err).Msg("Failed to cache image") } + } else if err != nil { + l.Debug().AnErr("error", err).Msgf("Failed to get artist images for %s", name) } - } else if err != nil { - l.Debug().AnErr("error", err).Msgf("Failed to get artist images for %s", name) } a, err = d.SaveArtist(ctx, db.SaveArtistOpts{Name: name, Image: imgid, ImageSrc: imgUrl}) if err != nil { diff --git a/internal/catalog/catalog.go b/internal/catalog/catalog.go index e94db03..89ee4e3 100644 --- a/internal/catalog/catalog.go +++ b/internal/catalog/catalog.go @@ -77,6 +77,21 @@ func SubmitListen(ctx context.Context, store db.DB, opts SubmitListenOpts) error // bandaid to ensure new activity does not have sub-second precision opts.Time = opts.Time.Truncate(time.Second) + // Fast path: check lookup cache for known entity combo + if !opts.SkipSaveListen { + key := TrackLookupKey(opts.Artist, opts.TrackTitle, opts.ReleaseTitle) + cached, err := store.GetTrackLookup(ctx, key) + if err == nil && cached != nil { + l.Debug().Msg("Track lookup cache hit — skipping entity resolution") + return store.SaveListen(ctx, db.SaveListenOpts{ + TrackID: cached.TrackID, + Time: opts.Time, + UserID: opts.UserID, + Client: opts.Client, + }) + } + } + artists, err := AssociateArtists( ctx, store, @@ -168,6 +183,16 @@ func SubmitListen(ctx context.Context, store db.DB, opts SubmitListenOpts) error } } + // Populate lookup cache for future fast-path hits + if len(artists) > 0 { + store.SaveTrackLookup(ctx, db.SaveTrackLookupOpts{ + Key: TrackLookupKey(opts.Artist, opts.TrackTitle, opts.ReleaseTitle), + ArtistID: artists[0].ID, + AlbumID: rg.ID, + TrackID: track.ID, + }) + } + if opts.IsNowPlaying { if track.Duration == 0 { memkv.Store.Set(strconv.Itoa(int(opts.UserID)), track.ID) diff --git a/internal/catalog/lookup_key.go b/internal/catalog/lookup_key.go new file mode 100644 index 0000000..4d45d85 --- /dev/null +++ b/internal/catalog/lookup_key.go @@ -0,0 +1,9 @@ +package catalog + +import "strings" + +// TrackLookupKey builds a normalized cache key for entity resolution. +// Uses null-byte separators to avoid collisions between field values. +func TrackLookupKey(artist, track, album string) string { + return strings.ToLower(artist) + "\x00" + strings.ToLower(track) + "\x00" + strings.ToLower(album) +} diff --git a/internal/db/db.go b/internal/db/db.go index 97badac..b832012 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -101,6 +101,16 @@ type DB interface { MergeAlbums(ctx context.Context, fromId, toId int32, replaceImage bool) error MergeArtists(ctx context.Context, fromId, toId int32, replaceImage bool) error + // Track Lookup Cache + + GetTrackLookup(ctx context.Context, key string) (*TrackLookupResult, error) + SaveTrackLookup(ctx context.Context, opts SaveTrackLookupOpts) error + InvalidateTrackLookup(ctx context.Context, opts InvalidateTrackLookupOpts) error + + // Batch + + SaveListensBatch(ctx context.Context, opts []SaveListenOpts) (int64, error) + // Etc ImageHasAssociation(ctx context.Context, image uuid.UUID) (bool, error) diff --git a/internal/db/opts.go b/internal/db/opts.go index cb23bd3..b55dc80 100644 --- a/internal/db/opts.go +++ b/internal/db/opts.go @@ -160,3 +160,22 @@ type GetInterestOpts struct { ArtistID int32 TrackID int32 } + +type TrackLookupResult struct { + ArtistID int32 + AlbumID int32 + TrackID int32 +} + +type SaveTrackLookupOpts struct { + Key string + ArtistID int32 + AlbumID int32 + TrackID int32 +} + +type InvalidateTrackLookupOpts struct { + ArtistID int32 + AlbumID int32 + TrackID int32 +} diff --git a/internal/db/psql/album.go b/internal/db/psql/album.go index 758c287..74b58e7 100644 --- a/internal/db/psql/album.go +++ b/internal/db/psql/album.go @@ -338,6 +338,7 @@ func (d *Psql) SaveAlbumAliases(ctx context.Context, id int32, aliases []string, } func (d *Psql) DeleteAlbum(ctx context.Context, id int32) error { + d.q.DeleteTrackLookupByAlbum(ctx, id) return d.q.DeleteRelease(ctx, id) } func (d *Psql) DeleteAlbumAlias(ctx context.Context, id int32, alias string) error { diff --git a/internal/db/psql/artist.go b/internal/db/psql/artist.go index 859a490..3481702 100644 --- a/internal/db/psql/artist.go +++ b/internal/db/psql/artist.go @@ -119,6 +119,7 @@ func (d *Psql) SaveArtistAliases(ctx context.Context, id int32, aliases []string } func (d *Psql) DeleteArtist(ctx context.Context, id int32) error { + d.q.DeleteTrackLookupByArtist(ctx, id) return d.q.DeleteArtist(ctx, id) } diff --git a/internal/db/psql/listen.go b/internal/db/psql/listen.go index add6b33..2e1799e 100644 --- a/internal/db/psql/listen.go +++ b/internal/db/psql/listen.go @@ -11,6 +11,7 @@ import ( "github.com/gabehf/koito/internal/logger" "github.com/gabehf/koito/internal/models" "github.com/gabehf/koito/internal/repository" + "github.com/jackc/pgx/v5" ) func (d *Psql) GetListensPaginated(ctx context.Context, opts db.GetItemsOpts) (*db.PaginatedResponse[*models.Listen], error) { @@ -197,6 +198,67 @@ func (d *Psql) SaveListen(ctx context.Context, opts db.SaveListenOpts) error { }) } +func (d *Psql) SaveListensBatch(ctx context.Context, opts []db.SaveListenOpts) (int64, error) { + if len(opts) == 0 { + return 0, nil + } + + tx, err := d.conn.BeginTx(ctx, pgx.TxOptions{}) + if err != nil { + return 0, fmt.Errorf("SaveListensBatch: BeginTx: %w", err) + } + defer tx.Rollback(ctx) + + _, err = tx.Exec(ctx, ` + CREATE TEMP TABLE tmp_listens ( + track_id INT, + listened_at TIMESTAMPTZ, + user_id INT, + client TEXT + ) ON COMMIT DROP + `) + if err != nil { + return 0, fmt.Errorf("SaveListensBatch: create temp table: %w", err) + } + + rows := make([][]interface{}, len(opts)) + for i, o := range opts { + var client interface{} + if o.Client != "" { + client = o.Client + } + t := o.Time + if t.IsZero() { + t = time.Now() + } + rows[i] = []interface{}{o.TrackID, t, o.UserID, client} + } + + _, err = tx.CopyFrom(ctx, + pgx.Identifier{"tmp_listens"}, + []string{"track_id", "listened_at", "user_id", "client"}, + pgx.CopyFromRows(rows), + ) + if err != nil { + return 0, fmt.Errorf("SaveListensBatch: CopyFrom: %w", err) + } + + tag, err := tx.Exec(ctx, ` + INSERT INTO listens (track_id, listened_at, user_id, client) + SELECT track_id, listened_at, user_id, client FROM tmp_listens + ON CONFLICT DO NOTHING + `) + if err != nil { + return 0, fmt.Errorf("SaveListensBatch: insert: %w", err) + } + + if err := tx.Commit(ctx); err != nil { + return 0, fmt.Errorf("SaveListensBatch: Commit: %w", err) + } + + return tag.RowsAffected(), nil +} + func (d *Psql) DeleteListen(ctx context.Context, trackId int32, listenedAt time.Time) error { l := logger.FromContext(ctx) if trackId == 0 { diff --git a/internal/db/psql/merge.go b/internal/db/psql/merge.go index dd375c5..141c8e8 100644 --- a/internal/db/psql/merge.go +++ b/internal/db/psql/merge.go @@ -12,6 +12,8 @@ import ( func (d *Psql) MergeTracks(ctx context.Context, fromId, toId int32) error { l := logger.FromContext(ctx) l.Info().Msgf("Merging track %d into track %d", fromId, toId) + d.q.DeleteTrackLookupByTrack(ctx, fromId) + d.q.DeleteTrackLookupByTrack(ctx, toId) tx, err := d.conn.BeginTx(ctx, pgx.TxOptions{}) if err != nil { l.Err(err).Msg("Failed to begin transaction") @@ -61,6 +63,8 @@ func (d *Psql) MergeTracks(ctx context.Context, fromId, toId int32) error { func (d *Psql) MergeAlbums(ctx context.Context, fromId, toId int32, replaceImage bool) error { l := logger.FromContext(ctx) l.Info().Msgf("Merging album %d into album %d", fromId, toId) + d.q.DeleteTrackLookupByAlbum(ctx, fromId) + d.q.DeleteTrackLookupByAlbum(ctx, toId) tx, err := d.conn.BeginTx(ctx, pgx.TxOptions{}) if err != nil { l.Err(err).Msg("Failed to begin transaction") @@ -117,6 +121,8 @@ func (d *Psql) MergeAlbums(ctx context.Context, fromId, toId int32, replaceImage func (d *Psql) MergeArtists(ctx context.Context, fromId, toId int32, replaceImage bool) error { l := logger.FromContext(ctx) l.Info().Msgf("Merging artist %d into artist %d", fromId, toId) + d.q.DeleteTrackLookupByArtist(ctx, fromId) + d.q.DeleteTrackLookupByArtist(ctx, toId) tx, err := d.conn.BeginTx(ctx, pgx.TxOptions{}) if err != nil { l.Err(err).Msg("Failed to begin transaction") diff --git a/internal/db/psql/track.go b/internal/db/psql/track.go index d4cc616..1ec22ca 100644 --- a/internal/db/psql/track.go +++ b/internal/db/psql/track.go @@ -241,6 +241,9 @@ func (d *Psql) SaveTrackAliases(ctx context.Context, id int32, aliases []string, func (d *Psql) DeleteTrack(ctx context.Context, id int32) error { l := logger.FromContext(ctx) + + d.q.DeleteTrackLookupByTrack(ctx, id) + tx, err := d.conn.BeginTx(ctx, pgx.TxOptions{}) if err != nil { l.Err(err).Msg("Failed to begin transaction") diff --git a/internal/db/psql/track_lookup.go b/internal/db/psql/track_lookup.go new file mode 100644 index 0000000..5e371fb --- /dev/null +++ b/internal/db/psql/track_lookup.go @@ -0,0 +1,52 @@ +package psql + +import ( + "context" + + "github.com/gabehf/koito/internal/db" + "github.com/gabehf/koito/internal/repository" + "github.com/jackc/pgx/v5" +) + +func (d *Psql) GetTrackLookup(ctx context.Context, key string) (*db.TrackLookupResult, error) { + row, err := d.q.GetTrackLookup(ctx, key) + if err != nil { + if err == pgx.ErrNoRows { + return nil, err + } + return nil, err + } + return &db.TrackLookupResult{ + ArtistID: row.ArtistID, + AlbumID: row.AlbumID, + TrackID: row.TrackID, + }, nil +} + +func (d *Psql) SaveTrackLookup(ctx context.Context, opts db.SaveTrackLookupOpts) error { + return d.q.InsertTrackLookup(ctx, repository.InsertTrackLookupParams{ + LookupKey: opts.Key, + ArtistID: opts.ArtistID, + AlbumID: opts.AlbumID, + TrackID: opts.TrackID, + }) +} + +func (d *Psql) InvalidateTrackLookup(ctx context.Context, opts db.InvalidateTrackLookupOpts) error { + if opts.ArtistID != 0 { + if err := d.q.DeleteTrackLookupByArtist(ctx, opts.ArtistID); err != nil { + return err + } + } + if opts.AlbumID != 0 { + if err := d.q.DeleteTrackLookupByAlbum(ctx, opts.AlbumID); err != nil { + return err + } + } + if opts.TrackID != 0 { + if err := d.q.DeleteTrackLookupByTrack(ctx, opts.TrackID); err != nil { + return err + } + } + return nil +} diff --git a/internal/importer/bulk.go b/internal/importer/bulk.go new file mode 100644 index 0000000..9ad99f4 --- /dev/null +++ b/internal/importer/bulk.go @@ -0,0 +1,157 @@ +package importer + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/gabehf/koito/internal/catalog" + "github.com/gabehf/koito/internal/db" + "github.com/gabehf/koito/internal/logger" + "github.com/gabehf/koito/internal/mbz" +) + +// BulkSubmitter is a reusable import accelerator. It pre-deduplicates scrobbles +// in memory, resolves entities via the track_lookup cache (falling back to +// SubmitListen on cache miss with a worker pool for parallelism), and batch-inserts +// listens via SaveListensBatch. +type BulkSubmitter struct { + store db.DB + mbzc mbz.MusicBrainzCaller + ctx context.Context + buffer []catalog.SubmitListenOpts + workers int +} + +type BulkSubmitterOpts struct { + Store db.DB + Mbzc mbz.MusicBrainzCaller + Workers int // default 4 +} + +func NewBulkSubmitter(ctx context.Context, opts BulkSubmitterOpts) *BulkSubmitter { + workers := opts.Workers + if workers <= 0 { + workers = 4 + } + return &BulkSubmitter{ + store: opts.Store, + mbzc: opts.Mbzc, + ctx: ctx, + workers: workers, + } +} + +// Accept buffers a scrobble for later batch processing. +func (bs *BulkSubmitter) Accept(opts catalog.SubmitListenOpts) { + bs.buffer = append(bs.buffer, opts) +} + +// Flush processes all buffered scrobbles: deduplicates, resolves entities, and batch-inserts listens. +// Returns the number of listens successfully inserted. +func (bs *BulkSubmitter) Flush() (int, error) { + l := logger.FromContext(bs.ctx) + if len(bs.buffer) == 0 { + return 0, nil + } + + l.Info().Msgf("BulkSubmitter: Processing %d scrobbles", len(bs.buffer)) + + // Phase A: Deduplicate — find unique (artist, track, album) tuples + unique := make(map[string]catalog.SubmitListenOpts) + for _, opts := range bs.buffer { + key := catalog.TrackLookupKey(opts.Artist, opts.TrackTitle, opts.ReleaseTitle) + if _, exists := unique[key]; !exists { + unique[key] = opts + } + } + l.Info().Msgf("BulkSubmitter: %d unique entity combos from %d scrobbles", len(unique), len(bs.buffer)) + + // Phase B: Resolve entities — check cache, create on miss + resolved := make(map[string]int32) // key → trackID + var mu sync.Mutex + var wg sync.WaitGroup + sem := make(chan struct{}, bs.workers) + cacheHits := 0 + + for key, opts := range unique { + // Check track_lookup cache first + cached, err := bs.store.GetTrackLookup(bs.ctx, key) + if err == nil && cached != nil { + mu.Lock() + resolved[key] = cached.TrackID + cacheHits++ + mu.Unlock() + continue + } + + // Cache miss — create entities via SubmitListen (with worker pool) + wg.Add(1) + sem <- struct{}{} // acquire worker slot + go func(k string, o catalog.SubmitListenOpts) { + defer wg.Done() + defer func() { <-sem }() // release worker slot + + o.SkipSaveListen = true + o.SkipCacheImage = true + err := catalog.SubmitListen(bs.ctx, bs.store, o) + if err != nil { + l.Err(err).Msgf("BulkSubmitter: Failed to create entities for '%s' by '%s'", o.TrackTitle, o.Artist) + return + } + + // Re-check cache (SubmitListen populates it via Phase 1's integration) + cached, err := bs.store.GetTrackLookup(bs.ctx, k) + if err == nil && cached != nil { + mu.Lock() + resolved[k] = cached.TrackID + mu.Unlock() + } + }(key, opts) + } + wg.Wait() + + l.Info().Msgf("BulkSubmitter: Resolved %d/%d entity combos (%d cache hits)", + len(resolved), len(unique), cacheHits) + + // Phase C: Build listen batch + batch := make([]db.SaveListenOpts, 0, len(bs.buffer)) + skipped := 0 + for _, opts := range bs.buffer { + key := catalog.TrackLookupKey(opts.Artist, opts.TrackTitle, opts.ReleaseTitle) + trackID, ok := resolved[key] + if !ok { + skipped++ + continue + } + batch = append(batch, db.SaveListenOpts{ + TrackID: trackID, + Time: opts.Time.Truncate(time.Second), + UserID: opts.UserID, + Client: opts.Client, + }) + } + if skipped > 0 { + l.Warn().Msgf("BulkSubmitter: Skipped %d scrobbles with unresolved entities", skipped) + } + + // Phase D: Batch insert listens (in chunks to avoid huge transactions) + const chunkSize = 5000 + var totalInserted int64 + for i := 0; i < len(batch); i += chunkSize { + end := i + chunkSize + if end > len(batch) { + end = len(batch) + } + inserted, err := bs.store.SaveListensBatch(bs.ctx, batch[i:end]) + if err != nil { + return int(totalInserted), fmt.Errorf("BulkSubmitter: SaveListensBatch: %w", err) + } + totalInserted += inserted + } + + l.Info().Msgf("BulkSubmitter: Inserted %d listens (%d duplicates skipped)", + totalInserted, int64(len(batch))-totalInserted) + return int(totalInserted), nil +} diff --git a/internal/importer/lastfm.go b/internal/importer/lastfm.go index 763d7fa..47fcc8b 100644 --- a/internal/importer/lastfm.go +++ b/internal/importer/lastfm.go @@ -50,18 +50,17 @@ func ImportLastFMFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrainzCall return fmt.Errorf("ImportLastFMFile: %w", err) } defer file.Close() - var throttleFunc = func() {} - if ms := cfg.ThrottleImportMs(); ms > 0 { - throttleFunc = func() { - time.Sleep(time.Duration(ms) * time.Millisecond) - } - } export := make([]LastFMExportPage, 0) err = json.NewDecoder(file).Decode(&export) if err != nil { return fmt.Errorf("ImportLastFMFile: %w", err) } - count := 0 + + bs := NewBulkSubmitter(ctx, BulkSubmitterOpts{ + Store: store, + Mbzc: mbzc, + }) + for _, item := range export { for _, track := range item.Track { album := track.Album.Text @@ -96,7 +95,6 @@ func ImportLastFMFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrainzCall ts = time.Unix(unix, 0).UTC() } if !inImportTimeWindow(ts) { - l.Debug().Msgf("Skipping import due to import time rules") continue } @@ -105,7 +103,7 @@ func ImportLastFMFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrainzCall artistMbidMap = append(artistMbidMap, catalog.ArtistMbidMap{Artist: track.Artist.Text, Mbid: artistMbzID}) } - opts := catalog.SubmitListenOpts{ + bs.Accept(catalog.SubmitListenOpts{ MbzCaller: mbzc, Artist: track.Artist.Text, ArtistNames: []string{track.Artist.Text}, @@ -118,16 +116,14 @@ func ImportLastFMFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrainzCall Client: "lastfm", Time: ts, UserID: 1, - SkipCacheImage: !cfg.FetchImagesDuringImport(), - } - err = catalog.SubmitListen(ctx, store, opts) - if err != nil { - l.Err(err).Msg("Failed to import LastFM playback item") - return fmt.Errorf("ImportLastFMFile: %w", err) - } - count++ - throttleFunc() + SkipCacheImage: true, + }) } } + + count, err := bs.Flush() + if err != nil { + return fmt.Errorf("ImportLastFMFile: %w", err) + } return finishImport(ctx, filename, count) } diff --git a/internal/importer/listenbrainz.go b/internal/importer/listenbrainz.go index 7c1a8bb..2885179 100644 --- a/internal/importer/listenbrainz.go +++ b/internal/importer/listenbrainz.go @@ -63,13 +63,11 @@ func ImportListenBrainzFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrai scanner := bufio.NewScanner(r) - var throttleFunc = func() {} - if ms := cfg.ThrottleImportMs(); ms > 0 { - throttleFunc = func() { - time.Sleep(time.Duration(ms) * time.Millisecond) - } - } - count := 0 + bs := NewBulkSubmitter(ctx, BulkSubmitterOpts{ + Store: store, + Mbzc: mbzc, + }) + for scanner.Scan() { line := scanner.Bytes() payload := new(handlers.LbzSubmitListenPayload) @@ -80,7 +78,6 @@ func ImportListenBrainzFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrai } ts := time.Unix(payload.ListenedAt, 0) if !inImportTimeWindow(ts) { - l.Debug().Msgf("Skipping import due to import time rules") continue } artistMbzIDs, err := utils.ParseUUIDSlice(payload.TrackMeta.AdditionalInfo.ArtistMBIDs) @@ -139,7 +136,7 @@ func ImportListenBrainzFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrai artistMbidMap = append(artistMbidMap, catalog.ArtistMbidMap{Artist: a.ArtistName, Mbid: mbid}) } - opts := catalog.SubmitListenOpts{ + bs.Accept(catalog.SubmitListenOpts{ MbzCaller: mbzc, ArtistNames: payload.TrackMeta.AdditionalInfo.ArtistNames, Artist: payload.TrackMeta.ArtistName, @@ -154,15 +151,13 @@ func ImportListenBrainzFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrai Time: ts, UserID: 1, Client: client, - SkipCacheImage: !cfg.FetchImagesDuringImport(), - } - err = catalog.SubmitListen(ctx, store, opts) - if err != nil { - l.Err(err).Msg("Failed to import LastFM playback item") - return fmt.Errorf("ImportListenBrainzFile: %w", err) - } - count++ - throttleFunc() + SkipCacheImage: true, + }) + } + + count, err := bs.Flush() + if err != nil { + return fmt.Errorf("ImportListenBrainzFile: %w", err) } l.Info().Msgf("Finished importing %s; imported %d items", filename, count) return nil diff --git a/internal/importer/maloja.go b/internal/importer/maloja.go index 8d7c041..7576871 100644 --- a/internal/importer/maloja.go +++ b/internal/importer/maloja.go @@ -41,21 +41,19 @@ func ImportMalojaFile(ctx context.Context, store db.DB, filename string) error { return fmt.Errorf("ImportMalojaFile: %w", err) } defer file.Close() - var throttleFunc = func() {} - if ms := cfg.ThrottleImportMs(); ms > 0 { - throttleFunc = func() { - time.Sleep(time.Duration(ms) * time.Millisecond) - } - } export := new(MalojaExport) err = json.NewDecoder(file).Decode(&export) if err != nil { return fmt.Errorf("ImportMalojaFile: %w", err) } + + bs := NewBulkSubmitter(ctx, BulkSubmitterOpts{ + Store: store, + Mbzc: &mbz.MbzErrorCaller{}, + }) + for _, item := range export.Scrobbles { martists := make([]string, 0) - // Maloja has a tendency to have the the artist order ['feature', 'main \u2022 feature'], so - // here we try to turn that artist array into ['main', 'feature'] item.Track.Artists = utils.MoveFirstMatchToFront(item.Track.Artists, " \u2022 ") for _, an := range item.Track.Artists { ans := strings.Split(an, " \u2022 ") @@ -68,11 +66,10 @@ func ImportMalojaFile(ctx context.Context, store db.DB, filename string) error { } ts := time.Unix(item.Time, 0) if !inImportTimeWindow(ts) { - l.Debug().Msgf("Skipping import due to import time rules") continue } - opts := catalog.SubmitListenOpts{ - MbzCaller: &mbz.MusicBrainzClient{}, + bs.Accept(catalog.SubmitListenOpts{ + MbzCaller: &mbz.MbzErrorCaller{}, Artist: item.Track.Artists[0], ArtistNames: artists, TrackTitle: item.Track.Title, @@ -80,14 +77,13 @@ func ImportMalojaFile(ctx context.Context, store db.DB, filename string) error { Time: ts.Local(), Client: "maloja", UserID: 1, - SkipCacheImage: !cfg.FetchImagesDuringImport(), - } - err = catalog.SubmitListen(ctx, store, opts) - if err != nil { - l.Err(err).Msg("Failed to import maloja playback item") - return fmt.Errorf("ImportMalojaFile: %w", err) - } - throttleFunc() + SkipCacheImage: true, + }) } - return finishImport(ctx, filename, len(export.Scrobbles)) + + count, err := bs.Flush() + if err != nil { + return fmt.Errorf("ImportMalojaFile: %w", err) + } + return finishImport(ctx, filename, count) } diff --git a/internal/importer/spotify.go b/internal/importer/spotify.go index 5594fc2..fd22f4d 100644 --- a/internal/importer/spotify.go +++ b/internal/importer/spotify.go @@ -33,48 +33,44 @@ func ImportSpotifyFile(ctx context.Context, store db.DB, filename string) error return fmt.Errorf("ImportSpotifyFile: %w", err) } defer file.Close() - var throttleFunc = func() {} - if ms := cfg.ThrottleImportMs(); ms > 0 { - throttleFunc = func() { - time.Sleep(time.Duration(ms) * time.Millisecond) - } - } export := make([]SpotifyExportItem, 0) err = json.NewDecoder(file).Decode(&export) if err != nil { return fmt.Errorf("ImportSpotifyFile: %w", err) } + bs := NewBulkSubmitter(ctx, BulkSubmitterOpts{ + Store: store, + Mbzc: &mbz.MbzErrorCaller{}, + }) + for _, item := range export { if item.ReasonEnd != "trackdone" { continue } if !inImportTimeWindow(item.Timestamp) { - l.Debug().Msgf("Skipping import due to import time rules") continue } - dur := item.MsPlayed if item.TrackName == "" || item.ArtistName == "" { l.Debug().Msg("Skipping non-track item") continue } - opts := catalog.SubmitListenOpts{ - MbzCaller: &mbz.MusicBrainzClient{}, + bs.Accept(catalog.SubmitListenOpts{ + MbzCaller: &mbz.MbzErrorCaller{}, Artist: item.ArtistName, TrackTitle: item.TrackName, ReleaseTitle: item.AlbumName, - Duration: dur / 1000, + Duration: item.MsPlayed / 1000, Time: item.Timestamp, Client: "spotify", UserID: 1, - SkipCacheImage: !cfg.FetchImagesDuringImport(), - } - err = catalog.SubmitListen(ctx, store, opts) - if err != nil { - l.Err(err).Msg("Failed to import spotify playback item") - return fmt.Errorf("ImportSpotifyFile: %w", err) - } - throttleFunc() + SkipCacheImage: true, + }) } - return finishImport(ctx, filename, len(export)) + + count, err := bs.Flush() + if err != nil { + return fmt.Errorf("ImportSpotifyFile: %w", err) + } + return finishImport(ctx, filename, count) } diff --git a/internal/repository/track_lookup.sql.go b/internal/repository/track_lookup.sql.go new file mode 100644 index 0000000..3d1478b --- /dev/null +++ b/internal/repository/track_lookup.sql.go @@ -0,0 +1,82 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.28.0 +// source: track_lookup.sql + +package repository + +import ( + "context" +) + +const deleteTrackLookupByAlbum = `-- name: DeleteTrackLookupByAlbum :exec +DELETE FROM track_lookup WHERE album_id = $1 +` + +func (q *Queries) DeleteTrackLookupByAlbum(ctx context.Context, albumID int32) error { + _, err := q.db.Exec(ctx, deleteTrackLookupByAlbum, albumID) + return err +} + +const deleteTrackLookupByArtist = `-- name: DeleteTrackLookupByArtist :exec +DELETE FROM track_lookup WHERE artist_id = $1 +` + +func (q *Queries) DeleteTrackLookupByArtist(ctx context.Context, artistID int32) error { + _, err := q.db.Exec(ctx, deleteTrackLookupByArtist, artistID) + return err +} + +const deleteTrackLookupByTrack = `-- name: DeleteTrackLookupByTrack :exec +DELETE FROM track_lookup WHERE track_id = $1 +` + +func (q *Queries) DeleteTrackLookupByTrack(ctx context.Context, trackID int32) error { + _, err := q.db.Exec(ctx, deleteTrackLookupByTrack, trackID) + return err +} + +const getTrackLookup = `-- name: GetTrackLookup :one +SELECT artist_id, album_id, track_id +FROM track_lookup +WHERE lookup_key = $1 +` + +type GetTrackLookupRow struct { + ArtistID int32 + AlbumID int32 + TrackID int32 +} + +func (q *Queries) GetTrackLookup(ctx context.Context, lookupKey string) (GetTrackLookupRow, error) { + row := q.db.QueryRow(ctx, getTrackLookup, lookupKey) + var i GetTrackLookupRow + err := row.Scan(&i.ArtistID, &i.AlbumID, &i.TrackID) + return i, err +} + +const insertTrackLookup = `-- name: InsertTrackLookup :exec +INSERT INTO track_lookup (lookup_key, artist_id, album_id, track_id) +VALUES ($1, $2, $3, $4) +ON CONFLICT (lookup_key) DO UPDATE SET + artist_id = EXCLUDED.artist_id, + album_id = EXCLUDED.album_id, + track_id = EXCLUDED.track_id +` + +type InsertTrackLookupParams struct { + LookupKey string + ArtistID int32 + AlbumID int32 + TrackID int32 +} + +func (q *Queries) InsertTrackLookup(ctx context.Context, arg InsertTrackLookupParams) error { + _, err := q.db.Exec(ctx, insertTrackLookup, + arg.LookupKey, + arg.ArtistID, + arg.AlbumID, + arg.TrackID, + ) + return err +} diff --git a/thoughts/plans/20260325_bulk_import_optimization.md b/thoughts/plans/20260325_bulk_import_optimization.md new file mode 100644 index 0000000..d3a84fb --- /dev/null +++ b/thoughts/plans/20260325_bulk_import_optimization.md @@ -0,0 +1,722 @@ +# Bulk Import Optimization — Implementation Plan + +## Overview + +Optimize Koito's import pipeline from ~20 listens/min to thousands/min by adopting ListenBrainz-inspired patterns: a persistent entity lookup cache, batch DB writes, pre-deduplication, and deferred enrichment. The core insight from the ecosystem research is **write raw first, enrich async** — and the persistent lookup table benefits all scrobbles (live + import), not just bulk imports. + +## Current State Analysis + +### The Problem + +Importing 49,050 Maloja scrobbles takes ~24 hours. The import is stable (our PR 1 fixes eliminated panics) but each scrobble runs through the full `SubmitListen` path: + +- **`GetArtist`** issues 6 DB queries per lookup (including rank computation via window functions) +- **`GetAlbum`** issues 6 DB queries per lookup +- **`GetTrack`** issues 5-6 DB queries per lookup +- **`GetArtistImage` / `GetAlbumImage`** makes HTTP calls even when all image providers are disabled +- **`SaveListen`** is a single INSERT — the only fast part + +Per unique scrobble: ~18 DB round-trips + 2 image lookups. Per repeated scrobble: ~18 DB round-trips (no caching). With 5,589 unique artists, 2,628 unique albums, and 49,050 total scrobbles, this is massively redundant. + +### Key Discoveries + +- `SubmitListenOpts.SkipSaveListen` (`catalog.go:43`) can be used to create entities without recording a listen — useful for entity pre-creation +- `SubmitListenOpts.SkipCacheImage` (`catalog.go:46`) controls image download but NOT image URL resolution — the HTTP calls still happen +- The Koito native importer (`importer/koito.go`) already bypasses `SubmitListen` and does direct DB calls — a precedent for a faster import path +- `pgxpool.Pool` is goroutine-safe — concurrent DB operations are safe at the pool level +- `SaveListen` SQL uses `ON CONFLICT DO NOTHING` — re-importing is idempotent +- No batch insert methods exist anywhere in the codebase +- `GetArtist`/`GetAlbum`/`GetTrack` compute full stats (listen count, time listened, rank) on every call — unnecessary during import + +### Ecosystem Patterns (from research) + +- **ListenBrainz**: Stores raw scrobbles immediately, resolves MBIDs asynchronously via background worker + Typesense index. Uses MessyBrainz as a stable `(artist, track, release) → ID` mapping. +- **Maloja**: Runs every import through the full normalize → dedup → cache-invalidate cycle. Works for live scrobbles, kills bulk import. **This is exactly Koito's current problem.** +- **Last.fm**: Resolves metadata at write time (corrections), batches up to 50 scrobbles per request. +- **General**: DB-level dedup via unique constraint + `ON CONFLICT` is the industry standard. + +## Desired End State + +1. A `track_lookup` table provides O(1) entity resolution for any `(artist, track, album)` tuple — both live and import scrobbles benefit +2. All 5 importers use a shared `BulkSubmitter` that pre-deduplicates, creates entities in parallel, and batch-inserts listens +3. Image/MBZ enrichment is fully deferred to existing background tasks during import +4. 49k Maloja import completes in **under 30 minutes** (vs 24 hours currently) +5. Live scrobbles are faster too — cache hit skips 18 DB queries, goes straight to 1 SELECT + 1 INSERT + +### Verification + +- `go build ./...` compiles +- `go test ./...` passes (existing + new tests) +- Manual: import 49k Maloja scrobbles in under 30 minutes on vo-pc +- Manual: verify live scrobbles from multi-scrobbler still work correctly +- Manual: verify album art appears after background image backfill runs + +## What We're NOT Doing + +- **Replacing the DB engine** (no TimescaleDB, no Redis) — Postgres is fine for self-hosted scale +- **Local MusicBrainz mirror or Typesense index** — overkill for single-user; the live API + background enrichment is sufficient +- **Changing the live `SubmitListen` API path** — the lookup cache makes it faster, but the logic stays the same +- **Parallelizing live scrobbles** — only imports use the worker pool; live scrobbles remain single-threaded through `SubmitListen` +- **Changing the ListenBrainz/Last.fm relay** — multi-scrobbler handles that independently + +## Implementation Approach + +Adopt ListenBrainz's "MessyBrainz" pattern as a persistent Postgres table: a normalized `(artist, track, album)` tuple maps to resolved `(artist_id, album_id, track_id)`. This is the foundational optimization — everything else builds on it. + +``` +Before (per scrobble): + GetArtist (6 queries) → GetAlbum (6 queries) → GetTrack (6 queries) → SaveListen (1 query) + = 19 queries minimum + +After (cache hit): + SELECT FROM track_lookup (1 query) → SaveListen (1 query) + = 2 queries + +After (bulk import, cache hit): + In-memory map lookup (0 queries) → batched SaveListen + = amortized ~0.01 queries per scrobble +``` + +--- + +## Phase 1: `track_lookup` Cache Table + +### Overview + +Add a persistent lookup table that maps normalized `(artist_name, track_title, release_title)` to resolved entity IDs. Integrate into `SubmitListen` so both live and import scrobbles benefit. + +### Changes Required + +#### 1. New Migration + +**File**: `db/migrations/000006_track_lookup.sql` + +```sql +-- +goose Up +CREATE TABLE track_lookup ( + lookup_key TEXT NOT NULL PRIMARY KEY, + artist_id INT NOT NULL REFERENCES artists(id) ON DELETE CASCADE, + album_id INT NOT NULL REFERENCES releases(id) ON DELETE CASCADE, + track_id INT NOT NULL REFERENCES tracks(id) ON DELETE CASCADE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_track_lookup_track_id ON track_lookup(track_id); +CREATE INDEX idx_track_lookup_artist_id ON track_lookup(artist_id); +CREATE INDEX idx_track_lookup_album_id ON track_lookup(album_id); + +-- +goose Down +DROP TABLE IF EXISTS track_lookup; +``` + +The `lookup_key` is a normalized string: `lower(artist) || '\x00' || lower(track) || '\x00' || lower(album)`. Using a single TEXT key with a null-byte separator is simpler and faster than a multi-column composite key with `citext`. + +#### 2. New SQL Queries + +**File**: `db/queries/track_lookup.sql` + +```sql +-- name: GetTrackLookup :one +SELECT artist_id, album_id, track_id +FROM track_lookup +WHERE lookup_key = $1; + +-- name: InsertTrackLookup :exec +INSERT INTO track_lookup (lookup_key, artist_id, album_id, track_id) +VALUES ($1, $2, $3, $4) +ON CONFLICT (lookup_key) DO UPDATE SET + artist_id = EXCLUDED.artist_id, + album_id = EXCLUDED.album_id, + track_id = EXCLUDED.track_id; + +-- name: DeleteTrackLookupByArtist :exec +DELETE FROM track_lookup WHERE artist_id = $1; + +-- name: DeleteTrackLookupByAlbum :exec +DELETE FROM track_lookup WHERE album_id = $1; + +-- name: DeleteTrackLookupByTrack :exec +DELETE FROM track_lookup WHERE track_id = $1; +``` + +#### 3. Regenerate sqlc + +Run `sqlc generate` to create the Go bindings in `internal/repository/`. + +#### 4. DB Interface + Psql Implementation + +**File**: `internal/db/db.go` — Add to interface: + +```go +// Track Lookup Cache +GetTrackLookup(ctx context.Context, key string) (*TrackLookupResult, error) +SaveTrackLookup(ctx context.Context, opts SaveTrackLookupOpts) error +InvalidateTrackLookup(ctx context.Context, opts InvalidateTrackLookupOpts) error +``` + +**File**: `internal/db/opts.go` — Add types: + +```go +type TrackLookupResult struct { + ArtistID int32 + AlbumID int32 + TrackID int32 +} + +type SaveTrackLookupOpts struct { + Key string + ArtistID int32 + AlbumID int32 + TrackID int32 +} + +type InvalidateTrackLookupOpts struct { + ArtistID int32 + AlbumID int32 + TrackID int32 +} +``` + +**File**: `internal/db/psql/track_lookup.go` — New file implementing the three methods. + +#### 5. Lookup Key Helper + +**File**: `internal/catalog/lookup_key.go` — New file: + +```go +package catalog + +import "strings" + +// TrackLookupKey builds a normalized cache key for entity resolution. +func TrackLookupKey(artist, track, album string) string { + return strings.ToLower(artist) + "\x00" + strings.ToLower(track) + "\x00" + strings.ToLower(album) +} +``` + +#### 6. Integrate into SubmitListen + +**File**: `internal/catalog/catalog.go` — Add fast path at the top of `SubmitListen`: + +```go +func SubmitListen(ctx context.Context, store db.DB, opts SubmitListenOpts) error { + l := logger.FromContext(ctx) + + if opts.Artist == "" || opts.TrackTitle == "" { + return errors.New("track name and artist are required") + } + + opts.Time = opts.Time.Truncate(time.Second) + + // Fast path: check lookup cache for known entity combo + if !opts.SkipSaveListen { + key := TrackLookupKey(opts.Artist, opts.TrackTitle, opts.ReleaseTitle) + cached, err := store.GetTrackLookup(ctx, key) + if err == nil && cached != nil { + l.Debug().Msg("Track lookup cache hit — skipping entity resolution") + return store.SaveListen(ctx, db.SaveListenOpts{ + TrackID: cached.TrackID, + Time: opts.Time, + UserID: opts.UserID, + Client: opts.Client, + }) + } + } + + // ... existing SubmitListen logic (unchanged) ... + + // After successful entity resolution, populate the cache + store.SaveTrackLookup(ctx, db.SaveTrackLookupOpts{ + Key: TrackLookupKey(opts.Artist, opts.TrackTitle, opts.ReleaseTitle), + ArtistID: artists[0].ID, + AlbumID: rg.ID, + TrackID: track.ID, + }) + + // ... rest of existing logic ... +} +``` + +Note: The cache only applies when we have a direct artist+track+album text match. Scrobbles with MBZ IDs that resolve to different text representations will still go through the full path on first encounter, then be cached. + +#### 7. Invalidation on Merge/Delete + +**File**: `internal/db/psql/artist.go` — In `DeleteArtist` and `MergeArtists`, add: +```go +d.q.DeleteTrackLookupByArtist(ctx, id) +``` + +**File**: `internal/db/psql/album.go` — In `DeleteAlbum` and `MergeAlbums`, add: +```go +d.q.DeleteTrackLookupByAlbum(ctx, id) +``` + +**File**: `internal/db/psql/track.go` — In `DeleteTrack` and `MergeTracks`, add: +```go +d.q.DeleteTrackLookupByTrack(ctx, id) +``` + +### Success Criteria + +- [ ] `go build ./...` compiles +- [ ] `go test ./...` passes +- [ ] New test: `TestSubmitListen_LookupCacheHit` — second identical scrobble uses cache +- [ ] New test: `TestSubmitListen_LookupCacheInvalidateOnDelete` — deleting entity clears cache +- [ ] Migration applies cleanly on fresh and existing databases +- [ ] Live scrobbles from multi-scrobbler populate the cache on first hit, use it on second + +--- + +## Phase 2: `SaveListensBatch` DB Method + +### Overview + +Add a batch listen insert method using pgx's `CopyFrom` for high-throughput listen insertion. This is the DB foundation for the BulkSubmitter. + +### Changes Required + +#### 1. New SQL + DB Interface + +**File**: `internal/db/db.go` — Add to interface: + +```go +SaveListensBatch(ctx context.Context, opts []SaveListenOpts) (int64, error) +``` + +Returns the number of rows actually inserted (excluding `ON CONFLICT` duplicates). + +#### 2. Psql Implementation + +**File**: `internal/db/psql/listen.go` — New method: + +```go +func (d *Psql) SaveListensBatch(ctx context.Context, opts []db.SaveListenOpts) (int64, error) { + if len(opts) == 0 { + return 0, nil + } + + // Use a transaction with a temp table + INSERT ... ON CONFLICT pattern + // since CopyFrom doesn't support ON CONFLICT directly + tx, err := d.conn.BeginTx(ctx, pgx.TxOptions{}) + if err != nil { + return 0, fmt.Errorf("SaveListensBatch: BeginTx: %w", err) + } + defer tx.Rollback(ctx) + + // Create temp table + _, err = tx.Exec(ctx, ` + CREATE TEMP TABLE tmp_listens ( + track_id INT, + listened_at TIMESTAMPTZ, + user_id INT, + client TEXT + ) ON COMMIT DROP + `) + if err != nil { + return 0, fmt.Errorf("SaveListensBatch: create temp table: %w", err) + } + + // CopyFrom into temp table + rows := make([][]interface{}, len(opts)) + for i, o := range opts { + var client interface{} + if o.Client != "" { + client = o.Client + } + rows[i] = []interface{}{o.TrackID, o.Time, o.UserID, client} + } + + _, err = tx.CopyFrom(ctx, + pgx.Identifier{"tmp_listens"}, + []string{"track_id", "listened_at", "user_id", "client"}, + pgx.CopyFromRows(rows), + ) + if err != nil { + return 0, fmt.Errorf("SaveListensBatch: CopyFrom: %w", err) + } + + // Insert from temp table with dedup + tag, err := tx.Exec(ctx, ` + INSERT INTO listens (track_id, listened_at, user_id, client) + SELECT track_id, listened_at, user_id, client FROM tmp_listens + ON CONFLICT DO NOTHING + `) + if err != nil { + return 0, fmt.Errorf("SaveListensBatch: insert: %w", err) + } + + if err := tx.Commit(ctx); err != nil { + return 0, fmt.Errorf("SaveListensBatch: Commit: %w", err) + } + + return tag.RowsAffected(), nil +} +``` + +This uses the standard `CopyFrom → temp table → INSERT ON CONFLICT` pattern, which is the fastest bulk insert approach with pgx while still supporting deduplication. + +### Success Criteria + +- [ ] `go build ./...` compiles +- [ ] `go test ./...` passes +- [ ] New test: `TestSaveListensBatch` — insert 1000 listens, verify count +- [ ] New test: `TestSaveListensBatch_Dedup` — insert duplicates, verify no double-counting +- [ ] New test: `TestSaveListensBatch_Empty` — empty input returns 0, no error + +--- + +## Phase 3: `BulkSubmitter` Helper + +### Overview + +A reusable import accelerator that all importers can use. Pre-deduplicates scrobbles in memory, resolves entities via the `track_lookup` cache (falling back to `SubmitListen` on cache miss), and batch-inserts listens. + +### Design + +``` +BulkSubmitter +├── Accept(SubmitListenOpts) — buffer a scrobble +├── Flush() (int, error) — process all buffered scrobbles +│ ├── Phase A: Deduplicate by (artist, track, album) key +│ ├── Phase B: Resolve entities +│ │ ├── Check track_lookup cache (single SELECT) +│ │ ├── On miss: call SubmitListen(SkipSaveListen=true) to create entities +│ │ ├── Worker pool: N goroutines for parallel entity creation +│ │ └── Populate track_lookup cache after creation +│ ├── Phase C: Map all scrobbles to track_ids via resolved cache +│ └── Phase D: SaveListensBatch +└── Progress callback for logging +``` + +### Changes Required + +#### 1. New Package + +**File**: `internal/importer/bulk.go` + +```go +package importer + +import ( + "context" + "sync" + + "github.com/gabehf/koito/internal/catalog" + "github.com/gabehf/koito/internal/db" + "github.com/gabehf/koito/internal/logger" + "github.com/gabehf/koito/internal/mbz" +) + +type BulkSubmitter struct { + store db.DB + mbzc mbz.MusicBrainzCaller + ctx context.Context + buffer []catalog.SubmitListenOpts + workers int + onProgress func(imported, total int) +} + +type BulkSubmitterOpts struct { + Store db.DB + Mbzc mbz.MusicBrainzCaller + Workers int // default 4 + OnProgress func(imported, total int) // called every 500 items +} + +func NewBulkSubmitter(ctx context.Context, opts BulkSubmitterOpts) *BulkSubmitter { + workers := opts.Workers + if workers <= 0 { + workers = 4 + } + return &BulkSubmitter{ + store: opts.Store, + mbzc: opts.Mbzc, + ctx: ctx, + workers: workers, + onProgress: opts.OnProgress, + } +} + +func (bs *BulkSubmitter) Accept(opts catalog.SubmitListenOpts) { + bs.buffer = append(bs.buffer, opts) +} + +func (bs *BulkSubmitter) Flush() (int, error) { + l := logger.FromContext(bs.ctx) + if len(bs.buffer) == 0 { + return 0, nil + } + + l.Info().Msgf("BulkSubmitter: Processing %d scrobbles", len(bs.buffer)) + + // Phase A: Deduplicate — find unique (artist, track, album) tuples + type entityKey = string + unique := make(map[entityKey]catalog.SubmitListenOpts) + for _, opts := range bs.buffer { + key := catalog.TrackLookupKey(opts.Artist, opts.TrackTitle, opts.ReleaseTitle) + if _, exists := unique[key]; !exists { + unique[key] = opts + } + } + l.Info().Msgf("BulkSubmitter: %d unique entity combos from %d scrobbles", len(unique), len(bs.buffer)) + + // Phase B: Resolve entities — check cache, create on miss + resolved := make(map[entityKey]int32) // key → trackID + var mu sync.Mutex + var wg sync.WaitGroup + sem := make(chan struct{}, bs.workers) + resolveCount := 0 + + for key, opts := range unique { + // Check track_lookup cache first + cached, err := bs.store.GetTrackLookup(bs.ctx, key) + if err == nil && cached != nil { + mu.Lock() + resolved[key] = cached.TrackID + resolveCount++ + mu.Unlock() + continue + } + + // Cache miss — create entities via SubmitListen (with worker pool) + wg.Add(1) + sem <- struct{}{} // acquire worker slot + go func(k entityKey, o catalog.SubmitListenOpts) { + defer wg.Done() + defer func() { <-sem }() // release worker slot + + o.SkipSaveListen = true + o.SkipCacheImage = true + err := catalog.SubmitListen(bs.ctx, bs.store, o) + if err != nil { + l.Err(err).Msgf("BulkSubmitter: Failed to create entities for '%s' by '%s'", o.TrackTitle, o.Artist) + return + } + + // Re-check cache (SubmitListen populates it in Phase 1's integration) + cached, err := bs.store.GetTrackLookup(bs.ctx, k) + if err == nil && cached != nil { + mu.Lock() + resolved[k] = cached.TrackID + mu.Unlock() + } + }(key, opts) + } + wg.Wait() + + l.Info().Msgf("BulkSubmitter: Resolved %d/%d entity combos", len(resolved), len(unique)) + + // Phase C: Build listen batch + batch := make([]db.SaveListenOpts, 0, len(bs.buffer)) + skipped := 0 + for _, opts := range bs.buffer { + key := catalog.TrackLookupKey(opts.Artist, opts.TrackTitle, opts.ReleaseTitle) + trackID, ok := resolved[key] + if !ok { + skipped++ + continue + } + batch = append(batch, db.SaveListenOpts{ + TrackID: trackID, + Time: opts.Time.Truncate(time.Second), + UserID: opts.UserID, + Client: opts.Client, + }) + } + if skipped > 0 { + l.Warn().Msgf("BulkSubmitter: Skipped %d scrobbles with unresolved entities", skipped) + } + + // Phase D: Batch insert listens + inserted, err := bs.store.SaveListensBatch(bs.ctx, batch) + if err != nil { + return 0, fmt.Errorf("BulkSubmitter: SaveListensBatch: %w", err) + } + + l.Info().Msgf("BulkSubmitter: Inserted %d listens (%d duplicates skipped)", inserted, int64(len(batch))-inserted) + return int(inserted), nil +} +``` + +#### 2. TOCTOU Safety for Parallel Entity Creation + +The worker pool creates entities via `SubmitListen(SkipSaveListen=true)`. Two workers could race on the same artist name. The existing code uses a get-then-save pattern. Mitigations: + +- Pre-dedup in Phase A ensures each unique tuple is processed by exactly one goroutine — **no TOCTOU within the worker pool** +- The only remaining race is between the import workers and live scrobbles from multi-scrobbler hitting the same `SubmitListen` path. This is already handled by the DB's unique constraints + `ON CONFLICT` clauses on join tables. + +### Success Criteria + +- [ ] `go build ./...` compiles +- [ ] `go test ./...` passes +- [ ] New test: `TestBulkSubmitter_BasicImport` — buffer 100 scrobbles, flush, verify all imported +- [ ] New test: `TestBulkSubmitter_Dedup` — buffer 100 scrobbles with 10 unique combos, verify 10 entity creations +- [ ] New test: `TestBulkSubmitter_CacheHit` — pre-populate track_lookup, verify no SubmitListen calls +- [ ] New test: `TestBulkSubmitter_PartialFailure` — one entity creation fails, rest still imported + +--- + +## Phase 4: Migrate All Importers + +### Overview + +Wire all 5 importers to use BulkSubmitter instead of calling `SubmitListen` directly in a loop. + +### Changes Required + +#### 1. Maloja Importer + +**File**: `internal/importer/maloja.go` + +Replace the per-item `catalog.SubmitListen` loop with: + +```go +func ImportMalojaFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrainzCaller, filename string) error { + l := logger.FromContext(ctx) + // ... file reading and JSON parsing (unchanged) ... + + bs := NewBulkSubmitter(ctx, BulkSubmitterOpts{ + Store: store, + Mbzc: mbzc, + OnProgress: func(imported, total int) { + l.Info().Msgf("Maloja import progress: %d/%d", imported, total) + }, + }) + + for _, item := range items { + // ... existing artist parsing, time window check (unchanged) ... + + bs.Accept(catalog.SubmitListenOpts{ + MbzCaller: mbzc, + Artist: item.Track.Artists[0], + ArtistNames: artists, + TrackTitle: item.Track.Title, + ReleaseTitle: releaseTitle, + Time: ts.Local(), + Client: "maloja", + UserID: 1, + SkipCacheImage: true, + }) + } + + count, err := bs.Flush() + if err != nil { + return fmt.Errorf("ImportMalojaFile: %w", err) + } + return finishImport(ctx, filename, count) +} +``` + +#### 2. Spotify Importer + +**File**: `internal/importer/spotify.go` — Same pattern: Accept into BulkSubmitter, Flush at end. + +#### 3. LastFM Importer + +**File**: `internal/importer/lastfm.go` — Same pattern. Note: LastFM scrobbles include MBZ IDs, which will pass through to `SubmitListen` on cache miss for proper entity resolution. + +#### 4. ListenBrainz Importer + +**File**: `internal/importer/listenbrainz.go` — Same pattern. ListenBrainz data is the richest (full MBZ IDs, MBID mappings) — cache hits will be common after first import. + +#### 5. Koito Importer + +**File**: `internal/importer/koito.go` — This one currently bypasses `SubmitListen` with direct DB calls. Two options: +- **Option A**: Migrate to BulkSubmitter (consistent, benefits from cache) +- **Option B**: Leave as-is (it's already fast, Koito exports have pre-resolved IDs) + +Recommend **Option A** for consistency, with the Koito importer becoming the simplest BulkSubmitter user since its data is pre-resolved. + +### Success Criteria + +- [ ] `go build ./...` compiles +- [ ] `go test ./...` passes — all existing import tests still pass +- [ ] `TestImportMaloja` — 38 listens imported correctly +- [ ] `TestImportMaloja_NullAlbum` — null album handled +- [ ] `TestImportMaloja_ApiFormat` — list format works +- [ ] `TestImportSpotify` — duration data preserved +- [ ] `TestImportLastFM` — MBZ IDs resolved +- [ ] `TestImportListenBrainz` — MBID mappings applied +- [ ] `TestImportKoito` — aliases preserved +- [ ] Manual: 49k Maloja import on vo-pc completes in under 30 minutes + +--- + +## Phase 5: Skip Image Lookups During Import + +### Overview + +Short-circuit `GetArtistImage` and `GetAlbumImage` calls when `SkipCacheImage` is true. Currently these functions still make HTTP calls (or call providers that return "no providers enabled") even when the result won't be used. The existing background tasks (`FetchMissingArtistImages`, `FetchMissingAlbumImages`) will backfill images after import. + +### Changes Required + +#### 1. Early Return in Associate Functions + +**File**: `internal/catalog/associate_artists.go` + +In `resolveAliasOrCreateArtist` (line ~248) and `matchArtistsByNames` (line ~304): + +```go +// Before: +imgUrl, err := images.GetArtistImage(ctx, images.ArtistImageOpts{...}) +if err == nil && imgUrl != "" { + imgid = uuid.New() + if !opts.SkipCacheImage { + // download image + } +} + +// After: +var imgUrl string +if !opts.SkipCacheImage { + imgUrl, err = images.GetArtistImage(ctx, images.ArtistImageOpts{...}) + if err == nil && imgUrl != "" { + imgid = uuid.New() + // download image + } +} +``` + +**File**: `internal/catalog/associate_album.go` + +Same pattern in `createOrUpdateAlbumWithMbzReleaseID` (line ~125) and `matchAlbumByTitle` (line ~220). + +### Success Criteria + +- [ ] `go build ./...` compiles +- [ ] `go test ./...` passes +- [ ] No `GetArtistImage`/`GetAlbumImage` calls during import (verify via log: no "No image providers" warnings) +- [ ] Background tasks still fetch images after import completes +- [ ] Live scrobbles (SkipCacheImage=false) still fetch images normally + +--- + +## Performance Estimates + +| Scenario | Current | After Phase 1 | After All Phases | +|---|---|---|---| +| Repeated live scrobble | ~19 queries | 2 queries (cache hit) | 2 queries | +| New live scrobble | ~19 queries | ~19 queries + 1 cache write | ~19 queries + 1 cache write | +| 49k Maloja import | ~24 hours | ~12 hours (cache helps repeats) | ~15-30 minutes | +| 49k import (second run) | ~24 hours | ~20 minutes (all cache hits) | ~5 minutes | + +## Implementation Order + +1. **Phase 1** (track_lookup) — standalone, immediate benefit for all scrobbles +2. **Phase 5** (skip image lookups) — standalone, no dependencies, quick win +3. **Phase 2** (SaveListensBatch) — DB layer, needed by Phase 3 +4. **Phase 3** (BulkSubmitter) — the core, depends on Phase 1 + 2 +5. **Phase 4** (migrate importers) — depends on Phase 3 + +Phases 1 and 5 can be done first as independent PRs. Phases 2-4 are one PR. + +## References + +- ListenBrainz architecture: https://listenbrainz.readthedocs.io/en/latest/developers/architecture.html +- ListenBrainz MBID mapping: https://listenbrainz.readthedocs.io/en/latest/developers/mapping.html +- MusicBrainz rate limiting: https://musicbrainz.org/doc/MusicBrainz_API/Rate_Limiting +- PR 1 (importer fixes): https://github.com/gabehf/Koito/pull/228 +- PR 2 (MBZ search): https://github.com/gabehf/Koito/pull/229 +- Koito native importer (bypass pattern): `internal/importer/koito.go` +- Current SubmitListen: `internal/catalog/catalog.go:70` +- pgx CopyFrom docs: https://pkg.go.dev/github.com/jackc/pgx/v5#Conn.CopyFrom