From 241b9a6174e0197393be29af0a8c3acb336731ec Mon Sep 17 00:00:00 2001 From: ES-Alexander Date: Tue, 22 Mar 2022 11:23:06 +1100 Subject: [PATCH] mavftp: make usable and useful - removed unused MAVProxy imports (no longer dependent on MAVProxy) - added `FTPUser` class for interacting conveniently via mavftp - added `list` method, which returns the requested files list - added `get` method, which gets and saves a requested file - added 'params' method, which gets, parses, and saves a parameter file - added CLI to access all available commands - used some f-strings, and `subparsers.add_parser(required)`, so requires Python >= 3.7 --- core/libs/mavftp/mavftp.py | 752 ++++++++++++++++++++++++------------- 1 file changed, 495 insertions(+), 257 deletions(-) diff --git a/core/libs/mavftp/mavftp.py b/core/libs/mavftp/mavftp.py index a654436008..797f1d06ff 100644 --- a/core/libs/mavftp/mavftp.py +++ b/core/libs/mavftp/mavftp.py @@ -1,9 +1,10 @@ #!/usr/bin/env python -'''mavlink file transfer support''' +"""mavlink file transfer support""" # As this file is basically copied over from https://github.com/ArduPilot/MAVProxy/blob/master/MAVProxy/modules/mavproxy_ftp.py -# let's not run pylint on it +# let's not run pylint or mypy on it # pylint: skip-file +# type: ignore import io import time, os, sys @@ -13,9 +14,6 @@ from io import BytesIO as SIO -from MAVProxy.modules.lib import mp_module -from MAVProxy.modules.lib import mp_settings - # opcodes OP_None = 0 OP_TerminateSession = 1 @@ -52,8 +50,11 @@ HDR_Len = 12 MAX_Payload = 239 + class FTP_OP: - def __init__(self, seq, session, opcode, size, req_opcode, burst_complete, offset, payload): + def __init__( + self, seq, session, opcode, size, req_opcode, burst_complete, offset, payload + ): self.seq = seq self.session = session self.opcode = opcode @@ -64,8 +65,18 @@ def __init__(self, seq, session, opcode, size, req_opcode, burst_complete, offse self.payload = payload def pack(self): - '''pack message''' - ret = struct.pack(" 0: ret += " [%u]" % self.payload[0] return ret + class WriteQueue: def __init__(self, ofs, size): self.ofs = ofs self.size = size self.last_send = 0 -class FTPModule(): + +class FTPModule: def __init__(self, mav): self.ftp_settings = { - 'debug': 0, - 'pkt_loss_tx': 0, - 'pkt_loss_rx': 0, - 'max_backlog': 5, - 'burst_read_size': 110, - 'write_size': 110, - 'write_qsize': 5, - 'retry_time': 0.5 - } + "debug": 0, + "pkt_loss_tx": 0, + "pkt_loss_rx": 0, + "max_backlog": 5, + "burst_read_size": 110, + "write_size": 110, + "write_qsize": 5, + "retry_time": 0.5, + } self.seq = 0 self.session = 0 @@ -132,7 +150,7 @@ def __init__(self, mav): self.rtt = 0.5 self.reached_eof = False self.backlog = 0 - self.burst_size = self.ftp_settings['burst_read_size'] + self.burst_size = self.ftp_settings["burst_read_size"] self.write_list = None self.write_block_size = 0 self.write_acks = 0 @@ -150,22 +168,24 @@ def __init__(self, mav): self.get_result = None def send(self, op): - '''send a request''' + """send a request""" op.seq = self.seq payload = op.pack() plen = len(payload) if plen < MAX_Payload + HDR_Len: - payload.extend(bytearray([0]*((HDR_Len+MAX_Payload)-plen))) - self.mav.mav.file_transfer_protocol_send(self.network, self.target_system, self.target_component, payload) + payload.extend(bytearray([0] * ((HDR_Len + MAX_Payload) - plen))) + self.mav.mav.file_transfer_protocol_send( + self.network, self.target_system, self.target_component, payload + ) self.seq = (self.seq + 1) % 256 self.last_op = op now = time.time() - if self.ftp_settings['debug'] > 1: + if self.ftp_settings["debug"] > 1: print("> %s dt=%.2f" % (op, now - self.last_op_time)) self.last_op_time = time.time() def terminate_session(self): - '''terminate current session''' + """terminate current session""" self.send(FTP_OP(self.seq, self.session, OP_TerminateSession, 0, 0, 0, 0, None)) self.fh = None self.filename = None @@ -190,40 +210,49 @@ def terminate_session(self): self.reached_eof = False self.backlog = 0 self.duplicates = 0 - if self.ftp_settings['debug'] > 0: + if self.ftp_settings["debug"] > 0: print("Terminated session") def cmd_list(self, args): - '''list files''' + """list files""" self.list_result = None if len(args) > 0: dname = args[0] else: - dname = '/' - print("Listing %s" % dname) - enc_dname = bytearray(dname, 'ascii') + dname = "/" + print("Listing %s" % dname) + enc_dname = bytearray(dname, "ascii") self.total_size = 0 self.dir_offset = 0 - op = FTP_OP(self.seq, self.session, OP_ListDirectory, len(enc_dname), 0, 0, self.dir_offset, enc_dname) + op = FTP_OP( + self.seq, + self.session, + OP_ListDirectory, + len(enc_dname), + 0, + 0, + self.dir_offset, + enc_dname, + ) self.send(op) def handle_list_reply(self, op, m): - '''handle OP_ListDirectory reply''' - output = '' + """handle OP_ListDirectory reply""" + output = "" if op.opcode == OP_Ack: - dentries = sorted(op.payload.split(b'\x00')) + dentries = sorted(op.payload.split(b"\x00")) for d in dentries: if len(d) == 0: continue self.dir_offset += 1 try: - d = str(d, 'ascii') + d = str(d, "ascii") except Exception: continue - if d[0] == 'D': + if d[0] == "D": output += "D %s\n" % d[1:] - elif d[0] == 'F': - (name, size) = d[1:].split('\t') + elif d[0] == "F": + (name, size) = d[1:].split("\t") size = int(size) self.total_size += size output += " %s\t%u\n" % (name, size) @@ -233,16 +262,20 @@ def handle_list_reply(self, op, m): more = self.last_op more.offset = self.dir_offset self.send(more) - elif op.opcode == OP_Nack and len(op.payload) == 1 and op.payload[0] == ERR_EndOfFile: + elif ( + op.opcode == OP_Nack + and len(op.payload) == 1 + and op.payload[0] == ERR_EndOfFile + ): output += "Total size %.2f kByte" % (self.total_size / 1024.0) self.total_size = 0 else: - output += 'LIST: %s' % op + output += "LIST: %s" % op self.list_result = output return output def cmd_get(self, args, callback=None, callback_progress=None): - '''get file''' + """get file""" self.get_result = None if len(args) == 0: print("Usage: get FILENAME ") @@ -253,7 +286,7 @@ def cmd_get(self, args, callback=None, callback_progress=None): self.filename = args[1] else: self.filename = os.path.basename(fname) - if callback is None or self.ftp_settings['debug'] > 1: + if callback is None or self.ftp_settings["debug"] > 1: print("Getting %s as %s" % (fname, self.filename)) self.op_start = time.time() self.callback = callback @@ -261,40 +294,44 @@ def cmd_get(self, args, callback=None, callback_progress=None): self.read_retries = 0 self.duplicates = 0 self.reached_eof = False - self.burst_size = self.ftp_settings['burst_read_size'] + self.burst_size = self.ftp_settings["burst_read_size"] if self.burst_size < 1: self.burst_size = 239 elif self.burst_size > 239: self.burst_size = 239 - enc_fname = bytearray(fname, 'ascii') + enc_fname = bytearray(fname, "ascii") self.open_retries = 0 - op = FTP_OP(self.seq, self.session, OP_OpenFileRO, len(enc_fname), 0, 0, 0, enc_fname) + op = FTP_OP( + self.seq, self.session, OP_OpenFileRO, len(enc_fname), 0, 0, 0, enc_fname + ) self.send(op) def handle_open_RO_reply(self, op, m): - '''handle OP_OpenFileRO reply''' + """handle OP_OpenFileRO reply""" if op.opcode == OP_Ack: if self.filename is None: return try: - if self.callback is not None or self.filename == '-': + if self.callback is not None or self.filename == "-": self.fh = SIO() else: - self.fh = open(self.filename, 'wb') + self.fh = open(self.filename, "wb") except Exception as ex: print("Failed to open %s: %s" % (self.filename, ex)) self.terminate_session() return - read = FTP_OP(self.seq, self.session, OP_BurstReadFile, self.burst_size, 0, 0, 0, None) + read = FTP_OP( + self.seq, self.session, OP_BurstReadFile, self.burst_size, 0, 0, 0, None + ) self.last_burst_read = time.time() self.send(read) else: - if self.callback is None or self.ftp_settings['debug'] > 0: + if self.callback is None or self.ftp_settings["debug"] > 0: print("ftp open failed") self.terminate_session() def check_read_finished(self): - '''check if download has completed''' + """check if download has completed""" if self.reached_eof and len(self.read_gaps) == 0: ofs = self.fh.tell() dt = time.time() - self.op_start @@ -308,16 +345,19 @@ def check_read_finished(self): if sys.version_info.major < 3: print(self.fh.read()) else: - print(self.fh.read().decode('utf-8')) + print(self.fh.read().decode("utf-8")) else: - print("Wrote %u bytes to %s in %.2fs %.1fkByte/s" % (ofs, self.filename, dt, rate)) + print( + "Wrote %u bytes to %s in %.2fs %.1fkByte/s" + % (ofs, self.filename, dt, rate) + ) self.get_result = self.filename self.terminate_session() return True return False def write_payload(self, op): - '''write payload from a read op''' + """write payload from a read op""" self.fh.seek(op.offset) self.fh.write(op.payload) self.read_total += len(op.payload) @@ -325,10 +365,10 @@ def write_payload(self, op): self.callback_progress(self.fh, self.read_total) def handle_burst_read(self, op, m): - '''handle OP_BurstReadFile reply''' - if self.ftp_settings['pkt_loss_tx'] > 0: - if random.uniform(0,100) < self.ftp_settings['pkt_loss_tx']: - if self.ftp_settings['debug'] > 0: + """handle OP_BurstReadFile reply""" + if self.ftp_settings["pkt_loss_tx"] > 0: + if random.uniform(0, 100) < self.ftp_settings["pkt_loss_tx"]: + if self.ftp_settings["debug"] > 0: print("FTP: dropping TX") return if self.fh is None or self.filename is None: @@ -343,7 +383,7 @@ def handle_burst_read(self, op, m): if size > self.burst_size: # this server doesn't handle the burst size argument self.burst_size = MAX_Payload - if self.ftp_settings['debug'] > 0: + if self.ftp_settings["debug"] > 0: print("Setting burst size to %u" % self.burst_size) if op.opcode == OP_Ack and self.fh is not None: ofs = self.fh.tell() @@ -353,11 +393,19 @@ def handle_burst_read(self, op, m): if gap in self.read_gaps: self.read_gaps.remove(gap) self.read_gap_times.pop(gap) - if self.ftp_settings['debug'] > 0: - print("FTP: removed gap", gap, self.reached_eof, len(self.read_gaps)) + if self.ftp_settings["debug"] > 0: + print( + "FTP: removed gap", + gap, + self.reached_eof, + len(self.read_gaps), + ) else: - if self.ftp_settings['debug'] > 0: - print("FTP: dup read reply at %u of len %u ofs=%u" % (op.offset, op.size, self.fh.tell())) + if self.ftp_settings["debug"] > 0: + print( + "FTP: dup read reply at %u of len %u ofs=%u" + % (op.offset, op.size, self.fh.tell()) + ) self.duplicates += 1 return self.write_payload(op) @@ -366,7 +414,7 @@ def handle_burst_read(self, op, m): return elif op.offset > ofs: # we have a gap - gap = (ofs, op.offset-ofs) + gap = (ofs, op.offset - ofs) max_read = self.burst_size while True: if gap[1] <= max_read: @@ -384,8 +432,15 @@ def handle_burst_read(self, op, m): if op.size > 0 and op.size < self.burst_size: # a burst complete with non-zero size and less than burst packet size # means EOF - if not self.reached_eof and self.ftp_settings['debug'] > 0: - print("EOF at %u with %u gaps t=%.2f" % (self.fh.tell(), len(self.read_gaps), time.time() - self.op_start)) + if not self.reached_eof and self.ftp_settings["debug"] > 0: + print( + "EOF at %u with %u gaps t=%.2f" + % ( + self.fh.tell(), + len(self.read_gaps), + time.time() - self.op_start, + ) + ) self.reached_eof = True if self.check_read_finished(): return @@ -393,34 +448,43 @@ def handle_burst_read(self, op, m): return more = self.last_op more.offset = op.offset + op.size - if self.ftp_settings['debug'] > 0: - print("FTP: burst continue at %u %u" % (more.offset, self.fh.tell())) + if self.ftp_settings["debug"] > 0: + print( + "FTP: burst continue at %u %u" % (more.offset, self.fh.tell()) + ) self.send(more) elif op.opcode == OP_Nack: ecode = op.payload[0] - if self.ftp_settings['debug'] > 0: + if self.ftp_settings["debug"] > 0: print("FTP: burst nack: ", op) if ecode == ERR_EndOfFile or ecode == 0: if not self.reached_eof and op.offset > self.fh.tell(): # we lost the last part of the burst - if self.ftp_settings['debug'] > 0: + if self.ftp_settings["debug"] > 0: print("burst lost EOF %u %u" % (self.fh.tell(), op.offset)) return - if not self.reached_eof and self.ftp_settings['debug'] > 0: - print("EOF at %u with %u gaps t=%.2f" % (self.fh.tell(), len(self.read_gaps), time.time() - self.op_start)) + if not self.reached_eof and self.ftp_settings["debug"] > 0: + print( + "EOF at %u with %u gaps t=%.2f" + % ( + self.fh.tell(), + len(self.read_gaps), + time.time() - self.op_start, + ) + ) self.reached_eof = True if self.check_read_finished(): return self.check_read_send() - elif self.ftp_settings['debug'] > 0: + elif self.ftp_settings["debug"] > 0: print("FTP: burst Nack (ecode:%u): %s" % (ecode, op)) else: print("FTP: burst error: %s" % op) def handle_reply_read(self, op, m): - '''handle OP_ReadFile reply''' + """handle OP_ReadFile reply""" if self.fh is None or self.filename is None: - if self.ftp_settings['debug'] > 0: + if self.ftp_settings["debug"] > 0: print("FTP Unexpected read reply") print(op) return @@ -434,24 +498,26 @@ def handle_reply_read(self, op, m): ofs = self.fh.tell() self.write_payload(op) self.fh.seek(ofs) - if self.ftp_settings['debug'] > 0: - print("FTP: removed gap", gap, self.reached_eof, len(self.read_gaps)) + if self.ftp_settings["debug"] > 0: + print( + "FTP: removed gap", gap, self.reached_eof, len(self.read_gaps) + ) if self.check_read_finished(): return elif op.size < self.burst_size: - print("FTP: file size changed to %u" % op.offset+op.size) + print("FTP: file size changed to %u" % op.offset + op.size) self.terminate_session() else: self.duplicates += 1 - if self.ftp_settings['debug'] > 0: + if self.ftp_settings["debug"] > 0: print("FTP: no gap read", gap, len(self.read_gaps)) elif op.opcode == OP_Nack: print("Read failed with %u gaps" % len(self.read_gaps), str(op)) self.terminate_session() self.check_read_send() - + def cmd_put(self, args, fh=None, callback=None, progress_callback=None): - '''put file''' + """put file""" if len(args) == 0: print("Usage: put FILENAME ") return @@ -462,7 +528,7 @@ def cmd_put(self, args, fh=None, callback=None, progress_callback=None): self.fh = fh if self.fh is None: try: - self.fh = open(fname, 'rb') + self.fh = open(fname, "rb") except Exception as ex: print("Failed to open %s: %s" % (fname, ex)) return @@ -472,12 +538,12 @@ def cmd_put(self, args, fh=None, callback=None, progress_callback=None): self.filename = os.path.basename(fname) if callback is None: print("Putting %s as %s" % (fname, self.filename)) - self.fh.seek(0,2) + self.fh.seek(0, 2) file_size = self.fh.tell() self.fh.seek(0) # setup write list - self.write_block_size = self.ftp_settings['write_size'] + self.write_block_size = self.ftp_settings["write_size"] self.write_file_size = file_size write_blockcount = file_size // self.write_block_size @@ -496,12 +562,14 @@ def cmd_put(self, args, fh=None, callback=None, progress_callback=None): self.put_callback_progress = progress_callback self.read_retries = 0 self.op_start = time.time() - enc_fname = bytearray(self.filename, 'ascii') - op = FTP_OP(self.seq, self.session, OP_CreateFile, len(enc_fname), 0, 0, 0, enc_fname) + enc_fname = bytearray(self.filename, "ascii") + op = FTP_OP( + self.seq, self.session, OP_CreateFile, len(enc_fname), 0, 0, 0, enc_fname + ) self.send(op) def put_finished(self, flen): - '''finish a put''' + """finish a put""" if self.put_callback_progress: self.put_callback_progress(1.0) self.put_callback_progress = None @@ -510,9 +578,9 @@ def put_finished(self, flen): self.put_callback = None else: print("Sent file of length ", flen) - + def handle_create_file_reply(self, op, m): - '''handle OP_CreateFile reply''' + """handle OP_CreateFile reply""" if self.fh is None: self.terminate_session() return @@ -523,7 +591,7 @@ def handle_create_file_reply(self, op, m): self.terminate_session() def send_more_writes(self): - '''send some more writes''' + """send some more writes""" if len(self.write_list) == 0: # all done self.put_finished(self.write_file_size) @@ -532,11 +600,13 @@ def send_more_writes(self): now = time.time() if self.write_last_send is not None: - if now - self.write_last_send > max(min(10*self.rtt, 1),0.2): + if now - self.write_last_send > max(min(10 * self.rtt, 1), 0.2): # we seem to have lost a block of replies - self.write_pending = max(0, self.write_pending-1) + self.write_pending = max(0, self.write_pending - 1) - n = min(self.ftp_settings['write_qsize']-self.write_pending, len(self.write_list)) + n = min( + self.ftp_settings["write_qsize"] - self.write_pending, len(self.write_list) + ) for i in range(n): # send in round-robin, skipping any that have been acked idx = self.write_idx @@ -545,14 +615,23 @@ def send_more_writes(self): ofs = idx * self.write_block_size self.fh.seek(ofs) data = self.fh.read(self.write_block_size) - write = FTP_OP(self.seq, self.session, OP_WriteFile, len(data), 0, 0, ofs, bytearray(data)) + write = FTP_OP( + self.seq, + self.session, + OP_WriteFile, + len(data), + 0, + 0, + ofs, + bytearray(data), + ) self.send(write) self.write_idx = (idx + 1) % self.write_total self.write_pending += 1 self.write_last_send = now def handle_write_reply(self, op, m): - '''handle OP_WriteFile reply''' + """handle OP_WriteFile reply""" if self.fh is None: self.terminate_session() return @@ -572,73 +651,86 @@ def handle_write_reply(self, op, m): self.write_list.discard(idx) self.write_acks += 1 if self.put_callback_progress: - self.put_callback_progress(self.write_acks/float(self.write_total)) + self.put_callback_progress(self.write_acks / float(self.write_total)) self.send_more_writes() def cmd_rm(self, args): - '''remove file''' + """remove file""" if len(args) == 0: print("Usage: rm FILENAME") return fname = args[0] print("Removing %s" % fname) - enc_fname = bytearray(fname, 'ascii') - op = FTP_OP(self.seq, self.session, OP_RemoveFile, len(enc_fname), 0, 0, 0, enc_fname) + enc_fname = bytearray(fname, "ascii") + op = FTP_OP( + self.seq, self.session, OP_RemoveFile, len(enc_fname), 0, 0, 0, enc_fname + ) self.send(op) def cmd_rmdir(self, args): - '''remove directory''' + """remove directory""" if len(args) == 0: print("Usage: rmdir FILENAME") return dname = args[0] print("Removing %s" % dname) - enc_dname = bytearray(dname, 'ascii') - op = FTP_OP(self.seq, self.session, OP_RemoveDirectory, len(enc_dname), 0, 0, 0, enc_dname) + enc_dname = bytearray(dname, "ascii") + op = FTP_OP( + self.seq, + self.session, + OP_RemoveDirectory, + len(enc_dname), + 0, + 0, + 0, + enc_dname, + ) self.send(op) def handle_remove_reply(self, op, m): - '''handle remove reply''' + """handle remove reply""" if op.opcode != OP_Ack: print("Remove failed %s" % op) def cmd_rename(self, args): - '''rename file''' + """rename file""" if len(args) < 2: print("Usage: rename OLDNAME NEWNAME") return name1 = args[0] name2 = args[1] print("Renaming %s to %s" % (name1, name2)) - enc_name1 = bytearray(name1, 'ascii') - enc_name2 = bytearray(name2, 'ascii') - enc_both = enc_name1 + b'\x00' + enc_name2 + enc_name1 = bytearray(name1, "ascii") + enc_name2 = bytearray(name2, "ascii") + enc_both = enc_name1 + b"\x00" + enc_name2 op = FTP_OP(self.seq, self.session, OP_Rename, len(enc_both), 0, 0, 0, enc_both) self.send(op) def handle_rename_reply(self, op, m): - '''handle rename reply''' + """handle rename reply""" if op.opcode != OP_Ack: print("Rename failed %s" % op) def cmd_mkdir(self, args): - '''make directory''' + """make directory""" if len(args) < 1: print("Usage: mkdir NAME") return name = args[0] print("Creating directory %s" % name) - enc_name = bytearray(name, 'ascii') - op = FTP_OP(self.seq, self.session, OP_CreateDirectory, len(enc_name), 0, 0, 0, enc_name) + enc_name = bytearray(name, "ascii") + op = FTP_OP( + self.seq, self.session, OP_CreateDirectory, len(enc_name), 0, 0, 0, enc_name + ) self.send(op) def handle_mkdir_reply(self, op, m): - '''handle mkdir reply''' + """handle mkdir reply""" if op.opcode != OP_Ack: print("Create directory failed %s" % op) def cmd_crc(self, args): - '''get crc''' + """get crc""" if len(args) < 1: print("Usage: crc NAME") return @@ -646,57 +738,83 @@ def cmd_crc(self, args): self.filename = name self.op_start = time.time() print("Getting CRC for %s" % name) - enc_name = bytearray(name, 'ascii') - op = FTP_OP(self.seq, self.session, OP_CalcFileCRC32, len(enc_name), 0, 0, 0, bytearray(enc_name)) + enc_name = bytearray(name, "ascii") + op = FTP_OP( + self.seq, + self.session, + OP_CalcFileCRC32, + len(enc_name), + 0, + 0, + 0, + bytearray(enc_name), + ) self.send(op) def handle_crc_reply(self, op, m): - '''handle crc reply''' + """handle crc reply""" if op.opcode == OP_Ack and op.size == 4: - crc, = struct.unpack(" 1: + if self.ftp_settings["debug"] > 1: print("< %s dt=%.2f" % (op, dt)) self.last_op_time = now - if self.ftp_settings['pkt_loss_rx'] > 0: - if random.uniform(0,100) < self.ftp_settings['pkt_loss_rx']: - if self.ftp_settings['debug'] > 1: + if self.ftp_settings["pkt_loss_rx"] > 0: + if random.uniform(0, 100) < self.ftp_settings["pkt_loss_rx"]: + if self.ftp_settings["debug"] > 1: print("FTP: dropping packet RX") return - if op.req_opcode == self.last_op.opcode and op.seq == (self.last_op.seq + 1) % 256: + if ( + op.req_opcode == self.last_op.opcode + and op.seq == (self.last_op.seq + 1) % 256 + ): self.rtt = max(min(self.rtt, dt), 0.01) if op.req_opcode == OP_ListDirectory: self.handle_list_reply(op, m) @@ -721,13 +839,16 @@ def mavlink_packet(self, m): elif op.req_opcode == OP_CalcFileCRC32: self.handle_crc_reply(op, m) else: - print('FTP Unknown %s' % str(op)) + print("FTP Unknown %s" % str(op)) def send_gap_read(self, g): - '''send a read for a gap''' + """send a read for a gap""" (offset, length) = g - if self.ftp_settings['debug'] > 0: - print("Gap read of %u at %u rem=%u blog=%u" % (length, offset, len(self.read_gaps), self.backlog)) + if self.ftp_settings["debug"] > 0: + print( + "Gap read of %u at %u rem=%u blog=%u" + % (length, offset, len(self.read_gaps), self.backlog) + ) read = FTP_OP(self.seq, self.session, OP_ReadFile, length, 0, 0, offset, None) self.send(read) self.read_gaps.remove(g) @@ -737,7 +858,7 @@ def send_gap_read(self, g): self.backlog += 1 def check_read_send(self): - '''see if we should send another gap read''' + """see if we should send another gap read""" if len(self.read_gaps) == 0: return g = self.read_gaps[0] @@ -749,7 +870,7 @@ def check_read_send(self): if self.read_gap_times[g] == 0: self.send_gap_read(g) return - if self.read_gap_times[g] > 0 and dt > self.ftp_settings['retry_time']: + if self.read_gap_times[g] > 0 and dt > self.ftp_settings["retry_time"]: if self.backlog > 0: self.backlog -= 1 self.read_gap_times[g] = 0 @@ -757,7 +878,7 @@ def check_read_send(self): if self.read_gap_times[g] != 0: # still pending return - if not self.reached_eof and self.backlog >= self.ftp_settings['max_backlog']: + if not self.reached_eof and self.backlog >= self.ftp_settings["max_backlog"]: # don't fill queue too far until we have got past the burst return if now - self.last_gap_send < 0.05: @@ -766,11 +887,15 @@ def check_read_send(self): self.send_gap_read(g) def idle_task(self): - '''check for file gaps and lost requests''' + """check for file gaps and lost requests""" now = time.time() # see if we lost an open reply - if self.op_start is not None and now - self.op_start > 1.0 and self.last_op.opcode == OP_OpenFileRO: + if ( + self.op_start is not None + and now - self.op_start > 1.0 + and self.last_op.opcode == OP_OpenFileRO + ): self.op_start = now self.open_retries += 1 if self.open_retries > 2: @@ -778,27 +903,50 @@ def idle_task(self): self.op_start = None self.terminate_session() return - if self.ftp_settings['debug'] > 0: + if self.ftp_settings["debug"] > 0: print("FTP: retry open") send_op = self.last_op - self.send(FTP_OP(self.seq, self.session, OP_TerminateSession, 0, 0, 0, 0, None)) + self.send( + FTP_OP(self.seq, self.session, OP_TerminateSession, 0, 0, 0, 0, None) + ) self.session = (self.session + 1) % 256 send_op.session = self.session self.send(send_op) - if len(self.read_gaps) == 0 and self.last_burst_read is None and self.write_list is None: + if ( + len(self.read_gaps) == 0 + and self.last_burst_read is None + and self.write_list is None + ): return if self.fh is None: return # see if burst read has stalled - if not self.reached_eof and self.last_burst_read is not None and now - self.last_burst_read > self.ftp_settings['retry_time']: + if ( + not self.reached_eof + and self.last_burst_read is not None + and now - self.last_burst_read > self.ftp_settings["retry_time"] + ): dt = now - self.last_burst_read self.last_burst_read = now - if self.ftp_settings['debug'] > 0: - print("Retry read at %u rtt=%.2f dt=%.2f" % (self.fh.tell(), self.rtt, dt)) - self.send(FTP_OP(self.seq, self.session, OP_BurstReadFile, self.burst_size, 0, 0, self.fh.tell(), None)) + if self.ftp_settings["debug"] > 0: + print( + "Retry read at %u rtt=%.2f dt=%.2f" % (self.fh.tell(), self.rtt, dt) + ) + self.send( + FTP_OP( + self.seq, + self.session, + OP_BurstReadFile, + self.burst_size, + 0, + 0, + self.fh.tell(), + None, + ) + ) self.read_retries += 1 # see if we can fill gaps @@ -807,119 +955,209 @@ def idle_task(self): if self.write_list is not None: self.send_more_writes() -mav = mavutil.mavlink_connection('udpin:0.0.0.0:14550') - - -def decodeParams(fh): - magic = 0x671b - data = fh.read() - magic2,num_params,total_params = struct.unpack(" 0 and data[0] == pad_byte: - # skip pad bytes - data = data[1:] - - if len(data) == 0: - break - ptype, plen = struct.unpack(">4) & 0x0F - ptype &= 0x0F +class FTPUser(FTPModule): + def latest_result(self, cmd): + return getattr(self, f"{cmd}_result") - if not ptype in data_types: - print("paramftp: bad type 0x%x" % ptype) - return + def result_available(self, cmd): + return self.latest_result(cmd) is not None - (type_len, type_format) = data_types[ptype] - - name_len = ((plen>>4) & 0x0F) + 1 - common_len = (plen & 0x0F) - name = last_name[0:common_len] + data[2:2+name_len] - vdata = data[2+name_len:2+name_len+type_len] - last_name = name - data = data[2+name_len+type_len:] - v, = struct.unpack("<" + type_format, vdata) - params.append((name, v, ptype)) - count += 1 - - if count != total_params: - print("paramftp: bad count %u should be %u" % (count, total_params)) - return - param_types = {} - mav_param_set = set() - fetch_one = dict() - fetch_set = None - mav_param_count = total_params - - idx = 0 - for (name, v, ptype) in params: - # we need to set it to REAL32 to ensure we use write value for param_set - name = str(name.decode('utf-8')) - param_types[name] = mavutil.mavlink.MAV_PARAM_TYPE_REAL32 - mav_param_set.add(idx) - idx += 1 - - print("Received %u parameters (ftp)" % total_params) - for param in params: - print(param) - return params - - -class FTPCli: - mavftp = FTPModule(mav) - cmd_map = { - 'list': mavftp.cmd_list, - 'get': mavftp.cmd_get - } - result_map = { - 'list': 'list_result', - 'get': 'get_result' - } + def get_cmd(self, cmd): + return getattr(self, f"cmd_{cmd}") def request(self, cmd, params, timeout=0.5, max_retries=5): - retries = 0 - while retries < max_retries and getattr(self.mavftp, self.result_map[cmd]) is None: - self.cmd_map[cmd](params) + for attempt in range(max_retries): + self.get_cmd(cmd)(params) start = time.time() - while getattr(self.mavftp, self.result_map[cmd]) is None and time.time() - start < timeout: + while not self.result_available(cmd) and time.time() - start < timeout: try: - m = mav.recv_match(type='FILE_TRANSFER_PROTOCOL') - self.mavftp.mavlink_packet(m) - except: - pass - self.mavftp.idle_task() + m = self.mav.recv_match( + type="FILE_TRANSFER_PROTOCOL", + blocking=True, + timeout=timeout / 2, + ) + self.mavlink_packet(m) + except Exception as e: + print(e) + self.idle_task() time.sleep(0.001) - retries += 1 - return getattr(self.mavftp, self.result_map[cmd]) + if self.result_available(cmd): + break + return self.latest_result(cmd) + + def list(self, path, *args, **kwargs): + return self.request("list", (path,), *args, **kwargs) + + def get(self, filepath, save_as, *args, **kwargs): + return self.request("get", (filepath, save_as), *args, **kwargs) + + def params(self, *args, **kwargs): + """get a file, and parse as params.""" + file = self.get(*args, **kwargs) + with open(file, "rb") as f: + return self.decode_params(f) + + @staticmethod + def decode_params(fh): + magic = 0x671B + data = fh.read() + magic2, num_params, total_params = struct.unpack(" 0 and data[0] == pad_byte: + # skip pad bytes + data = data[1:] + if len(data) == 0: + break - def __init__(self): - print(self.request('list', ['/home/pi/tools/bridges'])) - file = self.request('get', ['@PARAM/param.pck', 'param.pck']) - with open(file, 'rb') as f: - decodeParams(f) + ptype, plen = struct.unpack("> 4) & 0x0F + ptype &= 0x0F -FTPCli() \ No newline at end of file + if not ptype in data_types: + print("paramftp: bad type 0x%x" % ptype) + return + + (type_len, type_format) = data_types[ptype] + + name_len = ((plen >> 4) & 0x0F) + 1 + common_len = plen & 0x0F + name = last_name[0:common_len] + data[2 : 2 + name_len] + vdata = data[2 + name_len : 2 + name_len + type_len] + last_name = name + data = data[2 + name_len + type_len :] + (v,) = struct.unpack("<" + type_format, vdata) + params.append((name, v, ptype)) + count += 1 + + if count != total_params: + print("paramftp: bad count %u should be %u" % (count, total_params)) + return + param_types = {} + mav_param_set = set() + fetch_one = dict() + fetch_set = None + mav_param_count = total_params + + idx = 0 + for (name, v, ptype) in params: + # we need to set it to REAL32 to ensure we use write value for param_set + name = str(name.decode("utf-8")) + param_types[name] = mavutil.mavlink.MAV_PARAM_TYPE_REAL32 + mav_param_set.add(idx) + idx += 1 + + print("Received %u parameters (ftp)" % total_params) + for param in params: + print(param) + return params + + +if __name__ == "__main__": + from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter + + parser = ArgumentParser( + description="MAVLink FTP", formatter_class=ArgumentDefaultsHelpFormatter + ) + parser.add_argument( + "-m", + "--mavlink", + default="udpin:0.0.0.0:14550", + help="MAVLink connection specifier", + ) + subparsers = parser.add_subparsers(dest="cmd", required=True) + # 'list' command sub-parser + parser_list = subparsers.add_parser( + "list", + help=FTPUser.cmd_list.__doc__, + formatter_class=ArgumentDefaultsHelpFormatter, + ) + parser_list.add_argument( + "path", help="the path to list from", nargs="?", default="/" + ) + parser_list.set_defaults( + func=lambda ftp_user, args: print(ftp_user.list(args.path)) + ) + # 'get' command sub-parser + parser_get = subparsers.add_parser("get", help=FTPUser.cmd_get.__doc__) + parser_get.add_argument("filepath", help="the device file to download") + parser_get.add_argument("save_as", help="the filename to save to") + parser_get.set_defaults( + func=lambda ftp_user, args: ftp_user.get(args.filepath, args.save_as) + ) + # 'params' command sub-parser + parser_params = subparsers.add_parser( + "params", + help=FTPUser.params.__doc__, + formatter_class=ArgumentDefaultsHelpFormatter, + ) + parser_params.add_argument( + "filepath", + help="the device parameters file to download", + nargs="?", + default="@PARAM/param.pck", + ) + parser_params.add_argument( + "save_as", help="the filename to save to", nargs="?", default="param.pck" + ) + parser_params.set_defaults( + func=lambda ftp_user, args: ftp_user.params(args.filepath, args.save_as) + ) + # other command sub-parser + def handle_cmd(ftp_user, args): + handler = getattr(ftp_user, f"cmd_{args.cmd}") + if args.params: + return handler(args.params) + return handler() + + for attr in dir(FTPUser): + if attr.startswith("cmd") and not ("list" in attr or "get" in attr): + cmd = attr[4:] + extra_parser = subparsers.add_parser( + cmd, help=getattr(FTPUser, attr).__doc__ + ) + extra_parser.add_argument("params", nargs="*") + extra_parser.set_defaults(func=handle_cmd) + + args = parser.parse_args() + + device = mavutil.mavlink_connection(args.mavlink) + + if args.mavlink.startswith("udpin"): + print("connecting to MAVLink client...") + device.wait_heartbeat() + else: + print("connection to MAVLink server...") + while "waiting for server to respond": + device.mav.ping_send(int(time.time() * 1e6), 0, 0, 0) + if device.recv_match(): + break + time.sleep(0.5) + print("connection success!") + + ftp_user = FTPUser(device) + args.func(ftp_user, args)