工作流一般使用 channel 作为队列载体,上游程序获取源数据,写入队列,下游程序消费队列。
在停止程序的时候,需要先停止上游程序,使其不再写入队列;然后等待下游程序将队列消费完,才停止。上下游程序往往都是多协程的。
此处以阿里云日志消费为例。
拉取程序
func (c Logdatacommand) Handle() {
t, _ := time.ParseInLocation(public.DATE_FORMAT_SECOND, startTime, time.Local)
option := consumerLibrary.LogHubConfig{
Endpoint: endPoint,
AccessKeyID: accessKeyID,
AccessKeySecret: accessKeySecret,
Project: project,
Logstore: logStore,
DataFetchIntervalInMs: 1000,
MaxFetchLogGroupCount: 1,
ConsumerGroupName: consumerGroupName,
ConsumerName: consumerName,
CursorPosition: consumerLibrary.SPECIAL_TIMER_CURSOR,
CursorStartTime: t.Unix(),
}
// 启动下游消费程序
go service.DataConsumer()
consumerWorker := consumerLibrary.InitConsumerWorker(option, process)
// 停止信号
ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
// 启动上游程序
consumerWorker.Start()
if _, ok := <-ch; ok {
level.Info(consumerWorker.Logger).Log("msg", "get stop signal, start to stop consumer worker", "consumer worker name", option.ConsumerName)
// 停止上游程序
consumerWorker.StopAndWait()
// 停止下游程序
service.StopConsumer()
}
}
func process(shardId int, logGroupList *sls.LogGroupList) string {
for _, logGroup := range logGroupList.LogGroups {
for _, log := range logGroup.Logs {
for _, content := range log.Contents {
if *content.Key == "request_uri" {
// 打印对比
fmt.Println("process--------", *content.Value)
// 写入队列
service.RequestDataChan <- service.RequestData{RequestUri: *content.Value, RequestTime: *log.Time}
break
}
}
}
}
return ""
}
消费程序
// 使用缓存通道
var RequestDataChan = make(chan RequestData, 20)
// 计数器
var FuncStartEndCount uint8
var FuncStartEndCountMu sync.Mutex
type RequestData struct {
RequestUri string
RequestTime uint32
}
func DataConsumer() {
for i := 0; i < 4; i++ {
go func() {
for v := range RequestDataChan {
ParseRequestUri(v)
}
}()
}
}
func StopConsumer() bool {
for {
if len(RequestDataChan) == 0 && FuncStartEndCount == 0 {
return true
}
}
}
func ParseRequestUri(rqd RequestData) {
funcStartEndCountModify(true)
defer func() {
funcStartEndCountModify(false)
if err := recover(); err != nil {
logging.Error("ParseRequestUri - ", err)
}
}()
// 打印对比
fmt.Println("consumer--------", rqd.RequestUri)
time.Sleep(1 * time.Second)
return
}
func funcStartEndCountModify(increment bool) {
FuncStartEndCountMu.Lock()
defer FuncStartEndCountMu.Unlock()
if increment {
FuncStartEndCount++
} else {
FuncStartEndCount--
}
}
consumer--------process--------
FuncStartEndCount
自增,自减操作非并发安全的,需要加锁,这个锁没有锁业务,其实性能影响不大。