Skip to content
This repository has been archived by the owner on Jan 19, 2024. It is now read-only.

Fixes for import and export of nodes. #73

Merged
merged 2 commits into from
Nov 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ good-names=i,
k,
ex,
Run,
rv,
_

# Good variable names regexes, separated by a comma. If names match any regex,
Expand Down
10 changes: 6 additions & 4 deletions src/dune/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,12 @@ def __init__(self):
help='set a node to active status')
self._parser.add_argument('--get-active', action='store_true',
help='get the name of the node that is currently active')
self._parser.add_argument('--export-node', metavar=("NODE", "DIR"), nargs=2,
help='export state and blocks log for the given node.')
self._parser.add_argument('--import-node', metavar=("DIR", "NODE"), nargs=2,
help='import state and blocks log to a given node')
self._parser.add_argument('--export-node', metavar=("NODE", "PATH"), nargs=2,
help='export state and blocks log for the given node. '
'PATH may be a directory or a filename with `.tgz` extension.')
self._parser.add_argument('--import-node', metavar=("NODE", "PATH"), nargs=2,
help='import state and blocks log to a given node'
'PATH *must* be a previously exported node ending in `.tgz`.')
self._parser.add_argument('--monitor', action='store_true',
help='monitor the currently active node')
self._parser.add_argument('--import-dev-key', metavar="KEY",
Expand Down
267 changes: 163 additions & 104 deletions src/dune/dune.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# pylint: disable=missing-function-docstring, missing-module-docstring
import os
import sys # sys.stderr
from context import context
from docker import docker

from node_state import node_state

# VERSION INFORMATION
def version_major():
Expand Down Expand Up @@ -84,8 +86,7 @@ def node_exists(self, nod):
return self._docker.dir_exists('/app/nodes/' + nod.name())

def is_node_running(self, nod):
return self._docker.find_pid(
'/app/nodes/' + nod.name() + ' ') != -1
return self._docker.find_pid('/app/nodes/' + nod.name() + ' ') != -1

def set_active(self, nod):
if self.node_exists(nod):
Expand Down Expand Up @@ -201,122 +202,181 @@ def stop_container(self):
def start_container(self):
self._docker.start()


def state_list(self):
# [(node_name, active, running, ports),...]
rv=[]
stdout, stderr, exit_code = self._docker.execute_cmd(['ls', '/app/nodes'])
ctx = self._context.get_ctx()
for node_name in stdout.split():
active = False
if node_name == ctx.active:
active=True
running = self.is_node_running(node(node_name))
addrs = self._context.get_config_args(node(node_name))
rv.append( node_state(node_name, active, running, addrs[0], addrs[1], addrs[2]) )
return rv


# pylint: disable=too-many-branches
def list_nodes(self, simple=False):
def list_nodes(self, simple=False, sep='|'):

if simple:
print("Node|Active|Running|HTTP|P2P|SHiP")
else:
print(
"Node Name | Active? | Running? | HTTP | "
"P2P | SHiP")
print(
"---------------------------------------------------------"
"-----------------------------")
stdout, stderr, exit_code = self._docker.execute_cmd(
['ls', '/app/nodes'])
print("Node Name\t\t | Active? | Running? | HTTP | P2P | SHiP\n"
"----------------------------------------------------------------------------------------------")

states=self.state_list()
for state in states:
print( state.string(sep=sep, simple=simple) )

# pylint: disable=too-many-locals,too-many-statements
def export_node(self, nod, path):
# Sanity check
if not self.node_exists(nod):
raise dune_node_not_found(nod.name())

ctx = self._context.get_ctx()
for string in stdout.split():
print(string, end='')
if string == ctx.active:
if simple:
print('|Y', end='')
else:
print('\t\t | Y', end='')
else:
if simple:
print('|N', end='')
else:
print('\t\t | N', end='')
if not self.is_node_running(node(string)):
if simple:
print('|N', end='')
else:
print('\t | N', end='')
else:
if simple:
print('|Y', end='')
else:
print('\t | Y', end='')

ports = self._context.get_config_args(node(string))
if simple:
print('|' + ports[0] + '|' + ports[1] + '|' + ports[2])
else:
print(
' | ' + ports[0] + ' | ' + ports[1] + ' | ' + ports[2])

def export_node(self, nod, directory):
if self.node_exists(nod):
is_active=nod.name() == ctx.active
is_running=self.is_node_running(nod)
my_addrs=self._context.get_config_args(nod)

was_running=[]
was_active=None

initial_states=[]

if not is_active or not is_running:
# Get the current states.
initial_states=self.state_list()

# For each state, make decisions based on it's
for state in initial_states:
# Don't operate on our node.
if state.name == nod.name():
continue
if state.is_active:
was_active = state.name
if state.is_running:
# We only need to stop a running node if there are address collisions.
if state.http in my_addrs or state.p2p in my_addrs or state.ship in my_addrs:
was_running.append(state.name)
self.stop_node(node(state.name))
print("\t", state.name, "was stopped due to address collision.")

# Get this node ready for export.
if not is_active:
self.set_active(nod)
print(
"Exporting data from node [" + nod.name() + "] to location " +
directory)
if not self.is_node_running(nod):
self.start_node(nod)
self.create_snapshot()
self.stop_node(nod)
self._docker.execute_cmd(
['mkdir', '-p', '/app/tmp/' + nod.name()])
self._docker.execute_cmd(
['cp', '-R', '/app/nodes/' + nod.name() + '/blocks',
'/app/tmp/' + nod.name() + '/blocks'])
self._docker.execute_cmd(
['cp', '/app/nodes/' + nod.name() + '/config.ini',
'/app/tmp/' + nod.name() + '/config.ini'])
self._docker.execute_cmd(['cp', '-R',
'/app/nodes/' + nod.name() +
'/protocol_features',
'/app/tmp/' + nod.name() +
'/protocol_features'])
self._docker.execute_cmd(
['cp', '-R', '/app/nodes/' + nod.name() + '/snapshots',
'/app/tmp/' + nod.name() + '/snapshots'])
self._docker.tar_dir(nod.name(), 'tmp/' + nod.name())
self._docker.cp_to_host('/app/' + nod.name() + '.tgz',
directory)
self._docker.rm_file('/app/' + nod.name() + '.tgz')
self._docker.rm_file('/app/tmp/' + nod.name())
if not is_running:
self.start_node(nod)
else:
raise dune_node_not_found(nod.name())

def import_node(self, directory, nod):

# Paths:
directory=path
filename=nod.name()+".tgz"

# Update paths based on input.
if os.path.splitext(path)[1].lower() == ".tgz":
directory=os.path.split(path)[0]
filename=os.path.split(path)[1]

# Ensure the directory is absolute and it exists.
directory=os.path.realpath(directory)
if not os.path.exists(directory):
os.makedirs(directory)

# Determine the final full path.
fullpath=os.path.join(directory,filename)

src_path='/app/nodes/' + nod.name()
dst_path='/app/tmp/' + nod.name()


print("Exporting data from node [" + nod.name() + "] to location " + fullpath)

# Create the snapshot
self.create_snapshot()
# Stop the node for copy.
self.stop_node(nod)

self._docker.execute_cmd(['mkdir', '-p', dst_path])
self._docker.execute_cmd(['cp', '-R', src_path + '/blocks', dst_path + '/blocks'])
self._docker.execute_cmd(['cp', src_path + '/config.ini', dst_path + '/config.ini'])
self._docker.execute_cmd(['cp', '-R', src_path + '/protocol_features', dst_path + '/protocol_features'])
self._docker.execute_cmd(['cp', '-R', src_path + '/snapshots', dst_path + '/snapshots'])

self._docker.tar_dir(nod.name(), 'tmp/' + nod.name())
self._docker.cp_to_host('/app/' + nod.name() + '.tgz', fullpath)
self._docker.rm_file('/app/' + nod.name() + '.tgz')
self._docker.rm_file(dst_path)

# Restore previously active node.
if not is_active and was_active is not None:
self.set_active(node(was_active))

# Restart the node if necessary.
if is_running:
self.start_node(nod)

# Restart any nodes that were previously running.
for old_runner in was_running:
self.start_node(node(old_runner))


def import_node(self, path, nod):

# Sanity check path
if not os.path.exists(path):
print("File not found: ", path, file=sys.stderr)
raise dune_error
if os.path.splitext(path)[1].lower() != ".tgz":
print("Path extension must be `.tgz`: ", path, file=sys.stderr)
raise dune_error

print("Importing node data [" + nod.name() + "]")

# If the node already exists we delete it.
if self.node_exists(nod):
self.remove_node(nod)
stdout, stderr, exit_code = \
self._docker.cp_from_host(directory,
'/app/tmp.tgz')

# Copy the tgz file.
stdout, stderr, exit_code = self._docker.cp_from_host(path, '/app/tmp.tgz')
if exit_code != 0:
print(stderr)
raise dune_error

# Clean up the tmp file, untar, and remove the file.
self._docker.rm_file('/app/tmp') # remove any existing file
self._docker.untar('/app/tmp.tgz')
self._docker.rm_file('/app/tmp.tgz')
stdout, stderr, exit_code = self._docker.execute_cmd(
['ls', '/app/tmp'])
self._docker.execute_cmd(
['mkdir', '-p', '/app/nodes/' + nod.name()])
self._docker.execute_cmd(['mv', '/app/tmp/' + stdout.split()[
0] + '/blocks/blocks.index',
'/app/nodes/' + nod.name() +
'/blocks/blocks.index'])
self._docker.execute_cmd(['mv', '/app/tmp/' + stdout.split()[
0] + '/blocks/blocks.log',
'/app/nodes/' + nod.name() +
'/blocks/blocks.log'])
self._docker.execute_cmd(
['mv', '/app/tmp/' + stdout.split()[0] + '/config.ini',
'/app/nodes/' + nod.name() + '/config.ini'])
self._docker.execute_cmd(['mv', '/app/tmp/' + stdout.split()[
0] + '/protocol_features',
'/app/nodes/' + nod.name() +
'/protocol_features'])
self._docker.execute_cmd(
['mv', '/app/tmp/' + stdout.split()[0] + '/snapshots',
'/app/nodes/' + nod.name() + '/snapshots'])
self._docker.rm_file('/app/tmp/' + stdout.split()[0])
stdout, stderr, exit_code = self._docker.execute_cmd(
['ls', '/app/nodes/' + nod.name() + '/snapshots'])

# Find the path inside temp of the import data.
stdout, stderr, exit_code = self._docker.execute_cmd(['ls', '/app/tmp'])
src_name=stdout.split()[0]
src_path='/app/tmp/' + src_name

# Calculate and create the destination path.
dst_path='/app/nodes/' + nod.name()
self._docker.execute_cmd(['mkdir', '-p', dst_path + '/blocks'])

# Move data to the destination.
self._docker.execute_cmd(['mv', src_path + '/blocks/blocks.index', dst_path + '/blocks/blocks.index'])
self._docker.execute_cmd(['mv', src_path + '/blocks/blocks.log', dst_path + '/blocks/blocks.log'])
self._docker.execute_cmd(['mv', src_path + '/config.ini', dst_path + '/config.ini'])
self._docker.execute_cmd(['mv', src_path + '/protocol_features', dst_path + '/protocol_features'])
self._docker.execute_cmd(['mv', src_path + '/snapshots', dst_path + '/snapshots'])
# Clean up the temp.
self._docker.rm_file('/app/tmp')

# Ensure a snapshot exists
stdout, stderr, exit_code = self._docker.execute_cmd(['ls', dst_path + '/snapshots'])
if len(stdout) == 0:
print('No snapshot found for ', nod.name(), ' sourced from: \n\t', path, file=sys.stderr)
raise dune_error

# Start and activate the node...
self.start_node(nod, stdout.split()[0])
self.set_active(nod)

Expand Down Expand Up @@ -414,8 +474,7 @@ def init_project(self, name, directory, cmake=True):
def create_snapshot(self):
ctx = self._context.get_ctx()
url = "http://" + ctx.http_port + "/v1/producer/create_snapshot"
stdout, stderr, exit_code = self._docker.execute_cmd(
['curl', '-X', 'POST', url])
stdout, stderr, exit_code = self._docker.execute_cmd(['curl', '-X', 'POST', url])
print(stdout)
print(stderr)
print(url)
Expand Down
45 changes: 45 additions & 0 deletions src/dune/node_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import sys
#from typing import NamedTuple


class node_state:
"""A simple class for reporting node state."""

name: str
is_active: bool
is_running: bool
http: str
p2p: str
ship: str


# pylint: disable=too-many-arguments
def __init__(self, name, is_active, is_running, http, p2p, ship):
self.name=name
self.is_active=is_active
self.is_running=is_running
self.http=http
self.p2p=p2p
self.ship=ship


def __str__(self):
active_str='inactive'
if self.is_active:
active_str='active'
running_str='halted'
if self.is_running:
running_str='running'
return f"{self.name}, {active_str}, {running_str}, {self.http}, {self.p2p}, {self.ship}"


def string(self, file=sys.stdout, sep=',', simple=True):
active_str='N'
if self.is_active:
active_str='Y'
running_str='N'
if self.is_running:
running_str='Y'
if simple:
return f"{self.name}{sep}{active_str}{sep}{running_str}{sep}{self.http}{sep}{self.p2p}{sep}{self.ship}"
return f"{self.name}\t\t {sep} {active_str}\t {sep} {running_str} {sep} {self.http} {sep} {self.p2p} {sep} {self.ship}"