Skip to content

Golang openPlant API

1. 介绍

该文档是golang封装dll操作实时数据库的说明文档,实现了操作数据库的大部分功能,简化操作流程,优化接口

2. 设计原理

该项目底层封装了dll的大部分接口,实现了数据库连接池,用户只需调用封装的接口就行,不需要关心底层如何实现

3. 使用方式

  1. 将代码放到对应的gopath下面,保证引入的包路径正确
  2. 使用方式(同步模式)
    调用 opapi.InitConnProxy 获取连接池,使用连接池执行对应的方法,连接池封装了具体操作数据库的方法,这种方式可以简单方便操作数据库,但是不适合大批量获取数据,因为同步模式内容占用较高。
    使用方式(异步模式)
    在初始化连接池和同步模式使用的方式是一样的,使用异步模式需要从连接池中获取连接,使用连接对象的具体函数来操作数据,使用方式和java中的JDBC方式类似,获取连接,执行方法,获取ResultSet,
    解析ResultSet,关闭ResultSet,关闭连接,主要是这几步。当前异步模式实现了,Insert,Update,Remove,Find,FindFilter,ExecSQL这几个方法

4.使用示例 (同步模式)

4.0公有配置

var host = "192.168.3.243"
var port = 8200
var user = "sis"
var pwd = "openplant"
var timeout = 60
var min = 2
var max = 20

4.1连接设置

连接配置
		var err error
		opPool, err = InitConnProxy(host, port, user, pwd, timeout, min, max)
		if err != nil {
			fmt.Println("init pool error:", err)
		}
		opPool.SetMinConnectCount(10)
		opPool.SetMaxConnectCount(20)
		opPool.GetPoolInfo()
		opPool.GetSystemTime()
		time.Sleep(time.Second * 10)
		Destory(opPool)

4.2静态信息

插入
    var cols = []string{"GN", "RT"}
    var row ={"W3.NODE1.TEST", int8(1)}
    var rows = [][]interface{}{row}
    logs.Warn("cols:", cols, "rows:", rows)
    cols, rs, err := opPool.Insert("W3.Point", cols, rows)
    if err != nil {
        logs.Error("testing insert node  error :", err)
    }
    logs.Warn("columns:", cols)
    logs.Warn("rows:", rs)
更新
    var colums []string = []string{"GN", "ED", "EU", "AN"}
    var rows [][]interface{} = make([][]interface{}, 0)
    //for i := 0; i < 5; i++ {
    row := []interface{}{"W3.NODE1.TEST", "this is test1", "KG", "TEST1"}
    rows = append(rows, row)
    //}
    cols, rs, err := opPool.Update("Point", colums, rows)
    if err != nil {
        logs.Error("testing sql error :", err)
    }
    logs.Warn("columns:", cols)
    logs.Warn("rows:", rs)
删除
    logs.Info("testing delete ---------------------------")
    var rows [] interface{} = make([]interface{}, 0)
    rows = append(rows, "W3.NODE1.TEST")
    //rows = append(rows, "W415845")
    //rows = append(rows, "W6")
    //rows = append(rows, "W7")
    //rows = append(rows, "W8")
    cols, rs, err := opPool.Remove("Point", rows)
    if err != nil {
        logs.Error("testing sql error :", err)
    }
    logs.Warn("columns:", cols)
    logs.Warn("rows:", rs)

4.3实时数据

写实时数据
    opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
        if err != nil {
            fmt.Println("init pool error:", err)
        }
        start := time.Now()
        IDs := make([]int64, 0)
        AVs := make([]float64, 0)
        DSs := make([]int32, 0)
        TMs := make([]int64, 0)
        tm := time.Now().Unix() - 8*3600 + 100
        //for i := 0; i < 12; i++ {
        IDs = append(IDs, int64(1492))
        AVs = append(AVs, 6.10003)
        DSs = append(DSs, int32(36))
        TMs = append(TMs, tm)
        IDs = append(IDs, int64(1492))
        AVs = append(AVs, 6.10003)
        DSs = append(DSs, int32(36))
        TMs = append(TMs, tm)
        //}
        cols, rs, err := opPool.WriteRealtime(IDs, AVs, DSs, TMs)
        if err != nil {
            logs.Error("testing sql error :", err)
        }
        logs.Warn("columns:", cols)
        logs.Warn("rows:", rs)
        logs.Warn("use time:", time.Since(start))
获取实时数据
    logs.Info("testing delete ---------------------------")
    var rows [] interface{} = make([]interface{}, 0)
    rows = append(rows, 4195358)
    rows = append(rows, 4195359)
    rows = append(rows, 4195360)
    rows = append(rows, 4195361)
    rows = append(rows, 4195362)
    cols := []string{"ID", "GN", "PN", "TM", "DS", "AV"}
    cols, rs, err := opPool.Find("W5.Realtime", cols, rows)
    if err != nil {
        logs.Error("testing sql error :", err)
    }
    logs.Warn("columns:", cols)
    logs.Warn("rows:", rs)

4.4历史数据

写历史数据
    opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
    if err != nil {
        fmt.Println("init pool error:", err)
    }
    start := time.Now()
    IDs := make([]int64, 0)
    AVs := make([]float64, 0)
    DSs := make([]int32, 0)
    TMs := make([]int64, 0)
    tm := time.Now().Unix()
    IDs = append(IDs, 1024)
    AVs = append(AVs, float64(4096))
    DSs = append(DSs, int32(36))
    TMs = append(TMs, tm)
    for i := 0; i < 12; i++ {
        IDs = append(IDs, int64(i+6292540))
        AVs = append(AVs, float64(i+1024))
        DSs = append(DSs, int32(36))
        TMs = append(TMs, tm)
    }
    errs, err := opPool.WriteArchive(IDs, AVs, DSs, TMs)
    if err != nil {
        logs.Error("testing sql error :", err)
    }
    logs.Warn("columns:", errs)
    logs.Warn("use time:", time.Since(start))
获取历史数据
	var rows []interface{} = make([]interface{}, 0)
    rows = append(rows, 1046)
    rows = append(rows, 1047)
    rows = append(rows, 1048)
    cols := []string{"*"}
    opPool, err := opapi.InitConnProxy(host, port, user, pwd, timeout, min, max)
    if err != nil {
            fmt.Println("init pool error:", err)
    }
    filters := make([]opapi.Filter, 0)
    filter1 := opapi.Filter{
            Key:      OPConst.ID,
            Operate:  OPConst.In,
            Value:    rows,
            Relation: OPConst.And,
    }
    filters = append(filters, filter1)
    filter2 := opapi.Filter{
            Key:      OPConst.TM,
            Operate:  OPConst.GE,
            Value:    "2021-07-06 11:00:00",
            Relation: OPConst.And,
    }
    filters = append(filters, filter2)
    filter3 := opapi.Filter{
            Key:      OPConst.TM,
            Operate:  OPConst.LE,
            Value:    "2021-07-06 11:56:47",
            Relation: OPConst.And,
    }
    filters = append(filters, filter3)

    filter4 := opapi.Filter{
            Key:   OPConst.Mode,
            Value: "span",
    }
    filters = append(filters, filter4)

    filter5 := opapi.Filter{
            Key:   OPConst.Interval,
            Value: "10m",
    }
    filters = append(filters, filter5)
    cols, rs, err := opPool.FindFilter("Archive", cols, filters)
    if err != nil {
            fmt.Println("testing sql error :", err)
    }
    fmt.Println("columns:", cols)
    fmt.Println("rows:", rs)

4.5报警信息

报警实时
    logs.Info("testing sql ---------------------------")
    opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
    if err != nil {
        fmt.Println("init pool error:", err)
    }
    sqlSelect := "select * from Alarm "
    columns, rows, err := opPool.ExecSQL(sqlSelect)
    if err != nil {
        fmt.Println("testing sql error :", err)
    }
    logs.Warn("columns:", columns)
    logs.Warn("rows length:", len(rows))
    for index, row := range rows {
        logs.Warn("index:", index, "row:", row)
    }
报警历史
    logs.Info("testing sql ---------------------------")
    opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
    if err != nil {
        fmt.Println("init pool error:", err)
    }
    sqlSelect := "select * from AAlarm where TM between '2019-05-10' and '2019-05-20'"
    columns, rows, err := opPool.ExecSQL(sqlSelect)
    if err != nil {
        fmt.Println("testing sql error :", err)
    }
    logs.Warn("columns:", columns)
    logs.Warn("rows length:", len(rows))
    for index, row := range rows {
        logs.Warn("index:", index, "row:", row)
    }

4.6统计信息

