file: add taskqueue worker process image thumbnail
All checks were successful
Gitea Actions Demo / Explore-Gitea-Actions (push) Successful in 30s

This commit is contained in:
2025-06-02 22:09:53 +08:00
parent 0d65a890f6
commit cd4079c24d
9 changed files with 569 additions and 50 deletions

View File

@ -8,6 +8,7 @@ import (
"godemo/file/internal/config"
"godemo/file/internal/server"
"godemo/file/internal/svc"
"godemo/file/worker"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/service"
@ -25,6 +26,11 @@ func main() {
conf.MustLoad(*configFile, &c)
ctx := svc.NewServiceContext(c)
// 初始化任务队列
worker.InitImageProcessTaskQueue(1000)
// 启动worker池
worker.StartImageProcessWorkers(5, ctx)
s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
file.RegisterFileServer(grpcServer, server.NewFileServer(ctx))

View File

@ -21,6 +21,164 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type ImageVersion int32
const (
ImageVersion_original ImageVersion = 0
ImageVersion_thumbnail ImageVersion = 1 // 缩略图
ImageVersion_preview ImageVersion = 2 // 预览图
ImageVersion_all ImageVersion = 9 // 所有版本
)
// Enum value maps for ImageVersion.
var (
ImageVersion_name = map[int32]string{
0: "original",
1: "thumbnail",
2: "preview",
9: "all",
}
ImageVersion_value = map[string]int32{
"original": 0,
"thumbnail": 1,
"preview": 2,
"all": 9,
}
)
func (x ImageVersion) Enum() *ImageVersion {
p := new(ImageVersion)
*p = x
return p
}
func (x ImageVersion) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (ImageVersion) Descriptor() protoreflect.EnumDescriptor {
return file_rpc_file_proto_enumTypes[0].Descriptor()
}
func (ImageVersion) Type() protoreflect.EnumType {
return &file_rpc_file_proto_enumTypes[0]
}
func (x ImageVersion) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}
// Deprecated: Use ImageVersion.Descriptor instead.
func (ImageVersion) EnumDescriptor() ([]byte, []int) {
return file_rpc_file_proto_rawDescGZIP(), []int{0}
}
// 生成图片版本请求
type GenerateImageVersionsRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
FileId string `protobuf:"bytes,1,opt,name=file_id,json=fileId,proto3" json:"file_id,omitempty"` // 原始图片ID
Versions []ImageVersion `protobuf:"varint,2,rep,packed,name=versions,proto3,enum=file.ImageVersion" json:"versions,omitempty"` // 需要生成的版本
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *GenerateImageVersionsRequest) Reset() {
*x = GenerateImageVersionsRequest{}
mi := &file_rpc_file_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *GenerateImageVersionsRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*GenerateImageVersionsRequest) ProtoMessage() {}
func (x *GenerateImageVersionsRequest) ProtoReflect() protoreflect.Message {
mi := &file_rpc_file_proto_msgTypes[0]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GenerateImageVersionsRequest.ProtoReflect.Descriptor instead.
func (*GenerateImageVersionsRequest) Descriptor() ([]byte, []int) {
return file_rpc_file_proto_rawDescGZIP(), []int{0}
}
func (x *GenerateImageVersionsRequest) GetFileId() string {
if x != nil {
return x.FileId
}
return ""
}
func (x *GenerateImageVersionsRequest) GetVersions() []ImageVersion {
if x != nil {
return x.Versions
}
return nil
}
// 生成图片版本响应
type GenerateImageVersionsResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
TaskId string `protobuf:"bytes,1,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"` // 异步任务ID
Accepted bool `protobuf:"varint,2,opt,name=accepted,proto3" json:"accepted,omitempty"` // 任务是否被接受
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *GenerateImageVersionsResponse) Reset() {
*x = GenerateImageVersionsResponse{}
mi := &file_rpc_file_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *GenerateImageVersionsResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*GenerateImageVersionsResponse) ProtoMessage() {}
func (x *GenerateImageVersionsResponse) ProtoReflect() protoreflect.Message {
mi := &file_rpc_file_proto_msgTypes[1]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GenerateImageVersionsResponse.ProtoReflect.Descriptor instead.
func (*GenerateImageVersionsResponse) Descriptor() ([]byte, []int) {
return file_rpc_file_proto_rawDescGZIP(), []int{1}
}
func (x *GenerateImageVersionsResponse) GetTaskId() string {
if x != nil {
return x.TaskId
}
return ""
}
func (x *GenerateImageVersionsResponse) GetAccepted() bool {
if x != nil {
return x.Accepted
}
return false
}
// 上传文件请求
type UploadRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
@ -35,7 +193,7 @@ type UploadRequest struct {
func (x *UploadRequest) Reset() {
*x = UploadRequest{}
mi := &file_rpc_file_proto_msgTypes[0]
mi := &file_rpc_file_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -47,7 +205,7 @@ func (x *UploadRequest) String() string {
func (*UploadRequest) ProtoMessage() {}
func (x *UploadRequest) ProtoReflect() protoreflect.Message {
mi := &file_rpc_file_proto_msgTypes[0]
mi := &file_rpc_file_proto_msgTypes[2]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -60,7 +218,7 @@ func (x *UploadRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use UploadRequest.ProtoReflect.Descriptor instead.
func (*UploadRequest) Descriptor() ([]byte, []int) {
return file_rpc_file_proto_rawDescGZIP(), []int{0}
return file_rpc_file_proto_rawDescGZIP(), []int{2}
}
func (x *UploadRequest) GetFilename() string {
@ -109,7 +267,7 @@ type UploadResponse struct {
func (x *UploadResponse) Reset() {
*x = UploadResponse{}
mi := &file_rpc_file_proto_msgTypes[1]
mi := &file_rpc_file_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -121,7 +279,7 @@ func (x *UploadResponse) String() string {
func (*UploadResponse) ProtoMessage() {}
func (x *UploadResponse) ProtoReflect() protoreflect.Message {
mi := &file_rpc_file_proto_msgTypes[1]
mi := &file_rpc_file_proto_msgTypes[3]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -134,7 +292,7 @@ func (x *UploadResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use UploadResponse.ProtoReflect.Descriptor instead.
func (*UploadResponse) Descriptor() ([]byte, []int) {
return file_rpc_file_proto_rawDescGZIP(), []int{1}
return file_rpc_file_proto_rawDescGZIP(), []int{3}
}
func (x *UploadResponse) GetFileId() string {
@ -161,7 +319,7 @@ type GetFileUrlRequest struct {
func (x *GetFileUrlRequest) Reset() {
*x = GetFileUrlRequest{}
mi := &file_rpc_file_proto_msgTypes[2]
mi := &file_rpc_file_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -173,7 +331,7 @@ func (x *GetFileUrlRequest) String() string {
func (*GetFileUrlRequest) ProtoMessage() {}
func (x *GetFileUrlRequest) ProtoReflect() protoreflect.Message {
mi := &file_rpc_file_proto_msgTypes[2]
mi := &file_rpc_file_proto_msgTypes[4]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -186,7 +344,7 @@ func (x *GetFileUrlRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use GetFileUrlRequest.ProtoReflect.Descriptor instead.
func (*GetFileUrlRequest) Descriptor() ([]byte, []int) {
return file_rpc_file_proto_rawDescGZIP(), []int{2}
return file_rpc_file_proto_rawDescGZIP(), []int{4}
}
func (x *GetFileUrlRequest) GetFileId() string {
@ -206,7 +364,7 @@ type GetFileUrlResponse struct {
func (x *GetFileUrlResponse) Reset() {
*x = GetFileUrlResponse{}
mi := &file_rpc_file_proto_msgTypes[3]
mi := &file_rpc_file_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -218,7 +376,7 @@ func (x *GetFileUrlResponse) String() string {
func (*GetFileUrlResponse) ProtoMessage() {}
func (x *GetFileUrlResponse) ProtoReflect() protoreflect.Message {
mi := &file_rpc_file_proto_msgTypes[3]
mi := &file_rpc_file_proto_msgTypes[5]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -231,7 +389,7 @@ func (x *GetFileUrlResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use GetFileUrlResponse.ProtoReflect.Descriptor instead.
func (*GetFileUrlResponse) Descriptor() ([]byte, []int) {
return file_rpc_file_proto_rawDescGZIP(), []int{3}
return file_rpc_file_proto_rawDescGZIP(), []int{5}
}
func (x *GetFileUrlResponse) GetUrl() string {
@ -251,7 +409,7 @@ type DeleteRequest struct {
func (x *DeleteRequest) Reset() {
*x = DeleteRequest{}
mi := &file_rpc_file_proto_msgTypes[4]
mi := &file_rpc_file_proto_msgTypes[6]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -263,7 +421,7 @@ func (x *DeleteRequest) String() string {
func (*DeleteRequest) ProtoMessage() {}
func (x *DeleteRequest) ProtoReflect() protoreflect.Message {
mi := &file_rpc_file_proto_msgTypes[4]
mi := &file_rpc_file_proto_msgTypes[6]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -276,7 +434,7 @@ func (x *DeleteRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use DeleteRequest.ProtoReflect.Descriptor instead.
func (*DeleteRequest) Descriptor() ([]byte, []int) {
return file_rpc_file_proto_rawDescGZIP(), []int{4}
return file_rpc_file_proto_rawDescGZIP(), []int{6}
}
func (x *DeleteRequest) GetFileId() string {
@ -296,7 +454,7 @@ type DeleteResponse struct {
func (x *DeleteResponse) Reset() {
*x = DeleteResponse{}
mi := &file_rpc_file_proto_msgTypes[5]
mi := &file_rpc_file_proto_msgTypes[7]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -308,7 +466,7 @@ func (x *DeleteResponse) String() string {
func (*DeleteResponse) ProtoMessage() {}
func (x *DeleteResponse) ProtoReflect() protoreflect.Message {
mi := &file_rpc_file_proto_msgTypes[5]
mi := &file_rpc_file_proto_msgTypes[7]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -321,7 +479,7 @@ func (x *DeleteResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use DeleteResponse.ProtoReflect.Descriptor instead.
func (*DeleteResponse) Descriptor() ([]byte, []int) {
return file_rpc_file_proto_rawDescGZIP(), []int{5}
return file_rpc_file_proto_rawDescGZIP(), []int{7}
}
func (x *DeleteResponse) GetSuccess() bool {
@ -335,7 +493,13 @@ var File_rpc_file_proto protoreflect.FileDescriptor
const file_rpc_file_proto_rawDesc = "" +
"\n" +
"\x0erpc/file.proto\x12\x04file\"\x98\x01\n" +
"\x0erpc/file.proto\x12\x04file\"g\n" +
"\x1cGenerateImageVersionsRequest\x12\x17\n" +
"\afile_id\x18\x01 \x01(\tR\x06fileId\x12.\n" +
"\bversions\x18\x02 \x03(\x0e2\x12.file.ImageVersionR\bversions\"T\n" +
"\x1dGenerateImageVersionsResponse\x12\x17\n" +
"\atask_id\x18\x01 \x01(\tR\x06taskId\x12\x1a\n" +
"\baccepted\x18\x02 \x01(\bR\baccepted\"\x98\x01\n" +
"\rUploadRequest\x12\x1a\n" +
"\bfilename\x18\x01 \x01(\tR\bfilename\x12!\n" +
"\fcontent_type\x18\x02 \x01(\tR\vcontentType\x12\x18\n" +
@ -352,12 +516,18 @@ const file_rpc_file_proto_rawDesc = "" +
"\rDeleteRequest\x12\x17\n" +
"\afile_id\x18\x01 \x01(\tR\x06fileId\"*\n" +
"\x0eDeleteResponse\x12\x18\n" +
"\asuccess\x18\x01 \x01(\bR\asuccess2\xb1\x01\n" +
"\asuccess\x18\x01 \x01(\bR\asuccess*A\n" +
"\fImageVersion\x12\f\n" +
"\boriginal\x10\x00\x12\r\n" +
"\tthumbnail\x10\x01\x12\v\n" +
"\apreview\x10\x02\x12\a\n" +
"\x03all\x10\t2\x93\x02\n" +
"\x04File\x123\n" +
"\x06Upload\x12\x13.file.UploadRequest\x1a\x14.file.UploadResponse\x12?\n" +
"\n" +
"GetFileUrl\x12\x17.file.GetFileUrlRequest\x1a\x18.file.GetFileUrlResponse\x123\n" +
"\x06Delete\x12\x13.file.DeleteRequest\x1a\x14.file.DeleteResponseB\bZ\x06./fileb\x06proto3"
"\x06Delete\x12\x13.file.DeleteRequest\x1a\x14.file.DeleteResponse\x12`\n" +
"\x15GenerateImageVersions\x12\".file.GenerateImageVersionsRequest\x1a#.file.GenerateImageVersionsResponseB\bZ\x06./fileb\x06proto3"
var (
file_rpc_file_proto_rawDescOnce sync.Once
@ -371,27 +541,34 @@ func file_rpc_file_proto_rawDescGZIP() []byte {
return file_rpc_file_proto_rawDescData
}
var file_rpc_file_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_rpc_file_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_rpc_file_proto_msgTypes = make([]protoimpl.MessageInfo, 8)
var file_rpc_file_proto_goTypes = []any{
(*UploadRequest)(nil), // 0: file.UploadRequest
(*UploadResponse)(nil), // 1: file.UploadResponse
(*GetFileUrlRequest)(nil), // 2: file.GetFileUrlRequest
(*GetFileUrlResponse)(nil), // 3: file.GetFileUrlResponse
(*DeleteRequest)(nil), // 4: file.DeleteRequest
(*DeleteResponse)(nil), // 5: file.DeleteResponse
(ImageVersion)(0), // 0: file.ImageVersion
(*GenerateImageVersionsRequest)(nil), // 1: file.GenerateImageVersionsRequest
(*GenerateImageVersionsResponse)(nil), // 2: file.GenerateImageVersionsResponse
(*UploadRequest)(nil), // 3: file.UploadRequest
(*UploadResponse)(nil), // 4: file.UploadResponse
(*GetFileUrlRequest)(nil), // 5: file.GetFileUrlRequest
(*GetFileUrlResponse)(nil), // 6: file.GetFileUrlResponse
(*DeleteRequest)(nil), // 7: file.DeleteRequest
(*DeleteResponse)(nil), // 8: file.DeleteResponse
}
var file_rpc_file_proto_depIdxs = []int32{
0, // 0: file.File.Upload:input_type -> file.UploadRequest
2, // 1: file.File.GetFileUrl:input_type -> file.GetFileUrlRequest
4, // 2: file.File.Delete:input_type -> file.DeleteRequest
1, // 3: file.File.Upload:output_type -> file.UploadResponse
3, // 4: file.File.GetFileUrl:output_type -> file.GetFileUrlResponse
5, // 5: file.File.Delete:output_type -> file.DeleteResponse
3, // [3:6] is the sub-list for method output_type
0, // [0:3] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
0, // 0: file.GenerateImageVersionsRequest.versions:type_name -> file.ImageVersion
3, // 1: file.File.Upload:input_type -> file.UploadRequest
5, // 2: file.File.GetFileUrl:input_type -> file.GetFileUrlRequest
7, // 3: file.File.Delete:input_type -> file.DeleteRequest
1, // 4: file.File.GenerateImageVersions:input_type -> file.GenerateImageVersionsRequest
4, // 5: file.File.Upload:output_type -> file.UploadResponse
6, // 6: file.File.GetFileUrl:output_type -> file.GetFileUrlResponse
8, // 7: file.File.Delete:output_type -> file.DeleteResponse
2, // 8: file.File.GenerateImageVersions:output_type -> file.GenerateImageVersionsResponse
5, // [5:9] is the sub-list for method output_type
1, // [1:5] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
func init() { file_rpc_file_proto_init() }
@ -404,13 +581,14 @@ func file_rpc_file_proto_init() {
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_rpc_file_proto_rawDesc), len(file_rpc_file_proto_rawDesc)),
NumEnums: 0,
NumMessages: 6,
NumEnums: 1,
NumMessages: 8,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_rpc_file_proto_goTypes,
DependencyIndexes: file_rpc_file_proto_depIdxs,
EnumInfos: file_rpc_file_proto_enumTypes,
MessageInfos: file_rpc_file_proto_msgTypes,
}.Build()
File_rpc_file_proto = out.File

View File

@ -19,9 +19,10 @@ import (
const _ = grpc.SupportPackageIsVersion9
const (
File_Upload_FullMethodName = "/file.File/Upload"
File_GetFileUrl_FullMethodName = "/file.File/GetFileUrl"
File_Delete_FullMethodName = "/file.File/Delete"
File_Upload_FullMethodName = "/file.File/Upload"
File_GetFileUrl_FullMethodName = "/file.File/GetFileUrl"
File_Delete_FullMethodName = "/file.File/Delete"
File_GenerateImageVersions_FullMethodName = "/file.File/GenerateImageVersions"
)
// FileClient is the client API for File service.
@ -36,6 +37,8 @@ type FileClient interface {
GetFileUrl(ctx context.Context, in *GetFileUrlRequest, opts ...grpc.CallOption) (*GetFileUrlResponse, error)
// 删除文件
Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error)
// 异步生成图片版本接口
GenerateImageVersions(ctx context.Context, in *GenerateImageVersionsRequest, opts ...grpc.CallOption) (*GenerateImageVersionsResponse, error)
}
type fileClient struct {
@ -76,6 +79,16 @@ func (c *fileClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc
return out, nil
}
func (c *fileClient) GenerateImageVersions(ctx context.Context, in *GenerateImageVersionsRequest, opts ...grpc.CallOption) (*GenerateImageVersionsResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(GenerateImageVersionsResponse)
err := c.cc.Invoke(ctx, File_GenerateImageVersions_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// FileServer is the server API for File service.
// All implementations must embed UnimplementedFileServer
// for forward compatibility.
@ -88,6 +101,8 @@ type FileServer interface {
GetFileUrl(context.Context, *GetFileUrlRequest) (*GetFileUrlResponse, error)
// 删除文件
Delete(context.Context, *DeleteRequest) (*DeleteResponse, error)
// 异步生成图片版本接口
GenerateImageVersions(context.Context, *GenerateImageVersionsRequest) (*GenerateImageVersionsResponse, error)
mustEmbedUnimplementedFileServer()
}
@ -107,6 +122,9 @@ func (UnimplementedFileServer) GetFileUrl(context.Context, *GetFileUrlRequest) (
func (UnimplementedFileServer) Delete(context.Context, *DeleteRequest) (*DeleteResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Delete not implemented")
}
func (UnimplementedFileServer) GenerateImageVersions(context.Context, *GenerateImageVersionsRequest) (*GenerateImageVersionsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GenerateImageVersions not implemented")
}
func (UnimplementedFileServer) mustEmbedUnimplementedFileServer() {}
func (UnimplementedFileServer) testEmbeddedByValue() {}
@ -182,6 +200,24 @@ func _File_Delete_Handler(srv interface{}, ctx context.Context, dec func(interfa
return interceptor(ctx, in, info, handler)
}
func _File_GenerateImageVersions_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GenerateImageVersionsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(FileServer).GenerateImageVersions(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: File_GenerateImageVersions_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(FileServer).GenerateImageVersions(ctx, req.(*GenerateImageVersionsRequest))
}
return interceptor(ctx, in, info, handler)
}
// File_ServiceDesc is the grpc.ServiceDesc for File service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -201,6 +237,10 @@ var File_ServiceDesc = grpc.ServiceDesc{
MethodName: "Delete",
Handler: _File_Delete_Handler,
},
{
MethodName: "GenerateImageVersions",
Handler: _File_GenerateImageVersions_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "rpc/file.proto",

View File

@ -14,12 +14,14 @@ import (
)
type (
DeleteRequest = file.DeleteRequest
DeleteResponse = file.DeleteResponse
GetFileUrlRequest = file.GetFileUrlRequest
GetFileUrlResponse = file.GetFileUrlResponse
UploadRequest = file.UploadRequest
UploadResponse = file.UploadResponse
DeleteRequest = file.DeleteRequest
DeleteResponse = file.DeleteResponse
GenerateImageVersionsRequest = file.GenerateImageVersionsRequest
GenerateImageVersionsResponse = file.GenerateImageVersionsResponse
GetFileUrlRequest = file.GetFileUrlRequest
GetFileUrlResponse = file.GetFileUrlResponse
UploadRequest = file.UploadRequest
UploadResponse = file.UploadResponse
File interface {
// 上传文件(图片/头像/壁纸等)
@ -28,6 +30,8 @@ type (
GetFileUrl(ctx context.Context, in *GetFileUrlRequest, opts ...grpc.CallOption) (*GetFileUrlResponse, error)
// 删除文件
Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error)
// 异步生成图片版本接口
GenerateImageVersions(ctx context.Context, in *GenerateImageVersionsRequest, opts ...grpc.CallOption) (*GenerateImageVersionsResponse, error)
}
defaultFile struct {
@ -58,3 +62,9 @@ func (m *defaultFile) Delete(ctx context.Context, in *DeleteRequest, opts ...grp
client := file.NewFileClient(m.cli.Conn())
return client.Delete(ctx, in, opts...)
}
// 异步生成图片版本接口
func (m *defaultFile) GenerateImageVersions(ctx context.Context, in *GenerateImageVersionsRequest, opts ...grpc.CallOption) (*GenerateImageVersionsResponse, error) {
client := file.NewFileClient(m.cli.Conn())
return client.GenerateImageVersions(ctx, in, opts...)
}

View File

@ -0,0 +1,49 @@
package logic
import (
"context"
"time"
"godemo/file/file"
"godemo/file/internal/svc"
"godemo/file/worker"
"github.com/google/uuid"
"github.com/zeromicro/go-zero/core/logx"
)
type GenerateImageVersionsLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewGenerateImageVersionsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GenerateImageVersionsLogic {
return &GenerateImageVersionsLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
// 异步生成图片版本接口
func (l *GenerateImageVersionsLogic) GenerateImageVersions(in *file.GenerateImageVersionsRequest) (*file.GenerateImageVersionsResponse, error) {
// 1. 验证输入参数
// 2. 检查文件是否存在
// 3. 创建任务对象,并提交到任务队列
requestID := uuid.New().String()
worker.ImageProcessTaskQueue <- worker.ImageProcessTask{
FileID: in.FileId,
Versions: in.Versions,
RequestID: requestID,
CreatedAt: time.Now(),
}
// 4. 返回响应
return &file.GenerateImageVersionsResponse{
TaskId: requestID,
Accepted: true,
}, nil
}

View File

@ -40,3 +40,9 @@ func (s *FileServer) Delete(ctx context.Context, in *file.DeleteRequest) (*file.
l := logic.NewDeleteLogic(ctx, s.svcCtx)
return l.Delete(in)
}
// 异步生成图片版本接口
func (s *FileServer) GenerateImageVersions(ctx context.Context, in *file.GenerateImageVersionsRequest) (*file.GenerateImageVersionsResponse, error) {
l := logic.NewGenerateImageVersionsLogic(ctx, s.svcCtx)
return l.GenerateImageVersions(in)
}

View File

@ -0,0 +1,196 @@
package worker
import (
"bytes"
"context"
"errors"
"fmt"
"godemo/file/file"
"godemo/file/internal/svc"
"image"
"image/jpeg"
_ "image/png"
"io"
"strings"
"time"
"github.com/minio/minio-go/v7"
"github.com/zeromicro/go-zero/core/logx"
)
// 图片处理任务
type ImageProcessTask struct {
FileID string // 原始文件ID
Versions []file.ImageVersion // 需要生成的版本
RequestID string // 请求ID用于追踪
CreatedAt time.Time // 创建时间
}
var ImageProcessTaskQueue chan ImageProcessTask
func InitImageProcessTaskQueue(bufferSize int) {
ImageProcessTaskQueue = make(chan ImageProcessTask, bufferSize)
}
func StartImageProcessWorkers(workerCount int, svcCtx *svc.ServiceContext) {
for i := 0; i < workerCount; i++ {
go func(workerID int) {
for task := range ImageProcessTaskQueue {
logx.Infof("Worker %d processing task for FileID: %s", workerID, task.FileID)
if err := processImage(task, svcCtx); err != nil {
logx.Errorf("Worker %d failed to process FileID: %s, err: %v", workerID, task.FileID, err)
}
}
}(i)
}
}
func processImage(task ImageProcessTask, svcCtx *svc.ServiceContext) error {
// 加载原始图片(示意,假设你实现了这类方法)
originalImage, err := LoadImageFromMinIO(svcCtx.MinioClient, task.FileID, file.ImageVersion_original)
if err != nil {
return err
}
// 如果是 ALL则转化成 THUMBNAIL + PREVIEW
versions := task.Versions
if len(versions) == 1 && versions[0] == file.ImageVersion_all {
versions = []file.ImageVersion{file.ImageVersion_thumbnail, file.ImageVersion_preview}
}
for _, version := range versions {
switch version {
case file.ImageVersion_thumbnail:
err := generateAndUpload(originalImage, task.FileID, file.ImageVersion_thumbnail, svcCtx)
if err != nil {
logx.Errorf("Generate thumbnail failed: %v", err)
}
case file.ImageVersion_preview:
err := generateAndUpload(originalImage, task.FileID, file.ImageVersion_preview, svcCtx)
if err != nil {
logx.Errorf("Generate preview failed: %v", err)
}
default:
logx.Infof("Unsupported image version: %v", version)
}
}
return nil
}
func generateAndUpload(originalImage []byte, fileID string, version file.ImageVersion, svcCtx *svc.ServiceContext) error {
// 示例尺寸(你可以从配置或常量读取)
var width, height int
switch version {
case file.ImageVersion_thumbnail:
width, height = 150, 150
case file.ImageVersion_preview:
width, height = 800, 600
}
// resize伪代码请换成你自己的图像处理库如 imaging
resizedImage, err := ResizeImage(originalImage, width, height)
if err != nil {
logx.Errorf("generateAndUpload failed: %v", err)
return err
}
// 上传到 MinIO
return UploadImageToMinIO(svcCtx.MinioClient, fileID, version, resizedImage)
}
func LoadImageFromMinIO(client *minio.Client, fileID string, version file.ImageVersion) ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
bucket, objectKey, _ := GetBucketAndObjectKey(fileID, version)
object, err := client.GetObject(ctx, bucket, objectKey, minio.GetObjectOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get object from MinIO: %w", err)
}
defer object.Close()
buf := new(bytes.Buffer)
if _, err := io.Copy(buf, object); err != nil {
return nil, fmt.Errorf("failed to read object: %w", err)
}
return buf.Bytes(), nil
}
func GetBucketAndObjectKey(fileId string, version file.ImageVersion) (string, string, error) {
bucket := version.String()
objectKey := fileId
if version == file.ImageVersion_original {
// fileID = xx/yy/cc/ll.png 获取第一段作为bucket
bucket, objectKey, _ = GetBucketAndObjectKeyFromFileID(fileId)
}
return bucket, objectKey, nil
}
func GetBucketAndObjectKeyFromFileID(fileID string) (string, string, error) {
idx := strings.Index(fileID, "/")
if idx == -1 {
return "", "", fmt.Errorf("invalid fileID: %s", fileID)
}
return fileID[:idx], fileID[idx+1:], nil
}
func ResizeImage(original []byte, width, height int) ([]byte, error) {
img, _, err := image.Decode(bytes.NewReader(original))
if err != nil {
logx.Errorf("ResizeImage failed: %v", err)
return nil, err
}
srcBounds := img.Bounds()
srcWidth := srcBounds.Dx()
srcHeight := srcBounds.Dy()
if srcWidth == 0 || srcHeight == 0 {
return nil, errors.New("source image has zero size")
}
// 创建缩放后的图像RGBA 格式)
dst := image.NewRGBA(image.Rect(0, 0, width, height))
// 最近邻缩放
for y := 0; y < height; y++ {
for x := 0; x < width; x++ {
srcX := x * srcWidth / width
srcY := y * srcHeight / height
color := img.At(srcX+srcBounds.Min.X, srcY+srcBounds.Min.Y)
dst.Set(x, y, color)
}
}
// 编码为 JPEG
var buf bytes.Buffer
err = jpeg.Encode(&buf, dst, &jpeg.Options{Quality: 85})
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func UploadImageToMinIO(client *minio.Client, fileID string, version file.ImageVersion, data []byte) error {
bucket, objectKey, err := GetBucketAndObjectKey(fileID, version)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, err = client.PutObject(ctx, bucket, objectKey, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{
ContentType: "image/jpeg",
})
logx.Infof("Uploaded image to MinIO: %s/%s", bucket, objectKey)
if err != nil {
return fmt.Errorf("upload to MinIO failed: %w", err)
}
return nil
}