Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ID Refactor] Shorten the length of JobID to 4 bytes #5110

Merged
merged 43 commits into from
Jul 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
8a280dd
WIP
jovany-wang Jul 1, 2019
dff8c21
Fix
jovany-wang Jul 3, 2019
689ff32
Add jobid test
jovany-wang Jul 3, 2019
e586258
Fix
jovany-wang Jul 3, 2019
d37f3d0
Add python part
jovany-wang Jul 3, 2019
29f774d
Fix
jovany-wang Jul 3, 2019
0123990
Fix tes
jovany-wang Jul 3, 2019
7aec837
Remove TODOs
jovany-wang Jul 3, 2019
7cef169
Fix C++ tests
jovany-wang Jul 3, 2019
12e1b4c
Lint
jovany-wang Jul 3, 2019
a5598b0
Fix
jovany-wang Jul 4, 2019
aa94dfa
Fix exporting functions in multiple ray.init
jovany-wang Jul 4, 2019
e99206a
Fix java test
jovany-wang Jul 5, 2019
a04d755
Fix lint
jovany-wang Jul 5, 2019
ce63b84
Fix linting
jovany-wang Jul 5, 2019
c307934
Merge branch 'master' into short-jobid-to-4bytes
jovany-wang Jul 7, 2019
e0e34f4
Address comments.
jovany-wang Jul 7, 2019
13c4150
FIx
jovany-wang Jul 8, 2019
18bf5bf
Address and fix linting
jovany-wang Jul 8, 2019
2d047f1
Refine and fix
jovany-wang Jul 8, 2019
7656f28
Fix
jovany-wang Jul 8, 2019
f0ceceb
address
jovany-wang Jul 8, 2019
bc5de09
Address comments.
jovany-wang Jul 8, 2019
24b6a29
Fix linting
jovany-wang Jul 8, 2019
166ece8
Fix
jovany-wang Jul 8, 2019
e81714d
Address
jovany-wang Jul 8, 2019
cacd3c1
Address comments.
jovany-wang Jul 8, 2019
ea1d0eb
Address
jovany-wang Jul 8, 2019
9f89ed6
Address
jovany-wang Jul 8, 2019
68b0c26
Merge branch 'master' into short-jobid-to-4bytes
jovany-wang Jul 8, 2019
376eef0
Fix
jovany-wang Jul 8, 2019
d1e1e6b
Fix
jovany-wang Jul 8, 2019
81a7040
Fix
jovany-wang Jul 9, 2019
d2699e0
Fix lint
jovany-wang Jul 9, 2019
0892ea2
Fix
jovany-wang Jul 9, 2019
95627dc
Fix linting
jovany-wang Jul 9, 2019
bb1a0d4
Address comments.
jovany-wang Jul 10, 2019
30f4683
Fix linting
jovany-wang Jul 10, 2019
193e15e
Address comments.
jovany-wang Jul 10, 2019
01cd242
Fix linting
jovany-wang Jul 10, 2019
4889da8
Merge branch 'master' into short-jobid-to-4bytes
jovany-wang Jul 11, 2019
ebff61a
address comments.
jovany-wang Jul 11, 2019
e36061c
Fix
jovany-wang Jul 11, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions java/api/src/main/java/org/ray/api/id/JobId.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.ray.api.id;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

