关于多进程和多线程的基本概念以及基本操作本人在以下系列文章已经介绍。

”浅谈操作系统原理”   http://zbpblog.com/blog-175.html

“python多线程和多进程”  http://zbpblog.com/blog-181.html


本篇主要介绍关于Go的并发机制,goroutine的调度和特性。当然既然是讲并发,肯定还是会涉及到多线程和多进程,因此也会再次简单的介绍线程和进程的同步方法,也算为goroutine协程的介绍做铺垫。下面正式开始go的并发之旅!


本章介绍多进程的同步


管道 Pipe

管道(pipe)是一种半双工(或者说单向)的通信方式,只能用于父进程与子进程以及同祖先的子进程之间的通信。

其实在shell中使用管道符就是典型的多个进程使用管道进行通信的例子

ps aux | grep go


这里linux系统会开两个进程,|就是管道,ps进程的输出会通过管道传输给grep进程作为grep命令的输入。


对于管道,Go是支持的。通过标准库代码包os/exec中的API,我们可以执行操作系统命令并在此之上建立管道。


下面我们用go编写一个使用系统命令和管道的小例子

func Example1(){
	command := "Using multiprocess programming by golang!"
	cmd0 := exec.Command("echo", command)
    
    // 创建一个cmd0命令的输出管道
	pipe, err := cmd0.StdoutPipe()
	if err != nil {
		log.Fatal(err)
	}
    defer pipe.Close()

    // 执行cmd0这个系统命令,这里相当于是fork出了一个子进程来运行cmd0这个命令,所以Start方法是异步非阻塞的
	if err := cmd0.Start(); err != nil{
		log.Fatal(err)
	}

    // 在父进程(也就是main goroutine所在的进程)并发的接收cmd0的输出管道的数据
	cont := make([]byte, 1024)
	pipe.Read(cont)	 // 阻塞,直到pipe管道中有数据可读
	fmt.Printf("接收到cmd0的数据:%s", cont)
}

注意cmd0执行Run方法或Start方法都表示执行,但Run是阻塞的,Start是非阻塞的。


所有适用于文件读写的规则都使用与pipe的读写。

管道是以流的形式传输数据的,这与队列不同,队列是以对象为单位传输数据。


这个例子就是典型的管道在父子进程间进行通信。在这个例子中,我们用一个长度为1K的字节切片接收管道中的数据。但是其实管道中的数据只有三十多个字节,没有必要用1K的字节切片接收,除了开头的三十多个字节,多出的部分会填充为0。

所以这里其实可以使用ioutil.ReadAll来代替。

cont := make([]byte, 1024)

cont, err := ioutil.ReadAll(pipe)		// 从管道中读取数据,如果管道中没有数据则会阻塞
if err != nil {
    log.Fatal(err)
}
fmt.Printf("接收到cmd0的数据:%s", cont)

在用pipe.Read时,如果输出管道中再没有可以读取的数据,那么Read 方法返回的第二个结果值就会是变量io.EOF 的值。如果没有遇到io.EOF说明数据还没有读完,需要多次pipe.Read读取。


还需要注意的是,Start方法是非阻塞的,也就是说,Start方法会让命令开始执行,但他不会等待命令执行完毕,如果希望命令执行完毕后才往下走,可以使用pipe.Wait()方法进行阻塞。


上面的例子没有调用Wait做阻塞等待是因为pipe.Read已经阻塞了main


管道可以把一个命令的输出作为另一个命令的输入,Go代码也可以做到这一点,而且实现起来可以很简洁。例如:

func Example3(){
	cmd1 := exec.Command("ps", "-aux")
	cmd2 := exec.Command("grep", "httpd")

	var (
		cmd1_outputbuf, cmd2_outputbuf bytes.Buffer
	)

	// 将cmd1的输出保存到 cmd1_outputbuf 这个动态缓冲区(要等cmd1运行之后)
	cmd1.Stdout = &cmd1_outputbuf		// 把 *bytes.Buffer 传给Stdout,而不是bytes.Buffer本身,因为cmd1.Stdout要求是一个io.Writer接口类型。而*bytes.Buffer才是io.Writer接口类型。

	// 将cmd1的输出作为cmd2命令的输入
	cmd2.Stdin = &cmd1_outputbuf
	cmd2.Stdout = &cmd2_outputbuf

	// 开始运行两个命令
	if err := cmd1.Start(); err != nil {
		log.Fatal(err)
	}
	cmd1.Wait()		// 等待命令1运行完才开始运行命令2

	// 当cmd1运行完之后,cmd1_outputbuf就有内容了
	if err := cmd2.Start(); err != nil{
		log.Fatal(err)
	}
	cmd2.Wait()

	// 将命令结果输出
	fmt.Printf("命令运行结果:\n %s", cmd2_outputbuf.Bytes())
}

这个例子模拟了 ps -aux | grep httpd命令,cmd1_outputbuf它起到了管道的作用。


上面所讲的管道(也就是 | 符)也叫作匿名管道,与此相对的是命名管道(named pipe)。与匿名管道不同的是,任何进程都可以通过命名管道交换数据。实际上,命名管道以文件的形式存在于文件系统中,使用它的方法与使用文件很类似。Linux操作系统支持通过mkfifo 命令创建和使用命名管道。

下面演示如何在Linux下使用mkfifo

# 创建一个命名管道, 会创建一个myfifo1 文件
mkfifo -m 644 myfifo1

