Skip to content

Golang 接口使用介绍

Golang 接口基于符合 C ABI 规范的动态库实现,并借助 cgo 进行封装。

创建连接

go
ip := "192.168.2.50"
port := 19504
user := "root"
password := "root"
timeout := 60 // seconds
database := "db" // default database
conn, err := ldbapi.GetConnection(ip, port, user, password, timeout, database)
if err != nil {
    fmt.Println(err)
    return
}
defer conn.Close()

使用 memCache 碎片化写入数据

适用于数据批量较小、频率较高的场景,如:数据采集、数据上报等

go
// NOTE(review): UseDatabase presumably selects the database used by the
// following calls on this connection — confirm against the ldbapi docs.
conn.UseDatabase("magus_abc")

// Declare the row layout: four float64 columns, two string columns and one
// datetime column, registered in this exact order (indexes 0..6).
rowTable := ldbapi.NewRowTable(tableName)
defer rowTable.Close()
for _, name := range []string{"IsArray", "Quality", "TagValue", "UaDataType"} {
    rowTable.AddColumn(name, ldbapi.VtFloat64)
}
for _, name := range []string{"TagStringValue", "HostNameTag"} {
    rowTable.AddColumn(name, ldbapi.VtString)
}
rowTable.AddColumn("e_date", ldbapi.VtDatetime)

const (
    total  = 10 // number of rows to bind
    offset = 1  // added to the loop index to derive each row's sample values
)
start := time.Now()
var err error
for i := 0; i < total; i++ {
    n := i + offset
    num := float64(n)
    text := strings.Repeat(strconv.Itoa(n), 3)

    // Bind one value per column index, then finish the row with BindRow.
    rowTable.BindFloat64(0, num)
    rowTable.BindFloat64(1, num)
    rowTable.BindFloat64(2, num)
    rowTable.BindFloat64(3, num)
    rowTable.BindString(4, text)
    rowTable.BindString(5, text)
    rowTable.BindDatetime(6, start.Add(time.Second*time.Duration(n)))
    if err = rowTable.BindRow(); err != nil {
        fmt.Println("BindRow error,", err)
        return
    }
}

// Send all bound rows in a single Insert call.
if err = conn.Insert(rowTable); err != nil {
    fmt.Println("Insert error,", err)
    return
}
fmt.Println("insert success")

使用事务 TX 批量写入数据

适用于大批量数据写入场景,如:数据迁移、数据初始化等

go
// NOTE(review): UseDatabase presumably selects the database used by the
// following calls on this connection — confirm against the ldbapi docs.
conn.UseDatabase("magus_abc")

// Declare the sheet layout: four float64 columns, two string columns and one
// datetime column.
sheetTable := ldbapi.NewSheetTable(tableName)
defer sheetTable.Close()
sheetTable.AddColumn("IsArray", ldbapi.VtFloat64)
sheetTable.AddColumn("Quality", ldbapi.VtFloat64)
sheetTable.AddColumn("TagValue", ldbapi.VtFloat64)
sheetTable.AddColumn("UaDataType", ldbapi.VtFloat64)
sheetTable.AddColumn("TagStringValue", ldbapi.VtString)
sheetTable.AddColumn("HostNameTag", ldbapi.VtString)
sheetTable.AddColumn("e_date", ldbapi.VtDatetime)

tx, err := conn.BeginTx()
if err != nil {
    fmt.Println("BeginTx error,", err)
    return
}
defer tx.Close()

txID := tx.GetTxId()
fmt.Println("txId:", txID)

// Write the batches; stop at the first failure so that a partially written
// transaction is never committed.
for i := 0; i < 10; i++ {
    if err = buildSheetTable(sheetTable, i); err != nil {
        fmt.Println("buildSheetTable error,", err)
        break
    }
    if err = tx.Insert(sheetTable); err != nil {
        fmt.Println("Insert error,", err)
        break
    }
}
if err != nil {
    // A batch failed: discard everything written in this transaction.
    if rbErr := tx.Rollback(); rbErr != nil {
        fmt.Println("Rollback error,", rbErr)
    }
    return
}

// Commit; a failed commit is retried once, and the transaction is rolled
// back if the retry fails as well.
if err = tx.Commit(); err != nil {
    fmt.Println("Commit error,", err)
    if err = tx.Commit(); err != nil {
        fmt.Println("Commit retry error,", err)
        if err = tx.Rollback(); err != nil {
            fmt.Println("Rollback error,", err)
        }
        return
    }
}
fmt.Println("Commit success")

执行 SQL 语句

SQL 语句遵循标准的 SQL 语法规范(MySQL),支持建库、建表、查询、更新、删除等操作

sql 示例

  • 建库 sql 示例
    sql
    CREATE DATABASE IF NOT EXISTS magus_abc
  • 建表 sql 示例
    sql
    CREATE TABLE dd_tt (
            IsArray VtDouble not null default 0.0,
            Quality VtDouble not null default 0.0,
            TagValue VtDouble not null default 0.0,
            UaDataType VtDouble not null default 0.0,
            TagStringValue VtString not null default '',
            HostNameTag VtString not null default '' keyCol,
            e_date VtDateTime not null DEFAULT '2000-01-01 00:00:00' timeCol
    )
  • 更新数据 sql 示例
    sql
    update dd_tt set Quality =1 where HostNameTag ="zs"
  • 删除数据 sql 示例
    sql
    delete from dd_tt where e_date < '2024-02-01'
  • 查询数据 sql 示例
    sql
    select * from db.dd_tt

代码示例

go
    sql = "SELECT * from db.dd_tt"
	option := ldbapi.NewOption().SetTimeout(queryTimeout)
	err := conn.Exec(sql, option, func(resultSet *ldbapi.ResultSet, err error) error {
		if err != nil {
			fmt.Println("Exec callback error,", err)
			return nil
		}
		var rowCount int
		colCount := resultSet.ColumnCount()
		var index int
		var typ ldbapi.FieldType
		var name string

		for resultSet.Next() {
			for index = 0; index < colCount; index++ {
				typ = resultSet.ColumnType(index)
				name = resultSet.ColumnLabel(index)
				fmt.Printf("%v:", name)
				switch typ {
				case ldbapi.VtBool:
					v, n := resultSet.GetBool(index)
					if n {
						fmt.Printf("null,")
					} else {
						fmt.Printf("%v,", v)
					}
				case ldbapi.VtInt8, ldbapi.VtInt16, ldbapi.VtInt32, ldbapi.VtInt64:
					v, n := resultSet.GetInt(index)
					if n {
						fmt.Printf("null,")
					} else {
						fmt.Printf("%v,", v)
					}
				case ldbapi.VtDatetime:
					v, n := resultSet.GetDatetime(index)
					if n {
						fmt.Printf("null,")
					} else {
						fmt.Printf("%v,", v)
					}
				case ldbapi.VtFloat32, ldbapi.VtFloat64:
					v, n := resultSet.GetFloat64(index)
					if n {
						fmt.Printf("null,")
					} else {
						fmt.Printf("%v,", v)
					}
				case ldbapi.VtString:
					v, n := resultSet.GetString(index)
					if n {
						fmt.Printf("null,")
					} else {
						fmt.Printf("%v,", v)
					}
				case ldbapi.VtBinary:
					v, n := resultSet.GetBinary(index)
					if n {
						fmt.Printf("null,")
					} else {
						fmt.Printf("%v,", v)
					}
				default:
					fmt.Println("not support type:", typ)
				}
			}
			fmt.Printf("\n")

			rowCount++
		}

		fmt.Println("row count:", rowCount)
		return nil
	})
	if err != nil {
		fmt.Println("Exec error,", err)
		return
	}

订阅

订阅表结构变化和实时数据变更

注册订阅 (代码示例)

