You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
We are facing issues when using multithreading in combination with postfilters (with prefilters probably too). The next simple script hangs forever (probaly due to some pthread lock waiting to be unlocked):
from time import time
import blosc2
import numpy as np
# Size and dtype of super-chunks
# nchunks = 20_000
# chunkshape = 50_000
nchunks = 1
chunkshape = 20_000
dtype = np.dtype(np.int32)
chunksize = chunkshape * dtype.itemsize
# Set the compression and decompression parameters
cparams = {"clevel": 1, "codec": blosc2.Codec.LZ4HC, "typesize": 4, "nthreads": 8}
dparams = {"nthreads": 8}
storage = {"cparams": cparams, "dparams": dparams}
schunk = blosc2.SChunk(chunksize=chunksize, **storage)
data = np.zeros(chunkshape, dtype=dtype)
t0 = time()
for i in range(nchunks):
schunk.append_data(data)
print(f"time append: {time() - t0:.2f}s")
print(f"cratio: {schunk.cratio:.2f}x")
# Associate a postfilter to schunk
@blosc2.postfilter(schunk, np.dtype(dtype))
def py_postfilter(input, output, offset):
output[:] = input + 1
#pass
t0 = time()
sum = 0
for chunk in schunk.iterchunks(dtype):
sum += chunk.sum()
print(f"time sum (postfilter): {time() - t0:.2f}s")
print(sum)
The output is:
time append: 0.00s
cratio: 2500.00x
Process finished with exit code 143 (interrupted by signal 15: SIGTERM)
Above the time for sum is not printed because I needed to kill the process (killall python). However, if we comment out the postfilter registration, the script finishes correctly:
time append: 0.00s
cratio: 2500.00x
time sum (postfilter): 0.00s
0
Furthermore, I have been able to create a C program doing exactly the same thing and I cannot reproduce the blocking:
#include <stdio.h>
#include <assert.h>
#include <blosc2.h>
#define KB 1024.
#define MB (1024*KB)
#define GB (1024*MB)
#define CHUNKSIZE (20 * 1000)
//#define NCHUNKS 1000
#define NCHUNKS 1
#define NTHREADS 8
int postfilter_func(blosc2_postfilter_params *postparams) {
int nelems = postparams->size / postparams->typesize;
int32_t *in = ((int32_t *)(postparams->in));
int32_t *out = ((int32_t *)(postparams->out));
for (int i = 0; i < nelems; i++) {
out[i] = in[i] + 1;
}
return 0;
}
int main(void) {
blosc2_init();
static int32_t data[CHUNKSIZE];
static int32_t data_dest[CHUNKSIZE];
int32_t isize = CHUNKSIZE * sizeof(int32_t);
int i, nchunk;
int64_t nchunks;
blosc_timestamp_t last;
printf("Blosc version info: %s (%s)\n",
BLOSC2_VERSION_STRING, BLOSC2_VERSION_DATE);
/* Create a super-chunk container */
blosc2_cparams cparams = BLOSC2_CPARAMS_DEFAULTS;
cparams.typesize = sizeof(int32_t);
cparams.compcode = BLOSC_LZ4HC;
cparams.clevel = 1;
cparams.nthreads = NTHREADS;
blosc2_dparams dparams = BLOSC2_DPARAMS_DEFAULTS;
dparams.nthreads = NTHREADS;
// Set some postfilter parameters and function
dparams.postfilter = (blosc2_postfilter_fn)postfilter_func;
// We need to zero the contents of the postparams
blosc2_postfilter_params postparams = {0};
// In this case we are not passing any additional input in tpostparams
dparams.postparams = &postparams;
blosc2_storage storage = {.cparams=&cparams, .dparams=&dparams};
blosc2_schunk* schunk = blosc2_schunk_new(&storage);
// Add some data
blosc_set_timestamp(&last);
for (nchunk = 0; nchunk < NCHUNKS; nchunk++) {
for (i = 0; i < CHUNKSIZE; i++) {
data[i] = i + nchunk * CHUNKSIZE;
}
nchunks = blosc2_schunk_append_buffer(schunk, data, isize);
assert(nchunks == nchunk + 1);
}
/* Retrieve and decompress the chunks from the super-chunks and compare values */
for (nchunk = 0; nchunk < NCHUNKS; nchunk++) {
int32_t dsize = blosc2_schunk_decompress_chunk(schunk, nchunk, data_dest, isize);
if (dsize < 0) {
printf("Decompression error in schunk. Error code: %d\n", dsize);
return dsize;
}
/* Check integrity of this chunk */
for (i = 0; i < CHUNKSIZE; i++) {
assert (data_dest[i] == 1 + i + nchunk * CHUNKSIZE);
}
}
printf("Successful roundtrip schunk <-> frame <-> fileframe !\n");
/* Free resources */
blosc2_schunk_free(schunk);
blosc2_destroy();
return 0;
}
This runs perfectly well (even with 1000 chunks):
Blosc version info: 2.4.4.dev ($Date:: 2022-10-23 #$)
Successful roundtrip schunk <-> frame <-> fileframe !
Process finished with exit code 0
So the issues should be in python-blosc2. I strongly suspect that we are not correctly holding the GIL during the invocation of the postfilter, or, for some reason, calling Python code from multithreaded code is not safe. Needs some investigation. One possibility is to disable multithreading completely for pre-/post-filters.
The text was updated successfully, but these errors were encountered:
We are facing issues when using multithreading in combination with postfilters (with prefilters probably too). The next simple script hangs forever (probaly due to some pthread lock waiting to be unlocked):
The output is:
Above the time for sum is not printed because I needed to kill the process (
killall python
). However, if we comment out the postfilter registration, the script finishes correctly:Furthermore, I have been able to create a C program doing exactly the same thing and I cannot reproduce the blocking:
This runs perfectly well (even with 1000 chunks):
So the issues should be in python-blosc2. I strongly suspect that we are not correctly holding the GIL during the invocation of the postfilter, or, for some reason, calling Python code from multithreaded code is not safe. Needs some investigation. One possibility is to disable multithreading completely for pre-/post-filters.
The text was updated successfully, but these errors were encountered: