Skip to content

Commit

Permalink
list active downloads in controlpanel
Browse files Browse the repository at this point in the history
  • Loading branch information
9001 committed Nov 10, 2024
1 parent 0ce7cf5 commit 8aba5ae
Show file tree
Hide file tree
Showing 21 changed files with 271 additions and 31 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1687,6 +1687,7 @@ scrape_configs:
currently the following metrics are available,
* `cpp_uptime_seconds` time since last copyparty restart
* `cpp_boot_unixtime_seconds` same but as an absolute timestamp
* `cpp_active_dl` number of active downloads
* `cpp_http_conns` number of open http(s) connections
* `cpp_http_reqs` number of http(s) requests handled
* `cpp_sus_reqs` number of 403/422/malicious requests
Expand Down
1 change: 1 addition & 0 deletions copyparty/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
web/deps/scp.woff2
web/deps/sha512.ac.js
web/deps/sha512.hw.js
web/iiam.gif
web/md.css
web/md.html
web/md.js
Expand Down
1 change: 1 addition & 0 deletions copyparty/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1321,6 +1321,7 @@ def add_admin(ap):
ap2.add_argument("--no-reload", action="store_true", help="disable ?reload=cfg (reload users/volumes/volflags from config file)")
ap2.add_argument("--no-rescan", action="store_true", help="disable ?scan (volume reindexing)")
ap2.add_argument("--no-stack", action="store_true", help="disable ?stack (list all stacks)")
ap2.add_argument("--dl-list", metavar="LVL", type=int, default=2, help="who can see active downloads in the controlpanel? [\033[32m0\033[0m]=nobody, [\033[32m1\033[0m]=admins, [\033[32m2\033[0m]=everyone")


def add_thumbnail(ap):
Expand Down
45 changes: 40 additions & 5 deletions copyparty/broker_mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ def __init__(self, hub: "SvcHub") -> None:
self.procs = []
self.mutex = threading.Lock()

self.retpend: dict[int, Any] = {}
self.retpend_mutex = threading.Lock()

self.num_workers = self.args.j or CORES
self.log("broker", "booting {} subprocesses".format(self.num_workers))
for n in range(1, self.num_workers + 1):
Expand All @@ -54,6 +57,8 @@ def __init__(self, hub: "SvcHub") -> None:
self.procs.append(proc)
proc.start()

Daemon(self.periodic, "mp-periodic")

def shutdown(self) -> None:
self.log("broker", "shutting down")
for n, proc in enumerate(self.procs):
Expand Down Expand Up @@ -90,8 +95,10 @@ def collector(self, proc: MProcess) -> None:
self.log(*args)

elif dest == "retq":
# response from previous ipc call
raise Exception("invalid broker_mp usage")
with self.retpend_mutex:
retq = self.retpend.pop(retq_id)

retq.put(args[0])

else:
# new ipc invoking managed service in hub
Expand All @@ -109,7 +116,6 @@ def collector(self, proc: MProcess) -> None:
proc.q_pend.put((retq_id, "retq", rv))

def ask(self, dest: str, *args: Any) -> Union[ExceptionalQueue, NotExQueue]:

# new non-ipc invoking managed service in hub
obj = self.hub
for node in dest.split("."):
Expand All @@ -121,17 +127,30 @@ def ask(self, dest: str, *args: Any) -> Union[ExceptionalQueue, NotExQueue]:
retq.put(rv)
return retq

def wask(self, dest: str, *args: Any) -> list[Union[ExceptionalQueue, NotExQueue]]:
# call from hub to workers
ret = []
for p in self.procs:
retq = ExceptionalQueue(1)
retq_id = id(retq)
with self.retpend_mutex:
self.retpend[retq_id] = retq

p.q_pend.put((retq_id, dest, list(args)))
ret.append(retq)
return ret

def say(self, dest: str, *args: Any) -> None:
"""
send message to non-hub component in other process,
returns a Queue object which eventually contains the response if want_retval
(not-impl here since nothing uses it yet)
"""
if dest == "listen":
if dest == "httpsrv.listen":
for p in self.procs:
p.q_pend.put((0, dest, [args[0], len(self.procs)]))

elif dest == "set_netdevs":
elif dest == "httpsrv.set_netdevs":
for p in self.procs:
p.q_pend.put((0, dest, list(args)))

Expand All @@ -140,3 +159,19 @@ def say(self, dest: str, *args: Any) -> None:

else:
raise Exception("what is " + str(dest))

def periodic(self) -> None:
while True:
time.sleep(1)

tdli = {}
tdls = {}
qs = self.wask("httpsrv.read_dls")
for q in qs:
qr = q.get()
dli, dls = qr
tdli.update(dli)
tdls.update(dls)
tdl = (tdli, tdls)
for p in self.procs:
p.q_pend.put((0, "httpsrv.write_dls", tdl))
39 changes: 20 additions & 19 deletions copyparty/broker_mpw.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,37 +82,38 @@ def main(self) -> None:
while True:
retq_id, dest, args = self.q_pend.get()

# self.logw("work: [{}]".format(d[0]))
if dest == "retq":
# response from previous ipc call
with self.retpend_mutex:
retq = self.retpend.pop(retq_id)

retq.put(args)
continue

if dest == "shutdown":
self.httpsrv.shutdown()
self.logw("ok bye")
sys.exit(0)
return

elif dest == "reload":
if dest == "reload":
self.logw("mpw.asrv reloading")
self.asrv.reload()
self.logw("mpw.asrv reloaded")
continue

elif dest == "reload_sessions":
if dest == "reload_sessions":
with self.asrv.mutex:
self.asrv.load_sessions()
continue

elif dest == "listen":
self.httpsrv.listen(args[0], args[1])

elif dest == "set_netdevs":
self.httpsrv.set_netdevs(args[0])
obj = self
for node in dest.split("."):
obj = getattr(obj, node)

elif dest == "retq":
# response from previous ipc call
with self.retpend_mutex:
retq = self.retpend.pop(retq_id)

retq.put(args)

else:
raise Exception("what is " + str(dest))
rv = obj(*args) # type: ignore
if retq_id:
self.say("retq", rv, retq_id=retq_id)

def ask(self, dest: str, *args: Any) -> Union[ExceptionalQueue, NotExQueue]:
retq = ExceptionalQueue(1)
Expand All @@ -123,5 +124,5 @@ def ask(self, dest: str, *args: Any) -> Union[ExceptionalQueue, NotExQueue]:
self.q_yield.put((retq_id, dest, list(args)))
return retq

def say(self, dest: str, *args: Any) -> None:
self.q_yield.put((0, dest, list(args)))
def say(self, dest: str, *args: Any, retq_id=0) -> None:
self.q_yield.put((retq_id, dest, list(args)))
4 changes: 2 additions & 2 deletions copyparty/broker_thr.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ def ask(self, dest: str, *args: Any) -> Union[ExceptionalQueue, NotExQueue]:
return NotExQueue(obj(*args)) # type: ignore

def say(self, dest: str, *args: Any) -> None:
if dest == "listen":
if dest == "httpsrv.listen":
self.httpsrv.listen(args[0], 1)
return

if dest == "set_netdevs":
if dest == "httpsrv.set_netdevs":
self.httpsrv.set_netdevs(args[0])
return

Expand Down
Loading

0 comments on commit 8aba5ae

Please sign in to comment.