Skip to content

Commit

Permalink
set scan data after scan is finished
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <[email protected]>
  • Loading branch information
kobergj committed Mar 16, 2023
1 parent f5f28e9 commit d6471ec
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 30 deletions.
35 changes: 12 additions & 23 deletions pkg/events/postprocessing.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type (
var (
// PPStepAntivirus is the step that scans for viruses
PPStepAntivirus Postprocessingstep = "virusscan"
// PPStepFTS is the step that indexes files for full text search
PPStepFTS Postprocessingstep = "fts"
// PPStepPolicies is the step the step that enforces policies
PPStepPolicies Postprocessingstep = "policies"
// PPStepDelay is the step that processing. Useful for testing or user annoyment
PPStepDelay Postprocessingstep = "delay"

Expand Down Expand Up @@ -68,26 +68,6 @@ func (BytesReceived) Unmarshal(v []byte) (interface{}, error) {
return e, err
}

// VirusscanFinished is emitted by the server when it has completed an antivirus scan
type VirusscanFinished struct {
Infected bool
Outcome PostprocessingOutcome
UploadID string
Filename string
ExecutingUser *user.User
Description string
Scandate time.Time
ResourceID *provider.ResourceId
ErrorMsg string // empty when no error
}

// Unmarshal to fulfill umarshaller interface
func (VirusscanFinished) Unmarshal(v []byte) (interface{}, error) {
e := VirusscanFinished{}
err := json.Unmarshal(v, &e)
return e, err
}

// StartPostprocessingStep can be issued by the server to start a postprocessing step
type StartPostprocessingStep struct {
UploadID string
Expand Down Expand Up @@ -116,7 +96,7 @@ type PostprocessingStepFinished struct {
Filename string

FinishedStep Postprocessingstep // name of the step
Result interface{} // result information
Result interface{} // result information see VirusscanResult for example
Error error // possible error of the step
Outcome PostprocessingOutcome // some services may cause postprocessing to stop
}
Expand All @@ -128,6 +108,15 @@ func (PostprocessingStepFinished) Unmarshal(v []byte) (interface{}, error) {
return e, err
}

// VirusscanResult is the Result of a PostprocessingStepFinished event from the antivirus
type VirusscanResult struct {
Infected bool
Description string
Scandate time.Time
ResourceID *provider.ResourceId
ErrorMsg string // empty when no error
}

// PostprocessingFinished is emitted by *some* service which can decide that
type PostprocessingFinished struct {
UploadID string
Expand Down
19 changes: 12 additions & 7 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func New(o *options.Options, lu *lookup.Lookup, p Permissions, tp Tree, es event
return nil, errors.New("need nats for async file processing")
}

ch, err := events.Consume(fs.stream, "dcfs", events.PostprocessingFinished{}, events.VirusscanFinished{})
ch, err := events.Consume(fs.stream, "dcfs", events.PostprocessingFinished{}, events.PostprocessingStepFinished{})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -256,9 +256,13 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event")
}

/* LETS KEEP THIS COMMENTED UNTIL VIRUSSCANNING IS BACKMERGED
case events.VirusscanFinished:
if ev.ErrorMsg != "" {
case events.PostprocessingStepFinished:
if ev.FinishedStep != events.PPStepAntivirus {
// atm we are only interested in antivirus results
}

res := ev.Result.(events.VirusscanResult)
if res.ErrorMsg != "" {
// scan failed somehow
// Should we handle this here?
continue
Expand All @@ -268,6 +272,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
switch ev.UploadID {
case "":
// uploadid is empty -> this was an on-demand scan
/* ON DEMAND SCANNING NOT SUPPORTED ATM
ctx := ctxpkg.ContextSetUser(context.Background(), ev.ExecutingUser)
ref := &provider.Reference{ResourceId: ev.ResourceID}
Expand Down Expand Up @@ -342,6 +347,7 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID})
continue
}
*/
default:
// uploadid is not empty -> this is an async upload
up, err := upload.Get(ctx, ev.UploadID, fs.lu, fs.tp, fs.o.Root, fs.stream, fs.o.AsyncFileUploads, fs.o.Tokens)
Expand All @@ -359,14 +365,13 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
n = no
}

if err := n.SetScanData(ev.Description, ev.Scandate); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("resourceID", ev.ResourceID).Msg("Failed to set scan results")
if err := n.SetScanData(res.Description, res.Scandate); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("resourceID", res.ResourceID).Msg("Failed to set scan results")
continue
}

// remove cache entry in gateway
fs.cache.RemoveStat(ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID})
*/
default:
log.Error().Interface("event", ev).Msg("Unknown event")
}
Expand Down

0 comments on commit d6471ec

Please sign in to comment.