package edgedb

import (
	"database/sql"
	"errors"
	"fmt"
	"net/url"
	"sort"
	"strings"
	"time"

	"gorm.io/driver/clickhouse"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

// Package-level gorm handles shared by every caller of GetDBConn and
// GetClickhouseConn. They are (re)assigned by the _setup*Conn functions.
var (
	pgConn *gorm.DB
	ckConn *gorm.DB
)

// Config holds the connection settings for the PostgreSQL and ClickHouse
// backends.
type Config struct {
	// DeviceTelemetryRecordBackend selects the telemetry record storage
	// (default: postgresql). Supported values are postgresql and clickhouse;
	// several backends can be selected by joining them with ",".
	DeviceTelemetryRecordBackend string `yaml:"" env:"DEVICE_TELEMETRY_RECORD_BACKEND" envDefault:"postgresql"`

	Postgresql struct {
		Host       string `yaml:"host" env:"POSTGRESQL_SERVER_HOST" envDefault:"postgresql"`
		Port       int    `yaml:"port" env:"POSTGRESQL_SERVER_PORT" envDefault:"5432"`
		EdgedbUser string `yaml:"user" env:"POSTGRESQL_SERVER_EDGEDB_USER" envDefault:"postgres"`
		EdgedbPass string `yaml:"pass" env:"POSTGRESQL_SERVER_EDGEDB_PASS" envDefault:"password"`
		EdgedbName string `yaml:"name" env:"POSTGRESQL_SERVER_EDGEDB_NAME" envDefault:"edgedb"`
		// PoolSize is the maximum number of open connections in the pool.
		PoolSize int `yaml:"pool_size" env:"POSTGRESQL_SERVER_POOLSIZE" envDefault:"500"`
	}

	Clickhouse struct {
		// Enabled is not consulted by SetupConn in this file; the backend
		// list in DeviceTelemetryRecordBackend decides whether ClickHouse is
		// initialized.
		Enabled bool   `yaml:"enabled" env:"CLICKHOUSE_SERVER_ENABLED" envDefault:"false"`
		Host    string `yaml:"host" env:"CLICKHOUSE_SERVER_HOST" envDefault:"clickhouse"`
		Port    int    `yaml:"port" env:"CLICKHOUSE_SERVER_PORT" envDefault:"9000"`
		// NOTE(review): User and Pass were previously tagged
		// yaml:"port" env:"CLICKHOUSE_SERVER_PORT" (a copy-paste of Port),
		// which duplicated the "port" key and bound them to the port
		// variable. The key and variable names below are the reviewer's
		// choice; confirm them against deployment configuration.
		User         string `yaml:"default_user" env:"CLICKHOUSE_SERVER_USER" envDefault:"default"`
		Pass         string `yaml:"default_pass" env:"CLICKHOUSE_SERVER_PASS" envDefault:""`
		EdgedbUser   string `yaml:"user" env:"CLICKHOUSE_SERVER_EDGEDB_USER" envDefault:"root"`
		EdgedbPass   string `yaml:"pass" env:"CLICKHOUSE_SERVER_EDGEDB_PASS" envDefault:"[yz^_^edge]"`
		EdgedbName   string `yaml:"name" env:"CLICKHOUSE_SERVER_EDGEDB_NAME" envDefault:"edgedb"`
		ReadTimeout  int    `yaml:"read_timeout" env:"READ_TIMEOUT" envDefault:"10"`
		WriteTimeout int    `yaml:"write_timeout" env:"WRITE_TIMEOUT" envDefault:"10"`
	}
}

// SetupConn initializes the database connections and configures their pools.
//
// It always opens the PostgreSQL connection and, when "clickhouse" is listed
// in cfg.DeviceTelemetryRecordBackend, the ClickHouse connection as well. It
// returns the PostgreSQL *sql.DB pool, or an error if cfg is nil or any
// connection cannot be established.
//
// Note: do not close the returned *sql.DB, otherwise the pool stops working.
// Unless there is a special need, only check the error and obtain handles
// through GetDBConn / GetClickhouseConn.
func SetupConn(cfg *Config) (*sql.DB, error) {
	if cfg == nil {
		return nil, errors.New("Config needed")
	}

	// The pool is intentionally left open: it backs the package-level pgConn
	// and is handed to the caller. (A deferred Close here would return an
	// already-closed pool.)
	db, err := _setupPostgresConn(cfg)
	if err != nil {
		return nil, err
	}

	// When ClickHouse is configured as an additional telemetry store,
	// initialize its connection too.
	if _hasTelemetryBackend(cfg.DeviceTelemetryRecordBackend, "clickhouse") {
		if _, err := _setupClickhouseConn(cfg); err != nil {
			return nil, err
		}
	}

	return db, nil
}

// _hasTelemetryBackend reports whether name appears in the comma-separated
// backends list. Whitespace around each entry is ignored; the comparison is
// case-sensitive.
func _hasTelemetryBackend(backends, name string) bool {
	names := strings.Split(backends, ",")
	for i := range names {
		names[i] = strings.TrimSpace(names[i])
	}
	sort.Strings(names)
	i := sort.SearchStrings(names, name)
	return i < len(names) && names[i] == name
}

// _resolveConfig returns cfg unchanged when it is non-nil and otherwise falls
// back to configs.LoadConfig().
//
// NOTE(review): no import for "configs" is visible in this file; confirm where
// that identifier comes from.
func _resolveConfig(cfg *Config) (*Config, error) {
	if cfg != nil {
		return cfg, nil
	}
	loaded, err := configs.LoadConfig()
	if err != nil {
		return nil, err
	}
	cfg = loaded
	return cfg, nil
}

// _setupPostgresConn opens the PostgreSQL connection, configures its pool and
// stores the gorm handle in the package-level pgConn. A nil cfg makes it load
// the configuration itself. It returns the underlying *sql.DB pool.
func _setupPostgresConn(cfg *Config) (*sql.DB, error) {
	cfg, err := _resolveConfig(cfg)
	if err != nil {
		return nil, err
	}

	dsn := fmt.Sprintf(
		"host=%s port=%d user=%s password=%s dbname=%s sslmode=disable TimeZone=Asia/Shanghai",
		cfg.Postgresql.Host,
		cfg.Postgresql.Port,
		cfg.Postgresql.EdgedbUser,
		cfg.Postgresql.EdgedbPass,
		cfg.Postgresql.EdgedbName,
	)
	conn, err := gorm.Open(postgres.New(postgres.Config{
		DSN:                  dsn,
		PreferSimpleProtocol: true, // disables implicit prepared statement usage
	}), &gorm.Config{})
	if err != nil {
		return nil, err
	}

	pgDB, err := conn.DB()
	if err != nil {
		return nil, err
	}

	// Maximum number of connections kept idle, ready for reuse.
	pgDB.SetMaxIdleConns(10)
	// Maximum number of open connections; unlimited when not configured
	// (500 by default here, bounded by postgres max_connections).
	pgDB.SetMaxOpenConns(cfg.Postgresql.PoolSize)
	// Maximum time a connection may be reused before it is replaced; without
	// this setting connections never expire.
	pgDB.SetConnMaxLifetime(time.Second * 600)

	// Publish the handle for GetDBConn.
	pgConn = conn
	return pgDB, nil
}

// _setupClickhouseConn opens the ClickHouse connection, configures its pool
// and stores the gorm handle in the package-level ckConn. A nil cfg makes it
// load the configuration itself. It returns the underlying *sql.DB pool.
func _setupClickhouseConn(cfg *Config) (*sql.DB, error) {
	cfg, err := _resolveConfig(cfg)
	if err != nil {
		return nil, err
	}

	// Database, user and password are query-escaped so that characters such
	// as '&', '#', '%' or '+' cannot corrupt the DSN.
	ck := &cfg.Clickhouse
	dsn := fmt.Sprintf(
		"tcp://%s:%d?database=%s&username=%s&password=%s&read_timeout=%d&write_timeout=%d",
		ck.Host,
		ck.Port,
		url.QueryEscape(ck.EdgedbName),
		url.QueryEscape(ck.EdgedbUser),
		url.QueryEscape(ck.EdgedbPass),
		ck.ReadTimeout,
		ck.WriteTimeout,
	)
	conn, err := gorm.Open(clickhouse.New(clickhouse.Config{
		DSN:                       dsn,
		DisableDatetimePrecision:  true,     // disable datetime64 precision, not supported before clickhouse 20.4
		DontSupportRenameColumn:   true,     // rename column not supported before clickhouse 20.4
		SkipInitializeWithVersion: false,    // smart configure based on used version
		DefaultGranularity:        3,        // 1 granule = 8192 rows
		DefaultCompression:        "LZ4",    // default compression algorithm. LZ4 is lossless
		DefaultIndexType:          "minmax", // index stores extremes of the expression
		DefaultTableEngineOpts:    "ENGINE=MergeTree() ORDER BY tuple()",
	}), &gorm.Config{})
	if err != nil {
		return nil, err
	}

	ckDB, err := conn.DB()
	if err != nil {
		return nil, err
	}

	// Maximum number of connections kept idle, ready for reuse.
	ckDB.SetMaxIdleConns(1)
	// Maximum number of open connections; the default of 0 means unlimited.
	ckDB.SetMaxOpenConns(10)
	// Maximum time a connection may be reused before it is replaced; without
	// this setting connections never expire.
	ckDB.SetConnMaxLifetime(time.Second * 600)

	// Publish the handle for GetClickhouseConn.
	ckConn = conn
	return ckDB, nil
}

// _ensureConn makes sure a usable pool exists behind conn. It calls
// reconnect(nil) when conn is nil or its *sql.DB cannot be obtained, pings the
// pool, and on ping failure closes it and reconnects once more. reconnect is
// expected to update the corresponding package-level handle.
func _ensureConn(conn *gorm.DB, reconnect func(*Config) (*sql.DB, error)) error {
	var pool *sql.DB
	if conn != nil {
		if db, err := conn.DB(); err == nil {
			pool = db
		}
	}
	if pool == nil {
		// No handle yet, or its pool is unavailable: establish a new one.
		db, err := reconnect(nil)
		if err != nil {
			return err
		}
		pool = db
	}

	if err := pool.Ping(); err == nil {
		return nil
	}

	// Ping failed: drop the broken pool (best effort, the close error adds
	// nothing actionable) and build a fresh connection.
	_ = pool.Close()
	_, err := reconnect(nil)
	return err
}

// GetDBConn returns the shared PostgreSQL gorm handle, establishing or
// re-establishing the connection when it is missing or does not answer a ping.
func GetDBConn() (*gorm.DB, error) {
	if err := _ensureConn(pgConn, _setupPostgresConn); err != nil {
		return nil, err
	}
	return pgConn, nil
}

// GetClickhouseConn returns the shared ClickHouse gorm handle, establishing or
// re-establishing the connection when it is missing or does not answer a ping.
func GetClickhouseConn() (*gorm.DB, error) {
	if err := _ensureConn(ckConn, _setupClickhouseConn); err != nil {
		return nil, err
	}
	return ckConn, nil
}