Skip to content

Golang openPlant API

1. 介绍

该文档是golang封装dll操作实时数据库的说明文档,实现了操作数据库的大部分功能,简化操作流程,优化接口

2. 设计原理

该项目底层封装了dll的大部分接口,实现了数据库连接池,用户只需调用封装的接口就行,不需要关心底层如何实现

3. 使用方式

  1. 将代码放到对应的gopath下面,保证引入的包路径正确
  2. 使用方式
    调用 opapi.InitConnProxy 获取连接池,使用连接对象的具体函数来操作数据,使用方式和java中的JDBC方式类似,获取连接,执行方法,获取ResultSet,
    解析ResultSet,关闭ResultSet,关闭连接。
    支持的方法:Insert,Update,Remove,Find,FindFilter,ExecSQL

4.使用示例

4.0 公有配置

var host = "192.168.3.243"
var port = 8200
var user = "sis"
var pwd = "openplant"
var timeout = 60
var min = 2
var max = 20

4.1 连接设置

连接配置
		var err error
		opPool, err = InitConnProxy(host, port, user, pwd, timeout, min, max)
		if err != nil {
			fmt.Println("init pool error:", err)
		}
		opPool.SetMinConnectCount(10)
		opPool.SetMaxConnectCount(20)
		opPool.GetPoolInfo()
		opPool.GetSystemTime()
		time.Sleep(time.Second * 10)
		Destory(opPool)

4.2 静态信息

插入
    var cols = []string{"GN", "RT"}
    var row ={"W3.NODE1.TEST", int8(1)}
    var rows = [][]interface{}{row}
    logs.Warn("cols:", cols, "rows:", rows)
    cols, rs, err := opPool.Insert("W3.Point", cols, rows)
    if err != nil {
        logs.Error("testing insert node  error :", err)
    }
    logs.Warn("columns:", cols)
    logs.Warn("rows:", rs)
更新
    var colums []string = []string{"GN", "ED", "EU", "AN"}
    var rows [][]interface{} = make([][]interface{}, 0)
    //for i := 0; i < 5; i++ {
    row := []interface{}{"W3.NODE1.TEST", "this is test1", "KG", "TEST1"}
    rows = append(rows, row)
    //}
    cols, rs, err := opPool.Update("Point", colums, rows)
    if err != nil {
        logs.Error("testing sql error :", err)
    }
    logs.Warn("columns:", cols)
    logs.Warn("rows:", rs)
删除
    logs.Info("testing delete ---------------------------")
    var rows [] interface{} = make([]interface{}, 0)
    rows = append(rows, "W3.NODE1.TEST")
    //rows = append(rows, "W415845")
    //rows = append(rows, "W6")
    //rows = append(rows, "W7")
    //rows = append(rows, "W8")
    cols, rs, err := opPool.Remove("Point", rows)
    if err != nil {
        logs.Error("testing sql error :", err)
    }
    logs.Warn("columns:", cols)
    logs.Warn("rows:", rs)

4.3 实时数据

写实时数据
    opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
        if err != nil {
            fmt.Println("init pool error:", err)
        }
        start := time.Now()
        IDs := make([]int64, 0)
        AVs := make([]float64, 0)
        DSs := make([]int32, 0)
        TMs := make([]int64, 0)
        tm := time.Now().Unix() - 8*3600 + 100
        //for i := 0; i < 12; i++ {
        IDs = append(IDs, int64(1492))
        AVs = append(AVs, 6.10003)
        DSs = append(DSs, int32(36))
        TMs = append(TMs, tm)
        IDs = append(IDs, int64(1492))
        AVs = append(AVs, 6.10003)
        DSs = append(DSs, int32(36))
        TMs = append(TMs, tm)
        //}
        cols, rs, err := opPool.WriteRealtime(IDs, AVs, DSs, TMs)
        if err != nil {
            logs.Error("testing sql error :", err)
        }
        logs.Warn("columns:", cols)
        logs.Warn("rows:", rs)
        logs.Warn("use time:", time.Since(start))
获取实时数据
    logs.Info("testing delete ---------------------------")
    var rows [] interface{} = make([]interface{}, 0)
    rows = append(rows, 4195358)
    rows = append(rows, 4195359)
    rows = append(rows, 4195360)
    rows = append(rows, 4195361)
    rows = append(rows, 4195362)
    cols := []string{"ID", "GN", "PN", "TM", "DS", "AV"}
    cols, rs, err := opPool.Find("W5.Realtime", cols, rows)
    if err != nil {
        logs.Error("testing sql error :", err)
    }
    logs.Warn("columns:", cols)
    logs.Warn("rows:", rs)

4.4 历史数据

