diff --git a/pkg/adaptor/elasticsearch.go b/pkg/adaptor/elasticsearch.go index 31b896a4e..3ca7eab0d 100644 --- a/pkg/adaptor/elasticsearch.go +++ b/pkg/adaptor/elasticsearch.go @@ -101,13 +101,21 @@ func (e *Elasticsearch) applyOp(msg *message.Msg) (*message.Msg, error) { } return msg, nil } + // TODO there might be some inconsistency here. elasticsearch uses the _id field for an primary index, // and we're just mapping it to a string here. id, err := msg.IDString("_id") if err != nil { id = "" } - err = e.indexer.Index(e.index, e._type, id, "", nil, msg.Data, false) + + switch msg.Op { + case message.Delete: + e.indexer.Delete(e.index, e._type, id, false) + err = nil + default: + err = e.indexer.Index(e.index, e._type, id, "", nil, msg.Data, false) + } if err != nil { e.pipe.Err <- NewError(ERROR, e.path, fmt.Sprintf("elasticsearch error (%s)", err), msg.Data) } diff --git a/pkg/adaptor/registry.go b/pkg/adaptor/registry.go index d15ca0c17..a672c7119 100644 --- a/pkg/adaptor/registry.go +++ b/pkg/adaptor/registry.go @@ -18,7 +18,7 @@ func init() { Register("elasticsearch", "an elasticsearch sink adaptor", NewElasticsearch, dbConfig{}) // Register("influx", "an InfluxDB sink adaptor", NewInfluxdb, dbConfig{}) Register("transformer", "an adaptor that transforms documents using a javascript function", NewTransformer, TransformerConfig{}) - Register("rethinkdb", "a rethinkdb sink adaptor", NewRethinkdb, dbConfig{}) + Register("rethinkdb", "a rethinkdb sink adaptor", NewRethinkdb, rethinkDbConfig{}) } // Register registers an adaptor (database adaptor) for use with Transporter diff --git a/pkg/adaptor/rethinkdb.go b/pkg/adaptor/rethinkdb.go index 739c2f3ee..fbdf7ba45 100644 --- a/pkg/adaptor/rethinkdb.go +++ b/pkg/adaptor/rethinkdb.go @@ -1,6 +1,7 @@ package adaptor import ( + "errors" "fmt" "net/url" "strings" @@ -8,6 +9,7 @@ import ( "github.com/compose/transporter/pkg/message" "github.com/compose/transporter/pkg/pipe" + version "github.com/hashicorp/go-version" 
gorethink "gopkg.in/dancannon/gorethink.v0" ) @@ -22,6 +24,7 @@ type Rethinkdb struct { table string debug bool + tail bool // pipe *pipe.Pipe @@ -31,10 +34,32 @@ type Rethinkdb struct { client *gorethink.Session } +// rethinkDbConfig provides custom configuration options for the RethinkDB adapter +type rethinkDbConfig struct { + URI string `json:"uri" doc:"the uri to connect to, in the form rethink://user:password@host.example:28015/database"` + Namespace string `json:"namespace" doc:"rethink namespace to read/write, in the form database.table"` + Debug bool `json:"debug" doc:"if true, verbose debugging information is displayed"` + Tail bool `json:"tail" doc:"if true, the RethinkDB table will be monitored for changes after copying the namespace"` +} + +type rethinkDbChangeNotification struct { + Error string `gorethink:"error"` + OldVal map[string]interface{} `gorethink:"old_val"` + NewVal map[string]interface{} `gorethink:"new_val"` +} + +type rethinkDbProcessStatus struct { + Version string `gorethink:"version"` +} + +type rethinkDbServerStatus struct { + Process rethinkDbProcessStatus `gorethink:"process"` +} + // NewRethinkdb creates a new Rethinkdb database adaptor func NewRethinkdb(p *pipe.Pipe, path string, extra Config) (StopStartListener, error) { var ( - conf dbConfig + conf rethinkDbConfig err error ) if err = extra.Construct(&conf); err != nil { @@ -46,10 +71,15 @@ func NewRethinkdb(p *pipe.Pipe, path string, extra Config) (StopStartListener, e return nil, err } + if conf.Debug { + fmt.Printf("rethinkDbConfig: %#v\n", conf) + } + r := &Rethinkdb{ uri: u, pipe: p, path: path, + tail: conf.Tail, } r.database, r.table, err = extra.splitNamespace() @@ -58,22 +88,172 @@ func NewRethinkdb(p *pipe.Pipe, path string, extra Config) (StopStartListener, e } r.debug = conf.Debug + r.client, err = gorethink.Connect(gorethink.ConnectOpts{ + Address: r.uri.Host, + MaxIdle: 10, + Timeout: time.Second * 10, + }) + if err != nil { + return r, err + } + 
r.client.Use(r.database) + + if r.tail { + constraint, _ := version.NewConstraint(">= 1.16") + if err := r.assertServerVersion(constraint); err != nil { + return r, err + } + } + return r, nil } -// Start the adaptor as a source (not implemented) -func (r *Rethinkdb) Start() error { - return fmt.Errorf("rethinkdb can't function as a source") +func (r *Rethinkdb) assertServerVersion(constraint version.Constraints) error { + cursor, err := gorethink.Db("rethinkdb").Table("server_status").Run(r.client) + if err != nil { + return err + } + + if cursor.IsNil() { + return errors.New("could not determine the RethinkDB server version: no rows returned from the server_status table") + } + + var serverStatus rethinkDbServerStatus + cursor.Next(&serverStatus) + + if serverStatus.Process.Version == "" { + return errors.New("could not determine the RethinkDB server version: process.version key missing") + } + + pieces := strings.Split(serverStatus.Process.Version, " ") + if len(pieces) < 2 { + return fmt.Errorf("could not determine the RethinkDB server version: malformed version string (%v)", serverStatus.Process.Version) + } + + serverVersion, err := version.NewVersion(pieces[1]) + if err != nil { + return fmt.Errorf("could not determine the RethinkDB server version: malformed version string (%v)", serverStatus.Process.Version) + } + + if !constraint.Check(serverVersion) { + return fmt.Errorf("RethinkDB server version too old: expected %v, but was %v", constraint, serverVersion) + } + + return nil } -// Listen start's the adaptor's listener -func (r *Rethinkdb) Listen() (err error) { - r.client, err = r.setupClient() +// Start the adaptor as a source +func (r *Rethinkdb) Start() error { + if r.debug { + fmt.Printf("getting a changes cursor\n") + } + + // Grab a changes cursor before sending all rows. The server will buffer + // changes while we reindex the entire table. 
+ var ccursor *gorethink.Cursor + ccursor, err := gorethink.Table(r.table).Changes().Run(r.client) if err != nil { r.pipe.Err <- err return err } + defer ccursor.Close() + if err := r.sendAllDocuments(); err != nil { + r.pipe.Err <- err + return err + } + + if r.tail { + if err := r.sendChanges(ccursor); err != nil { + r.pipe.Err <- err + return err + } + } + + return nil +} + +func (r *Rethinkdb) sendAllDocuments() error { + if r.debug { + fmt.Printf("sending all documents\n") + } + + cursor, err := gorethink.Table(r.table).Run(r.client) + if err != nil { + return err + } + defer cursor.Close() + + var doc map[string]interface{} + for cursor.Next(&doc) { + if stop := r.pipe.Stopped; stop { + return nil + } + + msg := message.NewMsg(message.Insert, r.prepareDocument(doc)) + r.pipe.Send(msg) + } + + if err := cursor.Err(); err != nil { + return err + } + + return nil +} + +func (r *Rethinkdb) sendChanges(ccursor *gorethink.Cursor) error { + if r.debug { + fmt.Printf("sending changes\n") + } + + var change rethinkDbChangeNotification + for ccursor.Next(&change) { + if stop := r.pipe.Stopped; stop { + return nil + } + + if r.debug { + fmt.Printf("change: %#v\n", change) + } + + var msg *message.Msg + if change.Error != "" { + return errors.New(change.Error) + } else if change.OldVal != nil && change.NewVal != nil { + msg = message.NewMsg(message.Update, r.prepareDocument(change.NewVal)) + } else if change.NewVal != nil { + msg = message.NewMsg(message.Insert, r.prepareDocument(change.NewVal)) + } else if change.OldVal != nil { + msg = message.NewMsg(message.Delete, r.prepareDocument(change.OldVal)) + } + + if msg != nil { + fmt.Printf("msg: %#v\n", msg) + r.pipe.Send(msg) + } + } + + if err := ccursor.Err(); err != nil { + return err + } + + return nil +} + +// prepareDocument moves the `id` field to the `_id` field, which is more +// commonly used by downstream sinks. 
A transformer could be used to do the +// same thing, but because transformers are not run for Delete messages, we +// must do it here. +func (r *Rethinkdb) prepareDocument(doc map[string]interface{}) map[string]interface{} { + doc["_id"] = doc["id"] + delete(doc, "id") + + return doc +} + +// Listen starts the adaptor's listener +func (r *Rethinkdb) Listen() (err error) { + r.recreateTable() return r.pipe.Listen(r.applyOp) } @@ -122,28 +302,12 @@ func (r *Rethinkdb) applyOp(msg *message.Msg) (*message.Msg, error) { return msg, nil } -func (r *Rethinkdb) setupClient() (*gorethink.Session, error) { - // set up the clientConfig, we need host:port, username, password, and database name - if r.debug { - fmt.Printf("Connecting to %s\n", r.uri.Host) - } - client, err := gorethink.Connect(gorethink.ConnectOpts{ - Address: r.uri.Host, - MaxIdle: 10, - Timeout: time.Second * 10, - }) - if err != nil { - return nil, fmt.Errorf("unable to connect: %s", err) - } - +func (r *Rethinkdb) recreateTable() { if r.debug { fmt.Printf("dropping and creating table '%s' on database '%s'\n", r.table, r.database) } - gorethink.Db(r.database).TableDrop(r.table).RunWrite(client) - gorethink.Db(r.database).TableCreate(r.table).RunWrite(client) - - client.Use(r.database) - return client, nil + gorethink.Db(r.database).TableDrop(r.table).RunWrite(r.client) + gorethink.Db(r.database).TableCreate(r.table).RunWrite(r.client) } // handleresponse takes the rethink response and turn it into something we can consume elsewhere diff --git a/test/application-rethink-to-es.js b/test/application-rethink-to-es.js new file mode 100644 index 000000000..84ed6d8cb --- /dev/null +++ b/test/application-rethink-to-es.js @@ -0,0 +1,2 @@ +Source({"name": "rethink1", "namespace": "test.transporter", "tail": true, "debug": true}). 
+ save({"name": "locales", "namespace": "test.transporter"}) diff --git a/test/config.yaml b/test/config.yaml index 2a417b84a..03d17370d 100644 --- a/test/config.yaml +++ b/test/config.yaml @@ -12,6 +12,9 @@ nodes: es: type: elasticsearch uri: https://nick:darling@haproxy1.dblayer.com:10291/thisgetsignored + locales: + type: elasticsearch + uri: http://localhost:9200/thisgetsignored timeseries: type: influx uri: influxdb://root:root@localhost:8086/compose @@ -32,7 +35,7 @@ nodes: uri: stdout:// rethink1: type: rethinkdb - uri: rethink://127.0.0.2:28015/ + uri: rethink://localhost:28015/ loosefile: type: file logtransformer: