Skip to content

Commit

Permalink
refactor: rewrite the API to allow for concurrent access to the datas…
Browse files Browse the repository at this point in the history
…et by sharing the `storage` instance, simplifies the API too
  • Loading branch information
FlorianLoch committed Mar 1, 2024
1 parent 5049bd8 commit c5ff6cb
Show file tree
Hide file tree
Showing 13 changed files with 109 additions and 165 deletions.
22 changes: 11 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,23 @@ Of course, this can be disabled too.

The library supports to continue from where it left off, the `sync` command mentioned below demonstrates this.

The basic API is really simple; two functions are exported (and additionally, typed configuration options):

## API

The API is really simple; one type, holding three methods, is exported (and additionally, typed configuration options):

```go
Sync(options ...SyncOption) error // Syncs the local copy with the upstream database
Export(w io.Writer, options ...ExportOption) error // Writes a continuous, decompressed and "free-of-etags" stream to the given io.Writer
New(options ...CommonOption) *HIBP
HIBP#Sync(options ...SyncOption) error // Syncs the local copy with the upstream database
HIBP#Export(w io.Writer, options ...ExportOption) error // Writes a continuous, decompressed and "free-of-etags" stream to the given io.Writer
HIBP#.Query("ABCDE") (io.ReadClose, error) // Returns the k-proximity API result as the upstream API would
```

Additionally, the library can also operate on its data using the `RangeAPI` type and its `Query` method.
This operates on disk but, depending on the medium, should provide access times that are probably good enough for all scenarios.
All operates operate on disk but, depending on the medium, should provide access times that are probably good enough for all scenarios.
A memory-based `tmpfs` will speed things up when necessary.

```go
querier := NewRangeAPI(/* optional options go here */)
kProximityResponse, err := querier.Query("ABCDE")
// TODO: Handle error
// TODO: Read the response (as before received from the upstream API) line-by-line and check whether it contains your hash.
```

## CLI

There are two basic CLI commands, `sync` and `export` that can be used for manual tasks and serve as minimal examples on how to use the library.
They are basic but should play well with other tooling.
Expand Down
8 changes: 5 additions & 3 deletions cmd/export/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@
package main

import (
hibpsync "github.com/exaring/go-hibp-sync"
hibp "github.com/exaring/go-hibp-sync"
"os"
)

