Skip to content

Commit

Permalink
Logging tweaks and some process spawning rework
Browse files Browse the repository at this point in the history
  • Loading branch information
WinterSnowfall authored Oct 22, 2023
1 parent 434a883 commit 66ff636
Showing 1 changed file with 37 additions and 33 deletions.
70 changes: 37 additions & 33 deletions wookiee_unicaster.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#!/usr/bin/env python3
'''
@author: Winter Snowfall
@version: 3.11
@date: 20/10/2023
@version: 3.12
@date: 22/10/2023
'''

import os
Expand Down Expand Up @@ -151,20 +151,20 @@ def __del__(self):
pass

def wookiee_server_handler_start(self):
logger.info(f'WU P{self.peer} >>> Starting server processes...')
logger.debug(f'WU P{self.peer} >>> Starting server process...')

wookiee_server_process = multiprocessing.Process(target=self.wookiee_server_worker,
server_handler_process = multiprocessing.Process(target=self.wookiee_server_worker,
args=(self.peer, self.peers, self.server_socket, self.remote_peer_event_list,
self.source_queue_list, self.remote_peer_worker_exit_event,
self.remote_peer_addr_array, self.remote_peer_port_array,
self.max_packet_size, self.source_packet_count,
self.child_proc_started_event, self.wookiee_constants),
daemon=True)
wookiee_server_process.start()
server_handler_process.start()

logger.info(f'WU P{self.peer} >>> Started server processes.')
logger.debug(f'WU P{self.peer} >>> Started server process.')

return wookiee_server_process
return server_handler_process

def wookiee_server_worker(self, peer, peers, isocket, remote_peer_event_list, source_queue_list,
remote_peer_worker_exit_event, remote_peer_addr_array, remote_peer_port_array,
Expand All @@ -180,7 +180,7 @@ def wookiee_server_worker(self, peer, peers, isocket, remote_peer_event_list, so
# 'server-source-receive'
wookiee_name = wookiee_constants.WOOKIEE_MODE_NAMES.get(b'100')

logger.debug(f'WU P{peer} {wookiee_name} *** Server worker started.')
logger.info(f'WU P{peer} {wookiee_name} *** Server worker started.')

try:
remote_peer_queue_dict = {}
Expand Down Expand Up @@ -281,7 +281,7 @@ def wookiee_server_worker(self, peer, peers, isocket, remote_peer_event_list, so
except SystemExit:
pass

logger.debug(f'WU P{peer} {wookiee_name} *** Server worker stopped.')
logger.info(f'WU P{peer} {wookiee_name} *** Server worker stopped.')

class RemotePeerHandler:
# keep alive packet content (featuring bowcaster ASCII art guards)
Expand Down Expand Up @@ -406,18 +406,18 @@ def __del__(self):
pass

def wookiee_peer_handler_start(self):
logger.info(f'WU P{self.peer} >>> Starting remote peer handler processes...')
logger.debug(f'WU P{self.peer} >>> Starting remote peer handler processes...')
# reset all shared process events
self.exit_event.clear()
self.link_event.clear()
self.remote_peer_event.clear()

wookiee_processes = [None, None, None, None]
peer_handler_process_list = []

# only clients must spawn a peer count of client-source-receive processes,
# since servers will only need one catch-all receive process
if self.wookiee_mode == self.wookiee_constants.WOOKIEE_MODE_CLIENT:
wookiee_processes[0] = multiprocessing.Process(target=self.wookiee_receive_worker,
peer_handler_process = multiprocessing.Process(target=self.wookiee_receive_worker,
# + '-source-receive'
args=(self.peer, self.wookiee_mode + b'00', self.source,
(self.source_ip, self.source_port), self.socket_timeout,
Expand All @@ -427,9 +427,10 @@ def wookiee_peer_handler_start(self):
self.max_packet_size, self.source_packet_count,
self.wookiee_constants),
daemon=True)
wookiee_processes[0].start()
peer_handler_process.start()
peer_handler_process_list.append(peer_handler_process)

wookiee_processes[1] = multiprocessing.Process(target=self.wookiee_relay_worker,
peer_handler_process = multiprocessing.Process(target=self.wookiee_relay_worker,
# + '-source-relay'
args=(self.peer, self.wookiee_mode + b'01', self.destination,
(self.destination_ip, self.destination_port), self.link_event,
Expand All @@ -438,9 +439,10 @@ def wookiee_peer_handler_start(self):
None, self.destination_packet_count,
self.wookiee_constants),
daemon=True)
wookiee_processes[1].start()
peer_handler_process.start()
peer_handler_process_list.append(peer_handler_process)

wookiee_processes[2] = multiprocessing.Process(target=self.wookiee_receive_worker,
peer_handler_process = multiprocessing.Process(target=self.wookiee_receive_worker,
# + '-destination-receive'
args=(self.peer, self.wookiee_mode + b'10', self.destination,
(None, None), self.socket_timeout,
Expand All @@ -450,9 +452,10 @@ def wookiee_peer_handler_start(self):
self.max_packet_size, self.source_packet_count,
self.wookiee_constants),
daemon=True)
wookiee_processes[2].start()
peer_handler_process.start()
peer_handler_process_list.append(peer_handler_process)

wookiee_processes[3] = multiprocessing.Process(target=self.wookiee_relay_worker,
peer_handler_process = multiprocessing.Process(target=self.wookiee_relay_worker,
# + '-destination-relay'
args=(self.peer, self.wookiee_mode + b'11', self.source,
(self.source_ip, self.source_port), self.link_event,
Expand All @@ -461,11 +464,12 @@ def wookiee_peer_handler_start(self):
self.remote_peer_port_array, self.destination_packet_count,
self.wookiee_constants),
daemon=True)
wookiee_processes[3].start()
peer_handler_process.start()
peer_handler_process_list.append(peer_handler_process)

logger.info(f'WU P{self.peer} >>> Started remote peer handler processes.')
logger.debug(f'WU P{self.peer} >>> Started remote peer handler processes.')

return wookiee_processes
return peer_handler_process_list

def wookiee_receive_worker(self, peer, wookiee_mode, isocket, iaddr,
socket_timeout, link_event, remote_peer_event, exit_event,
Expand All @@ -481,7 +485,7 @@ def wookiee_receive_worker(self, peer, wookiee_mode, isocket, iaddr,

wookiee_name = wookiee_constants.WOOKIEE_MODE_NAMES.get(wookiee_mode)

logger.debug(f'WU P{peer} {wookiee_name} +++ Receive worker started.')
logger.info(f'WU P{peer} {wookiee_name} +++ Receive worker started.')

try:
# ensure no timeout is actively enforced on the socket
Expand Down Expand Up @@ -611,7 +615,7 @@ def wookiee_receive_worker(self, peer, wookiee_mode, isocket, iaddr,
except:
pass

logger.debug(f'WU P{peer} {wookiee_name} +++ Receive worker stopped.')
logger.info(f'WU P{peer} {wookiee_name} +++ Receive worker stopped.')

def wookiee_relay_worker(self, peer, wookiee_mode, osocket, oaddr,
link_event, remote_peer_event, exit_event, source_queue,
Expand All @@ -628,7 +632,7 @@ def wookiee_relay_worker(self, peer, wookiee_mode, osocket, oaddr,

wookiee_name = wookiee_constants.WOOKIEE_MODE_NAMES.get(wookiee_mode)

logger.debug(f'WU P{peer} {wookiee_name} --- Relay worker started.')
logger.info(f'WU P{peer} {wookiee_name} --- Relay worker started.')

# 'server-destination-relay'
if wookiee_mode == b'111':
Expand Down Expand Up @@ -690,10 +694,10 @@ def wookiee_relay_worker(self, peer, wookiee_mode, osocket, oaddr,
oaddr = (socket.inet_ntoa(struct.pack('!L', remote_peer_addr_array[peer - 1])),
remote_peer_port_array[peer - 1])
if oaddr != (0, 0):
logger.info(f'WU P{peer} {wookiee_name} --- Updated peer IP address/port.')
logger.info(f'WU P{peer} {wookiee_name} --- Cached remote peer IP address/port.')
remote_peer_addr_cached = True
else:
logger.debug(f'WU P{peer} {wookiee_name} --- Waiting on IP address/port update.')
logger.debug(f'WU P{peer} {wookiee_name} --- Waiting to establish remote peer IP address/port.')
# wait times here should be minimal due to link_event sync
sleep(0.05)

Expand Down Expand Up @@ -732,7 +736,7 @@ def wookiee_relay_worker(self, peer, wookiee_mode, osocket, oaddr,
except:
pass

logger.debug(f'WU P{peer} {wookiee_name} --- Relay worker stopped.')
logger.info(f'WU P{peer} {wookiee_name} --- Relay worker stopped.')

if __name__ == "__main__":
# catch SIGTERM and exit gracefully
Expand Down Expand Up @@ -1080,9 +1084,9 @@ def wookiee_relay_worker(self, peer, wookiee_mode, osocket, oaddr,
reset_peer = remote_peer_handlers_reset_queue.get()
reset_peer_index = reset_peer - 1
logger.debug(f'WU >>> Resetting remote peer handler P{reset_peer}...')
for process in remote_peer_handlers_processes[reset_peer_index]:
if process is not None and process.is_alive():
process.join()
for remote_peer_handler_process in remote_peer_handlers_processes[reset_peer_index]:
if remote_peer_handler_process.is_alive():
remote_peer_handler_process.join()
remote_peer_handlers_processes[reset_peer_index] = remote_peer_handlers[reset_peer_index].wookiee_peer_handler_start()
logger.debug(f'WU >>> Remote peer handler P{reset_peer} has been reset.')

Expand Down Expand Up @@ -1114,9 +1118,9 @@ def wookiee_relay_worker(self, peer, wookiee_mode, osocket, oaddr,
logger.debug('WU >>> Waiting for the remote peer handler threads to complete...')

for peer in range(peers):
for process in remote_peer_handlers_processes[peer]:
if process is not None and process.is_alive():
process.join()
for remote_peer_handler_process in remote_peer_handlers_processes[peer]:
if remote_peer_handler_process.is_alive():
remote_peer_handler_process.join()
# clear remote peer handler references to trigger the destructor
remote_peer_handlers[peer] = None

Expand Down

0 comments on commit 66ff636

Please sign in to comment.