package edgedb
import (
"database/sql"
"errors"
"fmt"
"sort"
"strings"
"time"
"gorm.io/driver/clickhouse"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
var (
pgConn *gorm.DB
ckConn *gorm.DB
)
type Config struct {
DeviceTelemetryRecordBackend string `yaml:"" env:"DEVICE_TELEMETRY_RECORD_BACKEND" envDefault:"postgresql"`
Postgresql struct {
Host string `yaml:"host" env:"POSTGRESQL_SERVER_HOST" envDefault:"postgresql"`
Port int `yaml:"port" env:"POSTGRESQL_SERVER_PORT" envDefault:"5432"`
EdgedbUser string `yaml:"user" env:"POSTGRESQL_SERVER_EDGEDB_USER" envDefault:"postgres"`
EdgedbPass string `yaml:"pass" env:"POSTGRESQL_SERVER_EDGEDB_PASS" envDefault:"password"`
EdgedbName string `yaml:"name" env:"POSTGRESQL_SERVER_EDGEDB_NAME" envDefault:"edgedb"`
PoolSize int `yaml:"pool_size" env:"POSTGRESQL_SERVER_POOLSIZE" envDefault:"500"`
}
Clickhouse struct {
Enabled bool `yaml:"host" env:"CLICKHOUSE_SERVER_ENABLED" envDefault:"false"`
Host string `yaml:"host" env:"CLICKHOUSE_SERVER_HOST" envDefault:"clickhouse"`
Port int `yaml:"port" env:"CLICKHOUSE_SERVER_PORT" envDefault:"9000"`
User string `yaml:"port" env:"CLICKHOUSE_SERVER_PORT" envDefault:"default"`
Pass string `yaml:"port" env:"CLICKHOUSE_SERVER_PORT" envDefault:""`
EdgedbUser string `yaml:"user" env:"CLICKHOUSE_SERVER_EDGEDB_USER" envDefault:"root"`
EdgedbPass string `yaml:"pass" env:"CLICKHOUSE_SERVER_EDGEDB_PASS" envDefault:"[yz^_^edge]"`
EdgedbName string `yaml:"name" env:"CLICKHOUSE_SERVER_EDGEDB_NAME" envDefault:"edgedb"`
ReadTimeout int `yaml:"read_timeout" env:"READ_TIMEOUT" envDefault:"10"`
WriteTimeout int `yaml:"write_timeout" env:"WRITE_TIMEOUT" envDefault:"10"`
}
}
func SetupConn(cfg *Config) (*sql.DB, error) {
if cfg == nil {
err := errors.New("Config needed")
return nil, err
}
db, err := _setupPostgresConn(cfg)
if err != nil {
return nil, err
}
defer db.Close()
telemetryDatabaseBackends := strings.Split(cfg.DeviceTelemetryRecordBackend, ",")
sort.Strings(telemetryDatabaseBackends)
index := sort.SearchStrings(telemetryDatabaseBackends, "clickhouse")
if index < len(telemetryDatabaseBackends) && telemetryDatabaseBackends[index] == "clickhouse" {
if _, err := _setupClickhouseConn(cfg); err != nil {
return nil, err
}
}
return db, nil
}
func _setupPostgresConn(cfg *Config) (*sql.DB, error) {
if cfg == nil {
if config, err := configs.LoadConfig(); err != nil {
return nil, err
} else {
cfg = config
}
}
dsn := fmt.Sprintf(
"host=%s port=%d user=%s password=%s dbname=%s sslmode=disable TimeZone=Asia/Shanghai",
cfg.Postgresql.Host,
cfg.Postgresql.Port,
cfg.Postgresql.EdgedbUser,
cfg.Postgresql.EdgedbPass,
cfg.Postgresql.EdgedbName,
)
conn, err := gorm.Open(postgres.New(postgres.Config{
DSN: dsn,
PreferSimpleProtocol: true,
}), &gorm.Config{})
if err != nil {
return nil, err
}
pgDB, err := conn.DB()
if err != nil {
return nil, err
}
pgDB.SetMaxIdleConns(10)
pgDB.SetMaxOpenConns(cfg.Postgresql.PoolSize)
pgDB.SetConnMaxLifetime(time.Second * 600)
pgConn = conn
return pgDB, nil
}
func _setupClickhouseConn(cfg *Config) (*sql.DB, error) {
if cfg == nil {
if config, err := configs.LoadConfig(); err != nil {
return nil, err
} else {
cfg = config
}
}
dsn := fmt.Sprintf(
"tcp://%s:%d?database=%s&username=%s&password=%s&read_timeout=%d&write_timeout=%d",
cfg.Clickhouse.Host,
cfg.Clickhouse.Port,
cfg.Clickhouse.EdgedbName,
cfg.Clickhouse.EdgedbUser,
cfg.Clickhouse.EdgedbPass,
cfg.Clickhouse.ReadTimeout,
cfg.Clickhouse.WriteTimeout,
)
conn, err := gorm.Open(clickhouse.New(clickhouse.Config{
DSN: dsn,
DisableDatetimePrecision: true,
DontSupportRenameColumn: true,
SkipInitializeWithVersion: false,
DefaultGranularity: 3,
DefaultCompression: "LZ4",
DefaultIndexType: "minmax",
DefaultTableEngineOpts: "ENGINE=MergeTree() ORDER BY tuple()",
}), &gorm.Config{})
if err != nil {
return nil, err
}
ckDB, err := conn.DB()
if err != nil {
return nil, err
}
ckDB.SetMaxIdleConns(1)
ckDB.SetMaxOpenConns(10)
ckDB.SetConnMaxLifetime(time.Second * 600)
ckConn = conn
return ckDB, nil
}
func GetDBConn() (*gorm.DB, error) {
var pgDB *sql.DB
if pgConn == nil {
if db, err := _setupPostgresConn(nil); err != nil {
return nil, err
} else {
pgDB = db
}
} else if db, err := pgConn.DB(); err != nil {
if db, err := _setupPostgresConn(nil); err != nil {
return nil, err
} else {
pgDB = db
}
} else {
pgDB = db
}
if err := pgDB.Ping(); err != nil {
_ = pgDB.Close()
if db, err := _setupPostgresConn(nil); err != nil {
return nil, err
} else {
pgDB = db
}
}
return pgConn, nil
}
func GetClickhouseConn() (*gorm.DB, error) {
var ckDB *sql.DB
if ckConn == nil {
if db, err := _setupClickhouseConn(nil); err != nil {
return nil, err
} else {
ckDB = db
}
} else if db, err := ckConn.DB(); err != nil {
if db, err := _setupClickhouseConn(nil); err != nil {
return nil, err
} else {
ckDB = db
}
} else {
ckDB = db
}
if err := ckDB.Ping(); err != nil {
_ = ckDB.Close()
if db, err := _setupClickhouseConn(nil); err != nil {
return nil, err
} else {
ckDB = db
}
}
return ckConn, nil
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)