Skip to content

Commit

Permalink
Make sending commands thread safe
Browse files Browse the repository at this point in the history
Acquire a lock when sending and receiving messages to the command and
subscription socket. Fixes contention when the Connection sends a
command from multiple threads and may read some of the data from another
command.

For a discussion of this issue, see #21
  • Loading branch information
Tony Crisci committed Nov 1, 2018
1 parent 6224660 commit e9fcefa
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions i3ipc/i3ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import subprocess
from enum import Enum
from collections import deque
from threading import Timer
from threading import Timer, Lock
import time


Expand Down Expand Up @@ -391,7 +391,9 @@ def __init__(self, socket_path=None, auto_reconnect=False):
self.socket_path = socket_path
self.cmd_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.cmd_socket.connect(self.socket_path)
self.cmd_lock = Lock()
self.sub_socket = None
self.sub_lock = Lock()
self.auto_reconnect = auto_reconnect
self._restarting = False
self._quitting = False
Expand Down Expand Up @@ -449,6 +451,10 @@ def _ipc_recv(self, sock):
return self._unpack(data), msg_type

def _ipc_send(self, sock, message_type, payload):
'''
Send and receive a message from the ipc.
NOTE: this is not thread safe
'''
sock.sendall(self._pack(message_type, payload))
data, msg_type = self._ipc_recv(sock)
return data
Expand All @@ -466,6 +472,7 @@ def _wait_for_socket(self):

def message(self, message_type, payload):
try:
self.cmd_lock.acquire()
return self._ipc_send(self.cmd_socket, message_type, payload)
except BrokenPipeError as e:
if not self.auto_reconnect:
Expand All @@ -477,6 +484,8 @@ def message(self, message_type, payload):
self.cmd_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.cmd_socket.connect(self.socket_path)
return self._ipc_send(self.cmd_socket, message_type, payload)
finally:
self.cmd_lock.release()

def command(self, payload):
"""
Expand Down Expand Up @@ -655,8 +664,12 @@ def subscribe(self, events):
if events & Event.TICK:
events_obj.append("tick")

data = self._ipc_send(self.sub_socket, MessageType.SUBSCRIBE,
json.dumps(events_obj))
try:
self.sub_lock.acquire()
data = self._ipc_send(self.sub_socket, MessageType.SUBSCRIBE,
json.dumps(events_obj))
finally:
self.sub_lock.release()
result = json.loads(data, object_hook=CommandReply)
self.subscriptions |= events
return result
Expand Down

0 comments on commit e9fcefa

Please sign in to comment.