当使用复杂的分布式系统时,可能会遇到并发处理的需求。我们知道golang的协程是处理并发的利器之一,加上Golang为静态类型和编译型使得其在企业中使用越来越广泛。Mode.net公司系统每天要处理实时,快速和灵活的以毫秒为单位动态路由数据包的全球专用网络和数据,需要高度并发的系统,而他们的动态路由就是使用Golang来构建的,本文我们介绍Mode.net在Golang构建分布式动态路由系统时的经验教训。

并发探测链接指标

Mode.net的路由系统称为HALO,是Hop-by-Hop Adaptive Link-State Optimal Routing(逐跳自适应链路状态最佳路由)的前缀字母简称。动态路由算法部分依赖于链路度量来计算路由表。这些指标由位于每个PoP(存活节点)上的独立组件收集。PoP是代表网络中单个路由实体的机器,它们通过链接连接并分布在形成Mode网络的多个位置。组件使用网络数据包探测临近的主机,这些邻居将回复数据包给探测。链路等待的时间值回复包中得到。由于每个PoP都会有一个以上的邻居,因此这种探测任务的本质是并发的,需要实时测量每个邻居链路的延迟。为了计算此指标,无法使用顺序处理,必须尽快处理每个探针。

序列号和重置

探测组件交换数据包并依靠序列号进行数据包处理。旨在避免处理分组重复或乱序分组。HALO的第一个实现依靠特殊的序列号0来重置序列号。这样的数字仅在组件初始化期间使用。主要问题是考虑一个始终从0开始的递增序列号值,组件重新启动后,可能会发生数据包重新排序,并且数据包可以轻松地用重置之前使用的值替换序列号。这样随后的数据包将被忽略,直接复位之前使用的序列号。

UDP握手和有限状态机

有一个问题是组件重新启动后序列号是否正确一致。有几种方法可以解决此问题,在讨论了可能的选项之后,HALO选择实现带有清晰状态定义的三向握手协议。该握手在初始化期间通过链接建立会话。这样可以确保节点通过同一会话进行通信并为其使用适当的序列号。为了正确实现这一点,必须定义一个具有清晰状态和过渡的有限状态机,这样就能够正确管理所有握手形成的极端情况。

会话ID由握手初始化程序生成。完整的交换顺序如下:

1.发送方发送一个SYN(ID)数据包。

2.接收器存储接收到的ID并发送SYN-ACK(ID)。

3.发送方接收SYN-ACK(ID)并发出ACK(ID)。它还开始发送从序列号0开始的数据包。

4.接收器检查最后收到的ID,如果ID匹配,则接受ACK(ID)。它还开始接受序列号为0的数据包。

处理状态超时

基本上,在每种状态下,最多都需要处理三种类型的事件:链接事件,数据包事件和超时事件。这些事件会同时显示,因此必须正确处理并发。

链接事件是链接更新或链接更新。这可以启动链接会话或中断现有会话。

数据包事件是控制数据包(SYN/SYN-ACK/ACK)或只是探测响应。

超时事件是针对当前会话状态的预定超时到期后触发的事件。

这方面主要挑战是如何处理并发超时到期和其他事件。这是一个容易陷入僵局和竞争状况陷阱的地方。

第一种方法:HALO项目使用的语言是Golang。它确实提供了本机同步机制,例如本机通道和锁,并且能够使用轻量级线程(协程)以进行并发处理。

具体处理过程:

首先,设计一个代表会话和超时处理程序的数据结构。

type Session struct {

State SessionState

Id SessionId

RemoteIp string

}

type TimeoutHandler struct {

callback func(Session)

session Session

duration int

timer *timer.Timer

}

会话数据结构使用会话ID,相邻链路IP和当前会话状态来标识连接会话。

TimeoutHandler包含回调函数,session表示任务运行的会话,持续时间(duration)以及指向已调度计时器的timer指针。

有一个全局映射,该映射将为每个相邻的链接会话存储计划的超时处理程序。

SessionTimeout map[Session]*TimeoutHandler

通过以下方法可以注册和取消超时:

// schedules the timeout callback function.

func (timeout* TimeoutHandler) Register() {

timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time.Second, func() {

timeout.callback(timeout.session)

})

}

对于超时的创建和存储,可以使用如下方法:

func CreateTimeoutHandler(callback func(Session), session Session, duration int) *TimeoutHandler {

if sessionTimeout[session] == nil {

sessionTimeout[session] := new(TimeoutHandler)

}

timeout = sessionTimeout[session]

timeout.session = session

timeout.callback = callback

timeout.duration = duration

return timeout

}

一旦创建并注册了超时处理程序,它就会在持续时间秒数之后运行回调。但是,某些事件将要求重新安排超时处理程序(在SYN状态下发生,即每3秒一次)。

为此,可以让回调函数重新安排新的超时:

func synCallback(session Session) {

sendSynPacket(session)

// reschedules the same callback.

newTimeout := NewTimeoutHandler(synCallback, session, SYN_TIMEOUT_DURATION)

newTimeout.Register()

sessionTimeout[state] = newTimeout

}

该回调将在新的超时处理程序中重新安排时间,并更新全局sessionTimeout映射。

数据竞争和引用

一个简单的测试是检查计时器到期后是否执行了超时回调。为此,注册一个超时,在其持续时间内休眠,然后检查回调操作是否已完成。执行测试后,最好取消预定的超时时间,因此不会在测试之间产生副作用。令人惊讶的是,这个简单的测试在发现了解决方案中的一个错误。使用cancel方法取消超时没有完成其工作。以下事件顺序将导致数据争用情况:

1.有一个计划的超时处理程序。

2.线程1:

