Files
ocean/file/worker/imageprocessworker.go
konjacpotato cd4079c24d
All checks were successful
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 30s
file: add taskqueue worker process image thumbnail
2025-06-02 22:09:53 +08:00

197 lines
5.5 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package worker
import (
"bytes"
"context"
"errors"
"fmt"
"godemo/file/file"
"godemo/file/internal/svc"
"image"
"image/jpeg"
_ "image/png"
"io"
"strings"
"time"
"github.com/minio/minio-go/v7"
"github.com/zeromicro/go-zero/core/logx"
)
// 图片处理任务
type ImageProcessTask struct {
FileID string // 原始文件ID
Versions []file.ImageVersion // 需要生成的版本
RequestID string // 请求ID用于追踪
CreatedAt time.Time // 创建时间
}
var ImageProcessTaskQueue chan ImageProcessTask
func InitImageProcessTaskQueue(bufferSize int) {
ImageProcessTaskQueue = make(chan ImageProcessTask, bufferSize)
}
func StartImageProcessWorkers(workerCount int, svcCtx *svc.ServiceContext) {
for i := 0; i < workerCount; i++ {
go func(workerID int) {
for task := range ImageProcessTaskQueue {
logx.Infof("Worker %d processing task for FileID: %s", workerID, task.FileID)
if err := processImage(task, svcCtx); err != nil {
logx.Errorf("Worker %d failed to process FileID: %s, err: %v", workerID, task.FileID, err)
}
}
}(i)
}
}
func processImage(task ImageProcessTask, svcCtx *svc.ServiceContext) error {
// 加载原始图片(示意,假设你实现了这类方法)
originalImage, err := LoadImageFromMinIO(svcCtx.MinioClient, task.FileID, file.ImageVersion_original)
if err != nil {
return err
}
// 如果是 ALL则转化成 THUMBNAIL + PREVIEW
versions := task.Versions
if len(versions) == 1 && versions[0] == file.ImageVersion_all {
versions = []file.ImageVersion{file.ImageVersion_thumbnail, file.ImageVersion_preview}
}
for _, version := range versions {
switch version {
case file.ImageVersion_thumbnail:
err := generateAndUpload(originalImage, task.FileID, file.ImageVersion_thumbnail, svcCtx)
if err != nil {
logx.Errorf("Generate thumbnail failed: %v", err)
}
case file.ImageVersion_preview:
err := generateAndUpload(originalImage, task.FileID, file.ImageVersion_preview, svcCtx)
if err != nil {
logx.Errorf("Generate preview failed: %v", err)
}
default:
logx.Infof("Unsupported image version: %v", version)
}
}
return nil
}
func generateAndUpload(originalImage []byte, fileID string, version file.ImageVersion, svcCtx *svc.ServiceContext) error {
// 示例尺寸(你可以从配置或常量读取)
var width, height int
switch version {
case file.ImageVersion_thumbnail:
width, height = 150, 150
case file.ImageVersion_preview:
width, height = 800, 600
}
// resize伪代码请换成你自己的图像处理库如 imaging
resizedImage, err := ResizeImage(originalImage, width, height)
if err != nil {
logx.Errorf("generateAndUpload failed: %v", err)
return err
}
// 上传到 MinIO
return UploadImageToMinIO(svcCtx.MinioClient, fileID, version, resizedImage)
}
func LoadImageFromMinIO(client *minio.Client, fileID string, version file.ImageVersion) ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
bucket, objectKey, _ := GetBucketAndObjectKey(fileID, version)
object, err := client.GetObject(ctx, bucket, objectKey, minio.GetObjectOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get object from MinIO: %w", err)
}
defer object.Close()
buf := new(bytes.Buffer)
if _, err := io.Copy(buf, object); err != nil {
return nil, fmt.Errorf("failed to read object: %w", err)
}
return buf.Bytes(), nil
}
func GetBucketAndObjectKey(fileId string, version file.ImageVersion) (string, string, error) {
bucket := version.String()
objectKey := fileId
if version == file.ImageVersion_original {
// fileID = xx/yy/cc/ll.png 获取第一段作为bucket
bucket, objectKey, _ = GetBucketAndObjectKeyFromFileID(fileId)
}
return bucket, objectKey, nil
}
func GetBucketAndObjectKeyFromFileID(fileID string) (string, string, error) {
idx := strings.Index(fileID, "/")
if idx == -1 {
return "", "", fmt.Errorf("invalid fileID: %s", fileID)
}
return fileID[:idx], fileID[idx+1:], nil
}
func ResizeImage(original []byte, width, height int) ([]byte, error) {
img, _, err := image.Decode(bytes.NewReader(original))
if err != nil {
logx.Errorf("ResizeImage failed: %v", err)
return nil, err
}
srcBounds := img.Bounds()
srcWidth := srcBounds.Dx()
srcHeight := srcBounds.Dy()
if srcWidth == 0 || srcHeight == 0 {
return nil, errors.New("source image has zero size")
}
// 创建缩放后的图像RGBA 格式)
dst := image.NewRGBA(image.Rect(0, 0, width, height))
// 最近邻缩放
for y := 0; y < height; y++ {
for x := 0; x < width; x++ {
srcX := x * srcWidth / width
srcY := y * srcHeight / height
color := img.At(srcX+srcBounds.Min.X, srcY+srcBounds.Min.Y)
dst.Set(x, y, color)
}
}
// 编码为 JPEG
var buf bytes.Buffer
err = jpeg.Encode(&buf, dst, &jpeg.Options{Quality: 85})
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func UploadImageToMinIO(client *minio.Client, fileID string, version file.ImageVersion, data []byte) error {
bucket, objectKey, err := GetBucketAndObjectKey(fileID, version)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, err = client.PutObject(ctx, bucket, objectKey, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{
ContentType: "image/jpeg",
})
logx.Infof("Uploaded image to MinIO: %s/%s", bucket, objectKey)
if err != nil {
return fmt.Errorf("upload to MinIO failed: %w", err)
}
return nil
}