写历史数据
    opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
    if err != nil {
        fmt.Println("init pool error:", err)
    }
    start := time.Now()
    IDs := make([]int64, 0)
    AVs := make([]float64, 0)
    DSs := make([]int32, 0)
    TMs := make([]int64, 0)
    tm := time.Now().Unix()
    IDs = append(IDs, 1024)
    AVs = append(AVs, float64(4096))
    DSs = append(DSs, int32(36))
    TMs = append(TMs, tm)
    for i := 0; i < 12; i++ {
        IDs = append(IDs, int64(i+6292540))
        AVs = append(AVs, float64(i+1024))
        DSs = append(DSs, int32(36))
        TMs = append(TMs, tm)
    }
    errs, err := opPool.WriteArchive(IDs, AVs, DSs, TMs)
    if err != nil {
        logs.Error("testing sql error :", err)
    }
    logs.Warn("columns:", errs)
    logs.Warn("use time:", time.Since(start))
获取历史数据
	var rows []interface{} = make([]interface{}, 0)
    rows = append(rows, 1046)
    rows = append(rows, 1047)
    rows = append(rows, 1048)
    cols := []string{"*"}
    opPool, err := opapi.InitConnProxy(host, port, user, pwd, timeout, min, max)
    if err != nil {
            fmt.Println("init pool error:", err)
    }
    filters := make([]opapi.Filter, 0)
    filter1 := opapi.Filter{
            Key:      OPConst.ID,
            Operate:  OPConst.In,
            Value:    rows,
            Relation: OPConst.And,
    }
    filters = append(filters, filter1)
    filter2 := opapi.Filter{
            Key:      OPConst.TM,
            Operate:  OPConst.GE,
            Value:    "2021-07-06 11:00:00",
            Relation: OPConst.And,
    }
    filters = append(filters, filter2)
    filter3 := opapi.Filter{
            Key:      OPConst.TM,
            Operate:  OPConst.LE,
            Value:    "2021-07-06 11:56:47",
            Relation: OPConst.And,
    }
    filters = append(filters, filter3)

    filter4 := opapi.Filter{
            Key:   OPConst.Mode,
            Value: "span",
    }
    filters = append(filters, filter4)

    filter5 := opapi.Filter{
            Key:   OPConst.Interval,
            Value: "10m",
    }
    filters = append(filters, filter5)
    cols, rs, err := opPool.FindFilter("Archive", cols, filters)
    if err != nil {
            fmt.Println("testing sql error :", err)
    }
    fmt.Println("columns:", cols)
    fmt.Println("rows:", rs)

4.5 报警信息

报警实时
    logs.Info("testing sql ---------------------------")
    opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
    if err != nil {
        fmt.Println("init pool error:", err)
    }
    sqlSelect := "select * from Alarm "
    columns, rows, err := opPool.ExecSQL(sqlSelect)
    if err != nil {
        fmt.Println("testing sql error :", err)
    }
    logs.Warn("columns:", columns)
    logs.Warn("rows length:", len(rows))
    for index, row := range rows {
        logs.Warn("index:", index, "row:", row)
    }
报警历史
    logs.Info("testing sql ---------------------------")
    opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
    if err != nil {
        fmt.Println("init pool error:", err)
    }
    sqlSelect := "select * from AAlarm where TM between '2019-05-10' and '2019-05-20'"
    columns, rows, err := opPool.ExecSQL(sqlSelect)
    if err != nil {
        fmt.Println("testing sql error :", err)
    }
    logs.Warn("columns:", columns)
    logs.Warn("rows length:", len(rows))
    for index, row := range rows {
        logs.Warn("index:", index, "row:", row)
    }

4.6 统计信息

统计信息
    idss := []interface{}{1024, 1025, 1026, 1028, 1062, 118737}
    filters := make([]Filter, 0)
    filter := Filter{
        Key:      OPConst.ID,
        Operate:  OPConst.In,
        Value:    idss,
        Relation: OPConst.And,
    }
    filters = append(filters, filter)
    filter1 := Filter{
        Key:      OPConst.TM,
        Operate:  OPConst.GE,
        Value:    "2018-11-18 10:55:45",
        Relation: OPConst.And,
    }
    filters = append(filters, filter1)
    filter2 := Filter{
        Key:      OPConst.TM,
        Operate:  OPConst.LE,
        Value:    "2018-11-19 10:55:45",
        Relation: OPConst.And,
    }
    filters = append(filters, filter2)

    filter3 := Filter{
        Key:   OPConst.Limit,
        Value: "0,10",
    }
    filters = append(filters, filter3)
    filter4 := Filter{
        Key:   OPConst.Order,
        Value: "ID desc",
    }
    filters = append(filters, filter4)

    filter5 := Filter{
        Key:   OPConst.Interval,
        Value: "86400s",
    }
    filters = append(filters, filter5)

    cols := []string{OPConst.ID, OPConst.GN, OPConst.AV, OPConst.TM, OPConst.MAXV, OPConst.MAXTIME, OPConst.MINV, OPConst.MINTIME, OPConst.AVGV, OPConst.FLOW}
    cls, rs, err := opPool.FindFilter(OPConst.Stat, cols, filters)
    if err != nil {
        logs.Error("testing find filter stat error :", err)
    }
    logs.Warn("columns:", cls)
    logs.Warn("rows:", rs)

