在现实世界中,有很多工作属于流水线类型:每个步骤都紧密关联,第一步做什么、接着做什么、最后做什么都是确定的。制造业尤其如此,生产车间基本都是流水线作业。在软件开发中,我们也经常遇到这类业务场景。

假设我们有一条流水线,共分三个步骤,分别是 job1、job2 和 job3。其中 job1 每隔 1 秒产出一个数据,job2 处理每个数据需要 2 秒,job3 几乎不耗时。代码:https://play.golang.org/p/e7ZlP9ofXB3

package main

import (
   "fmt"
   "time"
)

// job1 is the first pipeline stage. It starts a goroutine that sends the
// value 1 exactly count times (no values if count <= 0), sleeping one second
// before each send to simulate work, and logs every item it produces.
// The returned channel has a buffer of 2 and is closed after the last send.
func job1(count int) <-chan int {
	const item = 1
	results := make(chan int, 2)

	go func() {
		defer close(results)
		for remaining := count; remaining > 0; remaining-- {
			time.Sleep(time.Second)
			fmt.Println("job1 finish:", item)
			results <- item
		}
	}()

	return results
}

// job2 is the slow middle stage. A single goroutine reads every value from
// inCh, sleeps two seconds per value to simulate expensive work, logs and
// forwards value+1. The returned channel has a buffer of 2 and is closed once
// inCh is closed and drained.
func job2(inCh <-chan int) <-chan int {
	results := make(chan int, 2)

	go func() {
		defer close(results)
		for {
			v, ok := <-inCh
			if !ok {
				return
			}
			// Deliberately slow: 2 seconds per item.
			time.Sleep(2 * time.Second)
			next := v + 1
			fmt.Println("job2 finish:", next)
			results <- next
		}
	}()

	return results
}

// job3 is the final, fast stage. A single goroutine forwards value+1 for every
// value received on inCh, in order, logging each result. The returned channel
// has a buffer of 2 and is closed once inCh is closed and drained.
func job3(inCh <-chan int) <-chan int {
	results := make(chan int, 2)

	go func() {
		defer close(results)
		for v := range inCh {
			next := v + 1
			fmt.Println("job3 finish:", next)
			results <- next
		}
	}()

	return results
}

func main() {
   t := time.Now()

   firstResult := job1(10)
   secondResult := job2(firstResult)
   thirdResult := job3(secondResult)

   for v := range thirdResult {
       fmt.Println(v)
   }

   fmt.Println("all finish")
   fmt.Println("duration:", time.Since(t).String())
}

输出结果为

job1 finish: 1
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
all finish
duration: 21s

总共耗时21秒。原因是job2处理每个数据需要2秒,比job1产出数据(每秒一个)更慢,成为了整条流水线的瓶颈:第一个数据在第1秒产出、第3秒才处理完,之后每2秒才能完成一个,10个数据共需 1 + 10×2 = 21 秒。现在我们的主要任务就是解决这个问题。
这里只用了一个job2来处理job1的结果。如果多开启几个运行job2的goroutine并行处理,能否提升性能呢?

现在我们改进一下代码,解决job2耗时的问题。需要注意,通道的关闭方式也要相应调整。由于启用了多个job2 goroutine,每个job2只负责关闭它自己的输出通道。合并后的通道则由merge在所有输入通道都读完(wg.Wait()返回)之后统一关闭,而不能由某一个job2去关闭。代码:https://play.golang.org/p/qebQ00v973C

package main

import (
    "fmt"
    "sync"
    "time"
)

// job1 produces the pipeline's input: it emits the value 1 count times
// (nothing when count <= 0) from a background goroutine, waiting one second
// before each send and logging each item. The output channel is buffered
// (capacity 2) and closed after the final send.
func job1(count int) <-chan int {
	out := make(chan int, 2)

	go func() {
		defer close(out)
		for sent := 0; sent < count; sent++ {
			time.Sleep(time.Second)
			const v = 1
			fmt.Println("job1 finish:", v)
			out <- v
		}
	}()

	return out
}

// job2 is the slow stage that gets fanned out in this version: each call
// starts one worker goroutine reading from inCh. Several calls may share the
// same inCh, in which case they compete for its values. Each value takes two
// seconds and is forwarded as value+1. Every call owns and closes its own
// output channel (capacity 2) once inCh is closed and drained.
func job2(inCh <-chan int) <-chan int {
	out := make(chan int, 2)

	go func() {
		defer close(out)
		for v := range inCh {
			// Deliberately slow: 2 seconds per item.
			time.Sleep(2 * time.Second)
			result := v + 1
			fmt.Println("job2 finish:", result)
			out <- result
		}
	}()

	return out
}

// job3 is the last stage: it forwards value+1 for each value read from inCh,
// in arrival order, logging each result. Its output channel (capacity 2) is
// closed once inCh is closed and drained.
func job3(inCh <-chan int) <-chan int {
	out := make(chan int, 2)

	go func() {
		defer close(out)
		for {
			v, ok := <-inCh
			if !ok {
				return
			}
			result := v + 1
			fmt.Println("job3 finish:", result)
			out <- result
		}
	}()

	return out
}

// merge fans several input channels into one output channel (capacity 2).
// It starts one forwarding goroutine per input, and each copies values until
// its input is closed. Values from different inputs may interleave in any
// order. The output is closed only after every input has been fully drained.
// With no inputs, the output is closed right away.
func merge(inCh ...<-chan int) <-chan int {
	outCh := make(chan int, 2)

	var wg sync.WaitGroup
	wg.Add(len(inCh))

	forward := func(in <-chan int) {
		defer wg.Done()
		for v := range in {
			outCh <- v
		}
	}
	for _, ch := range inCh {
		go forward(ch)
	}

	// wg.Wait must run in its own goroutine. Waiting here would keep merge
	// from returning, so nobody could read outCh. The forwarders would then
	// block once its small buffer fills, and the program would deadlock.
	go func() {
		wg.Wait()
		close(outCh)
	}()

	return outCh
}

// main builds the fanned-out pipeline: job1 feeds three competing job2
// workers (fan-out), their outputs are combined by merge (fan-in), and job3
// consumes the merged stream. It prints every final value and the elapsed time.
func main() {
	start := time.Now()

	source := job1(10)

	// Fan-out: three job2 goroutines all read from the same source channel.
	const workers = 3
	partials := make([]<-chan int, 0, workers)
	for i := 0; i < workers; i++ {
		partials = append(partials, job2(source))
	}

	// Fan-in: combine the job2 outputs into one stream before job3.
	for v := range job3(merge(partials...)) {
		fmt.Println(v)
	}

	fmt.Println("all finish")
	fmt.Println("duration:", time.Since(start).String())
}

输出结果

job1 finish: 1
job1 finish: 1
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job1 finish: 1
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
job2 finish: 2
job3 finish: 3
3
all finish
duration: 12s

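可以看到,总耗时由原来的21s减少到12s,缩短了约43%,速度约为原来的1.75倍。此时job2已不再是瓶颈:3个job2每2秒能处理3个数据,快于job1每秒产出1个数据的速度。因此总耗时约等于job1产出10个数据所需的10s,再加上最后一个数据在job2中处理的2s,即12s。上面代码为了演示效果,使用的缓冲chan很小。但在本例的参数下瓶颈已经转移到job1,单纯调大缓冲区并不能进一步缩短总耗时;要继续提速,需要加快job1本身。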

FAN-OUT模式:多个goroutine从同一个通道读取数据,直到该通道关闭(本例中3个job2共同读取firstResult)。OUT是一种张开的模式(1对多),所以又被称为扇出,可以用来分发任务。

FAN-IN模式:从多个通道读取数据并汇聚到一个通道中,直到这些通道全部关闭。本例的merge为每个输入通道启动一个goroutine,把数据转发到同一个输出通道。IN是一种收敛的模式(多对1),所以又被称为扇入,用来收集处理的结果。

是不是很像扇子的形态:先展开(扇出),再合并(扇入)。

总结:在流水线这类逻辑中,当某个环节成为瓶颈时,可以用FAN-OUT模式让多个goroutine并行处理该环节,再用FAN-IN模式汇总结果,从而提升程序性能。

参考