GoLang 的并发编程与通信 -- goroutine 与通道

2019-11-15 17:14:48   最后更新: 2019-11-20 09:28:06   访问数量:42




服务端程序每一时刻都在经受着大量并发流量的考验,而如今,CPU 指令运行频率的提升已经面临瓶颈,只能通过核心数的增长来大幅提升其指令的执行能力

因此,现代程序设计中,并发编程的支持就显得越来越重要

GoLang 进行并发编程十分轻松,GoLang 提供了 goroutine 用来实现并发编程,极为简单和方便,而用于并行执行过程中每个 goroutine 之间通信的通道机制也同样非常易于使用

 

本文,我们就来详细介绍一下 goroutine 与通道机制如何来使用

 

 

GoLang 中,每一个并发执行的活动都被称为 goroutine,每个 goroutine 类似于一个线程,但它与线程只有着非常大的差别,这将在下一篇文章中进行分析和讲述

当程序启动时,用来执行 main 函数的 goroutine 被称为主 goroutine,此后,只要在调用函数时,前面加上关键词 go,就可以创建一个新的 goroutine:

f()    // 调用函数 f(),并等待他返回

go f() // 并发调用函数 f(),不用等待

 

示例

下面的例子展示了主 goroutine 运行的同时,并发执行打点 goroutine,每毫秒更新一次标准输出:

package main import ( "fmt" "time" ) func fib(n int) int { if n < 2 { return n } return fib(n-1) + fib(n-2) } func ticking(delay time.Duration) { arrows := [...]string{"-", "\\", "|", "/"} for i := 1; true; i++ { time.Sleep(delay) fmt.Printf("\r%s %dms", arrows[i % 4], i) } } func main() { timestamp := time.Now().UnixNano() / 1e6 go ticking(100 * time.Millisecond) const n = 45 result := fib(n) fmt.Printf("\nFibonacci(%d) = %d\n", n, result) fmt.Printf("Use time: %.2fms", float64(time.Now().UnixNano() / 1e6 - timestamp)/100.0) }

 

 

可以看到标准输出伴随着一个字符的直线在旋转的同时,他后面的数字在增加,直到斐波那契数列指定位数的值完成计算并输出:

- 108ms

Fibonacci(45) = 1134903170

Use time: 108.24ms

 

一旦主 goroutine 运行结束,所有 goroutine 都会暴力地直接中止运行,然后程序退出

 

和 java 等很多语言中的线程一样,goroutine 也不能被其他 goroutine 中止,但多个 goroutine 之间可以进行通信

通过网络进行通信是非常常用的并发通信机制,在 golang 中,net 包提供了 TCP、UDP、域套接字 的支持

 

通过 TCP 实现 goroutine 间通信

TCP 是一种非常常用的网络通信协议,关于 TCP 的详细介绍,可以参看主页君此前的文章:

传输控制协议 -- TCP

TCP连接的建立和终止

 

下面的代码展示了使用 net 包提供的方法进行 TCP 通信的示例:

package main import ( "bufio" "fmt" "io" "net" "os" "time" ) func client(conn *net.TCPConn) { defer func() { _ = conn.Close() }() reader := bufio.NewReader(conn) b := []byte(time.Now().String() + " " + conn.LocalAddr().String() + " say hello to Server\n") _, _ = conn.Write(b) msg, err := reader.ReadString('\n') if err != nil || err == io.EOF { _, _ = fmt.Fprintf(os.Stderr, "client read failed: %v", err) } fmt.Println("client recieved: " + msg) } func server(tcpListener *net.TCPListener) { defer tcpListener.Close() fmt.Println("Server ready to read ...") serverConn, err := tcpListener.AcceptTCP() if err != nil { _, _ = fmt.Fprintf(os.Stderr, "server accept failed: %v", err) return } defer func() { _ = serverConn.Close() }() reader := bufio.NewReader(serverConn) message, err := reader.ReadString('\n') if nil != err || err == io.EOF { _, _ = fmt.Fprintf(os.Stderr, "server read failed: %v", err) return } fmt.Println("server recieved: " + message) b := []byte(serverConn.RemoteAddr().String() + " Server say world\n") _, _ = serverConn.Write(b) } func main() { tcpAddrServer, _ := net.ResolveTCPAddr("tcp", "localhost:8461") tcpListener, _ := net.ListenTCP("tcp", tcpAddrServer) go server(tcpListener) tcpAddrCient, _ := net.ResolveTCPAddr("tcp", "localhost:8461") clientConn, err := net.DialTCP("tcp", nil, tcpAddrCient) if nil != err { fmt.Println("Client connect error ! " + err.Error()) return } fmt.Println(clientConn.LocalAddr().String() + " : Client connected!") go client(clientConn) for {} }

 

 

上面的代码由主 goroutine 分别依次启动了两个 goroutine -- server 和 client

server 等待 client 传来的字符串,打印并回传一句字符串

client 传递一句字符串给 server 后将 server 回传的字符串打印出来

 

执行代码,打印出了:

Server ready to read ...

127.0.0.1:5777 : Client connected!

server recieved: 2019-11-14 18:11:02.8257556 +0800 CST m=+3.096348701 127.0.0.1:5777 say hello to Server

client recieved: 127.0.0.1:5777 Server say world

 

上述代码显得较为繁琐,在实际的 goroutine 通信中,如果是在 unix 环境下,选择 unix 域套接字进行 goroutine 间通信是更好的选择

关于使用 net 包进行网络通信,后续会有文章进行详细介绍,敬请期待

 

上述通过 net 包实现的网络通信看上去非常复杂,别急,GoLang 提供了更为好用的连接 goroutine 的工具 -- 通道

通道实现了一个 goroutine 发送特定值到另一个 goroutine 的通信机制,它与 unix 环境中的管道非常类似

此前,我们已经介绍过 unix 环境中的管道的使用,他是 unix 环境下最为常用的进程间通信方式:

管道

 

通道的类型

通道的类型就是 chan xxx,其中 xxx 可以是任何类型名,例如:

chan int

chan struct{}

 

上述这样类型的通道既可以用来发送也可以用来接收数据

同样,我们也可以声明只用于发送或接收的单向通道:

chan<- int -- 只用于发送的通道

<-chan int -- 只用于接收的通道

 

通道的创建和关闭

通道的创建

和 map 一样,通道通过内置函数 make 就可以实现创建:

ch := make(chan int) ch := make(chan int, 3)

 

 

make 的第二个参数是可选的,用来表示创建的缓冲区大小,默认表示无缓冲区

如果缓冲区已满或没有缓冲区,那么在通道上的发送操作会被阻塞,直到另一个 goroutine 在该通道上接收数据或者发送操作被中止

同样,当缓冲区为空或没有缓冲区,接收操作也会被阻塞,直到由发送方发送新的数据

 

特别的,有时我们并不想通过通道传递任何数据,只是想要通过通道发送一个信号,此时,我们通常使用 chan struct{} 类型的通道,传递一个 struct{}{} 空对象

 

通道的关闭

内置函数同样提供了关闭通道的方法:

close(ch)

 

 

在通道关闭后,任何发送操作都会产生宕机,而接收操作会读取通道中所有剩余数据

如果在通道关闭后,所有数据已经被接收,再次执行接收操作会立即返回对应类型的零值

 

在 GoLang 中,如果在使用文件后没有执行 close 操作,将会造成无法回收的内存泄漏,但对于通道来说不会,垃圾回收器会根据通道是否可以被访问来决定是否回收相应的资源,无论通道是否进行过 close 操作

 

通道的发送和接收

ch <- x // 发送语句,将变量 x 的值发送到通道 ch x = <-ch // 接收语句,接收通道 ch 中的数据并赋值给变量 x <-ch // 接受语句,丢弃接收到的通道中的数据

 

 

示例

双向通道

有了通道,我们上面通过 TCP 实现通信的例子就可以十分简化了:

package main import ( "fmt" ) func client(ch chan string, sigover chan struct{}) { str := "Client say hello" ch <- str str = <-ch fmt.Println("client recieved: " + str) sigover <- struct{}{} } func server(ch chan string) { str := <-ch fmt.Println("server recieved: " + str) str = "Server say world" ch <- str close(ch) } func main() { ch := make(chan string) sigover := make(chan struct{}) go client(ch, sigover) go server(ch) <-sigover }

 

 

打印出了:

server recieved: Client say hello

client recieved: Server say world

 

这个例子展示了通过 chan struct{} 类型的通道报告 goroutine 运行结束的机制,这是非常常用的一种方法

 

单向通道

下面的例子通过单向通道计算了 1 到 10 的平方:

