diff --git a/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java b/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java index d4d90f24ece2..12a62dd1c6bb 100644 --- a/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java +++ b/java/runtime/src/main/java/org/ray/runtime/config/RayConfig.java @@ -6,8 +6,10 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigException; import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValue; import java.io.File; +import java.util.ArrayList; import java.util.List; import java.util.Map; import org.ray.api.id.UniqueId; @@ -50,6 +52,7 @@ public class RayConfig { public final Long objectStoreSize; public final String rayletSocketName; + public final List rayletConfigParameters; public final String redisServerExecutablePath; public final String redisModulePath; @@ -162,6 +165,14 @@ public RayConfig(Config config) { // raylet socket name rayletSocketName = config.getString("ray.raylet.socket-name"); + // raylet parameters + rayletConfigParameters = new ArrayList(); + Config rayletConfig = config.getConfig("ray.raylet.config"); + for (java.util.Map.Entry entry : rayletConfig.entrySet()) { + String parameter = entry.getKey() + "," + String.valueOf(entry.getValue().unwrapped()); + rayletConfigParameters.add(parameter); + } + // library path this.libraryPath = new ImmutableList.Builder().add( rayHome + "/build/src/plasma", diff --git a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java index 7b25882dd600..0233d15c5e48 100644 --- a/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java +++ b/java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java @@ -205,7 +205,7 @@ private void startRaylet() { "0", // number of initial workers String.valueOf(maximumStartupConcurrency), ResourceUtil.getResourcesStringFromMap(rayConfig.resources), - "", // The internal config list. 
+ String.join(",", rayConfig.rayletConfigParameters), // The internal config list. buildPythonWorkerCommand(), // python worker command buildWorkerCommandRaylet() // java worker command ); diff --git a/java/runtime/src/main/resources/ray.default.conf b/java/runtime/src/main/resources/ray.default.conf index b45d7dc6376d..a77e1bc4dd76 100644 --- a/java/runtime/src/main/resources/ray.default.conf +++ b/java/runtime/src/main/resources/ray.default.conf @@ -85,6 +85,10 @@ ray { raylet { // RPC socket name of Raylet socket-name: /tmp/ray/sockets/raylet + + // See src/ray/ray_config_def.h for options. + config { + } } } diff --git a/java/test/src/main/java/org/ray/api/test/BaseTest.java b/java/test/src/main/java/org/ray/api/test/BaseTest.java index 23f893a46425..55e27dbda90c 100644 --- a/java/test/src/main/java/org/ray/api/test/BaseTest.java +++ b/java/test/src/main/java/org/ray/api/test/BaseTest.java @@ -13,6 +13,7 @@ public class BaseTest { public void setUp() { System.setProperty("ray.home", "../.."); System.setProperty("ray.resources", "CPU:4,RES-A:4"); + System.setProperty("ray.raylet.config.inline_object_max_size_bytes", "0"); Ray.init(); } @@ -29,6 +30,7 @@ public void tearDown() { // unset system properties System.clearProperty("ray.home"); System.clearProperty("ray.resources"); + System.clearProperty("ray.raylet.config.inline_object_max_size_bytes"); } } diff --git a/src/ray/gcs/format/gcs.fbs b/src/ray/gcs/format/gcs.fbs index 6522a334c156..7619349f9b2a 100644 --- a/src/ray/gcs/format/gcs.fbs +++ b/src/ray/gcs/format/gcs.fbs @@ -122,12 +122,20 @@ table FunctionTableData { table ObjectTableData { // The size of the object. object_size: long; + // Is object in-lined? Inline objects are objects whose data and metadata are + // inlined in the GCS object table entry, which normally only specifies + // the object location. + inline_object_flag: bool; // The node manager ID that this object appeared on or was evicted by. 
manager: string; // Whether this entry is an addition or a deletion. is_eviction: bool; // The number of times this object has been evicted from this node so far. num_evictions: int; + // In-line object data. + inline_object_data: [ubyte]; + // In-line object metadata. + inline_object_metadata: [ubyte]; } table TaskReconstructionData { diff --git a/src/ray/object_manager/object_directory.cc b/src/ray/object_manager/object_directory.cc index db2b4b1490de..ba928e4454f0 100644 --- a/src/ray/object_manager/object_directory.cc +++ b/src/ray/object_manager/object_directory.cc @@ -8,15 +8,21 @@ ObjectDirectory::ObjectDirectory(boost::asio::io_service &io_service, namespace { -/// Process a suffix of the object table log and store the result in -/// client_ids. This assumes that client_ids already contains the result of the -/// object table log up to but not including this suffix. This also stores a -/// bool in has_been_created indicating whether the object has ever been -/// created before. +/// Process a suffix of the object table log. +/// If object is inlined (inline_object_flag = TRUE), its data and metadata are +/// stored with the object's entry so we read them into inline_object_data, and +/// inline_object_metadata, respectively. +/// If object is not inlined, store the result in client_ids. +/// This assumes that client_ids already contains the result of the +/// object table log up to but not including this suffix. +/// This function also stores a bool in has_been_created indicating whether the +/// object has ever been created before. 
void UpdateObjectLocations(const std::vector &location_history, const ray::gcs::ClientTable &client_table, std::unordered_set *client_ids, - bool *has_been_created) { + bool *inline_object_flag, + std::vector *inline_object_data, + std::string *inline_object_metadata, bool *has_been_created) { // location_history contains the history of locations of the object (it is a log), // which might look like the following: // client1.is_eviction = false @@ -24,6 +30,9 @@ void UpdateObjectLocations(const std::vector &location_history // client2.is_eviction = false // In such a scenario, we want to indicate client2 is the only client that contains // the object, which the following code achieves. + // + // If object is inlined each entry contains both the object's data and metadata, + // so we don't care about its location. if (!location_history.empty()) { // If there are entries, then the object has been created. Once this flag // is set to true, it should never go back to false. @@ -31,18 +40,35 @@ void UpdateObjectLocations(const std::vector &location_history } for (const auto &object_table_data : location_history) { ClientID client_id = ClientID::from_binary(object_table_data.manager); + if (object_table_data.inline_object_flag) { + if (!*inline_object_flag) { + // This is the first time we're receiving the inline object data. Read + // object's data from the GCS entry. + *inline_object_flag = object_table_data.inline_object_flag; + inline_object_data->assign(object_table_data.inline_object_data.begin(), + object_table_data.inline_object_data.end()); + inline_object_metadata->assign(object_table_data.inline_object_metadata.begin(), + object_table_data.inline_object_metadata.end()); + } + // We got the data and metadata of the object so exit the loop. + break; + } + if (!object_table_data.is_eviction) { client_ids->insert(client_id); } else { client_ids->erase(client_id); } } - // Filter out the removed clients from the object locations. 
- for (auto it = client_ids->begin(); it != client_ids->end();) { - if (client_table.IsRemoved(*it)) { - it = client_ids->erase(it); - } else { - it++; + + if (!*inline_object_flag) { + // Filter out the removed clients from the object locations. + for (auto it = client_ids->begin(); it != client_ids->end();) { + if (client_table.IsRemoved(*it)) { + it = client_ids->erase(it); + } else { + it++; + } } } } @@ -62,6 +88,8 @@ void ObjectDirectory::RegisterBackend() { // Update entries for this object. UpdateObjectLocations(location_history, gcs_client_->client_table(), &it->second.current_object_locations, + &it->second.inline_object_flag, &it->second.inline_object_data, + &it->second.inline_object_metadata, &it->second.has_been_created); // Copy the callbacks so that the callbacks can unsubscribe without interrupting // looping over the callbacks. @@ -74,6 +102,8 @@ void ObjectDirectory::RegisterBackend() { // It is safe to call the callback directly since this is already running // in the subscription callback stack. callback_pair.second(object_id, it->second.current_object_locations, + it->second.inline_object_flag, it->second.inline_object_data, + it->second.inline_object_metadata, it->second.has_been_created); } }; @@ -84,13 +114,24 @@ void ObjectDirectory::RegisterBackend() { ray::Status ObjectDirectory::ReportObjectAdded( const ObjectID &object_id, const ClientID &client_id, - const object_manager::protocol::ObjectInfoT &object_info) { + const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag, + const std::vector &inline_object_data, + const std::string &inline_object_metadata) { + RAY_LOG(DEBUG) << "Reporting object added to GCS " << object_id << " inline? " + << inline_object_flag; // Append the addition entry to the object table. 
auto data = std::make_shared(); data->manager = client_id.binary(); data->is_eviction = false; data->num_evictions = object_evictions_[object_id]; data->object_size = object_info.data_size; + data->inline_object_flag = inline_object_flag; + if (inline_object_flag) { + // Add object's data to its GCS entry. + data->inline_object_data.assign(inline_object_data.begin(), inline_object_data.end()); + data->inline_object_metadata.assign(inline_object_metadata.begin(), + inline_object_metadata.end()); + } ray::Status status = gcs_client_->object_table().Append(JobID::nil(), object_id, data, nullptr); return status; @@ -98,6 +139,7 @@ ray::Status ObjectDirectory::ReportObjectAdded( ray::Status ObjectDirectory::ReportObjectRemoved(const ObjectID &object_id, const ClientID &client_id) { + RAY_LOG(DEBUG) << "Reporting object removed to GCS " << object_id; // Append the eviction entry to the object table. auto data = std::make_shared(); data->manager = client_id.binary(); @@ -147,16 +189,19 @@ void ObjectDirectory::HandleClientRemoved(const ClientID &client_id) { if (listener.second.current_object_locations.count(client_id) > 0) { // If the subscribed object has the removed client as a location, update // its locations with an empty log so that the location will be removed. - UpdateObjectLocations({}, gcs_client_->client_table(), - &listener.second.current_object_locations, - &listener.second.has_been_created); + UpdateObjectLocations( + {}, gcs_client_->client_table(), &listener.second.current_object_locations, + &listener.second.inline_object_flag, &listener.second.inline_object_data, + &listener.second.inline_object_metadata, &listener.second.has_been_created); // Re-call all the subscribed callbacks for the object, since its // locations have changed. for (const auto &callback_pair : listener.second.callbacks) { // It is safe to call the callback directly since this is already running // in the subscription callback stack. 
- callback_pair.second(object_id, listener.second.current_object_locations, - listener.second.has_been_created); + callback_pair.second( + object_id, listener.second.current_object_locations, + listener.second.inline_object_flag, listener.second.inline_object_data, + listener.second.inline_object_metadata, listener.second.has_been_created); } } } @@ -182,8 +227,14 @@ ray::Status ObjectDirectory::SubscribeObjectLocations(const UniqueID &callback_i // immediately notify the caller of the current known locations. if (listener_state.has_been_created) { auto &locations = listener_state.current_object_locations; - io_service_.post([callback, locations, object_id]() { - callback(object_id, locations, /*has_been_created=*/true); + auto inline_object_flag = listener_state.inline_object_flag; + const auto &inline_object_data = listener_state.inline_object_data; + const auto &inline_object_metadata = listener_state.inline_object_metadata; + io_service_.post([callback, locations, inline_object_flag, inline_object_data, + inline_object_metadata, object_id]() { + callback(object_id, locations, inline_object_flag, inline_object_data, + inline_object_metadata, + /*has_been_created=*/true); }); } return status; @@ -216,20 +267,31 @@ ray::Status ObjectDirectory::LookupLocations(const ObjectID &object_id, const std::vector &location_history) { // Build the set of current locations based on the entries in the log. std::unordered_set client_ids; + bool inline_object_flag = false; + std::vector inline_object_data; + std::string inline_object_metadata; bool has_been_created = false; UpdateObjectLocations(location_history, gcs_client_->client_table(), - &client_ids, &has_been_created); + &client_ids, &inline_object_flag, &inline_object_data, + &inline_object_metadata, &has_been_created); // It is safe to call the callback directly since this is already running // in the GCS client's lookup callback stack. 
- callback(object_id, client_ids, has_been_created); + callback(object_id, client_ids, inline_object_flag, inline_object_data, + inline_object_metadata, has_been_created); }); } else { // If we have locations cached due to a concurrent SubscribeObjectLocations // call, call the callback immediately with the cached locations. + // If object inlined, we already have the object's data. auto &locations = it->second.current_object_locations; bool has_been_created = it->second.has_been_created; - io_service_.post([callback, object_id, locations, has_been_created]() { - callback(object_id, locations, has_been_created); + bool inline_object_flag = it->second.inline_object_flag; + const auto &inline_object_data = it->second.inline_object_data; + const auto &inline_object_metadata = it->second.inline_object_metadata; + io_service_.post([callback, object_id, locations, inline_object_flag, + inline_object_data, inline_object_metadata, has_been_created]() { + callback(object_id, locations, inline_object_flag, inline_object_data, + inline_object_metadata, has_been_created); }); } return status; diff --git a/src/ray/object_manager/object_directory.h b/src/ray/object_manager/object_directory.h index b44197b639ef..f1634c0f4d0e 100644 --- a/src/ray/object_manager/object_directory.h +++ b/src/ray/object_manager/object_directory.h @@ -48,9 +48,9 @@ class ObjectDirectoryInterface { virtual std::vector LookupAllRemoteConnections() const = 0; /// Callback for object location notifications. - using OnLocationsFound = std::function &, - bool has_been_created)>; + using OnLocationsFound = std::function &, bool, + const std::vector &, const std::string &, bool has_been_created)>; /// Lookup object locations. Callback may be invoked with empty list of client ids. /// @@ -99,10 +99,15 @@ class ObjectDirectoryInterface { /// \param object_id The object id that was put into the store. /// \param client_id The client id corresponding to this node. 
/// \param object_info Additional information about the object. + /// \param inline_object_flag Flag specifying whether object is inlined. + /// \param inline_object_data Object data. Only for inlined objects. + /// \param inline_object_metadata Object metadata. Only for inlined objects. /// \return Status of whether this method succeeded. virtual ray::Status ReportObjectAdded( const ObjectID &object_id, const ClientID &client_id, - const object_manager::protocol::ObjectInfoT &object_info) = 0; + const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag, + const std::vector &inline_object_data, + const std::string &inline_object_metadata) = 0; /// Report objects removed from this client's store to the object directory. /// @@ -154,9 +159,12 @@ class ObjectDirectory : public ObjectDirectoryInterface { ray::Status UnsubscribeObjectLocations(const UniqueID &callback_id, const ObjectID &object_id) override; - ray::Status ReportObjectAdded( - const ObjectID &object_id, const ClientID &client_id, - const object_manager::protocol::ObjectInfoT &object_info) override; + ray::Status ReportObjectAdded(const ObjectID &object_id, const ClientID &client_id, + const object_manager::protocol::ObjectInfoT &object_info, + bool inline_object_flag, + const std::vector &inline_object_data, + const std::string &inline_object_metadata) override; + ray::Status ReportObjectRemoved(const ObjectID &object_id, const ClientID &client_id) override; @@ -174,6 +182,15 @@ class ObjectDirectory : public ObjectDirectoryInterface { std::unordered_map callbacks; /// The current set of known locations of this object. std::unordered_set current_object_locations; + /// Specify whether the object is inlined. The data and the metadata of + /// an inlined object are stored in the object's GCS entry. If this flag + /// is set (i.e., the object is inlined), the content of current_object_locations + /// can be ignored. 
+ bool inline_object_flag; + /// Inlined object data, if inline_object_flag == true. + std::vector inline_object_data; + /// Inlined object metadata, if inline_object_flag == true. + std::string inline_object_metadata; /// This flag will get set to true if the object has ever been created. It /// should never go back to false once set to true. If this is true, and /// the current_object_locations is empty, then this means that the object diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index e9904d9603b9..241d1bf8634a 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -10,13 +10,15 @@ namespace ray { ObjectManager::ObjectManager(asio::io_service &main_service, const ObjectManagerConfig &config, - std::shared_ptr object_directory) + std::shared_ptr object_directory, + plasma::PlasmaClient &store_client) : config_(config), object_directory_(std::move(object_directory)), store_notification_(main_service, config_.store_socket_name), buffer_pool_(config_.store_socket_name, config_.object_chunk_size), send_work_(send_service_), receive_work_(receive_service_), + store_client_(store_client), connection_pool_(), gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) { RAY_CHECK(config_.max_sends > 0); @@ -64,11 +66,39 @@ void ObjectManager::HandleObjectAdded( const object_manager::protocol::ObjectInfoT &object_info) { // Notify the object directory that the object has been added to this node. ObjectID object_id = ObjectID::from_binary(object_info.object_id); + RAY_LOG(DEBUG) << "Object added " << object_id; RAY_CHECK(local_objects_.count(object_id) == 0); local_objects_[object_id].object_info = object_info; - ray::Status status = - object_directory_->ReportObjectAdded(object_id, client_id_, object_info); + // If this object was created from inlined data, this means it is already in GCS, + // so no need to write it again. 
+ if (local_inlined_objects_.find(object_id) == local_inlined_objects_.end()) { + std::vector inline_object_data; + std::string inline_object_metadata; + bool inline_object_flag = false; + if (object_info.data_size <= RayConfig::instance().inline_object_max_size_bytes()) { + // Inline object. Try to get the data from the object store. + plasma::ObjectBuffer object_buffer; + plasma::ObjectID plasma_id = object_id.to_plasma_id(); + RAY_ARROW_CHECK_OK(store_client_.Get(&plasma_id, 1, 0, &object_buffer)); + if (object_buffer.data != nullptr) { + // The object exists. Store the object data in the GCS entry. + inline_object_flag = true; + inline_object_data.assign( + object_buffer.data->data(), + object_buffer.data->data() + object_buffer.data->size()); + inline_object_metadata.assign( + object_buffer.metadata->data(), + object_buffer.metadata->data() + object_buffer.metadata->size()); + // Mark this object as inlined, so that if this object is later + // evicted, we do not report it to the GCS. + local_inlined_objects_.insert(object_id); + } + } + RAY_CHECK_OK(object_directory_->ReportObjectAdded( + object_id, client_id_, object_info, inline_object_flag, inline_object_data, + inline_object_metadata)); + } // Handle the unfulfilled_push_requests_ which contains the push request that is not // completed due to unsatisfied local objects. auto iter = unfulfilled_push_requests_.find(object_id); @@ -90,10 +120,16 @@ void ObjectManager::HandleObjectAdded( } void ObjectManager::NotifyDirectoryObjectDeleted(const ObjectID &object_id) { + RAY_LOG(DEBUG) << "Object removed " << object_id; auto it = local_objects_.find(object_id); RAY_CHECK(it != local_objects_.end()); + if (local_inlined_objects_.find(object_id) == local_inlined_objects_.end()) { + // Inline object data can be retrieved by any node by contacting the GCS, + // so only report that the object was evicted if it wasn't inlined. 
+ RAY_CHECK_OK(object_directory_->ReportObjectRemoved(object_id, client_id_)); + } local_objects_.erase(it); - ray::Status status = object_directory_->ReportObjectRemoved(object_id, client_id_); + local_inlined_objects_.erase(object_id); } ray::Status ObjectManager::SubscribeObjAdded( @@ -108,6 +144,26 @@ ray::Status ObjectManager::SubscribeObjDeleted( return ray::Status::OK(); } +void ObjectManager::PutInlineObject(const ObjectID &object_id, + const std::vector &inline_object_data, + const std::string &inline_object_metadata) { + if (local_objects_.find(object_id) == local_objects_.end()) { + // Inline object is not in the local object store. Create it from + // inline_object_data, and inline_object_metadata, respectively. + // + // Since this function is called on notification or when reading the + // object's entry from GCS, we know this object's entry is already in GCS. + // Remember this by adding the object to local_inlined_objects_. This way + // we avoid writing another copy of this object to GCS in HandleObjectAdded(). + local_inlined_objects_.insert(object_id); + auto status = store_client_.CreateAndSeal( + object_id.to_plasma_id(), + std::string(inline_object_data.begin(), inline_object_data.end()), + inline_object_metadata); + RAY_CHECK(status.IsPlasmaObjectExists() || status.ok()) << status.message(); + } +} + ray::Status ObjectManager::Pull(const ObjectID &object_id) { RAY_LOG(DEBUG) << "Pull on " << client_id_ << " of object " << object_id; // Check if object is already local. @@ -127,7 +183,13 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id) { return object_directory_->SubscribeObjectLocations( object_directory_pull_callback_id_, object_id, [this](const ObjectID &object_id, const std::unordered_set &client_ids, - bool created) { + bool inline_object_flag, const std::vector &inline_object_data, + const std::string &inline_object_metadata, bool created) { + if (inline_object_flag) { + // This is an inlined object. 
Store it in the Plasma store and return. + PutInlineObject(object_id, inline_object_data, inline_object_metadata); + return; + } // Exit if the Pull request has already been fulfilled or canceled. auto it = pull_requests_.find(object_id); if (it == pull_requests_.end()) { @@ -169,7 +231,14 @@ void ObjectManager::TryPull(const ObjectID &object_id) { RAY_CHECK(local_objects_.count(object_id) == 0); // Make sure that there is at least one client which is not the local client. // TODO(rkn): It may actually be possible for this check to fail. - RAY_CHECK(client_vector.size() != 1 || client_vector[0] != client_id_); + if (client_vector.size() == 1 && client_vector[0] == client_id_) { + RAY_LOG(ERROR) << "The object manager with client ID " << client_id_ + << " is trying to pull object " << object_id + << " but the object table suggests that this object manager " + << "already has the object. The object may have been evicted."; + it->second.timer_set = false; + return; + } // Choose a random client to pull the object from. // Generate a random index. @@ -572,11 +641,19 @@ ray::Status ObjectManager::LookupRemainingWaitObjects(const UniqueID &wait_id) { RAY_RETURN_NOT_OK(object_directory_->LookupLocations( object_id, [this, wait_id](const ObjectID &lookup_object_id, - const std::unordered_set &client_ids, bool created) { + const std::unordered_set &client_ids, + bool inline_object_flag, + const std::vector &inline_object_data, + const std::string &inline_object_metadata, bool created) { auto &wait_state = active_wait_requests_.find(wait_id)->second; - if (!client_ids.empty()) { + if (!client_ids.empty() || inline_object_flag) { wait_state.remaining.erase(lookup_object_id); wait_state.found.insert(lookup_object_id); + if (inline_object_flag) { + // This is an inlined object. Store it in the Plasma store. 
+ PutInlineObject(lookup_object_id, inline_object_data, + inline_object_metadata); + } } RAY_LOG(DEBUG) << "Wait request " << wait_id << ": " << client_ids.size() << " locations found for object " << lookup_object_id; @@ -610,8 +687,11 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) { RAY_CHECK_OK(object_directory_->SubscribeObjectLocations( wait_id, object_id, [this, wait_id](const ObjectID &subscribe_object_id, - const std::unordered_set &client_ids, bool created) { - if (!client_ids.empty()) { + const std::unordered_set &client_ids, + bool inline_object_flag, + const std::vector &inline_object_data, + const std::string &inline_object_metadata, bool created) { + if (!client_ids.empty() || inline_object_flag) { RAY_LOG(DEBUG) << "Wait request " << wait_id << ": subscription notification received for object " << subscribe_object_id; @@ -623,6 +703,11 @@ void ObjectManager::SubscribeRemainingWaitObjects(const UniqueID &wait_id) { // notification. return; } + if (inline_object_flag) { + // This is an inlined object. Store it in the Plasma store. + PutInlineObject(subscribe_object_id, inline_object_data, + inline_object_metadata); + } auto &wait_state = object_id_wait_state->second; wait_state.remaining.erase(subscribe_object_id); wait_state.found.insert(subscribe_object_id); diff --git a/src/ray/object_manager/object_manager.h b/src/ray/object_manager/object_manager.h index 29c75c57a773..b170e072781f 100644 --- a/src/ray/object_manager/object_manager.h +++ b/src/ray/object_manager/object_manager.h @@ -76,9 +76,12 @@ class ObjectManager : public ObjectManagerInterface { /// \param main_service The main asio io_service. /// \param config ObjectManager configuration. /// \param object_directory An object implementing the object directory interface. + /// \param store_client Reference to Plasma store. This is used to get and put + /// inlined objects in the local object store. 
explicit ObjectManager(boost::asio::io_service &main_service, const ObjectManagerConfig &config, - std::shared_ptr object_directory); + std::shared_ptr object_directory, + plasma::PlasmaClient &store_client); ~ObjectManager(); @@ -351,6 +354,12 @@ class ObjectManager : public ObjectManagerInterface { /// Handle Push task timeout. void HandlePushTaskTimeout(const ObjectID &object_id, const ClientID &client_id); + /// Add inline object to object store. Called when reading the object entry + /// from GCS or upon receiving a notification about an inline object. + void PutInlineObject(const ObjectID &object_id, + const std::vector &inline_object_data, + const std::string &inline_object_metadata); + ClientID client_id_; const ObjectManagerConfig config_; std::shared_ptr object_directory_; @@ -380,6 +389,10 @@ class ObjectManager : public ObjectManagerInterface { /// all incoming object transfers. std::vector receive_threads_; + /// Reference to Plasma Store. This is used to get and put inlined objects in + /// the local object store. + plasma::PlasmaClient &store_client_; + /// Connection pool for reusing outgoing connections to remote object managers. ConnectionPool connection_pool_; @@ -387,6 +400,12 @@ class ObjectManager : public ObjectManagerInterface { /// including when the object was last pushed to other object managers. std::unordered_map local_objects_; + /// Set of objects created from inlined data whose locations and/or evictions + /// should not be reported to the GCS. This includes objects that were + /// created from data retrieved from the GCS, since a GCS entry with the + /// inlined data already exists. + std::unordered_set local_inlined_objects_; + /// This is used as the callback identifier in Pull for /// SubscribeObjectLocations. We only need one identifier because we never need to /// subscribe multiple times to the same object during Pull. 
diff --git a/src/ray/object_manager/test/object_manager_stress_test.cc b/src/ray/object_manager/test/object_manager_stress_test.cc index 91b0ffc3d576..e7092955cc1f 100644 --- a/src/ray/object_manager/test/object_manager_stress_test.cc +++ b/src/ray/object_manager/test/object_manager_stress_test.cc @@ -30,13 +30,16 @@ class MockServer { public: MockServer(boost::asio::io_service &main_service, const ObjectManagerConfig &object_manager_config, - std::shared_ptr gcs_client) + std::shared_ptr gcs_client, + const std::string &store_name) : object_manager_acceptor_( main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)), object_manager_socket_(main_service), gcs_client_(gcs_client), object_manager_(main_service, object_manager_config, - std::make_shared(main_service, gcs_client_)) { + std::make_shared(main_service, gcs_client_), + store_client_) { + RAY_ARROW_CHECK_OK(store_client_.Connect(store_name.c_str())); RAY_CHECK_OK(RegisterGcs(main_service)); // Start listening for clients. 
DoAcceptObjectManager(); @@ -88,6 +91,7 @@ class MockServer { boost::asio::ip::tcp::acceptor object_manager_acceptor_; boost::asio::ip::tcp::socket object_manager_socket_; std::shared_ptr gcs_client_; + plasma::PlasmaClient store_client_; ObjectManager object_manager_; }; @@ -142,7 +146,7 @@ class TestObjectManagerBase : public ::testing::Test { om_config_1.max_receives = max_receives_a; om_config_1.object_chunk_size = object_chunk_size; om_config_1.push_timeout_ms = push_timeout_ms; - server1.reset(new MockServer(main_service, om_config_1, gcs_client_1)); + server1.reset(new MockServer(main_service, om_config_1, gcs_client_1, store_id_1)); // start second server gcs_client_2 = std::shared_ptr( @@ -154,7 +158,7 @@ class TestObjectManagerBase : public ::testing::Test { om_config_2.max_receives = max_receives_b; om_config_2.object_chunk_size = object_chunk_size; om_config_2.push_timeout_ms = push_timeout_ms; - server2.reset(new MockServer(main_service, om_config_2, gcs_client_2)); + server2.reset(new MockServer(main_service, om_config_2, gcs_client_2, store_id_2)); // connect to stores. 
RAY_ARROW_CHECK_OK(client1.Connect(store_id_1)); diff --git a/src/ray/object_manager/test/object_manager_test.cc b/src/ray/object_manager/test/object_manager_test.cc index 98ad9bbfbf97..904f1ed2a83c 100644 --- a/src/ray/object_manager/test/object_manager_test.cc +++ b/src/ray/object_manager/test/object_manager_test.cc @@ -21,13 +21,16 @@ class MockServer { public: MockServer(boost::asio::io_service &main_service, const ObjectManagerConfig &object_manager_config, - std::shared_ptr gcs_client) + std::shared_ptr gcs_client, + const std::string &store_name) : object_manager_acceptor_( main_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 0)), object_manager_socket_(main_service), gcs_client_(gcs_client), object_manager_(main_service, object_manager_config, - std::make_shared(main_service, gcs_client_)) { + std::make_shared(main_service, gcs_client_), + store_client_) { + RAY_ARROW_CHECK_OK(store_client_.Connect(store_name.c_str())); RAY_CHECK_OK(RegisterGcs(main_service)); // Start listening for clients. 
DoAcceptObjectManager(); @@ -79,6 +82,7 @@ class MockServer { boost::asio::ip::tcp::acceptor object_manager_acceptor_; boost::asio::ip::tcp::socket object_manager_socket_; std::shared_ptr gcs_client_; + plasma::PlasmaClient store_client_; ObjectManager object_manager_; }; @@ -127,7 +131,7 @@ class TestObjectManagerBase : public ::testing::Test { om_config_1.max_receives = max_receives; om_config_1.object_chunk_size = object_chunk_size; om_config_1.push_timeout_ms = push_timeout_ms; - server1.reset(new MockServer(main_service, om_config_1, gcs_client_1)); + server1.reset(new MockServer(main_service, om_config_1, gcs_client_1, store_id_1)); // start second server gcs_client_2 = std::shared_ptr( @@ -139,7 +143,7 @@ class TestObjectManagerBase : public ::testing::Test { om_config_2.max_receives = max_receives; om_config_2.object_chunk_size = object_chunk_size; om_config_2.push_timeout_ms = push_timeout_ms; - server2.reset(new MockServer(main_service, om_config_2, gcs_client_2)); + server2.reset(new MockServer(main_service, om_config_2, gcs_client_2, store_id_2)); // connect to stores. RAY_ARROW_CHECK_OK(client1.Connect(store_id_1)); @@ -291,8 +295,10 @@ class TestObjectManager : public TestObjectManagerBase { sub_id, object_1, [this, sub_id, object_1, object_2]( const ray::ObjectID &object_id, - const std::unordered_set &clients, bool created) { - if (!clients.empty()) { + const std::unordered_set &clients, bool inline_object_flag, + const std::vector inline_object_data, + const std::string inline_object_metadata, bool created) { + if (!clients.empty() || inline_object_flag) { TestWaitWhileSubscribed(sub_id, object_1, object_2); } })); diff --git a/src/ray/ray_config_def.h b/src/ray/ray_config_def.h index 894109f22af6..138d474bc9e3 100644 --- a/src/ray/ray_config_def.h +++ b/src/ray/ray_config_def.h @@ -130,6 +130,11 @@ RAY_CONFIG(int, object_manager_repeated_push_delay_ms, 60000); /// chunks exceeds the number of available sending threads. 
RAY_CONFIG(uint64_t, object_manager_default_chunk_size, 1000000); +/// Maximum size of an inline object (bytes). +/// Inline objects are objects whose data and metadata are inlined in the +/// GCS object table entry, which normally only specifies the object locations. +RAY_CONFIG(int64_t, inline_object_max_size_bytes, 512); + /// Number of workers per process RAY_CONFIG(int, num_workers_per_process, 1); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index d5f665364c2e..d05f28dd0e80 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -42,9 +42,11 @@ namespace raylet { NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeManagerConfig &config, ObjectManager &object_manager, std::shared_ptr gcs_client, - std::shared_ptr object_directory) + std::shared_ptr object_directory, + plasma::PlasmaClient &store_client) : io_service_(io_service), object_manager_(object_manager), + store_client_(store_client), gcs_client_(std::move(gcs_client)), object_directory_(std::move(object_directory)), heartbeat_timer_(io_service), @@ -89,8 +91,6 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, })); RAY_CHECK_OK(object_manager_.SubscribeObjDeleted( [this](const ObjectID &object_id) { HandleObjectMissing(object_id); })); - - RAY_ARROW_CHECK_OK(store_client_.Connect(config.store_socket_name.c_str())); } ray::Status NodeManager::RegisterGcs() { @@ -1190,10 +1190,16 @@ void NodeManager::TreatTaskAsFailedIfLost(const Task &task) { object_id, [this, task_marked_as_failed, task]( const ray::ObjectID &object_id, - const std::unordered_set &clients, bool has_been_created) { + const std::unordered_set &clients, bool inline_object_flag, + const std::vector &inline_object_data, + const std::string &inline_object_metadata, bool has_been_created) { if (!*task_marked_as_failed) { // Only process the object locations if we haven't already marked the // task as failed. 
+ if (inline_object_flag) { + // If object is inlined, we already have its data and metadata, so return. + return; + } if (clients.empty() && has_been_created) { // The object does not exist on any nodes but has been created // before, so the object has been lost. Mark the task as failed to @@ -1800,7 +1806,7 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) { // Notify the task dependency manager that this object is local. const auto ready_task_ids = task_dependency_manager_.HandleObjectLocal(object_id); RAY_LOG(DEBUG) << "Object local " << object_id << ", " - << " on " << gcs_client_->client_table().GetLocalClientId() + << "on " << gcs_client_->client_table().GetLocalClientId() << ready_task_ids.size() << " tasks ready"; // Transition the tasks whose dependencies are now fulfilled to the ready state. if (ready_task_ids.size() > 0) { diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index e60dc4e2ad66..4d9de0c63019 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -56,10 +56,12 @@ class NodeManager { /// /// \param resource_config The initial set of node resources. /// \param object_manager A reference to the local object manager. + /// \param store_client A reference to the local object store. NodeManager(boost::asio::io_service &io_service, const NodeManagerConfig &config, ObjectManager &object_manager, std::shared_ptr gcs_client, - std::shared_ptr object_directory_); + std::shared_ptr object_directory_, + plasma::PlasmaClient &store_client); /// Process a new client connection. /// @@ -400,7 +402,7 @@ class NodeManager { /// A Plasma object store client. This is used exclusively for creating new /// objects in the object store (e.g., for actor tasks that can't be run /// because the actor died). - plasma::PlasmaClient store_client_; + plasma::PlasmaClient &store_client_; /// A client connection to the GCS. std::shared_ptr gcs_client_; /// The object table. This is shared with the object manager. 
diff --git a/src/ray/raylet/raylet.cc b/src/ray/raylet/raylet.cc index 288f0a80b481..ea4bd3fec6a2 100644 --- a/src/ray/raylet/raylet.cc +++ b/src/ray/raylet/raylet.cc @@ -41,9 +41,10 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_ std::shared_ptr gcs_client) : gcs_client_(gcs_client), object_directory_(std::make_shared(main_service, gcs_client_)), - object_manager_(main_service, object_manager_config, object_directory_), + object_manager_(main_service, object_manager_config, object_directory_, + store_client_), node_manager_(main_service, node_manager_config, object_manager_, gcs_client_, - object_directory_), + object_directory_, store_client_), socket_name_(socket_name), acceptor_(main_service, boost::asio::local::stream_protocol::endpoint(socket_name)), socket_(main_service), @@ -56,6 +57,8 @@ Raylet::Raylet(boost::asio::io_service &main_service, const std::string &socket_ boost::asio::ip::tcp::v4(), node_manager_config.node_manager_port)), node_manager_socket_(main_service) { + RAY_ARROW_CHECK_OK( + store_client_.Connect(node_manager_config.store_socket_name.c_str())); // Start listening for clients. DoAccept(); DoAcceptObjectManager(); diff --git a/src/ray/raylet/raylet.h b/src/ray/raylet/raylet.h index 84274ea6ecfe..8f010ed512a6 100644 --- a/src/ray/raylet/raylet.h +++ b/src/ray/raylet/raylet.h @@ -73,6 +73,10 @@ class Raylet { /// The object table. This is shared between the object manager and node /// manager. std::shared_ptr object_directory_; + /// The raylet's Plasma store client. + /// A connection to the Plasma Store. This is shared between the node manager + /// and the main thread of the object manager. + plasma::PlasmaClient store_client_; /// Manages client requests for object transfers and availability. ObjectManager object_manager_; /// Manages client requests for task submission and execution. 
diff --git a/src/ray/raylet/reconstruction_policy.cc b/src/ray/raylet/reconstruction_policy.cc index d698402994a4..a98df6d493d3 100644 --- a/src/ray/raylet/reconstruction_policy.cc +++ b/src/ray/raylet/reconstruction_policy.cc @@ -145,8 +145,10 @@ void ReconstructionPolicy::HandleTaskLeaseExpired(const TaskID &task_id) { created_object_id, [this, task_id, reconstruction_attempt]( const ray::ObjectID &object_id, - const std::unordered_set &clients, bool created) { - if (clients.empty()) { + const std::unordered_set &clients, bool inline_object_flag, + const std::vector &inline_object_data, + const std::string &inline_object_metadata, bool created) { + if (clients.empty() && !inline_object_flag) { // The required object no longer exists on any live nodes. Attempt // reconstruction. AttemptReconstruction(task_id, object_id, reconstruction_attempt, created); diff --git a/src/ray/raylet/reconstruction_policy_test.cc b/src/ray/raylet/reconstruction_policy_test.cc index 5e9ae6d7e521..9adbc1e893b9 100644 --- a/src/ray/raylet/reconstruction_policy_test.cc +++ b/src/ray/raylet/reconstruction_policy_test.cc @@ -29,10 +29,10 @@ class MockObjectDirectory : public ObjectDirectoryInterface { const ObjectID object_id = callback.first; auto it = locations_.find(object_id); if (it == locations_.end()) { - callback.second(object_id, std::unordered_set(), + callback.second(object_id, std::unordered_set(), false, {}, "", /*created=*/false); } else { - callback.second(object_id, it->second, /*created=*/true); + callback.second(object_id, it->second, false, {}, "", /*created=*/true); } } callbacks_.clear(); @@ -60,9 +60,11 @@ class MockObjectDirectory : public ObjectDirectoryInterface { const OnLocationsFound &)); MOCK_METHOD2(UnsubscribeObjectLocations, ray::Status(const ray::UniqueID &, const ObjectID &)); - MOCK_METHOD3(ReportObjectAdded, + MOCK_METHOD6(ReportObjectAdded, ray::Status(const ObjectID &, const ClientID &, - const object_manager::protocol::ObjectInfoT &)); + const 
object_manager::protocol::ObjectInfoT &, bool, + const std::vector &, const std::string &)); + MOCK_METHOD2(ReportObjectRemoved, ray::Status(const ObjectID &, const ClientID &)); private: diff --git a/test/object_manager_test.py b/test/object_manager_test.py index 9d67c6b61e4d..afab0292120d 100644 --- a/test/object_manager_test.py +++ b/test/object_manager_test.py @@ -210,7 +210,7 @@ def set_weights(self, x): def test_object_transfer_retry(ray_start_empty_cluster): cluster = ray_start_empty_cluster - repeated_push_delay = 4 + repeated_push_delay = 10 # Force the sending object manager to allow duplicate pushes again sooner. # Also, force the receiving object manager to retry the Pull sooner. @@ -246,6 +246,7 @@ def f(size): ray.pyarrow.plasma.ObjectID(x_id.binary())) for x_id in x_ids) end_time = time.time() + print(end_time - start_time) # Make sure that the first time the objects get transferred, it happens # quickly. assert end_time - start_time < repeated_push_delay diff --git a/test/runtest.py b/test/runtest.py index b280d7fc0e68..1e35a08e5844 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -1059,8 +1059,14 @@ def test_object_transfer_dump(ray_start_cluster): cluster = ray_start_cluster num_nodes = 3 + # Set the inline object size to 0 to force all objects to be written to + # plasma. 
+ config = json.dumps({"inline_object_max_size_bytes": 0}) for i in range(num_nodes): - cluster.add_node(resources={str(i): 1}, object_store_memory=10**9) + cluster.add_node( + resources={str(i): 1}, + object_store_memory=10**9, + _internal_config=config) ray.init(redis_address=cluster.redis_address) @ray.remote @@ -2518,6 +2524,56 @@ def f(): assert len(ready_ids) == 1 +def test_inline_objects(shutdown_only): + config = json.dumps({"initial_reconstruction_timeout_milliseconds": 200}) + ray.init(num_cpus=1, object_store_memory=10**7, _internal_config=config) + + @ray.remote + class Actor(object): + def create_inline_object(self): + return "inline" + + def create_non_inline_object(self): + return 10000 * [1] + + def get(self): + return + + a = Actor.remote() + # Count the number of objects that were successfully inlined. + inlined = 0 + for _ in range(100): + inline_object = a.create_inline_object.remote() + ray.get(inline_object) + plasma_id = ray.pyarrow.plasma.ObjectID(inline_object.binary()) + ray.worker.global_worker.plasma_client.delete([plasma_id]) + # Make sure we can still get an inlined object created by an actor even + # after it has been evicted. + try: + value = ray.get(inline_object) + assert value == "inline" + inlined += 1 + except ray.worker.RayTaskError: + pass + # Make sure some objects were inlined. Some of them may not get inlined + # because we evict the object soon after creating it. + assert inlined > 0 + + # Non-inlined objects are not able to be recreated after eviction. + for _ in range(10): + non_inline_object = a.create_non_inline_object.remote() + ray.get(non_inline_object) + plasma_id = ray.pyarrow.plasma.ObjectID(non_inline_object.binary()) + # This while loop is necessary because sometimes the object is still + # there immediately after plasma_client.delete. 
+ while ray.worker.global_worker.plasma_client.contains(plasma_id): + ray.worker.global_worker.plasma_client.delete([plasma_id]) + # Objects created by an actor that were evicted and larger than the + # maximum inline object size cannot be retrieved or reconstructed. + with pytest.raises(ray.worker.RayTaskError): + ray.get(non_inline_object) == 10000 * [1] + + def test_ray_setproctitle(shutdown_only): ray.init(num_cpus=2)