diff --git a/examples/experimental/csv-parse.js b/examples/experimental/csv-parse.js new file mode 100644 index 00000000000..36ece396be6 --- /dev/null +++ b/examples/experimental/csv-parse.js @@ -0,0 +1,28 @@ +import { open } from 'k6/experimental/fs' +import csv from 'k6/experimental/csv' +import { scenario } from 'k6/execution' + +export const options = { + iterations: 10, +} + +// Open the csv file, and parse it ahead of time. +let file; +let csvRecords; +(async function () { + file = await open('data.csv'); + + // The `csv.parse` function consumes the entire file at once, and returns + // the parsed records as a SharedArray object. + csvRecords = await csv.parse(file, {delimiter: ','}) +})(); + + +export default async function() { + // The csvRecords a SharedArray. Each element is a record from the CSV file, represented as an array + // where each element is a field from the CSV record. + // + // Thus, `csvRecords[scenario.iterationInTest]` will give us the record for the current iteration. + console.log(csvRecords[scenario.iterationInTest]) +} + diff --git a/examples/experimental/csv-parser.js b/examples/experimental/csv-parser.js new file mode 100644 index 00000000000..d074014da6f --- /dev/null +++ b/examples/experimental/csv-parser.js @@ -0,0 +1,29 @@ +import { open } from 'k6/experimental/fs' +import csv from 'k6/experimental/csv' + +export const options = { + iterations: 10, +} + +let file; +let parser; +(async function () { + file = await open('data.csv'); + parser = new csv.Parser(file); +})(); + +export default async function() { + // The parser `next` method attempts to read the next row from the CSV file. + // + // It returns an iterator-like object with a `done` property that indicates whether + // there are more rows to read, and a `value` property that contains the row fields + // as an array. 
+ const {done, value} = await parser.next(); + if (done) { + throw new Error("No more rows to read"); + } + + // We expect the `value` property to be an array of strings, where each string is a field + // from the CSV record. + console.log(done, value); +} diff --git a/examples/experimental/data.csv b/examples/experimental/data.csv new file mode 100644 index 00000000000..da01b911664 --- /dev/null +++ b/examples/experimental/data.csv @@ -0,0 +1,11 @@ +firstname,lastname,age +fariha,ehlenfeldt,72 +qudratullah,gillfillan,50 +jeleah,rodina,41 +thaisia,nowalk,99 +joey-lynn,wilsford,75 +tudur,granville,81 +aytek,umber,56 +aynoor,hisaw,30 +fiadh-rose,iravani,31 +annely,ooley,70 diff --git a/js/jsmodules.go b/js/jsmodules.go index a005b1d69ca..6bc6868c9a1 100644 --- a/js/jsmodules.go +++ b/js/jsmodules.go @@ -13,6 +13,7 @@ import ( "go.k6.io/k6/js/modules/k6/data" "go.k6.io/k6/js/modules/k6/encoding" "go.k6.io/k6/js/modules/k6/execution" + "go.k6.io/k6/js/modules/k6/experimental/csv" "go.k6.io/k6/js/modules/k6/experimental/fs" "go.k6.io/k6/js/modules/k6/experimental/streams" "go.k6.io/k6/js/modules/k6/experimental/tracing" @@ -38,6 +39,7 @@ func getInternalJSModules() map[string]interface{} { "k6/encoding": encoding.New(), "k6/timers": timers.New(), "k6/execution": execution.New(), + "k6/experimental/csv": csv.New(), "k6/experimental/redis": redis.New(), "k6/experimental/streams": streams.New(), "k6/experimental/webcrypto": webcrypto.New(), diff --git a/js/modules/k6/data/data.go b/js/modules/k6/data/data.go index a67f1a9c7ba..dd73fb38912 100644 --- a/js/modules/k6/data/data.go +++ b/js/modules/k6/data/data.go @@ -3,7 +3,10 @@ package data import ( + "encoding/json" "errors" + "fmt" + "io" "strconv" "sync" @@ -93,6 +96,59 @@ func (d *Data) sharedArray(call sobek.ConstructorCall) *sobek.Object { return array.wrap(rt).ToObject(rt) } +// RecordReader is the interface that wraps the action of reading records from a resource. 
+// +// The data module RecordReader interface is implemented by types that can read data that can be +// treated as records, from data sources such as a CSV file, etc. +type RecordReader interface { + Read() ([]string, error) +} + +// NewSharedArrayFrom creates a new shared array from the provided data. +// +// This function is not exposed to the JS runtime. It is used internally to instantiate +// shared arrays without having to go through the whole JS runtime machinery, which effectively has +// a big performance impact (e.g. when filling a shared array from a CSV file). +// +// This function takes an explicit runtime argument to retain control over which VU runtime it is +// executed in. This is important because the shared array underlying implementation relies on maintaining +// a single instance of arrays for the whole test setup and VUs. +func (d *Data) NewSharedArrayFrom(rt *sobek.Runtime, name string, r RecordReader) *sobek.Object { + if name == "" { + common.Throw(rt, errors.New("empty name provided to SharedArray's constructor")) + } + + var arr []string + for { + record, err := r.Read() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + common.Throw(rt, fmt.Errorf("failed to read record; reason: %w", err)) + } + + marshaled, err := json.Marshal(record) + if err != nil { + common.Throw(rt, fmt.Errorf("failed to marshal record; reason: %w", err)) + } + + arr = append(arr, string(marshaled)) + } + + return d.shared.set(name, arr).wrap(rt).ToObject(rt) +} + +// set is a helper method to set a shared array in the underlying shared arrays map. 
+func (s *sharedArrays) set(name string, arr []string) sharedArray { + s.mu.Lock() + defer s.mu.Unlock() + array := sharedArray{arr: arr} + s.data[name] = array + + return array +} + func (s *sharedArrays) get(rt *sobek.Runtime, name string, call sobek.Callable) sharedArray { s.mu.RLock() array, ok := s.data[name] @@ -121,10 +177,10 @@ func getShareArrayFromCall(rt *sobek.Runtime, call sobek.Callable) sharedArray { } arr := make([]string, obj.Get("length").ToInteger()) - stringify, _ := sobek.AssertFunction(rt.GlobalObject().Get("JSON").ToObject(rt).Get("stringify")) + stringifyFunc, _ := sobek.AssertFunction(rt.GlobalObject().Get("JSON").ToObject(rt).Get("stringify")) var val sobek.Value for i := range arr { - val, err = stringify(sobek.Undefined(), obj.Get(strconv.Itoa(i))) + val, err = stringifyFunc(sobek.Undefined(), obj.Get(strconv.Itoa(i))) if err != nil { panic(err) } diff --git a/js/modules/k6/data/share.go b/js/modules/k6/data/share.go index 9c4b8fee44a..2b7ac262ad8 100644 --- a/js/modules/k6/data/share.go +++ b/js/modules/k6/data/share.go @@ -49,6 +49,7 @@ func (s wrappedSharedArray) Get(index int) sobek.Value { if err != nil { common.Throw(s.rt, err) } + err = s.deepFreeze(s.rt, val) if err != nil { common.Throw(s.rt, err) diff --git a/js/modules/k6/experimental/csv/module.go b/js/modules/k6/experimental/csv/module.go new file mode 100644 index 00000000000..722dccbdb0b --- /dev/null +++ b/js/modules/k6/experimental/csv/module.go @@ -0,0 +1,292 @@ +// Package csv provides a CSV parser for k6. +package csv + +import ( + "errors" + "fmt" + "io" + "strconv" + "sync/atomic" + "time" + + "go.k6.io/k6/js/modules/k6/data" + + "github.com/grafana/sobek" + + "gopkg.in/guregu/null.v3" + + "go.k6.io/k6/js/promises" + + "go.k6.io/k6/js/modules/k6/experimental/fs" + + "go.k6.io/k6/js/common" + "go.k6.io/k6/js/modules" +) + +type ( + // RootModule is the global module instance that will create instances of our + // module for each VU. 
+ RootModule struct { + dataModuleInstance *data.Data + } + + // ModuleInstance represents an instance of the fs module for a single VU. + ModuleInstance struct { + vu modules.VU + + *RootModule + } +) + +var ( + _ modules.Module = &RootModule{} + _ modules.Instance = &ModuleInstance{} +) + +// New returns a pointer to a new [RootModule] instance. +func New() *RootModule { + return &RootModule{} +} + +// NewModuleInstance implements the modules.Module interface and returns a new +// instance of our module for the given VU. +func (rm *RootModule) NewModuleInstance(vu modules.VU) modules.Instance { + if rm.dataModuleInstance == nil { + var ok bool + rm.dataModuleInstance, ok = data.New().NewModuleInstance(vu).(*data.Data) + if !ok { + common.Throw(vu.Runtime(), errors.New("failed to create data module instance")) + } + } + + return &ModuleInstance{vu: vu, RootModule: rm} +} + +// Exports implements the modules.Module interface and returns the exports of +// our module. +func (mi *ModuleInstance) Exports() modules.Exports { + return modules.Exports{ + Named: map[string]any{ + "parse": mi.Parse, + "Parser": mi.NewParser, + }, + } +} + +// Parser is a CSV parser. +type Parser struct { + // currentLine holds the current line number being read by the parser. + currentLine atomic.Int64 + + // reader is the CSV reader that enables to read records from the provided + // input file. + reader *Reader + + // options holds the parser's as provided by the user. + options options + + // vu is the VU instance that owns this module instance. + vu modules.VU +} + +// parseSharedArrayNamePrefix is the prefix used for the shared array names created by the Parse function. +const parseSharedArrayNamePrefix = "csv.parse." + +// Parse parses the provided CSV file, and returns a promise that resolves to a shared array +// containing the parsed data. 
+func (mi *ModuleInstance) Parse(file sobek.Value, options sobek.Value) *sobek.Promise { + promise, resolve, reject := promises.New(mi.vu) + + rt := mi.vu.Runtime() + + // 1. Make sure the Sobek object is a fs.File (sobek operation) + var fileObj fs.File + if err := mi.vu.Runtime().ExportTo(file, &fileObj); err != nil { + reject(fmt.Errorf("first argument expected to be a fs.File instance, got %T instead", file)) + return promise + } + + parserOptions := newDefaultParserOptions() + if options != nil { + var err error + parserOptions, err = newParserOptionsFrom(options.ToObject(rt)) + if err != nil { + reject(fmt.Errorf("failed to interpret the provided Parser options; reason: %w", err)) + return promise + } + } + + r, err := NewReaderFrom(fileObj.ReadSeekStater, parserOptions) + if err != nil { + reject(fmt.Errorf("failed to create a new parser; reason: %w", err)) + return promise + } + + go func() { + underlyingSharedArrayName := parseSharedArrayNamePrefix + strconv.Itoa(time.Now().Nanosecond()) + + // Because we rely on the data module to create the shared array, we need to + // make sure that the data module is initialized before we can proceed, and that we don't instantiate + // it multiple times. + // + // As such we hold a single instance of it in the RootModule, and we use it to create the shared array. + resolve(mi.RootModule.dataModuleInstance.NewSharedArrayFrom(mi.vu.Runtime(), underlyingSharedArrayName, r)) + }() + + return promise +} + +// NewParser creates a new CSV parser instance. 
+func (mi *ModuleInstance) NewParser(call sobek.ConstructorCall) *sobek.Object { + rt := mi.vu.Runtime() + + if mi.vu.State() != nil { + common.Throw(rt, errors.New("csv Parser constructor must be called in the init context")) + } + + if len(call.Arguments) < 1 || sobek.IsUndefined(call.Argument(0)) { + common.Throw(rt, fmt.Errorf("csv Parser constructor takes at least one non-nil source argument")) + } + + fileArg := call.Argument(0) + if common.IsNullish(fileArg) { + common.Throw(rt, fmt.Errorf("csv Parser constructor takes at least one non-nil source argument")) + } + + // 1. Make sure the Sobek object is a fs.File (Sobek operation) + var file fs.File + if err := mi.vu.Runtime().ExportTo(fileArg, &file); err != nil { + common.Throw( + mi.vu.Runtime(), + fmt.Errorf("first argument expected to be a fs.File instance, got %T instead", call.Argument(0)), + ) + } + + options := newDefaultParserOptions() + if len(call.Arguments) == 2 && !sobek.IsUndefined(call.Argument(1)) { + var err error + options, err = newParserOptionsFrom(call.Argument(1).ToObject(rt)) + if err != nil { + common.Throw(rt, fmt.Errorf("encountered an error while interpreting Parser options; reason: %w", err)) + } + } + + // Instantiate and configure a csv reader using the provided file and options + r, err := NewReaderFrom(file.ReadSeekStater, options) + if err != nil { + common.Throw(rt, fmt.Errorf("failed to create a new parser; reason: %w", err)) + } + + // Create a new Parser instance + parser := Parser{ + reader: r, + options: options, + vu: mi.vu, + } + + return rt.ToValue(&parser).ToObject(rt) +} + +// Next returns the next row in the CSV file. 
+func (p *Parser) Next() *sobek.Promise { + promise, resolve, reject := promises.New(p.vu) + + go func() { + var records []string + var done bool + var err error + + records, err = p.reader.Read() + if err != nil { + if errors.Is(err, io.EOF) { + resolve(parseResult{Done: true, Value: []string{}}) + return + } + + reject(err) + return + } + + p.currentLine.Add(1) + + resolve(parseResult{Done: done, Value: records}) + }() + + return promise +} + +// parseResult holds the result of a CSV parser's parsing operation such +// as when calling the [Next]. +type parseResult struct { + // Done indicates whether the parser has finished reading the file. + Done bool `js:"done"` + + // Value holds the line's records value. + Value []string `js:"value"` +} + +// options holds options used to configure CSV parsing when utilizing the module. +// +// The options can be used to either configure the CSV parser, or the parse function. +// They offer to customize the behavior of the parser, such as the delimiter, whether +// to skip the first line, or to start reading from a specific line, and stop reading +// at a specific line. +type options struct { + // Delimiter is the character that separates the fields in the CSV. + Delimiter rune `js:"delimiter"` + + // SkipFirstLine indicates whether the first line should be skipped. + SkipFirstLine bool `js:"skipFirstLine"` + + // FromLine indicates the line from which to start reading the CSV file. + FromLine null.Int `js:"fromLine"` + + // ToLine indicates the line at which to stop reading the CSV file (inclusive). + ToLine null.Int `js:"toLine"` +} + +// newDefaultParserOptions creates a new options instance with default values. +func newDefaultParserOptions() options { + return options{ + Delimiter: ',', + SkipFirstLine: false, + } +} + +// newParserOptions creates a new options instance from the given Sobek object. 
+func newParserOptionsFrom(obj *sobek.Object) (options, error) { + options := newDefaultParserOptions() + + if obj == nil { + return options, nil + } + + if v := obj.Get("delimiter"); v != nil { + delimiter := v.String() + + // A delimiter is gonna be treated as a rune in the Go code, so we need to make sure it's a single character. + if len(delimiter) > 1 { + return options, fmt.Errorf("delimiter must be a single character") + } + + options.Delimiter = rune(delimiter[0]) + } + + if v := obj.Get("skipFirstLine"); v != nil { + options.SkipFirstLine = v.ToBoolean() + } + + if v := obj.Get("fromLine"); v != nil { + options.FromLine = null.IntFrom(v.ToInteger()) + } + + if v := obj.Get("toLine"); v != nil { + options.ToLine = null.IntFrom(v.ToInteger()) + } + + if options.FromLine.Valid && options.ToLine.Valid && options.FromLine.Int64 >= options.ToLine.Int64 { + return options, fmt.Errorf("fromLine must be less than or equal to toLine") + } + + return options, nil +} diff --git a/js/modules/k6/experimental/csv/module_test.go b/js/modules/k6/experimental/csv/module_test.go new file mode 100644 index 00000000000..d121610e4b5 --- /dev/null +++ b/js/modules/k6/experimental/csv/module_test.go @@ -0,0 +1,583 @@ +package csv + +import ( + "fmt" + "net/url" + "strings" + "testing" + + "go.k6.io/k6/metrics" + + "go.k6.io/k6/lib" + + "go.k6.io/k6/js/modules/k6/experimental/fs" + "go.k6.io/k6/lib/fsext" + + "go.k6.io/k6/js/compiler" + + "github.com/stretchr/testify/require" + + "go.k6.io/k6/js/modulestest" +) + +// testFilePath holds the path to the test CSV file. +const testFilePath = fsext.FilePathSeparator + "testdata.csv" + +// csvTestData is a CSV file that contains test data about +// various composers. 
+const csvTestData = `lastname,firstname,composer,born,died,dates +Scarlatti,Domenico,Domenico Scarlatti,1685,1757,1685–1757 +Dorman,Avner,Avner Dorman,1975,,1975– +Still,William Grant,William Grant Still,1895,1978,1895–1978 +Bacewicz,Grażyna,Grażyna Bacewicz,1909,1969,1909–1969 +Prokofiev,Sergei,Sergei Prokofiev,1891,1953,1891–1953 +Lash,Han,Han Lash,1981,,1981– +Franck,César,César Franck,1822,1890,1822–1890 +Messiaen,Olivier,Olivier Messiaen,1908,1992,1908–1992 +Bellini,Vincenzo,Vincenzo Bellini,1801,1835,1801–1835 +Ligeti,György,György Ligeti,1923,2006,1923–2006 +` + +func TestParserConstructor(t *testing.T) { + t.Parallel() + + t.Run("constructing a parser without options should succeed", func(t *testing.T) { + t.Parallel() + + r, err := newConfiguredRuntime(t) + require.NoError(t, err) + + // Ensure the testdata.csv file is present on the test filesystem. + r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error { + return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644) + }) + + _, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(` + const file = await fs.open(%q); + const parser = new csv.Parser(file); + `, testFilePath))) + + require.NoError(t, err) + }) + + t.Run("constructing a parser with valid options should succeed", func(t *testing.T) { + t.Parallel() + + r, err := newConfiguredRuntime(t) + require.NoError(t, err) + + // Ensure the testdata.csv file is present on the test filesystem. 
+ r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error { + return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644) + }) + + _, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(` + const file = await fs.open(%q); + const parser = new csv.Parser(file, { delimiter: ';', skipFirstLine: true, fromLine: 0, toLine: 10 }); + `, testFilePath))) + + require.NoError(t, err) + }) + + t.Run("constructing a parser without providing a file instance should fail", func(t *testing.T) { + t.Parallel() + + r, err := newConfiguredRuntime(t) + require.NoError(t, err) + + _, err = r.RunOnEventLoop(wrapInAsyncLambda(` + // Regardless of whether a file is passed, the parser should not be constructed in the VU context. + const parser = new csv.Parser(null); + `)) + + require.Error(t, err) + require.Contains(t, err.Error(), "csv Parser constructor takes at least one non-nil source argument") + }) + + t.Run("constructing a parser in VU context should fail", func(t *testing.T) { + t.Parallel() + + r, err := newConfiguredRuntime(t) + require.NoError(t, err) + r.MoveToVUContext(&lib.State{ + Tags: lib.NewVUStateTags(metrics.NewRegistry().RootTagSet()), + }) + + _, err = r.RunOnEventLoop(wrapInAsyncLambda(` + // Note that we pass an empty object here as opening a real file here would lead to the fs.open call + // itself to fail (as we're not in the init context). + const parser = new csv.Parser({}, { delimiter: ';', skipFirstLine: true, fromLine: 0, toLine: 10 }); + `)) + + require.Error(t, err) + require.Contains(t, err.Error(), "csv Parser constructor must be called in the init context") + }) +} + +func TestParserNext(t *testing.T) { + t.Parallel() + + t.Run("next with default options should succeed", func(t *testing.T) { + t.Parallel() + + r, err := newConfiguredRuntime(t) + require.NoError(t, err) + + // Ensure the testdata.csv file is present on the test filesystem. 
+ r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error { + return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644) + }) + + _, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(` + const file = await fs.open(%q); + const parser = new csv.Parser(file); + + // Parse the header + let { done, value } = await parser.next(); + if (done) { + throw new Error("Expected to read a record, but got done=true"); + } + if (value.length !== 6) { + throw new Error("Expected 6 fields, but got " + value.length); + } + if (JSON.stringify(value) !== JSON.stringify(["lastname", "firstname", "composer", "born", "died", "dates"])) { + throw new Error("Expected header to be 'lastname,firstname,composer,born,died,dates', but got " + value); + } + + // Parse the first record + ({ done, value } = await parser.next()); + if (done) { + throw new Error("Expected to read a record, but got done=true"); + } + if (value.length !== 6) { + throw new Error("Expected 6 fields, but got " + value.length); + } + if (JSON.stringify(value) !== JSON.stringify(["Scarlatti", "Domenico", "Domenico Scarlatti", "1685", "1757", "1685–1757"])) { + throw new Error("Expected record to be 'Scarlatti,Domenico,Domenico Scarlatti,1685,1757,1685–1757', but got " + value); + } + `, testFilePath))) + + require.NoError(t, err) + }) + + t.Run("next with delimiter options should respect delimiter and succeed", func(t *testing.T) { + t.Parallel() + + r, err := newConfiguredRuntime(t) + require.NoError(t, err) + + // Ensure the testdata.csv file is present on the test filesystem. 
+ r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error { + return fsext.WriteFile(fs, testFilePath, []byte(strings.ReplaceAll(csvTestData, ",", ";")), 0o644) + }) + + _, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(` + const file = await fs.open(%q); + const parser = new csv.Parser(file, { delimiter: ';' }); + + // Parse the header + let { done, value } = await parser.next(); + if (done) { + throw new Error("Expected to read a record, but got done=true"); + } + + if (value.length !== 6) { + throw new Error("Expected 6 fields, but got " + value.length); + } + + if (JSON.stringify(value) !== JSON.stringify(["lastname", "firstname", "composer", "born", "died", "dates"])) { + throw new Error("Expected header to be 'lastname,firstname,composer,born,died,dates', but got " + value); + } + `, testFilePath))) + + require.NoError(t, err) + }) + + t.Run("next with skipFirstLine options should ignore header and succeed", func(t *testing.T) { + t.Parallel() + + r, err := newConfiguredRuntime(t) + require.NoError(t, err) + + // Ensure the testdata.csv file is present on the test filesystem. 
+ r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error { + return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644) + }) + + _, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(` + const file = await fs.open(%q); + const parser = new csv.Parser(file, { skipFirstLine: true }); + + // Parse the first record + const { done, value } = await parser.next(); + if (done) { + throw new Error("Expected to read a record, but got done=true"); + } + if (value.length !== 6) { + throw new Error("Expected 6 fields, but got " + value.length); + } + if (JSON.stringify(value) !== JSON.stringify(["Scarlatti", "Domenico", "Domenico Scarlatti", "1685", "1757", "1685–1757"])) { + throw new Error("Expected record to be 'Scarlatti,Domenico,Domenico Scarlatti,1685,1757,1685–1757', but got " + value); + } + `, testFilePath))) + + require.NoError(t, err) + }) + + t.Run("next with fromLine option should start from provided line number and succeed", func(t *testing.T) { + t.Parallel() + + r, err := newConfiguredRuntime(t) + require.NoError(t, err) + + // Ensure the testdata.csv file is present on the test filesystem. 
+ r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error { + return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644) + }) + + _, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(` + const file = await fs.open(%q); + const parser = new csv.Parser(file, { fromLine: 9 }); + + // Parse the first record from the line at index 9 + const { done, value } = await parser.next(); + if (done) { + throw new Error("Expected to read a record, but got done=true"); + } + if (value.length !== 6) { + throw new Error("Expected 6 fields, but got " + value.length); + } + if (JSON.stringify(value) !== JSON.stringify(["Bellini", "Vincenzo", "Vincenzo Bellini", "1801", "1835", "1801–1835"])) { + throw new Error("Expected record to be 'Scarlatti,Domenico,Domenico Scarlatti,1685,1757,1685–1757', but got " + value); + } + `, testFilePath))) + + require.NoError(t, err) + }) + + t.Run("next with skipFirstLine does not impact fromLine and succeed", func(t *testing.T) { + t.Parallel() + + r, err := newConfiguredRuntime(t) + require.NoError(t, err) + + // Ensure the testdata.csv file is present on the test filesystem. 
+ r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error { + return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644) + }) + + _, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(` + const file = await fs.open(%q); + const parser = new csv.Parser(file, { skipFirstLine: true, fromLine: 9 }); + + // Parse the first record from the line at index 9 + const { done, value } = await parser.next(); + if (done) { + throw new Error("Expected to read a record, but got done=true"); + } + if (value.length !== 6) { + throw new Error("Expected 6 fields, but got " + value.length); + } + if (JSON.stringify(value) !== JSON.stringify(["Bellini", "Vincenzo", "Vincenzo Bellini", "1801", "1835", "1801–1835"])) { + throw new Error("Expected record to be 'Scarlatti,Domenico,Domenico Scarlatti,1685,1757,1685–1757', but got " + value); + } + `, testFilePath))) + + require.NoError(t, err) + }) + + t.Run("next with toLine option should end at provided line number and succeed", func(t *testing.T) { + t.Parallel() + + r, err := newConfiguredRuntime(t) + require.NoError(t, err) + + // Ensure the testdata.csv file is present on the test filesystem. 
+ r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error { + return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644) + }) + + _, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(` + const file = await fs.open(%q); + const parser = new csv.Parser(file, { toLine: 2 }); + + // Ignore the header + await parser.next(); + + // Parse the first record + await parser.next(); + + // Because we asked to parse to line 2 we should effectively parse it + let { done } = await parser.next(); + if (done) { + throw new Error("Expected to not be done, but got done=true"); + } + + // Finally because we are past line 2 we should be done + ({ done } = await parser.next()); + if (!done) { + throw new Error("Expected to be done, but got done=false"); + } + `, testFilePath))) + + require.NoError(t, err) + }) + + t.Run("next with skipFirstLine does not impact fromLine and succeed", func(t *testing.T) { + t.Parallel() + + r, err := newConfiguredRuntime(t) + require.NoError(t, err) + + // Ensure the testdata.csv file is present on the test filesystem. 
+ r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error { + return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644) + }) + + _, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(` + const file = await fs.open(%q); + const parser = new csv.Parser(file, { skipFirstLine: true, toLine: 2 }); + + // Ignore the header + await parser.next(); + + // Parse the first record + await parser.next(); + + // Because we asked to parse until line 2, we should be done, and have reached EOF + const { done } = await parser.next(); + if (!done) { + throw new Error("Expected to be done, but got done=false"); + } + `, testFilePath))) + + require.NoError(t, err) + }) + + t.Run("calling next on a parser that has reached EOF should return done=true and no value", func(t *testing.T) { + t.Parallel() + + r, err := newConfiguredRuntime(t) + require.NoError(t, err) + + // Ensure the testdata.csv file is present on the test filesystem. + r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error { + return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644) + }) + + _, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(` + const file = await fs.open(%q); + const parser = new csv.Parser(file); + + // Parse the entire file + let { done, value } = await parser.next(); + while (!done) { + ({ done, value } = await parser.next()); + } + + // The parser should be done now + ({ done, value } = await parser.next()); + if (!done) { + throw new Error("Expected to be done, but got done=false"); + } + if (!Array.isArray(value) || value.length !== 0) { + throw new Error("Expected value to be a zero length array, but got " + value); + } + `, testFilePath))) + + require.NoError(t, err) + }) +} + +func TestParse(t *testing.T) { + t.Parallel() + + t.Run("parse with default options should succeed", func(t *testing.T) { + t.Parallel() + + r, err := newConfiguredRuntime(t) + require.NoError(t, err) + + // Ensure the testdata.csv file is present on 
the test filesystem. + r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error { + return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644) + }) + + _, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(` + const file = await fs.open(%q); + const csvRecords = await csv.parse(file); + + if (csvRecords.length !== 11) { + throw new Error("Expected 11 records, but got " + csvRecords.length); + } + + // FIXME @oleiade: Ideally we would check the prototype of the returned object is SharedArray, but + // the prototype of SharedArray is not exposed to the JS runtime as such at the moment. + if (csvRecords.constructor !== Array) { + throw new Error("Expected the result to be a SharedArray, but got " + csvRecords.constructor); + } + `, testFilePath))) + + require.NoError(t, err) + }) + + t.Run("parse respects the delimiter option and should succeed", func(t *testing.T) { + t.Parallel() + + r, err := newConfiguredRuntime(t) + require.NoError(t, err) + + // Ensure the testdata.csv file is present on the test filesystem. + r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error { + return fsext.WriteFile(fs, testFilePath, []byte(strings.ReplaceAll(csvTestData, ",", ";")), 0o644) + }) + + _, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(` + const file = await fs.open(%q); + const csvRecords = await csv.parse(file, { delimiter: ';' }); + + if (csvRecords.length !== 11) { + throw new Error("Expected 11 records, but got " + csvRecords.length); + } + `, testFilePath))) + + require.NoError(t, err) + }) + + t.Run("parse respects the skipFirstLine option and should succeed", func(t *testing.T) { + t.Parallel() + + r, err := newConfiguredRuntime(t) + require.NoError(t, err) + + // Ensure the testdata.csv file is present on the test filesystem. 
+ r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error { + return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644) + }) + + _, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(` + const file = await fs.open(%q); + const csvRecords = await csv.parse(file, { skipFirstLine: true }); + + if (csvRecords.length !== 10) { + throw new Error("Expected 10 records, but got " + csvRecords.length); + } + + const wantRecord = ["Scarlatti", "Domenico", "Domenico Scarlatti", "1685", "1757", "1685–1757"]; + if (JSON.stringify(csvRecords[0]) !== JSON.stringify(wantRecord)) { + throw new Error("Expected first record to be 'Scarlatti,Domenico,Domenico Scarlatti,1685,1757,1685–1757', but got " + csvRecords[0]); + } + `, testFilePath))) + + require.NoError(t, err) + }) + + t.Run("parse respects the fromLine option and should succeed", func(t *testing.T) { + t.Parallel() + + r, err := newConfiguredRuntime(t) + require.NoError(t, err) + + // Ensure the testdata.csv file is present on the test filesystem. 
+ r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error { + return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644) + }) + + _, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(` + const file = await fs.open(%q); + const csvRecords = await csv.parse(file, { fromLine: 10 }); + + if (csvRecords.length !== 1) { + throw new Error("Expected 1 records, but got " + csvRecords.length); + } + + const wantRecord = ["Ligeti","György","György Ligeti","1923","2006","1923–2006"]; + if (JSON.stringify(csvRecords[0]) !== JSON.stringify(wantRecord)) { + throw new Error("Expected first record to be 'Ligeti,György,György Ligeti,1923,2006,1923–2006', but got " + csvRecords[0]); + } + `, testFilePath))) + + require.NoError(t, err) + }) + + t.Run("parse respects the toLine option and should succeed", func(t *testing.T) { + t.Parallel() + + r, err := newConfiguredRuntime(t) + require.NoError(t, err) + + // Ensure the testdata.csv file is present on the test filesystem. + r.VU.InitEnvField.FileSystems["file"] = newTestFs(t, func(fs fsext.Fs) error { + return fsext.WriteFile(fs, testFilePath, []byte(csvTestData), 0o644) + }) + + _, err = r.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(` + const file = await fs.open(%q); + const csvRecords = await csv.parse(file, { toLine: 2 }); + + if (csvRecords.length !== 3) { + throw new Error("Expected 3 records, but got " + csvRecords.length); + } + + const wantRecord = ["Dorman","Avner","Avner Dorman","1975","","1975–"]; + if (JSON.stringify(csvRecords[2]) !== JSON.stringify(wantRecord)) { + throw new Error("Expected first record to be 'Dorman,Avner,Avner Dorman,1975,,1975–', but got " + csvRecords[1]); + } + `, testFilePath))) + + require.NoError(t, err) + }) +} + +const initGlobals = ` + globalThis.fs = require("k6/experimental/fs"); + globalThis.csv = require("k6/experimental/csv"); +` + +func newConfiguredRuntime(t testing.TB) (*modulestest.Runtime, error) { + runtime := modulestest.NewRuntime(t) + + modules 
:= map[string]interface{}{ + "k6/experimental/fs": fs.New(), + "k6/experimental/csv": New(), + } + + err := runtime.SetupModuleSystem(modules, nil, compiler.New(runtime.VU.InitEnv().Logger)) + if err != nil { + return nil, err + } + + // Set up the VU environment with an in-memory filesystem and a CWD of "/". + runtime.VU.InitEnvField.FileSystems = map[string]fsext.Fs{ + "file": fsext.NewMemMapFs(), + } + runtime.VU.InitEnvField.CWD = &url.URL{Scheme: "file"} + + // Ensure the `fs` module is available in the VU's runtime. + _, err = runtime.VU.Runtime().RunString(initGlobals) + + return runtime, err +} + +// newTestFs is a helper function that creates a new in-memory file system and calls the provided +// function with it. The provided function is expected to use the file system to create files and +// directories. +func newTestFs(t *testing.T, fn func(fs fsext.Fs) error) fsext.Fs { + t.Helper() + + filesystem := fsext.NewMemMapFs() + + err := fn(filesystem) + if err != nil { + t.Fatal(err) + } + + return filesystem +} + +// wrapInAsyncLambda is a helper function that wraps the provided input in an async lambda. This +// makes the use of `await` statements in the input possible. +func wrapInAsyncLambda(input string) string { + // This makes it possible to use `await` freely on the "top" level + return "(async () => {\n " + input + "\n })()" +} diff --git a/js/modules/k6/experimental/csv/reader.go b/js/modules/k6/experimental/csv/reader.go new file mode 100644 index 00000000000..a4e5f60c253 --- /dev/null +++ b/js/modules/k6/experimental/csv/reader.go @@ -0,0 +1,115 @@ +package csv + +import ( + "encoding/csv" + "fmt" + "io" + "sync/atomic" +) + +// Reader is a CSV reader. +// +// It wraps a csv.Reader and provides additional functionality such as the ability to stop reading at a specific line. +type Reader struct { + csv *csv.Reader + + // currentLine tracks the current line number. + currentLine atomic.Int64 + + // options holds the reader's options. 
+	options options
+}
+
+// NewReaderFrom creates a new CSV reader from the provided io.Reader.
+//
+// It will check whether the first line should be skipped and consume it if necessary.
+// It will also check whether the reader should start from a specific line and skip lines until that line is reached.
+// We perform these operations here to avoid having to do them in the Read method.
+//
+// Hence, this constructor function can return an error if the first line cannot be skipped or if the reader
+// cannot start from the specified line.
+func NewReaderFrom(r io.Reader, options options) (*Reader, error) {
+	if r == nil {
+		return nil, fmt.Errorf("the reader cannot be nil")
+	}
+
+	// Ensure the default delimiter is set.
+	if options.Delimiter == 0 {
+		options.Delimiter = ','
+	}
+
+	csvParser := csv.NewReader(r)
+	csvParser.Comma = options.Delimiter
+
+	reader := &Reader{
+		csv:     csvParser,
+		options: options,
+	}
+
+	var (
+		fromLineSet        = options.FromLine.Valid
+		toLineSet          = options.ToLine.Valid
+		skipFirstLineSet   = options.SkipFirstLine
+		fromLineIsPositive = fromLineSet && options.FromLine.Int64 >= 0
+		toLineIsPositive   = toLineSet && options.ToLine.Int64 >= 0
+	)
+
+	// If set, the fromLine option should be greater than or equal to 0.
+	if fromLineSet && !fromLineIsPositive {
+		return nil, fmt.Errorf("the 'fromLine' option must be greater than or equal to 0; got %d", options.FromLine.Int64)
+	}
+
+	// If set, the toLine option should be greater than or equal to 0.
+	if toLineSet && !toLineIsPositive {
+		return nil, fmt.Errorf("the 'toLine' option must be greater than or equal to 0; got %d", options.ToLine.Int64)
+	}
+
+	// If the `fromLine` and `toLine` options are set, and `fromLine` is greater or equal to `toLine`, we return an error.
+	if fromLineSet && toLineSet && options.FromLine.Int64 >= options.ToLine.Int64 {
+		return nil, fmt.Errorf(
+			"the 'fromLine' option must be less than the 'toLine' option; got 'fromLine': %d, 'toLine': %d",
+			options.FromLine.Int64, options.ToLine.Int64,
+		)
+	}
+
+	// If the user wants to skip the first line, we consume and discard it.
+	if skipFirstLineSet && (!fromLineSet || options.FromLine.Int64 == 0) {
+		_, err := csvParser.Read()
+		if err != nil {
+			return nil, fmt.Errorf("failed to skip the first line; reason: %w", err)
+		}
+
+		reader.currentLine.Add(1)
+	}
+
+	if fromLineSet && options.FromLine.Int64 > 0 {
+		// We skip lines until we reach the specified line.
+		for reader.currentLine.Load() < options.FromLine.Int64 {
+			_, err := csvParser.Read()
+			if err != nil {
+				return nil, fmt.Errorf("failed to skip lines until line %d; reason: %w", options.FromLine.Int64, err)
+			}
+			reader.currentLine.Add(1)
+		}
+	}
+
+	return reader, nil
+}
+
+func (r *Reader) Read() ([]string, error) {
+	toLineSet := r.options.ToLine.Valid
+
+	// If the `toLine` option was set and we have reached it, we return EOF. NOTE(review): a toLine of 0 passes validation above but is ignored by the `> 0` guard below — confirm this is intended.
+ if toLineSet && r.options.ToLine.Int64 > 0 && r.currentLine.Load() > r.options.ToLine.Int64 { + return nil, io.EOF + } + + records, err := r.csv.Read() + if err != nil { + return nil, err + } + + r.currentLine.Add(1) + + return records, nil +} diff --git a/js/modules/k6/experimental/csv/reader_test.go b/js/modules/k6/experimental/csv/reader_test.go new file mode 100644 index 00000000000..d395721dbab --- /dev/null +++ b/js/modules/k6/experimental/csv/reader_test.go @@ -0,0 +1,172 @@ +package csv + +import ( + "io" + "strings" + "testing" + + "gopkg.in/guregu/null.v3" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewReaderFrom(t *testing.T) { + t.Parallel() + + t.Run("instantiating a new reader with a nil io.Reader should fail", func(t *testing.T) { + t.Parallel() + + _, err := NewReaderFrom(nil, options{}) + require.Error(t, err) + }) + + t.Run("instantiating a new reader with the fromLine option less than 0 should fail", func(t *testing.T) { + t.Parallel() + + _, err := NewReaderFrom( + strings.NewReader("lastname,firstname,composer,born,died,dates\n"), + options{FromLine: null.NewInt(-1, true)}, + ) + require.Error(t, err) + }) + + t.Run("instantiating a new reader with the toLine option less than 0 should fail", func(t *testing.T) { + t.Parallel() + + _, err := NewReaderFrom( + strings.NewReader("lastname,firstname,composer,born,died,dates\n"), + options{ToLine: null.NewInt(-1, true)}, + ) + require.Error(t, err) + }) + + t.Run("instantiating a new reader with fromLine greater or equal to toLine should fail", func(t *testing.T) { + t.Parallel() + + _, err := NewReaderFrom( + strings.NewReader("lastname,firstname,composer,born,died,dates\n"), + options{FromLine: null.NewInt(4, true), ToLine: null.NewInt(1, true)}, + ) + require.Error(t, err) + }) + + t.Run("skipFirstLine option skips first line and succeeds", func(t *testing.T) { + t.Parallel() + + const csvTestData = 
"lastname,firstname,composer,born,died,dates\n" + + "Scarlatti,Domenico,Domenico Scarlatti,1685,1757,1685–1757\n" + + r, err := NewReaderFrom( + strings.NewReader(csvTestData), + options{SkipFirstLine: true}, + ) + require.NoError(t, err) + + records, err := r.csv.Read() + require.NoError(t, err) + assert.Equal(t, []string{"Scarlatti", "Domenico", "Domenico Scarlatti", "1685", "1757", "1685–1757"}, records) + }) + + t.Run("fromLine option move reading head forward and succeeds", func(t *testing.T) { + t.Parallel() + + const csvTestData = "lastname,firstname,composer,born,died,dates\n" + + "Scarlatti,Domenico,Domenico Scarlatti,1685,1757,1685–1757\n" + + "Dorman,Avner,Avner Dorman,1975,,1975–\n" + + "Still,William Grant,William Grant Still,1895,1978,1895–1978\n" + + r, err := NewReaderFrom( + strings.NewReader(csvTestData), + options{FromLine: null.NewInt(2, true)}, + ) + require.NoError(t, err) + + records, err := r.csv.Read() + require.NoError(t, err) + assert.Equal(t, []string{"Dorman", "Avner", "Avner Dorman", "1975", "", "1975–"}, records) + }) + + t.Run("fromLine option supersedes skipFirstLine option and succeeds", func(t *testing.T) { + t.Parallel() + + const csvTestData = "lastname,firstname,composer,born,died,dates\n" + + "Scarlatti,Domenico,Domenico Scarlatti,1685,1757,1685–1757\n" + + "Dorman,Avner,Avner Dorman,1975,,1975–\n" + + "Still,William Grant,William Grant Still,1895,1978,1895–1978\n" + + r, err := NewReaderFrom( + strings.NewReader(csvTestData), + options{SkipFirstLine: true, FromLine: null.NewInt(2, true)}, + ) + require.NoError(t, err) + + records, err := r.csv.Read() + require.NoError(t, err) + assert.Equal(t, []string{"Dorman", "Avner", "Avner Dorman", "1975", "", "1975–"}, records) + }) +} + +func TestReader_Read(t *testing.T) { + t.Parallel() + + t.Run("default behavior should return all lines and succeed", func(t *testing.T) { + t.Parallel() + + const csvTestData = "lastname,firstname,composer,born,died,dates\n" + + 
"Scarlatti,Domenico,Domenico Scarlatti,1685,1757,1685–1757\n" + + r, err := NewReaderFrom( + strings.NewReader(csvTestData), + options{}, + ) + require.NoError(t, err) + + // Parsing gotHeader should succeed + gotHeader, err := r.Read() + require.NoError(t, err) + assert.Equal(t, []string{"lastname", "firstname", "composer", "born", "died", "dates"}, gotHeader) + + // Parsing first line should succeed + gotFirstLine, err := r.Read() + require.NoError(t, err) + assert.Equal(t, []string{"Scarlatti", "Domenico", "Domenico Scarlatti", "1685", "1757", "1685–1757"}, gotFirstLine) + + // As we've reached EOF, we should get EOF + _, err = r.Read() + require.Error(t, err) + require.ErrorIs(t, err, io.EOF) + }) + + t.Run("toLine option returns EOF when toLine option is reached and succeeds", func(t *testing.T) { + t.Parallel() + + const csvTestData = "lastname,firstname,composer,born,died,dates\n" + + "Scarlatti,Domenico,Domenico Scarlatti,1685,1757,1685–1757\n" + + "Dorman,Avner,Avner Dorman,1975,,1975–\n" + + "Still,William Grant,William Grant Still,1895,1978,1895–1978\n" + + r, err := NewReaderFrom( + strings.NewReader(csvTestData), + options{ToLine: null.NewInt(2, true)}, + ) + require.NoError(t, err) + + // Parsing header should succeed + _, err = r.Read() + require.NoError(t, err) + + // Parsing first line should succeed + _, err = r.Read() + require.NoError(t, err) + + // Parsing second line should succeed + _, err = r.Read() + require.NoError(t, err) + + // As we've reached `toLine`, we should get EOF + _, err = r.Read() + require.Error(t, err) + require.ErrorIs(t, err, io.EOF) + }) +} diff --git a/js/modules/k6/experimental/fs/file.go b/js/modules/k6/experimental/fs/file.go index d72a3517154..b60ce81565c 100644 --- a/js/modules/k6/experimental/fs/file.go +++ b/js/modules/k6/experimental/fs/file.go @@ -8,25 +8,28 @@ import ( // file is an abstraction for interacting with files. 
type file struct { - path string + path string `js:"path"` // data holds a pointer to the file's data - data []byte + data []byte `js:"data"` // offset holds the current offset in the file // // TODO: using an atomic here does not guarantee ordering of reads and seeks, and leaves // the behavior not strictly defined. This is something we might want to address in the future, and // is tracked as part of #3433. - offset atomic.Int64 + offset atomic.Int64 `js:"offset"` } // Stat returns a FileInfo describing the named file. -func (f *file) stat() *FileInfo { +func (f *file) Stat() *FileInfo { filename := filepath.Base(f.path) return &FileInfo{Name: filename, Size: f.size()} } +// Ensure that `file` implements the Stater interface. +var _ Stater = (*file)(nil) + // FileInfo holds information about a file. type FileInfo struct { // Name holds the base name of the file. @@ -48,7 +51,7 @@ func (f *file) Read(into []byte) (n int, err error) { // Check if we have reached the end of the file if currentOffset == fileSize { - return 0, newFsError(EOFError, "EOF") + return 0, io.EOF } // Calculate the effective new offset @@ -62,7 +65,7 @@ func (f *file) Read(into []byte) (n int, err error) { // If we've reached or surpassed the end, set the error to EOF if targetOffset > fileSize { - err = newFsError(EOFError, "EOF") + err = io.EOF } return n, err diff --git a/js/modules/k6/experimental/fs/file_test.go b/js/modules/k6/experimental/fs/file_test.go index 58cfe80362b..d3b26659cea 100644 --- a/js/modules/k6/experimental/fs/file_test.go +++ b/js/modules/k6/experimental/fs/file_test.go @@ -2,7 +2,6 @@ package fs import ( "bytes" - "errors" "testing" "github.com/stretchr/testify/assert" @@ -21,7 +20,7 @@ func TestFileImpl(t *testing.T) { offset int64 wantInto []byte wantN int - wantErr errorKind + wantErr bool }{ { name: "reading the entire file into a buffer fitting the whole file should succeed", @@ -30,7 +29,7 @@ func TestFileImpl(t *testing.T) { offset: 0, wantInto: 
[]byte("hello"), wantN: 5, - wantErr: 0, // No error expected + wantErr: false, }, { name: "reading a file larger than the provided buffer should succeed", @@ -39,7 +38,7 @@ func TestFileImpl(t *testing.T) { offset: 0, wantInto: []byte("hel"), wantN: 3, - wantErr: 0, // No error expected + wantErr: false, }, { name: "reading a file larger than the provided buffer at an offset should succeed", @@ -48,7 +47,7 @@ func TestFileImpl(t *testing.T) { offset: 2, wantInto: []byte("llo"), wantN: 3, - wantErr: 0, // No error expected + wantErr: false, }, { name: "reading file data into a zero sized buffer should succeed", @@ -57,7 +56,7 @@ func TestFileImpl(t *testing.T) { offset: 0, wantInto: []byte{}, wantN: 0, - wantErr: 0, // No error expected + wantErr: false, }, { name: "reading past the end of the file should fill the buffer and fail with EOF", @@ -66,7 +65,8 @@ func TestFileImpl(t *testing.T) { offset: 0, wantInto: []byte{'h', 'e', 'l', 'l', 'o', 0, 0, 0, 0, 0}, wantN: 5, - wantErr: EOFError, + wantErr: true, + // wantErr: EOFError, }, { name: "reading into a prefilled buffer overrides its content", @@ -75,7 +75,7 @@ func TestFileImpl(t *testing.T) { offset: 0, wantInto: []byte("hello!"), wantN: 5, - wantErr: EOFError, + wantErr: true, }, { name: "reading an empty file should fail with EOF", @@ -84,7 +84,7 @@ func TestFileImpl(t *testing.T) { offset: 0, wantInto: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, wantN: 0, - wantErr: EOFError, + wantErr: true, }, { name: "reading from the end of a file should fail with EOF", @@ -94,7 +94,7 @@ func TestFileImpl(t *testing.T) { offset: 5, wantInto: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, wantN: 0, - wantErr: EOFError, + wantErr: true, }, } @@ -110,20 +110,9 @@ func TestFileImpl(t *testing.T) { } f.offset.Store(tc.offset) - gotN, err := f.Read(tc.into) + gotN, gotErr := f.Read(tc.into) - // Cast the error to your custom error type to access its kind - var gotErr errorKind - if err != nil { - var fsErr *fsError - ok := errors.As(err, 
&fsErr) - if !ok { - t.Fatalf("unexpected error type: got %T, want %T", err, &fsError{}) - } - gotErr = fsErr.kind - } - - if gotN != tc.wantN || gotErr != tc.wantErr { + if gotN != tc.wantN || (gotErr != nil) != tc.wantErr { t.Errorf("Read() = %d, %v, want %d, %v", gotN, gotErr, tc.wantN, tc.wantErr) } diff --git a/js/modules/k6/experimental/fs/module.go b/js/modules/k6/experimental/fs/module.go index 6b20215df6d..55aa04f7f7c 100644 --- a/js/modules/k6/experimental/fs/module.go +++ b/js/modules/k6/experimental/fs/module.go @@ -7,13 +7,15 @@ package fs import ( "errors" "fmt" + "io" "reflect" + "go.k6.io/k6/lib/fsext" + "github.com/grafana/sobek" "go.k6.io/k6/js/common" "go.k6.io/k6/js/modules" "go.k6.io/k6/js/promises" - "go.k6.io/k6/lib/fsext" ) type ( @@ -135,15 +137,37 @@ func (mi *ModuleInstance) openImpl(path string) (*File, error) { return nil, err } - return &File{ + file := &File{ Path: path, - file: file{ + ReadSeekStater: &file{ path: path, data: data, }, vu: mi.vu, cache: mi.cache, - }, nil + } + + return file, nil +} + +// Stater is an interface that provides information about a file. +// +// Although in the context of this module we have a single implementation +// of this interface, it is defined to allow exposing the `file`'s behavior +// to other module through the `ReadSeekStater` interface without having to +// leak our internal abstraction. +type Stater interface { + // Stat returns a FileInfo describing the named file. + Stat() *FileInfo +} + +// ReadSeekStater is an interface that combines the io.ReadSeeker and Stater +// interfaces and ensure that structs implementing it have the necessary +// methods to interact with files. +type ReadSeekStater interface { + io.Reader + io.Seeker + Stater } // File represents a file and exposes methods to interact with it. @@ -154,8 +178,14 @@ type File struct { // Path holds the name of the file, as presented to [Open]. 
Path string `json:"path"` - // file contains the actual implementation for the file system. - file + // ReadSeekStater contains the actual implementation of the file logic, and + // interacts with the underlying file system. + // + // Note that we explicitly omit exposing this to JS to avoid leaking + // implementation details, but keep it public so that we can access it + // from other modules that would want to leverage its implementation of + // io.Reader and io.Seeker. + ReadSeekStater ReadSeekStater `js:"-"` // vu holds a reference to the VU this file is associated with. // @@ -175,7 +205,7 @@ func (f *File) Stat() *sobek.Promise { promise, resolve, _ := promises.New(f.vu) go func() { - resolve(f.file.stat()) + resolve(f.ReadSeekStater.Stat()) }() return promise @@ -220,7 +250,7 @@ func (f *File) Read(into sobek.Value) *sobek.Promise { // occurs on the main thread, during the promise's resolution. callback := f.vu.RegisterCallback() go func() { - n, readErr := f.file.Read(buffer) + n, readErr := f.ReadSeekStater.Read(buffer) callback(func() error { _ = copy(intoBytes[0:n], buffer) @@ -231,9 +261,14 @@ func (f *File) Read(into sobek.Value) *sobek.Promise { return nil } + // If the read operation failed, we need to check if it was an io.EOF error + // and match it to its fsError counterpart if that's the case. 
+ if errors.Is(readErr, io.EOF) { + readErr = newFsError(EOFError, "read() failed; reason: EOF") + } + var fsErr *fsError isFSErr := errors.As(readErr, &fsErr) - if !isFSErr { reject(readErr) return nil @@ -283,7 +318,7 @@ func (f *File) Seek(offset sobek.Value, whence sobek.Value) *sobek.Promise { callback := f.vu.RegisterCallback() go func() { - newOffset, err := f.file.Seek(intOffset, seekMode) + newOffset, err := f.ReadSeekStater.Seek(intOffset, seekMode) callback(func() error { if err != nil { reject(err) diff --git a/js/modules/k6/experimental/fs/module_test.go b/js/modules/k6/experimental/fs/module_test.go index bba76a726ed..397e3512640 100644 --- a/js/modules/k6/experimental/fs/module_test.go +++ b/js/modules/k6/experimental/fs/module_test.go @@ -95,7 +95,7 @@ func TestOpen(t *testing.T) { _, err = runtime.RunOnEventLoop(wrapInAsyncLambda(` try { - const file = await fs.open('bonjour.txt') + const file = await fs.open('bonjour.txt') throw 'unexpected promise resolution with result: ' + file; } catch (err) { if (err.name !== 'ForbiddenError') { @@ -199,7 +199,7 @@ func TestOpen(t *testing.T) { func TestFile(t *testing.T) { t.Parallel() - t.Run("stat method should succeed", func(t *testing.T) { + t.Run("Stat method should succeed", func(t *testing.T) { t.Parallel() runtime, err := newConfiguredRuntime(t) @@ -338,7 +338,7 @@ func TestFile(t *testing.T) { runtime.VU.InitEnvField.FileSystems["file"] = fs _, err = runtime.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(` - const file = await fs.open(%q); + const file = await fs.open(%q); let bytesRead; // No argument should fail with TypeError. 
@@ -406,11 +406,11 @@ func TestFile(t *testing.T) { runtime.VU.InitEnvField.FileSystems["file"] = fs _, err = runtime.RunOnEventLoop(wrapInAsyncLambda(fmt.Sprintf(` - // file size is 3 + // file size is 3 const file = await fs.open(%q); // Create a buffer of size fileSize + 1 - let buffer = new Uint8Array(4); + let buffer = new Uint8Array(4); let n = await file.read(buffer) if (n !== 3) { throw 'expected read to return 10, got ' + n + ' instead'; @@ -555,7 +555,7 @@ func TestFile(t *testing.T) { // Invalid type offset should fail with TypeError. try { newOffset = await file.seek('abc') - throw "file.seek('abc') promise unexpectedly resolved with result: " + newOffset + throw "file.seek('abc') promise unexpectedly resolved with result: " + newOffset } catch (err) { if (err.name !== 'TypeError') { throw "file.seek('1') rejected with unexpected error: " + err @@ -585,7 +585,7 @@ func TestFile(t *testing.T) { // Invalid whence should fail with TypeError. try { newOffset = await file.seek(1, -1) - throw "file.seek(1, -1) promise unexpectedly resolved with result: " + newOffset + throw "file.seek(1, -1) promise unexpectedly resolved with result: " + newOffset } catch (err) { if (err.name !== 'TypeError') { throw "file.seek(1, -1) rejected with unexpected error: " + err