Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
Signed-off-by: jyu6 <[email protected]>
  • Loading branch information
jyu6 committed Feb 2, 2023
1 parent 2f3797d commit 1cfb7ad
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions pkg/forward/forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ var (
testStartTime = time.Unix(1636470000, 0).UTC()
)

type myForwardTest struct {
type forwardTest struct {
}

func (f myForwardTest) WhereTo(_ string) ([]string, error) {
func (f forwardTest) WhereTo(_ string) ([]string, error) {
return []string{"to1"}, nil
}

func (f myForwardTest) ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.Message, error) {
func (f forwardTest) ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.Message, error) {
return testutils.CopyUDFTestApply(ctx, message)
}

Expand All @@ -70,7 +70,7 @@ func TestNewInterStepDataForward(t *testing.T) {
writeMessages := testutils.BuildTestWriteMessages(int64(20), testStartTime)

fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(5))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, forwardTest{}, forwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(5))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
assert.True(t, to1.IsEmpty())
Expand Down Expand Up @@ -101,14 +101,14 @@ func TestNewInterStepDataForward(t *testing.T) {
<-stopped
}

type myForwardDropTest struct {
type forwardDropTest struct {
}

func (f myForwardDropTest) WhereTo(_ string) ([]string, error) {
func (f forwardDropTest) WhereTo(_ string) ([]string, error) {
return []string{"__DROP__"}, nil
}

func (f myForwardDropTest) ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.Message, error) {
func (f forwardDropTest) ApplyMap(ctx context.Context, message *isb.ReadMessage) ([]*isb.Message, error) {
return testutils.CopyUDFTestApply(ctx, message)
}

Expand All @@ -130,7 +130,7 @@ func TestNewInterStepDataForward_drop(t *testing.T) {
},
}}
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardDropTest{}, myForwardDropTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, forwardDropTest{}, forwardDropTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
assert.True(t, to1.IsEmpty())
Expand Down Expand Up @@ -380,7 +380,7 @@ func TestNewInterStepDataForwardToOneStep(t *testing.T) {
}}

fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, forwardTest{}, forwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(2))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
assert.True(t, to1.IsEmpty())
Expand Down Expand Up @@ -426,7 +426,7 @@ func TestWriteToBufferError(t *testing.T) {
},
}}
fetchWatermark, publishWatermark := generic.BuildNoOpWatermarkProgressorsFromBufferMap(toSteps)
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, myForwardTest{}, myForwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(10))
f, err := NewInterStepDataForward(vertex, fromStep, toSteps, forwardTest{}, forwardTest{}, fetchWatermark, publishWatermark, WithReadBatchSize(10))
assert.NoError(t, err)
assert.False(t, to1.IsFull())
assert.True(t, to1.IsEmpty())
Expand Down

0 comments on commit 1cfb7ad

Please sign in to comment.