统计信息
    idss := []interface{}{1024, 1025, 1026, 1028, 1062, 118737}
    filters := make([]Filter, 0)
    filter := Filter{
        Key:      OPConst.ID,
        Operate:  OPConst.In,
        Value:    idss,
        Relation: OPConst.And,
    }
    filters = append(filters, filter)
    filter1 := Filter{
        Key:      OPConst.TM,
        Operate:  OPConst.GE,
        Value:    "2018-11-18 10:55:45",
        Relation: OPConst.And,
    }
    filters = append(filters, filter1)
    filter2 := Filter{
        Key:      OPConst.TM,
        Operate:  OPConst.LE,
        Value:    "2018-11-19 10:55:45",
        Relation: OPConst.And,
    }
    filters = append(filters, filter2)

    filter3 := Filter{
        Key:   OPConst.Limit,
        Value: "0,10",
    }
    filters = append(filters, filter3)
    filter4 := Filter{
        Key:   OPConst.Order,
        Value: "ID desc",
    }
    filters = append(filters, filter4)

    filter5 := Filter{
        Key:   OPConst.Interval,
        Value: "86400s",
    }
    filters = append(filters, filter5)

    cols := []string{OPConst.ID, OPConst.GN, OPConst.AV, OPConst.TM, OPConst.MAXV, OPConst.MAXTIME, OPConst.MINV, OPConst.MINTIME, OPConst.AVGV, OPConst.FLOW}
    cls, rs, err := opPool.FindFilter(OPConst.Stat, cols, filters)
    if err != nil {
        logs.Error("testing find filter stat error :", err)
    }
    logs.Warn("columns:", cls)
    logs.Warn("rows:", rs)

4.7订阅使用

订阅使用
    var callback = func(columns []string, rows []map[string]interface{}, err error) {
    	logs.Warn("sub columns:", columns)
    	logs.Warn("sub rows:", rows)
    	logs.Warn("sub err:", err)
    }
    
    func sub() {
    	var rows [] interface{} = make([]interface{}, 0)
    	rows = append(rows, 117995)
    	rows = append(rows, 115048)
    	handler, errs, err := opPool.Subscribe(OPConst.Realtime, rows, callback)
    	if err != nil {
    		logs.Error("testing subscribe error :", err)
    	}
    	logs.Warn("handler:", handler)
    	logs.Warn("errs:", errs)
    	logs.Warn("error:", err)
    	time.Sleep(time.Second * 10)
    	rows = []interface{}{115041}
    	fmt.Println(opPool.Subscription(handler, rows))
    	time.Sleep(time.Second * 10)
    	rows = []interface{}{115048, 117995}
    	fmt.Println(opPool.Unsubscribe(handler, rows))
    	time.Sleep(time.Second * 10)
    }

4.8SQL使用

SQL使用
    opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
    if err != nil {
        fmt.Println("init pool error:", err)
    }
    sqlSelect := "delete from Point where GN ='W3.SYS.DBLOAD'"
    columns, rows, err := opPool.ExecSQL(sqlSelect)
    if err != nil {
        fmt.Println("testing sql error :", err)
    }
    logs.Warn("columns:", columns)
    logs.Warn("rows length:", len(rows))
    for index, row := range rows {
        logs.Warn("index:", index, "row:", row)
    }

5.特殊接口使用

5.1写实时

写实时
    opPool, _ := InitConnProxy(host, port, user, pwd, timeout, min, max)
    var cols = []string{"ID", "AV"}
    for i := 0; i < 10000; i++ {
        realtime, err := NewRealtime(cols)
        if err != nil {
            logs.Warn("get realtime table error:", err)
            return
        }
        rowLong := realtime.GetRow()
        rowLong.SetID(int64(1029))
		if i%5==0{
			rowLong.SetAVNil()
		}else{
			rowLong.SetAVInt(int64(i))
		}
        realtime.AddRow(rowLong)
        rowText := realtime.GetRow()
        rowText.SetID(int64(1030))
        rowText.SetAVString("this is text" + strconv.FormatInt(int64(i), 10))
        realtime.AddRow(rowText)
        rowBlob := realtime.GetRow()
        rowBlob.SetID(int64(1031))
        rowBlob.SetAVBytes([]byte("this" + strconv.FormatInt(int64(i), 10)))
        realtime.AddRow(rowBlob)
        opPool.WriteRealtimeTable(realtime)
        time.Sleep(time.Second)
    }

5.2写历史

