1 Goroutine

1.1 定义与特点

01.Goroutine定义
    a.概念
        Go语言的轻量级线程
        由Go运行时管理
    b.特点
        启动成本低(约2KB栈空间)
        调度开销小
        支持百万级并发

02.与线程对比
    a.内存占用
        线程:MB级
        Goroutine:KB级
    b.调度方式
        线程:操作系统调度
        Goroutine:Go运行时调度

1.2 创建与启动

01.基本语法
    a.go关键字
        ---
        package main

        import (
            "fmt"
            "time"
        )

        func sayHello() {
            fmt.Println("Hello")
        }

        func main() {
            go sayHello()
            time.Sleep(time.Second)
        }
        ---

02.匿名函数
    a.直接启动
        ---
        go func() {
            fmt.Println("匿名goroutine")
        }()
        ---
    b.传递参数
        ---
        for i := 0; i < 5; i++ {
            go func(n int) {
                fmt.Println(n)
            }(i)
        }
        ---

03.常见错误
    a.闭包陷阱
        ---
        // 错误
        for i := 0; i < 5; i++ {
            go func() {
                fmt.Println(i) // 都打印5
            }()
        }

        // 正确
        for i := 0; i < 5; i++ {
            go func(n int) {
                fmt.Println(n)
            }(i)
        }
        ---

1.3 Goroutine调度:GMP模型

01.GMP模型
    a.组成
        G:Goroutine
        M:Machine(系统线程)
        P:Processor(逻辑处理器)
    b.调度流程
        P维护本地队列
        M从P获取G执行
        全局队列作为备份

02.调度策略
    a.工作窃取
        空闲P从其他P窃取G
    b.抢占式调度
        防止单个G长时间占用
    c.系统调用处理
        M阻塞时P转移到其他M

03.GOMAXPROCS
    a.设置P数量
        ---
        import "runtime"

        func main() {
            // 设置使用的CPU核心数
            runtime.GOMAXPROCS(4)

            // 获取当前设置
            n := runtime.GOMAXPROCS(0)
            fmt.Println("GOMAXPROCS:", n)
        }
        ---

1.4 Goroutine泄漏

01.泄漏原因
    a.channel阻塞
        发送或接收永久阻塞
    b.死循环
        goroutine无法退出
    c.资源未释放
        忘记关闭channel

02.检测方法
    a.runtime.NumGoroutine
        ---
        fmt.Println("Goroutines:", runtime.NumGoroutine())
        ---
    b.pprof工具
        ---
        import _ "net/http/pprof"

        go func() {
            http.ListenAndServe("localhost:6060", nil)
        }()
        ---

03.预防措施
    a.使用context
        ---
        func worker(ctx context.Context) {
            for {
                select {
                case <-ctx.Done():
                    return
                default:
                    // 工作...
                }
            }
        }
        ---
    b.超时控制
        ---
        select {
        case <-ch:
            // 处理
        case <-time.After(time.Second):
            return
        }
        ---

1.5 Goroutine池

01.为什么需要池
    a.控制并发数
        避免创建过多goroutine
    b.资源复用
        减少创建销毁开销

02.简单实现
    a.固定大小池
        ---
        package main

        import (
            "fmt"
            "sync"
        )

        type Pool struct {
            tasks chan func()
            wg    sync.WaitGroup
        }

        func NewPool(size int) *Pool {
            p := &Pool{
                tasks: make(chan func(), 100),
            }

            for i := 0; i < size; i++ {
                p.wg.Add(1)
                go p.worker()
            }

            return p
        }

        func (p *Pool) worker() {
            defer p.wg.Done()
            for task := range p.tasks {
                task()
            }
        }

        func (p *Pool) Submit(task func()) {
            p.tasks <- task
        }

        func (p *Pool) Close() {
            close(p.tasks)
            p.wg.Wait()
        }

        func main() {
            pool := NewPool(3)

            for i := 0; i < 10; i++ {
                n := i
                pool.Submit(func() {
                    fmt.Println("Task", n)
                })
            }

            pool.Close()
        }
        ---

03.第三方库
    a.ants
        高性能goroutine池
    b.tunny
        简单易用的池实现

2 并发控制

2.1 Channel通道

01.Channel类型
    a.无缓冲channel
        ---
        ch := make(chan int)
        go func() {
            ch <- 42
        }()
        v := <-ch
        ---
    b.有缓冲channel
        ---
        ch := make(chan int, 3)
        ch <- 1
        ch <- 2
        ---

02.单向channel
    a.只发送
        ---
        func send(ch chan<- int) {
            ch <- 1
        }
        ---
    b.只接收
        ---
        func receive(ch <-chan int) {
            v := <-ch
        }
        ---

03.关闭channel
    a.close函数
        ---
        ch := make(chan int, 2)
        ch <- 1
        close(ch)

        v, ok := <-ch
        if ok {
            fmt.Println(v)
        }
        ---

2.2 Select多路复用

