-
Notifications
You must be signed in to change notification settings - Fork 289
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
98 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,7 @@ | |
* @Author: shgopher [email protected] | ||
* @Date: 2023-05-14 23:08:19 | ||
* @LastEditors: shgopher [email protected] | ||
* @LastEditTime: 2023-12-28 22:44:09 | ||
* @LastEditTime: 2023-12-29 00:23:49 | ||
* @FilePath: /GOFamily/并发/同步原语/README.md | ||
* @Description: | ||
* | ||
|
@@ -553,7 +553,11 @@ func main(){ | |
``` | ||
注意,我们这里每次循环都调用了一次 add,但是 add 的调用始终发生在 wait 之前,这还是属于同一轮的多次 add 调用,这符合 waigroup 的规定 | ||
|
||
## singleflight | ||
|
||
## cyclicBarrier | ||
|
||
## errgroup | ||
|
||
## sync.Once | ||
once 用来执行仅发生一次的动作,常用与单例模式,对象初始化的行为,并且经常在 init 函数中使用 | ||
|
@@ -875,11 +879,102 @@ pool.Schedule(task1) | |
|
||
|
||
## semaphore | ||
信号量 (英语:semaphore) 又称为信号标,是一个同步对象,用于保持在 0 至指定最大值之间的一个计数值。 | ||
|
||
## singleflight | ||
在系统中,给予每一个进程一个信号量,代表每个进程目前的状态,未得到控制权的进程会在特定地方被强迫停下来,等待可以继续进行的信号到来 | ||
|
||
## errgroup | ||
根据信号量的不同可以分为计数信号量和二进制信号量,前者使用一个整数作为信号量,后者使用一个二进制 0 1 作为信号量 | ||
|
||
信号量拥有两个操作: | ||
- p 操作会减少信号量的数值 | ||
- v 操作会增加信号量的数值 | ||
|
||
其中二进制信号量是特殊的信号量,它就是互斥锁的功能 | ||
|
||
go 语言在 x/sync 中提供了一个 weighted 的包,它就是提供的信号量的功能 | ||
|
||
- Acquire p 操作,减少信号量的数值,表示获取了资源 | ||
- Release v 操作,增加信号量的数值,表示释放了资源 | ||
- TryAcquire 尝试获取资源,如果获取成功,则返回 true,否则返回 false | ||
它类似于 trylock 锁,也就是失败直接返回 false,并不会阻塞 | ||
|
||
让我们使用信号量来实现一个 worker pool | ||
```go | ||
package main | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"log" | ||
"runtime" | ||
"time" | ||
) | ||
|
||
var ( | ||
// 最大的worker数量 | ||
maxWorkers = runtime.GOMAXPROCS(0) | ||
sema = semaphore.NewWeighted(int64(maxWorkers)) //信号量 | ||
task = make([]int, maxWorkers*4) // 任务数,是worker的四倍 | ||
) | ||
|
||
func main() { | ||
ctx := context.Background() | ||
for i := range task { | ||
// 如果没有worker可用,会阻塞在这里,直到某个worker被释放 | ||
if err := sema.Acquire(ctx, 1); err != nil { | ||
break | ||
} | ||
// 启动worker goroutine | ||
go func(i int) { | ||
defer sema.Release(1) | ||
time.Sleep(100 * time.Millisecond) // 模拟一个耗时操作 | ||
task[i] = i + 1 | ||
}(i) | ||
} | ||
// 请求所有的worker,这样能确保前面的worker都执行完 | ||
if err := sema.Acquire(ctx, int64(maxWorkers)); err != nil { | ||
log.Printf("获取所有的worker失败: %v", err) | ||
} | ||
fmt.Println(task) | ||
} | ||
``` | ||
### 使用信号量时的注意事项 | ||
- 请求了资源,忘记了释放 | ||
- 释放了从未请求的资源 | ||
- 长时间持有一个资源但是不使用它 | ||
- 不持有一个资源,但是直接使用了它 | ||
### 使用 channel 去实现一个信号量 | ||
使用一个缓存为 n 的 channel 去实现一个信号量 | ||
|
||
```go | ||
package main | ||
|
||
import "sync" | ||
|
||
// Semaphore 数据结构,并且还实现了Locker接口 | ||
type semaphore struct { | ||
ch chan struct{} | ||
} | ||
|
||
// 创建一个新的信号量 | ||
func NewSemaphore(capacity int) sync.Locker { | ||
if capacity <= 0 { | ||
capacity = 1 // 容量为1就变成了一个互斥锁 | ||
} | ||
return &semaphore{ch: make(chan struct{}, capacity)} | ||
} | ||
|
||
// 请求一个资源 | ||
func (s *semaphore) Lock() { | ||
s.ch <- struct{}{} | ||
} | ||
|
||
// 释放资源 | ||
func (s *semaphore) Unlock() { | ||
<-s.ch | ||
} | ||
|
||
``` | ||
## issues | ||
### 问题一:有互斥锁就一定有临界区吗? | ||
互斥锁的存在不等于必须存在临界区。 | ||
|