4.7 订阅使用

订阅使用
    var callback = func(columns []string, rows []map[string]interface{}, err error) {
    	logs.Warn("sub columns:", columns)
    	logs.Warn("sub rows:", rows)
    	logs.Warn("sub err:", err)
    }
    
    func sub() {
    	var rows [] interface{} = make([]interface{}, 0)
    	rows = append(rows, 117995)
    	rows = append(rows, 115048)
    	handler, errs, err := opPool.Subscribe(OPConst.Realtime, rows, callback)
    	if err != nil {
    		logs.Error("testing subscribe error :", err)
    	}
    	logs.Warn("handler:", handler)
    	logs.Warn("errs:", errs)
    	logs.Warn("error:", err)
    	time.Sleep(time.Second * 10)
    	rows = []interface{}{115041}
    	fmt.Println(opPool.Subscription(handler, rows))
    	time.Sleep(time.Second * 10)
    	rows = []interface{}{115048, 117995}
    	fmt.Println(opPool.Unsubscribe(handler, rows))
    	time.Sleep(time.Second * 10)
    }

4.8 SQL使用

SQL使用
    opPool, err := InitConnProxy(host, port, user, pwd, timeout, min, max)
    if err != nil {
        fmt.Println("init pool error:", err)
    }
    sqlSelect := "delete from Point where GN ='W3.SYS.DBLOAD'"
    columns, rows, err := opPool.ExecSQL(sqlSelect)
    if err != nil {
        fmt.Println("testing sql error :", err)
    }
    logs.Warn("columns:", columns)
    logs.Warn("rows length:", len(rows))
    for index, row := range rows {
        logs.Warn("index:", index, "row:", row)
    }

5.特殊接口使用

5.1写实时

写实时
    opPool, _ := InitConnProxy(host, port, user, pwd, timeout, min, max)
    var cols = []string{"ID", "AV"}
    for i := 0; i < 10000; i++ {
        realtime, err := NewRealtime(cols)
        if err != nil {
            logs.Warn("get realtime table error:", err)
            return
        }
        rowLong := realtime.GetRow()
        rowLong.SetID(int64(1029))
		if i%5==0{
			rowLong.SetAVNil()
		}else{
			rowLong.SetAVInt(int64(i))
		}
        realtime.AddRow(rowLong)
        rowText := realtime.GetRow()
        rowText.SetID(int64(1030))
        rowText.SetAVString("this is text" + strconv.FormatInt(int64(i), 10))
        realtime.AddRow(rowText)
        rowBlob := realtime.GetRow()
        rowBlob.SetID(int64(1031))
        rowBlob.SetAVBytes([]byte("this" + strconv.FormatInt(int64(i), 10)))
        realtime.AddRow(rowBlob)
        opPool.WriteRealtimeTable(realtime)
        time.Sleep(time.Second)
    }

5.2 写历史

写历史
    opPool, _ := InitConnProxy(host, port, user, pwd, timeout, min, max)
    var cols = []string{"ID", "AV", "TM"}
    for i := 0; i < 10000; i++ {
        archive, err := NewArchive(cols)
        if err != nil {
            logs.Warn("get realtime table error:", err)
            return
        }
        t := time.Now().Unix() - 10

        row := archive.GetRow()
        row.SetID(int64(1024))
        row.SetAVInt(int64(i))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row.SetID(int64(1025))
        row.SetAVInt(int64(i % 2))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(1026))
        row.SetAVInt(int64(rand.Int31()))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(1027))
        row.SetAVInt(int64(rand.Int31()))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(1028))
        row.SetAVFloat(rand.Float64())
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(1029))
        row.SetAVInt(int64(i))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(1030))
        row.SetAVString("this is text" + strconv.FormatInt(int64(i), 10))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(1031))
        row.SetAVBytes([]byte("this" + strconv.FormatInt(int64(i), 10)))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(2098176))
        row.SetAVInt(int64(i))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row.SetID(int64(2098177))
        row.SetAVInt(int64(i % 2))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(2098178))
        row.SetAVInt(int64(rand.Int31()))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(2098179))
        row.SetAVInt(int64(rand.Int31()))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(2098180))
        row.SetAVFloat(rand.Float64())
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(2098181))
        row.SetAVInt(int64(i))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(2098182))
        row.SetAVString("this is text" + strconv.FormatInt(int64(i), 10))
        row.SetTM(float64(t))
        archive.AddRow(row)

        row = archive.GetRow()
        row.SetID(int64(2098183))
        row.SetAVBytes([]byte("this" + strconv.FormatInt(int64(i), 10)))
        row.SetTM(float64(t))
        archive.AddRow(row)

        cols, rs, err := opPool.WriteArchiveTable(archive)
        fmt.Println("index:", i)
        fmt.Println("columns:", cols)
        fmt.Println("rows:", rs)
        fmt.Println("err:", err)
        time.Sleep(time.Second)
    }