golang使用channel实现归并排序的merge

本质:给定两个有序的channel, 然后将其合并为一个有序的channel

已知:

  • 如果管道满了,但是数据没有读取就会一直阻塞
  • 如果函数结束了,那么管道一定要被关闭,如果没有关闭就会fatal

前提:两个channle中存放的数据是有序的

两个channel,无论这两个channel什么时候将数据放入,放入了哪些数据【两个管道中的数据是有序的】

  • 如果两个管道都没有被关闭
    • 管道1的第一个数据比较小,将这个数据读取到结果管道中。【此时管道2被阻塞】,读取完成之后,继续试图读取管道1
    • 否则,将管道2中的数据读取到结果管道中。【此时管道1被阻塞】,读取完成之后,继续试图读取管道2
    • 直到有一个管道被关闭了或者两个管道都被关闭了
  • 如果管道1没有关闭&& 管道2被关闭了
    • 将管道1中的数据读取到结果管道中
  • 如果管道1被关闭了&& 管道2没有关闭
    • 将管道2中的数据读取到结果管道中
  • 如果两个管道都被关闭
    • 关闭结果管道,退出程序
package main

import (
	"fmt"
	"runtime"
)

func main() {
	ch1 := create([]int{1, 2, 3, 4, 5, 6, 7, 8, 9})
	ch2 := create([]int{ 2, 3, 4, 5, 15, 15, 16, 17, 18, 19})
	ch := Merge(ch1, ch2)
    //range可以对channel进行迭代,不断接收channel里的数据(没有数据时阻塞),直到channel被关闭后自动退出迭代。
	for c := range ch {
		fmt.Print(c , "\t")
	}
	for {
		runtime.GC()
	}

	fmt.Scanln()
}

// cha<- 只用来接受的管道
// <-chan: 只用来发送的管道
func create(arr []int) <-chan int {
	/*
	对于unbufferd channel,不存储任何数据,只负责数据的流通,并且数据的接收一定发生在数据发送完成之前。更详细的解释是,
	goroutine A在往channel发送数据完成之前,一定有goroutine B在等着从这个channel接收数据,
	否则发送就会导致发送的goruntine被block住,所以发送和接收的goruntine是耦合的
	*/
	out := make(chan int)  //单个goruntine中不要使用buffered channel来做缓存队列,send和receive操作很容让goruntine被永久block住导致整个程序死锁
	go func() {
		for _, v := range arr {
			out <- v //channel携带的数据只能被一个goruntine得到,一个goruntine取走数据后这份数据在channel里就不复存在了。
		}
		// 如果不关闭channel,会引发panic
		close(out)
	}()

	return out
}

// ch1 && ch2都必须是有序channel。  这里chan被当成是先进先出管道
func Merge(ch1, ch2 <-chan int) chan int { // ch1, ch2 被当成是先进先出队列。
	ch := make(chan int, 1024)
	go func() {
		v1, ok1 := <- ch1 //检查Channel是否已经被关闭了
		v2, ok2 := <- ch2 //检查Channel是否已经被关闭了
		for  ok1 || ok2 { //只要有一个管道没有被关闭,就会一直循环
			if ok1 && ok2 { // 当两个管道都没有被关闭时: 哪个小就将哪个数据读取到结果数据中
				if v1 < v2 {  // 判断这两个数据哪个小
					ch <- v1 // 就把小的数据从原来管道中读出来,然后放入结果管道ch中
					v1, ok1 = <- ch1  // 然后接着试图去读取小的哪个
				}else{
					ch <- v2
					v2, ok2 = <- ch2
				}
			}else if ok1 && ! ok2{ // ch1没有关闭:
				ch <- v1 // 从ch1中读取数据,然后放入结果管道ch中
				v1, ok1 = <- ch1  // 不断的从没有关闭的ch1
			}else{
				ch <- v2
				v2, ok2 = <- ch2
			}
		}
		close(ch) // 两个管道中的数据都被取完了
	}()

	return ch
}

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

多线程:使用chan运输随机数并写入到文件中
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"math/rand"
	"os"
	"runtime"
	"strconv"
	"time"
)

