Skip to content

Commit

Permalink
Implement capnproto replication (#7659)
Browse files Browse the repository at this point in the history
* Implement capnproto replication

Our profiles from production show that a lot of CPU and memory in receivers
is used for unmarshaling protobuf messages. Although it is not possible to change
the remote-write format, we have the freedom to change the protocol used
for replicating timeseries data.

This commit introduces a new feature in receivers where replication can be done
using Cap'n Proto instead of gRPC + Protobuf. The advantage of the former protocol
is that deserialization is far cheaper and fields can be accessed directly from
the received message (byte slice) without allocating intermediate objects.
There is an additional cost for serialization because we have to convert from
Protobuf to the Cap'n proto format, but in our setup this still results in a net
reduction in resource usage.

Signed-off-by: Filip Petkovski <[email protected]>

* Pass logger

Signed-off-by: Filip Petkovski <[email protected]>

* Update capnp

Signed-off-by: Filip Petkovski <[email protected]>

* Modify flag

Signed-off-by: Filip Petkovski <[email protected]>

* Lint

Signed-off-by: Filip Petkovski <[email protected]>

* Fix spellcheck

Signed-off-by: Filip Petkovski <[email protected]>

* Use previous version

Signed-off-by: Filip Petkovski <[email protected]>

* Update docker base

Signed-off-by: Filip Petkovski <[email protected]>

* Bump go

Signed-off-by: Filip Petkovski <[email protected]>

* Update docs/components/receive.md

Co-authored-by: Pedro Tanaka <[email protected]>
Signed-off-by: Filip Petkovski <[email protected]>

* Validate labels

Signed-off-by: Filip Petkovski <[email protected]>

* e2e: add receive test with capnp replication

Signed-off-by: Giedrius Statkevičius <[email protected]>

* receive: make copy only when necessary

Signed-off-by: Giedrius Statkevičius <[email protected]>

* Fix failing test

Signed-off-by: Filip Petkovski <[email protected]>

* Add CHANGELOG entry

Signed-off-by: Filip Petkovski <[email protected]>

* Add capnproto Make target

Signed-off-by: Filip Petkovski <[email protected]>

* Replace panics with errors

Signed-off-by: Filip Petkovski <[email protected]>

* Fix benchmark

Signed-off-by: Filip Petkovski <[email protected]>

* Fix CHANGELOG

Signed-off-by: Filip Petkovski <[email protected]>

---------

Signed-off-by: Filip Petkovski <[email protected]>
Signed-off-by: Giedrius Statkevičius <[email protected]>
Co-authored-by: Pedro Tanaka <[email protected]>
Co-authored-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
3 people authored Oct 17, 2024
1 parent 274f95e commit 65b664c
Show file tree
Hide file tree
Showing 33 changed files with 4,163 additions and 349 deletions.
6 changes: 6 additions & 0 deletions .bingo/Variables.mk
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ $(BINGO): $(BINGO_DIR)/bingo.mod
@echo "(re)installing $(GOBIN)/bingo-v0.9.0"
@cd $(BINGO_DIR) && GOWORK=off $(GO) build -mod=mod -modfile=bingo.mod -o=$(GOBIN)/bingo-v0.9.0 "github.com/bwplotka/bingo"

CAPNPC_GO := $(GOBIN)/capnpc-go-v3.0.1-alpha.2.0.20240830165715-46ccd63a72af
$(CAPNPC_GO): $(BINGO_DIR)/capnpc-go.mod
@# Install binary/ries using Go 1.14+ build command. This is using bwplotka/bingo-controlled, separate go module with pinned dependencies.
@echo "(re)installing $(GOBIN)/capnpc-go-v3.0.1-alpha.2.0.20240830165715-46ccd63a72af"
@cd $(BINGO_DIR) && GOWORK=off $(GO) build -mod=mod -modfile=capnpc-go.mod -o=$(GOBIN)/capnpc-go-v3.0.1-alpha.2.0.20240830165715-46ccd63a72af "capnproto.org/go/capnp/v3/capnpc-go"

FAILLINT := $(GOBIN)/faillint-v1.13.0
$(FAILLINT): $(BINGO_DIR)/faillint.mod
@# Install binary/ries using Go 1.14+ build command. This is using bwplotka/bingo-controlled, separate go module with pinned dependencies.
Expand Down
5 changes: 5 additions & 0 deletions .bingo/capnpc-go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module _ // Auto generated by https://github.com/bwplotka/bingo. DO NOT EDIT

go 1.23.1

require capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af // capnpc-go
6 changes: 6 additions & 0 deletions .bingo/capnpc-go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af h1:A5wxH0ZidOtYYUGjhtBaRuB87M73bGfc06uWB8sHpg0=
capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af/go.mod h1:2vT5D2dtG8sJGEoEKU17e+j7shdaYp1Myl8X03B3hmc=
github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381 h1:d5EKgQfRQvO97jnISfR89AiCCCJMwMFoSxUiU0OGCRU=
github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381/go.mod h1:OU76gHeRo8xrzGJU3F3I1CqX1ekM8dfJw0+wPeMwnp0=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
2 changes: 2 additions & 0 deletions .bingo/variables.env
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ ALERTMANAGER="${GOBIN}/alertmanager-v0.27.0"

BINGO="${GOBIN}/bingo-v0.9.0"

CAPNPC_GO="${GOBIN}/capnpc-go-v3.0.1-alpha.2.0.20240830165715-46ccd63a72af"

FAILLINT="${GOBIN}/faillint-v1.13.0"

GOIMPORTS="${GOBIN}/goimports-v0.23.0"
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7658](https://github.com/thanos-io/thanos/pull/7658) Store: Fix panic because too small buffer in pool.
- [#7643](https://github.com/thanos-io/thanos/pull/7643) Receive: fix thanos_receive_write_{timeseries,samples} stats
- [#7644](https://github.com/thanos-io/thanos/pull/7644) fix(ui): add null check to find overlapping blocks logic
- [#7814](https://github.com/thanos-io/thanos/pull/7814) Store: label_values: if matchers contain __name__=="something", do not add <labelName> != "" to fetch less postings.
- [#7814](https://github.com/thanos-io/thanos/pull/7814) Store: label_values: if matchers contain **name**=="something", do not add <labelname> != "" to fetch less postings.
- [#7679](https://github.com/thanos-io/thanos/pull/7679) Query: respect store.limit.* flags when evaluating queries
- [#7821](https://github.com/thanos-io/thanos/pull/7679) Query/Receive: Fix coroutine leak introduced in https://github.com/thanos-io/thanos/pull/7796.

Expand All @@ -29,6 +29,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7429](https://github.com/thanos-io/thanos/pull/7429): Reloader: introduce `TolerateEnvVarExpansionErrors` to allow suppressing errors when expanding environment variables in the configuration file. When set, this will ensure that the reloader won't consider the operation to fail when an unset environment variable is encountered. Note that all unset environment variables are left as is, whereas all set environment variables are expanded as usual.
- [#7560](https://github.com/thanos-io/thanos/pull/7560) Query: Added the possibility of filtering rules by rule_name, rule_group or file to HTTP api.
- [#7652](https://github.com/thanos-io/thanos/pull/7652) Store: Implement metadata API limit in stores.
- [#7659](https://github.com/thanos-io/thanos/pull/7659) Receive: Add support for replication using [Cap'n Proto](https://capnproto.org/). This protocol has a lower CPU and memory footprint, which leads to a reduction in resource usage in Receivers. Before enabling it, make sure that all receivers are updated to a version which supports this replication method.

### Changed

Expand Down
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,13 @@ proto: ## Generates Go files from Thanos proto files.
proto: check-git $(GOIMPORTS) $(PROTOC) $(PROTOC_GEN_GOGOFAST)
@GOIMPORTS_BIN="$(GOIMPORTS)" PROTOC_BIN="$(PROTOC)" PROTOC_GEN_GOGOFAST_BIN="$(PROTOC_GEN_GOGOFAST)" PROTOC_VERSION="$(PROTOC_VERSION)" scripts/genproto.sh

.PHONY: capnp
capnp: ## Generates Go files from Thanos capnproto files.
capnp: check-git
capnp compile -I $(shell go list -m -f '{{.Dir}}' capnproto.org/go/capnp/v3)/std -ogo pkg/receive/writecapnp/write_request.capnp
@$(GOIMPORTS) -w pkg/receive/writecapnp/write_request.capnp.go
go run ./scripts/copyright

.PHONY: tarballs-release
tarballs-release: ## Build tarballs.
tarballs-release: $(PROMU)
Expand Down
54 changes: 43 additions & 11 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package main

import (
"context"
"fmt"
"net"
"os"
"path"
"strings"
Expand Down Expand Up @@ -271,6 +273,7 @@ func runReceive(
Limiter: limiter,

AsyncForwardWorkerCount: conf.asyncForwardWorkerCount,
ReplicationProtocol: receive.ReplicationProtocol(conf.replicationProtocol),
})

grpcProbe := prober.NewGRPC()
Expand Down Expand Up @@ -465,6 +468,26 @@ func runReceive(
}
}

{
capNProtoWriter := receive.NewCapNProtoWriter(logger, dbs, &receive.CapNProtoWriterOptions{
TooFarInFutureTimeWindow: int64(time.Duration(*conf.tsdbTooFarInFutureTimeWindow)),
})
handler := receive.NewCapNProtoHandler(logger, capNProtoWriter)
listener, err := net.Listen("tcp", conf.replicationAddr)
if err != nil {
return err
}
server := receive.NewCapNProtoServer(listener, handler, logger)
g.Add(func() error {
return server.ListenAndServe()
}, func(err error) {
server.Shutdown()
if err := listener.Close(); err != nil {
level.Warn(logger).Log("msg", "Cap'n Proto server did not shut down gracefully", "err", err.Error())
}
})
}

level.Info(logger).Log("msg", "starting receiver")
return nil
}
Expand Down Expand Up @@ -795,6 +818,7 @@ type receiveConfig struct {

grpcConfig grpcConfig

replicationAddr string
rwAddress string
rwServerCert string
rwServerKey string
Expand All @@ -816,17 +840,18 @@ type receiveConfig struct {
hashringsFileContent string
hashringsAlgorithm string

refreshInterval *model.Duration
endpoint string
tenantHeader string
tenantField string
tenantLabelName string
defaultTenantID string
replicaHeader string
replicationFactor uint64
forwardTimeout *model.Duration
maxBackoff *model.Duration
compression string
refreshInterval *model.Duration
endpoint string
tenantHeader string
tenantField string
tenantLabelName string
defaultTenantID string
replicaHeader string
replicationFactor uint64
forwardTimeout *model.Duration
maxBackoff *model.Duration
compression string
replicationProtocol string

tsdbMinBlockDuration *model.Duration
tsdbMaxBlockDuration *model.Duration
Expand Down Expand Up @@ -929,6 +954,13 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64Var(&rc.replicationFactor)

replicationProtocols := []string{string(receive.ProtobufReplication), string(receive.CapNProtoReplication)}
cmd.Flag("receive.replication-protocol", "The protocol to use for replicating remote-write requests. One of "+strings.Join(replicationProtocols, ", ")).
Default(string(receive.ProtobufReplication)).
EnumVar(&rc.replicationProtocol, replicationProtocols...)

cmd.Flag("receive.capnproto-address", "Address for the Cap'n Proto server.").Default(fmt.Sprintf("0.0.0.0:%s", receive.DefaultCapNProtoPort)).StringVar(&rc.replicationAddr)

rc.forwardTimeout = extkingpin.ModelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden())

rc.maxBackoff = extkingpin.ModelDuration(cmd.Flag("receive-forward-max-backoff", "Maximum backoff for each forward fan-out request").Default("5s").Hidden())
Expand Down
27 changes: 26 additions & 1 deletion docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,24 @@ If you are using the `hashmod` algorithm and wish to migrate to `ketama`, the si

This algorithm uses a `hashmod` function over all labels to decide which receiver is responsible for a given timeseries. This is the default algorithm due to historical reasons. However, its usage for new Receive installations is discouraged since adding new Receiver nodes leads to series churn and memory usage spikes.

### Replication protocols

By default, Receivers replicate data using Protobuf over gRPC. Deserializing protobuf-encoded messages can be resource-intensive and cause significant GC pressure. Alternatively, you can use [Cap'N Proto](https://capnproto.org/) for replication encoding and as the RPC framework.

In order to enable this mode, you can use the `receive.replication-protocol=capnproto` option on the receiver. Thanos will try to infer the Cap'N Proto address of each peer in the hashring using the existing gRPC address. You can also explicitly set the Cap'N Proto as follows:

```json
[
{
"endpoints": [
{"address": "node-1:10901", "capnproto_address": "node-1:19391"},
{"address": "node-2:10901", "capnproto_address": "node-2:19391"},
{"address": "node-3:10901", "capnproto_address": "node-3:19391"}
]
}
]
```

### Hashring management and autoscaling in Kubernetes

The [Thanos Receive Controller](https://github.com/observatorium/thanos-receive-controller) project aims to automate hashring management when running Thanos in Kubernetes. In combination with the Ketama hashring algorithm, this controller can also be used to keep hashrings up to date when Receivers are scaled automatically using an HPA or [Keda](https://keda.sh/).
Expand Down Expand Up @@ -312,7 +330,8 @@ Please see the metric `thanos_receive_forward_delay_seconds` to see if you need

The following formula is used for calculating quorum:

```go mdox-exec="sed -n '999,1008p' pkg/receive/handler.go"
```go mdox-exec="sed -n '1012,1022p' pkg/receive/handler.go"
// writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success.
func (h *Handler) writeQuorum() int {
// NOTE(GiedriusS): this is here because otherwise RF=2 doesn't make sense as all writes
// would need to succeed all the time. Another way to think about it is when migrating
Expand Down Expand Up @@ -392,6 +411,8 @@ Flags:
Path to YAML file that contains object
store configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--receive.capnproto-address="0.0.0.0:19391"
Address for the Cap'n Proto server.
--receive.default-tenant-id="default-tenant"
Default tenant ID to use when none is provided
via a header.
Expand Down Expand Up @@ -438,6 +459,10 @@ Flags:
--receive.replication-factor=1
How many times to replicate incoming write
requests.
--receive.replication-protocol=protobuf
The protocol to use for replicating
remote-write requests. One of protobuf,
capnproto
--receive.split-tenant-label-name=""
Label name through which the request will
be split into multiple tenants. This takes
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ require (
)

require (
capnproto.org/go/capnp/v3 v3.0.1-alpha.1
github.com/cortexproject/promqlsmith v0.0.0-20240506042652-6cfdd9739a5e
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1
github.com/hashicorp/golang-lru/v2 v2.0.7
Expand All @@ -132,6 +133,7 @@ require (
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect
github.com/cilium/ebpf v0.11.0 // indirect
github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381 // indirect
github.com/containerd/cgroups/v3 v3.0.3 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/elastic/go-licenser v0.3.1 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
capnproto.org/go/capnp/v3 v3.0.1-alpha.1 h1:hYEclwXEKsnu+PdHASdx3nLP0fC9kZnR+x1CEvMp9ck=
capnproto.org/go/capnp/v3 v3.0.1-alpha.1/go.mod h1:B+ZjwFmHwTYv201x6CdIo7MmDC/TROJDa00kbjTnv1s=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU=
Expand Down Expand Up @@ -1496,6 +1498,8 @@ github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq
github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b h1:ga8SEFjZ60pxLcmhnThWgvH2wg8376yUJmPhEH4H3kw=
github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381 h1:d5EKgQfRQvO97jnISfR89AiCCCJMwMFoSxUiU0OGCRU=
github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381/go.mod h1:OU76gHeRo8xrzGJU3F3I1CqX1ekM8dfJw0+wPeMwnp0=
github.com/containerd/cgroups/v3 v3.0.3 h1:S5ByHZ/h9PMe5IOQoN7E+nMc2UcLEM/V48DGDJ9kip0=
github.com/containerd/cgroups/v3 v3.0.3/go.mod h1:8HBe7V3aWGLFPd/k03swSIsGjZhHI2WzJmticMgVuz0=
github.com/coreos/go-systemd/v22 v22.4.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
Expand Down Expand Up @@ -2094,6 +2098,8 @@ github.com/ovh/go-ovh v1.6.0 h1:ixLOwxQdzYDx296sXcgS35TOPEahJkpjMGtzPadCjQI=
github.com/ovh/go-ovh v1.6.0/go.mod h1:cTVDnl94z4tl8pP1uZ/8jlVxntjSIf09bNcQ5TJSC7c=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/philhofer/fwd v1.1.1 h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ=
github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY=
github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=
github.com/phpdave11/gofpdi v1.0.13/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=
Expand Down Expand Up @@ -2249,6 +2255,10 @@ github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31 h1:xPaP58g
github.com/thanos-io/promql-engine v0.0.0-20240921092401-37747eddbd31/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY=
github.com/tinylib/msgp v1.1.5 h1:2gXmtWueD2HefZHQe1QOy9HVzmFrLOVvsXwXBQ0ayy0=
github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg=
github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk=
github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk=
github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek=
github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw=
github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk=
Expand Down
108 changes: 108 additions & 0 deletions pkg/receive/capnp_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package receive

import (
"context"
"net"

"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/rpc"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"

"github.com/thanos-io/thanos/pkg/receive/writecapnp"
"github.com/thanos-io/thanos/pkg/runutil"
)

type CapNProtoServer struct {
listener net.Listener
server writecapnp.Writer
logger log.Logger
}

func NewCapNProtoServer(listener net.Listener, handler *CapNProtoHandler, logger log.Logger) *CapNProtoServer {
return &CapNProtoServer{
listener: listener,
server: writecapnp.Writer_ServerToClient(handler),
logger: logger,
}
}

func (c *CapNProtoServer) ListenAndServe() error {
for {
conn, err := c.listener.Accept()
if err != nil {
return err
}

go func() {
defer runutil.CloseWithLogOnErr(c.logger, conn, "receive capnp conn")
rpcConn := rpc.NewConn(rpc.NewPackedStreamTransport(conn), &rpc.Options{
// The BootstrapClient is the RPC interface that will be made available
// to the remote endpoint by default.
BootstrapClient: capnp.Client(c.server).AddRef(),
})
<-rpcConn.Done()
}()
}
}

func (c *CapNProtoServer) Shutdown() {
c.server.Release()
}

type CapNProtoHandler struct {
writer *CapNProtoWriter
logger log.Logger
}

func NewCapNProtoHandler(logger log.Logger, writer *CapNProtoWriter) *CapNProtoHandler {
return &CapNProtoHandler{logger: logger, writer: writer}
}

func (c CapNProtoHandler) Write(ctx context.Context, call writecapnp.Writer_write) error {
call.Go()
wr, err := call.Args().Wr()
if err != nil {
return err
}
t, err := wr.Tenant()
if err != nil {
return err
}
req, err := writecapnp.NewRequest(wr)
if err != nil {
return err
}
defer req.Close()

var errs writeErrors
errs.Add(c.writer.Write(ctx, t, req))
if err := errs.ErrOrNil(); err != nil {
level.Debug(c.logger).Log("msg", "failed to handle request", "err", err)
result, allocErr := call.AllocResults()
if allocErr != nil {
return allocErr
}

switch errors.Cause(err) {
case nil:
return nil
case errNotReady:
result.SetError(writecapnp.WriteError_unavailable)
case errUnavailable:
result.SetError(writecapnp.WriteError_unavailable)
case errConflict:
result.SetError(writecapnp.WriteError_alreadyExists)
case errBadReplica:
result.SetError(writecapnp.WriteError_invalidArgument)
default:
result.SetError(writecapnp.WriteError_internal)
}
}

return nil
}
Loading

0 comments on commit 65b664c

Please sign in to comment.