使用channel做goroutine同步

channel和goroutine是golang CSP并发模型的承载体,是goroutine同步绝对主角。channel的使用场景远比MutexWaitGroup多得多。

实现和WaitGroup一样的功能

func main() {
	ch := make(chan struct{}, 2)
	go func() {
		defer func() {
			ch <- struct{}{}
		}()
		time.Sleep(1e9)
		fmt.Println("after 1 second")
	}()

	go func() {
		defer func() {
			ch <- struct{}{}
		}()
		time.Sleep(2e9)
		fmt.Println("after 2 second")
	}()

	i := 0
	for _ = range ch {
		i++
		if i == 2 {
			close(ch)
		}
	}
	fmt.Println("the end")
}

生产消费模型

这是个常用的1对N的生产消费模型,常常用于消费redis队列。

package main
import (
	"os"
	"fmt"
	"os/signal"
	"syscall"
	"sync"
)

var quit bool = false
const THREAD_NUM  = 5
func main() {
	sigs := make(chan os.Signal, 1)
	//signal.Notify 注册这个给定的通道用于接收特定信号。
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGUSR2)
	quitChan := make(chan bool)
	rowkeyChan := make(chan string, THREAD_NUM)
	go func() {
		 <-sigs  //等待信号
		quit = true
		close(rowkeyChan)
	}()
	var wg sync.WaitGroup
	for i := 0; i < THREAD_NUM;i++ {
		wg.Add(1)
		go func(n int) {
			for {
				rowkey,ok := <- rowkeyChan
				if !ok {
					break
				}
				//do something with rowkey
				fmt.Println(rowkey)
			}
			wg.Done()
		}(i)
	}
	go func() {
		wg.Wait()
		quitChan <- true
	}()

	for  !quit {
		//rowkey 可能来着redis的队列
		rowkey := ""
		rowkeyChan <- rowkey
	}
	<- quitChan
}

上面的代码稍微修改下很容易支持N:M的生产消费模型,这边就不再赘述。

赛道模型

在现实很多场景我们需要并发的做个任务,我们想知道他们的先后顺序,或者只想知道最快的那个。channel的特性很容易做到这个需求

package main
import (
	"fmt"
	"net/http"
)
func main() {
	ch := make(chan string)
	go func() {
		http.Head("https://www.taobao.com")
		ch <- "taobao"
	}()

	go func() {
		http.Head("https://www.jd.com")
		ch <- "jd"
	}()
	go func() {
		http.Head("https://www.vip.com")
		ch <- "vip"
	}()

	//只想知道最快的
	dm := <- ch
	fmt.Println(dm)

/* 知道它们的排名
	for dm:= range ch {
		fmt.Println(dm)
	}
	*/
}

如何控制并发数

虽然golang新开一个goroutine占用的资源很小,但是无谓的goroutine开销对golang的调度性能很有影响,同时也会浪费cpu的资源。那么golang中如何控制并发数。

方式1——使用带缓冲的channel

package main
import (
	"fmt"
	"time"
)

type Task struct {
	Id int
}

func work(task *Task,limit chan struct{})  {
	time.Sleep(1e9)
	fmt.Println(task.Id)
	<- limit
}
func main() {
	ch := make(chan *Task,10)
	limit := make(chan struct{},3)
	go func() {
		for i:=0;i<10;i++ {
			ch <- &Task{i}
		}
		close(ch)
	}()

	for {
		task ,ok :=<- ch
		if ok {
			limit <- struct{}{}
			go work(task,limit)
		}
	}
}

方式2——使用master-worker模型,worker数量固定

package main

import (
	"fmt"
	"time"
)
const WorkerNum = 3
type Task struct {
	Id int
}
func work(ch chan *Task)  {
	for {//worker中循环消费任务
		task,ok :=<- ch          
		if ok {
			time.Sleep(1e9)
			fmt.Println(task.Id)
		}
	}
}
func main() {
	ch := make(chan *Task,3)
	exit := make(chan struct{})
	go func() {
		for i:=0;i<10;i++ {
			ch <- &Task{i}
		}
		close(ch)
	}()

	for i:=0;i<WorkerNum;i++{  //控制worker数量
		go work(ch)
	}
	<- exit
}

控制goroutine优雅退出

除了我们需要控制goroutine的数量之外,我们还需要控制goroutine的生命周期同样以防止不不要的资源和性能消耗。那么接下来我们介绍下控制goroutine生命周期的办法。

方法1——goroutine超时控制

package main
import (
	"fmt"
	"time"
)
func main() {
	ch := make(chan struct{})
	task := func() {
		time.Sleep(3e9)
		ch <- struct{}{}
	}
	go task()
	select {
		case <-ch: //在3s内正常完成
			fmt.Println("task finish")
		case <-time.After(3*time.Second): //超过3秒
			fmt.Println("timeout")
	}
}
$ go run main.go 
timeout

我们使用了 slecettime.After 来控制goroutine超时。

方法2——使用context.Context

package main
import (
	"context"
	"fmt"
	"time"
)
func main() {
	ctx ,cancel:= context.WithCancel(context.Background())  //使用带cancel函数的context
	task := func(ctx context.Context) {
		for {
			select {
			case <-ctx.Done(): //cancel函数已经被执行了
			default:
				time.Sleep(1e9)
				fmt.Println("hello")
			}
		}
	}
	go task(ctx)
	time.Sleep(3e9) //等待3s
	cancel()          //让goroutine退出
}
$ go run main.go
hello
hello

当然我们还可以使用 context.WithTimeout的方式

package main

import (
	"context"
	"fmt"
	"time"
)
func main() {
	exit := make(chan struct{})
	ctx,_ := context.WithTimeout(context.Background(),3*time.Second) //使用带超时的context
	task := func(ctx context.Context) {
		HE:
		for {
			select {
			case <-ctx.Done(): //cancel函数已经被执行了
				break HE       //退出for循环
			default:
				time.Sleep(1e9)
				fmt.Println("hello")
			}
		}
		exit<- struct{}{}
	}
	go task(ctx)
	<-exit
}
$ go run main.go 
hello
hello
hello

这样的用法和上面的用法的区别是可以不需要手动去调用cancel函数。

总结

本小节我们介绍了使用channel同步goroutine的方法,介绍了channel的常见使用场景,已经如果控制goroutine的几种方法。这些方法在很是实际开发中都会用到。