func  RandomSource(count int)<-chan int{
	rand.Seed(time.Now().Unix())
	out:=make(chan int)  // 没有缓冲,只是用作数据流通:必须及时把数据从管道中读取,不然就会阻塞
	//fmt.Println("主线程,将要开启一个生成随机数的线程", goID())
	go func(){
		defer close(out)//关闭管道: go func函数结果的时候close
		for i:=0;i<count;i++{
			fmt.Println("生成随机数的线程正在运行:", goID())
			out<- rand.Int() //压入随机数
		}
	}()
	//fmt.Println("主线程,已经开启一个生成随机数的线程", goID())

	return out
}

//写入
func  WriterSlink(writer io.Writer,in <-chan int){
	//fmt.Println("主线程,将要读取随机数通道并写入文件中:", goID())
	for v:=range in{  // 不断的读取并写入到文件中
		//fmt.Println("主线程,正在读取随机数通道并写入文件中:", goID())
		buf:=make([]byte,8)//64位 8字节
		binary.BigEndian.PutUint64(buf,uint64(v))//字节转换
		writer.Write(buf)//写入
	}
	//fmt.Println("主线程,完成读取随机数通道并写入文件中:", goID())
}
func main() {
	fileName := "test.txt"
	file ,err:=os.Create(fileName)
	if err!=nil{
		panic(err)
	}
	defer file.Close()//延迟关闭文件
	
	
	filesize := 1000 // 一个int 8字节, 当文件大小为80000时,一共80000/8个int
	mypipe:=RandomSource(filesize/8) //开启一个线程不断的往mypipe管道里装随机数,直到装了filesize/8个随机数
	
	writer:=bufio.NewWriter(file)//写入
	WriterSlink(writer,mypipe)//写入
	writer.Flush()
	fmt.Println("主线程,刷新写入文件:", goID())

	// 一直要等待所有数据都写入到了文件中才会执行下面的语句
	/* 为什么 造数据的时候 开线程,从文件中读取数据的时刻开线程,但是把造成的数据写入文件的时候不开一个线程写,而是要用主线程写呢?
	回答:
		* 就是要等数据都全部写入了才去读啊,用主线程去写的话,在主线程写完数据之前会一直没有空闲,一直在干这个事情【暂时理解为阻塞】。如果用子线程写的话, 就不知道什么时候写完了,读取的数据可能是不全,或者完全随机的,因此我写线程的时候用主线程写而不是分线程。
  */


	//打开文件,将生成的数据读取到管道, 并且遍历
	file,err =os.Open(fileName)//打开文件
	if err!=nil{
		panic(err)
	}
	defer file.Close()
	p := ReaderSource(bufio.NewReader(file),-1)
	for v := range 	p {
		fmt.Println("主线程,正在从第二个读取通道中读取数据:", goID(), v)
	}

}


//开启一个线程来不断的去读数据到out管道中
func ReaderSource(reader io.Reader,chunksize int)<-chan int{
	out:=make(chan int ,1024)
	//fmt.Println("主线程,将要开启线程去文件中读取数据并放入管道中:", goID())
	go func() {
		defer close(out) // 当前goroutine完毕之后关闭管道
		buf:=make([]byte,8) //64
		readsize:=0
		for{  // 不断的读取数据到buf中
			n,err:=reader.Read(buf)
			readsize+=n // 统计读取的数据长度
			if n>0{ // 如果读取到了数据
				//fmt.Println("读线程,正在从文件中读取数据并放入管道中:", goID())
				out<-int(binary.BigEndian.Uint64(buf)) //就数据压入out通道中
			}
			if err!=nil || (chunksize!=-1 && readsize>= chunksize ){  // 当读取有错误的时候,或者读取完成了就跳出循环
				//fmt.Println("读线程,将要跳出:", goID(), "读取到的文件个数:", readsize)
				break//跳出循环
			}
		}
	}()
	//fmt.Println("主线程,完成开启线程去文件中读取数据并放入管道中:", goID())
	return out
}



func goID() uint64 {
	b := make([]byte, 64)
	b = b[:runtime.Stack(b, false)]
	b = bytes.TrimPrefix(b, []byte("goroutine "))
	b = b[:bytes.IndexByte(b, ' ')]
	n, _ := strconv.ParseUint(string(b), 10, 64)
	return n
}

在这里插入图片描述

多线程两路归并排序
package main

import (
	"bytes"
	"fmt"
	"runtime"
	"sort"
	"strconv"
)



