-
Notifications
You must be signed in to change notification settings - Fork 47
/
localsession.py
289 lines (230 loc) · 11 KB
/
localsession.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
from __future__ import absolute_import, print_function, unicode_literals
import logging
from concurrent import futures
from subprocess import PIPE
from wolframclient.evaluation.base import WolframEvaluator
from wolframclient.evaluation.kernel.kernelcontroller import WolframKernelController
from wolframclient.serializers import export
# Module-level logger named after this module; used to report kernel messages
# attached to unsuccessful evaluation results.
logger = logging.getLogger(__name__)
# Public API of this module: only the session class is exported by `import *`.
__all__ = ["WolframLanguageSession"]
# Some callback methods for internal use.
def do_get_wxf(result):
    """Callback returning the ``wxf`` attribute of an evaluation result object."""
    serialized = result.wxf
    return serialized
def do_get_result(result):
    """Callback returning whatever the result object's ``get()`` method returns."""
    value = result.get()
    return value
class WolframLanguageSession(WolframEvaluator):
    """A session to a Wolfram kernel enabling evaluation of Wolfram Language expressions.

    Start a new session and send an expression for evaluation::

        with WolframLanguageSession() as session:
            session.evaluate('Range[3]')

    Set `timeout` to a number to set an evaluation timeout in seconds. If the evaluation
    time exceeds the timeout, a :class:`~concurrent.futures.TimeoutError` is raised.

    Evaluate an expression taking 10 seconds to return using a 5-second timeout::

        long_evaluation = wl.Pause(10)
        with WolframLanguageSession() as session:
            session.evaluate(long_evaluation, timeout=5)

    The asynchronous evaluation method
    :meth:`~wolframclient.evaluation.kernel.localsession.WolframLanguageSession.evaluate_future`
    returns an instance of :class:`~concurrent.futures.Future` class wrapping the evaluation result::

        with WolframLanguageSession() as session:
            future = session.evaluate_future('1+1')
            result = future.result()

    When `consumer` is set to a :class:`~wolframclient.deserializers.WXFConsumer` instance, this instance is passed to
    :func:`~wolframclient.deserializers.binary_deserialize` when deserializing the WXF output.

    By default, packed arrays are deserialized as :class:`list`. Specify a consumer instance that supports NumPy arrays
    :class:`~wolframclient.deserializers.WXFConsumerNumpy`::

        from wolframclient.deserializers import WXFConsumerNumpy

        with WolframLanguageSession(consumer=WXFConsumerNumpy()) as session:
            numpy_array = session.evaluate('Range[3]')

    Communication with a given kernel is based on ZMQ sockets:

    * one `PUSH` socket to send expressions for evaluation
    * one `PULL` socket to receive evaluation results

    Kernel logging is disabled by default and is done through a third socket (type `SUB`). The initial log level is
    specified by the parameter `kernel_loglevel`.
    If the log level was not set at initialization, logging is not available for the entire session.

    The kernel associated with a given session provides the following logging functions:

    * ``ClientLibrary`debug`` corresponding to :py:meth:`logging.Logger.debug`
    * ``ClientLibrary`info`` corresponding to :py:meth:`logging.Logger.info`
    * ``ClientLibrary`warn`` corresponding to :py:meth:`logging.Logger.warning`
    * ``ClientLibrary`error`` corresponding to :py:meth:`logging.Logger.error`
    * ``ClientLibrary`SetDebugLogLevel[]`` send debug messages and above
    * ``ClientLibrary`SetInfoLogLevel[]`` send info messages and above
    * ``ClientLibrary`SetWarnLogLevel[]`` send warning messages and above
    * ``ClientLibrary`SetErrorLogLevel[]`` only send error messages
    * ``ClientLibrary`DisableKernelLogging[]`` stop sending error messages to the logging socket

    The standard input, output and error file handles can be specified with `stdin`, `stdout` and `stderr` named
    parameters. Valid values are those accepted by :class:`subprocess.Popen` (e.g. :data:`sys.stdout`). Those parameters
    should be handled with care as deadlocks can arise from misconfiguration.
    """

    def __init__(
        self,
        kernel=None,
        consumer=None,
        initfile=None,
        kernel_loglevel=logging.NOTSET,
        stdin=PIPE,
        stdout=PIPE,
        stderr=PIPE,
        inputform_string_evaluation=True,
        wxf_bytes_evaluation=True,
        controller_class=WolframKernelController,
        **kwargs
    ):
        """Build the session and its kernel controller; the kernel is not started here.

        `kernel`, `initfile`, `kernel_loglevel`, `stdin`, `stdout`, `stderr` and any extra
        keyword arguments are forwarded to `controller_class` to build the controller.
        `inputform_string_evaluation` is forwarded to the parent evaluator.
        `consumer` and `wxf_bytes_evaluation` are stored on the session.

        Every argument is also kept on the instance so that :meth:`duplicate` can rebuild
        an equivalent session.
        """
        super().__init__(inputform_string_evaluation=inputform_string_evaluation)
        # Store the values actually passed by the caller (not their defaults), otherwise
        # duplicate() would silently create a session with a different configuration.
        self.kernel = kernel
        self.consumer = consumer
        self.initfile = initfile
        self.kernel_loglevel = kernel_loglevel
        self._stdin = stdin
        self._stdout = stdout
        self._stderr = stderr
        self.wxf_bytes_evaluation = wxf_bytes_evaluation
        self.controller_class = controller_class
        # NOTE(review): `consumer` is not forwarded to the controller here, although the
        # class documentation says it is used when deserializing WXF output. Confirm
        # whether the controller is expected to receive it.
        self.kernel_controller = self.controller_class(
            kernel=kernel,
            initfile=initfile,
            kernel_loglevel=kernel_loglevel,
            stdin=stdin,
            stdout=stdout,
            stderr=stderr,
            **kwargs
        )
        self.parameters = kwargs
        # A freshly built session is considered stopped until start_future() is called.
        self.stopped = True

    def duplicate(self):
        """Return a new, not yet started session built from this session's constructor arguments."""
        return self.__class__(
            kernel=self.kernel,
            consumer=self.consumer,
            initfile=self.initfile,
            kernel_loglevel=self.kernel_loglevel,
            stdin=self._stdin,
            stdout=self._stdout,
            stderr=self._stderr,
            inputform_string_evaluation=self.inputform_string_evaluation,
            wxf_bytes_evaluation=self.wxf_bytes_evaluation,
            controller_class=self.controller_class,
            **self.parameters
        )

    @property
    def started(self):
        """Whether the underlying kernel controller reports itself as started."""
        return self.kernel_controller.started

    def start(self, block=True, timeout=None):
        """ Start a kernel controller and eventually start a fresh one if the previous one was terminated.

        Set `block` to :data:`True` (the default) to wait for the kernel to be up and running
        before returning. Optionally, set a timeout in seconds. If the timeout is reached, a
        :class:`~concurrent.futures.TimeoutError` will be raised and the kernel is terminated.
        Any other exception raised while starting also terminates the kernel before being re-raised.
        """
        try:
            future = self.start_future()
            if future and block:
                future.result(timeout=timeout)
        except Exception as e:
            try:
                self.terminate()
            finally:
                # Always surface the original start-up error, even if terminate() failed too.
                raise e

    def start_future(self):
        """ Request the Wolfram kernel to start and return a future object.

        The result of the future object is :data:`True` when the kernel is ready to evaluate input."""
        self.stopped = False
        # A terminated controller cannot be reused: replace it with a fresh copy.
        if self.kernel_controller.terminated:
            self.kernel_controller = self.kernel_controller.duplicate()
        if not self.started:
            return self.kernel_controller.request_kernel_start()
        # Already running: hand back a future that is already resolved.
        future = futures.Future()
        future.set_result(True)
        return future

    def stop(self):
        """ Request the Wolfram kernel to stop gracefully.

        When recovering from an error state, it's recommended to use terminate() which
        is more resilient.
        """
        self._stop(gracefully=True)

    def terminate(self):
        """ Request the Wolfram kernel to stop immediately.

        Ongoing evaluations are likely to be cancelled.
        """
        self._stop(gracefully=False)

    def _stop(self, gracefully=True):
        """Stop the kernel and wait for completion; does nothing if the session is already stopped."""
        # if the kernel is terminated the queue no more accept new tasks. Stop would hang.
        if not self.stopped:
            future = self.stop_future(gracefully=gracefully)
            future.result()

    def stop_future(self, gracefully=True):
        """ Request the Wolfram kernel to stop and return a future object.

        The result of the future object is :data:`True` when the controller thread is no longer alive.
        Set `gracefully` to :data:`False` to request an immediate stop, possibly cancelling ongoing
        evaluations.
        """
        self.stopped = True
        if gracefully:
            return self.kernel_controller.stop()
        else:
            return self.kernel_controller.terminate()

    def ensure_started(self):
        """Make sure the session can evaluate: start it if needed, restart it if it was stopped."""
        if not self.started:
            self.start(block=True, timeout=None)
        if self.stopped:
            self.restart()

    def restart(self, block=True, timeout=None):
        """ Restart a given evaluator by stopping it in cases where it is already started. """
        if self.started:
            self.stop()
        self.start(block=block, timeout=timeout)

    # Callbacks handed to the controller to post-process a result before it is set on the future.
    CALLBACK_GET_WXF = staticmethod(do_get_wxf)
    CALLBACK_GET = staticmethod(do_get_result)

    def do_evaluate_future(self, expr, result_update_callback=None, **kwargs):
        """Serialize `expr` to WXF, submit it to the controller and return the pending future.

        `result_update_callback` and `kwargs` are forwarded to the controller; `kwargs` is also
        forwarded to :func:`~wolframclient.serializers.export`. The session is assumed to be started.
        """
        future = futures.Future()
        wxf = export(self.normalize_input(expr), target_format="wxf", **kwargs)
        self.kernel_controller.evaluate_future(
            wxf, future, result_update_callback=result_update_callback, **kwargs
        )
        return future

    def evaluate_future(self, expr, **kwargs):
        """ Evaluate an expression and return a future object.

        The future object result is the evaluated expression. See
        :func:`~wolframclient.evaluation.WolframLanguageSession.evaluate`.
        """
        self.ensure_started()
        return self.do_evaluate_future(
            expr, result_update_callback=self.CALLBACK_GET, **kwargs
        )

    def evaluate_wxf_future(self, expr, **kwargs):
        """ Evaluate an expression and return a future object.

        The future object result is the WXF serialization of the evaluated expression.
        See :func:`~wolframclient.evaluation.WolframLanguageSession.evaluate_wxf`.
        """
        self.ensure_started()
        return self.do_evaluate_future(
            expr, result_update_callback=self.CALLBACK_GET_WXF, **kwargs
        )

    def evaluate_wrap_future(self, expr, **kwargs):
        """ Evaluate an expression and return a future object.

        The future object result is the result object with the evaluated expression and meta information.
        See :func:`~wolframclient.evaluation.WolframLanguageSession.evaluate_wrap`.
        """
        self.ensure_started()
        return self.do_evaluate_future(expr, **kwargs)

    def evaluate_wrap(self, expr, timeout=None, **kwargs):
        """ Evaluate an expression and return the result object.

        Blocks until the result is available. Set `timeout` to a number of seconds to wait at most
        that long; a :class:`~concurrent.futures.TimeoutError` is raised when it is exceeded.
        """
        # The timeout bounds the wait on the future; it is not forwarded to the serializer.
        return self.evaluate_wrap_future(expr, **kwargs).result(timeout=timeout)

    def evaluate(self, expr, timeout=None, **kwargs):
        """ Evaluate an expression and return the evaluated expression.

        Messages attached to an unsuccessful result are logged as warnings. Set `timeout` to a number
        of seconds to raise :class:`~concurrent.futures.TimeoutError` when the evaluation takes longer.
        """
        result = self.evaluate_wrap(expr, timeout=timeout, **kwargs)
        self.log_message_from_result(result)
        return result.get()

    def evaluate_wxf(self, expr, timeout=None, **kwargs):
        """ Evaluate an expression and return the serialized expression.

        This method does not deserialize the Wolfram kernel output. Set `timeout` to a number of
        seconds to raise :class:`~concurrent.futures.TimeoutError` when the evaluation takes longer.
        """
        result = self.evaluate_wrap(expr, timeout=timeout, **kwargs)
        self.log_message_from_result(result)
        return result.wxf

    def log_message_from_result(self, result):
        """Log every message of `result` as a warning when the result is not successful."""
        if not result.success:
            for msg in result.messages:
                logger.warning(msg)

    def get_parameter(self, parameter_name):
        return self.kernel_controller.get_parameter(parameter_name)

    def set_parameter(self, parameter_name, parameter_value):
        self.kernel_controller.set_parameter(parameter_name, parameter_value)

    # Reuse the controller's documentation for the two thin delegating methods above.
    get_parameter.__doc__ = WolframKernelController.get_parameter.__doc__
    set_parameter.__doc__ = WolframKernelController.set_parameter.__doc__

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__name__, self.kernel_controller)