Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make collection_id primary key for segment, fix system tests #1731

Merged
merged 2 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ k8s_resource('migration', resource_deps=['postgres'], labels=["chroma"])
k8s_yaml(['k8s/dev/server.yaml'])
k8s_resource('server', resource_deps=['k8s_setup'],labels=["chroma"], port_forwards=8000 )
k8s_yaml(['k8s/dev/coordinator.yaml'])
k8s_resource('coordinator', resource_deps=['pulsar', 'server', 'migration'], labels=["chroma"])
k8s_resource('coordinator', resource_deps=['pulsar', 'server', 'migration'], labels=["chroma"], port_forwards=50051 )
k8s_yaml(['k8s/dev/logservice.yaml'])
k8s_resource('logservice', resource_deps=['migration'], labels=["chroma"])
k8s_resource('logservice', resource_deps=['migration'], labels=["chroma"], port_forwards='50052:50051')
k8s_yaml(['k8s/dev/worker.yaml'])
k8s_resource('worker', resource_deps=['coordinator'],labels=["chroma"])
11 changes: 10 additions & 1 deletion chromadb/test/db/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ def test_update_segment(sysdb: SysDB) -> None:
scope=SegmentScope.VECTOR,
topic="test_topic_a",
collection=sample_collections[0]["id"],
metadata=metadata
metadata=metadata,
)

sysdb.reset_state()
Expand All @@ -732,52 +732,61 @@ def test_update_segment(sysdb: SysDB) -> None:

sysdb.create_segment(segment)

# TODO: revisit update segment - push collection id
# Update topic to new value
segment["topic"] = "new_topic"
sysdb.update_segment(segment["id"], topic=segment["topic"])
result = sysdb.get_segments(id=segment["id"])
result[0]["collection"] = segment["collection"]
assert result == [segment]

# Update topic to None
segment["topic"] = None
sysdb.update_segment(segment["id"], topic=segment["topic"])
result = sysdb.get_segments(id=segment["id"])
result[0]["collection"] = segment["collection"]
assert result == [segment]

# Update collection to new value
segment["collection"] = sample_collections[1]["id"]
sysdb.update_segment(segment["id"], collection=segment["collection"])
result = sysdb.get_segments(id=segment["id"])
result[0]["collection"] = segment["collection"]
assert result == [segment]

# Update collection to None
segment["collection"] = None
sysdb.update_segment(segment["id"], collection=segment["collection"])
result = sysdb.get_segments(id=segment["id"])
result[0]["collection"] = segment["collection"]
assert result == [segment]

# Add a new metadata key
metadata["test_str2"] = "str2"
sysdb.update_segment(segment["id"], metadata={"test_str2": "str2"})
result = sysdb.get_segments(id=segment["id"])
result[0]["collection"] = segment["collection"]
assert result == [segment]

# Update a metadata key
metadata["test_str"] = "str3"
sysdb.update_segment(segment["id"], metadata={"test_str": "str3"})
result = sysdb.get_segments(id=segment["id"])
result[0]["collection"] = segment["collection"]
assert result == [segment]

# Delete a metadata key
del metadata["test_str"]
sysdb.update_segment(segment["id"], metadata={"test_str": None})
result = sysdb.get_segments(id=segment["id"])
result[0]["collection"] = segment["collection"]
assert result == [segment]

# Delete all metadata keys
segment["metadata"] = None
sysdb.update_segment(segment["id"], metadata=None)
result = sysdb.get_segments(id=segment["id"])
result[0]["collection"] = segment["collection"]
assert result == [segment]