// 将数据压入管道
func ArraySource(arr ...int)<-chan int {
	out := make(chan int)
	go func() {
		//fmt.Println("子线程将待排序数据压入管道将要开始:", goID())
		defer close(out)
		for _, v := range arr {
			fmt.Println("读数据线程将待排序数据压入管道ing:", goID(), v)
			out <- v
		}
		fmt.Println("读数据将待排序数据压入管道完成:", goID())
	}()

	return out
}

func InMemerySort(in <- chan int) <- chan int{
	out := make(chan int, 1024)
	fmt.Println("开启一个内存排序线程:", goID())
	go func() {
		defer close(out)

		// 从管道中读取数据
		store := []int{}
		for v := range in {
			fmt.Println("内存排序线程从in管道中读取数据存入临时数组中:", goID(), v)
			store = append(store, v)
		}
		//fmt.Println("内存排序线程从in管道中读取数据读取完成:", goID())
		// 读取完成之后排序
		sort.Ints(store)
		//fmt.Println("内存排序线程已经排序了,开始将排序好的数据放入out管道中:", goID())
		for _, v := range store {
			fmt.Println("内存排序线程已经排序了,开始将排序好的数据放入out管道中ing:", goID(), v)
			out <- v
		}
		fmt.Println("内存排序线程已经排序了,完成将排序好的数据放入out管道中:", goID())
	}()

	return out
}

//  将两个有序chan整合成一个有序chan
func merge(in1, in2 <-chan int) chan  int {
	out := make(chan int, 1024)
	fmt.Println("开启一个合并两个有序chan线程:", goID())
	go func() {
		defer close(out)
		v1, ok1 := <- in1
		v2, ok2 := <- in2
		for   ok1  || ok2{
			if ok1 && ok2{
				if v1 < v2 {
					fmt.Println("正在合并数据:", goID(), v1)
					out <- v1
					v1, ok1 = <- in1
				}else{
					fmt.Println("正在合并数据:", goID(), v2)
					out <- v2
					v2, ok2 = <- in2
				}
			}else if ok1 && ! ok2{
				fmt.Println("正在合并数据之v2已经读取完毕:", goID(), v1)
				 out <- v1
				 v1, ok1 = <- in1
			}else if !ok1 && ok2{
				fmt.Println("正在合并数据之v1已经读取完毕:", goID(), v2)
				 out <- v2
				 v2, ok2 = <- in2
			}
		}
	}()


	return out
}


func main() {
	/*
	InMemerySort(ArraySource(3,9,2,1,10))
	总结: 待排序数据管道, 开两个线程,开一个线程一边往里面放数据,同时另一个线程从这个管道中读取数据。
	        待管道中数据读取完成之后: 排序, 排好序之后将排序号的数据放入另一个管道中
	*/
	p := merge(InMemerySort(ArraySource(3,9,2,1,10)), InMemerySort(ArraySource(13,19,21,12,101, 10, 3)));
	for  v := range p {
		fmt.Println(v)
	}
	fmt.Scanln()
}





func goID() uint64 {
	b := make([]byte, 64)
	b = b[:runtime.Stack(b, false)]
	b = bytes.TrimPrefix(b, []byte("goroutine "))
	b = b[:bytes.IndexByte(b, ' ')]
	n, _ := strconv.ParseUint(string(b), 10, 64)
	return n
}

在这里插入图片描述

  • 为什么这里我只开三种线程呢? 把数据放入管道的放线程, 从管道中拿数据并且排序的排序线程,合并两堆有序管道的合并线程。
    工程中我们干活不就是这样吗? 先将物资分成一小份一小份的【这时物资是散乱在地上的】。每一份数据前面有一个工人,将数据放到传送带上,然后另外一个工人将传送带上的数据放入箱子中,等所有的数据都捡到了箱子中,就排序好数据,然后将数据按照顺序传送给另外一个人

  • 主线程必须阻塞住,要不然就完成不了
    在这里插入图片描述
    在这里插入图片描述
    0
    在这里插入图片描述
    1
    在这里插入图片描述
    2
    在这里插入图片描述
    3、在这里插入图片描述4、
    在这里插入图片描述
    5、
    在这里插入图片描述
    6、
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

本机多线程多路归并排序
package main

