Skip to content

Commit

Permalink
Allow strings to be directly copied between threads in some cases (#93)
Browse files Browse the repository at this point in the history
closes #71

This works by allowing threads direct access to the original thread's string for as long as it is cached in the origin thread's ThreadedBase connection.

As long as the connection lives long enough for the reading thread(s) to dereference the string in question (which, thanks to 4ddf79e, will be the case in all common cases, including dead threads and completed worker tasks), the extra malloc and string copy is avoided on the writer thread, which significantly improves performance in synthetic benchmarks.

If the connection is destroyed, it will create persistent copies of any cached strings during free_obj, and the old double-copy method will be used to enable the string to be accessed. However, this is rarely needed.

The caveat to this is that pthreads_store_sync_local_properties() will do work more often when strings are used, but I don't think this is a big concern. For most cases, the property table should be small enough for this to not be a problem anyway, and for the large cases, we need to implement dedicated queue data structures anyway. Profiling anyway suggested that the overhead of zend_hash_internal_pointer_reset_ex() was several orders of magnitude bigger a problem anyway (see #42).
  • Loading branch information
dktapps authored Feb 5, 2023
1 parent 60babc5 commit 283c2e8
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 10 deletions.
2 changes: 2 additions & 0 deletions src/object.c
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ void pthreads_base_free(zend_object *object) {
if (pthreads_globals_lock()) {
if (--base->ts_obj->refcount == 0) {
pthreads_ts_object_free(base);
} else {
pthreads_store_persist_local_properties(object);
}
pthreads_globals_object_delete(base);
pthreads_globals_unlock();
Expand Down
100 changes: 90 additions & 10 deletions src/store.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ void pthreads_store_sync_local_properties(zend_object* object) { /* {{{ */
remove = 0;
}
#endif
} else if (ts_val->type == STORE_TYPE_STRING_PTR && Z_TYPE_P(val) == IS_STRING) {
pthreads_string_storage_t* string = (pthreads_string_storage_t*)ts_val->data;
if (string->owner.ls == TSRMLS_CACHE && string->string == Z_STR_P(val)) {
//local caching of this by other threads is probably fine too, but fully caching it would probably
//require bytewise comparison, which isn't gonna be very performant
//it should be sufficient to only persist the owner thread's ref, since that's where copies will be
//made from anyway.
remove = 0;
}
}
}

Expand All @@ -115,7 +124,7 @@ void pthreads_store_sync_local_properties(zend_object* object) { /* {{{ */
} /* }}} */

static inline zend_bool pthreads_store_retain_in_local_cache(zval* val) {
return IS_PTHREADS_OBJECT(val) || IS_PTHREADS_CLOSURE_OBJECT(val) || IS_EXT_SOCKETS_OBJECT(val);
return IS_PTHREADS_OBJECT(val) || IS_PTHREADS_CLOSURE_OBJECT(val) || IS_EXT_SOCKETS_OBJECT(val) || Z_TYPE_P(val) == IS_STRING;
}

static inline zend_bool pthreads_store_valid_local_cache_item(zval* val) {
Expand All @@ -127,7 +136,7 @@ static inline zend_bool pthreads_store_valid_local_cache_item(zval* val) {
/* {{{ */
static inline zend_bool pthreads_store_storage_is_cacheable(zval* zstorage) {
pthreads_storage* storage = TRY_PTHREADS_STORAGE_PTR_P(zstorage);
return storage && (storage->type == STORE_TYPE_PTHREADS || storage->type == STORE_TYPE_CLOSURE || storage->type == STORE_TYPE_SOCKET);
return storage && (storage->type == STORE_TYPE_PTHREADS || storage->type == STORE_TYPE_CLOSURE || storage->type == STORE_TYPE_SOCKET || storage->type == STORE_TYPE_STRING_PTR);
} /* }}} */

/* {{{ Syncs all the cacheable properties from TS storage into local cache */
Expand Down Expand Up @@ -314,6 +323,15 @@ zend_bool pthreads_store_isset(zend_object *object, zval *key, int has_set_exist
isset = 0;
}
break;
case IS_PTR: {
pthreads_storage* storage = TRY_PTHREADS_STORAGE_PTR_P(zstorage);
if (storage->type == STORE_TYPE_STRING_PTR) {
pthreads_string_storage_t* string = (pthreads_string_storage_t*)storage->data;
if (ZSTR_LEN(string->string) == 0 || ZSTR_VAL(string->string)[0] == '0') {
isset = 0;
}
}
} break;
default:
break;
}
Expand Down Expand Up @@ -350,7 +368,7 @@ static inline void pthreads_store_update_local_property(zend_object* object, zva
zend_hash_update(object->properties, str_key, value);
}
}
Z_ADDREF_P(value);
Z_TRY_ADDREF_P(value);
}
}

