-
Notifications
You must be signed in to change notification settings - Fork 7
/
build_utils.py
770 lines (622 loc) · 26.9 KB
/
build_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
"""Universal build utilities."""
import argparse
import os
import re
import shutil
import subprocess
import sys
from typing import Dict, List, Match, Optional, Tuple, Union
from universal_build._utilities import DashInsensitiveDict
_ALLOWED_BRANCH_TYPES_FOR_RELEASE = ["release", "production"]
_MAIN_BRANCH_NAMES = ["master", "main"]
FLAG_MAKE = "make"
FLAG_TEST = "test"
FLAG_TEST_MARKER = "test_marker"
FLAG_RELEASE = "release"
FLAG_VERSION = "version"
FLAG_CHECK = "check"
FLAG_RUN = "run"
FLAG_FORCE = "force"
_FLAG_SKIP_PATH = "skip_path"
_FLAG_SANITIZED = "_sanitized"
TEST_MARKER_SLOW = "slow"
EXIT_CODE_GENERAL = 1
EXIT_CODE_INVALID_VERSION = 2
EXIT_CODE_NO_VERSION_FOUND = 3
EXIT_CODE_VERSION_IS_REQUIRED = 4
EXIT_CODE_DEV_VERSION_REQUIRED = 5
EXIT_CODE_DEV_VERSION_NOT_MATCHES_BRANCH = 6
EXIT_CODE_INVALID_ARGUMENTS = 7
class _Version:
"""Parsed semantic version."""
major: int
minor: int
patch: int
suffix: str
def __init__(self, major: int, minor: int, patch: int, suffix: str):
self.major = major
self.minor = minor
self.patch = patch
self.suffix = suffix
def to_string(self) -> str:
suffix = "" if not self.suffix else "-" + self.suffix
return f"{self.major}.{self.minor}.{self.patch}{suffix}"
def to_pip_compatible_string(self) -> str:
return _Version.get_pip_compatible_string(self.to_string())
@staticmethod
def get_pip_compatible_string(version: str) -> str:
# See pip's version schema (see https://github.com/pypa/pip/issues/9188 , https://www.python.org/dev/peps/pep-0440/)
# pip compatible string: r"^v?([0-9]+)\.([0-9]+)\.([0-9]+)(?:\.[0-9A-Za-z]+)*?(?:\+[0-9A-Za-z-]+)?$"
if "-dev." in version:
return version.replace("-dev.", ".dev1+")
# Version does not have the branch name suffix
return version.replace("-dev", ".dev1")
@staticmethod
def get_version_from_string(version: str) -> Optional["_Version"]:
version_match = _Version.is_valid_version_format(version)
if version_match is None:
return None
major = int(version_match.group(1))
minor = int(version_match.group(2))
patch = int(version_match.group(3))
suffix = ""
if version_match.lastindex == 4:
suffix = version_match.group(4)
return _Version(major, minor, patch, suffix)
@staticmethod
def is_valid_version_format(version: str) -> Optional[Match[str]]:
return re.match(
r"^v?([0-9]+)\.([0-9]+)\.([0-9]+)(?:-([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?(?:\+[0-9A-Za-z-]+)?$",
version,
)
def log(message: str) -> None:
"""Log message to stdout.
Args:
message (str): Message to log.
"""
print(message, flush=True)
def command_exists(
command: str, silent: bool = False, exit_on_error: bool = False
) -> bool:
"""Checks whether the `command` exists and is marked as executable.
Args:
command (str): Command to check.
silent (bool): If `True`, no message will be logged in case the command does not exist. Default is `False`.
exit_on_error (bool, optional): Exit process if the command does not exist. Defaults to `False`.
Returns:
bool: `True` if the commend exist and is executable.
"""
# Alternative:
# import distutils.spawn
# return distutils.spawn.find_executable(name)
from shutil import which
exists: bool = which(command) is not None
if not exists and not silent:
log(
f"The command {command} does not exist on the system or is not executable. Make sure to install {command}."
)
if exit_on_error:
exit_process(1)
return exists
def parse_arguments(
input_args: List[str] = None, argument_parser: argparse.ArgumentParser = None
) -> dict:
"""Parses all arguments and returns a sanitized & augmented list of arguments.
Sanitized means that, for example, the version is already checked and set depending on our build guidelines.
If arguments are not valid, exit the script run.
Args:
input_args (List[str], optional): List of arguments that are used instead of the arguments passed to the process. Defaults to None.
argument_parser (arparse.ArgumentParser, optional): An argument parser which is passed as a parents parser to the default ArgumentParser to be able to use additional flags besides the default ones.
Returns:
dict: The parsed default arguments thar are already checked for validity.
"""
argument_parser = argument_parser or argparse.ArgumentParser()
parser = _get_default_cli_arguments_parser(argument_parser)
parsed_args, _ = parser.parse_known_args(args=input_args)
if not input_args:
input_args = sys.argv
# convert args to dict
args = vars(parsed_args)
# Set defaults
if len(input_args) <= 1:
# Set default configuration if called without any arguments
args[FLAG_CHECK] = True
args[FLAG_MAKE] = True
args[FLAG_TEST] = True
if args.get(FLAG_TEST_MARKER) is None:
# Set test marker to an empty list for better access
args[FLAG_TEST_MARKER] = []
if args.get(_FLAG_SKIP_PATH) is None:
# Set skip path to an empty list for better access
args[_FLAG_SKIP_PATH] = []
# load from env variables
args = _load_from_env_variables(args, input_args)
if args.get(_FLAG_SANITIZED):
log("Sanatized Arguments: " + str(args))
return DashInsensitiveDict(args)
if not _is_valid_command_combination(args):
exit_process(EXIT_CODE_INVALID_ARGUMENTS)
try:
force = args.get(FLAG_FORCE)
# TODO: temp workaround: always use force in dev modus
if not args.get(FLAG_RELEASE):
force = True
version: Optional[_Version] = _get_version(
args.get(FLAG_VERSION), # type: ignore
force, # type: ignore
existing_versions=_get_version_tags(),
)
except _VersionInvalidFormatException as e:
log(str(e))
exit_process(EXIT_CODE_INVALID_VERSION)
except Exception:
version = None
if args.get(FLAG_RELEASE) and version is None:
log("For a release a valid semantic version has to be set.")
exit_process(EXIT_CODE_VERSION_IS_REQUIRED)
elif args.get(FLAG_RELEASE) is False and version is None:
latest_branch_version = _get_latest_branch_version()
if not latest_branch_version:
version = _Version(0, 0, 0, _get_dev_suffix(_get_current_branch()[0]))
else:
version = latest_branch_version
# higher minor version and add dev suffix
version.minor += 1
# Set patch to 0 since its a new minor version
version.patch = 0
# Apply dev prefix
version.suffix = _get_dev_suffix(_get_current_branch()[0])
elif args.get(FLAG_RELEASE) is False and args.get(FLAG_FORCE) is False and version:
version.suffix = _get_dev_suffix(_get_current_branch()[0])
assert version is not None
args[FLAG_VERSION] = version.to_string()
args[_FLAG_SANITIZED] = True
log("Sanatized Arguments: " + str(args))
return DashInsensitiveDict(args)
def _load_from_env_variables(sanatized_args: dict, program_args: List[str]) -> dict:
for argument in sanatized_args:
if not isinstance(sanatized_args[argument], str):
# Only load env variables for string variables
continue
if argument in " ".join(program_args) or argument.replace("_", "-") in " ".join(
program_args
):
# Argument was provided via command line arguments
continue
if os.environ.get(argument.upper()):
sanatized_args[argument] = os.environ.get(argument.upper())
if os.environ.get("INPUT_" + argument.upper()):
# Support for github action inputs
sanatized_args[argument] = os.environ.get("INPUT_" + argument.upper())
return sanatized_args
def _concat_command_line_arguments(args: Dict[str, Union[str, bool, List[str]]]) -> str:
command_line_arguments = ""
for arg in args:
arg_value = args[arg] # getattr(args, arg)
cli_arg_name = str(arg)
if not any(
(
sys_arg == f"--{str(cli_arg_name)}"
or sys_arg.startswith(f"--{str(cli_arg_name)}=")
or os.environ.get(cli_arg_name.upper())
)
for sys_arg in sys.argv
):
# For some args the underscores must be converted back to dashes,
# since the argparser initially transforms all dashes to underscores
cli_arg_name = cli_arg_name.replace("_", "-")
if arg_value:
# For boolean types, the existence of the flag is enough
if type(arg_value) == bool:
command_line_arguments += f" --{cli_arg_name}"
elif isinstance(arg_value, list):
for single_arg_value in arg_value:
command_line_arguments += f" --{cli_arg_name}={single_arg_value}"
else:
command_line_arguments += f" --{cli_arg_name}={arg_value}"
command_line_arguments = command_line_arguments.lstrip()
return command_line_arguments
def create_git_tag(
version: str, push: bool = False, force: bool = False, exit_on_error: bool = False
) -> subprocess.CompletedProcess:
"""Create an annotated git tag in the current HEAD via `git tag` and the provided version.
The version will be prefixed with 'v'.
If push is set, the tag is pushed to remote but only if the previous `git tag` command was successful.
Args:
version (str): The tag to be created. Will be prefixed with 'v'.
push (bool, optional): If true, push the tag to remote. Defaults to False.
force (bool, optional): If true, force the tag to be created. Defaults to False.
exit_on_error (bool): Exit program if the tag creation fails.
Returns:
subprocess.CompletedProcess: Returns the CompletedProcess object of either the `git tag` or the `git push tag` command. If `push` is set to true, the CompletedProcess of `git tag` is returned if it failed, otherwise the CompletedProcess object from the `git push tag` command is returned.
"""
force_flag = "-f" if force else ""
completed_process = run(
f"git tag -a -m 'Automatically tagged during build process.' {force_flag} v{version}",
disable_stderr_logging=True,
exit_on_error=exit_on_error,
)
if completed_process.returncode > 0:
log(
f"Executing `git tag` for version v{version} might have a problem: {completed_process.stderr}"
)
if completed_process.returncode == 0 and push:
completed_process = run(
f"git push origin v{version}", exit_on_error=exit_on_error
)
return completed_process
def build(component_path: str, args: Dict[str, Union[str, bool, List[str]]]) -> None:
"""Run the build logic of the specified component, except if the path is a (sub-)path in skipped-paths.
Args:
component_path (str): The path of the component to be built. The path must contain a build.py file.
args (Dict): The arguments to be passed to the component's build.py file. The default arguments that were used to call this
script are passed down to the component.
"""
if _is_path_skipped(component_path, args):
return
build_command = _create_build_cmd_from_args(component_path, args)
completed_process = run(build_command, exit_on_error=False)
if completed_process.returncode > 0:
log(
f"Failed to build module {component_path}. Code: {completed_process.returncode}."
)
exit_process(EXIT_CODE_GENERAL)
def run( # type: ignore
command: str,
disable_stdout_logging: bool = False,
disable_stderr_logging: bool = False,
exit_on_error: bool = True,
timeout: Optional[int] = None,
) -> subprocess.CompletedProcess:
"""Run a specified command.
Args:
command (str): The shell command that is executed via subprocess.Popen.
disable_stdout_logging (bool): Disable stdout logging when it is too much or handled by the caller.
exit_on_error (bool): Exit program if the exit code of the command is not 0.
timeout (Optional[int]): If the process does not terminate after timeout seconds, raise a TimeoutExpired exception.
Returns:
subprocess.CompletedProcess: State
"""
# Add timeout to command
if timeout:
command = f"timeout {timeout} {command}"
log(f"Executing: {command}")
with subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
# TODO: Collect stdout and stderr separately instead of merging them. This requires an implementation using thread to avoid dead locks:
# https://stackoverflow.com/questions/17190221/subprocess-popen-cloning-stdout-and-stderr-both-to-terminal-and-variables
stderr=subprocess.STDOUT if not disable_stderr_logging else subprocess.DEVNULL,
universal_newlines=True,
) as process:
try:
stdout = ""
with process.stdout: # type: ignore
for line in iter(process.stdout.readline, ""): # type: ignore
if not disable_stdout_logging:
log(line.rstrip("\n"))
stdout += line
exitcode = process.wait(timeout=timeout)
process.stdout.close() # type: ignore
if exit_on_error and exitcode != 0:
exit_process(exitcode)
return subprocess.CompletedProcess(
args=command, returncode=exitcode, stdout=stdout, stderr=stdout
)
except Exception as ex:
log(f"Exception during command run: {ex}")
process.terminate()
exit_process(1)
def exit_process(code: int = 0) -> None:
"""Exit the process with exit code.
`sys.exit` seems to be a bit unreliable, process just sleeps and does not exit.
So we are using os._exit instead and doing some manual cleanup.
"""
import atexit
import gc
gc.collect()
atexit._run_exitfuncs()
sys.stdout.flush()
os._exit(code)
def replace_in_files(
find: str,
replace: str,
file_paths: List[str],
regex: bool = False,
exit_on_error: bool = True,
) -> None:
"""Replaces a string or regex occurence in a collection of files.
Args:
find (str): A string to find and replace in the files.
replace (str): The string to replace it with.
file_paths (List[str]): Collection of file paths.
regex (bool, optional): If `True`, apply the find string as a regex notation. Defaults to `False`.
exit_on_error (bool, optional): If `True`, exit process as soon as error occures. Defaults to True.
"""
for file_path in file_paths:
if not os.path.exists(file_path):
log("File path does not exist for string replacement: " + file_path)
if exit_on_error:
exit_process(1)
try:
with open(file_path, "r+") as f:
data = f.read()
f.seek(0)
if regex:
f.write(re.sub(find, replace, data))
else:
f.write(data.replace(find, replace))
f.truncate()
except Exception as ex:
log(
"Failed to replace string in file: "
+ file_path
+ ". Exception: "
+ str(ex)
)
if exit_on_error:
exit_process(1)
def get_latest_version() -> Optional[str]:
"""Returns the latest version based on Git tags."""
try:
latest_version = _get_latest_branch_version()
assert latest_version is not None
return latest_version.to_string()
except Exception:
return None
def duplicate_folder(
src_path: str,
target_path: str,
preserve_target: bool = False,
exit_on_error: bool = True,
) -> bool:
"""Deprecated. Use `build_utils.copy` instead."""
return copy(src_path, target_path, preserve_target, exit_on_error)
def copy(
src_path: str,
target_path: str,
preserve_target: bool = False,
exit_on_error: bool = True,
) -> bool:
"""Copy the files from source to target.
Depending on the mode, it will either completely replace the target path with the source path or it will copy all files of the source directory to the target directory.
If `preserve_target` is `True`, if a file or directory with the same name exists at the target, it will be deleted first. Required directories will be created at the target so that the structure is preserved.
Example:
```
copy(
source_path="./temp/generated-openapi-client/src/",
target_path="./webapp/src/services/example-client/"
)
```
Args:
src_path (str): Source path to duplicate.
target_path (str): Target path to move the source folder.
The existing content in the folder will be deleted.
preserve_target (bool, optional): If `True`, the files/directories of the source target will be put into the target path instead of replacing the target directory.
exit_on_error (bool, optional): If `True`, exit process as soon as error occures. Defaults to True.
Returns:
bool: Returns `True` if the copy process was successful and `False` otherwise; if `exit_on_error` is True, the process exists instead of returning `False`.
"""
try:
if preserve_target:
# Use distutils copy tree method to preserve target data
# https://stackoverflow.com/questions/1868714/how-do-i-copy-an-entire-directory-of-files-into-an-existing-directory-using-pyth
from distutils.dir_util import copy_tree
copy_tree(src_path, target_path)
else:
if os.path.exists(target_path):
shutil.rmtree(target_path)
shutil.copytree(src_path, target_path)
except Exception as ex:
log("Failed to duplicate folder: " + str(ex))
if exit_on_error:
exit_process(1)
else:
return False
return True
# Private functions
def _get_current_branch() -> Tuple[str, str]:
"""Get the current branch name and type (feature, production, release etc.) if existing.
Returns:
Tuple: (branchname, type)
"""
full_branch_name = run(
"git branch --show-current", disable_stdout_logging=True, exit_on_error=False
).stdout.rstrip("\n")
if full_branch_name == "":
full_branch_name = "HEAD"
path_parts = full_branch_name.split("/")
if len(path_parts) == 1:
return (path_parts[0], "")
# if a branch name consists of multiple slashes, the parts are concatenated; otherwise it just consists of the normal branch name
# Example: "feature/foo/bar" -> (feature, foo-bar); "feature/foo" -> (feature, foo)
merged_branch_name = "-".join(path_parts[1:])
return (path_parts[0], merged_branch_name)
def _is_path_skipped(path: str, args: dict) -> bool:
"""Check whether the path is itself defined as a skip_path or is a sub-path of a skipped path.
Args:
path (str): The path to be checked
args (dict): The cli arguments that might contain paths to be skipped. Sub-pathes of these skip-pathes will be skipped as well.
Returns:
bool: Return true if the path should be skipped
"""
if _FLAG_SKIP_PATH not in args:
return False
skip_paths: list = args[_FLAG_SKIP_PATH]
skip_paths = skip_paths or []
real_path = os.path.realpath(path)
for skip_path in skip_paths:
real_skip_path = os.path.realpath(skip_path)
if real_path == real_skip_path or real_path.startswith(real_skip_path + os.sep):
return True
return False
def _get_default_cli_arguments_parser(
parser: argparse.ArgumentParser,
) -> argparse.ArgumentParser:
# NEW FLAGS
parser.add_argument(
f"--{FLAG_MAKE}", help="Make/compile/package all artifacts", action="store_true"
)
parser.add_argument(
f"--{FLAG_TEST}", help="Run unit and integration tests", action="store_true"
)
parser.add_argument(
f"--{FLAG_CHECK}", help="Run linting and style checks.", action="store_true"
)
parser.add_argument(
f"--{FLAG_RELEASE}",
help="Release all artifacts (e.g. to registries like DockerHub or NPM)",
action="store_true",
)
parser.add_argument(
f"--{FLAG_RUN}",
help="Run the component in development mode (e.g. dev server).",
action="store_true",
)
parser.add_argument(
f"--{FLAG_VERSION}", help="Version of the build (`MAJOR.MINOR.PATCH-TAG`)"
)
parser.add_argument(
f"--{FLAG_FORCE}",
help="Ignore all enforcements and warnings.",
action="store_true",
)
parser.add_argument(
"--" + _FLAG_SKIP_PATH.replace("_", "-"),
help="Skips the build phases for all (sub)paths provided here",
action="append",
)
parser.add_argument(
"--" + FLAG_TEST_MARKER.replace("_", "-"),
help="Provide custom markers for testing. The default marker for slow tests is `slow`.",
action="append",
)
parser.add_argument(
f"--{_FLAG_SANITIZED}",
help="Indicates that a parent build.py script already checked the validity of the passed arguments so that subsequent scripts don't do it again.",
action="store_true",
)
return parser
def _create_build_cmd_from_args(module_path: str, sanitized_args: dict) -> str:
build_command = f"{sys.executable} -u build.py " + _concat_command_line_arguments(
sanitized_args
)
working_dir = os.getcwd()
full_command = (
"cd '" + module_path + "' && " + build_command + " && cd '" + working_dir + "'"
)
log("Building " + module_path + " with: " + full_command)
return full_command
def _is_valid_command_combination(args: dict) -> bool:
if (
args.get(FLAG_RELEASE)
and not args.get(FLAG_VERSION)
and not args.get(FLAG_FORCE)
):
log(
f"Please provide a version for deployment (--{FLAG_VERSION}=MAJOR.MINOR.PATCH-TAG)"
)
return False
if args.get(FLAG_RELEASE) and not args.get(FLAG_TEST) and not args.get(FLAG_FORCE):
log(f"The release steps requires test to be executed (--{FLAG_TEST})")
return False
if args.get(FLAG_RELEASE) and not args.get(FLAG_MAKE) and not args.get(FLAG_FORCE):
log(f"The release steps requires make to be executed (--{FLAG_MAKE})")
return False
if args.get(FLAG_RELEASE):
current_branch, current_branch_type = _get_current_branch()
if (
current_branch.lower() not in _MAIN_BRANCH_NAMES
and current_branch_type.lower() not in _ALLOWED_BRANCH_TYPES_FOR_RELEASE
and not args.get(FLAG_FORCE)
):
log(
f"Release is only allowed from branches: [{', '.join(_MAIN_BRANCH_NAMES)}] or in branch types: [{', '.join(_ALLOWED_BRANCH_TYPES_FOR_RELEASE)}]"
)
return False
return True
def _get_version_tags() -> List["_Version"]:
unformatted_tags = _get_remote_git_tags()
versions = []
for tag in unformatted_tags:
tag_parts = tag.split("/")
tag = tag_parts[-1]
# only consider tags that resemble versions
version = _Version.get_version_from_string(tag)
if version is not None:
versions.append(version)
return versions
def _get_latest_branch_version() -> Optional[_Version]:
result = run(
"git describe --tags --match 'v[0-9].*' --abbrev=0",
disable_stdout_logging=True,
exit_on_error=False,
)
return _Version.get_version_from_string(result.stdout.rstrip("\n"))
def _get_remote_git_tags() -> List[str]:
if not os.getenv("GITHUB_TOKEN"):
# if no github token is set, don't try to get the tags from remote
return []
result = run(
"git ls-remote --tags --sort='-v:refname' --refs",
disable_stdout_logging=True,
exit_on_error=False,
)
return result.stdout.rstrip("\n").split("\n")
def _get_version(
version: str, force: bool = False, existing_versions: List[_Version] = []
) -> _Version:
"""Get validated version. If force is set to True, the version is allowed to be equal or smaller than the existing patch version.
Raises:
_VersionInvalidFormatException: Raised when the provided version's format is not valid
Exception: Raised when no version is passed
Args:
version (str): The version to be checked for validity. It will be tried to be transformed into a build_utils.Version object.
force (bool, optional): If set to true, the version can be equal or smaller than existing patch version version numbers
existing_versions (list, optional): The list of versions to be checked against
Returns:
Version: Validated version
"""
provided_version = version
if not provided_version:
raise Exception("No version is provided")
version_obj = _Version.get_version_from_string(provided_version)
if version_obj is None:
raise _VersionInvalidFormatException(
"The provided version {provided_version} is not in a valid format. Valid formats include 1.0.0, 1.0.0-dev or 1.0.0-dev.foo"
)
for existing_version in existing_versions:
if (
existing_version.major == version_obj.major
and existing_version.minor == version_obj.minor
and existing_version.patch >= version_obj.patch
and existing_version.suffix
== "" # Only consider release versions, not suffixed dev versions
and not force
):
raise _VersionInvalidFormatException(
f"A version ({existing_version.to_string()}) with the same or higher patch version as provided ({version_obj.to_string()}) already exists."
)
return version_obj
def _get_current_branch_version(
existing_versions: List[_Version] = [],
) -> Tuple[Optional[_Version], List[_Version]]:
"""Returns a tuple of the best suiting version based on our logic and all available versions.
Returns:
[Tuple]:(best suited version | None, list of all existing versions sorted from highest to lowest based on git's sorting algorithm)
"""
# TODO Check that latest dev tag is given although there should be only one dev version per branch
branch_name, branch_type = _get_current_branch()
for version in existing_versions:
if version.suffix == _get_dev_suffix(branch_name):
return (version, existing_versions)
return (None, existing_versions)
def _get_dev_suffix(branch_name: Optional[str]) -> str:
# TODO: ignore branch name as temp fix
# branch_name = branch_name or ""
# return "dev." + branch_name
return "dev"
class _VersionInvalidFormatException(Exception):
"""Raised when the provided version's format is not valid."""
pass