解决MySQL写、更新数据,需要异步并发,且同一个ID的数据需要顺序执行问题。
思路
将消息中的ID转成字符串、字符串转成byte组、byte数组相加取10的余数,然后根据余数将数据放到对应通道里面。最后将所有的通道遍历出来并异步处理数据。这样同一个ID的数据一定会在同一个通道里面,同样也保障了数据处理的先后顺序。
这个方法被我用在MySQL异步插入或者更新数据上,其实这种方式在数据库(MySQL)上可以使用乐观锁加事务解决。但是没有这种代码上控制效率高。
代码
package main
import (
"fmt"
"strconv"
"sync"
"time"
)
var (
chMap = new(sync.Map)
)
type exChan struct {
Chan chan string
}
func main() {
//模拟队列生成数据
queue := make(chan string, 500)
go func() {
for i := 10000; i < 20000; i++ {
time.Sleep(1 * time.Second)
queue <- strconv.Itoa(i)
}
}()
// 初始化10个通道
for i := 0; i < 10; i++ {
e := new(exChan)
e.Chan = make(chan string, 50)
chMap.Store(i, e)
}
// 根据当前通道处理数据
ex := func(key, value interface{}) bool {
ch := value.(*exChan)
go func() {
for data := range ch.Chan {
// 此处改为数据库处理即可
fmt.Println(data, key)
}
}()
return true
}
chMap.Range(ex)
// 分发数据到各个通道
for true {
data := <-queue
// 获取通道号
n := bytesToNumer([]byte(data))
temp, _ := chMap.Load(n)
lock := temp.(*exChan)
lock.Chan <- data
}
}
func bytesToNumer(bs []byte) int {
var count = 0
for i := 0; i < len(bs); i++ {
count += int(bs[i])
}
return count % 10
}