Skip to content

Commit

Permalink
feat(oss api): support ceph oss (#775)
Browse files Browse the repository at this point in the history
Co-authored-by: wenxuwan <[email protected]>
Co-authored-by: seeflood <[email protected]>
  • Loading branch information
3 people authored Oct 13, 2022
1 parent 15a9192 commit 4a40509
Show file tree
Hide file tree
Showing 14 changed files with 1,635 additions and 121 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ cmd/layotto_multiple_api/nohup.out
default.etcd/
demo/configuration/common/client
demo/file/client
demo/oss/client
demo/flowcontrol/client
demo/lock/redis/client
demo/pubsub/redis/client/publisher
Expand Down
3 changes: 3 additions & 0 deletions cmd/layotto/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (

aliyun_oss "mosn.io/layotto/components/oss/aliyun"

ceph_oss "mosn.io/layotto/components/oss/ceph"

"mosn.io/mosn/pkg/istio"

aliyun_file "mosn.io/layotto/components/file/aliyun"
Expand Down Expand Up @@ -289,6 +291,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
runtime.WithOssFactory(
oss.NewFactory("aws.oss", aws_oss.NewAwsOss),
oss.NewFactory("aliyun.oss", aliyun_oss.NewAliyunOss),
oss.NewFactory("ceph", ceph_oss.NewCephOss),
),
// PubSub
runtime.WithPubSubFactory(
Expand Down
3 changes: 3 additions & 0 deletions cmd/layotto_multiple_api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (

aliyun_oss "mosn.io/layotto/components/oss/aliyun"

ceph_oss "mosn.io/layotto/components/oss/ceph"

aliyun_file "mosn.io/layotto/components/file/aliyun"
"mosn.io/layotto/components/file/local"

Expand Down Expand Up @@ -303,6 +305,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
runtime.WithOssFactory(
oss.NewFactory("aws.oss", aws_oss.NewAwsOss),
oss.NewFactory("aliyun.oss", aliyun_oss.NewAliyunOss),
oss.NewFactory("ceph", ceph_oss.NewCephOss),
),

// PubSub
Expand Down
3 changes: 3 additions & 0 deletions cmd/layotto_without_xds/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (

aliyun_oss "mosn.io/layotto/components/oss/aliyun"

ceph_oss "mosn.io/layotto/components/oss/ceph"

"mosn.io/layotto/components/file/aliyun"
aws_file "mosn.io/layotto/components/file/aws"
"mosn.io/layotto/components/file/minio"
Expand Down Expand Up @@ -408,6 +410,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp
runtime.WithOssFactory(
oss.NewFactory("aws.oss", aws_oss.NewAwsOss),
oss.NewFactory("aliyun.oss", aliyun_oss.NewAliyunOss),
oss.NewFactory("ceph", ceph_oss.NewCephOss),
),

// Sequencer
Expand Down
43 changes: 0 additions & 43 deletions components/oss/aws/option.go

This file was deleted.

90 changes: 19 additions & 71 deletions components/oss/aws/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,8 @@ func (a *AwsOss) GetObject(ctx context.Context, req *oss.GetObjectInput) (*oss.G
if err != nil {
return nil, err
}
out := &oss.GetObjectOutput{}
err = copier.Copy(out, ob)
if err != nil {
return nil, err
}
out.DataStream = ob.Body
return out, nil

return oss.GetGetObjectOutput(ob)
}

func (a *AwsOss) PutObject(ctx context.Context, req *oss.PutObjectInput) (*oss.PutObjectOutput, error) {
Expand All @@ -114,12 +109,8 @@ func (a *AwsOss) PutObject(ctx context.Context, req *oss.PutObjectInput) (*oss.P
if err != nil {
return nil, err
}
out := &oss.PutObjectOutput{}
err = copier.Copy(out, resp)
if err != nil {
return nil, err
}
return out, err

return oss.GetPutObjectOutput(resp)
}

func (a *AwsOss) DeleteObject(ctx context.Context, req *oss.DeleteObjectInput) (*oss.DeleteObjectOutput, error) {
Expand All @@ -135,7 +126,7 @@ func (a *AwsOss) DeleteObject(ctx context.Context, req *oss.DeleteObjectInput) (
if err != nil {
return nil, err
}
return &oss.DeleteObjectOutput{DeleteMarker: resp.DeleteMarker, RequestCharged: string(resp.RequestCharged), VersionId: *resp.VersionId}, err
return oss.GetDeleteObjectOutput(resp)
}

func (a *AwsOss) PutObjectTagging(ctx context.Context, req *oss.PutObjectTaggingInput) (*oss.PutObjectTaggingOutput, error) {
Expand Down Expand Up @@ -169,7 +160,7 @@ func (a *AwsOss) DeleteObjectTagging(ctx context.Context, req *oss.DeleteObjectT
if err != nil {
return nil, err
}
return &oss.DeleteObjectTaggingOutput{VersionId: *resp.VersionId}, err
return oss.GetDeleteObjectTaggingOutput(resp)
}

func (a *AwsOss) GetObjectTagging(ctx context.Context, req *oss.GetObjectTaggingInput) (*oss.GetObjectTaggingOutput, error) {
Expand All @@ -187,11 +178,7 @@ func (a *AwsOss) GetObjectTagging(ctx context.Context, req *oss.GetObjectTagging
return nil, err
}

output := &oss.GetObjectTaggingOutput{Tags: map[string]string{}}
for _, tags := range resp.TagSet {
output.Tags[*tags.Key] = *tags.Value
}
return output, err
return oss.GetGetObjectTaggingOutput(resp)
}

func (a *AwsOss) CopyObject(ctx context.Context, req *oss.CopyObjectInput) (*oss.CopyObjectOutput, error) {
Expand Down Expand Up @@ -259,14 +246,8 @@ func (a *AwsOss) ListObjects(ctx context.Context, req *oss.ListObjectsInput) (*o
if err != nil {
return nil, err
}
output := &oss.ListObjectsOutput{}
err = copier.CopyWithOption(output, resp, copier.Option{IgnoreEmpty: true, DeepCopy: true, Converters: []copier.TypeConverter{time2int64}})
// if not return NextMarker, use the value of the last Key in the response as the marker
if output.IsTruncated && output.NextMarker == "" {
index := len(output.Contents) - 1
output.NextMarker = output.Contents[index].Key
}
return output, err

return oss.GetListObjectsOutput(resp)
}
func (a *AwsOss) GetObjectCannedAcl(ctx context.Context, req *oss.GetObjectCannedAclInput) (*oss.GetObjectCannedAclOutput, error) {
return nil, errors.New("GetObjectCannedAcl method not supported on AWS")
Expand Down Expand Up @@ -304,7 +285,7 @@ func (a *AwsOss) CreateMultipartUpload(ctx context.Context, req *oss.CreateMulti
return nil, err
}
input := &s3.CreateMultipartUploadInput{}
err = copier.CopyWithOption(input, req, copier.Option{IgnoreEmpty: true, DeepCopy: true, Converters: []copier.TypeConverter{int642time}})
err = copier.CopyWithOption(input, req, copier.Option{IgnoreEmpty: true, DeepCopy: true, Converters: []copier.TypeConverter{oss.Int64ToTime}})
if err != nil {
log.DefaultLogger.Errorf("copy CreateMultipartUploadInput fail, err: %+v", err)
return nil, err
Expand All @@ -314,7 +295,7 @@ func (a *AwsOss) CreateMultipartUpload(ctx context.Context, req *oss.CreateMulti
return nil, err
}
output := &oss.CreateMultipartUploadOutput{}
copier.CopyWithOption(output, resp, copier.Option{IgnoreEmpty: true, DeepCopy: true, Converters: []copier.TypeConverter{time2int64}})
copier.CopyWithOption(output, resp, copier.Option{IgnoreEmpty: true, DeepCopy: true, Converters: []copier.TypeConverter{oss.TimeToInt64}})
return output, err
}
func (a *AwsOss) UploadPart(ctx context.Context, req *oss.UploadPartInput) (*oss.UploadPartOutput, error) {
Expand All @@ -332,12 +313,8 @@ func (a *AwsOss) UploadPart(ctx context.Context, req *oss.UploadPartInput) (*oss
if err != nil {
return nil, err
}
output := &oss.UploadPartOutput{}
err = copier.Copy(output, resp)
if err != nil {
return nil, err
}
return output, err

return oss.GetUploadPartOutput(resp)
}
func (a *AwsOss) UploadPartCopy(ctx context.Context, req *oss.UploadPartCopyInput) (*oss.UploadPartCopyOutput, error) {
client, err := a.getClient()
Expand All @@ -360,9 +337,8 @@ func (a *AwsOss) UploadPartCopy(ctx context.Context, req *oss.UploadPartCopyInpu
if err != nil {
return nil, err
}
output := &oss.UploadPartCopyOutput{}
err = copier.Copy(output, resp)
return output, err

return oss.GetUploadPartCopyOutput(resp)
}
func (a *AwsOss) CompleteMultipartUpload(ctx context.Context, req *oss.CompleteMultipartUploadInput) (*oss.CompleteMultipartUploadOutput, error) {
client, err := a.getClient()
Expand Down Expand Up @@ -417,20 +393,8 @@ func (a *AwsOss) ListMultipartUploads(ctx context.Context, req *oss.ListMultipar
if err != nil {
return nil, err
}
output := &oss.ListMultipartUploadsOutput{CommonPrefixes: []string{}, Uploads: []*oss.MultipartUpload{}}
err = copier.Copy(output, resp)
if err != nil {
return nil, err
}
for _, v := range resp.CommonPrefixes {
output.CommonPrefixes = append(output.CommonPrefixes, *v.Prefix)
}
for _, v := range resp.Uploads {
upload := &oss.MultipartUpload{}
copier.CopyWithOption(upload, v, copier.Option{IgnoreEmpty: true, DeepCopy: true})
output.Uploads = append(output.Uploads, upload)
}
return output, err

return oss.GetListMultipartUploadsOutput(resp)
}
func (a *AwsOss) ListObjectVersions(ctx context.Context, req *oss.ListObjectVersionsInput) (*oss.ListObjectVersionsOutput, error) {
client, err := a.getClient()
Expand All @@ -446,24 +410,8 @@ func (a *AwsOss) ListObjectVersions(ctx context.Context, req *oss.ListObjectVers
if err != nil {
return nil, err
}
output := &oss.ListObjectVersionsOutput{}
err = copier.Copy(output, resp)
if err != nil {
return nil, err
}
for _, v := range resp.CommonPrefixes {
output.CommonPrefixes = append(output.CommonPrefixes, *v.Prefix)
}
for _, v := range resp.DeleteMarkers {
entry := &oss.DeleteMarkerEntry{IsLatest: v.IsLatest, Key: *v.Key, Owner: &oss.Owner{DisplayName: *v.Owner.DisplayName, ID: *v.Owner.ID}, VersionId: *v.VersionId}
output.DeleteMarkers = append(output.DeleteMarkers, entry)
}
for _, v := range resp.Versions {
version := &oss.ObjectVersion{}
copier.CopyWithOption(version, v, copier.Option{IgnoreEmpty: true, DeepCopy: true, Converters: []copier.TypeConverter{time2int64}})
output.Versions = append(output.Versions, version)
}
return output, err

return oss.GetListObjectVersionsOutput(resp)
}

func (a *AwsOss) HeadObject(ctx context.Context, req *oss.HeadObjectInput) (*oss.HeadObjectOutput, error) {
Expand Down
31 changes: 30 additions & 1 deletion components/oss/aws/oss_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/feature/s3/manager"

"github.com/aws/aws-sdk-go-v2/service/s3"

"github.com/jinzhu/copier"

"github.com/aws/aws-sdk-go-v2/service/s3/types"
Expand Down Expand Up @@ -136,6 +140,31 @@ func TestAwsOss(t *testing.T) {

err = instance.UpdateUploadBandwidthRateLimit(context.TODO(), &oss.UpdateBandwidthRateLimitInput{})
assert.NotNil(t, err)

_, err = oss.GetGetObjectOutput(&s3.GetObjectOutput{})
assert.Nil(t, err)
_, err = oss.GetPutObjectOutput(&manager.UploadOutput{})
assert.Nil(t, err)
_, err = oss.GetDeleteObjectOutput(&s3.DeleteObjectOutput{})
assert.Nil(t, err)
_, err = oss.GetDeleteObjectOutput(&s3.DeleteObjectOutput{})
assert.Nil(t, err)
_, err = oss.GetGetObjectTaggingOutput(&s3.GetObjectTaggingOutput{})
assert.Nil(t, err)
_, err = oss.GetListObjectsOutput(&s3.ListObjectsOutput{})
assert.Nil(t, err)
_, err = oss.GetGetObjectCannedAclOutput(&s3.GetObjectAclOutput{})
assert.Nil(t, err)
_, err = oss.GetUploadPartOutput(&s3.UploadPartOutput{})
assert.Nil(t, err)
_, err = oss.GetUploadPartCopyOutput(&s3.UploadPartCopyOutput{})
assert.Nil(t, err)
_, err = oss.GetListPartsOutput(&s3.ListPartsOutput{})
assert.Nil(t, err)
_, err = oss.GetListMultipartUploadsOutput(&s3.ListMultipartUploadsOutput{})
assert.Nil(t, err)
_, err = oss.GetListObjectVersionsOutput(&s3.ListObjectVersionsOutput{})
assert.Nil(t, err)
}

func TestDeepCopy(t *testing.T) {
Expand All @@ -152,7 +181,7 @@ func TestDeepCopy(t *testing.T) {
VersionId: &value,
}
tovalue := &oss.ObjectVersion{}
err := copier.CopyWithOption(tovalue, fromValue, copier.Option{IgnoreEmpty: true, DeepCopy: true, Converters: []copier.TypeConverter{time2int64}})
err := copier.CopyWithOption(tovalue, fromValue, copier.Option{IgnoreEmpty: true, DeepCopy: true, Converters: []copier.TypeConverter{oss.TimeToInt64}})
assert.Nil(t, err)
assert.Equal(t, tovalue.Owner.DisplayName, value)
}
Loading

0 comments on commit 4a40509

Please sign in to comment.