01.单生产者单消费者
a.基本实现
---
// producer sends the integers 0 through 9 on ch in ascending order and then
// closes ch, which lets a receiver that ranges over the channel terminate.
func producer(ch chan<- int) {
	defer close(ch)

	const count = 10
	for n := 0; n != count; n++ {
		ch <- n
	}
}
// consumer prints every value received from ch, one per line, and returns
// once ch has been closed and fully drained.
func consumer(ch <-chan int) {
	for {
		v, open := <-ch
		if !open {
			return
		}
		fmt.Println(v)
	}
}
// main connects one producer goroutine to a consumer through a channel that
// buffers up to 5 values. The consumer runs on the main goroutine, so main
// returns when consumer returns.
func main() {
	const capacity = 5

	pipe := make(chan int, capacity)
	go producer(pipe)
	consumer(pipe)
}
---
02.多生产者多消费者
a.使用WaitGroup
---
// main runs 3 producers and 2 consumers over one shared buffered channel.
// Each producer sends 5 values of the form id*10+j. The channel is closed
// only after every producer has finished, which ends the consumers' range
// loops; main returns once both consumers are done.
func main() {
	const (
		producers   = 3
		consumers   = 2
		perProducer = 5
	)
	queue := make(chan int, 10)

	// Start the producers.
	var producing sync.WaitGroup
	producing.Add(producers)
	for p := 0; p < producers; p++ {
		go func(id int) {
			defer producing.Done()
			base := id * 10
			for off := 0; off < perProducer; off++ {
				queue <- base + off
			}
		}(p)
	}

	// Close the channel from a separate goroutine so that main is free to
	// start the consumers while the producers are still sending.
	go func() {
		producing.Wait()
		close(queue)
	}()

	// Start the consumers and wait for them to drain the channel.
	var consuming sync.WaitGroup
	consuming.Add(consumers)
	for c := 0; c < consumers; c++ {
		go func(id int) {
			defer consuming.Done()
			for v := range queue {
				fmt.Printf("Consumer %d: %d\n", id, v)
			}
		}(c)
	}
	consuming.Wait()
}
---
3.2 工作池模式
01.固定worker数
a.实现
---
// Job is a unit of work sent to a worker over the jobs channel.
type Job struct {
// ID identifies the job; it is printed in the worker and result log lines.
ID int
}
// Result pairs a Job with the value a worker computed for it.
type Result struct {
// Job is the job this result belongs to.
Job Job
// Sum is the value the worker computed while processing Job.
Sum int
}
// worker receives jobs until the jobs channel is closed. For each job it logs
// the worker and job IDs, adds up the integers 0 through 999 as a stand-in
// for real work, and sends the job together with that sum on results.
func worker(id int, jobs <-chan Job, results chan<- Result) {
	for {
		job, open := <-jobs
		if !open {
			return
		}
		fmt.Printf("Worker %d processing job %d\n", id, job.ID)

		var total int
		for n := 0; n < 1000; n++ {
			total += n
		}
		results <- Result{Job: job, Sum: total}
	}
}
// main runs a fixed pool of 3 workers over 9 jobs. Both channels are buffered
// to 100, so queuing the 9 jobs never blocks. After closing jobs, main reads
// exactly one result per job and prints it.
func main() {
	const (
		workerCount = 3
		jobCount    = 9
	)
	jobs := make(chan Job, 100)
	results := make(chan Result, 100)

	// Start the workers.
	for id := 1; id <= workerCount; id++ {
		go worker(id, jobs, results)
	}

	// Queue every job, then close the channel so the workers' loops end.
	for id := 1; id <= jobCount; id++ {
		jobs <- Job{ID: id}
	}
	close(jobs)

	// Collect one result per submitted job.
	for remaining := jobCount; remaining > 0; remaining-- {
		res := <-results
		fmt.Printf("Job %d result: %d\n", res.Job.ID, res.Sum)
	}
}
---
3.3 扇入扇出
01.扇出
a.一对多
---
// fanOut starts n process stages that all receive from the same input
// channel and returns their n output channels. Each value sent on in is
// handled by exactly one of the stages.
func fanOut(in <-chan int, n int) []<-chan int {
	branches := make([]<-chan int, n)
	for idx := range branches {
		branches[idx] = process(in)
	}
	return branches
}
// process starts a goroutine that doubles every value received from in and
// sends it on the returned channel. The returned channel is closed after in
// has been closed and drained.
func process(in <-chan int) <-chan int {
	doubled := make(chan int)
	go func() {
		for {
			v, open := <-in
			if !open {
				break
			}
			doubled <- v * 2
		}
		close(doubled)
	}()
	return doubled
}
---
02.扇入
a.多对一
---
// fanIn merges any number of input channels into one output channel. One
// goroutine per input forwards its values; the output is closed once every
// input has been closed and drained. With no inputs the output is closed
// right away.
func fanIn(channels ...<-chan int) <-chan int {
	merged := make(chan int)

	var pending sync.WaitGroup
	pending.Add(len(channels))

	// forward copies everything from src to merged, then marks src as done.
	forward := func(src <-chan int) {
		defer pending.Done()
		for v := range src {
			merged <- v
		}
	}
	for _, src := range channels {
		go forward(src)
	}

	// Close merged only after all forwarders have finished sending.
	go func() {
		pending.Wait()
		close(merged)
	}()
	return merged
}
---
3.4 管道模式
01.多阶段处理
a.链式处理
---
// gen is the source stage of the pipeline: it starts a goroutine that sends
// each of nums, in order, on the returned channel and then closes it.
func gen(nums ...int) <-chan int {
	stream := make(chan int)
	go func() {
		defer close(stream)
		for i := 0; i < len(nums); i++ {
			stream <- nums[i]
		}
	}()
	return stream
}
// sq is a pipeline stage: it starts a goroutine that squares every value
// received from in and sends it on the returned channel, which is closed
// after in has been closed and drained.
func sq(in <-chan int) <-chan int {
	squares := make(chan int)
	go func() {
		defer close(squares)
		for {
			n, open := <-in
			if !open {
				return
			}
			squares <- n * n
		}
	}()
	return squares
}
func main() {
c := gen(2, 3, 4)
out := sq(c)
for v := range out {
fmt.Println(v)
}
}
---
3.5 超时控制
01.单个操作超时
a.time.After
---
// Wait for whichever happens first: a value from the channel returned by
// doWork(), or the one-second timer created by time.After.
// NOTE(review): doWork is not shown here. If it sends its result on an
// unbuffered channel from a goroutine, that goroutine would stay blocked
// after a timeout because nobody receives any more; confirm against doWork.
select {
case result := <-doWork():
fmt.Println(result)
case <-time.After(time.Second):
// No result arrived within one second.
fmt.Println("timeout")
}
---
02.整体超时
a.context.WithTimeout
---
// Bound the whole operation to 5 seconds; cancel releases the context's
// timer as soon as this function returns.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// done is buffered with capacity 1 so the worker's send always succeeds,
// even when the select below has already returned on timeout and nobody is
// receiving any more. With an unbuffered channel the worker goroutine would
// block on the send forever in that case (a goroutine leak).
done := make(chan struct{}, 1)
go func() {
	// Do the work (simulated here by a 3-second sleep).
	time.Sleep(3 * time.Second)
	done <- struct{}{}
}()

// Report whichever comes first: completion of the work or the deadline.
select {
case <-done:
	fmt.Println("完成")
case <-ctx.Done():
	fmt.Println("超时")
}
---
4 并发安全
4.1 数据竞争检测
01.race detector
a.使用方法
---
# Run main.go with the race detector enabled.
go run -race main.go
# Run the package's tests with the race detector enabled.
go test -race
# Build a binary instrumented with the race detector.
go build -race
---
02.示例
a.检测竞争
---
package main
import (
"fmt"
"time"
)
// counter is shared by the two goroutines started in main and is accessed
// without any synchronization.
var counter int
// main is a race-detector demonstration: two goroutines increment counter
// with no lock or atomic operation, which is the data race that
// `go run -race` is expected to report. The race is intentional here.
func main() {
go func() {
counter++
}()
go func() {
counter++
}()
// Sleep for one second before reading counter. The sleep is a delay, not
// synchronization, so the read below is unsynchronized as well.
time.Sleep(time.Second)
fmt.Println(counter)
}
---
4.2 互斥锁Mutex
01.基本使用
a.Lock和Unlock
---
// counter is the shared value protected by mu.
var counter int

// mu must be held while counter is read or written.
var mu sync.Mutex

// increment adds one to counter while holding mu, so concurrent callers
// never update counter at the same time.
func increment() {
	mu.Lock()
	counter++
	mu.Unlock()
}
---
02.封装
a.安全的计数器
---
// SafeCounter is an integer counter that can be used from multiple
// goroutines. The zero value is ready to use and starts at 0. Because it
// contains a mutex, it is used through a pointer.
type SafeCounter struct {
	mu    sync.Mutex // guards count
	count int
}

