Skip to content

Commit

Permalink
Attempt to fix pluto issues
Browse files Browse the repository at this point in the history
  • Loading branch information
BatchDrake committed Dec 12, 2023
1 parent 07aa9ff commit 1d2c174
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 21 deletions.
1 change: 1 addition & 0 deletions .github/workflows/cmake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ jobs:
# soapyosmo removed because of random compilation errors
# soapyairspyhf removed because of random compilation errors
brew install libsndfile volk fftw soapysdr libxml2 portaudio
python3 -Im pip install setuptools
brew install soapyrtlsdr soapyhackrf soapybladerf soapyairspy soapyredpitaya soapyiris limesuite soapyplutosdr
brew install --head soapyuhd
# TODO: needed?
Expand Down
26 changes: 26 additions & 0 deletions analyzer/impl/local.c
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,9 @@ suscan_local_analyzer_ctor(suscan_analyzer_t *parent, va_list ap)
const struct suscan_source_info *source_info = NULL;
struct sigutils_specttuner_params st_params =
sigutils_specttuner_params_INITIALIZER;
struct suscan_sample_buffer_pool_params bp_params =
suscan_sample_buffer_pool_params_INITIALIZER;

struct sigutils_channel_detector_params det_params;
suscan_source_config_t *config;
pthread_mutexattr_t attr;
Expand All @@ -535,6 +538,12 @@ suscan_local_analyzer_ctor(suscan_analyzer_t *parent, va_list ap)
goto fail;
}

/* Initialize buffer pools */
if ((new->bufpool = suscan_sample_buffer_pool_new(&bp_params)) == NULL) {
SU_ERROR("Cannot create sample buffer pool\n");
goto fail;
}

/* Initialize source */
if (!suscan_local_analyzer_source_init(new, config)) {
SU_ERROR("Failed to initialize source\n");
Expand Down Expand Up @@ -754,6 +763,15 @@ suscan_local_analyzer_dtor(void *ptr)
if (self->detector != NULL)
su_channel_detector_destroy(self->detector);

if (self->psd_worker != NULL) {
if (!suscan_worker_destroy(self->psd_worker)) {
SU_ERROR("Failed to destroy slow worker.\n");

/* Mark smoothPSD object as released */
self->smooth_psd = NULL;
}
}

if (self->smooth_psd != NULL)
su_smoothpsd_destroy(self->smooth_psd);

Expand Down Expand Up @@ -806,6 +824,14 @@ suscan_local_analyzer_dtor(void *ptr)
/* Consume any pending messages */
suscan_analyzer_consume_mq(&self->mq_in);

/* Finalize buffers (if possible) */
if (self->bufpool != NULL) {
if (!suscan_sample_buffer_pool_released(self->bufpool))
SU_WARNING("Buffer pool has unreleased buffers. Memory leak ahead.\n");
else
suscan_sample_buffer_pool_destroy(self->bufpool);
}

/* Finalize queue */
suscan_mq_finalize(&self->mq_in);

Expand Down
3 changes: 3 additions & 0 deletions analyzer/impl/local.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <sigutils/smoothpsd.h>
#include <analyzer/inspector/factory.h>
#include <analyzer/inspector/overridable.h>
#include <analyzer/pool.h>

#include <rbtree.h>

Expand Down Expand Up @@ -116,8 +117,10 @@ struct suscan_local_analyzer {
uint64_t last_channels;

/* Source worker objects */
suscan_sample_buffer_pool_t *bufpool; /* Sample buffer pool */
su_channel_detector_t *detector; /* Channel detector */
su_smoothpsd_t *smooth_psd;
suscan_worker_t *psd_worker;
suscan_worker_t *source_wk; /* Used by one source only */
suscan_worker_t *slow_wk; /* Worker for slow operations */
SUCOMPLEX *read_buf;
Expand Down
51 changes: 42 additions & 9 deletions analyzer/pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,15 @@ SU_INSTANCER(suscan_sample_buffer, suscan_sample_buffer_pool_t *parent)

SU_ALLOCATE_FAIL(self, suscan_sample_buffer_t);

self->parent = parent;
self->rindex = -1;
self->circular = parent->params.vm_circularity;
self->acquired = SU_FALSE;
self->size = parent->params.alloc_size;
self->parent = parent;
self->refcnt = 0;
self->rindex = -1;
self->circular = parent->params.vm_circularity;
self->acquired = SU_FALSE;
self->size = parent->params.alloc_size;

SU_TRYZ_FAIL(pthread_mutex_init(&self->mutex, NULL));
self->mutex_init = SU_TRUE;

SU_ALLOCATE_MANY_FAIL(self->data, self->size, SUCOMPLEX);

Expand All @@ -54,9 +58,19 @@ SU_COLLECTOR(suscan_sample_buffer)
if (self->data != NULL)
free(self->data);

if (self->mutex_init)
pthread_mutex_destroy(&self->mutex);

free(self);
}

SU_METHOD(suscan_sample_buffer, void, inc_ref)
{
pthread_mutex_lock(&self->mutex);
++self->refcnt;
pthread_mutex_unlock(&self->mutex);
}

/***************** Construct the suscan sample buffer pool ********************/
SU_CONSTRUCTOR(suscan_sample_buffer_pool,
const struct suscan_sample_buffer_pool_params *params)
Expand Down Expand Up @@ -140,7 +154,12 @@ SU_METHOD(suscan_sample_buffer_pool, suscan_sample_buffer_t *, acquire)
return NULL;
}

