Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error handling improvement for storage and util packages #444

Merged
merged 2 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion asserter/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,11 @@ func (a *Asserter) Transaction(
// invalid transaction identifiers, or a direction not defined by the enum.
func (a *Asserter) RelatedTransactions(relatedTransactions []*types.RelatedTransaction) error {
if dup := DuplicateRelatedTransaction(relatedTransactions); dup != nil {
return fmt.Errorf("%w: %v", ErrDuplicateRelatedTransaction, dup)
return fmt.Errorf(
"related transaction %s is invalid: %w",
types.PrintStruct(dup),
ErrDuplicateRelatedTransaction,
)
}

for i, relatedTransaction := range relatedTransactions {
Expand Down
2 changes: 1 addition & 1 deletion constructor/worker/populator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func PopulateInput(state string, input string) (string, error) {
return value.Raw
})
if err != nil {
return "", fmt.Errorf("%w: unable to insert variables", err)
return "", fmt.Errorf("unable to insert variables: %w", err)
}

if !gjson.Valid(input) {
Expand Down
10 changes: 5 additions & 5 deletions constructor/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (w *Worker) invokeWorker(
case job.GetBlob:
return w.GetBlobWorker(ctx, dbTx, input)
default:
return "", fmt.Errorf("%w: %s", ErrInvalidActionType, action)
return "", ErrInvalidActionType
}
}

Expand Down Expand Up @@ -237,7 +237,7 @@ func GenerateKeyWorker(rawInput string) (string, error) {
var input job.GenerateKeyInput
err := job.UnmarshalInput([]byte(rawInput), &input)
if err != nil {
return "", fmt.Errorf("%w: %s", ErrInvalidInput, err.Error())
return "", fmt.Errorf("failed to unmarshal input: %w", err)
}

kp, err := keys.GenerateKeypair(input.CurveType)
Expand Down Expand Up @@ -303,7 +303,7 @@ func MathWorker(rawInput string) (string, error) {
var input job.MathInput
err := job.UnmarshalInput([]byte(rawInput), &input)
if err != nil {
return "", fmt.Errorf("%w: %s", ErrInvalidInput, err.Error())
return "", fmt.Errorf("failed to unmarshal input: %w", err)
}

var result string
Expand Down Expand Up @@ -880,10 +880,10 @@ func HTTPRequestWorker(rawInput string) (string, error) {

if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf(
"%w: status code %d with body %s",
ErrActionFailed,
"status code %d with body %s: %w",
resp.StatusCode,
body,
ErrActionFailed,
)
}

Expand Down
2 changes: 1 addition & 1 deletion constructor/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1474,7 +1474,7 @@ func TestJob_Failures(t *testing.T) {
OutputPath: "key",
},
ProcessedInput: `{"curve_typ": "secp256k1"}`,
Err: ErrInvalidInput,
Err: fmt.Errorf("unknown field \"curve_typ\""),
},
helper: &mocks.Helper{},
},
Expand Down
4 changes: 2 additions & 2 deletions fetcher/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ func tryAgain(fetchMsg string, thisBackoff *Backoff, err *Error) *Error {
if nextBackoff == backoff.Stop {
return &Error{
Err: fmt.Errorf(
"%w: %s",
ErrExhaustedRetries,
"fetch message %s: %w",
fetchMsg,
ErrExhaustedRetries,
),
}
}
Expand Down
4 changes: 2 additions & 2 deletions parser/intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ func (p *Parser) ExpectedOperations(

if !foundMatch && errExtra {
return fmt.Errorf(
"%w: %s",
"operation %s: %w",
types.PrintStruct(obs),
ErrExpectedOperationsExtraOperation,
types.PrettyPrintStruct(obs),
)
}
}
Expand Down
73 changes: 47 additions & 26 deletions storage/database/badger_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,13 @@ func NewBadgerDatabase(

db, err := badger.Open(b.badgerOptions)
if err != nil {
return nil, fmt.Errorf("%w: %v", storageErrs.ErrDatabaseOpenFailed, err)
return nil, fmt.Errorf("unable to open database: %w", err)
}
b.db = db

encoder, err := encoder.NewEncoder(b.compressorEntries, b.pool, b.compress)
if err != nil {
return nil, fmt.Errorf("%w: %v", storageErrs.ErrCompressorLoadFailed, err)
return nil, fmt.Errorf("unable to load compressor: %w", err)
}
b.encoder = encoder

Expand All @@ -286,7 +286,7 @@ func (b *BadgerDatabase) Close(ctx context.Context) error {
close(b.closed)

if err := b.db.Close(); err != nil {
return fmt.Errorf("%w: %v", storageErrs.ErrDBCloseFailed, err)
return fmt.Errorf("unable to close badger database: %w", err)
}

return nil
Expand Down Expand Up @@ -436,7 +436,7 @@ func (b *BadgerTransaction) Commit(context.Context) error {
b.releaseLocks()

if err != nil {
return fmt.Errorf("%w: %v", storageErrs.ErrCommitFailed, err)
return fmt.Errorf("unable to commit transaction: %w", err)
}

return nil
Expand Down Expand Up @@ -494,15 +494,19 @@ func (b *BadgerTransaction) Get(
if err == badger.ErrKeyNotFound {
return false, nil, nil
} else if err != nil {
return false, nil, err
return false, nil, fmt.Errorf("unable to get the item of key %s within a transaction: %w", string(key), err)
}

err = item.Value(func(v []byte) error {
_, err := value.Write(v)
return err
})
if err != nil {
return false, nil, err
return false, nil, fmt.Errorf(
"unable to get the value from the item for key %s: %w",
string(key),
err,
)
}

return true, value.Bytes(), nil
Expand Down Expand Up @@ -539,13 +543,17 @@ func (b *BadgerTransaction) Scan(
k := item.Key()
err := item.Value(func(v []byte) error {
if err := worker(k, v); err != nil {
return fmt.Errorf("%w: worker failed for key %s", err, string(k))
return fmt.Errorf("worker failed for key %s: %w", string(k), err)
}

return nil
})
if err != nil {
return -1, fmt.Errorf("%w: unable to get value for key %s", err, string(k))
return -1, fmt.Errorf(
"unable to get the value from the item for key %s: %w",
string(k),
err,
)
}

entries++
Expand All @@ -568,7 +576,12 @@ func decompressAndSave(
// encoded using dictionary compression.
decompressed, err := encoder.DecodeRaw(namespace, v)
if err != nil {
return -1, -1, fmt.Errorf("%w %s: %v", storageErrs.ErrDecompressFailed, string(k), err)
return -1, -1, fmt.Errorf(
"unable to decompress for namespace %s and input %s: %w",
namespace,
string(v),
err,
)
}

err = ioutil.WriteFile(
Expand All @@ -577,7 +590,11 @@ func decompressAndSave(
os.FileMode(utils.DefaultFilePermissions),
)
if err != nil {
return -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrDecompressSaveUnsuccessful, err)
return -1, -1, fmt.Errorf(
"unable to write decompress file %s: %w",
path.Join(tmpDir, types.Hash(string(k))),
err,
)
}

return float64(len(decompressed)), float64(len(v)), nil
Expand All @@ -591,27 +608,26 @@ func decompressAndEncode(
decompressed, err := ioutil.ReadFile(path) // #nosec G304
if err != nil {
return -1, -1, -1, fmt.Errorf(
"%w for file %s: %v",
storageErrs.ErrLoadFileUnsuccessful,
"unable to read decompress file %s: %w",
path,
err,
)
}

normalCompress, err := encoder.EncodeRaw("", decompressed)
if err != nil {
return -1, -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrCompressNormalFailed, err)
return -1, -1, -1, fmt.Errorf("unable to compress normal: %w", err)
}

dictCompress, err := encoder.EncodeRaw(namespace, decompressed)
if err != nil {
return -1, -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrCompressWithDictFailed, err)
return -1, -1, -1, fmt.Errorf("unable to compress with dictionary: %w", err)
}

// Ensure dict works
decompressedDict, err := encoder.DecodeRaw(namespace, dictCompress)
if err != nil {
return -1, -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrDecompressWithDictFailed, err)
return -1, -1, -1, fmt.Errorf("unable to decompress with dictionary: %w", err)
}

if types.Hash(decompressed) != types.Hash(decompressedDict) {
Expand Down Expand Up @@ -644,12 +660,17 @@ func recompress(
func(k []byte, v []byte) error {
decompressed, err := badgerDb.Encoder().DecodeRaw(namespace, v)
if err != nil {
return fmt.Errorf("%w %s: %v", storageErrs.ErrDecompressFailed, string(k), err)
return fmt.Errorf(
"unable to decompress for namespace %s and input %s: %w",
namespace,
string(v),
err,
)
}

newCompressed, err := newCompressor.EncodeRaw(namespace, decompressed)
if err != nil {
return fmt.Errorf("%w: %v", storageErrs.ErrCompressWithDictFailed, err)
return fmt.Errorf("unable to compress with dictionary: %w", err)
}
onDiskSize += float64(len(v))
newSize += float64(len(newCompressed))
Expand All @@ -660,7 +681,7 @@ func recompress(
false,
)
if err != nil {
return -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrRecompressFailed, err)
return -1, -1, fmt.Errorf("unable to recompress: %w", err)
}

// Negative savings here means that the new dictionary
Expand Down Expand Up @@ -691,14 +712,14 @@ func BadgerTrain(
WithCompressorEntries(compressorEntries),
)
if err != nil {
return -1, -1, fmt.Errorf("%w: unable to load database", err)
return -1, -1, fmt.Errorf("unable to load database: %w", err)
}
defer badgerDb.Close(ctx)

// Create directory to store uncompressed files for training
tmpDir, err := utils.CreateTempDir()
if err != nil {
return -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrCreateTempDirectoryFailed, err)
return -1, -1, fmt.Errorf("unable to create temporary directory: %w", err)
}
defer utils.RemoveTempDir(tmpDir)

Expand All @@ -724,7 +745,7 @@ func BadgerTrain(
v,
)
if err != nil {
return fmt.Errorf("%w: unable to decompress and save", err)
return fmt.Errorf("unable to decompress and save: %w", err)
}

totalUncompressedSize += decompressedSize
Expand Down Expand Up @@ -774,11 +795,11 @@ func BadgerTrain(
dictPath,
) // #nosec G204
if err := cmd.Start(); err != nil {
return -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrInvokeZSTDFailed, err)
return -1, -1, fmt.Errorf("unable to start zstd: %w", err)
}

if err := cmd.Wait(); err != nil {
return -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrTrainZSTDFailed, err)
return -1, -1, fmt.Errorf("unable to train zstd: %w", err)
}

encoder, err := encoder.NewEncoder([]*encoder.CompressorEntry{
Expand All @@ -788,7 +809,7 @@ func BadgerTrain(
},
}, encoder.NewBufferPool(), true)
if err != nil {
return -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrCompressorLoadFailed, err)
return -1, -1, fmt.Errorf("unable to load compressor: %w", err)
}

sizeUncompressed := float64(0)
Expand All @@ -809,7 +830,7 @@ func BadgerTrain(
encoder,
)
if err != nil {
return fmt.Errorf("%w: unable to decompress and encode", err)
return fmt.Errorf("unable to decompress and encode: %w", err)
}

sizeUncompressed += decompressed
Expand All @@ -819,7 +840,7 @@ func BadgerTrain(
return nil
})
if err != nil {
return -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrWalkFilesFailed, err)
return -1, -1, fmt.Errorf("unable to walk files: %w", err)
}

log.Printf(
Expand Down
Loading