Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FuriThread: Improve state callbacks #3881

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions applications/services/loader/loader.c
Original file line number Diff line number Diff line change
Expand Up @@ -308,12 +308,14 @@ static void loader_applications_closed_callback(void* context) {
furi_message_queue_put(loader->queue, &message, FuriWaitForever);
}

static void loader_thread_state_callback(FuriThreadState thread_state, void* context) {
static void
loader_thread_state_callback(FuriThread* thread, FuriThreadState thread_state, void* context) {
UNUSED(thread);
furi_assert(context);

Loader* loader = context;

if(thread_state == FuriThreadStateStopped) {
Loader* loader = context;

LoaderMessage message;
message.type = LoaderMessageTypeAppClosed;
furi_message_queue_put(loader->queue, &message, FuriWaitForever);
Expand Down
15 changes: 4 additions & 11 deletions applications/services/region/region.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,19 +104,12 @@ static int32_t region_load_file(void* context) {
return 0;
}

static void region_loader_pending_callback(void* context, uint32_t arg) {
UNUSED(arg);

FuriThread* loader = context;
furi_thread_join(loader);
furi_thread_free(loader);
}

static void region_loader_state_callback(FuriThreadState state, void* context) {
static void
region_loader_release_callback(FuriThread* thread, FuriThreadState state, void* context) {
UNUSED(context);

if(state == FuriThreadStateStopped) {
furi_timer_pending_callback(region_loader_pending_callback, furi_thread_get_current(), 0);
furi_thread_free(thread);
}
}

Expand All @@ -126,7 +119,7 @@ static void region_storage_callback(const void* message, void* context) {

if(event->type == StorageEventTypeCardMount) {
FuriThread* loader = furi_thread_alloc_ex(NULL, 2048, region_load_file, NULL);
furi_thread_set_state_callback(loader, region_loader_state_callback);
furi_thread_set_state_callback(loader, region_loader_release_callback);
furi_thread_start(loader);
}
}
Expand Down
67 changes: 33 additions & 34 deletions applications/services/rpc/rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ static RpcSystemCallbacks rpc_systems[] = {
struct RpcSession {
Rpc* rpc;

FuriThread* thread;
FuriThreadId thread_id;

RpcHandlerDict_t handlers;
FuriStreamBuffer* stream;
Expand Down Expand Up @@ -172,7 +172,7 @@ size_t rpc_session_feed(

size_t bytes_sent = furi_stream_buffer_send(session->stream, encoded_bytes, size, timeout);

furi_thread_flags_set(furi_thread_get_id(session->thread), RpcEvtNewData);
furi_thread_flags_set(session->thread_id, RpcEvtNewData);

return bytes_sent;
}
Expand Down Expand Up @@ -220,7 +220,7 @@ bool rpc_pb_stream_read(pb_istream_t* istream, pb_byte_t* buf, size_t count) {
break;
} else {
/* Save disconnect flag and continue reading buffer */
furi_thread_flags_set(furi_thread_get_id(session->thread), RpcEvtDisconnect);
furi_thread_flags_set(session->thread_id, RpcEvtDisconnect);
}
} else if(flags & RpcEvtNewData) {
// Just wake thread up
Expand Down Expand Up @@ -347,35 +347,32 @@ static int32_t rpc_session_worker(void* context) {
return 0;
}

static void rpc_session_thread_pending_callback(void* context, uint32_t arg) {
UNUSED(arg);
RpcSession* session = (RpcSession*)context;
static void rpc_session_thread_release_callback(
FuriThread* thread,
FuriThreadState thread_state,
void* context) {
if(thread_state == FuriThreadStateStopped) {
RpcSession* session = (RpcSession*)context;

for(size_t i = 0; i < COUNT_OF(rpc_systems); ++i) {
if(rpc_systems[i].free) {
(rpc_systems[i].free)(session->system_contexts[i]);
for(size_t i = 0; i < COUNT_OF(rpc_systems); ++i) {
if(rpc_systems[i].free) {
(rpc_systems[i].free)(session->system_contexts[i]);
}
}
}
free(session->system_contexts);
free(session->decoded_message);
RpcHandlerDict_clear(session->handlers);
furi_stream_buffer_free(session->stream);

furi_mutex_acquire(session->callbacks_mutex, FuriWaitForever);
if(session->terminated_callback) {
session->terminated_callback(session->context);
}
furi_mutex_release(session->callbacks_mutex);

furi_mutex_free(session->callbacks_mutex);
furi_thread_join(session->thread);
furi_thread_free(session->thread);
free(session);
}
free(session->system_contexts);
free(session->decoded_message);
RpcHandlerDict_clear(session->handlers);
furi_stream_buffer_free(session->stream);

furi_mutex_acquire(session->callbacks_mutex, FuriWaitForever);
if(session->terminated_callback) {
session->terminated_callback(session->context);
}
furi_mutex_release(session->callbacks_mutex);

static void rpc_session_thread_state_callback(FuriThreadState thread_state, void* context) {
if(thread_state == FuriThreadStateStopped) {
furi_timer_pending_callback(rpc_session_thread_pending_callback, context, 0);
furi_mutex_free(session->callbacks_mutex);
furi_thread_free(thread);
free(session);
}
}

Expand Down Expand Up @@ -407,12 +404,14 @@ RpcSession* rpc_session_open(Rpc* rpc, RpcOwner owner) {
};
rpc_add_handler(session, PB_Main_stop_session_tag, &rpc_handler);

session->thread = furi_thread_alloc_ex("RpcSessionWorker", 3072, rpc_session_worker, session);
FuriThread* thread =
furi_thread_alloc_ex("RpcSessionWorker", 3072, rpc_session_worker, session);
session->thread_id = furi_thread_get_id(thread);

furi_thread_set_state_context(session->thread, session);
furi_thread_set_state_callback(session->thread, rpc_session_thread_state_callback);
furi_thread_set_state_context(thread, session);
furi_thread_set_state_callback(thread, rpc_session_thread_release_callback);

furi_thread_start(session->thread);
furi_thread_start(thread);

return session;
}
Expand All @@ -424,7 +423,7 @@ void rpc_session_close(RpcSession* session) {
rpc_session_set_send_bytes_callback(session, NULL);
rpc_session_set_close_callback(session, NULL);
rpc_session_set_buffer_is_empty_callback(session, NULL);
furi_thread_flags_set(furi_thread_get_id(session->thread), RpcEvtDisconnect);
furi_thread_flags_set(session->thread_id, RpcEvtDisconnect);
}

void rpc_on_system_start(void* p) {
Expand Down
8 changes: 4 additions & 4 deletions applications/system/updater/util/update_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -395,14 +395,15 @@ bool update_task_open_file(UpdateTask* update_task, FuriString* filename) {
return open_success;
}

static void update_task_worker_thread_cb(FuriThreadState state, void* context) {
UpdateTask* update_task = context;
static void
update_task_worker_thread_cb(FuriThread* thread, FuriThreadState state, void* context) {
UNUSED(context);

if(state != FuriThreadStateStopped) {
return;
}

if(furi_thread_get_return_code(update_task->thread) == UPDATE_TASK_NOERR) {
if(furi_thread_get_return_code(thread) == UPDATE_TASK_NOERR) {
furi_delay_ms(UPDATE_DELAY_OPERATION_OK);
furi_hal_power_reset();
}
Expand All @@ -427,7 +428,6 @@ UpdateTask* update_task_alloc(void) {
furi_thread_alloc_ex("UpdateWorker", 5120, NULL, update_task);

furi_thread_set_state_callback(thread, update_task_worker_thread_cb);
furi_thread_set_state_context(thread, update_task);
#ifdef FURI_RAM_EXEC
UNUSED(update_task_worker_backup_restore);
furi_thread_set_callback(thread, update_task_worker_flash_writer);
Expand Down
16 changes: 6 additions & 10 deletions furi/core/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ struct FuriThread {
StaticTask_t container;
StackType_t* stack_buffer;

FuriThreadState state;
volatile FuriThreadState state;
int32_t ret;

FuriThreadCallback callback;
Expand All @@ -59,7 +59,6 @@ struct FuriThread {
// this ensures that the size of this structure is minimal
bool is_service;
bool heap_trace_enabled;
volatile bool is_active;
};

// IMPORTANT: container MUST be the FIRST struct member
Expand All @@ -84,7 +83,7 @@ static void furi_thread_set_state(FuriThread* thread, FuriThreadState state) {
furi_assert(thread);
thread->state = state;
if(thread->state_callback) {
thread->state_callback(state, thread->state_context);
thread->state_callback(thread, state, thread->state_context);
}
}

Expand Down Expand Up @@ -124,7 +123,7 @@ static void furi_thread_body(void* context) {
// flush stdout
__furi_thread_stdout_flush(thread);

furi_thread_set_state(thread, FuriThreadStateStopped);
furi_thread_set_state(thread, FuriThreadStateStopping);

vTaskDelete(NULL);
furi_thread_catch();
Expand Down Expand Up @@ -207,7 +206,6 @@ void furi_thread_free(FuriThread* thread) {
furi_check(thread->is_service == false);
// Cannot free a non-joined thread
furi_check(thread->state == FuriThreadStateStopped);
furi_check(!thread->is_active);

furi_thread_set_name(thread, NULL);
furi_thread_set_appid(thread, NULL);
Expand Down Expand Up @@ -349,8 +347,6 @@ void furi_thread_start(FuriThread* thread) {

uint32_t stack_depth = thread->stack_size / sizeof(StackType_t);

thread->is_active = true;

furi_check(
xTaskCreateStatic(
furi_thread_body,
Expand All @@ -368,7 +364,7 @@ void furi_thread_cleanup_tcb_event(TaskHandle_t task) {
// clear thread local storage
vTaskSetThreadLocalStoragePointer(task, 0, NULL);
furi_check(thread == (FuriThread*)task);
thread->is_active = false;
furi_thread_set_state(thread, FuriThreadStateStopped);
}
}

Expand All @@ -383,8 +379,8 @@ bool furi_thread_join(FuriThread* thread) {
//
// If your thread exited, but your app stuck here: some other thread uses
// all cpu time, which delays kernel from releasing task handle
while(thread->is_active) {
furi_delay_ms(10);
while(thread->state != FuriThreadStateStopped) {
furi_delay_tick(2);
}

return true;
Expand Down
6 changes: 4 additions & 2 deletions furi/core/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ extern "C" {
* Many of the FuriThread functions MUST ONLY be called when the thread is STOPPED.
*/
typedef enum {
FuriThreadStateStopped, /**< Thread is stopped */
FuriThreadStateStopped, /**< Thread is stopped and is safe to release */
FuriThreadStateStopping, /**< Thread is stopping */
FuriThreadStateStarting, /**< Thread is starting */
FuriThreadStateRunning, /**< Thread is running */
} FuriThreadState;
Expand Down Expand Up @@ -80,10 +81,11 @@ typedef void (*FuriThreadStdoutWriteCallback)(const char* data, size_t size);
*
* The function to be used as a state callback MUST follow this signature.
*
* @param[in] pointer to the FuriThread instance that changed the state
* @param[in] state identifier of the state the thread has transitioned to
* @param[in,out] context pointer to a user-specified object
*/
typedef void (*FuriThreadStateCallback)(FuriThreadState state, void* context);
typedef void (*FuriThreadStateCallback)(FuriThread* thread, FuriThreadState state, void* context);

/**
* @brief Signal handler callback function pointer type.
Expand Down
2 changes: 1 addition & 1 deletion targets/f18/api_symbols.csv
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
entry,status,name,type,params
Version,+,75.0,,
Version,+,76.0,,
Header,+,applications/services/bt/bt_service/bt.h,,
Header,+,applications/services/bt/bt_service/bt_keys_storage.h,,
Header,+,applications/services/cli/cli.h,,
Expand Down
2 changes: 1 addition & 1 deletion targets/f7/api_symbols.csv
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
entry,status,name,type,params
Version,+,75.0,,
Version,+,76.0,,
Header,+,applications/drivers/subghz/cc1101_ext/cc1101_ext_interconnect.h,,
Header,+,applications/services/bt/bt_service/bt.h,,
Header,+,applications/services/bt/bt_service/bt_keys_storage.h,,
Expand Down