Add bulk import optimization: track_lookup cache, batch inserts, BulkSubmitter

Adopts ListenBrainz-inspired patterns to speed up imports from ~24h to
under 30 minutes for 49k scrobbles.

Phase 1 - track_lookup cache table:
- New migration (000006) adds persistent entity lookup cache
- Maps normalized (artist, track, album) → (artist_id, album_id, track_id)
- SubmitListen fast path: cache hit skips 18 DB queries → 2 queries
- Cache populated after entity resolution, invalidated on merge/delete
- Benefits both live scrobbles and imports

Phase 2 - SaveListensBatch:
- New batch listen insert using pgx CopyFrom → temp table → INSERT ON CONFLICT
- Thousands of inserts per second vs one-at-a-time

Phase 3 - BulkSubmitter:
- Reusable import accelerator for all importers
- Pre-deduplicates scrobbles by (artist, track, album) in memory
- Worker pool (4 goroutines) for parallel entity creation on cache miss
- Batch listen insertion via SaveListensBatch

Phase 4 - Migrate importers:
- Maloja, Spotify, LastFM, ListenBrainz importers use BulkSubmitter
- Koito importer left as-is (already fast with pre-resolved IDs)

Phase 5 - Skip image lookups during import:
- GetArtistImage/GetAlbumImage calls fully skipped when SkipCacheImage=true
- Background tasks (FetchMissingArtistImages/FetchMissingAlbumImages) backfill

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
safierinx-a 2026-03-25 04:17:50 +05:30
parent c92e93484e
commit 8ce6ec494d
21 changed files with 1294 additions and 129 deletions

157
internal/importer/bulk.go Normal file
View file

@ -0,0 +1,157 @@
package importer
import (
"context"
"fmt"
"sync"
"time"
"github.com/gabehf/koito/internal/catalog"
"github.com/gabehf/koito/internal/db"
"github.com/gabehf/koito/internal/logger"
"github.com/gabehf/koito/internal/mbz"
)
// BulkSubmitter is a reusable import accelerator. It pre-deduplicates scrobbles
// in memory, resolves entities via the track_lookup cache (falling back to
// SubmitListen on cache miss with a worker pool for parallelism), and batch-inserts
// listens via SaveListensBatch.
type BulkSubmitter struct {
	store db.DB                 // persistence layer used for cache lookups and batch inserts
	mbzc  mbz.MusicBrainzCaller // MusicBrainz client; callers embed it in each SubmitListenOpts as well
	// NOTE(review): storing a context in a struct is discouraged in Go; consider
	// passing ctx to Flush instead — confirm no caller depends on this shape.
	ctx     context.Context
	buffer  []catalog.SubmitListenOpts // scrobbles accepted so far; drained by Flush
	workers int                        // max concurrent entity-creation goroutines on cache miss
}
// BulkSubmitterOpts configures NewBulkSubmitter.
type BulkSubmitterOpts struct {
	Store   db.DB                 // persistence layer (required)
	Mbzc    mbz.MusicBrainzCaller // MusicBrainz client for entity resolution
	Workers int                   // default 4
}
// NewBulkSubmitter returns a BulkSubmitter bound to ctx and the given store.
// A non-positive opts.Workers falls back to the default of 4 goroutines.
func NewBulkSubmitter(ctx context.Context, opts BulkSubmitterOpts) *BulkSubmitter {
	bs := &BulkSubmitter{
		store:   opts.Store,
		mbzc:    opts.Mbzc,
		ctx:     ctx,
		workers: opts.Workers,
	}
	if bs.workers <= 0 {
		bs.workers = 4
	}
	return bs
}
// Accept queues a single scrobble for later batch processing; nothing is
// resolved or written until Flush is called.
func (bs *BulkSubmitter) Accept(opts catalog.SubmitListenOpts) {
	bs.buffer = append(bs.buffer, opts)
}
// Flush processes all buffered scrobbles: deduplicates, resolves entities, and batch-inserts listens.
// On success the internal buffer is cleared so the submitter can be reused for a
// subsequent batch (previously a repeated Flush would re-process every scrobble).
// Returns the number of listens successfully inserted.
func (bs *BulkSubmitter) Flush() (int, error) {
	l := logger.FromContext(bs.ctx)
	if len(bs.buffer) == 0 {
		return 0, nil
	}
	l.Info().Msgf("BulkSubmitter: Processing %d scrobbles", len(bs.buffer))

	// Phase A: Deduplicate — find unique (artist, track, album) tuples so each
	// combination is resolved exactly once no matter how often it was played.
	unique := make(map[string]catalog.SubmitListenOpts, len(bs.buffer))
	for _, opts := range bs.buffer {
		key := catalog.TrackLookupKey(opts.Artist, opts.TrackTitle, opts.ReleaseTitle)
		if _, exists := unique[key]; !exists {
			unique[key] = opts
		}
	}
	l.Info().Msgf("BulkSubmitter: %d unique entity combos from %d scrobbles", len(unique), len(bs.buffer))

	// Phase B: Resolve entities — check cache, create on miss.
	resolved := make(map[string]int32, len(unique)) // key → trackID
	var mu sync.Mutex                               // guards resolved (written by main loop and workers)
	var wg sync.WaitGroup
	sem := make(chan struct{}, bs.workers) // bounds concurrent SubmitListen calls
	cacheHits := 0
	for key, opts := range unique {
		// Check track_lookup cache first. Any lookup error is treated the same
		// as a miss (best effort); the slow path below will recreate entities.
		cached, err := bs.store.GetTrackLookup(bs.ctx, key)
		if err == nil && cached != nil {
			// mu must be held even on this serial path: workers launched in
			// earlier iterations may be writing resolved concurrently.
			mu.Lock()
			resolved[key] = cached.TrackID
			cacheHits++
			mu.Unlock()
			continue
		}
		// Cache miss — create entities via SubmitListen (with worker pool).
		wg.Add(1)
		sem <- struct{}{} // acquire worker slot
		go func(k string, o catalog.SubmitListenOpts) {
			defer wg.Done()
			defer func() { <-sem }() // release worker slot
			// Only create entities here: listens are inserted in bulk in
			// Phase D, and images are backfilled by background tasks.
			o.SkipSaveListen = true
			o.SkipCacheImage = true
			err := catalog.SubmitListen(bs.ctx, bs.store, o)
			if err != nil {
				l.Err(err).Msgf("BulkSubmitter: Failed to create entities for '%s' by '%s'", o.TrackTitle, o.Artist)
				return
			}
			// Re-check cache (SubmitListen populates it via Phase 1's integration)
			// to pick up the newly created track's ID.
			cached, err := bs.store.GetTrackLookup(bs.ctx, k)
			if err == nil && cached != nil {
				mu.Lock()
				resolved[k] = cached.TrackID
				mu.Unlock()
			}
		}(key, opts)
	}
	wg.Wait()
	l.Info().Msgf("BulkSubmitter: Resolved %d/%d entity combos (%d cache hits)",
		len(resolved), len(unique), cacheHits)

	// Phase C: Build listen batch, dropping scrobbles whose entities could not
	// be resolved (their failures were already logged in Phase B).
	batch := make([]db.SaveListenOpts, 0, len(bs.buffer))
	skipped := 0
	for _, opts := range bs.buffer {
		key := catalog.TrackLookupKey(opts.Artist, opts.TrackTitle, opts.ReleaseTitle)
		trackID, ok := resolved[key]
		if !ok {
			skipped++
			continue
		}
		batch = append(batch, db.SaveListenOpts{
			TrackID: trackID,
			Time:    opts.Time.Truncate(time.Second),
			UserID:  opts.UserID,
			Client:  opts.Client,
		})
	}
	if skipped > 0 {
		l.Warn().Msgf("BulkSubmitter: Skipped %d scrobbles with unresolved entities", skipped)
	}

	// Phase D: Batch insert listens (in chunks to avoid huge transactions).
	const chunkSize = 5000
	var totalInserted int64
	for i := 0; i < len(batch); i += chunkSize {
		end := i + chunkSize
		if end > len(batch) {
			end = len(batch)
		}
		inserted, err := bs.store.SaveListensBatch(bs.ctx, batch[i:end])
		if err != nil {
			return int(totalInserted), fmt.Errorf("BulkSubmitter: SaveListensBatch: %w", err)
		}
		totalInserted += inserted
	}
	l.Info().Msgf("BulkSubmitter: Inserted %d listens (%d duplicates skipped)",
		totalInserted, int64(len(batch))-totalInserted)
	// Fix: clear the buffer so reusing the submitter does not re-process and
	// re-resolve scrobbles that were already flushed.
	bs.buffer = nil
	return int(totalInserted), nil
}

