diff --git a/kafka/kafka_test.go b/kafka/kafka_test.go index 9659c812e..f97648671 100644 --- a/kafka/kafka_test.go +++ b/kafka/kafka_test.go @@ -131,6 +131,22 @@ func TestOffsetAPIs(t *testing.T) { t.Errorf("Failed to change offset. Expect (%v), got (%v)\n", 10, offset) } + // test passing explicit 'Offset' type + err = offset.Set(OffsetBeginning) + if err != nil { + t.Errorf("Cannot set offset to (%v). Error: %s \n", OffsetBeginning, err) + } else if offset != OffsetBeginning { + t.Errorf("Failed to change offset. Expect (%v), got %v\n", OffsetBeginning, offset) + } + + // test passing explicit 'Offset' type + err = offset.Set(OffsetEnd) + if err != nil { + t.Errorf("Cannot set offset to (%v). Error: %s \n", OffsetEnd, err) + } else if offset != OffsetEnd { + t.Errorf("Failed to change offset. Expect (%v), got %v\n", OffsetEnd, offset) + } + // test OffsetTail() tail := OffsetTail(offset) t.Logf("offset tail %v\n", tail) diff --git a/kafka/offset.go b/kafka/offset.go index 4cb1819c8..c0d27f95e 100644 --- a/kafka/offset.go +++ b/kafka/offset.go @@ -72,8 +72,8 @@ func (o *Offset) Set(offset interface{}) error { return err } -// NewOffset creates a new Offset using the provided logical string, or an -// absolute int64 offset value. +// NewOffset creates a new Offset using the provided logical string, an +// absolute int64 offset value, or a concrete Offset type. // Logical offsets: "beginning", "earliest", "end", "latest", "unset", "invalid", "stored" func NewOffset(offset interface{}) (Offset, error) { @@ -107,6 +107,8 @@ func NewOffset(offset interface{}) (Offset, error) { return Offset((int64)(v)), nil case int64: return Offset(v), nil + case Offset: + return Offset(v), nil default: return OffsetInvalid, newErrorFromString(ErrInvalidArg, fmt.Sprintf("Invalid offset type: %t", v))