当使用复杂的分布式系统时,可能会遇到并发处理的需求。我们知道golang的协程是处理并发的利器之一,加上Golang为静态类型和编译型使得其在企业中使用越来越广泛。Mode.net公司系统每天要处理实时,快速和灵活的以毫秒为单位动态路由数据包的全球专用网络和数据,需要高度并发的系统,而他们的动态路由就是使用Golang来构建的,本文我们介绍Mode.net在Golang构建分布式动态路由系统时的经验教训。
并发探测链接指标Mode.net的路由系统称为HALO,是Hop-by-Hop Adaptive Link-State Optimal Routing(逐跳自适应链路状态最佳路由)的前缀字母简称。动态路由算法部分依赖于链路度量来计算路由表。这些指标由位于每个PoP(存活节点)上的独立组件收集。PoP是代表网络中单个路由实体的机器,它们通过链接连接并分布在形成Mode网络的多个位置。组件使用网络数据包探测临近的主机,这些邻居将回复数据包给探测。链路等待的时间值回复包中得到。由于每个PoP都会有一个以上的邻居,因此这种探测任务的本质是并发的,需要实时测量每个邻居链路的延迟。为了计算此指标,无法使用顺序处理,必须尽快处理每个探针。
序列号和重置探测组件交换数据包并依靠序列号进行数据包处理。旨在避免处理分组重复或乱序分组。HALO的第一个实现依靠特殊的序列号0来重置序列号。这样的数字仅在组件初始化期间使用。主要问题是考虑一个始终从0开始的递增序列号值,组件重新启动后,可能会发生数据包重新排序,并且数据包可以轻松地用重置之前使用的值替换序列号。这样随后的数据包将被忽略,直接复位之前使用的序列号。
UDP握手和有限状态机有一个问题是组件重新启动后序列号是否正确一致。有几种方法可以解决此问题,在讨论了可能的选项之后,HALO选择实现带有清晰状态定义的三向握手协议。该握手在初始化期间通过链接建立会话。这样可以确保节点通过同一会话进行通信并为其使用适当的序列号。为了正确实现这一点,必须定义一个具有清晰状态和过渡的有限状态机,这样就能够正确管理所有握手形成的极端情况。
会话ID由握手初始化程序生成。完整的交换顺序如下:
1.发送方发送一个SYN(ID)数据包。
2.接收器存储接收到的ID并发送SYN-ACK(ID)。
3.发送方接收SYN-ACK(ID)并发出ACK(ID)。它还开始发送从序列号0开始的数据包。
4.接收器检查最后收到的ID,如果ID匹配,则接受ACK(ID)。它还开始接受序列号为0的数据包。
处理状态超时基本上,在每种状态下,最多都需要处理三种类型的事件:链接事件,数据包事件和超时事件。这些事件会同时显示,因此必须正确处理并发。
链接事件是链接更新或链接更新。这可以启动链接会话或中断现有会话。
数据包事件是控制数据包(SYN/SYN-ACK/ACK)或只是探测响应。
超时事件是针对当前会话状态的预定超时到期后触发的事件。
这方面主要挑战是如何处理并发超时到期和其他事件。这是一个容易陷入僵局和竞争状况陷阱的地方。
第一种方法:HALO项目使用的语言是Golang。它确实提供了本机同步机制,例如本机通道和锁,并且能够使用轻量级线程(协程)以进行并发处理。
具体处理过程:
首先,设计一个代表会话和超时处理程序的数据结构。
type Session struct {
State SessionState
Id SessionId
RemoteIp string
}
type TimeoutHandler struct {
callback func(Session)
session Session
duration int
timer *timer.Timer
}
会话数据结构使用会话ID,相邻链路IP和当前会话状态来标识连接会话。
TimeoutHandler包含回调函数,session表示任务运行的会话,持续时间(duration)以及指向已调度计时器的timer指针。
有一个全局映射,该映射将为每个相邻的链接会话存储计划的超时处理程序。
SessionTimeout map[Session]*TimeoutHandler
通过以下方法可以注册和取消超时:
// schedules the timeout callback function.
func (timeout* TimeoutHandler) Register() {
timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time.Second, func() {
timeout.callback(timeout.session)
})
}
对于超时的创建和存储,可以使用如下方法:
func CreateTimeoutHandler(callback func(Session), session Session, duration int) *TimeoutHandler {
if sessionTimeout[session] == nil {
sessionTimeout[session] := new(TimeoutHandler)
}
timeout = sessionTimeout[session]
timeout.session = session
timeout.callback = callback
timeout.duration = duration
return timeout
}
一旦创建并注册了超时处理程序,它就会在持续时间秒数之后运行回调。但是,某些事件将要求重新安排超时处理程序(在SYN状态下发生,即每3秒一次)。
为此,可以让回调函数重新安排新的超时:
func synCallback(session Session) {
sendSynPacket(session)
// reschedules the same callback.
newTimeout := NewTimeoutHandler(synCallback, session, SYN_TIMEOUT_DURATION)
newTimeout.Register()
sessionTimeout[state] = newTimeout
}
该回调将在新的超时处理程序中重新安排时间,并更新全局sessionTimeout映射。
数据竞争和引用一个简单的测试是检查计时器到期后是否执行了超时回调。为此,注册一个超时,在其持续时间内休眠,然后检查回调操作是否已完成。执行测试后,最好取消预定的超时时间,因此不会在测试之间产生副作用。令人惊讶的是,这个简单的测试在发现了解决方案中的一个错误。使用cancel方法取消超时没有完成其工作。以下事件顺序将导致数据争用情况:
1.有一个计划的超时处理程序。
2.线程1:
a)收到一个控制数据包,现在要取消注册的超时并进入下一个会话状态。 (例如,发送了SYN后收到了SYN-ACK)。
b)调用timeout.Cancel(),它调用了timer.Stop()。(请注意,Golang计时器停止不会阻止已过期的计时器运行。)
3.线程2:
a)在该取消调用之前,计时器已到期,并且回调即将执行。
b)执行回调,它计划新的超时并更新全局映射。
4.线程1:
a)转换到新的会话状态并注册新的超时,从而更新全局映射。
两个线程正在同时更新超时映射。最终结果是无法取消已注册的超时,然后又丢失了对线程2完成的重新安排的超时的引用。这导致处理程序在一段时间内继续执行和重新安排,并执行了非预期的行为。
锁也解决不了问题使用锁也不能完全解决问题。如果在处理任何事件之前和执行回调之前添加了锁,它仍然不能阻止过期的回调运行:
func (timeout* TimeoutHandler) Register() {
timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time._Second_, func() {
stateLock.Lock()
defer stateLock.Unlock()
timeout.callback(timeout.session)
})
}
和无锁的区别是全局映射中的更新是同步的,但这不能阻止在调用超时后运行timeout.Cancel(),如果计划的计时器已过期但未抓住锁,则情况如此然而。
使用Cancel通道可以使用cancel通道,而不必依赖timer.Stop()(不会阻止到期的计时器执行),
这是一个略有不同的方法。这样可以将不再通过回调进行递归重新安排,而会注册一个无限循环,等待cancel信号或超时事件。
新的Register产生一个新的go线程,该线程在超时后运行回调,并在执行前一个超时后安排新的超时。cancel通道返回给调用方,以控制循环应在何时停止。
func (timeout *TimeoutHandler) Register() chan struct{} {
cancelChan := make(chan struct{})
go func () {
select {
case _ =
return
case _ =
func () {
stateLock.Lock()
defer stateLock.Unlock()
timeout.callback(timeout.session)
} ()
}
} ()
return cancelChan
}
func (timeout* TimeoutHandler) Cancel() {
if timeout.cancelChan == nil {
return
}
timeout.cancelChan
}
这种方法为注册的每个超时提供了一个cancel通道。取消调用将一个空结构发送到通道并触发取消。但是,这也不能解决先前的问题;超时可能会在通过通道调用Cancel之前以及超时线程获取锁之前到期。
对应的解决方案是在锁之后检查超时范围内的cacel通道。
case _ =
func () {
stateLock.Lock()
defer stateLock.Unlock()
select {
case _ =
return
default:
timeout.callback(timeout.session)
}
} ()
}
最后,这可以确保仅在遇到锁之后才执行回调,并且不会触发取消。
死锁此解决方案似乎有效;但是存在一个潜在的隐患——死锁。
仔细检查代码,考虑并发调用的方法。问题在cancel通道本身。我们将其设置为无缓冲通道,这意味着其发送是阻塞调用。在超时处理程序中调用"取消"后,只有在该处理程序被取消后才能继续操作。这里的问题是,当有多个调用到同一取消通道时,取消请求仅使用一次。如果并发事件要取消相同的超时处理程序,例如链接断开或控制数据包事件,则很容易发生这种情况。这将导致死锁,可能会使应用程序停止。
应对该死锁问题的解决方案是让通道缓冲一下,让发送并不总是阻塞,并且在并发调用的情况下显式使发送变为非阻塞。这样可以确保取消发送一次,并且不会阻止后续的取消调用。
func (timeout* TimeoutHandler) Cancel() {
if timeout.cancelChan == nil {
return
}
select {
case timeout.cancelChan
default:
// can't send on the channel, someone has already requested the cancellation.
}
}
结论实践中了解了在使用并发代码时出现常见的常见错误。由于其不确定性,即使进行大量测试,也很容易发现这些问题。这是HALO在实现中遇到的三个主要问题:
在不同步的情况下更新共享数据这似乎很明显,但是如果同时进行的更新发生在不同的位置,则实际上很难发现。结果是数据竞争,由于一个更新会覆盖另一个更新,对同一数据的多次更新可能导致更新丢失。在HALO中,正在更新同一共享映射上的计划超时参考。(有趣的是,如果Go在同一个Map对象上检测到并发读/写操作,会引发致命错误,可以尝试运行Go的数据竞争检测器)。最终会导致丢失超时引用,并且无法取消给定的超时。不要是可以使用锁。
缺少条件检查在不能仅依靠锁独占性的情况下,需要进行条件检查。想象一个经典的场景,有一个生产者和多个消费者使用一个共享队列。生产者可以将一项添加到队列中,并唤醒所有消费者。唤醒调用意味着队列中有一些数据可用,并且由于队列是共享的,因此必须通过锁来同步访问。每个消费者都有机会遇到锁;但是,仍然需要检查队列中是否有项目。需要进行条件检查,因为当遇到锁时还不知道队列状态。
在HALO中,超时处理程序收到了来自计时器到期的"唤醒"调用,但是它仍需要检查是否已向其发送了取消信号,然后才能继续执行回调。
死锁当一个线程被卡住,无限期地等待一个信号唤醒时,就会发生这种情况,但是这个信号永远不会到达。
在HALO中,由于多次发送调用到一个非缓冲且阻塞的通道导致死锁,这样仅在同一通道上完成接收后,发送调用才会返回。超时线程循环迅速在取消通道上接收信号;但是,在接收到第一个信号后,它将中断环路,并且再也不会从该通道读取数据。其余的调用将会被卡住。
为避免这种情况,需要仔细检查代码,谨慎处理阻塞调用,并确保不会发生线程饥饿。HALO中解决方法是使取消调用成为非阻塞调用,因为不需要阻塞调用。