一、分布式 id 生成器

在高并发场景中,通常需要类似 MySQL 自增 id 一样不断增长且不会重复的 id。

比如某电商双 11 时,在 0:00 开始,会有千万到亿级的订单涌入,每秒要处理 10w+ 的订单。在将订单插入数据库前,我们需要给订单一个唯一的 id 再插入数据库内。也正因为订单量大,一个无意义的纯数字 id 在对数据库进行增删改查时不能起到优化作用。此 id 内应该包含一些时间信息,这样即使后端的系统对消息进行了分库分表,也能够以时间顺序对这些消息进行排序。

Twitter 的 snowflake 算法是这种场景下的一个典型解法,原理如图:

分布式id

首先确定的是,id 数值长度是64位,int64 类型,被分为四个部分(不含开头的符号 / unused ):

1111,1111,11110
4096

数据中心 id 加上实例 id 共有 10 位,每个数据中心可以部署 32 台实例,搭建 32 个数据中心,所以可以一共部署 1024 台实例。

41 位的时间戳(毫秒为单位)能够使用 69 年。

1 worker_id 分配

timestampdatacenter_idworker_idsequence_idtimestampsequence_iddatacenter_idworker_id
datacenter_id
mysql> insert into a (ip) values("10.1.2.101");
Query OK, 1 row affected (0.00 sec)

mysql> select last_insert_id();
+------------------+
| last_insert_id() |
+------------------+
|                2 |
+------------------+
1 row in set (0.00 sec)
worker_idworker_idworker_idworker_id

当然,使用 MySQL 相当于给我们简单的 id 生成服务增加了一个外部依赖。依赖越多,我们的服务的可运维性就越差。

worker_idworker_id

2 开源实例

2.1 标准 snowflake

github.com/bwmarrin/snowflake

snowflake

此库和标准的 snowflake 实现方式全完一致,使用也比较简单:

package main

import (
    // 示例代码只导入提供核心功能的package,其他内置package自行导入,下同
    "github.com/bwmarrin/snowflake"
)

func main() {
    node, err := snowflake.NewNode(1)
    if err != nil {
        println(err.Error())
        os.Exit(1)
    }

    for i := 0; i < 20; i++ {
        id := node.Generate()

        fmt.Printf("Int64  ID: %d\n", id)
        fmt.Printf("String ID: %s\n", id)
        fmt.Printf("Base2  ID: %s\n", id.Base2())
        fmt.Printf("Base64 ID: %s\n", id.Base64())

        fmt.Printf("ID Time  : %d\n", id.Time())

        fmt.Printf("ID Node  : %d\n", id.Node())

        fmt.Printf("ID Step  : %d\n", id.Step())

        fmt.Println("---------------------------------------------")
    }

}

这个库因为是单文件,所以也方便定制,其源码本身就预留了一些可定制字段:

var (
    // Epoch is set to the twitter snowflake epoch of Nov 04 2010 01:42:54 UTC in milliseconds
    // You may customize this to set a different epoch for your application.
    Epoch int64 = 1288834974657

    // NodeBits holds the number of bits to use for Node
    // Remember, you have a total 22 bits to share between Node/Step
    NodeBits uint8 = 10

    // StepBits holds the number of bits to use for Step
    // Remember, you have a total 22 bits to share between Node/Step
    StepBits uint8 = 12
)
EpochNodeBitsStepBits

2.2 sonyflake

sonyflake 同样是受 Twitter 的 snowflake 启发而来,但 sonyflake 侧重于多主机/实例的生命周期和性能,所以与 snowflake 使用不同的位分配:

snoyflake

sonyflake 的优点和缺点:

  • 比 snowflake 更长的生命周期,174年 > 69年
  • 能运行在更多的实例上,$2^{16}$ > $2^{10}$
  • 生成 ID 速度比 snowflake 慢,10 毫秒内最多生成 $2^{8}$ 个(snowflake 1 毫秒内最多生成 $2^{12}$ 个)

sonyflake 在启动阶段需要配置参数:

func NewSonyflake(st Settings) *Sonyflake
Settings
type Settings struct {
    StartTime      time.Time
    MachineID      func() (uint16, error)
    CheckMachineID func(uint16) bool
}
StartTime2014-09-01 00:00:00 +0000 UTCMachineIDCheckMachineIDtrue

使用示例:

package main

import (
    "github.com/sony/sonyflake"
)

func getMachineID() (uint16, error) {
    var machineID uint16 = 6
    return machineID, nil
}

func checkMachineID(machineID uint16) bool {
    existsMachines := []uint16{1, 2, 3, 4, 5}
    for _, v := range existsMachines {
        if v == machineID {
            return false
        }
    }
    return true
}

