Skip to content

Commit

Permalink
ON-16111: use monitor thread in zfsink to allow testing higher rates
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Drinkwater committed Oct 15, 2024
1 parent 7d4498c commit 2039470
Showing 1 changed file with 105 additions and 41 deletions.
146 changes: 105 additions & 41 deletions src/tests/zf_apps/zfsink.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,32 @@
#include <stdbool.h>
#include <stdarg.h>
#include <inttypes.h>
#include <pthread.h>
#include <sys/time.h>


static bool cfg_quiet = false;
static bool cfg_rx_timestamping = false;

/* Mutex to protect printing from different threads */
static pthread_mutex_t printf_mutex;


struct resources {
/* handle for accessing the stack */
struct zf_stack* stack;

/* handle for accessing the muxer */
struct zf_muxer_set* muxer;

/* handle for the UDP receive zocket */
struct zfur* ur;

/* statistics */
uint64_t n_rx_pkts;
uint64_t n_rx_bytes;
};


static void usage_msg(FILE* f)
{
Expand All @@ -51,16 +72,14 @@ static void usage_err(void)

static void vlog(const char* fmt, ...)
{
if( ! cfg_quiet ) {
va_list args;
va_start(args, fmt);
vprintf(fmt, args);
va_end(args);
}
va_list args;
va_start(args, fmt);
vprintf(fmt, args);
va_end(args);
}


static void try_recv(struct zfur* ur)
static void try_recv(struct resources* res, struct zfur* ur)
{
struct {
/* The iovec used by zfur_msg must be immediately afterwards */
Expand All @@ -77,9 +96,6 @@ static void try_recv(struct zfur* ur)

/* Do something useful with the datagram here! */


vlog("Received datagram of length %zu\n", rd.iov[0].iov_len);

/* In the case rx timestamping capabilities are enabled, we can retrieve
* the time at which the packet was received.
* */
Expand All @@ -88,54 +104,56 @@ static void try_recv(struct zfur* ur)
struct timespec ts;
int rc = zfur_pkt_get_timestamp(ur, &rd.msg, &ts, 0, &flags);

pthread_mutex_lock(&printf_mutex);
if( rc == 0 )
vlog("At time: %lld.%.9ld\n", ts.tv_sec, ts.tv_nsec);
vlog("Hardware timestamp: %lld.%.9ld\n", ts.tv_sec, ts.tv_nsec);
else
vlog("Error retrieving timestamp! Return code: %d\n", rc);
pthread_mutex_unlock(&printf_mutex);
}

zfur_zc_recv_done(ur, &rd.msg);

res->n_rx_pkts += 1;
res->n_rx_bytes += rd.iov[0].iov_len;
} while( rd.msg.dgrams_left );
}


static void poll_muxer(struct zf_muxer_set* muxer, int timeout)
static void poll_muxer(struct resources* res, int timeout)
{
struct epoll_event evs[8];
const int max_evs = sizeof(evs) / sizeof(evs[0]);

vlog("Polling muxer\n");
int n_ev = zf_muxer_wait(muxer, evs, max_evs, timeout);
int n_ev = zf_muxer_wait(res->muxer, evs, max_evs, timeout);

for( int i = 0; i < n_ev; ++i )
try_recv(evs[i].data.ptr);
try_recv(res, evs[i].data.ptr);
}


static void ev_loop_reactor(struct zf_stack* stack, struct zfur* ur)
static void ev_loop_reactor(struct resources* res)
{
while( 1 ) {
vlog("Polling reactor\n");
while( zf_reactor_perform(stack) == 0 )
while( zf_reactor_perform(res->stack) == 0 )
;
try_recv(ur);
try_recv(res, res->ur);
}
}


static void ev_loop_muxer(struct zf_muxer_set* muxer)
static void ev_loop_muxer(struct resources* res)
{
while( 1 )
poll_muxer(muxer, -1);
poll_muxer(res, -1);
}


static void ev_loop_waitable_fd(struct zf_stack* stack,
struct zf_muxer_set* muxer)
static void ev_loop_waitable_fd(struct resources* res)
{
int waitable_fd;
ZF_TRY(zf_waitable_fd_get(stack, &waitable_fd));
ZF_TRY(zf_waitable_fd_prime(stack));
ZF_TRY(zf_waitable_fd_get(res->stack, &waitable_fd));
ZF_TRY(zf_waitable_fd_prime(res->stack));

int epollfd = epoll_create(10);
struct epoll_event ev = { .events = EPOLLIN, .data.fd = waitable_fd };
Expand All @@ -145,13 +163,12 @@ static void ev_loop_waitable_fd(struct zf_stack* stack,
struct epoll_event evs[8];
const int max_evs = sizeof(evs) / sizeof(evs[0]);

vlog("Calling epoll_wait\n");
int n_ev = epoll_wait(epollfd, evs, max_evs, -1);

for( int i = 0; i < n_ev; ++i )
if( evs[i].data.fd == waitable_fd ) {
poll_muxer(muxer, 0);
ZF_TRY(zf_waitable_fd_prime(stack));
poll_muxer(res, 0);
ZF_TRY(zf_waitable_fd_prime(res->stack));
}
else {
/* Not possible in this sample code. */
Expand Down Expand Up @@ -233,8 +250,51 @@ void print_attrs(struct zf_attr* attr)
}


static void monitor(struct resources* res)
{
uint64_t now_bytes, prev_bytes;
struct timeval start, end;
uint64_t prev_pkts, now_pkts;
int ms, pkt_rate, mbps;

printf("#%9s %16s %16s", "pkt-rate", "bandwidth(Mbps)", "total-pkts\n");

prev_pkts = res->n_rx_pkts;
prev_bytes = res->n_rx_bytes;
gettimeofday(&start, NULL);

while( 1 ) {
sleep(1);
now_pkts = res->n_rx_pkts;
now_bytes = res->n_rx_bytes;
gettimeofday(&end, NULL);
ms = (end.tv_sec - start.tv_sec) * 1000;
ms += (end.tv_usec - start.tv_usec) / 1000;
pkt_rate = (int) ((now_pkts - prev_pkts) * 1000 / ms);
mbps = (int) ((now_bytes - prev_bytes) * 8 / 1000 / ms);
pthread_mutex_lock(&printf_mutex);
printf("%10d %16d %16"PRIu64"\n", pkt_rate, mbps, now_pkts);
pthread_mutex_unlock(&printf_mutex);
fflush(stdout);
prev_pkts = now_pkts;
prev_bytes = now_bytes;
start = end;
}
}


static void* monitor_fn(void* arg)
{
struct resources* res = arg;
monitor(res);
return NULL;
}


int main(int argc, char* argv[])
{
pthread_t thread_id;
struct resources* res;
int cfg_muxer = 0;
int cfg_waitable_fd = 0;
bool cfg_print_attrs = false;
Expand Down Expand Up @@ -290,20 +350,20 @@ int main(int argc, char* argv[])
if( cfg_rx_timestamping )
ZF_TRY(zf_attr_set_int(attr, "rx_timestamping", 1));

struct zf_stack* stack;
ZF_TRY(zf_stack_alloc(attr, &stack));
ZF_TRY((res = calloc(1, sizeof(*res))) != NULL);

ZF_TRY(zf_stack_alloc(attr, &res->stack));

if( cfg_print_attrs )
print_attrs(attr);

struct zfur* ur;
ZF_TRY(zfur_alloc(&ur, stack, attr));
ZF_TRY(zfur_alloc(&res->ur, res->stack, attr));

if( ai_remote )
ZF_TRY(zfur_addr_bind(ur, ai_local->ai_addr, ai_local->ai_addrlen,
ZF_TRY(zfur_addr_bind(res->ur, ai_local->ai_addr, ai_local->ai_addrlen,
ai_remote->ai_addr, ai_remote->ai_addrlen, 0));
else
ZF_TRY(zfur_addr_bind(ur, ai_local->ai_addr, ai_local->ai_addrlen,
ZF_TRY(zfur_addr_bind(res->ur, ai_local->ai_addr, ai_local->ai_addrlen,
NULL, 0, 0));

/* If no local port was specified, report which one was assigned */
Expand All @@ -313,19 +373,23 @@ int main(int argc, char* argv[])
}

/* Initialise the multiplexer if we're going to use one. */
struct epoll_event event = { .events = EPOLLIN, .data = { .ptr = ur } };
struct zf_muxer_set* muxer;
struct epoll_event event = { .events = EPOLLIN, .data = { .ptr = res->ur } };
if( cfg_muxer || cfg_waitable_fd ) {
ZF_TRY(zf_muxer_alloc(stack, &muxer));
ZF_TRY(zf_muxer_add(muxer, zfur_to_waitable(ur), &event));
ZF_TRY(zf_muxer_alloc(res->stack, &res->muxer));
ZF_TRY(zf_muxer_add(res->muxer, zfur_to_waitable(res->ur), &event));
}

pthread_mutex_init(&printf_mutex, NULL);

if( ! cfg_quiet )
ZF_TRY(pthread_create(&thread_id, NULL, monitor_fn, res) == 0);

if( cfg_waitable_fd )
ev_loop_waitable_fd(stack, muxer);
ev_loop_waitable_fd(res);
else if( cfg_muxer )
ev_loop_muxer(muxer);
ev_loop_muxer(res);
else
ev_loop_reactor(stack, ur);
ev_loop_reactor(res);

return 0;
}

0 comments on commit 2039470

Please sign in to comment.