From 05792200ae7c0fefedcbae51830547626a1c141e Mon Sep 17 00:00:00 2001 From: Injun Song Date: Tue, 23 May 2023 11:31:44 +0900 Subject: [PATCH] feat(storagenode): add pipelined Append RPC handler This patch changes the Append RPC handler to support pipelined requests and does not change the client's API. Therefore, users can use Append API transparently. Supporting pipelined requests can lead to overhead since it is necessary to have additional goroutines and concurrent queues. To lower additional overhead, this change uses [reader-biased mutex](https://github.com/puzpuzpuz/xsync#rbmutex) instead of built-in RWMutex to avoid shared lock contention. As a result of experimentations, this PR showed very little overhead. Furthermore, we can improve the existing Append API more efficiently [using a long-lived stream](https://grpc.io/docs/guides/performance/#general): the current implementation creates a new stream whenever calling Append API, which leads to unnecessary tasks such as RPC initiation. We can reuse long-lived streams by changing client API. See this issue at #458. This PR implements server-side parts of LogStreamAppender mentioned in #433. It also can be used for pipelining generic Append RPC said in #441. --- bin/start_varlogsn.py | 4 + cmd/varlogsn/cli.go | 1 + cmd/varlogsn/flags.go | 14 + cmd/varlogsn/varlogsn.go | 1 + go.mod | 1 + go.sum | 2 + internal/storagenode/config.go | 15 + internal/storagenode/log_server.go | 165 +++- internal/storagenode/logstream/append.go | 127 +++ internal/storagenode/storagenode.go | 31 +- .../github.com/puzpuzpuz/xsync/v2/.gitignore | 15 + .../puzpuzpuz/xsync/v2/BENCHMARKS.md | 131 +++ vendor/github.com/puzpuzpuz/xsync/v2/LICENSE | 21 + .../github.com/puzpuzpuz/xsync/v2/README.md | 124 +++ .../github.com/puzpuzpuz/xsync/v2/counter.go | 99 +++ vendor/github.com/puzpuzpuz/xsync/v2/map.go | 785 ++++++++++++++++++ vendor/github.com/puzpuzpuz/xsync/v2/mapof.go | 688 +++++++++++++++ .../puzpuzpuz/xsync/v2/mpmcqueue.go | 137 +++ .../github.com/puzpuzpuz/xsync/v2/rbmutex.go | 145 ++++ vendor/github.com/puzpuzpuz/xsync/v2/util.go | 52 ++ .../puzpuzpuz/xsync/v2/util_mapof.go | 22 + vendor/modules.txt | 3 + 22 files changed, 2541 insertions(+), 42 deletions(-) create mode 100644 vendor/github.com/puzpuzpuz/xsync/v2/.gitignore create mode 100644 vendor/github.com/puzpuzpuz/xsync/v2/BENCHMARKS.md create mode 100644 vendor/github.com/puzpuzpuz/xsync/v2/LICENSE create mode 100644 vendor/github.com/puzpuzpuz/xsync/v2/README.md create mode 100644 vendor/github.com/puzpuzpuz/xsync/v2/counter.go create mode 100644 vendor/github.com/puzpuzpuz/xsync/v2/map.go create mode 100644 vendor/github.com/puzpuzpuz/xsync/v2/mapof.go create mode 100644 vendor/github.com/puzpuzpuz/xsync/v2/mpmcqueue.go create mode 100644 vendor/github.com/puzpuzpuz/xsync/v2/rbmutex.go create mode 100644 vendor/github.com/puzpuzpuz/xsync/v2/util.go create mode 100644 vendor/github.com/puzpuzpuz/xsync/v2/util_mapof.go diff --git a/bin/start_varlogsn.py b/bin/start_varlogsn.py index c9c205a6f..85784c2ab 100755 --- a/bin/start_varlogsn.py +++ b/bin/start_varlogsn.py @@ -172,6 +172,9 @@ def start(args: argparse.Namespace) -> None: if args.ballast_size: cmd.append(f"--ballast-size={args.ballast_size}") + if args.append_pipeline_size: + cmd.append(f"--append-pipeline-size={args.append_pipeline_size}") + # grpc options if args.server_read_buffer_size: cmd.append( @@ -268,6 +271,7 @@ def main() -> None: parser.add_argument("--volumes", nargs="+", required=True, action="extend", type=str) parser.add_argument("--ballast-size", type=str) + parser.add_argument("--append-pipeline-size", type=int) # grpc options parser.add_argument("--server-read-buffer-size", type=str) diff --git a/cmd/varlogsn/cli.go b/cmd/varlogsn/cli.go index ee05c209e..c7f779b70 100644 --- a/cmd/varlogsn/cli.go +++ b/cmd/varlogsn/cli.go @@ -57,6 +57,7 @@ func newStartCommand() *cli.Command { flagLogStreamExecutorCommitQueueCapacity.IntFlag(false, logstream.DefaultCommitQueueCapacity), flagLogStreamExecutorReplicateclientQueueCapacity.IntFlag(false, logstream.DefaultReplicateClientQueueCapacity), flagMaxLogStreamReplicasCount, + flagAppendPipelineSize, // storage options flagExperimentalStorageSeparateDB, diff --git a/cmd/varlogsn/flags.go b/cmd/varlogsn/flags.go index b2773b348..df40fb1a6 100644 --- a/cmd/varlogsn/flags.go +++ b/cmd/varlogsn/flags.go @@ -1,6 +1,8 @@ package main import ( + "fmt" + "github.com/urfave/cli/v2" "github.com/kakao/varlog/internal/flags" @@ -39,6 +41,18 @@ var ( Value: storagenode.DefaultMaxLogStreamReplicasCount, } + flagAppendPipelineSize = &cli.IntFlag{ + Name: "append-pipeline-size", + Usage: "Append pipleline size", + Value: storagenode.DefaultAppendPipelineSize, + Action: func(_ *cli.Context, value int) error { + if value < storagenode.MinAppendPipelineSize || value > storagenode.MaxAppendPipelineSize { + return fmt.Errorf("invalid value \"%d\" for flag --append-pipeline-size", value) + } + return nil + }, + } + // flags for grpc options. flagServerReadBufferSize = flags.FlagDesc{ Name: "server-read-buffer-size", diff --git a/cmd/varlogsn/varlogsn.go b/cmd/varlogsn/varlogsn.go index 2d627a75e..13ca8715d 100644 --- a/cmd/varlogsn/varlogsn.go +++ b/cmd/varlogsn/varlogsn.go @@ -137,6 +137,7 @@ func start(c *cli.Context) error { logstream.WithReplicateClientQueueCapacity(c.Int(flagLogStreamExecutorReplicateclientQueueCapacity.Name)), ), storagenode.WithMaxLogStreamReplicasCount(int32(c.Int(flagMaxLogStreamReplicasCount.Name))), + storagenode.WithAppendPipelineSize(int32(c.Int(flagAppendPipelineSize.Name))), storagenode.WithDefaultStorageOptions(storageOpts...), storagenode.WithLogger(logger), } diff --git a/go.mod b/go.mod index d4a0b48cb..76560acf7 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/lib/pq v1.10.9 github.com/pkg/errors v0.9.1 + github.com/puzpuzpuz/xsync/v2 v2.4.0 github.com/smartystreets/assertions v1.13.1 github.com/smartystreets/goconvey v1.8.0 github.com/soheilhy/cmux v0.1.5 diff --git a/go.sum b/go.sum index c9d055471..e5d1224d1 100644 --- a/go.sum +++ b/go.sum @@ -388,6 +388,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/puzpuzpuz/xsync/v2 v2.4.0 h1:5sXAMHrtx1bg9nbRZTOn8T4MkWe5V+o8yKRH02Eznag= +github.com/puzpuzpuz/xsync/v2 v2.4.0/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= diff --git a/internal/storagenode/config.go b/internal/storagenode/config.go index ea99de97b..c363d5fea 100644 --- a/internal/storagenode/config.go +++ b/internal/storagenode/config.go @@ -22,6 +22,10 @@ const ( DefaultReplicateClientReadBufferSize = 32 << 10 DefaultReplicateClientWriteBufferSize = 32 << 10 DefaultMaxLogStreamReplicasCount = -1 + + DefaultAppendPipelineSize = 8 + MinAppendPipelineSize = 1 + MaxAppendPipelineSize = 16 ) type config struct { @@ -44,6 +48,7 @@ type config struct { replicateClientReadBufferSize int64 replicateClientWriteBufferSize int64 maxLogStreamReplicasCount int32 + appendPipelineSize int32 volumes []string defaultLogStreamExecutorOptions []logstream.ExecutorOption pprofOpts []pprof.Option @@ -59,6 +64,7 @@ func newConfig(opts []Option) (config, error) { replicateClientReadBufferSize: DefaultReplicateClientReadBufferSize, replicateClientWriteBufferSize: DefaultReplicateClientWriteBufferSize, maxLogStreamReplicasCount: DefaultMaxLogStreamReplicasCount, + appendPipelineSize: DefaultAppendPipelineSize, logger: zap.NewNop(), } for _, opt := range opts { @@ -85,6 +91,9 @@ func (cfg *config) validate() error { if err := cfg.validateVolumes(); err != nil { return fmt.Errorf("storage node: invalid volume: %w", err) } + if cfg.appendPipelineSize < MinAppendPipelineSize || cfg.appendPipelineSize > MaxAppendPipelineSize { + return fmt.Errorf("storage node: invalid append pipeline size \"%d\"", cfg.appendPipelineSize) + } return nil } @@ -214,6 +223,12 @@ func WithMaxLogStreamReplicasCount(maxLogStreamReplicasCount int32) Option { }) } +func WithAppendPipelineSize(appendPipelineSize int32) Option { + return newFuncOption(func(cfg *config) { + cfg.appendPipelineSize = appendPipelineSize + }) +} + func WithVolumes(volumes ...string) Option { return newFuncOption(func(cfg *config) { cfg.volumes = volumes diff --git a/internal/storagenode/log_server.go b/internal/storagenode/log_server.go index da1113f32..91ab27673 100644 --- a/internal/storagenode/log_server.go +++ b/internal/storagenode/log_server.go @@ -7,6 +7,7 @@ import ( pbtypes "github.com/gogo/protobuf/types" "go.uber.org/multierr" + "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -23,55 +24,161 @@ type logServer struct { var _ snpb.LogIOServer = (*logServer)(nil) -func (ls logServer) Append(stream snpb.LogIO_AppendServer) (err error) { - req, rsp := &snpb.AppendRequest{}, &snpb.AppendResponse{} +func (ls *logServer) Append(stream snpb.LogIO_AppendServer) error { + // Avoid race of Add and Wait of wgAppenders. + rt := ls.sn.mu.RLock() + ls.sn.wgAppenders.Add(2) + ls.sn.mu.RUnlock(rt) + + cq := make(chan *logstream.AppendTask, ls.sn.appendPipelineSize) + + go ls.appendStreamRecvLoop(stream, cq) + + var eg errgroup.Group + eg.Go(func() error { + return ls.appendStreamSendLoop(stream, cq) + }) + err := eg.Wait() + // The stream is finished by the client, which invokes CloseSend. + // That result from appendStreamSendLoop is nil means follows: + // - RecvMsg's return value is io.EOF. + // - Completion queue is closed. + // - AppendTasks in the completion queue are exhausted. + if err == nil { + ls.sn.wgAppenders.Done() + return nil + } + + // Drain completion queue. + go ls.appendStreamDrainCQLoop(cq) + + // The stream is finished by returning io.EOF after calling SendMsg. + if err == io.EOF { + return nil + } + + var code codes.Code + switch err { + case verrors.ErrSealed: + code = codes.FailedPrecondition + case snerrors.ErrNotPrimary: + code = codes.Unavailable + default: + code = status.Code(err) + if code == codes.Unknown { + code = status.FromContextError(err).Code() + } + + } + return status.Error(code, err.Error()) +} + +func (ls *logServer) appendStreamRecvLoop(stream snpb.LogIO_AppendServer, cq chan<- *logstream.AppendTask) { + defer func() { + close(cq) + ls.sn.wgAppenders.Done() + }() + + var ( + appendTask *logstream.AppendTask + lse *logstream.Executor + err error + loaded bool + tpid types.TopicID + lsid types.LogStreamID + ) + req := &snpb.AppendRequest{} + ctx := stream.Context() for { req.Reset() err = stream.RecvMsg(req) if err == io.EOF { - return nil + return } + appendTask = logstream.NewAppendTask() if err != nil { - return err + goto Out } - err = snpb.ValidateTopicLogStream(req) - if err != nil { - return status.Error(codes.InvalidArgument, err.Error()) + if tpid.Invalid() && lsid.Invalid() { + err = snpb.ValidateTopicLogStream(req) + if err != nil { + err = status.Error(codes.InvalidArgument, err.Error()) + goto Out + } + tpid = req.TopicID + lsid = req.LogStreamID } - lse, loaded := ls.sn.executors.Load(req.TopicID, req.LogStreamID) - if !loaded { - return status.Error(codes.NotFound, "no such log stream") + if req.TopicID != tpid || req.LogStreamID != lsid { + err = status.Error(codes.InvalidArgument, "unmatched topic or logstream") + goto Out } - res, err := lse.Append(stream.Context(), req.Payload) - if err != nil { - var code codes.Code - switch err { - case verrors.ErrSealed: - code = codes.FailedPrecondition - case snerrors.ErrNotPrimary: - code = codes.Unavailable - default: - code = status.FromContextError(err).Code() + + if lse == nil { + lse, loaded = ls.sn.executors.Load(tpid, lsid) + if !loaded { + err = status.Error(codes.NotFound, "no such log stream") + goto Out } - return status.Error(code, err.Error()) } - rsp.Results = res - err = stream.Send(rsp) + err = lse.AppendAsync(ctx, req.Payload, appendTask) + Out: + if err != nil { + appendTask.SetError(err) + } + cq <- appendTask if err != nil { - return err + return } } } -func (ls logServer) Read(context.Context, *snpb.ReadRequest) (*snpb.ReadResponse, error) { +func (ls *logServer) appendStreamSendLoop(stream snpb.LogIO_AppendServer, cq <-chan *logstream.AppendTask) (err error) { + var res []snpb.AppendResult + rsp := &snpb.AppendResponse{} + ctx := stream.Context() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case appendTask, ok := <-cq: + if !ok { + return nil + } + res, err = appendTask.WaitForCompletion(ctx) + if err != nil { + appendTask.Release() + return err + } + + appendTask.ReleaseWriteWaitGroups() + appendTask.Release() + + rsp.Results = res + err = stream.Send(rsp) + if err != nil { + return err + } + } + } +} + +func (ls *logServer) appendStreamDrainCQLoop(cq <-chan *logstream.AppendTask) { + defer ls.sn.wgAppenders.Done() + for appendTask := range cq { + appendTask.Release() + } +} + +func (ls *logServer) Read(context.Context, *snpb.ReadRequest) (*snpb.ReadResponse, error) { return nil, status.Error(codes.Unimplemented, "deprecated") } -func (ls logServer) Subscribe(req *snpb.SubscribeRequest, stream snpb.LogIO_SubscribeServer) error { +func (ls *logServer) Subscribe(req *snpb.SubscribeRequest, stream snpb.LogIO_SubscribeServer) error { if err := snpb.ValidateTopicLogStream(req); err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -128,7 +235,7 @@ Loop: return status.Error(status.FromContextError(sr.Err()).Code(), sr.Err().Error()) } -func (ls logServer) SubscribeTo(req *snpb.SubscribeToRequest, stream snpb.LogIO_SubscribeToServer) (err error) { +func (ls *logServer) SubscribeTo(req *snpb.SubscribeToRequest, stream snpb.LogIO_SubscribeToServer) (err error) { err = snpb.ValidateTopicLogStream(req) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) @@ -177,7 +284,7 @@ Loop: return multierr.Append(err, sr.Err()) } -func (ls logServer) TrimDeprecated(ctx context.Context, req *snpb.TrimDeprecatedRequest) (*pbtypes.Empty, error) { +func (ls *logServer) TrimDeprecated(ctx context.Context, req *snpb.TrimDeprecatedRequest) (*pbtypes.Empty, error) { ls.sn.executors.Range(func(_ types.LogStreamID, tpid types.TopicID, lse *logstream.Executor) bool { if req.TopicID != tpid { return true @@ -188,7 +295,7 @@ func (ls logServer) TrimDeprecated(ctx context.Context, req *snpb.TrimDeprecated return &pbtypes.Empty{}, nil } -func (ls logServer) LogStreamReplicaMetadata(_ context.Context, req *snpb.LogStreamReplicaMetadataRequest) (*snpb.LogStreamReplicaMetadataResponse, error) { +func (ls *logServer) LogStreamReplicaMetadata(_ context.Context, req *snpb.LogStreamReplicaMetadataRequest) (*snpb.LogStreamReplicaMetadataResponse, error) { if err := snpb.ValidateTopicLogStream(req); err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } diff --git a/internal/storagenode/logstream/append.go b/internal/storagenode/logstream/append.go index 98a14cf03..61ffca3d5 100644 --- a/internal/storagenode/logstream/append.go +++ b/internal/storagenode/logstream/append.go @@ -2,6 +2,7 @@ package logstream import ( "context" + "sync" "time" "go.uber.org/zap" @@ -12,6 +13,82 @@ import ( "github.com/kakao/varlog/proto/snpb" ) +var appendTaskPool = sync.Pool{ + New: func() any { + return &AppendTask{} + }, +} + +type AppendTask struct { + lse *Executor + deferredFunc func(*AppendTask) + err error + start time.Time + apc appendContext + dataBatchLen int +} + +func NewAppendTask() *AppendTask { + at := appendTaskPool.Get().(*AppendTask) + return at +} + +func (at *AppendTask) SetError(err error) { + at.err = err +} + +func (at *AppendTask) Release() { + if at.deferredFunc != nil { + at.deferredFunc(at) + } + *at = AppendTask{} + appendTaskPool.Put(at) +} + +func (at *AppendTask) ReleaseWriteWaitGroups() { + for i := range at.apc.wwgs { + at.apc.wwgs[i].release() + } +} + +func (at *AppendTask) WaitForCompletion(ctx context.Context) (res []snpb.AppendResult, err error) { + if at.err != nil { + return nil, at.err + } + + res = make([]snpb.AppendResult, at.dataBatchLen) + for i := range at.apc.awgs { + cerr := at.apc.awgs[i].wait(ctx) + if cerr != nil { + res[i].Error = cerr.Error() + if err == nil { + err = cerr + } + continue + } + if err != nil { + at.lse.logger.Panic("Results of batch requests of Append RPC must not be interleaved with success and failure", zap.Error(err)) + } + res[i].Meta.TopicID = at.lse.tpid + res[i].Meta.LogStreamID = at.lse.lsid + res[i].Meta.GLSN = at.apc.awgs[i].glsn + res[i].Meta.LLSN = at.apc.awgs[i].llsn + at.apc.awgs[i].release() + } + if res[0].Meta.GLSN.Invalid() { + return nil, err + } + return res, nil +} + +func appendTaskDeferredFunc(at *AppendTask) { + at.lse.inflight.Add(-1) + at.lse.inflightAppend.Add(-1) + if at.lse.lsm != nil { + at.lse.lsm.AppendDuration.Add(time.Since(at.start).Microseconds()) + } +} + type appendContext struct { sts []*sequenceTask wwgs []*writeWaitGroup @@ -19,6 +96,56 @@ type appendContext struct { totalBytes int64 } +func (lse *Executor) AppendAsync(ctx context.Context, dataBatch [][]byte, appendTask *AppendTask) error { + lse.inflight.Add(1) + lse.inflightAppend.Add(1) + + startTime := time.Now() + dataBatchLen := len(dataBatch) + + appendTask.start = startTime + appendTask.lse = lse + appendTask.dataBatchLen = dataBatchLen + appendTask.deferredFunc = appendTaskDeferredFunc + + switch lse.esm.load() { + case executorStateSealing, executorStateSealed, executorStateLearning: + return verrors.ErrSealed + case executorStateClosed: + return verrors.ErrClosed + } + if !lse.isPrimary() { + return snerrors.ErrNotPrimary + } + + _, batchletLen := batchlet.SelectLengthClass(dataBatchLen) + batchletCount := dataBatchLen / batchletLen + if dataBatchLen%batchletLen > 0 { + batchletCount++ + } + + appendTask.apc = appendContext{ + sts: make([]*sequenceTask, 0, batchletCount), + wwgs: make([]*writeWaitGroup, 0, batchletCount), + awgs: make([]*appendWaitGroup, 0, dataBatchLen), + } + + var preparationDuration time.Duration + defer func() { + if lse.lsm != nil { + lse.lsm.AppendLogs.Add(int64(dataBatchLen)) + lse.lsm.AppendBytes.Add(appendTask.apc.totalBytes) + lse.lsm.AppendOperations.Add(1) + lse.lsm.AppendPreparationMicro.Add(preparationDuration.Microseconds()) + } + }() + + lse.prepareAppendContext(dataBatch, &appendTask.apc) + preparationDuration = time.Since(startTime) + lse.sendSequenceTasks(ctx, appendTask.apc.sts) + return nil +} + // Append appends a batch of logs to the log stream. func (lse *Executor) Append(ctx context.Context, dataBatch [][]byte) ([]snpb.AppendResult, error) { lse.inflight.Add(1) diff --git a/internal/storagenode/storagenode.go b/internal/storagenode/storagenode.go index 1c10e54d2..a1b300d45 100644 --- a/internal/storagenode/storagenode.go +++ b/internal/storagenode/storagenode.go @@ -14,6 +14,7 @@ import ( "time" "github.com/gogo/status" + "github.com/puzpuzpuz/xsync/v2" "github.com/soheilhy/cmux" "go.opentelemetry.io/otel/metric/global" "go.uber.org/multierr" @@ -50,7 +51,7 @@ type StorageNode struct { executors *executorsmap.ExecutorsMap - mu sync.RWMutex + mu *xsync.RBMutex lis net.Listener server *grpc.Server healthServer *health.Server @@ -71,6 +72,8 @@ type StorageNode struct { limits struct { logStreamReplicasCount atomic.Int32 } + + wgAppenders sync.WaitGroup } func NewStorageNode(opts ...Option) (*StorageNode, error) { @@ -137,6 +140,7 @@ func NewStorageNode(opts ...Option) (*StorageNode, error) { metrics: metrics, startTime: time.Now().UTC(), } + sn.mu = xsync.NewRBMutex() if sn.ballastSize > 0 { sn.ballast = make([]byte, sn.ballastSize) } @@ -254,13 +258,14 @@ func (sn *StorageNode) Close() (err error) { return true }) sn.server.Stop() // TODO: sn.server.GracefulStop() -> need not to use mutex + sn.wgAppenders.Wait() sn.logger.Info("closed") return err } func (sn *StorageNode) getMetadata(_ context.Context) (*snpb.StorageNodeMetadataDescriptor, error) { - sn.mu.RLock() - defer sn.mu.RUnlock() + rt := sn.mu.RLock() + defer sn.mu.RUnlock(rt) if sn.closed { return nil, snerrors.ErrClosed } @@ -300,8 +305,8 @@ func (sn *StorageNode) getMetadata(_ context.Context) (*snpb.StorageNodeMetadata } func (sn *StorageNode) addLogStreamReplica(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID, snPath string) (snpb.LogStreamReplicaMetadataDescriptor, error) { - sn.mu.RLock() - defer sn.mu.RUnlock() + rt := sn.mu.RLock() + defer sn.mu.RUnlock(rt) if sn.closed { return snpb.LogStreamReplicaMetadataDescriptor{}, snerrors.ErrClosed } @@ -373,8 +378,8 @@ func (sn *StorageNode) runLogStreamReplica(_ context.Context, tpid types.TopicID } func (sn *StorageNode) removeLogStreamReplica(_ context.Context, tpid types.TopicID, lsid types.LogStreamID) error { - sn.mu.RLock() - defer sn.mu.RUnlock() + rt := sn.mu.RLock() + defer sn.mu.RUnlock(rt) if sn.closed { return snerrors.ErrClosed } @@ -395,8 +400,8 @@ func (sn *StorageNode) removeLogStreamReplica(_ context.Context, tpid types.Topi } func (sn *StorageNode) seal(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID, lastCommittedGLSN types.GLSN) (varlogpb.LogStreamStatus, types.GLSN, error) { - sn.mu.RLock() - defer sn.mu.RUnlock() + rt := sn.mu.RLock() + defer sn.mu.RUnlock(rt) if sn.closed { return varlogpb.LogStreamStatusRunning, types.InvalidGLSN, snerrors.ErrClosed } @@ -410,8 +415,8 @@ func (sn *StorageNode) seal(ctx context.Context, tpid types.TopicID, lsid types. } func (sn *StorageNode) unseal(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID, replicas []varlogpb.LogStreamReplica) error { - sn.mu.RLock() - defer sn.mu.RUnlock() + rt := sn.mu.RLock() + defer sn.mu.RUnlock(rt) if sn.closed { return snerrors.ErrClosed } @@ -425,8 +430,8 @@ func (sn *StorageNode) unseal(ctx context.Context, tpid types.TopicID, lsid type } func (sn *StorageNode) sync(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID, dst varlogpb.LogStreamReplica) (*snpb.SyncStatus, error) { - sn.mu.RLock() - defer sn.mu.RUnlock() + rt := sn.mu.RLock() + defer sn.mu.RUnlock(rt) if sn.closed { return nil, snerrors.ErrClosed } diff --git a/vendor/github.com/puzpuzpuz/xsync/v2/.gitignore b/vendor/github.com/puzpuzpuz/xsync/v2/.gitignore new file mode 100644 index 000000000..66fd13c90 --- /dev/null +++ b/vendor/github.com/puzpuzpuz/xsync/v2/.gitignore @@ -0,0 +1,15 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ diff --git a/vendor/github.com/puzpuzpuz/xsync/v2/BENCHMARKS.md b/vendor/github.com/puzpuzpuz/xsync/v2/BENCHMARKS.md new file mode 100644 index 000000000..1e3169c4f --- /dev/null +++ b/vendor/github.com/puzpuzpuz/xsync/v2/BENCHMARKS.md @@ -0,0 +1,131 @@ +# xsync benchmarks + +If you're interested in `MapOf` comparison with some of the popular concurrent hash maps written in Go, check [this](https://github.com/cornelk/hashmap/pull/70) and [this](https://github.com/alphadose/haxmap/pull/22) PRs. + +The below results were obtained for xsync v2.3.1 on a c6g.metal EC2 instance (64 CPU, 128GB RAM) running Linux and Go 1.19.3. I'd like to thank [@felixge](https://github.com/felixge) who kindly run the benchmarks. + +The following commands were used to run the benchmarks: +```bash +$ go test -run='^$' -cpu=1,2,4,8,16,32,64 -bench . -count=30 -timeout=0 | tee bench.txt +$ benchstat bench.txt | tee benchstat.txt +``` + +The below sections contain some of the results. Refer to [this gist](https://gist.github.com/puzpuzpuz/e62e38e06feadecfdc823c0f941ece0b) for the complete output. + +### Counter vs. atomic int64 + +``` +name time/op +Counter 27.3ns ± 1% +Counter-2 27.2ns ±11% +Counter-4 15.3ns ± 8% +Counter-8 7.43ns ± 7% +Counter-16 3.70ns ±10% +Counter-32 1.77ns ± 3% +Counter-64 0.96ns ±10% +AtomicInt64 7.60ns ± 0% +AtomicInt64-2 12.6ns ±13% +AtomicInt64-4 13.5ns ±14% +AtomicInt64-8 12.7ns ± 9% +AtomicInt64-16 12.8ns ± 8% +AtomicInt64-32 13.0ns ± 6% +AtomicInt64-64 12.9ns ± 7% +``` + +Here `time/op` stands for average time spent on operation. If you divide `10^9` by the result in nanosecond per operation, you'd get the throughput in operations per second. Thus, ideal theoretical scalability of a concurrent data structure implies that the reported `time/op` decreases proportionally with the increased number of CPU cores. On the contrary, if the measured time per operation increases when run on more cores, it means performance degradation. + +### MapOf vs. sync.Map + +1,000 `[int, int]` entries with warm-up, 100% Loads: +``` +IntegerMapOf_WarmUp/reads=100% 24.0ns ± 0% +IntegerMapOf_WarmUp/reads=100%-2 12.0ns ± 0% +IntegerMapOf_WarmUp/reads=100%-4 6.02ns ± 0% +IntegerMapOf_WarmUp/reads=100%-8 3.01ns ± 0% +IntegerMapOf_WarmUp/reads=100%-16 1.50ns ± 0% +IntegerMapOf_WarmUp/reads=100%-32 0.75ns ± 0% +IntegerMapOf_WarmUp/reads=100%-64 0.38ns ± 0% +IntegerMapStandard_WarmUp/reads=100% 55.3ns ± 0% +IntegerMapStandard_WarmUp/reads=100%-2 27.6ns ± 0% +IntegerMapStandard_WarmUp/reads=100%-4 16.1ns ± 3% +IntegerMapStandard_WarmUp/reads=100%-8 8.35ns ± 7% +IntegerMapStandard_WarmUp/reads=100%-16 4.24ns ± 7% +IntegerMapStandard_WarmUp/reads=100%-32 2.18ns ± 6% +IntegerMapStandard_WarmUp/reads=100%-64 1.11ns ± 3% +``` + +1,000 `[int, int]` entries with warm-up, 99% Loads, 0.5% Stores, 0.5% Deletes: +``` +IntegerMapOf_WarmUp/reads=99% 31.0ns ± 0% +IntegerMapOf_WarmUp/reads=99%-2 16.4ns ± 1% +IntegerMapOf_WarmUp/reads=99%-4 8.42ns ± 0% +IntegerMapOf_WarmUp/reads=99%-8 4.41ns ± 0% +IntegerMapOf_WarmUp/reads=99%-16 2.38ns ± 2% +IntegerMapOf_WarmUp/reads=99%-32 1.37ns ± 4% +IntegerMapOf_WarmUp/reads=99%-64 0.85ns ± 2% +IntegerMapStandard_WarmUp/reads=99% 121ns ± 1% +IntegerMapStandard_WarmUp/reads=99%-2 109ns ± 3% +IntegerMapStandard_WarmUp/reads=99%-4 115ns ± 4% +IntegerMapStandard_WarmUp/reads=99%-8 114ns ± 2% +IntegerMapStandard_WarmUp/reads=99%-16 105ns ± 2% +IntegerMapStandard_WarmUp/reads=99%-32 97.0ns ± 3% +IntegerMapStandard_WarmUp/reads=99%-64 98.0ns ± 2% +``` + +1,000 `[int, int]` entries with warm-up, 75% Loads, 12.5% Stores, 12.5% Deletes: +``` +IntegerMapOf_WarmUp/reads=75%-reads 46.2ns ± 1% +IntegerMapOf_WarmUp/reads=75%-reads-2 36.7ns ± 2% +IntegerMapOf_WarmUp/reads=75%-reads-4 22.0ns ± 1% +IntegerMapOf_WarmUp/reads=75%-reads-8 12.8ns ± 2% +IntegerMapOf_WarmUp/reads=75%-reads-16 7.69ns ± 1% +IntegerMapOf_WarmUp/reads=75%-reads-32 5.16ns ± 1% +IntegerMapOf_WarmUp/reads=75%-reads-64 4.91ns ± 1% +IntegerMapStandard_WarmUp/reads=75%-reads 156ns ± 0% +IntegerMapStandard_WarmUp/reads=75%-reads-2 177ns ± 1% +IntegerMapStandard_WarmUp/reads=75%-reads-4 197ns ± 1% +IntegerMapStandard_WarmUp/reads=75%-reads-8 221ns ± 2% +IntegerMapStandard_WarmUp/reads=75%-reads-16 242ns ± 1% +IntegerMapStandard_WarmUp/reads=75%-reads-32 258ns ± 1% +IntegerMapStandard_WarmUp/reads=75%-reads-64 264ns ± 1% +``` + +### MPMCQueue vs. Go channels + +Concurrent producers and consumers (1:1), queue/channel size 1,000, some work: +``` +QueueProdConsWork100 252ns ± 0% +QueueProdConsWork100-2 206ns ± 5% +QueueProdConsWork100-4 136ns ±12% +QueueProdConsWork100-8 110ns ± 6% +QueueProdConsWork100-16 108ns ± 2% +QueueProdConsWork100-32 102ns ± 2% +QueueProdConsWork100-64 101ns ± 0% +ChanProdConsWork100 283ns ± 0% +ChanProdConsWork100-2 406ns ±21% +ChanProdConsWork100-4 549ns ± 7% +ChanProdConsWork100-8 754ns ± 7% +ChanProdConsWork100-16 828ns ± 7% +ChanProdConsWork100-32 810ns ± 8% +ChanProdConsWork100-64 832ns ± 4% +``` + +### RBMutex vs. sync.RWMutex + +Writer locks on each 100,000 iteration, both no work and some work in the critical section: +``` +RBMutexWorkWrite100000 146ns ± 0% +RBMutexWorkWrite100000-2 73.3ns ± 0% +RBMutexWorkWrite100000-4 36.7ns ± 0% +RBMutexWorkWrite100000-8 18.6ns ± 0% +RBMutexWorkWrite100000-16 9.83ns ± 3% +RBMutexWorkWrite100000-32 5.53ns ± 0% +RBMutexWorkWrite100000-64 4.04ns ± 3% +RWMutexWorkWrite100000 121ns ± 0% +RWMutexWorkWrite100000-2 128ns ± 1% +RWMutexWorkWrite100000-4 124ns ± 2% +RWMutexWorkWrite100000-8 101ns ± 1% +RWMutexWorkWrite100000-16 92.9ns ± 1% +RWMutexWorkWrite100000-32 89.9ns ± 1% +RWMutexWorkWrite100000-64 88.4ns ± 1% +``` diff --git a/vendor/github.com/puzpuzpuz/xsync/v2/LICENSE b/vendor/github.com/puzpuzpuz/xsync/v2/LICENSE new file mode 100644 index 000000000..837697194 --- /dev/null +++ b/vendor/github.com/puzpuzpuz/xsync/v2/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 Andrey Pechkurov + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/puzpuzpuz/xsync/v2/README.md b/vendor/github.com/puzpuzpuz/xsync/v2/README.md new file mode 100644 index 000000000..ca2cc0f8b --- /dev/null +++ b/vendor/github.com/puzpuzpuz/xsync/v2/README.md @@ -0,0 +1,124 @@ +[![GoDoc reference](https://img.shields.io/badge/godoc-reference-blue.svg)](https://pkg.go.dev/github.com/puzpuzpuz/xsync/v2) +[![GoReport](https://goreportcard.com/badge/github.com/puzpuzpuz/xsync/v2)](https://goreportcard.com/report/github.com/puzpuzpuz/xsync/v2) +[![codecov](https://codecov.io/gh/puzpuzpuz/xsync/branch/main/graph/badge.svg)](https://codecov.io/gh/puzpuzpuz/xsync) + +# xsync + +Concurrent data structures for Go. Aims to provide more scalable alternatives for some of the data structures from the standard `sync` package, but not only. + +### Benchmarks + +Benchmark results may be found [here](BENCHMARKS.md). + +## Counter + +A `Counter` is a striped `int64` counter inspired by the `j.u.c.a.LongAdder` class from Java standard library. + +```go +c := xsync.NewCounter() +// increment and decrement the counter +c.Inc() +c.Dec() +// read the current value +v := c.Value() +``` + +Works better in comparison with a single atomically updated `int64` counter in high contention scenarios. + +## Map + +A `Map` is like a concurrent hash table based map. It follows the interface of `sync.Map` with a number of valuable extensions like `Compute` or `Size`. + +```go +m := xsync.NewMap() +m.Store("foo", "bar") +v, ok := m.Load("foo") +s := m.Size() +``` + +`Map` uses a modified version of Cache-Line Hash Table (CLHT) data structure: https://github.com/LPD-EPFL/CLHT + +CLHT is built around idea to organize the hash table in cache-line-sized buckets, so that on all modern CPUs update operations complete with minimal cache-line transfer. Also, `Get` operations are obstruction-free and involve no writes to shared memory, hence no mutexes or any other sort of locks. Due to this design, in all considered scenarios `Map` outperforms `sync.Map`. + +One important difference with `sync.Map` is that only string keys are supported. That's because Golang standard library does not expose the built-in hash functions for `interface{}` values. + +`MapOf[K, V]` is an implementation with parametrized value type. It is available for Go 1.18 or later. While it's still a CLHT-inspired hash map, `MapOf`'s design is quite different from `Map`. As a result, less GC pressure and less atomic operations on reads. + +```go +m := xsync.NewMapOf[string]() +m.Store("foo", "bar") +v, ok := m.Load("foo") +``` + +One important difference with `Map` is that `MapOf` supports arbitrary `comparable` key types: + +```go +type Point struct { + x int32 + y int32 +} +m := NewTypedMapOf[Point, int](func(seed maphash.Seed, p Point) uint64 { + // provide a hash function when creating the MapOf; + // we recommend using the hash/maphash package for the function + var h maphash.Hash + h.SetSeed(seed) + binary.Write(&h, binary.LittleEndian, p.x) + hash := h.Sum64() + h.Reset() + binary.Write(&h, binary.LittleEndian, p.y) + return 31*hash + h.Sum64() +}) +m.Store(Point{42, 42}, 42) +v, ok := m.Load(point{42, 42}) +``` + +## MPMCQueue + +A `MPMCQeueue` is a bounded multi-producer multi-consumer concurrent queue. + +```go +q := xsync.NewMPMCQueue(1024) +// producer inserts an item into the queue +q.Enqueue("foo") +// optimistic insertion attempt; doesn't block +inserted := q.TryEnqueue("bar") +// consumer obtains an item from the queue +item := q.Dequeue() +// optimistic obtain attempt; doesn't block +item, ok := q.TryDequeue() +``` + +Based on the algorithm from the [MPMCQueue](https://github.com/rigtorp/MPMCQueue) C++ library which in its turn references D.Vyukov's [MPMC queue](https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue). According to the following [classification](https://www.1024cores.net/home/lock-free-algorithms/queues), the queue is array-based, fails on overflow, provides causal FIFO, has blocking producers and consumers. + +The idea of the algorithm is to allow parallelism for concurrent producers and consumers by introducing the notion of tickets, i.e. values of two counters, one per producers/consumers. An atomic increment of one of those counters is the only noticeable contention point in queue operations. The rest of the operation avoids contention on writes thanks to the turn-based read/write access for each of the queue items. + +In essence, `MPMCQueue` is a specialized queue for scenarios where there are multiple concurrent producers and consumers of a single queue running on a large multicore machine. + +To get the optimal performance, you may want to set the queue size to be large enough, say, an order of magnitude greater than the number of producers/consumers, to allow producers and consumers to progress with their queue operations in parallel most of the time. + +## RBMutex + +A `RBMutex` is a reader biased reader/writer mutual exclusion lock. The lock can be held by an many readers or a single writer. + +```go +mu := xsync.NewRBMutex() +// reader lock calls return a token +t := mu.RLock() +// the token must be later used to unlock the mutex +mu.RUnlock(t) +// writer locks are the same as in sync.RWMutex +mu.Lock() +mu.Unlock() +``` + +`RBMutex` is based on a modified version of BRAVO (Biased Locking for Reader-Writer Locks) algorithm: https://arxiv.org/pdf/1810.01553.pdf + +The idea of the algorithm is to build on top of an existing reader-writer mutex and introduce a fast path for readers. On the fast path, reader lock attempts are sharded over an internal array based on the reader identity (a token in case of Golang). This means that readers do not contend over a single atomic counter like it's done in, say, `sync.RWMutex` allowing for better scalability in terms of cores. + +Hence, by the design `RBMutex` is a specialized mutex for scenarios, such as caches, where the vast majority of locks are acquired by readers and write lock acquire attempts are infrequent. In such scenarios, `RBMutex` should perform better than the `sync.RWMutex` on large multicore machines. + +`RBMutex` extends `sync.RWMutex` internally and uses it as the "reader bias disabled" fallback, so the same semantics apply. The only noticeable difference is in the reader tokens returned from the `RLock`/`RUnlock` methods. + +## License + +Licensed under MIT. diff --git a/vendor/github.com/puzpuzpuz/xsync/v2/counter.go b/vendor/github.com/puzpuzpuz/xsync/v2/counter.go new file mode 100644 index 000000000..4bf2c91d8 --- /dev/null +++ b/vendor/github.com/puzpuzpuz/xsync/v2/counter.go @@ -0,0 +1,99 @@ +package xsync + +import ( + "sync" + "sync/atomic" +) + +// pool for P tokens +var ptokenPool sync.Pool + +// a P token is used to point at the current OS thread (P) +// on which the goroutine is run; exact identity of the thread, +// as well as P migration tolerance, is not important since +// it's used to as a best effort mechanism for assigning +// concurrent operations (goroutines) to different stripes of +// the counter +type ptoken struct { + idx uint32 + //lint:ignore U1000 prevents false sharing + pad [cacheLineSize - 4]byte +} + +// A Counter is a striped int64 counter. +// +// Should be preferred over a single atomically updated int64 +// counter in high contention scenarios. +// +// A Counter must not be copied after first use. +type Counter struct { + stripes []cstripe + mask uint32 +} + +type cstripe struct { + c int64 + //lint:ignore U1000 prevents false sharing + pad [cacheLineSize - 8]byte +} + +// NewCounter creates a new Counter instance. +func NewCounter() *Counter { + nstripes := nextPowOf2(parallelism()) + c := Counter{ + stripes: make([]cstripe, nstripes), + mask: nstripes - 1, + } + return &c +} + +// Inc increments the counter by 1. +func (c *Counter) Inc() { + c.Add(1) +} + +// Dec decrements the counter by 1. +func (c *Counter) Dec() { + c.Add(-1) +} + +// Add adds the delta to the counter. +func (c *Counter) Add(delta int64) { + t, ok := ptokenPool.Get().(*ptoken) + if !ok { + t = new(ptoken) + t.idx = fastrand() + } + for { + stripe := &c.stripes[t.idx&c.mask] + cnt := atomic.LoadInt64(&stripe.c) + if atomic.CompareAndSwapInt64(&stripe.c, cnt, cnt+delta) { + break + } + // Give a try with another randomly selected stripe. + t.idx = fastrand() + } + ptokenPool.Put(t) +} + +// Value returns the current counter value. +// The returned value may not include all of the latest operations in +// presence of concurrent modifications of the counter. +func (c *Counter) Value() int64 { + v := int64(0) + for i := 0; i < len(c.stripes); i++ { + stripe := &c.stripes[i] + v += atomic.LoadInt64(&stripe.c) + } + return v +} + +// Reset resets the counter to zero. +// This method should only be used when it is known that there are +// no concurrent modifications of the counter. +func (c *Counter) Reset() { + for i := 0; i < len(c.stripes); i++ { + stripe := &c.stripes[i] + atomic.StoreInt64(&stripe.c, 0) + } +} diff --git a/vendor/github.com/puzpuzpuz/xsync/v2/map.go b/vendor/github.com/puzpuzpuz/xsync/v2/map.go new file mode 100644 index 000000000..749293cbf --- /dev/null +++ b/vendor/github.com/puzpuzpuz/xsync/v2/map.go @@ -0,0 +1,785 @@ +package xsync + +import ( + "fmt" + "hash/maphash" + "math" + "runtime" + "strings" + "sync" + "sync/atomic" + "unsafe" +) + +type mapResizeHint int + +const ( + mapGrowHint mapResizeHint = 0 + mapShrinkHint mapResizeHint = 1 + mapClearHint mapResizeHint = 2 +) + +const ( + // number of entries per bucket; 3 entries lead to size of 64B + // (one cache line) on 64-bit machines + entriesPerMapBucket = 3 + // threshold fraction of table occupation to start a table shrinking + // when deleting the last entry in a bucket chain + mapShrinkFraction = 128 + // map load factor to trigger a table resize during insertion; + // a map holds up to mapLoadFactor*entriesPerMapBucket*mapTableLen + // key-value pairs (this is a soft limit) + mapLoadFactor = 0.75 + // minimal table size, i.e. number of buckets; thus, minimal map + // capacity can be calculated as entriesPerMapBucket*minMapTableLen + minMapTableLen = 32 + // minimal table capacity + minMapTableCap = minMapTableLen * entriesPerMapBucket + // minimum counter stripes to use + minMapCounterLen = 8 + // maximum counter stripes to use; stands for around 4KB of memory + maxMapCounterLen = 32 +) + +var ( + topHashMask = uint64((1<<20)-1) << 44 + topHashEntryMasks = [3]uint64{ + topHashMask, + topHashMask >> 20, + topHashMask >> 40, + } +) + +// Map is like a Go map[string]interface{} but is safe for concurrent +// use by multiple goroutines without additional locking or +// coordination. It follows the interface of sync.Map with +// a number of valuable extensions like Compute or Size. +// +// A Map must not be copied after first use. +// +// Map uses a modified version of Cache-Line Hash Table (CLHT) +// data structure: https://github.com/LPD-EPFL/CLHT +// +// CLHT is built around idea to organize the hash table in +// cache-line-sized buckets, so that on all modern CPUs update +// operations complete with at most one cache-line transfer. +// Also, Get operations involve no write to memory, as well as no +// mutexes or any other sort of locks. Due to this design, in all +// considered scenarios Map outperforms sync.Map. +// +// One important difference with sync.Map is that only string keys +// are supported. That's because Golang standard library does not +// expose the built-in hash functions for interface{} values. +type Map struct { + totalGrowths int64 + totalShrinks int64 + resizing int64 // resize in progress flag; updated atomically + resizeMu sync.Mutex // only used along with resizeCond + resizeCond sync.Cond // used to wake up resize waiters (concurrent modifications) + table unsafe.Pointer // *mapTable +} + +type mapTable struct { + buckets []bucketPadded + // striped counter for number of table entries; + // used to determine if a table shrinking is needed + // occupies min(buckets_memory/1024, 64KB) of memory + size []counterStripe + seed maphash.Seed +} + +type counterStripe struct { + c int64 + //lint:ignore U1000 prevents false sharing + pad [cacheLineSize - 8]byte +} + +type bucketPadded struct { + //lint:ignore U1000 ensure each bucket takes two cache lines on both 32 and 64-bit archs + pad [cacheLineSize - unsafe.Sizeof(bucket{})]byte + bucket +} + +type bucket struct { + next unsafe.Pointer // *bucketPadded + keys [entriesPerMapBucket]unsafe.Pointer + values [entriesPerMapBucket]unsafe.Pointer + // topHashMutex is a 2-in-1 value. + // + // It contains packed top 20 bits (20 MSBs) of hash codes for keys + // stored in the bucket: + // | key 0's top hash | key 1's top hash | key 2's top hash | bitmap for keys | mutex | + // | 20 bits | 20 bits | 20 bits | 3 bits | 1 bit | + // + // The least significant bit is used for the mutex (TTAS spinlock). + topHashMutex uint64 +} + +type rangeEntry struct { + key unsafe.Pointer + value unsafe.Pointer +} + +// NewMap creates a new Map instance. +func NewMap() *Map { + return NewMapPresized(minMapTableCap) +} + +// NewMapPresized creates a new Map instance with capacity enough to hold +// sizeHint entries. If sizeHint is zero or negative, the value is ignored. +func NewMapPresized(sizeHint int) *Map { + m := &Map{} + m.resizeCond = *sync.NewCond(&m.resizeMu) + var table *mapTable + if sizeHint <= minMapTableCap { + table = newMapTable(minMapTableLen) + } else { + tableLen := nextPowOf2(uint32(sizeHint / entriesPerMapBucket)) + table = newMapTable(int(tableLen)) + } + atomic.StorePointer(&m.table, unsafe.Pointer(table)) + return m +} + +func newMapTable(tableLen int) *mapTable { + buckets := make([]bucketPadded, tableLen) + counterLen := tableLen >> 10 + if counterLen < minMapCounterLen { + counterLen = minMapCounterLen + } else if counterLen > maxMapCounterLen { + counterLen = maxMapCounterLen + } + counter := make([]counterStripe, counterLen) + t := &mapTable{ + buckets: buckets, + size: counter, + seed: maphash.MakeSeed(), + } + return t +} + +// Load returns the value stored in the map for a key, or nil if no +// value is present. +// The ok result indicates whether value was found in the map. +func (m *Map) Load(key string) (value interface{}, ok bool) { + table := (*mapTable)(atomic.LoadPointer(&m.table)) + hash := hashString(table.seed, key) + bidx := uint64(len(table.buckets)-1) & hash + b := &table.buckets[bidx] + for { + topHashes := atomic.LoadUint64(&b.topHashMutex) + for i := 0; i < entriesPerMapBucket; i++ { + if !topHashMatch(hash, topHashes, i) { + continue + } + atomic_snapshot: + // Start atomic snapshot. + vp := atomic.LoadPointer(&b.values[i]) + kp := atomic.LoadPointer(&b.keys[i]) + if kp != nil && vp != nil { + if key == derefKey(kp) { + if uintptr(vp) == uintptr(atomic.LoadPointer(&b.values[i])) { + // Atomic snapshot succeeded. + return derefValue(vp), true + } + // Concurrent update/remove. Go for another spin. + goto atomic_snapshot + } + } + } + bptr := atomic.LoadPointer(&b.next) + if bptr == nil { + return + } + b = (*bucketPadded)(bptr) + } +} + +// Store sets the value for a key. +func (m *Map) Store(key string, value interface{}) { + m.doCompute( + key, + func(interface{}, bool) (interface{}, bool) { + return value, false + }, + false, + false, + ) +} + +// LoadOrStore returns the existing value for the key if present. +// Otherwise, it stores and returns the given value. +// The loaded result is true if the value was loaded, false if stored. +func (m *Map) LoadOrStore(key string, value interface{}) (actual interface{}, loaded bool) { + return m.doCompute( + key, + func(interface{}, bool) (interface{}, bool) { + return value, false + }, + true, + false, + ) +} + +// LoadAndStore returns the existing value for the key if present, +// while setting the new value for the key. +// It stores the new value and returns the existing one, if present. +// The loaded result is true if the existing value was loaded, +// false otherwise. +func (m *Map) LoadAndStore(key string, value interface{}) (actual interface{}, loaded bool) { + return m.doCompute( + key, + func(interface{}, bool) (interface{}, bool) { + return value, false + }, + false, + false, + ) +} + +// LoadOrCompute returns the existing value for the key if present. +// Otherwise, it computes the value using the provided function and +// returns the computed value. The loaded result is true if the value +// was loaded, false if stored. +func (m *Map) LoadOrCompute(key string, valueFn func() interface{}) (actual interface{}, loaded bool) { + return m.doCompute( + key, + func(interface{}, bool) (interface{}, bool) { + return valueFn(), false + }, + true, + false, + ) +} + +// Compute either sets the computed new value for the key or deletes +// the value for the key. When the delete result of the valueFn function +// is set to true, the value will be deleted, if it exists. When delete +// is set to false, the value is updated to the newValue. +// The ok result indicates whether value was computed and stored, thus, is +// present in the map. The actual result contains the new value in cases where +// the value was computed and stored. See the example for a few use cases. +func (m *Map) Compute( + key string, + valueFn func(oldValue interface{}, loaded bool) (newValue interface{}, delete bool), +) (actual interface{}, ok bool) { + return m.doCompute(key, valueFn, false, true) +} + +// LoadAndDelete deletes the value for a key, returning the previous +// value if any. The loaded result reports whether the key was +// present. +func (m *Map) LoadAndDelete(key string) (value interface{}, loaded bool) { + return m.doCompute( + key, + func(value interface{}, loaded bool) (interface{}, bool) { + return value, true + }, + false, + false, + ) +} + +// Delete deletes the value for a key. +func (m *Map) Delete(key string) { + m.doCompute( + key, + func(value interface{}, loaded bool) (interface{}, bool) { + return value, true + }, + false, + false, + ) +} + +func (m *Map) doCompute( + key string, + valueFn func(oldValue interface{}, loaded bool) (interface{}, bool), + loadIfExists, computeOnly bool, +) (interface{}, bool) { + // Read-only path. + if loadIfExists { + if v, ok := m.Load(key); ok { + return v, !computeOnly + } + } + // Write path. + for { + compute_attempt: + var ( + emptyb *bucketPadded + emptyidx int + hintNonEmpty int + ) + table := (*mapTable)(atomic.LoadPointer(&m.table)) + tableLen := len(table.buckets) + hash := hashString(table.seed, key) + bidx := uint64(len(table.buckets)-1) & hash + rootb := &table.buckets[bidx] + lockBucket(&rootb.topHashMutex) + if m.newerTableExists(table) { + // Someone resized the table. Go for another attempt. + unlockBucket(&rootb.topHashMutex) + goto compute_attempt + } + if m.resizeInProgress() { + // Resize is in progress. Wait, then go for another attempt. + unlockBucket(&rootb.topHashMutex) + m.waitForResize() + goto compute_attempt + } + b := rootb + for { + topHashes := atomic.LoadUint64(&b.topHashMutex) + for i := 0; i < entriesPerMapBucket; i++ { + if b.keys[i] == nil { + if emptyb == nil { + emptyb = b + emptyidx = i + } + continue + } + if !topHashMatch(hash, topHashes, i) { + hintNonEmpty++ + continue + } + if key == derefKey(b.keys[i]) { + vp := b.values[i] + if loadIfExists { + unlockBucket(&rootb.topHashMutex) + return derefValue(vp), !computeOnly + } + // In-place update/delete. + // We get a copy of the value via an interface{} on each call, + // thus the live value pointers are unique. Otherwise atomic + // snapshot won't be correct in case of multiple Store calls + // using the same value. + oldValue := derefValue(vp) + newValue, del := valueFn(oldValue, true) + if del { + // Deletion. + // First we update the value, then the key. + // This is important for atomic snapshot states. + atomic.StoreUint64(&b.topHashMutex, eraseTopHash(topHashes, i)) + atomic.StorePointer(&b.values[i], nil) + atomic.StorePointer(&b.keys[i], nil) + leftEmpty := false + if hintNonEmpty == 0 { + leftEmpty = isEmptyBucket(b) + } + unlockBucket(&rootb.topHashMutex) + table.addSize(bidx, -1) + // Might need to shrink the table. + if leftEmpty { + m.resize(table, mapShrinkHint) + } + return oldValue, !computeOnly + } + nvp := unsafe.Pointer(&newValue) + if assertionsEnabled && vp == nvp { + panic("non-unique value pointer") + } + atomic.StorePointer(&b.values[i], nvp) + unlockBucket(&rootb.topHashMutex) + if computeOnly { + // Compute expects the new value to be returned. + return newValue, true + } + // LoadAndStore expects the old value to be returned. + return oldValue, true + } + hintNonEmpty++ + } + if b.next == nil { + if emptyb != nil { + // Insertion into an existing bucket. + var zeroedV interface{} + newValue, del := valueFn(zeroedV, false) + if del { + unlockBucket(&rootb.topHashMutex) + return zeroedV, false + } + // First we update the value, then the key. + // This is important for atomic snapshot states. + topHashes = atomic.LoadUint64(&emptyb.topHashMutex) + atomic.StoreUint64(&emptyb.topHashMutex, storeTopHash(hash, topHashes, emptyidx)) + atomic.StorePointer(&emptyb.values[emptyidx], unsafe.Pointer(&newValue)) + atomic.StorePointer(&emptyb.keys[emptyidx], unsafe.Pointer(&key)) + unlockBucket(&rootb.topHashMutex) + table.addSize(bidx, 1) + return newValue, computeOnly + } + growThreshold := float64(tableLen) * entriesPerMapBucket * mapLoadFactor + if table.sumSize() > int64(growThreshold) { + // Need to grow the table. Then go for another attempt. + unlockBucket(&rootb.topHashMutex) + m.resize(table, mapGrowHint) + goto compute_attempt + } + // Insertion into a new bucket. + var zeroedV interface{} + newValue, del := valueFn(zeroedV, false) + if del { + unlockBucket(&rootb.topHashMutex) + return newValue, false + } + // Create and append the bucket. + newb := new(bucketPadded) + newb.keys[0] = unsafe.Pointer(&key) + newb.values[0] = unsafe.Pointer(&newValue) + newb.topHashMutex = storeTopHash(hash, newb.topHashMutex, 0) + atomic.StorePointer(&b.next, unsafe.Pointer(newb)) + unlockBucket(&rootb.topHashMutex) + table.addSize(bidx, 1) + return newValue, computeOnly + } + b = (*bucketPadded)(b.next) + } + } +} + +func (m *Map) newerTableExists(table *mapTable) bool { + curTablePtr := atomic.LoadPointer(&m.table) + return uintptr(curTablePtr) != uintptr(unsafe.Pointer(table)) +} + +func (m *Map) resizeInProgress() bool { + return atomic.LoadInt64(&m.resizing) == 1 +} + +func (m *Map) waitForResize() { + m.resizeMu.Lock() + for m.resizeInProgress() { + m.resizeCond.Wait() + } + m.resizeMu.Unlock() +} + +func (m *Map) resize(table *mapTable, hint mapResizeHint) { + var shrinkThreshold int64 + tableLen := len(table.buckets) + // Fast path for shrink attempts. + if hint == mapShrinkHint { + shrinkThreshold = int64((tableLen * entriesPerMapBucket) / mapShrinkFraction) + if tableLen == minMapTableLen || table.sumSize() > shrinkThreshold { + return + } + } + // Slow path. + if !atomic.CompareAndSwapInt64(&m.resizing, 0, 1) { + // Someone else started resize. Wait for it to finish. + m.waitForResize() + return + } + var newTable *mapTable + switch hint { + case mapGrowHint: + // Grow the table with factor of 2. + atomic.AddInt64(&m.totalGrowths, 1) + newTable = newMapTable(tableLen << 1) + case mapShrinkHint: + if table.sumSize() <= shrinkThreshold { + // Shrink the table with factor of 2. + atomic.AddInt64(&m.totalShrinks, 1) + newTable = newMapTable(tableLen >> 1) + } else { + // No need to shrink. Wake up all waiters and give up. + m.resizeMu.Lock() + atomic.StoreInt64(&m.resizing, 0) + m.resizeCond.Broadcast() + m.resizeMu.Unlock() + return + } + case mapClearHint: + newTable = newMapTable(minMapTableLen) + default: + panic(fmt.Sprintf("unexpected resize hint: %d", hint)) + } + // Copy the data only if we're not clearing the map. + if hint != mapClearHint { + for i := 0; i < tableLen; i++ { + copied := copyBucket(&table.buckets[i], newTable) + newTable.addSizePlain(uint64(i), copied) + } + } + // Publish the new table and wake up all waiters. + atomic.StorePointer(&m.table, unsafe.Pointer(newTable)) + m.resizeMu.Lock() + atomic.StoreInt64(&m.resizing, 0) + m.resizeCond.Broadcast() + m.resizeMu.Unlock() +} + +func copyBucket(b *bucketPadded, destTable *mapTable) (copied int) { + rootb := b + lockBucket(&rootb.topHashMutex) + for { + for i := 0; i < entriesPerMapBucket; i++ { + if b.keys[i] != nil { + k := derefKey(b.keys[i]) + hash := hashString(destTable.seed, k) + bidx := uint64(len(destTable.buckets)-1) & hash + destb := &destTable.buckets[bidx] + appendToBucket(hash, b.keys[i], b.values[i], destb) + copied++ + } + } + if b.next == nil { + unlockBucket(&rootb.topHashMutex) + return + } + b = (*bucketPadded)(b.next) + } +} + +func appendToBucket(hash uint64, keyPtr, valPtr unsafe.Pointer, b *bucketPadded) { + for { + for i := 0; i < entriesPerMapBucket; i++ { + if b.keys[i] == nil { + b.keys[i] = keyPtr + b.values[i] = valPtr + b.topHashMutex = storeTopHash(hash, b.topHashMutex, i) + return + } + } + if b.next == nil { + newb := new(bucketPadded) + newb.keys[0] = keyPtr + newb.values[0] = valPtr + newb.topHashMutex = storeTopHash(hash, newb.topHashMutex, 0) + b.next = unsafe.Pointer(newb) + return + } + b = (*bucketPadded)(b.next) + } +} + +func isEmptyBucket(rootb *bucketPadded) bool { + b := rootb + for { + for i := 0; i < entriesPerMapBucket; i++ { + if b.keys[i] != nil { + return false + } + } + if b.next == nil { + return true + } + b = (*bucketPadded)(b.next) + } +} + +// Range calls f sequentially for each key and value present in the +// map. If f returns false, range stops the iteration. +// +// Range does not necessarily correspond to any consistent snapshot +// of the Map's contents: no key will be visited more than once, but +// if the value for any key is stored or deleted concurrently, Range +// may reflect any mapping for that key from any point during the +// Range call. +// +// It is safe to modify the map while iterating it. However, the +// concurrent modification rule apply, i.e. the changes may be not +// reflected in the subsequently iterated entries. +func (m *Map) Range(f func(key string, value interface{}) bool) { + var zeroEntry rangeEntry + // Pre-allocate array big enough to fit entries for most hash tables. + bentries := make([]rangeEntry, 0, 16*entriesPerMapBucket) + tablep := atomic.LoadPointer(&m.table) + table := *(*mapTable)(tablep) + for i := range table.buckets { + rootb := &table.buckets[i] + b := rootb + // Prevent concurrent modifications and copy all entries into + // the intermediate slice. + lockBucket(&rootb.topHashMutex) + for { + for i := 0; i < entriesPerMapBucket; i++ { + if b.keys[i] != nil { + bentries = append(bentries, rangeEntry{ + key: b.keys[i], + value: b.values[i], + }) + } + } + if b.next == nil { + unlockBucket(&rootb.topHashMutex) + break + } + b = (*bucketPadded)(b.next) + } + // Call the function for all copied entries. + for j := range bentries { + k := derefKey(bentries[j].key) + v := derefValue(bentries[j].value) + if !f(k, v) { + return + } + // Remove the reference to avoid preventing the copied + // entries from being GCed until this method finishes. + bentries[j] = zeroEntry + } + bentries = bentries[:0] + } +} + +// Clear deletes all keys and values currently stored in the map. +func (m *Map) Clear() { + table := (*mapTable)(atomic.LoadPointer(&m.table)) + m.resize(table, mapClearHint) +} + +// Size returns current size of the map. +func (m *Map) Size() int { + table := (*mapTable)(atomic.LoadPointer(&m.table)) + return int(table.sumSize()) +} + +func derefKey(keyPtr unsafe.Pointer) string { + return *(*string)(keyPtr) +} + +func derefValue(valuePtr unsafe.Pointer) interface{} { + return *(*interface{})(valuePtr) +} + +func lockBucket(mu *uint64) { + for { + var v uint64 + for { + v = atomic.LoadUint64(mu) + if v&1 != 1 { + break + } + runtime.Gosched() + } + if atomic.CompareAndSwapUint64(mu, v, v|1) { + return + } + runtime.Gosched() + } +} + +func unlockBucket(mu *uint64) { + v := atomic.LoadUint64(mu) + atomic.StoreUint64(mu, v&^1) +} + +func topHashMatch(hash, topHashes uint64, idx int) bool { + if topHashes&(1<<(idx+1)) == 0 { + // Entry is not present. + return false + } + hash = hash & topHashMask + topHashes = (topHashes & topHashEntryMasks[idx]) << (20 * idx) + return hash == topHashes +} + +func storeTopHash(hash, topHashes uint64, idx int) uint64 { + // Zero out top hash at idx. + topHashes = topHashes &^ topHashEntryMasks[idx] + // Chop top 20 MSBs of the given hash and position them at idx. + hash = (hash & topHashMask) >> (20 * idx) + // Store the MSBs. + topHashes = topHashes | hash + // Mark the entry as present. + return topHashes | (1 << (idx + 1)) +} + +func eraseTopHash(topHashes uint64, idx int) uint64 { + return topHashes &^ (1 << (idx + 1)) +} + +func (table *mapTable) addSize(bucketIdx uint64, delta int) { + cidx := uint64(len(table.size)-1) & bucketIdx + atomic.AddInt64(&table.size[cidx].c, int64(delta)) +} + +func (table *mapTable) addSizePlain(bucketIdx uint64, delta int) { + cidx := uint64(len(table.size)-1) & bucketIdx + table.size[cidx].c += int64(delta) +} + +func (table *mapTable) sumSize() int64 { + sum := int64(0) + for i := range table.size { + sum += atomic.LoadInt64(&table.size[i].c) + } + return sum +} + +type mapStats struct { + RootBuckets int + TotalBuckets int + EmptyBuckets int + Capacity int + Size int // calculated number of entries + Counter int // number of entries according to table counter + CounterLen int // number of counter stripes + MinEntries int // min entries per chain of buckets + MaxEntries int // max entries per chain of buckets + TotalGrowths int64 + TotalShrinks int64 +} + +func (s *mapStats) ToString() string { + var sb strings.Builder + sb.WriteString("\n---\n") + sb.WriteString(fmt.Sprintf("RootBuckets: %d\n", s.RootBuckets)) + sb.WriteString(fmt.Sprintf("TotalBuckets: %d\n", s.TotalBuckets)) + sb.WriteString(fmt.Sprintf("EmptyBuckets: %d\n", s.EmptyBuckets)) + sb.WriteString(fmt.Sprintf("Capacity: %d\n", s.Capacity)) + sb.WriteString(fmt.Sprintf("Size: %d\n", s.Size)) + sb.WriteString(fmt.Sprintf("Counter: %d\n", s.Counter)) + sb.WriteString(fmt.Sprintf("CounterLen: %d\n", s.CounterLen)) + sb.WriteString(fmt.Sprintf("MinEntries: %d\n", s.MinEntries)) + sb.WriteString(fmt.Sprintf("MaxEntries: %d\n", s.MaxEntries)) + sb.WriteString(fmt.Sprintf("TotalGrowths: %d\n", s.TotalGrowths)) + sb.WriteString(fmt.Sprintf("TotalShrinks: %d\n", s.TotalShrinks)) + sb.WriteString("---\n") + return sb.String() +} + +// O(N) operation; use for debug purposes only +func (m *Map) stats() mapStats { + stats := mapStats{ + TotalGrowths: atomic.LoadInt64(&m.totalGrowths), + TotalShrinks: atomic.LoadInt64(&m.totalShrinks), + MinEntries: math.MaxInt32, + } + table := (*mapTable)(atomic.LoadPointer(&m.table)) + stats.RootBuckets = len(table.buckets) + stats.Counter = int(table.sumSize()) + stats.CounterLen = len(table.size) + for i := range table.buckets { + nentries := 0 + b := &table.buckets[i] + stats.TotalBuckets++ + for { + nentriesLocal := 0 + stats.Capacity += entriesPerMapBucket + for i := 0; i < entriesPerMapBucket; i++ { + if atomic.LoadPointer(&b.keys[i]) != nil { + stats.Size++ + nentriesLocal++ + } + } + nentries += nentriesLocal + if nentriesLocal == 0 { + stats.EmptyBuckets++ + } + if b.next == nil { + break + } + b = (*bucketPadded)(b.next) + stats.TotalBuckets++ + } + if nentries < stats.MinEntries { + stats.MinEntries = nentries + } + if nentries > stats.MaxEntries { + stats.MaxEntries = nentries + } + } + return stats +} diff --git a/vendor/github.com/puzpuzpuz/xsync/v2/mapof.go b/vendor/github.com/puzpuzpuz/xsync/v2/mapof.go new file mode 100644 index 000000000..cadaeb438 --- /dev/null +++ b/vendor/github.com/puzpuzpuz/xsync/v2/mapof.go @@ -0,0 +1,688 @@ +//go:build go1.18 +// +build go1.18 + +package xsync + +import ( + "fmt" + "hash/maphash" + "math" + "sync" + "sync/atomic" + "unsafe" +) + +// MapOf is like a Go map[string]V but is safe for concurrent +// use by multiple goroutines without additional locking or +// coordination. It follows the interface of sync.Map with +// a number of valuable extensions like Compute or Size. +// +// A MapOf must not be copied after first use. +// +// MapOf uses a modified version of Cache-Line Hash Table (CLHT) +// data structure: https://github.com/LPD-EPFL/CLHT +// +// CLHT is built around idea to organize the hash table in +// cache-line-sized buckets, so that on all modern CPUs update +// operations complete with at most one cache-line transfer. +// Also, Get operations involve no write to memory, as well as no +// mutexes or any other sort of locks. Due to this design, in all +// considered scenarios MapOf outperforms sync.Map. +type MapOf[K comparable, V any] struct { + totalGrowths int64 + totalShrinks int64 + resizing int64 // resize in progress flag; updated atomically + resizeMu sync.Mutex // only used along with resizeCond + resizeCond sync.Cond // used to wake up resize waiters (concurrent modifications) + table unsafe.Pointer // *mapOfTable + hasher func(maphash.Seed, K) uint64 +} + +type mapOfTable[K comparable, V any] struct { + buckets []bucketOfPadded + // striped counter for number of table entries; + // used to determine if a table shrinking is needed + // occupies min(buckets_memory/1024, 64KB) of memory + size []counterStripe + seed maphash.Seed +} + +// bucketOfPadded is a CL-sized map bucket holding up to +// entriesPerMapBucket entries. +type bucketOfPadded struct { + //lint:ignore U1000 ensure each bucket takes two cache lines on both 32 and 64-bit archs + pad [cacheLineSize - unsafe.Sizeof(bucketOf{})]byte + bucketOf +} + +type bucketOf struct { + hashes [entriesPerMapBucket]uint64 + entries [entriesPerMapBucket]unsafe.Pointer // *entryOf + next unsafe.Pointer // *bucketOfPadded + mu sync.Mutex +} + +// entryOf is an immutable map entry. +type entryOf[K comparable, V any] struct { + key K + value V +} + +// NewMapOf creates a new MapOf instance with string keys. +func NewMapOf[V any]() *MapOf[string, V] { + return NewTypedMapOfPresized[string, V](hashString, minMapTableCap) +} + +// NewMapOfPresized creates a new MapOf instance with string keys and capacity +// enough to hold sizeHint entries. If sizeHint is zero or negative, the value +// is ignored. +func NewMapOfPresized[V any](sizeHint int) *MapOf[string, V] { + return NewTypedMapOfPresized[string, V](hashString, sizeHint) +} + +// IntegerConstraint represents any integer type. +type IntegerConstraint interface { + // Recreation of golang.org/x/exp/constraints.Integer to avoid taking a dependency on an + // experimental package. + ~int | ~int8 | ~int16 | ~int32 | ~int64 | ~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 | ~uintptr +} + +// NewIntegerMapOf creates a new MapOf instance with integer typed keys. +func NewIntegerMapOf[K IntegerConstraint, V any]() *MapOf[K, V] { + return NewTypedMapOfPresized[K, V](hashUint64[K], minMapTableCap) +} + +// NewIntegerMapOfPresized creates a new MapOf instance with integer typed keys +// and capacity enough to hold sizeHint entries. If sizeHint is zero or +// negative, the value is ignored. +func NewIntegerMapOfPresized[K IntegerConstraint, V any](sizeHint int) *MapOf[K, V] { + return NewTypedMapOfPresized[K, V](hashUint64[K], sizeHint) +} + +// NewTypedMapOf creates a new MapOf instance with arbitrarily typed keys. +// +// Keys are hashed to uint64 using the hasher function. It is strongly +// recommended to use the hash/maphash package to implement hasher. See the +// example for how to do that. +func NewTypedMapOf[K comparable, V any](hasher func(maphash.Seed, K) uint64) *MapOf[K, V] { + return NewTypedMapOfPresized[K, V](hasher, minMapTableCap) +} + +// NewTypedMapOfPresized creates a new MapOf instance with arbitrarily typed +// keys and capacity enough to hold sizeHint entries. If sizeHint is zero or +// negative, the value is ignored. +// +// Keys are hashed to uint64 using the hasher function. It is strongly +// recommended to use the hash/maphash package to implement hasher. See the +// example for how to do that. +func NewTypedMapOfPresized[K comparable, V any](hasher func(maphash.Seed, K) uint64, sizeHint int) *MapOf[K, V] { + m := &MapOf[K, V]{} + m.resizeCond = *sync.NewCond(&m.resizeMu) + m.hasher = hasher + var table *mapOfTable[K, V] + if sizeHint <= minMapTableCap { + table = newMapOfTable[K, V](minMapTableLen) + } else { + tableLen := nextPowOf2(uint32(sizeHint / entriesPerMapBucket)) + table = newMapOfTable[K, V](int(tableLen)) + } + atomic.StorePointer(&m.table, unsafe.Pointer(table)) + return m +} + +func newMapOfTable[K comparable, V any](tableLen int) *mapOfTable[K, V] { + buckets := make([]bucketOfPadded, tableLen) + counterLen := tableLen >> 10 + if counterLen < minMapCounterLen { + counterLen = minMapCounterLen + } else if counterLen > maxMapCounterLen { + counterLen = maxMapCounterLen + } + counter := make([]counterStripe, counterLen) + t := &mapOfTable[K, V]{ + buckets: buckets, + size: counter, + seed: maphash.MakeSeed(), + } + return t +} + +// Load returns the value stored in the map for a key, or nil if no +// value is present. +// The ok result indicates whether value was found in the map. +func (m *MapOf[K, V]) Load(key K) (value V, ok bool) { + table := (*mapOfTable[K, V])(atomic.LoadPointer(&m.table)) + hash := shiftHash(m.hasher(table.seed, key)) + bidx := uint64(len(table.buckets)-1) & hash + b := &table.buckets[bidx] + for { + for i := 0; i < entriesPerMapBucket; i++ { + // We treat the hash code only as a hint, so there is no + // need to get an atomic snapshot. + h := atomic.LoadUint64(&b.hashes[i]) + if h == uint64(0) || h != hash { + continue + } + eptr := atomic.LoadPointer(&b.entries[i]) + if eptr == nil { + continue + } + e := (*entryOf[K, V])(eptr) + if e.key == key { + return e.value, true + } + } + bptr := atomic.LoadPointer(&b.next) + if bptr == nil { + return + } + b = (*bucketOfPadded)(bptr) + } +} + +// Store sets the value for a key. +func (m *MapOf[K, V]) Store(key K, value V) { + m.doCompute( + key, + func(V, bool) (V, bool) { + return value, false + }, + false, + false, + ) +} + +// LoadOrStore returns the existing value for the key if present. +// Otherwise, it stores and returns the given value. +// The loaded result is true if the value was loaded, false if stored. +func (m *MapOf[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) { + return m.doCompute( + key, + func(V, bool) (V, bool) { + return value, false + }, + true, + false, + ) +} + +// LoadAndStore returns the existing value for the key if present, +// while setting the new value for the key. +// It stores the new value and returns the existing one, if present. +// The loaded result is true if the existing value was loaded, +// false otherwise. +func (m *MapOf[K, V]) LoadAndStore(key K, value V) (actual V, loaded bool) { + return m.doCompute( + key, + func(V, bool) (V, bool) { + return value, false + }, + false, + false, + ) +} + +// LoadOrCompute returns the existing value for the key if present. +// Otherwise, it computes the value using the provided function and +// returns the computed value. The loaded result is true if the value +// was loaded, false if stored. +func (m *MapOf[K, V]) LoadOrCompute(key K, valueFn func() V) (actual V, loaded bool) { + return m.doCompute( + key, + func(V, bool) (V, bool) { + return valueFn(), false + }, + true, + false, + ) +} + +// Compute either sets the computed new value for the key or deletes +// the value for the key. When the delete result of the valueFn function +// is set to true, the value will be deleted, if it exists. When delete +// is set to false, the value is updated to the newValue. +// The ok result indicates whether value was computed and stored, thus, is +// present in the map. The actual result contains the new value in cases where +// the value was computed and stored. See the example for a few use cases. +func (m *MapOf[K, V]) Compute( + key K, + valueFn func(oldValue V, loaded bool) (newValue V, delete bool), +) (actual V, ok bool) { + return m.doCompute(key, valueFn, false, true) +} + +// LoadAndDelete deletes the value for a key, returning the previous +// value if any. The loaded result reports whether the key was +// present. +func (m *MapOf[K, V]) LoadAndDelete(key K) (value V, loaded bool) { + return m.doCompute( + key, + func(value V, loaded bool) (V, bool) { + return value, true + }, + false, + false, + ) +} + +// Delete deletes the value for a key. +func (m *MapOf[K, V]) Delete(key K) { + m.doCompute( + key, + func(value V, loaded bool) (V, bool) { + return value, true + }, + false, + false, + ) +} + +func (m *MapOf[K, V]) doCompute( + key K, + valueFn func(oldValue V, loaded bool) (V, bool), + loadIfExists, computeOnly bool, +) (V, bool) { + // Read-only path. + if loadIfExists { + if v, ok := m.Load(key); ok { + return v, !computeOnly + } + } + // Write path. + for { + compute_attempt: + var ( + emptyb *bucketOfPadded + emptyidx int + hintNonEmpty int + ) + table := (*mapOfTable[K, V])(atomic.LoadPointer(&m.table)) + tableLen := len(table.buckets) + hash := shiftHash(m.hasher(table.seed, key)) + bidx := uint64(len(table.buckets)-1) & hash + rootb := &table.buckets[bidx] + rootb.mu.Lock() + if m.newerTableExists(table) { + // Someone resized the table. Go for another attempt. + rootb.mu.Unlock() + goto compute_attempt + } + if m.resizeInProgress() { + // Resize is in progress. Wait, then go for another attempt. + rootb.mu.Unlock() + m.waitForResize() + goto compute_attempt + } + b := rootb + for { + for i := 0; i < entriesPerMapBucket; i++ { + h := atomic.LoadUint64(&b.hashes[i]) + if h == uint64(0) { + if emptyb == nil { + emptyb = b + emptyidx = i + } + continue + } + if h != hash { + hintNonEmpty++ + continue + } + e := (*entryOf[K, V])(b.entries[i]) + if e.key == key { + if loadIfExists { + rootb.mu.Unlock() + return e.value, !computeOnly + } + // In-place update/delete. + // We get a copy of the value via an interface{} on each call, + // thus the live value pointers are unique. Otherwise atomic + // snapshot won't be correct in case of multiple Store calls + // using the same value. + oldv := e.value + newv, del := valueFn(oldv, true) + if del { + // Deletion. + // First we update the hash, then the entry. + atomic.StoreUint64(&b.hashes[i], uint64(0)) + atomic.StorePointer(&b.entries[i], nil) + leftEmpty := false + if hintNonEmpty == 0 { + leftEmpty = isEmptyBucketOf(b) + } + rootb.mu.Unlock() + table.addSize(bidx, -1) + // Might need to shrink the table. + if leftEmpty { + m.resize(table, mapShrinkHint) + } + return oldv, !computeOnly + } + newe := new(entryOf[K, V]) + newe.key = key + newe.value = newv + atomic.StorePointer(&b.entries[i], unsafe.Pointer(newe)) + rootb.mu.Unlock() + if computeOnly { + // Compute expects the new value to be returned. + return newv, true + } + // LoadAndStore expects the old value to be returned. + return oldv, true + } + hintNonEmpty++ + } + if b.next == nil { + if emptyb != nil { + // Insertion into an existing bucket. + var zeroedV V + newValue, del := valueFn(zeroedV, false) + if del { + rootb.mu.Unlock() + return zeroedV, false + } + newe := new(entryOf[K, V]) + newe.key = key + newe.value = newValue + // First we update the hash, then the entry. + atomic.StoreUint64(&emptyb.hashes[emptyidx], hash) + atomic.StorePointer(&emptyb.entries[emptyidx], unsafe.Pointer(newe)) + rootb.mu.Unlock() + table.addSize(bidx, 1) + return newValue, computeOnly + } + growThreshold := float64(tableLen) * entriesPerMapBucket * mapLoadFactor + if table.sumSize() > int64(growThreshold) { + // Need to grow the table. Then go for another attempt. + rootb.mu.Unlock() + m.resize(table, mapGrowHint) + goto compute_attempt + } + // Insertion into a new bucket. + var zeroedV V + newValue, del := valueFn(zeroedV, false) + if del { + rootb.mu.Unlock() + return newValue, false + } + // Create and append the bucket. + newb := new(bucketOfPadded) + newb.hashes[0] = hash + newe := new(entryOf[K, V]) + newe.key = key + newe.value = newValue + newb.entries[0] = unsafe.Pointer(newe) + atomic.StorePointer(&b.next, unsafe.Pointer(newb)) + rootb.mu.Unlock() + table.addSize(bidx, 1) + return newValue, computeOnly + } + b = (*bucketOfPadded)(b.next) + } + } +} + +func (m *MapOf[K, V]) newerTableExists(table *mapOfTable[K, V]) bool { + curTablePtr := atomic.LoadPointer(&m.table) + return uintptr(curTablePtr) != uintptr(unsafe.Pointer(table)) +} + +func (m *MapOf[K, V]) resizeInProgress() bool { + return atomic.LoadInt64(&m.resizing) == 1 +} + +func (m *MapOf[K, V]) waitForResize() { + m.resizeMu.Lock() + for m.resizeInProgress() { + m.resizeCond.Wait() + } + m.resizeMu.Unlock() +} + +func (m *MapOf[K, V]) resize(table *mapOfTable[K, V], hint mapResizeHint) { + var shrinkThreshold int64 + tableLen := len(table.buckets) + // Fast path for shrink attempts. + if hint == mapShrinkHint { + shrinkThreshold = int64((tableLen * entriesPerMapBucket) / mapShrinkFraction) + if tableLen == minMapTableLen || table.sumSize() > shrinkThreshold { + return + } + } + // Slow path. + if !atomic.CompareAndSwapInt64(&m.resizing, 0, 1) { + // Someone else started resize. Wait for it to finish. + m.waitForResize() + return + } + var newTable *mapOfTable[K, V] + switch hint { + case mapGrowHint: + // Grow the table with factor of 2. + atomic.AddInt64(&m.totalGrowths, 1) + newTable = newMapOfTable[K, V](tableLen << 1) + case mapShrinkHint: + if table.sumSize() <= shrinkThreshold { + // Shrink the table with factor of 2. + atomic.AddInt64(&m.totalShrinks, 1) + newTable = newMapOfTable[K, V](tableLen >> 1) + } else { + // No need to shrink. Wake up all waiters and give up. + m.resizeMu.Lock() + atomic.StoreInt64(&m.resizing, 0) + m.resizeCond.Broadcast() + m.resizeMu.Unlock() + return + } + case mapClearHint: + newTable = newMapOfTable[K, V](minMapTableLen) + default: + panic(fmt.Sprintf("unexpected resize hint: %d", hint)) + } + // Copy the data only if we're not clearing the map. + if hint != mapClearHint { + for i := 0; i < tableLen; i++ { + copied := copyBucketOf(&table.buckets[i], newTable, m.hasher) + newTable.addSizePlain(uint64(i), copied) + } + } + // Publish the new table and wake up all waiters. + atomic.StorePointer(&m.table, unsafe.Pointer(newTable)) + m.resizeMu.Lock() + atomic.StoreInt64(&m.resizing, 0) + m.resizeCond.Broadcast() + m.resizeMu.Unlock() +} + +func copyBucketOf[K comparable, V any]( + b *bucketOfPadded, + destTable *mapOfTable[K, V], + hasher func(maphash.Seed, K) uint64, +) (copied int) { + rootb := b + rootb.mu.Lock() + for { + for i := 0; i < entriesPerMapBucket; i++ { + if b.entries[i] != nil { + e := (*entryOf[K, V])(b.entries[i]) + hash := shiftHash(hasher(destTable.seed, e.key)) + bidx := uint64(len(destTable.buckets)-1) & hash + destb := &destTable.buckets[bidx] + appendToBucketOf(hash, b.entries[i], destb) + copied++ + } + } + if b.next == nil { + rootb.mu.Unlock() + return + } + b = (*bucketOfPadded)(b.next) + } +} + +// Range calls f sequentially for each key and value present in the +// map. If f returns false, range stops the iteration. +// +// Range does not necessarily correspond to any consistent snapshot +// of the Map's contents: no key will be visited more than once, but +// if the value for any key is stored or deleted concurrently, Range +// may reflect any mapping for that key from any point during the +// Range call. +// +// It is safe to modify the map while iterating it. However, the +// concurrent modification rule apply, i.e. the changes may be not +// reflected in the subsequently iterated entries. +func (m *MapOf[K, V]) Range(f func(key K, value V) bool) { + var zeroPtr unsafe.Pointer + // Pre-allocate array big enough to fit entries for most hash tables. + bentries := make([]unsafe.Pointer, 0, 16*entriesPerMapBucket) + tablep := atomic.LoadPointer(&m.table) + table := *(*mapOfTable[K, V])(tablep) + for i := range table.buckets { + rootb := &table.buckets[i] + b := rootb + // Prevent concurrent modifications and copy all entries into + // the intermediate slice. + rootb.mu.Lock() + for { + for i := 0; i < entriesPerMapBucket; i++ { + if b.entries[i] != nil { + bentries = append(bentries, b.entries[i]) + } + } + if b.next == nil { + rootb.mu.Unlock() + break + } + b = (*bucketOfPadded)(b.next) + } + // Call the function for all copied entries. + for j := range bentries { + entry := (*entryOf[K, V])(bentries[j]) + if !f(entry.key, entry.value) { + return + } + // Remove the reference to avoid preventing the copied + // entries from being GCed until this method finishes. + bentries[j] = zeroPtr + } + bentries = bentries[:0] + } +} + +// Clear deletes all keys and values currently stored in the map. +func (m *MapOf[K, V]) Clear() { + table := (*mapOfTable[K, V])(atomic.LoadPointer(&m.table)) + m.resize(table, mapClearHint) +} + +// Size returns current size of the map. +func (m *MapOf[K, V]) Size() int { + table := (*mapOfTable[K, V])(atomic.LoadPointer(&m.table)) + return int(table.sumSize()) +} + +func appendToBucketOf(hash uint64, entryPtr unsafe.Pointer, b *bucketOfPadded) { + for { + for i := 0; i < entriesPerMapBucket; i++ { + if b.entries[i] == nil { + b.hashes[i] = hash + b.entries[i] = entryPtr + return + } + } + if b.next == nil { + newb := new(bucketOfPadded) + newb.hashes[0] = hash + newb.entries[0] = entryPtr + b.next = unsafe.Pointer(newb) + return + } + b = (*bucketOfPadded)(b.next) + } +} + +func isEmptyBucketOf(rootb *bucketOfPadded) bool { + b := rootb + for { + for i := 0; i < entriesPerMapBucket; i++ { + if b.entries[i] != nil { + return false + } + } + if b.next == nil { + return true + } + b = (*bucketOfPadded)(b.next) + } +} + +func (table *mapOfTable[K, V]) addSize(bucketIdx uint64, delta int) { + cidx := uint64(len(table.size)-1) & bucketIdx + atomic.AddInt64(&table.size[cidx].c, int64(delta)) +} + +func (table *mapOfTable[K, V]) addSizePlain(bucketIdx uint64, delta int) { + cidx := uint64(len(table.size)-1) & bucketIdx + table.size[cidx].c += int64(delta) +} + +func (table *mapOfTable[K, V]) sumSize() int64 { + sum := int64(0) + for i := range table.size { + sum += atomic.LoadInt64(&table.size[i].c) + } + return sum +} + +func shiftHash(h uint64) uint64 { + // uint64(0) is a reserved value which stands for an empty slot. + if h == uint64(0) { + return uint64(1) + } + return h +} + +// O(N) operation; use for debug purposes only +func (m *MapOf[K, V]) stats() mapStats { + stats := mapStats{ + TotalGrowths: atomic.LoadInt64(&m.totalGrowths), + TotalShrinks: atomic.LoadInt64(&m.totalShrinks), + MinEntries: math.MaxInt32, + } + table := (*mapOfTable[K, V])(atomic.LoadPointer(&m.table)) + stats.RootBuckets = len(table.buckets) + stats.Counter = int(table.sumSize()) + stats.CounterLen = len(table.size) + for i := range table.buckets { + nentries := 0 + b := &table.buckets[i] + stats.TotalBuckets++ + for { + nentriesLocal := 0 + stats.Capacity += entriesPerMapBucket + for i := 0; i < entriesPerMapBucket; i++ { + if atomic.LoadPointer(&b.entries[i]) != nil { + stats.Size++ + nentriesLocal++ + } + } + nentries += nentriesLocal + if nentriesLocal == 0 { + stats.EmptyBuckets++ + } + if b.next == nil { + break + } + b = (*bucketOfPadded)(b.next) + stats.TotalBuckets++ + } + if nentries < stats.MinEntries { + stats.MinEntries = nentries + } + if nentries > stats.MaxEntries { + stats.MaxEntries = nentries + } + } + return stats +} diff --git a/vendor/github.com/puzpuzpuz/xsync/v2/mpmcqueue.go b/vendor/github.com/puzpuzpuz/xsync/v2/mpmcqueue.go new file mode 100644 index 000000000..96584e698 --- /dev/null +++ b/vendor/github.com/puzpuzpuz/xsync/v2/mpmcqueue.go @@ -0,0 +1,137 @@ +package xsync + +import ( + "runtime" + "sync/atomic" + "unsafe" +) + +// A MPMCQueue is a bounded multi-producer multi-consumer concurrent +// queue. +// +// MPMCQueue instances must be created with NewMPMCQueue function. +// A MPMCQueue must not be copied after first use. +// +// Based on the data structure from the following C++ library: +// https://github.com/rigtorp/MPMCQueue +type MPMCQueue struct { + cap uint64 + head uint64 + //lint:ignore U1000 prevents false sharing + hpad [cacheLineSize - 8]byte + tail uint64 + //lint:ignore U1000 prevents false sharing + tpad [cacheLineSize - 8]byte + slots []slotPadded +} + +type slotPadded struct { + slot + //lint:ignore U1000 prevents false sharing + pad [cacheLineSize - unsafe.Sizeof(slot{})]byte +} + +type slot struct { + turn uint64 + item interface{} +} + +// NewMPMCQueue creates a new MPMCQueue instance with the given +// capacity. +func NewMPMCQueue(capacity int) *MPMCQueue { + if capacity < 1 { + panic("capacity must be positive number") + } + return &MPMCQueue{ + cap: uint64(capacity), + slots: make([]slotPadded, capacity), + } +} + +// Enqueue inserts the given item into the queue. +// Blocks, if the queue is full. +func (q *MPMCQueue) Enqueue(item interface{}) { + head := atomic.AddUint64(&q.head, 1) - 1 + slot := &q.slots[q.idx(head)] + turn := q.turn(head) * 2 + for atomic.LoadUint64(&slot.turn) != turn { + runtime.Gosched() + } + slot.item = item + atomic.StoreUint64(&slot.turn, turn+1) +} + +// Dequeue retrieves and removes the item from the head of the queue. +// Blocks, if the queue is empty. +func (q *MPMCQueue) Dequeue() interface{} { + tail := atomic.AddUint64(&q.tail, 1) - 1 + slot := &q.slots[q.idx(tail)] + turn := q.turn(tail)*2 + 1 + for atomic.LoadUint64(&slot.turn) != turn { + runtime.Gosched() + } + item := slot.item + slot.item = nil + atomic.StoreUint64(&slot.turn, turn+1) + return item +} + +// TryEnqueue inserts the given item into the queue. Does not block +// and returns immediately. The result indicates that the queue isn't +// full and the item was inserted. +func (q *MPMCQueue) TryEnqueue(item interface{}) bool { + head := atomic.LoadUint64(&q.head) + for { + slot := &q.slots[q.idx(head)] + turn := q.turn(head) * 2 + if atomic.LoadUint64(&slot.turn) == turn { + if atomic.CompareAndSwapUint64(&q.head, head, head+1) { + slot.item = item + atomic.StoreUint64(&slot.turn, turn+1) + return true + } + } else { + prevHead := head + head = atomic.LoadUint64(&q.head) + if head == prevHead { + return false + } + } + runtime.Gosched() + } +} + +// TryDequeue retrieves and removes the item from the head of the +// queue. Does not block and returns immediately. The ok result +// indicates that the queue isn't empty and an item was retrieved. +func (q *MPMCQueue) TryDequeue() (item interface{}, ok bool) { + tail := atomic.LoadUint64(&q.tail) + for { + slot := &q.slots[q.idx(tail)] + turn := q.turn(tail)*2 + 1 + if atomic.LoadUint64(&slot.turn) == turn { + if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) { + item = slot.item + ok = true + slot.item = nil + atomic.StoreUint64(&slot.turn, turn+1) + return + } + } else { + prevTail := tail + tail = atomic.LoadUint64(&q.tail) + if tail == prevTail { + return + } + } + runtime.Gosched() + } +} + +func (q *MPMCQueue) idx(i uint64) uint64 { + return i % q.cap +} + +func (q *MPMCQueue) turn(i uint64) uint64 { + return i / q.cap +} diff --git a/vendor/github.com/puzpuzpuz/xsync/v2/rbmutex.go b/vendor/github.com/puzpuzpuz/xsync/v2/rbmutex.go new file mode 100644 index 000000000..c4a503ff0 --- /dev/null +++ b/vendor/github.com/puzpuzpuz/xsync/v2/rbmutex.go @@ -0,0 +1,145 @@ +package xsync + +import ( + "runtime" + "sync" + "sync/atomic" + "time" +) + +// slow-down guard +const nslowdown = 7 + +// pool for reader tokens +var rtokenPool sync.Pool + +// RToken is a reader lock token. +type RToken struct { + slot uint32 + //lint:ignore U1000 prevents false sharing + pad [cacheLineSize - 4]byte +} + +// A RBMutex is a reader biased reader/writer mutual exclusion lock. +// The lock can be held by an many readers or a single writer. +// The zero value for a RBMutex is an unlocked mutex. +// +// A RBMutex must not be copied after first use. +// +// RBMutex is based on a modified version of BRAVO +// (Biased Locking for Reader-Writer Locks) algorithm: +// https://arxiv.org/pdf/1810.01553.pdf +// +// RBMutex is a specialized mutex for scenarios, such as caches, +// where the vast majority of locks are acquired by readers and write +// lock acquire attempts are infrequent. In such scenarios, RBMutex +// performs better than sync.RWMutex on large multicore machines. +// +// RBMutex extends sync.RWMutex internally and uses it as the "reader +// bias disabled" fallback, so the same semantics apply. The only +// noticeable difference is in reader tokens returned from the +// RLock/RUnlock methods. +type RBMutex struct { + rslots []rslot + rmask uint32 + rbias int32 + inhibitUntil time.Time + rw sync.RWMutex +} + +type rslot struct { + mu int32 + //lint:ignore U1000 prevents false sharing + pad [cacheLineSize - 4]byte +} + +// NewRBMutex creates a new RBMutex instance. +func NewRBMutex() *RBMutex { + nslots := nextPowOf2(parallelism()) + mu := RBMutex{ + rslots: make([]rslot, nslots), + rmask: nslots - 1, + rbias: 1, + } + return &mu +} + +// RLock locks m for reading and returns a reader token. The +// token must be used in the later RUnlock call. +// +// Should not be used for recursive read locking; a blocked Lock +// call excludes new readers from acquiring the lock. +func (mu *RBMutex) RLock() *RToken { + if atomic.LoadInt32(&mu.rbias) == 1 { + t, ok := rtokenPool.Get().(*RToken) + if !ok { + t = new(RToken) + t.slot = fastrand() + } + // Try all available slots to distribute reader threads to slots. + for i := 0; i < len(mu.rslots); i++ { + slot := t.slot + uint32(i) + rslot := &mu.rslots[slot&mu.rmask] + rslotmu := atomic.LoadInt32(&rslot.mu) + if atomic.CompareAndSwapInt32(&rslot.mu, rslotmu, rslotmu+1) { + if atomic.LoadInt32(&mu.rbias) == 1 { + // Hot path succeeded. + t.slot = slot + return t + } + // The mutex is no longer reader biased. Go to the slow path. + atomic.AddInt32(&rslot.mu, -1) + rtokenPool.Put(t) + break + } + // Contention detected. Give a try with the next slot. + } + } + // Slow path. + mu.rw.RLock() + if atomic.LoadInt32(&mu.rbias) == 0 && time.Now().After(mu.inhibitUntil) { + atomic.StoreInt32(&mu.rbias, 1) + } + return nil +} + +// RUnlock undoes a single RLock call. A reader token obtained from +// the RLock call must be provided. RUnlock does not affect other +// simultaneous readers. A panic is raised if m is not locked for +// reading on entry to RUnlock. +func (mu *RBMutex) RUnlock(t *RToken) { + if t == nil { + mu.rw.RUnlock() + return + } + if atomic.AddInt32(&mu.rslots[t.slot&mu.rmask].mu, -1) < 0 { + panic("invalid reader state detected") + } + rtokenPool.Put(t) +} + +// Lock locks m for writing. If the lock is already locked for +// reading or writing, Lock blocks until the lock is available. +func (mu *RBMutex) Lock() { + mu.rw.Lock() + if atomic.LoadInt32(&mu.rbias) == 1 { + atomic.StoreInt32(&mu.rbias, 0) + start := time.Now() + for i := 0; i < len(mu.rslots); i++ { + for atomic.LoadInt32(&mu.rslots[i].mu) > 0 { + runtime.Gosched() + } + } + mu.inhibitUntil = time.Now().Add(time.Since(start) * nslowdown) + } +} + +// Unlock unlocks m for writing. A panic is raised if m is not locked +// for writing on entry to Unlock. +// +// As with RWMutex, a locked RBMutex is not associated with a +// particular goroutine. One goroutine may RLock (Lock) a RBMutex and +// then arrange for another goroutine to RUnlock (Unlock) it. +func (mu *RBMutex) Unlock() { + mu.rw.Unlock() +} diff --git a/vendor/github.com/puzpuzpuz/xsync/v2/util.go b/vendor/github.com/puzpuzpuz/xsync/v2/util.go new file mode 100644 index 000000000..12d83e614 --- /dev/null +++ b/vendor/github.com/puzpuzpuz/xsync/v2/util.go @@ -0,0 +1,52 @@ +package xsync + +import ( + "hash/maphash" + "runtime" + _ "unsafe" +) + +// test-only assert()-like flag +var assertionsEnabled = false + +const ( + // cacheLineSize is used in paddings to prevent false sharing; + // 64B are used instead of 128B as a compromise between + // memory footprint and performance; 128B usage may give ~30% + // improvement on NUMA machines. + cacheLineSize = 64 +) + +// nextPowOf2 computes the next highest power of 2 of 32-bit v. +// Source: https://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2 +func nextPowOf2(v uint32) uint32 { + v-- + v |= v >> 1 + v |= v >> 2 + v |= v >> 4 + v |= v >> 8 + v |= v >> 16 + v++ + return v +} + +func parallelism() uint32 { + maxProcs := uint32(runtime.GOMAXPROCS(0)) + numCores := uint32(runtime.NumCPU()) + if maxProcs < numCores { + return maxProcs + } + return numCores +} + +// hashString calculates a hash of s with the given seed. +func hashString(seed maphash.Seed, s string) uint64 { + var h maphash.Hash + h.SetSeed(seed) + h.WriteString(s) + return h.Sum64() +} + +//go:noescape +//go:linkname fastrand runtime.fastrand +func fastrand() uint32 diff --git a/vendor/github.com/puzpuzpuz/xsync/v2/util_mapof.go b/vendor/github.com/puzpuzpuz/xsync/v2/util_mapof.go new file mode 100644 index 000000000..fbb00c438 --- /dev/null +++ b/vendor/github.com/puzpuzpuz/xsync/v2/util_mapof.go @@ -0,0 +1,22 @@ +//go:build go1.18 +// +build go1.18 + +package xsync + +import ( + "hash/maphash" + "unsafe" +) + +// hashUint64 calculates a hash of v with the given seed. +// +//lint:ignore U1000 used in MapOf +func hashUint64[K IntegerConstraint](seed maphash.Seed, k K) uint64 { + n := uint64(k) + // Java's Long standard hash function. + n = n ^ (n >> 32) + nseed := *(*uint64)(unsafe.Pointer(&seed)) + // 64-bit variation of boost's hash_combine. + nseed ^= n + 0x9e3779b97f4a7c15 + (nseed << 12) + (nseed >> 4) + return nseed +} diff --git a/vendor/modules.txt b/vendor/modules.txt index b83d33e9b..f75897860 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -264,6 +264,9 @@ github.com/prometheus/common/model github.com/prometheus/procfs github.com/prometheus/procfs/internal/fs github.com/prometheus/procfs/internal/util +# github.com/puzpuzpuz/xsync/v2 v2.4.0 +## explicit; go 1.18 +github.com/puzpuzpuz/xsync/v2 # github.com/rogpeppe/go-internal v1.10.0 ## explicit; go 1.19 github.com/rogpeppe/go-internal/fmtsort