根据需要实现动态创建超级表,设备表,灵活实现测量参数的名称类型和tags的适配,不用再每个测试改一个程序,实现全自动生成表,入库表
type ZngValue struct {
DeviceID string `json:"deviceid"` //设备名
Metric string `json:"metric"` //物理量 表名
Endpoint string `json:"endpoint"` //IP
Timestamp int64 `json:"timestamp"` //时间戳
Step int64 `json:"step"` //时间间隔
ValuesMap map[string]interface{} `json:"valuesMap"` //数据
CounterType string `json:"counterType"`
Tags string `json:"tags"`
TagsMap map[string]string `json:"tagsMap"` //保留2种格式,方便后端组件使用
Extra string `json:"extra"`
}
func (taosd *TDengineDataSource) autogencreatezngvaluesql(sql string, metric ZngValue) (sqlcmd string, err error) {
var sqlvaluename, sqlvalue string
sqlbody := sql
for name, value := range metric.ValuesMap {
switch vv := value.(type) {
case string:
sqlbody = sqlbody + "," + value.(string)
logger.Debug(vv)
case float32:
//vl := value.(float32)
//sqlbody = sqlbody + "," + name + "=" + strconv.FormatFloat(float64(value.(float32)), 'E', -1, 64)
sqlvaluename += name + ","
sqlvalue += strconv.FormatFloat(float64(value.(float32)), 'E', -1, 64) + ","
logger.Debug(vv)
case float64:
//sqlbody = sqlbody + "," + name + "=" + strconv.FormatFloat(value.(float64), 'E', -1, 64)
sqlvaluename += name + ","
sqlvalue += strconv.FormatFloat(float64(value.(float64)), 'E', -1, 64) + ","
logger.Debug(vv)
case uint64:
//sqlbody = sqlbody + "," + name + "=" + strconv.FormatInt(int64(value.(uint64)), 10)
sqlvaluename += name + ","
sqlvalue += strconv.FormatInt(int64(value.(uint64)), 10) + ","
logger.Debug(vv)
case int64:
//sqlbody = sqlbody + "," + name + "=" + strconv.FormatInt(value.(int64), 10)
sqlvaluename += name + ","
sqlvalue += strconv.FormatInt(int64(value.(int64)), 10) + ","
logger.Debug(vv)
case int:
//sqlbody = sqlbody + "," + name + "=" + strconv.Itoa(value.(int))
sqlvaluename += name + ","
sqlvalue += strconv.FormatInt(int64(value.(int)), 10) + ","
logger.Debug(vv)
case bool:
//sqlbody = sqlbody + "," + name + "=" + strconv.FormatBool(value.(bool))
sqlvaluename += name + ","
sqlvalue += strconv.FormatBool(value.(bool)) + ","
case int8:
//sqlbody = sqlbody + "," + name + "=" + strconv.FormatInt(int64(value.(uint64)), 10)
sqlvaluename += name + ","
sqlvalue += strconv.FormatInt(int64(value.(int8)), 10) + ","
logger.Debug(vv)
case int16:
//sqlbody = sqlbody + "," + name + "=" + strconv.FormatInt(int64(value.(uint64)), 10)
sqlvaluename += name + ","
sqlvalue += strconv.FormatInt(int64(value.(int16)), 10) + ","
logger.Debug(vv)
case byte:
default:
// var err error
//err.(string) = "not support"
//fmt.Printf()err("not support")
return "", errors.New("not support")
}
//var
}
sqlvaluename = sqlvaluename[:len(sqlvaluename)-1]
sqlvalue = sqlvalue[:len(sqlvalue)-1]
tl := metric.Timestamp
tls := strconv.FormatInt(tl, 10)
sqlcmd = sqlbody + sqlvaluename + ") values(" + tls + "000," + sqlvalue + ")\n"
return sqlcmd, nil
}
func (taosd *TDengineDataSource) serilizeZngTDengine(m ZngValue, tbn string, db *sql.DB) error {
idx := taosd.TAOShashID([]byte(tbn))
sqlcmd := " " + tbn
//tl := m.Timestamp
//tls := strconv.FormatInt(tl, 10)
sqlcmd = sqlcmd + "(ts,"
sqlcmd, err := taosd.autogencreatezngvaluesql(sqlcmd, m)
if err != nil {
logger.Debugf("sqlcmd err %v", err)
return err
}
id := idx % taosd.Section.config.insertsqlworkers
logger.Debugf("Send-->GO %v"+sqlcmd, id)
taosd.insertbatchChans[id] <- sqlcmd
return nil
}
func (taosd *TDengineDataSource) autogencreatezngtabelsql(stbname string, metric ZngValue) (sqlcmd string, err error) {
//var sqlbody string
sqlbody := " (ts timestamp"
for valuename, value := range metric.ValuesMap {
switch value.(type) {
case string:
sqlbody = sqlbody + "," + valuename + " BINARY(100)"
case float32:
sqlbody = sqlbody + "," + valuename + " FLOAT"
case float64:
sqlbody = sqlbody + "," + valuename + " DOUBLE"
case uint64:
sqlbody = sqlbody + "," + valuename + " BIGINT"
case int64:
sqlbody = sqlbody + "," + valuename + " BIGINT"
case int:
sqlbody = sqlbody + "," + valuename + " INT"
case bool:
sqlbody = sqlbody + "," + valuename + " BOOL"
case int8:
sqlbody = sqlbody + "," + valuename + " TINYINT"
case int16:
sqlbody = sqlbody + "," + valuename + " SMALLINT"
case byte:
default:
// var err error
//err.(string) = "not support"
//fmt.Printf()err("not support")
return "", errors.New("not support")
}
//var
}
sqlcmd = "create table if not exists " + stbname + sqlbody + ") tags(taghash binary(34)"
return sqlcmd, nil
}
func (taosd *TDengineDataSource) HandleZngStable(metric ZngValue, db *sql.DB) error {
taglist := list.New() // save push in data tags name include endpoint / nid
tbtaglist := list.New() //save ready add to stable table tags max num(tagnumlimit)
tagmap := make(map[string]string)
tbtagmap := make(map[string]string)
//m := make(metric, len(metric))
tagnum := taosd.Section.config.tagnumlimit
//var hasName bool = false
var metricsName string
var tbn string = ""
var ln string = ""
taglen := taosd.Section.config.taglen
var nt nametag
var sqlcmd string
//var annotlen int
//fmt.Println(ts)
j := 0
metricsName = metric.Metric
if metricsName != "" {
tbn += metricsName
//hasName = true
} else {
info := fmt.Sprintf("no name metric")
logger.Errorf(info)
return nil
}
if metric.Endpoint != "" {
tagmap["ip"] = metric.Endpoint
ln = "ip" //strings.ToLower(string(metric.Endpoint))
taosd.OrderInsertS(ln, taglist)
taosd.OrderInsertS(ln, tbtaglist)
tbtagmap[ln] = "y"
tbn += strings.ToLower(string(metric.Endpoint))
j++
}
if metric.DeviceID != "" {
tagmap["deviceid"] = metric.DeviceID
ln = "deviceid" //strings.ToLower(string(metric.Nid))
taosd.OrderInsertS(ln, taglist)
taosd.OrderInsertS(ln, tbtaglist)
tbtagmap[ln] = "y"
tbn += strings.ToLower(string(metric.DeviceID))
j++
}
for k, v := range metric.TagsMap {
//j <= tagnum
j++
ln = strings.ToLower(string(k))
//taosd.OrderInsertS(ln, taglist)
s := string(v)
if j <= tagnum {
tbn += s
taosd.OrderInsertS(ln, taglist)
taosd.OrderInsertS(ln, tbtaglist)
//tbtaglist.PushBack(ln)
if len(s) > taglen {
s = s[:taglen]
}
tagmap[ln] = s
tbtagmap[ln] = "y"
}
}
if taosd.Section.config.debugprt == 2 {
t := metric.Timestamp
//var ns int64 = 0
//if t/1000000000 > 10 {
// tm := t / 1000
// ns = t - tm*1000
//}
logger.Debugf(" Ts: %v, value: %v, ", time.Unix(t, 0), metric.ValuesMap)
//logger.Debug(ts)
}
stbname := taosd.tablenameEscape(metricsName)
//var ok bool
//i := 0
schema, ok := taosd.IsSTableCreated.Load(stbname)
if !ok { // no local record of super table structure
//获取表结构的TAG字段数组
stablehas := false
tags := taosd.taosdGetTableTagDescribe(db, stbname)
if tags != nil {
taostaglist := list.New()
taostagmap := make(map[string]string)
for _, tag := range tags {
taostaglist.PushBack(tag)
taostagmap[tag] = "y"
}
nt.taglist = taostaglist
nt.tagmap = taostagmap
//tbtaglist = nt.taglist
//tbtagmap = nt.tagmap
stablehas = true //stable is exist
}
//nt.tagmap
//nt.taglist
if stablehas { //超级表存在
//需要插入的 tags
//yes, the super table was already created in TDengine
for e := tbtaglist.Front(); e != nil; e = e.Next() {
k := e.Value.(string)
//i++
//if i < taosd.Section.config.tagnumlimit {
_, ok = nt.tagmap[k] //表结构的tags name
if !ok {
//tag以前不存在,需要添补插入,tag在超级表里没找到则改变表结构,添加此tag
sqlcmd = "alter table " + stbname + " add tag " + k + taosd.Section.tagstr + "\n"
_, err := taosd.execSql(taosd.Section.config.dbName, sqlcmd, db)
if err != nil {
logger.Error(err)
errorcode := fmt.Sprintf("%s", err)
if strings.Contains(errorcode, "duplicated column names") {
nt.taglist.PushBack(k)
//OrderInsertS(k, tbtaglist)
nt.tagmap[k] = "y"
}
} else {
nt.taglist.PushBack(k)
//OrderInsertS(k, tbtaglist)
nt.tagmap[k] = "y"
}
}
//}
}
tbtaglist = nt.taglist
tbtagmap = nt.tagmap
taosd.IsSTableCreated.Store(stbname, nt)
} else { //超级表不存在
// no, the super table haven't been created in TDengine, create it.
nt.taglist = tbtaglist
nt.tagmap = tbtagmap
sqlcmd, err := taosd.autogencreatezngtabelsql(stbname, metric)
if err != nil {
logger.Errorf("autogencreatetabelsql%s", err.Error())
return err
}
//sqlcmd = "create table if not exists " + stbname + " (ts timestamp, value double) tags(taghash binary(34)"
for e := tbtaglist.Front(); e != nil; e = e.Next() {
sqlcmd = sqlcmd + "," + e.Value.(string) + taosd.Section.tagstr
}
sqlcmd = sqlcmd + ")\n"
if taosd.Section.config.debugprt == 2 {
fmt.Printf("SQL:%s", sqlcmd)
}
_, err = taosd.execSql(taosd.Section.config.dbName, sqlcmd, db)
if err == nil {
//tbtaglist = nt.taglist
//tbtagmap = nt.tagmap
taosd.IsSTableCreated.Store(stbname, nt)
} else {
logger.Error(err)
return err
}
}
} else { //有本地 tag信息,就让需要插入的tags跟本地的tag信息进行比对
ntag := schema.(nametag)
//tbtaglist = ntag.taglist
//tbtagmap = ntag.tagmap
//i := 0
for e := tbtaglist.Front(); e != nil; e = e.Next() {
k := e.Value.(string)
//i++
//if i < taosd.Section.config.tagnumlimit {
_, ok := ntag.tagmap[k]
if !ok {
sqlcmd = "alter table " + stbname + " add tag " + k + taosd.Section.tagstr + "\n"
_, err := taosd.execSql(taosd.Section.config.dbName, sqlcmd, db)
if err != nil {
logger.Error(err)
errorcode := fmt.Sprintf("%s", err)
if strings.Contains(errorcode, "duplicated column names") {
ntag.taglist.PushBack(k)
//OrderInsertS(k, tbtaglist)
ntag.tagmap[k] = "y"
} else {
return err
}
} else {
ntag.taglist.PushBack(k)
//OrderInsertS(k, tbtaglist)
ntag.tagmap[k] = "y"
}
}
//}
}
tbtaglist = ntag.taglist
tbtagmap = ntag.tagmap
taosd.IsSTableCreated.Store(stbname, ntag)
}
// insert device table data ,tables create auto
tbnhash := "MD5_" + taosd.md5V2(tbn)
_, tbcreated := taosd.IsTableCreated.Load(tbnhash)
//tbtaglist,tbtagmap根据更新过程记录最新的tag结构
if !tbcreated {
var sqlcmdhead, sqlcmd string
sqlcmdhead = "create table if not exists " + tbnhash + " using " + stbname + " tags(\""
sqlcmd = ""
i := 0
for e := tbtaglist.Front(); e != nil; e = e.Next() {
if e.Value.(string) == "taghash" {
continue
}
tagvalue, has := tagmap[e.Value.(string)]
if len(tagvalue) > taglen {
tagvalue = tagvalue[:taglen]
}
if i == 0 {
if has {
sqlcmd = sqlcmd + "\"" + tagvalue + "\""
} else {
sqlcmd = sqlcmd + "null"
}
i++
} else {
if has {
sqlcmd = sqlcmd + ",\"" + tagvalue + "\""
} else {
sqlcmd = sqlcmd + ",null"
}
}
}
var keys []string
var tagHash = ""
for t := range tagmap {
keys = append(keys, t)
}
sort.Strings(keys)
for _, k := range keys {
tagHash += tagmap[k]
}
sqlcmd = sqlcmd + ")\n"
sqlcmd = sqlcmdhead + taosd.md5V2(tagHash) + "\"," + sqlcmd
_, err := taosd.execSql(taosd.Section.config.dbName, sqlcmd, db)
if err == nil {
taosd.IsTableCreated.Store(tbnhash, true)
} else {
return err
}
}
if taosd.Section.config.debugprt == 2 {
fmt.Println("stable:", metric)
}
taosd.serilizeZngTDengine(metric, tbnhash, db)
return nil
}
func (taosd *TDengineDataSource) PushZng2Queue(items []*ZngValue) {
errCnt := 0
var req ZngValue
for _, item := range items {
req = *item
var address string
//取Endpoint或Nid作为唯一标识经过和计算后为通道索引键值
if item.Endpoint != "" {
address = item.Endpoint
} else {
if item.DeviceID != "" {
address = item.DeviceID
} else {
continue
}
}
if address != "" {
//stats.Counter.Set("points.in", 1)
logger.Debugf("taosd Pushzng2Queue :%s", address)
if taosd.Section.config.debugprt == 2 {
fmt.Println("taosd Pushzng2Queue:", req)
}
idx := taosd.TAOShashID([]byte(address))
taosd.inputnodeChans[idx%taosd.Section.config.inputworkers] <- req //将需入库参数组写入通道队列
} else {
errCnt += 1
}
}
}
//接口实例
func (e Deviceinfo) Heart(c *gin.Context) {
var values taosd.ZngValue
var items []*taosd.ZngValue
err := c.ShouldBindBodyWith(&values, binding.JSON)
if err != nil {
fmt.Println("api heart", "ShouldBindBodyWith error", err)
e.Error(500, err, fmt.Sprintf("设备实时状态更新 失败,\r\n失败信息 %s", err.Error()))
//response.ReturnJSON(c, http.StatusOK, statuscode.InvalidParam.Code,statuscode.InvalidParam.Msg, nil)
return
}
fmt.Println("api heart values ", values)
//for _, item := range values {
items = append(items, &values)
//}
fmt.Println("/r/napi heart", items)
if items != nil {
taosd.Push2Queue(items)
//e.OK(values.DeviceID, "设备实时状态更新成功")
} else {
e.Error(500, err, fmt.Sprintf("设备实时状态更新 失败,\r\n失败信息 %s", err.Error()))
}
}
//http 接口的 body
{
"deviceid": "1002",
"metric": "heart",
"endpoint": "192.168.0.11",
"timestamp": 1629037578,
"valuesMap":{
"cputemp": 65,
"cpuload": 78,
"gputemp": 76,
"gpuload": 66,
"memempty": 30,
"diskempty": 50
},
"tagsMap": {
"mac": "ae:23:55:ed:66",
"localaddr": "北京",
"devicename": "智能设备1"
}
}