01.基本实现
a.使用channel
---
package main
import (
"fmt"
"time"
)
// producer sends the integers 0 through 4 on ch. After each send it logs the
// value and pauses for 100ms. Once the last pause is over it closes ch, which
// lets a receiver that ranges over the channel terminate.
func producer(ch chan<- int) {
	const (
		total    = 5
		interval = 100 * time.Millisecond
	)

	// The producer is the only sender, so it is the one that closes ch.
	defer close(ch)

	for n := 0; n != total; n++ {
		ch <- n
		fmt.Println("生产:", n)
		time.Sleep(interval)
	}
}
// consumer receives values from ch until the channel is closed and drained.
// Each value is logged and followed by a 200ms pause.
func consumer(ch <-chan int) {
	const pause = 200 * time.Millisecond

	for {
		item, open := <-ch
		if !open {
			// Channel closed and empty: nothing left to consume.
			return
		}
		fmt.Println("消费:", item)
		time.Sleep(pause)
	}
}
// main connects producer and consumer through a channel buffered for two
// values. The producer runs in its own goroutine while the consumer runs on
// the main goroutine.
func main() {
	const buffered = 2

	pipe := make(chan int, buffered)
	go producer(pipe)

	// consumer returns only once producer has closed the channel, so the
	// program does not exit before all values are handled.
	consumer(pipe)
}
---
3.2 工作池模式
01.工作池实现
a.固定数量worker
---
package main
import (
"fmt"
"sync"
)
// worker takes jobs from the jobs channel until it is closed, logs each one
// together with the worker id, and sends twice the job value to results.
// It calls wg.Done when it returns.
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()

	for {
		task, more := <-jobs
		if !more {
			// jobs is closed and drained: this worker is finished.
			return
		}
		fmt.Printf("Worker %d 处理任务 %d\n", id, task)
		doubled := task * 2
		results <- doubled
	}
}
// main runs a fixed pool of three workers over five jobs and prints every
// result after all workers have finished.
func main() {
	const (
		workerCount = 3
		jobCount    = 5
		queueSize   = 10
	)

	var (
		jobs    = make(chan int, queueSize)
		results = make(chan int, queueSize)
		wg      sync.WaitGroup
	)

	// Start the pool; worker ids are 1-based.
	wg.Add(workerCount)
	for id := 1; id <= workerCount; id++ {
		go worker(id, jobs, results, &wg)
	}

	// Queue every job, then close jobs so the workers' loops end.
	for job := 1; job <= jobCount; job++ {
		jobs <- job
	}
	close(jobs)

	// Results are read only after all workers are done, so the results buffer
	// must be large enough to hold every result (5 results, capacity 10).
	// Otherwise the workers would block on send and Wait would never return.
	wg.Wait()
	close(results)

	for {
		r, ok := <-results
		if !ok {
			break
		}
		fmt.Println("结果:", r)
	}
}
---
3.3 扇入扇出模式
01.扇出模式
a.一个输入多个输出
---
// fanOut starts n process stages that all read from the same input channel
// and returns their output channels. Because the stages share one input,
// each input value is received by exactly one of them.
func fanOut(input <-chan int, n int) []<-chan int {
	outputs := make([]<-chan int, n)
	for slot := range outputs {
		outputs[slot] = process(input)
	}
	return outputs
}
// process starts a goroutine that reads values from input, doubles each one
// and sends it on the returned unbuffered channel. The returned channel is
// closed once input has been closed and drained.
func process(input <-chan int) <-chan int {
	doubled := make(chan int)

	go func(dst chan<- int) {
		for {
			v, ok := <-input
			if !ok {
				close(dst)
				return
			}
			dst <- v * 2
		}
	}(doubled)

	return doubled
}
---
02.扇入模式
a.多个输入一个输出
---
// fanIn merges any number of input channels into one output channel. One
// goroutine per input forwards its values; the output is closed after every
// input has been closed and drained. Values from different inputs arrive in
// no particular order.
func fanIn(channels ...<-chan int) <-chan int {
	merged := make(chan int)

	var pending sync.WaitGroup
	pending.Add(len(channels))

	// forward copies everything from src to merged, then reports completion.
	forward := func(src <-chan int) {
		defer pending.Done()
		for {
			v, ok := <-src
			if !ok {
				return
			}
			merged <- v
		}
	}

	for i := range channels {
		go forward(channels[i])
	}

	// Close merged from a separate goroutine so fanIn can return right away.
	go func() {
		pending.Wait()
		close(merged)
	}()

	return merged
}
---
3.4 管道模式
01.管道实现
a.多阶段处理
---
package main
import "fmt"
// gen is the source stage of the pipeline: it returns an unbuffered channel
// on which a goroutine emits nums in order, and which is closed after the
// last value has been sent.
func gen(nums ...int) <-chan int {
	stream := make(chan int)

	go func() {
		defer close(stream)
		for i := 0; i < len(nums); i++ {
			stream <- nums[i]
		}
	}()

	return stream
}
// square is a pipeline stage: it returns an unbuffered channel carrying the
// square of every value received from in, in the same order. The returned
// channel is closed once in has been closed and drained.
func square(in <-chan int) <-chan int {
	squares := make(chan int)

	go func() {
		defer close(squares)
		for {
			n, ok := <-in
			if !ok {
				return
			}
			squares <- n * n
		}
	}()

	return squares
}
func main() {
c := gen(2, 3, 4)
out := square(c)
for v := range out {
fmt.Println(v)
}
}
---
01.细粒度锁
a.减少锁竞争
---
// SafeMap is a string-to-int map that is safe for concurrent use. A single
// sync.RWMutex guards the map: readers share the lock, writers hold it
// exclusively. The zero value is an empty map ready to use. Because it
// contains a mutex, a SafeMap must be passed by pointer and never copied.
type SafeMap struct {
	mu   sync.RWMutex
	data map[string]int
}

