Skip to content

Commit

Permalink
tidb-binlog/reader: migrate test-infra to testify for offset_test.go
Browse files Browse the repository at this point in the history
  • Loading branch information
zanpocc committed Mar 27, 2022
1 parent e09bc29 commit 3923649
Showing 1 changed file with 22 additions and 24 deletions.
46 changes: 22 additions & 24 deletions tidb-binlog/driver/reader/offset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,21 @@ import (
"time"

"github.com/Shopify/sarama"
. "github.com/pingcap/check"
pb "github.com/pingcap/tidb/tidb-binlog/proto/go-binlog"
"github.com/stretchr/testify/require"
)

func TestClient(t *testing.T) {
TestingT(t)
}

var _ = Suite(&testOffsetSuite{})

type testOffsetSuite struct {
type testOffset struct {
producer sarama.SyncProducer
config *sarama.Config
addr string
available bool
topic string
}

func (to *testOffsetSuite) SetUpSuite(c *C) {
var to testOffset

func Setup() {
to.topic = "test"
to.addr = "192.168.198.61"
if os.Getenv("HOSTIP") != "" {
Expand All @@ -59,29 +55,31 @@ func (to *testOffsetSuite) SetUpSuite(c *C) {
}
}

func (to *testOffsetSuite) deleteTopic(c *C) {
func deleteTopic(t *testing.T) {
broker := sarama.NewBroker(to.addr + ":9092")
err := broker.Open(to.config)
c.Assert(err, IsNil)
require.NoError(t, err)
defer broker.Close()
broker.DeleteTopics(&sarama.DeleteTopicsRequest{Topics: []string{to.topic}, Timeout: 10 * time.Second})
}

func (to *testOffsetSuite) TestOffset(c *C) {
func TestOffset(t *testing.T) {
Setup()

if !to.available {
c.Skip("no kafka available")
t.Skip("no kafka available")
}

to.deleteTopic(c)
defer to.deleteTopic(c)
deleteTopic(t)
defer deleteTopic(t)

topic := to.topic

sk, err := NewKafkaSeeker([]string{to.addr + ":9092"}, to.config)
c.Assert(err, IsNil)
require.NoError(t, err)

to.producer, err = sarama.NewSyncProducer([]string{to.addr + ":9092"}, to.config)
c.Assert(err, IsNil)
require.NoError(t, err)
defer to.producer.Close()

var testPoss = map[int64]int64{
Expand All @@ -90,8 +88,8 @@ func (to *testOffsetSuite) TestOffset(c *C) {
30: 0,
}
for ts := range testPoss {
testPoss[ts], err = to.procudeMessage(ts, topic)
c.Assert(err, IsNil)
testPoss[ts], err = procudeMessage(ts, topic)
require.NoError(t, err)
// c.Log("produce ", ts, " at ", testPoss[ts])
}

Expand All @@ -104,14 +102,14 @@ func (to *testOffsetSuite) TestOffset(c *C) {
}
for ts, offset := range testCases {
offsetFounds, err := sk.Seek(topic, ts, []int32{0})
c.Log("check: ", ts)
c.Assert(err, IsNil)
c.Assert(offsetFounds, HasLen, 1)
c.Assert(offsetFounds[0], Equals, offset)
t.Log("check: ", ts)
require.NoError(t, err)
require.Len(t, offsetFounds, 1)
require.Equal(t, offset, offsetFounds[0])
}
}

func (to *testOffsetSuite) procudeMessage(ts int64, topic string) (offset int64, err error) {
func procudeMessage(ts int64, topic string) (offset int64, err error) {
binlog := new(pb.Binlog)
binlog.CommitTs = ts
var data []byte
Expand Down

0 comments on commit 3923649

Please sign in to comment.