Skip to content

Commit

Permalink
added cooldown a d batches features
Browse files Browse the repository at this point in the history
updated README
  • Loading branch information
rotarur committed Jun 24, 2024
1 parent 8b7e0eb commit 67db896
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 12 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ ipfs-mgm sync --help
ipfs-mgm sync -s <SOURCE URL> -d <DESTINATION URL>
```

#### Transfer all files using batches and cooldown

```bash
ipfs-mgm sync -s <SOURCE URL> -d <DESTINATION URL> -f cids -c 5 -b 100
```
*INFO: `-b 100` will transfer the files in batches of 100 in paralel and wait 5 seconds (`-c 5`) in between the batches to avoid overloading the IPFS nodes*


#### Transfer only specific files from one IPFS node to another:

```bash
Expand Down
51 changes: 40 additions & 11 deletions internal/sync/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ var SyncCmd = &cobra.Command{
},
}

var workerItemCount int = 50

func init() {
SyncCmd.Flags().StringP("source", "s", "", "IPFS source endpoint")
SyncCmd.MarkFlagRequired("source")
SyncCmd.Flags().StringP("destination", "d", "", "IPFS destination endpoint")
SyncCmd.MarkFlagRequired("destination")
SyncCmd.Flags().StringP("from-file", "f", "", "Sync CID's from file")
SyncCmd.Flags().IntP("batch", "b", 100, "Batch files to sync in paralel")
SyncCmd.Flags().IntP("cooldown", "c", 0, "Cooldown in seconds between the batches, by default not used. Only used with baches options, ignored otherwise")
}

func Sync(cmd *cobra.Command) {
Expand All @@ -40,7 +40,13 @@ func Sync(cmd *cobra.Command) {

var cids []utils.IPFSCIDResponse

// Get all command flags
// check if syncing only the CIDS specified in the file
fromFile, err := cmd.Flags().GetString("from-file")
if err != nil {
fmt.Println(err)
}

// get source to sync from
src, err := cmd.Flags().GetString("source")
if err != nil {
log.Println(err)
Expand All @@ -51,21 +57,36 @@ func Sync(cmd *cobra.Command) {
log.Println(err)
}

fromFile, err := cmd.Flags().GetString("from-file")
cooldown, err := cmd.Flags().GetInt("cooldown")
if err != nil {
log.Println(err)
}

if cooldown < 0 {
fmt.Printf("The specified cooldown is not valid, it must be greater or equal to 0. Specified %d", cooldown)
os.Exit(1)
}

batch, err := cmd.Flags().GetInt("batch")
if err != nil {
fmt.Println(err)
}
if batch <= 0 {
fmt.Printf("The specified batch is not valid, it must be greater than 0. Specified %d", batch)
cooldown = 0
os.Exit(1)
}

// Will use the file only if specified
if len(fromFile) > 0 {
log.Printf("Syncing from %s to %s using the file <%s> as input\n", src, dst, fromFile)
log.Printf("Syncing from <%s> to <%s> using as input the file <%s>\n", src, dst, fromFile)
c, err := utils.ReadCIDFromFile(fromFile)
if err != nil {
fmt.Println(err)
os.Exit(1)
}

// Create our structure with the CIDS's
// Create our structure with the CID's
cids, err = utils.SliceToCIDSStruct(c)
if err != nil {
fmt.Println(err)
Expand Down Expand Up @@ -97,29 +118,37 @@ func Sync(cmd *cobra.Command) {
}

counter := 1

length := len(cids)
log.Printf("There are %d CIDs to be synced", length)

// Adjust for the number of CID's
if length < workerItemCount {
workerItemCount = length
if batch > length {
batch = length
log.Printf("Using %d batch calls as there are %d CIDs to sync\n", batch, length)
} else {
log.Printf("Using %d batch calls\n", batch)
}

for i := 0; i < length; {
// Create a channel with buffer of workerItemCount size
workChan := make(chan utils.HTTPResult, workerItemCount)
workChan := make(chan utils.HTTPResult, batch)
var wg sync.WaitGroup

for j := 0; j < workerItemCount; j++ {
for j := 0; j < batch; j++ {
wg.Add(1)
go func(c int, cidID string) {
defer wg.Done()
AsyncCall(src, dst, cidID, &c, length, &failed, &synced)

}(counter, cids[i].Cid)
counter += 1
i++
}

if cooldown > 0 {
time.Sleep(time.Duration(cooldown) * time.Second)
}

close(workChan)
wg.Wait()
}
Expand Down
6 changes: 5 additions & 1 deletion internal/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func PostCID(dst string, payload []byte, fPath string) (*http.Response, error) {
// Set custom User-Agent for cloudflare WAF policies
req.Header.Set("User-Agent", "graphprotocol/ipfs-mgm")
req.Header.Set("Content-Type", writer.FormDataContentType())
// req.Header.Set("Content-Type", "multipart/form-data")

// Set Directory Headers
if len(fPath) != 0 {
Expand Down Expand Up @@ -232,7 +233,10 @@ func ReadCIDFromFile(f string) ([]string, error) {
var s []string
scanner := bufio.NewScanner(file)
for scanner.Scan() {
s = append(s, scanner.Text())
txt := scanner.Text()
if len(txt) > 0 {
s = append(s, scanner.Text())
}
}

return s, nil
Expand Down

0 comments on commit 67db896

Please sign in to comment.