Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In-memory message cache #3

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cache-plugin.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
value: 42
24 changes: 9 additions & 15 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,25 +1,24 @@
module github.com/liftbridge-io/liftbridge

go 1.12
go 1.13

replace github.com/liftbridge-io/liftbridge/server/plugin => ./server/plugin
replace github.com/liftbridge-io/liftbridge/server/plugin/cache => ./server/plugin/cache
replace github.com/liftbridge-io/liftbridge/server/plugin/cache/api => ./server/plugin/cache/api

require (
github.com/Workiva/go-datastructures v1.0.50
github.com/dustin/go-humanize v1.0.0
github.com/golang/protobuf v1.3.1
github.com/golang/protobuf v1.3.2
github.com/hako/durafmt v0.0.0-20190612201238-650ed9f29a84
github.com/hashicorp/go-hclog v0.9.2 // indirect
github.com/hashicorp/go-immutable-radix v1.1.0 // indirect
github.com/hashicorp/go-uuid v1.0.1 // indirect
github.com/hashicorp/golang-lru v0.5.1 // indirect
github.com/hashicorp/raft v1.1.0
github.com/hashicorp/raft-boltdb v0.0.0-20190605210249-ef2e128ed477
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/liftbridge-io/go-liftbridge v0.0.0-20190831233708-6fbf530bb220
github.com/liftbridge-io/liftbridge-grpc v0.0.0-20190829220806-66e3ee4b7943
github.com/liftbridge-io/liftbridge/server/plugin v0.0.0-00010101000000-000000000000
github.com/liftbridge-io/liftbridge/server/plugin/cache v0.0.0-00010101000000-000000000000
github.com/liftbridge-io/nats-on-a-log v0.0.0-20190703144237-760cefbfc85e
github.com/natefinch/atomic v0.0.0-20150920032501-a62ce929ffcc
github.com/nats-io/gnatsd v1.4.1 // indirect
github.com/nats-io/jwt v0.2.8 // indirect
github.com/nats-io/nats-server/v2 v2.0.0
github.com/nats-io/nats.go v1.8.1
github.com/nats-io/nuid v1.0.1
Expand All @@ -28,10 +27,5 @@ require (
github.com/sirupsen/logrus v1.4.2
github.com/stretchr/testify v1.3.0
github.com/urfave/cli v1.20.0
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 // indirect
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb // indirect
golang.org/x/text v0.3.2 // indirect
google.golang.org/genproto v0.0.0-20190701230453-710ae3a149df // indirect
google.golang.org/grpc v1.22.0
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
google.golang.org/grpc v1.24.0
)
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFU
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
Expand Down Expand Up @@ -110,6 +112,7 @@ github.com/liftbridge-io/go-liftbridge v0.0.0-20190831233708-6fbf530bb220/go.mod
github.com/liftbridge-io/liftbridge v0.0.0-20190628061900-5f565727d49f/go.mod h1:DgUnOkzaVLgIOQ3W7ztMm0+xQmDaEM+tcgGCAdlZj8s=
github.com/liftbridge-io/liftbridge v0.0.0-20190704001405-928a9d17c609/go.mod h1:d9iQ/6VLp7paFMnNukU2nV1PnNjkz2vpNQzYS4zIy8Y=
github.com/liftbridge-io/liftbridge v0.0.0-20190831212313-ad1b5f9c2b17/go.mod h1:hnwhdV70dIFgIKvzBJt9rOfce19kE2a3ZGaRgSQmNOs=
github.com/liftbridge-io/liftbridge v0.0.0-20191009142300-f92ab643e701/go.mod h1:kHj0WRCX0aHXZrOq8hlCnoU8M5SurFqsJLdyZ94J2Go=
github.com/liftbridge-io/liftbridge-grpc v0.0.0-20190829220806-66e3ee4b7943 h1:7p6n1IhML9EYNhcdTci3haK09oP/e/TGMWN8DFVaxAE=
github.com/liftbridge-io/liftbridge-grpc v0.0.0-20190829220806-66e3ee4b7943/go.mod h1:ObGO38WdO4ldLsa2oUFcultUk0rggc+yZWcBb7qjnDI=
github.com/liftbridge-io/nats-on-a-log v0.0.0-20180718011723-80d0727461af h1:2m2YbqghQzLF+8uWDMbrFvCK5FBHOdarbywK9v6tclE=
Expand Down Expand Up @@ -293,6 +296,8 @@ google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiq
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/grpc v1.22.0 h1:J0UbZOIrCAl+fpTOf8YLs4dJo8L/owV4LYVtAXQoPkw=
google.golang.org/grpc v1.22.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.24.0 h1:vb/1TCsVn3DcJlQ0Gs1yB1pKI6Do2/QNwxdKqmc/b0s=
google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
Expand Down
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/urfave/cli"

"github.com/liftbridge-io/liftbridge/server"
cachePlugin "github.com/liftbridge-io/liftbridge/server/plugin/cache"
)

const version = "0.0.1"
Expand Down Expand Up @@ -76,7 +77,8 @@ func main() {
}

server := server.New(config)
if err := server.Start(); err != nil {
// TODO: should load the cache plugin following a configuration entry
if err := server.Start(cachePlugin.New()); err != nil {
return err
}
runtime.Goexit()
Expand Down
32 changes: 32 additions & 0 deletions server/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,38 @@ func (m *metadataAPI) ReportLeader(ctx context.Context, req *proto.ReportLeaderO
func (m *metadataAPI) AddPartition(protoPartition *proto.Partition, recovered bool) (*partition, error) {
m.mu.Lock()
defer m.mu.Unlock()

// Subscribe this partition's stream
st, ok := m.streams[protoPartition.Stream]
if ok {
part, ok := st.partitions[protoPartition.Id]
if ok && part != nil {
apiServer := &apiServer{m.Server}

req := &client.SubscribeRequest{
Stream: protoPartition.Stream,
Partition: protoPartition.Id,
StartPosition: client.StartPosition_EARLIEST,
StartOffset: 0,
StartTimestamp: 0,
}
cancel := make(chan struct{})
ch, _, _ := apiServer.subscribe(context.Background(), part, req, cancel)
stream := protoPartition.Stream
m.Server.startGoroutine(func() {
for {
select {
case msg := <-ch:
for _, plugin := range m.plugins {
plugin.MessageReceived(stream, msg)
}
}
}
cancel <- struct{}{}
})
}
}

partition, err := m.addPartition(protoPartition, recovered, m.newPartition)
if err != nil {
return nil, err
Expand Down
9 changes: 9 additions & 0 deletions server/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,7 @@ func (p *partition) messageProcessingLoop(recvChan <-chan *nats.Msg, stop <-chan
batchWait = p.srv.config.BatchWaitTime
msgBatch = make([]*proto.Message, 0, batchSize)
)
LOOP:
for {
msgBatch = msgBatch[:0]
select {
Expand All @@ -560,6 +561,14 @@ func (p *partition) messageProcessingLoop(recvChan <-chan *nats.Msg, stop <-chan
}

m := natsToProtoMessage(msg, leaderEpoch)

for _, plugin := range p.srv.plugins {
// Ignore the message if one of the plugins requests it
if !plugin.ProcessMessage(p.GetStream(), p.GetSubject(), *m) {
continue LOOP
}
}

msgBatch = append(msgBatch, m)
remaining := batchSize - 1

Expand Down
10 changes: 10 additions & 0 deletions server/plugin/cache/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
.PHONY: all
all: go py

.PHONY: go
go:
protoc --gofast_out=plugins=grpc:api api.proto

.PHONY: py
py:
python -m grpc_tools.protoc -I. --python_out=py --grpc_python_out=py api.proto
17 changes: 17 additions & 0 deletions server/plugin/cache/api.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
syntax = "proto3";

package api;

message GetRequest {
string key = 1;
string stream = 2;
string subject = 3;
}

message GetResponse {
bytes value = 1;
}

service CacheAPI {
rpc Get(GetRequest) returns (GetResponse) {}
}
Loading