mirror of
https://github.com/gabehf/Koito.git
synced 2026-04-22 20:11:50 -07:00
Add bulk import optimization: track_lookup cache, batch inserts, BulkSubmitter
This commit is contained in:
parent
0ec7b458cc
commit
ae373a7090
21 changed files with 1296 additions and 125 deletions
157
internal/importer/bulk.go
Normal file
157
internal/importer/bulk.go
Normal file
|
|
@ -0,0 +1,157 @@
|
|||
package importer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gabehf/koito/internal/catalog"
|
||||
"github.com/gabehf/koito/internal/db"
|
||||
"github.com/gabehf/koito/internal/logger"
|
||||
"github.com/gabehf/koito/internal/mbz"
|
||||
)
|
||||
|
||||
// BulkSubmitter is a reusable import accelerator. It pre-deduplicates scrobbles
// in memory, resolves entities via the track_lookup cache (falling back to
// SubmitListen on cache miss with a worker pool for parallelism), and batch-inserts
// listens via SaveListensBatch.
//
// Typical usage: construct with NewBulkSubmitter, call Accept once per scrobble,
// then call Flush to dedupe, resolve, and batch-insert everything buffered.
type BulkSubmitter struct {
	// store is the database handle used for lookup-cache reads and batch inserts.
	store db.DB
	// mbzc is stored by NewBulkSubmitter but never read in this file —
	// NOTE(review): presumably reserved for future use or read elsewhere; confirm.
	mbzc mbz.MusicBrainzCaller
	// ctx is captured at construction and used for all store calls and logging.
	// NOTE(review): storing a context in a struct is discouraged by Go convention;
	// consider passing ctx to Flush instead.
	ctx context.Context
	// buffer holds scrobbles queued by Accept until Flush processes them.
	buffer []catalog.SubmitListenOpts
	// workers bounds the number of concurrent entity-creation goroutines in Flush.
	workers int
}
|
||||
|
||||
// BulkSubmitterOpts configures NewBulkSubmitter.
type BulkSubmitterOpts struct {
	// Store is the database handle; required for cache lookups and batch inserts.
	Store db.DB
	// Mbzc is the MusicBrainz caller retained on the submitter.
	Mbzc mbz.MusicBrainzCaller
	// Workers bounds entity-resolution concurrency; values <= 0 default to 4.
	Workers int // default 4
}
|
||||
|
||||
func NewBulkSubmitter(ctx context.Context, opts BulkSubmitterOpts) *BulkSubmitter {
|
||||
workers := opts.Workers
|
||||
if workers <= 0 {
|
||||
workers = 4
|
||||
}
|
||||
return &BulkSubmitter{
|
||||
store: opts.Store,
|
||||
mbzc: opts.Mbzc,
|
||||
ctx: ctx,
|
||||
workers: workers,
|
||||
}
|
||||
}
|
||||
|
||||
// Accept buffers a scrobble for later batch processing by Flush.
// It performs no validation and no I/O; work is deferred until Flush.
// The buffer is unsynchronized, so Accept is not safe for concurrent use —
// callers must serialize Accept and Flush on a single goroutine.
func (bs *BulkSubmitter) Accept(opts catalog.SubmitListenOpts) {
	bs.buffer = append(bs.buffer, opts)
}
|
||||
|
||||
// Flush processes all buffered scrobbles: deduplicates, resolves entities, and batch-inserts listens.
|
||||
// Returns the number of listens successfully inserted.
|
||||
func (bs *BulkSubmitter) Flush() (int, error) {
|
||||
l := logger.FromContext(bs.ctx)
|
||||
if len(bs.buffer) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
l.Info().Msgf("BulkSubmitter: Processing %d scrobbles", len(bs.buffer))
|
||||
|
||||
// Phase A: Deduplicate — find unique (artist, track, album) tuples
|
||||
unique := make(map[string]catalog.SubmitListenOpts)
|
||||
for _, opts := range bs.buffer {
|
||||
key := catalog.TrackLookupKey(opts.Artist, opts.TrackTitle, opts.ReleaseTitle)
|
||||
if _, exists := unique[key]; !exists {
|
||||
unique[key] = opts
|
||||
}
|
||||
}
|
||||
l.Info().Msgf("BulkSubmitter: %d unique entity combos from %d scrobbles", len(unique), len(bs.buffer))
|
||||
|
||||
// Phase B: Resolve entities — check cache, create on miss
|
||||
resolved := make(map[string]int32) // key → trackID
|
||||
var mu sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
sem := make(chan struct{}, bs.workers)
|
||||
cacheHits := 0
|
||||
|
||||
for key, opts := range unique {
|
||||
// Check track_lookup cache first
|
||||
cached, err := bs.store.GetTrackLookup(bs.ctx, key)
|
||||
if err == nil && cached != nil {
|
||||
mu.Lock()
|
||||
resolved[key] = cached.TrackID
|
||||
cacheHits++
|
||||
mu.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
// Cache miss — create entities via SubmitListen (with worker pool)
|
||||
wg.Add(1)
|
||||
sem <- struct{}{} // acquire worker slot
|
||||
go func(k string, o catalog.SubmitListenOpts) {
|
||||
defer wg.Done()
|
||||
defer func() { <-sem }() // release worker slot
|
||||
|
||||
o.SkipSaveListen = true
|
||||
o.SkipCacheImage = true
|
||||
err := catalog.SubmitListen(bs.ctx, bs.store, o)
|
||||
if err != nil {
|
||||
l.Err(err).Msgf("BulkSubmitter: Failed to create entities for '%s' by '%s'", o.TrackTitle, o.Artist)
|
||||
return
|
||||
}
|
||||
|
||||
// Re-check cache (SubmitListen populates it via Phase 1's integration)
|
||||
cached, err := bs.store.GetTrackLookup(bs.ctx, k)
|
||||
if err == nil && cached != nil {
|
||||
mu.Lock()
|
||||
resolved[k] = cached.TrackID
|
||||
mu.Unlock()
|
||||
}
|
||||
}(key, opts)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
l.Info().Msgf("BulkSubmitter: Resolved %d/%d entity combos (%d cache hits)",
|
||||
len(resolved), len(unique), cacheHits)
|
||||
|
||||
// Phase C: Build listen batch
|
||||
batch := make([]db.SaveListenOpts, 0, len(bs.buffer))
|
||||
skipped := 0
|
||||
for _, opts := range bs.buffer {
|
||||
key := catalog.TrackLookupKey(opts.Artist, opts.TrackTitle, opts.ReleaseTitle)
|
||||
trackID, ok := resolved[key]
|
||||
if !ok {
|
||||
skipped++
|
||||
continue
|
||||
}
|
||||
batch = append(batch, db.SaveListenOpts{
|
||||
TrackID: trackID,
|
||||
Time: opts.Time.Truncate(time.Second),
|
||||
UserID: opts.UserID,
|
||||
Client: opts.Client,
|
||||
})
|
||||
}
|
||||
if skipped > 0 {
|
||||
l.Warn().Msgf("BulkSubmitter: Skipped %d scrobbles with unresolved entities", skipped)
|
||||
}
|
||||
|
||||
// Phase D: Batch insert listens (in chunks to avoid huge transactions)
|
||||
const chunkSize = 5000
|
||||
var totalInserted int64
|
||||
for i := 0; i < len(batch); i += chunkSize {
|
||||
end := i + chunkSize
|
||||
if end > len(batch) {
|
||||
end = len(batch)
|
||||
}
|
||||
inserted, err := bs.store.SaveListensBatch(bs.ctx, batch[i:end])
|
||||
if err != nil {
|
||||
return int(totalInserted), fmt.Errorf("BulkSubmitter: SaveListensBatch: %w", err)
|
||||
}
|
||||
totalInserted += inserted
|
||||
}
|
||||
|
||||
l.Info().Msgf("BulkSubmitter: Inserted %d listens (%d duplicates skipped)",
|
||||
totalInserted, int64(len(batch))-totalInserted)
|
||||
return int(totalInserted), nil
|
||||
}
|
||||
|
|
@ -50,18 +50,17 @@ func ImportLastFMFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrainzCall
|
|||
return fmt.Errorf("ImportLastFMFile: %w", err)
|
||||
}
|
||||
defer file.Close()
|
||||
var throttleFunc = func() {}
|
||||
if ms := cfg.ThrottleImportMs(); ms > 0 {
|
||||
throttleFunc = func() {
|
||||
time.Sleep(time.Duration(ms) * time.Millisecond)
|
||||
}
|
||||
}
|
||||
export := make([]LastFMExportPage, 0)
|
||||
err = json.NewDecoder(file).Decode(&export)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ImportLastFMFile: %w", err)
|
||||
}
|
||||
count := 0
|
||||
|
||||
bs := NewBulkSubmitter(ctx, BulkSubmitterOpts{
|
||||
Store: store,
|
||||
Mbzc: mbzc,
|
||||
})
|
||||
|
||||
for _, item := range export {
|
||||
for _, track := range item.Track {
|
||||
album := track.Album.Text
|
||||
|
|
@ -96,7 +95,6 @@ func ImportLastFMFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrainzCall
|
|||
ts = time.Unix(unix, 0).UTC()
|
||||
}
|
||||
if !inImportTimeWindow(ts) {
|
||||
l.Debug().Msgf("Skipping import due to import time rules")
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
@ -105,7 +103,7 @@ func ImportLastFMFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrainzCall
|
|||
artistMbidMap = append(artistMbidMap, catalog.ArtistMbidMap{Artist: track.Artist.Text, Mbid: artistMbzID})
|
||||
}
|
||||
|
||||
opts := catalog.SubmitListenOpts{
|
||||
bs.Accept(catalog.SubmitListenOpts{
|
||||
MbzCaller: mbzc,
|
||||
Artist: track.Artist.Text,
|
||||
ArtistNames: []string{track.Artist.Text},
|
||||
|
|
@ -118,16 +116,14 @@ func ImportLastFMFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrainzCall
|
|||
Client: "lastfm",
|
||||
Time: ts,
|
||||
UserID: 1,
|
||||
SkipCacheImage: !cfg.FetchImagesDuringImport(),
|
||||
}
|
||||
err = catalog.SubmitListen(ctx, store, opts)
|
||||
if err != nil {
|
||||
l.Err(err).Msg("Failed to import LastFM playback item")
|
||||
return fmt.Errorf("ImportLastFMFile: %w", err)
|
||||
}
|
||||
count++
|
||||
throttleFunc()
|
||||
SkipCacheImage: true,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
count, err := bs.Flush()
|
||||
if err != nil {
|
||||
return fmt.Errorf("ImportLastFMFile: %w", err)
|
||||
}
|
||||
return finishImport(ctx, filename, count)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -63,13 +63,11 @@ func ImportListenBrainzFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrai
|
|||
|
||||
scanner := bufio.NewScanner(r)
|
||||
|
||||
var throttleFunc = func() {}
|
||||
if ms := cfg.ThrottleImportMs(); ms > 0 {
|
||||
throttleFunc = func() {
|
||||
time.Sleep(time.Duration(ms) * time.Millisecond)
|
||||
}
|
||||
}
|
||||
count := 0
|
||||
bs := NewBulkSubmitter(ctx, BulkSubmitterOpts{
|
||||
Store: store,
|
||||
Mbzc: mbzc,
|
||||
})
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Bytes()
|
||||
payload := new(handlers.LbzSubmitListenPayload)
|
||||
|
|
@ -80,7 +78,6 @@ func ImportListenBrainzFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrai
|
|||
}
|
||||
ts := time.Unix(payload.ListenedAt, 0)
|
||||
if !inImportTimeWindow(ts) {
|
||||
l.Debug().Msgf("Skipping import due to import time rules")
|
||||
continue
|
||||
}
|
||||
artistMbzIDs, err := utils.ParseUUIDSlice(payload.TrackMeta.AdditionalInfo.ArtistMBIDs)
|
||||
|
|
@ -139,7 +136,7 @@ func ImportListenBrainzFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrai
|
|||
artistMbidMap = append(artistMbidMap, catalog.ArtistMbidMap{Artist: a.ArtistName, Mbid: mbid})
|
||||
}
|
||||
|
||||
opts := catalog.SubmitListenOpts{
|
||||
bs.Accept(catalog.SubmitListenOpts{
|
||||
MbzCaller: mbzc,
|
||||
ArtistNames: payload.TrackMeta.AdditionalInfo.ArtistNames,
|
||||
Artist: payload.TrackMeta.ArtistName,
|
||||
|
|
@ -154,15 +151,13 @@ func ImportListenBrainzFile(ctx context.Context, store db.DB, mbzc mbz.MusicBrai
|
|||
Time: ts,
|
||||
UserID: 1,
|
||||
Client: client,
|
||||
SkipCacheImage: !cfg.FetchImagesDuringImport(),
|
||||
}
|
||||
err = catalog.SubmitListen(ctx, store, opts)
|
||||
if err != nil {
|
||||
l.Err(err).Msg("Failed to import LastFM playback item")
|
||||
return fmt.Errorf("ImportListenBrainzFile: %w", err)
|
||||
}
|
||||
count++
|
||||
throttleFunc()
|
||||
SkipCacheImage: true,
|
||||
})
|
||||
}
|
||||
|
||||
count, err := bs.Flush()
|
||||
if err != nil {
|
||||
return fmt.Errorf("ImportListenBrainzFile: %w", err)
|
||||
}
|
||||
l.Info().Msgf("Finished importing %s; imported %d items", filename, count)
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -41,21 +41,19 @@ func ImportMalojaFile(ctx context.Context, store db.DB, filename string) error {
|
|||
return fmt.Errorf("ImportMalojaFile: %w", err)
|
||||
}
|
||||
defer file.Close()
|
||||
var throttleFunc = func() {}
|
||||
if ms := cfg.ThrottleImportMs(); ms > 0 {
|
||||
throttleFunc = func() {
|
||||
time.Sleep(time.Duration(ms) * time.Millisecond)
|
||||
}
|
||||
}
|
||||
export := new(MalojaExport)
|
||||
err = json.NewDecoder(file).Decode(&export)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ImportMalojaFile: %w", err)
|
||||
}
|
||||
|
||||
bs := NewBulkSubmitter(ctx, BulkSubmitterOpts{
|
||||
Store: store,
|
||||
Mbzc: &mbz.MbzErrorCaller{},
|
||||
})
|
||||
|
||||
for _, item := range export.Scrobbles {
|
||||
martists := make([]string, 0)
|
||||
// Maloja has a tendency to have the the artist order ['feature', 'main \u2022 feature'], so
|
||||
// here we try to turn that artist array into ['main', 'feature']
|
||||
item.Track.Artists = utils.MoveFirstMatchToFront(item.Track.Artists, " \u2022 ")
|
||||
for _, an := range item.Track.Artists {
|
||||
ans := strings.Split(an, " \u2022 ")
|
||||
|
|
@ -68,11 +66,10 @@ func ImportMalojaFile(ctx context.Context, store db.DB, filename string) error {
|
|||
}
|
||||
ts := time.Unix(item.Time, 0)
|
||||
if !inImportTimeWindow(ts) {
|
||||
l.Debug().Msgf("Skipping import due to import time rules")
|
||||
continue
|
||||
}
|
||||
opts := catalog.SubmitListenOpts{
|
||||
MbzCaller: &mbz.MusicBrainzClient{},
|
||||
bs.Accept(catalog.SubmitListenOpts{
|
||||
MbzCaller: &mbz.MbzErrorCaller{},
|
||||
Artist: item.Track.Artists[0],
|
||||
ArtistNames: artists,
|
||||
TrackTitle: item.Track.Title,
|
||||
|
|
@ -80,14 +77,13 @@ func ImportMalojaFile(ctx context.Context, store db.DB, filename string) error {
|
|||
Time: ts.Local(),
|
||||
Client: "maloja",
|
||||
UserID: 1,
|
||||
SkipCacheImage: !cfg.FetchImagesDuringImport(),
|
||||
}
|
||||
err = catalog.SubmitListen(ctx, store, opts)
|
||||
if err != nil {
|
||||
l.Err(err).Msg("Failed to import maloja playback item")
|
||||
return fmt.Errorf("ImportMalojaFile: %w", err)
|
||||
}
|
||||
throttleFunc()
|
||||
SkipCacheImage: true,
|
||||
})
|
||||
}
|
||||
return finishImport(ctx, filename, len(export.Scrobbles))
|
||||
|
||||
count, err := bs.Flush()
|
||||
if err != nil {
|
||||
return fmt.Errorf("ImportMalojaFile: %w", err)
|
||||
}
|
||||
return finishImport(ctx, filename, count)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,48 +33,44 @@ func ImportSpotifyFile(ctx context.Context, store db.DB, filename string) error
|
|||
return fmt.Errorf("ImportSpotifyFile: %w", err)
|
||||
}
|
||||
defer file.Close()
|
||||
var throttleFunc = func() {}
|
||||
if ms := cfg.ThrottleImportMs(); ms > 0 {
|
||||
throttleFunc = func() {
|
||||
time.Sleep(time.Duration(ms) * time.Millisecond)
|
||||
}
|
||||
}
|
||||
export := make([]SpotifyExportItem, 0)
|
||||
err = json.NewDecoder(file).Decode(&export)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ImportSpotifyFile: %w", err)
|
||||
}
|
||||
|
||||
bs := NewBulkSubmitter(ctx, BulkSubmitterOpts{
|
||||
Store: store,
|
||||
Mbzc: &mbz.MbzErrorCaller{},
|
||||
})
|
||||
|
||||
for _, item := range export {
|
||||
if item.ReasonEnd != "trackdone" {
|
||||
continue
|
||||
}
|
||||
if !inImportTimeWindow(item.Timestamp) {
|
||||
l.Debug().Msgf("Skipping import due to import time rules")
|
||||
continue
|
||||
}
|
||||
dur := item.MsPlayed
|
||||
if item.TrackName == "" || item.ArtistName == "" {
|
||||
l.Debug().Msg("Skipping non-track item")
|
||||
continue
|
||||
}
|
||||
opts := catalog.SubmitListenOpts{
|
||||
MbzCaller: &mbz.MusicBrainzClient{},
|
||||
bs.Accept(catalog.SubmitListenOpts{
|
||||
MbzCaller: &mbz.MbzErrorCaller{},
|
||||
Artist: item.ArtistName,
|
||||
TrackTitle: item.TrackName,
|
||||
ReleaseTitle: item.AlbumName,
|
||||
Duration: dur / 1000,
|
||||
Duration: item.MsPlayed / 1000,
|
||||
Time: item.Timestamp,
|
||||
Client: "spotify",
|
||||
UserID: 1,
|
||||
SkipCacheImage: !cfg.FetchImagesDuringImport(),
|
||||
}
|
||||
err = catalog.SubmitListen(ctx, store, opts)
|
||||
if err != nil {
|
||||
l.Err(err).Msg("Failed to import spotify playback item")
|
||||
return fmt.Errorf("ImportSpotifyFile: %w", err)
|
||||
}
|
||||
throttleFunc()
|
||||
SkipCacheImage: true,
|
||||
})
|
||||
}
|
||||
return finishImport(ctx, filename, len(export))
|
||||
|
||||
count, err := bs.Flush()
|
||||
if err != nil {
|
||||
return fmt.Errorf("ImportSpotifyFile: %w", err)
|
||||
}
|
||||
return finishImport(ctx, filename, count)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue