-
Notifications
You must be signed in to change notification settings - Fork 0
/
vaultwarden.py
174 lines (147 loc) · 6.69 KB
/
vaultwarden.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#! /usr/bin/env python3
import configparser
import os
import sqlite3
import subprocess
from contextlib import closing
from datetime import datetime
from pathlib import Path
from shutil import copytree, ignore_patterns, make_archive, rmtree, move

import requests
from pytz import timezone
from tqdm import tqdm
# Discord webhook endpoint that backup notifications are POSTed to.
# NOTE(review): '<embed>' looks like a redacted placeholder; confirm the real
# webhook id/token is substituted before deployment.
webhook_url = 'https://discord.com/api/webhooks/<embed>'
# Read the archive password from the [Vaultwarden] section of warden.ini.
# This runs at import time; config.read() skips files it cannot open, so a
# missing file, section or key surfaces as a KeyError on the lookup below.
config = configparser.ConfigParser()
config.read('/root/.secrets/warden.ini')
password = config['Vaultwarden']['password']
class Backup:
    """
    Make a backup of a vaultwarden installation.

    A staging directory is created under /tmp. Files are copied into the
    stage, according to the instructions in
    https://github.com/dani-garcia/vaultwarden/wiki/Backing-up-your-vault.
    The stage is then archived into a .tar.bz2, which is moved into the
    backup directory and encrypted with 7z; finally the stage is removed.
    """

    def __init__(self,
                 datadir="/path/to/vaultwarden",
                 backupdir="/path/to/backup/Vaultwarden",
                 debug=True):
        """ Constructor.

        datadir: Location of the vaultwarden installation. Must be readable to the program.
        backupdir: Location where the finished archive is left. Created on demand.
        debug: prints some messages when set to True (Default: True).

        The staging directory will be /tmp/backup-{now}, where {now} is the
        current date formatted as MM-DD-YYYY.
        """
        self.now = datetime.now().strftime("%m-%d-%Y")
        self.debug = debug
        self.datadir = Path(datadir)
        self.backupdir = Path(backupdir)
        self.stagedir = None

    def make_staging(self):
        """ Create the staging directory in /tmp.

        Raises FileExistsError if the directory is already there, so two runs
        never share (or clobber) one stage.
        """
        self.stagedir = Path("/tmp") / f"backup-{self.now}"
        if self.debug:
            print(f"Making staging {self.stagedir}")
        self.stagedir.mkdir(parents=True, exist_ok=False)

    def cleanup_staging(self):
        """ Remove the staging directory if it exists. """
        if self.stagedir is not None and self.stagedir.exists():
            if self.debug:
                print(f"Remove staging {self.stagedir}.")
            rmtree(self.stagedir, ignore_errors=False)

    def backup_db(self):
        """ Make a backup of the sqlite3 database separately, using sqlite's online backup API.

        The copy is written to {stagedir}/db.sqlite3. Raises FileNotFoundError
        when the source database is missing, instead of letting
        sqlite3.connect() silently create an empty one inside datadir.
        """
        data_dbfile = self.datadir / "db.sqlite3"
        backup_dbfile = self.stagedir / "db.sqlite3"
        timeout_seconds = 30  # Adjust the timeout value as needed

        if not data_dbfile.is_file():
            raise FileNotFoundError(f"Database file not found: {data_dbfile}")

        # The sqlite3 connection context manager only commits/rolls back;
        # closing() is what actually releases the file handles.
        with closing(sqlite3.connect(str(data_dbfile), timeout=timeout_seconds)) as con, \
                closing(sqlite3.connect(str(backup_dbfile), timeout=timeout_seconds)) as backup_con:
            with tqdm(desc="Backing up database", unit=" pages") as progress_bar:

                def report(status, remaining, total):
                    # The page count is only known once copying has started.
                    progress_bar.total = total
                    progress_bar.update(total - remaining - progress_bar.n)

                # Copy in small batches so the progress bar advances.
                con.backup(backup_con, pages=256, progress=report)

    def backup_everything_else(self):
        """ Using copytree(), we make a copy of all things except the database and the staging directory. """
        if self.debug:
            print(f"Copy files from {self.datadir} to {self.stagedir}.")
        copytree(self.datadir, self.stagedir, dirs_exist_ok=True,
                 ignore=ignore_patterns('db.sqlite3*', 'staging'))

    def get_backup_filename(self):
        """ Return the name of the (unencrypted) archive for this run. """
        return f"backup-vaultwarden-{self.now}.tar.bz2"

    def backup_bztar(self):
        """ Compress the staging directory into a .tar.bz2 and move to backupdir. """
        if self.debug:
            print(f"Archive {self.stagedir} into {self.stagedir}.tar.bz2.")
        # The destination directory may not exist yet on a first run.
        self.backupdir.mkdir(parents=True, exist_ok=True)
        # make_archive() returns the path of the archive it wrote.
        tar_filename = make_archive(str(self.stagedir.parent / self.stagedir.name), "bztar", self.stagedir)
        move(tar_filename, str(self.backupdir / self.get_backup_filename()))

    def send_discord_message(self, filename, success=True, error_output=None):
        """ Sends a Discord message.

        filename: name of the finished archive (used on success only).
        success: selects the "complete" or the "failed" embed.
        error_output: text shown in the failure embed.

        Delivery is best-effort: network errors and unexpected HTTP statuses
        are printed, never raised, so a notification problem cannot hide the
        outcome of the backup itself.
        """
        headers = {
            "Content-Type": "application/json"
        }
        if success:
            now = datetime.now(timezone('US/Eastern')).strftime(
                "%m/%d/%Y %I:%M:%S %p %Z")
            embed_data = {
                "embeds": [{
                    "title": "Vaultwarden Backup Complete",
                    "thumbnail": {
                        "url": "https://i.imgur.com/2z5s0UP.png"
                    },
                    "fields": [{
                        "name": "Filename",
                        "value": filename
                    }, {
                        "name": "Date",
                        "value": now
                    }],
                    "color": 3066993
                }]
            }
        else:
            embed_data = {
                "embeds": [{
                    "title": "Vaultwarden Backup failed",
                    "description": f'```{error_output}```',
                    "color": 15158332
                }]
            }
        try:
            # Without a timeout an unreachable endpoint would hang the run.
            response = requests.post(webhook_url, headers=headers, json=embed_data, timeout=30)
        except requests.RequestException as exc:
            print(f"Failed to send Discord message: {exc}")
            return
        if response.status_code != 204:
            print(f"Failed to send Discord message: {response.text}")

    @staticmethod
    def _encrypted_path(tar_file):
        """ Return the path of the encrypted archive: the tar's full name plus '.enc'. """
        # with_suffix() would replace only '.bz2' and yield '*.tar.tar.bz2.enc'.
        return tar_file.with_name(tar_file.name + ".enc")

    def _encrypt(self, tar_file):
        """ Encrypt tar_file with 7z and delete the plaintext archive; return the encrypted path.

        Raises RuntimeError when 7z exits non-zero. The plaintext archive is
        kept in that case so the backup is not lost.
        """
        encrypted_tar_file = self._encrypted_path(tar_file)
        # Argument list, no shell: spaces or metacharacters in the password
        # or in the paths cannot alter the command.
        result = subprocess.run(
            ["7z", "a", f"-p{password}", "-y", str(encrypted_tar_file), str(tar_file)],
            check=False,
        )
        if result.returncode != 0:
            # Built by hand rather than via check=True: CalledProcessError's
            # message embeds the command line, which contains the password
            # and would be forwarded to Discord.
            raise RuntimeError(f"7z exited with status {result.returncode} while encrypting {tar_file.name}")
        tar_file.unlink()
        return encrypted_tar_file

    def backup(self):
        """ Run the complete backup: stage, copy, dump database, archive, encrypt, notify.

        On any failure the stage created by this run is removed, a failure
        notification is sent, and the exception is re-raised.
        """
        staged = False
        try:
            self.make_staging()
            staged = True
            self.backup_everything_else()
            self.backup_db()
            self.backup_bztar()
            # Encrypt the tar file
            tar_file = self.backupdir / self.get_backup_filename()
            encrypted_tar_file = self._encrypt(tar_file)
            self.cleanup_staging()
            self.send_discord_message(encrypted_tar_file.name)
        except Exception as e:
            # Only remove a stage this run created; a pre-existing directory
            # (the FileExistsError case) is left alone.
            if staged:
                rmtree(self.stagedir, ignore_errors=True)
            error_output = str(e)
            self.send_discord_message(None, success=False, error_output=error_output)  # Send failure notification
            raise  # Re-raise the exception to show the error message in the console

    def expire(self, max_backups=5):
        """ Expire Backups older than the most recent max_backups (default: 5).

        Only files matching *.tar.bz2.enc in backupdir are considered; they
        are ranked newest-first by st_ctime.
        """
        backup_files = sorted(
            self.backupdir.glob("*.tar.bz2.enc"),
            key=lambda p: p.stat().st_ctime,
            reverse=True,
        )
        for idx, p in enumerate(backup_files):
            if idx < max_backups:
                if self.debug:
                    print(f"File {p} still good (index: {idx}).")
                continue
            if self.debug:
                print(f"Expire file {p} (index: {idx}).")
            p.unlink()
# Script entry: these statements run unconditionally whenever the module is
# executed (or imported). The constructor defaults are used for datadir and
# backupdir.
b = Backup()
# backup() re-raises on failure, so expire() below is skipped in that case.
b.backup()
# Keep only the most recent encrypted archives (max_backups defaults to 5).
b.expire()