From c68c29068ded0e56c7e454ca6ece94b5c9e86b66 Mon Sep 17 00:00:00 2001 From: "Liu, Ruokun" Date: Sun, 21 Nov 2021 12:57:19 -0800 Subject: [PATCH] Use multiprocessing semaphore instead of pipe. --- .../magma/compiler/channels/pypychannel.py | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/lava/magma/compiler/channels/pypychannel.py b/src/lava/magma/compiler/channels/pypychannel.py index 869ef05fc..9fe86ff09 100644 --- a/src/lava/magma/compiler/channels/pypychannel.py +++ b/src/lava/magma/compiler/channels/pypychannel.py @@ -8,7 +8,7 @@ from dataclasses import dataclass import numpy as np -from multiprocessing import Pipe +from multiprocessing import Semaphore from lava.magma.compiler.channels.interfaces import ( Channel, @@ -42,8 +42,8 @@ def __init__(self, name, shm, proto, size, req, ack): shm : SharedMemory proto : Proto size : int - req : Pipe - ack : Pipe + req : Semaphore + ack : Semaphore """ self._name = name self._shm = shm @@ -97,7 +97,7 @@ def start(self): def _ack_callback(self): try: while not self._done: - self._ack.recv_bytes(0) + self._ack.acquire() not_full = self.probe() self._semaphore.release() if self.observer and not not_full: @@ -124,7 +124,7 @@ def send(self, data): self._semaphore.acquire() self._array[self._idx][:] = data[:] self._idx = (self._idx + 1) % self._size - self._req.send_bytes(bytes(0)) + self._req.release() def join(self): self._done = True @@ -179,8 +179,8 @@ def __init__(self, name, shm, proto, size, req, ack): shm : SharedMemory proto : Proto size : int - req : Pipe - ack : Pipe + req : Semaphore + ack : Semaphore """ self._name = name self._shm = shm @@ -234,7 +234,7 @@ def start(self): def _req_callback(self): try: while not self._done: - self._req.recv_bytes(0) + self._req.acquire() not_empty = self.probe() self._queue.put_nowait(0) if self.observer and not not_empty: @@ -265,7 +265,7 @@ def recv(self): self._queue.get() result = self._array[self._idx].copy() self._idx = (self._idx + 1) % self._size - self._ack.send_bytes(bytes(0)) + self._ack.release() return result @@ -335,11 +335,11 @@ def __init__(self, nbytes = np.prod(shape) * np.dtype(dtype).itemsize smm = message_infrastructure.smm shm = smm.SharedMemory(int(nbytes * size)) - req = Pipe(duplex=False) - ack = Pipe(duplex=False) + req = Semaphore(0) + ack = Semaphore(0) proto = Proto(shape=shape, dtype=dtype, nbytes=nbytes) - self._src_port = CspSendPort(src_name, shm, proto, size, req[1], ack[0]) - self._dst_port = CspRecvPort(dst_name, shm, proto, size, req[0], ack[1]) + self._src_port = CspSendPort(src_name, shm, proto, size, req, ack) + self._dst_port = CspRecvPort(dst_name, shm, proto, size, req, ack) @property def src_port(self) -> AbstractCspSendPort: