我正在在线阅读管道教程,并试图构建一个像这样运行的阶段-

  • 将传入事件分批发送,每组10个,然后将其发送到外出通道
  • 如果我们在5秒钟内没有看到10个事件,请结合我们收到的事件并发送,然后关闭chan并返回。
  • 但是,我不知道第一个选择的情况会是什么样子。
    任何指针,不胜感激!

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    func BatchEvents(inChan <- chan *Event) <- chan *Event {
        batchSize := 10
        comboEvent := Event{}
        go func() {
            defer close(out)
            i = 0
            for event := range inChan {
                select {
                case -WHAT GOES HERE?-:
                    if i < batchSize {
                        comboEvent.data = append(comboEvent.data, event.data)
                        i++;
                    } else {
                        out <- &comboEvent
                        // reset for next batch
                        comboEvent = Event{}
                        i=0;
                    }
                case <-time.After(5 * time.Second):
                    // process whatever we have seen so far if the batch size isn't filled in 5 secs
                    out <- &comboEvent
                    // stop after
                    return
                }
            }
        }()
        return out
    }

    首先选择的情况应该是从该通道开始,而不是在通道范围内进行,整个过程都在无限循环内。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    func BatchEvents(inChan <-chan *Event) <-chan *Event {
        batchSize := 10
        comboEvent := Event{}
        go func() {
            defer close(out)
            i = 0
            for {
                select {
                case event, ok := <-inChan:
                    if !ok {
                        return
                    }
                    comboEvent.data = append(comboEvent.data, event.data)
                    i++
                    if i == batchSize {
                        out <- &comboEvent
                        // reset for next batch
                        comboEvent = Event{}
                        i = 0
                    }
                case <-time.After(5 * time.Second):
                    // process whatever we have seen so far if the batch size isn't filled in 5 secs
                    if i > 0 {
                        out <- &comboEvent
                    }
                    // stop after
                    return
                }
            }
        }()
        return out
    }
    • 请注意,select并非按优先级排序。您必须人为地产生优先级为select的集合。见stackoverflow.com/questions/11117382/…