func main() {
    t, _ := time.Parse("2006-01-02", "2021-01-01")
    settings := sonyflake.Settings{
        StartTime:      t,
        MachineID:      getMachineID,
        CheckMachineID: checkMachineID,
    }

    sf := sonyflake.NewSonyflake(settings)

    for i := 0; i < 10; i++ {
        id, err := sf.NextID()
        if err != nil {
            fmt.Println(err)
            os.Exit(1)
        }
        fmt.Println(id)
    }

}

二、分布式锁

单机程序并发或并行修改全局变量时,需要对修改行为加锁以创造临界区。如果不加锁,得到的结果将不准确,如:

package main

import (
    "sync"
)

// 全局变量
var counter int

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
        defer wg.Done()
            counter++
        }()
    }

    wg.Wait()
    println(counter)
}

多次运行结果不同:

➜  go run main.go              
884
➜  go run main.go
957
➜  go run main.go
923

1 进程内加锁

想要得到正确的结果,要把计数器的操作代码部分加上锁:

    var wg sync.WaitGroup
    var lock sync.Mutex
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            lock.Lock()
            counter++
            lock.Unlock()
        }()
    }
    wg.Wait()
    println(counter)

这样能够得到正确结果:

➜  go run main.go
1000

2 trylock

在某些场景,我们只希望一个任务有单一的执行者,而不像计数器一样,所有的 Goroutine 都成功执行。后续的 Goroutine 在抢锁失败后,需要放弃执行,这时候就需要尝试加锁 / trylock

尝试加锁,在加锁成功后执行后续流程,失败时不可以阻塞,而是直接返回加锁的结果。

在 Go 语言中可以用大小为 1 的 Channel 来模拟 trylock:

// Lock try lock
type Lock struct {
    c chan struct{}
}

// Lock try lock, return lock result
func (l Lock) Lock() bool {
    result := false
    select {
    case <-l.c:
        result = true
    default:
    }
    return result
}

// Unlock the try lock
func (l Lock) Unlock() {
    l.c <- struct{}{}
}

// NewLock generate a try lock
func NewLock() Lock {
    var l Lock
    l.c = make(chan struct{}, 1)
    l.c <- struct{}{}
    return l
}

func main() {
    var lock = NewLock()
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if !lock.Lock() {
                println("lock failed")
                return
            }
            counter++
            println("current counter: ", counter)
            lock.Unlock()
        }()
    }
    wg.Wait()
}
Lock()Unlock()Lock

在单机系统中,trylock 并不是一个好选择,因为大量的 Goruntine 抢锁会无意义地占用 cpu 资源,这就是活锁。

活锁指是的程序看起来在正常执行,但 cpu 周期被浪费在抢锁而非执行任务上,从而程序整体的执行效率低下。活锁的问题定位起来很麻烦,所以在单机场景下,不建议使用这种锁。

3 基于 Redis 的 setnx

setnx
package main

import (
    "github.com/go-redis/redis"
)

func setnx() {
    client := redis.NewClient(&redis.Options{})

    var lockKey = "counter_lock"
    var counterKey = "counter"

    // lock
    resp := client.SetNX(lockKey, 1, time.Second*5)
    lockStatus, err := resp.Result()
    if err != nil || !lockStatus {
        println("lock failed")
        return
    }

    // counter++
    getResp := client.Get(counterKey)
    cntValue, err := getResp.Int64()
    if err == nil || err == redis.Nil {
        cntValue++
        resp := client.Set(counterKey, cntValue, 0)
        _, err := resp.Result()
        if err != nil {
            println(err)
        }
    }
    println("current counter is ", cntValue)

    // unlock
    delResp := client.Del(lockKey)
    unlockStatus, err := delResp.Result()
    if err == nil && unlockStatus > 0 {
        println("unlock success")
    } else {
        println("unlock failed", err)
    }
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            setnx()
        }()
    }
    wg.Wait()
}

运行结果:

➜  go run main.go
lock failed
lock failed
lock failed
lock failed
lock failed
lock failed
lock failed
lock failed
current counter is  34
lock failed
unlock success
setnx
setnx

所以,我们需要依赖于这些请求到达 redis 节点的顺序来做正确的抢锁操作。

如果用户的网络环境比较差,是必然抢不到的。

4 基于 ZooKeeper

mutex.Lock
package main

import (
    "github.com/go-zookeeper/zk"
)

func main() {
    c, _, err := zk.Connect([]string{"127.0.0.1"}, time.Second)
    if err != nil {
        panic(err)
    }
    l := zk.NewLock(c, "/lock", zk.WorldACL(zk.PermAll))
    err = l.Lock()
    if err != nil {
        panic(err)
    }
    println("lock success, do your business logic")

    time.Sleep(time.Second * 10) // 模拟业务处理

    l.Unlock()
    println("unlock success, finish business logic")
}
/lock
粗粒度

5 基于 etcd

etcd 是与 ZooKeeper 类似的分布式组件,也能实现与 ZooKeeper 锁相似的功能:

package main

import (
    clientv3 "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
c := make(chan os.Signal)
    signal.Notify(c)

    client, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"localhost:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        panic(err)
    }
    defer client.Close()

    lockKey := "/lock"

    go func() {
        session, err := concurrency.NewSession(client)
        if err != nil {
            panic(err)
        }
        m := concurrency.NewMutex(session, lockKey)
        if err := m.Lock(context.Background()); err != nil {
            panic("go1 get mutex failed " + err.Error())
        }
        fmt.Printf("go1 get mutex sucess\n")
        fmt.Println(m)
        time.Sleep(time.Duration(10) * time.Second)
        m.Unlock(context.Background())
        fmt.Printf("go1 release lock\n")
    }()

    go func() {
        time.Sleep(time.Duration(2) * time.Second)
        session, err := concurrency.NewSession(client)
        if err != nil {
            panic(err)
        }
        m := concurrency.NewMutex(session, lockKey)
        if err := m.Lock(context.Background()); err != nil {
            panic("go2 get mutex failed " + err.Error())
        }
        fmt.Printf("go2 get mutex sucess\n")
        fmt.Println(m)
        time.Sleep(time.Duration(2) * time.Second)
        m.Unlock(context.Background())
        fmt.Printf("go2 release lock\n")
    }()

    <-c
}

三、延时任务系统

实时处理并反馈给用户是系统的主要内容,但有时也会遇到非实时的任务需求,比如在确定的时间点发布重要公告,或者需要在用户做了一件事情的几分钟后对其作出特定动作,比如发个优惠券。

如果业务规模比较小,有时也可以通过数据库配置轮询来对这种任务进行简单处理,但一旦上了规模,就要寻找更好的解决方案。

一般有两种思路来解决这个问题:

  1. 实现一套类似 crontab 的分布式定时任务管理系统
  2. 实现一个支持定时发送消息的消息队列

两种思路衍生出了一些不同的系统,但本质都差不多,都是需要实现一个定时器(timer)。

SetReadDeadline()

下面从定时器开始, 探究延时任务系统的实现。

1 定时器的实现

定时器(timer)的实现在工业界已经是有解的问题了,常见的就是时间堆(堆排序)和时间轮。

1.1 时间堆

是一种经过排序的完全二叉树,其中任一非终端节点的数据值均不大于(或不小于)其左子节点和右子节点的值。

最常见的时间堆一般用最小堆实现。

最小堆:根节点的键值是所有堆节点键值中最小者的堆

如下图:

ch6-binary_tree

对于定时器来说,如果堆顶元素比当前的时间还要大,说明堆项内所有元素都比当前时间大,进而说明这个时刻我们还没有必要对时间堆进行任何处理。定时检查的时间复杂度是 $O(1)$ 。

当我们发现堆项的元素小于当前时间时,说明可能已经有一批事件过期了,这时进行正常的弹出的堆调操作就好。每次堆调整的时间复杂度是 $O(log N)$ 。

Go 自身的内置定时器就是用时间堆实现的,不过使用的不是二叉树堆,而是扁平一些的四叉堆:

ch6-four-branch-tree

性质:父节点比其4个子节点都小,子节点之间没有特别的大小关系要求。

四叉堆中元素超时和堆调整与二叉堆没有本质区别。

1.2 时间轮

ch6-timewheel

用时间轮来实现定时器时,需要定义每一个格子的“刻度”,可以将时间想象成一个时钟,中心有秒针顺时针转动,每次转动到一个刻度时,需要去查看该刻度挂载的任务列表是否已经有到期的任务。

从结构上来讲,时间轮和哈希表很相似,如果把哈希算法定义为:触发时间 % 时间轮元素大小,那么这就是一个简单的哈希表。在哈希冲突时,采用链表挂载哈希冲突的定时器。

除了这种单层时间轮,还有一些时间轮采用多层实现。

用 Go 实现的时间轮项目不多,下面这个是其中性能较好的时间轮的延时任务示例:

package main

import (
    "github.com/antlabs/timer"
)

// 一次性定时器
func after(tm timer.Timer) {
    var wg sync.WaitGroup
    wg.Add(2)
    defer wg.Wait()

    go func() {
        defer wg.Done()
        tm.AfterFunc(1*time.Second, func() {
            log.Printf("after 1 second\n")
        })
    }()

    go func() {
        defer wg.Done()
        tm.AfterFunc(10*time.Second, func() {
            log.Printf("after 10 seconds\n")
        })
    }()

    go func() {
        defer wg.Done()
        tm.AfterFunc(30*time.Second, func() {
            log.Printf("after 30 seconds\n")
        })
    }()

    go func() {
        defer wg.Done()
        tm.AfterFunc(time.Minute, func() {
            log.Printf("after 1 minute\n")
        })
    }()

    go func() {
        defer wg.Done()
        tm.AfterFunc(time.Minute+30*time.Second, func() {
            log.Printf("after 1 minute and 30 seconds\n")
        })
    }()

    go func() {
        defer wg.Done()
        tm.AfterFunc(2*time.Minute+45*time.Second, func() {
            log.Printf("after 2 minutes and 45 seconds\n")
        })
    }()
}

// 周期性定时器
func schedule(tm timer.Timer) {
    tm.ScheduleFunc(500*time.Millisecond, func() {
        log.Printf("schedule 500 milliseconds\n")
    })

    tm.ScheduleFunc(time.Second, func() {
        log.Printf("schedule 1 second\n")
    })

    tm.ScheduleFunc(20*time.Second, func() {
        log.Printf("schedule 20 seconds\n")
    })

    tm.ScheduleFunc(1*time.Minute, func() {
        log.Printf("schedule 1 minute\n")
    })
}

func main() {
    tm := timer.NewTimer()
    defer tm.Stop()

    // go after(tm)
    go schedule(tm)

    go func() {
        time.Sleep(2*time.Minute + 50*time.Second)
        tm.Stop()
    }()

    tm.Run()
}

2 任务分发

有了基本的定时器实现方案,如果我们开发的是单机系统,就可以开始干活了,但如果是分布式,还需要把“定时”或“延时”的任务分发出去。

示例图:

任务分发示意图

task_id % shard_count = shard_id

当这些定时任务被触发之后需要通知用户侧,有两种思路实现:

  1. 将任务被触发的信息封装成一条消息,发往消息队列,由用户侧对消息队列进行监听
  2. 对用户预先配置的回调函数进行调用
1
2

3 数据再平衡的幂等考量

当任务执行集群的机器故障时,需要对任务进行重新分配。按照之前的求模策略,对这台机器还没有处理的任务进行重新分配就比较麻烦,如果实际运行的线上系统,还要在故障时的任务平衡方面花更多的心思。

参考 Elasticsearch 的数据分布设计,每份任务数据都有多个副本,这里假设有两个副本,如下图所示:

分布式1

一份数据虽然有两个持有者,但持有者持有的副本会进行区分,比如持有的是主副本还是非主副本。

一个任务只会在持有主副本的节点上被执行。

节点 1

2

节点 1 节点 2节点 3

当然,也可以用稍微复杂一点的思路,比如对集群中的节点进行角色划分,由协调节点来做这种故障时的任务重新分配工作,考虑到高可用,协调节点可能也需要有 1 至 2 个备用节点以防不测。

exactly once

四、分布式搜索引擎

MySQL 很脆弱,数据库系统本身要保证实时和强一致性,所以其功能设计上都是为了满足这种一致性需求。比如 write ahead log 的设计,基于 B+ 树实现的索引和数据组织,以及基于 MVCC 实现的事务等等。

关系型数据库一般被用于实现 OLTP 系统:

联机事务处理OLTP, Online transaction processing)是指透过信息系统、电脑网络及数据库,以在线交易的方式处理一般即时性的作业资料,和更早期传统数据库系统大量批量的作业方式并不相同。OLTP通常被运用于自动化的资料处理工作,如订单输入、金融业务…等反复性的日常性交易活动。 和其相对的是属于决策分析层次的联机分析处理(OLAP)。

在互联网的业务场景中,也有一些实时性要求不高(可以接受多秒的延迟),但是查询复杂性却很高的场景。比如,在电商的 WMS 系统中,或者在大多数业务场景丰富的 CRM 或者客服系统中,可能需要提供几十个字段的随意组合查询功能。这种系统的数据维度天生众多,比如一个电商的 WMS 中对一件货物的描述,可能有下面这些字段:

仓库id,入库时间,库位分区id,储存货架id,入库操作员id,出库操作员id,库存数量,过期时间,SKU类型,产品品牌,产品分类,内件数量

除了上述信息,如果商品在仓库内有流转。可能还有关联的流程 id,当前的流转状态等。

如果经营一个大型电商,每天千万级别的订单,那么在这个数据库中查询和建立合适的索引都是一件非常难的事情。

在 CRM 或客服类系统中,常常有根据关键字进行搜索的需求,大型互联网公司每天接收数以万计的用户请求。考虑到事件溯源,用户的请求至少要保存 2~3 年,这些数据是千万级甚至上亿级的数据,如果根据关键字进行一次 like 查询,可能整个 MySQL 就崩溃了。

这时,就需要搜索引擎解决。

1 搜索引擎

Elasticsearch

1.1 倒排列表

ElasticsearchElasticsearch

倒排列表

Elasticsearch
TiT(i+1)Elasticsearch

ch6-terms

当用户搜索“天气很好”时,其实就是求:天气、气很、很好三组倒排列表的交集。但这里的相等判断逻辑有些特殊,用伪代码表示:

func equal() {
    if postEntry.docID of '天气' == postEntry.docID of '气很' &&
        postEntry.offset + 1 of '天气' == postEntry.offset of '气很' {
            return true
    }

    if postEntry.docID of '气很' == postEntry.docID of '很好' &&
        postEntry.offset + 1 of '气很' == postEntry.offset of '很好' {
        return true
    }

    if postEntry.docID of '天气' == postEntry.docID of '很好' &&
        postEntry.offset + 2 of '天气' == postEntry.offset of '很好' {
        return true
    }

    return false
}

多个有序列表求交集的时间复杂度是 $O(N*M)$ ,$N$ 为给定列表中元素数最小的集合,$M$ 为给定列表的个数。

在整个算法中起决定作用的一是最短的倒排列表的长度,其次是词数总和,一般词数不会很大,所以起决定性作用的,一般是所有倒排列表中最短的那一个的长度。

因此,文档总数很多的情况下,搜索词的倒排列表最短的那一个不长时,搜索速度也会很快。如果用关系型数据库,那就需要按照索引来慢慢扫描。

2 查询 DSL

ElasticsearchElasticsearch
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "field_1": {
              "query": "1",
              "type": "phrase"
            }
          }
        },
        {
          "match": {
            "field_2": {
              "query": "2",
              "type": "phrase"
            }
          }
        },
        {
          "match": {
            "field_3": {
              "query": "3",
              "type": "phrase"
            }
          }
        },
        {
          "match": {
            "field_4": {
              "query": "4",
              "type": "phrase"
            }
          }
        }
      ]
    }
  },
  "from": 0,
  "size": 1
}

看起来比较麻烦,但表达的意思很简单:

if field_1 == 1 && field_2 == 2 && field_3 == 3 && field_4 == 4 {
    return true
}

用 bool should query 可以表示 or 的逻辑:

{
  "query": {
    "bool": {
      "should": [
        {
          "match": {
            "field_1": {
              "query": "1",
              "type": "phrase"
            }
          }
        },
        {
          "match": {
            "field_2": {
              "query": "3",
              "type": "phrase"
            }
          }
        }
      ]
    }
  },
  "from": 0,
  "size": 1
}

这里表示的是类似:

if field_1 == 1 || field_2 == 2 {
    return true
}
Boolean Expression
4 > 1
5 == 2
3 < i && x > 10
ElasticsearchBool Query

3 基于 client SDK 开发

elasticsearch 官方 Go 客户端版本已更新到 8,且客户端内容较多,单独写了一篇文章以供参考:Elasticsearch Go 客户端。

因为 Lucene 的性质,本质上搜索引擎内的数据是不可变的,所以如果要对文档进行更新,Lucene 内部是按照 id 进行完全覆盖(本质是取同一 id 最新的 segment 中的数据)的操作,所以与插入的情况是一样的。

ElasticsearchElasticsearchElasticsearchElasticsearchElasticsearch

4 将 sql 转换为 DSL

user_id = 1 and (product_id = 1 and (star_num = 4 or star_num = 5) and banned = 1)
select * from xxx 
where user_id = 1 and
(
    product_id = 1 and (star_num = 4 or star_num = 5) and banned = 1
)
Elasticsearch
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "user_id": {
              "query": "1",
              "type": "phrase"
            }
          }
        },
        {
          "match": {
            "product_id": {
              "query": "1",
              "type": "phrase"
            }
          }
        },
        {
          "bool": {
            "should": [
              {
                "match": {
                  "star_num": {
                    "query": "4",
                    "type": "phrase"
                  }
                }
              },
              {
                "match": {
                  "star_num": {
                    "query": "5",
                    "type": "phrase"
                  }
                }
              }
            ]
          }
        },
        {
          "match": {
            "banned": {
              "query": "1",
              "type": "phrase"
            }
          }
        }
      ]
    }
  },
  "from": 0,
  "size": 1
}
Elasticsearch
Elasticsearch
Elasticsearch

AST和DSL之间的对应关系

既然结构上完全一致,逻辑上就可以相互转换。以广度优先对 AST 树进行遍历,然后将二元表达式转换成 json 字符串,再拼装起来就可以了,参考:github.com/cch123/elasticsql

5 异构数据同步

在实际应用中,很少直接向搜索引擎中写入数据,更为常见方式是,将 MySQL 或其他关系型数据中的数据同步到搜索引擎中,而搜索引擎的使用方式只能对数据进行查询,无法进行修改和删除。

常见的同步方案有两种。

5.1 通过时间戳进行增量数据同步

基于时间戳的数据同步

Elasticsearch
select * from wms_orders where update_time >= date_sub(now(), interval 10 minute);

当然,考虑到边界情况,可以让这个时间段的数据与前一次的有一些重叠:

select * from wms_orders where update_time >= date_sub(
    now(), interval 11 minute
);
Elasticsearch

5.2 通过 binlog 进行数据同步

基于binlog的数据同步

业界使用较多的是阿里开源的 Canal 来进行 binlog 解析与同步。canal 会伪装成 MySQL 的从库,然后解析好行格式的 binlog,再以更容易解析的格式(如 json)发送到消息队列。

Elasticsearch
Elasticsearch

五、负载均衡

1 常见的负载均衡思路

如果不考虑均衡的话,现在有 n 个服务节点,完成业务流程只需要从这 n 个中挑选其中的一个,有几种思路:

rand.Intn() % n

当然,实际场景不可能无脑轮询或者无脑随机,如果对下游请求失败了,还需要某种机制来进行重试。如果纯粹随机,存在一定的可能性让下一次选择的节点仍是问题节点。

2 基于洗牌算法的负载均衡

如果随机选取每次发送请求的节点,在遇到下游出问题时换其他节点重试,需要设计一个大小和节点数组大小一致的索引数组,每次来新的请求,对索引数组做洗牌,然后取第一个元素作为选中的服务节点。如果请求失败,那么选择下一个节点重试,依此类推:

var endpoints = []string {
    "100.69.62.1:3232",
    "100.69.62.32:3232",
    "100.69.62.42:3232",
    "100.69.62.81:3232",
    "100.69.62.11:3232",
    "100.69.62.113:3232",
    "100.69.62.101:3232",
}

// 重点在这个 shuffle
func shuffle(slice []int) {
    for i := 0; i < len(slice); i++ {
        a := rand.Intn(len(slice))
        b := rand.Intn(len(slice))
        slice[a], slice[b] = slice[b], slice[a]
    }
}

func request(params map[string]interface{}) error {
    var indexes = []int {0,1,2,3,4,5,6}
    var err error

    shuffle(indexes)
    maxRetryTimes := 3

    idx := 0
    for i := 0; i < maxRetryTimes; i++ {
        err = apiRequest(params, indexes[idx])
        if err == nil {
            break
        }
        idx++
    }

    if err != nil {
        // logging
        return err
    }

    return nil
}

循环一遍slice,随机生成两个索引,交换这两个索引对应的值,完成洗牌,看起来没有什么问题。

2.1 错误的洗牌导致的负载不均衡

但其实上述洗牌是有问题的,存在两个问题:

rand.Intn()

第一个问题不用多说,简单但容易犯,特别注意即可规避。

第二个问题用概率简单解释。如果每次挑选都是真随机,那么这种洗牌的结果是 $7^7=823543$,但我们需要的是每次选中的数字的概率相等,则应该有 $7!=5040$ 种结果,823543 明显不是 5040 的倍数,所以肯定有某个数字以比其他数字更高的概率被选择,所以这种代码是错误的,计算概率的过程很复杂,直接上结论:索引为 0 的元素被选中的概率约为 22%,其他元素被选中的概率约为 13%

2.2 修正洗牌算法

从数学上得到过证明的还是经典 fisher-yates 算法,主要思路为每次随机挑选一个值,放到数组末尾。然后在 n-1 个元素的数组中再随机挑选一个值放在数组末尾,依此类推。

func shuffle(indexes []int) {
    for i:=len(indexes); i>0; i-- {
        lastIdx := i - 1
        idx := rand.Intn(i)
        indexes[lastIdx], indexes[idx] = indexes[idx], indexes[lastIdx]
    }
}

Go标准库中内置了该算法:

func shuffle(n int) []int {
    b := rand.Perm(n)
    return b
}
rand.Perm

3 ZooKeeper集群的随机节点挑选问题

本节中的场景是从 N 个节点中选择一个节点发送请求,初始请求结束之后,后续请求会重新对数组洗牌,所以每两个请求之间没有关联。因此本节最开始的洗牌算法,不初始化随机库的种子理论上也没有什么问题。

但在一些特殊的场景下,例如使用 ZooKeeper 时,客户端初始化从多个服务节点中挑选一个节点后,是会向该节点建立长连接的。之后客户端请求都会发往该节点,直到该节点不可用才会在节点列表中挑选下一个节点。在这种场景下,初始连接节点的选择就必须是真随机了,否则,所有客户端启动时,都会去连接同一个 ZooKeeper 的实例,根本无法起到负载均衡的目的。

如果在日常开发中,当前业务也是类似的场景,就必须考虑一下是否会发生类似的情况。

rand
rand.Seed(time.Now().UnixNano())

4 负载均衡算法效果验证

shuffleshuffle
package main

import (
    "fmt"
    "math/rand"
    "time"
)

func init() {
    rand.Seed(time.Now().UnixNano())
}

func shuffle1(slice []int) {
    for i := 0; i < len(slice); i++ {
        a := rand.Intn(len(slice))
        b := rand.Intn(len(slice))
        slice[a], slice[b] = slice[b], slice[a]
    }
}

func shuffle2(indexes []int) {
    for i := len(indexes); i > 0; i-- {
        lastIdx := i - 1
        idx := rand.Intn(i)
        indexes[lastIdx], indexes[idx] = indexes[idx], indexes[lastIdx]
    }
}

func main() {
    var cnt1 = map[int]int{}
    for i := 0; i < 1000000; i++ {
        var sl = []int{0, 1, 2, 3, 4, 5, 6}
        shuffle1(sl)
        cnt1[sl[0]]++
    }

    var cnt2 = map[int]int{}
    for i := 0; i < 1000000; i++ {
        var sl = []int{0, 1, 2, 3, 4, 5, 6}
        shuffle2(sl)
        cnt2[sl[0]]++
    }

    fmt.Println(cnt1, "\n", cnt2)
}

结果:

map[0:223847 1:128878 2:129454 3:129201 4:129353 5:129609 6:129658] 
map[0:142507 1:142003 2:143590 3:142715 4:142487 5:143064 6:143634]

六 分布式配置管理

在分布式系统中,常困扰我们的还有上线问题。

虽然目前有一些优雅的重启方案,但实际应用中可能受限于系统内部的运行情况而无法做到真正的优雅。比如为了对去下游的流量进行限制,在内存中堆积一些数据,并对堆积设定时间或总量的阈值。在任意阈值达到之后将数据统一发送给下游,以避免频繁的请求超出下游的承载能力让下游崩溃。这种情况下重启要做到优雅就比较难了。

所以应该尽量避免采用或绕过上线的方式对线上程序做一些修改。比较典型的修改内容就是程序的配置项。

1 场景举例

1.1 报表系统

在一些偏 OLAP 或者离线的数据平台中,经过长期的迭代开发,整个系统的功能模块已经渐渐稳定,可变动的项只出现在数据层,而数据层的变动大多可以认为是 SQL 的变动。架构师们自然而然地会想着把这些变动项抽离到系统外部。

当业务提出了新的需求时,需要将新的 SQL 录入到系统内部,或者简单修改一个老的 SQL,不对系统进行上线,就可以直接完成修改。

1.2 业务配置

大公司的平台部门服务众多业务线,在平台内为各业务线分配唯一 id。平台本身也由多个模块组成,这些模块需要共享相同的业务线定义。当公司新开产品线时,需要能够在短时间内打通所有平台系统的流程,这时每个系统都走上线流程肯定来不及。另外需要对这种公共配置进行统一管理,同时对其增减逻辑也做统一管理。这些信息变更时,需要自动通知业务方的系统,而不需要人力介入或只需简单介入。

除业务管理外,很多互联网公司会按照城市来铺展自己的业务。在某个城市未开城之前,理论上所有模块都应该认为带有该城市 id 的数据都是脏数据并自动过滤。如果业务开城,就应该将这个新的城市 id 加入到白名单中,这样业务流程便可以自动运转。

互联网公司的运营系统中会有各种类型的运营活动,有些运营活动推出后可能出现超出预期的事件(比如公关危机),需要紧急将系统下线。这时会用到一些开关来快速关闭相应的功能,或者快速将想要剔除的活动 id 从白名单中剔除。

2 使用 etcd 实现配置更新

使用 etcd 实现一个简单的配置读取和动态更新流程,以此来了解线上的配置更新流程。

2.1 配置定义

简单的配置,可以将内容完全存储在 etcd 中,如:

etcdctl put /configs/remote_config.json -- '{"addr":"127.0.0.1:1080","aes_key":"01B345B7A9ABC00F0123456789ABCDAF","https":false,"secret":"","private_key_path":"","cert_file_path":""}'
etcdctl get /configs/remote_config.json
{
    "addr" : "127.0.0.1:1080",
    "aes_key" : "01B345B7A9ABC00F0123456789ABCDAF",
    "https" : false,
    "secret" : "",
    "private_key_path" : "",
    "cert_file_path" : ""
}

