Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase timeout for object manager valgrind tests #4027

Merged
merged 4 commits into from
Feb 14, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/ray/object_manager/object_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ void ObjectDirectory::RegisterBackend() {
ray::Status ObjectDirectory::ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata) {
const plasma::ObjectBuffer &plasma_buffer) {
RAY_LOG(DEBUG) << "Reporting object added to GCS " << object_id << " inline? "
<< inline_object_flag;
// Append the addition entry to the object table.
Expand All @@ -128,9 +127,12 @@ ray::Status ObjectDirectory::ReportObjectAdded(
data->inline_object_flag = inline_object_flag;
if (inline_object_flag) {
// Add object's data to its GCS entry.
data->inline_object_data.assign(inline_object_data.begin(), inline_object_data.end());
data->inline_object_metadata.assign(inline_object_metadata.begin(),
inline_object_metadata.end());
data->inline_object_data.assign(
plasma_buffer.data->data(),
plasma_buffer.data->data() + plasma_buffer.data->size());
data->inline_object_metadata.assign(
plasma_buffer.metadata->data(),
plasma_buffer.metadata->data() + plasma_buffer.metadata->size());
}
ray::Status status =
gcs_client_->object_table().Append(JobID::nil(), object_id, data, nullptr);
Expand Down
12 changes: 6 additions & 6 deletions src/ray/object_manager/object_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <unordered_set>
#include <vector>

#include "plasma/client.h"

#include "ray/gcs/client.h"
#include "ray/id.h"
#include "ray/object_manager/format/object_manager_generated.h"
Expand Down Expand Up @@ -100,14 +102,13 @@ class ObjectDirectoryInterface {
/// \param client_id The client id corresponding to this node.
/// \param object_info Additional information about the object.
/// \param inline_object_flag Flag specifying whether object is inlined.
/// \param inline_object_data Object data. Only for inlined objects.
/// \param inline_object_metadata Object metadata. Only for inlined objects.
/// \param plasma_buffer Object data and metadata from plasma. This data is
/// only valid for inlined objects (i.e., when inline_object_flag=true).
/// \return Status of whether this method succeeded.
virtual ray::Status ReportObjectAdded(
const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info, bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata) = 0;
const plasma::ObjectBuffer &plasma_buffer) = 0;

/// Report objects removed from this client's store to the object directory.
///
Expand Down Expand Up @@ -162,8 +163,7 @@ class ObjectDirectory : public ObjectDirectoryInterface {
ray::Status ReportObjectAdded(const ObjectID &object_id, const ClientID &client_id,
const object_manager::protocol::ObjectInfoT &object_info,
bool inline_object_flag,
const std::vector<uint8_t> &inline_object_data,
const std::string &inline_object_metadata) override;
const plasma::ObjectBuffer &plasma_buffer) override;

ray::Status ReportObjectRemoved(const ObjectID &object_id,
const ClientID &client_id) override;
Expand Down
16 changes: 5 additions & 11 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,29 +75,23 @@ void ObjectManager::HandleObjectAdded(
std::vector<uint8_t> inline_object_data;
std::string inline_object_metadata;
bool inline_object_flag = false;
plasma::ObjectBuffer object_buffer;
if (object_info.data_size <= RayConfig::instance().inline_object_max_size_bytes()) {
// Inline object. Try to get the data from the object store.
plasma::ObjectBuffer object_buffer;
plasma::ObjectID plasma_id = object_id.to_plasma_id();
RAY_ARROW_CHECK_OK(store_client_.Get(&plasma_id, 1, 0, &object_buffer));
if (object_buffer.data != nullptr) {
// The object exists. Store the object data in the GCS entry.
// The object exists. Set inline_object_flag so that the object data
// will be stored in the GCS entry.
inline_object_flag = true;
inline_object_data.assign(
object_buffer.data->data(),
object_buffer.data->data() + object_buffer.data->size());
inline_object_metadata.assign(
object_buffer.metadata->data(),
object_buffer.metadata->data() + object_buffer.metadata->size());
// Mark this object as inlined, so that if this object is later
// evicted, we do not report it to the GCS.
local_inlined_objects_.insert(object_id);
}
}

RAY_CHECK_OK(object_directory_->ReportObjectAdded(
object_id, client_id_, object_info, inline_object_flag, inline_object_data,
inline_object_metadata));
RAY_CHECK_OK(object_directory_->ReportObjectAdded(object_id, client_id_, object_info,
inline_object_flag, object_buffer));
}
// Handle the unfulfilled_push_requests_ which contains the push request that is not
// completed due to unsatisfied local objects.
Expand Down
20 changes: 12 additions & 8 deletions src/ray/object_manager/test/object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@

#include "ray/object_manager/object_manager.h"

namespace {
std::string store_executable;
int64_t wait_timeout_ms;
}

namespace ray {

static inline void flushall_redis(void) {
Expand All @@ -15,8 +20,6 @@ static inline void flushall_redis(void) {
redisFree(context);
}

std::string store_executable;

class MockServer {
public:
MockServer(boost::asio::io_service &main_service,
Expand Down Expand Up @@ -342,27 +345,27 @@ class TestObjectManager : public TestObjectManagerBase {
case 0: {
// Ensure timeout_ms = 0 is handled correctly.
// Out of 5 objects, we expect 3 ready objects and 2 remaining objects.
TestWait(600, 5, 3, /*timeout_ms=*/0, false, false);
TestWait(100, 5, 3, /*timeout_ms=*/0, false, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is decreasing the object size intentional here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, to put it back under the inline objects limit.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, shouldn't we test both?

} break;
case 1: {
// Ensure timeout_ms = 1000 is handled correctly.
// Out of 5 objects, we expect 3 ready objects and 2 remaining objects.
TestWait(600, 5, 3, /*timeout_ms=*/1000, false, false);
TestWait(100, 5, 3, wait_timeout_ms, false, false);
} break;
case 2: {
// Generate objects locally to ensure local object code-path works properly.
// Out of 5 objects, we expect 3 ready objects and 2 remaining objects.
TestWait(600, 5, 3, 1000, false, /*test_local=*/true);
TestWait(100, 5, 3, wait_timeout_ms, false, /*test_local=*/true);
} break;
case 3: {
// Wait on an object that's never registered with GCS to ensure timeout works
// properly.
TestWait(600, /*num_objects=*/5, /*required_objects=*/6, 1000,
TestWait(100, /*num_objects=*/5, /*required_objects=*/6, wait_timeout_ms,
/*include_nonexistent=*/true, false);
} break;
case 4: {
// Ensure infinite time code-path works properly.
TestWait(600, 5, 5, /*timeout_ms=*/-1, false, false);
TestWait(100, 5, 5, /*timeout_ms=*/-1, false, false);
} break;
}
}
Expand Down Expand Up @@ -484,6 +487,7 @@ TEST_F(TestObjectManager, StartTestObjectManager) {

int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
ray::store_executable = std::string(argv[1]);
store_executable = std::string(argv[1]);
wait_timeout_ms = std::stoi(std::string(argv[2]));
return RUN_ALL_TESTS();
}
4 changes: 2 additions & 2 deletions src/ray/raylet/reconstruction_policy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ class MockObjectDirectory : public ObjectDirectoryInterface {
const OnLocationsFound &));
MOCK_METHOD2(UnsubscribeObjectLocations,
ray::Status(const ray::UniqueID &, const ObjectID &));
MOCK_METHOD6(ReportObjectAdded,
MOCK_METHOD5(ReportObjectAdded,
ray::Status(const ObjectID &, const ClientID &,
const object_manager::protocol::ObjectInfoT &, bool,
const std::vector<uint8_t> &, const std::string &));
const plasma::ObjectBuffer &));

MOCK_METHOD2(ReportObjectRemoved, ray::Status(const ObjectID &, const ClientID &));

Expand Down
3 changes: 2 additions & 1 deletion src/ray/test/run_object_manager_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ sleep 1s
# Run tests.
$CORE_DIR/src/ray/object_manager/object_manager_stress_test $STORE_EXEC
sleep 1s
$CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC
# Use timeout=1000ms for the Wait tests.
$CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC 1000
$REDIS_DIR/redis-cli -p 6379 shutdown
sleep 1s

Expand Down
5 changes: 3 additions & 2 deletions src/ray/test/run_object_manager_valgrind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ sleep 1s
${REDIS_SERVER} --loglevel warning ${LOAD_MODULE_ARGS} --port 6379 &
sleep 1s

# Run tests.
$VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC
# Run tests. Use timeout=10000ms for the Wait tests since tests run slower
# in valgrind.
$VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_test $STORE_EXEC 10000
sleep 1s
$VALGRIND_CMD $CORE_DIR/src/ray/object_manager/object_manager_stress_test $STORE_EXEC
$REDIS_DIR/redis-cli -p 6379 shutdown
Expand Down