Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

format with ruff #385

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions .github/workflows/test-basic-funcs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,8 @@ jobs:
- name: Install dependencies
run: |
poetry install
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
poetry run flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
poetry run flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Lint with ruff
run: poetry run ruff check .
- name: Test avid.py
run: |
poetry run pytest unittest/test_avid.py
Expand Down
362 changes: 220 additions & 142 deletions javsp/__main__.py

Large diffs are not rendered by default.

122 changes: 66 additions & 56 deletions javsp/avid.py
Original file line number Diff line number Diff line change
@@ -1,149 +1,159 @@
"""获取和转换影片的各类番号(DVD ID, DMM cid, DMM pid)"""

import os
import re
from pathlib import Path


__all__ = ['get_id', 'get_cid', 'guess_av_type']
__all__ = ["get_id", "get_cid", "guess_av_type"]


from javsp.config import Cfg


def get_id(filepath_str: str) -> str:
    """Extract the movie ID (DVD ID) from a file path.

    The argument is normally a file path, but a plain string works as well:
    only the stem of its last path component is inspected. Every substring
    matching one of the patterns in ``Cfg().scanner.ignored_id_pattern`` is
    removed and the remainder is upper-cased before any matching happens.
    When no pattern matches, the name of the parent directory is tried
    (recursively).

    Args:
        filepath_str: File path (or bare name) to extract the ID from.

    Returns:
        The recognised ID, or ``""`` when no ID could be found.
    """
    filepath = Path(filepath_str)
    ignore_pattern = re.compile("|".join(Cfg().scanner.ignored_id_pattern))
    norm = ignore_pattern.sub("", filepath.stem).upper()
    if "FC2" in norm:
        # According to FC2 Club's data, FC2 numbers have 5-7 digits
        match = re.search(r"FC2[^A-Z\d]{0,5}(PPV[^A-Z\d]{0,5})?(\d{5,7})", norm, re.I)
        if match:
            return "FC2-" + match.group(2)
    elif "HEYDOUGA" in norm:
        match = re.search(r"(HEYDOUGA)[-_]*(\d{4})[-_]0?(\d{3,5})", norm, re.I)
        if match:
            return "-".join(match.groups())
    elif "GETCHU" in norm:
        match = re.search(r"GETCHU[-_]*(\d+)", norm, re.I)
        if match:
            return "GETCHU-" + match.group(1)
    elif "GYUTTO" in norm:
        match = re.search(r"GYUTTO-(\d+)", norm, re.I)
        if match:
            return "GYUTTO-" + match.group(1)
    elif "259LUXU" in norm:  # special case having form of '259luxu'
        match = re.search(r"259LUXU-(\d+)", norm, re.I)
        if match:
            return "259LUXU-" + match.group(1)
    else:
        # First try matching with suspicious domain names removed; the
        # original name is still used below if that attempt finds nothing
        no_domain = re.sub(r"\w{3,10}\.(COM|NET|APP|XYZ)", "", norm, flags=re.I)
        if no_domain != norm:
            avid = get_id(no_domain)
            if avid:
                return avid
        # heydouga movies abbreviated as "hey". The ID has three parts, so it
        # must be tried before the two-part patterns below
        match = re.search(r"(?:HEY)[-_]*(\d{4})[-_]0?(\d{3,5})", norm, re.I)
        if match:
            return "heydouga-" + "-".join(match.groups())
        # Odd IDs used by the studio MUGEN. Because of the MK3D2DBD form this
        # has to come before the ordinary ID pattern
        match = re.search(
            r"(MKB?D)[-_]*(S\d{2,3})|(MK3D2DBD|S2M|S2MBD)[-_]*(\d{2,3})", norm, re.I
        )
        if match:
            if match.group(1) is not None:
                avid = match.group(1) + "-" + match.group(2)
            else:
                avid = match.group(3) + "-" + match.group(4)
            return avid
        # IDs such as IBW that carry a trailing 'z'
        match = re.search(r"(IBW)[-_](\d{2,5}z)", norm, re.I)
        if match:
            return match.group(1) + "-" + match.group(2)
        # Ordinary ID: prefer the form with a separator (e.g. ABC-123)
        match = re.search(r"([A-Z]{2,10})[-_](\d{2,5})", norm, re.I)
        if match:
            return match.group(1) + "-" + match.group(2)
        # No separated ID was found. Try the Tokyo-Hot red/sky/ex series,
        # which have no '-' separator. (These series are discontinued, so the
        # digit ranges are kept narrow to reduce false matches.)
        match = re.search(r"(RED[01]\d\d|SKY[0-3]\d\d|EX00[01]\d)", norm, re.I)
        if match:
            return match.group(1)
        # Then treat the name as an ordinary ID whose '-' separator is missing
        match = re.search(r"([A-Z]{2,})(\d{2,5})", norm, re.I)
        if match:
            return match.group(1) + "-" + match.group(2)
        # Movies produced by TMA (e.g. 'T28-557'; their IDs are irregular)
        match = re.search(r"(T[23]8[-_]\d{3})", norm)
        if match:
            return match.group(1)
        # Tokyo-Hot n and k series
        match = re.search(r"(N\d{4}|K\d{4})", norm, re.I)
        if match:
            return match.group(1)
        # Purely numeric IDs (uncensored movies)
        match = re.search(r"(\d{6}[-_]\d{2,3})", norm)
        if match:
            return match.group(1)
        # Still nothing: a few movies separate the ID parts with ')(' , so
        # replace that with '-' and try once more
        if ")(" in norm:
            avid = get_id(norm.replace(")(", "-"))
            if avid:
                return avid
    # Last resort: try the name of the directory that contains the file.
    # NOTE(review): placed at function level so that a failed match in any
    # branch above also falls back to the directory name and the function
    # always returns a str - confirm against the repository copy.
    if filepath.parent.name != "":  # haven't reach '.' or '/'
        return get_id(filepath.parent.name)
    return ""