func main() {
dataDir := hibpsync.DefaultDataDir
dataDir := hibp.DefaultDataDir

if len(os.Args) == 2 {
dataDir = os.Args[1]
}

if err := hibpsync.Export(os.Stdout, hibpsync.ExportWithDataDir(dataDir)); err != nil {
h := hibp.New(hibp.WithDataDir(dataDir))

if err := h.Export(os.Stdout); err != nil {
_, _ = os.Stderr.WriteString("Failed to export HIBP data: " + err.Error())

os.Exit(1)
Expand Down
15 changes: 8 additions & 7 deletions cmd/sync/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package main

import (
"fmt"
hibpsync "github.com/exaring/go-hibp-sync"
hibp "github.com/exaring/go-hibp-sync"
"github.com/k0kubun/go-ansi"
"github.com/schollz/progressbar/v3"
"os"
Expand All @@ -16,7 +16,7 @@ import (
)

func main() {
dataDir := hibpsync.DefaultDataDir
dataDir := hibp.DefaultDataDir

if len(os.Args) == 2 {
dataDir = os.Args[1]
Expand All @@ -30,7 +30,7 @@ func main() {
}

func run(dataDir string) error {
stateFilePath := path.Join(dataDir, hibpsync.DefaultStateFileName)
stateFilePath := path.Join(dataDir, hibp.DefaultStateFileName)
if err := os.MkdirAll(path.Dir(stateFilePath), 0o755); err != nil {
return fmt.Errorf("creating state file directory %q: %w", stateFilePath, err)
}
Expand Down Expand Up @@ -69,10 +69,11 @@ func run(dataDir string) error {
return nil
}

if err := hibpsync.Sync(
hibpsync.SyncWithDataDir(dataDir),
hibpsync.SyncWithProgressFn(updateProgressBar),
hibpsync.SyncWithStateFile(stateFile)); err != nil {
h := hibp.New(hibp.WithDataDir(dataDir))

if err := h.Sync(
hibp.SyncWithProgressFn(updateProgressBar),
hibp.SyncWithStateFile(stateFile)); err != nil {
return fmt.Errorf("syncing: %w", err)
}

Expand Down
2 changes: 1 addition & 1 deletion export.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package hibpsync
package hibp

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion export_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package hibpsync
package hibp

import (
"bytes"
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/exaring/go-hibp-sync

go 1.21.6
go 1.21.7

require (
github.com/alitto/pond v1.8.3
Expand Down
94 changes: 33 additions & 61 deletions lib.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package hibpsync
package hibp

import (
"bytes"
Expand All @@ -20,25 +20,35 @@ const (
defaultLastRange = 0xFFFFF
)

// ProgressFunc represents a type of function that can be used to report progress of a sync operation.
// The parameters are as follows:
// - lowest: The lowest prefix that has been processed so far (due to concurrent operations, there is a window of
// prefixes that are possibly being processed at the same time, "lowest" refers to the range with the lowest prefix).
// - current: The current prefix that is being processed, i.e. for which the ProgressFunc gets invoked.
// - to: The highest prefix that will be processed.
// - processed: The number of prefixes that have been processed so far.
// - remaining: The number of prefixes that are remaining to be processed.
// The function should return an error if the operation should be aborted.
type ProgressFunc func(lowest, current, to, processed, remaining int64) error
// HIBP bundles the functionality of the HIBP package.
// In order to allow concurrent operations on the local, file-based dataset efficiently and safely, a shared set of
// locks is required - this gets managed by the HIBP type.
type HIBP struct {
store storage
}

func New(options ...CommonOption) *HIBP {
config := commonConfig{
dataDir: DefaultDataDir,
noCompression: false,
}

for _, option := range options {
option(&config)
}

storage := newFSStorage(config.dataDir, config.noCompression)

return &HIBP{
store: storage,
}
}

// Sync copies the ranges, i.e., the HIBP data, from the upstream API to the local storage.
// The function will start from the lowest prefix and continue until the highest prefix.
// See the set of SyncOption functions for customizing the behavior of the sync operation.
func Sync(options ...SyncOption) error {
func (h *HIBP) Sync(options ...SyncOption) error {
config := &syncConfig{
commonConfig: commonConfig{
dataDir: DefaultDataDir,
},
ctx: context.Background(),
endpoint: defaultEndpoint,
minWorkers: defaultWorkers,
Expand Down Expand Up @@ -73,63 +83,25 @@ func Sync(options ...SyncOption) error {
maxRetries: 3,
}

storage := newFSStorage(config.dataDir, config.noCompression)

// It is important to create a non-buffering/blocking pool because we don't want to schedule all jobs upfront.
// This would cause problems, especially when cancelling the context.
pool := pond.New(config.minWorkers, 0, pond.MinWorkers(config.minWorkers))

return sync(config.ctx, from, config.lastRange+1, client, storage, pool, config.progressFn)
}

// Export writes the HIBP data to the given writer.
// The data is written in the same format as it is provided by the HIBP API itself.
// See the set of ExportOption functions for customizing the behavior of the export operation.
func Export(w io.Writer, options ...ExportOption) error {
config := &exportConfig{
commonConfig: commonConfig{
dataDir: DefaultDataDir,
},
}

for _, option := range options {
option(config)
}

storage := newFSStorage(config.dataDir, config.noCompression)

return export(0, defaultLastRange+1, storage, w)
}

// RangeAPI provides an API for querying the local HIBP data.
type RangeAPI struct {
storage storage
return sync(config.ctx, from, config.lastRange+1, client, h.store, pool, config.progressFn)
}

// NewRangeAPI creates a new RangeAPI instance that can be used for querying k-proximity ranges.
// See the set of RangeAPIOption functions for customizing the behavior of the RangeAPI.
func NewRangeAPI(options ...RangeAPIOption) *RangeAPI {
config := &queryConfig{
commonConfig: commonConfig{
dataDir: DefaultDataDir,
},
}

for _, option := range options {
option(config)
}

return &RangeAPI{
storage: newFSStorage(config.dataDir, config.noCompression),
}
// Export writes the dataset to the given writer.
// The data is written in the same format as it is provided by the Have-I-Been-Pwned API itself.
func (h *HIBP) Export(w io.Writer) error {
return export(0, defaultLastRange+1, h.store, w)
}

// Query queries the local HIBP data for the given prefix.
// Query queries the local dataset for the given prefix.
// The function returns an io.ReadCloser that can be used to read the data, it should be closed as soon as possible
// to release the read lock on the file.
// It is the responsibility of the caller to close the returned io.ReadCloser.
func (q *RangeAPI) Query(prefix string) (io.ReadCloser, error) {
reader, err := q.storage.LoadData(prefix)
func (h *HIBP) Query(prefix string) (io.ReadCloser, error) {
reader, err := h.store.LoadData(prefix)
if err != nil {
return nil, fmt.Errorf("loading data for prefix %q: %w", prefix, err)
}
Expand Down
16 changes: 10 additions & 6 deletions lib_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package hibpsync
package hibp

import (
"io"
Expand All @@ -11,23 +11,27 @@ func BenchmarkQuery(b *testing.B) {

dataDir := b.TempDir()

if err := Sync(SyncWithDataDir(dataDir), SyncWithLastRange(lastRange)); err != nil {
h := New(WithDataDir(dataDir))

if err := h.Sync(SyncWithLastRange(lastRange)); err != nil {
b.Fatalf("unexpected error: %v", err)
}

querier := NewRangeAPI(QueryWithDataDir(dataDir))

b.ResetTimer()

for i := 0; i < b.N; i++ {
func() {
rnd := rand.Intn(lastRange)

reader, err := querier.Query(toRangeString(int64(rnd)))
reader, err := h.Query(toRangeString(int64(rnd)))
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
defer reader.Close()
defer func() {
if reader.Close() != nil {
b.Fatalf("unexpected error: %v", err)
}
}()

data, err := io.ReadAll(reader)
if err != nil {
Expand Down
Loading

0 comments on commit c5ff6cb

Please sign in to comment.