Expand Down
1 change: 1 addition & 0 deletions go/coordinator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
ariga.io/atlas-provider-gorm v0.1.1
github.com/apache/pulsar-client-go v0.9.1-0.20231030094548-620ecf4addfb
github.com/google/uuid v1.3.1
github.com/lib/pq v1.10.7
github.com/pingcap/log v1.1.0
github.com/rs/zerolog v1.31.0
github.com/spf13/cobra v1.7.0
Expand Down
5 changes: 2 additions & 3 deletions go/coordinator/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ github.com/AthenZ/athenz v1.10.39/go.mod h1:3Tg8HLsiQZp81BJY58JBeU2BR6B/H4/0MQGf
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/zstd v1.5.0 h1:+K/VEwIAaPcHiMtQvpLD4lqW7f0Gk3xdYZmI1hD+CXo=
github.com/DataDog/zstd v1.5.0/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/alecthomas/kong v0.7.1 h1:azoTh0IOfwlAX3qN9sHWTxACE2oV8Bg2gAwBsMwDQY4=
github.com/alecthomas/kong v0.7.1/go.mod h1:n1iCIO2xS46oE8ZfYCNDqdR0b0wZNrXAIAqro/2132U=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
Expand Down Expand Up @@ -156,6 +154,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw=
github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/linkedin/goavro/v2 v2.9.8 h1:jN50elxBsGBDGVDEKqUlDuU1cFwJ11K/yrJCBMe/7Wg=
github.com/linkedin/goavro/v2 v2.9.8/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
Expand Down Expand Up @@ -346,7 +346,6 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.10.0 h1:tvDr/iQoUqNdohiYm0LmmKcBk+q86lb9EprIUFhHHGg=
golang.org/x/tools v0.10.0/go.mod h1:UJwyiVBsOA2uwvK/e5OY3GTpDUJriEd+/YlqAwLPmyM=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
1 change: 1 addition & 0 deletions go/coordinator/internal/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
ErrInvalidCollectionUpdate = errors.New("invalid collection update, reset collection true and collection value not empty")
ErrSegmentUniqueConstraintViolation = errors.New("unique constraint violation")
ErrSegmentDeleteNonExistingSegment = errors.New("delete non existing segment")
ErrSegmentUpdateNonExistingSegment = errors.New("update non existing segment")

// Segment metadata errors
ErrUnknownSegmentMetadataType = errors.New("segment metadata value type not supported")
Expand Down
60 changes: 34 additions & 26 deletions go/coordinator/internal/coordinator/apis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,11 +872,13 @@ func TestUpdateSegment(t *testing.T) {
})

// Update topic to new value
collectionID := segment.CollectionID.String()
newTopic := "new_topic"
segment.Topic = &newTopic
c.UpdateSegment(ctx, &model.UpdateSegment{
ID: segment.ID,
Topic: segment.Topic,
Collection: &collectionID,
ID: segment.ID,
Topic: segment.Topic,
})
result, err := c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID())
assert.NoError(t, err)
Expand All @@ -885,6 +887,7 @@ func TestUpdateSegment(t *testing.T) {
// Update topic to None
segment.Topic = nil
c.UpdateSegment(ctx, &model.UpdateSegment{
Collection: &collectionID,
ID: segment.ID,
Topic: segment.Topic,
ResetTopic: true,
Expand All @@ -893,42 +896,45 @@ func TestUpdateSegment(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, []*model.Segment{segment}, result)

// TODO: revisit why we need this
// Update collection to new value
segment.CollectionID = sampleCollections[1].ID
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why comment these tests? These tests are ported from the python system test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why we need to set collection id to null or modify the collection id for a segment. And it is not possible to do it because collection_id is now primary key of segment

newCollecionID := segment.CollectionID.String()
c.UpdateSegment(ctx, &model.UpdateSegment{
ID: segment.ID,
Collection: &newCollecionID,
})
result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID())
assert.NoError(t, err)
assert.Equal(t, []*model.Segment{segment}, result)
//segment.CollectionID = sampleCollections[1].ID
//newCollecionID := segment.CollectionID.String()
//c.UpdateSegment(ctx, &model.UpdateSegment{
// ID: segment.ID,
// Collection: &newCollecionID,
//})
//result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID())
//assert.NoError(t, err)
//assert.Equal(t, []*model.Segment{segment}, result)

// Update collection to None
segment.CollectionID = types.NilUniqueID()
c.UpdateSegment(ctx, &model.UpdateSegment{
ID: segment.ID,
Collection: nil,
ResetCollection: true,
})
result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID())
assert.NoError(t, err)
assert.Equal(t, []*model.Segment{segment}, result)
//segment.CollectionID = types.NilUniqueID()
//c.UpdateSegment(ctx, &model.UpdateSegment{
// ID: segment.ID,
// Collection: nil,
// ResetCollection: true,
//})
//result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID())
//assert.NoError(t, err)
//assert.Equal(t, []*model.Segment{segment}, result)

// Add a new metadata key
segment.Metadata.Set("test_str2", &model.SegmentMetadataValueStringType{Value: "str2"})
c.UpdateSegment(ctx, &model.UpdateSegment{
ID: segment.ID,
Metadata: segment.Metadata})
Collection: &collectionID,
ID: segment.ID,
Metadata: segment.Metadata})
result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID())
assert.NoError(t, err)
assert.Equal(t, []*model.Segment{segment}, result)

