diff --git a/C/Savina/src/federated/Big_fed.lf b/C/Savina/src/federated/Big_fed.lf
new file mode 100644
index 0000000..6143e5c
--- /dev/null
+++ b/C/Savina/src/federated/Big_fed.lf
@@ -0,0 +1,134 @@
+/**
+ * A benchmark that implements a many-to-many message passing scenario. Several
+ * workers are created, each of which sends a ping message to one other worker.
+ * The worker to which a ping is sent is chosen at random. The worker that
+ * receives the ping replies with a pong. Upon receiving the pong, the worker
+ * sends the next ping message.
+ *
+ * In LF, the challenging aspect of this benchmark is its sparse activity.
+ * While each worker is connected to all other workers, it sends a message to
+ * precisely one of them at each tag. Since we need to ensure that input ports
+ * have a single writer, each worker has to declare multiport inputs, where
+ * each port instance corresponds to one potential source of ping or pong
+ * messages. In order to determine from which worker we received a ping or
+ * pong message, we need to iterate over all ports and check `is_present`.
+ * However, this becomes very expensive for a large number of workers. For 120
+ * workers we send a total of 120 pings and 120 pongs per iteration, but we
+ * need to check up to 14400 ping ports and 14400 pong ports in each
+ * iteration. Obviously this introduces a large overhead.
+ *
+ * @author Hannes Klein
+ * @author Felix Wittwer
+ * @author Christian Menard
+ * @author Arthur Deng
+ */
+
+target C {
+  coordination: centralized,
+  timeout: 10 secs,
+  files: "../include/PseudoRandom.h"
+};
+
+// Despite the name, this reactor only collects "finished" messages from all
+// workers and lets the benchmark runner know when all of the workers are done.
+reactor Sink(num_workers: size_t(10)) {
+  // Number of exit messages received so far.
+  state num_messages: size_t(0);
+  input[num_workers] worker_finished: bool;
+  output finished: bool;
+
+  reaction(worker_finished) -> finished {=
+    // Collect the exit messages; finish when all of them have been received.
+    size_t i;
+    for (i = 0; i < self->num_workers; i++) {
+      if (worker_finished[i]->is_present) {
+        self->num_messages += 1;
+        if (self->num_messages == self->num_workers) {
+          lf_print("FINISHED");
+          lf_request_stop();
+          SET(finished, true);
+        }
+      }
+    }
+  =}
+}
+
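+// The Worker below relies on the PseudoRandom.h helper that is bundled with
+// this benchmark suite (see the `files` target property above). As a rough,
+// hypothetical sketch of the API assumed here -- the bundled header is
+// authoritative -- the calls behave along these lines:
+//
+//   typedef struct PseudoRandom { long mValue; } PseudoRandom;
+//   void initPseudoRandom(PseudoRandom* random, long seed) {
+//     random->mValue = seed;
+//   }
+//   long nextLong(PseudoRandom* random) {
+//     random->mValue = ((random->mValue * 1309) + 13849) & 65535;
+//     return random->mValue;
+//   }
+//   int nextInt(PseudoRandom* random) {
+//     return (int) nextLong(random);
+//   }
+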
+reactor Worker(bank_index: size_t(0), num_messages: size_t(200), num_workers: size_t(10)) {
+
+  preamble {=
+    #include "PseudoRandom.h"
+    #include <stdint.h>
+  =}
+
+  state num_pings: size_t(0);
+  state random: PseudoRandom;
+  state exp_pong: size_t({=SIZE_MAX=});
+
+  input[num_workers] in_ping: bool;
+  input[num_workers] in_pong: bool;
+  output[num_workers] out_ping: bool;
+  output[num_workers] out_pong: bool;
+  output finished: bool;
+  logical action next;
+
+  // Send a ping to a randomly chosen worker and remember whom we pinged.
+  reaction(next) -> out_ping {=
+    self->num_pings++;
+    int to = nextInt(&(self->random)) % self->num_workers;
+    self->exp_pong = to;
+    SET(out_ping[to], true);
+  =}
+
+  // Reply to each received ping with a pong.
+  reaction(in_ping) -> out_pong {=
+    size_t i;
+    for (i = 0; i < self->num_workers; i++) {
+      if (in_ping[i]->is_present) {
+        SET(out_pong[i], true);
+      }
+    }
+  =}
+
+  // Receive a pong and send the next ping.
+  reaction(in_pong) -> next, finished {=
+    size_t i;
+    for (i = 0; i < self->num_workers; i++) {
+      if (in_pong[i]->is_present) {
+        if (i != self->exp_pong) {
+          fprintf(stderr, "Expected pong from %zu but received pong from %zu\n",
+                  self->exp_pong, i);
+        }
+      }
+    }
+
+    // Send the next ping, unless all pings have been sent.
+    if (self->num_pings == self->num_messages) {
+      SET(finished, true);
+    } else {
+      lf_schedule(next, 0);
+    }
+  =}
+
+  reaction(startup) -> next {=
+    // Reset state.
+    self->num_pings = 0;
+    self->exp_pong = SIZE_MAX;
+    initPseudoRandom(&(self->random), self->bank_index);
+
+    // Start execution.
+    lf_schedule(next, 0);
+  =}
+}
+
+federated reactor(numPingsPerReactor: size_t(200), numReactors: size_t(10)) {
+  sink = new Sink(num_workers=numReactors);
+  worker = new[numReactors] Worker(num_messages=numPingsPerReactor, num_workers=numReactors);
+
+  worker.finished -> sink.worker_finished;
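+  // interleaved() transposes the bank and multiport dimensions of the
+  // destination: worker i's port instance j is wired to worker j's port
+  // instance i. For example (assuming numReactors = 3), worker 0's
+  // out_ping[2] feeds worker 2's in_ping[0], so a receiver can recover the
+  // sender's index from the port instance on which a message arrived.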
+  worker.out_ping -> interleaved(worker.in_ping);
+  worker.out_pong -> interleaved(worker.in_pong);
+}
diff --git a/C/Savina/src/federated/PingPong_fed.lf b/C/Savina/src/federated/PingPong_fed.lf
new file mode 100644
index 0000000..b372d0f
--- /dev/null
+++ b/C/Savina/src/federated/PingPong_fed.lf
@@ -0,0 +1,62 @@
+target C {
+  coordination: centralized,
+  timeout: 10 secs
+};
+
+reactor Ping(count: int(10)) {
+  input receive: int;
+  output send: int;
+  output finished: bool;
+  state pingsLeft: int(count);
+  logical action serve;
+
+  reaction(startup, serve) -> send {=
+    lf_print("Sending ping #%d", self->pingsLeft);
+    SET(send, self->pingsLeft--);
+  =}
+
+  reaction(receive) -> serve, finished {=
+    if (self->pingsLeft > 0) {
+      lf_schedule(serve, 0);
+    } else {
+      lf_print("No pings left! Done!");
+      SET(finished, true);
+    }
+  =}
+
+  reaction(shutdown) {=
+    lf_print("Ping should have no pings left, and it has %d.", self->pingsLeft);
+  =}
+}
+
+reactor Pong(expected: int(10)) {
+  input receive: int;
+  output send: int;
+  input finish: bool;
+  state count: int(0);
+
+  reaction(receive) -> send {=
+    self->count++;
+    lf_print("Received %d", receive->value);
+    SET(send, receive->value);
+  =}
+
+  reaction(finish) {=
+    if (self->count != self->expected) {
+      // lf_print_error_and_exit() terminates the program, so no separate
+      // exit() call is needed.
+      lf_print_error_and_exit("Pong expected to receive %d inputs, but it received %d.",
+        self->expected, self->count
+      );
+    }
+    printf("Success.\n");
+    lf_request_stop();
+  =}
+
+  reaction(shutdown) {=
+    lf_print("Pong expected to receive %d inputs, and it received %d.",
+             self->expected, self->count);
+  =}
+}
+
+federated reactor(count: int(10)) {
+  ping = new Ping(count = count);
+  pong = new Pong(expected = count);
+  ping.send -> pong.receive;
+  pong.send -> ping.receive;
+  ping.finished -> pong.finish;
+}
\ No newline at end of file
diff --git a/C/Savina/src/federated/Threadring_fed.lf b/C/Savina/src/federated/Threadring_fed.lf
new file mode 100644
index 0000000..bba9a42
--- /dev/null
+++ b/C/Savina/src/federated/Threadring_fed.lf
@@ -0,0 +1,121 @@
+/**
+ * Micro-benchmark from the Savina benchmark suite, intended
+ * to measure message passing overhead and switching between
+ * actors.
+ * See https://shamsimam.github.io/papers/2014-agere-savina.pdf.
+ *
+ * To open the causality loop in the ring of reactors, one
+ * reactor uses a logical action for message passing.
+ *
+ * @author Matthew Chorlian
+ */
+
+target C {
+  coordination: centralized,
+  timeout: 10 secs
+};
+
+preamble {=
+  #include <stdbool.h>
+
+  bool hasNext(int ping) {
+    return ping > 0;
+  }
+
+  int getPingsLeft(int ping) {
+    return ping;
+  }
+
+  int ping_next(int ping) {
+    return ping - 1;
+  }
+=}
+
+reactor ThreadRingReactor {
+
+  input inPrevReactor: int;
+
+  output finished: bool;
+  output outNextReactor: int;
+
+  reaction(inPrevReactor) -> outNextReactor, finished {=
+    if (hasNext(inPrevReactor->value)) {
+      lf_print("Passing value %d", inPrevReactor->value - 1);
+      SET(outNextReactor, ping_next(inPrevReactor->value));
+    } else {
+      SET(finished, true);
+      lf_print("Finished with count %d", getPingsLeft(inPrevReactor->value));
+    }
+  =}
+}
+
+reactor ThreadRingReactorLoopOpener {
+
+  input inPrevReactor: int;
+  input start: int;
+  output finished: bool;
+  output outNextReactor: int;
+
+  logical action sendToNextReactor: int;
+
+  // The logical action is what breaks the causality loop in the ring.
+  reaction(sendToNextReactor) -> outNextReactor {=
+    lf_print("Reacting to logical action in Loop Opener");
+    SET(outNextReactor, sendToNextReactor->value);
+  =}
+
+  reaction(inPrevReactor) -> sendToNextReactor, finished {=
+    if (hasNext(inPrevReactor->value)) {
+      lf_print("Passing value %d", inPrevReactor->value - 1);
+      // Carry the decremented count on the action's token.
+      lf_schedule_int(sendToNextReactor, 0, ping_next(inPrevReactor->value));
+    } else {
+      SET(finished, true);
+      lf_print("Finished with count %d", getPingsLeft(inPrevReactor->value));
+    }
+  =}
+
+  reaction(start) -> sendToNextReactor, finished {=
+    lf_print("Loop Opener starting with count %d", getPingsLeft(start->value));
+    if (hasNext(start->value)) {
+      lf_print("Passing value %d", start->value - 1);
+      lf_schedule_int(sendToNextReactor, 0, ping_next(start->value));
+    } else {
+      SET(finished, true);
+    }
+  =}
+}
+
+reactor Initializer(numReactors: int(10), numPings: int(100)) {
+
+  input[numReactors] inFinished: bool;
+  output outStart: int;
+
+  reaction(startup) -> outStart {=
+    SET(outStart, self->numPings);
+  =}
+
+  reaction(inFinished) {=
+    printf("Success.\n");
+    lf_request_stop();
+  =}
+}
+
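+// Ring topology built by the connections below (the workers bank width is
+// hard-coded to 9, so this assumes the default numReactors = 10):
+//
+//   init --> loopOpener --> workers[0] --> workers[1] --> ... --> workers[8]
+//                ^                                                    |
+//                +----------------------------------------------------+
+//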
+federated reactor(numPings: int(100), numReactors: int(10)) {
+
+  init = new Initializer(numReactors=numReactors, numPings=numPings);
+  loopOpener = new ThreadRingReactorLoopOpener();
+  workers = new[9] ThreadRingReactor();
+
+  loopOpener.outNextReactor, workers.outNextReactor
+    -> workers.inPrevReactor, loopOpener.inPrevReactor after 0;
+  init.outStart -> loopOpener.start;
+  loopOpener.finished, workers.finished -> init.inFinished;
+}
diff --git a/C/Savina/src/federated/Throughput_fed.lf b/C/Savina/src/federated/Throughput_fed.lf
new file mode 100644
index 0000000..ee2845c
--- /dev/null
+++ b/C/Savina/src/federated/Throughput_fed.lf
@@ -0,0 +1,122 @@
+/**
+ * Micro-benchmark from the Savina benchmark suite, where it is called Fork Join.
+ * See https://shamsimam.github.io/papers/2014-agere-savina.pdf.
+ *
+ * According to the Savina paper, higher performance can be achieved by
+ * batch-processing messages in the worker actors. This is not possible
+ * with reactors, because scheduling and advancing logical time occurs
+ * after each round of message delivery.
+ *
+ * @author Hannes Klein (C++ version)
+ * @author Matthew Chorlian (adapted the C++ version to C)
+ */
+
+/* [[[cog
+# This file is a code generator using the python module cog:
+# See https://nedbatchelder.com/code/cog/
+#
+# All instructions for code generation are in-lined in comments
+# like this one. With that you can use this file as a normal source file
+# but also to generate code.
+#
+# To change the generated code in-line within this file run:
+# $ python -m cog -r this-file.lf
+# To generate a new file from this file, stripping the generator code in the process, run:
+# $ python -m cog -d -o output-file.lf this-file.lf
+#
+# Use the command line option -D to specify generator parameters, for example:
+# $ python -m cog -r -D parameter=100 this-file.lf
+#
+# Generator parameters used in this file:
+# -D numWorkers=60
+# -D numMessagesPerReactor=10000
+]]] */
+// [[[end]]]
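+
+// For example, to regenerate this file in place with the parameters listed
+// above (assuming the cog module is installed), one would run:
+//
+//   python -m cog -r -D numWorkers=60 -D numMessagesPerReactor=10000 Throughput_fed.lf
+//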
+/* [[[cog
+  # force existence, type and default values of generator parameters
+  if 'numWorkers' in globals():
+      numWorkers = int(numWorkers)
+  else:
+      globals()['numWorkers'] = 60
+
+  if 'numMessagesPerReactor' in globals():
+      numMessagesPerReactor = int(numMessagesPerReactor)
+  else:
+      globals()['numMessagesPerReactor'] = 10000
+
+  # output the current values of the generator parameters used in the last generation run
+  cog.outl(f'// Generated file with the following parameters:')
+  cog.outl(f'// numWorkers = {numWorkers}')
+  cog.outl(f'// numMessagesPerReactor = {numMessagesPerReactor}')
+]]] */
+// Generated file with the following parameters:
+// numWorkers = 60
+// numMessagesPerReactor = 10000
+// [[[end]]]
+
+target C {
+  flags: "-lm",
+  coordination: centralized,
+  timeout: 10 secs
+};
+
+reactor ThroughputReactor(totalMessages: int(100)) {
+
+  preamble {=
+    #include <math.h>
+
+    void performComputation(double theta) {
+      double sint = sin(theta);
+      // Write the result to a volatile to defeat dead-code elimination.
+      volatile double res = sint * sint;
+      (void) res;
+    }
+  =}
+
+  input inMessage: bool;
+
+  reaction(inMessage) {=
+    performComputation(37.2);
+  =}
+
+  reaction(shutdown) {=
+    lf_print("Shutdown reaction invoked.");
+  =}
+}
+
+/**
+ * Sends one message per tag to all workers until totalMessages messages
+ * have been sent, then reports success and requests stop.
+ */
+reactor ThroughputProducer(totalMessages: int(100), numConsumer: int(10)) {
+
+  state sent_messages: int(0);
+  input start: bool;
+  output outMessage: bool;
+  output finished: bool;
+  logical action send_next_msg;
+
+  reaction(startup, send_next_msg) -> outMessage, finished {=
+    SET(outMessage, true);
+    self->sent_messages++;
+    if (self->sent_messages == self->totalMessages) {
+      SET(finished, true);
+      lf_print("Success!");
+      lf_request_stop();
+    } else {
+      // Only schedule the next message if we are not done yet.
+      lf_schedule(send_next_msg, 0);
+    }
+  =}
+
+  reaction(shutdown) {=
+    lf_print("Shutdown reaction invoked.");
+  =}
+}
+
+/* [[[cog
+cog.outl(f'federated reactor (numMessagesPerReactor:int({numMessagesPerReactor}), numWorkers:int({numWorkers}))')
+]]] */
+federated reactor (numMessagesPerReactor:int(10000), numWorkers:int(60))
+// [[[end]]]
+{
+  producer = new ThroughputProducer(totalMessages=numMessagesPerReactor, numConsumer=numWorkers);
+  worker = new[numWorkers] ThroughputReactor(totalMessages=numMessagesPerReactor);
+
+  // Broadcast each message from the producer to every worker.
+  (producer.outMessage)+ -> worker.inMessage;
+}
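+
+// Rough usage sketch, assuming a Lingua Franca toolchain with the lfc
+// command-line compiler (details may vary across releases): compiling this
+// file with `lfc Throughput_fed.lf` produces one program per federate plus a
+// launch script under bin/ that starts the RTI and all federates together.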