enhance: refactor the drop collection and drop partition #37348

Closed
1 change: 1 addition & 0 deletions internal/.mockery.yaml
@@ -59,6 +59,7 @@ packages:
github.com/milvus-io/milvus/internal/metastore:
interfaces:
StreamingCoordCataLog:
DataCoordCatalog:
StreamingNodeCataLog:
github.com/milvus-io/milvus/internal/util/streamingutil/service/discoverer:
interfaces:
39 changes: 35 additions & 4 deletions internal/datacoord/channel_manager.go
@@ -45,6 +45,7 @@
DeleteNode(nodeID UniqueID) error
Watch(ctx context.Context, ch RWChannel) error
Release(nodeID UniqueID, channelName string) error
ReleaseByCollectionID(collectionID int64) error

Match(nodeID UniqueID, channel string) bool
FindWatcher(channel string) (UniqueID, error)
@@ -88,8 +89,8 @@
// ChannelmanagerOpt is to set optional parameters in channel manager.
type ChannelmanagerOpt func(c *ChannelManagerImpl)

func withFactoryV2(f ChannelPolicyFactory) ChannelmanagerOpt {
return func(c *ChannelManagerImpl) { c.factory = f }
func withEmptyPolicyFactory() ChannelmanagerOpt {
return func(c *ChannelManagerImpl) { c.factory = NewEmptyChannelPolicyFactory() }
}

func withCheckerV2() ChannelmanagerOpt {
@@ -161,7 +162,7 @@
m.finishRemoveChannel(info.NodeID, lo.Values(info.Channels)...)
}

if m.balanceCheckLoop != nil && !streamingutil.IsStreamingServiceEnabled() {
if m.balanceCheckLoop != nil {
log.Info("starting channel balance loop")
m.wg.Add(1)
go func() {
@@ -231,6 +232,29 @@
return m.execute(updates)
}

func (m *ChannelManagerImpl) ReleaseByCollectionID(collectionID int64) error {
logger := log.With(zap.Int64("collectionID", collectionID))
m.mu.Lock()
defer m.mu.Unlock()

nodeChannels := m.store.GetNodeChannelsBy(WithAllNodes(), WithCollectionIDV2(collectionID))
updates := make([]*ChannelOp, 0)
for _, nodeChannel := range nodeChannels {
for _, ch := range nodeChannel.Channels {
if nodeChannel.NodeID != bufferID {
logger.Info("release channel from watched node",
zap.Int64("nodeID", nodeChannel.NodeID),
zap.String("channel", ch.GetName()))
updates = append(updates, NewChannelOp(nodeChannel.NodeID, Release, ch))
}
}
}
if len(updates) == 0 {
return nil
}

return m.execute(NewChannelOpSet(updates...))
}

func (m *ChannelManagerImpl) Watch(ctx context.Context, ch RWChannel) error {
log := log.Ctx(ctx).With(zap.String("channel", ch.GetName()))
m.mu.Lock()
@@ -448,7 +472,13 @@
m.mu.RUnlock()

// Processing standby channels
updatedStandbys := m.advanceStandbys(ctx, standbys)
updatedStandbys := false
if !streamingutil.IsStreamingServiceEnabled() {
// If the streaming service is enabled, the channel manager no longer manages channels,
// so standby channels should not be processed by the channel manager here.
// TODO: ChannelManager can be removed in the future (3.0.0).
updatedStandbys = m.advanceStandbys(ctx, standbys)
}
updatedToCheckes := m.advanceToChecks(ctx, toChecks)
updatedToNotifies := m.advanceToNotifies(ctx, toNotifies)

@@ -496,6 +526,7 @@
log.Warn("Reassign channels fail",
zap.Int64("nodeID", nodeAssign.NodeID),
zap.Strings("channels", chNames),
zap.Error(err),

)
continue
}
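The new ReleaseByCollectionID method is the entry point that lets the drop-collection flow hand channel cleanup to the channel manager in one call. Below is a self-contained sketch (not code from this PR) of that call pattern; the interface is trimmed to the one method the sketch needs, and the fake manager and helper names are illustrative only.

// Sketch: release all channels of a collection in a single call on drop.
package main

import "fmt"

// ChannelManager is reduced here to the method added by this PR.
type ChannelManager interface {
	ReleaseByCollectionID(collectionID int64) error
}

// fakeChannelManager stands in for ChannelManagerImpl just to make the sketch runnable.
type fakeChannelManager struct {
	channelsByCollection map[int64][]string
}

func (m *fakeChannelManager) ReleaseByCollectionID(collectionID int64) error {
	for _, ch := range m.channelsByCollection[collectionID] {
		fmt.Println("release channel:", ch)
	}
	delete(m.channelsByCollection, collectionID)
	return nil
}

// dropCollection shows the caller side: one call covers every node watching
// any channel of the collection, instead of releasing (node, channel) pairs one by one.
func dropCollection(cm ChannelManager, collectionID int64) error {
	return cm.ReleaseByCollectionID(collectionID)
}

func main() {
	cm := &fakeChannelManager{channelsByCollection: map[int64][]string{
		1: {"ch1", "ch2", "ch3"},
	}}
	if err := dropCollection(cm, 1); err != nil {
		fmt.Println("drop failed:", err)
	}
}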
14 changes: 14 additions & 0 deletions internal/datacoord/channel_manager_factory.go
@@ -39,3 +39,17 @@ func (f *ChannelPolicyFactoryV1) NewBalancePolicy() BalanceChannelPolicy {
func (f *ChannelPolicyFactoryV1) NewAssignPolicy() AssignPolicy {
return AvgAssignByCountPolicy
}

func NewEmptyChannelPolicyFactory() *EmptyChannelPolicyFactory {
return &EmptyChannelPolicyFactory{}
}

type EmptyChannelPolicyFactory struct{}

func (f *EmptyChannelPolicyFactory) NewBalancePolicy() BalanceChannelPolicy {
return EmptyBalancePolicy
}

func (f *EmptyChannelPolicyFactory) NewAssignPolicy() AssignPolicy {
return EmptyAssignPolicy
}
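The empty factory exists so the channel manager can keep its bookkeeping loop running without ever moving channels once the streaming service owns placement. A self-contained sketch of that idea follows; the simplified AssignPolicy signature below is illustrative, not the actual datacoord definition.

// Sketch: an "empty" policy satisfies the same contract as a real one but proposes nothing.
package main

import "fmt"

// Assignment pairs a channel with the node that should watch it.
type Assignment struct {
	NodeID  int64
	Channel string
}

// AssignPolicy proposes assignments for currently unassigned channels.
type AssignPolicy func(unassigned []string, nodes []int64) []Assignment

// EmptyAssignPolicy proposes nothing: placement is decided elsewhere,
// so the channel manager stays passive and only tracks state.
var EmptyAssignPolicy AssignPolicy = func(unassigned []string, nodes []int64) []Assignment {
	return nil
}

func main() {
	plan := EmptyAssignPolicy([]string{"ch1", "ch2"}, []int64{100, 101})
	fmt.Println("assignments proposed:", len(plan)) // 0
}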
19 changes: 17 additions & 2 deletions internal/datacoord/channel_manager_test.go
@@ -803,5 +803,20 @@ func (s *ChannelManagerSuite) TestStartupRootCoordFailed() {
s.Error(err)
}

func (s *ChannelManagerSuite) TestCheckLoop() {}
func (s *ChannelManagerSuite) TestGet() {}
func (s *ChannelManagerSuite) TestReleaseByCollectionID() {
chNodes := map[string]int64{
"ch1": 1,
"ch2": 1,
"ch3": 1,
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess)

m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
err = m.ReleaseByCollectionID(1)
s.NoError(err)

s.checkAssignment(m, 1, "ch1", ToRelease)
s.checkAssignment(m, 1, "ch2", ToRelease)
s.checkAssignment(m, 1, "ch3", ToRelease)
}
3 changes: 2 additions & 1 deletion internal/datacoord/channel_store_test.go
@@ -37,7 +37,8 @@ func (s *StateChannelStoreSuite) SetupTest() {
func generateWatchInfo(name string, state datapb.ChannelWatchState) *datapb.ChannelWatchInfo {
return &datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{
ChannelName: name,
CollectionID: 1,
ChannelName: name,
},
Schema: &schemapb.CollectionSchema{},
State: state,
17 changes: 1 addition & 16 deletions internal/datacoord/compaction.go
@@ -83,7 +83,6 @@ type compactionPlanHandler struct {

meta CompactionMeta
allocator allocator.Allocator
chManager ChannelManager
sessions session.DataNodeManager
cluster Cluster
analyzeScheduler *taskScheduler
@@ -177,12 +176,11 @@ func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64)
return cnt
}

func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager, cm ChannelManager, meta CompactionMeta,
func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager, meta CompactionMeta,
allocator allocator.Allocator, analyzeScheduler *taskScheduler, handler Handler,
) *compactionPlanHandler {
return &compactionPlanHandler{
queueTasks: *NewCompactionQueue(256, getPrioritizer()), // Higher capacity will have better ordering in priority, but consumes more memory.
chManager: cm,
meta: meta,
sessions: sessions,
allocator: allocator,
@@ -673,19 +671,6 @@ func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task Comp
return nodeID, useSlot
}

func (c *compactionPlanHandler) pickShardNode(nodeSlots map[int64]int64, t CompactionTask) int64 {
nodeID, err := c.chManager.FindWatcher(t.GetTaskProto().GetChannel())
if err != nil {
log.Info("failed to find watcher", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err))
return NullNodeID
}

if nodeSlots[nodeID] > 0 {
return nodeID
}
return NullNodeID
}

// isFull return true if the task pool is full
func (c *compactionPlanHandler) isFull() bool {
return c.queueTasks.Len() >= c.queueTasks.capacity
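With pickShardNode removed, compaction scheduling no longer pins a plan to the node that watches its channel; any node with enough free slots qualifies. A self-contained sketch of that slot-based selection follows; the helper is illustrative, not the exact pickAnyNode implementation, and NullNodeID being -1 is an assumption.

// Sketch: pick any node with enough free slots, ignoring channel ownership.
package main

import "fmt"

const NullNodeID int64 = -1

// pickAnyNode returns the node with the most free slots that can still fit the task,
// or NullNodeID when no node has enough capacity.
func pickAnyNode(nodeSlots map[int64]int64, taskSlot int64) int64 {
	best, bestFree := NullNodeID, int64(0)
	for nodeID, free := range nodeSlots {
		if free >= taskSlot && free > bestFree {
			best, bestFree = nodeID, free
		}
	}
	return best
}

func main() {
	nodeSlots := map[int64]int64{100: 2, 101: 6}
	fmt.Println(pickAnyNode(nodeSlots, 4)) // 101: the only node with >= 4 free slots
	fmt.Println(pickAnyNode(nodeSlots, 8)) // -1: no node has enough capacity
}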
2 changes: 2 additions & 0 deletions internal/datacoord/compaction_task_clustering_test.go
@@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/session"
"github.com/milvus-io/milvus/internal/datacoord/tombstone"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -61,6 +62,7 @@
ctx := context.Background()
cm := storage.NewLocalChunkManager(storage.RootPath(""))
catalog := datacoord.NewCatalog(NewMetaMemoryKV(), "", "")
tombstone.RecoverCollectionTombstoneForTest(ctx, catalog)
meta, err := newMeta(ctx, catalog, cm)
s.NoError(err)
s.meta = meta
52 changes: 2 additions & 50 deletions internal/datacoord/compaction_test.go
@@ -57,7 +57,7 @@ func (s *CompactionPlanHandlerSuite) SetupTest() {
s.mockCm = NewMockChannelManager(s.T())
s.mockSessMgr = session.NewMockDataNodeManager(s.T())
s.cluster = NewMockCluster(s.T())
s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc, nil, nil)
s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil, nil)
}

func (s *CompactionPlanHandlerSuite) TestScheduleEmpty() {
@@ -449,54 +449,6 @@ func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() {
s.Equal(int64(NullNodeID), node)
}

func (s *CompactionPlanHandlerSuite) TestPickShardNode() {
s.SetupTest()
nodeSlots := map[int64]int64{
100: 2,
101: 6,
}

t1 := newMixCompactionTask(&datapb.CompactionTask{
PlanID: 19530,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-01",
NodeID: 1,
}, nil, s.mockMeta, s.mockSessMgr)
t1.plan = &datapb.CompactionPlan{
PlanID: 19530,
Channel: "ch-01",
Type: datapb.CompactionType_MixCompaction,
}

t2 := newMixCompactionTask(&datapb.CompactionTask{
PlanID: 19531,
Type: datapb.CompactionType_MixCompaction,
Channel: "ch-02",
NodeID: 1,
}, nil, s.mockMeta, s.mockSessMgr)
t2.plan = &datapb.CompactionPlan{
PlanID: 19531,
Channel: "ch-02",
Type: datapb.CompactionType_Level0DeleteCompaction,
}

s.mockCm.EXPECT().FindWatcher(mock.Anything).RunAndReturn(func(channel string) (int64, error) {
if channel == "ch-01" {
return 100, nil
}
if channel == "ch-02" {
return 101, nil
}
return 1, nil
}).Twice()

node := s.handler.pickShardNode(nodeSlots, t1)
s.Equal(int64(100), node)

node = s.handler.pickShardNode(nodeSlots, t2)
s.Equal(int64(101), node)
}

func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() {
s.SetupTest()
ch := "ch1"
@@ -604,7 +556,7 @@ func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
s.SetupTest()
s.mockMeta.EXPECT().CheckAndSetSegmentsCompacting(mock.Anything).Return(true, true).Maybe()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc, nil, nil)
handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockMeta, s.mockAlloc, nil, nil)

task := &datapb.CompactionTask{
TriggerID: 1,
2 changes: 2 additions & 0 deletions internal/datacoord/garbage_collector_test.go
@@ -40,6 +40,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
broker2 "github.com/milvus-io/milvus/internal/datacoord/broker"
"github.com/milvus-io/milvus/internal/datacoord/tombstone"
kvmocks "github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
@@ -120,6 +121,7 @@ func Test_garbageCollector_scan(t *testing.T) {

meta, err := newMemoryMeta()
assert.NoError(t, err)
tombstone.RecoverCollectionTombstoneForTest(context.Background(), meta.catalog)

t.Run("key is reference", func(t *testing.T) {
gc := newGarbageCollector(meta, newMockHandler(), GcOption{
38 changes: 17 additions & 21 deletions internal/datacoord/import_checker.go
@@ -17,8 +17,6 @@
package datacoord

import (
"context"
"fmt"
"sync"
"time"

@@ -27,6 +25,7 @@

"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/internal/datacoord/broker"
"github.com/milvus-io/milvus/internal/datacoord/tombstone"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
@@ -453,25 +452,22 @@
}

func (c *importChecker) checkCollection(collectionID int64, jobs []ImportJob) {
if len(jobs) == 0 {
return
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
has, err := c.broker.HasCollection(ctx, collectionID)
if err != nil {
log.Warn("verify existence of collection failed", zap.Int64("collection", collectionID), zap.Error(err))
return
}
if !has {
jobs = lo.Filter(jobs, func(job ImportJob, _ int) bool {
return job.GetState() != internalpb.ImportJobState_Failed
})
for _, job := range jobs {
err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed),
UpdateJobReason(fmt.Sprintf("collection %d dropped", collectionID)))
if err != nil {
for _, job := range jobs {
if job.GetState() == internalpb.ImportJobState_Failed {
continue

}
dropped := false
var reason string
// A collection with a partition key never drops individual partitions.
for _, partitionID := range job.GetPartitionIDs() {
if err := tombstone.CollectionTombstone().CheckIfPartitionDropped(collectionID, partitionID); err != nil {
dropped = true
reason = err.Error()
break
}
}
if dropped {
if err := c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(reason)); err != nil {
log.Warn("failed to update job state to Failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
}
}
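Instead of asking the broker whether the collection still exists, checkCollection now consults the in-process collection tombstone per job and per partition. A self-contained sketch of the assumed CheckIfPartitionDropped contract follows; the tombstone type below is illustrative, not the actual internal/datacoord/tombstone implementation.

// Sketch: CheckIfPartitionDropped returns a non-nil error describing the drop
// when the collection or partition is tombstoned, and nil otherwise.
package main

import "fmt"

type collectionTombstone struct {
	droppedPartitions map[int64]map[int64]bool // collectionID -> partitionID -> dropped
}

func (t *collectionTombstone) CheckIfPartitionDropped(collectionID, partitionID int64) error {
	if t.droppedPartitions[collectionID][partitionID] {
		return fmt.Errorf("partition %d of collection %d is dropped", partitionID, collectionID)
	}
	return nil
}

func main() {
	ts := &collectionTombstone{droppedPartitions: map[int64]map[int64]bool{1: {10: true}}}
	if err := ts.CheckIfPartitionDropped(1, 10); err != nil {
		// An import job targeting this partition would be marked Failed, with err.Error() as the reason.
		fmt.Println("mark job failed:", err)
	}
	fmt.Println(ts.CheckIfPartitionDropped(1, 11) == nil) // true: partition 11 was not dropped
}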