python opc ua客户端和服务器案例

python | 2021-09-27 17:13:11

1.安装python-opcua组件 

pip安装:pip install opcua  

2.python opcua client 客户端读取数据案例

# encoding=utf-8

import sys,time
sys.path.insert(0, "..")
from opcua import Client

if __name__ == "__main__":

    client = Client("opc.tcp://localhost:4840/freeopcua/server/")
    #client = Client("opc.tcp://127.0.0.1:4840/freeopcua/server/")
    #client = Client("opc.tcp://admin@localhost:4840/freeopcua/server/") #connect using a user
    try:
        res = client.connect()
		
		
		#依次读取所有节点的值
		print("##################start########################");
		while True:
			time.sleep(1)
			rs={"timestamp":int(time.time()*1000)}
			rs["device"]="th_fdj_plc_1"
			try:
				#client.connect()
				root=client.get_root_node()
				for m in root.get_children():
					for i in m.get_children():
						print(i)
						try:
							print(i.get_value())
							key=str(i)
							keyArr=key.split("=")
							key=keyArr[len(keyArr)-1]
							key=key.replace("+","_j_").replace(">","_d_").replace("<","_x_").replace("-","_h_")
							rs[key]=i.get_value()
						except:
							print("err")
				#client.disconnect()
			except:
				print("get err")

		print("##################end########################");
		
		

        # Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
        root = client.get_root_node()
        print("Objects node is: ", root)

        # Node objects have methods to read and write node attributes as well as browse or populate address space
        print("Children of root are: ", root.get_children())

        # get a specific node knowing its node id
        #var = client.get_node(ua.NodeId(1002, 2))
        #var = client.get_node("ns=3;i=2002")
        #print(var)
        #var.get_data_value() # get value of node as a DataValue object
        #var.get_value() # get value of node as a python builtin
        #var.set_value(ua.Variant([23], ua.VariantType.Int64)) #set node value using explicit data type
        #var.set_value(3.9) # set node value using implicit data type

        # Now getting a variable node using its browse path
        while(1):
            myvar = root.get_child(["0:Objects", "2:MyObject", "2:MyVariable"])
            obj = root.get_child(["0:Objects", "2:MyObject"])
            print("myvar is: ", myvar)
            print("myobj is: ", obj)
            
            # Stacked myvar access
            print("myvar is: ", root.get_children()[0].get_children()[1].get_variables()[0].get_value())
            time.sleep(2)

    finally:
        client.disconnect()

 

3.python opcua server 服务端生产数据案例

# encoding=utf-8

import sys
sys.path.insert(0, "..")

import time

from opcua import ua, Server

if __name__ == "__main__":

    # setup our server
    server = Server()
    server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")

    # setup our own namespace, not really necessary but should as spec
    uri = "http://examples.freeopcua.github.io"
    idx = server.register_namespace(uri)

    # get Objects node, this is where we should put our nodes
    objects = server.get_objects_node()

    # populating our address space
    myobj = objects.add_object(idx, "MyObject")
    myvar = myobj.add_variable(idx, "MyVariable", 6.7)
    myvar.set_writable()    # Set MyVariable to be writable by clients

    # starting!
    server.start()
    
    try:
        count = 0
        while True:
            time.sleep(1)
            count += 0.1
            myvar.set_value(count)
    finally:
        #close connection, remove subcsriptions, etc
        server.stop()

 

登录后即可回复 登录 | 注册
    
关注编程学问公众号