package main import "fmt" func counter(out chan<- int) { for x := 1; x <= 10; x++ { out <- x } close(out) } func squarer(out chan<- int, in <-chan int) { for v := range in { out <- v*v } close(out) } func printer(in <-chan int) { for v := range in { fmt.Println(v) } } func main() { nums := make(chan int) squares := make(chan int) go counter(nums) go squarer(squares, nums) printer(squares) }

 

 

打印出了:

1

4

9

16

25

36

49

64

81

100

 

缓冲通道

上面通道的创建操作中,我们已经讲述过具有缓冲的通道的创建和使用

带有缓冲区的通道可以看作是一个队列,进行先入先出操作

 

获取缓冲通道的缓冲区容量和已缓冲元素数

cap(ch) // 获取缓冲通道缓冲区容量 len(ch) // 获取缓冲通道已缓冲元素数

 

 

通过上述两个方法,我们可以实现非阻塞的通道读写操作

 

通常,操作系统中的 IO 操作同时只能对一个 fd 执行读取或写入操作,但对于服务端程序来说,多个客户端与服务端建立连接,任何时刻任何连接都有可能有数据到来,那么如果使用传统的阻塞式 IO,我们的进程一旦阻塞等待某个连接,其他连接都将无法被处理,而入股哦使用非阻塞式 IO,那么一遍遍轮询全部连接将大大降低执行效率

现代操作系统提供了这样问题的理想解决方案 -- IO 复用模型

IO复用 & UNIX下的五种IO模型

 

GoLang 中,通道的使用也存在同样的问题,那就是按照上面描述的通道的使用,一个 goroutine 同时只能与另一个 goroutine 通信,那么,如果一个 goroutine 要同时接收多个通道中数据的到来,上面的使用方式就显得力不从心了

GoLang 中提供了与操作系统中的 IO 复用模型类似的通道多路复用模型 -- select

 

使用方式

select 的使用方式与 switch 语句非常相似:

select { case value1 <- ch1: // do something case value2 <- ch2: // do something }

 

 

示例

下面的例子是基于上面计算数字平方的修改:

package main import ( "fmt" "os" "time" ) func counter(out chan<- int, abort <-chan struct{}) { tick := time.Tick(time.Second) i := 1 for { select { case <-tick: out <- i i += 1 case <-abort: close(out) return } } } func squarer(out chan<- int, in <-chan int) { for v := range in { out <- v*v } close(out) } func printer(in <-chan int) { for v := range in { fmt.Println(v) } } func setabort(abort chan struct{}) { _, _ = os.Stdin.Read(make([]byte, 1)) // 等待读取单个字节 abort <- struct{}{} } func main() { nums := make(chan int) squares := make(chan int) abort := make(chan struct{}) fmt.Println("Calculate the square of each number per second, press any key to abort") go setabort(abort) go counter(nums, abort) go squarer(squares, nums) printer(squares) }

 

 

我们将上面十个数的循环改成了无限循环,通过接受一个字符来向 abort 通道发送一个信号,从而实现流程的中止

counter 函数同时从每秒生成心跳的 tick 通道和随时可能产生中止信号的 abort 通道读取数据,此时,select 多路复用就显得非常有用了

执行程序并在适当时候输入字符 a 生成中止信号,程序打印出了:

1

4

9

a

 

通过 select 实现非阻塞式通道读写

与 switch 语句一样,select 也可以加入 default 语句,如果所有的 case 条件中的通道均没有数据就绪,那么 select 语句不会阻塞等待,而是会去执行 default 语句,这就实现了通道的非阻塞式读写

下面的例子展示了通道的非阻塞式读写:

package main import "fmt" func main() { abort := make(chan struct{}) over := make(chan struct{}) go func(over chan struct{}) { select { case <-abort: fmt.Println("Launch aborted!") return default: fmt.Println("No abort signal launched") } over <- struct{}{} }(over) <-over }

 

 

执行程序,因为没有 goroutine 向这个匿名 goroutine 传递 abort 信号,所以他打印出了:

No abort signal launched

 

欢迎关注微信公众号,以技术为主,涉及历史、人文等多领域的学习与感悟,每周三到七篇推文,只有全部原创,只有干货没有鸡汤

 

 

《Go 语言程序设计》

 






select      技术贴      channel      golang      goroutine      通道      io复用模型      多路复用     


京ICP备15018585号