本质:给定两个有序的channel, 然后将其合并为一个有序的channel
已知:
- 如果管道满了,但是数据没有读取就会一直阻塞
- 如果函数结束了,那么管道一定要被关闭,如果没有关闭就会fatal
前提:两个channle中存放的数据是有序的
两个channel,无论这两个channel什么时候将数据放入,放入了哪些数据【两个管道中的数据是有序的】
- 如果两个管道都没有被关闭
- 管道1的第一个数据比较小,将这个数据读取到结果管道中。【此时管道2被阻塞】,读取完成之后,继续试图读取管道1
- 否则,将管道2中的数据读取到结果管道中。【此时管道1被阻塞】,读取完成之后,继续试图读取管道2
- 直到有一个管道被关闭了或者两个管道都被关闭了
- 如果管道1没有关闭&& 管道2被关闭了
- 将管道1中的数据读取到结果管道中
- 如果管道1被关闭了&& 管道2没有关闭
- 将管道2中的数据读取到结果管道中
- 如果两个管道都被关闭
- 关闭结果管道,退出程序
package main
import (
"fmt"
"runtime"
)
func main() {
ch1 := create([]int{1, 2, 3, 4, 5, 6, 7, 8, 9})
ch2 := create([]int{ 2, 3, 4, 5, 15, 15, 16, 17, 18, 19})
ch := Merge(ch1, ch2)
//range可以对channel进行迭代,不断接收channel里的数据(没有数据时阻塞),直到channel被关闭后自动退出迭代。
for c := range ch {
fmt.Print(c , "\t")
}
for {
runtime.GC()
}
fmt.Scanln()
}
// cha<- 只用来接受的管道
// <-chan: 只用来发送的管道
func create(arr []int) <-chan int {
/*
对于unbufferd channel,不存储任何数据,只负责数据的流通,并且数据的接收一定发生在数据发送完成之前。更详细的解释是,
goroutine A在往channel发送数据完成之前,一定有goroutine B在等着从这个channel接收数据,
否则发送就会导致发送的goruntine被block住,所以发送和接收的goruntine是耦合的
*/
out := make(chan int) //单个goruntine中不要使用buffered channel来做缓存队列,send和receive操作很容让goruntine被永久block住导致整个程序死锁
go func() {
for _, v := range arr {
out <- v //channel携带的数据只能被一个goruntine得到,一个goruntine取走数据后这份数据在channel里就不复存在了。
}
// 如果不关闭channel,会引发panic
close(out)
}()
return out
}
// ch1 && ch2都必须是有序channel。 这里chan被当成是先进先出管道
func Merge(ch1, ch2 <-chan int) chan int { // ch1, ch2 被当成是先进先出队列。
ch := make(chan int, 1024)
go func() {
v1, ok1 := <- ch1 //检查Channel是否已经被关闭了
v2, ok2 := <- ch2 //检查Channel是否已经被关闭了
for ok1 || ok2 { //只要有一个管道没有被关闭,就会一直循环
if ok1 && ok2 { // 当两个管道都没有被关闭时: 哪个小就将哪个数据读取到结果数据中
if v1 < v2 { // 判断这两个数据哪个小
ch <- v1 // 就把小的数据从原来管道中读出来,然后放入结果管道ch中
v1, ok1 = <- ch1 // 然后接着试图去读取小的哪个
}else{
ch <- v2
v2, ok2 = <- ch2
}
}else if ok1 && ! ok2{ // ch1没有关闭:
ch <- v1 // 从ch1中读取数据,然后放入结果管道ch中
v1, ok1 = <- ch1 // 不断的从没有关闭的ch1
}else{
ch <- v2
v2, ok2 = <- ch2
}
}
close(ch) // 两个管道中的数据都被取完了
}()
return ch
}
package main
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
"math/rand"
"os"
"runtime"
"strconv"
"time"
)
func RandomSource(count int)<-chan int{
rand.Seed(time.Now().Unix())
out:=make(chan int) // 没有缓冲,只是用作数据流通:必须及时把数据从管道中读取,不然就会阻塞
//fmt.Println("主线程,将要开启一个生成随机数的线程", goID())
go func(){
defer close(out)//关闭管道: go func函数结果的时候close
for i:=0;i<count;i++{
fmt.Println("生成随机数的线程正在运行:", goID())
out<- rand.Int() //压入随机数
}
}()
//fmt.Println("主线程,已经开启一个生成随机数的线程", goID())
return out
}
//写入
func WriterSlink(writer io.Writer,in <-chan int){
//fmt.Println("主线程,将要读取随机数通道并写入文件中:", goID())
for v:=range in{ // 不断的读取并写入到文件中
//fmt.Println("主线程,正在读取随机数通道并写入文件中:", goID())
buf:=make([]byte,8)//64位 8字节
binary.BigEndian.PutUint64(buf,uint64(v))//字节转换
writer.Write(buf)//写入
}
//fmt.Println("主线程,完成读取随机数通道并写入文件中:", goID())
}
func main() {
fileName := "test.txt"
file ,err:=os.Create(fileName)
if err!=nil{
panic(err)
}
defer file.Close()//延迟关闭文件
filesize := 1000 // 一个int 8字节, 当文件大小为80000时,一共80000/8个int
mypipe:=RandomSource(filesize/8) //开启一个线程不断的往mypipe管道里装随机数,直到装了filesize/8个随机数
writer:=bufio.NewWriter(file)//写入
WriterSlink(writer,mypipe)//写入
writer.Flush()
fmt.Println("主线程,刷新写入文件:", goID())
// 一直要等待所有数据都写入到了文件中才会执行下面的语句
/* 为什么 造数据的时候 开线程,从文件中读取数据的时刻开线程,但是把造成的数据写入文件的时候不开一个线程写,而是要用主线程写呢?
回答:
* 就是要等数据都全部写入了才去读啊,用主线程去写的话,在主线程写完数据之前会一直没有空闲,一直在干这个事情【暂时理解为阻塞】。如果用子线程写的话, 就不知道什么时候写完了,读取的数据可能是不全,或者完全随机的,因此我写线程的时候用主线程写而不是分线程。
*/
//打开文件,将生成的数据读取到管道, 并且遍历
file,err =os.Open(fileName)//打开文件
if err!=nil{
panic(err)
}
defer file.Close()
p := ReaderSource(bufio.NewReader(file),-1)
for v := range p {
fmt.Println("主线程,正在从第二个读取通道中读取数据:", goID(), v)
}
}
//开启一个线程来不断的去读数据到out管道中
func ReaderSource(reader io.Reader,chunksize int)<-chan int{
out:=make(chan int ,1024)
//fmt.Println("主线程,将要开启线程去文件中读取数据并放入管道中:", goID())
go func() {
defer close(out) // 当前goroutine完毕之后关闭管道
buf:=make([]byte,8) //64
readsize:=0
for{ // 不断的读取数据到buf中
n,err:=reader.Read(buf)
readsize+=n // 统计读取的数据长度
if n>0{ // 如果读取到了数据
//fmt.Println("读线程,正在从文件中读取数据并放入管道中:", goID())
out<-int(binary.BigEndian.Uint64(buf)) //就数据压入out通道中
}
if err!=nil || (chunksize!=-1 && readsize>= chunksize ){ // 当读取有错误的时候,或者读取完成了就跳出循环
//fmt.Println("读线程,将要跳出:", goID(), "读取到的文件个数:", readsize)
break//跳出循环
}
}
}()
//fmt.Println("主线程,完成开启线程去文件中读取数据并放入管道中:", goID())
return out
}
func goID() uint64 {
b := make([]byte, 64)
b = b[:runtime.Stack(b, false)]
b = bytes.TrimPrefix(b, []byte("goroutine "))
b = b[:bytes.IndexByte(b, ' ')]
n, _ := strconv.ParseUint(string(b), 10, 64)
return n
}
多线程两路归并排序
package main
import (
"bytes"
"fmt"
"runtime"
"sort"
"strconv"
)
// 将数据压入管道
func ArraySource(arr ...int)<-chan int {
out := make(chan int)
go func() {
//fmt.Println("子线程将待排序数据压入管道将要开始:", goID())
defer close(out)
for _, v := range arr {
fmt.Println("读数据线程将待排序数据压入管道ing:", goID(), v)
out <- v
}
fmt.Println("读数据将待排序数据压入管道完成:", goID())
}()
return out
}
func InMemerySort(in <- chan int) <- chan int{
out := make(chan int, 1024)
fmt.Println("开启一个内存排序线程:", goID())
go func() {
defer close(out)
// 从管道中读取数据
store := []int{}
for v := range in {
fmt.Println("内存排序线程从in管道中读取数据存入临时数组中:", goID(), v)
store = append(store, v)
}
//fmt.Println("内存排序线程从in管道中读取数据读取完成:", goID())
// 读取完成之后排序
sort.Ints(store)
//fmt.Println("内存排序线程已经排序了,开始将排序好的数据放入out管道中:", goID())
for _, v := range store {
fmt.Println("内存排序线程已经排序了,开始将排序好的数据放入out管道中ing:", goID(), v)
out <- v
}
fmt.Println("内存排序线程已经排序了,完成将排序好的数据放入out管道中:", goID())
}()
return out
}
// 将两个有序chan整合成一个有序chan
func merge(in1, in2 <-chan int) chan int {
out := make(chan int, 1024)
fmt.Println("开启一个合并两个有序chan线程:", goID())
go func() {
defer close(out)
v1, ok1 := <- in1
v2, ok2 := <- in2
for ok1 || ok2{
if ok1 && ok2{
if v1 < v2 {
fmt.Println("正在合并数据:", goID(), v1)
out <- v1
v1, ok1 = <- in1
}else{
fmt.Println("正在合并数据:", goID(), v2)
out <- v2
v2, ok2 = <- in2
}
}else if ok1 && ! ok2{
fmt.Println("正在合并数据之v2已经读取完毕:", goID(), v1)
out <- v1
v1, ok1 = <- in1
}else if !ok1 && ok2{
fmt.Println("正在合并数据之v1已经读取完毕:", goID(), v2)
out <- v2
v2, ok2 = <- in2
}
}
}()
return out
}
func main() {
/*
InMemerySort(ArraySource(3,9,2,1,10))
总结: 待排序数据管道, 开两个线程,开一个线程一边往里面放数据,同时另一个线程从这个管道中读取数据。
待管道中数据读取完成之后: 排序, 排好序之后将排序号的数据放入另一个管道中
*/
p := merge(InMemerySort(ArraySource(3,9,2,1,10)), InMemerySort(ArraySource(13,19,21,12,101, 10, 3)));
for v := range p {
fmt.Println(v)
}
fmt.Scanln()
}
func goID() uint64 {
b := make([]byte, 64)
b = b[:runtime.Stack(b, false)]
b = bytes.TrimPrefix(b, []byte("goroutine "))
b = b[:bytes.IndexByte(b, ' ')]
n, _ := strconv.ParseUint(string(b), 10, 64)
return n
}
-
为什么这里我只开三种线程呢? 把数据放入管道的放线程, 从管道中拿数据并且排序的排序线程,合并两堆有序管道的合并线程。
工程中我们干活不就是这样吗? 先将物资分成一小份一小份的【这时物资是散乱在地上的】。每一份数据前面有一个工人,将数据放到传送带上,然后另外一个工人将传送带上的数据放入箱子中,等所有的数据都捡到了箱子中,就排序好数据,然后将数据按照顺序传送给另外一个人 -
主线程必须阻塞住,要不然就完成不了
0
1
2
3、4、
5、
6、
package main
import (
"bytes"
"fmt"
"runtime"
"sort"
"strconv"
)
// 将数据压入管道
func ArraySource(arr ...int)<-chan int {
out := make(chan int)
go func() {
//fmt.Println("子线程将待排序数据压入管道将要开始:", goID())
defer close(out)
for _, v := range arr {
//fmt.Println("读数据线程将待排序数据压入管道ing:", goID(), v)
out <- v
}
//fmt.Println("读数据将待排序数据压入管道完成:", goID())
}()
return out
}
func InMemerySort(in <- chan int) <- chan int{
out := make(chan int, 1024)
fmt.Println("开启一个内存排序线程:", goID())
go func() {
defer close(out)
// 从管道中读取数据
store := []int{}
for v := range in {
//fmt.Println("内存排序线程从in管道中读取数据存入临时数组中:", goID(), v)
store = append(store, v)
}
//fmt.Println("内存排序线程从in管道中读取数据读取完成:", goID())
// 读取完成之后排序
sort.Ints(store)
//fmt.Println("内存排序线程已经排序了,开始将排序好的数据放入out管道中:", goID())
for _, v := range store {
//fmt.Println("内存排序线程已经排序了,开始将排序好的数据放入out管道中ing:", goID(), v)
out <- v
}
//fmt.Println("内存排序线程已经排序了,完成将排序好的数据放入out管道中:", goID())
}()
return out
}
// 将两个有序chan整合成一个有序chan
func merge(in1, in2 <-chan int) chan int {
out := make(chan int, 1024)
//fmt.Println("开启一个合并两个有序chan线程:", goID())
go func() {
defer close(out)
v1, ok1 := <- in1
v2, ok2 := <- in2
for ok1 || ok2{
if ok1 && ok2{
if v1 < v2 {
//fmt.Println("正在合并数据:", goID(), v1)
out <- v1
v1, ok1 = <- in1
}else{
//fmt.Println("正在合并数据:", goID(), v2)
out <- v2
v2, ok2 = <- in2
}
}else if ok1 && ! ok2{
//fmt.Println("正在合并数据之v2已经读取完毕:", goID(), v1)
out <- v1
v1, ok1 = <- in1
}else if !ok1 && ok2{
// fmt.Println("正在合并数据之v1已经读取完毕:", goID(), v2)
out <- v2
v2, ok2 = <- in2
}
}
}()
return out
}
func mergeN(inputs... <-chan int) <-chan int {
length := len(inputs)
if length == 1 {
return inputs[0]
}else{
mid := length/2
return merge(mergeN(inputs[:mid]...), mergeN(inputs[mid:]...))
}
}
func main() {
p := mergeN(InMemerySort(ArraySource(3,9,2,1,10)),
InMemerySort(ArraySource(133,9,2,15,110)),
InMemerySort(ArraySource(13,9,24,157,130)),
InMemerySort(ArraySource(3,9,32,51,1990)),
InMemerySort(ArraySource(3,9,24,12,1410)),
InMemerySort(ArraySource(13,19,21,12,101, 10, 3)));
for v := range p {
fmt.Println(v)
}
fmt.Scanln()
}
func goID() uint64 {
b := make([]byte, 64)
b = b[:runtime.Stack(b, false)]
b = bytes.TrimPrefix(b, []byte("goroutine "))
b = b[:bytes.IndexByte(b, ' ')]
n, _ := strconv.ParseUint(string(b), 10, 64)
return n
}
package main
import (
"bufio"
"bytes"
"encoding/binary"
"fmt"
"io"
"math/rand"
"os"
"runtime"
"sort"
"strconv"
"time"
)
func GenerateDataToFile(filename string, filesize int){
pipi := GeneRandomSource(filesize / 8)
file, err := os.Create(filename)
if err != nil {
panic(err)
}
defer file.Close()
writer:=bufio.NewWriter(file)//写入
WriterSlink(writer, pipi)
writer.Flush()//刷新
}
func main() {
filesize := 10000
filename := "data.in"
// 生成测试数据
GenerateDataToFile(filename, filesize);
chunkCount := 8 // 有多少个chan用来排序
chunkSize := filesize/chunkCount // 每根chan里面放多少数据
file, err := os.Open(filename)
if err!=nil{
panic(err)
}
defer file.Close()
sortResults:=[]<-chan int{} //排序结果,一个数组,每一个元素是个管道
for i:=0;i<chunkCount;i++{
file.Seek(int64(i*chunkSize),io.SeekStart)//跳到文件指针 : 从文件开始偏移 i*chunkSize
// 读取文件到内存中然后在内存中排序,将排序好的数据放入有序chan中
sortResults=append(sortResults, InMemerySort(ReaderSource(bufio.NewReader(file),chunkSize)))
}
sort := MergeN(sortResults...)
for v := range sort{
fmt.Println(v)
}
fmt.Scanln()
}
func InMemerySort(in <- chan int) <- chan int{
out := make(chan int, 1024)
go func() {
defer close(out)
store := []int{}
for v := range in {
store = append(store, v)
}
// 读取完成之后排序
sort.Ints(store)
for _, v := range store {
out <- v
}
}()
return out
}
// 将两个有序chan整合成一个有序chan
func merge(in1, in2 <-chan int) chan int {
out := make(chan int, 1024)
go func() {
defer close(out)
v1, ok1 := <- in1
v2, ok2 := <- in2
for ok1 || ok2{
if ok1 && ok2{
if v1 < v2 {
out <- v1
v1, ok1 = <- in1
}else{
out <- v2
v2, ok2 = <- in2
}
}else if ok1 && ! ok2{
out <- v1
v1, ok1 = <- in1
}else if !ok1 && ok2{
out <- v2
v2, ok2 = <- in2
}
}
}()
return out
}
func MergeN(inputs... <-chan int) <-chan int {
length := len(inputs)
if length == 0{
return nil
}else if length == 1 {
return inputs[0]
}else{
mid := length/2
return merge(MergeN(inputs[:mid]...), MergeN(inputs[mid:]...))
}
}
func ReaderSource(reader io.Reader,chunksize int)<-chan int{
out:=make(chan int ,1024)
go func() {
defer close(out) // 当前goroutine完毕之后关闭管道
buf:=make([]byte,8) //64
readsize:=0
for{ // 不断的读取数据到buf中
n,err:=reader.Read(buf)
readsize+=n // 统计读取的数据长度
if n>0{ // 如果读取到了数据
out<-int(binary.BigEndian.Uint64(buf)) //就数据压入out通道中
}
if err!=nil || (chunksize!=-1 && readsize>= chunksize ){ // 当读取有错误的时候,或者读取完成了就跳出循环
break//跳出循环
}
}
}()
return out
}
func WriterSlink(writer io.Writer,in <-chan int){
for v:=range in{
buf:=make([]byte,8)//64位 8字节
binary.BigEndian.PutUint64(buf,uint64(v))//字节转换
writer.Write(buf)//写入
}
}
func GeneRandomSource(count int)<-chan int{
rand.Seed(time.Now().Unix())
out:=make(chan int) // 没有缓冲,只是用作数据流通:必须及时把数据从管道中读取,不然就会阻塞
go func(){
defer close(out)//关闭管道: go func函数结果的时候close
for i:=0;i<count;i++{
out<- rand.Int() //压入随机数
}
}()
return out
}
func goID() uint64 {
b := make([]byte, 64)
b = b[:runtime.Stack(b, false)]
b = bytes.TrimPrefix(b, []byte("goroutine "))
b = b[:bytes.IndexByte(b, ' ')]
n, _ := strconv.ParseUint(string(b), 10, 64)
return n
}
‘
总结:这里有一堆数据,主线程就叫了三堆人:
- 主线程计算好每个人要处理的资源,两两分组,一个人将数据放入待排序管道,一个人读取数据并拍好序,放入有序管道。
- 主线程将有序管道两两分组,交了另外一堆人将两两分组的有序管道合并成一个