Skip to content

Commit

Permalink
Refactor(pool): use generic buffer/pool (#399)
Browse files Browse the repository at this point in the history
  • Loading branch information
xjasonlyu authored Sep 1, 2024
1 parent fc4c5c4 commit 391d3d9
Show file tree
Hide file tree
Showing 17 changed files with 243 additions and 96 deletions.
28 changes: 13 additions & 15 deletions internal/pool/alloc.go → buffer/allocator/allocator.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,29 @@
package pool
package allocator

import (
"errors"
"math/bits"
"sync"
)

var _allocator = NewAllocator()
"github.com/xjasonlyu/tun2socks/v2/internal/pool"
)

// Allocator for incoming frames, optimized to prevent overwriting
// after zeroing.
type Allocator struct {
buffers []sync.Pool
buffers []*pool.Pool[[]byte]
}

// NewAllocator initiates a []byte allocator for frames less than
// 65536 bytes, the waste(memory fragmentation) of space allocation
// is guaranteed to be no more than 50%.
func NewAllocator() *Allocator {
// New initiates a []byte allocator for frames less than 65536 bytes,
// the waste(memory fragmentation) of space allocation is guaranteed
// to be no more than 50%.
func New() *Allocator {
alloc := &Allocator{}
alloc.buffers = make([]sync.Pool, 17) // 1B -> 64K
alloc.buffers = make([]*pool.Pool[[]byte], 17) // 1B -> 64K
for k := range alloc.buffers {
i := k
alloc.buffers[k].New = func() any {
alloc.buffers[k] = pool.New(func() []byte {
return make([]byte, 1<<uint32(i))
}
})
}
return alloc
}
Expand All @@ -37,10 +36,10 @@ func (alloc *Allocator) Get(size int) []byte {

b := msb(size)
if size == 1<<b {
return alloc.buffers[b].Get().([]byte)[:size]
return alloc.buffers[b].Get()[:size]
}

return alloc.buffers[b+1].Get().([]byte)[:size]
return alloc.buffers[b+1].Get()[:size]
}

// Put returns a []byte to pool for future use,
Expand All @@ -51,7 +50,6 @@ func (alloc *Allocator) Put(buf []byte) error {
return errors.New("allocator Put() incorrect buffer size")
}

//nolint:staticcheck
alloc.buffers[b].Put(buf)
return nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pool
package allocator

import (
"math/rand"
Expand All @@ -8,7 +8,7 @@ import (
)

func TestAllocGet(t *testing.T) {
alloc := NewAllocator()
alloc := New()
assert.Nil(t, alloc.Get(0))
assert.Equal(t, 1, len(alloc.Get(1)))
assert.Equal(t, 2, len(alloc.Get(2)))
Expand All @@ -23,7 +23,7 @@ func TestAllocGet(t *testing.T) {
}

func TestAllocPut(t *testing.T) {
alloc := NewAllocator()
alloc := New()
assert.NotNil(t, alloc.Put(nil), "put nil misbehavior")
assert.NotNil(t, alloc.Put(make([]byte, 3)), "put elem:3 []bytes misbehavior")
assert.Nil(t, alloc.Put(make([]byte, 4)), "put elem:4 []bytes misbehavior")
Expand All @@ -33,7 +33,7 @@ func TestAllocPut(t *testing.T) {
}

func TestAllocPutThenGet(t *testing.T) {
alloc := NewAllocator()
alloc := New()
data := alloc.Get(4)
_ = alloc.Put(data)
newData := alloc.Get(4)
Expand Down
29 changes: 29 additions & 0 deletions buffer/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Package buffer provides a pool of []byte.
package buffer

import (
"github.com/xjasonlyu/tun2socks/v2/buffer/allocator"
)

const (
// MaxSegmentSize is the largest possible UDP datagram size.
MaxSegmentSize = (1 << 16) - 1

// RelayBufferSize is the default buffer size for TCP relays.
// io.Copy default buffer size is 32 KiB, but the maximum packet
// size of vmess/shadowsocks is about 16 KiB, so define a buffer
// of 20 KiB to reduce the memory of each TCP relay.
RelayBufferSize = 20 << 10
)

var _allocator = allocator.New()

// Get gets a []byte from default allocator with most appropriate cap.
func Get(size int) []byte {
return _allocator.Get(size)
}

// Put returns a []byte to default allocator for future use.
func Put(buf []byte) error {
return _allocator.Put(buf)
}
17 changes: 0 additions & 17 deletions internal/pool/buffer.go

This file was deleted.

47 changes: 31 additions & 16 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,38 @@
// Package pool provides a pool of []byte.
// Package pool provides internal pool utilities.
package pool

const (
// MaxSegmentSize is the largest possible UDP datagram size.
MaxSegmentSize = (1 << 16) - 1

// RelayBufferSize is a buffer of 20 KiB to reduce the memory
// of each TCP relay as io.Copy default buffer size is 32 KiB,
// but the maximum packet size of vmess/shadowsocks is about
// 16 KiB, so define .
RelayBufferSize = 20 << 10
import (
"sync"
)

// Get gets a []byte from default allocator with most appropriate cap.
func Get(size int) []byte {
return _allocator.Get(size)
// A Pool is a generic wrapper around [sync.Pool] to provide strongly-typed
// object pooling.
//
// Note that SA6002 (ref: https://staticcheck.io/docs/checks/#SA6002) will
// not be detected, so all internal pool use must take care to only store
// pointer types.
type Pool[T any] struct {
pool sync.Pool
}

// New returns a new [Pool] for T, and will use fn to construct new Ts when
// the pool is empty.
func New[T any](fn func() T) *Pool[T] {
return &Pool[T]{
pool: sync.Pool{
New: func() any {
return fn()
},
},
}
}

// Get gets a T from the pool, or creates a new one if the pool is empty.
func (p *Pool[T]) Get() T {
return p.pool.Get().(T)
}

// Put returns a []byte to default allocator for future use.
func Put(buf []byte) error {
return _allocator.Put(buf)
// Put returns x into the pool.
func (p *Pool[T]) Put(x T) {
p.pool.Put(x)
}
85 changes: 85 additions & 0 deletions internal/pool/pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package pool

import (
"runtime/debug"
"sync"
"testing"

"github.com/stretchr/testify/require"
)

type pooledValue[T any] struct {
value T
}

func TestNew(t *testing.T) {
// Disable GC to avoid the victim cache during the test.
defer debug.SetGCPercent(debug.SetGCPercent(-1))

p := New(func() *pooledValue[string] {
return &pooledValue[string]{
value: "new",
}
})

// Probabilistically, 75% of sync.Pool.Put calls will succeed when -race
// is enabled (see ref below); attempt to make this quasi-deterministic by
// brute force (i.e., put significantly more objects in the pool than we
// will need for the test) in order to avoid testing without race enabled.
//
// ref: https://cs.opensource.google/go/go/+/refs/tags/go1.20.2:src/sync/pool.go;l=100-103
for i := 0; i < 1_000; i++ {
p.Put(&pooledValue[string]{
value: t.Name(),
})
}

// Ensure that we always get the expected value. Note that this must only
// run a fraction of the number of times that Put is called above.
for i := 0; i < 10; i++ {
func() {
x := p.Get()
defer p.Put(x)
require.Equal(t, t.Name(), x.value)
}()
}

// Depool all objects that might be in the pool to ensure that it's empty.
for i := 0; i < 1_000; i++ {
p.Get()
}

// Now that the pool is empty, it should use the value specified in the
// underlying sync.Pool.New func.
require.Equal(t, "new", p.Get().value)
}

func TestNew_Race(t *testing.T) {
p := New(func() *pooledValue[int] {
return &pooledValue[int]{
value: -1,
}
})

var wg sync.WaitGroup
defer wg.Wait()

// Run a number of goroutines that read and write pool object fields to
// tease out races.
for i := 0; i < 1_000; i++ {
i := i

wg.Add(1)
go func() {
defer wg.Done()

x := p.Get()
defer p.Put(x)

// Must both read and write the field.
if n := x.value; n >= -1 {
x.value = i
}
}()
}
}
6 changes: 3 additions & 3 deletions proxy/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (

"github.com/go-gost/relay"

"github.com/xjasonlyu/tun2socks/v2/buffer"
"github.com/xjasonlyu/tun2socks/v2/dialer"
"github.com/xjasonlyu/tun2socks/v2/internal/pool"
M "github.com/xjasonlyu/tun2socks/v2/metadata"
"github.com/xjasonlyu/tun2socks/v2/proxy/proto"
)
Expand Down Expand Up @@ -169,8 +169,8 @@ func (rc *relayConn) Read(b []byte) (n int, err error) {
return io.ReadFull(rc.Conn, b[:dLen])
}

buf := pool.Get(dLen)
defer pool.Put(buf)
buf := buffer.Get(dLen)
defer buffer.Put(buf)
_, err = io.ReadFull(rc.Conn, buf)
n = copy(b, buf)

Expand Down
25 changes: 25 additions & 0 deletions transport/internal/bufferpool/bufferpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package bufferpool

import (
"bytes"

"github.com/xjasonlyu/tun2socks/v2/internal/pool"
)

const _size = 1024 // by default, create 1 KiB buffers

var _pool = pool.New(func() *bytes.Buffer {
buf := &bytes.Buffer{}
buf.Grow(_size)
return buf
})

func Get() *bytes.Buffer {
buf := _pool.Get()
buf.Reset()
return buf
}

func Put(b *bytes.Buffer) {
_pool.Put(b)
}
6 changes: 3 additions & 3 deletions transport/shadowsocks/shadowaead/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"io"
"net"

"github.com/xjasonlyu/tun2socks/v2/internal/pool"
"github.com/xjasonlyu/tun2socks/v2/buffer"
)

// ErrShortPacket means that the packet is too short for a valid encrypted packet.
Expand Down Expand Up @@ -70,8 +70,8 @@ func NewPacketConn(c net.PacketConn, ciph Cipher) *PacketConn {

// WriteTo encrypts b and write to addr using the embedded PacketConn.
func (c *PacketConn) WriteTo(b []byte, addr net.Addr) (int, error) {
buf := pool.Get(maxPacketSize)
defer pool.Put(buf)
buf := buffer.Get(maxPacketSize)
defer buffer.Put(buf)
buf, err := Pack(buf, b, c)
if err != nil {
return 0, err
Expand Down
Loading

0 comments on commit 391d3d9

Please sign in to comment.