Skip to content

Commit

Permalink
Inline objects (ray-project#3756)
Browse files Browse the repository at this point in the history
* added store_client_ to object_manager and node_manager

* half through...

* all code in, and compiling! Nothing tested though...

* something is working ;-)

* added a few more comments

* now, add only one entry to the GCS for inlined objects

* more comments

* remove a spurious todo

* some comment updates

* add test

* added support for meta data for inline objects

* avoid some copies

* Initialize plasma client in tests

* Better comments. Enable configuring inline_object_max_size_bytes.

* Update src/ray/object_manager/object_manager.cc

Co-Authored-By: istoica <[email protected]>

* Update src/ray/raylet/node_manager.cc

Co-Authored-By: istoica <[email protected]>

* Update src/ray/raylet/node_manager.cc

Co-Authored-By: istoica <[email protected]>

* fixed comments

* fixed various typos in comments

* updated comments in object_manager.h and object_manager.cc

* addressed all comments...hopefully ;-)

* Only add eviction entries for objects that are not inlined

* fixed a bunch of comments

* Fix test

* Fix object transfer dump test

* lint

* Comments

* Fix test?

* Fix test?

* lint

* fix build

* Fix build

* lint

* Use const ref

* Fixes, don't let object manager hang

* Increase object transfer retry time for travis?

* Fix test

* Fix test?

* Add internal config to java, fix PlasmaFreeTest
  • Loading branch information
