Skip to content

Commit

Permalink
chore: handle peer events in batches
Browse files Browse the repository at this point in the history
Less wg calls per event batch.

Signed-off-by: Dmitriy Matrenichev <[email protected]>
Co-authored-by: Artem Chernyshev <[email protected]>
  • Loading branch information
DmitriyMV and Unix4ever committed Apr 29, 2024
1 parent 5422b1c commit a936b60
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 75 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2024-03-20T20:16:10Z by kres latest.
# Generated on 2024-04-26T15:13:42Z by kres ebc009d.

name: default
concurrency:
Expand Down Expand Up @@ -45,6 +45,7 @@ jobs:
run: |
git fetch --prune --unshallow
- name: Set up Docker Buildx
id: setup-buildx
uses: docker/setup-buildx-action@v3
with:
driver: remote
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2024-03-20T20:16:10Z by kres latest.
# Generated on 2024-04-26T15:13:42Z by kres ebc009d.

ARG TOOLCHAIN

# runs markdownlint
FROM docker.io/node:21.7.1-alpine3.19 AS lint-markdown
FROM docker.io/node:21.7.3-alpine3.19 AS lint-markdown
WORKDIR /src
RUN npm i -g [email protected]
RUN npm i [email protected]
Expand Down
11 changes: 6 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2024-03-20T20:16:10Z by kres latest.
# Generated on 2024-04-26T15:13:42Z by kres ebc009d.

# common variables

Expand All @@ -9,6 +9,7 @@ TAG := $(shell git describe --tag --always --dirty --match v[0-9]\*)
ABBREV_TAG := $(shell git describe --tags >/dev/null 2>/dev/null && git describe --tag --always --match v[0-9]\* --abbrev=0 || echo 'undefined')
BRANCH := $(shell git rev-parse --abbrev-ref HEAD)
ARTIFACTS := _out
IMAGE_TAG ?= $(TAG)
WITH_DEBUG ?= false
WITH_RACE ?= false
REGISTRY ?= ghcr.io
Expand All @@ -19,10 +20,10 @@ GRPC_GO_VERSION ?= 1.3.0
GRPC_GATEWAY_VERSION ?= 2.19.1
VTPROTOBUF_VERSION ?= 0.6.0
DEEPCOPY_VERSION ?= v0.5.6
GOLANGCILINT_VERSION ?= v1.57.0
GOLANGCILINT_VERSION ?= v1.57.2
GOFUMPT_VERSION ?= v0.6.0
GO_VERSION ?= 1.22.1
GOIMPORTS_VERSION ?= v0.19.0
GO_VERSION ?= 1.22.2
GOIMPORTS_VERSION ?= v0.20.0
GO_BUILDFLAGS ?=
GO_LDFLAGS ?=
CGO_ENABLED ?= 0
Expand Down Expand Up @@ -110,7 +111,7 @@ If you already have a compatible builder instance, you may use that instead.
## Artifacts

All artifacts will be output to ./$(ARTIFACTS). Images will be tagged with the
registry "$(REGISTRY)", username "$(USERNAME)", and a dynamic tag (e.g. $(IMAGE):$(TAG)).
registry "$(REGISTRY)", username "$(USERNAME)", and a dynamic tag (e.g. $(IMAGE):$(IMAGE_TAG)).
The registry and username can be overridden by exporting REGISTRY, and USERNAME
respectively.

Expand Down
19 changes: 9 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,34 @@ require (
github.com/google/uuid v1.6.0
github.com/jsimonetti/rtnetlink v1.4.1
github.com/planetscale/vtprotobuf v0.6.0
github.com/siderolabs/gen v0.4.7
github.com/siderolabs/gen v0.4.8
github.com/siderolabs/go-pointer v1.0.0
github.com/stretchr/testify v1.9.0
go.uber.org/zap v1.27.0
go4.org/netipx v0.0.0-20231129151722-fdeea329fbba
golang.org/x/sync v0.6.0
golang.org/x/sys v0.18.0
golang.org/x/sync v0.7.0
golang.org/x/sys v0.19.0
golang.zx2c4.com/wireguard v0.0.0-20231211153847-12269c276173
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20230429144221-925a1e7659e6
google.golang.org/grpc v1.62.1
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.33.0
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/josharian/native v1.1.0 // indirect
github.com/mdlayher/genetlink v1.3.2 // indirect
github.com/mdlayher/netlink v1.7.2 // indirect
github.com/mdlayher/socket v0.5.0 // indirect
github.com/mdlayher/socket v0.5.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp v0.0.0-20230725093048-515e97ebf090 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
)
38 changes: 18 additions & 20 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ github.com/cilium/ebpf v0.12.3 h1:8ht6F9MquybnY97at+VDZb3eQQr8ev79RueWeVaEcG4=
github.com/cilium/ebpf v0.12.3/go.mod h1:TctK1ivibvI3znr66ljgi4hqOT8EYQjz1KWBfb1UVgM=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4=
github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
Expand All @@ -24,16 +22,16 @@ github.com/mdlayher/genetlink v1.3.2 h1:KdrNKe+CTu+IbZnm/GVUMXSqBBLqcGpRDa0xkQy5
github.com/mdlayher/genetlink v1.3.2/go.mod h1:tcC3pkCrPUGIKKsCsp0B3AdaaKuHtaxoJRz3cc+528o=
github.com/mdlayher/netlink v1.7.2 h1:/UtM3ofJap7Vl4QWCPDGXY8d3GIY2UGSDbK+QWmY8/g=
github.com/mdlayher/netlink v1.7.2/go.mod h1:xraEF7uJbxLhc5fpHL4cPe221LI2bdttWlU+ZGLfQSw=
github.com/mdlayher/socket v0.5.0 h1:ilICZmJcQz70vrWVes1MFera4jGiWNocSkykwwoy3XI=
github.com/mdlayher/socket v0.5.0/go.mod h1:WkcBFfvyG8QENs5+hfQPl1X6Jpd2yeLIYgrGFmJiJxI=
github.com/mdlayher/socket v0.5.1 h1:VZaqt6RkGkt2OE9l3GcC6nZkqD3xKeQLyfleW/uBcos=
github.com/mdlayher/socket v0.5.1/go.mod h1:TjPLHI1UgwEv5J1B5q0zTZq12A/6H7nKmtTanQE37IQ=
github.com/mikioh/ipaddr v0.0.0-20190404000644-d465c8ab6721 h1:RlZweED6sbSArvlE924+mUcZuXKLBHA35U7LN621Bws=
github.com/mikioh/ipaddr v0.0.0-20190404000644-d465c8ab6721/go.mod h1:Ickgr2WtCLZ2MDGd4Gr0geeCH5HybhRJbonOgQpvSxc=
github.com/planetscale/vtprotobuf v0.6.0 h1:nBeETjudeJ5ZgBHUz1fVHvbqUKnYOXNhsIEabROxmNA=
github.com/planetscale/vtprotobuf v0.6.0/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/siderolabs/gen v0.4.7 h1:lM69UYggT7yzpubf7hEFaNujPdY55Y9zvQf/NC18GvA=
github.com/siderolabs/gen v0.4.7/go.mod h1:4PBYMdXxTg292IDRq4CGn5AymyDxJVEDvobVKDqFBEA=
github.com/siderolabs/gen v0.4.8 h1:VNpbmDLhkXp7qcSEkKk1Ee7vU2afs3xvHrWLGR2UuiY=
github.com/siderolabs/gen v0.4.8/go.mod h1:7ROKVHHB68R3Amrd4a1ZXz/oMrXWF3Mg3lSEgnkJY5c=
github.com/siderolabs/go-pointer v1.0.0 h1:6TshPKep2doDQJAAtHUuHWXbca8ZfyRySjSBT/4GsMU=
github.com/siderolabs/go-pointer v1.0.0/go.mod h1:HTRFUNYa3R+k0FFKNv11zgkaCLzEkWVzoYZ433P3kHc=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
Expand All @@ -46,16 +44,16 @@ go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go4.org/netipx v0.0.0-20231129151722-fdeea329fbba h1:0b9z3AuHCjxk0x/opv64kcgZLBseWJUpBw5I82+2U4M=
go4.org/netipx v0.0.0-20231129151722-fdeea329fbba/go.mod h1:PLyyIXexvUFg3Owu6p/WfdlivPbZJsZdgWZlrGope/Y=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/exp v0.0.0-20230725093048-515e97ebf090 h1:Di6/M8l0O2lCLc6VVRWhgCiApHV8MnQurBnFSHsQtNY=
golang.org/x/exp v0.0.0-20230725093048-515e97ebf090/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f h1:99ci1mjWVBWwJiEKYY6jWa4d2nTQVIEhZIptnrVb1XY=
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI=
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44=
Expand All @@ -66,10 +64,10 @@ golang.zx2c4.com/wireguard v0.0.0-20231211153847-12269c276173 h1:/jFs0duh4rdb8uI
golang.zx2c4.com/wireguard v0.0.0-20231211153847-12269c276173/go.mod h1:tkCQ4FQXmpAgYVh++1cq16/dH4QJtmvpRv19DWGAHSA=
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20230429144221-925a1e7659e6 h1:CawjfCvYQH2OU3/TnxLx97WDSUDRABfT18pCOYwc2GE=
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20230429144221-925a1e7659e6/go.mod h1:3rxYc4HtVcSG9gVaTs2GEBdehh+sYPOwKtyUWEOTb80=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 h1:IR+hp6ypxjH24bkMfEJ0yHR21+gwPWdV+/IBrPQyn3k=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8/go.mod h1:UCOku4NytXMJuLQE5VuqA5lX3PcHCBo8pxNyvkf4xBs=
google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk=
google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be h1:LG9vZxsWGOmUKieR8wPAUR3u3MpnYFQZROPIMaXh7/A=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM=
google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
139 changes: 102 additions & 37 deletions pkg/wireguard/wireguard.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (
"net"
"net/netip"
"os"
"slices"
"sync"
"time"

"github.com/siderolabs/go-pointer"
"go.uber.org/multierr"
"go.uber.org/zap"
"go4.org/netipx"
"golang.zx2c4.com/wireguard/conn"
Expand Down Expand Up @@ -247,6 +249,17 @@ func (dev *Device) Run(ctx context.Context, logger *zap.Logger, peers PeerSource
tunDeviceWait = tunDevice.Wait()
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

eventsCh, releaseSlice := runPeersDrainer(ctx, peers)

handlePeerEvent := func(events []PeerEvent) error {
defer releaseSlice(events)

return dev.handlePeerEvent(logger, events)
}

for {
select {
case <-ctx.Done():
Expand All @@ -259,8 +272,8 @@ func (dev *Device) Run(ctx context.Context, logger *zap.Logger, peers PeerSource
if err = dev.cleanupPeers(logger); err != nil {
return err
}
case peerEvent := <-peers.EventCh():
if err = dev.handlePeerEvent(logger, peerEvent); err != nil {
case events := <-eventsCh:
if err = handlePeerEvent(events); err != nil {
return err
}
}
Expand Down Expand Up @@ -348,26 +361,37 @@ func (dev *Device) checkDuplicateUpdate(client *wgctrl.Client, logger *zap.Logge
return false, nil
}

func (dev *Device) handlePeerEvent(logger *zap.Logger, peerEvent PeerEvent) error {
func (dev *Device) handlePeerEvent(logger *zap.Logger, peerEvents []PeerEvent) error {
dev.clientMu.Lock()
defer dev.clientMu.Unlock()

var err error

if handler := dev.dc.PeerHandler; handler != nil {
if !peerEvent.Remove {
if err := handler.HandlePeerAdded(peerEvent); err != nil {
return fmt.Errorf("error handling peer added event: %w", err)
for i := 0; i < len(peerEvents); i++ {
var (
peerEvent = peerEvents[i]
handleErr error
)

if peerEvent.Remove {
handleErr = handler.HandlePeerRemoved(peerEvent.PubKey)
} else {
handleErr = handler.HandlePeerAdded(peerEvent)
}
} else {
if err := handler.HandlePeerRemoved(peerEvent.PubKey); err != nil {
return fmt.Errorf("error handling peer removed event: %w", err)

peerEvents = slices.Delete(peerEvents, i, i+1)

if handleErr != nil {
err = multierr.Append(err, fmt.Errorf("peer handler failed on peer event %w", handleErr))
}
}
}

if !peerEvent.Remove {
skipEvent, err := dev.checkDuplicateUpdate(dev.client, logger, peerEvent)
if err != nil {
return err
if len(peerEvents) == 1 && !peerEvents[0].Remove {
skipEvent, duplicateErr := dev.checkDuplicateUpdate(dev.client, logger, peerEvents[0])
if duplicateErr != nil {
return duplicateErr
}

if skipEvent {
Expand All @@ -376,40 +400,46 @@ func (dev *Device) handlePeerEvent(logger *zap.Logger, peerEvent PeerEvent) erro
}

cfg := wgtypes.Config{
Peers: []wgtypes.PeerConfig{
{
PublicKey: peerEvent.PubKey,
Remove: peerEvent.Remove,
},
},
}

if !peerEvent.Remove {
cfg.Peers[0].ReplaceAllowedIPs = true
cfg.Peers[0].AllowedIPs = []net.IPNet{
*netipx.PrefixIPNet(netip.PrefixFrom(peerEvent.Address, peerEvent.Address.BitLen())),
Peers: make([]wgtypes.PeerConfig, 0, len(peerEvents)),
}

for _, peerEvent := range peerEvents {
peerCfg := wgtypes.PeerConfig{
PublicKey: peerEvent.PubKey,
Remove: peerEvent.Remove,
}
cfg.Peers[0].PersistentKeepaliveInterval = peerEvent.PersistentKeepAliveInterval

if peerEvent.Endpoint != "" {
ip, err := netip.ParseAddrPort(peerEvent.Endpoint)
if err != nil {
return fmt.Errorf("failed to parse last endpoint: %w", err)
if !peerEvent.Remove {
peerCfg.ReplaceAllowedIPs = true
peerCfg.AllowedIPs = []net.IPNet{
*netipx.PrefixIPNet(netip.PrefixFrom(peerEvent.Address, peerEvent.Address.BitLen())),
}
peerCfg.PersistentKeepaliveInterval = peerEvent.PersistentKeepAliveInterval

cfg.Peers[0].Endpoint = asUDP(ip)
if peerEvent.Endpoint != "" {
ip, parseErr := netip.ParseAddrPort(peerEvent.Endpoint)
if parseErr != nil {
err = multierr.Append(err, parseErr)

continue
}

peerCfg.Endpoint = asUDP(ip)
}

logger.Info("updating peer", zap.Stringer("public_key", peerEvent.PubKey), zap.Stringer("address", peerEvent.Address))
} else {
logger.Info("removing peer", zap.Stringer("public_key", peerEvent.PubKey))
}

logger.Info("updating peer", zap.Stringer("public_key", peerEvent.PubKey), zap.Stringer("address", peerEvent.Address))
} else {
logger.Info("removing peer", zap.Stringer("public_key", peerEvent.PubKey))
cfg.Peers = append(cfg.Peers, peerCfg)
}

if err := dev.client.ConfigureDevice(dev.ifaceName, cfg); err != nil {
return fmt.Errorf("error configuring Wireguard peers: %w", err)
if confErr := dev.client.ConfigureDevice(dev.ifaceName, cfg); confErr != nil {
err = multierr.Append(err, fmt.Errorf("error configuring Wireguard peers: %w", confErr))
}

return nil
return err
}

func (dev *Device) cleanupPeers(logger *zap.Logger) error {
Expand Down Expand Up @@ -494,3 +524,38 @@ func asUDP(addr netip.AddrPort) *net.UDPAddr {
Zone: addr.Addr().Zone(),
}
}

func runPeersDrainer(ctx context.Context, peers PeerSource) (chan []PeerEvent, func([]PeerEvent)) {
pool := sync.Pool{
New: func() any { return make([]PeerEvent, 0, 100) },
}

resultCh := make(chan []PeerEvent)

go func() {
var (
slc = pool.Get().([]PeerEvent) //nolint:errcheck,forcetypeassert
ch chan []PeerEvent
)

for {
select {
case <-ctx.Done():
return
case peerEvent := <-peers.EventCh():
slc = append(slc, peerEvent)
// allow sending only if we have events in our slice
ch = resultCh
case ch <- slc:
slc = pool.Get().([]PeerEvent) //nolint:errcheck,forcetypeassert
// disallow sending until we get an event
ch = nil
}
}
}()

return resultCh, func(slc []PeerEvent) {
clear(slc)
pool.Put(slc[:0]) //nolint:staticcheck
}
}

0 comments on commit a936b60

Please sign in to comment.