Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get IPFS CID back from uploader and invoke RECORDING_END trigger #111

Merged
merged 1 commit into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/procs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ pid_t Util::Procs::StartPiped(const char *const *argv, int *fdin, int *fdout, in
return pid;
}

pid_t Util::Procs::startConverted(const char *const *argv, int *fdin){
pid_t Util::Procs::startConverted(const char *const *argv, int *fdin, std::string triggerPayload){
pid_t pid;
int pipein[2];
setHandler();
Expand Down Expand Up @@ -479,7 +479,7 @@ pid_t Util::Procs::startConverted(const char *const *argv, int *fdin){
}
dup2(ch_stdin, 0);
close(ch_stdin);
convertLogs(argv[0]);
convertLogs(argv[0], triggerPayload);
execvp(argv[0], (char *const *)argv);
/*LTS-START*/
char *trggr = getenv("MIST_TRIGGER");
Expand Down
2 changes: 1 addition & 1 deletion lib/procs.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace Util{
static std::string getOutputOf(std::deque<std::string> &argDeq);
static pid_t StartPiped(const char *const *argv, int *fdin, int *fdout, int *fderr);
static pid_t StartPiped(std::deque<std::string> &argDeq, int *fdin, int *fdout, int *fderr);
static pid_t startConverted(const char *const *argv, int *fdin);
static pid_t startConverted(const char *const *argv, int *fdin, std::string triggerPayload = "");
static void Stop(pid_t name);
static void Murder(pid_t name);
static void StopAll();
Expand Down
18 changes: 15 additions & 3 deletions lib/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#endif
#include <stdlib.h>
#include <sys/resource.h>
#include <mist/triggers.h>

#define RAXHDR_FIELDOFFSET p[1]
#define RAX_REQDFIELDS_LEN 36
Expand Down Expand Up @@ -325,7 +326,7 @@ namespace Util{
}
}

void convertLogs(const char *progName){
void convertLogs(const char *progName, std::string triggerPayload){
int pipeErr[2];
int pipeOut[2];
pid_t thisPid = getpid();
Expand All @@ -352,7 +353,7 @@ namespace Util{
sigaction(SIGHUP, &new_action, NULL);
sigaction(SIGTERM, &new_action, NULL);
sigaction(SIGPIPE, &new_action, NULL);
Util::logConverter(pipeErr[0], pipeOut[0], STDERR_FILENO, progName, thisPid);
Util::logConverter(pipeErr[0], pipeOut[0], STDERR_FILENO, progName, thisPid, triggerPayload);
exit(0);
}
Util::Procs::fork_complete();
Expand All @@ -369,7 +370,7 @@ namespace Util{
}
}

void logConverter(int inErr, int inOut, int out, const char *progName, pid_t pid){
void logConverter(int inErr, int inOut, int out, const char *progName, pid_t pid, std::string triggerPayload){
Socket::Connection errStream(-1, inErr);
Socket::Connection outStream(-1, inOut);
errStream.setBlocking(false);
Expand All @@ -389,6 +390,17 @@ namespace Util{
while (line.find('\r') != std::string::npos){line.erase(line.find('\r'));}
while (line.find('\n') != std::string::npos){line.erase(line.find('\n'));}
dprintf(out, "INFO|%s|%d||%s|%s\n", progName, pid, Util::streamName, line.c_str());
if (!strcmp(progName, "livepeer-catalyst-uploader") && !triggerPayload.empty()) {
// output is one-line JSON with URL
const JSON::Value &value = JSON::fromString(line);
// insert received IPFS CID
unsigned long cid_pos = triggerPayload.find("IPFS_CID");
if (cid_pos != std::string::npos) {
triggerPayload.replace(cid_pos, 8, value["uri"].asString());
// invoke RECORDING_END trigger
Triggers::doTrigger("RECORDING_END", triggerPayload, streamName);
}
}
line.clear();
}
}else{Util::sleep(25);}
Expand Down
4 changes: 2 additions & 2 deletions lib/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ namespace Util{
void logParser(int in, int out, bool colored,
void callback(const std::string &, const std::string &, const std::string &, uint64_t, bool) = 0);
void redirectLogsIfNeeded();
void convertLogs(const char *progName);
void logConverter(int inErr, int inOut, int out, const char *progName, pid_t pid);
void convertLogs(const char *progName, std::string triggerPayload = "");
void logConverter(int inErr, int inOut, int out, const char *progName, pid_t pid, std::string triggerPayload = "");

/// Holds type, size and offset for RelAccX class internal data fields.
class RelAccXFieldData{
Expand Down
27 changes: 25 additions & 2 deletions src/output/output.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1400,7 +1400,8 @@ namespace Mist{
streamName + "\n" + getConnectedHost() + "\n" + capa["name"].asStringRef() + "\n" + reqUrl;
Triggers::doTrigger("CONN_CLOSE", payload, streamName);
}
if (isRecordingToFile && config->hasOption("target") && Triggers::shouldTrigger("RECORDING_END", streamName)){
if (isRecordingToFile && config->hasOption("target") && Triggers::shouldTrigger("RECORDING_END", streamName) &&
config->getString("target").substr(0, 7) != "ipfs://"){
uint64_t rightNow = Util::epoch();
std::stringstream payl;
payl << streamName << '\n';
Expand Down Expand Up @@ -1876,7 +1877,29 @@ namespace Mist{
return false;
}
Util::Procs::forget(child);
}else{
} else if (file.substr(0,7) == "ipfs://") {
// Create RECORDING_END trigger payload to be invoked once IPFS CID is known
std::stringstream payl;
payl << streamName << '\n';
payl << "IPFS_CID" << '\n';
payl << capa["name"].asStringRef() << '\n';
// Can't fill these fields without knowing when the trigger will be fired
payl << 0 << '\n';
payl << 0 << '\n';
payl << 0 << '\n';
payl << 0 << '\n';
payl << 0 << '\n';
payl << 0 << '\n';
payl << 0 << '\n';
const char *cmd[] = {"livepeer-catalyst-uploader", "-t", "2592000s", (char*)file.c_str(), 0};
pid_t child = Util::Procs::startConverted(cmd, &outFile, payl.str());
if (child == -1){
ERROR_MSG("livepeer-catalyst-uploader process did not start, aborting");
return false;
}
Util::Procs::forget(child);
}
else{
int flags = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
int mode = O_RDWR | O_CREAT | (append ? O_APPEND : O_TRUNC);
if (!Util::createPathFor(file)){
Expand Down
2 changes: 1 addition & 1 deletion src/output/output_flv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace Mist{
capa["methods"][0u]["hrn"] = "FLV progressive";
capa["methods"][0u]["priority"] = 5;
capa["methods"][0u]["player_url"] = "/oldflashplayer.swf";
capa["push_urls"].append("/*.flv");
capa["push_urls"].append("*.flv");

JSON::Value opt;
opt["arg"] = "string";
Expand Down