Appearance
Golang openPlant API
1. 介绍
该文档是golang封装dll操作实时数据库的说明文档,实现了操作数据库的大部分功能,简化操作流程,优化接口
2. 设计原理
该项目底层封装了dll的大部分接口,实现了数据库连接池,用户只需调用封装的接口就行,不需要关心底层如何实现
3. 使用方式
- 将代码放到对应的gopath下面,保证引入的包路径正确
- 使用方式(同步模式)
调用 opapi.InitConnProxy 获取连接池,使用连接池执行对应的方法,连接池封装了具体操作数据库的方法,这种方式可以简单方便操作数据库,但是不适合大批量获取数据,因为同步模式内容占用较高。
使用方式(异步模式)
在初始化连接池和同步模式使用的方式是一样的,使用异步模式需要从连接池中获取连接,使用连接对象的具体函数来操作数据,使用方式和java中的JDBC方式类似,获取连接,执行方法,获取ResultSet,
解析ResultSet,关闭ResultSet,关闭连接,主要是这几步。当前异步模式实现了,Insert,Update,Remove,Find,FindFilter,ExecSQL这几个方法
4.使用示例 (同步模式)
4.0公有配置
var host = "192.168.3.243"
var port = 8200
var user = "sis"
var pwd = "openplant"
var timeout = 60
var min = 2
var max = 20
4.1连接设置
连接配置
var err error
opPool, err = InitConnProxy(host, port, user, pwd, timeout, min, max)
if err != nil {
fmt.Println("init pool error:", err)
}
opPool.SetMinConnectCount(10)
opPool.SetMaxConnectCount(20)
opPool.GetPoolInfo()
opPool.GetSystemTime()
time.Sleep(time.Second * 10)
Destory(opPool)
4.2静态信息
插入
var cols = []string{"GN", "RT"}
var row ={"W3.NODE1.TEST", int8(1)}
var rows = [][]interface{}{row}
logs.Warn("cols:", cols, "rows:", rows)
cols, rs, err := opPool.Insert("W3.Point", cols, rows)
if err != nil {
logs.Error("testing insert node error :", err)
}
logs.Warn("columns:", cols)
logs.Warn("rows:", rs)
更新
var colums []string = []string{"GN", "ED", "EU", "AN"}
var rows [][]interface{} = make([][]interface{}, 0)
//for i := 0; i < 5; i++ {
row := []interface{}{"W3.NODE1.TEST", "this is test1", "KG", "TEST1"}
rows = append(rows, row)
//}
cols, rs, err := opPool.Update("Point", colums, rows)
if err != nil {
logs.Error("testing sql error :", err)
}
logs.Warn("columns:", cols)
logs.Warn("rows:", rs)
删除
logs.Info("testing delete ---------------------------")
var rows [] interface{} = make([]interface{}, 0)
rows = append(rows, "W3.NODE1.TEST")
//rows = append(rows, "W415845")
//rows = append(rows, "W6")
//rows = append(rows, "W7")
//rows = append(rows, "W8")
cols, rs, err := opPool.Remove("Point", rows)
if err != nil {
logs.Error("testing sql error :", err)
}
logs.Warn("columns:", cols)
logs.Warn("rows:", rs)
4.3实时数据
写实时数据
opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
if err != nil {
fmt.Println("init pool error:", err)
}
start := time.Now()
IDs := make([]int64, 0)
AVs := make([]float64, 0)
DSs := make([]int32, 0)
TMs := make([]int64, 0)
tm := time.Now().Unix() - 8*3600 + 100
//for i := 0; i < 12; i++ {
IDs = append(IDs, int64(1492))
AVs = append(AVs, 6.10003)
DSs = append(DSs, int32(36))
TMs = append(TMs, tm)
IDs = append(IDs, int64(1492))
AVs = append(AVs, 6.10003)
DSs = append(DSs, int32(36))
TMs = append(TMs, tm)
//}
cols, rs, err := opPool.WriteRealtime(IDs, AVs, DSs, TMs)
if err != nil {
logs.Error("testing sql error :", err)
}
logs.Warn("columns:", cols)
logs.Warn("rows:", rs)
logs.Warn("use time:", time.Since(start))
获取实时数据
logs.Info("testing delete ---------------------------")
var rows [] interface{} = make([]interface{}, 0)
rows = append(rows, 4195358)
rows = append(rows, 4195359)
rows = append(rows, 4195360)
rows = append(rows, 4195361)
rows = append(rows, 4195362)
cols := []string{"ID", "GN", "PN", "TM", "DS", "AV"}
cols, rs, err := opPool.Find("W5.Realtime", cols, rows)
if err != nil {
logs.Error("testing sql error :", err)
}
logs.Warn("columns:", cols)
logs.Warn("rows:", rs)
4.4历史数据
写历史数据
opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
if err != nil {
fmt.Println("init pool error:", err)
}
start := time.Now()
IDs := make([]int64, 0)
AVs := make([]float64, 0)
DSs := make([]int32, 0)
TMs := make([]int64, 0)
tm := time.Now().Unix()
IDs = append(IDs, 1024)
AVs = append(AVs, float64(4096))
DSs = append(DSs, int32(36))
TMs = append(TMs, tm)
for i := 0; i < 12; i++ {
IDs = append(IDs, int64(i+6292540))
AVs = append(AVs, float64(i+1024))
DSs = append(DSs, int32(36))
TMs = append(TMs, tm)
}
errs, err := opPool.WriteArchive(IDs, AVs, DSs, TMs)
if err != nil {
logs.Error("testing sql error :", err)
}
logs.Warn("columns:", errs)
logs.Warn("use time:", time.Since(start))
获取历史数据
var rows []interface{} = make([]interface{}, 0)
rows = append(rows, 1046)
rows = append(rows, 1047)
rows = append(rows, 1048)
cols := []string{"*"}
opPool, err := opapi.InitConnProxy(host, port, user, pwd, timeout, min, max)
if err != nil {
fmt.Println("init pool error:", err)
}
filters := make([]opapi.Filter, 0)
filter1 := opapi.Filter{
Key: OPConst.ID,
Operate: OPConst.In,
Value: rows,
Relation: OPConst.And,
}
filters = append(filters, filter1)
filter2 := opapi.Filter{
Key: OPConst.TM,
Operate: OPConst.GE,
Value: "2021-07-06 11:00:00",
Relation: OPConst.And,
}
filters = append(filters, filter2)
filter3 := opapi.Filter{
Key: OPConst.TM,
Operate: OPConst.LE,
Value: "2021-07-06 11:56:47",
Relation: OPConst.And,
}
filters = append(filters, filter3)
filter4 := opapi.Filter{
Key: OPConst.Mode,
Value: "span",
}
filters = append(filters, filter4)
filter5 := opapi.Filter{
Key: OPConst.Interval,
Value: "10m",
}
filters = append(filters, filter5)
cols, rs, err := opPool.FindFilter("Archive", cols, filters)
if err != nil {
fmt.Println("testing sql error :", err)
}
fmt.Println("columns:", cols)
fmt.Println("rows:", rs)
4.5报警信息
报警实时
logs.Info("testing sql ---------------------------")
opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
if err != nil {
fmt.Println("init pool error:", err)
}
sqlSelect := "select * from Alarm "
columns, rows, err := opPool.ExecSQL(sqlSelect)
if err != nil {
fmt.Println("testing sql error :", err)
}
logs.Warn("columns:", columns)
logs.Warn("rows length:", len(rows))
for index, row := range rows {
logs.Warn("index:", index, "row:", row)
}
报警历史
logs.Info("testing sql ---------------------------")
opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
if err != nil {
fmt.Println("init pool error:", err)
}
sqlSelect := "select * from AAlarm where TM between '2019-05-10' and '2019-05-20'"
columns, rows, err := opPool.ExecSQL(sqlSelect)
if err != nil {
fmt.Println("testing sql error :", err)
}
logs.Warn("columns:", columns)
logs.Warn("rows length:", len(rows))
for index, row := range rows {
logs.Warn("index:", index, "row:", row)
}
4.6统计信息
统计信息
idss := []interface{}{1024, 1025, 1026, 1028, 1062, 118737}
filters := make([]Filter, 0)
filter := Filter{
Key: OPConst.ID,
Operate: OPConst.In,
Value: idss,
Relation: OPConst.And,
}
filters = append(filters, filter)
filter1 := Filter{
Key: OPConst.TM,
Operate: OPConst.GE,
Value: "2018-11-18 10:55:45",
Relation: OPConst.And,
}
filters = append(filters, filter1)
filter2 := Filter{
Key: OPConst.TM,
Operate: OPConst.LE,
Value: "2018-11-19 10:55:45",
Relation: OPConst.And,
}
filters = append(filters, filter2)
filter3 := Filter{
Key: OPConst.Limit,
Value: "0,10",
}
filters = append(filters, filter3)
filter4 := Filter{
Key: OPConst.Order,
Value: "ID desc",
}
filters = append(filters, filter4)
filter5 := Filter{
Key: OPConst.Interval,
Value: "86400s",
}
filters = append(filters, filter5)
cols := []string{OPConst.ID, OPConst.GN, OPConst.AV, OPConst.TM, OPConst.MAXV, OPConst.MAXTIME, OPConst.MINV, OPConst.MINTIME, OPConst.AVGV, OPConst.FLOW}
cls, rs, err := opPool.FindFilter(OPConst.Stat, cols, filters)
if err != nil {
logs.Error("testing find filter stat error :", err)
}
logs.Warn("columns:", cls)
logs.Warn("rows:", rs)
4.7订阅使用
订阅使用
var callback = func(columns []string, rows []map[string]interface{}, err error) {
logs.Warn("sub columns:", columns)
logs.Warn("sub rows:", rows)
logs.Warn("sub err:", err)
}
func sub() {
var rows [] interface{} = make([]interface{}, 0)
rows = append(rows, 117995)
rows = append(rows, 115048)
handler, errs, err := opPool.Subscribe(OPConst.Realtime, rows, callback)
if err != nil {
logs.Error("testing subscribe error :", err)
}
logs.Warn("handler:", handler)
logs.Warn("errs:", errs)
logs.Warn("error:", err)
time.Sleep(time.Second * 10)
rows = []interface{}{115041}
fmt.Println(opPool.Subscription(handler, rows))
time.Sleep(time.Second * 10)
rows = []interface{}{115048, 117995}
fmt.Println(opPool.Unsubscribe(handler, rows))
time.Sleep(time.Second * 10)
}
4.8SQL使用
SQL使用
opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
if err != nil {
fmt.Println("init pool error:", err)
}
sqlSelect := "delete from Point where GN ='W3.SYS.DBLOAD'"
columns, rows, err := opPool.ExecSQL(sqlSelect)
if err != nil {
fmt.Println("testing sql error :", err)
}
logs.Warn("columns:", columns)
logs.Warn("rows length:", len(rows))
for index, row := range rows {
logs.Warn("index:", index, "row:", row)
}
5.特殊接口使用
5.1写实时
写实时
opPool, _ := InitConnProxy(host, port, user, pwd, timeout, min, max)
var cols = []string{"ID", "AV"}
for i := 0; i < 10000; i++ {
realtime, err := NewRealtime(cols)
if err != nil {
logs.Warn("get realtime table error:", err)
return
}
rowLong := realtime.GetRow()
rowLong.SetID(int64(1029))
if i%5==0{
rowLong.SetAVNil()
}else{
rowLong.SetAVInt(int64(i))
}
realtime.AddRow(rowLong)
rowText := realtime.GetRow()
rowText.SetID(int64(1030))
rowText.SetAVString("this is text" + strconv.FormatInt(int64(i), 10))
realtime.AddRow(rowText)
rowBlob := realtime.GetRow()
rowBlob.SetID(int64(1031))
rowBlob.SetAVBytes([]byte("this" + strconv.FormatInt(int64(i), 10)))
realtime.AddRow(rowBlob)
opPool.WriteRealtimeTable(realtime)
time.Sleep(time.Second)
}
5.2写历史
写历史
opPool, _ := InitConnProxy(host, port, user, pwd, timeout, min, max)
var cols = []string{"ID", "AV", "TM"}
for i := 0; i < 10000; i++ {
archive, err := NewArchive(cols)
if err != nil {
logs.Warn("get realtime table error:", err)
return
}
t := time.Now().Unix() - 10
row := archive.GetRow()
row.SetID(int64(1024))
row.SetAVInt(int64(i))
row.SetTM(float64(t))
archive.AddRow(row)
row.SetID(int64(1025))
row.SetAVInt(int64(i % 2))
row.SetTM(float64(t))
archive.AddRow(row)
row = archive.GetRow()
row.SetID(int64(1026))
row.SetAVInt(int64(rand.Int31()))
row.SetTM(float64(t))
archive.AddRow(row)
row = archive.GetRow()
row.SetID(int64(1027))
row.SetAVInt(int64(rand.Int31()))
row.SetTM(float64(t))
archive.AddRow(row)
row = archive.GetRow()
row.SetID(int64(1028))
row.SetAVFloat(rand.Float64())
row.SetTM(float64(t))
archive.AddRow(row)
row = archive.GetRow()
row.SetID(int64(1029))
row.SetAVInt(int64(i))
row.SetTM(float64(t))
archive.AddRow(row)
row = archive.GetRow()
row.SetID(int64(1030))
row.SetAVString("this is text" + strconv.FormatInt(int64(i), 10))
row.SetTM(float64(t))
archive.AddRow(row)
row = archive.GetRow()
row.SetID(int64(1031))
row.SetAVBytes([]byte("this" + strconv.FormatInt(int64(i), 10)))
row.SetTM(float64(t))
archive.AddRow(row)
row = archive.GetRow()
row.SetID(int64(2098176))
row.SetAVInt(int64(i))
row.SetTM(float64(t))
archive.AddRow(row)
row.SetID(int64(2098177))
row.SetAVInt(int64(i % 2))
row.SetTM(float64(t))
archive.AddRow(row)
row = archive.GetRow()
row.SetID(int64(2098178))
row.SetAVInt(int64(rand.Int31()))
row.SetTM(float64(t))
archive.AddRow(row)
row = archive.GetRow()
row.SetID(int64(2098179))
row.SetAVInt(int64(rand.Int31()))
row.SetTM(float64(t))
archive.AddRow(row)
row = archive.GetRow()
row.SetID(int64(2098180))
row.SetAVFloat(rand.Float64())
row.SetTM(float64(t))
archive.AddRow(row)
row = archive.GetRow()
row.SetID(int64(2098181))
row.SetAVInt(int64(i))
row.SetTM(float64(t))
archive.AddRow(row)
row = archive.GetRow()
row.SetID(int64(2098182))
row.SetAVString("this is text" + strconv.FormatInt(int64(i), 10))
row.SetTM(float64(t))
archive.AddRow(row)
row = archive.GetRow()
row.SetID(int64(2098183))
row.SetAVBytes([]byte("this" + strconv.FormatInt(int64(i), 10)))
row.SetTM(float64(t))
archive.AddRow(row)
cols, rs, err := opPool.WriteArchiveTable(archive)
fmt.Println("index:", i)
fmt.Println("columns:", cols)
fmt.Println("rows:", rs)
fmt.Println("err:", err)
time.Sleep(time.Second)
}
6.操作数据(异步模式)
操作数据(异步模式)
初始化连接池和同步模式一样
var err error
opPool, err = InitConnProxy(host, port, user, pwd, timeout, min, max)
if err != nil {
fmt.Println("init pool error:", err)
}
opPool.SetMinConnectCount(10)
opPool.SetMaxConnectCount(20)
opPool.GetPoolInfo()
opPool.GetSystemTime()
time.Sleep(time.Second * 10)
Destory(opPool)
6.1插入数据 Insert
插入数据
logs.Info("testing conn insert ---------------------------")
opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
if err != nil {
log.Fatal("init pool error:", err)
}
conn, err := opPool.GetConnect()
if err != nil {
log.Fatal("get conn error:", err)
}
var cols = []string{"GN", "RT"}
var rows = [][]interface{}{{"W3.NODE.TEST", int8(1)}}
logs.Warn("cols:", cols, "rows:", rows)
rs, err := conn.Insert("Point", cols, rows)
if err != nil {
logs.Error("testing insert error :", err)
}
fmt.Println("result columns:", rs.GetColumns())
i := 0
for rs.Next() {
i++
fmt.Print(rs.GetInt(0))
fmt.Print(rs.GetInt(1))
fmt.Println(rs.GetString(2))
}
rs.Close()
opPool.Close(conn)
6.2更新数据 Update
更新数据
logs.Info("testing conn update ---------------------------")
opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
if err != nil {
log.Fatal("init pool error:", err)
}
conn, err := opPool.GetConnect()
if err != nil {
log.Fatal("get conn error:", err)
}
var cols = []string{"GN", "ED"}
var rows = [][]interface{}{{"W3.NODE.TEST", "this is test "}}
logs.Warn("cols:", cols, "rows:", rows)
rs, err := conn.Update("Point", cols, rows)
if err != nil {
logs.Error("testing update error :", err)
}
fmt.Println("result columns:", rs.GetColumns())
i := 0
for rs.Next() {
i++
fmt.Print(rs.GetInt(0))
fmt.Print(rs.GetString(1))
fmt.Print(rs.GetString(2))
}
rs.Close()
opPool.Close(conn)
6.3删除数据 Remove
删除数据
logs.Info("testing conn remove ---------------------------")
opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
if err != nil {
log.Fatal("init pool error:", err)
}
conn, err := opPool.GetConnect()
if err != nil {
log.Fatal("get conn error:", err)
}
var rows = []interface{}{"W3.NODE.TEST"}
logs.Warn("rows:", rows)
rs, err := conn.Remove("Point", rows)
if err != nil {
log.Fatal("testing remove error :", err)
}
fmt.Println("result columns:", rs.GetColumns())
i := 0
for rs.Next() {
i++
fmt.Print(rs.GetInt(0))
fmt.Print(rs.GetString(1))
}
**rs.Close()
opPool.Close(conn)**
6.4查找数据 Find
查找数据
logs.Info("testing begin")
opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
if err != nil {
log.Fatal("init pool error:", err)
}
logs.Info("testing find ---------------------------")
var colums []string = []string{"ID", "GN", "TM", "AV"}
var rows [] interface{} = make([]interface{}, 0)
for i := 1024; i < 10000; i++ {
rows = append(rows, i)
}
conn, err := opPool.GetConnect()
if err != nil {
log.Fatal("get conn error:", err)
}
ttt := time.Now()
rs, err := conn.Find("Realtime", colums, rows)
if err != nil {
log.Fatal("test conn find error:", err)
}
fmt.Println("result columns:", rs.GetColumns())
i := 0
for rs.Next() {
i++
if i%100 == 0 {
fmt.Print(rs.GetInt(0))
fmt.Print(rs.GetString(1))
fmt.Println(rs.GetDouble(2))
}
}
rs.Close()
opPool.Close(conn)
fmt.Println("async use time:", time.Since(ttt))
fmt.Println("total:", i)
6.5查找数据带过滤条件 FindFilter
查找数据带过滤条件
logs.Info("testing begin")
opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
if err != nil {
log.Fatal("init pool error:", err)
}
logs.Info("testing find filter ---------------------------")
idss := []interface{}{1024, 1025, 1026, 1028, 1062, 118737}
filters := make([]Filter, 0)
filter := Filter{
Key: OPConst.ID,
Operate: OPConst.In,
Value: idss,
Relation: OPConst.And,
}
filters = append(filters, filter)
filter1 := Filter{
Key: OPConst.TM,
Operate: OPConst.GE,
Value: "2018-11-18 10:55:45",
Relation: OPConst.And,
}
filters = append(filters, filter1)
filter2 := Filter{
Key: OPConst.TM,
Operate: OPConst.LE,
Value: "2018-11-19 10:55:45",
Relation: OPConst.And,
}
filters = append(filters, filter2)
filter3 := Filter{
Key: OPConst.Limit,
Value: "0,10",
}
filters = append(filters, filter3)
filter4 := Filter{
Key: OPConst.Order,
Value: "ID desc",
}
filters = append(filters, filter4)
filter5 := Filter{
Key: OPConst.Interval,
Value: "86400s",
}
filters = append(filters, filter5)
cols := []string{OPConst.ID, OPConst.GN, OPConst.AV, OPConst.TM, OPConst.MAXV, OPConst.MAXTIME, OPConst.MINV, OPConst.MINTIME, OPConst.AVGV, OPConst.FLOW}
conn, err := opPool.GetConnect()
if err != nil {
log.Fatal("get conn error:", err)
}
ttt := time.Now()
rs, err := conn.FindFilter(OPConst.Stat, cols, filters)
fmt.Println("result columns:", rs.GetColumns())
i := 0
for rs.Next() {
i++
//if i%10000 == 0 {
fmt.Print(rs.GetInt(0))
fmt.Print(rs.GetString(1))
fmt.Print(rs.GetDouble(2))
fmt.Println(rs.GetDouble(3))
//}
}
rs.Close()
opPool.Close(conn)
fmt.Println("async use time:", time.Since(ttt))
fmt.Println("total:", i)
6.6执行SQL ExecSQL
执行SQL
logs.Info("testing begin")
opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
if err != nil {
log.Fatal("init pool error:", err)
}
logs.Info("testing async exec sql ---------------------------")
sql := "select ID,GN,AV,TM from Realtime where GN like 'W3.SYS.%' limit 0,100"
conn, err := opPool.GetConnect()
if err != nil {
log.Fatal("get conn error:", err)
}
ttt := time.Now()
rs, err := conn.ExecSQL(sql)
if err != nil {
log.Fatal("test conn find error:", err)
}
fmt.Println("result columns:", rs.GetColumns())
i := 0
for rs.Next() {
i++
//if i%10000 == 0 {
fmt.Print(rs.GetInt(0))
fmt.Print(rs.GetString(1))
fmt.Print(rs.GetDouble(2))
fmt.Println(rs.GetDouble(3))
//}
}
rs.Close()
opPool.Close(conn)
fmt.Println("async use time:", time.Since(ttt))
fmt.Println("total:", i)