01.基本用法
    a.多路选择
        ---
        select {
        case v := <-ch1:
            fmt.Println("ch1:", v)
        case v := <-ch2:
            fmt.Println("ch2:", v)
        case ch3 <- 1:
            fmt.Println("sent to ch3")
        default:
            fmt.Println("no channel ready")
        }
        ---

02.超时模式
    a.time.After
        ---
        select {
        case v := <-ch:
            fmt.Println(v)
        case <-time.After(time.Second):
            fmt.Println("timeout")
        }
        ---

03.退出模式
    a.done channel
        ---
        done := make(chan struct{})

        go func() {
            for {
                select {
                case <-done:
                    return
                default:
                    // 工作...
                }
            }
        }()

        // 通知退出
        close(done)
        ---

2.3 Context上下文

01.Context类型
    a.Background和TODO
        ---
        ctx := context.Background()
        ctx := context.TODO()
        ---

02.WithCancel
    a.取消控制
        ---
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        go func(ctx context.Context) {
            for {
                select {
                case <-ctx.Done():
                    return
                default:
                    // 工作...
                }
            }
        }(ctx)

        time.Sleep(time.Second)
        cancel()
        ---

03.WithTimeout
    a.超时控制
        ---
        ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
        defer cancel()

        select {
        case <-doWork(ctx):
            fmt.Println("完成")
        case <-ctx.Done():
            fmt.Println("超时")
        }
        ---

04.WithValue
    a.传递值
        ---
        ctx := context.WithValue(context.Background(), "key", "value")
        value := ctx.Value("key").(string)
        ---

2.4 WaitGroup等待组

01.基本用法
    a.Add、Done、Wait
        ---
        var wg sync.WaitGroup

        for i := 0; i < 5; i++ {
            wg.Add(1)
            go func(n int) {
                defer wg.Done()
                fmt.Println(n)
            }(i)
        }

        wg.Wait()
        ---

02.常见错误
    a.Add在goroutine内
        ---
        // 错误
        go func() {
            wg.Add(1)
            defer wg.Done()
        }()

        // 正确
        wg.Add(1)
        go func() {
            defer wg.Done()
        }()
        ---

2.5 ErrGroup错误组

01.基本概念
    a.定义
        golang.org/x/sync/errgroup
        WaitGroup的增强版
        支持错误处理和context

02.基本用法
    a.并发执行
        ---
        import "golang.org/x/sync/errgroup"

        func main() {
            g := new(errgroup.Group)

            urls := []string{
                "http://example.com",
                "http://example.org",
            }

            for _, url := range urls {
                url := url
                g.Go(func() error {
                    resp, err := http.Get(url)
                    if err != nil {
                        return err
                    }
                    defer resp.Body.Close()
                    return nil
                })
            }

            if err := g.Wait(); err != nil {
                fmt.Println("Error:", err)
            }
        }
        ---

03.WithContext
    a.自动取消
        ---
        g, ctx := errgroup.WithContext(context.Background())

        g.Go(func() error {
            return doWork(ctx)
        })

        if err := g.Wait(); err != nil {
            fmt.Println(err)
        }
        ---

3 并发模式

3.1 生产者消费者

01.单生产者单消费者
    a.基本实现
        ---
        func producer(ch chan<- int) {
            for i := 0; i < 10; i++ {
                ch <- i
            }
            close(ch)
        }

        func consumer(ch <-chan int) {
            for v := range ch {
                fmt.Println(v)
            }
        }

        func main() {
            ch := make(chan int, 5)
            go producer(ch)
            consumer(ch)
        }
        ---

02.多生产者多消费者
    a.使用WaitGroup
        ---
        func main() {
            ch := make(chan int, 10)
            var wg sync.WaitGroup

            // 3个生产者
            for i := 0; i < 3; i++ {
                wg.Add(1)
                go func(id int) {
                    defer wg.Done()
                    for j := 0; j < 5; j++ {
                        ch <- id*10 + j
                    }
                }(i)
            }

            go func() {
                wg.Wait()
                close(ch)
            }()

            // 2个消费者
            var cwg sync.WaitGroup
            for i := 0; i < 2; i++ {
                cwg.Add(1)
                go func(id int) {
                    defer cwg.Done()
                    for v := range ch {
                        fmt.Printf("Consumer %d: %d\n", id, v)
                    }
                }(i)
            }

            cwg.Wait()
        }
        ---

3.2 工作池模式

01.固定worker数
    a.实现
        ---
        type Job struct {
            ID int
        }

        type Result struct {
            Job Job
            Sum int
        }

        func worker(id int, jobs <-chan Job, results chan<- Result) {
            for job := range jobs {
                fmt.Printf("Worker %d processing job %d\n", id, job.ID)
                sum := 0
                for i := 0; i < 1000; i++ {
                    sum += i
                }
                results <- Result{Job: job, Sum: sum}
            }
        }

        func main() {
            jobs := make(chan Job, 100)
            results := make(chan Result, 100)

            // 启动3个worker
            for w := 1; w <= 3; w++ {
                go worker(w, jobs, results)
            }

            // 发送任务
            for j := 1; j <= 9; j++ {
                jobs <- Job{ID: j}
            }
            close(jobs)

            // 收集结果
            for a := 1; a <= 9; a++ {
                result := <-results
                fmt.Printf("Job %d result: %d\n", result.Job.ID, result.Sum)
            }
        }
        ---

