diff --git a/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequestResponseHandlerNativeAdapter.java b/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequestResponseHandlerNativeAdapter.java index c4fbc7ab5..283e58176 100644 --- a/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequestResponseHandlerNativeAdapter.java +++ b/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequestResponseHandlerNativeAdapter.java @@ -5,18 +5,58 @@ package software.amazon.awssdk.crt.s3; import software.amazon.awssdk.crt.http.HttpHeader; - +import java.lang.ref.PhantomReference; +import java.lang.ref.ReferenceQueue; import java.nio.ByteBuffer; class S3MetaRequestResponseHandlerNativeAdapter { private S3MetaRequestResponseHandler responseHandler; + // Create a reference queue + + + // Background thread to monitor the ReferenceQueue + static class CleanerThread extends Thread { + private final ReferenceQueue referenceQueue; + private long buffer; + + public CleanerThread(ReferenceQueue referenceQueue, long buffer) { + this.referenceQueue = referenceQueue; + this.buffer = buffer; + this.setDaemon(true); // Make it a daemon thread so it doesn't block JVM shutdown + } + + @Override + public void run() { + try { + while (true) { + // Wait for a PhantomReference to be enqueued + PhantomReference ref = + (PhantomReference) referenceQueue.remove(); // Blocking call + + // Trigger the callback or cleanup action + // freeDirectBuffer(this.buffer); + System.out.println("teaewewaet"); + System.out.println(this.buffer); + + // Optionally clear the reference to avoid memory leaks + ref.clear(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Restore interrupt status + } + } + } S3MetaRequestResponseHandlerNativeAdapter(S3MetaRequestResponseHandler responseHandler) { this.responseHandler = responseHandler; } - int onResponseBody(byte[] bodyBytesIn, long objectRangeStart, long objectRangeEnd) { - return this.responseHandler.onResponseBody(ByteBuffer.wrap(bodyBytesIn), objectRangeStart, objectRangeEnd); + int onResponseBody(ByteBuffer bodyBytesIn, long buffer, long objectRangeStart, long objectRangeEnd) { + ReferenceQueue referenceQueue = new ReferenceQueue<>(); + PhantomReference phantomReference = new PhantomReference<>(bodyBytesIn, referenceQueue); + CleanerThread cleanerThread = new CleanerThread(referenceQueue, buffer); + cleanerThread.start(); + return this.responseHandler.onResponseBody(bodyBytesIn, objectRangeStart, objectRangeEnd); } void onFinished(int errorCode, int responseStatus, byte[] errorPayload, String errorOperationName, int checksumAlgorithm, boolean didValidateChecksum, Throwable cause, final ByteBuffer headersBlob) { @@ -32,4 +72,7 @@ void onResponseHeaders(final int statusCode, final ByteBuffer headersBlob) { void onProgress(final S3MetaRequestProgress progress) { responseHandler.onProgress(progress); } + + + private static native void freeDirectBuffer(long bufferId); } diff --git a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json index 6e57a92c7..f970dbff6 100644 --- a/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json +++ b/src/main/resources/META-INF/native-image/software.amazon.awssdk/crt/aws-crt/jni-config.json @@ -2060,7 +2060,7 @@ { "name": "onResponseBody", "parameterTypes": [ - "byte[]", + "java.nio.ByteBuffer", "long", "long" ] diff --git a/src/native/java_class_ids.c b/src/native/java_class_ids.c index 414c35244..8d978d954 100644 --- a/src/native/java_class_ids.c +++ b/src/native/java_class_ids.c @@ -711,7 +711,7 @@ static void s_cache_s3_meta_request_response_handler_native_adapter_properties(J AWS_FATAL_ASSERT(cls); s3_meta_request_response_handler_native_adapter_properties.onResponseBody = - (*env)->GetMethodID(env, cls, "onResponseBody", "([BJJ)I"); + (*env)->GetMethodID(env, cls, "onResponseBody", "(Ljava/nio/ByteBuffer;JJJ)I"); AWS_FATAL_ASSERT(s3_meta_request_response_handler_native_adapter_properties.onResponseBody); s3_meta_request_response_handler_native_adapter_properties.onFinished = (*env)->GetMethodID( diff --git a/src/native/s3_client.c b/src/native/s3_client.c index 7754387b3..8edfb7b67 100644 --- a/src/native/s3_client.c +++ b/src/native/s3_client.c @@ -436,7 +436,7 @@ JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_s3_S3Client_s3ClientNew( tls_options = &tls_options_storage; aws_tls_connection_options_init_from_ctx(tls_options, tls_ctx); } - +printf("xxxxxxxxx NewDirectByteBuffer\n"); struct aws_s3_client_config client_config = { .max_active_connections_override = max_connections, .region = region, @@ -600,7 +600,22 @@ static int s_on_s3_meta_request_body_callback( return AWS_OP_ERR; } - jobject jni_payload = aws_jni_byte_array_from_cursor(env, body); + // jobject jni_payload = NULL; + struct aws_byte_buf *dest = aws_mem_calloc(aws_jni_get_allocator(), 1, sizeof(struct aws_byte_buf)); + aws_byte_buf_init_copy_from_cursor(dest, aws_jni_get_allocator(), *body); + + // jobject jni_payload = aws_java_byte_array_new(env, body->len);; + // (*env)->DeleteLocalRef(env, jni_payload); + // for (size_t i = 0; i < 8; i++) + // { + // jni_payload = aws_java_byte_array_new(env, body->len/8); + // (*env)->DeleteLocalRef(env, jni_payload); + // } + + jobject jni_payload = (*env)->NewDirectByteBuffer(env, dest->buffer, dest->len); + + // (*env)->DeleteLocalRef(env, jni_payload); + // (*env)->SetByteArrayRegion(env, jni_payload, 0, (jsize)body->len, (jbyte *)body->ptr); if (jni_payload == NULL) { /* JVM is out of memory, but native code can still have memory available, handle it and don't crash. */ aws_jni_check_and_clear_exception(env); @@ -617,6 +632,7 @@ static int s_on_s3_meta_request_body_callback( callback_data->java_s3_meta_request_response_handler_native_adapter, s3_meta_request_response_handler_native_adapter_properties.onResponseBody, jni_payload, + (long)dest, range_start, range_end); @@ -635,6 +651,7 @@ static int s_on_s3_meta_request_body_callback( } } return_value = AWS_OP_SUCCESS; + // aws_byte_buf_clean_up(&dest); cleanup: (*env)->DeleteLocalRef(env, jni_payload);