Skip to content

Commit

Permalink
fixed many small things regarding log, added gateway mode for ssh_han…
Browse files Browse the repository at this point in the history
…dler.py and fixed rest added get_log option
  • Loading branch information
XaverStiensmeier committed Aug 18, 2023
1 parent 7a589a4 commit e713525
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 38 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ resources/playbook/group_vars/

# any log files
*.log
log/

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
6 changes: 4 additions & 2 deletions bibigrid/core/actions/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,13 @@ def initialize_instances(self):
self.master_ip = configuration["floating_ip"]
ssh_handler.ansible_preparation(floating_ip=configuration["floating_ip"],
private_key=KEY_FOLDER + self.key_name, username=self.ssh_user,
commands=self.ssh_add_public_key_commands, log=self.log)
commands=self.ssh_add_public_key_commands, log=self.log,
gateway_ip=configuration.get("gateway_ip"))
elif configuration.get("vpnInstance"):
ssh_handler.execute_ssh(floating_ip=configuration["floating_ip"],
private_key=KEY_FOLDER + self.key_name, username=self.ssh_user,
commands=ssh_handler.VPN_SETUP, log=self.log)
commands=ssh_handler.VPN_SETUP, log=self.log,
gateway_ip=configuration.get("gateway_ip"))

def prepare_volumes(self, provider, mounts):
"""
Expand Down
71 changes: 45 additions & 26 deletions bibigrid/core/startup_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import asyncio
import logging
import multiprocessing
import os
import subprocess

import uvicorn
import yaml
Expand All @@ -18,24 +20,24 @@

app = FastAPI()

LOG_FOLDER = "log"
if not os.path.isdir(LOG_FOLDER):
os.mkdir(LOG_FOLDER)

LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
LOG_FORMATTER = logging.Formatter(LOG_FORMAT)
LOG = logging.getLogger("bibigrid")
stream_handler = logging.StreamHandler()
file_handler = logging.FileHandler(os.path.join(LOG_FOLDER, "bibigrid_rest.log"))
for handler in [stream_handler, file_handler]:
handler.setFormatter(LOG_FORMATTER)
LOG.addHandler(handler)
logging.addLevelName(42, "PRINT")
LOG.setLevel(logging.DEBUG)


def prepare_logging():
stream_handler = logging.StreamHandler()
file_handler = logging.FileHandler("bibigrid_rest.log")
for handler in [stream_handler, file_handler]:
handler.setFormatter(LOG_FORMATTER)
LOG.addHandler(handler)
logging.addLevelName(42, "PRINT")
LOG.setLevel(logging.DEBUG)
LOG.log(42, "Test0")
LOG.error("Test1")
LOG.warning("Test2")
LOG.info("Test3")
LOG.debug("Test4")
def tail(file_path, lines):
return subprocess.check_output(['tail', '-n', str(lines), file_path], universal_newlines=True)


def setup(cluster_id):
Expand All @@ -49,9 +51,9 @@ def setup(cluster_id):
log = logging.getLogger(cluster_id)
log.setLevel(logging.DEBUG)
if not log.handlers:
handler = logging.FileHandler(f"{cluster_id}.log")
handler.setFormatter(LOG_FORMATTER)
log.addHandler(handler)
log_handler = logging.FileHandler(os.path.join(LOG_FOLDER, f"{cluster_id}.log"))
log_handler.setFormatter(LOG_FORMATTER)
log.addHandler(log_handler)
return cluster_id, log


Expand Down Expand Up @@ -82,7 +84,7 @@ async def validate_configuration(cluster_id: str = None, config_file: UploadFile

@app.post("/bibigrid/create")
async def create_cluster(cluster_id: str = None, config_file: UploadFile = File(...)):
LOG.debug(f"Requested termination on {cluster_id}")
LOG.debug(f"Requested creation on {cluster_id}")
cluster_id, log = setup(cluster_id)

async def create_async():
Expand All @@ -92,8 +94,8 @@ async def create_async():
content = await config_file.read()
configurations = yaml.safe_load(content.decode())
providers = provider_handler.get_providers(configurations, log)
creator = create.Create(providers=providers, configurations=configurations, log=log, config_path=None,
cluster_id=cluster_id)
creator = create.Create(providers=providers, configurations=configurations, log=log,
config_path=config_file.filename, cluster_id=cluster_id)
cluster_id = creator.cluster_id
asyncio.create_task(create_async())
return JSONResponse(content={"message": "Cluster creation started.", "cluster_id": cluster_id}, status_code=200)
Expand Down Expand Up @@ -121,13 +123,12 @@ async def terminate_async():
return JSONResponse(content={"error": str(exc)}, status_code=400)


@app.post("/bibigrid/info/")
async def info(cluster_id: str, configurations: list = None, config_file: UploadFile = None):
if not configurations and not config_file:
return JSONResponse(content={"message": "Missing parameters: configurations or config_file required."},
status_code=400)
@app.get("/bibigrid/info/")
async def info(cluster_id: str, config_file: UploadFile):
LOG.debug(f"Requested info on {cluster_id}.")
cluster_id, log = setup(cluster_id)
content = await config_file.read()
configurations = yaml.safe_load(content.decode())
try:
providers = provider_handler.get_providers(configurations, log)
cluster_dict = list_clusters.dict_clusters(providers, log).get(cluster_id) # add information filtering
Expand All @@ -139,6 +140,25 @@ async def info(cluster_id: str, configurations: list = None, config_file: Upload
return JSONResponse(content={"error": str(exc)}, status_code=400)


@app.get("/bibigrid/log/")
async def get_log(cluster_id: str, lines: int = None):
LOG.debug(f"Requested log on {cluster_id}.")
# cluster_id, log = setup(cluster_id)
try:
file_name = os.path.join(LOG_FOLDER, f"{cluster_id}.log")
print(file_name)
if os.path.isfile(file_name):
if not lines:
with open(file_name, "r", encoding="utf8") as log_file:
response = log_file.read()
else:
response = tail(file_name, lines)
return JSONResponse(content={"message": "Log found", "log": response}, status_code=200)
return JSONResponse(content={"message": "Log not found."}, status_code=404)
except Exception as exc: # pylint: disable=broad-except
return JSONResponse(content={"error": str(exc)}, status_code=400)


# outdated tests
client = TestClient(app)

Expand Down Expand Up @@ -190,6 +210,5 @@ def test_get_nonexistent_configuration_info():


if __name__ == "__main__":
prepare_logging()
uvicorn.run("bibigrid.core.startup_rest:app", host="0.0.0.0", port=8000,
workers=multiprocessing.cpu_count() * 2 + 1) # Use the on_starting event)
workers=multiprocessing.cpu_count() * 2 + 1) # Use the on_starting event
24 changes: 18 additions & 6 deletions bibigrid/core/utility/handler/ssh_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def copy_to_server(sftp, local_path, remote_path, log):
copy_to_server(sftp, os.path.join(local_path, filename), os.path.join(remote_path, filename), log)


def is_active(client, floating_ip_address, private_key, username, log, timeout=5):
def is_active(client, floating_ip_address, private_key, username, log, timeout=5, gateway_ip=None):
"""
Checks if connection is possible and therefore if server is active.
Raises paramiko.ssh_exception.NoValidConnectionsError if timeout is reached
Expand All @@ -96,13 +96,23 @@ def is_active(client, floating_ip_address, private_key, username, log, timeout=5
:param username: SSH-username
:param log:
:param timeout: how long to wait between ping
:param gateway_ip: if node should be reached over a gateway port is set to 30000 + subnet * 256 + host
(waiting grows quadratically till 2**timeout before accepting failure)
"""
attempts = 0
establishing_connection = True
while establishing_connection:
try:
client.connect(hostname=floating_ip_address, username=username, pkey=private_key, timeout=7, auth_timeout=5)
# Add Port
port = 22
if gateway_ip:
log.info("Using SSH Gateway...")
ip_split = gateway_ip.split(".")
host = int(ip_split[-1])
subnet = int(ip_split[-2])
port = 30000 + subnet * 256 + host
client.connect(hostname=gateway_ip or floating_ip_address, username=username,
pkey=private_key, timeout=7, auth_timeout=5, port=port)
establishing_connection = False
log.info(f"Successfully connected to {floating_ip_address}")
except paramiko.ssh_exception.NoValidConnectionsError as exc:
Expand Down Expand Up @@ -174,7 +184,7 @@ def execute_ssh_cml_commands(client, commands, log):
raise ExecutionException(msg)


def ansible_preparation(floating_ip, private_key, username, log, commands=None, filepaths=None):
def ansible_preparation(floating_ip, private_key, username, log, commands=None, filepaths=None, gateway_ip=None):
"""
Installs python and pip. Then installs ansible over pip.
Copies private key to instance so cluster-nodes are reachable and sets permission as necessary.
Expand All @@ -187,6 +197,7 @@ def ansible_preparation(floating_ip, private_key, username, log, commands=None,
:param log:
:param commands: additional commands to execute
:param filepaths: additional files to copy: (localpath, remotepath)
:param gateway_ip
"""
if filepaths is None:
filepaths = []
Expand All @@ -195,10 +206,10 @@ def ansible_preparation(floating_ip, private_key, username, log, commands=None,
log.info("Ansible preparation...")
commands = ANSIBLE_SETUP + commands
filepaths.append((private_key, PRIVATE_KEY_FILE))
execute_ssh(floating_ip, private_key, username, log, commands, filepaths)
execute_ssh(floating_ip, private_key, username, log, commands, filepaths, gateway_ip)


def execute_ssh(floating_ip, private_key, username, log, commands=None, filepaths=None):
def execute_ssh(floating_ip, private_key, username, log, commands=None, filepaths=None, gateway_ip=None):
"""
Executes commands on remote and copies files given in filepaths
:param floating_ip: public ip of remote
Expand All @@ -207,6 +218,7 @@ def execute_ssh(floating_ip, private_key, username, log, commands=None, filepath
:param commands: commands
:param log:
:param filepaths: filepaths (localpath, remotepath)
:param gateway_ip: IP of gateway if used
"""
if commands is None:
commands = []
Expand All @@ -215,7 +227,7 @@ def execute_ssh(floating_ip, private_key, username, log, commands=None, filepath
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
is_active(client=client, floating_ip_address=floating_ip, username=username, private_key=paramiko_key,
log=log)
log=log, gateway_ip=gateway_ip)
except ConnectionException as exc:
log.error(f"Couldn't connect to floating ip {floating_ip} using private key {private_key}.")
raise exc
Expand Down
19 changes: 15 additions & 4 deletions sample_rest_using.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,26 @@ def create(cluster_id, configuration="test.yml"):

def terminate(cluster_id, configuration="test.yml"):
with open(configuration, 'rb') as file:
files = {'config_file': (file)}
files = {'config_file': file}
params = {"cluster_id": cluster_id}
response = requests.post("http://localhost:8000/bibigrid/terminate", params=params, files=files, timeout=20)
response_data = response.json()
print(response_data)


def info(cluster_id):
params = {"cluster_id": cluster_id, "configurations": [{"infrastructure": "openstack", "cloud": "bibiserv"}]}
response = requests.post("http://localhost:8000/bibigrid/info", params=params, timeout=20)
def info(cluster_id, configuration="test.yml"):
with open(configuration, 'rb') as file:
files = {'config_file': (configuration, file)}
params = {"cluster_id": cluster_id}
response = requests.get("http://localhost:8000/bibigrid/info", params=params, files=files, timeout=20)
response_data = response.json()
print(response_data)


def get_log(cluster_id, configuration="test.yml", lines=5):
with open(configuration, 'rb') as file:
files = {'config_file': (configuration, file)}
params = {"cluster_id": cluster_id, "lines": lines}
response = requests.get("http://localhost:8000/bibigrid/log", params=params, files=files, timeout=20)
response_data = response.json()
print(response_data)

0 comments on commit e713525

Please sign in to comment.