go
	// Build a subscribe request (action ActionSubscribeReq, subject SubjectCreate).
	req := client.GetRequest()
	if req == nil {
		return logging.ThrowError("request is nil")
	}
	req.SetID(int64(sequence.GetUniqueID()))
	req.SetAction(types.ActionSubscribeReq)
	req.SetSubject(types.SubjectCreate)

	table := opio.NewTable("subscribe", 1024)
	// Fixed fields: every subscription row carries these four columns.
	table.AddColumn("subType", types.VtInt32, 0)
	table.AddColumn("dbName", types.VtString, 0)
	table.AddColumn("tableName", types.VtString, 0)
	table.AddColumn("sql", types.VtString, 0)

	err := req.SetTable(table)
	if err != nil {
		return logging.ThrowError("req set table failed, err:%v", err)
	}
	err = req.Write()
	if err != nil {
		return logging.ThrowError("req write failed, err:%v", err)
	}
	/*
		const (
			SubscribeItemNil      = iota
			SubscribeItemMetadata = 0x01
			SubscribeItemRealtime = 0x02
		)
	*/

	// Metadata subscription: dbName, tableName and sql are left empty.
	table.SetColumnInt32(0, notice.SubscribeItemMetadata)
	table.SetColumnString(1, "")
	table.SetColumnString(2, "")
	table.SetColumnString(3, "")
	table.BindRow()

	// Realtime subscription for databaseName1.tableName1, filtered by key.
	table.SetColumnInt32(0, notice.SubscribeItemRealtime)
	table.SetColumnString(1, "databaseName1")
	table.SetColumnString(2, "tableName1")
	table.SetColumnString(3, "select * from databaseName1.tableName1 where key_col1 = 'key1'")
	// Bind this row before the columns are set again; otherwise the values
	// for databaseName2 below would replace it and this subscription would
	// never be sent.
	table.BindRow()

	// Realtime subscription for the whole of databaseName2.tableName2.
	table.SetColumnInt32(0, notice.SubscribeItemRealtime)
	table.SetColumnString(1, "databaseName2")
	table.SetColumnString(2, "tableName2")
	table.SetColumnString(3, "select * from databaseName2.tableName2")
	table.BindRow()

	// Send the bound rows, flush the request and read the server's reply.
	err = req.WriteContent(table)
	if err != nil {
		return logging.ThrowError("req write content failed, err:%v", err)
	}
	err = req.Flush()
	if err != nil {
		return logging.ThrowError("req flush failed, err:%v", err)
	}
	resp, err := req.GetResponse()
	if err != nil {
		return logging.ThrowError("req get response failed, err:%v", err)
	}
	logging.Infof("subscribe register, errNo: %d, errMsg: %s", resp.GetErrNo(), resp.GetError())

取消订阅 (代码示例)

go
	// Build an unsubscribe request: same action as registration
	// (ActionSubscribeReq) but with subject SubjectDelete.
	req := client.GetRequest()
	if req == nil {
		return logging.ThrowError("request is nil")
	}

	req.SetID(int64(sequence.GetUniqueID()))
	req.SetAction(types.ActionSubscribeReq)
	req.SetSubject(types.SubjectDelete)

	// The request carries no table content, so it is written and flushed in
	// one call.
	err := req.WriteAndFlush()
	if err != nil {
		return logging.ThrowError("req write and flush failed, err:%v", err)
	}
	resp, err := req.GetResponse()
	if err != nil {
		return logging.ThrowError("req get response failed, err:%v", err)
	}
	// Log the outcome as a cancellation so it is distinguishable from a
	// registration in the logs.
	logging.Infof("subscribe unregister, errNo: %d, errMsg: %s", resp.GetErrNo(), resp.GetError())

接收订阅推送 (代码示例)