写历史
    opPool, _ := InitConnProxy(host, port, user, pwd, timeout, min, max)
    var cols = []string{"ID", "AV", "TM"}
    for i := 0; i < 10000; i++ {
        archive, err := NewArchive(cols)
        if err != nil {
            logs.Warn("get realtime table error:", err)
            return
        }
        t := time.Now().Unix() - 10

        row := archive.GetRow()
        row.SetID(int64(1024))
        row.SetAVInt(int64(i))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row.SetID(int64(1025))
        row.SetAVInt(int64(i % 2))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(1026))
        row.SetAVInt(int64(rand.Int31()))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(1027))
        row.SetAVInt(int64(rand.Int31()))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(1028))
        row.SetAVFloat(rand.Float64())
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(1029))
        row.SetAVInt(int64(i))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(1030))
        row.SetAVString("this is text" + strconv.FormatInt(int64(i), 10))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(1031))
        row.SetAVBytes([]byte("this" + strconv.FormatInt(int64(i), 10)))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(2098176))
        row.SetAVInt(int64(i))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row.SetID(int64(2098177))
        row.SetAVInt(int64(i % 2))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(2098178))
        row.SetAVInt(int64(rand.Int31()))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(2098179))
        row.SetAVInt(int64(rand.Int31()))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(2098180))
        row.SetAVFloat(rand.Float64())
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(2098181))
        row.SetAVInt(int64(i))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(2098182))
        row.SetAVString("this is text" + strconv.FormatInt(int64(i), 10))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(2098183))
        row.SetAVBytes([]byte("this" + strconv.FormatInt(int64(i), 10)))
        row.SetTM(float64(t))
        archive.AddRow(row)

        cols, rs, err := opPool.WriteArchiveTable(archive)
        fmt.Println("index:", i)
        fmt.Println("columns:", cols)
        fmt.Println("rows:", rs)
        fmt.Println("err:", err)
        time.Sleep(time.Second)
    }

6.操作数据(异步模式)

操作数据(异步模式)
	初始化连接池和同步模式一样
	var err error
    opPool, err = InitConnProxy(host, port, user, pwd, timeout, min, max)
    if err != nil {
        fmt.Println("init pool error:", err)
    }
    opPool.SetMinConnectCount(10)
    opPool.SetMaxConnectCount(20)
    opPool.GetPoolInfo()
    opPool.GetSystemTime()
    time.Sleep(time.Second * 10)
    Destory(opPool)

6.1插入数据 Insert

插入数据
	logs.Info("testing conn insert ---------------------------")
	opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
	if err != nil {
		log.Fatal("init pool error:", err)
	}
	conn, err := opPool.GetConnect()
	if err != nil {
		log.Fatal("get conn error:", err)
	}
	var cols = []string{"GN", "RT"}
	var rows = [][]interface{}{{"W3.NODE.TEST", int8(1)}}
	logs.Warn("cols:", cols, "rows:", rows)
	rs, err := conn.Insert("Point", cols, rows)
	if err != nil {
		logs.Error("testing insert error :", err)
	}
	fmt.Println("result columns:", rs.GetColumns())
	i := 0
	for rs.Next() {
		i++
		fmt.Print(rs.GetInt(0))
		fmt.Print(rs.GetInt(1))
		fmt.Println(rs.GetString(2))
	}
	rs.Close()
	opPool.Close(conn)

6.2更新数据 Update

更新数据
	logs.Info("testing conn update ---------------------------")
	opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
	if err != nil {
		log.Fatal("init pool error:", err)
	}
	conn, err := opPool.GetConnect()
	if err != nil {
		log.Fatal("get conn error:", err)
	}
	var cols = []string{"GN", "ED"}
	var rows = [][]interface{}{{"W3.NODE.TEST", "this is test "}}
	logs.Warn("cols:", cols, "rows:", rows)
	rs, err := conn.Update("Point", cols, rows)
	if err != nil {
		logs.Error("testing update error :", err)
	}
	fmt.Println("result columns:", rs.GetColumns())
	i := 0
	for rs.Next() {
		i++
		fmt.Print(rs.GetInt(0))
		fmt.Print(rs.GetString(1))
		fmt.Print(rs.GetString(2))
	}
	rs.Close()
	opPool.Close(conn)

6.3删除数据 Remove

删除数据
	logs.Info("testing conn remove ---------------------------")
	opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
	if err != nil {
		log.Fatal("init pool error:", err)
	}
	conn, err := opPool.GetConnect()
	if err != nil {
		log.Fatal("get conn error:", err)
	}
	var rows = []interface{}{"W3.NODE.TEST"}
	logs.Warn("rows:", rows)
	rs, err := conn.Remove("Point", rows)
	if err != nil {
		log.Fatal("testing remove error :", err)
	}
	fmt.Println("result columns:", rs.GetColumns())
	i := 0
	for rs.Next() {
		i++
		fmt.Print(rs.GetInt(0))
		fmt.Print(rs.GetString(1))
	}
	**rs.Close()
	opPool.Close(conn)**

6.4查找数据 Find