++ret->refcnt;
ret->acquired = SU_TRUE;

pthread_mutex_lock(&self->mutex);
--self->free_num;
pthread_mutex_unlock(&self->mutex);

} else {
/* Room for new buffers. Perform a try_acquire. */
Expand All @@ -164,6 +183,10 @@ SU_METHOD(suscan_sample_buffer_pool, suscan_sample_buffer_t *, try_acquire)
SU_WARNING("acquire() aborted due to non-buffer entry\n");
goto fail;
}

pthread_mutex_lock(&self->mutex);
--self->free_num;
pthread_mutex_unlock(&self->mutex);
} else {
/* No free elements, allocate and return */
SU_MAKE_FAIL(tmp, suscan_sample_buffer, self);
Expand All @@ -173,6 +196,7 @@ SU_METHOD(suscan_sample_buffer_pool, suscan_sample_buffer_t *, try_acquire)
}

ret->acquired = SU_TRUE;
++ret->refcnt;

return ret;

Expand All @@ -186,7 +210,7 @@ SU_METHOD(suscan_sample_buffer_pool, suscan_sample_buffer_t *, try_acquire)
SU_METHOD(suscan_sample_buffer_pool, SUBOOL, give, suscan_sample_buffer_t *buf)
{
SUBOOL ok = SU_FALSE;

SUBOOL delete;
if (!buf->acquired) {
SU_ERROR("BUG: Sample buffer is not acquired\n");
goto done;
Expand All @@ -207,9 +231,18 @@ SU_METHOD(suscan_sample_buffer_pool, SUBOOL, give, suscan_sample_buffer_t *buf)
goto done;
}

buf->acquired = SU_FALSE;

SU_TRY(suscan_mq_write(&self->free_mq, SUSCAN_POOL_MQ_TYPE_BUFFER, buf));
SU_TRY(pthread_mutex_lock(&buf->mutex));
delete = --buf->refcnt == 0;
pthread_mutex_unlock(&buf->mutex);

if (delete) {
buf->acquired = SU_FALSE;
pthread_mutex_lock(&self->mutex);
++self->free_num;
pthread_mutex_unlock(&self->mutex);

SU_TRY(suscan_mq_write(&self->free_mq, SUSCAN_POOL_MQ_TYPE_BUFFER, buf));
}

ok = SU_TRUE;

Expand Down
15 changes: 14 additions & 1 deletion analyzer/pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ struct suscan_sample_buffer_pool;

struct suscan_sample_buffer {
struct suscan_sample_buffer_pool *parent;

pthread_mutex_t mutex;
SUBOOL mutex_init;
SUSCOUNT refcnt;

int rindex; /* Reverse index in the buffer table */
SUBOOL circular;
SUBOOL acquired;
Expand All @@ -57,6 +60,7 @@ SU_GETTER(suscan_sample_buffer, SUSCOUNT, size)
}

SU_INSTANCER(suscan_sample_buffer, struct suscan_sample_buffer_pool *);
SU_METHOD(suscan_sample_buffer, void, inc_ref);
SU_COLLECTOR(suscan_sample_buffer);

struct suscan_sample_buffer_pool_params {
Expand Down Expand Up @@ -84,6 +88,7 @@ struct suscan_sample_buffer_pool {
struct suscan_sample_buffer_pool_params params;

PTR_LIST(suscan_sample_buffer_t, buffer);
unsigned free_num;
struct suscan_mq free_mq;
pthread_mutex_t mutex;
SUBOOL mutex_init;
Expand All @@ -106,4 +111,12 @@ SU_METHOD(suscan_sample_buffer_pool, suscan_sample_buffer_t *, acquire);
SU_METHOD(suscan_sample_buffer_pool, suscan_sample_buffer_t *, try_acquire);
SU_METHOD(suscan_sample_buffer_pool, SUBOOL, give, suscan_sample_buffer_t *);

SUINLINE SU_GETTER(
suscan_sample_buffer_pool,
SUBOOL,
released)
{
return self->free_num == self->buffer_count;
}

#endif /* _SUSCAN_POOL */
46 changes: 35 additions & 11 deletions analyzer/workers/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -557,21 +557,32 @@ suscan_local_analyzer_init_channel_worker(suscan_local_analyzer_t *self)
{
struct sigutils_smoothpsd_params sp_params =
sigutils_smoothpsd_params_INITIALIZER;
SUBOOL ok = SU_FALSE;

/* Create smooth PSD */
sp_params.fft_size = self->parent->params.detector_params.window_size;
sp_params.samp_rate = self->effective_samp_rate;
sp_params.refresh_rate = 1. / self->interval_psd;

self->sp_params = sp_params;

SU_TRYCATCH(
self->smooth_psd = su_smoothpsd_new(
&sp_params,
suscan_local_analyzer_on_psd,
self),
return SU_FALSE);
SU_MAKE(
self->smooth_psd,
su_smoothpsd,
&sp_params,
suscan_local_analyzer_on_psd,
self);

SU_TRY(
self->psd_worker = suscan_worker_new_ex(
"psd-worker",
&self->mq_in,
self));

ok = SU_TRUE;

return SU_TRUE;
done:
return ok;
}

SUBOOL
Expand All @@ -581,6 +592,8 @@ suscan_source_channel_wk_cb(
void *cb_private)
{
suscan_local_analyzer_t *self = (suscan_local_analyzer_t *) wk_private;
suscan_sample_buffer_t *buffer = NULL;

SUSDIFF got;
SUSCOUNT read_size;
SUBOOL mutex_acquired = SU_FALSE;
Expand All @@ -603,17 +616,25 @@ suscan_source_channel_wk_cb(
SU_TRYCATCH(
pthread_mutex_unlock(&self->throttle_mutex) != -1,
goto done);

if (read_size < self->bufpool->params.alloc_size)
goto done;
}

SU_TRYCATCH(suscan_local_analyzer_parse_overridable(self), goto done);

buffer = suscan_sample_buffer_pool_acquire(self->bufpool);
if (buffer == NULL) {
SU_ERROR("Failed to acquire read buffer\n");
goto done;
}

/* Ready to read */
suscan_local_analyzer_read_start(self);

if ((got = suscan_source_read(
self->source,
self->read_buf,
read_size)) > 0) {
buffer = suscan_source_read_buffer(self->source, self->bufpool, &got);

if (buffer != NULL) {
suscan_local_analyzer_process_start(self);

if (self->iq_rev)
Expand Down Expand Up @@ -718,6 +739,9 @@ suscan_source_channel_wk_cb(
if (mutex_acquired)
(void) suscan_local_analyzer_unlock_loop(self);

if (buffer != NULL)
suscan_sample_buffer_pool_give(self->bufpool, buffer);

return restart;
}

0 comments on commit 1d2c174

Please sign in to comment.