a)收到一个控制数据包,现在要取消注册的超时并进入下一个会话状态。 (例如,发送了SYN后收到了SYN-ACK)。

b)调用timeout.Cancel(),它调用了timer.Stop()。(请注意,Golang计时器停止不会阻止已过期的计时器运行。)

3.线程2:

a)在该取消调用之前,计时器已到期,并且回调即将执行。

b)执行回调,它计划新的超时并更新全局映射。

4.线程1:

a)转换到新的会话状态并注册新的超时,从而更新全局映射。

两个线程正在同时更新超时映射。最终结果是无法取消已注册的超时,然后又丢失了对线程2完成的重新安排的超时的引用。这导致处理程序在一段时间内继续执行和重新安排,并执行了非预期的行为。

锁也解决不了问题

使用锁也不能完全解决问题。如果在处理任何事件之前和执行回调之前添加了锁,它仍然不能阻止过期的回调运行:

func (timeout* TimeoutHandler) Register() {

timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time._Second_, func() {

stateLock.Lock()

defer stateLock.Unlock()

timeout.callback(timeout.session)

})

}

和无锁的区别是全局映射中的更新是同步的,但这不能阻止在调用超时后运行timeout.Cancel(),如果计划的计时器已过期但未抓住锁,则情况如此然而。

使用Cancel通道

可以使用cancel通道,而不必依赖timer.Stop()(不会阻止到期的计时器执行),

这是一个略有不同的方法。这样可以将不再通过回调进行递归重新安排,而会注册一个无限循环,等待cancel信号或超时事件。

新的Register产生一个新的go线程,该线程在超时后运行回调,并在执行前一个超时后安排新的超时。cancel通道返回给调用方,以控制循环应在何时停止。

func (timeout *TimeoutHandler) Register() chan struct{} {

cancelChan := make(chan struct{})

go func () {

select {

case _ =

return

case _ =

func () {

stateLock.Lock()

defer stateLock.Unlock()

timeout.callback(timeout.session)

} ()

}

} ()

return cancelChan

}

func (timeout* TimeoutHandler) Cancel() {

if timeout.cancelChan == nil {

return

}

timeout.cancelChan

}

这种方法为注册的每个超时提供了一个cancel通道。取消调用将一个空结构发送到通道并触发取消。但是,这也不能解决先前的问题;超时可能会在通过通道调用Cancel之前以及超时线程获取锁之前到期。

对应的解决方案是在锁之后检查超时范围内的cacel通道。

case _ =

func () {

stateLock.Lock()

defer stateLock.Unlock()

select {

case _ =

return

default:

timeout.callback(timeout.session)

}

} ()

}

最后,这可以确保仅在遇到锁之后才执行回调,并且不会触发取消。

死锁

此解决方案似乎有效;但是存在一个潜在的隐患——死锁。

仔细检查代码,考虑并发调用的方法。问题在cancel通道本身。我们将其设置为无缓冲通道,这意味着其发送是阻塞调用。在超时处理程序中调用"取消"后,只有在该处理程序被取消后才能继续操作。这里的问题是,当有多个调用到同一取消通道时,取消请求仅使用一次。如果并发事件要取消相同的超时处理程序,例如链接断开或控制数据包事件,则很容易发生这种情况。这将导致死锁,可能会使应用程序停止。

应对该死锁问题的解决方案是让通道缓冲一下,让发送并不总是阻塞,并且在并发调用的情况下显式使发送变为非阻塞。这样可以确保取消发送一次,并且不会阻止后续的取消调用。

func (timeout* TimeoutHandler) Cancel() {

if timeout.cancelChan == nil {

return

}

select {

case timeout.cancelChan

default:

// can't send on the channel, someone has already requested the cancellation.

}

}

结论

实践中了解了在使用并发代码时出现常见的常见错误。由于其不确定性,即使进行大量测试,也很容易发现这些问题。这是HALO在实现中遇到的三个主要问题:

在不同步的情况下更新共享数据

这似乎很明显,但是如果同时进行的更新发生在不同的位置,则实际上很难发现。结果是数据竞争,由于一个更新会覆盖另一个更新,对同一数据的多次更新可能导致更新丢失。在HALO中,正在更新同一共享映射上的计划超时参考。(有趣的是,如果Go在同一个Map对象上检测到并发读/写操作,会引发致命错误,可以尝试运行Go的数据竞争检测器)。最终会导致丢失超时引用,并且无法取消给定的超时。不要是可以使用锁。

缺少条件检查

在不能仅依靠锁独占性的情况下,需要进行条件检查。想象一个经典的场景,有一个生产者和多个消费者使用一个共享队列。生产者可以将一项添加到队列中,并唤醒所有消费者。唤醒调用意味着队列中有一些数据可用,并且由于队列是共享的,因此必须通过锁来同步访问。每个消费者都有机会遇到锁;但是,仍然需要检查队列中是否有项目。需要进行条件检查,因为当遇到锁时还不知道队列状态。

在HALO中,超时处理程序收到了来自计时器到期的"唤醒"调用,但是它仍需要检查是否已向其发送了取消信号,然后才能继续执行回调。

死锁

当一个线程被卡住,无限期地等待一个信号唤醒时,就会发生这种情况,但是这个信号永远不会到达。

在HALO中,由于多次发送调用到一个非缓冲且阻塞的通道导致死锁,这样仅在同一通道上完成接收后,发送调用才会返回。超时线程循环迅速在取消通道上接收信号;但是,在接收到第一个信号后,它将中断环路,并且再也不会从该通道读取数据。其余的调用将会被卡住。

为避免这种情况,需要仔细检查代码,谨慎处理阻塞调用,并确保不会发生线程饥饿。HALO中解决方法是使取消调用成为非阻塞调用,因为不需要阻塞调用。