diff --git a/core/key.go b/core/key.go index 48beaa7ed1..453c1aeb87 100644 --- a/core/key.go +++ b/core/key.go @@ -38,12 +38,13 @@ const ( ) const ( - COLLECTION = "/collection/names" - COLLECTION_SCHEMA = "/collection/schema" - SCHEMA = "/schema" - SEQ = "/seq" - PRIMARY_KEY = "/pk" - REPLICATOR = "/replicator/id" + COLLECTION = "/collection/names" + COLLECTION_SCHEMA = "/collection/schema" + COLLECTION_SCHEMA_VERSION = "/collection/version" + SCHEMA = "/schema" + SEQ = "/seq" + PRIMARY_KEY = "/pk" + REPLICATOR = "/replicator/id" ) // Key is an interface that represents a key in the database. @@ -78,18 +79,30 @@ type HeadStoreKey struct { var _ Key = (*HeadStoreKey)(nil) +// CollectionKey points to the current/'head' SchemaVersionId for +// the collection of the given name. type CollectionKey struct { CollectionName string } var _ Key = (*CollectionKey)(nil) +// CollectionSchemaKey points to the current/'head' SchemaVersionId for +// the collection of the given schema id. type CollectionSchemaKey struct { SchemaId string } var _ Key = (*CollectionSchemaKey)(nil) +// CollectionSchemaVersionKey points to schema of a collection at a given +// version. +type CollectionSchemaVersionKey struct { + SchemaVersionId string +} + +var _ Key = (*CollectionSchemaVersionKey)(nil) + type SchemaKey struct { SchemaName string } @@ -190,6 +203,10 @@ func NewCollectionSchemaKey(schemaId string) CollectionSchemaKey { return CollectionSchemaKey{SchemaId: schemaId} } +func NewCollectionSchemaVersionKey(schemaVersionId string) CollectionSchemaVersionKey { + return CollectionSchemaVersionKey{SchemaVersionId: schemaVersionId} +} + // NewSchemaKey returns a formatted schema key for the system data store. // It assumes the name of the schema is non-empty. func NewSchemaKey(name string) SchemaKey { @@ -363,6 +380,24 @@ func (k CollectionSchemaKey) ToDS() ds.Key { return ds.NewKey(k.ToString()) } +func (k CollectionSchemaVersionKey) ToString() string { + result := COLLECTION_SCHEMA_VERSION + + if k.SchemaVersionId != "" { + result = result + "/" + k.SchemaVersionId + } + + return result +} + +func (k CollectionSchemaVersionKey) Bytes() []byte { + return []byte(k.ToString()) +} + +func (k CollectionSchemaVersionKey) ToDS() ds.Key { + return ds.NewKey(k.ToString()) +} + func (k SchemaKey) ToString() string { result := SCHEMA diff --git a/db/collection.go b/db/collection.go index 4ca9306d94..375da6b1d1 100644 --- a/db/collection.go +++ b/db/collection.go @@ -111,8 +111,8 @@ func (db *db) CreateCollection( desc client.CollectionDescription, ) (client.Collection, error) { // check if collection by this name exists - cKey := core.NewCollectionKey(desc.Name) - exists, err := db.systemstore().Has(ctx, cKey.ToDS()) + collectionKey := core.NewCollectionKey(desc.Name) + exists, err := db.systemstore().Has(ctx, collectionKey.ToDS()) if err != nil { return nil, err } @@ -139,47 +139,63 @@ func (db *db) CreateCollection( return nil, err } - key := core.NewCollectionKey(col.desc.Name) + // Local elements such as secondary indexes should be excluded + // from the (global) schemaId. + globalSchemaBuf, err := json.Marshal(struct { + Name string + Schema client.SchemaDescription + }{col.desc.Name, col.desc.Schema}) + if err != nil { + return nil, err + } - // write the collection metadata to the system store - err = db.systemstore().Put(ctx, key.ToDS(), buf) + // add a reference to this DB by desc hash + cid, err := core.NewSHA256CidV1(globalSchemaBuf) if err != nil { return nil, err } + schemaId := cid.String() + col.schemaID = schemaId - buf, err = json.Marshal(struct { - Name string - Schema client.SchemaDescription - }{col.desc.Name, col.desc.Schema}) + // For new schemas the initial version id will match the schema id + schemaVersionId := schemaId + collectionSchemaVersionKey := core.NewCollectionSchemaVersionKey(schemaVersionId) + // Whilst the schemaVersionKey is global, the data persisted at the key's location + // is local to the node (the global only elements are not useful beyond key generation). + err = db.systemstore().Put(ctx, collectionSchemaVersionKey.ToDS(), buf) if err != nil { return nil, err } - // add a reference to this DB by desc hash - cid, err := core.NewSHA256CidV1(buf) + collectionSchemaKey := core.NewCollectionSchemaKey(schemaId) + err = db.systemstore().Put(ctx, collectionSchemaKey.ToDS(), []byte(schemaVersionId)) + if err != nil { + return nil, err + } + + err = db.systemstore().Put(ctx, collectionKey.ToDS(), []byte(schemaVersionId)) if err != nil { return nil, err } - col.schemaID = cid.String() - csKey := core.NewCollectionSchemaKey(cid.String()) - err = db.systemstore().Put(ctx, csKey.ToDS(), []byte(desc.Name)) log.Debug( ctx, "Created collection", logging.NewKV("Name", col.Name()), logging.NewKV("ID", col.SchemaID), ) - return col, err + return col, nil } -// GetCollection returns an existing collection within the database. -func (db *db) GetCollectionByName(ctx context.Context, name string) (client.Collection, error) { - if name == "" { - return nil, ErrCollectionNameEmpty +// getCollectionByVersionId returns the [*collection] at the given [schemaVersionId] version. +// +// Will return an error if the given key is empty, or not found. +func (db *db) getCollectionByVersionId(ctx context.Context, schemaVersionId string) (*collection, error) { + if schemaVersionId == "" { + return nil, ErrSchemaVersionIdEmpty } - key := core.NewCollectionKey(name) + key := core.NewCollectionSchemaVersionKey(schemaVersionId) buf, err := db.systemstore().Get(ctx, key.ToDS()) if err != nil { return nil, err @@ -221,6 +237,22 @@ func (db *db) GetCollectionByName(ctx context.Context, name string) (client.Coll }, nil } +// GetCollection returns an existing collection within the database. +func (db *db) GetCollectionByName(ctx context.Context, name string) (client.Collection, error) { + if name == "" { + return nil, ErrCollectionNameEmpty + } + + key := core.NewCollectionKey(name) + buf, err := db.systemstore().Get(ctx, key.ToDS()) + if err != nil { + return nil, err + } + + schemaVersionId := string(buf) + return db.getCollectionByVersionId(ctx, schemaVersionId) +} + // GetCollectionBySchemaID returns an existing collection using the schema hash ID. func (db *db) GetCollectionBySchemaID( ctx context.Context, @@ -236,14 +268,14 @@ func (db *db) GetCollectionBySchemaID( return nil, err } - name := string(buf) - return db.GetCollectionByName(ctx, name) + schemaVersionId := string(buf) + return db.getCollectionByVersionId(ctx, schemaVersionId) } // GetAllCollections gets all the currently defined collections. func (db *db) GetAllCollections(ctx context.Context) ([]client.Collection, error) { // create collection system prefix query - prefix := core.NewCollectionKey("") + prefix := core.NewCollectionSchemaVersionKey("") q, err := db.systemstore().Query(ctx, query.Query{ Prefix: prefix.ToString(), KeysOnly: true, @@ -263,10 +295,10 @@ func (db *db) GetAllCollections(ctx context.Context) ([]client.Collection, error return nil, err } - colName := ds.NewKey(res.Key).BaseNamespace() - col, err := db.GetCollectionByName(ctx, colName) + schemaVersionId := ds.NewKey(res.Key).BaseNamespace() + col, err := db.getCollectionByVersionId(ctx, schemaVersionId) if err != nil { - return nil, NewErrFailedToGetCollection(colName, err) + return nil, NewErrFailedToGetCollection(schemaVersionId, err) } cols = append(cols, col) } diff --git a/db/errors.go b/db/errors.go index dcee0a1cd1..38402c4ac1 100644 --- a/db/errors.go +++ b/db/errors.go @@ -48,6 +48,7 @@ var ( ErrCollectionAlreadyExists = errors.New("collection already exists") ErrCollectionNameEmpty = errors.New("collection name can't be empty") ErrSchemaIdEmpty = errors.New("schema ID can't be empty") + ErrSchemaVersionIdEmpty = errors.New("schema version ID can't be empty") ErrKeyEmpty = errors.New("key cannot be empty") ) diff --git a/planner/datasource.go b/planner/datasource.go index 4d524a1ee4..2dea8290c5 100644 --- a/planner/datasource.go +++ b/planner/datasource.go @@ -57,13 +57,20 @@ func (p *Planner) getCollectionScanPlan(parsed *mapper.Select) (planSource, erro } func (p *Planner) getCollectionDesc(name string) (client.CollectionDescription, error) { - key := core.NewCollectionKey(name) + collectionKey := core.NewCollectionKey(name) var desc client.CollectionDescription - buf, err := p.txn.Systemstore().Get(p.ctx, key.ToDS()) + schemaVersionIdBytes, err := p.txn.Systemstore().Get(p.ctx, collectionKey.ToDS()) if err != nil { return desc, errors.Wrap("failed to get collection description", err) } + schemaVersionId := string(schemaVersionIdBytes) + schemaVersionKey := core.NewCollectionSchemaVersionKey(schemaVersionId) + buf, err := p.txn.Systemstore().Get(p.ctx, schemaVersionKey.ToDS()) + if err != nil { + return desc, err + } + err = json.Unmarshal(buf, &desc) if err != nil { return desc, err diff --git a/planner/mapper/descriptions.go b/planner/mapper/descriptions.go index 1d65820f3b..7343380095 100644 --- a/planner/mapper/descriptions.go +++ b/planner/mapper/descriptions.go @@ -43,22 +43,24 @@ func NewDescriptionsRepo(ctx context.Context, txn datastore.Txn) *DescriptionsRe // Will return nil and an error if a description of the given name is not found. Will first look // in the repo's cache for the description before querying the datastore. func (r *DescriptionsRepo) getCollectionDesc(name string) (client.CollectionDescription, error) { - if desc, hasDesc := r.collectionDescriptionsByName[name]; hasDesc { - return desc, nil + collectionKey := core.NewCollectionKey(name) + var desc client.CollectionDescription + schemaVersionIdBytes, err := r.txn.Systemstore().Get(r.ctx, collectionKey.ToDS()) + if err != nil { + return desc, errors.Wrap("failed to get collection description", err) } - key := core.NewCollectionKey(name) - buf, err := r.txn.Systemstore().Get(r.ctx, key.ToDS()) + schemaVersionId := string(schemaVersionIdBytes) + schemaVersionKey := core.NewCollectionSchemaVersionKey(schemaVersionId) + buf, err := r.txn.Systemstore().Get(r.ctx, schemaVersionKey.ToDS()) if err != nil { - return client.CollectionDescription{}, errors.Wrap("failed to get collection description", err) + return desc, err } - desc := client.CollectionDescription{} err = json.Unmarshal(buf, &desc) if err != nil { - return client.CollectionDescription{}, err + return desc, err } - r.collectionDescriptionsByName[name] = desc return desc, nil }