我是OPC UA和Python的新手,但是使用Asyncua示例,我创建了一个真正项目所需的示例.现在,我需要为服务器和客户端增加安全性,目前,使用用户名和密码就可以了.
以下是我不带安全性的功能代码.如果有人知道我需要使用什么功能来创建两个用户,一个拥有管理员权限,另一个没有,请让我知道.
import asyncio
from asyncua import Server, ua
from asyncua.common.methods import uamethod
from asyncua.common.structures104 import new_struct, new_struct_field
@uamethod
def func(parent, value):
return value * 2
async def main():
server = Server()
await server.init()
server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
uri = "http://examples.freeopcua.github.io"
idx = await server.register_namespace(uri)
myobj = await server.nodes.objects.add_object(idx, "MyObject")
myvar = await myobj.add_variable(idx, "MyVariable", 0.0)
await server.nodes.objects.add_method(
ua.NodeId("ServerMethod", idx),
ua.QualifiedName("ServerMethod", idx),
func,
[ua.VariantType.Int64],
[ua.VariantType.Int64],
)
struct = await new_struct(server, idx, "MyStruct", [
new_struct_field("FirstValue", ua.VariantType.Float, 0.0),
new_struct_field("SecondValue", ua.VariantType.Float, 0.0),
new_struct_field("ThirdValue", ua.VariantType.Float, 0.0),
new_struct_field("FourthValue", ua.VariantType.Float, 0.0),
new_struct_field("FifthValue", ua.VariantType.Float, 0.0),
])
custom_objs = await server.load_data_type_definitions()
mystruct = await myobj.add_variable(idx, "my_struct", ua.Variant(ua.MyStruct(), ua.VariantType.ExtensionObject))
await mystruct.set_writable()
await myvar.set_writable()
print("Starting server!")
async with server:
while True:
await asyncio.sleep(0.5)
n_struct = await mystruct.get_value()
var = await myvar.read_value()
print ("\n%f\n%f\n%f\n%f\n%f\n%f" % (var, n_struct.FirstValue, n_struct.SecondValue, n_struct.ThirdValue, n_struct.FourthValue, n_struct.FifthValue))
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop and loop.is_running():
print('Async event loop already running. Adding coroutine to the event loop.')
tsk = loop.create_task(main())
tsk.add_done_callback(
lambda t: print(f'Task done with result={t.result()} << return val of main()'))
else:
print('Starting new event loop')
result = asyncio.run(main(), debug=True)
我试图使用Asyncua中的加密示例,但我无法使其工作.所以,我试着从Asyncua服务器上读取主代码的函数,自己做一些事情,但我只得到了错误.
谢谢.