Skip to content

Commit

Permalink
TC-1800 Manage CompleteMultipartUpload events
Browse files Browse the repository at this point in the history
Signed-off-by: mrizzi <[email protected]>
  • Loading branch information
mrizzi committed Sep 24, 2024
1 parent d03019f commit 454a93a
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 5 deletions.
2 changes: 2 additions & 0 deletions pkg/handler/collector/s3/messaging/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type KafkaMessage struct {
func (m *KafkaMessage) GetEvent() (EventName, error) {
if m.EventName == "s3:ObjectCreated:Put" {
return PUT, nil
} else if m.EventName == "s3:ObjectCreated:CompleteMultipartUpload" {
return CompleteMultipartUpload, nil
}
return "", nil
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/handler/collector/s3/messaging/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import "context"
type EventName string

const (
PUT EventName = "PUT"
PUT EventName = "PUT"
CompleteMultipartUpload EventName = "CompleteMultipartUpload"
)

// Message A generic message related to an S3 bucket and item
Expand Down
2 changes: 2 additions & 0 deletions pkg/handler/collector/s3/messaging/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func (m *SqsMessage) GetEvent() (EventName, error) {

if m.Records[0].EventName == "ObjectCreated:Put" {
return PUT, nil
} else if m.Records[0].EventName == "ObjectCreated:CompleteMultipartUpload" {
return CompleteMultipartUpload, nil
}

return "", nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/handler/collector/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,11 @@ func retrieveWithPoll(s S3Collector, ctx context.Context, docChannel chan<- *pro
continue
}

if e, er := m.GetEvent(); e != messaging.PUT {
if e, er := m.GetEvent(); e != messaging.PUT && e != messaging.CompleteMultipartUpload {
if er != nil {
logger.Debugf("skipping message: %v\n", er)
}
logger.Infof("skipping event: %v\n", e)
continue
}
bucketName, err := m.GetBucket()
Expand Down
39 changes: 36 additions & 3 deletions pkg/handler/collector/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,29 @@ func (t *TestProvider) Close(ctx context.Context) error {
return nil
}

// Test Multipart Message Provider
type TestMultipartProvider struct {
queue string
}

func NewTestMultipartProvider(queue string) TestMultipartProvider {
return TestMultipartProvider{queue}
}

func (t *TestMultipartProvider) ReceiveMessage(context.Context) (messaging.Message, error) {
time.Sleep(2 * time.Second)

return &TestMessage{
item: "test-message",
bucket: t.queue,
event: messaging.CompleteMultipartUpload,
}, nil
}

func (t *TestMultipartProvider) Close(ctx context.Context) error {
return nil
}

// Test Message Provider builder
type TestMpBuilder struct {
}
Expand All @@ -81,6 +104,15 @@ func (tb *TestMpBuilder) GetMessageProvider(config messaging.MessageProviderConf
return &provider, nil
}

// Test Message Provider builder
type TestMpMultipartBuilder struct {
}

func (tb *TestMpMultipartBuilder) GetMessageProvider(config messaging.MessageProviderConfig) (messaging.MessageProvider, error) {
provider := NewTestMultipartProvider(config.Queue)
return &provider, nil
}

// Test Bucket
type TestBucket struct {
}
Expand Down Expand Up @@ -108,13 +140,14 @@ func TestS3Collector(t *testing.T) {
ctx := context.Background()

t.Run("no polling", func(t *testing.T) { testNoPolling(t, ctx) })
t.Run("queues split polling", func(t *testing.T) { testQueuesSplitPolling(t, ctx) })
t.Run("queues split polling", func(t *testing.T) { testQueuesSplitPolling(t, ctx, &TestMpBuilder{}) })
t.Run("multipart queues split polling", func(t *testing.T) { testQueuesSplitPolling(t, ctx, &TestMpMultipartBuilder{}) })
}

func testQueuesSplitPolling(t *testing.T, ctx context.Context) {
func testQueuesSplitPolling(t *testing.T, ctx context.Context, mpBuilder messaging.MessageProviderBuilder) {
s3Collector := NewS3Collector(S3CollectorConfig{
Queues: "q1,q2",
MpBuilder: &TestMpBuilder{},
MpBuilder: mpBuilder,
BucketBuilder: &TestBucketBuilder{},
Poll: true,
})
Expand Down

0 comments on commit 454a93a

Please sign in to comment.