Skip to content

Commit

Permalink
ggml-gobject: Use a custom GSource to check the GAsyncQueue
Browse files Browse the repository at this point in the history
As opposed to using a constantly-refreshing GIdleSource. The latter
was consuming quite many cycles, because GIdle sources are "always-ready"
  • Loading branch information
smspillaz committed Jul 20, 2023
1 parent dfd4130 commit 0848ec6
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 29 deletions.
43 changes: 14 additions & 29 deletions ggml-gobject/ggml-language-model.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <ggml-gobject/ggml-cached-model.h>
#include <ggml-gobject/ggml-gpt.h>
#include <ggml-gobject/ggml-language-model.h>
#include <ggml-gobject/internal/ggml-async-queue-source.h>
#include <ggml-gobject/internal/ggml-stream-internal.h>

struct _GGMLLanguageModel {
Expand Down Expand Up @@ -667,24 +668,21 @@ ggml_language_model_complete_thread_loop (gpointer data)

typedef struct _GGMLLanguageModelCompleteMonitorState
{
GAsyncQueue *queue;
GAsyncReadyCallback callback;
gpointer user_data;
GDestroyNotify user_data_destroy;
size_t ref_count;
} GGMLLanguageModelCompleteMonitorState;

static GGMLLanguageModelCompleteMonitorState *
ggml_language_model_complete_monitor_state_new (GAsyncQueue *async_queue,
GAsyncReadyCallback callback,
ggml_language_model_complete_monitor_state_new (GAsyncReadyCallback callback,
gpointer user_data,
GDestroyNotify user_data_destroy)
{
GGMLLanguageModelCompleteMonitorState *state = g_new0 (GGMLLanguageModelCompleteMonitorState, 1);
state->callback = callback;
state->user_data = user_data;
state->user_data_destroy = user_data_destroy;
state->queue = g_async_queue_ref (async_queue);
state->ref_count = 1;

return state;
Expand All @@ -695,7 +693,6 @@ ggml_language_model_complete_monitor_state_unref (GGMLLanguageModelCompleteMonit
{
if (--state->ref_count == 0)
{
g_clear_pointer (&state->queue, g_async_queue_unref);
g_clear_pointer (&state->user_data, state->user_data_destroy);

g_clear_pointer (&state, g_free);
Expand Down Expand Up @@ -768,30 +765,15 @@ ggml_language_model_monitor_process_completion (GGMLLanguageModelCompleteMonitor
}

static gboolean
ggml_language_model_monitor_callback (gpointer user_data)
ggml_language_model_monitor_callback (gpointer message, gpointer user_data)
{
GGMLLanguageModelCompleteMonitorState *state = user_data;
g_autoptr(GGMLLanguageModelChunkCompletion) completion = message;

while (TRUE)
/* If we return TRUE*/
if (ggml_language_model_monitor_process_completion (state, completion))
{
GGMLLanguageModelChunkCompletion *completion = g_async_queue_try_pop (state->queue);

/* Nothing more in the queue, but we're not done with our completion,
* so return G_SOURCE_CONTINUE here. */
if (completion == NULL)
{
return G_SOURCE_CONTINUE;
}

/* If we return TRUE*/
if (ggml_language_model_monitor_process_completion (state, completion))
{
ggml_language_model_chunk_completion_free (completion);
ggml_language_model_complete_monitor_state_unref (state);
return G_SOURCE_REMOVE;
}

ggml_language_model_chunk_completion_free (completion);
return G_SOURCE_REMOVE;
}
}

Expand Down Expand Up @@ -835,13 +817,16 @@ ggml_language_model_complete_async (GGMLLanguageModel *language_model,
g_autoptr(GError) error = NULL;

g_autoptr(GAsyncQueue) async_queue = g_async_queue_new_full ((GDestroyNotify) ggml_language_model_chunk_completion_free);
g_autoptr(GGMLLanguageModelCompleteMonitorState) monitor_state = ggml_language_model_complete_monitor_state_new (async_queue,
callback,
g_autoptr(GGMLLanguageModelCompleteMonitorState) monitor_state = ggml_language_model_complete_monitor_state_new (callback,
user_data,
user_data_destroy);

g_idle_add ((GSourceFunc) ggml_language_model_monitor_callback,
g_steal_pointer (&monitor_state));
GSource *monitor_source = ggml_async_queue_source_new (async_queue,
ggml_language_model_monitor_callback,
g_steal_pointer (&monitor_state),
ggml_language_model_complete_monitor_state_unref,
cancellable);
g_source_attach (g_steal_pointer (&monitor_source), NULL);

GGMLLanguageModelCompleteState *state = ggml_language_model_complete_state_new (language_model,
prompt,
Expand Down
109 changes: 109 additions & 0 deletions ggml-gobject/internal/ggml-async-queue-source.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* ggml-gobject/ggml-async-queue-source.c
*
* Library code for ggml-async-queue-source
*
* Copyright (C) 2023 Sam Spilsbury.
*
* ggml-gobject is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* ggml-gobject is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along
* with ggml-gobject; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/

#include <gio/gio.h>
#include <ggml-gobject/internal/ggml-async-queue-source.h>

typedef struct {
GSource source;
GAsyncQueue *queue;
} GGMLAsyncQueueSource;

static gboolean
ggml_async_queue_source_prepare (GSource *source, int32_t *timeout)
{
GGMLAsyncQueueSource *async_queue_source = (GGMLAsyncQueueSource *) source;
return g_async_queue_length (async_queue_source->queue) > 0;
}

static gboolean
ggml_async_queue_source_dispatch (GSource *source,
GSourceFunc func,
gpointer user_data)
{
GGMLAsyncQueueSource *async_queue_source = (GGMLAsyncQueueSource *) source;
gpointer message = g_async_queue_try_pop (async_queue_source->queue);
GGMLAsyncQueueSourceDispatchFunc real_func = (GGMLAsyncQueueSourceDispatchFunc) func;

if (message != NULL)
{
g_assert (real_func != NULL);
return real_func (message, user_data);
}

return G_SOURCE_CONTINUE;
}

static void
ggml_async_queue_source_finalize (GSource *source)
{
GGMLAsyncQueueSource *async_queue_source = (GGMLAsyncQueueSource *) source;

g_clear_pointer (&async_queue_source->queue, g_async_queue_unref);

/* The source will be freed later */
}

static GSourceFuncs async_queue_source_funcs = {
.prepare = ggml_async_queue_source_prepare,
.check = NULL,
.dispatch = ggml_async_queue_source_dispatch,
.finalize = ggml_async_queue_source_finalize
};

/**
* ggml_async_queue_source_new:
* @queue: A #GAsyncQueue
* @func: A #GGMLAsyncQueueSourceDispatchFunc
* @user_data: (closure @func): A closure for @func
* @user_data_destroy: (destroy @func): A destructor for @user_data
* @cancellable: A #GCancellable
*
* Create a new #GGMLAsyncQueueSource from #GAsyncQueue . The source will
* be dispatched when there is work to be done from the @queue. The queue
* needs to wakeup the main context manually by calling g_main_context_wakeup
* once the source is ready to be dispatched, as there is no poll-filedescriptor
*/
GSource *
ggml_async_queue_source_new (GAsyncQueue *queue,
GGMLAsyncQueueSourceDispatchFunc func,
gpointer user_data,
GDestroyNotify user_data_destroy,
GCancellable *cancellable)
{

GGMLAsyncQueueSource *source = (GGMLAsyncQueueSource *) g_source_new (&async_queue_source_funcs, sizeof (GGMLAsyncQueueSource));

g_source_set_callback ((GSource *) source, (GSourceFunc) func, user_data, user_data_destroy);
g_source_set_name ((GSource *) source, "AsyncQueueSource");

source->queue = g_async_queue_ref (queue);

if (cancellable != NULL)
{
g_autoptr(GSource) cancellable_source = g_cancellable_source_new (cancellable);
g_source_set_dummy_callback (cancellable_source);
g_source_add_child_source ((GSource *) source, cancellable_source);
}

return (GSource *) source;
}
44 changes: 44 additions & 0 deletions ggml-gobject/internal/ggml-async-queue-source.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* ggml-gobject/ggml-async-queue-source.h
*
* Library code for ggml-async-queue-source
*
* Copyright (C) 2023 Sam Spilsbury.
*
* ggml-gobject is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* ggml-gobject is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along
* with ggml-gobject; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/

#include <glib-object.h>

G_BEGIN_DECLS

/**
* GGMLAsyncQueueSourceDispatchFunc:
* @item: Some item to be dispatched in the queue
* @user_data: User data for the dispatch func
*
* Returns: A boolean for whether the source should be removed,
* usually this is either %G_SOURCE_REMOVE or %G_SOURCE_CONTINUE.
*/
typedef gboolean (*GGMLAsyncQueueSourceDispatchFunc) (gpointer item, gpointer user_data);

GSource *
ggml_async_queue_source_new (GAsyncQueue *queue,
GGMLAsyncQueueSourceDispatchFunc func,
gpointer user_data,
GDestroyNotify user_data_destroy,
GCancellable *cancellable);

G_END_DECLS
2 changes: 2 additions & 0 deletions ggml-gobject/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ ggml_gobject_toplevel_introspectable_sources = files([
'ggml-types.c',
])
ggml_gobject_toplevel_internal_sources = files([
'internal/ggml-async-queue-source.c',
'internal/ggml-stream-internal.c',
])
ggml_gobject_toplevel_internal_headers = files([
'internal/ggml-async-queue-source.h',
'internal/ggml-context-internal.h',
'internal/ggml-stream-internal.h',
'internal/ggml-tensor-internal.h',
Expand Down

0 comments on commit 0848ec6

Please sign in to comment.