解决MySQL写、更新数据,需要异步并发,且同一个ID的数据需要顺序执行问题。

golang_S1-l6gVIX.jpg

思路

将消息中的ID转成字符串、字符串转成byte组、byte数组相加取10的余数,然后根据余数将数据放到对应通道里面。最后将所有的通道遍历出来并异步处理数据。这样同一个ID的数据一定会在同一个通道里面,同样也保障了数据处理的先后顺序。

这个方法被我用在MySQL异步插入或者更新数据上,其实这种方式在数据库(MySQL)上可以使用乐观锁加事务解决。但是没有这种代码上控制效率高。

代码

package main

import (
    "fmt"
    "strconv"
    "sync"
    "time"
)

var (
    chMap = new(sync.Map)
)

type exChan struct {
    Chan chan string
}

func main() {
    //模拟队列生成数据
    queue := make(chan string, 500)
    go func() {
        for i := 10000; i < 20000; i++ {
            time.Sleep(1 * time.Second)
            queue <- strconv.Itoa(i)
        }
    }()

    // 初始化10个通道
    for i := 0; i < 10; i++ {
        e := new(exChan)
        e.Chan = make(chan string, 50)
        chMap.Store(i, e)
    }

    // 根据当前通道处理数据
    ex := func(key, value interface{}) bool {
        ch := value.(*exChan)
        go func() {
            for data := range ch.Chan {
                // 此处改为数据库处理即可
                fmt.Println(data, key)
            }
        }()
        return true
    }
    chMap.Range(ex)

    // 分发数据到各个通道
    for true {
        data := <-queue
        // 获取通道号
        n := bytesToNumer([]byte(data))
        temp, _ := chMap.Load(n)
        lock := temp.(*exChan)
        lock.Chan <- data
    }
}

func bytesToNumer(bs []byte) int {
    var count = 0
    for i := 0; i < len(bs); i++ {
        count += int(bs[i])
    }
    return count % 10
}