diff --git a/README.md b/README.md index 59744a15..0640cb3c 100644 --- a/README.md +++ b/README.md @@ -416,6 +416,13 @@ As well Ratelimit supports TLS connections and authentication. These can be conf 1. `REDIS_TLS` & `REDIS_PERSECOND_TLS`: set to `"true"` to enable a TLS connection for the specific connection type. 1. `REDIS_AUTH` & `REDIS_PERSECOND_AUTH`: set to `"password"` to enable authentication to the redis host. +Ratelimit use [implicit pipelining](https://github.com/mediocregopher/radix/blob/v3.5.1/pool.go#L238) to send requests to redis. Pipelining can be configured using the following environment variables: + +1. `REDIS_PIPELINE_WINDOW` & `REDIS_PERSECOND_PIPELINE_WINDOW`: sets the duration after which internal pipelines will be flushed. +If window is zero then implicit pipelining will be disabled. +1. `REDIS_PIPELINE_LIMIT` & `REDIS_PERSECOND_PIPELINE_LIMIT`: sets maximum number of commands that can be pipelined before flushing. +If limit is zero then no limit will be used and pipelines will only be limited by the specified time window. + ## One Redis Instance To configure one Redis instance use the following environment variables: diff --git a/go.mod b/go.mod index 64a95008..f679a373 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,9 @@ module github.com/envoyproxy/ratelimit go 1.14 require ( + github.com/alicebob/miniredis/v2 v2.11.4 github.com/cespare/xxhash v1.1.0 // indirect github.com/coocood/freecache v1.1.0 - github.com/davecgh/go-spew v1.1.1 // indirect github.com/envoyproxy/go-control-plane v0.6.9 github.com/gogo/protobuf v1.3.1 // indirect github.com/golang/mock v1.4.1 @@ -16,18 +16,19 @@ require ( github.com/lyft/goruntime v0.2.1 github.com/lyft/gostats v0.4.0 github.com/lyft/protoc-gen-validate v0.0.7-0.20180626203901-f9d2b11e4414 // indirect - github.com/mediocregopher/radix.v2 v0.0.0-20181115013041-b67df6e626f9 + github.com/mediocregopher/radix/v3 v3.5.1 github.com/onsi/ginkgo v1.12.0 // indirect github.com/onsi/gomega v1.9.0 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect github.com/sirupsen/logrus v1.0.4 - github.com/stretchr/testify v1.1.3 + github.com/stretchr/objx v0.2.0 // indirect + github.com/stretchr/testify v1.5.1 golang.org/x/crypto v0.0.0-20191219195013-becbf705a915 // indirect golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 + golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a // indirect golang.org/x/text v0.3.3-0.20191122225017-cbf43d21aaeb // indirect google.golang.org/genproto v0.0.0-20191216205247-b31c10ee225f // indirect google.golang.org/grpc v1.19.0 gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect - gopkg.in/yaml.v2 v2.2.7 + gopkg.in/yaml.v2 v2.3.0 ) diff --git a/go.sum b/go.sum index 9b715351..e0747b69 100644 --- a/go.sum +++ b/go.sum @@ -2,11 +2,20 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 h1:45bxf7AZMwWcqkLzDAQugVEwedisr5nRJ1r+7LYnv0U= +github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= +github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI= +github.com/alicebob/miniredis/v2 v2.11.4 h1:GsuyeunTx7EllZBU3/6Ji3dhMQZDpC9rLf1luJ+6M5M= +github.com/alicebob/miniredis/v2 v2.11.4/go.mod h1:VL3UDEfAH59bSa7MuHMuFToxkqyHh69s/WUbYlOAuyg= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/coocood/freecache v1.1.0 h1:ENiHOsWdj1BrrlPwblhbn4GdAsMymK3pZORJ+bJGAjA= github.com/coocood/freecache v1.1.0/go.mod h1:ePwxCDzOYvARfHdr1pByNct1at3CoKnsipOHwKlNbzI= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.6.9 h1:deEH9W8ZAUGNbCdX+9iNzBOGrAOrnpJGoy0PcTqk/tE= @@ -23,6 +32,7 @@ github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/gomodule/redigo v1.7.1-0.20190322064113-39e2c31b7ca3/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= github.com/gorilla/mux v1.7.4-0.20191121170500-49c01487a141 h1:VQjjMh+uElTfioy6GnUrVrTMAiLTNF3xsrAlSwC+g8o= github.com/gorilla/mux v1.7.4-0.20191121170500-49c01487a141/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= @@ -41,6 +51,10 @@ github.com/lyft/protoc-gen-validate v0.0.7-0.20180626203901-f9d2b11e4414 h1:kLCS github.com/lyft/protoc-gen-validate v0.0.7-0.20180626203901-f9d2b11e4414/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= github.com/mediocregopher/radix.v2 v0.0.0-20181115013041-b67df6e626f9 h1:ViNuGS149jgnttqhc6XQNPwdupEMBXqCx9wtlW7P3sA= github.com/mediocregopher/radix.v2 v0.0.0-20181115013041-b67df6e626f9/go.mod h1:fLRUbhbSd5Px2yKUaGYYPltlyxi1guJz1vCmo1RQL50= +github.com/mediocregopher/radix/v3 v3.4.2 h1:galbPBjIwmyREgwGCfQEN4X8lxbJnKBYurgz+VfcStA= +github.com/mediocregopher/radix/v3 v3.4.2/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= +github.com/mediocregopher/radix/v3 v3.5.1 h1:IOYgQUMA380N4khaL5eNT4v/P2LnHa8b0wnVdwZMFsY= +github.com/mediocregopher/radix/v3 v3.5.1/go.mod h1:8FL3F6UQRXHXIBSPUs5h0RybMF8i4n7wVopoX3x7Bv8= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.0 h1:Iw5WCbBcaAAd0fpRb1c9r5YCylv4XDoCSigm1zLevwU= github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg= @@ -53,8 +67,19 @@ github.com/sirupsen/logrus v1.0.4 h1:gzbtLsZC3Ic5PptoRG+kQj4L60qjK7H7XszrU163JNQ github.com/sirupsen/logrus v1.0.4/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.1.3 h1:76sIvNG1I8oBerx/MvuVHh5HBWBW7oxfsi3snKIsz5w= github.com/stretchr/testify v1.1.3/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb h1:ZkM6LRnq40pR1Ox0hTHlnpkcOTuFIDQpZ1IN8rKKhX0= +github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191219195013-becbf705a915 h1:aJ0ex187qoXrJHPo8ZasVTASQB7llQP6YeNzgDALPRk= golang.org/x/crypto v0.0.0-20191219195013-becbf705a915/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -72,9 +97,13 @@ golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a h1:WXEvlFVvvGxCJLG6REjsT03iWnKLEWinaScsxF2Vm2o= +golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -92,6 +121,8 @@ golang.org/x/tools v0.0.0-20190425150028-36563e24a262 h1:qsl9y/CJx34tuA7QCPNp86J golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= @@ -109,9 +140,12 @@ gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 h1:OAj3g0cR6Dx/R07QgQe8wkA9RNj gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo= gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/src/redis/cache_impl.go b/src/redis/cache_impl.go index 5cc7b63e..4e37d10f 100644 --- a/src/redis/cache_impl.go +++ b/src/redis/cache_impl.go @@ -18,12 +18,12 @@ import ( ) type rateLimitCacheImpl struct { - pool Pool - // Optional Pool for a dedicated cache of per second limits. - // If this pool is nil, then the Cache will use the pool for all - // limits regardless of unit. If this pool is not nil, then it + client Client + // Optional Client for a dedicated cache of per second limits. + // If this client is nil, then the Cache will use the client for all + // limits regardless of unit. If this client is not nil, then it // is used for limits that have a SECOND unit. - perSecondPool Pool + perSecondClient Client timeSource TimeSource jitterRand *rand.Rand expirationJitterMaxSeconds int64 @@ -105,16 +105,14 @@ type cacheKey struct { perSecond bool } -func pipelineAppend(conn Connection, key string, hitsAddend uint32, expirationSeconds int64) { - conn.PipeAppend("INCRBY", key, hitsAddend) - conn.PipeAppend("EXPIRE", key, expirationSeconds) -} - -func pipelineFetch(conn Connection) uint32 { - ret := uint32(conn.PipeResponse().Int()) - // Pop off EXPIRE response and check for error. - conn.PipeResponse() - return ret +func pipelineAppend(client Client, key string, hitsAddend uint32, result *uint32, expirationSeconds int64) (err error) { + if err = client.DoCmd(result, "INCRBY", key, hitsAddend); err != nil { + return + } + if err = client.DoCmd(nil, "EXPIRE", key, expirationSeconds); err != nil { + return + } + return } func (this *rateLimitCacheImpl) DoLimit( @@ -124,17 +122,6 @@ func (this *rateLimitCacheImpl) DoLimit( logger.Debugf("starting cache lookup") - conn := this.pool.Get() - defer this.pool.Put(conn) - - // Optional connection for per second limits. If the cache has a perSecondPool setup, - // then use a connection from the pool for per second limits. - var perSecondConn Connection = nil - if this.perSecondPool != nil { - perSecondConn = this.perSecondPool.Get() - defer this.perSecondPool.Put(perSecondConn) - } - // request.HitsAddend could be 0 (default value) if not specified by the caller in the Ratelimit request. hitsAddend := max(1, request.HitsAddend) @@ -154,6 +141,8 @@ func (this *rateLimitCacheImpl) DoLimit( } isOverLimitWithLocalCache := make([]bool, len(request.Descriptors)) + results := make([]uint32, len(request.Descriptors)) + var err error // Now, actually setup the pipeline, skipping empty cache keys. for i, cacheKey := range cacheKeys { @@ -179,12 +168,17 @@ func (this *rateLimitCacheImpl) DoLimit( } // Use the perSecondConn if it is not nil and the cacheKey represents a per second Limit. - if perSecondConn != nil && cacheKey.perSecond { - pipelineAppend(perSecondConn, cacheKey.key, hitsAddend, expirationSeconds) + if this.perSecondClient != nil && cacheKey.perSecond { + if err = pipelineAppend(this.perSecondClient, cacheKey.key, hitsAddend, &results[i], expirationSeconds); err != nil { + break + } } else { - pipelineAppend(conn, cacheKey.key, hitsAddend, expirationSeconds) + if err = pipelineAppend(this.client, cacheKey.key, hitsAddend, &results[i], expirationSeconds); err != nil { + break + } } } + checkError(err) // Now fetch the pipeline. responseDescriptorStatuses := make([]*pb.RateLimitResponse_DescriptorStatus, @@ -212,14 +206,7 @@ func (this *rateLimitCacheImpl) DoLimit( continue } - var limitAfterIncrease uint32 - // Use the perSecondConn if it is not nil and the cacheKey represents a per second Limit. - if this.perSecondPool != nil && cacheKey.perSecond { - limitAfterIncrease = pipelineFetch(perSecondConn) - } else { - limitAfterIncrease = pipelineFetch(conn) - } - + limitAfterIncrease := results[i] limitBeforeIncrease := limitAfterIncrease - hitsAddend overLimitThreshold := limits[i].Limit.RequestsPerUnit // The nearLimitThreshold is the number of requests that can be made before hitting the NearLimitRatio. @@ -288,10 +275,10 @@ func (this *rateLimitCacheImpl) DoLimit( return responseDescriptorStatuses } -func NewRateLimitCacheImpl(pool Pool, perSecondPool Pool, timeSource TimeSource, jitterRand *rand.Rand, expirationJitterMaxSeconds int64, localCache *freecache.Cache) RateLimitCache { +func NewRateLimitCacheImpl(client Client, perSecondClient Client, timeSource TimeSource, jitterRand *rand.Rand, expirationJitterMaxSeconds int64, localCache *freecache.Cache) RateLimitCache { return &rateLimitCacheImpl{ - pool: pool, - perSecondPool: perSecondPool, + client: client, + perSecondClient: perSecondClient, timeSource: timeSource, jitterRand: jitterRand, expirationJitterMaxSeconds: expirationJitterMaxSeconds, diff --git a/src/redis/driver.go b/src/redis/driver.go index 1f4bea32..0f672df7 100644 --- a/src/redis/driver.go +++ b/src/redis/driver.go @@ -7,33 +7,20 @@ func (e RedisError) Error() string { return string(e) } -// Interface for a redis connection pool. -type Pool interface { - // Get a connection from the pool. Call Put() on the connection when done. - // Throws RedisError if a connection can not be obtained. - Get() Connection - - // Put a connection back into the pool. - // @param c supplies the connection to put back. - Put(c Connection) -} - -// Interface for a redis connection. -type Connection interface { - // Append a command onto the pipeline queue. - // @param command supplies the command to append. +// Interface for a redis client. +type Client interface { + // DoCmd is used to perform a redis command and retrieve a result. + // + // @param rcv supplies receiver for the result. + // @param cmd supplies the command to append. + // @param key supplies the key to append. // @param args supplies the additional arguments. - PipeAppend(command string, args ...interface{}) + DoCmd(rcv interface{}, cmd, key string, args ...interface{}) error - // Execute the pipeline queue and wait for a response. - // @return a response object. - // Throws a RedisError if there was an error fetching the response. - PipeResponse() Response -} + // Once Close() is called all future method calls on the Client will return + // an error + Close() error -// Interface for a redis response. -type Response interface { - // @return the response as an integer. - // Throws a RedisError if the response is not convertable to an integer. - Int() int64 + // NumActiveConns return number of active connections, used in testing. + NumActiveConns() int } diff --git a/src/redis/driver_impl.go b/src/redis/driver_impl.go index dbcf787e..47d0d585 100644 --- a/src/redis/driver_impl.go +++ b/src/redis/driver_impl.go @@ -2,12 +2,13 @@ package redis import ( "crypto/tls" - "net" + "fmt" + "time" + + "github.com/mediocregopher/radix/v3/trace" - "github.com/envoyproxy/ratelimit/src/assert" stats "github.com/lyft/gostats" - "github.com/mediocregopher/radix.v2/pool" - "github.com/mediocregopher/radix.v2/redis" + "github.com/mediocregopher/radix/v3" logger "github.com/sirupsen/logrus" ) @@ -25,18 +26,22 @@ func newPoolStats(scope stats.Scope) poolStats { return ret } -type poolImpl struct { - pool *pool.Pool - stats poolStats -} - -type connectionImpl struct { - client *redis.Client - pending uint +func poolTrace(ps *poolStats) trace.PoolTrace { + return trace.PoolTrace{ + ConnCreated: func(_ trace.PoolConnCreated) { + ps.connectionTotal.Add(1) + ps.connectionActive.Add(1) + }, + ConnClosed: func(_ trace.PoolConnClosed) { + ps.connectionActive.Sub(1) + ps.connectionClose.Add(1) + }, + } } -type responseImpl struct { - response *redis.Resp +type clientImpl struct { + client radix.Client + stats poolStats } func checkError(err error) { @@ -45,78 +50,60 @@ func checkError(err error) { } } -func (this *poolImpl) Get() Connection { - client, err := this.pool.Get() - checkError(err) - this.stats.connectionActive.Inc() - this.stats.connectionTotal.Inc() - return &connectionImpl{client, 0} -} +func NewClientImpl(scope stats.Scope, useTls bool, auth string, url string, poolSize int, + pipelineWindow time.Duration, pipelineLimit int) Client { + logger.Warnf("connecting to redis on %s with pool size %d", url, poolSize) -func (this *poolImpl) Put(c Connection) { - impl := c.(*connectionImpl) - this.stats.connectionActive.Dec() - if impl.pending == 0 { - this.pool.Put(impl.client) - } else { - // radix does not appear to track if we attempt to put a connection back with pipelined - // responses that have not been flushed. If we are in this state, just kill the connection - // and don't put it back in the pool. - impl.client.Close() - this.stats.connectionClose.Inc() - } -} + df := func(network, addr string) (radix.Conn, error) { + var dialOpts []radix.DialOpt -func NewPoolImpl(scope stats.Scope, useTls bool, auth string, url string, poolSize int) Pool { - logger.Warnf("connecting to redis on %s with pool size %d", url, poolSize) - df := func(network, addr string) (*redis.Client, error) { - var conn net.Conn var err error if useTls { - conn, err = tls.Dial("tcp", addr, &tls.Config{}) - } else { - conn, err = net.Dial("tcp", addr) + dialOpts = append(dialOpts, radix.DialUseTLS(&tls.Config{})) } - if err != nil { - return nil, err - } - client, err := redis.NewClient(conn) if err != nil { return nil, err } if auth != "" { logger.Warnf("enabling authentication to redis on %s", url) - if err = client.Cmd("AUTH", auth).Err; err != nil { - client.Close() - return nil, err - } + + dialOpts = append(dialOpts, radix.DialAuthPass(auth)) } - return client, nil + + return radix.Dial(network, addr, dialOpts...) } - pool, err := pool.NewCustom("tcp", url, poolSize, df) + + stats := newPoolStats(scope) + + // TODO: support sentinel and redis cluster + pool, err := radix.NewPool("tcp", url, poolSize, radix.PoolConnFunc(df), + radix.PoolPipelineWindow(pipelineWindow, pipelineLimit), + radix.PoolWithTrace(poolTrace(&stats)), + ) checkError(err) - return &poolImpl{ - pool: pool, - stats: newPoolStats(scope)} -} -func (this *connectionImpl) PipeAppend(cmd string, args ...interface{}) { - this.client.PipeAppend(cmd, args...) - this.pending++ + // Check if connection is good + var pingResponse string + checkError(pool.Do(radix.Cmd(&pingResponse, "PING"))) + if pingResponse != "PONG" { + checkError(fmt.Errorf("connecting redis error: %s", pingResponse)) + } + + return &clientImpl{ + client: pool, + stats: stats, + } } -func (this *connectionImpl) PipeResponse() Response { - assert.Assert(this.pending > 0) - this.pending-- +func (c *clientImpl) DoCmd(rcv interface{}, cmd, key string, args ...interface{}) error { + return c.client.Do(radix.FlatCmd(rcv, cmd, key, args...)) +} - resp := this.client.PipeResp() - checkError(resp.Err) - return &responseImpl{resp} +func (c *clientImpl) Close() error { + return c.client.Close() } -func (this *responseImpl) Int() int64 { - i, err := this.response.Int64() - checkError(err) - return i +func (c *clientImpl) NumActiveConns() int { + return int(c.stats.connectionActive.Value()) } diff --git a/src/service_cmd/runner/runner.go b/src/service_cmd/runner/runner.go index a6a5d0d8..471caadc 100644 --- a/src/service_cmd/runner/runner.go +++ b/src/service_cmd/runner/runner.go @@ -49,12 +49,14 @@ func (runner *Runner) Run() { srv := server.NewServer("ratelimit", runner.statsStore, localCache, settings.GrpcUnaryInterceptor(nil)) - var perSecondPool redis.Pool + var perSecondPool redis.Client if s.RedisPerSecond { - perSecondPool = redis.NewPoolImpl(srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondTls, s.RedisPerSecondAuth, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize) + perSecondPool = redis.NewClientImpl(srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondTls, s.RedisPerSecondAuth, + s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPipelineWindow, s.RedisPipelineLimit) } - var otherPool redis.Pool - otherPool = redis.NewPoolImpl(srv.Scope().Scope("redis_pool"), s.RedisTls, s.RedisAuth, s.RedisUrl, s.RedisPoolSize) + var otherPool redis.Client + otherPool = redis.NewClientImpl(srv.Scope().Scope("redis_pool"), s.RedisTls, s.RedisAuth, s.RedisUrl, s.RedisPoolSize, + s.RedisPipelineWindow, s.RedisPipelineLimit) service := ratelimit.NewService( srv.Runtime(), diff --git a/src/settings/settings.go b/src/settings/settings.go index 8d1335bb..53ab1347 100644 --- a/src/settings/settings.go +++ b/src/settings/settings.go @@ -1,6 +1,8 @@ package settings import ( + "time" + "github.com/kelseyhightower/envconfig" "google.golang.org/grpc" ) @@ -9,29 +11,33 @@ type Settings struct { // runtime options GrpcUnaryInterceptor grpc.ServerOption // env config - Port int `envconfig:"PORT" default:"8080"` - GrpcPort int `envconfig:"GRPC_PORT" default:"8081"` - DebugPort int `envconfig:"DEBUG_PORT" default:"6070"` - UseStatsd bool `envconfig:"USE_STATSD" default:"true"` - StatsdHost string `envconfig:"STATSD_HOST" default:"localhost"` - StatsdPort int `envconfig:"STATSD_PORT" default:"8125"` - RuntimePath string `envconfig:"RUNTIME_ROOT" default:"/srv/runtime_data/current"` - RuntimeSubdirectory string `envconfig:"RUNTIME_SUBDIRECTORY"` - RuntimeIgnoreDotFiles bool `envconfig:"RUNTIME_IGNOREDOTFILES" default:"false"` - LogLevel string `envconfig:"LOG_LEVEL" default:"WARN"` - RedisSocketType string `envconfig:"REDIS_SOCKET_TYPE" default:"unix"` - RedisUrl string `envconfig:"REDIS_URL" default:"/var/run/nutcracker/ratelimit.sock"` - RedisPoolSize int `envconfig:"REDIS_POOL_SIZE" default:"10"` - RedisAuth string `envconfig:"REDIS_AUTH" default:""` - RedisTls bool `envconfig:"REDIS_TLS" default:"false"` - RedisPerSecond bool `envconfig:"REDIS_PERSECOND" default:"false"` - RedisPerSecondSocketType string `envconfig:"REDIS_PERSECOND_SOCKET_TYPE" default:"unix"` - RedisPerSecondUrl string `envconfig:"REDIS_PERSECOND_URL" default:"/var/run/nutcracker/ratelimitpersecond.sock"` - RedisPerSecondPoolSize int `envconfig:"REDIS_PERSECOND_POOL_SIZE" default:"10"` - RedisPerSecondAuth string `envconfig:"REDIS_PERSECOND_AUTH" default:""` - RedisPerSecondTls bool `envconfig:"REDIS_PERSECOND_TLS" default:"false"` - ExpirationJitterMaxSeconds int64 `envconfig:"EXPIRATION_JITTER_MAX_SECONDS" default:"300"` - LocalCacheSizeInBytes int `envconfig:"LOCAL_CACHE_SIZE_IN_BYTES" default:"0"` + Port int `envconfig:"PORT" default:"8080"` + GrpcPort int `envconfig:"GRPC_PORT" default:"8081"` + DebugPort int `envconfig:"DEBUG_PORT" default:"6070"` + UseStatsd bool `envconfig:"USE_STATSD" default:"true"` + StatsdHost string `envconfig:"STATSD_HOST" default:"localhost"` + StatsdPort int `envconfig:"STATSD_PORT" default:"8125"` + RuntimePath string `envconfig:"RUNTIME_ROOT" default:"/srv/runtime_data/current"` + RuntimeSubdirectory string `envconfig:"RUNTIME_SUBDIRECTORY"` + RuntimeIgnoreDotFiles bool `envconfig:"RUNTIME_IGNOREDOTFILES" default:"false"` + LogLevel string `envconfig:"LOG_LEVEL" default:"WARN"` + RedisSocketType string `envconfig:"REDIS_SOCKET_TYPE" default:"unix"` + RedisUrl string `envconfig:"REDIS_URL" default:"/var/run/nutcracker/ratelimit.sock"` + RedisPoolSize int `envconfig:"REDIS_POOL_SIZE" default:"10"` + RedisAuth string `envconfig:"REDIS_AUTH" default:""` + RedisTls bool `envconfig:"REDIS_TLS" default:"false"` + RedisPipelineWindow time.Duration `envconfig:"REDIS_PIPELINE_WINDOW" default:"75µs"` + RedisPipelineLimit int `envconfig:"REDIS_PIPELINE_LIMIT" default:"8"` + RedisPerSecond bool `envconfig:"REDIS_PERSECOND" default:"false"` + RedisPerSecondSocketType string `envconfig:"REDIS_PERSECOND_SOCKET_TYPE" default:"unix"` + RedisPerSecondUrl string `envconfig:"REDIS_PERSECOND_URL" default:"/var/run/nutcracker/ratelimitpersecond.sock"` + RedisPerSecondPoolSize int `envconfig:"REDIS_PERSECOND_POOL_SIZE" default:"10"` + RedisPerSecondAuth string `envconfig:"REDIS_PERSECOND_AUTH" default:""` + RedisPerSecondTls bool `envconfig:"REDIS_PERSECOND_TLS" default:"false"` + RedisPerSecondPipelineWindow time.Duration `envconfig:"REDIS_PERSECOND_PIPELINE_WINDOW" default:"75µs"` + RedisPerSecondPipelineLimit int `envconfig:"REDIS_PERSECOND_PIPELINE_LIMIT" default:"8"` + ExpirationJitterMaxSeconds int64 `envconfig:"EXPIRATION_JITTER_MAX_SECONDS" default:"300"` + LocalCacheSizeInBytes int `envconfig:"LOCAL_CACHE_SIZE_IN_BYTES" default:"0"` } type Option func(*Settings) diff --git a/test/mocks/config/config.go b/test/mocks/config/config.go index e4ddc17b..044b55ec 100644 --- a/test/mocks/config/config.go +++ b/test/mocks/config/config.go @@ -9,7 +9,7 @@ import ( ratelimit "github.com/envoyproxy/go-control-plane/envoy/api/v2/ratelimit" config "github.com/envoyproxy/ratelimit/src/config" gomock "github.com/golang/mock/gomock" - gostats "github.com/lyft/gostats" + stats "github.com/lyft/gostats" reflect "reflect" ) @@ -38,6 +38,7 @@ func (m *MockRateLimitConfig) EXPECT() *MockRateLimitConfigMockRecorder { // Dump mocks base method func (m *MockRateLimitConfig) Dump() string { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Dump") ret0, _ := ret[0].(string) return ret0 @@ -45,11 +46,13 @@ func (m *MockRateLimitConfig) Dump() string { // Dump indicates an expected call of Dump func (mr *MockRateLimitConfigMockRecorder) Dump() *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Dump", reflect.TypeOf((*MockRateLimitConfig)(nil).Dump)) } // GetLimit mocks base method func (m *MockRateLimitConfig) GetLimit(arg0 context.Context, arg1 string, arg2 *ratelimit.RateLimitDescriptor) *config.RateLimit { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetLimit", arg0, arg1, arg2) ret0, _ := ret[0].(*config.RateLimit) return ret0 @@ -57,6 +60,7 @@ func (m *MockRateLimitConfig) GetLimit(arg0 context.Context, arg1 string, arg2 * // GetLimit indicates an expected call of GetLimit func (mr *MockRateLimitConfigMockRecorder) GetLimit(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLimit", reflect.TypeOf((*MockRateLimitConfig)(nil).GetLimit), arg0, arg1, arg2) } @@ -84,7 +88,8 @@ func (m *MockRateLimitConfigLoader) EXPECT() *MockRateLimitConfigLoaderMockRecor } // Load mocks base method -func (m *MockRateLimitConfigLoader) Load(arg0 []config.RateLimitConfigToLoad, arg1 gostats.Scope) config.RateLimitConfig { +func (m *MockRateLimitConfigLoader) Load(arg0 []config.RateLimitConfigToLoad, arg1 stats.Scope) config.RateLimitConfig { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Load", arg0, arg1) ret0, _ := ret[0].(config.RateLimitConfig) return ret0 @@ -92,5 +97,6 @@ func (m *MockRateLimitConfigLoader) Load(arg0 []config.RateLimitConfigToLoad, ar // Load indicates an expected call of Load func (mr *MockRateLimitConfigLoaderMockRecorder) Load(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Load", reflect.TypeOf((*MockRateLimitConfigLoader)(nil).Load), arg0, arg1) } diff --git a/test/mocks/mocks.go b/test/mocks/mocks.go index e0ce8288..98d5a4a9 100644 --- a/test/mocks/mocks.go +++ b/test/mocks/mocks.go @@ -3,4 +3,4 @@ package mocks //go:generate go run github.com/golang/mock/mockgen -destination ./runtime/snapshot/snapshot.go github.com/lyft/goruntime/snapshot IFace //go:generate go run github.com/golang/mock/mockgen -destination ./runtime/loader/loader.go github.com/lyft/goruntime/loader IFace //go:generate go run github.com/golang/mock/mockgen -destination ./config/config.go github.com/envoyproxy/ratelimit/src/config RateLimitConfig,RateLimitConfigLoader -//go:generate go run github.com/golang/mock/mockgen -destination ./redis/redis.go github.com/envoyproxy/ratelimit/src/redis RateLimitCache,Pool,Connection,Response,TimeSource,JitterRandSource +//go:generate go run github.com/golang/mock/mockgen -destination ./redis/redis.go github.com/envoyproxy/ratelimit/src/redis RateLimitCache,Client,TimeSource,JitterRandSource diff --git a/test/mocks/redis/redis.go b/test/mocks/redis/redis.go index df24212a..be5f5a34 100644 --- a/test/mocks/redis/redis.go +++ b/test/mocks/redis/redis.go @@ -1,227 +1,206 @@ -// Automatically generated by MockGen. DO NOT EDIT! -// Source: github.com/envoyproxy/ratelimit/src/redis (interfaces: RateLimitCache,Pool,Connection,Response,TimeSource,JitterRandSource) +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/envoyproxy/ratelimit/src/redis (interfaces: RateLimitCache,Client,TimeSource,JitterRandSource) +// Package mock_redis is a generated GoMock package. package mock_redis import ( - ratelimit "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v2" + context "context" + v2 "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v2" config "github.com/envoyproxy/ratelimit/src/config" - redis "github.com/envoyproxy/ratelimit/src/redis" gomock "github.com/golang/mock/gomock" - context "golang.org/x/net/context" + reflect "reflect" ) -// Mock of RateLimitCache interface +// MockRateLimitCache is a mock of RateLimitCache interface type MockRateLimitCache struct { ctrl *gomock.Controller - recorder *_MockRateLimitCacheRecorder + recorder *MockRateLimitCacheMockRecorder } -// Recorder for MockRateLimitCache (not exported) -type _MockRateLimitCacheRecorder struct { +// MockRateLimitCacheMockRecorder is the mock recorder for MockRateLimitCache +type MockRateLimitCacheMockRecorder struct { mock *MockRateLimitCache } +// NewMockRateLimitCache creates a new mock instance func NewMockRateLimitCache(ctrl *gomock.Controller) *MockRateLimitCache { mock := &MockRateLimitCache{ctrl: ctrl} - mock.recorder = &_MockRateLimitCacheRecorder{mock} + mock.recorder = &MockRateLimitCacheMockRecorder{mock} return mock } -func (_m *MockRateLimitCache) EXPECT() *_MockRateLimitCacheRecorder { - return _m.recorder +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockRateLimitCache) EXPECT() *MockRateLimitCacheMockRecorder { + return m.recorder } -func (_m *MockRateLimitCache) DoLimit(_param0 context.Context, _param1 *ratelimit.RateLimitRequest, _param2 []*config.RateLimit) []*ratelimit.RateLimitResponse_DescriptorStatus { - ret := _m.ctrl.Call(_m, "DoLimit", _param0, _param1, _param2) - ret0, _ := ret[0].([]*ratelimit.RateLimitResponse_DescriptorStatus) +// DoLimit mocks base method +func (m *MockRateLimitCache) DoLimit(arg0 context.Context, arg1 *v2.RateLimitRequest, arg2 []*config.RateLimit) []*v2.RateLimitResponse_DescriptorStatus { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DoLimit", arg0, arg1, arg2) + ret0, _ := ret[0].([]*v2.RateLimitResponse_DescriptorStatus) return ret0 } -func (_mr *_MockRateLimitCacheRecorder) DoLimit(arg0, arg1, arg2 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "DoLimit", arg0, arg1, arg2) +// DoLimit indicates an expected call of DoLimit +func (mr *MockRateLimitCacheMockRecorder) DoLimit(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoLimit", reflect.TypeOf((*MockRateLimitCache)(nil).DoLimit), arg0, arg1, arg2) } -// Mock of Pool interface -type MockPool struct { +// MockClient is a mock of Client interface +type MockClient struct { ctrl *gomock.Controller - recorder *_MockPoolRecorder + recorder *MockClientMockRecorder } -// Recorder for MockPool (not exported) -type _MockPoolRecorder struct { - mock *MockPool +// MockClientMockRecorder is the mock recorder for MockClient +type MockClientMockRecorder struct { + mock *MockClient } -func NewMockPool(ctrl *gomock.Controller) *MockPool { - mock := &MockPool{ctrl: ctrl} - mock.recorder = &_MockPoolRecorder{mock} +// NewMockClient creates a new mock instance +func NewMockClient(ctrl *gomock.Controller) *MockClient { + mock := &MockClient{ctrl: ctrl} + mock.recorder = &MockClientMockRecorder{mock} return mock } -func (_m *MockPool) EXPECT() *_MockPoolRecorder { - return _m.recorder +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockClient) EXPECT() *MockClientMockRecorder { + return m.recorder } -func (_m *MockPool) Get() redis.Connection { - ret := _m.ctrl.Call(_m, "Get") - ret0, _ := ret[0].(redis.Connection) +// Close mocks base method +func (m *MockClient) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) return ret0 } -func (_mr *_MockPoolRecorder) Get() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "Get") +// Close indicates an expected call of Close +func (mr *MockClientMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockClient)(nil).Close)) } -func (_m *MockPool) Put(_param0 redis.Connection) { - _m.ctrl.Call(_m, "Put", _param0) -} - -func (_mr *_MockPoolRecorder) Put(arg0 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "Put", arg0) -} - -// Mock of Connection interface -type MockConnection struct { - ctrl *gomock.Controller - recorder *_MockConnectionRecorder -} - -// Recorder for MockConnection (not exported) -type _MockConnectionRecorder struct { - mock *MockConnection -} - -func NewMockConnection(ctrl *gomock.Controller) *MockConnection { - mock := &MockConnection{ctrl: ctrl} - mock.recorder = &_MockConnectionRecorder{mock} - return mock -} - -func (_m *MockConnection) EXPECT() *_MockConnectionRecorder { - return _m.recorder -} - -func (_m *MockConnection) PipeAppend(_param0 string, _param1 ...interface{}) { - _s := []interface{}{_param0} - for _, _x := range _param1 { - _s = append(_s, _x) +// DoCmd mocks base method +func (m *MockClient) DoCmd(arg0 interface{}, arg1, arg2 string, arg3 ...interface{}) error { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1, arg2} + for _, a := range arg3 { + varargs = append(varargs, a) } - _m.ctrl.Call(_m, "PipeAppend", _s...) -} - -func (_mr *_MockConnectionRecorder) PipeAppend(arg0 interface{}, arg1 ...interface{}) *gomock.Call { - _s := append([]interface{}{arg0}, arg1...) - return _mr.mock.ctrl.RecordCall(_mr.mock, "PipeAppend", _s...) -} - -func (_m *MockConnection) PipeResponse() redis.Response { - ret := _m.ctrl.Call(_m, "PipeResponse") - ret0, _ := ret[0].(redis.Response) + ret := m.ctrl.Call(m, "DoCmd", varargs...) + ret0, _ := ret[0].(error) return ret0 } -func (_mr *_MockConnectionRecorder) PipeResponse() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "PipeResponse") -} - -// Mock of Response interface -type MockResponse struct { - ctrl *gomock.Controller - recorder *_MockResponseRecorder -} - -// Recorder for MockResponse (not exported) -type _MockResponseRecorder struct { - mock *MockResponse -} - -func NewMockResponse(ctrl *gomock.Controller) *MockResponse { - mock := &MockResponse{ctrl: ctrl} - mock.recorder = &_MockResponseRecorder{mock} - return mock +// DoCmd indicates an expected call of DoCmd +func (mr *MockClientMockRecorder) DoCmd(arg0, arg1, arg2 interface{}, arg3 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1, arg2}, arg3...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoCmd", reflect.TypeOf((*MockClient)(nil).DoCmd), varargs...) } -func (_m *MockResponse) EXPECT() *_MockResponseRecorder { - return _m.recorder -} - -func (_m *MockResponse) Int() int64 { - ret := _m.ctrl.Call(_m, "Int") - ret0, _ := ret[0].(int64) +// NumActiveConns mocks base method +func (m *MockClient) NumActiveConns() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NumActiveConns") + ret0, _ := ret[0].(int) return ret0 } -func (_mr *_MockResponseRecorder) Int() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "Int") +// NumActiveConns indicates an expected call of NumActiveConns +func (mr *MockClientMockRecorder) NumActiveConns() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NumActiveConns", reflect.TypeOf((*MockClient)(nil).NumActiveConns)) } -// Mock of TimeSource interface +// MockTimeSource is a mock of TimeSource interface type MockTimeSource struct { ctrl *gomock.Controller - recorder *_MockTimeSourceRecorder + recorder *MockTimeSourceMockRecorder } -// Recorder for MockTimeSource (not exported) -type _MockTimeSourceRecorder struct { +// MockTimeSourceMockRecorder is the mock recorder for MockTimeSource +type MockTimeSourceMockRecorder struct { mock *MockTimeSource } +// NewMockTimeSource creates a new mock instance func NewMockTimeSource(ctrl *gomock.Controller) *MockTimeSource { mock := &MockTimeSource{ctrl: ctrl} - mock.recorder = &_MockTimeSourceRecorder{mock} + mock.recorder = &MockTimeSourceMockRecorder{mock} return mock } -func (_m *MockTimeSource) EXPECT() *_MockTimeSourceRecorder { - return _m.recorder +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockTimeSource) EXPECT() *MockTimeSourceMockRecorder { + return m.recorder } -func (_m *MockTimeSource) UnixNow() int64 { - ret := _m.ctrl.Call(_m, "UnixNow") +// UnixNow mocks base method +func (m *MockTimeSource) UnixNow() int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UnixNow") ret0, _ := ret[0].(int64) return ret0 } -func (_mr *_MockTimeSourceRecorder) UnixNow() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "UnixNow") +// UnixNow indicates an expected call of UnixNow +func (mr *MockTimeSourceMockRecorder) UnixNow() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnixNow", reflect.TypeOf((*MockTimeSource)(nil).UnixNow)) } -// Mock of JitterRandSource interface +// MockJitterRandSource is a mock of JitterRandSource interface type MockJitterRandSource struct { ctrl *gomock.Controller - recorder *_MockJitterRandSourceRecorder + recorder *MockJitterRandSourceMockRecorder } -// Recorder for MockJitterRandSource (not exported) -type _MockJitterRandSourceRecorder struct { +// MockJitterRandSourceMockRecorder is the mock recorder for MockJitterRandSource +type MockJitterRandSourceMockRecorder struct { mock *MockJitterRandSource } +// NewMockJitterRandSource creates a new mock instance func NewMockJitterRandSource(ctrl *gomock.Controller) *MockJitterRandSource { mock := &MockJitterRandSource{ctrl: ctrl} - mock.recorder = &_MockJitterRandSourceRecorder{mock} + mock.recorder = &MockJitterRandSourceMockRecorder{mock} return mock } -func (_m *MockJitterRandSource) EXPECT() *_MockJitterRandSourceRecorder { - return _m.recorder +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockJitterRandSource) EXPECT() *MockJitterRandSourceMockRecorder { + return m.recorder } -func (_m *MockJitterRandSource) Int63() int64 { - ret := _m.ctrl.Call(_m, "Int63") +// Int63 mocks base method +func (m *MockJitterRandSource) Int63() int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Int63") ret0, _ := ret[0].(int64) return ret0 } -func (_mr *_MockJitterRandSourceRecorder) Int63() *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "Int63") +// Int63 indicates an expected call of Int63 +func (mr *MockJitterRandSourceMockRecorder) Int63() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Int63", reflect.TypeOf((*MockJitterRandSource)(nil).Int63)) } -func (_m *MockJitterRandSource) Seed(_param0 int64) { - _m.ctrl.Call(_m, "Seed", _param0) +// Seed mocks base method +func (m *MockJitterRandSource) Seed(arg0 int64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Seed", arg0) } -func (_mr *_MockJitterRandSourceRecorder) Seed(arg0 interface{}) *gomock.Call { - return _mr.mock.ctrl.RecordCall(_mr.mock, "Seed", arg0) +// Seed indicates an expected call of Seed +func (mr *MockJitterRandSourceMockRecorder) Seed(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Seed", reflect.TypeOf((*MockJitterRandSource)(nil).Seed), arg0) } diff --git a/test/mocks/runtime/loader/loader.go b/test/mocks/runtime/loader/loader.go index 16bbc109..da00c649 100644 --- a/test/mocks/runtime/loader/loader.go +++ b/test/mocks/runtime/loader/loader.go @@ -35,16 +35,19 @@ func (m *MockIFace) EXPECT() *MockIFaceMockRecorder { // AddUpdateCallback mocks base method func (m *MockIFace) AddUpdateCallback(arg0 chan<- int) { + m.ctrl.T.Helper() m.ctrl.Call(m, "AddUpdateCallback", arg0) } // AddUpdateCallback indicates an expected call of AddUpdateCallback func (mr *MockIFaceMockRecorder) AddUpdateCallback(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddUpdateCallback", reflect.TypeOf((*MockIFace)(nil).AddUpdateCallback), arg0) } // Snapshot mocks base method func (m *MockIFace) Snapshot() snapshot.IFace { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Snapshot") ret0, _ := ret[0].(snapshot.IFace) return ret0 @@ -52,5 +55,6 @@ func (m *MockIFace) Snapshot() snapshot.IFace { // Snapshot indicates an expected call of Snapshot func (mr *MockIFaceMockRecorder) Snapshot() *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Snapshot", reflect.TypeOf((*MockIFace)(nil).Snapshot)) } diff --git a/test/mocks/runtime/snapshot/snapshot.go b/test/mocks/runtime/snapshot/snapshot.go index 432e3469..a56fe5a5 100644 --- a/test/mocks/runtime/snapshot/snapshot.go +++ b/test/mocks/runtime/snapshot/snapshot.go @@ -36,6 +36,7 @@ func (m *MockIFace) EXPECT() *MockIFaceMockRecorder { // Entries mocks base method func (m *MockIFace) Entries() map[string]*entry.Entry { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Entries") ret0, _ := ret[0].(map[string]*entry.Entry) return ret0 @@ -43,11 +44,13 @@ func (m *MockIFace) Entries() map[string]*entry.Entry { // Entries indicates an expected call of Entries func (mr *MockIFaceMockRecorder) Entries() *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Entries", reflect.TypeOf((*MockIFace)(nil).Entries)) } // FeatureEnabled mocks base method func (m *MockIFace) FeatureEnabled(arg0 string, arg1 uint64) bool { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FeatureEnabled", arg0, arg1) ret0, _ := ret[0].(bool) return ret0 @@ -55,11 +58,13 @@ func (m *MockIFace) FeatureEnabled(arg0 string, arg1 uint64) bool { // FeatureEnabled indicates an expected call of FeatureEnabled func (mr *MockIFaceMockRecorder) FeatureEnabled(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FeatureEnabled", reflect.TypeOf((*MockIFace)(nil).FeatureEnabled), arg0, arg1) } // FeatureEnabledForID mocks base method func (m *MockIFace) FeatureEnabledForID(arg0 string, arg1 uint64, arg2 uint32) bool { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FeatureEnabledForID", arg0, arg1, arg2) ret0, _ := ret[0].(bool) return ret0 @@ -67,11 +72,13 @@ func (m *MockIFace) FeatureEnabledForID(arg0 string, arg1 uint64, arg2 uint32) b // FeatureEnabledForID indicates an expected call of FeatureEnabledForID func (mr *MockIFaceMockRecorder) FeatureEnabledForID(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FeatureEnabledForID", reflect.TypeOf((*MockIFace)(nil).FeatureEnabledForID), arg0, arg1, arg2) } // Get mocks base method func (m *MockIFace) Get(arg0 string) string { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Get", arg0) ret0, _ := ret[0].(string) return ret0 @@ -79,11 +86,13 @@ func (m *MockIFace) Get(arg0 string) string { // Get indicates an expected call of Get func (mr *MockIFaceMockRecorder) Get(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockIFace)(nil).Get), arg0) } // GetInteger mocks base method func (m *MockIFace) GetInteger(arg0 string, arg1 uint64) uint64 { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetInteger", arg0, arg1) ret0, _ := ret[0].(uint64) return ret0 @@ -91,11 +100,13 @@ func (m *MockIFace) GetInteger(arg0 string, arg1 uint64) uint64 { // GetInteger indicates an expected call of GetInteger func (mr *MockIFaceMockRecorder) GetInteger(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetInteger", reflect.TypeOf((*MockIFace)(nil).GetInteger), arg0, arg1) } // GetModified mocks base method func (m *MockIFace) GetModified(arg0 string) time.Time { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetModified", arg0) ret0, _ := ret[0].(time.Time) return ret0 @@ -103,11 +114,13 @@ func (m *MockIFace) GetModified(arg0 string) time.Time { // GetModified indicates an expected call of GetModified func (mr *MockIFaceMockRecorder) GetModified(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetModified", reflect.TypeOf((*MockIFace)(nil).GetModified), arg0) } // Keys mocks base method func (m *MockIFace) Keys() []string { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Keys") ret0, _ := ret[0].([]string) return ret0 @@ -115,15 +128,18 @@ func (m *MockIFace) Keys() []string { // Keys indicates an expected call of Keys func (mr *MockIFaceMockRecorder) Keys() *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Keys", reflect.TypeOf((*MockIFace)(nil).Keys)) } // SetEntry mocks base method func (m *MockIFace) SetEntry(arg0 string, arg1 *entry.Entry) { + m.ctrl.T.Helper() m.ctrl.Call(m, "SetEntry", arg0, arg1) } // SetEntry indicates an expected call of SetEntry func (mr *MockIFaceMockRecorder) SetEntry(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetEntry", reflect.TypeOf((*MockIFace)(nil).SetEntry), arg0, arg1) } diff --git a/test/redis/bench_test.go b/test/redis/bench_test.go new file mode 100644 index 00000000..3463f25a --- /dev/null +++ b/test/redis/bench_test.go @@ -0,0 +1,93 @@ +package redis_test + +import ( + "context" + "runtime" + "testing" + "time" + + pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v2" + "github.com/envoyproxy/ratelimit/src/config" + "github.com/envoyproxy/ratelimit/src/redis" + stats "github.com/lyft/gostats" + + "math/rand" + + "github.com/envoyproxy/ratelimit/test/common" +) + +func BenchmarkParallelDoLimit(b *testing.B) { + b.Skip("Skip benchmark") + + b.ReportAllocs() + + // See https://github.com/mediocregopher/radix/blob/v3.5.1/bench/bench_test.go#L176 + parallel := runtime.GOMAXPROCS(0) + poolSize := parallel * runtime.GOMAXPROCS(0) + + do := func(b *testing.B, fn func() error) { + b.ResetTimer() + b.SetParallelism(parallel) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if err := fn(); err != nil { + b.Fatal(err) + } + } + }) + } + + mkDoLimitBench := func(pipelineWindow time.Duration, pipelineLimit int) func(*testing.B) { + return func(b *testing.B) { + statsStore := stats.NewStore(stats.NewNullSink(), false) + client := redis.NewClientImpl(statsStore, false, "", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit) + defer client.Close() + + cache := redis.NewRateLimitCacheImpl(client, nil, redis.NewTimeSourceImpl(), rand.New(redis.NewLockedSource(time.Now().Unix())), 10, nil) + request := common.NewRateLimitRequest("domain", [][][2]string{{{"key", "value"}}}, 1) + limits := []*config.RateLimit{config.NewRateLimit(1000000000, pb.RateLimitResponse_RateLimit_SECOND, "key_value", statsStore)} + + // wait for the pool to fill up + for { + time.Sleep(50 * time.Millisecond) + if client.NumActiveConns() >= poolSize { + break + } + } + + b.ResetTimer() + + do(b, func() error { + cache.DoLimit(context.Background(), request, limits) + return nil + }) + } + } + + b.Run("no pipeline", mkDoLimitBench(0, 0)) + + b.Run("pipeline 35us 1", mkDoLimitBench(35*time.Microsecond, 1)) + b.Run("pipeline 75us 1", mkDoLimitBench(75*time.Microsecond, 1)) + b.Run("pipeline 150us 1", mkDoLimitBench(150*time.Microsecond, 1)) + b.Run("pipeline 300us 1", mkDoLimitBench(300*time.Microsecond, 1)) + + b.Run("pipeline 35us 2", mkDoLimitBench(35*time.Microsecond, 2)) + b.Run("pipeline 75us 2", mkDoLimitBench(75*time.Microsecond, 2)) + b.Run("pipeline 150us 2", mkDoLimitBench(150*time.Microsecond, 2)) + b.Run("pipeline 300us 2", mkDoLimitBench(300*time.Microsecond, 2)) + + b.Run("pipeline 35us 4", mkDoLimitBench(35*time.Microsecond, 4)) + b.Run("pipeline 75us 4", mkDoLimitBench(75*time.Microsecond, 4)) + b.Run("pipeline 150us 4", mkDoLimitBench(150*time.Microsecond, 4)) + b.Run("pipeline 300us 4", mkDoLimitBench(300*time.Microsecond, 4)) + + b.Run("pipeline 35us 8", mkDoLimitBench(35*time.Microsecond, 8)) + b.Run("pipeline 75us 8", mkDoLimitBench(75*time.Microsecond, 8)) + b.Run("pipeline 150us 8", mkDoLimitBench(150*time.Microsecond, 8)) + b.Run("pipeline 300us 8", mkDoLimitBench(300*time.Microsecond, 8)) + + b.Run("pipeline 35us 16", mkDoLimitBench(35*time.Microsecond, 16)) + b.Run("pipeline 75us 16", mkDoLimitBench(75*time.Microsecond, 16)) + b.Run("pipeline 150us 16", mkDoLimitBench(150*time.Microsecond, 16)) + b.Run("pipeline 300us 16", mkDoLimitBench(300*time.Microsecond, 16)) +} diff --git a/test/redis/cache_impl_test.go b/test/redis/cache_impl_test.go index d4f94fc9..e0807689 100644 --- a/test/redis/cache_impl_test.go +++ b/test/redis/cache_impl_test.go @@ -2,6 +2,7 @@ package redis_test import ( "testing" + "time" "github.com/coocood/freecache" @@ -12,6 +13,7 @@ import ( "math/rand" + "github.com/alicebob/miniredis/v2" "github.com/envoyproxy/ratelimit/test/common" mock_redis "github.com/envoyproxy/ratelimit/test/mocks/redis" "github.com/golang/mock/gomock" @@ -29,40 +31,27 @@ func testRedis(usePerSecondRedis bool) func(*testing.T) { controller := gomock.NewController(t) defer controller.Finish() - pool := mock_redis.NewMockPool(controller) - perSecondPool := mock_redis.NewMockPool(controller) + client := mock_redis.NewMockClient(controller) + perSecondClient := mock_redis.NewMockClient(controller) timeSource := mock_redis.NewMockTimeSource(controller) - connection := mock_redis.NewMockConnection(controller) - perSecondConnection := mock_redis.NewMockConnection(controller) - response := mock_redis.NewMockResponse(controller) var cache redis.RateLimitCache if usePerSecondRedis { - cache = redis.NewRateLimitCacheImpl(pool, perSecondPool, timeSource, rand.New(rand.NewSource(1)), 0, nil) + cache = redis.NewRateLimitCacheImpl(client, perSecondClient, timeSource, rand.New(rand.NewSource(1)), 0, nil) } else { - cache = redis.NewRateLimitCacheImpl(pool, nil, timeSource, rand.New(rand.NewSource(1)), 0, nil) + cache = redis.NewRateLimitCacheImpl(client, nil, timeSource, rand.New(rand.NewSource(1)), 0, nil) } statsStore := stats.NewStore(stats.NewNullSink(), false) - if usePerSecondRedis { - perSecondPool.EXPECT().Get().Return(perSecondConnection) - } - pool.EXPECT().Get().Return(connection) timeSource.EXPECT().UnixNow().Return(int64(1234)) - var connUsed *mock_redis.MockConnection + var clientUsed *mock_redis.MockClient if usePerSecondRedis { - connUsed = perSecondConnection + clientUsed = perSecondClient } else { - connUsed = connection + clientUsed = client } - connUsed.EXPECT().PipeAppend("INCRBY", "domain_key_value_1234", uint32(1)) - connUsed.EXPECT().PipeAppend("EXPIRE", "domain_key_value_1234", int64(1)) - connUsed.EXPECT().PipeResponse().Return(response) - response.EXPECT().Int().Return(int64(5)) - connUsed.EXPECT().PipeResponse() - if usePerSecondRedis { - perSecondPool.EXPECT().Put(perSecondConnection) - } - pool.EXPECT().Put(connection) + + clientUsed.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key_value_1234", uint32(1)).SetArg(0, uint32(5)) + clientUsed.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key_value_1234", int64(1)) request := common.NewRateLimitRequest("domain", [][][2]string{{{"key", "value"}}}, 1) limits := []*config.RateLimit{config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_SECOND, "key_value", statsStore)} @@ -74,21 +63,11 @@ func testRedis(usePerSecondRedis bool) func(*testing.T) { assert.Equal(uint64(0), limits[0].Stats.OverLimit.Value()) assert.Equal(uint64(0), limits[0].Stats.NearLimit.Value()) - if usePerSecondRedis { - perSecondPool.EXPECT().Get().Return(perSecondConnection) - } - pool.EXPECT().Get().Return(connection) + clientUsed = client timeSource.EXPECT().UnixNow().Return(int64(1234)) - connection.EXPECT().PipeAppend("INCRBY", "domain_key2_value2_subkey2_subvalue2_1200", uint32(1)) - connection.EXPECT().PipeAppend( + clientUsed.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key2_value2_subkey2_subvalue2_1200", uint32(1)).SetArg(0, uint32(11)) + clientUsed.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key2_value2_subkey2_subvalue2_1200", int64(60)) - connection.EXPECT().PipeResponse().Return(response) - response.EXPECT().Int().Return(int64(11)) - connection.EXPECT().PipeResponse() - if usePerSecondRedis { - perSecondPool.EXPECT().Put(perSecondConnection) - } - pool.EXPECT().Put(connection) request = common.NewRateLimitRequest( "domain", @@ -107,27 +86,14 @@ func testRedis(usePerSecondRedis bool) func(*testing.T) { assert.Equal(uint64(1), limits[1].Stats.OverLimit.Value()) assert.Equal(uint64(0), limits[1].Stats.NearLimit.Value()) - if usePerSecondRedis { - perSecondPool.EXPECT().Get().Return(perSecondConnection) - } - pool.EXPECT().Get().Return(connection) + clientUsed = client timeSource.EXPECT().UnixNow().Return(int64(1000000)) - connection.EXPECT().PipeAppend("INCRBY", "domain_key3_value3_997200", uint32(1)) - connection.EXPECT().PipeAppend( + clientUsed.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key3_value3_997200", uint32(1)).SetArg(0, uint32(11)) + clientUsed.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key3_value3_997200", int64(3600)) - connection.EXPECT().PipeAppend("INCRBY", "domain_key3_value3_subkey3_subvalue3_950400", uint32(1)) - connection.EXPECT().PipeAppend( + clientUsed.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key3_value3_subkey3_subvalue3_950400", uint32(1)).SetArg(0, uint32(13)) + clientUsed.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key3_value3_subkey3_subvalue3_950400", int64(86400)) - connection.EXPECT().PipeResponse().Return(response) - response.EXPECT().Int().Return(int64(11)) - connection.EXPECT().PipeResponse() - connection.EXPECT().PipeResponse().Return(response) - response.EXPECT().Int().Return(int64(13)) - connection.EXPECT().PipeResponse() - if usePerSecondRedis { - perSecondPool.EXPECT().Put(perSecondConnection) - } - pool.EXPECT().Put(connection) request = common.NewRateLimitRequest( "domain", @@ -193,26 +159,19 @@ func TestOverLimitWithLocalCache(t *testing.T) { controller := gomock.NewController(t) defer controller.Finish() - pool := mock_redis.NewMockPool(controller) + client := mock_redis.NewMockClient(controller) timeSource := mock_redis.NewMockTimeSource(controller) - connection := mock_redis.NewMockConnection(controller) - response := mock_redis.NewMockResponse(controller) localCache := freecache.NewCache(100) - cache := redis.NewRateLimitCacheImpl(pool, nil, timeSource, rand.New(rand.NewSource(1)), 0, localCache) + cache := redis.NewRateLimitCacheImpl(client, nil, timeSource, rand.New(rand.NewSource(1)), 0, localCache) sink := &common.TestStatSink{} statsStore := stats.NewStore(sink, true) localCacheStats := redis.NewLocalCacheStats(localCache, statsStore.Scope("localcache")) // Test Near Limit Stats. Under Near Limit Ratio - pool.EXPECT().Get().Return(connection) timeSource.EXPECT().UnixNow().Return(int64(1000000)) - connection.EXPECT().PipeAppend("INCRBY", "domain_key4_value4_997200", uint32(1)) - connection.EXPECT().PipeAppend( + client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).SetArg(0, uint32(11)) + client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key4_value4_997200", int64(3600)) - connection.EXPECT().PipeResponse().Return(response) - response.EXPECT().Int().Return(int64(11)) - connection.EXPECT().PipeResponse() - pool.EXPECT().Put(connection) request := common.NewRateLimitRequest("domain", [][][2]string{{{"key4", "value4"}}}, 1) @@ -232,15 +191,10 @@ func TestOverLimitWithLocalCache(t *testing.T) { testLocalCacheStats(localCacheStats, statsStore, sink, 0, 1, 1, 0, 0) // Test Near Limit Stats. At Near Limit Ratio, still OK - pool.EXPECT().Get().Return(connection) timeSource.EXPECT().UnixNow().Return(int64(1000000)) - connection.EXPECT().PipeAppend("INCRBY", "domain_key4_value4_997200", uint32(1)) - connection.EXPECT().PipeAppend( + client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).SetArg(0, uint32(13)) + client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key4_value4_997200", int64(3600)) - connection.EXPECT().PipeResponse().Return(response) - response.EXPECT().Int().Return(int64(13)) - connection.EXPECT().PipeResponse() - pool.EXPECT().Put(connection) assert.Equal( []*pb.RateLimitResponse_DescriptorStatus{ @@ -255,15 +209,10 @@ func TestOverLimitWithLocalCache(t *testing.T) { testLocalCacheStats(localCacheStats, statsStore, sink, 0, 2, 2, 0, 0) // Test Over limit stats - pool.EXPECT().Get().Return(connection) timeSource.EXPECT().UnixNow().Return(int64(1000000)) - connection.EXPECT().PipeAppend("INCRBY", "domain_key4_value4_997200", uint32(1)) - connection.EXPECT().PipeAppend( + client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).SetArg(0, uint32(16)) + client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key4_value4_997200", int64(3600)) - connection.EXPECT().PipeResponse().Return(response) - response.EXPECT().Int().Return(int64(16)) - connection.EXPECT().PipeResponse() - pool.EXPECT().Put(connection) assert.Equal( []*pb.RateLimitResponse_DescriptorStatus{ @@ -278,14 +227,10 @@ func TestOverLimitWithLocalCache(t *testing.T) { testLocalCacheStats(localCacheStats, statsStore, sink, 0, 2, 3, 0, 1) // Test Over limit stats with local cache - pool.EXPECT().Get().Return(connection) timeSource.EXPECT().UnixNow().Return(int64(1000000)) - connection.EXPECT().PipeAppend("INCRBY", "domain_key4_value4_997200", uint32(1)).Times(0) - connection.EXPECT().PipeAppend( + client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).Times(0) + client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key4_value4_997200", int64(3600)).Times(0) - connection.EXPECT().PipeResponse().Times(0) - response.EXPECT().Int().Times(0) - pool.EXPECT().Put(connection) assert.Equal( []*pb.RateLimitResponse_DescriptorStatus{ {Code: pb.RateLimitResponse_OVER_LIMIT, CurrentLimit: limits[0].Limit, LimitRemaining: 0}}, @@ -304,23 +249,16 @@ func TestNearLimit(t *testing.T) { controller := gomock.NewController(t) defer controller.Finish() - pool := mock_redis.NewMockPool(controller) + client := mock_redis.NewMockClient(controller) timeSource := mock_redis.NewMockTimeSource(controller) - connection := mock_redis.NewMockConnection(controller) - response := mock_redis.NewMockResponse(controller) - cache := redis.NewRateLimitCacheImpl(pool, nil, timeSource, rand.New(rand.NewSource(1)), 0, nil) + cache := redis.NewRateLimitCacheImpl(client, nil, timeSource, rand.New(rand.NewSource(1)), 0, nil) statsStore := stats.NewStore(stats.NewNullSink(), false) // Test Near Limit Stats. Under Near Limit Ratio - pool.EXPECT().Get().Return(connection) timeSource.EXPECT().UnixNow().Return(int64(1000000)) - connection.EXPECT().PipeAppend("INCRBY", "domain_key4_value4_997200", uint32(1)) - connection.EXPECT().PipeAppend( + client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).SetArg(0, uint32(11)) + client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key4_value4_997200", int64(3600)) - connection.EXPECT().PipeResponse().Return(response) - response.EXPECT().Int().Return(int64(11)) - connection.EXPECT().PipeResponse() - pool.EXPECT().Put(connection) request := common.NewRateLimitRequest("domain", [][][2]string{{{"key4", "value4"}}}, 1) @@ -336,15 +274,10 @@ func TestNearLimit(t *testing.T) { assert.Equal(uint64(0), limits[0].Stats.NearLimit.Value()) // Test Near Limit Stats. At Near Limit Ratio, still OK - pool.EXPECT().Get().Return(connection) timeSource.EXPECT().UnixNow().Return(int64(1000000)) - connection.EXPECT().PipeAppend("INCRBY", "domain_key4_value4_997200", uint32(1)) - connection.EXPECT().PipeAppend( + client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).SetArg(0, uint32(13)) + client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key4_value4_997200", int64(3600)) - connection.EXPECT().PipeResponse().Return(response) - response.EXPECT().Int().Return(int64(13)) - connection.EXPECT().PipeResponse() - pool.EXPECT().Put(connection) assert.Equal( []*pb.RateLimitResponse_DescriptorStatus{ @@ -356,15 +289,10 @@ func TestNearLimit(t *testing.T) { // Test Near Limit Stats. We went OVER_LIMIT, but the near_limit counter only increases // when we are near limit, not after we have passed the limit. - pool.EXPECT().Get().Return(connection) timeSource.EXPECT().UnixNow().Return(int64(1000000)) - connection.EXPECT().PipeAppend("INCRBY", "domain_key4_value4_997200", uint32(1)) - connection.EXPECT().PipeAppend( + client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key4_value4_997200", uint32(1)).SetArg(0, uint32(16)) + client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key4_value4_997200", int64(3600)) - connection.EXPECT().PipeResponse().Return(response) - response.EXPECT().Int().Return(int64(16)) - connection.EXPECT().PipeResponse() - pool.EXPECT().Put(connection) assert.Equal( []*pb.RateLimitResponse_DescriptorStatus{ @@ -376,14 +304,9 @@ func TestNearLimit(t *testing.T) { // Now test hitsAddend that is greater than 1 // All of it under limit, under near limit - pool.EXPECT().Get().Return(connection) timeSource.EXPECT().UnixNow().Return(int64(1234)) - connection.EXPECT().PipeAppend("INCRBY", "domain_key5_value5_1234", uint32(3)) - connection.EXPECT().PipeAppend("EXPIRE", "domain_key5_value5_1234", int64(1)) - connection.EXPECT().PipeResponse().Return(response) - response.EXPECT().Int().Return(int64(5)) - connection.EXPECT().PipeResponse() - pool.EXPECT().Put(connection) + client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key5_value5_1234", uint32(3)).SetArg(0, uint32(5)) + client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key5_value5_1234", int64(1)) request = common.NewRateLimitRequest("domain", [][][2]string{{{"key5", "value5"}}}, 3) limits = []*config.RateLimit{config.NewRateLimit(20, pb.RateLimitResponse_RateLimit_SECOND, "key5_value5", statsStore)} @@ -396,14 +319,9 @@ func TestNearLimit(t *testing.T) { assert.Equal(uint64(0), limits[0].Stats.NearLimit.Value()) // All of it under limit, some over near limit - pool.EXPECT().Get().Return(connection) timeSource.EXPECT().UnixNow().Return(int64(1234)) - connection.EXPECT().PipeAppend("INCRBY", "domain_key6_value6_1234", uint32(2)) - connection.EXPECT().PipeAppend("EXPIRE", "domain_key6_value6_1234", int64(1)) - connection.EXPECT().PipeResponse().Return(response) - response.EXPECT().Int().Return(int64(7)) - connection.EXPECT().PipeResponse() - pool.EXPECT().Put(connection) + client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key6_value6_1234", uint32(2)).SetArg(0, uint32(7)) + client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key6_value6_1234", int64(1)) request = common.NewRateLimitRequest("domain", [][][2]string{{{"key6", "value6"}}}, 2) limits = []*config.RateLimit{config.NewRateLimit(8, pb.RateLimitResponse_RateLimit_SECOND, "key6_value6", statsStore)} @@ -416,14 +334,9 @@ func TestNearLimit(t *testing.T) { assert.Equal(uint64(1), limits[0].Stats.NearLimit.Value()) // All of it under limit, all of it over near limit - pool.EXPECT().Get().Return(connection) timeSource.EXPECT().UnixNow().Return(int64(1234)) - connection.EXPECT().PipeAppend("INCRBY", "domain_key7_value7_1234", uint32(3)) - connection.EXPECT().PipeAppend("EXPIRE", "domain_key7_value7_1234", int64(1)) - connection.EXPECT().PipeResponse().Return(response) - response.EXPECT().Int().Return(int64(19)) - connection.EXPECT().PipeResponse() - pool.EXPECT().Put(connection) + client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key7_value7_1234", uint32(3)).SetArg(0, uint32(19)) + client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key7_value7_1234", int64(1)) request = common.NewRateLimitRequest("domain", [][][2]string{{{"key7", "value7"}}}, 3) limits = []*config.RateLimit{config.NewRateLimit(20, pb.RateLimitResponse_RateLimit_SECOND, "key7_value7", statsStore)} @@ -436,14 +349,9 @@ func TestNearLimit(t *testing.T) { assert.Equal(uint64(3), limits[0].Stats.NearLimit.Value()) // Some of it over limit, all of it over near limit - pool.EXPECT().Get().Return(connection) timeSource.EXPECT().UnixNow().Return(int64(1234)) - connection.EXPECT().PipeAppend("INCRBY", "domain_key8_value8_1234", uint32(3)) - connection.EXPECT().PipeAppend("EXPIRE", "domain_key8_value8_1234", int64(1)) - connection.EXPECT().PipeResponse().Return(response) - response.EXPECT().Int().Return(int64(22)) - connection.EXPECT().PipeResponse() - pool.EXPECT().Put(connection) + client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key8_value8_1234", uint32(3)).SetArg(0, uint32(22)) + client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key8_value8_1234", int64(1)) request = common.NewRateLimitRequest("domain", [][][2]string{{{"key8", "value8"}}}, 3) limits = []*config.RateLimit{config.NewRateLimit(20, pb.RateLimitResponse_RateLimit_SECOND, "key8_value8", statsStore)} @@ -456,14 +364,9 @@ func TestNearLimit(t *testing.T) { assert.Equal(uint64(1), limits[0].Stats.NearLimit.Value()) // Some of it in all three places - pool.EXPECT().Get().Return(connection) timeSource.EXPECT().UnixNow().Return(int64(1234)) - connection.EXPECT().PipeAppend("INCRBY", "domain_key9_value9_1234", uint32(7)) - connection.EXPECT().PipeAppend("EXPIRE", "domain_key9_value9_1234", int64(1)) - connection.EXPECT().PipeResponse().Return(response) - response.EXPECT().Int().Return(int64(22)) - connection.EXPECT().PipeResponse() - pool.EXPECT().Put(connection) + client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key9_value9_1234", uint32(7)).SetArg(0, uint32(22)) + client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key9_value9_1234", int64(1)) request = common.NewRateLimitRequest("domain", [][][2]string{{{"key9", "value9"}}}, 7) limits = []*config.RateLimit{config.NewRateLimit(20, pb.RateLimitResponse_RateLimit_SECOND, "key9_value9", statsStore)} @@ -476,14 +379,9 @@ func TestNearLimit(t *testing.T) { assert.Equal(uint64(4), limits[0].Stats.NearLimit.Value()) // all of it over limit - pool.EXPECT().Get().Return(connection) timeSource.EXPECT().UnixNow().Return(int64(1234)) - connection.EXPECT().PipeAppend("INCRBY", "domain_key10_value10_1234", uint32(3)) - connection.EXPECT().PipeAppend("EXPIRE", "domain_key10_value10_1234", int64(1)) - connection.EXPECT().PipeResponse().Return(response) - response.EXPECT().Int().Return(int64(30)) - connection.EXPECT().PipeResponse() - pool.EXPECT().Put(connection) + client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key10_value10_1234", uint32(3)).SetArg(0, uint32(30)) + client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key10_value10_1234", int64(1)) request = common.NewRateLimitRequest("domain", [][][2]string{{{"key10", "value10"}}}, 3) limits = []*config.RateLimit{config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_SECOND, "key10_value10", statsStore)} @@ -501,23 +399,16 @@ func TestRedisWithJitter(t *testing.T) { controller := gomock.NewController(t) defer controller.Finish() - pool := mock_redis.NewMockPool(controller) + client := mock_redis.NewMockClient(controller) timeSource := mock_redis.NewMockTimeSource(controller) - connection := mock_redis.NewMockConnection(controller) - response := mock_redis.NewMockResponse(controller) jitterSource := mock_redis.NewMockJitterRandSource(controller) - cache := redis.NewRateLimitCacheImpl(pool, nil, timeSource, rand.New(jitterSource), 3600, nil) + cache := redis.NewRateLimitCacheImpl(client, nil, timeSource, rand.New(jitterSource), 3600, nil) statsStore := stats.NewStore(stats.NewNullSink(), false) - pool.EXPECT().Get().Return(connection) timeSource.EXPECT().UnixNow().Return(int64(1234)) jitterSource.EXPECT().Int63().Return(int64(100)) - connection.EXPECT().PipeAppend("INCRBY", "domain_key_value_1234", uint32(1)) - connection.EXPECT().PipeAppend("EXPIRE", "domain_key_value_1234", int64(101)) - connection.EXPECT().PipeResponse().Return(response) - response.EXPECT().Int().Return(int64(5)) - connection.EXPECT().PipeResponse() - pool.EXPECT().Put(connection) + client.EXPECT().DoCmd(gomock.Any(), "INCRBY", "domain_key_value_1234", uint32(1)).SetArg(0, uint32(5)) + client.EXPECT().DoCmd(gomock.Any(), "EXPIRE", "domain_key_value_1234", int64(101)) request := common.NewRateLimitRequest("domain", [][][2]string{{{"key", "value"}}}, 1) limits := []*config.RateLimit{config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_SECOND, "key_value", statsStore)} @@ -529,3 +420,106 @@ func TestRedisWithJitter(t *testing.T) { assert.Equal(uint64(0), limits[0].Stats.OverLimit.Value()) assert.Equal(uint64(0), limits[0].Stats.NearLimit.Value()) } + +func mustNewRedisServer() *miniredis.Miniredis { + srv, err := miniredis.Run() + if err != nil { + panic(err) + } + + return srv +} + +func TestNewClientImpl(t *testing.T) { + redisAuth := "123" + statsStore := stats.NewStore(stats.NewNullSink(), false) + + mkRedisClient := func(auth, addr string) redis.Client { + return redis.NewClientImpl(statsStore, false, auth, addr, 1, 1*time.Millisecond, 1) + } + + t.Run("connection refused", func(t *testing.T) { + assert.PanicsWithError(t, "dial tcp 127.0.0.1:12345: connect: connection refused", func() { + // It's possible there is a redis server listening on 6379 in ci environment, so + // use a random port. + mkRedisClient("", "localhost:12345") + }) + }) + + t.Run("ok", func(t *testing.T) { + redisSrv := mustNewRedisServer() + defer redisSrv.Close() + + var client redis.Client + assert.NotPanics(t, func() { + client = mkRedisClient("", redisSrv.Addr()) + }) + assert.NotNil(t, client) + }) + + t.Run("auth fail", func(t *testing.T) { + redisSrv := mustNewRedisServer() + defer redisSrv.Close() + + redisSrv.RequireAuth(redisAuth) + + assert.PanicsWithError(t, "NOAUTH Authentication required.", func() { + mkRedisClient("", redisSrv.Addr()) + }) + }) + + t.Run("auth pass", func(t *testing.T) { + redisSrv := mustNewRedisServer() + defer redisSrv.Close() + + redisSrv.RequireAuth(redisAuth) + + assert.NotPanics(t, func() { + mkRedisClient(redisAuth, redisSrv.Addr()) + }) + }) +} + +func TestDoCmd(t *testing.T) { + statsStore := stats.NewStore(stats.NewNullSink(), false) + + mkRedisClient := func(addr string) redis.Client { + return redis.NewClientImpl(statsStore, false, "", addr, 1, 0, 0) + } + + t.Run("SETGET ok", func(t *testing.T) { + redisSrv := mustNewRedisServer() + defer redisSrv.Close() + + client := mkRedisClient(redisSrv.Addr()) + var res string + + assert.Nil(t, client.DoCmd(nil, "SET", "foo", "bar")) + assert.Nil(t, client.DoCmd(&res, "GET", "foo")) + assert.Equal(t, "bar", res) + }) + + t.Run("INCRBY ok", func(t *testing.T) { + redisSrv := mustNewRedisServer() + defer redisSrv.Close() + + client := mkRedisClient(redisSrv.Addr()) + var res uint32 + hits := uint32(1) + + assert.Nil(t, client.DoCmd(&res, "INCRBY", "a", hits)) + assert.Equal(t, hits, res) + assert.Nil(t, client.DoCmd(&res, "INCRBY", "a", hits)) + assert.Equal(t, uint32(2), res) + }) + + t.Run("connection broken", func(t *testing.T) { + redisSrv := mustNewRedisServer() + client := mkRedisClient(redisSrv.Addr()) + + assert.Nil(t, client.DoCmd(nil, "SET", "foo", "bar")) + + redisSrv.Close() + assert.EqualError(t, client.DoCmd(nil, "GET", "foo"), "EOF") + }) +}