Skip to content

Commit

Permalink
Merge pull request ceph#54613 from Matan-B/wip-matanb-crimson-build-i…
Browse files Browse the repository at this point in the history
…nc-maps

crimson/osd: Support incremental maps

Reviewed-by: Samuel Just <[email protected]>
  • Loading branch information
Matan-B authored Jan 7, 2024
2 parents 7455933 + 57030a0 commit 930ed08
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 49 deletions.
5 changes: 2 additions & 3 deletions src/crimson/osd/heartbeat.cc
Original file line number Diff line number Diff line change
Expand Up @@ -333,9 +333,8 @@ seastar::future<> Heartbeat::maybe_share_osdmap(
return seastar::now();
}

const epoch_t send_from = peer.get_projected_epoch();
logger().debug("{} sending peer {} peer maps from projected epoch {} through "
"local osdmap epoch {}",
const epoch_t send_from = peer.get_projected_epoch() + 1;
logger().debug("{} sending peer {} peer maps ({}, {}]",
__func__,
from,
send_from,
Expand Down
25 changes: 24 additions & 1 deletion src/crimson/osd/osd_meta.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include "osd/OSDMap.h"

using std::string;
using read_errorator = crimson::os::FuturizedStore::Shard::read_errorator;

void OSDMeta::create(ceph::os::Transaction& t)
{
Expand All @@ -25,11 +24,22 @@ void OSDMeta::store_map(ceph::os::Transaction& t,
t.write(coll->get_cid(), osdmap_oid(e), 0, m.length(), m);
}

void OSDMeta::store_inc_map(ceph::os::Transaction& t,
epoch_t e, const bufferlist& m)
{
t.write(coll->get_cid(), inc_osdmap_oid(e), 0, m.length(), m);
}

void OSDMeta::remove_map(ceph::os::Transaction& t, epoch_t e)
{
t.remove(coll->get_cid(), osdmap_oid(e));
}

void OSDMeta::remove_inc_map(ceph::os::Transaction& t, epoch_t e)
{
t.remove(coll->get_cid(), inc_osdmap_oid(e));
}

seastar::future<bufferlist> OSDMeta::load_map(epoch_t e)
{
return store.read(coll,
Expand All @@ -41,6 +51,13 @@ seastar::future<bufferlist> OSDMeta::load_map(epoch_t e)
}));
}

read_errorator::future<ceph::bufferlist> OSDMeta::load_inc_map(epoch_t e)
{
return store.read(coll,
osdmap_oid(e), 0, 0,
CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
}

void OSDMeta::store_superblock(ceph::os::Transaction& t,
const OSDSuperblock& superblock)
{
Expand Down Expand Up @@ -122,6 +139,12 @@ ghobject_t OSDMeta::osdmap_oid(epoch_t epoch)
return ghobject_t(hobject_t(sobject_t(object_t(name), 0)));
}

ghobject_t OSDMeta::inc_osdmap_oid(epoch_t epoch)
{
string name = fmt::format("inc_osdmap.{}", epoch);
return ghobject_t(hobject_t(sobject_t(object_t(name), 0)));
}

ghobject_t OSDMeta::final_pool_info_oid(int64_t pool)
{
string name = fmt::format("final_pool_{}", pool);
Expand Down
8 changes: 8 additions & 0 deletions src/crimson/osd/osd_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ namespace crimson::os {
class FuturizedStore;
}

using read_errorator = crimson::os::FuturizedStore::Shard::read_errorator;

/// metadata shared across PGs, or put in another way,
/// metadata not specific to certain PGs.
class OSDMeta {
Expand All @@ -40,8 +42,13 @@ class OSDMeta {

void store_map(ceph::os::Transaction& t,
epoch_t e, const bufferlist& m);
void store_inc_map(ceph::os::Transaction& t,
epoch_t e, const bufferlist& m);
void remove_map(ceph::os::Transaction& t, epoch_t e);
void remove_inc_map(ceph::os::Transaction& t, epoch_t e);

seastar::future<bufferlist> load_map(epoch_t e);
read_errorator::future<ceph::bufferlist> load_inc_map(epoch_t e);

void store_superblock(ceph::os::Transaction& t,
const OSDSuperblock& sb);
Expand All @@ -60,6 +67,7 @@ class OSDMeta {
std::map<epoch_t, OSDMap*>&);
private:
static ghobject_t osdmap_oid(epoch_t epoch);
static ghobject_t inc_osdmap_oid(epoch_t epoch);
static ghobject_t final_pool_info_oid(int64_t pool);
static ghobject_t superblock_oid();
};
7 changes: 7 additions & 0 deletions src/crimson/osd/osdmap_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ class OSDMapService {
public:
using cached_map_t = OSDMapRef;
using local_cached_map_t = LocalOSDMapRef;
enum class encoded_osdmap_type_t {
FULLMAP,
INCMAP
};
using bls_pair = std::pair<encoded_osdmap_type_t, bufferlist>;
using bls_map_pair_t = std::pair<epoch_t, bls_pair>;
using bls_map_t = std::map<epoch_t, bls_pair>;

virtual ~OSDMapService() = default;
virtual seastar::future<cached_map_t> get_map(epoch_t e) = 0;
Expand Down
156 changes: 112 additions & 44 deletions src/crimson/osd/shard_services.cc
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,14 @@ void OSDSingletonState::store_map_bl(
map_bl_cache.insert(e, std::move(bl));
}

void OSDSingletonState::store_inc_map_bl(
ceph::os::Transaction& t,
epoch_t e, bufferlist&& bl)
{
meta_coll->store_inc_map(t, e, bl);
inc_map_bl_cache.insert(e, std::move(bl));
}

seastar::future<bufferlist> OSDSingletonState::load_map_bl(
epoch_t e)
{
Expand All @@ -387,29 +395,56 @@ seastar::future<bufferlist> OSDSingletonState::load_map_bl(
return seastar::make_ready_future<bufferlist>(*found);
} else {
logger().debug("{} loading osdmap.{} from disk", __func__, e);
return meta_coll->load_map(e);
return meta_coll->load_map(e).then([this, e](auto&& bl) {
map_bl_cache.insert(e, bl);
return seastar::make_ready_future<bufferlist>(std::move(bl));
});
}
}

seastar::future<std::map<epoch_t, bufferlist>> OSDSingletonState::load_map_bls(
read_errorator::future<ceph::bufferlist> OSDSingletonState::load_inc_map_bl(
epoch_t e)
{
if (std::optional<bufferlist> found = inc_map_bl_cache.find(e); found) {
logger().debug("{} inc map.{} found in cache", __func__, e);
return read_errorator::make_ready_future<bufferlist>(*found);
} else {
logger().debug("{} loading inc map.{} from disk", __func__, e);
return meta_coll->load_inc_map(e).safe_then([this, e](auto&& bl) {
inc_map_bl_cache.insert(e, bl);
return seastar::make_ready_future<bufferlist>(std::move(bl));
}, read_errorator::pass_further{});
}
}

seastar::future<OSDMapService::bls_map_t> OSDSingletonState::load_map_bls(
epoch_t first,
epoch_t last)
{
logger().debug("{} loading maps [{},{}]",
__func__, first, last);
ceph_assert(first <= last);
// TODO: take osd_map_max into account
//int max = cct->_conf->osd_map_message_max;
//ssize_t max_bytes = cct->_conf->osd_map_message_max_bytes;
return seastar::map_reduce(boost::make_counting_iterator<epoch_t>(first),
boost::make_counting_iterator<epoch_t>(last + 1),
[this](epoch_t e) {
return load_map_bl(e).then([e](auto&& bl) {
return seastar::make_ready_future<std::pair<epoch_t, bufferlist>>(
std::make_pair(e, std::move(bl)));
return load_inc_map_bl(e).safe_then([](auto&& bl) {
return seastar::make_ready_future<OSDMapService::bls_pair>(
std::make_pair(OSDMapService::encoded_osdmap_type_t::INCMAP,
std::move(bl)));
}, read_errorator::all_same_way([this, e] {
logger().debug("load_map_bls: can't load inc map {}, attempting full map instread",
e);
return load_map_bl(e).then([](auto&& bl) {
return seastar::make_ready_future<OSDMapService::bls_pair>(
std::make_pair(OSDMapService::encoded_osdmap_type_t::FULLMAP,
std::move(bl)));
});
})).then([e] (auto&& loaded_map) {
return seastar::make_ready_future<OSDMapService::bls_map_pair_t>(
std::make_pair(e, std::move(loaded_map)));
});
},
std::map<epoch_t, bufferlist>{},
OSDMapService::bls_map_t{},
[](auto&& bls, auto&& epoch_bl) {
bls.emplace(std::move(epoch_bl));
return std::move(bls);
Expand Down Expand Up @@ -453,11 +488,12 @@ seastar::future<> OSDSingletonState::store_maps(ceph::os::Transaction& t,
"loading osdmap.{}", e, e - 1);
ceph_assert(std::cmp_greater(e, 0u));
return load_map(e - 1).then(
[&added_maps, e, bl=p->second, &t, this](auto o) {
[&added_maps, e, bl=p->second, &t, this](auto o) mutable {
OSDMap::Incremental inc;
auto i = bl.cbegin();
inc.decode(i);
o->apply_incremental(inc);
store_inc_map_bl(t, e, std::move(bl));
bufferlist fbl;
o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
logger().info("store_maps storing osdmap.{}", o->get_epoch());
Expand Down Expand Up @@ -499,6 +535,7 @@ void OSDSingletonState::trim_maps(ceph::os::Transaction& t,
t.get_num_ops() < crimson::common::local_conf()->osd_target_transaction_size) {
logger().debug("{}: removing old osdmap epoch {}", __func__, superblock.get_oldest_map());
meta_coll->remove_map(t, superblock.get_oldest_map());
meta_coll->remove_inc_map(t, superblock.get_oldest_map());
superblock.maps.erase(superblock.get_oldest_map());
}

Expand Down Expand Up @@ -757,49 +794,80 @@ seastar::future<> ShardServices::dispatch_context(
});
}

seastar::future<> OSDSingletonState::send_incremental_map(
crimson::net::Connection &conn,
epoch_t first)
seastar::future<MURef<MOSDMap>> OSDSingletonState::build_incremental_map_msg(
epoch_t first,
epoch_t last)
{
logger().info("{}: first osdmap: {} "
"superblock's oldest map: {}",
__func__, first, superblock.get_oldest_map());
if (first >= superblock.get_oldest_map()) {
// TODO: osd_map_share_max_epochs
// See OSDService::build_incremental_map_msg
return seastar::do_with(crimson::common::local_conf()->osd_map_message_max,
crimson::make_message<MOSDMap>(
monc.get_fsid(),
osdmap->get_encoding_features()),
[this, &first, last](unsigned int map_message_max,
auto& m) {
m->cluster_osdmap_trim_lower_bound = superblock.cluster_osdmap_trim_lower_bound;
m->newest_map = superblock.get_newest_map();
auto maybe_handle_mapgap = seastar::now();
if (first < superblock.cluster_osdmap_trim_lower_bound) {
logger().info("{}: cluster osdmap lower bound: {} "
" > first {}, starting with full map",
__func__, superblock.cluster_osdmap_trim_lower_bound, first);
" > first {}, starting with full map",
__func__, superblock.cluster_osdmap_trim_lower_bound, first);
// we don't have the next map the target wants,
// so start with a full map.
first = superblock.cluster_osdmap_trim_lower_bound;
maybe_handle_mapgap = load_map_bl(first).then(
[&first, &map_message_max, &m](auto&& bl) {
m->maps[first] = std::move(bl);
--map_message_max;
++first;
});
}
return load_map_bls(
first, superblock.get_newest_map()
).then([this, &conn](auto&& bls) {
auto m = crimson::make_message<MOSDMap>(
monc.get_fsid(),
osdmap->get_encoding_features());
m->cluster_osdmap_trim_lower_bound = superblock.cluster_osdmap_trim_lower_bound;
m->newest_map = superblock.get_newest_map();
m->maps = std::move(bls);
return conn.send(std::move(m));
});
} else {
// See OSDService::send_incremental_map
// just send latest full map
return load_map_bl(osdmap->get_epoch()
).then([this, &conn](auto&& bl) mutable {
auto m = crimson::make_message<MOSDMap>(
monc.get_fsid(),
osdmap->get_encoding_features());
m->cluster_osdmap_trim_lower_bound = superblock.cluster_osdmap_trim_lower_bound;
m->newest_map = superblock.get_newest_map();
m->maps.emplace(osdmap->get_epoch(), std::move(bl));
return conn.send(std::move(m));
return maybe_handle_mapgap.then([this, first, last, &map_message_max, &m] {
if (first > last) {
// first may be later than last in the case of map gap
ceph_assert(!m->maps.empty());
return seastar::make_ready_future<MURef<MOSDMap>>(std::move(m));
}
return load_map_bls(
first,
((last - first) > map_message_max) ? (first + map_message_max) : last
).then([&m](auto&& bls) {
ssize_t map_message_max_bytes = crimson::common::local_conf()->osd_map_message_max_bytes;
for (auto const& [e, val] : bls) {
map_message_max_bytes -= val.second.length();
if (map_message_max_bytes < 0) {
break;
}
if (val.first == OSDMapService::encoded_osdmap_type_t::FULLMAP) {
m->maps.emplace(e, std::move(val.second));
} else if (val.first == OSDMapService::encoded_osdmap_type_t::INCMAP) {
m->incremental_maps.emplace(e, std::move(val.second));
} else {
ceph_abort();
}
}
return seastar::make_ready_future<MURef<MOSDMap>>(std::move(m));
});
});
});
}

seastar::future<> OSDSingletonState::send_incremental_map(
crimson::net::Connection &conn,
epoch_t first)
{
epoch_t to = osdmap->get_epoch();
logger().info("{}: first osdmap: {} "
"superblock's oldest map: {}, "
"to {}",
__func__, first, superblock.get_oldest_map(), to);
if (to > first && (int64_t)(to - first) > crimson::common::local_conf()->osd_map_share_max_epochs) {
logger().debug("{} {} > max epochs to send of {}, only sending most recent,",
__func__, (to - first), crimson::common::local_conf()->osd_map_share_max_epochs);
first = to - crimson::common::local_conf()->osd_map_share_max_epochs;
}
return build_incremental_map_msg(first, to).then([&conn](auto&& m) {
return conn.send(std::move(m));
});
}

seastar::future<> OSDSingletonState::send_incremental_map_to_osd(
Expand Down
12 changes: 11 additions & 1 deletion src/crimson/osd/shard_services.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ class OSDSingletonState : public md_config_obs_t {
friend class OSD;
using cached_map_t = OSDMapService::cached_map_t;
using local_cached_map_t = OSDMapService::local_cached_map_t;
using read_errorator = crimson::os::FuturizedStore::Shard::read_errorator;

public:
OSDSingletonState(
Expand All @@ -236,6 +237,7 @@ class OSDSingletonState : public md_config_obs_t {

SharedLRU<epoch_t, OSDMap> osdmaps;
SimpleLRU<epoch_t, bufferlist, false> map_bl_cache;
SimpleLRU<epoch_t, bufferlist, false> inc_map_bl_cache;

cached_map_t osdmap;
cached_map_t &get_osdmap() { return osdmap; }
Expand Down Expand Up @@ -268,6 +270,10 @@ class OSDSingletonState : public md_config_obs_t {
superblock = std::move(_superblock);
}

seastar::future<MURef<MOSDMap>> build_incremental_map_msg(
epoch_t first,
epoch_t last);

seastar::future<> send_incremental_map(
crimson::net::Connection &conn,
epoch_t first);
Expand Down Expand Up @@ -318,10 +324,13 @@ class OSDSingletonState : public md_config_obs_t {
seastar::future<local_cached_map_t> get_local_map(epoch_t e);
seastar::future<std::unique_ptr<OSDMap>> load_map(epoch_t e);
seastar::future<bufferlist> load_map_bl(epoch_t e);
seastar::future<std::map<epoch_t, bufferlist>>
read_errorator::future<ceph::bufferlist> load_inc_map_bl(epoch_t e);
seastar::future<OSDMapService::bls_map_t>
load_map_bls(epoch_t first, epoch_t last);
void store_map_bl(ceph::os::Transaction& t,
epoch_t e, bufferlist&& bl);
void store_inc_map_bl(ceph::os::Transaction& t,
epoch_t e, bufferlist&& bl);
seastar::future<> store_maps(ceph::os::Transaction& t,
epoch_t start, Ref<MOSDMap> m);
void trim_maps(ceph::os::Transaction& t, OSDSuperblock& superblock);
Expand Down Expand Up @@ -505,6 +514,7 @@ class ShardServices : public OSDMapService {
FORWARD_TO_OSD_SINGLETON(get_pool_info)
FORWARD(with_throttle_while, with_throttle_while, local_state.throttler)

FORWARD_TO_OSD_SINGLETON(build_incremental_map_msg)
FORWARD_TO_OSD_SINGLETON(send_incremental_map)
FORWARD_TO_OSD_SINGLETON(send_incremental_map_to_osd)

Expand Down

0 comments on commit 930ed08

Please sign in to comment.