@@ -3,6 +3,7 @@ package catalog
 import (
 	"bytes"
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"net/http"
@@ -10,6 +11,8 @@ import (
 	"path"
 	"path/filepath"
 	"strings"
+	"sync"
+	"time"

 	"github.com/gabehf/koito/internal/cfg"
 	"github.com/gabehf/koito/internal/db"
@@ -30,6 +33,93 @@ const (
 	ImageCacheDir = "image_cache"
 )

+type imageJob struct {
+	ctx    context.Context
+	id     string
+	size   ImageSize
+	url    string    // optional
+	reader io.Reader // optional
+}
+
+// ImageProcessor manages a single goroutine that processes image jobs sequentially.
+type ImageProcessor struct {
+	jobs    chan imageJob
+	wg      sync.WaitGroup
+	closing chan struct{}
+}
+
+// NewImageProcessor creates an ImageProcessor and starts the worker goroutine.
+func NewImageProcessor(buffer int) *ImageProcessor {
+	ip := &ImageProcessor{
+		jobs:    make(chan imageJob, buffer),
+		closing: make(chan struct{}),
+	}
+	ip.wg.Add(1)
+	go ip.worker()
+	return ip
+}
+
+func (ip *ImageProcessor) worker() {
+	defer ip.wg.Done() // signal Close that the worker has exited
+	for {
+		select {
+		case job := <-ip.jobs:
+			var err error
+			if job.reader != nil {
+				err = ip.compressAndSave(job.ctx, job.id, job.size, job.reader)
+			} else {
+				err = ip.downloadCompressAndSave(job.ctx, job.id, job.url, job.size)
+			}
+			if err != nil {
+				logger.FromContext(job.ctx).Err(err).Msg("Image processing failed")
+			}
+		case <-ip.closing:
+			return
+		}
+	}
+}
+
+func (ip *ImageProcessor) EnqueueDownloadAndCache(ctx context.Context, id uuid.UUID, url string, size ImageSize) error {
+	return ip.enqueueJob(imageJob{ctx: ctx, id: id.String(), size: size, url: url})
+}
+
+func (ip *ImageProcessor) EnqueueCompressAndSave(ctx context.Context, id string, size ImageSize, reader io.Reader) error {
+	return ip.enqueueJob(imageJob{ctx: ctx, id: id, size: size, reader: reader})
+}
+
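+// WaitForIdle blocks until the job queue has drained or the timeout elapses,
+// polling roughly every 10ms. An empty queue does not guarantee that the job
+// currently being processed by the worker has finished.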
+func (ip *ImageProcessor) WaitForIdle(timeout time.Duration) error {
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+
+	for {
+		if len(ip.jobs) == 0 {
+			return nil
+		}
+		select {
+		case <-time.After(10 * time.Millisecond):
+		case <-timer.C:
+			return errors.New("image processor did not become idle in time")
+		}
+	}
+}
+
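+// enqueueJob hands a job to the worker, blocking while the buffered channel is
+// full; it gives up and returns an error if the job's context is cancelled or
+// the processor is closed first.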
+func (ip *ImageProcessor) enqueueJob(job imageJob) error {
+	select {
+	case ip.jobs <- job:
+		return nil
+	case <-job.ctx.Done():
+		return job.ctx.Err()
+	case <-ip.closing:
+		return errors.New("image processor closed")
+	}
+}
+
+// Close stops the worker and waits for any ongoing processing to finish.
+func (ip *ImageProcessor) Close() {
+	close(ip.closing)
+	ip.wg.Wait()
+	close(ip.jobs)
+}
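+
+// Illustrative usage (a sketch, not taken from the source; ctx, artworkID,
+// coverURL, and the buffer size are placeholders):
+//
+//	ip := NewImageProcessor(64)
+//	defer ip.Close()
+//	if err := ip.EnqueueDownloadAndCache(ctx, artworkID, coverURL, ImageSizeFull); err != nil {
+//		// the job was rejected: context cancelled or processor already closed
+//	}
+//	_ = ip.WaitForIdle(5 * time.Second) // e.g. in tests, wait for queued images to be written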
+
 func ParseImageSize(size string) (ImageSize, error) {
 	switch strings.ToLower(size) {
 	case "small":
@@ -46,7 +136,7 @@ func ParseImageSize(size string) (ImageSize, error) {
 		return "", fmt.Errorf("unknown image size: %s", size)
 	}
 }
-func GetImageSize(size ImageSize) int {
+func getImageSize(size ImageSize) int {
 	var px int
 	switch size {
 	case "small":
@@ -88,9 +178,7 @@ func ValidateImageURL(url string) error {

 	return nil
 }
-
-// DownloadAndCacheImage downloads an image from the given URL, then calls CompressAndSaveImage.
-func DownloadAndCacheImage(ctx context.Context, id uuid.UUID, url string, size ImageSize) error {
+func (ip *ImageProcessor) downloadCompressAndSave(ctx context.Context, id string, url string, size ImageSize) error {
 	l := logger.FromContext(ctx)
 	err := ValidateImageURL(url)
 	if err != nil {
@@ -99,7 +187,7 @@ func DownloadAndCacheImage(ctx context.Context, id uuid.UUID, url string, size I
 	l.Debug().Msgf("Downloading image for ID %s", id)
 	resp, err := http.Get(url)
 	if err != nil {
-		return fmt.Errorf("failed to download image: %w", err)
+		return err
 	}
 	defer resp.Body.Close()

@@ -107,28 +195,28 @@ func DownloadAndCacheImage(ctx context.Context, id uuid.UUID, url string, size I
 		return fmt.Errorf("failed to download image, status code: %d", resp.StatusCode)
 	}

-	return CompressAndSaveImage(ctx, id.String(), size, resp.Body)
+	return ip.compressAndSave(ctx, id, size, resp.Body)
 }

 // Compresses an image to the specified size, then saves it to the correct cache folder.
-func CompressAndSaveImage(ctx context.Context, filename string, size ImageSize, body io.Reader) error {
+func (ip *ImageProcessor) compressAndSave(ctx context.Context, filename string, size ImageSize, body io.Reader) error {
 	l := logger.FromContext(ctx)

 	if size == ImageSizeFull {
-		return saveImage(filename, size, body)
+		l.Debug().Msg("Full size image desired, skipping compression")
+		return ip.saveImage(filename, size, body)
 	}

 	l.Debug().Msg("Creating resized image")
-	compressed, err := compressImage(size, body)
+	compressed, err := ip.compressImage(size, body)
 	if err != nil {
 		return err
 	}

-	return saveImage(filename, size, compressed)
+	return ip.saveImage(filename, size, compressed)
 }

 // SaveImage saves an image to the image_cache/{size} folder
-func saveImage(filename string, size ImageSize, data io.Reader) error {
+func (ip *ImageProcessor) saveImage(filename string, size ImageSize, data io.Reader) error {
 	configDir := cfg.ConfigDir()
 	cacheDir := filepath.Join(configDir, ImageCacheDir)

@@ -155,12 +243,12 @@ func saveImage(filename string, size ImageSize, data io.Reader) error {
 	return nil
 }

-func compressImage(size ImageSize, data io.Reader) (io.Reader, error) {
+func (ip *ImageProcessor) compressImage(size ImageSize, data io.Reader) (io.Reader, error) {
 	imgBytes, err := io.ReadAll(data)
 	if err != nil {
 		return nil, err
 	}
-	px := GetImageSize(size)
+	px := getImageSize(size)
 	// Resize with bimg
 	imgBytes, err = bimg.NewImage(imgBytes).Process(bimg.Options{
 		Width: px,