Skip to content

Commit

Permalink
Rollback incompatible C++17 changes.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 368073175
  • Loading branch information
tensorflower-gardener authored and tensorflow-copybara committed Apr 12, 2021
1 parent 6579d2d commit ba0fa72
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 166 deletions.
100 changes: 16 additions & 84 deletions tensorflow_serving/batching/batching_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,17 +160,6 @@ class BatchingSession : public ServingSession {
const std::string& thread_pool_name,
std::unique_ptr<BatchingSession>* result);

// Same as above but allows for specification of a default scheduler creator
// which enables requests that don't match an exact signature to also
// have batching.
static Status Create(
const BatchingSessionOptions& options, std::unique_ptr<Session> wrapped,
const std::vector<SignatureWithBatchingSessionSchedulerCreator>&
signatures_with_scheduler_creators,
BatchingSessionSchedulerCreator default_creator,
const std::string& thread_pool_name,
std::unique_ptr<BatchingSession>* result);

~BatchingSession() override = default;

Status Run(const std::vector<std::pair<string, Tensor>>& inputs,
Expand Down Expand Up @@ -268,34 +257,9 @@ class BatchingSession : public ServingSession {
// for monitoring purpose, and can be empty if not known.
const std::string thread_pool_name_;

// If set, default_scheduler_creator_ is used when the input signature does
// not match any existing signature defined during model load. This helps
// when the user uses either a combination of signatures or filter certain
// output tensors.
std::optional<BatchingSessionSchedulerCreator> default_scheduler_creator_;
absl::Mutex mu_;
std::unordered_map<TensorSignature,
std::unique_ptr<BatchScheduler<BatchingSessionTask>>,
HashTensorSignature, EqTensorSignature>
custom_signature_batch_schedulers_ ABSL_GUARDED_BY(mu_);

TF_DISALLOW_COPY_AND_ASSIGN(BatchingSession);
};

// Creates a BatchingSession that, in addition to the per-signature schedulers,
// installs `default_creator` as a fallback scheduler factory for Run() calls
// whose signature does not match any declared signature.
//
// Fix: the original version assigned through result->get() even when the
// delegated Create() failed, in which case *result may be null — a null
// dereference on the error path. We now propagate the error before touching
// *result, and only install the default creator on success.
Status BatchingSession::Create(
    const BatchingSessionOptions& options, std::unique_ptr<Session> wrapped,
    const std::vector<SignatureWithBatchingSessionSchedulerCreator>&
        signatures_with_scheduler_creators,
    BatchingSessionSchedulerCreator default_creator,
    const std::string& thread_pool_name,
    std::unique_ptr<BatchingSession>* result) {
  TF_RETURN_IF_ERROR(BatchingSession::Create(options, std::move(wrapped),
                                             signatures_with_scheduler_creators,
                                             thread_pool_name, result));
  // *result is guaranteed non-null here: the delegated Create() succeeded.
  (*result)->default_scheduler_creator_ = std::move(default_creator);
  return Status::OK();
}

Status BatchingSession::Create(
const BatchingSessionOptions& options, std::unique_ptr<Session> wrapped,
const std::vector<SignatureWithBatchingSessionSchedulerCreator>&
Expand Down Expand Up @@ -379,40 +343,23 @@ Status BatchingSession::InternalRun(
TensorSignatureFromRunArgs(inputs, output_tensor_names);
auto batch_scheduler_it = batch_schedulers_.find(signature);
if (batch_scheduler_it == batch_schedulers_.end()) {
if (default_scheduler_creator_.has_value()) {
absl::MutexLock l(&mu_);
batch_scheduler_it = custom_signature_batch_schedulers_.find(signature);
if (batch_scheduler_it == custom_signature_batch_schedulers_.end()) {
std::unique_ptr<BatchScheduler<BatchingSessionTask>> batch_scheduler;
TF_RETURN_IF_ERROR(default_scheduler_creator_.value()(
[&, signature](std::unique_ptr<Batch<BatchingSessionTask>> batch) {
ProcessBatch(signature, std::move(batch));
},
&batch_scheduler));
custom_signature_batch_schedulers_[signature] =
std::move(batch_scheduler);
batch_scheduler_it = custom_signature_batch_schedulers_.find(signature);
}
// We have a Run() call that doesn't match one of our batching signatures.
// Run it in-line.
LOG_EVERY_N_SEC(WARNING, 120)
<< "Request doesn't match any declared signature. Bypassing "
"batcher. Request signature is: "
<< TensorSignatureDebugString(signature);

// Because the wrapped session may not provide an implementation for
// thread_pool_options, we need to invoke different Run() functions
// depending on whether thread_pool_options is specified.
if (thread_pool_options) {
return wrapped_->Run(run_options, inputs, output_tensor_names,
target_node_names, outputs, run_metadata,
thread_pool_options.value());
} else {
// We have a Run() call that doesn't match one of our batching signatures.
// Run it in-line.
LOG_EVERY_N_SEC(WARNING, 120)
<< "Request doesn't match any declared signature and no default "
"scheduler creator specified. Bypassing "
"batcher. Request signature is: "
<< TensorSignatureDebugString(signature);

// Because the wrapped session may not provide an implementation for
// thread_pool_options, we need to invoke different Run() functions
// depending on whether thread_pool_options is specified.
if (thread_pool_options) {
return wrapped_->Run(run_options, inputs, output_tensor_names,
target_node_names, outputs, run_metadata,
thread_pool_options.value());
} else {
return wrapped_->Run(run_options, inputs, output_tensor_names,
target_node_names, outputs, run_metadata);
}
return wrapped_->Run(run_options, inputs, output_tensor_names,
target_node_names, outputs, run_metadata);
}
}
BatchScheduler<BatchingSessionTask>* batch_scheduler =
Expand Down Expand Up @@ -948,21 +895,6 @@ Status SplitInputTask(
return Status::OK();
}

// Wraps `session` in a BatchingSession configured with both per-signature
// scheduler creators and a fallback `default_creator` for signatures not
// declared up front. On success, ownership of the wrapper is handed back
// through `batching_session`; on failure the error status is returned and
// `batching_session` is left untouched.
Status CreateBatchingSession(
    const BatchingSessionOptions& options,
    const std::vector<SignatureWithBatchingSessionSchedulerCreator>&
        signatures_with_scheduler_creators,
    BatchingSessionSchedulerCreator default_creator,
    std::unique_ptr<Session> session,
    std::unique_ptr<Session>* batching_session) {
  // Build the concrete BatchingSession first; only publish it to the caller's
  // Session pointer once creation has succeeded.
  std::unique_ptr<BatchingSession> new_batching_session;
  const Status create_status = BatchingSession::Create(
      options, std::move(session), signatures_with_scheduler_creators,
      default_creator, /*thread_pool_name=*/"", &new_batching_session);
  TF_RETURN_IF_ERROR(create_status);
  *batching_session = std::move(new_batching_session);
  return Status::OK();
}

Status CreateBatchingSession(
const BatchingSessionOptions& options,
const std::vector<SignatureWithBatchingSessionSchedulerCreator>&
Expand Down
10 changes: 0 additions & 10 deletions tensorflow_serving/batching/batching_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,6 @@ Status CreateBatchingSession(
std::unique_ptr<Session> session,
std::unique_ptr<Session>* batching_session);

// Same as above but allows for a default scheduler creator for which signatures
// that don't match a supplied value during run time can still use batching.
Status CreateBatchingSession(
const BatchingSessionOptions& options,
const std::vector<SignatureWithBatchingSessionSchedulerCreator>&
signatures_with_scheduler_creators,
BatchingSessionSchedulerCreator default_creator,
std::unique_ptr<Session> session,
std::unique_ptr<Session>* batching_session);

// A convenience for using CreateBatchingSession() to create a
// BasicBatchScheduler for a single signature.
Status CreateBasicBatchingSession(
Expand Down
46 changes: 0 additions & 46 deletions tensorflow_serving/batching/batching_session_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -994,52 +994,6 @@ TEST_P(BatchingSessionTest, ThreadPoolOptions) {
}));
}

// Verifies that three concurrent Run() calls, each fetching only a subset of
// the declared output tensors ("y" or "y3"), are still merged into a single
// batch and each receives exactly the outputs it asked for.
TEST_P(BatchingSessionTest, SubsetOutputTensors) {
  BasicBatchScheduler<BatchingSessionTask>::Options schedule_options;
  schedule_options.max_batch_size = 6;  // Holds exactly three 2-unit tasks.
  schedule_options.batch_timeout_micros = 1 * 1000 * 1000;  // Won't trigger.
  schedule_options.num_batch_threads = 1;
  schedule_options = annotate_options(schedule_options);
  BatchingSessionOptions batching_session_options;
  std::unique_ptr<Session> batching_session;
  TF_ASSERT_OK(CreateBasicBatchingSession(
      schedule_options, batching_session_options, {{"x", "x2"}, {"y", "y3"}},
      CreateHalfPlusTwoSession(), &batching_session));

  const Tensor input0 = test::AsTensor<float>({8.0f, 6.0f}, {2});
  const Tensor expected_output0 = test::AsTensor<float>({6.0f, 5.0f}, {2});
  const Tensor input1 = test::AsTensor<float>({100.0f, 42.0f}, {2});
  const Tensor expected_output1 = test::AsTensor<float>({53.0f, 24.0f}, {2});

  // Issues one Run() call fetching a single output tensor and checks that the
  // session returns exactly the expected tensor for it.
  auto run_and_expect = [&](const std::vector<std::pair<string, Tensor>>& feeds,
                            const string& fetch, const Tensor& expected) {
    std::vector<Tensor> outputs;
    TF_ASSERT_OK(batching_session->Run(feeds, {fetch} /* outputs */,
                                       {} /* target nodes */, &outputs));
    ASSERT_EQ(1, outputs.size());
    test::ExpectTensorEqual<float>(expected, outputs[0]);
  };

  // Three concurrent requests, so the scheduler can form one full batch.
  std::unique_ptr<Thread> request0(
      Env::Default()->StartThread(ThreadOptions(), "first_request_thread", [&] {
        run_and_expect({{"x", input0}, {"x2", input1}}, "y", expected_output0);
      }));
  std::unique_ptr<Thread> request1(Env::Default()->StartThread(
      ThreadOptions(), "second_request_thread", [&] {
        run_and_expect({{"x2", input1}, {"x", input0}}, "y3",
                       expected_output1);
      }));
  std::unique_ptr<Thread> request2(
      Env::Default()->StartThread(ThreadOptions(), "third_request_thread", [&] {
        run_and_expect({{"x2", input1}, {"x", input0}}, "y", expected_output0);
      }));
}

INSTANTIATE_TEST_CASE_P(WithOrWithoutThreadPools, BatchingSessionTest,
::testing::Bool());

Expand Down
17 changes: 4 additions & 13 deletions tensorflow_serving/servables/tensorflow/bundle_factory_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ Status EstimateResourceFromPath(const string& path, bool use_validation_result,
Status WrapSessionForBatching(const BatchingParameters& batching_config,
std::shared_ptr<Batcher> batch_scheduler,
const std::vector<SignatureDef>& signatures,
std::unique_ptr<Session>* session,
bool enable_default_schedule_creator) {
std::unique_ptr<Session>* session) {
LOG(INFO) << "Wrapping session to perform batch processing";

if (batch_scheduler == nullptr) {
Expand Down Expand Up @@ -107,17 +106,9 @@ Status WrapSessionForBatching(const BatchingParameters& batching_config,
{tensor_signature, create_queue});
}

// TODO(b/184973097): Remove enable_default_schedule_creator once TFLite is
// fixed.
if (enable_default_schedule_creator) {
return CreateBatchingSession(batching_session_options,
signatures_with_scheduler_creators,
create_queue, std::move(*session), session);
} else {
return CreateBatchingSession(batching_session_options,
signatures_with_scheduler_creators,
std::move(*session), session);
}
return CreateBatchingSession(batching_session_options,
signatures_with_scheduler_creators,
std::move(*session), session);
}

Status WrapSession(std::unique_ptr<Session>* session) {
Expand Down
11 changes: 6 additions & 5 deletions tensorflow_serving/servables/tensorflow/bundle_factory_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,16 @@ Status CreateBatchScheduler(
Status EstimateResourceFromPath(const string& path, bool use_validation_result,
ResourceAllocation* estimate);

// Wraps a session in a new session that automatically batches Run() calls.
// TODO(b/184973097): Remove enable_default_schedule_creator once TFLite is
// fixed.
// Wraps a session in a new session that automatically batches Run() calls, for
// the given signatures.
// TODO(b/33233998): Support batching for Run() calls that use a combination of
// signatures -- i.e. sometimes construct a single TensorSignature for a set of
// SignatureDefs (usually just two of them) -- based on some config.
Status WrapSessionForBatching(
const BatchingParameters& batching_config,
std::shared_ptr<SharedBatchScheduler<BatchingSessionTask>> batch_scheduler,
const std::vector<SignatureDef>& signatures,
std::unique_ptr<Session>* session,
bool enable_default_schedule_creator = false);
std::unique_ptr<Session>* session);

// Wraps a session in a new session that only supports Run() without batching.
Status WrapSession(std::unique_ptr<Session>* session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ Status SavedModelBundleFactory::InternalCreateSavedModelBundle(
return result;
}();

bool is_tflite = config_.prefer_tflite_model() && TfLiteModelFound(path);
if (is_tflite) {
if (config_.prefer_tflite_model() && TfLiteModelFound(path)) {
int num_tflite_pools = config_.num_tflite_pools();
if (num_tflite_pools == 0 && config_.num_tflite_interpreters() > 0) {
num_tflite_pools = config_.num_tflite_interpreters();
Expand Down Expand Up @@ -180,13 +179,10 @@ Status SavedModelBundleFactory::InternalCreateSavedModelBundle(
// Enable batching of requests to any one signature_def in the SavedModel.
// Note that in the future, the plan is to enable explicit configuration
// of the one or many SignatureDefs to enable.
// TODO(b/184973097): Remove enable_default_schedule_creator once TFLite is
// fixed.
const std::vector<SignatureDef> signatures = GetSignatureDefs(**bundle);
return WrapSessionForBatching(
config_.batching_parameters(), batch_scheduler_, signatures,
&(*bundle)->session,
/*enable_default_schedule_creator=*/!is_tflite);
return WrapSessionForBatching(config_.batching_parameters(),
batch_scheduler_, signatures,
&(*bundle)->session);
}
return WrapSession(&(*bundle)->session);
}
Expand Down

0 comments on commit ba0fa72

Please sign in to comment.