import (
	"bytes"
	"fmt"
	"runtime"
	"sort"
	"strconv"
)



// 将数据压入管道
func ArraySource(arr ...int)<-chan int {
	out := make(chan int)
	go func() {
		//fmt.Println("子线程将待排序数据压入管道将要开始:", goID())
		defer close(out)
		for _, v := range arr {
			//fmt.Println("读数据线程将待排序数据压入管道ing:", goID(), v)
			out <- v
		}
		//fmt.Println("读数据将待排序数据压入管道完成:", goID())
	}()

	return out
}

func InMemerySort(in <- chan int) <- chan int{
	out := make(chan int, 1024)
	fmt.Println("开启一个内存排序线程:", goID())
	go func() {
		defer close(out)

		// 从管道中读取数据
		store := []int{}
		for v := range in {
			//fmt.Println("内存排序线程从in管道中读取数据存入临时数组中:", goID(), v)
			store = append(store, v)
		}
		//fmt.Println("内存排序线程从in管道中读取数据读取完成:", goID())
		// 读取完成之后排序
		sort.Ints(store)
		//fmt.Println("内存排序线程已经排序了,开始将排序好的数据放入out管道中:", goID())
		for _, v := range store {
			//fmt.Println("内存排序线程已经排序了,开始将排序好的数据放入out管道中ing:", goID(), v)
			out <- v
		}
		//fmt.Println("内存排序线程已经排序了,完成将排序好的数据放入out管道中:", goID())
	}()

	return out
}

//  将两个有序chan整合成一个有序chan
func merge(in1, in2 <-chan int) chan  int {
	out := make(chan int, 1024)
	//fmt.Println("开启一个合并两个有序chan线程:", goID())
	go func() {
		defer close(out)
		v1, ok1 := <- in1
		v2, ok2 := <- in2
		for   ok1  || ok2{
			if ok1 && ok2{
				if v1 < v2 {
					//fmt.Println("正在合并数据:", goID(), v1)
					out <- v1
					v1, ok1 = <- in1
				}else{
					//fmt.Println("正在合并数据:", goID(), v2)
					out <- v2
					v2, ok2 = <- in2
				}
			}else if ok1 && ! ok2{
				//fmt.Println("正在合并数据之v2已经读取完毕:", goID(), v1)
				 out <- v1
				 v1, ok1 = <- in1
			}else if !ok1 && ok2{
			//	fmt.Println("正在合并数据之v1已经读取完毕:", goID(), v2)
				 out <- v2
				 v2, ok2 = <- in2
			}
		}
	}()


	return out
}

func mergeN(inputs... <-chan int) <-chan  int {
	length := len(inputs)
	if length == 1 {
		return inputs[0]
	}else{
		mid := length/2
		return merge(mergeN(inputs[:mid]...), mergeN(inputs[mid:]...))
	}
}
func main() {
	p := mergeN(InMemerySort(ArraySource(3,9,2,1,10)),
		InMemerySort(ArraySource(133,9,2,15,110)),
		InMemerySort(ArraySource(13,9,24,157,130)),
		InMemerySort(ArraySource(3,9,32,51,1990)),
		InMemerySort(ArraySource(3,9,24,12,1410)),
		InMemerySort(ArraySource(13,19,21,12,101, 10, 3)));
	for  v := range p {
		fmt.Println(v)
	}
	fmt.Scanln()


}





func goID() uint64 {
	b := make([]byte, 64)
	b = b[:runtime.Stack(b, false)]
	b = bytes.TrimPrefix(b, []byte("goroutine "))
	b = b[:bytes.IndexByte(b, ' ')]
	n, _ := strconv.ParseUint(string(b), 10, 64)
	return n
}

在这里插入图片描述

package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"math/rand"
	"os"
	"runtime"
	"sort"
	"strconv"
	"time"
)

func GenerateDataToFile(filename string, filesize int){
	pipi := GeneRandomSource(filesize / 8)


	file, err := os.Create(filename)
	if err != nil {
		panic(err)
	}
	defer file.Close()
	writer:=bufio.NewWriter(file)//写入
	WriterSlink(writer, pipi)
	writer.Flush()//刷新
}