View file

@ -50,18 +50,17 @@ func ImportLastFMFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrainzCall
return fmt.Errorf("ImportLastFMFile: %w", err)
}
defer file.Close()
var throttleFunc = func() {}
if ms := cfg.ThrottleImportMs(); ms > 0 {
throttleFunc = func() {
time.Sleep(time.Duration(ms) * time.Millisecond)
}
}
export := make([]LastFMExportPage, 0)
err = json.NewDecoder(file).Decode(&export)
if err != nil {
return fmt.Errorf("ImportLastFMFile: %w", err)
}
count := 0
bs := NewBulkSubmitter(ctx, BulkSubmitterOpts{
Store: store,
Mbzc: mbzc,
})
for _, item := range export {
for _, track := range item.Track {
album := track.Album.Text
@ -96,7 +95,6 @@ func ImportLastFMFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrainzCall
ts = time.Unix(unix, 0).UTC()
}
if !inImportTimeWindow(ts) {
l.Debug().Msgf("Skipping import due to import time rules")
continue
}
@ -105,7 +103,7 @@ func ImportLastFMFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrainzCall
artistMbidMap = append(artistMbidMap, catalog.ArtistMbidMap{Artist: track.Artist.Text, Mbid: artistMbzID})
}
opts := catalog.SubmitListenOpts{
bs.Accept(catalog.SubmitListenOpts{
MbzCaller: mbzc,
Artist: track.Artist.Text,
ArtistNames: []string{track.Artist.Text},
@ -118,16 +116,14 @@ func ImportLastFMFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrainzCall
Client: "lastfm",
Time: ts,
UserID: 1,
SkipCacheImage: !cfg.FetchImagesDuringImport(),
}
err = catalog.SubmitListen(ctx, store, opts)
if err != nil {
l.Err(err).Msg("Failed to import LastFM playback item")
return fmt.Errorf("ImportLastFMFile: %w", err)
}
count++
throttleFunc()
SkipCacheImage: true,
})
}
}
count, err := bs.Flush()
if err != nil {
return fmt.Errorf("ImportLastFMFile: %w", err)
}
return finishImport(ctx, filename, count)
}

View file

@ -63,13 +63,11 @@ func ImportListenBrainzFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrai
scanner := bufio.NewScanner(r)
var throttleFunc = func() {}
if ms := cfg.ThrottleImportMs(); ms > 0 {
throttleFunc = func() {
time.Sleep(time.Duration(ms) * time.Millisecond)
}
}
count := 0
bs := NewBulkSubmitter(ctx, BulkSubmitterOpts{
Store: store,
Mbzc: mbzc,
})
for scanner.Scan() {
line := scanner.Bytes()
payload := new(handlers.LbzSubmitListenPayload)
@ -80,7 +78,6 @@ func ImportListenBrainzFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrai
}
ts := time.Unix(payload.ListenedAt, 0)
if !inImportTimeWindow(ts) {
l.Debug().Msgf("Skipping import due to import time rules")
continue
}
artistMbzIDs, err := utils.ParseUUIDSlice(payload.TrackMeta.AdditionalInfo.ArtistMBIDs)
@ -139,7 +136,7 @@ func ImportListenBrainzFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrai
artistMbidMap = append(artistMbidMap, catalog.ArtistMbidMap{Artist: a.ArtistName, Mbid: mbid})
}
opts := catalog.SubmitListenOpts{
bs.Accept(catalog.SubmitListenOpts{
MbzCaller: mbzc,
ArtistNames: payload.TrackMeta.AdditionalInfo.ArtistNames,
Artist: payload.TrackMeta.ArtistName,
@ -154,15 +151,13 @@ func ImportListenBrainzFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrai
Time: ts,
UserID: 1,
Client: client,
SkipCacheImage: !cfg.FetchImagesDuringImport(),
}
err = catalog.SubmitListen(ctx, store, opts)
if err != nil {
l.Err(err).Msg("Failed to import LastFM playback item")
return fmt.Errorf("ImportListenBrainzFile: %w", err)
}
count++
throttleFunc()
SkipCacheImage: true,
})
}
count, err := bs.Flush()
if err != nil {
return fmt.Errorf("ImportListenBrainzFile: %w", err)
}
l.Info().Msgf("Finished importing %s; imported %d items", filename, count)
return nil

View file