# Trailing part marker of a multi-part movie: '-x' / '_x' or 'cdN'
CD_POSTFIX = re.compile(r"([-_]\w|cd\d)$")


def get_cid(filepath: str) -> str:
    """Try to interpret the given file name as a CID (Content ID).

    The directory part and the extension are dropped and a trailing part
    marker (see ``CD_POSTFIX``) is stripped before the remainder is checked
    against the known CID shapes.

    Args:
        filepath: File path or bare name to examine.

    Returns:
        The CID when the name matches one of the known shapes, else ``""``.
    """
    basename = os.path.splitext(os.path.basename(filepath))[0]
    # Strip a possible trailing part number of a multi-part movie
    possible = CD_POSTFIX.sub("", basename)
    # A cid consists only of digits, lower-case ASCII letters and underscores
    match = re.match(r"^([a-z\d_]+)$", possible, re.A)
    if match:
        possible = match.group(1)
        if "_" not in possible:
            # Lengths 7-14 cover about 99.01% of cids. The longest cid has 24
            # characters, but lengths 20-24 make up less than 5 in 100,000
            match = re.match(r"^[a-z\d]{7,19}$", possible)
            if match:
                return possible
        else:
            # Nearly all of these contain exactly one underscore (only about
            # 1 in 10,000 has two). The trailing comments inside the pattern
            # give the approximate share of each shape. The lone '$' on the
            # last line is redundant (it just repeats the end anchor of the
            # final alternative) and is kept unchanged.
            match2 = re.match(
                r"""^h_\d{3,4}[a-z]{1,10}\d{2,5}[a-z\d]{0,8}$  # 约 99.17%
                    |^\d{3}_\d{4,5}$                            # 约 0.57%
                    |^402[a-z]{3,6}\d*_[a-z]{3,8}\d{5,6}$       # 约 0.09%
                    |^h_\d{3,4}wvr\d\w\d{4,5}[a-z\d]{0,8}$      # 约 0.06%
                    $""",
                possible,
                re.VERBOSE,
            )
            if match2:
                return possible
    return ""


def guess_av_type(avid: str) -> str:
    """Classify an ID as one of: fc2, getchu, gyutto, cid, normal.

    Args:
        avid: The ID to classify.

    Returns:
        ``"fc2"``, ``"getchu"`` or ``"gyutto"`` when the ID starts with the
        corresponding prefix pattern (case-insensitive), ``"cid"`` when the
        whole ID is itself a valid CID, otherwise ``"normal"``.
    """
    if re.match(r"^FC2-\d{5,7}$", avid, re.I):
        return "fc2"
    if re.match(r"^GETCHU-(\d+)", avid, re.I):
        return "getchu"
    if re.match(r"^GYUTTO-(\d+)", avid, re.I):
        return "gyutto"
    # If the ID as a whole matches the cid pattern, classify it as cid.
    # get_cid() returns "" for "no match", so an empty ID must be excluded
    # explicitly; otherwise "" == "" would label it as a cid.
    cid = get_cid(avid)
    if cid and cid == avid:
        return "cid"
    # None of the above: default to normal
    return "normal"


