diff --git a/test/userspace/async/async_key_value_source.ut.cpp b/test/userspace/async/async_key_value_source.ut.cpp new file mode 100644 index 0000000000..f57816c3c4 --- /dev/null +++ b/test/userspace/async/async_key_value_source.ut.cpp @@ -0,0 +1,265 @@ +/* +Copyright (C) 2018 Sysdig, Inc. + +This file is part of sysdig. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ +#include "async_key_value_source.h" + +#include +#include + +using namespace sysdig; + +namespace +{ + +/** + * Intermediate realization of async_key_value_source that can return pre-canned + * results. + */ +class precanned_metadata_source : public async_key_value_source +{ +public: + const static uint64_t FOREVER_MS; + + precanned_metadata_source(const uint64_t max_wait_ms, + const uint64_t ttl_ms = FOREVER_MS) + : async_key_value_source(max_wait_ms, ttl_ms), + m_responses() + { } + + void set_response(const std::string& key, const std::string& response) + { + m_responses[key] = response; + } + + std::string get_response(const std::string& key) + { + return m_responses[key]; + } + +private: + std::map m_responses; +}; +const uint64_t precanned_metadata_source::FOREVER_MS = static_cast(~0L); + +/** + * Realization of async_key_value_source that returns results without delay. + */ +class immediate_metadata_source : public precanned_metadata_source +{ +public: + const static uint64_t MAX_WAIT_TIME_MS; + + immediate_metadata_source(): + precanned_metadata_source(MAX_WAIT_TIME_MS) + { } + +protected: + virtual void run_impl() override + { + std::string key; + + while(dequeue_next_key(key)) + { + store_value(key, get_response(key)); + } + } +}; +const uint64_t immediate_metadata_source::MAX_WAIT_TIME_MS = 5000; + +/** + * Realization of async_key_value_source that returns results with some + * specified delay. + */ +class delayed_metadata_source : public precanned_metadata_source +{ +public: + const static uint64_t MAX_WAIT_TIME_MS; + + delayed_metadata_source(const uint64_t delay_ms, + const uint64_t ttl_ms = FOREVER_MS): + precanned_metadata_source(MAX_WAIT_TIME_MS, ttl_ms), + m_delay_ms(delay_ms) + { } + +protected: + virtual void run_impl() override + { + std::string key; + + while(dequeue_next_key(key)) + { + std::this_thread::sleep_for(std::chrono::milliseconds(m_delay_ms)); + store_value(key, get_response(key)); + } + } + +private: + uint64_t m_delay_ms; +}; +const uint64_t delayed_metadata_source::MAX_WAIT_TIME_MS = 0; + +} + +/** + * Ensure that a concrete async_key_value_source is in the expected initial + * state after construction. + */ +TEST(async_key_value_source_test, construction) +{ + immediate_metadata_source source; + + ASSERT_EQ(immediate_metadata_source::MAX_WAIT_TIME_MS, source.get_max_wait()); + ASSERT_EQ(precanned_metadata_source::FOREVER_MS, source.get_ttl()); + ASSERT_FALSE(source.is_running()); +} + +/** + * Ensure that if a concrete async_key_value_source returns the metadata before + * the timeout, that the lookup() method returns true, and that it returns + * the metadata in the output parameter. + */ +TEST(async_key_value_source_test, lookup_key_immediate_return) +{ + const std::string key = "foo"; + const std::string metadata = "bar"; + std::string response = "response-not-set"; + + immediate_metadata_source source; + + // Seed the precanned response + source.set_response(key, metadata); + + ASSERT_TRUE(source.lookup(key, response)); + ASSERT_EQ(metadata, response); + ASSERT_TRUE(source.is_running()); +} + +/** + * Ensure that if a concrete async_key_value_source cannot return the result + * before the timeout, and if the client did not provide a callback, that + * calling lookup() after the result it available returns the value. + */ +TEST(async_key_value_source_test, lookup_key_delayed_return_second_call) +{ + const uint64_t DELAY_MS = 50; + const std::string key = "mykey"; + const std::string metadata = "myvalue"; + + delayed_metadata_source source(DELAY_MS); + + std::string response = "response-not-set"; + bool response_found; + + // Seed the precanned response + source.set_response(key, metadata); + + response_found = source.lookup(key, response); + + ASSERT_FALSE(response_found); + + // Since we didn't supply a callback, a subsequent call to lookup + // after the metadata collection is complete will return the previously + // collected metadata. We know that the delayed_metadata_source is + // waiting for DELAY_MS, so wait longer than that. + std::this_thread::sleep_for(std::chrono::milliseconds(2 * DELAY_MS)); + + // Response should now be available + response_found = source.lookup(key, response); + + ASSERT_TRUE(response_found); + ASSERT_EQ(metadata, response); +} + +/** + * Ensure that if a concrete async_key_value_source cannot return the result + * before the timeout, and if the client did provide a callback, that the + * callback is invoked with the metadata once they're avaialble. + */ +TEST(async_key_value_source_test, look_key_delayed_async_callback) +{ + const uint64_t DELAY_MS = 50; + const std::string key = "mykey"; + const std::string metadata = "myvalue"; + + delayed_metadata_source source(DELAY_MS); + + std::string sync_response = "sync-response-not-set"; + std::string async_response = "async-response-not-set"; + bool response_found; + + // Seed the precanned response + source.set_response(key, metadata); + + response_found = source.lookup(key, + sync_response, + [&async_response](const std::string& key, + const std::string& value) + { + async_response = value; + }); + + ASSERT_FALSE(response_found); + + // Since we supplied a callback, the delayed_metadata_source should + // complete after DELAY_MS, and it should immediately call our + // callback. Wait long enough for that to happen. + std::this_thread::sleep_for(std::chrono::milliseconds(5 * DELAY_MS)); + + ASSERT_EQ(metadata, async_response); +} + +/** + * Ensure that "old" results are pruned + */ +TEST(async_key_value_source_test, prune_old_metadata) +{ + const uint64_t DELAY_MS = 0; + const uint64_t TTL_MS = 20; + + const std::string key1 = "mykey1"; + const std::string metadata1 = "myvalue1"; + + const std::string key2 = "mykey2"; + const std::string metadata2 = "myvalue2"; + + delayed_metadata_source source(DELAY_MS, TTL_MS); + std::string response = "response-not-set"; + + // Seed the precanned response + source.set_response(key1, metadata1); + source.set_response(key2, metadata2); + + // Since DELAY_MS is 0, then lookup should return false immediately, + // and should almost immediately add the result to the cache + ASSERT_FALSE(source.lookup(key1, response)); + + // Wait long enough for the old entry to require pruning + std::this_thread::sleep_for(std::chrono::milliseconds(2 * TTL_MS)); + + // Request the other key. This should wake up the thread and actually + // preform the pruning. + ASSERT_FALSE(source.lookup(key2, response)); + + // Wait long enough for the async thread to get woken up and to + // prune the old entry + std::this_thread::sleep_for(std::chrono::milliseconds(TTL_MS)); + + // Since the first key should have been pruned, a second call to + // fetch the first key should also return false. + ASSERT_FALSE(source.lookup(key1, response)); +} diff --git a/test/userspace/libsinsp/async_linux_docker_metadata_source.ut.cpp b/test/userspace/libsinsp/async_linux_docker_metadata_source.ut.cpp new file mode 100644 index 0000000000..31c79aa1b2 --- /dev/null +++ b/test/userspace/libsinsp/async_linux_docker_metadata_source.ut.cpp @@ -0,0 +1,290 @@ +/** + * @file + * + * Fill in a short overview of the file's content + * + * @copyright Copyright (c) 2019 Sysdig Inc., All Rights Reserved + */ +#include "async_linux_docker_metadata_source.h" +#include "test_helpers/web_server_helper.h" + +#include +#include +#include +#include +#include + +#include +#include + +#if defined(LOCAL_DEBUG) +# include +# define LOG(fmt, ...) fprintf(stderr, "[%s]:%d: " fmt "\n", __FUNCTION__, __LINE__, ##__VA_ARGS__) +#else +# define LOG(fmt, ...) do { } while(false) +#endif + +using namespace test; +using namespace sysdig; + +class sinsp_container_manager; + +namespace +{ + +const std::string s_container_id = "b646c6d7cad09218238eb0ccf72d78024bc4e742a11b778c1637575121abcdd5"; +const std::string s_image_id = "fe52b035c0bdc374d688ec285efc80a349b34e188031408a037c171cadf3e47b"; + +/** + * Read the content of the file with the given filename into the given + * content. + * + * @param[in] filename The name of the file to read + * @param[out] content On success, returns the output of the file + * + * @returns true if the file can be opened for reading, false otherwise. + */ +bool read_file(const std::string& filename, std::string& content) +{ + std::ifstream in(filename); + + if(!in) + { + return false; + } + + std::ostringstream out; + + Poco::StreamCopier::copyStream(in, out); + + content = std::move(out.str()); + + return true; +} + +/** + * Base class for all tests; used to set up a suite-wide fixture. + */ +class async_linux_docker_metadata_source_test : public testing::Test +{ +public: + /** + * Allocate the web_server_helper before any test run. + */ + static void SetUpTestCase() + { + std::string content; + + ASSERT_EQ(s_server_helper, nullptr); + s_server_helper = new web_server_helper(); + + ASSERT_TRUE(read_file("./resources/docker_container_" + s_container_id + ".json", + content)); + s_server_helper->set_content("/v1.24/containers/" + s_container_id + "/json", + content); + + ASSERT_TRUE(read_file("./resources/docker_image_" + s_image_id + ".json", + content)); + + s_server_helper->set_content("/v1.24/images/" + s_image_id + "/json?digests=1", + content); + } + + /** + * Deallocate the web_server_helper after all tests have finished. + */ + static void TearDownTestCase() + { + ASSERT_NE(s_server_helper, nullptr); + delete s_server_helper; + } + +protected: + /** + * Enable the tests to get the server without being able to muck + * with the pointer. + */ + web_server_helper& get_docker() + { + return *s_server_helper; + } + + /** + * Returns true if the given collection contains the given element + * + * @tparam collection The type of collection to search + * @tparam element_type The type of element in the collection + * + * @param[in] collection The collection to search + * @param[in] element The element for which to search + * + * @returns true if the element is found in the collection, + * false otherwise. + */ + template + bool contains(const collection_type& collection, const element_type& element) + { + return (std::find(collection.begin(), collection.end(), element) != collection.end()); + } + +private: + static web_server_helper* s_server_helper; +}; + +web_server_helper* async_linux_docker_metadata_source_test::s_server_helper; + +} // end namespace + + +/** + * Ensure that the constructor puts the metadata source in the expected + * initial state. + */ +TEST_F(async_linux_docker_metadata_source_test, constructor) +{ + async_linux_docker_metadata_source source; + + ASSERT_EQ(async_linux_docker_metadata_source::DEFAULT_API_VERSION, + source.get_api_version()); + ASSERT_EQ(async_linux_docker_metadata_source::DEFAULT_DOCKER_SOCKET_PATH, + source.get_socket_path()); + ASSERT_TRUE(source.query_image_info()); +} + +/** + * Ensure that if the client specifies custom values for the api version and + * the socket path, that those values are recorded. + */ +TEST_F(async_linux_docker_metadata_source_test, constructor_custom_values) +{ + const bool query_image_info = true; + const std::string api_version = "v10"; + const std::string socket_path = "/some/path.sock"; + + async_linux_docker_metadata_source source(query_image_info, + socket_path, + api_version); + + ASSERT_EQ(api_version, source.get_api_version()); + ASSERT_EQ(socket_path, source.get_socket_path()); + ASSERT_EQ(query_image_info, source.query_image_info()); +} + +/** + * Ensure that set_query_image_info() updates the image info query state. + */ +TEST_F(async_linux_docker_metadata_source_test, query_image_info) +{ + async_linux_docker_metadata_source source; + + source.set_query_image_info(false); + ASSERT_FALSE(source.query_image_info()); + + source.set_query_image_info(true); + ASSERT_TRUE(source.query_image_info()); +} + +/** + * Ensure that lookup_metrics() exhibits the expected behavior. Specifically, + * we expect the first call to lookup_metrics() to fail, and to kick off the + * background thread. We expect that thread, within a reasonable amount of + * time, to fetch the desired content, and to parse it. We expect a subsequent + * call to lookup_metrics() to return the parsed metrics. + */ +TEST_F(async_linux_docker_metadata_source_test, lookup_metrics) +{ + const bool query_image_info = true; + std::shared_ptr container_info(new sinsp_container_info()); + sinsp_container_manager* manager = nullptr; + + container_info->m_id = s_container_id; + container_info->m_type = CT_DOCKER; + + docker_metadata metadata(manager, container_info); + async_linux_docker_metadata_source source(query_image_info, + get_docker().get_socket_path()); + + // The first call to lookup() will kick off the async lookup. The + // Docker metadata fetcher will not block waiting for a response, so + // the first call for a given id should always fail. + ASSERT_FALSE(source.lookup(container_info->m_id, metadata)); + + // We don't know exactly how long it will take for the async fetcher to + // contact the docker server helper, for the server helper to return + // the precanned response, and for the async fetcher to parse the + // results. We should be able to poll for the response. We'll poll + // for up to a max of 10s -- if it takes more than 10s, we'll assume + // something has gone horribly wrong. + const int MAX_WAIT_TIME_SECS = 10; + const int FRACTION_OF_SECOND = 10; + bool eventually_successful = false; + + for (int i = 0; !eventually_successful && i < (MAX_WAIT_TIME_SECS * FRACTION_OF_SECOND); ++i) + { + const int ONE_SEC_MS = 1000; + std::this_thread::sleep_for(std::chrono::milliseconds(ONE_SEC_MS / FRACTION_OF_SECOND)); + eventually_successful = source.lookup(container_info->m_id, metadata); + } + + ASSERT_TRUE(eventually_successful); + container_info = metadata.m_container_info; + + // Make sure that we correctly parsed the interesting information + ASSERT_EQ(container_info->m_id, s_container_id); + ASSERT_EQ(container_info->m_type, CT_DOCKER); + ASSERT_EQ(container_info->m_name, "opengrok"); + ASSERT_EQ(container_info->m_image, "opengrok/docker:latest"); + ASSERT_EQ(container_info->m_imageid, s_image_id); + ASSERT_EQ(container_info->m_imagerepo, "opengrok/docker"); + ASSERT_EQ(container_info->m_imagetag, "latest"); + ASSERT_EQ(container_info->m_imagedigest, ""); + ASSERT_EQ(container_info->m_container_ip, 2886795267); + ASSERT_FALSE(container_info->m_privileged); + + ASSERT_NE(container_info->m_mounts.begin(), container_info->m_mounts.end()); + { + auto itr = container_info->m_mounts.begin(); + + ASSERT_EQ(itr->m_source, "/home/user/.opengrok"); + ASSERT_EQ(itr->m_dest, "/src"); + ASSERT_EQ(itr->m_mode, ""); + ASSERT_TRUE(itr->m_rdwr); + ASSERT_EQ(itr->m_propagation, "rprivate"); + } + + ASSERT_NE(container_info->m_port_mappings.begin(), container_info->m_port_mappings.end()); + { + auto itr = container_info->m_port_mappings.begin(); + + ASSERT_EQ(itr->m_host_ip, 0); + ASSERT_EQ(itr->m_host_port, 8080); + ASSERT_EQ(itr->m_container_port, 8080); + } + + ASSERT_TRUE(container_info->m_labels.empty()); + + ASSERT_NE(container_info->m_env.begin(), container_info->m_env.end()); + { + ASSERT_TRUE(contains(container_info->m_env, "REINDEX=0")); + ASSERT_TRUE(contains(container_info->m_env, "LANG=C.UTF-8")); + ASSERT_TRUE(contains(container_info->m_env, "JAVA_HOME=/docker-java-home/jre")); + ASSERT_TRUE(contains(container_info->m_env, "JAVA_VERSION=8u181")); + ASSERT_TRUE(contains(container_info->m_env, "JAVA_DEBIAN_VERSION=8u181-b13-2~deb9u1")); + ASSERT_TRUE(contains(container_info->m_env, "CATALINA_HOME=/usr/local/tomcat")); + ASSERT_TRUE(contains(container_info->m_env, "TOMCAT_NATIVE_LIBDIR=/usr/local/tomcat/native-jni-lib")); + ASSERT_TRUE(contains(container_info->m_env, "LD_LIBRARY_PATH=/usr/local/tomcat/native-jni-lib")); + ASSERT_TRUE(contains(container_info->m_env, "OPENSSL_VERSION=1.1.0j-1~deb9u1")); + ASSERT_TRUE(contains(container_info->m_env, "TOMCAT_MAJOR=9")); + ASSERT_TRUE(contains(container_info->m_env, "TOMCAT_VERSION=9.0.14")); + } + + ASSERT_EQ(container_info->m_mesos_task_id, std::string()); + ASSERT_EQ(container_info->m_memory_limit, 0); + ASSERT_EQ(container_info->m_swap_limit, 0); + ASSERT_EQ(container_info->m_cpu_shares, 1024); + ASSERT_EQ(container_info->m_cpu_quota, 0); + ASSERT_EQ(container_info->m_cpu_period, 100000); + ASSERT_EQ(container_info->m_sysdig_agent_conf, std::string()); + ASSERT_EQ(container_info->m_metadata_deadline, 0); +} + diff --git a/test/userspace/libsinsp/curl_url_fetcher.ut.cpp b/test/userspace/libsinsp/curl_url_fetcher.ut.cpp new file mode 100644 index 0000000000..d18fcb38f7 --- /dev/null +++ b/test/userspace/libsinsp/curl_url_fetcher.ut.cpp @@ -0,0 +1,160 @@ +/* +Copyright (C) 2018 Sysdig, Inc. + +This file is part of sysdig. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ +#include "curl_url_fetcher.h" +#include "test_helpers/web_server_helper.h" +#include + +using namespace sysdig; +using namespace test; + +namespace +{ + +const int HTTP_OK = 200; + +} // end namespace; + +/* + * Base class for all tests; used to set up a suite-wide fixture. + */ +class curl_url_fetcher_test : public testing::Test +{ +public: + /** + * Allocate the web_server_helper before any test run. + */ + static void SetUpTestCase() + { + ASSERT_EQ(s_unix_server_helper, nullptr); + s_unix_server_helper = new web_server_helper(); + + ASSERT_EQ(s_tcp_server_helper, nullptr); + s_tcp_server_helper = new web_server_helper( + web_server_helper::SELECT_EPHEMERAL_PORT); + } + + /** + * Deallocate the web_server_helper after all tests have finished. + */ + static void TearDownTestCase() + { + ASSERT_NE(s_unix_server_helper, nullptr); + delete s_unix_server_helper; + s_unix_server_helper = nullptr; + + ASSERT_NE(s_tcp_server_helper, nullptr); + delete s_tcp_server_helper; + s_tcp_server_helper = nullptr; + } + + /** + * Clear any entries from the helpers. + */ + void TearDown() + { + s_unix_server_helper->reset(); + s_tcp_server_helper->reset(); + } + +protected: + /** + * Enable the tests to get the UNIX domain socket server helper without + * being able to muck with the pointer. + */ + web_server_helper& get_unix_server() + { + return *s_unix_server_helper; + } + + /** + * Enable the tests to get the TCP server helper without + * being able to muck with the pointer. + */ + web_server_helper& get_tcp_server() + { + return *s_tcp_server_helper; + } + + +private: + static web_server_helper* s_unix_server_helper; + static web_server_helper* s_tcp_server_helper; +}; + +web_server_helper* curl_url_fetcher_test::s_unix_server_helper; +web_server_helper* curl_url_fetcher_test::s_tcp_server_helper; + +/** + * Ensure that the default constructor creates a curl_url_fetcher for TCP. + */ +TEST_F(curl_url_fetcher_test, tcp_constructor) +{ + curl_url_fetcher fetcher; + + ASSERT_TRUE(fetcher.is_tcp()); + ASSERT_EQ(fetcher.get_socket_path(), std::string()); +} + +/** + * Ensure that the parameterized constructor creates a curl_url_fetcher for + * UNIX domain sockets. + */ +TEST_F(curl_url_fetcher_test, unix_domain_constructor) +{ + curl_url_fetcher fetcher(get_unix_server().get_socket_path()); + + ASSERT_FALSE(fetcher.is_tcp()); + ASSERT_EQ(fetcher.get_socket_path(), get_unix_server().get_socket_path()); +} + +/** + * Ensure that a curl_url_fetcher can fetch a document via TCP. + */ +TEST_F(curl_url_fetcher_test, fetch_tcp) +{ + const std::string path = "/foo"; + const std::string expected_content = "bar"; + const std::string url = "http://localhost:" + + std::to_string(get_tcp_server().get_port()) + + path; + std::string actual_content; + curl_url_fetcher fetcher; + + get_tcp_server().set_content(path, expected_content); + + ASSERT_EQ(fetcher.fetch(url, actual_content), HTTP_OK); + ASSERT_EQ(expected_content, actual_content); +} + +/** + * Ensure that a curl_url_fetcher can fetch a document via a UNIX domain socket. + */ +TEST_F(curl_url_fetcher_test, fetch_unix) +{ + const std::string path = "/bar"; + const std::string expected_content = "foo"; + const std::string url = "http://localhost:" + path; + std::string actual_content; + curl_url_fetcher fetcher(get_unix_server().get_socket_path()); + + get_unix_server().set_content(path, expected_content); + + ASSERT_EQ(fetcher.fetch(url, actual_content), HTTP_OK); + ASSERT_EQ(expected_content, actual_content); +} diff --git a/userspace/async/async_key_value_source.h b/userspace/async/async_key_value_source.h new file mode 100644 index 0000000000..8f20324002 --- /dev/null +++ b/userspace/async/async_key_value_source.h @@ -0,0 +1,276 @@ +/* +Copyright (C) 2018 Sysdig, Inc. + +This file is part of sysdig. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace sysdig +{ + +/** + * Base class for classes that need to collect values asynchronously from some + * value source. Subclasses will override the the run_impl() method and + * implement the concrete value lookup behavior. In that method, subclasses + * will use use dequeue_next_key() method to get the key that it will use to + * collect the value(s), collect the appropriate value(s), and call the + * store_value() method to save the value. The run_impl() method should + * continue to dequeue and process values while the dequeue_next_key() method + * returns true. + * + * The constructor for this class accepts a maximum wait time; this specifies + * how long client code is willing to wait for a synchronous response (i.e., + * how long the lookup() method will block waiting for the requested value). + * If the async_key_value_source is able to collect the requested value + * within that time period, then the lookup() method will return them. + * + * If the lookup() method is unable to collect the requested value within + * the requested time period, then one of two things will happen. + * + *
    + *
  1. If the client supplied a callback handler in the call to lookup(), then + * that callback handler will be invoked by the async_key_value_source once + * the value has been collected. Note that the callback handler will be + * invoked in the context of the asynchronous thread associated with the + * async_key_value_source.
  2. + *
  3. If the client did not supply a handler, then the value will be stored, + * and the next call to the lookup() method with the same key will return + * the previously collected value. If lookup() is not called with the + * specified ttl time, then this compoment will prune the stored value.
  4. + *
+ * + * @tparam key_type The type of the keys for which concrete subclasses will + * query. This type must have a valid operator==(). + * @tparam value_type The type of value that concrete subclasses will + * receive from a query. This type must have a valid + * operator=(). + */ +template +class async_key_value_source +{ +public: + /** + * If provided to the constructor as max_wait_ms, then lookup will + * not wait for a response. + */ + const static uint64_t NO_WAIT_LOOKUP = 0; + + typedef std::function callback_handler; + + /** + * Initialize this new async_key_value_source, which will block + * synchronously for the given max_wait_ms for value collection. + * + * @param[in] max_wait_ms The maximum amount of time that client code + * is willing to wait for lookup() to collect + * a value before falling back to an async + * return. + * @param[in] ttl_ms The time, in milliseconds, that a cached + * value will live before being considered + * "too old" and being pruned. + */ + async_key_value_source(uint64_t max_wait_ms, uint64_t ttl_ms); + + async_key_value_source(const async_key_value_source&) = delete; + async_key_value_source(async_key_value_source&&) = delete; + async_key_value_source& operator=(const async_key_value_source&) = delete; + + virtual ~async_key_value_source(); + + /** + * Returns the maximum amount of time, in milliseconds, that a call to + * lookup() will block synchronously before returning. + */ + uint64_t get_max_wait() const; + + /** + * Returns the maximum amount of time, in milliseconds, that a cached + * value will live before being pruned. + */ + uint64_t get_ttl() const; + + /** + * Lookup value(s) based on the given key. This method will block + * the caller for up the max_wait_ms time specified at construction + * for the desired value(s) to be available. + * + * @param[in] key The key to the value for which the client + * wishs to query. + * @param[out] value If this method is able to fetch the desired + * value within the max_wait_ms specified at + * construction time, then this output parameter + * will contain the collected value. The value + * of this parameter is defined only if this method + * returns true. + * @param[in] handler If this method is unable to collect the requested + * value(s) before the timeout, and if this parameter + * is a valid, non-empty, function, then this class + * will invoke the given handler from the async + * thread immediately after the collected values + * are available. If this handler is empty, then + * this async_key_value_source will store the + * values until either the next call to lookup() + * or until its ttl expires, whichever comes first. + * The handler is responsible for any thread-safety + * guarantees. + * + * @returns true if this method was able to lookup and return the + * value synchronously; false otherwise. + */ + bool lookup(const key_type& key, + value_type& value, + const callback_handler& handler = callback_handler()); + + /** + * Determines if the async thread assocaited with this + * async_key_value_source is running. + * + * Note: This API is for information only. Clients should + * not use this to implement any sort of complex behavior. Such + * use will lead to race conditions. For example, is_running() and + * lookup() could potentially race, causing is_running() to return + * false after lookup() has started the thread. + * + * @returns true if the async thread is running, false otherwise. + */ + bool is_running() const; + +protected: + /** + * Stops the thread assocaited with this async_key_value_source, if + * it is running; otherwise, does nothing. The only use for this is + * in a destructor to ensure that the async thread stops when the + * object is destroyed. + */ + void stop(); + + /** + * Dequeues an entry from the request queue and returns it in the given + * key. Concrete subclasses will call this method to get the next key + * for which to collect values. + * + * @returns true if there was a key to dequeue, false otherwise. + */ + bool dequeue_next_key(key_type& key); + + /** + * Get the (potentially partial) value for the given key. + * + * @param[in] key The key whose value is needed. + * + * @returns the value associated with the given key. + */ + value_type get_value(const key_type& key); + + /** + * Stores a value for the given key. Concrete subclasses will call + * this method from their run_impl() method to save (or otherwise + * notifiy the client about) an available value. + * + * @param[in] key The key for which the client asked for the value. + * @param[in] value The collected value. + */ + void store_value(const key_type& key, const value_type& value); + + /** + * Concrete subclasses must override this method to perform the + * asynchronous value lookup. The implementation should: + * + *
    + *
  • Loop while dequeue_next_key() is true.
  • + *
  • Get any existing value for that key using get_value()
  • + *
  • Do whatever work is necessary to lookup the value associated + * with that key.
  • + *
  • Call store_value to store the updated value, and to + * notify any client code waiting on that data.
  • + *
+ */ + virtual void run_impl() = 0; + +private: + /** + * Holds information associated with a single lookup() request. + */ + struct lookup_request + { + lookup_request(): + m_available(false), + m_value(), + m_available_condition(), + m_callback(), + m_start_time(std::chrono::steady_clock::now()) + { } + + /** Is the value here available? */ + bool m_available; + + /** The value for a key. */ + value_type m_value; + + /** Block in lookup() waiting for a sync response. */ + std::condition_variable m_available_condition; + + /** + * A optional client-specified callback handler for async + * response notification. + */ + callback_handler m_callback; + + /** The time at which this request was made. */ + std::chrono::time_point m_start_time; + }; + + typedef std::map value_map; + + /** + * The entry point of the async thread, which blocks waiting for work + * and dispatches work to run_impl(). + */ + void run(); + + /** + * Remove any entries that are older than the time-to-live. + */ + void prune_stale_requests(); + + uint64_t m_max_wait_ms; + uint64_t m_ttl_ms; + std::thread m_thread; + bool m_running; + bool m_terminate; + mutable std::mutex m_mutex; + std::condition_variable m_start_condition; + std::condition_variable m_queue_not_empty_condition; + std::list m_request_queue; + std::set m_request_set; + value_map m_value_map; +}; + + +} // end namespace sysdig + +#include "async_key_value_source.tpp" diff --git a/userspace/async/async_key_value_source.tpp b/userspace/async/async_key_value_source.tpp new file mode 100644 index 0000000000..ed972e1d95 --- /dev/null +++ b/userspace/async/async_key_value_source.tpp @@ -0,0 +1,288 @@ +/* +Copyright (C) 2018 Sysdig, Inc. + +This file is part of sysdig. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ +#include "logger.h" + +#include +#include +#include +#include +#include +#include + +namespace sysdig +{ + +template +async_key_value_source::async_key_value_source( + const uint64_t max_wait_ms, + const uint64_t ttl_ms): + m_max_wait_ms(max_wait_ms), + m_ttl_ms(ttl_ms), + m_thread(), + m_running(false), + m_terminate(false), + m_mutex(), + m_queue_not_empty_condition(), + m_value_map() +{ } + +template +async_key_value_source::~async_key_value_source() +{ + try + { + stop(); + } + catch(...) + { + g_logger.log(std::string(__FUNCTION__) + + ": Exception in destructor", + sinsp_logger::SEV_ERROR); + } +} + +template +uint64_t async_key_value_source::get_max_wait() const +{ + return m_max_wait_ms; +} + +template +uint64_t async_key_value_source::get_ttl() const +{ + return m_ttl_ms; +} + +template +void async_key_value_source::stop() +{ + bool join_needed = false; + + { + std::unique_lock guard(m_mutex); + + if(m_running) + { + m_terminate = true; + join_needed = true; + + // The async thread might be waiting for new events + // so wake it up + m_queue_not_empty_condition.notify_one(); + } + } // Drop the mutex before join() + + if (join_needed) + { + m_thread.join(); + + // Remove any pointers from the thread to this object + // (just to be safe) + m_thread = std::thread(); + } +} + +template +bool async_key_value_source::is_running() const +{ + // Since this is for information only and it's ok to race, we + // expliclty do not lock here. + + return m_running; +} + +template +void async_key_value_source::run() +{ + m_running = true; + + while(!m_terminate) + { + { + std::unique_lock guard(m_mutex); + + while(!m_terminate && m_request_queue.empty()) + { + // Wait for something to show up on the queue + m_queue_not_empty_condition.wait(guard); + } + + prune_stale_requests(); + } + + if(!m_terminate) + { + run_impl(); + } + } + + m_running = false; +} + +template +bool async_key_value_source::lookup( + const key_type& key, + value_type& value, + const callback_handler& callback) +{ + std::unique_lock guard(m_mutex); + + if(!m_running && !m_thread.joinable()) + { + m_thread = std::thread(&async_key_value_source::run, this); + } + + typename value_map::const_iterator itr = m_value_map.find(key); + bool request_complete = (itr != m_value_map.end()) && itr->second.m_available; + + if(!request_complete) + { + // Haven't made the request yet + if (itr == m_value_map.end()) + { + m_value_map[key].m_available = false; + m_value_map[key].m_value = value; + } + + // Make request to API and let the async thread know about it + if (std::find(m_request_set.begin(), + m_request_set.end(), + key) == m_request_set.end()) + { + m_request_queue.push_back(key); + m_request_set.insert(key); + m_queue_not_empty_condition.notify_one(); + } + + // + // If the client code is willing to wait a short amount of time + // to satisfy the request, then wait for the async thread to + // pick up the newly-added request and execute it. If + // processing that request takes too much time, then we'll + // not be able to return the value information on this call, + // and the async thread will continue handling the request so + // that it'll be available on the next call. + // + if (m_max_wait_ms > 0) + { + m_value_map[key].m_available_condition.wait_for( + guard, + std::chrono::milliseconds(m_max_wait_ms)); + + itr = m_value_map.find(key); + request_complete = (itr != m_value_map.end()) && itr->second.m_available; + } + } + + if(request_complete) + { + value = itr->second.m_value; + m_value_map.erase(key); + } + else + { + m_value_map[key].m_callback = callback; + } + + return request_complete; +} + +template +bool async_key_value_source::dequeue_next_key(key_type& key) +{ + std::lock_guard guard(m_mutex); + bool key_found = false; + + if(m_request_queue.size() > 0) + { + key_found = true; + + key = m_request_queue.front(); + m_request_queue.pop_front(); + m_request_set.erase(key); + } + + return key_found; +} + +template +value_type async_key_value_source::get_value( + const key_type& key) +{ + std::lock_guard guard(m_mutex); + + return m_value_map[key].m_value; +} + +template +void async_key_value_source::store_value( + const key_type& key, + const value_type& value) +{ + std::lock_guard guard(m_mutex); + + if (m_value_map[key].m_callback) + { + m_value_map[key].m_callback(key, value); + m_value_map.erase(key); + } + else + { + m_value_map[key].m_value = value; + m_value_map[key].m_available = true; + m_value_map[key].m_available_condition.notify_one(); + } +} + +/** + * Prune any "old" outstanding requests. This method expect that the caller + * is holding m_mutex. + */ +template +void async_key_value_source::prune_stale_requests() +{ + // Avoid both iterating over and modifying the map by saving a list + // of keys to prune. + std::vector keys_to_prune; + + for(auto i = m_value_map.begin(); + !m_terminate && (i != m_value_map.end()); + ++i) + { + const auto now = std::chrono::steady_clock::now(); + + const auto age_ms = + std::chrono::duration_cast( + now - i->second.m_start_time).count(); + + if(age_ms > m_ttl_ms) + { + keys_to_prune.push_back(i->first); + } + } + + for(auto i = keys_to_prune.begin(); + !m_terminate && (i != keys_to_prune.end()); + ++i) + { + m_value_map.erase(*i); + } +} + +} // end namespace sysdig diff --git a/userspace/libsinsp/CMakeLists.txt b/userspace/libsinsp/CMakeLists.txt index fa47321eb1..87dcb46f73 100644 --- a/userspace/libsinsp/CMakeLists.txt +++ b/userspace/libsinsp/CMakeLists.txt @@ -18,6 +18,7 @@ include_directories(./) include_directories(../../common) include_directories(../libscap) +include_directories(../async) include_directories("${JSONCPP_INCLUDE}") include_directories("${LUAJIT_INCLUDE}") @@ -31,10 +32,13 @@ if(NOT WIN32 AND NOT APPLE) endif() add_library(sinsp STATIC + async_docker_metadata_source.cpp + async_linux_docker_metadata_source.cpp chisel.cpp chisel_api.cpp container.cpp ctext.cpp + curl_url_fetcher.cpp cyclewriter.cpp cursescomponents.cpp cursestable.cpp @@ -100,6 +104,7 @@ add_library(sinsp STATIC sinsp_auth.cpp sinsp_curl.cpp stopwatch.cpp + url_fetcher.cpp uri_parser.c uri.cpp utils.cpp diff --git a/userspace/libsinsp/async_docker_metadata_source.cpp b/userspace/libsinsp/async_docker_metadata_source.cpp new file mode 100644 index 0000000000..442cf8b8ce --- /dev/null +++ b/userspace/libsinsp/async_docker_metadata_source.cpp @@ -0,0 +1,350 @@ +/* +Copyright (C) 2018 Sysdig, Inc. + +This file is part of sysdig. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ +#include "async_docker_metadata_source.h" + +#if defined(CYGWING_AGENT) +# include "async_windows_docker_metadata_source.h" +#else +# include "async_linux_docker_metadata_source.h" +#endif + +#include "sinsp_int.h" +#include "logger.h" + +#if defined(LOCAL_DEBUG) +# include +# define LOG(fmt, ...) fprintf(stderr, "[%s]:%d: " fmt "\n", __FUNCTION__, __LINE__, ##__VA_ARGS__) +#else +# define LOG(fmt, ...) do { } while(false) +#endif + +using namespace sysdig; + +async_docker_metadata_source::async_docker_metadata_source( + const std::string& api_version, + const bool query_image_info): + async_key_value_source(NO_WAIT_LOOKUP, + MAX_TTL_MS), + m_query_image_info(query_image_info), + m_api_version(api_version) +{ } + +const std::string& async_docker_metadata_source::get_api_version() const +{ + return m_api_version; +} + +bool async_docker_metadata_source::query_image_info() const +{ + return m_query_image_info; +} + +void async_docker_metadata_source::set_query_image_info(const bool query_info) +{ + m_query_image_info = query_info; +} + +void async_docker_metadata_source::run_impl() +{ + std::string container_id; + + while(dequeue_next_key(container_id)) + { + docker_metadata metadata = get_value(container_id); + + if(parse_docker(metadata.m_manager, + metadata.m_container_info.get())) + { + store_value(container_id, metadata); + } + } +} + +bool async_docker_metadata_source::parse_docker(sinsp_container_manager* const manager, + sinsp_container_info* const container) +{ + std::string json; + sinsp_docker_response resp = + get_docker(manager, + build_request("/containers/" + container->m_id + "/json"), + json); + + switch(resp) + { + case sinsp_docker_response::RESP_OK: + break; + + case sinsp_docker_response::RESP_BAD_REQUEST: + m_api_version = ""; + json = ""; + + resp = get_docker(manager, + build_request("/containers/" + container->m_id + "/json"), + json); + if(resp == sinsp_docker_response::RESP_OK) + { + break; + } + /* FALLTHRU */ + + case sinsp_docker_response::RESP_ERROR: + ASSERT(false); + return false; + } + + Json::Value root; + Json::Reader reader; + const bool parsingSuccessful = reader.parse(json, root); + if(!parsingSuccessful) + { + ASSERT(false); + return false; + } + + const Json::Value& config_obj = root["Config"]; + + container->m_image = config_obj["Image"].asString(); + + std::string imgstr = root["Image"].asString(); + size_t cpos = imgstr.find(":"); + if(cpos != std::string::npos) + { + container->m_imageid = imgstr.substr(cpos + 1); + } + + // containers can be spawned using just the imageID as image name, + // with or without the hash prefix (e.g. sha256:) + bool no_name = !container->m_imageid.empty() && + strncmp(container->m_image.c_str(), container->m_imageid.c_str(), + MIN(container->m_image.length(), container->m_imageid.length())) == 0; + no_name |= !imgstr.empty() && + strncmp(container->m_image.c_str(), imgstr.c_str(), + MIN(container->m_image.length(), imgstr.length())) == 0; + + if(!no_name || !m_query_image_info) + { + std::string hostname; + std::string port; + + sinsp_utils::split_container_image(container->m_image, + hostname, + port, + container->m_imagerepo, + container->m_imagetag, + container->m_imagedigest, + false); + } + + if(m_query_image_info && !container->m_imageid.empty() && + (no_name || container->m_imagedigest.empty() || + (!container->m_imagedigest.empty() && container->m_imagetag.empty()))) + { + std::string img_json; + + resp = get_docker(manager, + build_request("/images/" + container->m_imageid + "/json?digests=1"), + json); + if(resp == sinsp_docker_response::RESP_OK) + { + Json::Value img_root; + if(reader.parse(img_json, img_root)) + { + for(const auto& rdig : img_root["RepoDigests"]) + { + if(rdig.isString()) + { + std::string repodigest = rdig.asString(); + if(container->m_imagerepo.empty()) + { + container->m_imagerepo = repodigest.substr(0, repodigest.find("@")); + } + if(repodigest.find(container->m_imagerepo) != std::string::npos) + { + container->m_imagedigest = repodigest.substr(repodigest.find("@")+1); + break; + } + } + } + for(const auto& rtag : img_root["RepoTags"]) + { + if(rtag.isString()) + { + std::string repotag = rtag.asString(); + if(container->m_imagerepo.empty()) + { + container->m_imagerepo = repotag.substr(0, repotag.rfind(":")); + } + if(repotag.find(container->m_imagerepo) != std::string::npos) + { + container->m_imagetag = repotag.substr(repotag.rfind(":")+1); + break; + } + } + } + } + } + } + if(container->m_imagetag.empty()) + { + container->m_imagetag = "latest"; + } + + container->m_name = root["Name"].asString(); + + if(!container->m_name.empty() && container->m_name[0] == '/') + { + container->m_name = container->m_name.substr(1); + } + + const Json::Value& net_obj = root["NetworkSettings"]; + + std::string ip = net_obj["IPAddress"].asString(); + if(ip.empty()) + { + const Json::Value& hconfig_obj = root["HostConfig"]; + std::string net_mode = hconfig_obj["NetworkMode"].asString(); + if(strncmp(net_mode.c_str(), "container:", strlen("container:")) == 0) + { + std::string container_id = net_mode.substr(net_mode.find(":") + 1); + uint32_t container_ip; + const sinsp_container_info* const container_info = + manager ? manager->get_container(container_id) : nullptr; + + if(container_info) + { + container_ip = container_info->m_container_ip; + } + else + { + sinsp_container_info pcnt; + pcnt.m_id = container_id; + parse_docker(manager, &pcnt); + container_ip = pcnt.m_container_ip; + } + container->m_container_ip = container_ip; + } + } + else + { + if(inet_pton(AF_INET, ip.c_str(), &container->m_container_ip) == -1) + { + ASSERT(false); + } + container->m_container_ip = ntohl(container->m_container_ip); + } + + std::vector ports = net_obj["Ports"].getMemberNames(); + for(std::vector::const_iterator it = ports.begin(); it != ports.end(); ++it) + { + size_t tcp_pos = it->find("/tcp"); + if(tcp_pos == std::string::npos) + { + continue; + } + + uint16_t container_port = atoi(it->c_str()); + + const Json::Value& v = net_obj["Ports"][*it]; + if(v.isArray()) + { + for(uint32_t j = 0; j < v.size(); ++j) + { + sinsp_container_info::container_port_mapping port_mapping; + + ip = v[j]["HostIp"].asString(); + std::string port = v[j]["HostPort"].asString(); + + if(inet_pton(AF_INET, ip.c_str(), &port_mapping.m_host_ip) == -1) + { + ASSERT(false); + continue; + } + port_mapping.m_host_ip = ntohl(port_mapping.m_host_ip); + + port_mapping.m_container_port = container_port; + port_mapping.m_host_port = atoi(port.c_str()); + container->m_port_mappings.push_back(port_mapping); + } + } + } + + std::vector labels = config_obj["Labels"].getMemberNames(); + for(std::vector::const_iterator it = labels.begin(); it != labels.end(); ++it) + { + std::string val = config_obj["Labels"][*it].asString(); + container->m_labels[*it] = val; + } + + const Json::Value& env_vars = config_obj["Env"]; + + for(const auto& env_var : env_vars) + { + if(env_var.isString()) + { + container->m_env.emplace_back(env_var.asString()); + } + } + + const auto& host_config_obj = root["HostConfig"]; + container->m_memory_limit = host_config_obj["Memory"].asInt64(); + container->m_swap_limit = host_config_obj["MemorySwap"].asInt64(); + const auto cpu_shares = host_config_obj["CpuShares"].asInt64(); + if(cpu_shares > 0) + { + container->m_cpu_shares = cpu_shares; + } + container->m_cpu_quota = host_config_obj["CpuQuota"].asInt64(); + const auto cpu_period = host_config_obj["CpuPeriod"].asInt64(); + if(cpu_period > 0) + { + container->m_cpu_period = cpu_period; + } + const Json::Value &privileged = host_config_obj["Privileged"]; + if(!privileged.isNull() && privileged.isBool()) + { + container->m_privileged = privileged.asBool(); + } + + sinsp_container_info::parse_json_mounts(root["Mounts"], container->m_mounts); + +#ifdef HAS_ANALYZER + sinsp_utils::find_env(container->m_sysdig_agent_conf, container->get_env(), "SYSDIG_AGENT_CONF"); + // container->m_sysdig_agent_conf = get_docker_env(env_vars, "SYSDIG_AGENT_CONF"); +#endif + return true; +} + +async_docker_metadata_source* +async_docker_metadata_source::new_async_docker_metadata_source(const bool query_image_info) +{ + async_docker_metadata_source* docker_metadata = nullptr; + +#if defined(CYGWING_AGENT) + docker_metadata = new async_windows_docker_metadata_source(query_image_info); +#else // !CYGWING_AGENT +# if defined(HAS_CAPTURE) + docker_metadata = new async_linux_docker_metadata_source(query_image_info); +# else // !HAS_CAPTURE + // TODO: Need to implement async_null_docker_metadata_source + // docker_metadata = new async_null_docker_metadata_source(); +# endif //HAS_CAPTURE +#endif // CYGWING_AGENT + + return docker_metadata; +} diff --git a/userspace/libsinsp/async_docker_metadata_source.h b/userspace/libsinsp/async_docker_metadata_source.h new file mode 100644 index 0000000000..cc1fe0cf4c --- /dev/null +++ b/userspace/libsinsp/async_docker_metadata_source.h @@ -0,0 +1,148 @@ +/* +Copyright (C) 2018 Sysdig, Inc. + +This file is part of sysdig. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ +#pragma once + +#include "async_key_value_source.h" +#include "sinsp.h" +#include "container.h" + +namespace sysdig +{ + +/** + * The value_type that an async_docker_metadata_source's lookup() method + * will produce. + */ +struct docker_metadata +{ + docker_metadata(): + m_manager(nullptr), + m_container_info() + { } + + docker_metadata(sinsp_container_manager* const manager, + std::shared_ptr& container_info): + m_manager(manager), + m_container_info(container_info) + { } + + sinsp_container_manager* m_manager; + std::shared_ptr m_container_info; +}; + +/** + * Interface to async_docker_metadata_source -- an abstract async_key_value_source + * for fetching docker metadata and metadata. + */ +class async_docker_metadata_source : public async_key_value_source +{ +public: + /** + * The maximum amount of time that a collected Docker metric will + * remain cached waiting for a subsequent call to lookup() before + * being discarded. + */ + const uint64_t MAX_TTL_MS = (30 * 1000); + + /** + * Returns the API version that this async_key_value_source will use to + * fetch information from Docker. + */ + const std::string& get_api_version() const; + + /** + * Returns true if this async_docker_metadata_source should query for + * image info, false otherwise. + */ + bool query_image_info() const; + + /** + * Update the query_image_info state for this async_docker_metadata_source. + */ + void set_query_image_info(bool query_info); + + /** + * Creates a new async_docker_metadata_source that is appropriate + * for the build environment (Linux/Windows/no-analyzer) + * + * @param[in] query_image_info If image information is missing, should + * we query Docker for it? + * + * Note that the caller is responsible for deleting the returned object. + */ + static async_docker_metadata_source* new_async_docker_metadata_source( + bool query_image_info); + +protected: + /** + * Initialize a new async_docker_metadata_source. + * + * @param[in] api_version The version of the Docker API to use. + * @param[in] query_image_info Should this componenty query Docker for + * missing image information? + */ + async_docker_metadata_source(const std::string& api_version, + bool query_image_info = true); + + /** + * Builds and returns a URL for querying Docker on the local host. + * This differs between Linux and Windows, so the concrete implementation + * is left to subclasses. + * + * @param[in] path The base path of the URL + */ + virtual std::string build_request(const std::string& path) = 0; + + /** + * Fetches the JSON from Docker using the given url. + * + * @param[in] manager Used to query container information + * @param[in] url The URL to query + * @param[out] json The fetched JSON + */ + virtual sinsp_docker_response get_docker(sinsp_container_manager* manager, + const std::string& url, + std::string &json) = 0; + + /** + * Parses the JSON returned from Dcoker and populates the given + * container with the information within. + * + * @param[in] manager Used to query container information + * @param[in,out] container The container information to populate + * + * @returns true on success, false otherwise. + */ + bool parse_docker(sinsp_container_manager* manager, + sinsp_container_info *container); + + /** + * Drives the asynchronous fetching of the information from docker. + * This method runs in the context of the thread associated with + * this async_docker_metadata_source. + */ + void run_impl() override; + +private: + bool m_query_image_info; + std::string m_api_version; +}; + +} // end namespace sysdig diff --git a/userspace/libsinsp/async_linux_docker_metadata_source.cpp b/userspace/libsinsp/async_linux_docker_metadata_source.cpp new file mode 100644 index 0000000000..c8a2758de7 --- /dev/null +++ b/userspace/libsinsp/async_linux_docker_metadata_source.cpp @@ -0,0 +1,96 @@ +/* +Copyright (C) 2018 Sysdig, Inc. + +This file is part of sysdig. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ +#include "async_linux_docker_metadata_source.h" +#include "sinsp_int.h" +#include "logger.h" + +#if defined(LOCAL_DEBUG) +# include +# define LOG(fmt, ...) fprintf(stderr, "[%s]:%d: " fmt "\n", __FUNCTION__, __LINE__, ##__VA_ARGS__) +#else +# define LOG(fmt, ...) do { } while(false) +#endif + +using namespace sysdig; + +const std::string async_linux_docker_metadata_source::DEFAULT_DOCKER_SOCKET_PATH = "/var/run/docker.sock"; +const std::string async_linux_docker_metadata_source::DEFAULT_API_VERSION = "/v1.24"; + +async_linux_docker_metadata_source::async_linux_docker_metadata_source( + const bool query_image_info, + const std::string& socket_path, + const std::string& api_version): + async_docker_metadata_source(api_version, query_image_info) + , m_unix_socket_path(scap_get_host_root() + socket_path) +#if defined(HAS_CAPTURE) + , m_url_fetcher(url_fetcher::new_fetcher(m_unix_socket_path)) +#endif +{ } + +async_linux_docker_metadata_source::~async_linux_docker_metadata_source() +{ + stop(); +} + +const std::string& async_linux_docker_metadata_source::get_socket_path() const +{ + return m_unix_socket_path; +} + +std::string async_linux_docker_metadata_source::build_request(const std::string& path) +{ + return "http://localhost" + get_api_version() + path; +} + +sinsp_docker_response async_linux_docker_metadata_source::get_docker( + sinsp_container_manager* const, + const std::string& url, + std::string &json) +{ + sinsp_docker_response response = sinsp_docker_response::RESP_ERROR; + +#if defined(HAS_CAPTURE) + try + { + LOG("url: %s", url.c_str()); + + const int http_code = m_url_fetcher->fetch(url, json); + + if(http_code == 200) + { + response = sinsp_docker_response::RESP_OK; + } + else + { + g_logger.log("http_code: " + std::to_string(http_code), + sinsp_logger::SEV_WARNING); + response = sinsp_docker_response::RESP_BAD_REQUEST; + } + } + catch(const std::exception& ex) + { + g_logger.log(std::string("Failed to fetch URL: ") + ex.what(), + sinsp_logger::SEV_WARNING); + ASSERT(false); + response = sinsp_docker_response::RESP_ERROR; + } +#endif // HAS_CAPTURE + + return response; +} diff --git a/userspace/libsinsp/async_linux_docker_metadata_source.h b/userspace/libsinsp/async_linux_docker_metadata_source.h new file mode 100644 index 0000000000..48b6590dba --- /dev/null +++ b/userspace/libsinsp/async_linux_docker_metadata_source.h @@ -0,0 +1,64 @@ +/* +Copyright (C) 2018 Sysdig, Inc. + +This file is part of sysdig. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ +#pragma once + +#include "async_docker_metadata_source.h" + +#if defined(HAS_CAPTURE) +# include "url_fetcher.h" +#endif + +namespace sysdig +{ + +/** + * Interface to async_linux_docker_metadata_source -- a concrete + * async_docker_metadata_source for fetching docker metadata and metadata + * on Linux. + */ +class async_linux_docker_metadata_source : public async_docker_metadata_source +{ +public: + const static std::string DEFAULT_DOCKER_SOCKET_PATH; + const static std::string DEFAULT_API_VERSION; + + async_linux_docker_metadata_source( + bool query_image_information = true, + const std::string& socket_path = DEFAULT_DOCKER_SOCKET_PATH, + const std::string& api_version = DEFAULT_API_VERSION); + + ~async_linux_docker_metadata_source(); + + const std::string& get_socket_path() const; + +protected: + std::string build_request(const std::string& path) override; + sinsp_docker_response get_docker(sinsp_container_manager* manager, + const std::string& url, + std::string &json) override; + +private: + std::string m_unix_socket_path; + +#if defined(HAS_CAPTURE) + url_fetcher::ptr m_url_fetcher; +#endif +}; + +} diff --git a/userspace/libsinsp/async_windows_docker_metadata_source.cpp b/userspace/libsinsp/async_windows_docker_metadata_source.cpp new file mode 100644 index 0000000000..c9c98d09f0 --- /dev/null +++ b/userspace/libsinsp/async_windows_docker_metadata_source.cpp @@ -0,0 +1,70 @@ +/* +Copyright (C) 2018 Sysdig, Inc. + +This file is part of sysdig. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ +#include "async_windows_docker_metadata_source.h" +#include "sinsp_int.h" +#include "logger.h" + +using namespace sysdig; + +const std::string async_windows_docker_metadata_source::DEFAULT_API_VERSION = "/v1.30"; + +async_windows_docker_metadata_source::async_windows_docker_metadata_source( + const std::string& api_version, + const bool query_image_info): + async_docker_metadata_source(api_version, query_image_info) +{ } + +std::string async_windows_docker_metadata_source::build_request(const std::string& path) +{ + return "GET " + get_api_version() + path + " HTTP/1.1\r\nHost: docker\r\n\r\n"; +} + +sinsp_docker_response async_windows_docker_metadata_source::get_docker( + sinsp_container_manager* const manager, + const std::string& url, + std::string &json) +{ + const char* response = nullptr; + + const bool qdres = wh_query_docker(manager->get_inspector()->get_wmi_handle(), + const_cast(url.c_str()), + &response); + + if(!qdres) + { + ASSERT(false); + return sinsp_docker_response::RESP_ERROR; + } + + json = response; + if(strncmp(json.c_str(), "HTTP/1.0 200 OK", sizeof("HTTP/1.0 200 OK") - 1)) + { + return sinsp_docker_response::RESP_BAD_REQUEST; + } + + size_t pos = json.find("{"); + if(pos == std::string::npos) + { + ASSERT(false); + return sinsp_docker_response::RESP_ERROR; + } + json = json.substr(pos); + + return sinsp_docker_response::RESP_OK; +} diff --git a/userspace/libsinsp/async_windows_docker_metadata_source.h b/userspace/libsinsp/async_windows_docker_metadata_source.h new file mode 100644 index 0000000000..322df84ad8 --- /dev/null +++ b/userspace/libsinsp/async_windows_docker_metadata_source.h @@ -0,0 +1,47 @@ +/* +Copyright (C) 2018 Sysdig, Inc. + +This file is part of sysdig. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ +#pragma once + +#include "async_docker_metadata_source.h" + +namespace sysdig +{ + +/** + * Interface to async_windows_docker_metadata_source -- a concrete + * async_docker_metadata_source for fetching docker metadata and metadata + * on Windows. + */ +class async_windows_docker_metadata_source : public async_docker_metadata_source +{ +public: + const static std::string DEFAULT_API_VERSION; + + async_windows_docker_metadata_source( + const std::string& api_version = DEFAULT_API_VERSION, + bool query_image_info = true); + +protected: + std::string build_request(const std::string& path) override; + sinsp_docker_response get_docker(sinsp_container_manager* manager, + const std::string& url, + std::string &json) override; +}; + +} diff --git a/userspace/libsinsp/container.cpp b/userspace/libsinsp/container.cpp index 8d1a129a1a..f83951154d 100644 --- a/userspace/libsinsp/container.cpp +++ b/userspace/libsinsp/container.cpp @@ -23,6 +23,7 @@ limitations under the License. #include #endif +#include "async_docker_metadata_source.h" #include "sinsp.h" #include "sinsp_int.h" #include "container.h" @@ -31,6 +32,10 @@ limitations under the License. #include "dragent_win_hal_public.h" #endif +#include + +using namespace sysdig; + void sinsp_container_info::parse_json_mounts(const Json::Value &mnt_obj, vector &mounts) { if(!mnt_obj.isNull() && mnt_obj.isArray()) @@ -83,317 +88,132 @@ const sinsp_container_info::container_mount_info *sinsp_container_info::mount_by return NULL; } -#if !defined(CYGWING_AGENT) && defined(HAS_CAPTURE) -CURLM *sinsp_container_engine_docker::m_curlm = NULL; -CURL *sinsp_container_engine_docker::m_curl = NULL; -#endif - -bool sinsp_container_engine_docker::m_query_image_info = true; - -sinsp_container_engine_docker::sinsp_container_engine_docker() : - m_unix_socket_path(string(scap_get_host_root()) + "/var/run/docker.sock"), - m_api_version("/v1.24") +namespace { -#if !defined(CYGWING_AGENT) && defined(HAS_CAPTURE) - if(!m_curlm) - { - m_curl = curl_easy_init(); - m_curlm = curl_multi_init(); - if(m_curlm) - { - curl_multi_setopt(m_curlm, CURLMOPT_PIPELINING, CURLPIPE_HTTP1|CURLPIPE_MULTIPLEX); - } - - if(m_curl) - { - curl_easy_setopt(m_curl, CURLOPT_UNIX_SOCKET_PATH, m_unix_socket_path.c_str()); - curl_easy_setopt(m_curl, CURLOPT_HTTPGET, 1); - curl_easy_setopt(m_curl, CURLOPT_FOLLOWLOCATION, 1); - curl_easy_setopt(m_curl, CURLOPT_WRITEFUNCTION, curl_write_callback); - } - } -#endif -} - -void sinsp_container_engine_docker::cleanup() +std::ostream& operator<<(std::ostream& out, const sinsp_container_info::container_mount_info& info) { -#if !defined(CYGWING_AGENT) && defined(HAS_CAPTURE) - curl_easy_cleanup(m_curl); - m_curl = NULL; - curl_multi_cleanup(m_curlm); - m_curlm = NULL; -#endif + out << info.to_string(); + return out; } -void sinsp_container_engine_docker::set_query_image_info(bool query_image_info) +std::ostream& operator<<(std::ostream& out, const sinsp_container_info::container_port_mapping& info) { - m_query_image_info = query_image_info; + out << info.to_string(); + return out; } -#if !defined(CYGWING_AGENT) && defined(HAS_CAPTURE) -size_t sinsp_container_engine_docker::curl_write_callback(const char* ptr, size_t size, size_t nmemb, string* json) +template +std::string iterable_to_string(const Iterable& itr) { - const std::size_t total = size * nmemb; - json->append(ptr, total); - return total; -} -#endif + std::stringstream out; + auto i = itr.begin(); -bool sinsp_container_engine_docker::parse_docker(sinsp_container_manager* manager, sinsp_container_info *container, sinsp_threadinfo* tinfo) -{ - string json; -#ifndef CYGWING_AGENT - sinsp_docker_response resp = get_docker(manager, "http://localhost" + m_api_version + "/containers/" + container->m_id + "/json", json); -#else - sinsp_docker_response resp = get_docker(manager, "GET /v1.30/containers/" + container->m_id + "/json HTTP/1.1\r\nHost: docker\r\n\r\n", json); -#endif - switch(resp) { - case sinsp_docker_response::RESP_BAD_REQUEST: - m_api_version = ""; - json = ""; -#ifndef CYGWING_AGENT - resp = get_docker(manager, "http://localhost/containers/" + container->m_id + "/json", json); -#else - resp = get_docker(manager, "GET /containers/" + container->m_id + "/json HTTP/1.1\r\nHost: docker\r\n\r\n", json); -#endif - if (resp == sinsp_docker_response::RESP_OK) - { - break; - } - /* FALLTHRU */ - case sinsp_docker_response::RESP_ERROR: - ASSERT(false); - return false; - - case sinsp_docker_response::RESP_OK: - break; - } - - Json::Value root; - Json::Reader reader; - bool parsingSuccessful = reader.parse(json, root); - if(!parsingSuccessful) - { - ASSERT(false); - return false; - } - - const Json::Value& config_obj = root["Config"]; + out << "["; - container->m_image = config_obj["Image"].asString(); - - string imgstr = root["Image"].asString(); - size_t cpos = imgstr.find(":"); - if(cpos != string::npos) - { - container->m_imageid = imgstr.substr(cpos + 1); - } - - // containers can be spawned using just the imageID as image name, - // with or without the hash prefix (e.g. sha256:) - bool no_name = !container->m_imageid.empty() && - strncmp(container->m_image.c_str(), container->m_imageid.c_str(), - MIN(container->m_image.length(), container->m_imageid.length())) == 0; - no_name |= !imgstr.empty() && - strncmp(container->m_image.c_str(), imgstr.c_str(), - MIN(container->m_image.length(), imgstr.length())) == 0; - - if(!no_name || !m_query_image_info) + if (i != itr.end()) { - string hostname, port; - sinsp_utils::split_container_image(container->m_image, - hostname, - port, - container->m_imagerepo, - container->m_imagetag, - container->m_imagedigest, - false); - } + out << *i; - if(m_query_image_info && !container->m_imageid.empty() && - (no_name || container->m_imagedigest.empty() || (!container->m_imagedigest.empty() && container->m_imagetag.empty()))) - { - string img_json; -#ifndef CYGWING_AGENT - if(get_docker(manager, "http://localhost" + m_api_version + "/images/" + container->m_imageid + "/json?digests=1", img_json) == sinsp_docker_response::RESP_OK) -#else - if(get_docker(manager, "GET /v1.30/images/" + container->m_imageid + "/json?digests=1 HTTP/1.1\r\nHost: docker \r\n\r\n", img_json) == sinsp_docker_response::RESP_OK) -#endif + for(++i; i != itr.end(); ++i) { - Json::Value img_root; - if(reader.parse(img_json, img_root)) - { - for(const auto& rdig : img_root["RepoDigests"]) - { - if(rdig.isString()) - { - string repodigest = rdig.asString(); - if(container->m_imagerepo.empty()) - { - container->m_imagerepo = repodigest.substr(0, repodigest.find("@")); - } - if(repodigest.find(container->m_imagerepo) != string::npos) - { - container->m_imagedigest = repodigest.substr(repodigest.find("@")+1); - break; - } - } - } - for(const auto& rtag : img_root["RepoTags"]) - { - if(rtag.isString()) - { - string repotag = rtag.asString(); - if(container->m_imagerepo.empty()) - { - container->m_imagerepo = repotag.substr(0, repotag.rfind(":")); - } - if(repotag.find(container->m_imagerepo) != string::npos) - { - container->m_imagetag = repotag.substr(repotag.rfind(":")+1); - break; - } - } - } - } + out << ", " << *i; } } - if(container->m_imagetag.empty()) - { - container->m_imagetag = "latest"; - } - container->m_name = root["Name"].asString(); + out << "]"; - if(!container->m_name.empty() && container->m_name[0] == '/') - { - container->m_name = container->m_name.substr(1); - } + return out.str(); +} - const Json::Value& net_obj = root["NetworkSettings"]; +template +std::string map_to_string(const std::map& target) +{ + std::stringstream out; + auto i = target.begin(); - string ip = net_obj["IPAddress"].asString(); - if(ip.empty()) - { - const Json::Value& hconfig_obj = root["HostConfig"]; - string net_mode = hconfig_obj["NetworkMode"].asString(); - if(strncmp(net_mode.c_str(), "container:", strlen("container:")) == 0) - { - std::string container_id = net_mode.substr(net_mode.find(":") + 1); - uint32_t container_ip; - const sinsp_container_info *container_info = manager->get_container(container_id); - if(container_info) - { - container_ip = container_info->m_container_ip; - } - else - { - sinsp_container_info pcnt; - pcnt.m_id = container_id; - parse_docker(manager, &pcnt, tinfo); - container_ip = pcnt.m_container_ip; - } - container->m_container_ip = container_ip; - } - } - else + out << "{"; + + if (i != target.end()) { - if(inet_pton(AF_INET, ip.c_str(), &container->m_container_ip) == -1) + out << i->first << ":" << i->second; + + for(++i; i != target.end(); ++i) { - ASSERT(false); + out << ", " << i->first << ":" << i->second; } - container->m_container_ip = ntohl(container->m_container_ip); } - vector ports = net_obj["Ports"].getMemberNames(); - for(vector::const_iterator it = ports.begin(); it != ports.end(); ++it) - { - size_t tcp_pos = it->find("/tcp"); - if(tcp_pos == string::npos) - { - continue; - } + out << "}"; - uint16_t container_port = atoi(it->c_str()); + return out.str(); +} - const Json::Value& v = net_obj["Ports"][*it]; - if(v.isArray()) - { - for(uint32_t j = 0; j < v.size(); ++j) - { - sinsp_container_info::container_port_mapping port_mapping; +} // end namespace - ip = v[j]["HostIp"].asString(); - string port = v[j]["HostPort"].asString(); +std::string sinsp_container_info::to_string() const +{ + std::stringstream out; + + out << "container_info:" << std::endl; + + out << "m_id: " << m_id << std::endl; + out << "m_type: " << static_cast(m_type) << std::endl; + out << "m_name: " << m_name << std::endl; + out << "m_image: " << m_image << std::endl; + out << "m_imageid: " << m_imageid << std::endl; + out << "m_imagerepo: " << m_imagerepo << std::endl; + out << "m_imagetag: " << m_imagetag << std::endl; + out << "m_imagedigest: " << m_imagedigest << std::endl; + out << "m_container_ip: " << m_container_ip << std::endl; + out << "m_privileged: " << m_privileged << std::endl; + out << "m_mounts: " << ::iterable_to_string(m_mounts) << std::endl; + out << "m_port_mappings: " << ::iterable_to_string(m_port_mappings) << std::endl; + out << "m_labels: " << ::map_to_string(m_labels) << std::endl; + out << "m_env: " << ::iterable_to_string(m_env) << std::endl; + out << "m_mesos_task_id: " << m_mesos_task_id << std::endl; + out << "m_memory_limit: " << m_memory_limit << std::endl; + out << "m_swap_limit: " << m_swap_limit << std::endl; + out << "m_cpu_shares: " << m_cpu_shares << std::endl; + out << "m_cpu_quota: " << m_cpu_quota << std::endl; + out << "m_cpu_period: " << m_cpu_period << std::endl; +#ifdef HAS_ANALYZER + out << "m_sysdig_agent_conf: " << m_sysdig_agent_conf << std::endl; + out << "m_metadata_deadline: " << m_metadata_deadline << std::endl; +#endif + return out.str(); +} - if(inet_pton(AF_INET, ip.c_str(), &port_mapping.m_host_ip) == -1) - { - ASSERT(false); - continue; - } - port_mapping.m_host_ip = ntohl(port_mapping.m_host_ip); +std::unique_ptr s_docker_metadata; - port_mapping.m_container_port = container_port; - port_mapping.m_host_port = atoi(port.c_str()); - container->m_port_mappings.push_back(port_mapping); - } - } - } - vector labels = config_obj["Labels"].getMemberNames(); - for(vector::const_iterator it = labels.begin(); it != labels.end(); ++it) +bool sinsp_container_engine_docker::m_query_image_info = true; + +sinsp_container_engine_docker::sinsp_container_engine_docker() +{ + if(!s_docker_metadata) { - string val = config_obj["Labels"][*it].asString(); - container->m_labels[*it] = val; + s_docker_metadata.reset( + async_docker_metadata_source::new_async_docker_metadata_source(m_query_image_info)); } - const Json::Value& env_vars = config_obj["Env"]; +} - for(const auto& env_var : env_vars) - { - if(env_var.isString()) - { - container->m_env.emplace_back(env_var.asString()); - } - } -#ifndef CYGWING_AGENT - if (sinsp_container_engine_mesos::set_mesos_task_id(container, tinfo)) - { - g_logger.log("Mesos Docker container: [" + root["Id"].asString() + "], Mesos task ID: [" + container->m_mesos_task_id + ']', sinsp_logger::SEV_DEBUG); - } -#endif +void sinsp_container_engine_docker::cleanup() +{ + s_docker_metadata.reset(); +} - const auto& host_config_obj = root["HostConfig"]; - container->m_memory_limit = host_config_obj["Memory"].asInt64(); - container->m_swap_limit = host_config_obj["MemorySwap"].asInt64(); - const auto cpu_shares = host_config_obj["CpuShares"].asInt64(); - if(cpu_shares > 0) - { - container->m_cpu_shares = cpu_shares; - } - container->m_cpu_quota = host_config_obj["CpuQuota"].asInt64(); - const auto cpu_period = host_config_obj["CpuPeriod"].asInt64(); - if(cpu_period > 0) - { - container->m_cpu_period = cpu_period; - } - const Json::Value &privileged = host_config_obj["Privileged"]; - if(!privileged.isNull() && privileged.isBool()) +void sinsp_container_engine_docker::set_query_image_info(const bool query_image_info) +{ + m_query_image_info = query_image_info; + + if(s_docker_metadata.get() != nullptr) { - container->m_privileged = privileged.asBool(); + s_docker_metadata->set_query_image_info(m_query_image_info); } - - sinsp_container_info::parse_json_mounts(root["Mounts"], container->m_mounts); - -#ifdef HAS_ANALYZER - sinsp_utils::find_env(container->m_sysdig_agent_conf, container->get_env(), "SYSDIG_AGENT_CONF"); - // container->m_sysdig_agent_conf = get_docker_env(env_vars, "SYSDIG_AGENT_CONF"); -#endif - return true; } - #ifdef CYGWING_AGENT bool sinsp_container_engine_docker::resolve(sinsp_container_manager* manager, sinsp_threadinfo* tinfo, bool query_os_for_missing_info) { @@ -421,39 +241,11 @@ bool sinsp_container_engine_docker::resolve(sinsp_container_manager* manager, si return true; } -sinsp_docker_response sinsp_container_engine_docker::get_docker(sinsp_container_manager* manager, const string& url, string &json) -{ - const char* response = NULL; - bool qdres = wh_query_docker(manager->get_inspector()->get_wmi_handle(), - (char*)url.c_str(), - &response); - if(qdres == false) - { - ASSERT(false); - return sinsp_docker_response::RESP_ERROR; - } - - json = response; - if(strncmp(json.c_str(), "HTTP/1.0 200 OK", sizeof("HTTP/1.0 200 OK") -1)) - { - return sinsp_docker_response::RESP_BAD_REQUEST; - } - - size_t pos = json.find("{"); - if(pos == string::npos) - { - ASSERT(false); - return sinsp_docker_response::RESP_ERROR; - } - json = json.substr(pos); - - return sinsp_docker_response::RESP_OK; -} #else bool sinsp_container_engine_docker::resolve(sinsp_container_manager* manager, sinsp_threadinfo* tinfo, bool query_os_for_missing_info) { - sinsp_container_info container_info; + std::shared_ptr container_info(new sinsp_container_info()); bool matches = false; for(auto it = tinfo->m_cgroups.begin(); it != tinfo->m_cgroups.end(); ++it) @@ -470,8 +262,8 @@ bool sinsp_container_engine_docker::resolve(sinsp_container_manager* manager, si if(cgroup.length() - pos - 1 == 64 && cgroup.find_first_not_of("0123456789abcdefABCDEF", pos + 1) == string::npos) { - container_info.m_type = CT_DOCKER; - container_info.m_id = cgroup.substr(pos + 1, 12); + container_info->m_type = CT_DOCKER; + container_info->m_id = cgroup.substr(pos + 1, 12); matches = true; break; } @@ -487,8 +279,8 @@ bool sinsp_container_engine_docker::resolve(sinsp_container_manager* manager, si if(pos2 != string::npos && pos2 - pos - sizeof("docker-") + 1 == 64) { - container_info.m_type = CT_DOCKER; - container_info.m_id = cgroup.substr(pos + sizeof("docker-") - 1, 12); + container_info->m_type = CT_DOCKER; + container_info->m_id = cgroup.substr(pos + sizeof("docker-") - 1, 12); matches = true; break; } @@ -498,88 +290,55 @@ bool sinsp_container_engine_docker::resolve(sinsp_container_manager* manager, si if (!matches) return false; - tinfo->m_container_id = container_info.m_id; - if (!manager->container_exists(container_info.m_id)) + tinfo->m_container_id = container_info->m_id; + if (!manager->container_exists(container_info->m_id)) { + g_logger.log("resolve: container with id " + container_info->m_id + " does not exist"); #ifndef _WIN32 + g_logger.log("resolve: query_os_for_missing_info: " + std::to_string(query_os_for_missing_info)); if (query_os_for_missing_info) { - parse_docker(manager, &container_info, tinfo); - } -#endif - manager->add_container(container_info, tinfo); - manager->notify_new_container(container_info); - } - return true; -} + docker_metadata metadata(manager, container_info); -sinsp_docker_response sinsp_container_engine_docker::get_docker(sinsp_container_manager* manager, const string& url, string &json) -{ -#ifdef HAS_CAPTURE - if(curl_easy_setopt(m_curl, CURLOPT_URL, url.c_str()) != CURLE_OK) - { - ASSERT(false); - return sinsp_docker_response::RESP_ERROR; - } - if(curl_easy_setopt(m_curl, CURLOPT_WRITEDATA, &json) != CURLE_OK) - { - ASSERT(false); - return sinsp_docker_response::RESP_ERROR; - } + // TODO: This will need to eventually change when we + // want to report partial information to the + // backend. + if(s_docker_metadata->lookup(tinfo->m_container_id, metadata)) + { +#ifndef CYGWING_AGENT + if(sinsp_container_engine_mesos::set_mesos_task_id( + metadata.m_container_info.get(), + tinfo)) + { + g_logger.log("Mesos Docker container: [" + + metadata.m_container_info->m_id + + "], Mesos task ID: [" + + metadata.m_container_info->m_mesos_task_id + + ']', sinsp_logger::SEV_DEBUG); + } +#endif - if(curl_multi_add_handle(m_curlm, m_curl) != CURLM_OK) - { - ASSERT(false); - return sinsp_docker_response::RESP_ERROR; - } + manager->add_container(*metadata.m_container_info, + tinfo); + manager->notify_new_container(*metadata.m_container_info); - while(true) - { - int still_running; - CURLMcode res = curl_multi_perform(m_curlm, &still_running); - if(res != CURLM_OK) - { - ASSERT(false); - return sinsp_docker_response::RESP_ERROR; - } + return true; + } - if(still_running == 0) - { - break; + return false; } +#endif - int numfds; - res = curl_multi_wait(m_curlm, NULL, 0, -1, &numfds); - if(res != CURLM_OK) + if (!query_os_for_missing_info) { - ASSERT(false); - return sinsp_docker_response::RESP_ERROR; + manager->add_container(*container_info, tinfo); + manager->notify_new_container(*container_info); } } - - if(curl_multi_remove_handle(m_curlm, m_curl) != CURLM_OK) - { - ASSERT(false); - return sinsp_docker_response::RESP_ERROR; - } - - long http_code = 0; - if(curl_easy_getinfo(m_curl, CURLINFO_RESPONSE_CODE, &http_code) != CURLE_OK) - { - ASSERT(false); - return sinsp_docker_response::RESP_ERROR; - } - if(http_code != 200) - { - return sinsp_docker_response::RESP_BAD_REQUEST; - } - - return sinsp_docker_response::RESP_OK; -#else - return sinsp_docker_response::RESP_ERROR; -#endif + return true; } + bool sinsp_container_engine_lxc::resolve(sinsp_container_manager* manager, sinsp_threadinfo* tinfo, bool query_os_for_missing_info) { sinsp_container_info container_info; diff --git a/userspace/libsinsp/container.h b/userspace/libsinsp/container.h index a7130d1880..0126cf81af 100644 --- a/userspace/libsinsp/container.h +++ b/userspace/libsinsp/container.h @@ -60,6 +60,13 @@ class sinsp_container_info uint32_t m_host_ip; uint16_t m_host_port; uint16_t m_container_port; + + std::string to_string() const + { + return std::to_string(m_host_ip) + ":" + + std::to_string(m_host_port) + "->" + + std::to_string(m_container_port); + } }; class container_mount_info @@ -135,6 +142,8 @@ class sinsp_container_info const container_mount_info *mount_by_source(std::string &source) const; const container_mount_info *mount_by_dest(std::string &dest) const; + std::string to_string() const; + string m_id; sinsp_container_type m_type; string m_name; @@ -173,20 +182,9 @@ class sinsp_container_engine_docker bool resolve(sinsp_container_manager* manager, sinsp_threadinfo* tinfo, bool query_os_for_missing_info); static void cleanup(); static void set_query_image_info(bool query_image_info); -protected: -#if !defined(CYGWING_AGENT) && defined(HAS_CAPTURE) - static size_t curl_write_callback(const char* ptr, size_t size, size_t nmemb, string* json); -#endif - sinsp_docker_response get_docker(sinsp_container_manager* manager, const string& url, string &json); - bool parse_docker(sinsp_container_manager* manager, sinsp_container_info *container, sinsp_threadinfo* tinfo); - string m_unix_socket_path; - string m_api_version; +protected: static bool m_query_image_info; -#if !defined(CYGWING_AGENT) && defined(HAS_CAPTURE) - static CURLM *m_curlm; - static CURL *m_curl; -#endif }; #ifndef CYGWING_AGENT diff --git a/userspace/libsinsp/curl_url_fetcher.cpp b/userspace/libsinsp/curl_url_fetcher.cpp new file mode 100644 index 0000000000..ed099a2e64 --- /dev/null +++ b/userspace/libsinsp/curl_url_fetcher.cpp @@ -0,0 +1,257 @@ +/* +Copyright (C) 2018 Sysdig, Inc. + +This file is part of sysdig. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ +#include "curl_url_fetcher.h" +#include +#include + +using namespace sysdig; + +namespace +{ + +/** + * An exception that curl_url_fetcher can throw to indicate errors. + */ +class curl_url_fetcher_exception : public std::exception +{ +public: + curl_url_fetcher_exception(const std::string& message): + m_message("curl_url_fetcher_exception: " + message) + { } + + const char* what() const noexcept override + { + return m_message.c_str(); + } + +private: + std::string m_message; +}; + +/** + * Handles potentially partial buffer writes from libcurl. This function + * will append each chunk to the given body. + */ +size_t write_callback(const char* const ptr, + const size_t size, + const size_t nmemb, + std::stringstream* const body) +{ + const std::size_t total = size * nmemb; + + body->write(ptr, total); + + return total; +} + +/** + * Wrapper over curl_easy_strerror() that returns a std::string + */ +std::string easy_strerror(const CURLcode ecode) +{ + return curl_easy_strerror(ecode); +} + +/** + * Wrapper over curl_multi_strerror() that returns a std::string + */ +std::string multi_strerror(const CURLMcode mcode) +{ + return curl_multi_strerror(mcode); +} + +/** + * Wrapper over curl_easy_setopt() that throws a curl_url_fetcher_exception + * if the operation fails. + */ +template +void easy_setopt(CURL* const handle, const CURLoption option, param_type param) +{ + const CURLcode code = curl_easy_setopt(handle, option, param); + + if(code != CURLE_OK) + { + throw curl_url_fetcher_exception("Failed to set option: " + + easy_strerror(code)); + } +} + +/** + * A RAII component to handles adding/removing curl multi handles. + * This will ensure that curl_multi_remove_handle() is called before objects + * of this type go out of scope. + */ +class scoped_curl_multi_handle +{ +public: + scoped_curl_multi_handle(CURLM* const curlm, CURL* const curl): + m_curl(curl), + m_curlm(curlm), + m_added(false) + { + const CURLMcode code = curl_multi_add_handle(m_curlm, m_curl); + + if(code != CURLM_OK) + { + throw curl_url_fetcher_exception( + "Failed to add multi handler: " + + multi_strerror(code)); + } + m_added = true; + } + + ~scoped_curl_multi_handle() + { + try + { + remove(); + } + catch(...) + { + } + } + + void remove() + { + if(m_added) + { + const CURLMcode code = curl_multi_remove_handle(m_curlm, + m_curl); + + if(code != CURLM_OK) + { + throw curl_url_fetcher_exception( + "Failed curl_multi_remove_handle: " + + multi_strerror(code)); + } + + m_added = false; + } + } + +private: + CURL* const m_curl; + CURLM* const m_curlm; + bool m_added; +}; + +} // end namespace + +struct curl_url_fetcher::impl +{ + impl(CURL* const curl, + CURLM* const curlm, + const std::string socket_path = ""): + m_curl(curl), + m_curlm(curlm), + m_socket_path(socket_path) + { } + + CURL* const m_curl; + CURLM* const m_curlm; + const std::string m_socket_path; +}; + +curl_url_fetcher::curl_url_fetcher(): + curl_url_fetcher("") +{ } + +curl_url_fetcher::curl_url_fetcher(const std::string& socket_filename): + m_impl(new impl(curl_easy_init(), curl_multi_init(), socket_filename)) +{ + if(!m_impl->m_socket_path.empty()) + { + easy_setopt(m_impl->m_curl, + CURLOPT_UNIX_SOCKET_PATH, + m_impl->m_socket_path.c_str()); + } + easy_setopt(m_impl->m_curl, CURLOPT_HTTPGET, 1); + easy_setopt(m_impl->m_curl, CURLOPT_FOLLOWLOCATION, 1); + easy_setopt(m_impl->m_curl, CURLOPT_WRITEFUNCTION, write_callback); +} + +curl_url_fetcher::~curl_url_fetcher() +{ + curl_multi_cleanup(m_impl->m_curlm); + curl_easy_cleanup(m_impl->m_curl); +} + +int curl_url_fetcher::fetch(const std::string& url, std::string& body) +{ + std::stringstream out; + CURLcode ecode = CURLE_OK; + CURLMcode mcode = CURLM_OK; + + easy_setopt(m_impl->m_curl, CURLOPT_URL, url.c_str()); + easy_setopt(m_impl->m_curl, CURLOPT_WRITEDATA, &out); + + scoped_curl_multi_handle multi_handle(m_impl->m_curlm, m_impl->m_curl); + + for(;;) + { + int still_running = 42; + + mcode = curl_multi_perform(m_impl->m_curlm, &still_running); + if(mcode != CURLM_OK) + { + throw curl_url_fetcher_exception( + "Failed curl_multi_perform: " + + multi_strerror(mcode)); + } + + if(still_running == 0) + { + break; + } + + int numfds = 0; + mcode = curl_multi_wait(m_impl->m_curlm, NULL, 0, -1, &numfds); + if(mcode != CURLM_OK) + { + throw curl_url_fetcher_exception( + "Failed curl_multi_wait: " + + multi_strerror(mcode)); + } + } + + multi_handle.remove(); + + long http_code = 0; + ecode = curl_easy_getinfo(m_impl->m_curl, CURLINFO_RESPONSE_CODE, &http_code); + if(ecode != CURLE_OK) + { + throw curl_url_fetcher_exception( + "Failed to get response code: " + + easy_strerror(ecode)); + } + + body = out.str(); + + return http_code; +} + +bool curl_url_fetcher::is_tcp() const +{ + return m_impl->m_socket_path.empty(); +} + +const std::string& curl_url_fetcher::get_socket_path() const +{ + return m_impl->m_socket_path; +} diff --git a/userspace/libsinsp/curl_url_fetcher.h b/userspace/libsinsp/curl_url_fetcher.h new file mode 100644 index 0000000000..9716e5d5d1 --- /dev/null +++ b/userspace/libsinsp/curl_url_fetcher.h @@ -0,0 +1,74 @@ +/* +Copyright (C) 2018 Sysdig, Inc. + +This file is part of sysdig. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ +#pragma once + +#include "url_fetcher.h" +#include + +namespace sysdig +{ + +/** + * A concrete url_fetcher implemented using libcurl. + */ +class curl_url_fetcher : public url_fetcher +{ +public: + /** + * Initialize this curl_url_fetcher for fetching URLs via TCP. + */ + curl_url_fetcher(); + + /** + * Initialize this curl_url_fetcher for fetching URLs via UNIX + * domain sockets. + */ + curl_url_fetcher(const std::string& socket_filename); + + virtual ~curl_url_fetcher(); + + /** + * Fetch the given url and return its body in the given body. + * + * @param[in] url The URL to fetch + * @param[out] body On success, the body of the requested URL. + * + * @returns the HTTP status code returned by the far-end + */ + int fetch(const std::string& url, std::string& body) override; + + /** + * Returns true if this curl_url_fetcher will fetch URLs via TCP, + * false otherwise. + */ + bool is_tcp() const; + + /** + * Returns the UNIX domain socket that this curl_url_fetcher will + * use for requests. This method is meaningful only if is_tcp() + * returns false. + */ + const std::string& get_socket_path() const; + +private: + struct impl; + std::unique_ptr m_impl; +}; + +} // end namespace sysdig diff --git a/userspace/libsinsp/event.h b/userspace/libsinsp/event.h index 2916b8c326..1ec8ed49c6 100644 --- a/userspace/libsinsp/event.h +++ b/userspace/libsinsp/event.h @@ -18,6 +18,7 @@ limitations under the License. */ #pragma once +#include "sinsp_public.h" #include #ifndef VISIBILITY_PRIVATE diff --git a/userspace/libsinsp/ifinfo.h b/userspace/libsinsp/ifinfo.h index 1f77e5ae9d..355c5bf3fe 100644 --- a/userspace/libsinsp/ifinfo.h +++ b/userspace/libsinsp/ifinfo.h @@ -93,4 +93,4 @@ void sinsp_network_interfaces::clear() { m_ipv4_interfaces.clear(); m_ipv6_interfaces.clear(); -} \ No newline at end of file +} diff --git a/userspace/libsinsp/logger.h b/userspace/libsinsp/logger.h index e444e53d80..2e86733b3e 100644 --- a/userspace/libsinsp/logger.h +++ b/userspace/libsinsp/logger.h @@ -19,6 +19,8 @@ limitations under the License. #pragma once +#include "sinsp_public.h" + /////////////////////////////////////////////////////////////////////////////// // The logger class /////////////////////////////////////////////////////////////////////////////// @@ -78,7 +80,7 @@ class SINSP_PUBLIC sinsp_logger void set_log_output_type(sinsp_logger::output_type log_output_type); void add_stdout_log(); void add_stderr_log(); - void add_file_log(string filename); + void add_file_log(std::string filename); void add_file_log(FILE* f); void add_callback_log(sinsp_logger_callback callback); void remove_callback_log(); @@ -86,8 +88,8 @@ class SINSP_PUBLIC sinsp_logger void set_severity(severity sev); severity get_severity() const; - void log(string msg, severity sev=SEV_INFO); - void log(string msg, event_severity sev); + void log(std::string msg, severity sev=SEV_INFO); + void log(std::string msg, event_severity sev); // Log functions that accept printf syntax and return the formatted buffer. char* format(severity sev, const char* fmt, ...); @@ -104,6 +106,8 @@ class SINSP_PUBLIC sinsp_logger char m_tbuf[32768]; }; +extern sinsp_logger g_logger; + inline bool sinsp_logger::is_callback() const { return (m_flags & sinsp_logger::OT_CALLBACK) != 0; diff --git a/userspace/libsinsp/sinsp_int.h b/userspace/libsinsp/sinsp_int.h index e0e2d18f6a..bfb1a6d761 100644 --- a/userspace/libsinsp/sinsp_int.h +++ b/userspace/libsinsp/sinsp_int.h @@ -80,10 +80,8 @@ using namespace std; // Public export macro // #ifdef _WIN32 -#define SINSP_PUBLIC __declspec(dllexport) #define BRK(X) {if(evt != NULL && evt->get_num() == X)__debugbreak();} #else -#define SINSP_PUBLIC #define BRK(X) #endif diff --git a/userspace/libsinsp/sinsp_public.h b/userspace/libsinsp/sinsp_public.h new file mode 100644 index 0000000000..dca73e1dda --- /dev/null +++ b/userspace/libsinsp/sinsp_public.h @@ -0,0 +1,26 @@ +/* +Copyright (C) 2018 Sysdig, Inc. + +This file is part of sysdig. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + +#pragma once + +#if defined(_WIN32) +# define SINSP_PUBLIC __declspec(dllexport) +#else +# define SINSP_PUBLIC +#endif diff --git a/userspace/libsinsp/url_fetcher.cpp b/userspace/libsinsp/url_fetcher.cpp new file mode 100644 index 0000000000..f59afd488d --- /dev/null +++ b/userspace/libsinsp/url_fetcher.cpp @@ -0,0 +1,35 @@ +/* +Copyright (C) 2018 Sysdig, Inc. + +This file is part of sysdig. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ +#include "url_fetcher.h" +#include "curl_url_fetcher.h" + +using namespace sysdig; + +url_fetcher::~url_fetcher() +{ } + +url_fetcher::ptr url_fetcher::new_fetcher() +{ + return ptr(new curl_url_fetcher()); +} + +url_fetcher::ptr url_fetcher::new_fetcher(const std::string& socket_filename) +{ + return ptr(new curl_url_fetcher(socket_filename)); +} diff --git a/userspace/libsinsp/url_fetcher.h b/userspace/libsinsp/url_fetcher.h new file mode 100644 index 0000000000..360b4ed11a --- /dev/null +++ b/userspace/libsinsp/url_fetcher.h @@ -0,0 +1,69 @@ +/* +Copyright (C) 2018 Sysdig, Inc. + +This file is part of sysdig. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ +#pragma once + +#include +#include + +namespace sysdig +{ + +/** + * Interface to an abstract url_fetcher -- an object that clients can use + * to fetch URLs. We'll eventually be able to unit test-specific concrete + * implementaions of this that can serve pre-canned responses without actually + * spinning up HTTP servers. + */ +class url_fetcher +{ +public: + typedef std::unique_ptr ptr; + + virtual ~url_fetcher(); + + /** + * Fetches the given url and stores the fetched document in the + * given body. + * + * @param[in] url The URL to fetch + * @param[out] body The body of the fetched URL. + * + * @returns the HTTP response code + */ + virtual int fetch(const std::string& url, std::string& body) = 0; + + /** + * Factory method for creating url_fetcher%s that can TCP. + * + * @returns a pointer to a concrete url_fetcher. + */ + static ptr new_fetcher(); + + /** + * Factory method for creating url_fetcher%s that use UNIX domain + * sockets. + * + * @param[in] socket_filename The filename of the UNIX domain socket. + * + * @returns a pointer to a concrete url_fetcher. + */ + static ptr new_fetcher(const std::string& socket_filename); +}; + +} // end namespace sysdig