go
	// Obtain a response object from the request and keep reading pushed
	// notices into it until a read fails.
	req := client.GetRequest()
	if req == nil {
		return logging.ThrowError("request is nil")
	}
	resp := req.MakeResponse()
	for {
		resp.Reset()

		// Any read error ends the loop; only errors other than io.EOF are
		// logged as failures.
		if readErr := resp.Read(); readErr != nil {
			if readErr != io.EOF {
				logging.Errorf("opio read failed, err: %v", readErr)
			}
			logging.Infof("listen notice end")
			break
		}

		// Skip anything that is not a subscribe notice.
		if resp.GetAction() != types.ActionSubscribeNotice {
			logging.Errorf("action is not subscribe notice")
			continue
		}

		// The SubscribeItems property must be present and hold an int32.
		rawItems := resp.Get(types.SubscribeItems)
		if rawItems == nil {
			logging.Errorf("subscribe items is nil")
			continue
		}
		items, isInt32 := rawItems.(int32)
		if !isInt32 {
			logging.Errorf("subscribe items is not int32")
			continue
		}

		// NOTE(review): the metadata check comes first, so if items matches
		// both checks only the metadata handler runs — confirm this is
		// intended.
		flags := uint32(items)
		if notice.HasMetadataSubscribe(flags) {
			if handleErr := receiveMetadataNotice(resp); handleErr != nil {
				logging.Errorf("receive metadata notice failed, err:%v", handleErr)
			}
		} else if notice.HasRealtimeSubscribe(flags) {
			if handleErr := receiveRealtimeNotice(resp); handleErr != nil {
				logging.Errorf("receive realtime notice failed, err:%v", handleErr)
			}
		} else {
			logging.Errorf("subscribe items is invalid, items: %d", items)
		}
	}
	// receiveMetadataNotice dispatches a metadata notice by its subject:
	// create/update go to receiveMetadataChangeNotice, delete goes to
	// receiveMetadataDeleteNotice and error goes to receiveErrorNotice.
	// Any other subject is logged and nil is returned. A nil resp yields an
	// error.
	func receiveMetadataNotice(resp *opio.Response) error {
		if resp == nil {
			return logging.ThrowError("response is nil")
		}
		switch subject := resp.GetSubject(); subject {
		case types.SubjectCreate, types.SubjectUpdate:
			return receiveMetadataChangeNotice(resp)
		case types.SubjectDelete:
			return receiveMetadataDeleteNotice(resp)
		case types.SubjectError:
			return receiveErrorNotice(resp)
		default:
			logging.Errorf("subject is invalid, subject: %s", subject)
			return nil
		}
	}

	// receiveMetadataChangeNotice reads every row of the response dataset and
	// stores one metadata.TableInfo per row in the metadatas map, keyed by
	// metadata.MakeDBTableName(dbName, tableName).
	//
	// Row layout, by column index: 0 table name, 1 columns (JSON),
	// 2 column versions (JSON), 3 database name, 4 time column ID,
	// 5 key column ID, 6 metatable flag, 7 column ID count, 8 version count,
	// 9 table options (JSON).
	//
	// It returns an error when resp or its dataset is nil, when a field cannot
	// be read, or when the columns / versions JSON cannot be decoded. Rows
	// stored before the failing row stay in metadatas.
	func receiveMetadataChangeNotice(resp *opio.Response) error {
		if resp == nil {
			return logging.ThrowError("response is nil")
		}
		dataset := resp.GetDataSet()
		if dataset == nil {
			return logging.ThrowError("dataset is nil")
		}

		// columnName resolves a column ID to its name. It yields "" for a
		// negative or out-of-range ID and for a nil entry, so malformed
		// metadata cannot cause an index or nil-pointer panic.
		columnName := func(columns []*metadata.ColumnInfo, id int32) string {
			if id < 0 || int(id) >= len(columns) || columns[id] == nil {
				return ""
			}
			return columns[id].Name
		}

		for {
			ok, err := dataset.Next()
			if err != nil {
				// io.EOF ends the iteration normally; any other error aborts.
				if err != io.EOF {
					logging.Errorf("err: %v", err)
					return err
				}
				break
			}

			if !ok {
				break
			}

			tableName, err := dataset.GetString(0)
			if err != nil {
				return err
			}

			columnsStr, err := dataset.GetString(1)
			if err != nil {
				return err
			}

			var columns []*metadata.ColumnInfo
			err = json.Unmarshal([]byte(columnsStr.Value), &columns)
			if err != nil {
				return err
			}

			columnVersionStr, err := dataset.GetString(2)
			if err != nil {
				return err
			}

			var versionColumns []*metadata.TableVersionInfo
			err = json.Unmarshal([]byte(columnVersionStr.Value), &versionColumns)
			if err != nil {
				return err
			}

			dbName, err := dataset.GetString(3)
			if err != nil {
				return err
			}

			timeColumnID, err := dataset.GetInt32(4)
			if err != nil {
				return err
			}

			keyColumnID, err := dataset.GetInt32(5)
			if err != nil {
				return err
			}

			isMetatable, err := dataset.GetBool(6)
			if err != nil {
				return err
			}

			colIDCount, err := dataset.GetInt32(7)
			if err != nil {
				return err
			}

			versionCount, err := dataset.GetInt32(8)
			if err != nil {
				return err
			}

			tableOptionsStr, err := dataset.GetString(9)
			if err != nil {
				return err
			}

			// Table options are best effort: when the JSON cannot be decoded
			// the table is still recorded, with nil options.
			options := map[string]interface{}{}
			err = json.Unmarshal([]byte(tableOptionsStr.Value), &options)
			if err != nil {
				options = nil
			}

			metadatas[metadata.MakeDBTableName(dbName.Value, tableName.Value)] = &metadata.TableInfo{
				DbName:       dbName.Value,
				TableName:    tableName.Value,
				TimeColumnID: timeColumnID.Value,
				KeyColumnID:  keyColumnID.Value,
				Metatable:    isMetatable.Value,
				ColIDCount:   colIDCount.Value,
				VersionCount: versionCount.Value,
				Options:      options,
				Versions:     versionColumns,
				KeyCol:       columnName(columns, keyColumnID.Value),
				TimeCol:      columnName(columns, timeColumnID.Value),
			}
			logging.Infof("metadata change, dbName: %s, tableName: %s", dbName.Value, tableName.Value)
		}
		return nil
	}

	// receiveMetadataDeleteNotice removes the entry for the table named in
	// resp from the metadatas map. It returns an error when resp is nil or
	// when the database or table name is empty. A table with no entry in
	// metadatas is logged as an error but still results in a nil return.
	func receiveMetadataDeleteNotice(resp *opio.Response) error {
		if resp == nil {
			return logging.ThrowError("response is nil")
		}
		dbName, tableName := resp.GetDB(), resp.GetTableName()
		if dbName == "" || tableName == "" {
			return logging.ThrowError("dbName or tableName is nil")
		}

		key := metadata.MakeDBTableName(dbName, tableName)
		if _, found := metadatas[key]; !found {
			logging.Errorf("metadata not found, dbName: %s, tableName: %s", dbName, tableName)
			return nil
		}
		delete(metadatas, key)
		logging.Infof("delete metadata, dbName: %s, tableName: %s", dbName, tableName)
		return nil
	}

	// receiveErrorNotice logs the error code and message carried by resp in
	// its PropErrNo and PropError properties. A missing property keeps its
	// zero value (0 / ""). A property of the wrong dynamic type (not int32 /
	// not string) yields an error, as does a nil resp.
	func receiveErrorNotice(resp *opio.Response) error {
		if resp == nil {
			return logging.ThrowError("response is nil")
		}

		var errCode int32
		if rawCode := resp.Get(types.PropErrNo); rawCode != nil {
			code, isInt32 := rawCode.(int32)
			if !isInt32 {
				return logging.ThrowError("errCode is not int32")
			}
			errCode = code
		}

		var errMsg string
		if rawMsg := resp.Get(types.PropError); rawMsg != nil {
			msg, isString := rawMsg.(string)
			if !isString {
				return logging.ThrowError("errMsg is not string")
			}
			errMsg = msg
		}

		logging.Infof("receive error notice, errCode: %d, errMsg: %s", errCode, errMsg)
		return nil
	}

	// receiveRealtimeNotice dispatches a realtime notice by its subject:
	// change goes to receiveRealtimeChangeNotice, delete goes to
	// receiveRealtimeDeleteNotice and error goes to receiveErrorNotice.
	// Any other subject is logged and nil is returned. A nil resp yields an
	// error.
	func receiveRealtimeNotice(resp *opio.Response) error {
		if resp == nil {
			return logging.ThrowError("response is nil")
		}
		switch subject := resp.GetSubject(); subject {
		case types.SubjectChange:
			return receiveRealtimeChangeNotice(resp)
		case types.SubjectDelete:
			return receiveRealtimeDeleteNotice(resp)
		case types.SubjectError:
			return receiveErrorNotice(resp)
		default:
			logging.Errorf("subject is invalid, subject: %s", subject)
			return nil
		}
	}

	// receiveRealtimeChangeNotice unpacks the dataset attached to resp with
	// common.UnpackDataSet and prints the result and its count to standard
	// output. It returns an error when resp or its dataset is nil.
	func receiveRealtimeChangeNotice(resp *opio.Response) error {
		if resp == nil {
			return logging.ThrowError("response is nil")
		}

		ds := resp.GetDataSet()
		if ds == nil {
			return logging.ThrowError("dataset is nil")
		}

		// NOTE(review): the meaning of the second argument (10) is not
		// visible here — confirm against common.UnpackDataSet.
		rows, total := common.UnpackDataSet(ds, 10)

		fmt.Println(rows.PrettyPrint())
		fmt.Printf("total count: %v", total)
		return nil
	}

	// receiveRealtimeDeleteNotice logs a realtime delete notice: the database
	// name, the table name and the deleted keys read from the
	// PropOperationDelete property of resp. It returns an error when resp is
	// nil, when the database or table name is empty, or when the property is
	// missing or is not a string.
	func receiveRealtimeDeleteNotice(resp *opio.Response) error {
		if resp == nil {
			return logging.ThrowError("response is nil")
		}

		dbName, tableName := resp.GetDB(), resp.GetTableName()
		if dbName == "" || tableName == "" {
			return logging.ThrowError("dbName or tableName is nil")
		}

		rawKeys := resp.Get(types.PropOperationDelete)
		if rawKeys == nil {
			return logging.ThrowError("deleteKeys is nil")
		}
		deleteKeys, isString := rawKeys.(string)
		if !isString {
			return logging.ThrowError("deleteKeys is not string")
		}

		logging.Infof("receive realtime delete notice, dbName: %s, tableName: %s, deleteKeys: %s", dbName, tableName, deleteKeys)
		return nil
	}