事件驱动架构是计算机科学中一种高度可扩展的范例。它允许我们可以多方系统异步处理事件。

事件总线是发布/订阅模式的实现,其中发布者发布数据,并且感兴趣的订阅者可以监听这些数据并基于这些数据作出处理。这使发布者与订阅者松耦合。发布者将数据事件发布到事件总线,总线负责将它们发送给订阅者。

传统的实现事件总线的方法会涉及到使用回调。订阅者通常实现接口,然后事件总线通过接口传播数据。

channelchannel
我们专注于 基于主题(topic)的事件。发布者发布到主题,订阅者可以收听它们。

定义数据结构

structDataEvent
type DataEvent struct {
   Data interface{}
   Topic string
}

在这里,我们已经将基础数据定义为接口,这意味着它可以是任何值。我们还将主题定义为结构的成员。订阅者可能会收听多个主题,因此,我们通过主题来让订阅者可以区分不同的事件的做法是不错的。

介绍 channels

DataEventDataChannel
// DataChannel 是一个能接收 DataEvent 的 channel
type DataChannel chan DataEvent

// DataChannelSlice 是一个包含 DataChannels 数据的切片
type DataChannelSlice [] DataChannel
DataChannelSliceDataChannel

事件总线

// EventBus 存储有关订阅者感兴趣的特定主题的信息
type EventBus struct {
   subscribers map[string]DataChannelSlice
   rm sync.RWMutex
}
EventBussubscribersDataChannelSlices
maptopicsmapchannel

订阅主题

channelchannel
func (eb *EventBus)Subscribe(topic string, ch DataChannel)  {
   eb.rm.Lock()
   if prev, found := eb.subscribers[topic]; found {
      eb.subscribers[topic] = append(prev, ch)
   } else {
      eb.subscribers[topic] = append([]DataChannel{}, ch)
   }
   eb.rm.Unlock()
}
channel

发布主题

要发布事件,发布者需要提供广播给订阅者所需要的主题和数据。

func (eb *EventBus) Publish(topic string, data interface{}) {
   eb.rm.RLock()
   if chans, found := eb.subscribers[topic]; found {
      // 这样做是因为切片引用相同的数组,即使它们是按值传递的
      // 因此我们正在使用我们的元素创建一个新切片,从而能正确地保持锁定
      channels := append(DataChannelSlice{}, chans...)
      go func(data DataEvent, dataChannelSlices DataChannelSlice) {
         for _, ch := range dataChannelSlices {
            ch <- data
         }
      }(DataEvent{Data: data, Topic: topic}, channels)
   }
   eb.rm.RUnlock()
}
channel
请注意,我们在发布方法中使用了 Goroutine 来避免阻塞发布者

开始

EventBus
var eb = &EventBus{
   subscribers: map[string]DataChannelSlice{},
}

为了测试新创建的事件总线,我们将创建一个以随机间隔时间发布到指定主题的方法。

func publisTo(topic string, data string)  {
   for {
      eb.Publish(topic, data)
      time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
   }
}

接下来,我们需要一个可以收听主题的 main 函数。它使用辅助方法打印出事件的数据。

func printDataEvent(ch string, data DataEvent)  {
   fmt.Printf("Channel: %s; Topic: %s; DataEvent: %vn", ch, data.Topic, data.Data)
}
func main()  {
   ch1 := make(chan DataEvent)
   ch2 := make(chan DataEvent)
   ch3 := make(chan DataEvent)
   eb.Subscribe("topic1", ch1)
   eb.Subscribe("topic2", ch2)
   eb.Subscribe("topic2", ch3)
   go publisTo("topic1", "Hi topic 1")
   go publisTo("topic2", "Welcome to topic 2")
   for {
      select {
      case d := <-ch1:
         go printDataEvent("ch1", d)
      case d := <-ch2:
         go printDataEvent("ch2", d)
      case d := <-ch3:
         go printDataEvent("ch3", d)
      }
   }
}
channels
channel

