Implement a proper queue datastructure #42

Open
dktapps opened this issue Feb 9, 2021 · 1 comment

dktapps commented Feb 9, 2021

Right now, Threaded is your only option if you need a thread-safe queue. This is highly suboptimal for large numbers of items.

Consider the following script:

<?php

declare(strict_types=1);

$buf = new \Threaded();
$t = new class($buf) extends \Thread{
	public \Threaded $queue;
	public bool $shutdown = true; //true until the thread signals that it has started

	public function __construct(\Threaded $t){
		$this->queue = $t;
	}

	public function run(){
		$this->synchronized(function() : void{
			$this->shutdown = false;
			$this->notify();
		});
		$time = 0;
		$read = 0;
		var_dump("child thread running");
		while(!$this->shutdown){
			$this->synchronized(function() : void{
				while(!$this->shutdown && $this->queue->count() === 0){
					$this->wait();
				}
			});
			sleep(2); //simulate a slow consumer, allowing the queue backlog to grow
			while($this->queue->count() > 0){
				$start = hrtime(true);
				$this->queue->shift();
				$time += (hrtime(true) - $start);
				$read++;
				if(($read % 10000) === 9999){
					var_dump("time per element: " . ($time / $read) . "ns");
				}
			}
		}

		while($this->queue->count() > 0){
			$start = hrtime(true);
			$this->queue->shift();
			$time += (hrtime(true) - $start);
		}
	}
};
$t->start();
$t->synchronized(function() use ($t) : void{
	while($t->shutdown){
		$t->wait();
	}
});
var_dump("starting...");

for($i = 0; $i < 1024 * 512; $i++){
	$t->synchronized(function() use ($t, $buf) : void{
		$buf[] = "a";
		$t->notify();
	});
	if(($i % 10000) === 9999){
		var_dump($buf->count());
	}
}

$t->synchronized(function() use ($t) : void{
	$t->shutdown = true;
	$t->notify(); //wake the consumer if it's waiting on an empty queue
});
$t->join();

You can observe that the performance of shift() rapidly degrades, slowing the reader to a crawl.
If the sleep() call is removed, it's snappy and happy.

This happens because the underlying PHP HashTable balloons in size while the consumer isn't shifting elements off the queue. When elements are removed, the HashTable is never downsized, leaving a large number of empty (IS_UNDEF) gaps at the front of the table. Every shift() must then scan past these gaps to find the first live element, which takes an extraordinary amount of CPU time and causes a huge performance loss for the reader.
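
To make the failure mode concrete, here is a minimal userland model of the behaviour (a hypothetical illustration, not the actual extension code): deleted slots are left behind as tombstones, so every shift() must re-scan from the start of the storage, much like zend_hash_internal_pointer_reset_ex() skipping IS_UNDEF buckets. Draining n items this way costs O(n^2) overall.

<?php

declare(strict_types=1);

//Hypothetical userland model of the problem described above - NOT the real
//extension code. Deleted slots become tombstones, so shift() must scan past
//all of them from index 0, analogous to zend_hash_internal_pointer_reset_ex()
//skipping IS_UNDEF buckets.
final class TombstoneQueue{
	/** @var array<int, string|null> */
	private array $slots = [];
	private int $end = 0;

	public function push(string $value) : void{
		$this->slots[$this->end++] = $value;
	}

	public function shift() : ?string{
		for($i = 0; $i < $this->end; $i++){ //scan cost grows as tombstones accumulate
			if($this->slots[$i] !== null){
				$value = $this->slots[$i];
				$this->slots[$i] = null; //tombstone - the slot is never reclaimed
				return $value;
			}
		}
		return null;
	}
}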

This may manifest in PocketMine-MP under lag spike conditions. Because RakLib uses two Threaded objects to send and receive packets from the main thread, a lag spike on either side will cause the Threaded HashTables to blow up in size, causing significant performance losses. Worse, because the HashTable is never downsized, the performance loss persists even after the lag spike is gone.

In addition, this can cause significant performance losses for the writer, because the writer has to acquire a lock on the Threaded object in order to add elements to it. The reader holds that same lock while it executes this extremely costly scan, which means the writer will block for a significant amount of time if it happens to write while the reader is reading.
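
To see the contention directly, the writer loop in the script above could be instrumented like this (hypothetical instrumentation, not part of the original script); whenever the reader holds the lock during a costly scan, the push stalls:

//Hypothetical instrumentation for the writer loop above: measures how long
//each push is blocked waiting to acquire the queue's lock.
$start = hrtime(true);
$t->synchronized(function() use ($t, $buf) : void{
	$buf[] = "a";
	$t->notify();
});
$blockedNs = hrtime(true) - $start;
if($blockedNs > 1000000){ //stalled for more than 1ms acquiring the lock
	var_dump("writer blocked for " . ($blockedNs / 1000000) . "ms");
}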

dktapps added a commit to pmmp/RakLib that referenced this issue Apr 29, 2021
this is a working implementation of #52.

I have my doubts that this gigantic refactor was worth the effort, because there's still lock contention on Snooze to consider; however, this does substantially reduce the problem described in pmmp/ext-pmmpthread#42, as well as reducing the overhead of inter-thread communication by removing the need to transmit session IDs.

This currently relies on each end of the IPC channels to round-trip session open/close notifications for setup/cleanup; I'll try to improve this before landing it in main.
dktapps added a commit that referenced this issue Feb 5, 2023
closes #71

This works by allowing threads direct access to the original thread's string for as long as it is cached in the origin thread's ThreadedBase connection.

As long as the connection lives long enough for the reading thread(s) to dereference the string in question (which, thanks to 4ddf79e, will be the case in all common cases, including dead threads and completed worker tasks), the extra malloc and string copy are avoided on the writer thread, which significantly improves performance in synthetic benchmarks.

If the connection is destroyed, it will create persistent copies of any cached strings during free_obj, and the old double-copy method will be used to enable the string to be accessed. However, this is rarely needed.

The caveat is that pthreads_store_sync_local_properties() will do work more often when strings are used, but I don't think this is a big concern. In most cases the property table should be small enough for this not to be a problem, and for the large cases we need to implement dedicated queue data structures anyway. In any case, profiling suggested that the overhead of zend_hash_internal_pointer_reset_ex() was a several-orders-of-magnitude bigger problem (see #42).
dktapps pinned this issue May 22, 2023
dktapps added a commit that referenced this issue Nov 13, 2023
this produces a huge performance improvement for queues with large internal tables.

an internal table of large size may appear if the array had lots of elements inserted into it and later deleted.
this resulted in major performance losses for the reader of the elements, as zend_hash_internal_pointer_reset_ex() had to scan through many IS_UNDEF offsets to find the actual first element.

there are two ways to attack this problem:
1) reallocate the internal table as elements are deleted to reduce the internal table size - this proved to be relatively ineffective
2) track the start and end of the hashtable to avoid repeated scans during every shift() call - this is the approach taken in this commit, and provides major performance benefits (sketched below)

the test case written in #42 now runs to completion substantially faster, without any performance degradation.

more tests are needed to ensure that this works fully as intended, but I chose to take the safe route with invalidating vs updating the offsets, so I think it should be good.
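
For clarity, here is a minimal userland sketch of approach (2) above (a hypothetical illustration, not the extension's actual C implementation): by tracking head and tail offsets, shift() pops the first live element directly instead of scanning for it.

<?php

declare(strict_types=1);

//Hypothetical userland sketch of approach (2) above - NOT the extension's
//actual C implementation. Tracking head/tail offsets makes shift() O(1),
//no matter how many elements were previously inserted and deleted.
final class HeadTailQueue{
	/** @var array<int, mixed> */
	private array $slots = [];
	private int $head = 0; //offset of the first live element
	private int $tail = 0; //offset one past the last live element

	public function push(mixed $value) : void{
		$this->slots[$this->tail++] = $value;
	}

	public function shift() : mixed{
		if($this->head === $this->tail){
			return null; //queue is empty
		}
		$value = $this->slots[$this->head];
		unset($this->slots[$this->head]); //the slot is actually freed - no tombstone
		$this->head++;
		return $value;
	}
}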

dktapps commented Nov 15, 2023

Updated test case for ext-pmmpthread v6:

<?php

declare(strict_types=1);

use pmmp\thread\ThreadSafeArray;
use pmmp\thread\Thread;


$buf = new ThreadSafeArray();
$t = new class($buf) extends Thread{
	public ThreadSafeArray $queue;
	public bool $shutdown = true; //true until the thread signals that it has started

	public function __construct(ThreadSafeArray $t){
		$this->queue = $t;
	}

	public function run() : void{
		$this->synchronized(function() : void{
			$this->shutdown = false;
			$this->notify();
		});
		$time = 0;
		$read = 0;
		$prev = -1; //track ordering across the whole run, not per drain cycle
		var_dump("child thread running");
		while(!$this->shutdown){
			$this->synchronized(function() : void{
				while(!$this->shutdown && $this->queue->count() === 0){
					$this->wait();
				}
			});
			sleep(2); //simulate a slow consumer, allowing the queue backlog to grow
			while($this->queue->count() > 0){
				$start = hrtime(true);
				$i = $this->queue->shift();
				if($i !== ($prev + 1)){
					var_dump("out of order: received " . $i . " but expected " . ($prev + 1));
				}
				$prev = $i;
				$time += (hrtime(true) - $start);
				$read++;
				if(($read % 10000) === 9999){
					var_dump("time per element: " . ($time / $read) . "ns");
				}
			}
		}

		while($this->queue->count() > 0){
			$start = hrtime(true);
			$this->queue->shift();
			$time += (hrtime(true) - $start);
		}
	}
};
$t->start(Thread::INHERIT_ALL);
$t->synchronized(function() use ($t) : void{
	while($t->shutdown){
		$t->wait();
	}
});
var_dump("starting...");

for($i = 0; $i < 1024 * 512; $i++){
	$t->synchronized(function() use ($t, $buf, $i) : void{
		$buf[] = $i;
		$t->notify();
	});
	if(($i % 10000) === 9999){
		var_dump($buf->count());
	}
}

$t->synchronized(function() use ($t) : void{
	$t->shutdown = true;
	$t->notify(); //wake the consumer if it's waiting on an empty queue
});
$t->join();

dktapps added a commit that referenced this issue Nov 15, 2023