Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

Adapter: Implements RethinkDB as a source of documents #64

Merged
merged 14 commits into from
Mar 17, 2015
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion pkg/adaptor/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,21 @@ func (e *Elasticsearch) applyOp(msg *message.Msg) (*message.Msg, error) {
}
return msg, nil
}

// TODO there might be some inconsistency here. elasticsearch uses the _id field for an primary index,
// and we're just mapping it to a string here.
id, err := msg.IDString("_id")
if err != nil {
id = ""
}
err = e.indexer.Index(e.index, e._type, id, "", nil, msg.Data, false)

switch msg.Op {
case message.Delete:
e.indexer.Delete(e.index, e._type, id, false)
err = nil
default:
err = e.indexer.Index(e.index, e._type, id, "", nil, msg.Data, false)
}
if err != nil {
e.pipe.Err <- NewError(ERROR, e.path, fmt.Sprintf("elasticsearch error (%s)", err), msg.Data)
}
Expand Down
166 changes: 140 additions & 26 deletions pkg/adaptor/rethinkdb.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package adaptor

import (
"errors"
"fmt"
"net/url"
"strings"
Expand All @@ -22,6 +23,7 @@ type Rethinkdb struct {
table string

debug bool
tail bool

//
pipe *pipe.Pipe
Expand All @@ -31,10 +33,24 @@ type Rethinkdb struct {
client *gorethink.Session
}

// rethinkDbConfig provides custom configuration options for the RethinkDB adapter
type rethinkDbConfig struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be publicly accessible so that it can be properly "Registered" here as:

Register("rethinkdb", "a rethinkdb sink adaptor", NewRethinkdb, RethinkdbConfig{})

URI string `json:"uri" doc:"the uri to connect to, in the form rethink://user:[email protected]:28015/database"`
Namespace string `json:"namespace" doc:"rethink namespace to read/write, in the form database.table"`
Debug bool `json:"debug" doc:"if true, verbose debugging information is displayed"`
Tail bool `json:"tail" doc:"if true, the RethinkDB table will be monitored for changes after copying the namespace"`
}

type rethinkDbChangeNotification struct {
Error string `gorethink:"error"`
OldVal map[string]interface{} `gorethink:"old_val"`
NewVal map[string]interface{} `gorethink:"new_val"`
}

// NewRethinkdb creates a new Rethinkdb database adaptor
func NewRethinkdb(p *pipe.Pipe, path string, extra Config) (StopStartListener, error) {
var (
conf dbConfig
conf rethinkDbConfig
err error
)
if err = extra.Construct(&conf); err != nil {
Expand All @@ -46,10 +62,15 @@ func NewRethinkdb(p *pipe.Pipe, path string, extra Config) (StopStartListener, e
return nil, err
}

if conf.Debug {
fmt.Printf("rethinkDbConfig: %#v\n", conf)
}

r := &Rethinkdb{
uri: u,
pipe: p,
path: path,
tail: conf.Tail,
}

r.database, r.table, err = extra.splitNamespace()
Expand All @@ -58,22 +79,131 @@ func NewRethinkdb(p *pipe.Pipe, path string, extra Config) (StopStartListener, e
}
r.debug = conf.Debug

r.client, err = gorethink.Connect(gorethink.ConnectOpts{
Address: r.uri.Host,
MaxIdle: 10,
Timeout: time.Second * 10,
})
if err != nil {
return nil, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Go ahead and return r instead of nil so any logging can happen upstream in the even there are references to the object.

}
r.client.Use(r.database)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some version detection here for >= 1.16.X if tail is true?

return r, nil
}

// Start the adaptor as a source (not implemented)
// Start the adaptor as a source
func (r *Rethinkdb) Start() error {
return fmt.Errorf("rethinkdb can't function as a source")
}
if r.debug {
fmt.Printf("getting a changes cursor\n")
}

// Listen start's the adaptor's listener
func (r *Rethinkdb) Listen() (err error) {
r.client, err = r.setupClient()
// Grab a changes cursor before sending all rows. The server will buffer
// changes while we reindex the entire table.
var ccursor *gorethink.Cursor
ccursor, err := gorethink.Table(r.table).Changes().Run(r.client)
if err != nil {
r.pipe.Err <- err
return err
}
defer ccursor.Close()

if err := r.sendAllDocuments(); err != nil {
r.pipe.Err <- err
return err
}

if r.tail {
if err := r.sendChanges(ccursor); err != nil {
r.pipe.Err <- err
return err
}
}

return nil
}

func (r *Rethinkdb) sendAllDocuments() error {
if r.debug {
fmt.Printf("sending all documents\n")
}

cursor, err := gorethink.Table(r.table).Run(r.client)
if err != nil {
return err
}
defer cursor.Close()

var doc map[string]interface{}
for cursor.Next(&doc) {
if stop := r.pipe.Stopped; stop {
return nil
}

msg := message.NewMsg(message.Insert, r.prepareDocument(doc))
r.pipe.Send(msg)
}

if err := cursor.Err(); err != nil {
return err
}

return nil
}

func (r *Rethinkdb) sendChanges(ccursor *gorethink.Cursor) error {
if r.debug {
fmt.Printf("sending changes\n")
}

var change rethinkDbChangeNotification
for ccursor.Next(&change) {
if stop := r.pipe.Stopped; stop {
return nil
}

if r.debug {
fmt.Printf("change: %#v\n", change)
}

var msg *message.Msg
if change.Error != "" {
return errors.New(change.Error)
} else if change.OldVal != nil && change.NewVal != nil {
msg = message.NewMsg(message.Update, r.prepareDocument(change.NewVal))
} else if change.NewVal != nil {
msg = message.NewMsg(message.Insert, r.prepareDocument(change.NewVal))
} else if change.OldVal != nil {
msg = message.NewMsg(message.Delete, r.prepareDocument(change.OldVal))
}

if msg != nil {
fmt.Printf("msg: %#v\n", msg)
r.pipe.Send(msg)
}
}

if err := ccursor.Err(); err != nil {
return err
}

return nil
}

// prepareDocument moves the `id` field to the `_id` field, which is more
// commonly used by downstream sinks. A transformer could be used to do the
// same thing, but because transformers are not run for Delete messages, we
// must do it here.
func (r *Rethinkdb) prepareDocument(doc map[string]interface{}) map[string]interface{} {
doc["_id"] = doc["id"]
delete(doc, "id")

return doc
}

// Listen start's the adaptor's listener
func (r *Rethinkdb) Listen() (err error) {
r.recreateTable()
return r.pipe.Listen(r.applyOp)
}

Expand Down Expand Up @@ -122,28 +252,12 @@ func (r *Rethinkdb) applyOp(msg *message.Msg) (*message.Msg, error) {
return msg, nil
}

func (r *Rethinkdb) setupClient() (*gorethink.Session, error) {
// set up the clientConfig, we need host:port, username, password, and database name
if r.debug {
fmt.Printf("Connecting to %s\n", r.uri.Host)
}
client, err := gorethink.Connect(gorethink.ConnectOpts{
Address: r.uri.Host,
MaxIdle: 10,
Timeout: time.Second * 10,
})
if err != nil {
return nil, fmt.Errorf("unable to connect: %s", err)
}

func (r *Rethinkdb) recreateTable() {
if r.debug {
fmt.Printf("dropping and creating table '%s' on database '%s'\n", r.table, r.database)
}
gorethink.Db(r.database).TableDrop(r.table).RunWrite(client)
gorethink.Db(r.database).TableCreate(r.table).RunWrite(client)

client.Use(r.database)
return client, nil
gorethink.Db(r.database).TableDrop(r.table).RunWrite(r.client)
gorethink.Db(r.database).TableCreate(r.table).RunWrite(r.client)
}

// handleresponse takes the rethink response and turn it into something we can consume elsewhere
Expand Down
2 changes: 2 additions & 0 deletions test/application-rethink-to-es.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Source({"name": "rethink1", "namespace": "test.transporter", "tail": true, "debug": true}).
save({"name": "locales", "namespace": "test.transporter"})
5 changes: 4 additions & 1 deletion test/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ nodes:
es:
type: elasticsearch
uri: https://nick:[email protected]:10291/thisgetsignored
locales:
type: elasticsearch
uri: http://localhost:9200/thisgetsignored
timeseries:
type: influx
uri: influxdb://root:root@localhost:8086/compose
Expand All @@ -32,7 +35,7 @@ nodes:
uri: stdout://
rethink1:
type: rethinkdb
uri: rethink://127.0.0.2:28015/
uri: rethink://localhost:28015/
loosefile:
type: file
logtransformer:
Expand Down