3.3 扇入扇出

01.扇出
    a.一对多
        ---
        func fanOut(in <-chan int, n int) []<-chan int {
            outs := make([]<-chan int, n)
            for i := 0; i < n; i++ {
                outs[i] = process(in)
            }
            return outs
        }

        func process(in <-chan int) <-chan int {
            out := make(chan int)
            go func() {
                defer close(out)
                for v := range in {
                    out <- v * 2
                }
            }()
            return out
        }
        ---

02.扇入
    a.多对一
        ---
        func fanIn(channels ...<-chan int) <-chan int {
            out := make(chan int)
            var wg sync.WaitGroup

            for _, ch := range channels {
                wg.Add(1)
                go func(c <-chan int) {
                    defer wg.Done()
                    for v := range c {
                        out <- v
                    }
                }(ch)
            }

            go func() {
                wg.Wait()
                close(out)
            }()

            return out
        }
        ---

3.4 管道模式

01.多阶段处理
    a.链式处理
        ---
        func gen(nums ...int) <-chan int {
            out := make(chan int)
            go func() {
                for _, n := range nums {
                    out <- n
                }
                close(out)
            }()
            return out
        }

        func sq(in <-chan int) <-chan int {
            out := make(chan int)
            go func() {
                for n := range in {
                    out <- n * n
                }
                close(out)
            }()
            return out
        }

        func main() {
            c := gen(2, 3, 4)
            out := sq(c)

            for v := range out {
                fmt.Println(v)
            }
        }
        ---

3.5 超时控制

01.单个操作超时
    a.time.After
        ---
        select {
        case result := <-doWork():
            fmt.Println(result)
        case <-time.After(time.Second):
            fmt.Println("timeout")
        }
        ---

02.整体超时
    a.context.WithTimeout
        ---
        ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
        defer cancel()

        done := make(chan struct{})

        go func() {
            // 执行工作
            time.Sleep(time.Second * 3)
            done <- struct{}{}
        }()

        select {
        case <-done:
            fmt.Println("完成")
        case <-ctx.Done():
            fmt.Println("超时")
        }
        ---

4 并发安全

4.1 数据竞争检测

01.race detector
    a.使用方法
        ---
        go run -race main.go
        go test -race
        go build -race
        ---

02.示例
    a.检测竞争
        ---
        package main

        import (
            "fmt"
            "time"
        )

        var counter int

        func main() {
            go func() {
                counter++
            }()

            go func() {
                counter++
            }()

            time.Sleep(time.Second)
            fmt.Println(counter)
        }
        ---

4.2 互斥锁Mutex

01.基本使用
    a.Lock和Unlock
        ---
        var (
            counter int
            mu      sync.Mutex
        )

        func increment() {
            mu.Lock()
            defer mu.Unlock()
            counter++
        }
        ---

02.封装
    a.安全的计数器
        ---
        type SafeCounter struct {
            mu    sync.Mutex
            count int
        }

        func (c *SafeCounter) Inc() {
            c.mu.Lock()
            defer c.mu.Unlock()
            c.count++
        }

        func (c *SafeCounter) Value() int {
            c.mu.Lock()
            defer c.mu.Unlock()
            return c.count
        }
        ---

4.3 读写锁RWMutex

01.基本使用
    a.RLock和Lock
        ---
        var (
            data = make(map[string]int)
            rwmu sync.RWMutex
        )

        func read(key string) int {
            rwmu.RLock()
            defer rwmu.RUnlock()
            return data[key]
        }

        func write(key string, value int) {
            rwmu.Lock()
            defer rwmu.Unlock()
            data[key] = value
        }
        ---

02.适用场景
    a.读多写少
        多个goroutine可以同时读
        写操作独占

4.4 原子操作atomic

01.基本操作
    a.Add和Load
        ---
        var counter int64

        atomic.AddInt64(&counter, 1)
        value := atomic.LoadInt64(&counter)
        ---

02.CompareAndSwap
    a.CAS操作
        ---
        var value int64 = 10

        swapped := atomic.CompareAndSwapInt64(&value, 10, 20)
        fmt.Println(swapped, value)
        ---

03.atomic.Value
    a.存储任意类型
        ---
        var config atomic.Value

        config.Store(map[string]string{"key": "value"})
        cfg := config.Load().(map[string]string)
        ---

4.5 sync.Map并发安全Map

01.基本使用
    a.Store和Load
        ---
        var m sync.Map

        m.Store("key", "value")
        value, ok := m.Load("key")
        if ok {
            fmt.Println(value)
        }
        ---

02.其他方法
    a.Delete和Range
        ---
        m.Delete("key")

        m.Range(func(key, value interface{}) bool {
            fmt.Println(key, value)
            return true
        })
        ---

03.适用场景
    a.读多写少
        键值相对稳定
    b.多个goroutine操作不同键
        减少锁竞争