Skip to content

Commit

Permalink
feat: implement export functionality, add test for it
Browse files Browse the repository at this point in the history
  • Loading branch information
FlorianLoch committed Feb 13, 2024
1 parent af8e9c9 commit c1a59ea
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 31 deletions.
44 changes: 44 additions & 0 deletions export.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package hibpsync

import (
"fmt"
"io"
)

var lineSeparator = []byte("\n")

// export streams the stored data of every range in [from, to) to w, in ascending
// order. A line separator is written between two consecutive ranges, but not after
// the last one. Once all ranges have been written, w is closed if it implements
// io.Closer and the result of that Close call is returned.
//
// The first error encountered while loading or writing aborts the export and is
// returned wrapped with context; in that case w is not closed by this function.
func export(from, to int64, store storage, w io.Writer) error {
	// copyRange handles a single range. It is a function of its own so that the
	// deferred Close of the range's reader runs as soon as that range is done,
	// rather than piling up until export returns.
	copyRange := func(idx int64) error {
		prefix := toRangeString(idx)

		src, loadErr := store.LoadData(prefix)
		if loadErr != nil {
			return fmt.Errorf("loading data for range %q: %w", prefix, loadErr)
		}
		defer src.Close()

		if _, copyErr := io.Copy(w, src); copyErr != nil {
			return fmt.Errorf("writing data to export writer: %w", copyErr)
		}

		// The final range is not followed by a separator.
		if idx+1 >= to {
			return nil
		}

		if _, sepErr := w.Write(lineSeparator); sepErr != nil {
			return fmt.Errorf("writing line separator to export writer: %w", sepErr)
		}

		return nil
	}

	for idx := from; idx < to; idx++ {
		if err := copyRange(idx); err != nil {
			return err
		}
	}

	closer, closable := w.(io.Closer)
	if !closable {
		return nil
	}

	return closer.Close()
}
28 changes: 28 additions & 0 deletions export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package hibpsync

import (
"bytes"
"io"
"testing"

"go.uber.org/mock/gomock"
)

