1.进程、线程、协程区别

a.各自特点

  • 进程:拥有自己独立的堆和栈,既不共享堆,也不共享栈,进程由操作系统调度;
  • 线程:拥有自己独立的栈和共享的堆,共享堆,不共享栈,标准线程由操作系统调度;
  • 协程:拥有自己独立的栈和共享的堆,共享堆,不共享栈,协程由程序员在协程的代码里显示调度。

协程与线程:
每个单位时间内,一个CPU只能处理一个线程(操作系统:thread),线程是CPU处理的单位或单元,底层资源占用中等(比进程少)。线程中程序的执行过程是:同步阻塞的(依次执行),非抢占式的(依代码编写顺序)。开发上比较清晰明了。
协程是“用户级”的线程,通过把线程的分段运行:主动暂停、主动运行,切换逻辑点,针对i/o请求可以节约连接、对方处理的中间环节等待时间,一个线程上可以跑多个协程。协程中的程序执行是触发、跳转的,异步非阻塞的(事件触发),抢占式的(线程挂起等待响应)。开发上很复杂。

channel信道,是go用于在线程间传递数据的,下面关于channel的例子观察线程与协程使用情况

b.上代码一:

使用一个无缓存channel时:

百度查找关于go的多线程,写法也跟协程没有明显区别。参照上面特点的话,线程部分:go func(){}()另起一个线程,变量继承当前父进程/主线程、运行空间为{}、内部顺序执行,如果有数据流阻塞;协程部分:对线程添加异步代码,实现事件驱动的执行状态切换,运行空间为当前线程、数据流驱动(输出、输入)不阻塞。
参考 上官二狗《Go 缓冲 channel 和 非缓冲 channel 的区别》

c、代码二:

使用一个缓存channel + 一个无缓存channel时:

package main

import (
    "fmt"
    "math/rand"
    "strconv"
    "time"
)

type array2j struct {
    a []string
    b string

}

func main() {
    ch := make(chan string, 3)
    c2 := make(chan string)
    var queue array2j
    for i:=1; i<=5; i++ {
        go func(i int) {
            time.Sleep(time.Duration(rand.Intn(100)) * time.Microsecond)
            fmt.Println("go func:" + strconv.Itoa(i))
            ch <- strconv.Itoa(i) + "_ch_" + strconv.Itoa(rand.Int())
        }(i)
    }
    for j:=1; j<=2; j++ {
        go func() {
            time.Sleep(1 * time.Second)
            ch <- "c2"
        }()
    }
    //等下,切到其他线程
    time.Sleep(1 * time.Second)
    for {
        select {
        case a,e := <-ch:
            fmt.Println(a,e)
            queue.a = append(queue.a, a)
        case b,e := <-c2:
            fmt.Println(b,e)
            queue.b = b
        }
        //下面这快代码是不报错的关键:当所有信道为空时退出循环
        if len(ch) + len(c2) == 0 {
            fmt.Println("queue", queue)
            break
        }
        //执行不到
        res, err :=  <- ch
        fmt.Println(res, err)
    }

    fmt.Println("hello go!")
}
/**

go func:1
go func:4
go func:2
go func:3
go func:5
1_ch_5577006791947779410 true
4_ch_8674665223082153551 true
2_ch_6129484611666145821 true
3_ch_4037200794235010051 true
5_ch_3916589616287113937 true
c2 true
c2 true
queue {[1_ch_5577006791947779410 4_ch_8674665223082153551 2_ch_6129484611666145821 3_ch_4037200794235010051 5_ch_3916589616287113937 c2 c2] }
hello go!

*/

这里信道ch宽度是3,有5个线程输入-对应将有5个输出,运行流出正常。说明:有缓存、超量时会自动阻塞,当读取完其中数值时,又可以继续写入。

2.应用测试

a、数据库的批量写入

实际操作只有看到线程和异步, 协程是线程的一个异步表现。
准备测试环境,使用php进行建表、生成10w测试数据sql.data的准备略。

package main

import (
    "database/sql"
    _ "github.com/go-sql-driver/mysql"
    "fmt"
    "io"
    "os"
    "path"
    "strconv"
    _ "strings"
    "bufio"
    _ "fmt"
    _ "io"
    _ "io/ioutil"
    "time"
)

func ReadFile(filePath string, handle func(string)) error {
    f, err := os.Open(filePath)
    defer f.Close()
    if err != nil {
        return err
    }
    //255*100 测试行大于4K时读取被截断 [:4K]
    buf := bufio.NewReaderSize(f, 25500)
    for {
        line, _, err := buf.ReadLine()
        statistic.readLine++

        handle(string(line))
        if err != nil {
            if err == io.EOF{
                //fmt.Println( "io.EOF:", err, string(line))
                return nil
            }
            return err
        }
        //return nil
    }
}

func buildQuery(line string){
    if len(line) == 0 {
        //结尾
        query := curSql[:len(curSql)-1]
        fmt.Println( "==> EOF队列:" + strconv.Itoa(statistic.sqlLineNum), line)
        go execQuery(query)
    }else{
        newSql := curSql + "(" + line +"),"

        if len(newSql) > maxSqlLen {
            query := curSql[:len(curSql)-1]
            fmt.Println( "任务队列:" + strconv.Itoa(statistic.sqlLineNum))
            go execQuery(query)
            //回归
            curSql = sqlBuild + "(" + line +"),"
        }else{
            curSql = newSql
        }
    }
}
func execQuery(query string) {
    statistic.sqlLineNum++
    res, err := myDb.Exec(query) //Result
    if err != nil {
        fmt.Println(err.Error()) //显示异常
        panic(err.Error()) //抛出异常
    }
    re, err := res.RowsAffected() //int64, error
    if err != nil {
        fmt.Println(err.Error()) //显示异常
        fmt.Println(err) //抛出异常
    }
    string := strconv.FormatInt(re, 10)

    rows, err := strconv.Atoi(string)
    if err != nil {
        fmt.Println(err) //抛出异常
    }
    channelResult <- rows
}

type statistics struct {
    execDoneNum int
    sqlLineNum int
    chanRecNum int
    readLine int
}

var sqlBuild string
var curSql string
var myDb *sql.DB
var maxSqlLen = 1024*1024*2
var statistic statistics = statistics{0,0,0,0}
//容器mysql的最大连接数是150 (200崩溃)
var channelResult = make(chan int, 20)

func main(){
    var err error
    myDb, err = sql.Open("mysql", "root:123456@tcp(172.1.11.11:3306)/testdb?charset=utf8")
    if err != nil {
        fmt.Println(err.Error()) //显示异常
        panic(err.Error()) //抛出异常
    }
    defer myDb.Close()
    var count int
    rows, err := myDb.Query("SELECT COUNT(id) as count FROM t10_5")
    if err != nil {
        fmt.Println(err.Error()) //显示异常
        panic(err.Error()) //抛出异常
    }
    for rows.Next() {
        rows.Scan(&count)
    }
    fmt.Println(count)
    fmt.Println()

    //初始化sql
    sqlBuild = "INSERT INTO `t10_5` ("
    for i:=1; i<100; i++ {
        sqlBuild += "`field_"+ strconv.Itoa(i) +"`,"
    }
    sqlBuild = sqlBuild[:len(sqlBuild)-1] + ") VALUES "

    pwd, _ := os.Getwd()
    dataPath := path.Join(pwd, "sql.data")
    fmt.Println(dataPath)
    curSql = sqlBuild;
    ReadFile(dataPath, buildQuery)
    time.Sleep(time.Second)
    for {
        x, ok := <- channelResult
        statistic.execDoneNum += x
        statistic.chanRecNum++
        fmt.Println(statistic.sqlLineNum, statistic.execDoneNum, ok, len(channelResult))
        if len(channelResult)==0 {
            print("---------- 完成 ----------")
            fmt.Println(statistic.execDoneNum, statistic.sqlLineNum)
            break
        }
    }
    fmt.Println("chanRecNum=", statistic.chanRecNum, "sqlLineNum=", statistic.sqlLineNum, "readLine=", statistic.readLine, statistic.execDoneNum, len(channelResult))
    var count2 int
    rows, err = myDb.Query("SELECT COUNT(id) as count FROM t10_5")
    if err != nil {
        fmt.Println(err.Error()) //显示异常
        panic(err.Error()) //抛出异常
    }
    for rows.Next() {
        rows.Scan(&count2)
    }
    fmt.Println(count2)
    fmt.Println(count2-count, statistic.execDoneNum, "缺失行:", count2-count-statistic.execDoneNum)
}

/**
...
523 100000 true 0
---------- 完成 ----------100000 523
chanRecNum= 523 sqlLineNum= 523 readLine= 100001 100000 0
100000
100000 100000 缺失行: 0

...
523 100000 true 0
---------- 完成 ----------100000 523
chanRecNum= 523 sqlLineNum= 523 readLine= 100001 100000 0
200000
100000 100000 缺失行: 0

real    1m4.293s
user    0m28.764s
sys    0m1.944s
 */

大量写入在主从库时,会占用大量内存,导致主机多次内存和磁盘空间不足,需要注意。