Expand Down Expand Up @@ -768,6 +786,34 @@ void pthreads_store_tohash(zend_object *object, HashTable *hash) {
}
} /* }}} */

/* {{{ */
void pthreads_store_persist_local_properties(zend_object* object) {
pthreads_zend_object_t* threaded = PTHREADS_FETCH_FROM(object);
pthreads_object_t* ts_obj = threaded->ts_obj;

if (pthreads_monitor_lock(ts_obj->monitor)) {
zval *zstorage;
ZEND_HASH_FOREACH_VAL(&ts_obj->props->hash, zstorage) {
pthreads_storage* storage = TRY_PTHREADS_STORAGE_PTR_P(zstorage);

if (storage != NULL && storage->type == STORE_TYPE_STRING_PTR) {
pthreads_string_storage_t* string = (pthreads_string_storage_t*)storage->data;
if (string->owner.ls == TSRMLS_CACHE) {
//we can't guarantee this string will continue to be available once we stop referencing it on this thread,
//so we must create a persistent copy now

zend_string* persistent_string = pthreads_store_save_string(string->string);
pthreads_store_storage_dtor(zstorage);

ZVAL_STR(zstorage, persistent_string);
}
}
} ZEND_HASH_FOREACH_END();

pthreads_monitor_unlock(ts_obj->monitor);
}
} /* }}} */

/* {{{ */
void pthreads_store_free(pthreads_store_t *store){
zend_hash_destroy(&store->hash);
Expand All @@ -787,6 +833,14 @@ static pthreads_storage* pthreads_store_create(pthreads_ident_t* source, zval *u


switch(Z_TYPE_P(unstore)){
case IS_STRING: {
storage->type = STORE_TYPE_STRING_PTR;
pthreads_string_storage_t* string = malloc(sizeof(pthreads_string_storage_t));
string->owner = *source;
string->string = Z_STR_P(unstore);
storage->data = string;
} break;

case IS_RESOURCE: {
pthreads_resource resource = malloc(sizeof(*resource));
storage->type = STORE_TYPE_RESOURCE;
Expand Down Expand Up @@ -885,9 +939,14 @@ static zend_result pthreads_store_save_zval(pthreads_ident_t* source, zval *zsto
result = SUCCESS;
break;
case IS_STRING:
ZVAL_STR(zstorage, pthreads_store_save_string(Z_STR_P(write)));
result = SUCCESS;
break;
//permanent strings can be used directly
//non-permanent strings are handled differently
if (GC_FLAGS(Z_STR_P(write)) & IS_STR_PERMANENT) {
ZVAL_STR(zstorage, Z_STR_P(write));

result = SUCCESS;
break;
}
default: {
pthreads_storage *storage = pthreads_store_create(source, write);
if (storage != NULL) {
Expand All @@ -906,6 +965,17 @@ static int pthreads_store_convert(pthreads_storage *storage, zval *pzval){
int result = SUCCESS;

switch(storage->type) {
case STORE_TYPE_STRING_PTR: {
pthreads_string_storage_t* string = (pthreads_string_storage_t*)storage->data;

if (string->owner.ls == TSRMLS_CACHE) {
//this thread owns the string - we can use it directly
ZVAL_STR_COPY(pzval, string->string);
} else {
//this thread does not own the string - create a copy
ZVAL_STR(pzval, pthreads_store_restore_string(string->string));
}
} break;
case STORE_TYPE_RESOURCE: {
pthreads_resource stored = (pthreads_resource) storage->data;

Expand Down Expand Up @@ -1036,6 +1106,7 @@ static void pthreads_store_restore_zval_ex(zval *unstore, zval *zstorage, zend_b
ZVAL_COPY(unstore, zstorage);
break;
case IS_STRING:
/* permanent interned string, or persisted string from a dead thread */
ZVAL_STR(unstore, pthreads_store_restore_string(Z_STR_P(zstorage)));
break;
case IS_PTR:
Expand All @@ -1061,13 +1132,21 @@ static void pthreads_store_restore_zval(zval *unstore, zval *zstorage) {
static void pthreads_store_hard_copy_storage(zval *new_zstorage, zval *zstorage) {
if (Z_TYPE_P(zstorage) == IS_PTR) {
pthreads_storage *storage = (pthreads_storage *) Z_PTR_P(zstorage);
pthreads_storage *copy = malloc(sizeof(pthreads_storage));
if (storage->type == STORE_TYPE_STRING_PTR) {
//hard-copy string ptrs here, since the destination object might not exist on the thread which owns the string
//this means that the owning thread may not be aware that this new ref now exists and won't persist the string when it dies
pthreads_string_storage_t* string = (pthreads_string_storage_t*)storage->data;
ZVAL_STR(new_zstorage, pthreads_store_save_string(string->string));

} else {
pthreads_storage* copy = malloc(sizeof(pthreads_storage));

memcpy(copy, storage, sizeof(pthreads_storage));
memcpy(copy, storage, sizeof(pthreads_storage));

//if we add new store types, their internal data might need to be copied here
//if we add new store types, their internal data might need to be copied here

ZVAL_PTR(new_zstorage, copy);
ZVAL_PTR(new_zstorage, copy);
}
} else if (Z_TYPE_P(zstorage) == IS_STRING) {
ZVAL_STR(new_zstorage, pthreads_store_save_string(Z_STR_P(zstorage)));
} else {
Expand Down Expand Up @@ -1200,6 +1279,7 @@ static void pthreads_store_storage_dtor (zval *zstorage){
if (Z_TYPE_P(zstorage) == IS_PTR) {
pthreads_storage *storage = (pthreads_storage *) Z_PTR_P(zstorage);
switch (storage->type) {
case STORE_TYPE_STRING_PTR:
case STORE_TYPE_RESOURCE:
case STORE_TYPE_SOCKET:
case STORE_TYPE_CLOSURE:
Expand Down
3 changes: 3 additions & 0 deletions src/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ int pthreads_store_shift(zend_object *object, zval *member);
int pthreads_store_chunk(zend_object *object, zend_long size, zend_bool preserve, zval *chunk);
int pthreads_store_pop(zend_object *object, zval *member);
int pthreads_store_count(zend_object *object, zend_long *count);
/* {{{ Copies any thread-local data to permanent storage when an object ref is destroyed */
void pthreads_store_persist_local_properties(zend_object* object); /* }}} */

void pthreads_store_free(pthreads_store_t *store);

/* {{{ * iteration helpers */
Expand Down
5 changes: 5 additions & 0 deletions src/store_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ typedef enum _pthreads_store_type {
STORE_TYPE_RESOURCE,
STORE_TYPE_SOCKET,
STORE_TYPE_ENUM,
STORE_TYPE_STRING_PTR,
} pthreads_store_type;

typedef struct _pthreads_storage {
Expand All @@ -45,4 +46,8 @@ typedef struct _pthreads_closure_storage_t {
pthreads_ident_t owner;
} pthreads_closure_storage_t;

typedef struct _pthreads_string_storage_t {
zend_string* string;
pthreads_ident_t owner;
} pthreads_string_storage_t;
#endif
56 changes: 56 additions & 0 deletions tests/single-copy-strings-basic.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
--TEST--
Test that string copying works correctly from live and dead threads
--DESCRIPTION--
We implement some optimisations to allow strings to be copied only 1 time, as long as they live on the child thread for long enough for the parent thread to dereference them.
This test verifies the basic functionality, with a string that will be single-copied (the "a" string) and another which will be copied the old way with just-in-time rescue when the object is destroyed.
--FILE--
<?php

$thread = new class extends \Thread{

public \ThreadedArray $buffer;

public function __construct(){
$this->buffer = new \ThreadedArray();
}

public ?string $str = null;
public bool $shutdown = false;


public function run() : void{
$this->synchronized(function() : void{
$this->buffer[] = str_repeat("a", 20);
$this->buffer[] = str_repeat("b", 20);
$this->notify();
});
$this->synchronized(function() : void{
while(!$this->shutdown){
$this->wait();
}
});
}
};
$thread->start();

$thread->synchronized(function() use ($thread) : void{
while($thread->buffer->count() === 0){
$thread->wait();
}
});
var_dump($thread->buffer->shift());
$thread->synchronized(function() use ($thread) : void{
$thread->shutdown = true;
$thread->notify();
});

$thread->join();
var_dump($thread->buffer->shift());

echo "OK\n";
?>
--EXPECT--
string(20) "aaaaaaaaaaaaaaaaaaaa"
string(20) "bbbbbbbbbbbbbbbbbbbb"
OK

0 comments on commit 283c2e8

Please sign in to comment.