Skip to content

Commit

Permalink
refactor: clean-up and simplify syncing code
Browse files Browse the repository at this point in the history
  • Loading branch information
FlorianLoch committed Feb 13, 2024
1 parent 7bb285b commit c1e8f0d
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 166 deletions.
5 changes: 5 additions & 0 deletions cmd/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func main() {
if err != nil {
fmt.Printf("opening state file error: %q", err)
}
defer stateFile.Close()

bar := progressbar.NewOptions(0xFFFFF+1,
progressbar.OptionSetWriter(ansi.NewAnsiStdout()),
Expand Down Expand Up @@ -47,4 +48,8 @@ func main() {
if err := hibpsync.Sync(hibpsync.WithProgressFn(updateProgressBar), hibpsync.WithStateFile(stateFile)); err != nil {
fmt.Printf("sync error: %q", err)
}

if err := os.Remove(hibpsync.DefaultStateFile); err != nil {
fmt.Printf("removing state file error: %q", err)
}
}
117 changes: 37 additions & 80 deletions lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"io"
"os"
"strconv"
"sync"
"sync/atomic"
)

const (
Expand All @@ -19,8 +17,11 @@ const (
defaultCheckETag = true
defaultWorkers = 50
DefaultStateFile = "./.hibp-data/state"
lastRange = 0xFFFFF
)

// ProgressFunc is the callback signature used to report sync progress.
//
// As invoked by _sync:
//   - lowest is the smallest range number currently in the in-flight set, or
//     to-1 when that set is empty;
//   - current is the range number whose processing just finished;
//   - to is the exclusive upper bound of the ranges being synced;
//   - processed is the running counter, which starts at the first range number
//     (not at zero) and is incremented once per successfully handled range;
//   - remaining is to - processed.
//
// A non-nil return value is recorded as an error for the range that triggered
// the call.
type ProgressFunc func(lowest, current, to, processed, remaining int64) error

type syncConfig struct {
dataDir string
endpoint string
Expand Down Expand Up @@ -90,99 +91,27 @@ func Sync(options ...SyncOption) error {
}

from = lastState
innerProgressFn := config.progressFn

config.progressFn = func(lowest, current, to, processed, remaining int64) error {
err := func() error {
if lowest < lastState+1000 && remaining > 0 {
return nil
}

if _, err := config.stateFile.Seek(0, io.SeekStart); err != nil {
return fmt.Errorf("seeking to beginning of state file: %w", err)
}

if _, err := config.stateFile.Write([]byte(fmt.Sprintf("%d", lowest))); err != nil {
return fmt.Errorf("writing state file: %w", err)
}

lastState = lowest

return nil
}()

if err != nil {
fmt.Printf("updating state file: %v\n", err)
}

return innerProgressFn(lowest, current, to, processed, remaining)
}
config.progressFn = wrapWithStateUpdate(lastState, config.stateFile, config.progressFn)
}

rG := newRangeGenerator(from, 0xFFFFF+1, config.progressFn)

retryClient := retryablehttp.NewClient()
retryClient.RetryMax = 10
retryClient.Logger = nil
retryClient.Logger = nil // For now, we simply want to suppress the debug output

hc := hibpClient{
client := &hibpClient{
endpoint: config.endpoint,
httpClient: retryClient.StandardClient(),
maxRetries: 2,
maxRetries: 3,
}

storage := fsStorage{
storage := &fsStorage{
dataDir: config.dataDir,
}

pool := pond.New(config.minWorkers, 0, pond.MinWorkers(config.minWorkers))
//defer pool.Stop()

var (
outerErr error
errLock sync.Mutex
done atomic.Bool
)

for !done.Load() {
pool.Submit(func() {
keepGoing, err := rG.Next(func(r int64) error {
rangePrefix := toRangeString(r)

etag, _ := storage.LoadETag(rangePrefix)
// TODO: Log error with debug level

resp, err := hc.RequestRange(rangePrefix, etag)
if err != nil {
return fmt.Errorf("error requesting range %q: %w", rangePrefix, err)
}

if resp.NotModified {
return nil
}

if err := storage.Save(rangePrefix, resp.ETag, resp.Data); err != nil {
return fmt.Errorf("error saving range %q: %w", rangePrefix, err)
}

return nil
})
if err != nil {
errLock.Lock()
defer errLock.Unlock()

outerErr = errors.Join(fmt.Errorf("processing range: %w", err))
}

if !keepGoing {
done.Store(true)
}
})
}

pool.StopAndWait()

return outerErr
return _sync(from, lastRange+1, client, storage, pool, config.progressFn)
}

func readStateFile(stateFile io.ReadWriteSeeker) (int64, error) {
Expand Down Expand Up @@ -212,3 +141,31 @@ func readStateFile(stateFile io.ReadWriteSeeker) (int64, error) {

return lastState, nil
}

// wrapWithStateUpdate decorates innerProgressFn so that progress is also
// persisted to stateFile.
//
// The returned ProgressFunc writes the decimal representation of lowest at
// offset 0 of stateFile whenever lowest is at least 1000 ahead of the value
// persisted last (initially startingState), or when remaining is no longer
// positive. The file is overwritten in place; it is not truncated.
//
// A failure to update the state file is only printed to stdout. It neither
// advances the persisted value nor prevents innerProgressFn from being called,
// and the result of innerProgressFn is always what the wrapper returns.
func wrapWithStateUpdate(startingState int64, stateFile io.ReadWriteSeeker, innerProgressFn ProgressFunc) ProgressFunc {
	// persisted is the value most recently written to stateFile (or the value
	// the sync started from, until the first successful write).
	persisted := startingState

	// persist rewinds the state file and stores value in it.
	persist := func(value int64) error {
		if _, err := stateFile.Seek(0, io.SeekStart); err != nil {
			return fmt.Errorf("seeking to beginning of state file: %w", err)
		}

		if _, err := fmt.Fprintf(stateFile, "%d", value); err != nil {
			return fmt.Errorf("writing state file: %w", err)
		}

		persisted = value

		return nil
	}

	return func(lowest, current, to, processed, remaining int64) error {
		// Only touch the file every 1000 ranges, plus once at the very end.
		due := lowest >= persisted+1000 || remaining <= 0

		if due {
			if err := persist(lowest); err != nil {
				fmt.Printf("updating state file: %v\n", err)
			}
		}

		return innerProgressFn(lowest, current, to, processed, remaining)
	}
}
86 changes: 0 additions & 86 deletions ranges.go

This file was deleted.

95 changes: 95 additions & 0 deletions sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package hibpsync

import (
"errors"
"fmt"
"github.com/alitto/pond"
mapset "github.com/deckarep/golang-set/v2"
"math"
"sync"
"sync/atomic"
)

// _sync processes every range number in [from, to) on the given worker pool.
//
// For each range it loads the locally stored ETag (a load failure is ignored
// and the request is then made with whatever LoadETag returned), requests the
// range from client and, unless the response is marked NotModified, saves the
// returned data and ETag via storage.
//
// onProgress is invoked whenever the processed counter is a multiple of 10 or
// remaining reaches 0; calls are serialized with a mutex. The processed counter
// starts at from, so remaining is to minus that counter.
//
// A failing range does not abort the run: its error is joined into the
// returned error and the other ranges are still processed. Such a range is
// neither counted as processed nor removed from the in-flight set, so it keeps
// being reported as (at most) the lowest in-flight range.
// NOTE(review): presumably this is what keeps a persisted state from advancing
// past a failed range - confirm.
//
// _sync calls pool.StopAndWait() before returning.
func _sync(from, to int64, client *hibpClient, storage *fsStorage, pool *pond.WorkerPool, onProgress ProgressFunc) error {
	var (
		mErr           error
		errLock        sync.Mutex
		processed      atomic.Int64
		inFlightSet    = mapset.NewSet[int64]()
		onProgressLock sync.Mutex
	)

	processed.Store(from)

	for i := from; i < to; i++ {
		current := i

		// Register the range before handing it to the pool. If this only
		// happened once the worker goroutine runs, a later range could start,
		// finish and compute a "lowest in flight" value that skips this range
		// even though it has not been processed yet.
		inFlightSet.Add(current)

		pool.Submit(func() {
			rangePrefix := toRangeString(current)

			err := func() error {
				// A missing or unreadable ETag is not fatal; the error is
				// intentionally dropped.
				etag, _ := storage.LoadETag(rangePrefix)

				resp, err := client.RequestRange(rangePrefix, etag)
				if err != nil {
					return fmt.Errorf("requesting range: %w", err)
				}

				if !resp.NotModified {
					if err := storage.Save(rangePrefix, resp.ETag, resp.Data); err != nil {
						return fmt.Errorf("saving range: %w", err)
					}
				}

				p := processed.Add(1)

				inFlightSet.Remove(current)

				lowest := lowestInFlight(inFlightSet, to)
				remaining := to - p

				// Throttle progress reporting to every 10th range, but always
				// report the final one.
				if p%10 == 0 || remaining == 0 {
					onProgressLock.Lock()
					defer onProgressLock.Unlock()

					if err := onProgress(lowest, current, to, p, remaining); err != nil {
						return fmt.Errorf("reporting progress: %w", err)
					}
				}

				return nil
			}()

			if err != nil {
				errLock.Lock()
				defer errLock.Unlock()

				mErr = errors.Join(mErr, fmt.Errorf("processing range %q: %w", rangePrefix, err))
			}
		})
	}

	pool.StopAndWait()

	return mErr
}

// toRangeString formats i as upper-case hexadecimal, zero-padded to a minimum
// width of five characters (for example 255 becomes "000FF").
func toRangeString(i int64) string {
	const prefixFormat = "%05X"

	return fmt.Sprintf(prefixFormat, i)
}

// lowestInFlight returns the smallest value contained in inFlight.
//
// When no element smaller than math.MaxInt64 is found - in particular when the
// set is empty - it returns to-1 instead.
func lowestInFlight(inFlight mapset.Set[int64], to int64) int64 {
	// none doubles as the "nothing found" marker.
	const none = int64(math.MaxInt64)

	smallest := none

	for _, candidate := range inFlight.ToSlice() {
		if candidate < smallest {
			smallest = candidate
		}
	}

	if smallest != none {
		return smallest
	}

	return to - 1
}

0 comments on commit c1e8f0d

Please sign in to comment.