-
Notifications
You must be signed in to change notification settings - Fork 0
/
pysoemMaster.py
166 lines (119 loc) · 5.49 KB
/
pysoemMaster.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import sys
import os
import pysoem
from enum import Enum
path = os.path.dirname(__file__)
path = path.replace("HAL/pysoem", "")
sys.path.append(path)
from helpers import *
from HAL.pysoem.pysoemHelpers import *
class pysoemHAL:
def __init__(self, ifname: str):
self.ifname = ifname
self.master = pysoem.Master()
### Network interface methods ###
def openNetworkInterface(self):
"""Opens the network interface with the given interface name."""
self.master.open(self.ifname) # pysoem doesn't return anything, so we can't check if it was successful
def closeNetworkInterface(self):
"""Closes the network interface."""
self.master.close()
### SDO methods ###
def SDORead(self, slaveInstance, address: tuple):
slave = self.master.slaves[slaveInstance.node]
if isinstance(address, Enum):
address = address.value.value
index, subIndex, packFormat, *_ = address
response = struct.unpack('<' + packFormat, slave.sdo_read(index, subIndex))
if len(response) == 1:
return response[0]
return response
def SDOWrite(self, slaveInstance, address: tuple, data, completeAccess=False):
slave = self.master.slaves[slaveInstance.node]
if isinstance(address, Enum):
address = address.value.value
index, subIndex, packFormat, *_ = address
slave.sdo_write(index, subIndex, struct.pack('<' + packFormat, data), ca=completeAccess)
### Slave configuration methods ###
def initializeSlaves(self):
""""Creates slave objects for each slave and assigns them to self.slaves. Returns the number of slaves."""
numSlaves = self.master.config_init()
self.slaves = self.master.slaves
return numSlaves
def addConfigurationFunc(self, slaveInstance, configFunc):
"""Adds a configuration function to the slave.
Inputs:
slave: high level master.py.slave instance
configFunc: function
The function to be run on the slave.
"""
self.master.slaves[slaveInstance.node].config_func = configFunc
def configureSlaves(self):
"""Configures the slaves"""
self.master.config_map()
def setWatchDog(self, slaveInstance, timeout: float):
"""Sets the watchdog timeout for the slave.
Inputs:
slave: high level master.py.slave instance
timeout: float
The timeout in milliseconds
"""
self.master.slaves[slaveInstance.node].set_watchdog('pdi', timeout)
self.master.slaves[slaveInstance.node].set_watchdog('processdata', timeout)
return 1
### Network state methods ###
# 1. Apply to all slaves
def assertNetworkWideState(self, state: int) -> bool:
if self.master.state_check(state) == state:
return True
return False
def getNetworkWideState(self):
self.master.read_state() # Cursed abstraction by pysoem, slaves can't refresh their own state :(
states = []
for i, slave in enumerate(self.master.slaves):
states += [slave.state]
return states
def setNetworkWideState(self, state: int):
if isinstance(state, Enum):
state = state.value
self.master.state = state
self.master.write_state()
# 2. Apply to individual slaves
def assertNetworkState(self, slaveInstance, state: int) -> bool:
self.master.read_state()
if self.master.slaves[slaveInstance.node].state == state:
return True
return False
def getNetworkState(self, slaveInstance):
self.master.read_state() # Cursed, the slaves can't refresh their own state
return self.master.slaves[slaveInstance.node].state
def setNetworkState(self, slaveInstance, state: int):
self.master.slaves[slaveInstance.node].state = state
self.master.slaves[slaveInstance.node].write_state()
### Device state methods ###
def assertDeviceState(self, slaveInstance, state: StatuswordStates | int) -> bool:
if isinstance(state, Enum): # Handle the pythonic class and int type
state = state.value
statusword = self.SDORead(slaveInstance, slaveInstance.objectDictionary.STATUSWORD)
maskedWord = statusword & STATUSWORD_STATE_BITMASK
maskedWord = maskedWord & state
return maskedWord == state
def getDeviceState(self, slaveInstance):
"""Returns the statusword of the slave."""
return self.SDORead(slaveInstance, slaveInstance.objectDictionary.STATUSWORD)
def setDeviceState(self, slaveInstance, state: stateCommands | int):
self.SDOWrite(slaveInstance, slaveInstance.objectDictionary.CONTROLWORD, state)
### PDO methods ###
def sendProcessData(self):
self.master.send_processdata()
def receiveProcessData(self):
self.master.receive_processdata(timeout=2000)
def updateSoftSlavePDOData(self, slaveInstance):
"""Puts the PDO data from the lower level pysoem.slave into the
higher level slave."""
slaveInstance.PDOInput = self.master.slaves[slaveInstance.node].input
def addPDOMessage(self, slaveInstance, packFormat, data):
"""Adds a PDO message to the slave's PDO buffer."""
self.master.slaves[slaveInstance.node].output = struct.pack('<' + packFormat, *data)
def __del__(self):
self.closeNetworkInterface()