Go语言3小时光速入门05——并发编程

Do not communicate by sharing memory; instead, share memory by communicating.
不要以共享内存的方式来通信,相反,要通过通信来共享内存。

CSP

要说Go语言中的并发机制,就不得不说CSP模型。但是因为入门教程和篇幅原因这里只做简单概述(且本小结可以跳过)。详细信息可自行阅读Hoare的论文
👇
Communicating Sequential Processes

我们都知道线程在某种程度上可以理解为轻量级进程,一个进程由多个线程组成。Go语言中在线程基础上提供类似协程的机制。这里我们可以理解为对外部来看协程算是轻量级线程。从操作系统的角度来看,线程是在内核态中调度的,而协程是在用户态调度的,所以相对于线程来说,协程切换的成本更低。

那么协程和线程是如何协作的呢?有操作系统原理基础的同学类比操作系统的任务调度分时系统应该很好理解。线程依次去调用协程执行即可。亦可以了解Go中的MPG模型。👉go并发MPG模型

Golang 程序员中有句格言:“不要以共享内存方式通信,要以通信方式共享内存(即本文开头的那句话)。”虽然 Golang 中协程之间,也能够以共享内存的方式通信,但是并不推荐;而推荐的以通信的方式共享内存,实际上指的就是协程之间以消息传递方式来通信。

可以说这就是Golang的核心思想!也是Golang高并发的秘诀之一。

Goroutine

Goroutine是Go在语言层面对并发编程提供的支持:一种协程(在go中与系统线程KSE的对应关系是M:N),称作 goroutine 的机制。

goroutine应该是整个Go语言中最大名鼎鼎且核心的存在,核心到什么程度呢?它的关键字就叫go

我们只需在函数调用前添加go关键字,就可创建并发执行任务。调度器会自动将其安排到合适的系统线程上执行。goroutine是一种非常轻量级的实现,可在单个进程里执行成千上万的并发任务。

go 要调用的函数

go func(){

}()

上述结构相当于Java中的new Thread(() -> {}).start()。不过goroutine更加轻量,且更加简便。

这里再次强调:go是值传递,所以在写goroutine一定要注意变量值是否被共享。

//一个简单值传递说明
func TestValueParam(t *testing.T){
	a := 10
	valueParam(a)
	//这里将打印10而不是123
	t.Log(a)
}

func valueParam(a int){
	a = 123
}

goroutine示例

goroutine_test.go:

/*
线程和KSE(系统线程)对应关系1:1
协程和KSE对应关系:M:N
使用 go func(){}()来创建调用协程
 */
func TestGroutine(t *testing.T){
	for i:=0;i<99;i++{
		//go值传递,这里i的值被复制一份写的不同地址
		go func(i int){
			fmt.Println(i)
		}(i)//这个i的参数
	}
	//休眠1秒
	time.Sleep(time.Millisecond * 1000)
}

func TestGroutine2(t *testing.T){
	for i:=0;i<99;i++{
		////i 在协程中被共享,写的同一个地址,混乱
		go func(){
			fmt.Println(i)
		}()
	}
	time.Sleep(time.Millisecond * 1000)
}

goroutine优点:

线程在获得很多与进程相同的控制权时,也加大了开销,而在Goroutine中则不需这些额外的开销。并且每个goroutine(协程) 默认占用内存远比 Java 、C 的线程少

在Golang的程序中,操作系统级别的线程调度,通常不会做出合适的调度决策。
在应用层模拟的线程,避免了上下文切换的额外耗费,兼顾了多线程的优点。简化了高并发程序的复杂度。

缺点:
协程调度机制无法实现公平调度。

Channel

不要以共享内存的方式来通信,相反,要通过通信来共享内存(再次出现)。前面我们通过go 函数创建goroutine执行调用。那么两个协程之间应该如何通信呢?这里就要用到channel了。

Channel顾名思义也就是用于两个goroutine通信的管道(与传统的Actor模型不同的是 golang需要通过channel对消息进行接收,Actor如Erlang是直接通信)。类似于Java的Future用于获取异步执行结果数据

Go语言中channel分为两种类型:

  1. 阻塞型:A到channel中等待B接收,接收完成后A才继续操作

  2. Buffer型:A直接将消息放到channel中然后继续执行,B自己去处理接收

channel中A和B两个goroutine,相当于快递小哥和买家的关系。

在阻塞型相当于货到付款,买家必须到对应地点(channel)等快递小哥把快递送过来才能拿到快递,同样快递小哥到目标地点后也需要等待买家来拿走快递后才能送新的快递。属于两者必须都等待对方一起完成响应后才能进行操作。

buffer型相当于对于一些普通快递,这个时候快递小哥直接将快递放到小区的蜂巢或菜鸟智能柜(channel的buffer)中,然后不需要等待买家取就可以直接送其他快递。当然如果蜂巢或菜鸟柜满了,就只能等买家取走快递柜子有空余才能继续放快递进去。

在Go中channel由chan 传递类型表示,我们可以通过make创建channel:

//阻塞型channel
make(chan 传递类型)

//buffer型
make(chan 传递类型,buffer大小)

我们通过<-符号来向channel中写入或取出值:

ch := make(chan string)

//向channel中写(放)入值(将"123"写入到channel中)
ch <- "123"

//从channel中读(取)数据
<- ch

chan_test.go

/*
阻塞channel,等待值取出
*/
func TestCh(t *testing.T){
	//构建一个阻塞型channel
	ch := make(chan string)
	go func() {
		//向channel中放入数据
		ch <- "123"
		//这里会阻塞等到从channel拿到值
		t.Log("我01执行完了!")
	}()

	go func() {
		time.Sleep(time.Second * 3)
		t.Log("接收数据!")
		//接收数据,上面的继续执行
		<- ch
	}()

	time.Sleep(time.Second * 5)
}

/*
阻塞channel,将等待值来
*/
func TestCh2(t *testing.T){
	//构建一个阻塞型channel
	ch := make(chan string)
	go func() {
		time.Sleep(time.Second * 3)
		//向channel中放入数据
		ch <- "123"
		//这里会阻塞等到从channel拿到值
		t.Log("我01执行了!")
	}()

	go func() {
		//会阻塞等到有值过来
		<- ch
		t.Log("我02执行了!")
	}()
	time.Sleep(time.Second * 5)
}

/*
带buffer的channel,channel不满不会阻塞
*/
func TestCh3(t *testing.T){
	//构建一个buffer为5的channel
	ch := make(chan string,5)
	go func() {
		//向channel中放入数据,放进去就可以了不阻塞
		ch <- "123"
		//不阻塞继续执行
		t.Log("我01执行了!")
	}()

	go func() {
		time.Sleep(time.Second * 3)
		<- ch
		t.Log("我02执行了!")
	}()
	time.Sleep(time.Second * 5)
}

Channel关闭

我们可以通过close(要关闭的channel)来关闭一个channel。

channel关闭后我们再向其发送数据,会导致panic,在获取值时,channel将返回一个状态 value,status <- chan其中第二个返回值status是一个bool类型。如果status为false表示channel已经关闭,同时在channel关闭时会给所有等待channel返回值的地方返回false,value为类型的零值。利用这种机制我们可以很方便的实现一个广播。

func TestChannelClose(t *testing.T){

	group := sync.WaitGroup{}
	ch := make(chan int,123)

	group.Add(3)
	go func() {
		ch <- 321
		defer func() {
			group.Done()
		}()
	}()

	go func() {
		v,ok := <- ch
		close(ch)
		if ok{
			t.Log(v)
		}else{
			//这个v将会是零值 
			t.Log("channel关闭了",v)
		}
		defer func() {
			group.Done()
		}()

	}()

	go func() {
		//如果channel关闭了会立即返回的
		v,ok := <- ch
		if ok{
			t.Log(v)
		}else{
			t.Log("channel关闭了",v)
		}

		defer func() {
			group.Done()
		}()
	}()

	group.Wait()
}

另外,我们可以通过channel快速的实现一个生产者消费者,生产者向channel写数据,消费者从channel读数据,生产者生产完毕不再生产,直接关闭channel通知。

Select

多路选择

在go语言中一个select语句用来选择哪个case中的发送或接收操作可以被立即执行。它类似于switch语句,但是它的case涉及到channel有关的I/O操作。

简单点说就是用来监听和channel有关的IO操作,当 IO 操作发生时,触发相应的动作。如果对应的channel返回就执行对应的操作。

如下(select多路选择的基本格式):

select{
case ret := <-retCh1:
	doA
case ret := <-retCh2
	doB
...
default:
	都没有执行默认
}

只要数channel中的任何一个返回都会执行case下面对应的函数,都没有返回则执行默认default,没有default将一直阻塞。

select_test.go:

func testA()string{
	ret := "任务A执行中"
	fmt.Println(ret)
	time.Sleep(time.Second * 3)
	return "任务A执行完成"
}

func testB()string{
	fmt.Println("任务B执行中")
	time.Sleep(time.Second * 2)
	return "任务B执行完成"
}

func TestSelectChoose(t *testing.T){
	retCh1 := make(chan string)
	retCh2 := make(chan string)
	go func() {
		ret := testA()
		retCh1 <-ret
	}()
	go func() {
		ret := testB()
		retCh2 <- ret
	}()
	select {
	//谁先返回值,先执行谁!
	case ret := <- retCh1:
		t.Logf("result %s",ret)
	case ret:=<-retCh2:
		t.Logf("result %s",ret)
	}
}

超时控制

从上面我们可以得知,如果channel都没有返回那么就会执行default的操作。我们可以基于这个特性配合time.After() 指定时间后返回。设计出一个超时控制器(即如果指定时间没有返回就指向default下的超时逻辑)。

/**
select 可用于超时控制
 */
func slowService()string{
	time.Sleep(time.Second * 60)
	return "finish"
}

func TestTimeOut(t *testing.T){
	retch := make(chan string)
	go func() {
		ret := slowService()
		retch <- ret
	}()
	select {
	case ret := <- retch:
		fmt.Println(ret)
		//设置5秒钟后超时
	case <- time.After(time.Second * 5):
		fmt.Println("处理超时")
	}
}

任务取消

通过Select,创建一个取消的channel,在需要取消时向channel中发送一个值进行通知。然后通过前面channel中讲的Clost(channel)关闭的方式关闭channel即可.

channel_test.go:

func TestCancel(t *testing.T){
	
	for i := 0; i < 10; i ++{
		cancelChan := make(chan struct{},0)
		go func(i int,cancelChan chan struct{}) {
			for{
				if isCancelled(cancelChan){
					fmt.Println(i,"Close")
					break
				}
				time.Sleep(time.Millisecond * 5)
			}
			fmt.Println(i,"Done")

		}(i,cancelChan)
		doCancel(cancelChan)
		cancel(cancelChan)
	}
}


func isCancelled(chancelChan chan struct{}) bool{
	select {
	case <-chancelChan:
		return true;
	default:
		return false
	}
}


func doCancel(canceChan chan struct{}){
	//取消就直接向取消的channel写入值,select去接收处理
	canceChan <- struct{}{}
}

func cancel(cancelChan chan struct{}){
	//取消完了关闭channel
	close(cancelChan)
}



Context

在使用Goroutine时,经常会出现这个协程里面创建另一个新的协程,而在新的协程中又有新的调用。如果数量过大,就形成了多个任务上下层级类似于树的关系。如果我们自己通过channel进行管理如停止某一个协程,将会使代码变得非常复杂和冗余。

比如我们处理一个网络请求request,每个request都需要开启一个goroutine做一些事情,而这个goroutine又会创建其它的goroutine。所以我们需要一种可以跟踪goroutine的方案才可以达到控制的目的,go为我们提供了Context

一个Context表示一个上下文,当前Context被取消时,基于他等子context都会取消

根:context:通过context.Background()或context.TODO()创建

子:context:基于不同的类型通过
WithCancel(parentContext)
WithDeadline(parentContext)
WithTimeout(parentContext)
WithValue`(parentContext)
进行创建(deadline和timeout可以通过使用cancel和timer实现)

context在go中的源码(context/context.go):

// A Context carries a deadline, a cancellation signal, and other values across
// API boundaries.
//
// Context's methods may be called by multiple goroutines simultaneously.
type Context interface {
	// Deadline returns the time when work done on behalf of this context
	// should be canceled. Deadline returns ok==false when no deadline is
	// set. Successive calls to Deadline return the same results.
	Deadline() (deadline time.Time, ok bool)

	// Done returns a channel that's closed when work done on behalf of this
	// context should be canceled. Done may return nil if this context can
	// never be canceled. Successive calls to Done return the same value.
	//
	// WithCancel arranges for Done to be closed when cancel is called;
	// WithDeadline arranges for Done to be closed when the deadline
	// expires; WithTimeout arranges for Done to be closed when the timeout
	// elapses.
	//
	// Done is provided for use in select statements:
	//
	//  // Stream generates values with DoSomething and sends them to out
	//  // until DoSomething returns an error or ctx.Done is closed.
	//  func Stream(ctx context.Context, out chan<- Value) error {
	//  	for {
	//  		v, err := DoSomething(ctx)
	//  		if err != nil {
	//  			return err
	//  		}
	//  		select {
	//  		case <-ctx.Done():
	//  			return ctx.Err()
	//  		case out <- v:
	//  		}
	//  	}
	//  }
	//
	// See https://blog.golang.org/pipelines for more examples of how to use
	// a Done channel for cancellation.
	Done() <-chan struct{}

	// If Done is not yet closed, Err returns nil.
	// If Done is closed, Err returns a non-nil error explaining why:
	// Canceled if the context was canceled
	// or DeadlineExceeded if the context's deadline passed.
	// After Err returns a non-nil error, successive calls to Err return the same error.
	Err() error

	// Value returns the value associated with this context for key, or nil
	// if no value is associated with key. Successive calls to Value with
	// the same key returns the same result.
	//
	// Use context values only for request-scoped data that transits
	// processes and API boundaries, not for passing optional parameters to
	// functions.
	//
	// A key identifies a specific value in a Context. Functions that wish
	// to store values in Context typically allocate a key in a global
	// variable then use that key as the argument to context.WithValue and
	// Context.Value. A key can be any type that supports equality;
	// packages should define keys as an unexported type to avoid
	// collisions.
	//
	// Packages that define a Context key should provide type-safe accessors
	// for the values stored using that key:
	//
	// 	// Package user defines a User type that's stored in Contexts.
	// 	package user
	//
	// 	import "context"
	//
	// 	// User is the type of value stored in the Contexts.
	// 	type User struct {...}
	//
	// 	// key is an unexported type for keys defined in this package.
	// 	// This prevents collisions with keys defined in other packages.
	// 	type key int
	//
	// 	// userKey is the key for user.User values in Contexts. It is
	// 	// unexported; clients use user.NewContext and user.FromContext
	// 	// instead of using this key directly.
	// 	var userKey key
	//
	// 	// NewContext returns a new Context that carries value u.
	// 	func NewContext(ctx context.Context, u *User) context.Context {
	// 		return context.WithValue(ctx, userKey, u)
	// 	}
	//
	// 	// FromContext returns the User value stored in ctx, if any.
	// 	func FromContext(ctx context.Context) (*User, bool) {
	// 		u, ok := ctx.Value(userKey).(*User)
	// 		return u, ok
	// 	}
	Value(key interface{}) interface{}
}

上面很长一堆,但是如果我们去掉注释(之所以没去掉是因为复制粘贴快啊注释中对接口有详细的说明示例非常详细可以参考),发现context接口总共就提供以下几个函数签名:

Deadline() (deadline time.Time, ok bool)
返回deadline,表明对应的context对应的工作需要被cancel。如果返回的ok是false,那么表明就没有设置deadline时间。

Done() <-chan struct{}
返回一个channel,当work做完以后这个channel就会被关闭。当对应的context被cancel了就会返回一个chan

Err() error
如果Done没有被close,那么Err返回的都是nil
如果Done已经执行了close,那么Err返回的是非nil,返回的Error类型可以根据具体的情况判断。就是两种,一种是cancel另一个汇总deadlineExceeded。

Value(key interface{}) interface{}
返回与context的key关联的value

一般我们使用根节点都直接用context.Background,后边可以与WithCancel,DeadLine,TimeOut配合使用。ParentContext可以通过创建子节点时返回的cancel fun等来控制其子context的结束(也对应着子gorountine),并且层层的gorountine需要判断Done。

context的使用原则:

  1. 一般context的生存都为一个某一起始操作(根节点),当整个操作结束的时候需要把context销毁。

  2. 每次创建一个gorountine,要么将当前的context传递给这个gorountine或者创建一个新的context传递给gorountine(作为父节点)。

  3. 父类context创建子context后会返回一个cancel函数,可以用来控制子gorountine的流程。

  4. 当子context传递到goroutine应该监控Done对应的channel,如果上层context关闭,就需要做相应的处理。

注意:cancel类型的context在使用的最后一定要调用cancel释放资源,不然有可能Done没有被使用,导致资源无法被释放。或者parent对应的context Done close了。所有在代码中应该尽量及早的cancel对应context

以取消为例(其他的在源码的注释里面有说明)
context_test.go:

func isCancelled(ctx context.Context) bool{
	select{
	//子context的channel有数据返回
	case <- ctx.Done():
	return true;
	default:
		return false
	}
}

func TestCancel(t *testing.T){
	//创建子context后返回context和cancel函数
	ctx,cancel := context.WithCancel(context.Background())
	for i:=0;i<5;i++{
		go func(i int,ctx context.Context) {
			for{
				//如果取消了就直接返回不会无限休眠
				if isCancelled(ctx){
					break
				}
				time.Sleep(time.Millisecond * 5)
			}
			fmt.Println(i,"Cancelled")
		}(i,ctx)
	}
	//调用cancel控制取消,当然ctx中也有其他函数,源码注释有详细示例
	cancel()
	time.Sleep(time.Second * 1)
}


Sync

虽然我们提到go语言中建议使用通信而非共享内存,当时go语言中同样提供一些共享内存的方式。和Java/python/C++等其他语言一样,也提供如锁等同步工具.

go语言中同步工具大部分在sync包下。

Mutex互斥锁

sync.Mutex提供与传统的锁相同的功能。通过sync.Mutex.Lock()加锁,通过sync.Mutex.Unlock()解锁

如:
sync_test.go:

/*
go语言互斥锁示例
*/
func TestCounter(t *testing.T){
	var mute sync.Mutex
	counter := 0
	for i := 0;i< 99;i++{
		go func(i int) {
			//相当于finally释放锁
			defer func() {
				mute.Unlock()
			}()
			//加锁保证同步
			mute.Lock()
			counter++
		}(i)
	}
	time.Sleep(1 * time.Second)
	t.Logf("counter = %d",counter)

}


sync.WaitGroup

Go语言中提供了类似于Java中CountDownLatch的机制,即:sync.WaitGroup。与CountDownLatch一样WaitGroup也是用于阻塞并等待其它线程完成对应计数后继续执行。sync.WaitGroup通过Add(添加次数)添加对应的等待次数,通过Done()来完成一次等待的计数

如:

sync_test.go:

/**
相当于java中等 Thread join 和 countDownLatch的结合体(可以countDown指定数)
 */
func TestWaitGroup(t *testing.T){
	var waitGroup sync.WaitGroup
	var mut sync.Mutex
	count := 0
	for i := 0; i < 9999; i++{
//添加2此等待计数,相应的后面需要done两次(只是为了演示所以是2)
		waitGroup .Add(2)
		go func() {
			defer func() {
				mut.Unlock()
				waitGroup .Done()
				waitGroup .Done()
			}()
			mut.Lock()
			count++
		}()
	}
	//等待所有的等待计算完成
	waitGroup .Wait()
	fmt.Printf("所有协程执行完毕后继续执行,count = %d",count)
}

sync.RWMutex读写锁

Go中提供sync.RWMutex{}读写锁,通过Lock/Unlock加解写锁,通过RLock/UnRLock加解读锁.

func TestRWLock(t *testing.T){

	var readWriteLock sync.RWMutex

	num := 0
	go func() {
		for i := 0; i < 999; i++{
			readWriteLock.Lock()
			num++
			readWriteLock.Unlock()
		}
	}()

	go func() {
		for i:=0; i < 999; i++{
			readWriteLock.RLock()
			t.Log(num)
			readWriteLock.RUnlock()
		}
	}()
	time.Sleep(time.Second*5)
}

sync.Once只执行一次

sync.Once表明只执行一次,第二次直接返回结果。

sync.Once.Do(执行的函数)

基于这个特性我们可以很方便的创建一个单例,如:

sync_test.go:

/*
单例模式
 */
type SingletonObj struct {
	//要创建等单例模式实体
}


var onece sync.Once

var obj *SingletonObj

func GetSingletonObj() *SingletonObj{
	onece.Do(func() {
		//只会执行一次
		fmt.Println("创建单例对象")
		obj = new(SingletonObj)
	})
	return obj
}


func TestGetSingletonObj(t *testing.T){
	wg := sync.WaitGroup{}
	for i:= 0;i<10;i++{
		go func() {
			wg.Add(2)
			 ret1 := GetSingletonObj()
			 wg.Done()
			 ret2 := GetSingletonObj()
			 wg.Done()
			 t.Log(ret1 == ret2)
		}()
	}
	wg.Wait()
}

sync.Pool对象池

注意是对象池而不是我们通常理解的连接池或线程池(我也不知道为啥会被放到sync包里面来...)。

我们知道Go语言的一个优点是自动的垃圾回收,但是只要有GC就必然会影响性能,为了减少GC,go语言提供了对象重用的机制,也就是sync.Pool对象池。

sync.Pool可伸缩且并发安全。大小仅受限于内存的大小,有点类似于对象缓存。设计的目的是存放已经分配的但是暂时不用的对象,在需要用到的时候直接从pool中取。

任何存放在pool中的值可以在任何时候被删除而不通知,在高负载下可以动态的扩容,在不活跃时对象池会收缩。

sync_test.go

func TestPool(t *testing.T) {
	pool := &sync.Pool{
		//当没有时指定创建,指定New函数
		New: func() interface{} {
			return 789
		},
	}
	//向pool中放一个值
	pool.Put(123)
	pool.Put(321)
	//从pool中取出一个值
	num1 := pool.Get().(int)
	num2 := pool.Get().(int)
	fmt.Println(num1, num2)
}

sync.Pool 对象获取顺序:
首先尝试从私有对象获取
如果私有对象不存在,尝试从当前Processor的共享池pool获取
如果当前Processor共享池也是空的,那么就尝试去其他Processor的pool获取
如果所有子池都是空的,最后就用用户指定的New函数产⽣生一个新的对象返回

其他Sync下的工具

go是开源的所以在sync下源码里面就可以直接看出来,也有详细的说明注释。另外在前面的第2章中也有常用函数的调用demo说明链接,可以自行了解。

竞态检测器

事实上,错误是不可避免的,在并发编程上更是如此。而很多并发上的错误足以令人抓狂(碰到的人估计都深有体会特别是线上)。
Go语言运行时和工具链自备了一个精致并易于使用的动态分析工具:竞态检测器(race detector)

我们只需要在执行诸如:go build/run/test等命令时把-race命令行参数加上即可开启。

编译器会自动构建一个修改后的版本,这个版本有额外的手法用于高效记录在执行时对共享变量的所有访问,以及读写这些变量的goroutine标识。并且对于所有的同步事件都会加以记录。

竞态检测器会自动研究这些事件流,找到其中有问题的案例,并生成一个对应的报告,包括变量的标识以及读写goroutine时的调用栈。

感兴趣的话可以自行详细了解。Introducing the Go Race Detector

更新时间:2020-02-23 15:09:59

本文由 寻非 创作,如果您觉得本文不错,请随意赞赏
采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
原文链接:https://www.zhouning.group/archives/go语言3小时光速入门05并发编程
最后更新:2020-02-23 15:09:59

评论

Your browser is out of date!

Update your browser to view this website correctly. Update my browser now

×