// Get returns the value stored under key, or 0 if key is not present.
func (m *SafeMap) Get(key string) int {
	m.mu.RLock()
	defer m.mu.RUnlock()
	// Reading from a nil map is allowed and yields the zero value.
	return m.data[key]
}

// Set stores value under key, replacing any previous value.
func (m *SafeMap) Set(key string, value int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.data == nil {
		// Writing to a nil map panics, so allocate on first use. This keeps
		// the zero value of SafeMap usable without a constructor.
		m.data = make(map[string]int)
	}
	m.data[key] = value
}
---
02.分段锁
a.提高并发性
---
// ShardedMap spreads its keys over several independently locked shards, so
// operations on keys that live in different shards do not contend for the
// same lock.
type ShardedMap struct {
	shards []*Shard
}

// Shard is one partition of a ShardedMap: a map guarded by its own RWMutex.
type Shard struct {
	mu   sync.RWMutex
	data map[string]int
}

// getShard returns the shard responsible for key. A given key always maps to
// the same shard as long as the number of shards does not change. It returns
// nil when the map has no shards.
func (m *ShardedMap) getShard(key string) *Shard {
	if len(m.shards) == 0 {
		// Guard the modulo below against division by zero.
		return nil
	}
	// The hash is unsigned, so the remainder is always a valid index.
	idx := fnv32a(key) % uint32(len(m.shards))
	return m.shards[idx]
}

// fnv32a returns the 32-bit FNV-1a hash of s.
func fnv32a(s string) uint32 {
	const (
		offsetBasis = 2166136261
		prime       = 16777619
	)
	h := uint32(offsetBasis)
	for i := 0; i < len(s); i++ {
		h ^= uint32(s[i])
		h *= prime
	}
	return h
}
---
4.3 性能优化
01.选择合适的锁
a.Mutex vs RWMutex
读多写少用RWMutex
简单场景用Mutex
b.Mutex vs atomic
简单计数用atomic
复杂操作用Mutex
02.减少锁持有时间
a.最佳实践
---
// Bad: expensiveOperation runs while mu is held, so the lock stays taken for
// the whole computation.
mu.Lock()
result := expensiveOperation()
data = result
mu.Unlock()
// Good: compute the result first and hold mu only for the assignment to data.
result := expensiveOperation()
mu.Lock()
data = result
mu.Unlock()
---