func TestExport(t *testing.T) {
ctrl := gomock.NewController(t)
storageMock := NewMockstorage(ctrl)

storageMock.EXPECT().LoadData("00000").Return(io.NopCloser(bytes.NewReader([]byte("prefix:counter1"))), nil)
storageMock.EXPECT().LoadData("00001").Return(io.NopCloser(bytes.NewReader([]byte("prefix:counter2"))), nil)
storageMock.EXPECT().LoadData("00002").Return(io.NopCloser(bytes.NewReader([]byte("prefix:counter3"))), nil)

buf := bytes.NewBuffer([]byte{})

if err := export(0, 3, storageMock, buf); err != nil {
t.Fatalf("unexpected error: %v", err)
}

if buf.String() != "prefix:counter1\nprefix:counter2\nprefix:counter3" {
t.Fatalf("unexpected output: %q", buf.String())
}
}
46 changes: 31 additions & 15 deletions lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,48 +21,48 @@ const (

// ProgressFunc is the signature of a progress callback. It receives five int64
// values named lowest, current, to, processed and remaining, and returns an error.
// NOTE(review): the exact meaning of each value and how a returned error is treated
// are not visible from this declaration — confirm against the sync implementation.
type ProgressFunc func(lowest, current, to, processed, remaining int64) error

type syncConfig struct {
type config struct {
dataDir string
endpoint string
minWorkers int
progressFn ProgressFunc
stateFile io.ReadWriteSeeker
}

type SyncOption func(*syncConfig)
type Option func(*config)

func WithDataDir(dataDir string) SyncOption {
return func(c *syncConfig) {
func WithDataDir(dataDir string) Option {
return func(c *config) {
c.dataDir = dataDir
}
}

func WithEndpoint(endpoint string) SyncOption {
return func(c *syncConfig) {
func WithEndpoint(endpoint string) Option {
return func(c *config) {
c.endpoint = endpoint
}
}

func WithMinWorkers(workers int) SyncOption {
return func(c *syncConfig) {
func WithMinWorkers(workers int) Option {
return func(c *config) {
c.minWorkers = workers
}
}

func WithStateFile(stateFile io.ReadWriteSeeker) SyncOption {
return func(c *syncConfig) {
func WithStateFile(stateFile io.ReadWriteSeeker) Option {
return func(c *config) {
c.stateFile = stateFile
}
}

func WithProgressFn(progressFn ProgressFunc) SyncOption {
return func(c *syncConfig) {
func WithProgressFn(progressFn ProgressFunc) Option {
return func(c *config) {
c.progressFn = progressFn
}
}

func Sync(options ...SyncOption) error {
config := &syncConfig{
func Sync(options ...Option) error {
config := &config{
dataDir: defaultDataDir,
endpoint: defaultEndpoint,
minWorkers: defaultWorkers,
Expand Down Expand Up @@ -102,7 +102,23 @@ func Sync(options ...SyncOption) error {

pool := pond.New(config.minWorkers, 0, pond.MinWorkers(config.minWorkers))

return _sync(from, lastRange+1, client, storage, pool, config.progressFn)
return sync(from, lastRange+1, client, storage, pool, config.progressFn)
}

// Export writes the stored data of all ranges, from range 0 up to and including
// lastRange, to w. The data is read from the file system directory configured via
// the given options, falling back to defaultDataDir when none is set. Of the
// configurable settings, only the data directory is consulted here.
//
// It returns whatever error the underlying export reports.
func Export(w io.Writer, options ...Option) error {
	cfg := &config{dataDir: defaultDataDir}

	for _, apply := range options {
		apply(cfg)
	}

	return export(0, lastRange+1, &fsStorage{dataDir: cfg.dataDir}, w)
}

func readStateFile(stateFile io.ReadWriteSeeker) (int64, error) {
Expand Down
12 changes: 2 additions & 10 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package hibpsync

import (
"bufio"
"errors"
"fmt"
"io"
"os"
"path"
"sync"
syncPkg "sync"
)

const (
Expand All @@ -22,7 +21,7 @@ type storage interface {

type fsStorage struct {
dataDir string
writeLock sync.Mutex
writeLock syncPkg.Mutex
}

var _ storage = (*fsStorage)(nil)
Expand Down Expand Up @@ -58,10 +57,6 @@ func (f *fsStorage) Save(key, etag string, data []byte) error {
func (f *fsStorage) LoadETag(key string) (string, error) {
file, err := os.Open(f.filePath(key))
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return "", nil
}

return "", fmt.Errorf("opening file %q: %w", f.filePath(key), err)
}
defer file.Close()
Expand All @@ -78,9 +73,6 @@ func (f *fsStorage) LoadETag(key string) (string, error) {
func (f *fsStorage) LoadData(key string) (io.ReadCloser, error) {
file, err := os.Open(f.filePath(key))
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil, nil
}
return nil, fmt.Errorf("opening file %q: %w", f.filePath(key), err)
}

Expand Down
14 changes: 9 additions & 5 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ import (
"github.com/alitto/pond"
mapset "github.com/deckarep/golang-set/v2"
"math"
"sync"
syncPkg "sync"
"sync/atomic"
)

func _sync(from, to int64, client *hibpClient, store storage, pool *pond.WorkerPool, onProgress ProgressFunc) error {
func sync(from, to int64, client *hibpClient, store storage, pool *pond.WorkerPool, onProgress ProgressFunc) error {
var (
mErr error
errLock sync.Mutex
errLock syncPkg.Mutex
processed atomic.Int64
inFlightSet = mapset.NewSet[int64]()
onProgressLock sync.Mutex
onProgressLock syncPkg.Mutex
)

processed.Store(from)
Expand All @@ -30,7 +30,11 @@ func _sync(from, to int64, client *hibpClient, store storage, pool *pond.WorkerP
err := func() error {
inFlightSet.Add(current)

etag, _ := store.LoadETag(rangePrefix)
// We basically ignore any error here because we can still process the range even if we can't load the etag
etag, err := store.LoadETag(rangePrefix)
if err != nil {
etag = ""
}

resp, err := client.RequestRange(rangePrefix, etag)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestSync(t *testing.T) {
// Create the pool with some arbitrary configuration
pool := pond.New(3, 3)

if err := _sync(0, 12, client, storageMock, pool, progressFn); err != nil {
if err := sync(0, 12, client, storageMock, pool, progressFn); err != nil {
t.Fatalf("unexpected error: %v", err)
}

Expand Down

0 comments on commit c1a59ea

Please sign in to comment.