if __name__ == "__main__":
    # Manual smoke test: the file stem carries no ID here, so the printed
    # value shows what get_id() derives from the directory name.
    print(get_id("FC2-123456/Unknown.mp4"))
77 changes: 45 additions & 32 deletions javsp/chromium.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""解析Chromium系浏览器Cookies的相关函数"""

import os
import sys
import json
Expand All @@ -9,7 +10,7 @@
from shutil import copyfile
from datetime import datetime

__all__ = ['get_browsers_cookies']
__all__ = ["get_browsers_cookies"]


from cryptography.hazmat.primitives.ciphers.aead import AESGCM
Expand All @@ -18,48 +19,55 @@
logger = logging.getLogger(__name__)


class Decrypter:
    """Decrypt encrypted cookie values with a fixed AES-GCM key."""

    def __init__(self, key):
        # Key handed to the AES-GCM cipher for every decrypt() call
        self.key = key

    def decrypt(self, encrypted_value):
        """Return the UTF-8 plaintext of ``encrypted_value``.

        The value is sliced as: 3 leading bytes (skipped), a 12-byte nonce,
        the ciphertext, and a 16-byte authentication tag at the end.
        """
        # presumably the 3 skipped bytes are a version prefix - TODO confirm
        nonce = encrypted_value[3 : 3 + 12]
        ciphertext = encrypted_value[3 + 12 : -16]
        tag = encrypted_value[-16:]
        # NOTE(review): no import of `AES` is visible in this file (only
        # `AESGCM` from `cryptography`); confirm that `AES` is in scope, or
        # switch this to AESGCM(self.key).decrypt(nonce, ciphertext + tag, None).
        cipher = AES.new(self.key, AES.MODE_GCM, nonce=nonce)
        plaintext = cipher.decrypt_and_verify(ciphertext, tag).decode("utf-8")
        return plaintext


def get_browsers_cookies():
"""获取系统上的所有Chromium系浏览器的JavDB的Cookies"""
# 不予支持: Opera, 360安全&极速, 搜狗使用非标的用户目录或数据格式; QQ浏览器屏蔽站点
user_data_dirs = {
'Chrome': '/Google/Chrome/User Data',
'Chrome Beta': '/Google/Chrome Beta/User Data',
'Chrome Canary': '/Google/Chrome SxS/User Data',
'Chromium': '/Google/Chromium/User Data',
'Edge': '/Microsoft/Edge/User Data',
'Vivaldi': '/Vivaldi/User Data'
"Chrome": "/Google/Chrome/User Data",
"Chrome Beta": "/Google/Chrome Beta/User Data",
"Chrome Canary": "/Google/Chrome SxS/User Data",
"Chromium": "/Google/Chromium/User Data",
"Edge": "/Microsoft/Edge/User Data",
"Vivaldi": "/Vivaldi/User Data",
}
LocalAppDataDir = os.getenv('LOCALAPPDATA')
LocalAppDataDir = os.getenv("LOCALAPPDATA")
all_browser_cookies = []
exceptions = []
for brw, path in user_data_dirs.items():
user_dir = LocalAppDataDir + path
cookies_files = glob(user_dir+'/*/Cookies') + glob(user_dir+'/*/Network/Cookies')
local_state = user_dir+'/Local State'
cookies_files = glob(user_dir + "/*/Cookies") + glob(
user_dir + "/*/Network/Cookies"
)
local_state = user_dir + "/Local State"
if os.path.exists(local_state):
key = decrypt_key(local_state)
decrypter = Decrypter(key)
for file in cookies_files:
profile = brw + ": " + file.split('User Data')[1].split(os.sep)[1]
profile = brw + ": " + file.split("User Data")[1].split(os.sep)[1]
file = os.path.normpath(file)
try:
records = get_cookies(file, decrypter)
if records:
# 将records转换为便于使用的格式
for site, cookies in records.items():
entry = {'profile': profile, 'site': site, 'cookies': cookies}
entry = {
"profile": profile,
"site": site,
"cookies": cookies,
}
all_browser_cookies.append(entry)
except Exception as e:
exceptions.append(e)
Expand All @@ -78,45 +86,51 @@ def convert_chrome_utc(chrome_utc):
unix_utc = datetime.fromtimestamp(second)
return unix_utc


def decrypt_key_win(local_state):
    """Extract and decrypt the Cookies key stored in a Local State file.

    Args:
        local_state: Path of the browser's ``Local State`` JSON file.

    Returns:
        The decrypted key as returned by ``win32crypt.CryptUnprotectData``.
    """
    # Cookie decryption for Chrome 80+ follows:
    # https://stackoverflow.com/a/60423699/6415337
    import win32crypt  # imported here so the module still loads off Windows

    with open(local_state, "rt", encoding="utf-8") as file:
        encrypted_key = json.load(file)["os_crypt"]["encrypted_key"]
    encrypted_key = base64.b64decode(encrypted_key)  # Base64 decoding
    encrypted_key = encrypted_key[5:]  # Remove DPAPI
    # CryptUnprotectData returns a pair; element 1 is the decrypted data
    decrypted_key = win32crypt.CryptUnprotectData(
        encrypted_key, None, None, None, 0
    )[1]
    return decrypted_key


def decrypt_key_linux(local_state):
    """Extract and decrypt the Cookies key stored in a Local State file (Linux).

    Args:
        local_state: Path of the browser's ``Local State`` JSON file.

    Returns:
        The bytes produced by the AES-GCM decryption below.
    """
    # Read the key stored in the Local State file
    with open(local_state, "rt", encoding="utf-8") as file:
        encrypted_key = json.load(file)["os_crypt"]["encrypted_key"]
    encrypted_key = base64.b64decode(encrypted_key)
    encrypted_key = encrypted_key[5:]  # drop the first 5 bytes
    # NOTE(review): the same bytes are used both as the AES-GCM key and as
    # the ciphertext, with a constant nonce of 12 spaces. This looks unlikely
    # to authenticate successfully - confirm the intended Linux key scheme.
    key = encrypted_key
    nonce = b" " * 12
    aesgcm = AESGCM(key)
    decrypted_key = aesgcm.decrypt(nonce, encrypted_key, None)
    return decrypted_key


# Pick the platform-specific key decryption routine once, at import time
decrypt_key = decrypt_key_win if sys.platform == "win32" else decrypt_key_linux


def get_cookies(cookies_file, decrypter, host_pattern='javdb%.com'):
def get_cookies(cookies_file, decrypter, host_pattern="javdb%.com"):
"""从cookies_file文件中查找指定站点的所有Cookies"""
# 复制Cookies文件到临时目录,避免直接操作原始的Cookies文件
temp_dir = os.getenv('TMPDIR', os.getenv('TEMP', os.getenv('TMP', '.')))
temp_cookie = os.path.join(temp_dir, 'Cookies')
temp_dir = os.getenv("TMPDIR", os.getenv("TEMP", os.getenv("TMP", ".")))
temp_cookie = os.path.join(temp_dir, "Cookies")
copyfile(cookies_file, temp_cookie)
# 连接数据库进行查询
conn = sqlite3.connect(temp_cookie)
cursor = conn.cursor()
cursor.execute(f'SELECT host_key, name, encrypted_value, expires_utc FROM cookies WHERE host_key LIKE "{host_pattern}"')
cursor.execute(
f'SELECT host_key, name, encrypted_value, expires_utc FROM cookies WHERE host_key LIKE "{host_pattern}"'
)
# 将查询结果按照host_key进行组织
now = datetime.now()
records = {}
Expand All @@ -127,7 +141,7 @@ def get_cookies(cookies_file, decrypter, host_pattern='javdb%.com'):
if expires > now:
d[name] = decrypter.decrypt(encrypted_value)
# Cookies的核心字段是'_jdb_session',因此如果records中缺失此字段(说明已过期),则对应的Cookies不再有效
valid_records = {k: v for k, v in records.items() if '_jdb_session' in v}
valid_records = {k: v for k, v in records.items() if "_jdb_session" in v}
conn.close()
os.remove(temp_cookie)
return valid_records
Expand All @@ -136,5 +150,4 @@ def get_cookies(cookies_file, decrypter, host_pattern='javdb%.com'):
if __name__ == "__main__":
    # Manual check: list every browser profile / site pair that was found
    all_cookies = get_browsers_cookies()
    for d in all_cookies:
        print("{:<20}{}".format(d["profile"], d["site"]))
Loading
Loading