forked from kushaldas/johnnycanencrypt
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Attempt on using pathlib to deal better with os differences
- Loading branch information
Showing
2 changed files
with
18 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,13 +1,11 @@ | ||
# SPDX-FileCopyrightText: © 2020 Kushal Das <[email protected]> | ||
# SPDX-License-Identifier: LGPL-3.0-or-later | ||
|
||
import os | ||
import shutil | ||
import sqlite3 | ||
import urllib.parse | ||
from datetime import datetime | ||
from enum import Enum | ||
from pprint import pprint | ||
from pathlib import Path | ||
from typing import Dict, List, Optional, Union, Tuple, Any | ||
|
||
import httpx | ||
|
@@ -147,12 +145,12 @@ class KeyStore: | |
"""Returns `KeyStore` class object, takes the directory path as string.""" | ||
|
||
def __init__(self, path: str) -> None: | ||
fullpath = os.path.abspath(path) | ||
if not os.path.exists(fullpath): | ||
fullpath = Path(path).absolute() | ||
if not fullpath.exists(): | ||
raise OSError(f"The {fullpath} does not exist.") | ||
self.dbpath = os.path.join(fullpath, "jce.db") | ||
self.dbpath = fullpath / "jce.db" | ||
self.path = fullpath | ||
if not os.path.exists(self.dbpath): | ||
if not self.dbpath.exists(): | ||
con = sqlite3.connect(self.dbpath) | ||
with con: | ||
cursor = con.cursor() | ||
|
@@ -196,8 +194,8 @@ def upgrade_if_required(self): | |
return | ||
# Temporay db setup | ||
oldpath = self.dbpath | ||
self.dbpath = os.path.join(self.path, "jce_upgrade.db") | ||
if os.path.exists(self.dbpath): # Means the upgrade db already exist. | ||
self.dbpath = self.path / "jce_upgrade.db" | ||
if self.dbpath.exists(): # Means the upgrade db already exist. | ||
# Unrecoverable error | ||
raise RuntimeError( | ||
f"{self.dbpath} already exists, please remove and then try again." | ||
|
@@ -245,7 +243,7 @@ def upgrade_if_required(self): | |
sql = "UPDATE keys set oncard=?, primary_on_card=? where fingerprint=?" | ||
cursor.execute(sql, (oncard, primary_on_card, fingerprint)) | ||
# Now let us rename the file | ||
os.rename(self.dbpath, oldpath) | ||
self.dbpath.rename(oldpath) | ||
self.dbpath = oldpath | ||
|
||
def update_password(self, key: Key, password: str, newpassword: str) -> Key: | ||
|
@@ -558,7 +556,7 @@ def add_userid(self, key: Key, userid: str, password: str) -> Key: | |
# To make sure we actually have a secret key | ||
assert keytype == True | ||
# Let us write the new keydata to the disk | ||
key_filename = os.path.join(self.path, f"{fingerprint}.sec") | ||
key_filename = self.path / f"{fingerprint}.sec" | ||
with open(key_filename, "wb") as fobj: | ||
fobj.write(newcert) | ||
con = sqlite3.connect(self.dbpath) | ||
|
@@ -627,7 +625,7 @@ def revoke_userid(self, key: Key, userid: str, password: str) -> Key: | |
# To make sure we actually have a secret key | ||
assert keytype == True | ||
# Let us write the new keydata to the disk | ||
key_filename = os.path.join(self.path, f"{fingerprint}.sec") | ||
key_filename = self.path / f"{fingerprint}.sec" | ||
with open(key_filename, "wb") as fobj: | ||
fobj.write(newcert) | ||
con = sqlite3.connect(self.dbpath) | ||
|
@@ -997,11 +995,11 @@ def create_key( | |
can_primary_expire, | ||
) | ||
# Now save the secret key | ||
key_filename = os.path.join(self.path, f"{fingerprint}.sec") | ||
key_filename = self.path / f"{fingerprint}.sec" | ||
with open(key_filename, "w") as fobj: | ||
fobj.write(secret) | ||
|
||
key = self.import_key(key_filename) | ||
key = self.import_key(key_filename.as_posix()) | ||
|
||
# TODO: should we remove the key_filename from the disk? | ||
return key | ||
|
@@ -1121,7 +1119,7 @@ def encrypt_file(self, keys, inputfilepath, outputfilepath, armor=True): | |
fh = inputfilepath | ||
use_filehandler = True | ||
if check_path: # Only verify if it is a file path | ||
if not os.path.exists(inputfilepath): | ||
if not Path(inputfilepath).exists(): | ||
raise FileNotFoundError(f"{inputfilepath} can not be found.") | ||
|
||
if not isinstance(keys, list): | ||
|
@@ -1351,11 +1349,11 @@ def verify_file_detached( | |
else: | ||
k = key | ||
|
||
if not os.path.exists(signature_path): | ||
if not Path(signature_path).exists(): | ||
raise FileNotFoundError( | ||
f"The signature file at {signature_path} is missing." | ||
) | ||
if not os.path.exists(filepath): | ||
if not Path(filepath).exists(): | ||
raise FileNotFoundError(f"The file at {str(filepath)} is missing.") | ||
|
||
# Let us read the signature | ||
|
@@ -1380,7 +1378,7 @@ def verify_file(self, key: Union[str, Key], filepath): | |
else: | ||
k = key | ||
|
||
if not os.path.exists(filepath): | ||
if not Path(filepath).exists(): | ||
raise FileNotFoundError(f"The file at {str(filepath)} is missing.") | ||
|
||
if isinstance(filepath, str): | ||
|
@@ -1428,7 +1426,7 @@ def verify_and_extract_file( | |
else: | ||
k = key | ||
|
||
if not os.path.exists(filepath): | ||
if not Path(filepath).exists(): | ||
raise FileNotFoundError(f"The file at {str(filepath)} is missing.") | ||
|
||
if isinstance(filepath, str): | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters