Skip to content

Commit

Permalink
ON-16111: use monitor thread in zfsink to allow testing higher rates
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Drinkwater committed Oct 21, 2024
1 parent 7d4498c commit 48c4f03
Showing 1 changed file with 73 additions and 13 deletions.
86 changes: 73 additions & 13 deletions src/tests/zf_apps/zfsink.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,23 @@
#include <stdbool.h>
#include <stdarg.h>
#include <inttypes.h>
#include <pthread.h>
#include <sys/time.h>


struct resources {
/* statistics */
volatile uint64_t n_rx_pkts;
volatile uint64_t n_rx_bytes;
};


static bool cfg_quiet = false;
static bool cfg_rx_timestamping = false;
static struct resources res;

/* Mutex to protect printing from different threads */
static pthread_mutex_t printf_mutex;


static void usage_msg(FILE* f)
Expand All @@ -51,12 +64,10 @@ static void usage_err(void)

static void vlog(const char* fmt, ...)
{
if( ! cfg_quiet ) {
va_list args;
va_start(args, fmt);
vprintf(fmt, args);
va_end(args);
}
va_list args;
va_start(args, fmt);
vprintf(fmt, args);
va_end(args);
}


Expand All @@ -77,9 +88,6 @@ static void try_recv(struct zfur* ur)

/* Do something useful with the datagram here! */


vlog("Received datagram of length %zu\n", rd.iov[0].iov_len);

/* In the case rx timestamping capabilities are enabled, we can retrieve
* the time at which the packet was received.
* */
Expand All @@ -88,13 +96,18 @@ static void try_recv(struct zfur* ur)
struct timespec ts;
int rc = zfur_pkt_get_timestamp(ur, &rd.msg, &ts, 0, &flags);

pthread_mutex_lock(&printf_mutex);
if( rc == 0 )
vlog("At time: %lld.%.9ld\n", ts.tv_sec, ts.tv_nsec);
vlog("Hardware timestamp: %lld.%.9ld\n", ts.tv_sec, ts.tv_nsec);
else
vlog("Error retrieving timestamp! Return code: %d\n", rc);
pthread_mutex_unlock(&printf_mutex);
}

zfur_zc_recv_done(ur, &rd.msg);

res.n_rx_pkts += 1;
res.n_rx_bytes += rd.iov[0].iov_len;
} while( rd.msg.dgrams_left );
}

Expand All @@ -104,7 +117,6 @@ static void poll_muxer(struct zf_muxer_set* muxer, int timeout)
struct epoll_event evs[8];
const int max_evs = sizeof(evs) / sizeof(evs[0]);

vlog("Polling muxer\n");
int n_ev = zf_muxer_wait(muxer, evs, max_evs, timeout);

for( int i = 0; i < n_ev; ++i )
Expand All @@ -115,7 +127,6 @@ static void poll_muxer(struct zf_muxer_set* muxer, int timeout)
static void ev_loop_reactor(struct zf_stack* stack, struct zfur* ur)
{
while( 1 ) {
vlog("Polling reactor\n");
while( zf_reactor_perform(stack) == 0 )
;
try_recv(ur);
Expand Down Expand Up @@ -145,7 +156,6 @@ static void ev_loop_waitable_fd(struct zf_stack* stack,
struct epoll_event evs[8];
const int max_evs = sizeof(evs) / sizeof(evs[0]);

vlog("Calling epoll_wait\n");
int n_ev = epoll_wait(epollfd, evs, max_evs, -1);

for( int i = 0; i < n_ev; ++i )
Expand Down Expand Up @@ -233,8 +243,51 @@ void print_attrs(struct zf_attr* attr)
}


static void monitor()
{
uint64_t now_bytes, prev_bytes;
struct timeval start, end;
uint64_t prev_pkts, now_pkts;
int ms, pkt_rate, mbps;

pthread_mutex_lock(&printf_mutex);
printf("#%9s %16s %16s", "pkt-rate", "bandwidth(Mbps)", "total-pkts\n");
pthread_mutex_unlock(&printf_mutex);

prev_pkts = res.n_rx_pkts;
prev_bytes = res.n_rx_bytes;
gettimeofday(&start, NULL);

while( 1 ) {
sleep(1);
now_pkts = res.n_rx_pkts;
now_bytes = res.n_rx_bytes;
gettimeofday(&end, NULL);
ms = (end.tv_sec - start.tv_sec) * 1000;
ms += (end.tv_usec - start.tv_usec) / 1000;
pkt_rate = (int) ((now_pkts - prev_pkts) * 1000 / ms);
mbps = (int) ((now_bytes - prev_bytes) * 8 / 1000 / ms);
pthread_mutex_lock(&printf_mutex);
printf("%10d %16d %16"PRIu64"\n", pkt_rate, mbps, now_pkts);
pthread_mutex_unlock(&printf_mutex);
fflush(stdout);
prev_pkts = now_pkts;
prev_bytes = now_bytes;
start = end;
}
}


static void* monitor_fn(void* arg)
{
monitor();
return NULL;
}


int main(int argc, char* argv[])
{
pthread_t thread_id;
int cfg_muxer = 0;
int cfg_waitable_fd = 0;
bool cfg_print_attrs = false;
Expand Down Expand Up @@ -320,6 +373,13 @@ int main(int argc, char* argv[])
ZF_TRY(zf_muxer_add(muxer, zfur_to_waitable(ur), &event));
}

pthread_mutex_init(&printf_mutex, NULL);
res.n_rx_bytes = 0;
res.n_rx_pkts = 0;

if( ! cfg_quiet )
ZF_TRY(pthread_create(&thread_id, NULL, monitor_fn, NULL) == 0);

if( cfg_waitable_fd )
ev_loop_waitable_fd(stack, muxer);
else if( cfg_muxer )
Expand Down

0 comments on commit 48c4f03

Please sign in to comment.