diff --git a/README.md b/README.md index 990f8af..178a2f0 100644 --- a/README.md +++ b/README.md @@ -1,137 +1,553 @@ -beanstalk [![TravisCI](https://travis-ci.org/prep/beanstalk.svg?branch=master)](https://travis-ci.org/prep/beanstalk.svg?branch=master) -[![Go Report Card](https://goreportcard.com/badge/github.com/prep/beanstalk)](https://goreportcard.com/report/github.com/prep/beanstalk) [![GoDoc](https://godoc.org/github.com/prep/beanstalk?status.svg)](https://godoc.org/github.com/prep/beanstalk) -========= -This repository contains a beanstalk package for Go that works with producers to insert jobs into a beanstalk tube, and consumers to reserve and delete jobs from a beanstalk tube. Pools with multiple producers and consumers can be created to balance the requests over multiple connections. -Each producer and consumer maintains its own connection to the beanstalk server and will disconnect and reconnect when it detects an unrecoverable error. Timeouts can be set on read and write operations to make sure that an interrupted connecting gets detected early and your application doesn't block as long as the connection timeout of the kernel. +# Package beanstalk +`import "github.com/prep/beanstalk"` -Examples --------- -To get started, you need to import the client into your Go project: +[Overview](#user-content-overview) +[Index](#user-content-index) + +## Overview +Package beanstalk implements a beanstalk client that includes various +abstractions to make producing and consuming jobs easier. + +Create a Conn if you want the most basic version of a beanstalk client: ```go -import "github.com/prep/beanstalk" -``` +conn, err := beanstalk.Dial("localhost:11300", beanstalk.Config{}) +if err != nil { + // handle error +} +defer conn.Close() -The easiest way to work with this client is by creating a *ProducerPool{}* and/or *ConsumerPool{}*. +id, err := conn.Put(ctx, "example_tube", []byte("Hello World"), beanstalk.PutParams{ + Priority: 1024, + Delay: 2 * time.Second, + TTR: 1 * time.Minute, +}) +if err != nil { + // handle error +} -#### ProducerPool -A ProducerPool creates 1 or more producers that connects to a beanstalk server with the purpose of feeding it *put* requests. Here is an example of a *ProducerPool{}* with a connection to the beanstalk server on *127.0.0.1:11300*: +if err = conn.Watch(ctx, "example_tube"); err != nil { + // handle error +} -```go -// Create a producer pool with 1 producer. -pool, err := beanstalk.NewProducerPool([]string{"beanstalk://127.0.0.1:11300"}, nil) +job, err := conn.ReserveWithTimeout(ctx, 3*time.Second) if err != nil { - log.Fatal("Unable to create beanstalk producer pool: %s", err) + // handle error } -defer pool.Stop() -// Reusable put parameters. -putParams := &beanstalk.PutParams{1024, 0, 5 * time.Second} +// process job + +if err = job.Delete(ctx); err != nil { + // handle error +} +``` -// Insert a job containing "Hello World" in the beanstalk tube named "test". -id, err := pool.Put("test", []byte("Hello World"), putParams) +In most cases it is easier to leverage ConsumerPool and ProducerPool to manage +one or more beanstalk client connections, as this provides some form of +load balacning and auto-reconnect mechanisms under the hood. + +The ProducerPool manages one or more client connections used specifically for +producing beanstalk jobs. If exports a Put method that load balances between the +available connections + +```go +pool, err := beanstalk.NewProducerPool([]string{"localhost:11300"}, beanstalk.Config{}) if err != nil { - return err + // handle error } +defer pool.Stop() -log.Printf("Created job with id: %d", id) +id, err := pool.Put(ctx, "example_tube", []byte("Hello World"), beanstalk.PutParams{ + Priority: 1024, + Delay: 2 * time.Second, + TTR: 1 * time.Minute, +} ``` -#### ConsumerPool -A ConsumerPool creates 1 or more consumers that connects to a beanstalk server with the purpose of reserving jobs. Here is an example of a *ConsumerPool{}* with a connection to the beanstalk server on *127.0.0.1:11300* that watches tube *test* for jobs to reserve. +A ConsumerPool manages one or more client connections used specifically for +consuming beanstalk jobs. If exports a channel on which Job types can be read. ```go -// Create a consumer pool with 1 consumer, watching 1 tube. -pool, err := beanstalk.NewConsumerPool([]string{"beanstalk://127.0.0.1:11300"}, []string{"test"}, nil) +pool, err := beanstalk.NewConsumerPool([]string{"localhost:11300"}, []string{"example_tube"}, beanstalk.Config{}) if err != nil { - log.Fatal("Unable to create beanstalk consumer pool: %s", err) + // handle error } defer pool.Stop() pool.Play() +for job := range pool.C { + // process job + + if err = job.Delete(ctx); err != nil { + // handle error + } +} +``` + +Alternatively, instead of leveraging the exported channel it is possible to +provide a handler function that is called for every reserved beanstalk job by +calling the Receive method on ConsumerPool. + +```go +pool.Play() +pool.Receive(ctx, func(ctx context.Context, job *beanstalk.Job) { + // process job + + if err = job.Delete(ctx); err != nil { + // handle error + } +}) +``` + +In the above examples the beanstalk server was referenced by way of the +host:port notation. This package also supports URI formats like beanstalk:// for +a plaintext connection, and beanstalks:// or tls:// for encrypted connections. + +## Index +[Variables](#user-content-variables) +[func ParseURI(uri string) (string, bool, error)](#user-content-func-ParseURI) +[type Config](#user-content-type-Config) + +[type Conn](#user-content-type-Conn) +  [func Dial(uri string, config Config) (*Conn, error)](#user-content-func-Conn-Dial) +  [func (conn *Conn) Close() error](#user-content-method-Conn-Close) +  [func (conn *Conn) Ignore(ctx context.Context, tube string) error](#user-content-method-Conn-Ignore) +  [func (conn *Conn) Put(ctx context.Context, tube string, body []byte, params PutParams) (uint64, error)](#user-content-method-Conn-Put) +  [func (conn *Conn) ReserveWithTimeout(ctx context.Context, timeout time.Duration) (*Job, error)](#user-content-method-Conn-ReserveWithTimeout) +  [func (conn *Conn) String() string](#user-content-method-Conn-String) +  [func (conn *Conn) Watch(ctx context.Context, tube string) error](#user-content-method-Conn-Watch) + +[type Consumer](#user-content-type-Consumer) +  [func NewConsumer(uri string, tubes []string, config Config) (*Consumer, error)](#user-content-func-Consumer-NewConsumer) +  [func (consumer *Consumer) Close()](#user-content-method-Consumer-Close) +  [func (consumer *Consumer) Pause()](#user-content-method-Consumer-Pause) +  [func (consumer *Consumer) Play()](#user-content-method-Consumer-Play) +  [func (consumer *Consumer) Receive(ctx context.Context, fn func(ctx context.Context, job *Job))](#user-content-method-Consumer-Receive) + +[type ConsumerPool](#user-content-type-ConsumerPool) +  [func NewConsumerPool(uris []string, tubes []string, config Config) (*ConsumerPool, error)](#user-content-func-ConsumerPool-NewConsumerPool) +  [func (pool *ConsumerPool) Pause()](#user-content-method-ConsumerPool-Pause) +  [func (pool *ConsumerPool) Play()](#user-content-method-ConsumerPool-Play) +  [func (pool *ConsumerPool) Receive(ctx context.Context, fn func(ctx context.Context, job *Job))](#user-content-method-ConsumerPool-Receive) +  [func (pool *ConsumerPool) Stop()](#user-content-method-ConsumerPool-Stop) + +[type Job](#user-content-type-Job) +  [func (job *Job) Bury(ctx context.Context) error](#user-content-method-Job-Bury) +  [func (job *Job) BuryWithPriority(ctx context.Context, priority uint32) error](#user-content-method-Job-BuryWithPriority) +  [func (job *Job) Delete(ctx context.Context) error](#user-content-method-Job-Delete) +  [func (job *Job) Release(ctx context.Context) error](#user-content-method-Job-Release) +  [func (job *Job) ReleaseWithParams(ctx context.Context, priority uint32, delay time.Duration) error](#user-content-method-Job-ReleaseWithParams) +  [func (job *Job) Touch(ctx context.Context) error](#user-content-method-Job-Touch) +  [func (job *Job) TouchAfter() time.Duration](#user-content-method-Job-TouchAfter) + +[type Producer](#user-content-type-Producer) +  [func NewProducer(uri string, config Config) (*Producer, error)](#user-content-func-Producer-NewProducer) +  [func (producer *Producer) Close()](#user-content-method-Producer-Close) +  [func (producer *Producer) Put(ctx context.Context, tube string, body []byte, params PutParams) (uint64, error)](#user-content-method-Producer-Put) + +[type ProducerPool](#user-content-type-ProducerPool) +  [func NewProducerPool(uris []string, config Config) (*ProducerPool, error)](#user-content-func-ProducerPool-NewProducerPool) +  [func (pool *ProducerPool) Put(ctx context.Context, tube string, body []byte, params PutParams) (uint64, error)](#user-content-method-ProducerPool-Put) +  [func (pool *ProducerPool) Stop()](#user-content-method-ProducerPool-Stop) + +[type PutParams](#user-content-type-PutParams) + +## Package files +[beanstalk.go](beanstalk.go) +[config.go](config.go) +[conn.go](conn.go) +[consumer.go](consumer.go) +[consumer_pool.go](consumer_pool.go) +[doc.go](doc.go) +[job.go](job.go) +[producer.go](producer.go) +[producer_pool.go](producer_pool.go) + +## Variables +These error may be returned by any of Conn's methods. + +```go +var ( + ErrBuried = errors.New("job was buried") + ErrDeadlineSoon = errors.New("deadline soon") + ErrDisconnected = errors.New("client disconnected") + ErrNotFound = errors.New("job not found") + ErrTimedOut = errors.New("reserve timed out") + ErrNotIgnored = errors.New("tube not ignored") + ErrTubeTooLong = errors.New("tube name too long") + ErrUnexpected = errors.New("unexpected response received") +) +``` + +ErrJobFinished is returned when a job was already finished. + +```go +var ErrJobFinished = errors.New("job was already finished") +``` + +### func ParseURI + +```go +func ParseURI(uri string) (string, bool, error) +``` +ParseURI returns the socket of the specified URI and if the connection is +supposed to be a TLS or plaintext connection. Valid URI schemes are: + +```go +beanstalk://host:port +beanstalks://host:port +tls://host:port +``` + +Where both the beanstalks and tls scheme mean the same thing. Alternatively, +it is also possibly to just specify the host:port combo which is assumed to +be a plaintext connection. + +### type Config +A Config structure is used to configure a Consumer, Producer, one of its +pools or Conn. + +```go +type Config struct { + // NumGoroutines is the number of goroutines that the Receive() method will + // spin up. + // The default is to spin up 1 goroutine. + NumGoroutines int + // ReserveTimeout is the time a consumer should wait before reserving a job, + // when the last attempt didn't yield a job. + // The default is to wait 5 seconds. + ReserveTimeout time.Duration + // ReleaseTimeout is the time a consumer should hold a reserved job before + // it is released back. + // The default is to wait 3 seconds. + ReleaseTimeout time.Duration + // ReconnectTimeout is the timeout between reconnects. + // The default is to wait 10 seconds. + ReconnectTimeout time.Duration + // TLSConfig describes the configuration that is used when Dial() makes a + // TLS connection. + TLSConfig *tls.Config + // InfoLog is used to log informational messages. + InfoLog *log.Logger + // ErrorLog is used to log error messages. + ErrorLog *log.Logger + // contains filtered or unexported fields +} +``` + + +### type Conn +Conn describes a connection to a beanstalk server. + +```go +type Conn struct { + URI string + // contains filtered or unexported fields +} +``` + +#### func Dial + +```go +func Dial(uri string, config Config) (*Conn, error) +``` +Dial into a beanstalk server. + +#### func (*Conn) Close + +```go +func (conn *Conn) Close() error +``` +Close this connection. + +#### func (*Conn) Ignore + +```go +func (conn *Conn) Ignore(ctx context.Context, tube string) error +``` +Ignore the specified tube. + +#### func (*Conn) Put + +```go +func (conn *Conn) Put(ctx context.Context, tube string, body []byte, params PutParams) (uint64, error) +``` +Put a job in the specified tube. + +#### func (*Conn) ReserveWithTimeout + +```go +func (conn *Conn) ReserveWithTimeout(ctx context.Context, timeout time.Duration) (*Job, error) +``` +ReserveWithTimeout tries to reserve a job and block for up to a maximum of +timeout. If no job could be reserved, this function will return without a +job or error. + +#### func (*Conn) String + +```go +func (conn *Conn) String() string +``` + +#### func (*Conn) Watch + +```go +func (conn *Conn) Watch(ctx context.Context, tube string) error +``` +Watch the specified tube. + + +### type Consumer +Consumer maintains a connnection to a beanstalk server and offers up jobs +on its exposed jobs channel. When it gets disconnected, it automatically +tries to reconnect. + +```go +type Consumer struct { + // C offers up reserved jobs. + C <-chan *Job + // contains filtered or unexported fields +} +``` + +#### func NewConsumer + +```go +func NewConsumer(uri string, tubes []string, config Config) (*Consumer, error) +``` +NewConsumer connects to the beanstalk server that's referenced in URI and +returns a Consumer. + +#### func (*Consumer) Close + +```go +func (consumer *Consumer) Close() +``` +Close this consumer's connection. + +#### func (*Consumer) Pause + +```go +func (consumer *Consumer) Pause() +``` +Pause this consumer. -for { - select { - case job := <-pool.C: - log.Printf("Received job with id: %d", job.ID) +#### func (*Consumer) Play - if err := doSomethingWithJob(job); err != nil { - fmt.Printf("Burying job %d with body: %s\n", job.ID, string(job.Body)) - job.Bury() - } else { - job.Delete() - } - // ... - } +```go +func (consumer *Consumer) Play() +``` +Play unpauses this customer. + +#### func (*Consumer) Receive + +```go +func (consumer *Consumer) Receive(ctx context.Context, fn func(ctx context.Context, job *Job)) +``` +Receive calls fn for each job it can reserve on this consumer. + + +### type ConsumerPool +ConsumerPool manages a pool of consumers that share a single channel on +which jobs are offered. + +```go +type ConsumerPool struct { + // C offers up reserved jobs. + C <-chan *Job + // contains filtered or unexported fields } ``` -Managing consumers ------------------- -By default, *Consumer{}* and *ConsumerPool{}* objects start out in a paused state, which means that even though they will try to establish a connection to the beanstalk server immediately, they will not reserve any jobs until the *Play()* function has been called. If you want to stop the stream of reserved jobs for a moment, you can call the *Pause()* function. It should be noted that calling *Pause()* won't affect jobs that are currently reserved. +#### func NewConsumerPool -Jobs ----- -Jobs are offered on your consumer channel and it looks like this: +```go +func NewConsumerPool(uris []string, tubes []string, config Config) (*ConsumerPool, error) +``` +NewConsumerPool creates a pool of Consumers from the list of URIs that has +been provided. + +#### func (*ConsumerPool) Pause + +```go +func (pool *ConsumerPool) Pause() +``` +Pause all the consumers in this pool. + +#### func (*ConsumerPool) Play + +```go +func (pool *ConsumerPool) Play() +``` +Play unpauses all the consumers in this pool. + +#### func (*ConsumerPool) Receive + +```go +func (pool *ConsumerPool) Receive(ctx context.Context, fn func(ctx context.Context, job *Job)) +``` +Receive calls fn in for each job it can reserve on the consumers in this pool. + +#### func (*ConsumerPool) Stop + +```go +func (pool *ConsumerPool) Stop() +``` +Stop all the consumers in this pool. + + +### type Job +Job describes a beanstalk job and its stats. ```go type Job struct { - ID uint64 - Body []byte - Priority uint32 - TTR time.Duration + ID uint64 + Body []byte + ReservedAt time.Time + Stats struct { + PutParams `yaml:",inline"` + Tube string `yaml:"tube"` + State string `yaml:"state"` + Age time.Duration `yaml:"age"` + TimeLeft time.Duration `yaml:"time-left"` + File int `yaml:"file"` + Reserves int `yaml:"reserves"` + Timeouts int `yaml:"timeouts"` + Releases int `yaml:"releases"` + Buries int `yaml:"buries"` + Kicks int `yaml:"kicks"` + } + // contains filtered or unexported fields } ``` -When you receive a *Job{}* on your consumer channel, it is your responsibility to honor the TTR of that job. To do that, you call the *TouchAt()* function to get the remaining TTR of the current job, which has a margin built in for safety. You can use the *Touch()* function to refresh the TTR of that job. +#### func (*Job) Bury ```go -Touch() error -TouchAt() time.Duration +func (job *Job) Bury(ctx context.Context) error ``` +Bury this job. + +#### func (*Job) BuryWithPriority -To finalize a job, the following functions are available on the *Job{}* object: ```go -Bury() error -BuryWithPriority(priority uint32) error -Delete() error -Release() error -ReleaseWithParams(priority uint32, delay time.Duration) error +func (job *Job) BuryWithPriority(ctx context.Context, priority uint32) error ``` +BuryWithPriority buries this job with the specified priority. + +#### func (*Job) Delete -The *Bury()* and *Release()* functions use the priority with which the job was inserted in the first place and *Release()* uses a delay of 0, meaning immediately. +```go +func (job *Job) Delete(ctx context.Context) error +``` +Delete this job. -Options -------- -An **Options** struct can be provided at the end of each *NewProducer()*, *NewProducerPool()*, *NewConsumer()* and *NewConsumerPool()* function. It allows you to fine-tune some behaviour under the hood. +#### func (*Job) Release ```go -options := &beanstalk.Options{ - // ReserveTimeout defines how long a beanstalk reserve command should wait - // before it should timeout. The default and minimum value is 1 second. - ReserveTimeout: 3 * time.Second, - // ReconnectTimeout defines how long a producer or consumer should wait - // between reconnect attempts. The default is 3 seconds, with a minimum of 1 - // second. - ReconnectTimeout: 3 * time.Second, - // ReadWriteTimeout defines how long each read or write operation is allowed - // to block until the connection is considered broken. The default is - // disabled and the minimum value is 1ms. - ReadWriteTimeout: 5 * time.Second, +func (job *Job) Release(ctx context.Context) error +``` +Release this job back with its original priority and without delay. + +#### func (*Job) ReleaseWithParams - // InfoLog is used to log info messages to, but can be nil. - InfoLog: log.New(os.Stdout, "INFO: ", 0), - // ErrorLog is used to log error messages to, but can be nil. - ErrorLog: log.New(os.Stderr, "ERROR: ", 0), +```go +func (job *Job) ReleaseWithParams(ctx context.Context, priority uint32, delay time.Duration) error +``` +ReleaseWithParams releases this job back with the specified priority and delay. + +#### func (*Job) Touch + +```go +func (job *Job) Touch(ctx context.Context) error +``` +Touch the job thereby resetting its reserved status. + +#### func (*Job) TouchAfter + +```go +func (job *Job) TouchAfter() time.Duration +``` +TouchAfter returns the duration until this jobs needs to be touched for its +reservation to be retained. + + +### type Producer +Producer manages a connection for the purpose of inserting jobs. + +```go +type Producer struct { + // contains filtered or unexported fields +} +``` + +#### func NewProducer + +```go +func NewProducer(uri string, config Config) (*Producer, error) +``` +NewProducer creates a connection to a beanstalk server, but will return an +error if the connection fails. Once established, the connection will be +maintained in the background. + +#### func (*Producer) Close + +```go +func (producer *Producer) Close() +``` +Close this consumer's connection. + +#### func (*Producer) Put + +```go +func (producer *Producer) Put(ctx context.Context, tube string, body []byte, params PutParams) (uint64, error) +``` +Put inserts a job into beanstalk. + + +### type ProducerPool +ProducerPool manages a connection pool of Producers and provides a simple +interface for balancing Put requests over the pool of connections. + +```go +type ProducerPool struct { + // contains filtered or unexported fields } +``` + +#### func NewProducerPool + +```go +func NewProducerPool(uris []string, config Config) (*ProducerPool, error) +``` +NewProducerPool creates a pool of Producers from the list of URIs that has +been provided. + +#### func (*ProducerPool) Put + +```go +func (pool *ProducerPool) Put(ctx context.Context, tube string, body []byte, params PutParams) (uint64, error) +``` +Put a job into the specified tube. + +#### func (*ProducerPool) Stop -producerPool, _ := beanstalk.NewProducerPool([]string{"beanstalk://127.0.0.1:11300"}, options) -consumerPool, _ := beanstalk.NewConsumerPool([]string{"beanstalk://127.0.0.1:11300"}, []string{"test"}, options) +```go +func (pool *ProducerPool) Stop() ``` +Stop all the producers in this pool. + + +### type PutParams +PutParams are the parameters used to perform a Put operation. -License -------- -This software is created for MessageBird B.V. and distributed under the BSD-style license found in the LICENSE file. +```go +type PutParams struct { + Priority uint32 `yaml:"pri"` + Delay time.Duration `yaml:"delay"` + TTR time.Duration `yaml:"ttr"` +} +``` diff --git a/beanstalk.go b/beanstalk.go new file mode 100644 index 0000000..3942fdb --- /dev/null +++ b/beanstalk.go @@ -0,0 +1,142 @@ +package beanstalk + +import ( + "context" + "fmt" + "net" + "net/url" + "strings" + "time" +) + +// ParseURI returns the socket of the specified URI and if the connection is +// supposed to be a TLS or plaintext connection. Valid URI schemes are: +// +// beanstalk://host:port +// beanstalks://host:port +// tls://host:port +// +// Where both the beanstalks and tls scheme mean the same thing. Alternatively, +// it is also possibly to just specify the host:port combo which is assumed to +// be a plaintext connection. +func ParseURI(uri string) (string, bool, error) { + var host string + var isTLS bool + + if strings.Contains(uri, "://") { + url, err := url.Parse(uri) + if err != nil { + return "", false, err + } + + // Determine the protocol scheme of the URI. + switch strings.ToLower(url.Scheme) { + case "beanstalk": + case "beanstalks", "tls": + isTLS = true + default: + return "", false, fmt.Errorf("%s: unknown beanstalk URI scheme", url.Scheme) + } + + host = url.Host + } else { + host = uri + } + + // Validate the resulting host:port combo. + _, _, err := net.SplitHostPort(host) + switch { + case err != nil && strings.Contains(err.Error(), "missing port in address"): + if isTLS { + host += ":11400" + } else { + host += ":11300" + } + case err != nil: + return "", false, err + } + + return host, isTLS, nil +} + +func includes(a []string, s string) bool { + for _, e := range a { + if e == s { + return true + } + } + + return false +} + +func contextTimeoutFunc(d time.Duration, fn func(ctx context.Context) error) error { + ctx, cancel := context.WithTimeout(context.Background(), d) + defer cancel() + + return fn(ctx) +} + +type ioHandler interface { + setupConnection(conn *Conn, config Config) error + handleIO(conn *Conn, config Config) error +} + +// keepConnected is responsible for keeping a connection to a URI up. +func keepConnected(handler ioHandler, conn *Conn, config Config, close chan struct{}) { + URI := conn.URI + + go func() { + var err error + for { + // Reconnect to the beanstalk server if no connection is active. + for conn == nil { + if conn, err = Dial(URI, config); err != nil { + config.ErrorLog.Printf("Unable to connect to beanstalk server %s: %s", URI, err) + + select { + // Wait a bit and try again. + case <-time.After(config.ReconnectTimeout): + continue + case <-close: + return + } + } + } + + config.InfoLog.Printf("Connected to beanstalk server %s", conn) + + // Set up the connection. If not successful, close the connection, wait + // a bit and reconnect. + err := handler.setupConnection(conn, config) + if err != nil { + config.InfoLog.Printf("Unable to set up the beanstalk connection: %s", err) + _ = conn.Close() + conn = nil + + select { + case <-time.After(config.ReconnectTimeout): + case <-close: + return + } + + continue + } + + // call the IO handler for as long as it wants it, or the connection is up. + if err = handler.handleIO(conn, config); err != nil && err != ErrDisconnected { + config.ErrorLog.Printf("Disconnected from beanstalk server %s: %s", conn, err) + } else { + config.InfoLog.Printf("Disconnected from beanstalk server %s", conn) + } + + _ = conn.Close() + conn = nil + + select { + case <-close: + return + default: + } + } + }() +} diff --git a/beanstalk_test.go b/beanstalk_test.go new file mode 100644 index 0000000..eb0455b --- /dev/null +++ b/beanstalk_test.go @@ -0,0 +1,70 @@ +package beanstalk + +import "testing" + +func TestParseURI(t *testing.T) { + t.Run("WithValidSchemes", func(t *testing.T) { + for _, scheme := range []string{"beanstalk", "beanstalks", "tls"} { + uri := scheme + "://localhost:12345" + + host, useTLS, err := ParseURI(uri) + switch { + case err != nil: + t.Errorf("Unable to parse URI: %s", uri) + case host != "localhost:12345": + t.Errorf("Unexpected host: %s", host) + } + + switch scheme { + case "beanstalk": + if useTLS { + t.Errorf("%s: scheme shouldn't support TLS", scheme) + } + case "beanstalks", "tls": + if !useTLS { + t.Errorf("%s: scheme should support TLS", scheme) + } + default: + t.Fatalf("%s: unknown scheme", scheme) + } + } + }) + + t.Run("WithMissingScheme", func(t *testing.T) { + host, useTLS, err := ParseURI("localhost:11300") + switch { + case err != nil: + t.Fatalf("Error parsing URI without scheme: %s", err) + case host != "localhost:11300": + t.Errorf("Unexpected host: %s", host) + case useTLS: + t.Error("Unexpected TLS to be set") + } + }) + + t.Run("WithMissingPort", func(t *testing.T) { + host, _, err := ParseURI("beanstalk://localhost") + switch { + case err != nil: + t.Fatalf("Error parsing URI without port") + case host != "localhost:11300": + t.Errorf("%s: Expected port 11300 to be added to the socket", host) + } + }) + + t.Run("WithMissingTLSPort", func(t *testing.T) { + host, _, err := ParseURI("beanstalks://localhost") + switch { + case err != nil: + t.Fatalf("Error parsing URI without port") + case host != "localhost:11400": + t.Errorf("%s: Expected port 11400 to be added to the socket", host) + } + }) + + t.Run("WithInvalidScheme", func(t *testing.T) { + if _, _, err := ParseURI("foo://localhost:12345"); err == nil { + t.Fatal("Expected an error, but got nothing") + } + }) +} diff --git a/client.go b/client.go deleted file mode 100644 index d52c7bf..0000000 --- a/client.go +++ /dev/null @@ -1,309 +0,0 @@ -package beanstalk - -import ( - "errors" - "io" - "net" - "net/textproto" - "strconv" - "strings" - "time" -) - -// Errors that can be returned by the beanstalk client functions. -var ( - ErrBuried = errors.New("job is buried") - ErrConnectionClosed = errors.New("remote end closed connection") - ErrDeadlineSoon = errors.New("deadline soon") - ErrDraining = errors.New("server in draining mode") - ErrExpectedCRLF = errors.New("expected CRLF after job body") - ErrJobTooBig = errors.New("job body too big") - ErrNotConnected = errors.New("not connected") - ErrNotFound = errors.New("job not found") - ErrNotIgnored = errors.New("tube cannot be ignored") - ErrOutOfMemory = errors.New("server is out of memory") - ErrUnexpectedResp = errors.New("unexpected response from server") -) - -// Client implements a simple beanstalk API. -type Client struct { - options *Options - conn net.Conn - textConn *textproto.Conn - isConnected bool -} - -// NewClient returns a new beanstalk Client object. -func NewClient(conn net.Conn, options *Options) *Client { - if options == nil { - options = DefaultOptions() - } - - return &Client{ - options: options, - conn: conn, - textConn: textproto.NewConn(conn), - isConnected: true} -} - -// Close the connection to the beanstalk server. -func (client *Client) Close() { - if client.textConn == nil { - return - } - - client.options.LogInfo("Closing connection to beanstalk server %s (local=%s)", client.conn.RemoteAddr().String(), client.conn.LocalAddr().String()) - _ = client.textConn.Close() - client.textConn = nil - client.conn = nil -} - -// Bury a reserved job. This is done after being unable to process the job and -// it is likely that other consumers won't either. -func (client *Client) Bury(job *Job, priority uint32) error { - _, _, err := client.requestResponse("bury %d %d", job.ID, priority) - if err == ErrBuried { - return nil - } - - return err -} - -// Delete a reserved job. This is done after successful processing. -func (client *Client) Delete(job *Job) error { - _, _, err := client.requestResponse("delete %d", job.ID) - return err -} - -// Ignore removes an active tube from the watch list. -func (client *Client) Ignore(tube string) error { - _, _, err := client.requestResponse("ignore %s", tube) - return err -} - -// Put a new job into beanstalk. -func (client *Client) Put(putRequest *PutRequest) (uint64, error) { - id, _, err := client.requestResponse("put %d %d %d %d\r\n%s", - putRequest.Params.Priority, - putRequest.Params.Delay/time.Second, - putRequest.Params.TTR/time.Second, - len(putRequest.Body), - putRequest.Body) - - return id, err -} - -// Release a reserved job. This is done after being unable to process the job, -// but another consumer might be successful. -func (client *Client) Release(job *Job, priority uint32, delay time.Duration) error { - _, _, err := client.requestResponse("release %d %d %d", job.ID, priority, delay/time.Second) - return err -} - -// Reserve retrieves a new job. -func (client *Client) Reserve(timeout time.Duration) (*Job, error) { - err := client.request("reserve-with-timeout %d", timeout/time.Second) - if err != nil { - return nil, err - } - - // Set a read deadline that is slightly longer than the reserve timeout. - if timeout != 0 { - if err = client.conn.SetReadDeadline(time.Now().Add(timeout + time.Second)); err != nil { - return nil, err - } - - defer client.conn.SetReadDeadline(time.Time{}) - } - - job := &Job{TTR: time.Second} - job.ID, job.Body, err = client.response() - if err != nil { - return nil, err - } - if job.ID == 0 { - return nil, nil - } - - // Fetch the TTR value for this job via stats-job. If this fails, ignore it. - if _, yaml, err := client.requestResponse("stats-job %d", job.ID); err == nil { - if val, err := yamlValue(yaml, "pri"); err == nil { - if prio, err := strconv.ParseUint(val, 10, 32); err == nil { - job.Priority = uint32(prio) - } - } - - if val, err := yamlValue(yaml, "ttr"); err == nil { - if ttr, err := strconv.Atoi(val); err == nil { - job.TTR = time.Duration(ttr) * time.Second - } - } - } - - job.touched() - return job, nil -} - -// Touch a job to extend the TTR of the reserved job. -func (client *Client) Touch(job *Job) error { - if _, _, err := client.requestResponse("touch %d", job.ID); err != nil { - return err - } - - job.touched() - return nil -} - -// Use the specified tube for the upcoming put requests. -func (client *Client) Use(tube string) error { - _, _, err := client.requestResponse("use %s", tube) - return err -} - -// Watch adds a tube to the watch list. -func (client *Client) Watch(tube string) error { - _, _, err := client.requestResponse("watch %s", tube) - return err -} - -// request sends a request to the beanstalk server. -func (client *Client) request(format string, args ...interface{}) error { - if client.options.ReadWriteTimeout != 0 { - if err := client.conn.SetWriteDeadline(time.Now().Add(client.options.ReadWriteTimeout)); err != nil { - return err - } - - defer client.conn.SetWriteDeadline(time.Time{}) - } - - if err := client.textConn.PrintfLine(format, args...); err != nil { - if err == io.EOF { - return ErrConnectionClosed - } - return err - } - - return nil -} - -// response reads and parses a response from the beanstalk server. -func (client *Client) response() (uint64, []byte, error) { - line, err := client.textConn.ReadLine() - if err != nil { - if err == io.EOF { - return 0, nil, ErrConnectionClosed - } - return 0, nil, err - } - - items := strings.SplitN(line, " ", 2) - if len(items[0]) == 0 { - return 0, nil, ErrUnexpectedResp - } - - var response, rest = items[0], "" - if len(items) == 2 { - rest = items[1] - } - - switch response { - // Simple successful responses. - case "DELETED", "RELEASED", "TIMED_OUT", "TOUCHED", "USING", "WATCHING": - return 0, nil, nil - - // BURIED can either be a successful response to a _bury_ command or an - // unsuccessful response to the _put_ and _release_ commands. - case "BURIED": - if rest != "" { - // The response to the _put_ command provides an id of the job. - if id, err := strconv.ParseUint(rest, 10, 64); err == nil { - return id, nil, ErrBuried - } - } - - return 0, nil, ErrBuried - - // Deadline soon means a reserved job is about to expire. - case "DEADLINE_SOON": - return 0, nil, ErrDeadlineSoon - - // INSERTED is a successful response to a _put_ command. - case "INSERTED": - if id, err := strconv.ParseUint(rest, 10, 64); err == nil { - return id, nil, nil - } - - // OK is a successful response to a request that responds with YAML data. - case "OK": - if size, err := strconv.Atoi(rest); err == nil { - body := make([]byte, size+2) - if _, err := io.ReadFull(client.textConn.R, body); err != nil { - break - } - - return 0, body[:size], nil - } - - // A RESERVED response is a successful response to a _reserve_ command. - case "RESERVED": - resInfo := strings.SplitN(rest, " ", 2) - if len(resInfo) != 2 { - break - } - - id, err := strconv.ParseUint(resInfo[0], 10, 64) - if err != nil { - break - } - size, err := strconv.Atoi(resInfo[1]) - if err != nil { - break - } - - body := make([]byte, size+2) - if _, err := io.ReadFull(client.textConn.R, body); err != nil { - break - } - - return id, body[:size], nil - - // NOT_FOUND is a response to an unsuccessful _bury_, _delete_, _touch_ or - // _release_ command. - case "NOT_FOUND": - return 0, nil, ErrNotFound - - // NOT_IGNORED is a response to an unsuccessful _ignore_ command. - case "NOT_IGNORED": - return 0, nil, ErrNotIgnored - - // The following responses can occur after an unsuccessful _put_ command. - case "DRAINING": - return 0, nil, ErrDraining - case "EXPECTED_CRLF": - return 0, nil, ErrExpectedCRLF - case "JOB_TOO_BIG": - return 0, nil, ErrJobTooBig - case "OUT_OF_MEMORY": - return 0, nil, ErrOutOfMemory - } - - return 0, nil, ErrUnexpectedResp -} - -// requestResponse sends a request to the beanstalk server and then parses -// and returns its response. -func (client *Client) requestResponse(format string, args ...interface{}) (uint64, []byte, error) { - if err := client.request(format, args...); err != nil { - return 0, nil, err - } - - if client.options.ReadWriteTimeout != 0 { - if err := client.conn.SetReadDeadline(time.Now().Add(client.options.ReadWriteTimeout)); err != nil { - return 0, nil, err - } - - defer client.conn.SetReadDeadline(time.Time{}) - } - - return client.response() -} diff --git a/client_test.go b/client_test.go deleted file mode 100644 index 1b3cbd3..0000000 --- a/client_test.go +++ /dev/null @@ -1,246 +0,0 @@ -package beanstalk - -import ( - "net" - "net/textproto" - "os" - "syscall" - "testing" - "time" -) - -func socketpair() (net.Conn, net.Conn, error) { - fds, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0) - if err != nil { - return nil, nil, err - } - - fd1 := os.NewFile(uintptr(fds[0]), "fd1") - defer fd1.Close() - - fd2 := os.NewFile(uintptr(fds[1]), "fd2") - defer fd2.Close() - - sock1, err := net.FileConn(fd1) - if err != nil { - return nil, nil, err - } - - sock2, err := net.FileConn(fd2) - if err != nil { - sock1.Close() - return nil, nil, err - } - - return sock1, sock2, nil -} - -type TestClient struct { - *Client - ServerConn *textproto.Conn -} - -func NewTestClient(t *testing.T, req, resp string) *TestClient { - s1, s2, err := socketpair() - if err != nil { - panic("Unable to create socket pair: " + err.Error()) - } - - client := &TestClient{Client: NewClient(s1, DefaultOptions()), ServerConn: textproto.NewConn(s2)} - go client.ServerResponse(t, req, resp) - - return client -} - -func (tc *TestClient) ServerResponse(t *testing.T, req, resp string) { - defer tc.ServerConn.Close() - - line, err := tc.ServerConn.ReadLine() - if err != nil { - t.Fatalf("Unable to read line from client: %s", err) - } - - if line != req { - t.Fatalf("Expected client request to be '%s', but got '%s'", req, line) - } - - if err := tc.ServerConn.PrintfLine(resp); err != nil { - t.Fatalf("Unable to send response to client: %s", err) - } -} - -// **************************************************************************** - -func TestBury(t *testing.T) { - client := NewTestClient(t, "bury 12345 9876", "BURIED") - defer client.Close() - - if err := client.Bury(&Job{ID: 12345}, 9876); err != nil { - t.Fatalf("Unexpected error from Bury: %s", err) - } -} - -func TestBuryNotFound(t *testing.T) { - client := NewTestClient(t, "bury 12345 9876", "NOT_FOUND") - defer client.Close() - - if err := client.Bury(&Job{ID: 12345}, 9876); err != ErrNotFound { - t.Fatalf("Expected ErrNotFound, but got: %s", err) - } -} - -func TestDelete(t *testing.T) { - client := NewTestClient(t, "delete 12345", "DELETED") - defer client.Close() - - if err := client.Delete(&Job{ID: 12345}); err != nil { - t.Fatalf("Unexpected error from Delete: %s", err) - } -} - -func TestDeleteNotFound(t *testing.T) { - client := NewTestClient(t, "delete 12345", "NOT_FOUND") - defer client.Close() - - if err := client.Delete(&Job{ID: 12345}); err != ErrNotFound { - t.Fatalf("Expected ErrNotFound, but got: %s", err) - } -} - -func TestIgnore(t *testing.T) { - client := NewTestClient(t, "ignore default", "WATCHING 1") - defer client.Close() - - if err := client.Ignore("default"); err != nil { - t.Fatalf("Unexpected error from Ignore: %s", err) - } -} - -func TestIgnoreNotIgnored(t *testing.T) { - client := NewTestClient(t, "ignore default", "NOT_IGNORED") - defer client.Close() - - if err := client.Ignore("default"); err != ErrNotIgnored { - t.Fatalf("Expected ErrNotIgnored, but got: %s", err) - } -} - -func TestPut(t *testing.T) { - client := NewTestClient(t, "put 1024 0 10 11", "INSERTED 12345") - defer client.Close() - - job := &PutRequest{Body: []byte("Hello World"), Tube: "test", Params: &PutParams{1024, 0, 10 * time.Second}} - - id, err := client.Put(job) - if err != nil { - t.Fatalf("Unexpected error from Put: %s", err) - } - if id != 12345 { - t.Fatalf("Unexpected ID from Put. Expected 12345, but got %d", id) - } -} - -func TestPutBuried(t *testing.T) { - client := NewTestClient(t, "put 1024 0 10 11", "BURIED 12345") - defer client.Close() - - job := &PutRequest{Body: []byte("Hello World"), Tube: "test", Params: &PutParams{1024, 0, 10 * time.Second}} - - id, err := client.Put(job) - if err != ErrBuried { - t.Fatalf("Expected ErrBuried, but got: %s", err) - } - if id != 12345 { - t.Fatalf("Unexpected ID from Put. Expected 12345, but got %d", id) - } -} - -func TestRelease(t *testing.T) { - client := NewTestClient(t, "release 12345 1024 10", "RELEASED") - defer client.Close() - - if err := client.Release(&Job{ID: 12345}, 1024, 10*time.Second); err != nil { - t.Fatalf("Unexpected error from Release: %s", err) - } -} - -func TestReleaseBuried(t *testing.T) { - client := NewTestClient(t, "release 12345 1024 10", "BURIED") - defer client.Close() - - if err := client.Release(&Job{ID: 12345}, 1024, 10*time.Second); err != ErrBuried { - t.Fatalf("Expected ErrBuried, but got: %s", err) - } -} - -func TestReleaseNotFound(t *testing.T) { - client := NewTestClient(t, "release 12345 1024 10", "NOT_FOUND") - defer client.Close() - - if err := client.Release(&Job{ID: 12345}, 1024, 10*time.Second); err != ErrNotFound { - t.Fatalf("Expected ErrNotFound, but got: %s", err) - } -} - -func TestReserve(t *testing.T) { - client := NewTestClient(t, "reserve-with-timeout 1", "RESERVED 1234 11\r\nHello World") - defer client.Close() - - job, err := client.Reserve(time.Second) - if err != nil { - t.Fatalf("Unexpected error from Reserve: %s", err) - } - if job == nil { - t.Fatal("Expected a job from Reserve, but got nothing") - } -} - -func TestReserveTimedOut(t *testing.T) { - client := NewTestClient(t, "reserve-with-timeout 2", "TIMED_OUT") - defer client.Close() - - job, err := client.Reserve(2 * time.Second) - if err != nil { - t.Fatalf("Unexpected error from Reserve: %s", err) - } - if job != nil { - t.Fatalf("Expected no job from Reserve, but got job: %v", job) - } -} - -func TestTouch(t *testing.T) { - client := NewTestClient(t, "touch 12345", "TOUCHED") - defer client.Close() - - if err := client.Touch(&Job{ID: 12345}); err != nil { - t.Fatalf("Unexpected error from Touch: %s", err) - } - -} - -func TestTouchNotFound(t *testing.T) { - client := NewTestClient(t, "touch 12345", "NOT_FOUND") - defer client.Close() - - if err := client.Touch(&Job{ID: 12345}); err != ErrNotFound { - t.Fatalf("Expected ErrNotFound, but got: %s", err) - } -} - -func TestUse(t *testing.T) { - client := NewTestClient(t, "use test", "USING test") - defer client.Close() - - if err := client.Use("test"); err != nil { - t.Fatalf("Unexpected error from Use: %s", err) - } -} - -func TestWatch(t *testing.T) { - client := NewTestClient(t, "watch test", "WATCHING test") - defer client.Close() - - if err := client.Watch("test"); err != nil { - t.Fatalf("Unexpected error from Watch: %s", err) - } -} diff --git a/config.go b/config.go new file mode 100644 index 0000000..3772c28 --- /dev/null +++ b/config.go @@ -0,0 +1,65 @@ +package beanstalk + +import ( + "crypto/tls" + "io/ioutil" + "log" + "time" +) + +// A Config structure is used to configure a Consumer, Producer, one of its +// pools or Conn. +type Config struct { + // NumGoroutines is the number of goroutines that the Receive() method will + // spin up. + // The default is to spin up 1 goroutine. + NumGoroutines int + // ReserveTimeout is the time a consumer should wait before reserving a job, + // when the last attempt didn't yield a job. + // The default is to wait 5 seconds. + ReserveTimeout time.Duration + // ReleaseTimeout is the time a consumer should hold a reserved job before + // it is released back. + // The default is to wait 3 seconds. + ReleaseTimeout time.Duration + // ReconnectTimeout is the timeout between reconnects. + // The default is to wait 10 seconds. + ReconnectTimeout time.Duration + // TLSConfig describes the configuration that is used when Dial() makes a + // TLS connection. + TLSConfig *tls.Config + // InfoLog is used to log informational messages. + InfoLog *log.Logger + // ErrorLog is used to log error messages. + ErrorLog *log.Logger + + jobC chan *Job +} + +func (config Config) normalize() Config { + if config.NumGoroutines < 1 { + config.NumGoroutines = 1 + } + if config.ReserveTimeout <= 0 { + config.ReserveTimeout = 5 * time.Second + } + if config.ReleaseTimeout <= 0 { + config.ReleaseTimeout = 3 * time.Second + } + if config.ReconnectTimeout <= 0 { + config.ReconnectTimeout = 10 * time.Second + } + + if config.InfoLog == nil { + config.InfoLog = log.New(ioutil.Discard, "", 0) + } + if config.ErrorLog == nil { + config.ErrorLog = log.New(ioutil.Discard, "", 0) + } + + if config.jobC == nil { + config.jobC = make(chan *Job) + } + + return config +} diff --git a/conn.go b/conn.go new file mode 100644 index 0000000..123ceae --- /dev/null +++ b/conn.go @@ -0,0 +1,328 @@ +package beanstalk + +import ( + "context" + "crypto/tls" + "errors" + "io" + "net" + "net/textproto" + "strconv" + "strings" + "sync" + "time" + + "go.opencensus.io/trace" + "gopkg.in/yaml.v2" +) + +// These error may be returned by any of Conn's methods. +var ( + ErrBuried = errors.New("job was buried") + ErrDeadlineSoon = errors.New("deadline soon") + ErrDisconnected = errors.New("client disconnected") + ErrNotFound = errors.New("job not found") + ErrTimedOut = errors.New("reserve timed out") + ErrNotIgnored = errors.New("tube not ignored") + ErrTubeTooLong = errors.New("tube name too long") + ErrUnexpected = errors.New("unexpected response received") +) + +// Conn describes a connection to a beanstalk server. +type Conn struct { + URI string + config Config + conn net.Conn + text *textproto.Conn + lastTube string + mu sync.Mutex +} + +// Dial into a beanstalk server. +func Dial(uri string, config Config) (*Conn, error) { + socket, isTLS, err := ParseURI(uri) + if err != nil { + return nil, err + } + + // Dial into the beanstalk server. + var netConn net.Conn + if isTLS { + tlsConn, err := tls.Dial("tcp", socket, config.TLSConfig) + if err != nil { + return nil, err + } + + if err = tlsConn.Handshake(); err != nil { + return nil, err + } + + netConn = tlsConn + } else { + var err error + if netConn, err = net.Dial("tcp", socket); err != nil { + return nil, err + } + } + + return &Conn{ + URI: uri, + config: config.normalize(), + conn: netConn, + text: textproto.NewConn(netConn), + }, nil +} + +// Close this connection. +func (conn *Conn) Close() error { + return conn.conn.Close() +} + +func (conn *Conn) String() string { + return conn.URI + " (local=" + conn.conn.LocalAddr().String() + ")" +} + +func (conn *Conn) command(ctx context.Context, format string, params ...interface{}) (uint64, []byte, error) { + // Write a command and read the response. + id, body, err := func() (uint64, []byte, error) { + if deadline, ok := ctx.Deadline(); ok { + if err := conn.conn.SetDeadline(deadline); err != nil { + return 0, nil, err + } + + defer conn.conn.SetDeadline(time.Time{}) + } + + if err := conn.text.PrintfLine(format, params...); err != nil { + return 0, nil, err + } + + line, err := conn.text.ReadLine() + if err != nil { + return 0, nil, err + } + + parts := strings.SplitN(line, " ", 3) + switch parts[0] { + case "INSERTED": + if len(parts) != 2 { + return 0, nil, ErrUnexpected + } + + id, err := strconv.ParseUint(parts[1], 10, 64) + if err != nil { + return 0, nil, ErrUnexpected + } + + return id, nil, err + + case "OK": + if len(parts) != 2 { + return 0, nil, ErrUnexpected + } + + size, err := strconv.ParseInt(parts[1], 10, 32) + if err != nil { + return 0, nil, err + } + body := make([]byte, size+2) + if _, err := io.ReadFull(conn.text.R, body); err != nil { + return 0, nil, err + } + + return 0, body, nil + + case "RESERVED": + if len(parts) != 3 { + return 0, nil, ErrUnexpected + } + + id, err := strconv.ParseUint(parts[1], 10, 64) + if err != nil { + return 0, nil, err + } + size, err := strconv.ParseInt(parts[2], 10, 32) + if err != nil { + return 0, nil, err + } + body := make([]byte, size+2) + if _, err := io.ReadFull(conn.text.R, body); err != nil { + return 0, nil, err + } + + return id, body[:size], nil + + case "DELETED", "RELEASED", "TOUCHED", "USING", "WATCHING": + return 0, nil, nil + case "BURIED": + return 0, nil, ErrBuried + case "DEADLINE_SOON": + return 0, nil, ErrDeadlineSoon + case "NOT_FOUND": + return 0, nil, ErrNotFound + case "NOT_IGNORED": + return 0, nil, ErrNotIgnored + case "TIMED_OUT": + return 0, nil, ErrTimedOut + } + + return 0, nil, ErrUnexpected + }() + + // An io.EOF means the connection got disconnected. + if err == io.EOF { + return 0, nil, ErrDisconnected + } + + return id, body, err +} + +func (conn *Conn) lcommand(ctx context.Context, format string, params ...interface{}) (uint64, []byte, error) { + conn.mu.Lock() + defer conn.mu.Unlock() + + return conn.command(ctx, format, params...) +} + +func (conn *Conn) bury(ctx context.Context, job *Job, priority uint32) error { + ctx, span := trace.StartSpan(ctx, "github.com/prep/beanstalk/Conn.bury") + defer span.End() + + _, _, err := conn.lcommand(ctx, "bury %d %d", job.ID, priority) + if err == ErrBuried { + return nil + } + + return err +} + +func (conn *Conn) delete(ctx context.Context, job *Job) error { + ctx, span := trace.StartSpan(ctx, "github.com/prep/beanstalk/Conn.delete") + defer span.End() + + _, _, err := conn.lcommand(ctx, "delete %d", job.ID) + return err +} + +// Ignore the specified tube. +func (conn *Conn) Ignore(ctx context.Context, tube string) error { + ctx, span := trace.StartSpan(ctx, "github.com/prep/beanstalk/Conn.Ignore") + defer span.End() + + _, _, err := conn.lcommand(ctx, "ignore %s", tube) + return err +} + +// Put a job in the specified tube. +func (conn *Conn) Put(ctx context.Context, tube string, body []byte, params PutParams) (uint64, error) { + ctx, span := trace.StartSpan(ctx, "github.com/prep/beanstalk/Conn.Put") + defer span.End() + + conn.mu.Lock() + defer conn.mu.Unlock() + + // If the tube is different than the last time, switch tubes. + if tube != conn.lastTube { + if _, _, err := conn.command(ctx, "use %s", tube); err != nil { + return 0, err + } + + conn.lastTube = tube + } + + id, _, err := conn.command(ctx, "put %d %d %d %d\r\n%s", params.Priority, params.Delay/time.Second, params.TTR/time.Second, len(body), body) + return id, err +} + +func (conn *Conn) release(ctx context.Context, job *Job, priority uint32, delay time.Duration) error { + ctx, span := trace.StartSpan(ctx, "github.com/prep/beanstalk/Conn.release") + defer span.End() + + _, _, err := conn.lcommand(ctx, "release %d %d %d", job.ID, priority, delay/time.Second) + return err +} + +// ReserveWithTimeout tries to reserve a job and block for up to a maximum of +// timeout. If no job could be reserved, this function will return without a +// job or error. +func (conn *Conn) ReserveWithTimeout(ctx context.Context, timeout time.Duration) (*Job, error) { + ctx, span := trace.StartSpan(ctx, "github.com/prep/beanstalk/Conn.ReserveWithTimeout") + defer span.End() + + conn.mu.Lock() + defer conn.mu.Unlock() + + reservedAt := time.Now() + id, body, err := conn.command(ctx, "reserve-with-timeout %d", timeout/time.Second) + switch { + case err == ErrDeadlineSoon: + return nil, nil + case err == ErrNotFound: + return nil, nil + case err == ErrTimedOut: + return nil, nil + case err != nil: + return nil, err + } + + job := &Job{ID: id, Body: body, ReservedAt: reservedAt, conn: conn} + + // If this command errors out, it's either a NOT_FOUND response or an error + // on the connection. If it's the former, the TTR was probably very short and + // the connection very slow. + // Either way, the job that was reserved is already lost. + if _, body, err = conn.command(ctx, "stats-job %d", job.ID); err != nil { + if err == ErrNotFound { + return nil, nil + } + + return nil, err + } + + // If the job stats are unmarshallable, return the error and expect the caller + // to close the connection which takes care of the job's reservation. + // However, in case the caller doesn't and still wants the job, return it anyway. + if err := yaml.Unmarshal(body, &job.Stats); err != nil { + return job, err + } + + job.Stats.Age *= time.Second + job.Stats.Delay *= time.Second + job.Stats.TTR *= time.Second + job.Stats.TimeLeft *= time.Second + + return job, nil +} + +// touch the job thereby resetting its reserved status. +func (conn *Conn) touch(ctx context.Context, job *Job) error { + ctx, span := trace.StartSpan(ctx, "github.com/prep/beanstalk/Conn.touch") + defer span.End() + + touchedAt := time.Now() + if _, _, err := conn.lcommand(ctx, "touch %d", job.ID); err != nil { + return err + } + + // TimeLeft is always 1 second less than the TTR. + job.Stats.TimeLeft = job.Stats.TTR - time.Second + job.ReservedAt = touchedAt + + return nil +} + +// Watch the specified tube. +func (conn *Conn) Watch(ctx context.Context, tube string) error { + ctx, span := trace.StartSpan(ctx, "github.com/prep/beanstalk/Conn.Watch") + defer span.End() + + // This check is performed here instead of server-side, because if the name + // is too long the server return both a BAD_FORMAT and an UNKNOWN_COMMAND + // response that makes parsing more difficult. + if len(tube) > 200 { + return ErrTubeTooLong + } + + _, _, err := conn.lcommand(ctx, "watch %s", tube) + return err +} diff --git a/conn_test.go b/conn_test.go new file mode 100644 index 0000000..d1a2a4d --- /dev/null +++ b/conn_test.go @@ -0,0 +1,610 @@ +package beanstalk + +import ( + "bytes" + "context" + "net" + "net/textproto" + "sync" + "testing" + "time" +) + +// Line describes a read line from the client. +type Line struct { + lineno int + line string +} + +// At validates if the specified string is present at a specific line number. +func (line Line) At(lineno int, s string) bool { + return lineno == line.lineno && s == line.line +} + +// Server implements a test beanstalk server. +type Server struct { + listener net.Listener + mu sync.RWMutex + lineno int + handler func(line Line) string +} + +// NewServer returns a new Server. +func NewServer() *Server { + listener, err := net.Listen("tcp", "localhost:0") + if err != nil { + panic("Unable to set up listening socket for text beanstalk server: " + err.Error()) + } + + server := &Server{listener: listener} + go server.accept() + + return server +} + +// Close the server socket. +func (server *Server) Close() { + _ = server.listener.Close() +} + +// accept incoming connections. +func (server *Server) accept() { + defer server.listener.Close() + + for { + conn, err := server.listener.Accept() + if err != nil { + return + } + + server.handleConn(textproto.NewConn(conn)) + } +} + +// handleConn handles an existing client connection. +func (server *Server) handleConn(conn *textproto.Conn) { + defer conn.Close() + + for { + line, err := conn.ReadLine() + if err != nil { + return + } + + // Fetch a read-lock and call the handler with the line information that + // was just read. + func() { + server.mu.RLock() + defer server.mu.RUnlock() + + server.lineno++ + if server.handler != nil { + if resp := server.handler(Line{server.lineno, line}); resp != "" { + _ = conn.PrintfLine(resp) + } + } + }() + } +} + +// HandleFunc registers the handler function that should be called for every +// line that this server receives from the client. +func (server *Server) HandleFunc(handler func(line Line) string) { + server.mu.Lock() + defer server.mu.Unlock() + + server.lineno = 0 + server.handler = handler +} + +// Socket returns the host:port combo that this server is listening on. +func (server *Server) Socket() string { + return server.listener.Addr().String() +} + +func TestConn(t *testing.T) { + server := NewServer() + defer server.Close() + + var conn *Conn + var ctx = context.Background() + + // Dial the beanstalk server and set up a client connection. + t.Run("Dial", func(t *testing.T) { + var err error + conn, err = Dial(server.Socket(), Config{}) + if err != nil { + t.Fatalf("Unable to dial to beanstalk server: %s", err) + } + }) + defer conn.Close() + + // bury a job. + t.Run("bury", func(t *testing.T) { + server.HandleFunc(func(line Line) string { + switch { + case line.At(1, "bury 1 10"): + return "BURIED" + default: + t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line) + } + + return "" + }) + + err := conn.bury(ctx, &Job{ID: 1}, 10) + switch { + case err == ErrDisconnected: + case err != nil: + t.Fatalf("Error burying job: %s", err) + } + + // NotFound tests what happens when the NOT_FOUND error is returned. + t.Run("NotFound", func(t *testing.T) { + server.HandleFunc(func(line Line) string { + switch { + case line.At(1, "bury 2 11"): + return "NOT_FOUND" + default: + t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line) + } + + return "" + }) + + err := conn.bury(ctx, &Job{ID: 2}, 11) + switch { + case err == ErrDisconnected: + case err == ErrNotFound: + case err != nil: + t.Fatalf("Error burying job: %s", err) + } + }) + }) + + // delete a job. + t.Run("delete", func(t *testing.T) { + server.HandleFunc(func(line Line) string { + switch { + case line.At(1, "delete 3"): + return "DELETED" + default: + t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line) + } + + return "" + }) + + err := conn.delete(ctx, &Job{ID: 3}) + switch { + case err == ErrDisconnected: + case err != nil: + t.Fatalf("Error deleting job: %s", err) + } + + // NotFound tests what happens when the NOT_FOUND error is returned. + t.Run("NotFound", func(t *testing.T) { + server.HandleFunc(func(line Line) string { + switch { + case line.At(1, "delete 4"): + return "NOT_FOUND" + default: + t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line) + } + + return "" + }) + + err := conn.delete(ctx, &Job{ID: 4}) + switch { + case err == ErrDisconnected: + case err == ErrNotFound: + case err != nil: + t.Fatalf("Error deleting job: %s", err) + } + }) + }) + + // Ignore watching a tube. + t.Run("Ignore", func(t *testing.T) { + server.HandleFunc(func(line Line) string { + if line.At(1, "ignore foo") { + return "WATCHING 1" + } + + t.Fatalf("Unexpected client request: %s", line.line) + return "" + }) + + err := conn.Ignore(ctx, "foo") + switch { + case err == ErrDisconnected: + case err != nil: + t.Fatalf("Error ignoring tube: %s", err) + } + + // NotIgnored test what happens if the ignore command fails because it tried + // to ignore the only tube this connection was watching. + t.Run("NotIgnored", func(t *testing.T) { + server.HandleFunc(func(line Line) string { + if line.At(1, "ignore bar") { + return "NOT_IGNORED" + + } + + t.Fatalf("Unexpected client request: %s", line.line) + return "" + }) + + err := conn.Ignore(ctx, "bar") + switch { + case err == ErrDisconnected: + case err != ErrNotIgnored: + t.Fatalf("Expected the ErrNotIgnored error, but got %s", err) + } + }) + }) + + // Put a new message into a tube. + t.Run("Put", func(t *testing.T) { + server.HandleFunc(func(line Line) string { + switch { + case line.At(1, "use foobar"): + return "USING foobar" + case line.At(2, "put 1024 10 60 11"): + case line.At(3, "Hello World"): + return "INSERTED 5" + default: + t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line) + } + + return "" + }) + + id, err := conn.Put(ctx, "foobar", []byte("Hello World"), PutParams{Priority: 1024, Delay: 10 * time.Second, TTR: 60 * time.Second}) + switch { + case err == ErrDisconnected: + case err != nil: + t.Fatalf("Error inserting a new job: %s", err) + case id != 5: + t.Fatalf("Expected job ID 5, but got %d", id) + } + + // OnSameTube tests if the command order makes sense if another message is + // put into the same tube. + t.Run("OnSameTube", func(t *testing.T) { + server.HandleFunc(func(line Line) string { + switch { + case line.At(1, "put 1024 10 60 11"): + case line.At(2, "Hello World"): + return "INSERTED 6" + default: + t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line) + } + + return "" + }) + + id, err := conn.Put(ctx, "foobar", []byte("Hello World"), PutParams{Priority: 1024, Delay: 10 * time.Second, TTR: 60 * time.Second}) + switch { + case err == ErrDisconnected: + case err != nil: + t.Fatalf("Error inserting a new job: %s", err) + case id != 6: + t.Fatalf("Expected job ID 6, but got %d", id) + } + }) + + // OnDifferentTube tests if the command order makes sense if a message is + // put into a different tube than the previous message. + t.Run("OnDifferentTube", func(t *testing.T) { + server.HandleFunc(func(line Line) string { + switch { + case line.At(1, "use zoink"): + return "USING zoink" + case line.At(2, "put 512 15 30 10"): + case line.At(3, "Hello Narf"): + return "INSERTED 7" + default: + t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line) + } + + return "" + }) + + id, err := conn.Put(ctx, "zoink", []byte("Hello Narf"), PutParams{Priority: 512, Delay: 15 * time.Second, TTR: 30 * time.Second}) + switch { + case err == ErrDisconnected: + case err != nil: + t.Fatalf("Error inserting a new job: %s", err) + case id != 7: + t.Fatalf("Expected job ID 7, but got %d", id) + } + }) + }) + + // release tests the release method, responsible for releasing jobs back. + t.Run("release", func(t *testing.T) { + server.HandleFunc(func(line Line) string { + switch { + case line.At(1, "release 8 12 20"): + return "RELEASED" + default: + t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line) + } + + return "" + }) + + err := conn.release(ctx, &Job{ID: 8}, 12, 20*time.Second) + switch { + case err == ErrDisconnected: + case err != nil: + t.Fatalf("Error releasing job: %s", err) + } + + // Buried tests what happens when the BURIED error is returned. + t.Run("Buried", func(t *testing.T) { + server.HandleFunc(func(line Line) string { + switch { + case line.At(1, "release 9 13 21"): + return "BURIED" + default: + t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line) + } + + return "" + }) + + err := conn.release(ctx, &Job{ID: 9}, 13, 21*time.Second) + switch { + case err == ErrDisconnected: + case err != ErrBuried: + t.Fatalf("Expected the ErrBuried error, but got %s", err) + } + }) + + // NotFound tests what happens when the NOT_FOUND error is returned. + t.Run("NotFound", func(t *testing.T) { + server.HandleFunc(func(line Line) string { + switch { + case line.At(1, "release 10 14 22"): + return "NOT_FOUND" + default: + t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line) + } + + return "" + }) + + err := conn.release(ctx, &Job{ID: 10}, 14, 22*time.Second) + switch { + case err == ErrDisconnected: + case err != ErrNotFound: + t.Fatalf("Expected the ErrNotFound error, but got %s", err) + } + }) + }) + + // ReserveWithTimeout tests the ReserveWithTimeout method. + t.Run("ReserveWithTimeout", func(t *testing.T) { + server.HandleFunc(func(line Line) string { + switch { + case line.At(1, "reserve-with-timeout 1"): + return "RESERVED 12 11\r\nHello World" + case line.At(2, "stats-job 12"): + return "OK 166\r\n---\r\nid: 12\r\ntube: default\r\nstate: reserved\r\npri: 512\r\nage: 23\r\ndelay: 15\r\nttr: 30\r\ntime-left: 25\r\nfile: 6\r\nreserves: 1\r\ntimeouts: 4\r\nreleases: 5\r\nburies: 2\r\nkicks: 7" + default: + t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line) + + } + + return "" + }) + + job, err := conn.ReserveWithTimeout(ctx, 1*time.Second) + switch { + case err == ErrDisconnected: + case err != nil: + t.Fatalf("Error reserving a job: %s", err) + case job == nil: + t.Fatal("Expected job, but got nothing") + + // Validate the basic attributes. + case job.ID != 12: + t.Fatalf("Expected job ID 12, but got %d", job.ID) + case !bytes.Equal(job.Body, []byte(`Hello World`)): + t.Fatalf("Expected job body to be \"Hello World\", but got %q", string(job.Body)) + case job.ReservedAt.IsZero(): + t.Fatal("Expected job ReservedAt to be set, but it was not") + + // Validate the Stats. + case job.Stats.Tube != "default": + t.Fatalf("Expected job tube default, but got %s", job.Stats.Tube) + case job.Stats.State != "reserved": + t.Fatalf("Expected job state reserved, but got %s", job.Stats.State) + case job.Stats.Age != 23*time.Second: + t.Fatalf("Expected job age to be 23s, but got %s", job.Stats.Age) + case job.Stats.TimeLeft != 25*time.Second: + t.Fatalf("Expected job time left to be 25s, but got %s", job.Stats.TimeLeft) + case job.Stats.File != 6: + t.Fatalf("Expected job binfile number to be 6, but got %d", job.Stats.File) + case job.Stats.Reserves != 1: + t.Fatalf("Expected job reserved to be 1, but got %d", job.Stats.Reserves) + case job.Stats.Timeouts != 4: + t.Fatalf("Expected job timeouts to be 4, but got %d", job.Stats.Timeouts) + case job.Stats.Releases != 5: + t.Fatalf("Expected job release to be 5, but got %d", job.Stats.Releases) + case job.Stats.Buries != 2: + t.Fatalf("Expected job buries to be 2, but got %d", job.Stats.Buries) + case job.Stats.Kicks != 7: + t.Fatalf("Expected job kicks to be 7, but got %d", job.Stats.Kicks) + + // Validate the PutParams. + case job.Stats.PutParams.Priority != 512: + t.Fatalf("Expected job priority to be 512, but got %d", job.Stats.PutParams.Priority) + case job.Stats.PutParams.Delay != 15*time.Second: + t.Fatalf("Expected job TTR to be 15s, but got %s", job.Stats.PutParams.Delay) + case job.Stats.PutParams.TTR != 30*time.Second: + t.Fatalf("Expected job TTR to be 30s, but got %s", job.Stats.PutParams.TTR) + } + + // WithTimeout tests if a TIMED_OUT response is properly handled. + t.Run("WithTimeout", func(t *testing.T) { + server.HandleFunc(func(line Line) string { + switch { + case line.At(1, "reserve-with-timeout 2"): + return "TIMED_OUT" + default: + t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line) + } + + return "" + }) + + job, err := conn.ReserveWithTimeout(ctx, 2*time.Second) + switch { + case err == ErrDisconnected: + case err != nil: + t.Fatalf("Error reserving a job: %s", err) + case job != nil: + t.Fatalf("Expected job to be nil, but got %#v", job) + } + }) + + // WithDeadline tests if a DEADLINE_SOON response is properly handled. + t.Run("WithDeadline", func(t *testing.T) { + server.HandleFunc(func(line Line) string { + switch { + case line.At(1, "reserve-with-timeout 3"): + return "DEADLINE_SOON" + default: + t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line) + } + + return "" + }) + + job, err := conn.ReserveWithTimeout(ctx, 3*time.Second) + switch { + case err == ErrDisconnected: + case err != nil: + t.Fatalf("Error reserving a job: %s", err) + case job != nil: + t.Fatalf("Expected job to be nil, but got %#v", job) + } + }) + + // WithNotFound tests if the situation where a reserved job expired before + // the stats-job command could return successfully. + t.Run("WithNotFound", func(t *testing.T) { + server.HandleFunc(func(line Line) string { + switch { + case line.At(1, "reserve-with-timeout 4"): + return "RESERVED 13 11\r\nHello World" + case line.At(2, "stats-job 13"): + return "NOT_FOUND" + default: + t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line) + + } + + return "" + }) + + job, err := conn.ReserveWithTimeout(ctx, 4*time.Second) + switch { + case err == ErrDisconnected: + case err != nil: + t.Fatalf("Error reserving a job: %s", err) + case job != nil: + t.Fatalf("Expected job to be nil, but got %#v", job) + } + }) + }) + + // touch an existing job. + t.Run("touch", func(t *testing.T) { + server.HandleFunc(func(line Line) string { + switch { + case line.At(1, "touch 13"): + return "TOUCHED" + default: + t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line) + + } + + return "" + }) + + job := &Job{ID: 13} + job.Stats.PutParams.TTR = 5 * time.Second + + err := conn.touch(ctx, job) + switch { + case err == ErrDisconnected: + case err != nil: + t.Fatalf("Error watching a channel: %s", err) + case job.Stats.PutParams.TTR != 5*time.Second: + t.Fatalf("Expected job TTR to be 5s, but got %s", job.Stats.PutParams.TTR) + case job.Stats.TimeLeft != 4*time.Second: + t.Fatalf("Expected job time left to be 4s, but got %s", job.Stats.TimeLeft) + case job.ReservedAt.IsZero(): + t.Fatal("Expected job ReservedAt to be set, but it was not") + } + + t.Run("NotFound", func(t *testing.T) { + server.HandleFunc(func(line Line) string { + switch { + case line.At(1, "touch 14"): + return "NOT_FOUND" + default: + t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line) + + } + + return "" + }) + + err := conn.touch(ctx, &Job{ID: 14}) + switch { + case err == ErrDisconnected: + case err != ErrNotFound: + t.Fatalf("Expected the ErrNotFound error, but got %s", err) + } + }) + }) + + // Watch a new tube. + t.Run("Watch", func(t *testing.T) { + server.HandleFunc(func(line Line) string { + switch { + case line.At(1, "watch events"): + return "WATCHING 2" + default: + t.Fatalf("Unexpected client request at line %d: %s", line.lineno, line.line) + + } + + return "" + }) + + err := conn.Watch(ctx, "events") + switch { + case err == ErrDisconnected: + case err != nil: + t.Fatalf("Error watching a channel: %s", err) + } + + // ErrTubeTooLong tests if a client-side error is returned if the tube name + // is too long. + t.Run("ErrTubeTooLong", func(t *testing.T) { + err := conn.Watch(ctx, string(make([]byte, 201))) + switch { + case err == ErrDisconnected: + case err != ErrTubeTooLong: + t.Fatalf("Expected the ErrTubeTooLong error, but got %s", err) + } + }) + }) +} diff --git a/connection.go b/connection.go deleted file mode 100644 index 27eb5c1..0000000 --- a/connection.go +++ /dev/null @@ -1,107 +0,0 @@ -package beanstalk - -import ( - "crypto/tls" - "fmt" - "net" - "net/url" - "strings" - "time" -) - -// connect tries to create a new connection to the specified URI. It returns -// a channel on which a successful connect is advertised, as well as a channel -// to abort a connection attempt in progress. -func connect(uri string, options *Options) (<-chan net.Conn, chan<- struct{}) { - newConnection, abortConnect := make(chan net.Conn), make(chan struct{}, 1) - - go func(uri string, options *Options) { - var offerC chan net.Conn - var retry = time.NewTimer(time.Second) - retry.Stop() - - // Try to establish a connection to the remote beanstalk server. - for { - conn, err := dial(uri) - if err != nil { - retry.Reset(options.ReconnectTimeout) - options.LogError("Beanstalk connection failed to %s: %s", uri, err) - } else { - offerC = newConnection - options.LogInfo("Beanstalk connection successful to %s (%s)", uri, conn.LocalAddr().String()) - } - - select { - case <-retry.C: - case offerC <- conn: - return - case <-abortConnect: - if conn != nil { - _ = conn.Close() - } - - retry.Stop() - return - } - } - }(uri, options) - - return newConnection, abortConnect -} - -// dial tries to set up either a non-TLS or a TLS connection to the host:port -// combo specified in socket. -func dial(uri string) (net.Conn, error) { - socket, useTLS, err := ParseURL(uri) - if err != nil { - return nil, err - } - - if !useTLS { - return net.Dial("tcp", socket) - } - - conn, err := tls.Dial("tcp", socket, &tls.Config{}) - if conn != nil { - if err = conn.Handshake(); err == nil { - return conn, nil - } - } - - return nil, err -} - -// ParseURL takes a beanstalk URL and returns its hostname:port combination -// and if it's a TLS socket or not. -// Allowable schemes are beanstalk://, beanstalks:// and tls://, or a simple -// hostname or hostname:port format for backwards compatibility. -func ParseURL(u string) (socket string, useTLS bool, err error) { - if strings.Contains(u, "://") { - URL, err := url.Parse(u) - if err != nil { - return "", false, err - } - - switch URL.Scheme { - case "beanstalk": - case "beanstalks", "tls": - useTLS = true - default: - return "", false, fmt.Errorf("%s: unknown scheme for beanstalk URL", URL.Scheme) - } - - socket = URL.Host - } else { - socket = u - } - - if _, _, err := net.SplitHostPort(socket); err != nil { - if addrErr, ok := err.(*net.AddrError); ok && addrErr.Err == "missing port in address" { - socket = net.JoinHostPort(socket, "11300") - } else { - return "", false, err - } - } - - return -} diff --git a/connection_test.go b/connection_test.go deleted file mode 100644 index afdeeff..0000000 --- a/connection_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package beanstalk - -import ( - "testing" -) - -type urlTest struct { - URL string - Socket string - UseTLS bool - Err bool -} - -var urls = []urlTest{ - {URL: "beanstalk://test.com", Socket: "test.com:11300", UseTLS: false, Err: false}, - {URL: "beanstalk://test.com:11300", Socket: "test.com:11300", UseTLS: false, Err: false}, - {URL: "beanstalks://test.com:10301", Socket: "test.com:10301", UseTLS: true, Err: false}, - {URL: "tls://test:1234", Socket: "test:1234", UseTLS: true, Err: false}, - {URL: "http://localhost:11300", Socket: "", UseTLS: false, Err: true}, - {URL: "localhost:11300", Socket: "localhost:11300", UseTLS: false, Err: false}, - {URL: "foobar", Socket: "foobar:11300", UseTLS: false, Err: false}, -} - -func TestParseURL(t *testing.T) { - for _, url := range urls { - socket, useTLS, err := ParseURL(url.URL) - switch { - case (err != nil) && !url.Err: - t.Errorf("Expected URL %q to have no error, but got %#v", url.URL, err) - continue - case (err == nil) && url.Err: - t.Errorf("Exepected URL %q to have an error, but got none", url.URL) - continue - } - - if socket != url.Socket { - t.Errorf("Expected URL %q to have socket=%q, but got socket=%q", url.URL, url.Socket, socket) - } - if useTLS != url.UseTLS { - t.Errorf("Expected URL %q to have a useTLS=%v, but got useTLS=%v", url.URL, url.UseTLS, useTLS) - } - } -} diff --git a/consumer.go b/consumer.go index dfd01fa..8a6311d 100644 --- a/consumer.go +++ b/consumer.go @@ -1,336 +1,247 @@ package beanstalk import ( + "context" "fmt" - "strings" "sync" "time" ) -var ( - keepAliveInterval = 10 * time.Second -) - -// Consumer reserves jobs from a beanstalk server and keeps those jobs alive -// until an external consumer has either buried, deleted or released it. +// Consumer maintains a connnection to a beanstalk server and offers up jobs +// on its exposed jobs channel. When it gets disconnected, it automatically +// tries to reconnect. type Consumer struct { - url string + // C offers up reserved jobs. + C <-chan *Job + tubes []string - jobC chan<- *Job - pause chan bool - stop chan struct{} + config Config isPaused bool - options *Options + pause chan bool + close chan struct{} + closeOnce sync.Once mu sync.Mutex - startOnce sync.Once - stopOnce sync.Once } -// NewConsumer returns a new Consumer object. -func NewConsumer(url string, tubes []string, jobC chan<- *Job, options *Options) (*Consumer, error) { - if options == nil { - options = DefaultOptions() - } +// NewConsumer connects to the beanstalk server that's referenced in URI and +// returns a Consumer. +func NewConsumer(uri string, tubes []string, config Config) (*Consumer, error) { + config = config.normalize() - if _, _, err := ParseURL(url); err != nil { + conn, err := Dial(uri, config) + if err != nil { return nil, err } - return &Consumer{ - url: url, + consumer := &Consumer{ + C: config.jobC, tubes: tubes, - jobC: jobC, - pause: make(chan bool, 1), - stop: make(chan struct{}, 1), + config: config, isPaused: true, - options: options, - }, nil -} + pause: make(chan bool, 1), + close: make(chan struct{}), + } -// Start this consumer. -func (consumer *Consumer) Start() { - consumer.startOnce.Do(func() { - go consumer.connectionManager() - }) + keepConnected(consumer, conn, config, consumer.close) + return consumer, nil } -// Stop this consumer. -func (consumer *Consumer) Stop() { - consumer.stopOnce.Do(func() { - close(consumer.stop) +// Close this consumer's connection. +func (consumer *Consumer) Close() { + consumer.closeOnce.Do(func() { + close(consumer.close) }) } -// Play allows this consumer to start reserving jobs. Returns true on success -// and false if this consumer was stopped. -func (consumer *Consumer) Play() bool { - select { - case <-consumer.stop: - return false - default: - } - +// Play unpauses this customer. +func (consumer *Consumer) Play() { consumer.mu.Lock() defer consumer.mu.Unlock() select { + case <-consumer.close: + case consumer.pause <- false: case <-consumer.pause: - default: + consumer.pause <- false } - - consumer.pause <- false - return true } -// Pause stops this consumer from reserving jobs. Returns true on success and -// false if this consumer was stopped. -func (consumer *Consumer) Pause() bool { - select { - case <-consumer.stop: - return false - default: - } - +// Pause this consumer. +func (consumer *Consumer) Pause() { consumer.mu.Lock() defer consumer.mu.Unlock() select { + case <-consumer.close: + case consumer.pause <- true: case <-consumer.pause: - default: + consumer.pause <- true } - - consumer.pause <- true - return true } -// connectionManager is responsible for setting up a connection to the -// beanstalk server and wrapping it in a Client, which on success is passed -// to the clientManager function. -func (consumer *Consumer) connectionManager() { - var ( - err error - options = consumer.options - ) +// Receive calls fn for each job it can reserve on this consumer. +func (consumer *Consumer) Receive(ctx context.Context, fn func(ctx context.Context, job *Job)) { + var wg sync.WaitGroup + wg.Add(consumer.config.NumGoroutines) - // Start a new connection. - newConnection, abortConnect := connect(consumer.url, consumer.options) + for i := 0; i < consumer.config.NumGoroutines; i++ { + go func() { + defer wg.Done() - for { - select { - // This case triggers whenever a new connection was established. - case conn := <-newConnection: - client := NewClient(conn, options) - - options.LogInfo("Watching tubes: %s", strings.Join(consumer.tubes, ", ")) - for _, tube := range consumer.tubes { - if err = client.Watch(tube); err != nil { - options.LogError("%s: Error watching tube: %s", tube, err) - break - } - } + for { + select { + case job := <-consumer.C: + fn(ctx, job) - if err == nil && !includesString(consumer.tubes, "default") { - if err := client.Ignore("default"); err != nil { - options.LogError("default: Unable to ignore tube: %s", err) + case <-ctx.Done(): + return + case <-consumer.close: + return } } + }() + } - if err := consumer.clientManager(client); err != nil { - newConnection, abortConnect = connect(consumer.url, consumer.options) - } else { - return - } + wg.Wait() +} - // Keep track of the pause state. - case consumer.isPaused = <-consumer.pause: +func (consumer *Consumer) setupConnection(conn *Conn, config Config) error { + // If no tubes were specified, stick to the default one. + if len(consumer.tubes) == 0 { + return nil + } - // Abort this connection and stop this consumer all together when the - // stop signal was received. - case <-consumer.stop: - close(abortConnect) - return + return contextTimeoutFunc(3*time.Second, func(ctx context.Context) error { + for _, tube := range consumer.tubes { + if err := conn.Watch(ctx, tube); err != nil { + return fmt.Errorf("error watching tube: %s: %s", tube, err) + } } - } + + if !includes(consumer.tubes, "default") { + if err := conn.Ignore(ctx, "default"); err != nil { + return fmt.Errorf("error ignoring default tube: %s", err) + } + } + + return nil + }) } -// clientManager is responsible for reserving beanstalk jobs and offering them -// up the the job channel. -func (consumer *Consumer) clientManager(client *Client) (err error) { - var ( - job *Job - jobOffer chan<- *Job - jobCommandC = make(chan *JobCommand) - jobsOutThere int - keepAlive *time.Timer - ) - - // This is used to pause the select-statement for a bit when the job queue - // is full or when "reserve-with-timeout 0" yields no job. - timeout := time.NewTimer(time.Second) - timeout.Stop() - - // Set up a touch timer that fires whenever the pending reserved job needs - // to be touched to keep the reservation on that job. - touchTimer := time.NewTimer(time.Second) - touchTimer.Stop() - - // If this consumer is paused, make sure to start the polling to keep the - // connection alive. This is necessary in case there is a proxy between the - // client and server that disconnects idle connections. +// handleIO is responsible for reserving jobs on the connection and offering +// them up to a listener on C. +func (consumer *Consumer) handleIO(conn *Conn, config Config) (err error) { + var job *Job + var jobC chan<- *Job + + // reserveTimeout is used to pause between reserve attempts when the last + // attempt failed to reserve a job. + reserveTimeout := time.NewTimer(0) if consumer.isPaused { - keepAlive = time.NewTimer(keepAliveInterval) - } else { - keepAlive = time.NewTimer(time.Second) - keepAlive.Stop() + reserveTimeout.Stop() } - // Whenever this function returns, clean up the pending job and close the - // client connection. - defer func() { - if job != nil { - if e := client.Release(job, job.Priority, 0); e != nil { - consumer.options.LogError("Unable to release job %d: %s", job.ID, e) - } + // releaseTimeout is used to release a reserved job back to prevent holding + // a job too long when it doesn't get claimed in a reasonable amount of time. + releaseTimeout := time.NewTimer(time.Second) + releaseTimeout.Stop() + + // releaseJob releases the currently reserved job. + releaseJob := func() error { + if job == nil { + return nil } - client.Close() - touchTimer.Stop() - close(jobCommandC) - }() + releaseTimeout.Stop() + err = contextTimeoutFunc(3*time.Second, job.Release) - // isFatalErr is a convenience function that checks if the returned error - // from a beanstalk command is fatal, or can be ignored. - isFatalErr := func() bool { + // Don't treat NOT_FOUND responses as a fatal error. if err == ErrNotFound { + config.ErrorLog.Printf("Consumer could not release job %d: %s", job.ID, err) err = nil } - return err != nil - } - for { - // Attempt to reserve a job if the state allows for it. - if job == nil && !consumer.isPaused { - if jobsOutThere == 0 { - job, err = client.Reserve(consumer.options.ReserveTimeout) - } else { - job, err = client.Reserve(0) - } + job, jobC = nil, nil + return err + } - switch { - case err == ErrDraining: - timeout.Reset(time.Minute) - case err == ErrDeadlineSoon: - timeout.Reset(time.Second) - case err != nil: - consumer.options.LogError("Error reserving job: %s", err) - return - - // A new job was reserved. - case job != nil: - job.commandC = jobCommandC - jobOffer = consumer.jobC - touchTimer.Reset(job.TouchAt()) - - // With jobs out there and no successful reserve, wait a bit before - // attempting another reserve. - case jobsOutThere != 0: - timeout.Reset(time.Second) - - // No job reserved and no jobs out there, perform another reserve almost - // immediately. - default: - timeout.Reset(0) - } - } else { - timeout.Reset(time.Second) + // reserveJob reserves a job. + reserveJob := func() error { + // Don't do anything if the connection is paused, or a job was already + // reserved. + if consumer.isPaused || job != nil { + return nil } - select { - // Offer the job up on the shared jobs channel. - case jobOffer <- job: - job, jobOffer = nil, nil - touchTimer.Stop() - jobsOutThere++ - - // Wait a bit before trying to reserve a job again, or just fall through. - case <-timeout.C: - - // Touch the pending job to make sure it doesn't expire. - case <-touchTimer.C: - if job != nil { - if err = client.Touch(job); err != nil { - consumer.options.LogError("Unable to touch job %d: %s", job.ID, err) - if isFatalErr() { - return - } - - job, jobOffer = nil, nil - } else { - touchTimer.Reset(job.TouchAt()) - } - } + // Attempt to reserve a job. + err = contextTimeoutFunc(3*time.Second, func(ctx context.Context) error { + job, err = conn.ReserveWithTimeout(ctx, 0) + return err + }) - // Keep the connection alive when the connection state is paused. - case <-keepAlive.C: - if _, _, err = client.requestResponse("list-tube-used"); err != nil { - return - } + switch { + case err != nil: + return err - if consumer.isPaused { - keepAlive.Reset(keepAliveInterval) - } + // Job reserved, so start the release timer so that the job isn't being + // held for too long if it doesn't get claimed. + case job != nil: + jobC = config.jobC - // Bury, delete or release a reserved job. - case req := <-jobCommandC: - if req.Command == Touch { - if err = client.Touch(req.Job); err != nil { - consumer.options.LogError("Unable to touch job %d: %s", req.Job.ID, err) - } + // Make sure the release timer isn't bigger than the TTR of the job. + if job.TouchAfter() < config.ReleaseTimeout { + releaseTimeout.Reset(job.TouchAfter()) } else { - switch req.Command { - case Bury: - err = client.Bury(req.Job, req.Priority) - case Delete: - err = client.Delete(req.Job) - case Release: - err = client.Release(req.Job, req.Priority, req.Delay) - default: - err = fmt.Errorf("%d: unknown job command", req.Command) - } + releaseTimeout.Reset(config.ReleaseTimeout) + } - jobsOutThere-- - if err != nil { - consumer.options.LogError("Unable to finish job %d: %s", req.Job.ID, err) - } + // No job reserved, so try again after a pause. + default: + reserveTimeout.Reset(config.ReserveTimeout) + } + + return nil + } + + for { + select { + // Offer up the reserved job. + case jobC <- job: + job, jobC = nil, nil + releaseTimeout.Stop() + + // Immediately try to reserve a new job. + reserveTimeout.Reset(0) + + // Try to reserve a new job. + case <-reserveTimeout.C: + if err = reserveJob(); err != nil { + return err } - req.Err <- err - if isFatalErr() { - return + // Release the reserved job back, after having held it for a while. + case <-releaseTimeout.C: + if err = releaseJob(); err != nil { + return err } - // Pause or unpause this connection. + // Wait a bit before attempting another reserve. This gives other workers + // time to pick the previously released job. + reserveTimeout.Reset(config.ReserveTimeout) + + // Pause or unpause this consumer. case consumer.isPaused = <-consumer.pause: if consumer.isPaused { - if job != nil { - if err = client.Release(job, job.Priority, 0); err != nil { - consumer.options.LogError("Unable to release job %d: %s", job.ID, err) - if isFatalErr() { - return - } - } - - job, jobOffer = nil, nil + if err = releaseJob(); err != nil { + return err } - - keepAlive.Reset(keepAliveInterval) } else { - keepAlive.Stop() + reserveTimeout.Reset(0) } - // Stop this connection and close this consumer down. - case <-consumer.stop: - return nil + // Exit when this consumer is closing down. + case <-consumer.close: + return releaseJob() } } } diff --git a/consumer_pool.go b/consumer_pool.go index 623d5bb..45327be 100644 --- a/consumer_pool.go +++ b/consumer_pool.go @@ -1,48 +1,51 @@ package beanstalk -import "sync" +import ( + "context" + "sync" +) -// ConsumerPool maintains a pool of beanstalk consumers. +// ConsumerPool manages a pool of consumers that share a single channel on +// which jobs are offered. type ConsumerPool struct { - // C offers up newly reserved beanstalk jobs. - C <-chan *Job - c chan *Job + // C offers up reserved jobs. + C <-chan *Job + + config Config consumers []*Consumer + stop chan struct{} stopOnce sync.Once mu sync.Mutex } -// NewConsumerPool creates a pool of beanstalk consumers. -func NewConsumerPool(urls []string, tubes []string, options *Options) (*ConsumerPool, error) { - jobC := make(chan *Job) - pool := &ConsumerPool{C: jobC, c: jobC} +// NewConsumerPool creates a pool of Consumers from the list of URIs that has +// been provided. +func NewConsumerPool(uris []string, tubes []string, config Config) (*ConsumerPool, error) { + config = config.normalize() - // Create a consumer for each URL. - for _, url := range urls { - consumer, err := NewConsumer(url, tubes, jobC, options) + pool := &ConsumerPool{C: config.jobC, config: config, stop: make(chan struct{})} + for _, uri := range uris { + consumer, err := NewConsumer(uri, tubes, config) if err != nil { + pool.Stop() return nil, err } pool.consumers = append(pool.consumers, consumer) } - // Start all the consumers. - for _, consumer := range pool.consumers { - consumer.Start() - } - return pool, nil } -// Stop shuts down all the consumers in the pool. +// Stop all the consumers in this pool. func (pool *ConsumerPool) Stop() { pool.stopOnce.Do(func() { pool.mu.Lock() defer pool.mu.Unlock() + close(pool.stop) for i, consumer := range pool.consumers { - consumer.Stop() + consumer.Close() pool.consumers[i] = nil } @@ -50,7 +53,7 @@ func (pool *ConsumerPool) Stop() { }) } -// Play tells all the consumers to start reservering jobs. +// Play unpauses all the consumers in this pool. func (pool *ConsumerPool) Play() { pool.mu.Lock() defer pool.mu.Unlock() @@ -60,7 +63,7 @@ func (pool *ConsumerPool) Play() { } } -// Pause tells all the consumer to stop reservering jobs. +// Pause all the consumers in this pool. func (pool *ConsumerPool) Pause() { pool.mu.Lock() defer pool.mu.Unlock() @@ -69,3 +72,29 @@ func (pool *ConsumerPool) Pause() { consumer.Pause() } } + +// Receive calls fn in for each job it can reserve on the consumers in this pool. +func (pool *ConsumerPool) Receive(ctx context.Context, fn func(ctx context.Context, job *Job)) { + var wg sync.WaitGroup + wg.Add(pool.config.NumGoroutines) + + for i := 0; i < pool.config.NumGoroutines; i++ { + go func() { + defer wg.Done() + + for { + select { + case job := <-pool.C: + fn(ctx, job) + + case <-ctx.Done(): + return + case <-pool.stop: + return + } + } + }() + } + + wg.Wait() +} diff --git a/consumer_test.go b/consumer_test.go deleted file mode 100644 index 88540c2..0000000 --- a/consumer_test.go +++ /dev/null @@ -1,300 +0,0 @@ -package beanstalk - -import ( - "log" - "net" - "net/textproto" - "os" - "testing" - "time" -) - -var waitABit = 20 * time.Millisecond - -type TestConsumer struct { - *Consumer - t *testing.T - listener net.Listener - client *textproto.Conn - offeredJobC chan *Job - requestC chan string - responseC chan string -} - -func NewTestConsumer(t *testing.T) *TestConsumer { - listener, err := net.Listen("tcp", ":0") - if err != nil { - panic("Unable to create consumer socket: " + err.Error()) - } - - jobC := make(chan *Job) - - options := &Options{ - ReserveTimeout: time.Second, - ReconnectTimeout: time.Second * 3, - ErrorLog: log.New(os.Stdout, "ERROR: ", 0), - } - - consumer, err := NewConsumer("beanstalk://"+listener.Addr().String(), []string{"test"}, jobC, options) - if err != nil { - panic("Unable to create consumer: " + err.Error()) - } - consumer.Start() - - testConsumer := &TestConsumer{ - Consumer: consumer, - t: t, - listener: listener, - offeredJobC: jobC, - requestC: make(chan string), - responseC: make(chan string), - } - - go testConsumer.acceptNewConnections() - testConsumer.ExpectWatchAndIgnore() - - return testConsumer -} - -func (consumer *TestConsumer) Close() { - consumer.Stop() - _ = consumer.listener.Close() -} - -func (consumer *TestConsumer) acceptNewConnections() { - for { - conn, err := consumer.listener.Accept() - if err != nil { - return - } - - consumer.client = textproto.NewConn(conn) - consumer.handleConnection() - } -} - -func (consumer *TestConsumer) handleConnection() { - defer consumer.client.Close() - - for { - request, err := consumer.client.ReadLine() - if err != nil { - break - } - - consumer.requestC <- request - - response := <-consumer.responseC - _ = consumer.client.PrintfLine(response) - } -} - -func (consumer *TestConsumer) Expect(line string) chan string { - select { - case request := <-consumer.requestC: - if request != line { - consumer.t.Fatalf("Expected '%s', but got '%s' instead", line, request) - } - return consumer.responseC - - case <-time.After(waitABit): - consumer.t.Fatalf("Expected '%s', but received nothing", line) - } - - return nil -} - -func (consumer *TestConsumer) ExpectWatchAndIgnore() { - consumer.Expect("watch test") <- "WATCHING 2" - consumer.Expect("ignore default") <- "WATCHING 1" - - if consumer.isPaused { - consumer.ExpectNoRequest() - } -} - -func (consumer *TestConsumer) ExpectNoRequest() { - select { - case request := <-consumer.requestC: - consumer.t.Fatalf("Didn't expect a command from the client, but got '%s' instead", request) - case <-time.After(waitABit): - } -} - -func (consumer *TestConsumer) ExpectJobOffer() *Job { - select { - case job := <-consumer.offeredJobC: - return job - case <-time.After(waitABit): - consumer.t.Fatalf("Expected a job to be offered, but got nothing") - } - - return nil -} - -func (consumer *TestConsumer) FinalizeJob(job *Job, command string) { - go func(job *Job, command string) { - var err error - - switch command { - case "bury": - err = job.Bury() - case "delete": - err = job.Delete() - case "release": - err = job.Release() - } - - if err != nil { - consumer.t.Fatalf("Error while calling '%s' on job: %s", command, err) - } - }(job, command) -} - -// **************************************************************************** - -func TestConsumerPlayAndPause(t *testing.T) { - consumer := NewTestConsumer(t) - defer consumer.Close() - - // By default, the consumer is paused. - consumer.ExpectNoRequest() - - // Unpause the consumer, which should trigger a reserve. - consumer.Play() - consumer.Expect("reserve-with-timeout 1") <- "TIMED_OUT" - - // Pause the consumer, which shouldn't trigger a reserve. - consumer.Pause() - consumer.ExpectNoRequest() -} - -func TestConsumerStop(t *testing.T) { - consumer := NewTestConsumer(t) - defer consumer.Close() - - select { - case <-consumer.stop: - t.Fatal("Expected consumer to not be stopped") - default: - } - - consumer.Stop() - - select { - case <-consumer.stop: - default: - t.Fatal("Expected consumer to be stopped") - } -} - -func TestConsumerReserveAndDelete(t *testing.T) { - consumer := NewTestConsumer(t) - defer consumer.Close() - - // Unpause the consumer, which should trigger a reserve. Return a job. - consumer.Play() - consumer.Expect("reserve-with-timeout 1") <- "RESERVED 1 3\r\nfoo" - consumer.Expect("stats-job 1") <- "OK 29\r\n---\n pri: 1024\n ttr: 3\n" - consumer.ExpectNoRequest() - - // The reserved job should be offered. - job := consumer.ExpectJobOffer() - consumer.Expect("reserve-with-timeout 0") <- "TIMED_OUT" - consumer.FinalizeJob(job, "delete") - consumer.Expect("delete 1") <- "DELETED" - - // Pause the consumer, even though a reserve was already issued. - consumer.Pause() - consumer.Expect("reserve-with-timeout 1") <- "TIMED_OUT" - consumer.ExpectNoRequest() -} - -func TestConsumerJobReleaseOnPause(t *testing.T) { - consumer := NewTestConsumer(t) - defer consumer.Close() - consumer.Play() - - // Reserve a job. - consumer.Expect("reserve-with-timeout 1") <- "RESERVED 1 3\r\nfoo" - consumer.Expect("stats-job 1") <- "OK 29\r\n---\n pri: 1024\n ttr: 3\n" - - // Pausing the consumer should trigger a release. - consumer.Pause() - consumer.Expect("release 1 1024 0") <- "RELEASED" - consumer.ExpectNoRequest() -} - -func TestConsumerReserveWithDeadline(t *testing.T) { - consumer := NewTestConsumer(t) - defer consumer.Close() - consumer.Play() - - // Try to reserve 2 jobs for which a DEADLINE_SOON is issued on the 2nd - // reserve. - consumer.Expect("reserve-with-timeout 1") <- "RESERVED 1 3\r\nfoo" - consumer.Expect("stats-job 1") <- "OK 29\r\n---\n pri: 1024\n ttr: 3\n" - consumer.ExpectJobOffer() - consumer.Expect("reserve-with-timeout 0") <- "DEADLINE_SOON" - - // the DEADLINE_SOON response triggers a 1 second timeout. - time.Sleep(time.Second) - - // A DEADLINE_SOON should simply trigger a new reserve. - consumer.Expect("reserve-with-timeout 0") <- "TIMED_OUT" -} - -func TestConsumerReserveFlow(t *testing.T) { - consumer := NewTestConsumer(t) - defer consumer.Close() - consumer.Play() - - // Reserve a job. - consumer.Expect("reserve-with-timeout 1") <- "RESERVED 1 3\r\nfoo" - consumer.Expect("stats-job 1") <- "OK 29\r\n---\n pri: 1024\n ttr: 3\n" - consumer.ExpectNoRequest() - - // The job should be pending and fetchable. - job1 := consumer.ExpectJobOffer() - - // Once the job is fetched, a new reserve should have been issued. - consumer.Expect("reserve-with-timeout 0") <- "RESERVED 2 3\r\nfoo" - consumer.Expect("stats-job 2") <- "OK 29\r\n---\n pri: 1024\n ttr: 3\n" - consumer.ExpectNoRequest() - - // Another job should be pending and fetchable. - job2 := consumer.ExpectJobOffer() - - // Pause the consumer and catch the pending reserve. - consumer.Pause() - consumer.Expect("reserve-with-timeout 0") <- "TIMED_OUT" - - // Allow the select-statement to read the pause channel, before we let it - // compete with the job command channel. - time.Sleep(waitABit) - - // Release the 1st job. - go func() { - if err := job1.Release(); err != nil { - t.Fatalf("Error releasing job 1: %s", err) - } - }() - - consumer.Expect("release 1 1024 0") <- "RELEASED" - consumer.ExpectNoRequest() - - // Release the 2nd job. - go func() { - if err := job2.Release(); err != nil { - t.Fatalf("Error releasing job 2: %s", err) - } - }() - - consumer.Expect("release 2 1024 0") <- "RELEASED" - consumer.ExpectNoRequest() - - // Start the consumer back up again. This should yield a reserve with a - // specified timeout. - consumer.Play() - consumer.Expect("reserve-with-timeout 1") <- "TIMED_OUT" -} diff --git a/doc.go b/doc.go new file mode 100644 index 0000000..7883ca2 --- /dev/null +++ b/doc.go @@ -0,0 +1,92 @@ +/* +Package beanstalk implements a beanstalk client that includes various +abstractions to make producing and consuming jobs easier. + +Create a Conn if you want the most basic version of a beanstalk client: + + conn, err := beanstalk.Dial("localhost:11300", beanstalk.Config{}) + if err != nil { + // handle error + } + defer conn.Close() + + id, err := conn.Put(ctx, "example_tube", []byte("Hello World"), beanstalk.PutParams{ + Priority: 1024, + Delay: 2 * time.Second, + TTR: 1 * time.Minute, + }) + if err != nil { + // handle error + } + + if err = conn.Watch(ctx, "example_tube"); err != nil { + // handle error + } + + job, err := conn.ReserveWithTimeout(ctx, 3*time.Second) + if err != nil { + // handle error + } + + // process job + + if err = job.Delete(ctx); err != nil { + // handle error + } + +In most cases it is easier to leverage ConsumerPool and ProducerPool to manage +one or more beanstalk client connections, as this provides some form of +load balacning and auto-reconnect mechanisms under the hood. + +The ProducerPool manages one or more client connections used specifically for +producing beanstalk jobs. If exports a Put method that load balances between the +available connections + + pool, err := beanstalk.NewProducerPool([]string{"localhost:11300"}, beanstalk.Config{}) + if err != nil { + // handle error + } + defer pool.Stop() + + id, err := pool.Put(ctx, "example_tube", []byte("Hello World"), beanstalk.PutParams{ + Priority: 1024, + Delay: 2 * time.Second, + TTR: 1 * time.Minute, + } + +A ConsumerPool manages one or more client connections used specifically for +consuming beanstalk jobs. If exports a channel on which Job types can be read. + + pool, err := beanstalk.NewConsumerPool([]string{"localhost:11300"}, []string{"example_tube"}, beanstalk.Config{}) + if err != nil { + // handle error + } + defer pool.Stop() + + pool.Play() + for job := range pool.C { + // process job + + if err = job.Delete(ctx); err != nil { + // handle error + } + } + +Alternatively, instead of leveraging the exported channel it is possible to +provide a handler function that is called for every reserved beanstalk job by +calling the Receive method on ConsumerPool. + + pool.Play() + pool.Receive(ctx, func(ctx context.Context, job *beanstalk.Job) { + // process job + + if err = job.Delete(ctx); err != nil { + // handle error + } + }) + +In the above examples the beanstalk server was referenced by way of the +host:port notation. This package also supports URI formats like beanstalk:// for +a plaintext connection, and beanstalks:// or tls:// for encrypted connections. +*/ +package beanstalk diff --git a/go.mod b/go.mod index f41a85f..7e722bf 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module github.com/prep/beanstalk go 1.12 -require gopkg.in/yaml.v2 v2.2.2 +require ( + go.opencensus.io v0.22.0 + gopkg.in/yaml.v2 v2.2.2 +) diff --git a/go.sum b/go.sum index e936db1..34ada8e 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,46 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU= +github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +go.opencensus.io v0.22.0 h1:C9hSCOW830chIVkdja34wa6Ky+IzWllkUinR+BtRZd4= +go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190501004415-9ce7a6920f09/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/job.go b/job.go index f322105..b1cf2eb 100644 --- a/job.go +++ b/job.go @@ -1,133 +1,97 @@ package beanstalk import ( + "context" "errors" - "sync" "time" ) -// The errors that can be returned by any of the Job functions. -var ( - ErrJobFinished = errors.New("job was already finished") - ErrLostConnection = errors.New("the connection was lost") -) - -// Command describes a beanstalk command that finishes a reserved job. -type Command int - -// These are the caller ids that are used when calling back to the Consumer. -const ( - Bury Command = iota - Delete - Release - Touch -) +// ErrJobFinished is returned when a job was already finished. +var ErrJobFinished = errors.New("job was already finished") -// JobCommand is sent to the consumer that reserved the job with the purpose -// of finishing a job. -type JobCommand struct { - Command Command - Job *Job - Priority uint32 - Delay time.Duration - Err chan error +// PutParams are the parameters used to perform a Put operation. +type PutParams struct { + Priority uint32 `yaml:"pri"` + Delay time.Duration `yaml:"delay"` + TTR time.Duration `yaml:"ttr"` } -// Job contains the data of a reserved job. +// Job describes a beanstalk job and its stats. type Job struct { - ID uint64 - Body []byte - Priority uint32 - TTR time.Duration - touchedAt time.Time - commandC chan<- *JobCommand - sync.Mutex -} - -func (job *Job) cmd(command Command, priority uint32, delay time.Duration) (err error) { - defer func() { - if r := recover(); r != nil { - err = ErrLostConnection - } - }() - - if job.commandC == nil { - return ErrJobFinished - } - - jobCommand := &JobCommand{ - Command: command, - Job: job, - Priority: priority, - Delay: delay, - Err: make(chan error), - } - - job.commandC <- jobCommand - if command != Touch { - job.commandC = nil + ID uint64 + Body []byte + ReservedAt time.Time + Stats struct { + PutParams `yaml:",inline"` + Tube string `yaml:"tube"` + State string `yaml:"state"` + Age time.Duration `yaml:"age"` + TimeLeft time.Duration `yaml:"time-left"` + File int `yaml:"file"` + Reserves int `yaml:"reserves"` + Timeouts int `yaml:"timeouts"` + Releases int `yaml:"releases"` + Buries int `yaml:"buries"` + Kicks int `yaml:"kicks"` } - return <-jobCommand.Err + conn *Conn } -// Bury tells the consumer to bury this job with the same priority as this job -// was inserted with. -func (job *Job) Bury() error { - return job.cmd(Bury, job.Priority, 0) +// Bury this job. +func (job *Job) Bury(ctx context.Context) error { + return job.BuryWithPriority(ctx, job.Stats.Priority) } -// BuryWithPriority tells the consumer to bury this job with the specified -// priority. -func (job *Job) BuryWithPriority(priority uint32) error { - return job.cmd(Bury, priority, 0) -} +// BuryWithPriority buries this job with the specified priority. +func (job *Job) BuryWithPriority(ctx context.Context, priority uint32) error { + if job.conn == nil { + return ErrJobFinished + } -// Delete tells the consumer to delete this job. -func (job *Job) Delete() error { - return job.cmd(Delete, 0, 0) + err := job.conn.bury(ctx, job, priority) + job.conn = nil + return err } -// Release tells the consumer to release this job with the same priority as -// this job was inserted with and without delay. -func (job *Job) Release() error { - return job.cmd(Release, job.Priority, 0) -} +// Delete this job. +func (job *Job) Delete(ctx context.Context) error { + if job.conn == nil { + return ErrJobFinished + } -// ReleaseWithParams tells the consumer to release this job with the specified -// priority and delay. -func (job *Job) ReleaseWithParams(priority uint32, delay time.Duration) error { - return job.cmd(Release, priority, delay) + err := job.conn.delete(ctx, job) + job.conn = nil + return err } -// Touch refreshes the reserve timer for this job. -func (job *Job) Touch() error { - return job.cmd(Touch, 0, 0) +// Release this job back with its original priority and without delay. +func (job *Job) Release(ctx context.Context) error { + return job.ReleaseWithParams(ctx, job.Stats.Priority, 0) } -// Mark the time this job was touched. -func (job *Job) touched() { - job.Lock() - job.touchedAt = time.Now().UTC() - job.Unlock() +// ReleaseWithParams releases this job back with the specified priority and delay. +func (job *Job) ReleaseWithParams(ctx context.Context, priority uint32, delay time.Duration) error { + if job.conn == nil { + return ErrJobFinished + } + + err := job.conn.release(ctx, job, priority, delay) + job.conn = nil + return err } -// TouchAt returns the duration after which a call to Touch() should be -// triggered in order to keep the job reserved. -func (job *Job) TouchAt() time.Duration { - var margin time.Duration - - switch { - case job.TTR <= 3*time.Second: - margin = 200 * time.Millisecond - case job.TTR < 60*time.Second: - margin = time.Second - default: - margin = 3 * time.Second +// Touch the job thereby resetting its reserved status. +func (job *Job) Touch(ctx context.Context) error { + if job.conn == nil { + return ErrJobFinished } - job.Lock() - defer job.Unlock() + return job.conn.touch(ctx, job) +} - return job.touchedAt.Add(job.TTR - margin).Sub(time.Now().UTC()) +// TouchAfter returns the duration until this jobs needs to be touched for its +// reservation to be retained. +func (job *Job) TouchAfter() time.Duration { + return time.Until(job.ReservedAt.Add(job.Stats.TimeLeft)) } diff --git a/job_test.go b/job_test.go deleted file mode 100644 index 5a93706..0000000 --- a/job_test.go +++ /dev/null @@ -1,80 +0,0 @@ -package beanstalk - -import ( - "testing" - "time" -) - -func NewTestJob() *Job { - commandC := make(chan *JobCommand) - go func() { - if req, ok := <-commandC; ok { - req.Err <- nil - } else { - return - } - }() - - return &Job{ - ID: 12345, - Body: []byte("Hello World"), - TTR: time.Duration(1), - commandC: commandC, - } -} - -func TestBuryJob(t *testing.T) { - job := NewTestJob() - if err := job.Bury(); err != nil { - t.Fatalf("Unexpected error from Bury: %s", err) - } -} - -func TestBuryJobWithPriority(t *testing.T) { - job := NewTestJob() - if err := job.BuryWithPriority(1024); err != nil { - t.Fatalf("Unexpected error from Bury: %s", err) - } -} - -func TestDeleteJob(t *testing.T) { - job := NewTestJob() - if err := job.Delete(); err != nil { - t.Fatalf("Unexpected error from Delete: %s", err) - } -} - -func TestReleaseJob(t *testing.T) { - job := NewTestJob() - if err := job.Release(); err != nil { - t.Fatalf("Unexpected error from Release: %s", err) - } -} - -func TestReleaseJobWithParams(t *testing.T) { - job := NewTestJob() - if err := job.ReleaseWithParams(1024, time.Second); err != nil { - t.Fatalf("Unexpected error from Release: %s", err) - } -} - -func TestDoubleFinalizeJob(t *testing.T) { - job := NewTestJob() - if err := job.Delete(); err != nil { - t.Fatalf("Unexpected error from Delete: %s", err) - } - if err := job.Delete(); err != ErrJobFinished { - t.Fatalf("Expected ErrJobFinished, but got: %s", err) - } - -} - -func TestConnectionLostOnJob(t *testing.T) { - job := NewTestJob() - close(job.commandC) - - if err := job.Delete(); err != ErrLostConnection { - t.Fatalf("Expected ErrLostConnection, but got: %s", err) - } - -} diff --git a/options.go b/options.go deleted file mode 100644 index 5f49058..0000000 --- a/options.go +++ /dev/null @@ -1,66 +0,0 @@ -package beanstalk - -import ( - "log" - "time" -) - -// DefaultOptions returns an Options object with default values. -func DefaultOptions() *Options { - return &Options{ - ReserveTimeout: time.Second, - ReconnectTimeout: time.Second * 3, - } -} - -// Options define the configurable parts of the Client, Consumers and Producers. -type Options struct { - // ReserveTimeout is the maximum amount of time (in seconds) a reserve call - // is allowed to block. - ReserveTimeout time.Duration - - // ReconnectTimeout is the time to wait between reconnect attempts. - ReconnectTimeout time.Duration - - // ReadWriteTimeout is the time a read or write operation on the beanstalk - // connection is given before it should unblock. - ReadWriteTimeout time.Duration - - // LogPrefix is a string that gets prepending to every line that is written - // to the loggers, which suits a special use case by the author. Most people - // will probably want to use the prefix parameter of log.New(). - LogPrefix string - - // status updates and the like. - InfoLog *log.Logger - - // ErrorLog is an optional logger for error messages, like read/write errors - // and unexpected responses. - ErrorLog *log.Logger -} - -// LogInfo writes a log message to the InfoLog logger, if it was set. -func (options *Options) LogInfo(format string, v ...interface{}) { - if options.InfoLog == nil { - return - } - - if options.LogPrefix == "" { - options.InfoLog.Printf(format, v...) - } else { - options.InfoLog.Printf(options.LogPrefix+" "+format, v...) - } -} - -// LogError writes a log message to the ErrorLog logger, if it was set. -func (options *Options) LogError(format string, v ...interface{}) { - if options.ErrorLog == nil { - return - } - - if options.LogPrefix == "" { - options.ErrorLog.Printf(format, v...) - } else { - options.ErrorLog.Printf(options.LogPrefix+" "+format, v...) - } -} diff --git a/producer.go b/producer.go index d2ee38e..26b64e8 100644 --- a/producer.go +++ b/producer.go @@ -1,116 +1,94 @@ package beanstalk -import "sync" +import ( + "context" + "sync" -// Producer puts the jobs it receives on its channel into beanstalk. + "go.opencensus.io/trace" +) + +// Producer manages a connection for the purpose of inserting jobs. type Producer struct { - url string - putC chan *Put - stop chan struct{} - isStopped bool - options *Options - startOnce sync.Once - stopOnce sync.Once + conn *Conn + errC chan error + close chan struct{} + closeOnce sync.Once mu sync.Mutex } -// NewProducer returns a new Producer object. -func NewProducer(url string, putC chan *Put, options *Options) (*Producer, error) { - if options == nil { - options = DefaultOptions() - } +// NewProducer creates a connection to a beanstalk server, but will return an +// error if the connection fails. Once established, the connection will be +// maintained in the background. +func NewProducer(uri string, config Config) (*Producer, error) { + config = config.normalize() - if _, _, err := ParseURL(url); err != nil { + conn, err := Dial(uri, config) + if err != nil { return nil, err } - return &Producer{ - url: url, - putC: putC, - stop: make(chan struct{}, 1), - options: options, - }, nil + producer := &Producer{ + conn: conn, + errC: make(chan error, 1), + close: make(chan struct{}), + } + + keepConnected(producer, conn, config, producer.close) + return producer, nil } -// Start this producer. -func (producer *Producer) Start() { - producer.startOnce.Do(func() { - go producer.manager() +// Close this consumer's connection. +func (producer *Producer) Close() { + producer.closeOnce.Do(func() { + close(producer.close) }) } -// Stop this producer. -func (producer *Producer) Stop() { - producer.stopOnce.Do(func() { +func (producer *Producer) setupConnection(conn *Conn, config Config) error { + return nil +} + +// handleIO takes jobs offered up on C and inserts them into beanstalk. +func (producer *Producer) handleIO(conn *Conn, config Config) error { + producer.mu.Lock() + producer.conn = conn + producer.mu.Unlock() + + select { + // If an error occurred in Put, return it. + case err := <-producer.errC: + return err + + // Exit when this producer is closing down. + case <-producer.close: producer.mu.Lock() - producer.isStopped = true - close(producer.stop) + producer.conn = nil producer.mu.Unlock() - }) + + return nil + } } -// manager is responsible for accepting new put requests and inserting them -// into beanstalk. -func (producer *Producer) manager() { - var client *Client - var lastTube string - var putC chan *Put - - // Set up a new connection. - newConnection, abortConnect := connect(producer.url, producer.options) - - // Close the client and reconnect. - reconnect := func(format string, a ...interface{}) { - producer.options.LogError(format, a...) - - if client != nil { - client.Close() - client, putC, lastTube = nil, nil, "" - producer.options.LogInfo("Producer connection closed. Reconnecting") - newConnection, abortConnect = connect(producer.url, producer.options) - } +// Put inserts a job into beanstalk. +func (producer *Producer) Put(ctx context.Context, tube string, body []byte, params PutParams) (uint64, error) { + ctx, span := trace.StartSpan(ctx, "github.com/prep/beanstalk/Producer.Put") + defer span.End() + + producer.mu.Lock() + defer producer.mu.Unlock() + + // If this producer isn't connected, return ErrDisconnected. + if producer.conn == nil { + return 0, ErrDisconnected } - for { - select { - // Set up a new beanstalk client connection. - case conn := <-newConnection: - client, abortConnect = NewClient(conn, producer.options), nil - putC = producer.putC - - // This case handles new 'put' requests. - case put := <-putC: - request := &put.request - - if request.Tube != lastTube { - if err := client.Use(request.Tube); err != nil { - put.Response(0, err) - reconnect("Unable to use tube '%s': %s", request.Tube, err) - break - } - - lastTube = request.Tube - } - - // Insert the job into beanstalk and return the response. - id, err := client.Put(request) - if err != nil { - reconnect("Unable to put job into beanstalk: %s", err) - } - - put.Response(id, err) - - // Close the connection and stop this goroutine from running. - case <-producer.stop: - if client != nil { - client.Close() - } - - if abortConnect != nil { - close(abortConnect) - } - - return - } + // Insert the job. If this fails, mark the connection as disconnected and + // report the error to handleIO. + id, err := producer.conn.Put(ctx, tube, body, params) + if err != nil { + producer.conn = nil + producer.errC <- err } + + return id, err } diff --git a/producer_pool.go b/producer_pool.go index 4b5d4a6..adb9315 100644 --- a/producer_pool.go +++ b/producer_pool.go @@ -1,43 +1,48 @@ package beanstalk -import "sync" +import ( + "context" + "math/rand" + "sync" -// ProducerPool maintains a pool of Producers with the purpose of spreading -// incoming Put requests over the maintained Producers. + "go.opencensus.io/trace" +) + +// ProducerPool manages a connection pool of Producers and provides a simple +// interface for balancing Put requests over the pool of connections. type ProducerPool struct { producers []*Producer - putC chan *Put - putTokens chan *Put stopOnce sync.Once + mu sync.RWMutex } -// NewProducerPool creates a pool of Producer objects. -func NewProducerPool(urls []string, options *Options) (*ProducerPool, error) { - pool := &ProducerPool{putC: make(chan *Put)} - pool.putTokens = make(chan *Put, len(urls)) +// NewProducerPool creates a pool of Producers from the list of URIs that has +// been provided. +func NewProducerPool(uris []string, config Config) (*ProducerPool, error) { + config = config.normalize() - for _, url := range urls { - producer, err := NewProducer(url, pool.putC, options) + var pool ProducerPool + for _, URI := range uris { + producer, err := NewProducer(URI, config) if err != nil { + pool.Stop() return nil, err } pool.producers = append(pool.producers, producer) - pool.putTokens <- NewPut(pool.putC, options) - } - - for _, producer := range pool.producers { - producer.Start() } - return pool, nil + return &pool, nil } -// Stop shuts down all the producers in the pool. +// Stop all the producers in this pool. func (pool *ProducerPool) Stop() { pool.stopOnce.Do(func() { + pool.mu.Lock() + defer pool.mu.Unlock() + for i, producer := range pool.producers { - producer.Stop() + producer.Close() pool.producers[i] = nil } @@ -45,11 +50,28 @@ func (pool *ProducerPool) Stop() { }) } -// Put inserts a new job into beanstalk. -func (pool *ProducerPool) Put(tube string, body []byte, params *PutParams) (uint64, error) { - put := <-pool.putTokens - id, err := put.Request(tube, body, params) - pool.putTokens <- put +// Put a job into the specified tube. +func (pool *ProducerPool) Put(ctx context.Context, tube string, body []byte, params PutParams) (uint64, error) { + ctx, span := trace.StartSpan(ctx, "github.com/prep/beanstalk/ProducerPool.Put") + defer span.End() + + pool.mu.RLock() + defer pool.mu.RUnlock() + + // Cycle randomly over the producers. + for _, num := range rand.Perm(len(pool.producers)) { + id, err := pool.producers[num].Put(ctx, tube, body, params) + switch { + // If a producer is disconnected, try the next one. + case err == ErrDisconnected: + continue + case err != nil: + return 0, err + } + + return id, nil + } - return id, err + // If no producer was found, all were disconnected. + return 0, ErrDisconnected } diff --git a/put.go b/put.go deleted file mode 100644 index a7f12c1..0000000 --- a/put.go +++ /dev/null @@ -1,79 +0,0 @@ -package beanstalk - -import "time" - -// Put defines a beanstalk put request and response. It is assumed that an -// object of this type is shared between a goroutine that sets the request -// and a goroutine that sets the response. -type Put struct { - request PutRequest - response PutResponse - timer *time.Timer - options *Options - putC chan<- *Put - respC chan struct{} -} - -// NewPut returns a new Put object that operates on the specified producer -// channel. -func NewPut(putC chan<- *Put, options *Options) *Put { - if options == nil { - options = DefaultOptions() - } - - timer := time.NewTimer(time.Second) - timer.Stop() - - return &Put{ - timer: timer, - options: options, - putC: putC, - respC: make(chan struct{})} -} - -// Request sends a put request to an available Producer. This function uses the -// ReadWriteTimeout field from Options{} to limit the time to wait for an -// available producer before it returns ErrNotConnected. -func (put *Put) Request(tube string, body []byte, params *PutParams) (uint64, error) { - put.request.Tube, put.request.Body, put.request.Params = tube, body, params - if put.options.ReadWriteTimeout != 0 { - put.timer.Reset(put.options.ReadWriteTimeout) - } - - select { - case put.putC <- put: - put.timer.Stop() - case <-put.timer.C: - put.options.LogError("Unable to find a producer. Timeout was reached") - return 0, ErrNotConnected - } - - <-put.respC - return put.response.ID, put.response.Err -} - -// Response sends a put response back. This function is called from a producer. -func (put *Put) Response(id uint64, err error) { - put.response.ID, put.response.Err = id, err - put.respC <- struct{}{} -} - -// PutRequest describes a put request. -type PutRequest struct { - Body []byte - Tube string - Params *PutParams -} - -// PutParams describe the parameters for a put request. -type PutParams struct { - Priority uint32 - Delay time.Duration - TTR time.Duration -} - -// PutResponse defines the response to a Put request. -type PutResponse struct { - ID uint64 - Err error -} diff --git a/stats.go b/stats.go deleted file mode 100644 index 005a6f7..0000000 --- a/stats.go +++ /dev/null @@ -1,135 +0,0 @@ -package beanstalk - -import ( - "strings" - "sync" - - "gopkg.in/yaml.v2" -) - -// TubeStat describes the statistics of a beanstalk tube. -type TubeStat struct { - Name string `yaml:"name" json:"name"` - UrgentJobs int `yaml:"current-jobs-urgent" json:"urgentJobs"` - ReadyJobs int `yaml:"current-jobs-ready" json:"readyJobs"` - ReservedJobs int `yaml:"current-jobs-reserved" json:"reservedJobs"` - DelayedJobs int `yaml:"current-jobs-delayed" json:"delayedJobs"` - BuriedJobs int `yaml:"current-jobs-buried" json:"buriedJobs"` - TotalJobs int64 `yaml:"total-jobs" json:"totalJobs"` - Using int `yaml:"current-using" json:"using"` - Watching int `yaml:"current-watching" json:"watching"` - Waiting int `yaml:"current-waiting" json:"waiting"` - DeleteCommands int64 `yaml:"cmd-delete" json:"deleteCommands"` - PauseCommands int64 `yaml:"cmd-pause-tube" json:"pauseCommands"` -} - -type stats struct { - TubeStats []TubeStat - err error -} - -// TubeStats returns a list of beanstalk tube statistics. -func TubeStats(urls []string, options *Options, tubePrefix string) ([]TubeStat, error) { - if options == nil { - options = DefaultOptions() - } - - var wg sync.WaitGroup - wg.Add(len(urls)) - - // Request the beanstalk tube statistics for all the beanstalk servers. - results := make([]stats, len(urls)) - for i, url := range urls { - go func(i int, url string) { - defer wg.Done() - - err := func() error { - // Dial into the beanstalk server. - conn, err := dial(url) - if err != nil { - return err - } - defer conn.Close() - - // Fetch the list of tubes on this server. - client := NewClient(conn, options) - _, data, err := client.requestResponse("list-tubes") - if err != nil { - return err - } - - var tubes []string - if err = yaml.Unmarshal(data, &tubes); err != nil { - return err - } - - // Cycle over the list of tube names and skip any tube that doesn't - // match the specified prefix. - var tubeStats []TubeStat - for _, tube := range tubes { - if !strings.HasPrefix(tube, tubePrefix) { - continue - } - - // Fetch the statistics for this tube. - if _, data, err = client.requestResponse("stats-tube %s", tube); err != nil { - return err - } - - var tubeStat TubeStat - if err = yaml.Unmarshal(data, &tubeStat); err != nil { - return err - } - - tubeStats = append(tubeStats, tubeStat) - } - - results[i] = stats{TubeStats: tubeStats} - return nil - }() - - if err != nil { - results[i] = stats{err: err} - } - }(i, url) - } - - wg.Wait() - - // Return the first error it encounters. - for _, result := range results { - if result.err != nil { - return []TubeStat{}, result.err - } - } - - // Collect and merge the results. - var tubeStats []TubeStat - for _, result := range results { - for _, tubeStat := range result.TubeStats { - found := false - - for i, stat := range tubeStats { - if stat.Name == tubeStat.Name { - tubeStats[i].UrgentJobs += tubeStat.UrgentJobs - tubeStats[i].ReadyJobs += tubeStat.ReadyJobs - tubeStats[i].ReservedJobs += tubeStat.ReservedJobs - tubeStats[i].DelayedJobs += tubeStat.DelayedJobs - tubeStats[i].BuriedJobs += tubeStat.BuriedJobs - tubeStats[i].TotalJobs += tubeStat.TotalJobs - tubeStats[i].Using += tubeStat.Using - tubeStats[i].Watching += tubeStat.Watching - tubeStats[i].DeleteCommands += tubeStat.DeleteCommands - tubeStats[i].PauseCommands += tubeStat.PauseCommands - found = true - } - } - - if !found { - tubeStats = append(tubeStats, tubeStat) - } - } - } - - return tubeStats, nil -} diff --git a/utils.go b/utils.go deleted file mode 100644 index 7928498..0000000 --- a/utils.go +++ /dev/null @@ -1,36 +0,0 @@ -package beanstalk - -import ( - "bytes" - "errors" -) - -var errEntryNotFound = errors.New("yaml entry not found") - -// includesString checks if string _s_ is included in the slice of strings _a_. -func includesString(a []string, s string) bool { - for _, v := range a { - if v == s { - return true - } - } - - return false -} - -// yamlValue returns the value of a yaml entry. -func yamlValue(yaml []byte, field string) (string, error) { - var idxl, idxr int - - bField := []byte(field + ": ") - if idxl = bytes.Index(yaml, bField); idxl == -1 { - return "", errEntryNotFound - } - - idxl += len(bField) - if idxr = bytes.Index(yaml[idxl:], []byte("\n")); idxr == -1 { - return "", errEntryNotFound - } - - return string(yaml[idxl : idxl+idxr]), nil -}