func main() {
	filesize := 10000
	filename := "data.in"

	// 生成测试数据
	GenerateDataToFile(filename, filesize);

	chunkCount := 8  // 有多少个chan用来排序
	chunkSize := filesize/chunkCount // 每根chan里面放多少数据
	file, err := os.Open(filename)
	if err!=nil{
		panic(err)
	}
	defer file.Close()
	sortResults:=[]<-chan int{} //排序结果,一个数组,每一个元素是个管道
	for  i:=0;i<chunkCount;i++{

		file.Seek(int64(i*chunkSize),io.SeekStart)//跳到文件指针 : 从文件开始偏移 i*chunkSize

       // 读取文件到内存中然后在内存中排序,将排序好的数据放入有序chan中
		sortResults=append(sortResults, InMemerySort(ReaderSource(bufio.NewReader(file),chunkSize)))
	}

	sort := MergeN(sortResults...)
	for v := range sort{
		fmt.Println(v)
	}
	fmt.Scanln()
}

func InMemerySort(in <- chan int) <- chan int{
	out := make(chan int, 1024)

	go func() {
		defer close(out)
		store := []int{}
		for v := range in {
			store = append(store, v)
		}
		// 读取完成之后排序
		sort.Ints(store)
		for _, v := range store {
			out <- v
		}
	}()

	return out
}

//  将两个有序chan整合成一个有序chan
func merge(in1, in2 <-chan int) chan  int {
	out := make(chan int, 1024)
	go func() {
		defer close(out)
		v1, ok1 := <- in1
		v2, ok2 := <- in2
		for   ok1  || ok2{
			if ok1 && ok2{
				if v1 < v2 {
					out <- v1
					v1, ok1 = <- in1
				}else{
					out <- v2
					v2, ok2 = <- in2
				}
			}else if ok1 && ! ok2{
				out <- v1
				v1, ok1 = <- in1
			}else if !ok1 && ok2{
				out <- v2
				v2, ok2 = <- in2
			}
		}
	}()


	return out
}

func MergeN(inputs... <-chan int) <-chan  int {
	length := len(inputs)
	if length == 0{
		return nil
	}else if length == 1 {
		return inputs[0]
	}else{
		mid := length/2
		return merge(MergeN(inputs[:mid]...), MergeN(inputs[mid:]...))
	}
}
func ReaderSource(reader io.Reader,chunksize int)<-chan int{
	out:=make(chan int ,1024)
	go func() {
		defer close(out) // 当前goroutine完毕之后关闭管道
		buf:=make([]byte,8) //64
		readsize:=0
		for{  // 不断的读取数据到buf中
			n,err:=reader.Read(buf)
			readsize+=n // 统计读取的数据长度
			if n>0{ // 如果读取到了数据
				out<-int(binary.BigEndian.Uint64(buf)) //就数据压入out通道中
			}
			if err!=nil || (chunksize!=-1 && readsize>= chunksize ){  // 当读取有错误的时候,或者读取完成了就跳出循环
				break//跳出循环
			}
		}
	}()
	return out
}
func  WriterSlink(writer io.Writer,in <-chan int){
	for v:=range in{
		buf:=make([]byte,8)//64位 8字节
		binary.BigEndian.PutUint64(buf,uint64(v))//字节转换
		writer.Write(buf)//写入
	}

}
func  GeneRandomSource(count int)<-chan int{
	rand.Seed(time.Now().Unix())
	out:=make(chan int)  // 没有缓冲,只是用作数据流通:必须及时把数据从管道中读取,不然就会阻塞
	go func(){
		defer close(out)//关闭管道: go func函数结果的时候close

		for i:=0;i<count;i++{
			out<- rand.Int() //压入随机数
		}
	}()

	return out
}

func goID() uint64 {
	b := make([]byte, 64)
	b = b[:runtime.Stack(b, false)]
	b = bytes.TrimPrefix(b, []byte("goroutine "))
	b = b[:bytes.IndexByte(b, ' ')]
	n, _ := strconv.ParseUint(string(b), 10, 64)
	return n
}


总结:这里有一堆数据,主线程就叫了三堆人:

  • 主线程计算好每个人要处理的资源,两两分组,一个人将数据放入待排序管道,一个人读取数据并拍好序,放入有序管道。
  • 主线程将有序管道两两分组,交了另外一堆人将两两分组的有序管道合并成一个
    在这里插入图片描述