Skip to content

Commit

Permalink
Use multiprocessing semaphore instead of pipe.
Browse files Browse the repository at this point in the history
  • Loading branch information
harryliu-intel committed Nov 21, 2021
1 parent 4940ae4 commit c68c290
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions src/lava/magma/compiler/channels/pypychannel.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dataclasses import dataclass

import numpy as np
from multiprocessing import Pipe
from multiprocessing import Semaphore

from lava.magma.compiler.channels.interfaces import (
Channel,
Expand Down Expand Up @@ -42,8 +42,8 @@ def __init__(self, name, shm, proto, size, req, ack):
shm : SharedMemory
proto : Proto
size : int
req : Pipe
ack : Pipe
req : Semaphore
ack : Semaphore
"""
self._name = name
self._shm = shm
Expand Down Expand Up @@ -97,7 +97,7 @@ def start(self):
def _ack_callback(self):
try:
while not self._done:
self._ack.recv_bytes(0)
self._ack.acquire()
not_full = self.probe()
self._semaphore.release()
if self.observer and not not_full:
Expand All @@ -124,7 +124,7 @@ def send(self, data):
self._semaphore.acquire()
self._array[self._idx][:] = data[:]
self._idx = (self._idx + 1) % self._size
self._req.send_bytes(bytes(0))
self._req.release()

def join(self):
self._done = True
Expand Down Expand Up @@ -179,8 +179,8 @@ def __init__(self, name, shm, proto, size, req, ack):
shm : SharedMemory
proto : Proto
size : int
req : Pipe
ack : Pipe
req : Semaphore
ack : Semaphore
"""
self._name = name
self._shm = shm
Expand Down Expand Up @@ -234,7 +234,7 @@ def start(self):
def _req_callback(self):
try:
while not self._done:
self._req.recv_bytes(0)
self._req.acquire()
not_empty = self.probe()
self._queue.put_nowait(0)
if self.observer and not not_empty:
Expand Down Expand Up @@ -265,7 +265,7 @@ def recv(self):
self._queue.get()
result = self._array[self._idx].copy()
self._idx = (self._idx + 1) % self._size
self._ack.send_bytes(bytes(0))
self._ack.release()

return result

Expand Down Expand Up @@ -335,11 +335,11 @@ def __init__(self,
nbytes = np.prod(shape) * np.dtype(dtype).itemsize
smm = message_infrastructure.smm
shm = smm.SharedMemory(int(nbytes * size))
req = Pipe(duplex=False)
ack = Pipe(duplex=False)
req = Semaphore(0)
ack = Semaphore(0)
proto = Proto(shape=shape, dtype=dtype, nbytes=nbytes)
self._src_port = CspSendPort(src_name, shm, proto, size, req[1], ack[0])
self._dst_port = CspRecvPort(dst_name, shm, proto, size, req[0], ack[1])
self._src_port = CspSendPort(src_name, shm, proto, size, req, ack)
self._dst_port = CspRecvPort(dst_name, shm, proto, size, req, ack)

@property
def src_port(self) -> AbstractCspSendPort:
Expand Down

0 comments on commit c68c290

Please sign in to comment.