diff --git a/client.go b/client.go index 2decba7c5..5c54b4461 100644 --- a/client.go +++ b/client.go @@ -363,34 +363,19 @@ func (client *client) MetadataTopics() ([]string, error) { } func (client *client) Partitions(topic string) ([]int32, error) { - if client.Closed() { - return nil, ErrClosedClient - } - - partitions := client.cachedPartitions(topic, allPartitions) - - if len(partitions) == 0 { - err := client.RefreshMetadata(topic) - if err != nil { - return nil, err - } - partitions = client.cachedPartitions(topic, allPartitions) - } - - // no partitions found after refresh metadata - if len(partitions) == 0 { - return nil, ErrUnknownTopicOrPartition - } - - return partitions, nil + return client.getPartitions(topic, allPartitions) } func (client *client) WritablePartitions(topic string) ([]int32, error) { + return client.getPartitions(topic, writablePartitions) +} + +func (client *client) getPartitions(topic string, pt partitionType) ([]int32, error) { if client.Closed() { return nil, ErrClosedClient } - partitions := client.cachedPartitions(topic, writablePartitions) + partitions := client.cachedPartitions(topic, pt) // len==0 catches when it's nil (no such topic) and the odd case when every single // partition is undergoing leader election simultaneously. Callers have to be able to handle @@ -403,7 +388,7 @@ func (client *client) WritablePartitions(topic string) ([]int32, error) { if err != nil { return nil, err } - partitions = client.cachedPartitions(topic, writablePartitions) + partitions = client.cachedPartitions(topic, pt) } if partitions == nil { @@ -414,56 +399,24 @@ func (client *client) WritablePartitions(topic string) ([]int32, error) { } func (client *client) Replicas(topic string, partitionID int32) ([]int32, error) { - if client.Closed() { - return nil, ErrClosedClient - } - - metadata := client.cachedMetadata(topic, partitionID) - - if metadata == nil { - err := client.RefreshMetadata(topic) - if err != nil { - return nil, err - } - metadata = client.cachedMetadata(topic, partitionID) - } - - if metadata == nil { - return nil, ErrUnknownTopicOrPartition - } - - if errors.Is(metadata.Err, ErrReplicaNotAvailable) { - return dupInt32Slice(metadata.Replicas), metadata.Err - } - return dupInt32Slice(metadata.Replicas), nil + return client.getReplicas(topic, partitionID, func(metadata *PartitionMetadata) []int32 { + return metadata.Replicas + }) } func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error) { - if client.Closed() { - return nil, ErrClosedClient - } - - metadata := client.cachedMetadata(topic, partitionID) - - if metadata == nil { - err := client.RefreshMetadata(topic) - if err != nil { - return nil, err - } - metadata = client.cachedMetadata(topic, partitionID) - } - - if metadata == nil { - return nil, ErrUnknownTopicOrPartition - } - - if errors.Is(metadata.Err, ErrReplicaNotAvailable) { - return dupInt32Slice(metadata.Isr), metadata.Err - } - return dupInt32Slice(metadata.Isr), nil + return client.getReplicas(topic, partitionID, func(metadata *PartitionMetadata) []int32 { + return metadata.Isr + }) } func (client *client) OfflineReplicas(topic string, partitionID int32) ([]int32, error) { + return client.getReplicas(topic, partitionID, func(metadata *PartitionMetadata) []int32 { + return metadata.OfflineReplicas + }) +} + +func (client *client) getReplicas(topic string, partitionID int32, extractor func(metadata *PartitionMetadata) []int32) ([]int32, error) { if client.Closed() { return nil, ErrClosedClient } @@ -482,10 +435,11 @@ func (client *client) OfflineReplicas(topic string, partitionID int32) ([]int32, return nil, ErrUnknownTopicOrPartition } + replicas := extractor(metadata) if errors.Is(metadata.Err, ErrReplicaNotAvailable) { - return dupInt32Slice(metadata.OfflineReplicas), metadata.Err + return dupInt32Slice(replicas), metadata.Err } - return dupInt32Slice(metadata.OfflineReplicas), nil + return dupInt32Slice(replicas), nil } func (client *client) Leader(topic string, partitionID int32) (*Broker, error) { diff --git a/utils.go b/utils.go index 34eefe69b..d5b77e0d9 100644 --- a/utils.go +++ b/utils.go @@ -44,11 +44,10 @@ func withRecover(fn func()) { } func safeAsyncClose(b *Broker) { - tmp := b // local var prevents clobbering in goroutine go withRecover(func() { - if connected, _ := tmp.Connected(); connected { - if err := tmp.Close(); err != nil { - Logger.Println("Error closing broker", tmp.ID(), ":", err) + if connected, _ := b.Connected(); connected { + if err := b.Close(); err != nil { + Logger.Println("Error closing broker", b.ID(), ":", err) } } })