// Inc adds one to the counter.
func (sc *SafeCounter) Inc() {
	sc.mu.Lock()
	sc.count++
	sc.mu.Unlock()
}

// Value returns the current count.
func (sc *SafeCounter) Value() int {
	sc.mu.Lock()
	current := sc.count
	sc.mu.Unlock()
	return current
}
---
4.3 读写锁RWMutex
01.基本使用
a.RLock和Lock
---
// data is the shared map protected by rwmu.
var data = make(map[string]int)

// rwmu allows any number of concurrent readers of data, or one writer.
var rwmu sync.RWMutex

// read returns the value stored under key, or 0 if key is absent. It takes
// only the read lock, so several reads can run at the same time.
func read(key string) int {
	rwmu.RLock()
	stored := data[key]
	rwmu.RUnlock()
	return stored
}

// write stores value under key while holding the exclusive write lock.
func write(key string, value int) {
	rwmu.Lock()
	data[key] = value
	rwmu.Unlock()
}
---
02.适用场景
a.读多写少
多个goroutine可以同时读
写操作独占
4.4 原子操作atomic
01.基本操作
a.Add和Load
---
// counter is accessed only through sync/atomic functions in this snippet.
var counter int64
// Atomically add 1 to counter.
atomic.AddInt64(&counter, 1)
// Atomically read the current value of counter.
value := atomic.LoadInt64(&counter)
---
02.CompareAndSwap
a.CAS操作
---
var value int64 = 10
// Set value to 20 only if it currently holds 10; swapped reports whether the
// swap took place.
swapped := atomic.CompareAndSwapInt64(&value, 10, 20)
// value started at 10, so with no other writer this prints "true 20".
fmt.Println(swapped, value)
---
03.atomic.Value
a.存储任意类型
---
// config holds a value that is stored and loaded atomically.
var config atomic.Value
config.Store(map[string]string{"key": "value"})
// Load returns an interface value, so it is asserted back to the stored
// type. The single-result assertion panics if nothing has been stored yet
// (Load returns nil); here Store has already run on the line above.
cfg := config.Load().(map[string]string)
---
4.5 sync.Map并发安全Map
01.基本使用
a.Store和Load
---
// The zero value of sync.Map is empty and ready to use.
var m sync.Map
m.Store("key", "value")
// Load returns the stored value and whether the key was present.
value, ok := m.Load("key")
if ok {
fmt.Println(value)
}
---
02.其他方法
a.Delete和Range
---
// Remove the entry for "key", if any.
m.Delete("key")
// Range calls the function for each entry in the map; returning true keeps
// the iteration going, returning false would stop it.
m.Range(func(key, value interface{}) bool {
fmt.Println(key, value)
return true
})
---
03.适用场景
a.读多写少
键值相对稳定
b.多个goroutine操作不同键
减少锁竞争