# 使用myfifo1 让ps -aux 和 grep httpd 这两个进程通信
grep httpd < myfifo1	# 从myfifo1管道接收信息作为 grep httpd 的输入,此时会阻塞,因为myfifo1中没有任何内容,是不可读的

# 新开一个窗口执行ps
ps -aux > myfifo1    # 将ps的输出输出到myfifo1管道。此时grep 命令会马上被唤醒并执行

上面如果先执行 ps -aux > myfifo1 也同样被阻塞,因为此时还没有grep进程去接收这个管道的消息。


命名管道默认是阻塞式的。更具体地说,只有在对这个命令管道的读操作和写操作都已准备就绪之后,数据才开始流转。还要注意,命名管道仍然是单向的,又由于可以实现多路复用(即多个进程同时向命名管道写数据),所以有时候也需要考虑多个进程同时向命名管道写数据的情况下的操作原子性问题。

顺带提一句,管道不是进程安全的,因此在多进程并发读写的时候需要同步。例如:

假设现在有4个shell窗口,A和B窗口是管道的消费者,他们分别执行

grep mysql < myfifo1   # 阻塞

grep httpd < myfifo1   # 阻塞

C窗口是生产者,它在A和B执行了这两句之后执行

ps -aux > myfifo1


结果是,A 和 B同时被唤醒,但是接收到管道数据的只有A,B没有接收到,原因是管道中的数据已经被A给消费掉了,B被唤醒只能读到空数据。

如果我们重复上面的过程,在此基础上用D窗口也执行 ps -aux > myfifo1

结果没有变,还是A接收到数据,B没接收到,而且D窗口被阻塞。

现在,我们再看一种情况:

C和D都执行 ps -aux > myfifo1

A执行grep httpd < myfifo1


此时C和D同时被唤醒,A会把C和D的两份消息都消费掉。于是A打印了2遍httpd的信息。


实际上,我们在开发的过程中,在C和D写入结果到有名管道时要加锁让C和D串行的把数据写入到myfifo1,原因是管道是进程不安全的,同时写入管道可能会导致数据混乱。


Linux中命名管道和匿名管道(管道符)的异同点

相同点:二者都会在其中一端还未就绪的时候阻塞另一端的进程。例如匿名管道中 ps -aux | grep httpd

当 ps在执行的时候,ps进程就是未就绪的,此时grep进程会被阻塞。当ps开始输出数据到管道的时候,ps这一端就是写就绪的,此时grep就会被唤醒。命名管道同理。

不同点:匿名管道由于不是以文件的形式存在,因此管道内的数据是保存在一个固定大小的缓冲区的,匿名管道会在管道缓冲区被写满之后使写数据的进程(生产者)阻塞,当消费者消费了一部分数据后才能继续执行。此时生产者进程的执行速度取决于消费者的消费速度。

命名管道则不会因为写入数据过多而被阻塞或者被消费者限速,因为命名管道的数据存在文件中,生产者的产出数据可以存到文件中。


在Go标准库代码包os中,包含了可以创建这种命名管道的API

reader, writer, err := os.Pipe() 

Go使用系统函数来创建管道,并把它的两端封装成两个*os.File 类型的值 reader 和 writer。reader 可以从管道中读内容,writer同理。第三个结果err 代表可能发生的错误,若无错误发生,则其值为nil。

 

其实上面例子中的 cmd.StdoutPipe()也是基于os.Pipe()实现的。

 

os.Pipe()相当于裸用操作系统的管道,他是一个进程不安全的管道。


命名管道可以被多路复用。所以,当有多个输入端同时写入数据的时候,就不得不需要考虑操作原子性的问题。操作系统提供的管道(os.Pipe())是不提供原子操作支持的。为此,Go在标准库代码包io中提供了一个基于内存的有原子性操作保证的管道io.Pipe()(以下简称内存管道) 

reader, writer := io.Pipe() 


函数io.Pipe 返回两个结果值。第一个结果值是代表了该管道输出端的*io.PipeReader 类型的值,第二个结果值是代表了该管道输入端的*io.PipeWriter 类型的值。

在使用Close 方法关闭管道的某一端之后,另一端在写数据或读数据的时候会得到一个预定义的error 类型值。不过我们也可以通过调用CloseWithError 来自定义这种情况下得到的error 类型值。

函数io.Pipe 返回两个结果值。第一个结果值是代表了该管道输出端的*io.PipeReader 类型的值,第二个结果值是代表了该管道输入端的*io.PipeWriter 类型的值。

在使用Close 方法关闭管道的某一端之后,另一端在写数据或读数据的时候会得到一个预定义的error 类型值。不过我们也可以通过调用CloseWithError 来自定义这种情况下得到的error 类型值。


当然啦,无论是 os.Pipe 还是 io.Pipe ,我们都要并发的编写生产端和接收端的逻辑(生产端的发送pipe和接收方的接收pipe操作要放在两个goroutine中),如果把生产端和接收端的管道逻辑放在一个goroutine中就会导致永久阻塞。


另外,内存管道io.Pipe并不是基于文件系统的,也没有作为中介的缓冲区,所以通过它传递的数据只会被复制一次。这也就更进一步地提高了数据传递的效率。不过这样的话生产者的生产速度和执行速度会被消费者的消费速度限制。

   

总结 io.Pipe和os.Pipe的异同点:

前者是进程安全的,后者不是

前者相当于是无缓存的匿名管道,后者是基于文件的命名管道。

 

  

本文转载自:
张柏沛IT技术博客 > Go并发编程系列(一) 多进程编程与进程同步之Pipe管道