查找数据
	logs.Info("testing begin")
	opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
	if err != nil {
		log.Fatal("init pool error:", err)
	}
	logs.Info("testing find ---------------------------")
	var colums []string = []string{"ID", "GN", "TM", "AV"}
	var rows [] interface{} = make([]interface{}, 0)
	for i := 1024; i < 10000; i++ {
		rows = append(rows, i)
	}
	conn, err := opPool.GetConnect()
	if err != nil {
		log.Fatal("get conn error:", err)
	}
	ttt := time.Now()
	rs, err := conn.Find("Realtime", colums, rows)
	if err != nil {
		log.Fatal("test conn find error:", err)
	}
	fmt.Println("result columns:", rs.GetColumns())
	i := 0
	for rs.Next() {
		i++
		if i%100 == 0 {
			fmt.Print(rs.GetInt(0))
			fmt.Print(rs.GetString(1))
			fmt.Println(rs.GetDouble(2))
		}
	}
	rs.Close()
	opPool.Close(conn)
	fmt.Println("async use time:", time.Since(ttt))
	fmt.Println("total:", i)

6.5查找数据带过滤条件 FindFilter

查找数据带过滤条件
	logs.Info("testing begin")
	opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
	if err != nil {
		log.Fatal("init pool error:", err)
	}
	logs.Info("testing find filter ---------------------------")
	idss := []interface{}{1024, 1025, 1026, 1028, 1062, 118737}
	filters := make([]Filter, 0)
	filter := Filter{
		Key:      OPConst.ID,
		Operate:  OPConst.In,
		Value:    idss,
		Relation: OPConst.And,
	}
	filters = append(filters, filter)
	filter1 := Filter{
		Key:      OPConst.TM,
		Operate:  OPConst.GE,
		Value:    "2018-11-18 10:55:45",
		Relation: OPConst.And,
	}
	filters = append(filters, filter1)
	filter2 := Filter{
		Key:      OPConst.TM,
		Operate:  OPConst.LE,
		Value:    "2018-11-19 10:55:45",
		Relation: OPConst.And,
	}
	filters = append(filters, filter2)

	filter3 := Filter{
		Key:   OPConst.Limit,
		Value: "0,10",
	}
	filters = append(filters, filter3)
	filter4 := Filter{
		Key:   OPConst.Order,
		Value: "ID desc",
	}
	filters = append(filters, filter4)

	filter5 := Filter{
		Key:   OPConst.Interval,
		Value: "86400s",
	}
	filters = append(filters, filter5)
	cols := []string{OPConst.ID, OPConst.GN, OPConst.AV, OPConst.TM, OPConst.MAXV, OPConst.MAXTIME, OPConst.MINV, OPConst.MINTIME, OPConst.AVGV, OPConst.FLOW}
	conn, err := opPool.GetConnect()
	if err != nil {
		log.Fatal("get conn error:", err)
	}
	ttt := time.Now()
	rs, err := conn.FindFilter(OPConst.Stat, cols, filters)
	fmt.Println("result columns:", rs.GetColumns())
	i := 0
	for rs.Next() {
		i++
		//if i%10000 == 0 {
		fmt.Print(rs.GetInt(0))
		fmt.Print(rs.GetString(1))
		fmt.Print(rs.GetDouble(2))
		fmt.Println(rs.GetDouble(3))
		//}
	}
	rs.Close()
	opPool.Close(conn)
	fmt.Println("async use time:", time.Since(ttt))
	fmt.Println("total:", i)

6.6执行SQL ExecSQL

执行SQL
	logs.Info("testing begin")
	opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
	if err != nil {
		log.Fatal("init pool error:", err)
	}
	logs.Info("testing async exec sql ---------------------------")
	sql := "select ID,GN,AV,TM from Realtime where GN like 'W3.SYS.%' limit 0,100"

	conn, err := opPool.GetConnect()
	if err != nil {
		log.Fatal("get conn error:", err)
	}
	ttt := time.Now()
	rs, err := conn.ExecSQL(sql)
	if err != nil {
		log.Fatal("test conn find error:", err)
	}
	fmt.Println("result columns:", rs.GetColumns())
	i := 0
	for rs.Next() {
		i++
		//if i%10000 == 0 {
		fmt.Print(rs.GetInt(0))
		fmt.Print(rs.GetString(1))
		fmt.Print(rs.GetDouble(2))
		fmt.Println(rs.GetDouble(3))
		//}
	}
	rs.Close()
	opPool.Close(conn)
	fmt.Println("async use time:", time.Since(ttt))
	fmt.Println("total:", i)