Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: recoverdata support load disk table #3888

Merged
merged 1 commit into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/en/maintain/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ $ ./openmldb --endpoint=172.27.2.52:9520 --role=client

### loadtable

1. Load an existing table
Load an existing table. Only memory tables are supported.

Command format: `loadtable table_name tid pid ttl segment_cnt`

Expand Down
2 changes: 1 addition & 1 deletion docs/zh/maintain/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ $ ./openmldb --endpoint=172.27.2.52:9520 --role=client

### loadtable

1、加载已有表
加载已有表,只支持内存表

命令格式: loadtable table\_name tid pid ttl segment\_cnt

Expand Down
1 change: 0 additions & 1 deletion src/cmd/openmldb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3256,7 +3256,6 @@ void HandleClientLoadTable(const std::vector<std::string> parts, ::openmldb::cli
return;
}
}
// TODO(): get status msg
auto st = client->LoadTable(parts[1], boost::lexical_cast<uint32_t>(parts[2]),
boost::lexical_cast<uint32_t>(parts[3]), ttl, is_leader, seg_cnt);
if (st.OK()) {
Expand Down
1 change: 1 addition & 0 deletions src/tablet/tablet_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3038,6 +3038,7 @@ void TabletImpl::LoadTable(RpcController* controller, const ::openmldb::api::Loa
break;
}
std::string root_path;
// We cannot tell here whether the table is a memory table or a disk table, so the caller must set the correct storage_mode in the request message.
bool ok = ChooseDBRootPath(tid, pid, table_meta.storage_mode(), root_path);
if (!ok) {
response->set_code(::openmldb::base::ReturnCode::kFailToGetDbRootPath);
Expand Down
61 changes: 35 additions & 26 deletions tools/openmldb_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,41 +97,43 @@ def CheckTable(executor, db, table_name):
return Status(-1, "role is not match")
return Status()

def RecoverPartition(executor, db, partitions, endpoint_status):
def RecoverPartition(executor, db, replicas, endpoint_status, storage):
"""recover all replicas of one partition"""
leader_pos = -1
max_offset = 0
table_name = partitions[0].GetName()
pid = partitions[0].GetPid()
for pos in range(len(partitions)):
partition = partitions[pos]
if partition.IsLeader() and partition.GetOffset() >= max_offset:
table_name = replicas[0].GetName()
pid = replicas[0].GetPid()
tid = replicas[0].GetTid()
for pos in range(len(replicas)):
replica = replicas[pos]
if replica.IsLeader() and replica.GetOffset() >= max_offset:
leader_pos = pos
if leader_pos < 0:
log.error("cannot find leader partition. db {db} name {table_name} partition {pid}".format(
db=db, table_name=table_name, pid=pid))
return Status(-1, "recover partition failed")
tid = partitions[0].GetTid()
leader_endpoint = partitions[leader_pos].GetEndpoint()
msg = "cannot find leader replica. db {db} name {table_name} partition {pid}".format(
db=db, table_name=table_name, pid=pid)
log.error(msg)
return Status(-1, "recover partition failed: {msg}".format(msg=msg))
leader_endpoint = replicas[leader_pos].GetEndpoint()
# recover leader
if "{tid}_{pid}".format(tid=tid, pid=pid) not in endpoint_status[leader_endpoint]:
log.info("leader partition is not in tablet, db {db} name {table_name} pid {pid} endpoint {leader_endpoint}. start loading data...".format(
log.info("leader replica is not in tablet, db {db} name {table_name} pid {pid} endpoint {leader_endpoint}. start loading data...".format(
db=db, table_name=table_name, pid=pid, leader_endpoint=leader_endpoint))
status = executor.LoadTable(leader_endpoint, table_name, tid, pid)
status = executor.LoadTableHTTP(leader_endpoint, table_name, tid, pid, storage)
if not status.OK():
log.error("load table failed. db {db} name {table_name} tid {tid} pid {pid} endpoint {leader_endpoint} msg {status}".format(
db=db, table_name=table_name, tid=tid, pid=pid, leader_endpoint=leader_endpoint, status=status.GetMsg()))
return Status(-1, "recover partition failed")
if not partitions[leader_pos].IsAlive():
return status
if not replicas[leader_pos].IsAlive():
status = executor.UpdateTableAlive(db, table_name, pid, leader_endpoint, "yes")
if not status.OK():
log.error("update leader alive failed. db {db} name {table_name} pid {pid} endpoint {leader_endpoint}".format(
db=db, table_name=table_name, pid=pid, leader_endpoint=leader_endpoint))
return Status(-1, "recover partition failed")
# recover follower
for pos in range(len(partitions)):
for pos in range(len(replicas)):
if pos == leader_pos:
continue
partition = partitions[pos]
partition = replicas[pos]
endpoint = partition.GetEndpoint()
if partition.IsAlive():
status = executor.UpdateTableAlive(db, table_name, pid, endpoint, "no")
Expand All @@ -149,24 +151,31 @@ def RecoverTable(executor, db, table_name):
log.info("{table_name} in {db} is healthy".format(table_name=table_name, db=db))
return Status()
log.info("recover {table_name} in {db}".format(table_name=table_name, db=db))
status, table_info = executor.GetTableInfo(db, table_name)
status, table_info = executor.GetTableInfoHTTP(db, table_name)
if not status.OK():
log.warning("get table info failed. msg is {msg}".format(msg=status.GetMsg()))
return Status(-1, "get table info failed. msg is {msg}".format(msg=status.GetMsg()))
partition_dict = executor.ParseTableInfo(table_info)
log.warning("get table info failed. msg is {msg}".format(msg=status))
return Status(-1, "get table info failed. msg is {msg}".format(msg=status))
if len(table_info) != 1:
log.warning("table info should be 1, {table_info}".format(table_info=table_info))
return Status(-1, "table info should be 1")
table_info = table_info[0]
partition_dict = executor.ParseTableInfoJson(table_info)
storage = "kMemory" if "storage_mode" not in table_info else table_info["storage_mode"]
endpoints = set()
for record in table_info:
endpoints.add(record[3])
for _, reps in partition_dict.items():
# list of replicas
for rep in reps:
endpoints.add(rep.GetEndpoint())
endpoint_status = {}
for endpoint in endpoints:
status, result = executor.GetTableStatus(endpoint)
if not status.OK():
log.warning("get table status failed. msg is {msg}".format(msg=status.GetMsg()))
return Status(-1, "get table status failed. msg is {msg}".format(msg=status.GetMsg()))
endpoint_status[endpoint] = result
max_pid = int(table_info[-1][2])
for pid in range(max_pid + 1):
RecoverPartition(executor, db, partition_dict[str(pid)], endpoint_status)

for _, part in partition_dict.items():
RecoverPartition(executor, db, part, endpoint_status, storage)
# wait op
time.sleep(1)
while True:
Expand Down
98 changes: 73 additions & 25 deletions tools/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@
import subprocess
import sys
import time
# http lib for python2 or 3
import json
try:
import httplib
import urllib
except ImportError:
import http.client as httplib
import urllib.parse as urllib

# for Python 2, don't use f-string
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format = '%(levelname)s: %(message)s')
Expand All @@ -35,6 +44,9 @@ def GetMsg(self):
def GetCode(self):
return self.code

def __str__(self):
    """Render this status as a single human-readable line with its code and message."""
    fields = {"code": self.code, "msg": self.msg}
    return "code: {code}, msg: {msg}".format(**fields)

class Partition:
def __init__(self, name, tid, pid, endpoint, is_leader, is_alive, offset):
self.name = name
Expand Down Expand Up @@ -202,17 +214,48 @@ def GetTableInfo(self, database, table_name = ''):
continue
result.append(record)
return Status(), result
def GetTableInfoHTTP(self, database, table_name = ''):
    """HTTP POST ShowTable to the nameserver leader.

    Args:
        database: value sent as the "db" field of the request.
        table_name: value sent as the "name" field of the request
            (empty string by default).

    Returns:
        A (Status, table_info) tuple on every path, so callers can always
        unpack two values. On success table_info is the "table_info" field of
        the JSON response (an empty list when the field is absent); on any
        failure it is None and Status carries either the HTTP status/reason
        or the code/msg reported in the response body.
    """
    ns = self.endpoint_map[self.ns_leader]
    conn = httplib.HTTPConnection(ns)
    try:
        param = {"db": database, "name": table_name}
        headers = {"Content-type": "application/json"}
        conn.request("POST", "/NameServer/ShowTable", json.dumps(param), headers)
        response = conn.getresponse()
        if response.status != 200:
            return Status(response.status, response.reason), None
        result = json.loads(response.read())
    finally:
        # release the socket on every path, including the early return above
        conn.close()
    # check resp
    if result["code"] != 0:
        # must be a 2-tuple like the other paths: callers unpack (status, table_info)
        return Status(result["code"], "get table info failed: {msg}".format(msg=result["msg"])), None
    # NOTE(review): presumably the server omits "table_info" when nothing matches;
    # return an empty list so callers can check the length instead of hitting KeyError
    return Status(), result.get("table_info", [])

def ParseTableInfo(self, table_info):
    """Group table-info records into Partition objects keyed by partition id.

    Args:
        table_info: iterable of indexable records. Fields are read by position:
            [0] name, [1] tid, [2] pid, [3] endpoint, [4] role ("leader" marks
            the leader), [5] alive flag ("yes" marks alive), [6] offset.

    Returns:
        dict mapping record[2] (the pid, kept in whatever type the record
        holds) to the list of Partition objects built for that pid, in input
        order.
    """
    result = {}
    for record in table_info:
        is_leader = record[4] == "leader"
        is_alive = record[5] == "yes"
        # build the Partition exactly once per record
        partition = Partition(record[0], record[1], record[2], record[3], is_leader, is_alive, record[6])
        result.setdefault(record[2], []).append(partition)
    return result

def ParseTableInfoJson(self, table_info):
    """Build per-partition replica lists from one table's JSON description.

    Args:
        table_info: dict for a single table. Reads "table_partition" (a list
            of partitions), and for each replica in a partition's
            "partition_meta" reads "is_leader", optional "is_alive",
            "endpoint" and "offset", plus the table-level "name" and "tid".

    Returns:
        dict mapping each partition's "pid" value to a list of Partition
        objects, one per replica, in the order they appear in the input.
    """
    replicas_by_pid = {}
    for part in table_info["table_partition"]:
        # each entry describes one partition: a leader plus its followers
        for meta in part["partition_meta"]:
            leader_flag = meta["is_leader"]
            # a replica without an explicit "is_alive" field is treated as alive
            alive_flag = meta.get("is_alive", True)
            # Partition really models a single replica; the class name is kept for compatibility
            replica = Partition(
                table_info["name"],
                table_info["tid"],
                part["pid"],
                meta["endpoint"],
                leader_flag,
                alive_flag,
                meta["offset"],
            )
            replicas_by_pid.setdefault(part["pid"], []).append(replica)
    return replicas_by_pid

def GetTablePartition(self, database, table_name):
status, result = self.GetTableInfo(database, table_name)
if not status.OK:
Expand Down Expand Up @@ -274,30 +317,35 @@ def ShowTableStatus(self, pattern = '%'):

return Status(), output_processed

def LoadTable(self, endpoint, name, tid, pid, sync = True):
    """Run the `loadtable` command against a tablet through the CLI.

    Args:
        endpoint: key into self.endpoint_map for the target tablet.
        name, tid, pid: table name, table id and partition id placed in the command.
        sync: when True (default), poll GetTableStatus until the table leaves
            the loading state; when False, return as soon as the command is accepted.

    Returns:
        Status() once the command is accepted (sync=False) or the table reports
        "kTableNormal"; Status(-1, ...) if the command fails or the table
        reports any state other than normal/loading/undefined.
    """
    load_cmd = list(self.tablet_base_cmd)
    load_cmd.append("--endpoint=" + self.endpoint_map[endpoint])
    # ttl is passed as 0 and the segment count as 8
    load_cmd.append("--cmd=loadtable {} {} {} 0 8".format(name, tid, pid))
    log.info("run {cmd}".format(cmd = load_cmd))
    status, output = self.RunWithRetuncode(load_cmd)
    time.sleep(1)

    accepted = status.OK() and output.find("LoadTable ok") != -1
    if not accepted:
        return Status(-1, "execute load table failed, status {msg}, output {output}".format(msg = status.GetMsg(), output = output))
    if not sync:
        return Status()

    # poll until the tablet reports a terminal state for this partition
    while True:
        status, result = self.GetTableStatus(endpoint, tid, pid)
        key = "{}_{}".format(tid, pid)
        if status.OK() and key in result:
            table_stat = result[key][4]
            if table_stat == "kTableNormal":
                return Status()
            if table_stat != "kTableLoading" and table_stat != "kTableUndefined":
                return Status(-1, "table stat is {table_stat}".format(table_stat = table_stat))
            log.info("table is loading... tid {tid} pid {pid}".format(tid = tid, pid = pid))
        time.sleep(2)
def LoadTableHTTP(self, endpoint, name, tid, pid, storage):
    """HTTP POST LoadTable to a tablet, then wait for the load to finish.

    Unlike the CLI `loadtable` command, the request carries a storage_mode,
    so it is not limited to memory tables.

    Args:
        endpoint: host:port passed directly to HTTPConnection.
        name, tid, pid: table name, table id and partition id for table_meta.
        storage: value sent as table_meta.storage_mode.

    Returns:
        Status() once the table reports "kTableNormal"; a Status carrying the
        HTTP status/reason or the response code/msg when the request is
        rejected; Status(-1, ...) when the table reports any state other than
        normal/loading/undefined.
    """
    # NOTE(review): other calls resolve endpoints through self.endpoint_map;
    # confirm the raw endpoint is intended here.
    conn = httplib.HTTPConnection(endpoint)
    try:
        # ttl has no effect here so it is set to 0; seg cnt is always 8;
        # the request does not state whether this replica is the leader
        param = {"table_meta": {"name": name, "tid": tid, "pid": pid, "ttl":0, "seg_cnt":8, "storage_mode": storage}}
        headers = {"Content-type": "application/json"}
        conn.request("POST", "/TabletServer/LoadTable", json.dumps(param), headers)
        response = conn.getresponse()
        if response.status != 200:
            return Status(response.status, response.reason)
        resp = json.loads(response.read())
    finally:
        # always release the socket, including on the early return and on exceptions
        conn.close()
    if resp["code"] != 0:
        return Status(resp["code"], resp["msg"])
    # wait for success TODO(hw): refactor
    key = "{}_{}".format(tid, pid)
    while True:
        status, result = self.GetTableStatus(endpoint, str(tid), str(pid))
        if status.OK() and key in result:
            table_stat = result[key][4]
            if table_stat == "kTableNormal":
                return Status()
            elif table_stat == "kTableLoading" or table_stat == "kTableUndefined":
                log.info("table is loading... tid {tid} pid {pid}".format(tid = tid, pid = pid))
            else:
                return Status(-1, "table stat is {table_stat}".format(table_stat = table_stat))
        # status not available yet or still loading: retry after a short pause
        time.sleep(2)

def GetLeaderFollowerOffset(self, endpoint, tid, pid):
cmd = list(self.tablet_base_cmd)
Expand Down
Loading