istoica authored and stephanie-wang committed Feb 7, 2019
1 parent 5db1afe commit f987572
Show file tree
Hide file tree
Showing 20 changed files with 369 additions and 70 deletions.
11 changes: 11 additions & 0 deletions java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValue;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.ray.api.id.UniqueId;
Expand Down Expand Up @@ -50,6 +52,7 @@ public class RayConfig {
public final Long objectStoreSize;

public final String rayletSocketName;
public final List<String> rayletConfigParameters;

public final String redisServerExecutablePath;
public final String redisModulePath;
Expand Down Expand Up @@ -162,6 +165,14 @@ public RayConfig(Config config) {
// raylet socket name
rayletSocketName = config.getString("ray.raylet.socket-name");

// raylet parameters
rayletConfigParameters = new ArrayList<String>();
Config rayletConfig = config.getConfig("ray.raylet.config");
for (java.util.Map.Entry<java.lang.String,ConfigValue> entry : rayletConfig.entrySet()) {
String parameter = entry.getKey() + "," + String.valueOf(entry.getValue().unwrapped());
rayletConfigParameters.add(parameter);
}

// library path
this.libraryPath = new ImmutableList.Builder<String>().add(
rayHome + "/build/src/plasma",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ private void startRaylet() {
"0", // number of initial workers
String.valueOf(maximumStartupConcurrency),
ResourceUtil.getResourcesStringFromMap(rayConfig.resources),
"", // The internal config list.
String.join(",", rayConfig.rayletConfigParameters), // The internal config list.
buildPythonWorkerCommand(), // python worker command
buildWorkerCommandRaylet() // java worker command
);
Expand Down
4 changes: 4 additions & 0 deletions java/runtime/src/main/resources/ray.default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ ray {
raylet {
// RPC socket name of Raylet
socket-name: /tmp/ray/sockets/raylet

// See src/ray/ray_config_def.h for options.
config {
}
}

}
2 changes: 2 additions & 0 deletions java/test/src/main/java/org/ray/api/test/BaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class BaseTest {
// Configures the Ray runtime through JVM system properties and then starts it.
// Every property set here is cleared again in tearDown().
public void setUp() {
System.setProperty("ray.home", "../..");
System.setProperty("ray.resources", "CPU:4,RES-A:4");
// RayConfig collects every entry under ray.raylet.config and hands the list
// to the raylet as its internal config.
// NOTE(review): a maximum inline size of 0 presumably disables object
// inlining for these tests - confirm against src/ray/ray_config_def.h.
System.setProperty("ray.raylet.config.inline_object_max_size_bytes", "0");
Ray.init();
}

Expand All @@ -29,6 +30,7 @@ public void tearDown() {
// unset system properties
System.clearProperty("ray.home");
System.clearProperty("ray.resources");
System.clearProperty("ray.raylet.config.inline_object_max_size_bytes");
}

}
8 changes: 8 additions & 0 deletions src/ray/gcs/format/gcs.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,20 @@ table FunctionTableData {
// One entry in an object's log in the GCS object table. The object directory
// appends an entry when an object is added to a node and when it is evicted
// from one; readers replay the log to derive the object's current locations.
// For inlined objects the entry additionally carries the object's contents.
table ObjectTableData {
// The size of the object. The object directory fills this in from the
// object's data_size when it reports an addition.
object_size: long;
// Is object in-lined? Inline objects are objects whose data and metadata are
// inlined in the GCS object table entry, which normally only specifies
// the object location.
// When a reader encounters an entry with this flag set, it takes the data and
// metadata from that entry and stops processing further location entries.
// NOTE(review): this field is inserted in the middle of the table rather than
// appended. FlatBuffers schema evolution normally expects new fields at the
// end (or explicit ids) - confirm nothing reads entries written with the
// previous layout.
inline_object_flag: bool;
// The node manager ID that this object appeared on or was evicted by.
// Stored as the binary form of the client ID.
manager: string;
// Whether this entry is an addition or a deletion.
// false: the object was added at `manager`; true: it was evicted there.
is_eviction: bool;
// The number of times this object has been evicted from this node so far.
num_evictions: int;
// In-line object data.
// Only populated when inline_object_flag is true.
inline_object_data: [ubyte];
// In-line object metadata.
// Only populated when inline_object_flag is true.
inline_object_metadata: [ubyte];
}

table TaskReconstructionData {
Expand Down
110 changes: 86 additions & 24 deletions src/ray/object_manager/object_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,67 @@ ObjectDirectory::ObjectDirectory(boost::asio::io_service &io_service,

namespace {

/// Process a suffix of the object table log and store the result in
/// client_ids. This assumes that client_ids already contains the result of the
/// object table log up to but not including this suffix. This also stores a
/// bool in has_been_created indicating whether the object has ever been
/// created before.
/// Process a suffix of the object table log.
/// If object is inlined (inline_object_flag = TRUE), its data and metadata are
/// stored with the object's entry so we read them into inline_object_data, and
/// inline_object_metadata, respectively.
/// If object is not inlined, store the result in client_ids.
/// This assumes that client_ids already contains the result of the
/// object table log up to but not including this suffix.
/// This function also stores a bool in has_been_created indicating whether the
/// object has ever been created before.
void UpdateObjectLocations(const std::vector<ObjectTableDataT> &location_history,
const ray::gcs::ClientTable &client_table,
std::unordered_set<ClientID> *client_ids,
bool *has_been_created) {
bool *inline_object_flag,
std::vector<uint8_t> *inline_object_data,
std::string *inline_object_metadata, bool *has_been_created) {
// location_history contains the history of locations of the object (it is a log),
// which might look like the following:
// client1.is_eviction = false
// client1.is_eviction = true
// client2.is_eviction = false
// In such a scenario, we want to indicate client2 is the only client that contains
// the object, which the following code achieves.
//
// If object is inlined each entry contains both the object's data and metadata,
// so we don't care about its location.
if (!location_history.empty()) {
// If there are entries, then the object has been created. Once this flag
// is set to true, it should never go back to false.
*has_been_created = true;
}
for (const auto &object_table_data : location_history) {
ClientID client_id = ClientID::from_binary(object_table_data.manager);
if (object_table_data.inline_object_flag) {
if (!*inline_object_flag) {
// This is the first time we're receiving the inline object data. Read
// object's data from the GCS entry.
*inline_object_flag = object_table_data.inline_object_flag;
inline_object_data->assign(object_table_data.inline_object_data.begin(),
object_table_data.inline_object_data.end());
inline_object_metadata->assign(object_table_data.inline_object_metadata.begin(),
object_table_data.inline_object_metadata.end());
}
// We got the data and metadata of the object so exit the loop.
break;
}

if (!object_table_data.is_eviction) {
client_ids->insert(client_id);
} else {
client_ids->erase(client_id);
}
}
// Filter out the removed clients from the object locations.
for (auto it = client_ids->begin(); it != client_ids->end();) {
if (client_table.IsRemoved(*it)) {
it = client_ids->erase(it);
} else {
it++;

if (!*inline_object_flag) {
// Filter out the removed clients from the object locations.
for (auto it = client_ids->begin(); it != client_ids->end();) {
if (client_table.IsRemoved(*it)) {
it = client_ids->erase(it);
} else {
it++;
}
}
}
}
Expand All @@ -62,6 +88,8 @@ void ObjectDirectory::RegisterBackend() {
// Update entries for this object.
UpdateObjectLocations(location_history, gcs_client_->client_table(),
&it->second.current_object_locations,
&it->second.inline_object_flag, &it->second.inline_object_data,
&it->second.inline_object_metadata,
&it->second.has_been_created);
// Copy the callbacks so that the callbacks can unsubscribe without interrupting
// looping over the callbacks.
Expand All @@ -74,6 +102,8 @@ void ObjectDirectory::RegisterBackend() {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(object_id, it->second.current_object_locations,
it->second.inline_object_flag, it->second.inline_object_data,
it->second.inline_object_metadata,
it->second.has_been_created);
}
};
Expand All @@ -84,20 +114,32 @@ void ObjectDirectory::RegisterBackend() {

ray::Status ObjectDirectory::ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info) {
const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata) {
RAY_LOG(DEBUG) << "Reporting object added to GCS " << object_id << " inline? "
<< inline_object_flag;
// Append the addition entry to the object table.
auto data = std::make_shared<ObjectTableDataT>();
data->manager = client_id.binary();
data->is_eviction = false;
data->num_evictions = object_evictions_[object_id];
data->object_size = object_info.data_size;
data->inline_object_flag = inline_object_flag;
if (inline_object_flag) {
// Add object's data to its GCS entry.
data->inline_object_data.assign(inline_object_data.begin(), inline_object_data.end());
data->inline_object_metadata.assign(inline_object_metadata.begin(),
inline_object_metadata.end());
}
ray::Status status =
gcs_client_->object_table().Append(JobID::nil(), object_id, data, nullptr);
return status;
}

ray::Status ObjectDirectory::ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) {
RAY_LOG(DEBUG) << "Reporting object removed to GCS " << object_id;
// Append the eviction entry to the object table.
auto data = std::make_shared<ObjectTableDataT>();
data->manager = client_id.binary();
Expand Down Expand Up @@ -147,16 +189,19 @@ void ObjectDirectory::HandleClientRemoved(const ClientID &client_id) {
if (listener.second.current_object_locations.count(client_id) > 0) {
// If the subscribed object has the removed client as a location, update
// its locations with an empty log so that the location will be removed.
UpdateObjectLocations({}, gcs_client_->client_table(),
&listener.second.current_object_locations,
&listener.second.has_been_created);
UpdateObjectLocations(
{}, gcs_client_->client_table(), &listener.second.current_object_locations,
&listener.second.inline_object_flag, &listener.second.inline_object_data,
&listener.second.inline_object_metadata, &listener.second.has_been_created);
// Re-call all the subscribed callbacks for the object, since its
// locations have changed.
for (const auto &callback_pair : listener.second.callbacks) {
// It is safe to call the callback directly since this is already running
// in the subscription callback stack.
callback_pair.second(object_id, listener.second.current_object_locations,
listener.second.has_been_created);
callback_pair.second(
object_id, listener.second.current_object_locations,
listener.second.inline_object_flag, listener.second.inline_object_data,
listener.second.inline_object_metadata, listener.second.has_been_created);
}
}
}
Expand All @@ -182,8 +227,14 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i
// immediately notify the caller of the current known locations.
if (listener_state.has_been_created) {
auto &locations = listener_state.current_object_locations;
io_service_.post([callback, locations, object_id]() {
callback(object_id, locations, /*has_been_created=*/true);
auto inline_object_flag = listener_state.inline_object_flag;
const auto &inline_object_data = listener_state.inline_object_data;
const auto &inline_object_metadata = listener_state.inline_object_metadata;
io_service_.post([callback, locations, inline_object_flag, inline_object_data,
inline_object_metadata, object_id]() {
callback(object_id, locations, inline_object_flag, inline_object_data,
inline_object_metadata,
/*has_been_created=*/true);
});
}
return status;
Expand Down Expand Up @@ -216,20 +267,31 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id,
const std::vector<ObjectTableDataT> &location_history) {
// Build the set of current locations based on the entries in the log.
std::unordered_set<ClientID> client_ids;
bool inline_object_flag = false;
std::vector<uint8_t> inline_object_data;
std::string inline_object_metadata;
bool has_been_created = false;
UpdateObjectLocations(location_history, gcs_client_->client_table(),
&client_ids, &has_been_created);
&client_ids, &inline_object_flag, &inline_object_data,
&inline_object_metadata, &has_been_created);
// It is safe to call the callback directly since this is already running
// in the GCS client's lookup callback stack.
callback(object_id, client_ids, has_been_created);
callback(object_id, client_ids, inline_object_flag, inline_object_data,
inline_object_metadata, has_been_created);
});
} else {
// If we have locations cached due to a concurrent SubscribeObjectLocations
// call, call the callback immediately with the cached locations.
// If object inlined, we already have the object's data.
auto &locations = it->second.current_object_locations;
bool has_been_created = it->second.has_been_created;
io_service_.post([callback, object_id, locations, has_been_created]() {
callback(object_id, locations, has_been_created);
bool inline_object_flag = it->second.inline_object_flag;
const auto &inline_object_data = it->second.inline_object_data;
const auto &inline_object_metadata = it->second.inline_object_metadata;
io_service_.post([callback, object_id, locations, inline_object_flag,
inline_object_data, inline_object_metadata, has_been_created]() {
callback(object_id, locations, inline_object_flag, inline_object_data,
inline_object_metadata, has_been_created);
});
}
return status;
Expand Down
31 changes: 24 additions & 7 deletions src/ray/object_manager/object_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ class ObjectDirectoryInterface {
virtual std::vector<RemoteConnectionInfo> LookupAllRemoteConnections() const = 0;

/// Callback for object location notifications.
using OnLocationsFound = std::function<void(const ray::ObjectID &object_id,
const std::unordered_set<ray::ClientID> &,
bool has_been_created)>;
using OnLocationsFound = std::function<void(
const ray::ObjectID &object_id, const std::unordered_set<ray::ClientID> &, bool,
const std::vector<uint8_t> &, const std::string &, bool has_been_created)>;

/// Lookup object locations. Callback may be invoked with empty list of client ids.
///
Expand Down Expand Up @@ -99,10 +99,15 @@ class ObjectDirectoryInterface {
/// \param object_id The object id that was put into the store.
/// \param client_id The client id corresponding to this node.
/// \param object_info Additional information about the object.
/// \param inline_object_flag Flag specifying whether object is inlined.
/// \param inline_object_data Object data. Only for inlined objects.
/// \param inline_object_metadata Object metadata. Only for inlined objects.
/// \return Status of whether this method succeeded.
virtual ray::Status ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info) = 0;
const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata) = 0;

/// Report objects removed from this client's store to the object directory.
///
Expand Down Expand Up @@ -154,9 +159,12 @@ class ObjectDirectory : public ObjectDirectoryInterface {
ray::Status UnsubscribeObjectLocations(const UniqueID &callback_id,
const ObjectID &object_id) override;

ray::Status ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info) override;
ray::Status ReportObjectAdded(const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info,
bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata) override;

ray::Status ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) override;

Expand All @@ -174,6 +182,15 @@ class ObjectDirectory : public ObjectDirectoryInterface {
std::unordered_map<UniqueID, OnLocationsFound> callbacks;
/// The current set of known locations of this object.
std::unordered_set<ClientID> current_object_locations;
/// Specify whether the object is inlined. The data and the metadata of
/// an inlined object are stored in the object's GCS entry. If this flag
/// is set (i.e., the object is inlined), the content of
/// current_object_locations can be ignored.
bool inline_object_flag;
/// Inlined object data, if inline_object_flag == true.
std::vector<uint8_t> inline_object_data;
/// Inlined object metadata, if inline_object_flag == true.
std::string inline_object_metadata;
/// This flag will get set to true if the object has ever been created. It
/// should never go back to false once set to true. If this is true, and
/// the current_object_locations is empty, then this means that the object
Expand Down
Loading

0 comments on commit f987572

Please sign in to comment.