fix: go back to synchronous image processing

This commit is contained in:
Gabe Farrell 2025-06-12 00:30:01 -04:00
parent aba2b76def
commit 1a5a6acc95
10 changed files with 58 additions and 168 deletions

View file

@ -3,7 +3,6 @@ package catalog
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
@ -11,8 +10,6 @@ import (
"path"
"path/filepath"
"strings"
"sync"
"time"
"github.com/gabehf/koito/internal/cfg"
"github.com/gabehf/koito/internal/db"
@ -33,93 +30,6 @@ const (
ImageCacheDir = "image_cache"
)
type imageJob struct {
ctx context.Context
id string
size ImageSize
url string // optional
reader io.Reader // optional
}
// ImageProcessor manages a single goroutine to process image jobs sequentially
type ImageProcessor struct {
jobs chan imageJob
wg sync.WaitGroup
closing chan struct{}
}
// NewImageProcessor creates an ImageProcessor and starts the worker goroutine
func NewImageProcessor(buffer int) *ImageProcessor {
ip := &ImageProcessor{
jobs: make(chan imageJob, buffer),
closing: make(chan struct{}),
}
ip.wg.Add(1)
go ip.worker()
return ip
}
func (ip *ImageProcessor) worker() {
for {
select {
case job := <-ip.jobs:
var err error
if job.reader != nil {
err = ip.compressAndSave(job.ctx, job.id, job.size, job.reader)
} else {
err = ip.downloadCompressAndSave(job.ctx, job.id, job.url, job.size)
}
if err != nil {
logger.FromContext(job.ctx).Err(err).Msg("Image processing failed")
}
case <-ip.closing:
return
}
}
}
func (ip *ImageProcessor) EnqueueDownloadAndCache(ctx context.Context, id uuid.UUID, url string, size ImageSize) error {
return ip.enqueueJob(imageJob{ctx: ctx, id: id.String(), size: size, url: url})
}
func (ip *ImageProcessor) EnqueueCompressAndSave(ctx context.Context, id string, size ImageSize, reader io.Reader) error {
return ip.enqueueJob(imageJob{ctx: ctx, id: id, size: size, reader: reader})
}
func (ip *ImageProcessor) WaitForIdle(timeout time.Duration) error {
timer := time.NewTimer(timeout)
defer timer.Stop()
for {
if len(ip.jobs) == 0 {
return nil
}
select {
case <-time.After(10 * time.Millisecond):
case <-timer.C:
return errors.New("image processor did not become idle in time")
}
}
}
func (ip *ImageProcessor) enqueueJob(job imageJob) error {
select {
case ip.jobs <- job:
return nil
case <-job.ctx.Done():
return job.ctx.Err()
case <-ip.closing:
return errors.New("image processor closed")
}
}
// Close stops the worker and waits for any ongoing processing to finish
func (ip *ImageProcessor) Close() {
close(ip.closing)
ip.wg.Wait()
close(ip.jobs)
}
func ParseImageSize(size string) (ImageSize, error) {
switch strings.ToLower(size) {
case "small":
@ -136,7 +46,7 @@ func ParseImageSize(size string) (ImageSize, error) {
return "", fmt.Errorf("unknown image size: %s", size)
}
}
func getImageSize(size ImageSize) int {
func GetImageSize(size ImageSize) int {
var px int
switch size {
case "small":
@ -178,7 +88,9 @@ func ValidateImageURL(url string) error {
return nil
}
func (ip *ImageProcessor) downloadCompressAndSave(ctx context.Context, id string, url string, size ImageSize) error {
// DownloadAndCacheImage downloads an image from the given URL, then calls CompressAndSaveImage.
func DownloadAndCacheImage(ctx context.Context, id uuid.UUID, url string, size ImageSize) error {
l := logger.FromContext(ctx)
err := ValidateImageURL(url)
if err != nil {
@ -187,7 +99,7 @@ func (ip *ImageProcessor) downloadCompressAndSave(ctx context.Context, id string
l.Debug().Msgf("Downloading image for ID %s", id)
resp, err := http.Get(url)
if err != nil {
return err
return fmt.Errorf("failed to download image: %w", err)
}
defer resp.Body.Close()
@ -195,28 +107,28 @@ func (ip *ImageProcessor) downloadCompressAndSave(ctx context.Context, id string
return fmt.Errorf("failed to download image, status code: %d", resp.StatusCode)
}
return ip.compressAndSave(ctx, id, size, resp.Body)
return CompressAndSaveImage(ctx, id.String(), size, resp.Body)
}
func (ip *ImageProcessor) compressAndSave(ctx context.Context, filename string, size ImageSize, body io.Reader) error {
// Compresses an image to the specified size, then saves it to the correct cache folder.
func CompressAndSaveImage(ctx context.Context, filename string, size ImageSize, body io.Reader) error {
l := logger.FromContext(ctx)
if size == ImageSizeFull {
l.Debug().Msg("Full size image desired, skipping compression")
return ip.saveImage(filename, size, body)
return saveImage(filename, size, body)
}
l.Debug().Msg("Creating resized image")
compressed, err := ip.compressImage(size, body)
compressed, err := compressImage(size, body)
if err != nil {
return err
}
return ip.saveImage(filename, size, compressed)
return saveImage(filename, size, compressed)
}
// SaveImage saves an image to the image_cache/{size} folder
func (ip *ImageProcessor) saveImage(filename string, size ImageSize, data io.Reader) error {
func saveImage(filename string, size ImageSize, data io.Reader) error {
configDir := cfg.ConfigDir()
cacheDir := filepath.Join(configDir, ImageCacheDir)
@ -243,12 +155,12 @@ func (ip *ImageProcessor) saveImage(filename string, size ImageSize, data io.Rea
return nil
}
func (ip *ImageProcessor) compressImage(size ImageSize, data io.Reader) (io.Reader, error) {
func compressImage(size ImageSize, data io.Reader) (io.Reader, error) {
imgBytes, err := io.ReadAll(data)
if err != nil {
return nil, err
}
px := getImageSize(size)
px := GetImageSize(size)
// Resize with bimg
imgBytes, err = bimg.NewImage(imgBytes).Process(bimg.Options{
Width: px,