/**
* Represents the id of a Ray job.
*/
public class JobId extends BaseId implements Serializable {

// Note that the max value of a job id is NIL which value is (2^32 - 1).
public static final Long MAX_VALUE = (long) Math.pow(2, 32) - 1;

public static final int LENGTH = 4;

public static final JobId NIL = genNil();

/**
* Create a JobID instance according to the given bytes.
*/
private JobId(byte[] id) {
super(id);
}

/**
* Create a JobId from a given hex string.
*/
public static JobId fromHexString(String hex) {
return new JobId(hexString2Bytes(hex));
}

/**
* Creates a JobId from the given ByteBuffer.
*/
public static JobId fromByteBuffer(ByteBuffer bb) {
return new JobId(byteBuffer2Bytes(bb));
}

public static JobId fromInt(int value) {
byte[] bytes = new byte[JobId.LENGTH];
ByteBuffer wbb = ByteBuffer.wrap(bytes);
wbb.order(ByteOrder.LITTLE_ENDIAN);
wbb.putInt(value);
return new JobId(bytes);
}

/**
* Generate a nil JobId.
*/
private static JobId genNil() {
byte[] b = new byte[LENGTH];
Arrays.fill(b, (byte) 0xFF);
return new JobId(b);
}

@Override
public int size() {
return LENGTH;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.ray.api.runtimecontext;

import java.util.List;
import org.ray.api.id.JobId;
import org.ray.api.id.UniqueId;

/**
Expand All @@ -11,7 +12,7 @@ public interface RuntimeContext {
/**
* Get the current Job ID.
*/
UniqueId getCurrentJobId();
JobId getCurrentJobId();

/**
* Get the current actor ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ public AbstractRayRuntime(RayConfig rayConfig) {
this.rayConfig = rayConfig;
functionManager = new FunctionManager(rayConfig.jobResourcePath);
worker = new Worker(this);
workerContext = new WorkerContext(rayConfig.workerMode,
rayConfig.jobId, rayConfig.runMode);
runtimeContext = new RuntimeContextImpl(this);
}

Expand Down
13 changes: 13 additions & 0 deletions java/runtime/src/main/java/org/ray/runtime/RayDevRuntime.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.ray.runtime;

import java.util.concurrent.atomic.AtomicInteger;
import org.ray.api.id.JobId;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.objectstore.MockObjectStore;
import org.ray.runtime.objectstore.ObjectStoreProxy;
Expand All @@ -13,9 +15,16 @@ public RayDevRuntime(RayConfig rayConfig) {

private MockObjectStore store;

private AtomicInteger jobCounter = new AtomicInteger(0);

@Override
public void start() {
store = new MockObjectStore(this);
if (rayConfig.getJobId().isNil()) {
rayConfig.setJobId(nextJobId());
}
workerContext = new WorkerContext(rayConfig.workerMode,
rayConfig.getJobId(), rayConfig.runMode);
objectStoreProxy = new ObjectStoreProxy(this, null);
rayletClient = new MockRayletClient(this, rayConfig.numberExecThreadsForDevRuntime);
}
Expand All @@ -33,4 +42,8 @@ public MockObjectStore getObjectStore() {
public Worker getWorker() {
return ((MockRayletClient) rayletClient).getCurrentWorker();
}

private JobId nextJobId() {
return JobId.fromInt(jobCounter.getAndIncrement());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;
import org.ray.api.id.JobId;
import org.ray.runtime.config.RayConfig;
import org.ray.runtime.config.WorkerMode;
import org.ray.runtime.gcs.GcsClient;
Expand Down Expand Up @@ -94,6 +95,12 @@ public void start() {

gcsClient = new GcsClient(rayConfig.getRedisAddress(), rayConfig.redisPassword);

if (rayConfig.getJobId() == JobId.NIL) {
rayConfig.setJobId(gcsClient.nextJobId());
}

workerContext = new WorkerContext(rayConfig.workerMode,
rayConfig.getJobId(), rayConfig.runMode);
// TODO(qwang): Get object_store_socket_name and raylet_socket_name from Redis.
objectStoreProxy = new ObjectStoreProxy(this, rayConfig.objectStoreSocketName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.base.Preconditions;
import java.util.List;
import org.ray.api.id.JobId;
import org.ray.api.id.UniqueId;
import org.ray.api.runtimecontext.NodeInfo;
import org.ray.api.runtimecontext.RuntimeContext;
Expand All @@ -17,7 +18,7 @@ public RuntimeContextImpl(AbstractRayRuntime runtime) {
}

@Override
public UniqueId getCurrentJobId() {
public JobId getCurrentJobId() {
return runtime.getWorkerContext().getCurrentJobId();
}

Expand Down
14 changes: 7 additions & 7 deletions java/runtime/src/main/java/org/ray/runtime/WorkerContext.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package org.ray.runtime;

import com.google.common.base.Preconditions;
import org.ray.api.id.JobId;
import org.ray.api.id.TaskId;
import org.ray.api.id.UniqueId;
import org.ray.runtime.config.RunMode;
import org.ray.runtime.config.WorkerMode;
import org.ray.runtime.task.TaskSpec;
import org.ray.runtime.util.IdUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,7 +31,7 @@ public class WorkerContext {

private ThreadLocal<TaskSpec> currentTask;

private UniqueId currentJobId;
private JobId currentJobId;

private ClassLoader currentClassLoader;

Expand All @@ -43,7 +45,7 @@ public class WorkerContext {
*/
private RunMode runMode;

public WorkerContext(WorkerMode workerMode, UniqueId jobId, RunMode runMode) {
public WorkerContext(WorkerMode workerMode, JobId jobId, RunMode runMode) {
mainThreadId = Thread.currentThread().getId();
taskIndex = ThreadLocal.withInitial(() -> 0);
putIndex = ThreadLocal.withInitial(() -> 0);
Expand All @@ -52,15 +54,13 @@ public WorkerContext(WorkerMode workerMode, UniqueId jobId, RunMode runMode) {
currentTask = ThreadLocal.withInitial(() -> null);
currentClassLoader = null;
if (workerMode == WorkerMode.DRIVER) {
// TODO(qwang): Assign the driver id to worker id
// once we treat driver id as a special worker id.
workerId = jobId;
workerId = IdUtil.computeDriverId(jobId);
currentTaskId.set(TaskId.randomId());
currentJobId = jobId;
} else {
workerId = UniqueId.randomId();
this.currentTaskId.set(TaskId.NIL);
this.currentJobId = UniqueId.NIL;
this.currentJobId = JobId.NIL;
}
}

Expand Down Expand Up @@ -119,7 +119,7 @@ public UniqueId getCurrentWorkerId() {
/**
* The ID of the current job.
*/
public UniqueId getCurrentJobId() {
public JobId getCurrentJobId() {
return currentJobId;
}

Expand Down
16 changes: 12 additions & 4 deletions java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.ray.api.id.UniqueId;
import org.ray.api.id.JobId;
import org.ray.runtime.util.NetworkUtil;
import org.ray.runtime.util.ResourceUtil;
import org.ray.runtime.util.StringUtil;
Expand All @@ -32,7 +32,7 @@ public class RayConfig {
public final WorkerMode workerMode;
public final RunMode runMode;
public final Map<String, Double> resources;
public final UniqueId jobId;
private JobId jobId;
public final String logDir;
public final boolean redirectOutput;
public final List<String> libraryPath;
Expand Down Expand Up @@ -108,9 +108,9 @@ public RayConfig(Config config) {
// Job id.
String jobId = config.getString("ray.job.id");
if (!jobId.isEmpty()) {
this.jobId = UniqueId.fromHexString(jobId);
this.jobId = JobId.fromHexString(jobId);
} else {
this.jobId = UniqueId.randomId();
this.jobId = JobId.NIL;
}
// Log dir.
logDir = removeTrailingSlash(config.getString("ray.log-dir"));
Expand Down Expand Up @@ -198,6 +198,14 @@ public Integer getRedisPort() {
return redisPort;
}

public void setJobId(JobId jobId) {
this.jobId = jobId;
}

public JobId getJobId() {
return this.jobId;
}

@Override
public String toString() {
return "RayConfig{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.objectweb.asm.Type;
import org.ray.api.function.RayFunc;
import org.ray.api.id.UniqueId;
import org.ray.api.id.JobId;
import org.ray.runtime.util.LambdaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,7 +48,7 @@ public class FunctionManager {
/**
* Mapping from the job id to the functions that belong to this job.
*/
private Map<UniqueId, JobFunctionTable> jobFunctionTables = new HashMap<>();
private Map<JobId, JobFunctionTable> jobFunctionTables = new HashMap<>();

/**
* The resource path which we can load the job's jar resources.
Expand All @@ -72,7 +72,7 @@ public FunctionManager(String jobResourcePath) {
* @param func The lambda.
* @return A RayFunction object.
*/
public RayFunction getFunction(UniqueId jobId, RayFunc func) {
public RayFunction getFunction(JobId jobId, RayFunc func) {
JavaFunctionDescriptor functionDescriptor = RAY_FUNC_CACHE.get().get(func.getClass());
if (functionDescriptor == null) {
SerializedLambda serializedLambda = LambdaUtils.getSerializedLambda(func);
Expand All @@ -92,7 +92,7 @@ public RayFunction getFunction(UniqueId jobId, RayFunc func) {
* @param functionDescriptor The function descriptor.
* @return A RayFunction object.
*/
public RayFunction getFunction(UniqueId jobId, JavaFunctionDescriptor functionDescriptor) {
public RayFunction getFunction(JobId jobId, JavaFunctionDescriptor functionDescriptor) {
JobFunctionTable jobFunctionTable = jobFunctionTables.get(jobId);
if (jobFunctionTable == null) {
ClassLoader classLoader;
Expand Down
6 changes: 6 additions & 0 deletions java/runtime/src/main/java/org/ray/runtime/gcs/GcsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.commons.lang3.ArrayUtils;
import org.ray.api.Checkpointable.Checkpoint;
import org.ray.api.id.BaseId;
import org.ray.api.id.JobId;
import org.ray.api.id.TaskId;
import org.ray.api.id.UniqueId;
import org.ray.api.runtimecontext.NodeInfo;
Expand Down Expand Up @@ -158,6 +159,11 @@ public List<Checkpoint> getCheckpointsForActor(UniqueId actorId) {
return checkpoints;
}

public JobId nextJobId() {
int jobCounter = (int) primary.incr("JobCounter".getBytes());
return JobId.fromInt(jobCounter);
}

private RedisClient getShardClient(BaseId key) {
return shards.get((int) Long.remainderUnsigned(IdUtil.murmurHashCode(key),
shards.size()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,9 @@ public boolean exists(byte[] key) {
}
}

public long incr(byte[] key) {
try (Jedis jedis = jedisPool.getResource()) {
return jedis.incr(key).intValue();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.commons.lang3.NotImplementedException;
import org.ray.api.RayObject;
import org.ray.api.WaitResult;
import org.ray.api.id.JobId;
import org.ray.api.id.ObjectId;
import org.ray.api.id.TaskId;
import org.ray.api.id.UniqueId;
Expand Down Expand Up @@ -164,7 +165,7 @@ public void notifyUnblocked(TaskId currentTaskId) {
}

@Override
public TaskId generateTaskId(UniqueId jobId, TaskId parentTaskId, int taskIndex) {
public TaskId generateTaskId(JobId jobId, TaskId parentTaskId, int taskIndex) {
return TaskId.randomId();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.List;
import org.ray.api.RayObject;
import org.ray.api.WaitResult;
import org.ray.api.id.JobId;
import org.ray.api.id.ObjectId;
import org.ray.api.id.TaskId;
import org.ray.api.id.UniqueId;
Expand All @@ -21,7 +22,7 @@ public interface RayletClient {

void notifyUnblocked(TaskId currentTaskId);

TaskId generateTaskId(UniqueId jobId, TaskId parentTaskId, int taskIndex);
TaskId generateTaskId(JobId jobId, TaskId parentTaskId, int taskIndex);

<T> WaitResult<T> wait(List<RayObject<T>> waitFor, int numReturns, int
timeoutMs, TaskId currentTaskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.ray.api.RayObject;
import org.ray.api.WaitResult;
import org.ray.api.exception.RayException;
import org.ray.api.id.JobId;
import org.ray.api.id.ObjectId;
import org.ray.api.id.TaskId;
import org.ray.api.id.UniqueId;
Expand All @@ -39,7 +40,7 @@ public class RayletClientImpl implements RayletClient {

// TODO(qwang): JobId parameter can be removed once we embed jobId in driverId.
public RayletClientImpl(String schedulerSockName, UniqueId clientId,
boolean isWorker, UniqueId jobId) {
boolean isWorker, JobId jobId) {
client = nativeInit(schedulerSockName, clientId.getBytes(),
isWorker, jobId.getBytes());
}
Expand Down Expand Up @@ -107,7 +108,7 @@ public void fetchOrReconstruct(List<ObjectId> objectIds, boolean fetchOnly,
}

@Override
public TaskId generateTaskId(UniqueId jobId, TaskId parentTaskId, int taskIndex) {
public TaskId generateTaskId(JobId jobId, TaskId parentTaskId, int taskIndex) {
byte[] bytes = nativeGenerateTaskId(jobId.getBytes(), parentTaskId.getBytes(), taskIndex);
return new TaskId(bytes);
}
Expand Down Expand Up @@ -146,7 +147,7 @@ private static TaskSpec parseTaskSpecFromProtobuf(byte[] bytes) {
}

// Parse common fields.
UniqueId jobId = UniqueId.fromByteBuffer(taskSpec.getJobId().asReadOnlyByteBuffer());
JobId jobId = JobId.fromByteBuffer(taskSpec.getJobId().asReadOnlyByteBuffer());
TaskId taskId = TaskId.fromByteBuffer(taskSpec.getTaskId().asReadOnlyByteBuffer());
TaskId parentTaskId = TaskId.fromByteBuffer(taskSpec.getParentTaskId().asReadOnlyByteBuffer());
int parentCounter = (int) taskSpec.getParentCounter();
Expand Down
Loading