Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Allow db keys to handle multiple schema versions #1026

Merged
merged 13 commits into from
Jan 18, 2023
47 changes: 41 additions & 6 deletions core/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ const (
)

const (
COLLECTION = "/collection/names"
COLLECTION_SCHEMA = "/collection/schema"
SCHEMA = "/schema"
SEQ = "/seq"
PRIMARY_KEY = "/pk"
REPLICATOR = "/replicator/id"
COLLECTION = "/collection/names"
COLLECTION_SCHEMA = "/collection/schema"
COLLECTION_SCHEMA_VERSION = "/collection/version"
SCHEMA = "/schema"
SEQ = "/seq"
PRIMARY_KEY = "/pk"
REPLICATOR = "/replicator/id"
)

// Key is an interface that represents a key in the database.
Expand Down Expand Up @@ -78,18 +79,30 @@ type HeadStoreKey struct {

var _ Key = (*HeadStoreKey)(nil)

// CollectionKey points to the current/'head' SchemaVersionId for
// the collection of the given name.
type CollectionKey struct {
CollectionName string
}

var _ Key = (*CollectionKey)(nil)

// CollectionSchemaKey points to the current/'head' SchemaVersionId for
// the collection of the given schema id.
type CollectionSchemaKey struct {
SchemaId string
}

var _ Key = (*CollectionSchemaKey)(nil)

// CollectionSchemaVersionKey points to schema of a collection at a given
// version.
type CollectionSchemaVersionKey struct {
SchemaVersionId string
}

var _ Key = (*CollectionSchemaVersionKey)(nil)

type SchemaKey struct {
SchemaName string
}
Expand Down Expand Up @@ -190,6 +203,10 @@ func NewCollectionSchemaKey(schemaId string) CollectionSchemaKey {
return CollectionSchemaKey{SchemaId: schemaId}
}

func NewCollectionSchemaVersionKey(schemaVersionId string) CollectionSchemaVersionKey {
return CollectionSchemaVersionKey{SchemaVersionId: schemaVersionId}
}

// NewSchemaKey returns a formatted schema key for the system data store.
// It assumes the name of the schema is non-empty.
func NewSchemaKey(name string) SchemaKey {
Expand Down Expand Up @@ -363,6 +380,24 @@ func (k CollectionSchemaKey) ToDS() ds.Key {
return ds.NewKey(k.ToString())
}

func (k CollectionSchemaVersionKey) ToString() string {
result := COLLECTION_SCHEMA_VERSION

if k.SchemaVersionId != "" {
result = result + "/" + k.SchemaVersionId
}

return result
}

func (k CollectionSchemaVersionKey) Bytes() []byte {
return []byte(k.ToString())
}

func (k CollectionSchemaVersionKey) ToDS() ds.Key {
return ds.NewKey(k.ToString())
}

func (k SchemaKey) ToString() string {
result := SCHEMA

Expand Down
84 changes: 58 additions & 26 deletions db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func (db *db) CreateCollection(
desc client.CollectionDescription,
) (client.Collection, error) {
// check if collection by this name exists
cKey := core.NewCollectionKey(desc.Name)
exists, err := db.systemstore().Has(ctx, cKey.ToDS())
collectionKey := core.NewCollectionKey(desc.Name)
exists, err := db.systemstore().Has(ctx, collectionKey.ToDS())
fredcarle marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
Expand All @@ -139,47 +139,63 @@ func (db *db) CreateCollection(
return nil, err
}

key := core.NewCollectionKey(col.desc.Name)
// Local elements such as secondary indexes should be excluded
// from the (global) schemaId.
globalSchemaBuf, err := json.Marshal(struct {
Name string
Schema client.SchemaDescription
}{col.desc.Name, col.desc.Schema})
if err != nil {
return nil, err
}

// write the collection metadata to the system store
err = db.systemstore().Put(ctx, key.ToDS(), buf)
// add a reference to this DB by desc hash
cid, err := core.NewSHA256CidV1(globalSchemaBuf)
if err != nil {
return nil, err
}
schemaId := cid.String()
col.schemaID = schemaId

buf, err = json.Marshal(struct {
Name string
Schema client.SchemaDescription
}{col.desc.Name, col.desc.Schema})
// For new schemas the initial version id will match the schema id
schemaVersionId := schemaId
collectionSchemaVersionKey := core.NewCollectionSchemaVersionKey(schemaVersionId)
// Whilst the schemaVersionKey is global, the data persisted at the key's location
// is local to the node (the global only elements are not useful beyond key generation).
err = db.systemstore().Put(ctx, collectionSchemaVersionKey.ToDS(), buf)
if err != nil {
return nil, err
}

// add a reference to this DB by desc hash
cid, err := core.NewSHA256CidV1(buf)
collectionSchemaKey := core.NewCollectionSchemaKey(schemaId)
fredcarle marked this conversation as resolved.
Show resolved Hide resolved
err = db.systemstore().Put(ctx, collectionSchemaKey.ToDS(), []byte(schemaVersionId))
if err != nil {
return nil, err
}

err = db.systemstore().Put(ctx, collectionKey.ToDS(), []byte(schemaVersionId))
if err != nil {
return nil, err
}
col.schemaID = cid.String()

csKey := core.NewCollectionSchemaKey(cid.String())
err = db.systemstore().Put(ctx, csKey.ToDS(), []byte(desc.Name))
log.Debug(
ctx,
"Created collection",
logging.NewKV("Name", col.Name()),
logging.NewKV("ID", col.SchemaID),
)
return col, err
return col, nil
}

// GetCollection returns an existing collection within the database.
func (db *db) GetCollectionByName(ctx context.Context, name string) (client.Collection, error) {
if name == "" {
return nil, ErrCollectionNameEmpty
// getCollectionByVersionId returns the [*collection] at the given [schemaVersionId] version.
//
// Will return an error if the given key is empty, or not found.
func (db *db) getCollectionByVersionId(ctx context.Context, schemaVersionId string) (*collection, error) {
if schemaVersionId == "" {
return nil, ErrSchemaVersionIdEmpty
}

key := core.NewCollectionKey(name)
key := core.NewCollectionSchemaVersionKey(schemaVersionId)
buf, err := db.systemstore().Get(ctx, key.ToDS())
if err != nil {
return nil, err
Expand Down Expand Up @@ -221,6 +237,22 @@ func (db *db) GetCollectionByName(ctx context.Context, name string) (client.Coll
}, nil
}

// GetCollection returns an existing collection within the database.
func (db *db) GetCollectionByName(ctx context.Context, name string) (client.Collection, error) {
if name == "" {
return nil, ErrCollectionNameEmpty
}

key := core.NewCollectionKey(name)
buf, err := db.systemstore().Get(ctx, key.ToDS())
if err != nil {
return nil, err
}

schemaVersionId := string(buf)
Comment on lines +246 to +252
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion(non-blocking): Going from name to schemaVersionId can prob be scoped to a private method.

func (db *db) GetCollectionByName(ctx context.Context, name string) (client.Collection, error) {
    // ...
    schemaVersionId, err := db.getSchemaVersionId(name)
    return db.getCollectionByVersionId(ctx, schemaVersionId)
}

Copy link
Contributor Author

@AndrewSisley AndrewSisley Jan 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll have a look - if it would only be called once (I cant remember), then this will probably remain as-is.

  • name=>vId private func?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to leave this as now, there is no significantly useful way of doing this without changing (adding to) the public interface in client. That work is already planned in the epic - I would rather this as part of that work where proper time can be allocated to it. I'll make a note of these locations in that ticket though as a reminder.

return db.getCollectionByVersionId(ctx, schemaVersionId)
}

// GetCollectionBySchemaID returns an existing collection using the schema hash ID.
func (db *db) GetCollectionBySchemaID(
ctx context.Context,
Expand All @@ -236,14 +268,14 @@ func (db *db) GetCollectionBySchemaID(
return nil, err
}

name := string(buf)
return db.GetCollectionByName(ctx, name)
schemaVersionId := string(buf)
return db.getCollectionByVersionId(ctx, schemaVersionId)
}

// GetAllCollections gets all the currently defined collections.
func (db *db) GetAllCollections(ctx context.Context) ([]client.Collection, error) {
// create collection system prefix query
prefix := core.NewCollectionKey("")
prefix := core.NewCollectionSchemaVersionKey("")
q, err := db.systemstore().Query(ctx, query.Query{
Prefix: prefix.ToString(),
KeysOnly: true,
Expand All @@ -263,10 +295,10 @@ func (db *db) GetAllCollections(ctx context.Context) ([]client.Collection, error
return nil, err
}

colName := ds.NewKey(res.Key).BaseNamespace()
col, err := db.GetCollectionByName(ctx, colName)
schemaVersionId := ds.NewKey(res.Key).BaseNamespace()
col, err := db.getCollectionByVersionId(ctx, schemaVersionId)
if err != nil {
return nil, NewErrFailedToGetCollection(colName, err)
return nil, NewErrFailedToGetCollection(schemaVersionId, err)
}
cols = append(cols, col)
}
Expand Down
1 change: 1 addition & 0 deletions db/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var (
ErrCollectionAlreadyExists = errors.New("collection already exists")
ErrCollectionNameEmpty = errors.New("collection name can't be empty")
ErrSchemaIdEmpty = errors.New("schema ID can't be empty")
ErrSchemaVersionIdEmpty = errors.New("schema version ID can't be empty")
ErrKeyEmpty = errors.New("key cannot be empty")
)

Expand Down
11 changes: 9 additions & 2 deletions planner/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,20 @@ func (p *Planner) getCollectionScanPlan(parsed *mapper.Select) (planSource, erro
}

func (p *Planner) getCollectionDesc(name string) (client.CollectionDescription, error) {
key := core.NewCollectionKey(name)
collectionKey := core.NewCollectionKey(name)
var desc client.CollectionDescription
buf, err := p.txn.Systemstore().Get(p.ctx, key.ToDS())
schemaVersionIdBytes, err := p.txn.Systemstore().Get(p.ctx, collectionKey.ToDS())
if err != nil {
return desc, errors.Wrap("failed to get collection description", err)
}

schemaVersionId := string(schemaVersionIdBytes)
schemaVersionKey := core.NewCollectionSchemaVersionKey(schemaVersionId)
buf, err := p.txn.Systemstore().Get(p.ctx, schemaVersionKey.ToDS())
if err != nil {
return desc, err
}
Comment on lines +60 to +72
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion(non-blocking): Would be nice (but not required) to link the two implementations (here and above) to handle schema name => versionId

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I understand your other comment better now - thanks :)


err = json.Unmarshal(buf, &desc)
if err != nil {
return desc, err
Expand Down
18 changes: 10 additions & 8 deletions planner/mapper/descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,24 @@ func NewDescriptionsRepo(ctx context.Context, txn datastore.Txn) *DescriptionsRe
// Will return nil and an error if a description of the given name is not found. Will first look
// in the repo's cache for the description before querying the datastore.
func (r *DescriptionsRepo) getCollectionDesc(name string) (client.CollectionDescription, error) {
if desc, hasDesc := r.collectionDescriptionsByName[name]; hasDesc {
return desc, nil
collectionKey := core.NewCollectionKey(name)
var desc client.CollectionDescription
schemaVersionIdBytes, err := r.txn.Systemstore().Get(r.ctx, collectionKey.ToDS())
if err != nil {
return desc, errors.Wrap("failed to get collection description", err)
}

key := core.NewCollectionKey(name)
buf, err := r.txn.Systemstore().Get(r.ctx, key.ToDS())
schemaVersionId := string(schemaVersionIdBytes)
schemaVersionKey := core.NewCollectionSchemaVersionKey(schemaVersionId)
buf, err := r.txn.Systemstore().Get(r.ctx, schemaVersionKey.ToDS())
Comment on lines +46 to +55
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion(non-blocking): Again :)

if err != nil {
return client.CollectionDescription{}, errors.Wrap("failed to get collection description", err)
return desc, err
}

desc := client.CollectionDescription{}
err = json.Unmarshal(buf, &desc)
if err != nil {
return client.CollectionDescription{}, err
return desc, err
}

r.collectionDescriptionsByName[name] = desc
return desc, nil
}