Skip to content

Commit

Permalink
Add an experimental csv module exposing a streaming csv parser (#3743)
Browse files Browse the repository at this point in the history
* Expose fs.File underlying implementation's ReadSeeker to other modules

* Add ability to produce a SharedArray from Go code

* Add experimental csv parser module

* Add csv module usage examples

* Apply suggestions from code review

Co-authored-by: Oleg Bespalov <[email protected]>

* Rename data.Reader to data.RecordReader

* Apply pull request review suggestions

* Update js/modules/k6/experimental/csv/module_test.go

Co-authored-by: Joan López de la Franca Beltran <[email protected]>

* Update js/modules/k6/experimental/csv/reader.go

Co-authored-by: Joan López de la Franca Beltran <[email protected]>

* Update js/modules/k6/experimental/fs/module.go

Co-authored-by: Joan López de la Franca Beltran <[email protected]>

* Apply Pull Request suggestions

* Fix linting errors

---------

Co-authored-by: Oleg Bespalov <[email protected]>
Co-authored-by: Joan López de la Franca Beltran <[email protected]>
  • Loading branch information
3 people committed Sep 10, 2024
1 parent 5f36e9b commit 93667ec
Show file tree
Hide file tree
Showing 14 changed files with 1,364 additions and 48 deletions.
28 changes: 28 additions & 0 deletions examples/experimental/csv-parse.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { open } from 'k6/experimental/fs'
import csv from 'k6/experimental/csv'
import { scenario } from 'k6/execution'

export const options = {
iterations: 10,
}

// Open the csv file, and parse it ahead of time.
let file;
let csvRecords;
(async function () {
file = await open('data.csv');

// The `csv.parse` function consumes the entire file at once, and returns
// the parsed records as a SharedArray object.
csvRecords = await csv.parse(file, {delimiter: ','})
})();


export default async function() {
// The csvRecords a SharedArray. Each element is a record from the CSV file, represented as an array
// where each element is a field from the CSV record.
//
// Thus, `csvRecords[scenario.iterationInTest]` will give us the record for the current iteration.
console.log(csvRecords[scenario.iterationInTest])
}

29 changes: 29 additions & 0 deletions examples/experimental/csv-parser.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { open } from 'k6/experimental/fs'
import csv from 'k6/experimental/csv'

export const options = {
iterations: 10,
}

let file;
let parser;
(async function () {
file = await open('data.csv');
parser = new csv.Parser(file);
})();

export default async function() {
// The parser `next` method attempts to read the next row from the CSV file.
//
// It returns an iterator-like object with a `done` property that indicates whether
// there are more rows to read, and a `value` property that contains the row fields
// as an array.
const {done, value} = await parser.next();
if (done) {
throw new Error("No more rows to read");
}

// We expect the `value` property to be an array of strings, where each string is a field
// from the CSV record.
console.log(done, value);
}
11 changes: 11 additions & 0 deletions examples/experimental/data.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
firstname,lastname,age
fariha,ehlenfeldt,72
qudratullah,gillfillan,50
jeleah,rodina,41
thaisia,nowalk,99
joey-lynn,wilsford,75
tudur,granville,81
aytek,umber,56
aynoor,hisaw,30
fiadh-rose,iravani,31
annely,ooley,70
2 changes: 2 additions & 0 deletions js/jsmodules.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.k6.io/k6/js/modules/k6/data"
"go.k6.io/k6/js/modules/k6/encoding"
"go.k6.io/k6/js/modules/k6/execution"
"go.k6.io/k6/js/modules/k6/experimental/csv"
"go.k6.io/k6/js/modules/k6/experimental/fs"
"go.k6.io/k6/js/modules/k6/experimental/streams"
"go.k6.io/k6/js/modules/k6/experimental/tracing"
Expand All @@ -38,6 +39,7 @@ func getInternalJSModules() map[string]interface{} {
"k6/encoding": encoding.New(),
"k6/timers": timers.New(),
"k6/execution": execution.New(),
"k6/experimental/csv": csv.New(),
"k6/experimental/redis": redis.New(),
"k6/experimental/streams": streams.New(),
"k6/experimental/webcrypto": webcrypto.New(),
Expand Down
60 changes: 58 additions & 2 deletions js/modules/k6/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
package data

import (
"encoding/json"
"errors"
"fmt"
"io"
"strconv"
"sync"

Expand Down Expand Up @@ -93,6 +96,59 @@ func (d *Data) sharedArray(call sobek.ConstructorCall) *sobek.Object {
return array.wrap(rt).ToObject(rt)
}

// RecordReader is the interface that wraps the action of reading records from a resource.
//
// The data module RecordReader interface is implemented by types that can read data that can be
// treated as records, from data sources such as a CSV file, etc.
type RecordReader interface {
Read() ([]string, error)
}

// NewSharedArrayFrom creates a new shared array from the provided data.
//
// This function is not exposed to the JS runtime. It is used internally to instantiate
// shared arrays without having to go through the whole JS runtime machinery, which effectively has
// a big performance impact (e.g. when filling a shared array from a CSV file).
//
// This function takes an explicit runtime argument to retain control over which VU runtime it is
// executed in. This is important because the shared array underlying implementation relies on maintaining
// a single instance of arrays for the whole test setup and VUs.
func (d *Data) NewSharedArrayFrom(rt *sobek.Runtime, name string, r RecordReader) *sobek.Object {
if name == "" {
common.Throw(rt, errors.New("empty name provided to SharedArray's constructor"))
}

var arr []string
for {
record, err := r.Read()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
common.Throw(rt, fmt.Errorf("failed to read record; reason: %w", err))
}

marshaled, err := json.Marshal(record)
if err != nil {
common.Throw(rt, fmt.Errorf("failed to marshal record; reason: %w", err))
}

arr = append(arr, string(marshaled))
}

return d.shared.set(name, arr).wrap(rt).ToObject(rt)
}

// set is a helper method to set a shared array in the underlying shared arrays map.
func (s *sharedArrays) set(name string, arr []string) sharedArray {
s.mu.Lock()
defer s.mu.Unlock()
array := sharedArray{arr: arr}
s.data[name] = array

return array
}

func (s *sharedArrays) get(rt *sobek.Runtime, name string, call sobek.Callable) sharedArray {
s.mu.RLock()
array, ok := s.data[name]
Expand Down Expand Up @@ -121,10 +177,10 @@ func getShareArrayFromCall(rt *sobek.Runtime, call sobek.Callable) sharedArray {
}
arr := make([]string, obj.Get("length").ToInteger())

stringify, _ := sobek.AssertFunction(rt.GlobalObject().Get("JSON").ToObject(rt).Get("stringify"))
stringifyFunc, _ := sobek.AssertFunction(rt.GlobalObject().Get("JSON").ToObject(rt).Get("stringify"))
var val sobek.Value
for i := range arr {
val, err = stringify(sobek.Undefined(), obj.Get(strconv.Itoa(i)))
val, err = stringifyFunc(sobek.Undefined(), obj.Get(strconv.Itoa(i)))
if err != nil {
panic(err)
}
Expand Down
1 change: 1 addition & 0 deletions js/modules/k6/data/share.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (s wrappedSharedArray) Get(index int) sobek.Value {
if err != nil {
common.Throw(s.rt, err)
}

err = s.deepFreeze(s.rt, val)
if err != nil {
common.Throw(s.rt, err)
Expand Down
Loading

0 comments on commit 93667ec

Please sign in to comment.