Appearance
Python 接口使用介绍
Python 版本要求 3.7 及以上。
创建连接
python
ip = "192.168.2.50"
port = 19504
user = "root"
password = "root"
timeout = 60 # seconds
database = "db" # default database
ldb = LightningDB(ip, port, timeout, user, password, database)
写入数据
python
ldb.use_database("db")
table = LdbTable("ee_tt")
table.add_column("IsArray", vtDOUBLE)
table.add_column("Quality", vtDOUBLE)
table.add_column("TagValue", vtDOUBLE)
table.add_column("UaDataType", vtDOUBLE)
table.add_column("TagStringValue", vtSTRING)
table.add_column("HostNameTag", vtSTRING)
table.add_column("e_date", vtDATETIME)
timestamp = int(time.time() * 1e9) # ns
for i in range(20):
if i % 5 == 0:
table.bind_nil(0)
else:
table.bind_double(0, 123.123 + i)
table.bind_double(1, 123.123 + i)
table.bind_double(2, 123.123 + i)
table.bind_double(3, 123.123 + i)
if i % 5 == 0:
table.bind_nil(4)
else:
table.bind_string(4, "py-{}".format(i))
table.bind_string(5, "py-{}".format(i))
table.bind_datetime(6, timestamp)
table.bind_row()
ldb_res = ldb.insert(table)
errno = ldb_res.get_errno()
if errno != 0:
err_msg = ldb_res.get_error()
print("res err,", err_msg)
else:
print("insert success")
table.close()
ldb_res.close()
ldb.close()
执行 SQL 语句
sql 遵循标准的 SQL 语法规范(MySQL),支持查询、更新、删除等操作
sql 示例
- 建库 sql 示例sql
CREATE DATABASE IF NOT EXISTS magus_abc
- 建表 sql 示例sql
CREATE TABLE dd_tt ( IsArray VtDouble not null default 0.0, Quality VtDouble not null default 0.0, TagValue VtDouble not null default 0.0, UaDataType VtDouble not null default 0.0, TagStringValue VtString not null default '', HostNameTag VtString not null default '' keyCol, e_date VtDateTime not null DEFAULT '2000-01-01 00:00:00' timeCol )
- 更新数据 sql 示例sql
update dd_tt set Quality =1 where HostNameTag ="zs"
- 删除数据 sql 示例sql
delete from dd_tt where e_date < '2024-02-01'
- 查询数据 sql 示例sql
select * from db.dd_tt
代码示例
python
sql = "select * from db.ee_tt"
ldb_res = ldb.exec(sql)
errno = ldb_res.get_errno()
if errno != 0:
err_msg = ldb_res.get_error()
print("res err,", err_msg)
else:
ldb_dataset = ldb_res.get_dataset()
col_count = ldb_dataset.columns_count()
total_count = 0
while ldb_dataset.next():
for i in range(col_count):
val_type = ldb_dataset.column_type(i)
if val_type == vtBOOL:
bV = ldb_dataset.column_bool(i)
print(bV, end=",")
elif val_type == vtINT8 or val_type == vtINT16 or val_type == vtINT32 or val_type == vtINT64 or val_type == vtDATETIME:
iV = ldb_dataset.column_int(i)
print(iV, end=",")
elif val_type == vtFLOAT or val_type == vtDOUBLE:
fV = ldb_dataset.column_double(i)
print(fV, end=",")
elif val_type == vtSTRING:
sV = ldb_dataset.column_string(i)
print(sV, end=",")
elif val_type == vtBINARY:
bV = ldb_dataset.column_binary(i)
print(bV, end=",")
else:
raise Exception("val type unsupport,", val_type)
print("")
total_count += 1
if ldb_dataset.ds_err() is not None:
print("ds err,", ldb_dataset.ds_err())
ldb_dataset.close()
print("total count:", total_count)
ldb_res.close()
ldb.close()