2.2 新建 etcd client

    client, err = clientv3.New(clientv3.Config{
        Endpoints:   []string{"http://127.0.0.1:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        panic(err)
    }

2.3 获取配置

    var resp *clientv3.GetResponse
    resp, err = client.Get(context.Background(), "/configs/remote_config.json")
    if err != nil {
        log.Fatalln(err)
    }
    log.Println(resp)
clientkvGet()

2.4 更新订阅

    watch := client.Watch(context.Background(), "/configs/remote_config.json")

    for wresp := range watch {
        for _, ev := range wresp.Events {
            log.Println("new values is ", string(ev.Kv.Value))
            err = json.Unmarshal(ev.Kv.Value, &appConfig)
            if err != nil {
                log.Fatalln(err)
            }
        }
    }

通过订阅 config 路径的变动事件,在该路径下内容发生变化时,客户端侧可以收到变动通知,并收到变动后的字符串值。

2.5 完整代码

导包时也只保留了核心包。

package main

import (
    clientv3 "go.etcd.io/etcd/client/v3"
)

type Config struct {
    Addr           string `json:"addr"`
    AesKey         string `json:"aes_key"`
    HTTPS          bool   `json:"https"`
    Secret         string `json:"secret"`
    PrivateKeyPath string `json:"private_key_path"`
    CertFilePath   string `json:"cert_file_path"`
}

var (
    appConfig  Config
    configPath = `/configs/remote_config.json`
    client     *clientv3.Client
    err        error
)

func init() {
    client, err = clientv3.New(clientv3.Config{
        Endpoints:   []string{"http://127.0.0.1:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        panic(err)
    }

    initConfig()
}

func watchAndUpdate() {
    watch := client.Watch(context.Background(), configPath)

    for wresp := range watch {
        for _, ev := range wresp.Events {
            log.Println("new values is ", string(ev.Kv.Value))
            err = json.Unmarshal(ev.Kv.Value, &appConfig)
            if err != nil {
                log.Fatalln(err)
            }
        }
    }

}

func initConfig() {
    var resp *clientv3.GetResponse
    resp, err = client.Get(context.Background(), configPath)
    if err != nil {
        panic(err)
    }
    json.Unmarshal(resp.Kvs[0].Value, &appConfig)
}

func getConfig() Config {
    return appConfig
}

func main() {
    defer client.Close()
    watchAndUpdate()
}

上述代码在更新配置时进行了一系列操作:watch 响应、json 解析,这些操作都不具备原子性。当单个业务请求流程中多次获取 config 时,有可能因为中途 config 发生变化而导致单个请求前后逻辑不一致。因此,在使用类似方法进行更新配置时,需要在单个请求的生命周期内使用相同的配置。具体实现方式可以是只在请求开始时获取一次配置,然后依次向下透传。当然具体情况需要具体分析。

3 配置膨胀

随着业务的发展,配置系统本身所承载的压力可能也会越来越大,配置文件可能成千上万,客户端也可能成千上万,此时,将配置内容存储在 etcd 内部就不合适了。

随着配置文件数量的膨胀,除了存储系统本身的吞吐量问题,还有配置信息的管理问题。

需要对相应的配置进行权限管理,需要根据业务量进行配置存储的集群划分。如果客户端太多,导致配置存储系统无法承受瞬时大量的 QPS,可能还需要在客户端侧进行缓存优化等等。

这也是为什么大公司都会针对自己的业务额外开发一套复杂配置系统的原因。

4 配置版本管理

在配置管理过程中,难免出现用户误操作的情况,例如在更新配置时,输入了无法解析的配置。此时可以通过配置校验来解决。

有时错误的配置可能不是格式上的问题,而是逻辑上的问题。比如写 SQL 时少 select 了一个字段,更新配置时,不小心丢掉了 json 字符串里的一个 field 而导致程序无法理解新的配置而运行异常。为了快速止损,最快且最有效的办法就是进行版本管理,并支持按版本回滚。

在配置进行更新时,要为每份配置的新内容赋予一个版本号,并将修改前的内容和版本号记录下来,当发现新配置出现问题时,能够及时回滚。

常见的做法是,使用 MySQL 来存储配置文件或配置字符串的不同版本内容,在需要回滚时,只需进行简单查询即可。

5 客户端容错

在业务系统的配置被剥离到配置中心后,并不意味着系统就可以高枕无忧了。

当配置中心本身宕机时,也需要一定的容错能力,至少保证在其宕机期间,业务仍然可以运转。这要求系统能够在配置中心宕机时,也能够拿到需要的配置信息,即使这些信息不是最新版本。

具体来讲,在给业务提供配置读取的 SDK 时,最好能够将拿到的配置在业务机器的磁盘上也缓存一份。这样远程配置中心不可用时,可以直接用硬盘上的内容继续运行。当重新连接上配置中心时,再把相应的内容进行更新。

加入缓存之后,必须考虑数据一致性的问题。当有个别业务机器因为网络错误而与其他机器配置不一致时,应该能够从监控系统中知晓。

使用一种手段可能解决配置更新的痛点,但同时此手段也可能带来新的问题。实际开发中,要对每一步决策多多思考,以使自己能够在发生问题时不会手足无措。