Better API example #1392

Merged · 1 commit · Sep 10, 2023
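As the diff below shows, this PR restructures the miller-as-library example: per-record logic moves into a user-editable custom_record_processor, format settings move into custom_options, the channel-and-select driver boilerplate becomes a reusable convert_csv_to_json that takes both as parameters, and the demo input switches from data/hostnames.csv to data/small.csv.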
118 changes: 69 additions & 49 deletions docs/src/miller-as-library.md
@@ -77,24 +77,41 @@ $ go run main1.go
## Another example use

<pre class="pre-non-highlight-non-pair">
// This is an example of using Miller as a library.
package main

import (
"bufio"
"container/list"
"errors"
"fmt"
"os"

"github.com/johnkerl/miller/pkg/bifs"
"github.com/johnkerl/miller/pkg/cli"
"github.com/johnkerl/miller/pkg/input"
"github.com/johnkerl/miller/pkg/output"
"github.com/johnkerl/miller/pkg/transformers"
"github.com/johnkerl/miller/pkg/types"
)

func convert_csv_to_json(fileNames []string) error {
options := &cli.TOptions{
// Put your record-processing logic here.
func custom_record_processor(irac *types.RecordAndContext) (*types.RecordAndContext, error) {
irec := irac.Record

v := irec.Get("i")
if v == nil {
return nil, fmt.Errorf("did not find key \"i\" at filename %s record number %d",
irac.Context.FILENAME, irac.Context.FNR,
)
}
v2 := bifs.BIF_times(v, v)
irec.PutReference("i2", v2)

return irac, nil
}

// Put your various options here.
func custom_options() *cli.TOptions {
return &cli.TOptions{
ReaderOptions: cli.TReaderOptions{
InputFileFormat: "csv",
IFS: ",",
@@ -105,6 +122,14 @@ func convert_csv_to_json(fileNames []string) error {
OutputFileFormat: "json",
},
}
}

// You don't need to modify this function.
func convert_csv_to_json(
fileNames []string,
options *cli.TOptions,
record_processor func(irac *types.RecordAndContext) (*types.RecordAndContext, error),
) error {
outputStream := os.Stdout
outputIsStdout := true

@@ -120,60 +145,55 @@ func convert_csv_to_json(fileNames []string) error {
return err
}

// Set up the channels for the record-reader.
readerChannel := make(chan *list.List, 2) // list of *types.RecordAndContext
inputErrorChannel := make(chan error, 1)
// Not needed in this example
readerDownstreamDoneChannel := make(chan bool, 1)

// Instantiate the record-writer
recordWriter, err := output.Create(&options.WriterOptions)
if err != nil {
return err
}

cat, err := transformers.NewTransformerCat(
false, // doCounters bool,
"", // counterFieldName string,
nil, // groupByFieldNames []string,
false, // doFileName bool,
false, // doFileNum bool,
)
if err != nil {
return err
}
recordTransformers := []transformers.IRecordTransformer{cat}

// Set up the reader-to-transformer and transformer-to-writer channels.
readerChannel := make(chan *list.List, 2) // list of *types.RecordAndContext
writerChannel := make(chan *list.List, 1) // list of *types.RecordAndContext

// We're done when a fatal error is registered on input (file not found,
// etc) or when the record-writer has written all its output. We use
// channels to communicate both of these conditions.
inputErrorChannel := make(chan error, 1)
doneWritingChannel := make(chan bool, 1)
dataProcessingErrorChannel := make(chan bool, 1)

readerDownstreamDoneChannel := make(chan bool, 1)

// Start the reader, transformer, and writer. Let them run until fatal input
// error or end-of-processing happens.
bufferedOutputStream := bufio.NewWriter(outputStream)

go recordReader.Read(fileNames, *initialContext, readerChannel, inputErrorChannel, readerDownstreamDoneChannel)
go transformers.ChainTransformer(readerChannel, readerDownstreamDoneChannel, recordTransformers,
writerChannel, options)
go output.ChannelWriter(writerChannel, recordWriter, &options.WriterOptions, doneWritingChannel,
dataProcessingErrorChannel, bufferedOutputStream, outputIsStdout)
// Start the record-reader.
go recordReader.Read(
fileNames, *initialContext, readerChannel, inputErrorChannel, readerDownstreamDoneChannel)

// Loop through the record stream.
var retval error
done := false
for !done {
select {

case ierr := <-inputErrorChannel:
retval = ierr
break
case _ = <-dataProcessingErrorChannel:
retval = errors.New("exiting due to data error") // details already printed
break
case _ = <-doneWritingChannel:
done = true

case iracs := <-readerChannel:
// Handle the record batch
for e := iracs.Front(); e != nil; e = e.Next() {
irac := e.Value.(*types.RecordAndContext)
if irac.Record != nil {
orac, err := record_processor(irac)
if err != nil {
retval = err
done = true
break
}
recordWriter.Write(orac.Record, bufferedOutputStream, outputIsStdout)
}
if irac.OutputString != "" {
fmt.Fprintln(bufferedOutputStream, irac.OutputString)
}
if irac.EndOfStream {
done = true
}
}
break

}
}

@@ -183,7 +203,8 @@ func convert_csv_to_json(fileNames []string) error {
}

func main() {
err := convert_csv_to_json(os.Args[1:])
options := custom_options()
err := convert_csv_to_json(os.Args[1:], options, custom_record_processor)
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
}
@@ -198,10 +219,9 @@ nadir.west.our.org,down

```
$ go build main2.go
$ ./main2 data/hostnames.csv
{"host": "apoapsis.east.our.org", "status": "up"}
{"host": "nadir.west.our.org", "status": "down"}
$ ./main2 data/small.csv
{"a": "pan", "b": "pan", "i": 1, "x": 0.3467901443380824, "y": 0.7268028627434533, "i2": 1}
{"a": "eks", "b": "pan", "i": 2, "x": 0.7586799647899636, "y": 0.5221511083334797, "i2": 4}
{"a": "wye", "b": "wye", "i": 3, "x": 0.20460330576630303, "y": 0.33831852551664776, "i2": 9}
{"a": "eks", "b": "wye", "i": 4, "x": 0.38139939387114097, "y": 0.13418874328430463, "i2": 16}
{"a": "wye", "b": "pan", "i": 5, "x": 0.5732889198020006, "y": 0.8636244699032729, "i2": 25}
```
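With this split, supplying different per-record logic means writing only a new processor function and passing it to the unchanged driver. Below is a minimal sketch of an alternative processor that squares the x and y fields instead of i; the function is hypothetical and assumes only the record API calls actually used in this PR (Get, PutReference, bifs.BIF_times):

```
// Hypothetical alternative processor: squares "x" and "y" rather than "i".
// Uses only calls that appear in the diff above.
func square_xy_processor(irac *types.RecordAndContext) (*types.RecordAndContext, error) {
	irec := irac.Record
	for _, key := range []string{"x", "y"} {
		v := irec.Get(key)
		if v == nil {
			return nil, fmt.Errorf("did not find key %q at filename %s record number %d",
				key, irac.Context.FILENAME, irac.Context.FNR)
		}
		// Multiply the Miller value by itself, as custom_record_processor does for "i".
		irec.PutReference(key+"2", bifs.BIF_times(v, v))
	}
	return irac, nil
}
```

Wiring it in is then a one-line change in main(): convert_csv_to_json(os.Args[1:], custom_options(), square_xy_processor).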



11 changes: 5 additions & 6 deletions docs/src/miller-as-library.md.in
@@ -50,10 +50,9 @@ GENMD-INCLUDE-ESCAPED(data/hostnames.csv)

```
$ go build main2.go
$ ./main2 data/hostnames.csv
{"host": "apoapsis.east.our.org", "status": "up"}
{"host": "nadir.west.our.org", "status": "down"}
$ ./main2 data/small.csv
{"a": "pan", "b": "pan", "i": 1, "x": 0.3467901443380824, "y": 0.7268028627434533, "i2": 1}
{"a": "eks", "b": "pan", "i": 2, "x": 0.7586799647899636, "y": 0.5221511083334797, "i2": 4}
{"a": "wye", "b": "wye", "i": 3, "x": 0.20460330576630303, "y": 0.33831852551664776, "i2": 9}
{"a": "eks", "b": "wye", "i": 4, "x": 0.38139939387114097, "y": 0.13418874328430463, "i2": 16}
{"a": "wye", "b": "pan", "i": 5, "x": 0.5732889198020006, "y": 0.8636244699032729, "i2": 25}
```
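Since format choices live entirely in custom_options(), retargeting the driver at another input format is just a matter of returning different options. Here is a hypothetical variant for TSV input, using only the TOptions fields that appear in this PR:

```
// Hypothetical variant of custom_options for TSV input. Only the
// TReaderOptions/TWriterOptions fields shown in the diff are assumed.
func custom_options_tsv() *cli.TOptions {
	return &cli.TOptions{
		ReaderOptions: cli.TReaderOptions{
			InputFileFormat: "tsv",
			IFS:             "\t",
		},
		WriterOptions: cli.TWriterOptions{
			OutputFileFormat: "json",
		},
	}
}
```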



107 changes: 64 additions & 43 deletions docs/src/miller-as-library/main2.go
@@ -1,21 +1,38 @@
// This is an example of using Miller as a library.
package main

import (
"bufio"
"container/list"
"errors"
"fmt"
"os"

"github.com/johnkerl/miller/pkg/bifs"
"github.com/johnkerl/miller/pkg/cli"
"github.com/johnkerl/miller/pkg/input"
"github.com/johnkerl/miller/pkg/output"
"github.com/johnkerl/miller/pkg/transformers"
"github.com/johnkerl/miller/pkg/types"
)

func convert_csv_to_json(fileNames []string) error {
options := &cli.TOptions{
// Put your record-processing logic here.
func custom_record_processor(irac *types.RecordAndContext) (*types.RecordAndContext, error) {
irec := irac.Record

v := irec.Get("i")
if v == nil {
return nil, fmt.Errorf("did not find key \"i\" at filename %s record number %d",
irac.Context.FILENAME, irac.Context.FNR,
)
}
v2 := bifs.BIF_times(v, v)
irec.PutReference("i2", v2)

return irac, nil
}

// Put your various options here.
func custom_options() *cli.TOptions {
return &cli.TOptions{
ReaderOptions: cli.TReaderOptions{
InputFileFormat: "csv",
IFS: ",",
@@ -26,6 +43,14 @@ func convert_csv_to_json(fileNames []string) error {
OutputFileFormat: "json",
},
}
}

// You don't need to modify this function.
func convert_csv_to_json(
fileNames []string,
options *cli.TOptions,
record_processor func(irac *types.RecordAndContext) (*types.RecordAndContext, error),
) error {
outputStream := os.Stdout
outputIsStdout := true

@@ -41,60 +66,55 @@ func convert_csv_to_json(fileNames []string) error {
return err
}

// Set up the channels for the record-reader.
readerChannel := make(chan *list.List, 2) // list of *types.RecordAndContext
inputErrorChannel := make(chan error, 1)
// Not needed in this example
readerDownstreamDoneChannel := make(chan bool, 1)

// Instantiate the record-writer
recordWriter, err := output.Create(&options.WriterOptions)
if err != nil {
return err
}

cat, err := transformers.NewTransformerCat(
false, // doCounters bool,
"", // counterFieldName string,
nil, // groupByFieldNames []string,
false, // doFileName bool,
false, // doFileNum bool,
)
if err != nil {
return err
}
recordTransformers := []transformers.IRecordTransformer{cat}

// Set up the reader-to-transformer and transformer-to-writer channels.
readerChannel := make(chan *list.List, 2) // list of *types.RecordAndContext
writerChannel := make(chan *list.List, 1) // list of *types.RecordAndContext

// We're done when a fatal error is registered on input (file not found,
// etc) or when the record-writer has written all its output. We use
// channels to communicate both of these conditions.
inputErrorChannel := make(chan error, 1)
doneWritingChannel := make(chan bool, 1)
dataProcessingErrorChannel := make(chan bool, 1)

readerDownstreamDoneChannel := make(chan bool, 1)

// Start the reader, transformer, and writer. Let them run until fatal input
// error or end-of-processing happens.
bufferedOutputStream := bufio.NewWriter(outputStream)

go recordReader.Read(fileNames, *initialContext, readerChannel, inputErrorChannel, readerDownstreamDoneChannel)
go transformers.ChainTransformer(readerChannel, readerDownstreamDoneChannel, recordTransformers,
writerChannel, options)
go output.ChannelWriter(writerChannel, recordWriter, &options.WriterOptions, doneWritingChannel,
dataProcessingErrorChannel, bufferedOutputStream, outputIsStdout)
// Start the record-reader.
go recordReader.Read(
fileNames, *initialContext, readerChannel, inputErrorChannel, readerDownstreamDoneChannel)

// Loop through the record stream.
var retval error
done := false
for !done {
select {

case ierr := <-inputErrorChannel:
retval = ierr
break
case _ = <-dataProcessingErrorChannel:
retval = errors.New("exiting due to data error") // details already printed
break
case _ = <-doneWritingChannel:
done = true

case iracs := <-readerChannel:
// Handle the record batch
for e := iracs.Front(); e != nil; e = e.Next() {
irac := e.Value.(*types.RecordAndContext)
if irac.Record != nil {
orac, err := record_processor(irac)
if err != nil {
retval = err
done = true
break
}
recordWriter.Write(orac.Record, bufferedOutputStream, outputIsStdout)
}
if irac.OutputString != "" {
fmt.Fprintln(bufferedOutputStream, irac.OutputString)
}
if irac.EndOfStream {
done = true
}
}
break

}
}

@@ -104,7 +124,8 @@ func convert_csv_to_json(fileNames []string) error {
}

func main() {
err := convert_csv_to_json(os.Args[1:])
options := custom_options()
err := convert_csv_to_json(os.Args[1:], options, custom_record_processor)
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
}
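The driver's contract with the processor is visible in the select loop above: returning a non-nil error sets retval, stops the stream, and becomes the program's error message. A hypothetical validating processor that exercises that path:

```
// Hypothetical: reject records missing the "i" field, pass others through
// unchanged. Returning an error stops the driver loop, as shown above.
func validate_processor(irac *types.RecordAndContext) (*types.RecordAndContext, error) {
	if irac.Record.Get("i") == nil {
		return nil, fmt.Errorf("record %d in %s is missing field \"i\"",
			irac.Context.FNR, irac.Context.FILENAME)
	}
	return irac, nil
}
```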