// Update a metadata key
segment.Metadata.Set("test_str", &model.SegmentMetadataValueStringType{Value: "str3"})
c.UpdateSegment(ctx, &model.UpdateSegment{
ID: segment.ID,
Metadata: segment.Metadata})
Collection: &collectionID,
ID: segment.ID,
Metadata: segment.Metadata})
result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID())
assert.NoError(t, err)
assert.Equal(t, []*model.Segment{segment}, result)
Expand All @@ -938,15 +944,17 @@ func TestUpdateSegment(t *testing.T) {
newMetadata := model.NewSegmentMetadata[model.SegmentMetadataValueType]()
newMetadata.Set("test_str", nil)
c.UpdateSegment(ctx, &model.UpdateSegment{
ID: segment.ID,
Metadata: newMetadata})
Collection: &collectionID,
ID: segment.ID,
Metadata: newMetadata})
result, err = c.GetSegments(ctx, segment.ID, nil, nil, nil, types.NilUniqueID())
assert.NoError(t, err)
assert.Equal(t, []*model.Segment{segment}, result)

// Delete all metadata keys
segment.Metadata = nil
c.UpdateSegment(ctx, &model.UpdateSegment{
Collection: &collectionID,
ID: segment.ID,
Metadata: segment.Metadata,
ResetMetadata: true},
Expand Down
27 changes: 27 additions & 0 deletions go/coordinator/internal/coordinator/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package coordinator

import (
"context"
"errors"
"github.com/jackc/pgx/v5/pgconn"
"sync"

"github.com/chroma/chroma-coordinator/internal/common"
Expand Down Expand Up @@ -222,6 +224,18 @@ func (mt *MetaTable) AddCollection(ctx context.Context, createCollection *model.
collection, err := mt.catalog.CreateCollection(ctx, createCollection, createCollection.Ts)
if err != nil {
log.Error("create collection failed", zap.Error(err))
var pgErr *pgconn.PgError
ok := errors.As(err, &pgErr)
if ok {
log.Error("Postgres Error")
switch pgErr.Code {
case "23505":
log.Error("collection id already exists")
return nil, common.ErrCollectionUniqueConstraintViolation
default:
return nil, err
}
}
return nil, err
}
mt.tenantDatabaseCollectionCache[tenantID][databaseName][collection.ID] = collection
Expand Down Expand Up @@ -361,6 +375,19 @@ func (mt *MetaTable) AddSegment(ctx context.Context, createSegment *model.Create

segment, err := mt.catalog.CreateSegment(ctx, createSegment, createSegment.Ts)
if err != nil {
log.Error("create segment failed", zap.Error(err))
var pgErr *pgconn.PgError
ok := errors.As(err, &pgErr)
if ok {
log.Error("Postgres Error")
switch pgErr.Code {
case "23505":
log.Error("segment id already exists")
return common.ErrSegmentUniqueConstraintViolation
default:
return err
}
}
return err
}
mt.segmentsCache[createSegment.ID] = segment
Expand Down
19 changes: 17 additions & 2 deletions go/coordinator/internal/metastore/coordinator/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package coordinator

import (
"context"

"github.com/chroma/chroma-coordinator/internal/common"
"github.com/chroma/chroma-coordinator/internal/metastore"
"github.com/chroma/chroma-coordinator/internal/metastore/db/dbmodel"
Expand Down Expand Up @@ -222,7 +221,7 @@ func (tc *Catalog) CreateCollection(ctx context.Context, createCollection *model
}

collectionName := createCollection.Name
existing, err := tc.metaDomain.CollectionDb(txCtx).GetCollections(types.FromUniqueID(createCollection.ID), &collectionName, nil, tenantID, databaseName)
existing, err := tc.metaDomain.CollectionDb(txCtx).GetCollections(nil, &collectionName, nil, tenantID, databaseName)
if err != nil {
log.Error("error getting collection", zap.Error(err))
return err
Expand Down Expand Up @@ -492,6 +491,22 @@ func (tc *Catalog) UpdateSegment(ctx context.Context, updateSegment *model.Updat
var result *model.Segment

err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
// TODO: we should push in collection_id here, add a GET to fix test for now
if updateSegment.Collection == nil {
results, err := tc.metaDomain.SegmentDb(txCtx).GetSegments(updateSegment.ID, nil, nil, nil, types.NilUniqueID())
if err != nil {
return err
}
if results == nil || len(results) == 0 {
return common.ErrSegmentUpdateNonExistingSegment
}
if results != nil && len(results) > 1 {
// TODO: fix this error
return common.ErrInvalidCollectionUpdate
}
updateSegment.Collection = results[0].Segment.CollectionID
}

// update segment
dbSegment := &dbmodel.UpdateSegment{
ID: updateSegment.ID.String(),
Expand Down
25 changes: 14 additions & 11 deletions go/coordinator/internal/metastore/db/dao/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,20 +165,23 @@ func generateSegmentUpdatesWithoutID(in *dbmodel.UpdateSegment) map[string]inter
}
}

if in.ResetCollection {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why comment out this code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why we need to set collection id to null. And it is not possible to do it because collection_id is now primary key of segment

if in.Collection == nil {
ret["collection_id"] = nil
}
} else {
if in.Collection != nil {
ret["collection_id"] = *in.Collection
}
}
log.Info("generate segment updates without id", zap.Any("updates", ret))
// TODO: check this
//if in.ResetCollection {
// if in.Collection == nil {
// ret["collection_id"] = nil
// }
//} else {
// if in.Collection != nil {
// ret["collection_id"] = *in.Collection
// }
//}
//log.Info("generate segment updates without id", zap.Any("updates", ret))
return ret
}

func (s *segmentDb) Update(in *dbmodel.UpdateSegment) error {
updates := generateSegmentUpdatesWithoutID(in)
return s.db.Model(&dbmodel.Segment{}).Where("id = ?", in.ID).Updates(updates).Error
return s.db.Model(&dbmodel.Segment{}).
Where("collection_id = ?", &in.Collection).
Where("id = ?", in.ID).Updates(updates).Error
}
6 changes: 5 additions & 1 deletion go/coordinator/internal/metastore/db/dbmodel/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import (
)

type Segment struct {
/* Making CollectionID the primary key allows fast search when we have CollectionID.
This requires us to push down CollectionID from the caller. We don't think there is
need to modify CollectionID in the near future. Each Segment should always have a
collection as a parent and cannot be modified. */
CollectionID *string `gorm:"collection_id;primaryKey"`
ID string `gorm:"id;primaryKey"`
Type string `gorm:"type;type:string;not null"`
Scope string `gorm:"scope"`
Expand All @@ -15,7 +20,6 @@ type Segment struct {
IsDeleted bool `gorm:"is_deleted;type:bool;default:false"`
CreatedAt time.Time `gorm:"created_at;type:timestamp;not null;default:current_timestamp"`
UpdatedAt time.Time `gorm:"updated_at;type:timestamp;not null;default:current_timestamp"`
CollectionID *string `gorm:"collection_id"`
}

func (s Segment) TableName() string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ CREATE TABLE "public"."segment_metadata" (
);
-- Create "segments" table
CREATE TABLE "public"."segments" (
"collection_id" text NOT NULL,
"id" text NOT NULL,
"type" text NOT NULL,
"scope" text NULL,
Expand All @@ -76,8 +77,7 @@ CREATE TABLE "public"."segments" (
"is_deleted" boolean NULL DEFAULT false,
"created_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
"collection_id" text NULL,
PRIMARY KEY ("id")
PRIMARY KEY ("collection_id", "id")
);
-- Create "tenants" table
CREATE TABLE "public"."tenants" (
Expand Down
Loading
Loading