@ -44,12 +44,6 @@ func ImportMalojaFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrainzCall
return fmt.Errorf("ImportMalojaFile: %w", err)
}
defer file.Close()
var throttleFunc = func() {}
if ms := cfg.ThrottleImportMs(); ms > 0 {
throttleFunc = func() {
time.Sleep(time.Duration(ms) * time.Millisecond)
}
}
export := new(MalojaFile)
err = json.NewDecoder(file).Decode(&export)
if err != nil {
@ -59,12 +53,14 @@ func ImportMalojaFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrainzCall
if len(items) == 0 {
items = export.List
}
count := 0
total := len(items)
for i, item := range items {
bs := NewBulkSubmitter(ctx, BulkSubmitterOpts{
Store: store,
Mbzc: mbzc,
})
for _, item := range items {
martists := make([]string, 0)
// Maloja has a tendency to have the artist order ['feature', 'main ● feature'], so
// here we try to turn that artist array into ['main', 'feature']
item.Track.Artists = utils.MoveFirstMatchToFront(item.Track.Artists, " \u2022 ")
for _, an := range item.Track.Artists {
ans := strings.Split(an, " \u2022 ")
@ -77,14 +73,13 @@ func ImportMalojaFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrainzCall
}
ts := time.Unix(item.Time, 0)
if !inImportTimeWindow(ts) {
l.Debug().Msgf("Skipping import due to import time rules")
continue
}
releaseTitle := ""
if item.Track.Album != nil {
releaseTitle = item.Track.Album.Title
}
opts := catalog.SubmitListenOpts{
bs.Accept(catalog.SubmitListenOpts{
MbzCaller: mbzc,
Artist: item.Track.Artists[0],
ArtistNames: artists,
@ -93,18 +88,13 @@ func ImportMalojaFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrainzCall
Time: ts.Local(),
Client: "maloja",
UserID: 1,
SkipCacheImage: !cfg.FetchImagesDuringImport(),
}
err = catalog.SubmitListen(ctx, store, opts)
if err != nil {
l.Err(err).Msgf("Failed to import maloja item %d/%d", i+1, total)
continue
}
count++
if count%500 == 0 {
l.Info().Msgf("Maloja import progress: %d/%d", count, total)
}
throttleFunc()
SkipCacheImage: true,
})
}
count, err := bs.Flush()
if err != nil {
return fmt.Errorf("ImportMalojaFile: %w", err)
}
return finishImport(ctx, filename, count)
}

View file

@ -33,48 +33,44 @@ func ImportSpotifyFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrainzCal
return fmt.Errorf("ImportSpotifyFile: %w", err)
}
defer file.Close()
var throttleFunc = func() {}
if ms := cfg.ThrottleImportMs(); ms > 0 {
throttleFunc = func() {
time.Sleep(time.Duration(ms) * time.Millisecond)
}
}
export := make([]SpotifyExportItem, 0)
err = json.NewDecoder(file).Decode(&export)
if err != nil {
return fmt.Errorf("ImportSpotifyFile: %w", err)
}
bs := NewBulkSubmitter(ctx, BulkSubmitterOpts{
Store: store,
Mbzc: mbzc,
})
for _, item := range export {
if item.ReasonEnd != "trackdone" {
continue
}
if !inImportTimeWindow(item.Timestamp) {
l.Debug().Msgf("Skipping import due to import time rules")
continue
}
dur := item.MsPlayed
if item.TrackName == "" || item.ArtistName == "" {
l.Debug().Msg("Skipping non-track item")
continue
}
opts := catalog.SubmitListenOpts{
bs.Accept(catalog.SubmitListenOpts{
MbzCaller: mbzc,
Artist: item.ArtistName,
TrackTitle: item.TrackName,
ReleaseTitle: item.AlbumName,
Duration: dur / 1000,
Duration: item.MsPlayed / 1000,
Time: item.Timestamp,
Client: "spotify",
UserID: 1,
SkipCacheImage: !cfg.FetchImagesDuringImport(),
}
err = catalog.SubmitListen(ctx, store, opts)
if err != nil {
l.Err(err).Msg("Failed to import spotify playback item")
return fmt.Errorf("ImportSpotifyFile: %w", err)
}
throttleFunc()
SkipCacheImage: true,
})
}
return finishImport(ctx, filename, len(export))
count, err := bs.Flush()
if err != nil {
return fmt.Errorf("ImportSpotifyFile: %w", err)
}
return finishImport(ctx, filename, count)
}