Skip to content

Commit

Permalink
Introduce update state on CV implementation API (#976)
Browse files Browse the repository at this point in the history
<!-- markdownlint-disable MD041 -->
#### What this PR does / why we need it

Extend the CV implementation API to provide information, whether an
Update of the descriptor has been done.
This information is then used to trigger the pubsub handler.
If no update has been done, the query of the repository's pubsub info is
omitted.

#### Which issue(s) this PR fixes
<!--
Usage: `Fixes #<issue number>`, or `Fixes (paste link of issue)`.
-->
  • Loading branch information
mandelsoft authored Oct 21, 2024
1 parent 014afa2 commit 4ad368d
Show file tree
Hide file tree
Showing 15 changed files with 80 additions and 63 deletions.
2 changes: 1 addition & 1 deletion api/oci/extensions/repositories/artifactset/artifactset.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (a *namespaceContainer) Write(path string, mode vfs.FileMode, opts ...acces
return a.base.Write(path, mode, opts...)
}

func (a *namespaceContainer) Update() error {
func (a *namespaceContainer) Update() (bool, error) {
return a.base.Update()
}

Expand Down
2 changes: 1 addition & 1 deletion api/oci/extensions/repositories/ctf/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (r *RepositoryImpl) Write(path string, mode vfs.FileMode, opts ...accessio.
return r.base.Write(path, mode, opts...)
}

func (r *RepositoryImpl) Update() error {
func (r *RepositoryImpl) Update() (bool, error) {
return r.base.Update()
}

Expand Down
10 changes: 5 additions & 5 deletions api/ocm/cpi/repocpi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type StorageBackendImpl interface {
// version related methods.

GetDescriptor(key common.NameVersion) (*compdesc.ComponentDescriptor, error)
SetDescriptor(key common.NameVersion, descriptor *compdesc.ComponentDescriptor) error
SetDescriptor(key common.NameVersion, descriptor *compdesc.ComponentDescriptor) (bool, error)
AccessMethod(key common.NameVersion, acc cpi.AccessSpec, cv refmgmt.ExtendedAllocatable) (cpi.AccessMethod, error)
GetStorageContext(key common.NameVersion) cpi.StorageContext
GetBlob(key common.NameVersion, name string) (cpi.DataAccess, error)
Expand Down Expand Up @@ -268,13 +268,13 @@ func (s *storageBackendComponentVersion) GetDescriptor() *compdesc.ComponentDesc
return d
}

func (s *storageBackendComponentVersion) SetDescriptor(descriptor *compdesc.ComponentDescriptor) error {
err := s.comp.repo.impl.SetDescriptor(s.name, descriptor)
func (s *storageBackendComponentVersion) SetDescriptor(descriptor *compdesc.ComponentDescriptor) (bool, error) {
b, err := s.comp.repo.impl.SetDescriptor(s.name, descriptor)
if err != nil {
return err
return b, err
}
s.descriptor = descriptor
return nil
return b, nil
}

func (s *storageBackendComponentVersion) AccessMethod(acc cpi.AccessSpec, cv refmgmt.ExtendedAllocatable) (cpi.AccessMethod, error) {
Expand Down
22 changes: 12 additions & 10 deletions api/ocm/cpi/repocpi/bridge_cv.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type ComponentVersionAccessImpl interface {
SetReadOnly()

GetDescriptor() *compdesc.ComponentDescriptor
SetDescriptor(*compdesc.ComponentDescriptor) error
SetDescriptor(*compdesc.ComponentDescriptor) (bool, error)

AccessMethod(acc cpi.AccessSpec, cv refmgmt.ExtendedAllocatable) (cpi.AccessMethod, error)

Expand Down Expand Up @@ -258,34 +258,36 @@ func (b *componentVersionAccessBridge) update(final bool) error {
return nil
}

pub, err := pubsub.PubSubForRepo(b.Repository())
if err != nil {
return err
}

d := b.getDescriptor()

opts := &cpi.BlobUploadOptions{
UseNoDefaultIfNotSet: optionutils.PointerTo(true),
}
err = b.setupLocalBlobs("resource", b.composeAccess, d.Resources, true, opts)
err := b.setupLocalBlobs("resource", b.composeAccess, d.Resources, true, opts)
if err == nil {
err = b.setupLocalBlobs("source", b.composeAccess, d.Sources, true, opts)
}
if err != nil {
return err
}

err = b.impl.SetDescriptor(b.descriptor.Copy())
updated, err := b.impl.SetDescriptor(b.descriptor.Copy())
if err != nil {
return err
}
err = b.blobcache.Clear()
if pub != nil {
err := pub.NotifyComponentVersion(common.VersionedElementKey(b))

if updated {
pub, err := pubsub.PubSubForRepo(b.Repository())
if err != nil {
return err
}
if pub != nil {
err := pub.NotifyComponentVersion(common.VersionedElementKey(b))
if err != nil {
return err
}
}
}
return err
}
Expand Down
9 changes: 5 additions & 4 deletions api/ocm/extensions/repositories/comparch/componentarchive.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ func (c *componentArchiveContainer) GetParentBridge() repocpi.ComponentAccessBri

func (c *componentArchiveContainer) Close() error {
var list errors.ErrorList
return list.Add(c.Update(), c.fsacc.Close()).Result()
_, err := c.Update()
return list.Add(err, c.fsacc.Close()).Result()
}

func (c *componentArchiveContainer) GetContext() cpi.Context {
Expand All @@ -141,13 +142,13 @@ func (c *componentArchiveContainer) SetReadOnly() {
c.fsacc.SetReadOnly()
}

func (c *componentArchiveContainer) Update() error {
func (c *componentArchiveContainer) Update() (bool, error) {
return c.fsacc.Update()
}

func (c *componentArchiveContainer) SetDescriptor(cd *compdesc.ComponentDescriptor) error {
func (c *componentArchiveContainer) SetDescriptor(cd *compdesc.ComponentDescriptor) (bool, error) {
if c.fsacc.IsReadOnly() {
return accessobj.ErrReadOnly
return false, accessobj.ErrReadOnly
}
cur := c.fsacc.GetState().GetState().(*compdesc.ComponentDescriptor)
*cur = *cd.Copy()
Expand Down
4 changes: 2 additions & 2 deletions api/ocm/extensions/repositories/comparch/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,13 @@ func (c *ComponentVersionContainer) SetReadOnly() {
c.comp.repo.arch.SetReadOnly()
}

func (c *ComponentVersionContainer) Update() error {
func (c *ComponentVersionContainer) Update() (bool, error) {
desc := c.comp.repo.arch.GetDescriptor()
*desc = *c.descriptor.Copy()
return c.comp.repo.arch.container.Update()
}

func (c *ComponentVersionContainer) SetDescriptor(cd *compdesc.ComponentDescriptor) error {
func (c *ComponentVersionContainer) SetDescriptor(cd *compdesc.ComponentDescriptor) (bool, error) {
*c.descriptor = *cd
return c.Update()
}
Expand Down
29 changes: 18 additions & 11 deletions api/ocm/extensions/repositories/composition/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,10 @@ func (a *Access) GetComponentVersion(comp, version string) (virtual.VersionAcces
i := a.index.Get(comp, version)
if i == nil {
cd = compdesc.New(comp, version)
err := a.index.Add(cd, common.VersionedElementKey(cd))
if err != nil {
return nil, err
}
} else {
cd = i.CD()
}
return &VersionAccess{a, cd.GetName(), cd.GetVersion(), a.IsReadOnly(), cd.Copy()}, nil
return &VersionAccess{a, cd.GetName(), cd.GetVersion(), a.IsReadOnly(), cd.Copy(), false}, nil
}

func (a *Access) GetBlob(name string) (blobaccess.BlobAccess, error) {
Expand Down Expand Up @@ -156,6 +152,7 @@ type VersionAccess struct {
vers string
readonly bool
desc *compdesc.ComponentDescriptor
new bool
}

func (v *VersionAccess) GetDescriptor() *compdesc.ComponentDescriptor {
Expand All @@ -173,25 +170,35 @@ func (v *VersionAccess) AddBlob(blob cpi.BlobAccess) (string, error) {
return v.access.AddBlob(blob)
}

func (v *VersionAccess) Update() error {
func (v *VersionAccess) Update() (bool, error) {
v.access.lock.Lock()
defer v.access.lock.Unlock()

if v.readonly {
return accessio.ErrReadOnly
return false, accessio.ErrReadOnly
}
if v.desc.GetName() != v.comp || v.desc.GetVersion() != v.vers {
return errors.ErrInvalid(cpi.KIND_COMPONENTVERSION, common.VersionedElementKey(v.desc).String())
return false, errors.ErrInvalid(cpi.KIND_COMPONENTVERSION, common.VersionedElementKey(v.desc).String())
}
i := v.access.index.Get(v.comp, v.vers)
if !reflect.DeepEqual(v.desc, i.CD()) {
v.access.index.Set(v.desc, i.Info())
if v.new {
err := v.access.index.Add(v.desc, i.Info())
if err != nil {
return false, err
}
v.new = false
} else {
v.access.index.Set(v.desc, i.Info())
}
return true, nil
}
return nil
return false, nil
}

func (v *VersionAccess) Close() error {
return v.Update()
_, err := v.Update()
return err
}

func (v *VersionAccess) IsReadOnly() bool {
Expand Down
21 changes: 11 additions & 10 deletions api/ocm/extensions/repositories/genericocireg/componentversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (c *ComponentVersionContainer) AccessMethod(a cpi.AccessSpec, cv refmgmt.Ex
return nil, errors.ErrNotSupported(errkind.KIND_ACCESSMETHOD, a.GetType(), "oci registry")
}

func (c *ComponentVersionContainer) SetDescriptor(cd *compdesc.ComponentDescriptor) error {
func (c *ComponentVersionContainer) SetDescriptor(cd *compdesc.ComponentDescriptor) (bool, error) {
cur := c.GetDescriptor()
*cur = *cd
return c.Update()
Expand All @@ -163,11 +163,11 @@ const (
OCM_ARTIFACT = "ocm-artifact"
)

func (c *ComponentVersionContainer) Update() error {
func (c *ComponentVersionContainer) Update() (bool, error) {
logger := Logger(c.GetContext()).WithValues("cv", common.NewNameVersion(c.comp.name, c.version))
err := c.Check()
if err != nil {
return fmt.Errorf("check failed: %w", err)
return false, fmt.Errorf("check failed: %w", err)
}

if c.state.HasChanged() {
Expand All @@ -182,7 +182,7 @@ func (c *ComponentVersionContainer) Update() error {
for i, r := range desc.Resources {
s, l, err := c.evalLayer(r.Access)
if err != nil {
return fmt.Errorf("failed resource layer evaluation: %w", err)
return false, fmt.Errorf("failed resource layer evaluation: %w", err)
}
if l > 0 {
layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{
Expand All @@ -198,7 +198,7 @@ func (c *ComponentVersionContainer) Update() error {
for i, r := range desc.Sources {
s, l, err := c.evalLayer(r.Access)
if err != nil {
return fmt.Errorf("failed source layer evaluation: %w", err)
return false, fmt.Errorf("failed source layer evaluation: %w", err)
}
if l > 0 {
layerAnnotations[l] = append(layerAnnotations[l], ArtifactInfo{
Expand All @@ -216,7 +216,7 @@ func (c *ComponentVersionContainer) Update() error {
for layer, info := range layerAnnotations {
data, err := runtime.DefaultJSONEncoding.Marshal(info)
if err != nil {
return err
return false, err
}
if m.Layers[layer].Annotations == nil {
m.Layers[layer].Annotations = map[string]string{}
Expand All @@ -233,20 +233,21 @@ func (c *ComponentVersionContainer) Update() error {
i--
}
if _, err := c.state.Update(); err != nil {
return fmt.Errorf("failed to update state: %w", err)
return false, fmt.Errorf("failed to update state: %w", err)
}

logger.Debug("add oci artifact")
tag, err := toTag(c.version)
if err != nil {
return err
return false, err
}
if _, err := c.comp.namespace.AddArtifact(c.manifest, tag); err != nil {
return fmt.Errorf("unable to add artifact: %w", err)
return false, fmt.Errorf("unable to add artifact: %w", err)
}
return true, nil
}

return nil
return false, nil
}

func (c *ComponentVersionContainer) evalLayer(s compdesc.AccessSpec) (compdesc.AccessSpec, int, error) {
Expand Down
2 changes: 1 addition & 1 deletion api/ocm/extensions/repositories/virtual/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type VersionAccess interface {
GetDescriptor() *compdesc.ComponentDescriptor
GetBlob(name string) (cpi.DataAccess, error)
AddBlob(blob cpi.BlobAccess) (string, error)
Update() error
Update() (bool, error)
Close() error

IsReadOnly() bool
Expand Down
4 changes: 2 additions & 2 deletions api/ocm/extensions/repositories/virtual/componentversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ func (c *ComponentVersionContainer) AccessMethod(a cpi.AccessSpec, cv refmgmt.Ex
return nil, errors.ErrNotSupported(errkind.KIND_ACCESSMETHOD, a.GetType(), "virtual registry")
}

func (c *ComponentVersionContainer) Update() error {
func (c *ComponentVersionContainer) Update() (bool, error) {
return c.access.Update()
}

func (c *ComponentVersionContainer) SetDescriptor(cd *compdesc.ComponentDescriptor) error {
func (c *ComponentVersionContainer) SetDescriptor(cd *compdesc.ComponentDescriptor) (bool, error) {
cur := c.access.GetDescriptor()
*cur = *cd
return c.access.Update()
Expand Down
19 changes: 12 additions & 7 deletions api/ocm/extensions/repositories/virtual/example/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,30 +192,35 @@ func (v *VersionAccess) AddBlob(blob cpi.BlobAccess) (string, error) {
return d.Encoded(), nil
}

func (v *VersionAccess) Update() error {
func (v *VersionAccess) Update() (bool, error) {
v.access.lock.Lock()
defer v.access.lock.Unlock()

if v.desc.GetName() != v.comp || v.desc.GetVersion() != v.vers {
return errors.ErrInvalid(cpi.KIND_COMPONENTVERSION, common.VersionedElementKey(v.desc).String())
return false, errors.ErrInvalid(cpi.KIND_COMPONENTVERSION, common.VersionedElementKey(v.desc).String())
}
i := v.access.index.Get(v.comp, v.vers)
if !reflect.DeepEqual(v.desc, i.CD()) {
if v.IsReadOnly() {
return accessio.ErrReadOnly
return false, accessio.ErrReadOnly
}
data, err := compdesc.Encode(v.desc)
if err != nil {
return err
return false, err
}
v.access.index.Set(v.desc, i.Info())
return vfs.WriteFile(v.access.fs, i.Info(), data, 0o600)
err = vfs.WriteFile(v.access.fs, i.Info(), data, 0o600)
if err != nil {
return false, err
}
return true, nil
}
return nil
return false, nil
}

func (v *VersionAccess) Close() error {
return v.Update()
_, err := v.Update()
return err
}

func (v *VersionAccess) IsReadOnly() bool {
Expand Down
13 changes: 7 additions & 6 deletions api/utils/accessobj/accessobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,20 +189,21 @@ func (a *AccessObject) Write(path string, mode vfs.FileMode, opts ...accessio.Op
return f.Write(a, path, o, mode)
}

func (a *AccessObject) Update() error {
if _, err := a.updateDescriptor(); err != nil {
return fmt.Errorf("unable to update descriptor: %w", err)
func (a *AccessObject) Update() (bool, error) {
if b, err := a.updateDescriptor(); err != nil {
return b, fmt.Errorf("unable to update descriptor: %w", err)
} else {
return b, nil
}

return nil
}

func (a *AccessObject) Close() error {
if a.IsClosed() {
return accessio.ErrClosed
}
list := errors.ErrListf("cannot close %s", a.info.GetObjectTypeName())
list.Add(a.Update())
_, err := a.Update()
list.Add(err)
if a.closer != nil {
list.Add(a.closer.Close(a))
}
Expand Down
2 changes: 1 addition & 1 deletion api/utils/accessobj/accessstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (s *state) Update() (bool, error) {
}

if s.IsReadOnly() {
return true, ErrReadOnly
return false, ErrReadOnly
}
data, err := s.handler.Encode(s.current)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion api/utils/accessobj/filesystemaccess.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (a *FileSystemBlobAccess) Write(path string, mode vfs.FileMode, opts ...acc
return a.base.Write(path, mode, opts...)
}

func (a *FileSystemBlobAccess) Update() error {
func (a *FileSystemBlobAccess) Update() (bool, error) {
return a.base.Update()
}

Expand Down
Loading

0 comments on commit 4ad368d

Please sign in to comment.