Skip to content

Commit

Permalink
Merge pull request envoyproxy#489 from jplevyak/code-cache
Browse files Browse the repository at this point in the history
Add Wasm code cache and option to fail if not cached.
  • Loading branch information
jplevyak authored Apr 24, 2020
2 parents f80bba6 + 44654fc commit 1a52eb6
Show file tree
Hide file tree
Showing 8 changed files with 313 additions and 374 deletions.
4 changes: 3 additions & 1 deletion source/common/config/datasource.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "envoy/api/api.h"
#include "envoy/config/core/v3/base.pb.h"
#include "envoy/event/deferred_deletable.h"
#include "envoy/init/manager.h"
#include "envoy/upstream/cluster_manager.h"

Expand Down Expand Up @@ -58,7 +59,8 @@ class LocalAsyncDataProvider {

using LocalAsyncDataProviderPtr = std::unique_ptr<LocalAsyncDataProvider>;

class RemoteAsyncDataProvider : public Config::DataFetcher::RemoteDataFetcherCallback,
class RemoteAsyncDataProvider : public Event::DeferredDeletable,
public Config::DataFetcher::RemoteDataFetcherCallback,
public Logger::Loggable<Logger::Id::config> {
public:
RemoteAsyncDataProvider(Upstream::ClusterManager& cm, Init::Manager& manager,
Expand Down
178 changes: 115 additions & 63 deletions source/extensions/common/wasm/wasm.cc
Original file line number Diff line number Diff line change
@@ -1,35 +1,12 @@
#include "extensions/common/wasm/wasm.h"

#include <algorithm>
#include <cctype>
#include <limits>
#include <memory>
#include <string>

#include "envoy/common/exception.h"
#include "envoy/config/wasm/v3/wasm.pb.validate.h"
#include "envoy/grpc/status.h"
#include "envoy/http/codes.h"
#include "envoy/local_info/local_info.h"
#include "envoy/thread_local/thread_local.h"

#include "common/common/assert.h"
#include "common/common/base64.h"
#include "common/common/empty_string.h"
#include "common/common/enum_to_int.h"
#include "common/common/logger.h"
#include <chrono>

#include "envoy/event/deferred_deletable.h"

#include "extensions/common/wasm/wasm_vm.h"
#include "extensions/common/wasm/well_known_names.h"
#include "common/common/logger.h"

#include "absl/base/casts.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/node_hash_map.h"
#include "absl/strings/str_cat.h"
#include "absl/synchronization/mutex.h"
#include "openssl/bytestring.h"
#include "openssl/hmac.h"
#include "openssl/sha.h"

namespace Envoy {
namespace Extensions {
Expand All @@ -38,9 +15,22 @@ namespace Wasm {

namespace {

struct CodeCacheEntry {
std::string code;
bool in_progress;
MonotonicTime fetch_time;
};

const std::string INLINE_STRING = "<inline>";
// NB: xDS currently does not support failing asynchronously, so we fail immediately
// if remote Wasm code is not cached and do a background fill.
const bool DEFAULT_FAIL_IF_NOT_CACHED = true;
bool fail_if_code_not_cached = DEFAULT_FAIL_IF_NOT_CACHED;
const int CODE_CACHE_SECONDS_NEGATIVE_CACHING = 10;

std::atomic<int64_t> active_wasm_;
std::atomic<int64_t> active_wasms;
std::mutex code_cache_mutex;
std::unordered_map<std::string, CodeCacheEntry>* code_cache = nullptr;

// Downcast WasmBase to the actual Wasm.
inline Wasm* getWasm(WasmHandleSharedPtr& base_wasm_handle) {
Expand All @@ -50,8 +40,8 @@ inline Wasm* getWasm(WasmHandleSharedPtr& base_wasm_handle) {
} // namespace

void Wasm::initializeStats() {
active_wasm_++;
wasm_stats_.active_.set(active_wasm_);
active_wasms++;
wasm_stats_.active_.set(active_wasms);
wasm_stats_.created_.inc();
}

Expand All @@ -76,7 +66,7 @@ Wasm::Wasm(absl::string_view runtime, absl::string_view vm_id, absl::string_view
ALL_WASM_STATS(POOL_COUNTER_PREFIX(*scope_, absl::StrCat("wasm.", runtime, ".")),
POOL_GAUGE_PREFIX(*scope_, absl::StrCat("wasm.", runtime, ".")))}) {
initializeStats();
ENVOY_LOG(debug, "Base Wasm created {} now active", active_wasm_);
ENVOY_LOG(debug, "Base Wasm created {} now active", active_wasms);
}

Wasm::Wasm(WasmHandleSharedPtr base_wasm_handle, Event::Dispatcher& dispatcher)
Expand All @@ -90,7 +80,7 @@ Wasm::Wasm(WasmHandleSharedPtr base_wasm_handle, Event::Dispatcher& dispatcher)
cluster_manager_(getWasm(base_wasm_handle)->clusterManager()), dispatcher_(dispatcher),
time_source_(dispatcher.timeSource()), wasm_stats_(getWasm(base_wasm_handle)->wasm_stats_) {
initializeStats();
ENVOY_LOG(debug, "Thread-Local Wasm created {} now active", active_wasm_);
ENVOY_LOG(debug, "Thread-Local Wasm created {} now active", active_wasms);
}

void Wasm::setTickPeriod(uint32_t context_id, std::chrono::milliseconds new_tick_period) {
Expand Down Expand Up @@ -127,9 +117,9 @@ void Wasm::tickHandler(uint32_t root_context_id) {
}

Wasm::~Wasm() {
active_wasm_--;
wasm_stats_.active_.set(active_wasm_);
ENVOY_LOG(debug, "~Wasm {} remaining active", active_wasm_);
active_wasms--;
wasm_stats_.active_.set(active_wasms);
ENVOY_LOG(debug, "~Wasm {} remaining active", active_wasms);
if (server_shutdown_post_cb_) {
dispatcher_.post(server_shutdown_post_cb_);
}
Expand All @@ -155,8 +145,17 @@ void Wasm::log(absl::string_view root_id, const Http::RequestHeaderMap* request_
context->log(request_headers, response_headers, response_trailers, stream_info);
}

void clearCodeCacheForTesting(bool fail_if_not_cached) {
std::lock_guard<std::mutex> guard(code_cache_mutex);
fail_if_code_not_cached = fail_if_not_cached;
if (code_cache) {
delete code_cache;
code_cache = nullptr;
}
}

static void createWasmInternal(const VmConfig& vm_config, const PluginSharedPtr& plugin,
Stats::ScopeSharedPtr scope,
const Stats::ScopeSharedPtr& scope,
Upstream::ClusterManager& cluster_manager,
Init::Manager& init_manager, Event::Dispatcher& dispatcher,
Runtime::RandomGenerator& random, Api::Api& api,
Expand All @@ -165,45 +164,98 @@ static void createWasmInternal(const VmConfig& vm_config, const PluginSharedPtr&
std::unique_ptr<Context> root_context_for_testing,
CreateWasmCallback&& cb) {
std::string source, code;
bool fetch = false;
if (vm_config.code().has_remote()) {
source = vm_config.code().remote().http_uri().uri();
std::lock_guard<std::mutex> guard(code_cache_mutex);
if (!code_cache) {
code_cache = new std::remove_reference<decltype(*code_cache)>::type;
}
auto it = code_cache->find(vm_config.code().remote().sha256());
if (it != code_cache->end()) {
if (it->second.in_progress) {
ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn,
"createWasm: failed to load (in prpgress) from {}", source);
throw WasmException(
fmt::format("Failed to load WASM code (fetch in progress) from {}", source));
}
code = it->second.code;
if (code.empty()) {
if (dispatcher.timeSource().monotonicTime() - it->second.fetch_time <
std::chrono::seconds(CODE_CACHE_SECONDS_NEGATIVE_CACHING)) {
ENVOY_LOG_TO_LOGGER(Envoy::Logger::Registry::getLog(Envoy::Logger::Id::wasm), warn,
"createWasm: failed to load (cached) from {}", source);
throw WasmException(fmt::format("Failed to load WASM code (cached) from {}", source));
}
fetch = true; // Fetch failed, retry.
it->second.in_progress = true;
it->second.fetch_time = dispatcher.timeSource().monotonicTime();
}
} else {
fetch = true; // Not in cache, fetch.
auto& e = (*code_cache)[vm_config.code().remote().sha256()];
e.in_progress = true;
e.fetch_time = dispatcher.timeSource().monotonicTime();
}
} else if (vm_config.code().has_local()) {
code = Config::DataSource::read(vm_config.code().local(), true, api);
source = Config::DataSource::getPath(vm_config.code().local())
.value_or(code.empty() ? EMPTY_STRING : INLINE_STRING);
}

auto callback = [vm_config, scope, &cluster_manager, &dispatcher, &lifecycle_notifier, plugin, cb,
source,
context_ptr = root_context_for_testing ? root_context_for_testing.release()
: nullptr](const std::string& code) {
if (code.empty()) {
throw WasmException(fmt::format("Failed to load WASM code from {}", source));
}
std::unique_ptr<Context> context(context_ptr);
auto configuration = vm_config.configuration();
auto vm_key = proxy_wasm::makeVmKey(vm_config.vm_id(), configuration, code);
cb(std::static_pointer_cast<WasmHandle>(proxy_wasm::createWasm(
vm_key, code, plugin,
[&vm_config, &configuration, &scope, &cluster_manager, &dispatcher,
&lifecycle_notifier](absl::string_view vm_key) {
auto wasm = std::make_shared<Wasm>(vm_config.runtime(), vm_config.vm_id(), configuration,
vm_key, scope, cluster_manager, dispatcher);
// NB: we need the shared_ptr to have been created for shared_from_this() to work.
wasm->initializeLifecycle(lifecycle_notifier);
return std::static_pointer_cast<WasmHandleBase>(std::make_shared<WasmHandle>(wasm));
},
vm_config.allow_precompiled(), std::move(context))));
};
auto complete_cb =
[cb, vm_config, plugin, scope, &cluster_manager, &dispatcher, &lifecycle_notifier,
root_context_for_testing_ptr = root_context_for_testing.release()](std::string code) {
auto root_context = std::unique_ptr<Context>(root_context_for_testing_ptr);
auto vm_key = proxy_wasm::makeVmKey(vm_config.vm_id(), vm_config.configuration(), code);
cb(std::static_pointer_cast<WasmHandle>(proxy_wasm::createWasm(
vm_key, code, plugin,
[&vm_config, &scope, &cluster_manager, &dispatcher,
&lifecycle_notifier](absl::string_view vm_key) {
auto wasm = std::make_shared<Wasm>(vm_config.runtime(), vm_config.vm_id(),
vm_config.configuration(), vm_key, scope,
cluster_manager, dispatcher);
// NB: we need the shared_ptr to have been created for shared_from_this() to work.
wasm->initializeLifecycle(lifecycle_notifier);
return std::static_pointer_cast<WasmHandleBase>(std::make_shared<WasmHandle>(wasm));
},
vm_config.allow_precompiled(), std::move(root_context))));
};

if (vm_config.code().has_remote()) {
if (fetch) {
// NB: if the (fetching) exception is thrown below, the remote_data provider will be deleted
// immediately rather than completing the async fetch, so allow for self-delete.
auto remote_data_provider_holder =
std::make_shared<std::unique_ptr<Config::DataSource::RemoteAsyncDataProvider>>();
auto fetch_callback = [vm_config, complete_cb, source, &dispatcher,
remote_data_provider_holder](const std::string& code) {
{
std::lock_guard<std::mutex> guard(code_cache_mutex);
auto& e = (*code_cache)[vm_config.code().remote().sha256()];
e.in_progress = false;
e.code = code;
}
if (!fail_if_code_not_cached) {
if (code.empty()) {
throw WasmException(
fmt::format("Failed to load WASM code (fetch failed) from {}", source));
}
complete_cb(code);
}
// NB: must be deleted explicitly.
dispatcher.deferredDelete(
Envoy::Event::DeferredDeletablePtr{remote_data_provider_holder->release()});
remote_data_provider_holder->reset();
};
remote_data_provider = std::make_unique<Config::DataSource::RemoteAsyncDataProvider>(
cluster_manager, init_manager, vm_config.code().remote(), dispatcher, random, true,
std::move(callback));
} else if (vm_config.code().has_local()) {
callback(code);
fetch_callback);
if (fail_if_code_not_cached) {
*remote_data_provider_holder = std::move(remote_data_provider);
throw WasmException(fmt::format("Failed to load WASM code (fetching) from {}", source));
}
} else {
callback(EMPTY_STRING);
complete_cb(code);
}
}

Expand Down
2 changes: 2 additions & 0 deletions source/extensions/common/wasm/wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ WasmHandleSharedPtr getOrCreateThreadLocalWasm(const WasmHandleSharedPtr& base_w
const PluginSharedPtr& plugin,
Event::Dispatcher& dispatcher);

void clearCodeCacheForTesting(bool fail_if_not_cached);

} // namespace Wasm
} // namespace Common
} // namespace Extensions
Expand Down
2 changes: 1 addition & 1 deletion test/extensions/access_loggers/wasm/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ TEST_P(WasmAccessLogConfigTest, CreateWasmFromEmpty) {
AccessLog::InstanceSharedPtr instance;
EXPECT_THROW_WITH_MESSAGE(
instance = factory->createAccessLogInstance(*message, std::move(filter), context),
Common::Wasm::WasmException, "Failed to load WASM code from ");
Common::Wasm::WasmException, "Failed to create WASM VM with unspecified runtime.");
}

TEST_P(WasmAccessLogConfigTest, CreateWasmFromWASM) {
Expand Down
5 changes: 4 additions & 1 deletion test/extensions/common/wasm/wasm_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ class TestContext : public Extensions::Common::Wasm::Context {
MOCK_METHOD2(log_, void(spdlog::level::level_enum level, absl::string_view message));
};

class WasmCommonTest : public testing::TestWithParam<std::string> {};
class WasmCommonTest : public testing::TestWithParam<std::string> {
public:
void SetUp() { clearCodeCacheForTesting(false); }
};

INSTANTIATE_TEST_SUITE_P(Runtimes, WasmCommonTest,
testing::Values("v8",
Expand Down
Loading

0 comments on commit 1a52eb6

Please sign in to comment.