diff --git a/.gitignore b/.gitignore index e0339a8bf..2e6f121e9 100755 --- a/.gitignore +++ b/.gitignore @@ -20,4 +20,4 @@ test/manual_scripts/cachetest.go lint.log azure-storage-fuse bfusemon -test/scripts/dirIterate.go +test/scripts/dirIterate.go \ No newline at end of file diff --git a/cmd/mount.go b/cmd/mount.go index b4c02ed78..4babcea88 100644 --- a/cmd/mount.go +++ b/cmd/mount.go @@ -55,6 +55,7 @@ import ( "github.com/Azure/azure-storage-fuse/v2/common/config" "github.com/Azure/azure-storage-fuse/v2/common/log" "github.com/Azure/azure-storage-fuse/v2/internal" + "github.com/Azure/azure-storage-fuse/v2/internal/filter" "github.com/sevlyar/go-daemon" "github.com/spf13/cobra" @@ -88,6 +89,7 @@ type mountOptions struct { MonitorOpt monitorOptions `config:"health_monitor"` WaitForMount time.Duration `config:"wait-for-mount"` LazyWrite bool `config:"lazy-write"` + blobFilter string `config:"blob-filter"` // v1 support Streaming bool `config:"streaming"` @@ -242,6 +244,13 @@ var mountCmd = &cobra.Command{ configFileExists := true + if config.IsSet("blob-filter") { + if len(options.blobFilter) > 0 { + filter.ProvidedFilter = options.blobFilter + config.Set("read-only", "true") //set read-only mode if filter is provided + } + } + if options.ConfigFile == "" { // Config file is not set in cli parameters // Blobfuse2 defaults to config.yaml in current directory @@ -707,6 +716,10 @@ func init() { config.BindPFlag("pre-mount-validate", mountCmd.Flags().Lookup("pre-mount-validate")) mountCmd.Flags().Lookup("pre-mount-validate").Hidden = true + //accessing blobFilter + mountCmd.PersistentFlags().StringVar(&options.blobFilter, "blob-filter", "", "Filter string for blob filtering.") + config.BindPFlag("blob-filter", mountCmd.PersistentFlags().Lookup("blob-filter")) + mountCmd.Flags().Bool("basic-remount-check", true, "Validate blobfuse2 is mounted by reading /etc/mtab.") config.BindPFlag("basic-remount-check", mountCmd.Flags().Lookup("basic-remount-check")) mountCmd.Flags().Lookup("basic-remount-check").Hidden = true diff --git a/component/azstorage/azstorage.go b/component/azstorage/azstorage.go index abd9ce607..16fc7f741 100644 --- a/component/azstorage/azstorage.go +++ b/component/azstorage/azstorage.go @@ -35,6 +35,7 @@ package azstorage import ( "context" + "errors" "fmt" "sync/atomic" "syscall" @@ -45,9 +46,9 @@ import ( "github.com/Azure/azure-storage-fuse/v2/common/config" "github.com/Azure/azure-storage-fuse/v2/common/log" "github.com/Azure/azure-storage-fuse/v2/internal" + "github.com/Azure/azure-storage-fuse/v2/internal/filter" "github.com/Azure/azure-storage-fuse/v2/internal/handlemap" "github.com/Azure/azure-storage-fuse/v2/internal/stats_manager" - "github.com/spf13/cobra" ) @@ -298,6 +299,7 @@ func (az *AzStorage) StreamDir(options internal.StreamDirOptions) ([]*internal.O path := formatListDirName(options.Name) new_list, new_marker, err := az.storage.List(path, &options.Token, options.Count) + if err != nil { log.Err("AzStorage::StreamDir : Failed to read dir [%s]", err) return new_list, "", err @@ -331,6 +333,11 @@ func (az *AzStorage) StreamDir(options internal.StreamDirOptions) ([]*internal.O // increment streamdir call count azStatsCollector.UpdateStats(stats_manager.Increment, streamDir, (int64)(1)) + //check for filters provided + if az.stConfig.filters != nil { //only apply if user has given filter + filtered_list := az.stConfig.filters.ApplyFilterOnBlobs(new_list) + return filtered_list, *new_marker, nil + } return new_list, *new_marker, nil } @@ -521,7 +528,23 @@ func (az *AzStorage) ReadLink(options internal.ReadLinkOptions) (string, error) // Attribute operations func (az *AzStorage) GetAttr(options internal.GetAttrOptions) (attr *internal.ObjAttr, err error) { //log.Trace("AzStorage::GetAttr : Get attributes of file %s", name) - return az.storage.GetAttr(options.Name) + // return az.storage.GetAttr(options.Name) + resp, err := az.storage.GetAttr(options.Name) + if err != nil { + return resp, err + } + + if az.stConfig.filters != nil { + fileValidatorObj := &filter.FileValidator{ + FilterArr: az.stConfig.filters.FilterArr, + } + if fileValidatorObj.CheckFileWithFilters(resp) { //if this particular file passes all filters, return it + return resp, nil + } else { + return nil, errors.New("the file does not pass the provided filters") + } + } + return resp, err } func (az *AzStorage) Chmod(options internal.ChmodOptions) error { diff --git a/component/azstorage/block_blob.go b/component/azstorage/block_blob.go index 8a7314b43..b15117578 100644 --- a/component/azstorage/block_blob.go +++ b/component/azstorage/block_blob.go @@ -106,6 +106,10 @@ func (bb *BlockBlob) Configure(cfg AzStorageConfig) error { Snapshots: false, } + //if filter is provided, and blobtag filter is also present then we need the details about blobtags + if bb.AzStorageConnection.Config.filters != nil && bb.AzStorageConnection.Config.filters.TagChk { + bb.listDetails.Tags = true + } return nil } @@ -446,6 +450,7 @@ func (bb *BlockBlob) getAttrUsingRest(name string) (attr *internal.ObjAttr, err Crtime: *prop.CreationTime, Flags: internal.NewFileBitMap(), MD5: prop.ContentMD5, + Tier: *prop.AccessTier, } parseMetadata(attr, prop.Metadata) @@ -593,11 +598,15 @@ func (bb *BlockBlob) List(prefix string, marker *string, count int32) ([]*intern Crtime: dereferenceTime(blobInfo.Properties.CreationTime, *blobInfo.Properties.LastModified), Flags: internal.NewFileBitMap(), MD5: blobInfo.Properties.ContentMD5, + Tier: string(*blobInfo.Properties.AccessTier), } parseMetadata(attr, blobInfo.Metadata) attr.Flags.Set(internal.PropFlagMetadataRetrieved) attr.Flags.Set(internal.PropFlagModeDefault) } + if bb.listDetails.Tags { //if we need blobtags + attr.Tags = parseBlobTags(blobInfo.BlobTags) + } blobList = append(blobList, attr) if attr.IsDir() { @@ -636,6 +645,10 @@ func (bb *BlockBlob) List(prefix string, marker *string, count int32) ([]*intern attr.Ctime = attr.Mtime attr.Flags.Set(internal.PropFlagMetadataRetrieved) attr.Flags.Set(internal.PropFlagModeDefault) + attr.Tier = "" + if bb.Config.defaultTier != nil { //if any defualt value of access tier is provided ,set it + attr.Tier = string(*bb.Config.defaultTier) + } blobList = append(blobList, attr) } } diff --git a/component/azstorage/config.go b/component/azstorage/config.go index a01937ee6..14451beaf 100644 --- a/component/azstorage/config.go +++ b/component/azstorage/config.go @@ -42,6 +42,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" "github.com/Azure/azure-storage-fuse/v2/common/config" "github.com/Azure/azure-storage-fuse/v2/common/log" + "github.com/Azure/azure-storage-fuse/v2/internal/filter" "github.com/JeffreyRichter/enum/enum" ) @@ -186,6 +187,7 @@ type AzStorageOptions struct { CPKEnabled bool `config:"cpk-enabled" yaml:"cpk-enabled"` CPKEncryptionKey string `config:"cpk-encryption-key" yaml:"cpk-encryption-key"` CPKEncryptionKeySha256 string `config:"cpk-encryption-key-sha256" yaml:"cpk-encryption-key-sha256"` + // BlobFilter string `config:"blobFilter" yaml:"blobFilter"` // v1 support UseAdls bool `config:"use-adls" yaml:"-"` @@ -386,6 +388,18 @@ func ParseAndValidateConfig(az *AzStorage, opt AzStorageOptions) error { az.stConfig.telemetry = opt.Telemetry + //if blobFilter is provided, parse string and setup filters + if len(filter.ProvidedFilter) > 0 { + log.Info("ParseAndValidateConfig : provided filter is %s", filter.ProvidedFilter) + az.stConfig.filters = &filter.UserInputFilters{} + erro := az.stConfig.filters.ParseInp(&filter.ProvidedFilter) + log.Info("ParseAndValidateConfig : number of OR seperated filters are %d", len(az.stConfig.filters.FilterArr)) + if erro != nil { + log.Err("ParseAndValidateConfig : mount failed due to an error encountered while parsing") + return erro + } + } + httpProxyProvided := opt.HttpProxyAddress != "" httpsProxyProvided := opt.HttpsProxyAddress != "" diff --git a/component/azstorage/connection.go b/component/azstorage/connection.go index c052f955c..edec985af 100644 --- a/component/azstorage/connection.go +++ b/component/azstorage/connection.go @@ -40,6 +40,7 @@ import ( "github.com/Azure/azure-storage-fuse/v2/common" "github.com/Azure/azure-storage-fuse/v2/common/log" "github.com/Azure/azure-storage-fuse/v2/internal" + "github.com/Azure/azure-storage-fuse/v2/internal/filter" ) // Example for azblob usage : https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/storage/azblob#pkg-examples @@ -81,6 +82,9 @@ type AzStorageConfig struct { cpkEnabled bool cpkEncryptionKey string cpkEncryptionKeySha256 string + + // Filter related config + filters *filter.UserInputFilters } type AzStorageConnection struct { diff --git a/component/azstorage/datalake.go b/component/azstorage/datalake.go index 52213d64f..fa30679d2 100644 --- a/component/azstorage/datalake.go +++ b/component/azstorage/datalake.go @@ -394,6 +394,7 @@ func (dl *Datalake) GetAttr(name string) (attr *internal.ObjAttr, err error) { Ctime: *prop.LastModified, Crtime: *prop.LastModified, Flags: internal.NewFileBitMap(), + Tier: *prop.AccessTier, //set up tier } parseMetadata(attr, prop.Metadata) @@ -471,7 +472,8 @@ func (dl *Datalake) List(prefix string, marker *string, count int32) ([]*interna for _, pathInfo := range listPath.Paths { var attr *internal.ObjAttr var lastModifiedTime time.Time - if dl.Config.disableSymlink { + //if tier filter is provided by user then we need to set it up in the else statement + if (dl.Config.disableSymlink && dl.Config.filters == nil) || (dl.Config.disableSymlink && !dl.Config.filters.TierChk) { var mode fs.FileMode if pathInfo.Permissions != nil { mode, err = getFileMode(*pathInfo.Permissions) diff --git a/component/azstorage/utils.go b/component/azstorage/utils.go index 2c3139be3..c85b1b103 100644 --- a/component/azstorage/utils.go +++ b/component/azstorage/utils.go @@ -53,6 +53,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service" serviceBfs "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/service" "github.com/Azure/azure-storage-fuse/v2/common" @@ -304,6 +305,24 @@ func parseMetadata(attr *internal.ObjAttr, metadata map[string]*string) { } } +// ----------- BlobTags handling --------------- +func parseBlobTags(tags *container.BlobTags) map[string]string { + blobtags := make(map[string]string) //pushing blobtags in map for fast execution during filtering + if tags != nil { + for _, tag := range tags.BlobTagSet { + if tag != nil { + if tag.Key != nil { + blobtags[*tag.Key] = "" + } + if tag.Value != nil { + blobtags[*tag.Key] = *tag.Value + } + } + } + } + return blobtags +} + // ----------- Content-type handling --------------- // ContentTypeMap : Store file extension to content-type mapping diff --git a/internal/attribute.go b/internal/attribute.go index fcb729dd2..a91edfe7e 100644 --- a/internal/attribute.go +++ b/internal/attribute.go @@ -81,6 +81,8 @@ type ObjAttr struct { Name string // base name of the path MD5 []byte Metadata map[string]*string // extra information to preserve + Tier string //access tier of blob + Tags map[string]string //blobtags (key value pair) } // IsDir : Test blob is a directory or not diff --git a/internal/filter/accesstier.go b/internal/filter/accesstier.go new file mode 100644 index 000000000..b49ec110e --- /dev/null +++ b/internal/filter/accesstier.go @@ -0,0 +1,47 @@ +package filter + +import ( + "errors" + "strings" + + "github.com/Azure/azure-storage-fuse/v2/internal" +) + +const lenTier = len(tier) + +type AccessTierFilter struct { + opr bool // true means equal to , false means not equal to + tier string +} + +func (filter AccessTierFilter) Apply(fileInfo *internal.ObjAttr) bool { + // fmt.Println("AccessTier filter ", filter, " file name ", (*fileInfo).Name) DEBUG PRINT + return (filter.opr == (filter.tier == strings.ToLower(fileInfo.Tier))) //if both are same then return true +} + +// used for dynamic creation of AccessTierFilter +func newAccessTierFilter(args ...interface{}) Filter { + return AccessTierFilter{ + opr: args[0].(bool), + tier: args[1].(string), + } +} + +func giveAccessTierFilterObj(singleFilter *string) (Filter, error) { + (*singleFilter) = strings.Map(StringConv, (*singleFilter)) //remove all spaces and make all upperCase to lowerCase + sinChk := (*singleFilter)[lenTier : lenTier+1] //single char after tier (ex- tier=hot , here sinChk will be "=") + doubChk := (*singleFilter)[lenTier : lenTier+2] //2 chars after tier (ex- tier != cold , here doubChk will be "!=") + erro := errors.New("invalid accesstier filter, no files passed") + if !((sinChk == "=") || (doubChk == "!=")) { + return nil, erro + } + if (doubChk == "!=") && (len(*singleFilter) > lenTier+2) { + value := (*singleFilter)[lenTier+2:] // len(tier) + 2 = 4 and + 2 + return newAccessTierFilter(false, value), nil + } else if (sinChk == "=") && (len(*singleFilter) > lenTier+1) { + value := (*singleFilter)[lenTier+1:] // len(tier) + 1 = 4 and + 1 + return newAccessTierFilter(true, value), nil + } else { + return nil, erro + } +} diff --git a/internal/filter/blobtag.go b/internal/filter/blobtag.go new file mode 100644 index 000000000..db144fedd --- /dev/null +++ b/internal/filter/blobtag.go @@ -0,0 +1,53 @@ +package filter + +import ( + "errors" + "strings" + + "github.com/Azure/azure-storage-fuse/v2/internal" +) + +const lenTag = len(tag) + +type BlobTagFilter struct { + key string + value string +} + +func (filter BlobTagFilter) Apply(fileInfo *internal.ObjAttr) bool { + // fmt.Println("BlobTag filter ", filter, " file name ", (*fileInfo).Name) DEBUG PRINT + if val, ok := fileInfo.Tags[filter.key]; ok { + return (filter.value == strings.ToLower(val)) + } + return false +} + +// used for dynamic creation of BlobTagFilter +func newBlobTagFilter(args ...interface{}) Filter { + return BlobTagFilter{ + key: args[0].(string), + value: args[1].(string), + } +} + +func giveBlobTagFilterObj(singleFilter *string) (Filter, error) { + (*singleFilter) = strings.Map(StringConv, (*singleFilter)) //remove all spaces and make all upperCase to lowerCase + sinChk := (*singleFilter)[lenTag : lenTag+1] //single char after tag (ex- tag=hot:yes , here sinChk will be "=") + erro := errors.New("invalid blobtag filter, no files passed") + if !(sinChk == "=") { + return nil, erro + } + splitEq := strings.Split(*singleFilter, "=") + if len(splitEq) == 2 { + splitCol := strings.Split(splitEq[1], ":") + if len(splitCol) == 2 { + tagKey := splitCol[0] + tagVal := splitCol[1] + return newBlobTagFilter(tagKey, tagVal), nil + } else { + return nil, erro + } + } else { + return nil, erro + } +} diff --git a/internal/filter/filter.go b/internal/filter/filter.go new file mode 100644 index 000000000..b6af2ba49 --- /dev/null +++ b/internal/filter/filter.go @@ -0,0 +1,41 @@ +package filter + +import ( + "github.com/Azure/azure-storage-fuse/v2/common/log" + "github.com/Azure/azure-storage-fuse/v2/internal" +) + +var ProvidedFilter string // it will store the input provided by the user while mounting + +func (fl *UserInputFilters) ApplyFilterOnBlobs(fileInfos []*internal.ObjAttr) []*internal.ObjAttr { //function called from azstorage.go streamDir func + log.Debug("came inside filter") + if len(fileInfos) == 0 { + return fileInfos + } + fv := &FileValidator{ + workers: 16, + fileCnt: int64(len(fileInfos)), + FilterArr: fl.FilterArr, + } + fv.wgo.Add(1) //kept outside thread + fv.outputChan = make(chan *opdata, fv.workers) + fv.fileInpQueue = make(chan *internal.ObjAttr, fv.workers) + + go fv.RecieveOutput() //thread parellely reading from ouput channel + + for w := 1; w <= fv.workers; w++ { + go fv.ChkFile() //go routines for each worker (thread) are called + } + for _, fileinfo := range fileInfos { + // fmt.Println("passedFile: ", *fileinfo) + fv.fileInpQueue <- fileinfo //push all files one by one in channel , if channel is full , it will wait + } + + close(fv.fileInpQueue) //close channel once all files have been processed + + fv.wgo.Wait() //wait for completion of all threads + // fmt.Println("All workers stopped ") //exit + log.Debug("moved out of filter") + + return fv.finalFiles +} diff --git a/internal/filter/formatfilter.go b/internal/filter/formatfilter.go new file mode 100644 index 000000000..e0e78a055 --- /dev/null +++ b/internal/filter/formatfilter.go @@ -0,0 +1,42 @@ +package filter + +import ( + "errors" + "path/filepath" + "strings" + + "github.com/Azure/azure-storage-fuse/v2/internal" +) + +const lenformat = len(format) + +// formatFilter and its attributes +type FormatFilter struct { + ext_type string +} + +// Apply fucntion for format filter , check wheather a file passes the constraints +func (filter FormatFilter) Apply(fileInfo *internal.ObjAttr) bool { + // fmt.Println("Format Filter ", filter, " file name ", (*fileInfo).Name) DEBUG PRINT + fileExt := filepath.Ext((*fileInfo).Name) + chkstr := "." + filter.ext_type + // fmt.Println(fileExt, " For file :", fileInfo.Name) + return chkstr == fileExt +} + +// used for dynamic creation of formatFilter using map +func newFormatFilter(args ...interface{}) Filter { + return FormatFilter{ + ext_type: args[0].(string), + } +} + +func giveFormatFilterObj(singleFilter *string) (Filter, error) { + (*singleFilter) = strings.Map(StringConv, (*singleFilter)) + erro := errors.New("invalid format filter, no files passed") + if (len((*singleFilter)) <= lenformat+1) || ((*singleFilter)[lenformat] != '=') || (!((*singleFilter)[lenformat+1] >= 'a' && (*singleFilter)[lenformat+1] <= 'z')) { //since len(format) = 6, at next position (ie index 6) there should be "=" only and assuming extention type starts from an alphabet + return nil, erro + } + value := (*singleFilter)[lenformat+1:] + return newFormatFilter(value), nil +} diff --git a/internal/filter/modtime_filter.go b/internal/filter/modtime_filter.go new file mode 100644 index 000000000..23e4a5edd --- /dev/null +++ b/internal/filter/modtime_filter.go @@ -0,0 +1,91 @@ +package filter + +import ( + "errors" + "strings" + "time" + + "github.com/Azure/azure-storage-fuse/v2/internal" +) + +// modTimeFilter and its attributes +type modTimeFilter struct { + opr string + value time.Time +} + +// Apply fucntion for modTime filter , check wheather a file passes the constraints +func (filter modTimeFilter) Apply(fileInfo *internal.ObjAttr) bool { + // fmt.Println("modTime Filter ", filter.opr, " ", filter.value, " file name ", (*fileInfo).Name) DEBUG PRINT + fileModTimestr := (*fileInfo).Mtime.UTC().Format(time.RFC1123) + fileModTime, _ := time.Parse(time.RFC1123, fileModTimestr) + // fmt.Println(fileModTime, "this is file mod time") + + if (filter.opr == "<=") && (fileModTime.Before(filter.value) || fileModTime.Equal(filter.value)) { + return true + } else if (filter.opr == ">=") && (fileModTime.After(filter.value) || fileModTime.Equal(filter.value)) { + return true + } else if (filter.opr == ">") && (fileModTime.After(filter.value)) { + return true + } else if (filter.opr == "<") && (fileModTime.Before(filter.value)) { + return true + } else if (filter.opr == "=") && (fileModTime.Equal(filter.value)) { + return true + } + return false +} + +// used for dynamic creation of modTimeFilter using map +func newModTimeFilter(args ...interface{}) Filter { + return modTimeFilter{ + opr: args[0].(string), + value: args[1].(time.Time), + } +} + +func giveModtimeFilterObj(singleFilter *string) (Filter, error) { + erro := errors.New("invalid Modtime filter, no files passed") + if strings.Contains((*singleFilter), "<=") { + splitedParts := strings.Split((*singleFilter), "<=") + timeRFC1123str := strings.TrimSpace(splitedParts[1]) + timeRFC1123, err := time.Parse(time.RFC1123, timeRFC1123str) + if err != nil { + return nil, erro + } + return newModTimeFilter("<=", timeRFC1123), nil + } else if strings.Contains((*singleFilter), ">=") { + splitedParts := strings.Split((*singleFilter), ">=") + timeRFC1123str := strings.TrimSpace(splitedParts[1]) + timeRFC1123, err := time.Parse(time.RFC1123, timeRFC1123str) + if err != nil { + return nil, erro + } + return newModTimeFilter(">=", timeRFC1123), nil + } else if strings.Contains((*singleFilter), "<") { + splitedParts := strings.Split((*singleFilter), "<") + timeRFC1123str := strings.TrimSpace(splitedParts[1]) + timeRFC1123, err := time.Parse(time.RFC1123, timeRFC1123str) + if err != nil { + return nil, erro + } + return newModTimeFilter("<", timeRFC1123), nil + } else if strings.Contains((*singleFilter), ">") { + splitedParts := strings.Split((*singleFilter), ">") + timeRFC1123str := strings.TrimSpace(splitedParts[1]) + timeRFC1123, err := time.Parse(time.RFC1123, timeRFC1123str) + if err != nil { + return nil, erro + } + return newModTimeFilter(">", timeRFC1123), nil + } else if strings.Contains((*singleFilter), "=") { + splitedParts := strings.Split((*singleFilter), "=") + timeRFC1123str := strings.TrimSpace(splitedParts[1]) + timeRFC1123, err := time.Parse(time.RFC1123, timeRFC1123str) + if err != nil { + return nil, erro + } + return newModTimeFilter("=", timeRFC1123), nil + } else { + return nil, erro + } +} diff --git a/internal/filter/regexfilter.go b/internal/filter/regexfilter.go new file mode 100644 index 000000000..001c267ac --- /dev/null +++ b/internal/filter/regexfilter.go @@ -0,0 +1,43 @@ +package filter + +import ( + "errors" + "regexp" + "strings" + + "github.com/Azure/azure-storage-fuse/v2/internal" +) + +const lenregex = len(regex) + +// RegexFilter and its attributes +type regexFilter struct { + regex_inp *regexp.Regexp +} + +// Apply fucntion for regex filter , check wheather a file passes the constraints +func (filter regexFilter) Apply(fileInfo *internal.ObjAttr) bool { + // fmt.Println("regex filter ", filter.regex_inp, " file name ", (*fileInfo).Name) DEBUG PRINT + return filter.regex_inp.MatchString((*fileInfo).Name) +} + +// used for dynamic creation of regexFilter +func newRegexFilter(args ...interface{}) Filter { + return regexFilter{ + regex_inp: args[0].(*regexp.Regexp), + } +} + +func giveRegexFilterObj(singleFilter *string) (Filter, error) { + (*singleFilter) = strings.Map(StringConv, (*singleFilter)) + erro := errors.New("invalid regex filter, no files passed") + if (len((*singleFilter)) <= lenregex+1) || ((*singleFilter)[lenregex] != '=') { //since len(regex) = 5, at next position (ie index 5) there should be "=" pnly + return nil, erro + } + value := (*singleFilter)[lenregex+1:] //len(regex)+1 = 5 + 1 + pattern, err := regexp.Compile(value) + if err != nil { + return nil, erro + } + return newRegexFilter(pattern), nil +} diff --git a/internal/filter/sizefilter.go b/internal/filter/sizefilter.go new file mode 100644 index 000000000..cf5462e81 --- /dev/null +++ b/internal/filter/sizefilter.go @@ -0,0 +1,69 @@ +package filter + +import ( + "errors" + "strconv" + "strings" + + "github.com/Azure/azure-storage-fuse/v2/internal" +) + +const lensize = len(size) + +// SizeFilter and its attributes +type SizeFilter struct { + opr string + value float64 +} + +// Apply function for size filter , check wheather a file passes the constraints +func (filter SizeFilter) Apply(fileInfo *internal.ObjAttr) bool { + // fmt.Println("size filter ", filter, " file name ", (*fileInfo).Name) DEBUG PRINT + + if (filter.opr == "<=") && ((*fileInfo).Size <= int64(filter.value)) { + return true + } else if (filter.opr == ">=") && ((*fileInfo).Size >= int64(filter.value)) { + return true + } else if (filter.opr == ">") && ((*fileInfo).Size > int64(filter.value)) { + return true + } else if (filter.opr == "<") && ((*fileInfo).Size < int64(filter.value)) { + return true + } else if (filter.opr == "=") && ((*fileInfo).Size == int64(filter.value)) { + return true + } + return false +} + +// used for dynamic creation of sizeFilter +func newSizeFilter(args ...interface{}) Filter { + return SizeFilter{ + opr: args[0].(string), + value: args[1].(float64), + } +} + +func giveSizeFilterObj(singleFilter *string) (Filter, error) { + (*singleFilter) = strings.Map(StringConv, (*singleFilter)) //remove all spaces and make all upperCase to lowerCase + sinChk := (*singleFilter)[lensize : lensize+1] //single char after size (ex- size=7888 , here sinChk will be "=") + doubChk := (*singleFilter)[lensize : lensize+2] //2 chars after size (ex- size>=8908 , here doubChk will be ">=") + erro := errors.New("invalid size filter, no files passed") + if !((sinChk == "=") || (sinChk == ">") || (sinChk == "<") || (doubChk == ">=") || (doubChk == "<=")) { + return nil, erro + } + value := (*singleFilter)[lensize+1:] // len(size)+1 = 4 and + 1 + floatVal, err := strconv.ParseFloat(value, 64) + if err != nil { + if (*singleFilter)[lensize+1] != '=' { + return nil, erro + } else { + value := (*singleFilter)[lensize+2:] // len(size)+2 = 4 and + 2 + floatVal, err = strconv.ParseFloat(value, 64) + if err != nil { + return nil, erro + } + return newSizeFilter((*singleFilter)[lensize:lensize+2], floatVal), nil // it will give operator ex "<=" + } + } else { + return newSizeFilter((*singleFilter)[lensize:lensize+1], floatVal), nil + } +} diff --git a/internal/filter/utils.go b/internal/filter/utils.go new file mode 100644 index 000000000..81e517075 --- /dev/null +++ b/internal/filter/utils.go @@ -0,0 +1,190 @@ +package filter + +import ( + "context" + "errors" + "strings" + "sync" + "unicode" + + "github.com/Azure/azure-storage-fuse/v2/internal" +) + +// declaring all filters here +const size = "size" +const format = "format" +const regex = "regex" +const modtime = "modtime" +const tier = "tier" +const tag = "tag" + +// struct used for storing files with bool (passed or !passed) in output channel +type opdata struct { + filels *internal.ObjAttr + ispassed bool +} + +// Interface having child as different type of filters like size, format, regex etc +type Filter interface { + Apply(fileInfo *internal.ObjAttr) bool //Apply function defined for each filter, it takes file as input and returns wheather it passes that filter or not +} + +// used for converting string given by user to ideal string so that it becomes easy to process +func StringConv(r rune) rune { + if unicode.IsSpace(r) { + return -1 // Remove space + } + if r >= 'A' && r <= 'Z' { + return unicode.ToLower(r) // Convert uppercase to lowercase + } + return r +} + +// used to return the name of filter +func getFilterName(str *string) string { + for i := range *str { + if !(((*str)[i] >= 'a' && (*str)[i] <= 'z') || ((*str)[i] >= 'A' && (*str)[i] <= 'Z')) { //assuming filters would have only alphabetic names, break when current char is not an alphabet + return (*str)[0:i] //then return the substring till prev index , it will be the name of filter + } + } + return "error" //if no substring is returned inside loop this means the filter name was not valid or does not exists +} + +// it will store the fliters, outer array splitted by ||, inner array splitted by &&, tier filter presence and tag filter presence +type UserInputFilters struct { + FilterArr [][]Filter + TierChk bool + TagChk bool +} + +// this function parses the input string and stores filters, tierchk and tagchk in UserInputFilters +func (fl *UserInputFilters) ParseInp(str *string) error { + splitOr := strings.Split((*str), "||") //splitted string on basis of OR + fl.TierChk = false + fl.TagChk = false + for _, andFilters := range splitOr { //going over each part splitted by OR + var individualFilter []Filter //this array will store all filters seperated by && at each index + splitAnd := strings.Split(andFilters, "&&") //splitted by && + for _, singleFilter := range splitAnd { //this gives a particular filter (ex- A&&B&&C so it will traverse A then B then C) + trimmedStr := strings.TrimSpace(singleFilter) + thisFilter := getFilterName(&trimmedStr) //retrieve name of filter + thisFilter = strings.ToLower(thisFilter) //converted to lowercase + var obj Filter + var erro error + if thisFilter == size { + obj, erro = giveSizeFilterObj(&singleFilter) + } else if thisFilter == format { + obj, erro = giveFormatFilterObj(&singleFilter) + } else if thisFilter == regex { + obj, erro = giveRegexFilterObj(&singleFilter) + } else if thisFilter == modtime { + obj, erro = giveModtimeFilterObj(&singleFilter) + } else if thisFilter == tier { + if !fl.TierChk { //if tier filter is present , set tierchk to true + fl.TierChk = true + } + obj, erro = giveAccessTierFilterObj(&singleFilter) + } else if thisFilter == tag { + if !fl.TagChk { //if tag filter is present , set tagchk to true + fl.TagChk = true + } + obj, erro = giveBlobTagFilterObj(&singleFilter) + } else { // if no name matched , means it is not a valid filter , thus return an error + return errors.New("invalid filter, no files passed") + } + if erro != nil { //if any filter provided error while parsing return error + return erro + } + individualFilter = append(individualFilter, obj) //inner array (splitted by &&) is being formed + } + fl.FilterArr = append(fl.FilterArr, individualFilter) //outer array (splitted by ||) is being formed + } + return nil //everything went well, no error +} + +type FileValidator struct { + workers int //no of threads analysing file + fileCnt int64 //total number of files that will be checked against filters + wgo sync.WaitGroup //to wait until all files from output channel are processed + fileInpQueue chan *internal.ObjAttr //file input channel + outputChan chan *opdata //file output channel (containing both passed and !passed files) + FilterArr [][]Filter //stores filters + finalFiles []*internal.ObjAttr //list containing files files which passed filters +} + +// read output channel +func (fv *FileValidator) RecieveOutput() { + defer fv.wgo.Done() + var counter int64 = 0 //keeps count of number of files recieved in output channel + for data := range fv.outputChan { + counter++ + // fmt.Println("OutPut Channel: ", data.filels.Name, " ", data.ispassed) DEBUG PRINT + if data.ispassed { //if files passed filter , append it to list of final files + fv.finalFiles = append(fv.finalFiles, data.filels) + } + if counter == fv.fileCnt { //indicates that all files are processed and read from output channel , close channel now + close(fv.outputChan) + break + } + } +} + +// it checks every single file against all and filters in seq order +func (fv *FileValidator) checkIndividual(ctx *context.Context, fileInf *internal.ObjAttr, filters *[]Filter) bool { + for _, filter := range *filters { + select { + case <-(*ctx).Done(): // If any one combination returns true, no need to check furthur + // fmt.Println("terminating file by context: ", (*fileInf).Name, " for filter: ", filter) DEBUG PRINT + return false + default: + passedThisFilter := filter.Apply(fileInf) + if !passedThisFilter { //if any filter fails, return false immediately as it can never be true + // fmt.Println("terminating file by false : ", (*fileInf).Name, " for filter: ", filter) DEBUG PRINT + return false + } + } + } + // fmt.Println("chkIn : ", (*fileInf)) + return true // if all filters in seq order passes , return true +} + +// it takes a single file and all filters mentioned by user returns a bool +func (fv *FileValidator) CheckFileWithFilters(fileInf *internal.ObjAttr) bool { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + response := false + resultChan := make(chan bool, len(fv.FilterArr)) // made a channel to store result calculated by each combination + for _, filters := range fv.FilterArr { + go func(filters []Filter) { //all combinations are running parallely to speed up + passed := fv.checkIndividual(&ctx, fileInf, &filters) + resultChan <- passed //push result of each combination in channel + }(filters) + } + for range fv.FilterArr { + resp := <-resultChan //here we check the result of each combination as upper for loop pushed in channel + if (resp) && (!response) { + cancel() + // for the first time when we recieve a true , we will cancel context and wait for all processes to stop + } + response = (response || resp) + } + // fmt.Println("chkfil: ", (*fileInf), " ", response) + return response // return response, it will be true if any combination returns a true +} + +// this is thread pool , where 16 threads are running +func (fv *FileValidator) ChkFile() { + for fileInf := range fv.fileInpQueue { + if fileInf.IsDir() { //pass all directories as it is without applying filter on them + fv.outputChan <- (&opdata{filels: fileInf, ispassed: true}) + continue + } + // fmt.Println("sending for check: ", fileInf.Name) + Passed := fv.CheckFileWithFilters(fileInf) + if Passed { //if a file passes add it to output channel with true + fv.outputChan <- (&opdata{filels: fileInf, ispassed: true}) + } else { //if a file passes add it to output channel with false + fv.outputChan <- (&opdata{filels: fileInf, ispassed: false}) + } + } +}