示例输出将如下所示

Channel: ch1; Topic: topic1; DataEvent: Hi topic 1
Channel: ch2; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch2; Topic: topic2; DataEvent: Welcome to topic 2
Channel: ch1; Topic: topic1; DataEvent: Hi topic 1
Channel: ch3; Topic: topic2; DataEvent: Welcome to topic 2
...
channel
channel

完整的代码

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type DataEvent struct {
    Data  interface{}
    Topic string
}

// DataChannel 是一个能接收 DataEvent 的 channel
type DataChannel chan DataEvent

// DataChannelSlice 是一个包含 DataChannels 数据的切片
type DataChannelSlice []DataChannel

// EventBus 存储有关订阅者感兴趣的特定主题的信息
type EventBus struct {
    subscribers map[string]DataChannelSlice
    rm          sync.RWMutex
}

func (eb *EventBus) Publish(topic string, data interface{}) {
    eb.rm.RLock()
    if chans, found := eb.subscribers[topic]; found {
        // 这样做是因为切片引用相同的数组,即使它们是按值传递的
        // 因此我们正在使用我们的元素创建一个新切片,从而正确地保持锁定
        channels := append(DataChannelSlice{}, chans...)
        go func(data DataEvent, dataChannelSlices DataChannelSlice) {
            for _, ch := range dataChannelSlices {
                ch <- data
            }
        }(DataEvent{Data: data, Topic: topic}, channels)
    }
    eb.rm.RUnlock()
}

func (eb *EventBus) Subscribe(topic string, ch DataChannel) {
    eb.rm.Lock()
    if prev, found := eb.subscribers[topic]; found {
        eb.subscribers[topic] = append(prev, ch)
    } else {
        eb.subscribers[topic] = append([]DataChannel{}, ch)
    }
    eb.rm.Unlock()
}

var eb = &EventBus{
    subscribers: map[string]DataChannelSlice{},
}

func printDataEvent(ch string, data DataEvent) {
    fmt.Printf("Channel: %s; Topic: %s; DataEvent: %vn", ch, data.Topic, data.Data)
}

func publisTo(topic string, data string) {
    for {
        eb.Publish(topic, data)
        time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    }
}

func main() {
    ch1 := make(chan DataEvent)
    ch2 := make(chan DataEvent)
    ch3 := make(chan DataEvent)

    eb.Subscribe("topic1", ch1)
    eb.Subscribe("topic2", ch2)
    eb.Subscribe("topic2", ch3)

    go publisTo("topic1", "Hi topic 1")
    go publisTo("topic2", "Welcome to topic 2")

    for {
        select {
        case d := <-ch1:
            go printDataEvent("ch1", d)
        case d := <-ch2:
            go printDataEvent("ch2", d)
        case d := <-ch3:
            go printDataEvent("ch3", d)
        }
    }
}

使用 channel 取代回调的理由

传统的回调方式要求实现某种接口。

例如,

type Subscriber interface {
   onData(event Event)
}

使用回调的话,如果你想订阅一个事件,你需要实现该接口,以便事件总线可以传播它。

type MySubscriber struct {
}
func (m MySubscriber) onData(event Event)  {
   // 处理事件
}
channel
func main() {
   ch1 := make(chan DataEvent)
   eb.Subscribe("topic1", ch1)
   fmt.Println((<-ch1).Data)
   ...
}

结论

本文的目的是指出编写事件总线的不同实现方法。

这可能不是理想的解决方案。
channel
我已经使用切片来存储主题的所有订阅者。这用于简化文章。这需要用 SET 替换,以至于列表中不存在重复的订阅者。

传统的回调方法可以使用提供的相同的原理去简单地实现。你可以轻松地在 Goroutine 中进行异步装饰发布事件。

我很想听听你对这篇文章的看法。

04c7494636e5853723f0b5ee2788cbf3.png