Skip to content

Commit

Permalink
sockets working
Browse files Browse the repository at this point in the history
  • Loading branch information
marinagmoreira committed Nov 27, 2023
1 parent ad57def commit 255143d
Show file tree
Hide file tree
Showing 3 changed files with 328 additions and 75 deletions.
2 changes: 1 addition & 1 deletion astrobee/survey_manager/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,5 @@ include_directories(
## Install ##
#############

catkin_install_python(PROGRAMS scripts/command_astrobee
catkin_install_python(PROGRAMS scripts/command_astrobee scripts/monitor_astrobee
DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION})
304 changes: 230 additions & 74 deletions astrobee/survey_manager/scripts/command_astrobee
Original file line number Diff line number Diff line change
Expand Up @@ -21,51 +21,11 @@
import argparse
import subprocess
import sys
import os
import threading



# command = 'rosrun inspection inspection_tool -panorama -panorama_poses /resources/panorama_iss.txt -panorama_mode "5_mapper_and_hugin"'


def send_command_with_input(command):
print(command)

# Start the process
process = subprocess.Popen(command, shell=True, stdin=subprocess.PIPE, stdout=sys.stdout, stderr=subprocess.PIPE, text=True)

try:
while process.poll() is None:

# Get user input dynamically
user_input = input()

# Check if the user wants to exit
if user_input.lower().strip() == 'exit':
break

# Send the user input to the process
process.stdin.write(user_input + "\n")
process.stdin.flush()

# Close stdin to indicate no more input will be sent
process.stdin.close()

# Read output and error (if any)
output, error = process.communicate()

# Get the final exit code
return process.returncode

except:
return -1


def send_command(command):
print(command)
process = subprocess.Popen(command, shell=True, stdout=sys.stdout, stderr=sys.stdout, text=True)

# Get the output and error (if any)
return process.wait()
import socket
import select

def get_position(bay):

Expand All @@ -84,43 +44,234 @@ def get_position(bay):
if bay == "jem_bay7":
return "'11, -9.7, 4.8'"

class SurveyExecutor:

def __init__(self, robot_name, goal, bay, run):
self.robot = robot_name
self.input_path = '/tmp/input_' + robot_name
self.output_path = '/tmp/output_' + robot_name

# Check if the file exists
if os.path.exists(self.input_path):
os.remove(self.input_path)
if os.path.exists(self.output_path):
os.remove(self.output_path)

# Declare socket for process input
self.sock_input = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock_input.settimeout(1) # Set a timeout for socket operations
self.sock_input.bind(self.input_path)
self.sock_input.listen(1) # Listen for one connection
# self.sock_input_connected = False
# self.sock_input.accept(connect_input_callback)

# Declare socket for process output
self.sock_output = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock_output.settimeout(1) # Set a timeout for socket operations
self.sock_output.bind(self.output_path)
self.sock_output.listen(1) # Listen for one connection
# self.sock_output_connected = False
# self.sock_output.accept(connect_output_callback)

# Open the input FIFO file descriptor with non-blocking flag
# self.input_fifo_fd = os.open(self.input_fifo_path, os.O_RDONLY | os.O_NONBLOCK)
# self.input_fifo = os.fdopen(self.input_fifo_fd)

# Open the output FIFO file descriptor with non-blocking flag
# self.output_fifo_fd = os.open(self.output_fifo_path, os.O_WRONLY)
# self.output_fifo = os.fdopen(self.output_fifo_fd, 'w')


self._stop_event = threading.Event()


self.survey_manager_executor(goal, bay, run)

self.sock_input.close()
self.sock_output.close()

# def connect_input_callback(sock, addr):
# """Callback function that is called when a connection is made to the socket."""
# print("Input connection made from", addr)
# sock_input_connected = False

# def connect_output_callback(sock, addr):
# """Callback function that is called when a connection is made to the socket."""
# print("Output connection made from", addr)
# sock_output_connected = False

def thread_write_output(self, process):
print("starting thread_write_output...")
# Store commulative output
output_total = ""
connected = False
try:
while True:
# Get output from process
output = process.stdout.readline()
if (output == '' and process.poll() is not None) or self._stop_event.is_set():
break
if output:
output_total += output

# If socket is not connected try to connect
try:
if not connected:
print("trying to connect")
conn, addr = self.sock_output.accept()
conn.setblocking(False)

connected = True
conn.send(output_total.encode("utf-8")[:1024])
except socket.timeout:
continue
except (socket.error, BrokenPipeError):
print("Error sending data. Receiver may have disconnected.")
connected = False

# If socket is already connected, send output
if connected:
try:
conn.send(output.encode("utf-8")[:1024])
except (socket.error, BrokenPipeError):
print("Error sending data. Receiver may have disconnected.")
connected = False

# print(output.strip())
# self.sock_output.send(output.encode("utf-8")[:1024])
except Exception as e:
print("exit output:")
print(e)
# finally:
# # Save total output into a log
# print(output_total)

def thread_read_input(self, process):
print("starting thread_read_input...")
try:
while True:
while not self._stop_event.is_set():
print("waiting for connection")
try:
client_socket, client_address = self.sock_input.accept()
break
except socket.timeout:
continue
if self._stop_event.is_set():
break
client_socket.settimeout(1) # Set a timeout for socket operations


while True:
print("accepted connection:")
print(client_address)

while not self._stop_event.is_set():
print("waiting to receive")
try:
request = client_socket.recv(1024).decode("utf-8")
break
except socket.timeout:
continue
if self._stop_event.is_set():
break

if not request:
break
print("got: " + request)

print(request)
process.stdin.write(request + "\n")
process.stdin.flush()
except Exception as e:
print("exit input:")
print(e)



def send_command_with_input(self, command):
print(command)
return_code = -1

try:
# Start the process
process = subprocess.Popen(command, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

# Start input and output threads
input_thread = threading.Thread(target=self.thread_read_input, args=(process,))
input_thread.start()
output_thread = threading.Thread(target=self.thread_write_output, args=(process,))
output_thread.start()

output_thread.join()
# Get the return code of the process
return_code = process.poll()

except Exception as e:
print("exit main:")
print(e)
# Get the return code of the process
process.kill()
finally:
# Forcefully stop the thread (not recommended)
print("Killing input thread...")
self._stop_event.set()
input_thread.join()

# Get the final exit code
return return_code

def send_command(self, command):
print(command)
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

# Get the output and error (if any)
return process.wait()


def repeat_inspection(self):
while True:
user_input = input("Do you want to continue? (y/n): ").lower().strip()
if user_input == 'y':
if send_command_with_input("rosrun inspection inspection_tool -resume") != 0:
repeat_inspection() # Continue recursively
break
elif user_input == 'n':
print("Exiting.")
break
else:
print("Invalid input. Please enter 'y' or 'n'.")


def survey_manager_executor(self, goal, bay, run):

def repeat_inspection():
while True:
user_input = input("Do you want to continue? (y/n): ").lower().strip()
if user_input == 'y':
if send_command_with_input("rosrun inspection inspection_tool -resume") != 0:
repeat_inspection() # Continue recursively
break
elif user_input == 'n':
print("Exiting.")
break
else:
print("Invalid input. Please enter 'y' or 'n'.")
if goal == "dock":
self.send_command("rosrun executive teleop -dock")

elif goal == "dock":
self.send_command("rosrun executive teleop -undock")

def survey_manager_executer(goal, bay, run):
elif goal == "move":
self.send_command("rosrun executive teleop -move -pos " + bay)

if goal == "dock":
send_command("rosrun executive teleop -dock")
# Change exposure if needed

elif goal == "dock":
send_command("rosrun executive teleop -undock")
# Change map if needed

elif goal == "move":
send_command("rosrun executive teleop -move -pos " + bay)
elif goal == "panorama":

elif goal == "panorama":
send_command("python gds_helper_batch.py -i cmd -- bagger -start pano_" + bay + "_" + run)
if send_command_with_input("rosrun inspection inspection_tool -panorama -panorama_mode '5_mapper_and_hugin' -pos " + str(get_position(bay))) != 0:
repeat_inspection()
send_command("python gds_helper_batch.py -i cmd -- bagger -stop")
self.send_command("python gds_helper_batch.py -i cmd -- bagger -start pano_" + bay + "_" + run)
if self.send_command_with_input("rosrun inspection inspection_tool -panorama -panorama_mode '5_mapper_and_hugin' -pos " + str(get_position(bay))) != 0:
self.repeat_inspection()
self.send_command("python gds_helper_batch.py -i cmd -- bagger -stop")

elif goal == "stereo":
send_command("python gds_helper_batch.py -i cmd -- bagger -start stereo_" + bay + "_" + run)
send_command("python gds_helper_batch.py -i cmd -- plan -load plans/ISAAC/" + bay + "_stereo_mapping.fplan")
send_command("python gds_helper_batch.py -i cmd -- plan -run")
send_command("python gds_helper_batch.py -i cmd -- bagger -stop")


elif goal == "stereo":
self.send_command("python gds_helper_batch.py -i cmd -- bagger -start stereo_" + bay + "_" + run)
self.send_command("python gds_helper_batch.py -i cmd -- plan -load plans/ISAAC/" + bay + "_stereo_mapping.fplan")
self.send_command("python gds_helper_batch.py -i cmd -- plan -run")
self.send_command("python gds_helper_batch.py -i cmd -- bagger -stop")


class CustomFormatter(argparse.ArgumentDefaultsHelpFormatter):
Expand All @@ -132,6 +283,11 @@ if __name__ == "__main__":
description=__doc__, formatter_class=CustomFormatter
)

parser.add_argument(
"robot_name",
default="",
help="Robot name executing the command.",
)
parser.add_argument(
"command_name",
help="Prefix for bagfiles to merge. Bags should all be in the current working directory.",
Expand All @@ -149,4 +305,4 @@ if __name__ == "__main__":
)
args = parser.parse_args()

survey_manager_executer(args.command_name, args.bay, args.run)
e = SurveyExecutor(args.robot_name, args.command_name, args.bay, args.run)
Loading

0 comments on commit 255143d

Please sign in to comment.