Skip to content

Commit

Permalink
phantom ref experiement
Browse files Browse the repository at this point in the history
  • Loading branch information
TingDaoK committed Sep 10, 2024
1 parent 4d34c94 commit 8480be6
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,58 @@
package software.amazon.awssdk.crt.s3;

import software.amazon.awssdk.crt.http.HttpHeader;

import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.nio.ByteBuffer;

class S3MetaRequestResponseHandlerNativeAdapter {
private S3MetaRequestResponseHandler responseHandler;
// Create a reference queue


// Background thread to monitor the ReferenceQueue
static class CleanerThread extends Thread {
private final ReferenceQueue<ByteBuffer> referenceQueue;
private long buffer;

public CleanerThread(ReferenceQueue<ByteBuffer> referenceQueue, long buffer) {
this.referenceQueue = referenceQueue;
this.buffer = buffer;
this.setDaemon(true); // Make it a daemon thread so it doesn't block JVM shutdown
}

@Override
public void run() {
try {
while (true) {
// Wait for a PhantomReference to be enqueued
PhantomReference<? extends ByteBuffer> ref =
(PhantomReference<? extends ByteBuffer>) referenceQueue.remove(); // Blocking call

// Trigger the callback or cleanup action
// freeDirectBuffer(this.buffer);
System.out.println("teaewewaet");
System.out.println(this.buffer);

// Optionally clear the reference to avoid memory leaks
ref.clear();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore interrupt status
}
}
}

S3MetaRequestResponseHandlerNativeAdapter(S3MetaRequestResponseHandler responseHandler) {
this.responseHandler = responseHandler;
}

int onResponseBody(byte[] bodyBytesIn, long objectRangeStart, long objectRangeEnd) {
return this.responseHandler.onResponseBody(ByteBuffer.wrap(bodyBytesIn), objectRangeStart, objectRangeEnd);
int onResponseBody(ByteBuffer bodyBytesIn, long buffer, long objectRangeStart, long objectRangeEnd) {
ReferenceQueue<ByteBuffer> referenceQueue = new ReferenceQueue<>();
PhantomReference<ByteBuffer> phantomReference = new PhantomReference<>(bodyBytesIn, referenceQueue);
CleanerThread cleanerThread = new CleanerThread(referenceQueue, buffer);
cleanerThread.start();
return this.responseHandler.onResponseBody(bodyBytesIn, objectRangeStart, objectRangeEnd);
}

void onFinished(int errorCode, int responseStatus, byte[] errorPayload, String errorOperationName, int checksumAlgorithm, boolean didValidateChecksum, Throwable cause, final ByteBuffer headersBlob) {
Expand All @@ -32,4 +72,7 @@ void onResponseHeaders(final int statusCode, final ByteBuffer headersBlob) {
void onProgress(final S3MetaRequestProgress progress) {
responseHandler.onProgress(progress);
}


private static native void freeDirectBuffer(long bufferId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2060,7 +2060,7 @@
{
"name": "onResponseBody",
"parameterTypes": [
"byte[]",
"java.nio.ByteBuffer",
"long",
"long"
]
Expand Down
2 changes: 1 addition & 1 deletion src/native/java_class_ids.c
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ static void s_cache_s3_meta_request_response_handler_native_adapter_properties(J
AWS_FATAL_ASSERT(cls);

s3_meta_request_response_handler_native_adapter_properties.onResponseBody =
(*env)->GetMethodID(env, cls, "onResponseBody", "([BJJ)I");
(*env)->GetMethodID(env, cls, "onResponseBody", "(Ljava/nio/ByteBuffer;JJJ)I");
AWS_FATAL_ASSERT(s3_meta_request_response_handler_native_adapter_properties.onResponseBody);

s3_meta_request_response_handler_native_adapter_properties.onFinished = (*env)->GetMethodID(
Expand Down
21 changes: 19 additions & 2 deletions src/native/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_s3_S3Client_s3ClientNew(
tls_options = &tls_options_storage;
aws_tls_connection_options_init_from_ctx(tls_options, tls_ctx);
}

printf("xxxxxxxxx NewDirectByteBuffer\n");
struct aws_s3_client_config client_config = {
.max_active_connections_override = max_connections,
.region = region,
Expand Down Expand Up @@ -600,7 +600,22 @@ static int s_on_s3_meta_request_body_callback(
return AWS_OP_ERR;
}

jobject jni_payload = aws_jni_byte_array_from_cursor(env, body);
// jobject jni_payload = NULL;
struct aws_byte_buf *dest = aws_mem_calloc(aws_jni_get_allocator(), 1, sizeof(struct aws_byte_buf));
aws_byte_buf_init_copy_from_cursor(dest, aws_jni_get_allocator(), *body);

// jobject jni_payload = aws_java_byte_array_new(env, body->len);;
// (*env)->DeleteLocalRef(env, jni_payload);
// for (size_t i = 0; i < 8; i++)
// {
// jni_payload = aws_java_byte_array_new(env, body->len/8);
// (*env)->DeleteLocalRef(env, jni_payload);
// }

jobject jni_payload = (*env)->NewDirectByteBuffer(env, dest->buffer, dest->len);

// (*env)->DeleteLocalRef(env, jni_payload);
// (*env)->SetByteArrayRegion(env, jni_payload, 0, (jsize)body->len, (jbyte *)body->ptr);
if (jni_payload == NULL) {
/* JVM is out of memory, but native code can still have memory available, handle it and don't crash. */
aws_jni_check_and_clear_exception(env);
Expand All @@ -617,6 +632,7 @@ static int s_on_s3_meta_request_body_callback(
callback_data->java_s3_meta_request_response_handler_native_adapter,
s3_meta_request_response_handler_native_adapter_properties.onResponseBody,
jni_payload,
(long)dest,
range_start,
range_end);

Expand All @@ -635,6 +651,7 @@ static int s_on_s3_meta_request_body_callback(
}
}
return_value = AWS_OP_SUCCESS;
// aws_byte_buf_clean_up(&dest);

cleanup:
(*env)->DeleteLocalRef(env, jni_payload);
Expand Down

0 comments on commit 8480be6

Please sign in to comment.