From 86c6e4ab057c9381507dd129ed589fcb8b86860b Mon Sep 17 00:00:00 2001 From: Ivan Poleshchuk Date: Wed, 28 Sep 2022 14:51:24 +0500 Subject: [PATCH] Get IPFS CID back from uploader and invoke RECORDING_END trigger --- lib/procs.cpp | 4 ++-- lib/procs.h | 2 +- lib/util.cpp | 18 +++++++++++++++--- lib/util.h | 4 ++-- src/output/output.cpp | 27 +++++++++++++++++++++++++-- src/output/output_flv.cpp | 2 +- 6 files changed, 46 insertions(+), 11 deletions(-) diff --git a/lib/procs.cpp b/lib/procs.cpp index 15169859d..85d223da7 100644 --- a/lib/procs.cpp +++ b/lib/procs.cpp @@ -436,7 +436,7 @@ pid_t Util::Procs::StartPiped(const char *const *argv, int *fdin, int *fdout, in return pid; } -pid_t Util::Procs::startConverted(const char *const *argv, int *fdin){ +pid_t Util::Procs::startConverted(const char *const *argv, int *fdin, std::string triggerPayload){ pid_t pid; int pipein[2]; setHandler(); @@ -479,7 +479,7 @@ pid_t Util::Procs::startConverted(const char *const *argv, int *fdin){ } dup2(ch_stdin, 0); close(ch_stdin); - convertLogs(argv[0]); + convertLogs(argv[0], triggerPayload); execvp(argv[0], (char *const *)argv); /*LTS-START*/ char *trggr = getenv("MIST_TRIGGER"); diff --git a/lib/procs.h b/lib/procs.h index 0f5a4327a..0d6cc70d8 100644 --- a/lib/procs.h +++ b/lib/procs.h @@ -33,7 +33,7 @@ namespace Util{ static std::string getOutputOf(std::deque &argDeq); static pid_t StartPiped(const char *const *argv, int *fdin, int *fdout, int *fderr); static pid_t StartPiped(std::deque &argDeq, int *fdin, int *fdout, int *fderr); - static pid_t startConverted(const char *const *argv, int *fdin); + static pid_t startConverted(const char *const *argv, int *fdin, std::string triggerPayload = ""); static void Stop(pid_t name); static void Murder(pid_t name); static void StopAll(); diff --git a/lib/util.cpp b/lib/util.cpp index 7b6298044..93ea21504 100644 --- a/lib/util.cpp +++ b/lib/util.cpp @@ -17,6 +17,7 @@ #endif #include #include +#include #define RAXHDR_FIELDOFFSET p[1] #define RAX_REQDFIELDS_LEN 36 @@ -325,7 +326,7 @@ namespace Util{ } } - void convertLogs(const char *progName){ + void convertLogs(const char *progName, std::string triggerPayload){ int pipeErr[2]; int pipeOut[2]; pid_t thisPid = getpid(); @@ -352,7 +353,7 @@ namespace Util{ sigaction(SIGHUP, &new_action, NULL); sigaction(SIGTERM, &new_action, NULL); sigaction(SIGPIPE, &new_action, NULL); - Util::logConverter(pipeErr[0], pipeOut[0], STDERR_FILENO, progName, thisPid); + Util::logConverter(pipeErr[0], pipeOut[0], STDERR_FILENO, progName, thisPid, triggerPayload); exit(0); } Util::Procs::fork_complete(); @@ -369,7 +370,7 @@ namespace Util{ } } - void logConverter(int inErr, int inOut, int out, const char *progName, pid_t pid){ + void logConverter(int inErr, int inOut, int out, const char *progName, pid_t pid, std::string triggerPayload){ Socket::Connection errStream(-1, inErr); Socket::Connection outStream(-1, inOut); errStream.setBlocking(false); @@ -389,6 +390,17 @@ namespace Util{ while (line.find('\r') != std::string::npos){line.erase(line.find('\r'));} while (line.find('\n') != std::string::npos){line.erase(line.find('\n'));} dprintf(out, "INFO|%s|%d||%s|%s\n", progName, pid, Util::streamName, line.c_str()); + if (!strcmp(progName, "livepeer-catalyst-uploader") && !triggerPayload.empty()) { + // output is one-line JSON with URL + const JSON::Value &value = JSON::fromString(line); + // insert received IPFS CID + unsigned long cid_pos = triggerPayload.find("IPFS_CID"); + if (cid_pos != std::string::npos) { + triggerPayload.replace(cid_pos, 8, value["uri"].asString()); + // invoke RECORDING_END trigger + Triggers::doTrigger("RECORDING_END", triggerPayload, streamName); + } + } line.clear(); } }else{Util::sleep(25);} diff --git a/lib/util.h b/lib/util.h index 8524449ad..4046ee7bb 100644 --- a/lib/util.h +++ b/lib/util.h @@ -63,8 +63,8 @@ namespace Util{ void logParser(int in, int out, bool colored, void callback(const std::string &, const std::string &, const std::string &, uint64_t, bool) = 0); void redirectLogsIfNeeded(); - void convertLogs(const char *progName); - void logConverter(int inErr, int inOut, int out, const char *progName, pid_t pid); + void convertLogs(const char *progName, std::string triggerPayload = ""); + void logConverter(int inErr, int inOut, int out, const char *progName, pid_t pid, std::string triggerPayload = ""); /// Holds type, size and offset for RelAccX class internal data fields. class RelAccXFieldData{ diff --git a/src/output/output.cpp b/src/output/output.cpp index a91b45887..63cabb827 100644 --- a/src/output/output.cpp +++ b/src/output/output.cpp @@ -1400,7 +1400,8 @@ namespace Mist{ streamName + "\n" + getConnectedHost() + "\n" + capa["name"].asStringRef() + "\n" + reqUrl; Triggers::doTrigger("CONN_CLOSE", payload, streamName); } - if (isRecordingToFile && config->hasOption("target") && Triggers::shouldTrigger("RECORDING_END", streamName)){ + if (isRecordingToFile && config->hasOption("target") && Triggers::shouldTrigger("RECORDING_END", streamName) && + config->getString("target").substr(0, 7) != "ipfs://"){ uint64_t rightNow = Util::epoch(); std::stringstream payl; payl << streamName << '\n'; @@ -1876,7 +1877,29 @@ namespace Mist{ return false; } Util::Procs::forget(child); - }else{ + } else if (file.substr(0,7) == "ipfs://") { + // Create RECORDING_END trigger payload to be invoked once IPFS CID is known + std::stringstream payl; + payl << streamName << '\n'; + payl << "IPFS_CID" << '\n'; + payl << capa["name"].asStringRef() << '\n'; + // Can't fill these fields without knowing when the trigger will be fired + payl << 0 << '\n'; + payl << 0 << '\n'; + payl << 0 << '\n'; + payl << 0 << '\n'; + payl << 0 << '\n'; + payl << 0 << '\n'; + payl << 0 << '\n'; + const char *cmd[] = {"livepeer-catalyst-uploader", "-t", "2592000s", (char*)file.c_str(), 0}; + pid_t child = Util::Procs::startConverted(cmd, &outFile, payl.str()); + if (child == -1){ + ERROR_MSG("livepeer-catalyst-uploader process did not start, aborting"); + return false; + } + Util::Procs::forget(child); + } + else{ int flags = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH; int mode = O_RDWR | O_CREAT | (append ? O_APPEND : O_TRUNC); if (!Util::createPathFor(file)){ diff --git a/src/output/output_flv.cpp b/src/output/output_flv.cpp index 33ba45cc8..2c85fe0ce 100644 --- a/src/output/output_flv.cpp +++ b/src/output/output_flv.cpp @@ -31,7 +31,7 @@ namespace Mist{ capa["methods"][0u]["hrn"] = "FLV progressive"; capa["methods"][0u]["priority"] = 5; capa["methods"][0u]["player_url"] = "/oldflashplayer.swf"; - capa["push_urls"].append("/*.flv"); + capa["push_urls"].append("*.flv"); JSON::Value opt; opt["arg"] = "string";