Skip to content

Commit

Permalink
Move functions under new class
Browse files Browse the repository at this point in the history
  • Loading branch information
AT0myks committed Dec 14, 2023
1 parent 747b29b commit 6841490
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 159 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,13 @@ You should not use it to repack a custom firmware.

```py
from reolinkfw import ReolinkFirmware, get_info
from reolinkfw.extract import extract_pak

url = "https://reolink-storage.s3.amazonaws.com/website/firmware/20200523firmware/RLC-410-5MP_20_20052300.zip"
print(get_info(url))
file = "/home/ben/RLC-410-5MP_20_20052300.zip"
print(get_info(file))
with ReolinkFirmware.from_file(file) as fw:
extract_pak(fw)
fw.extract_pak()
```

In most cases where a URL is used, it will be a direct link to the file
Expand Down
172 changes: 127 additions & 45 deletions reolinkfw/__init__.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,44 @@
import asyncio
import hashlib
import io
import posixpath
import re
from collections.abc import Iterable, Iterator, Mapping
from collections.abc import Iterator, Mapping
from contextlib import redirect_stdout
from functools import partial
from pathlib import Path
from typing import IO, Any, BinaryIO, Optional, Union
from urllib.parse import parse_qsl, urlparse
from zipfile import ZipFile, is_zipfile

import aiohttp
import pybcl
from aiohttp.typedefs import StrOrURL
from lxml.etree import fromstring
from lxml.html import document_fromstring
from pakler import PAK, Section, is_pak_file
from pycramfs import Cramfs
from pycramfs.extract import extract_dir as extract_cramfs
from PySquashfsImage import SquashFsImage
from PySquashfsImage.extract import extract_dir as extract_squashfs
from ubireader.ubifs import ubifs as ubifs_
from ubireader.ubifs.output import extract_files as extract_ubifs
from ubireader.ubi_io import ubi_file
from ubireader.utils import guess_leb_size

from reolinkfw.tmpfile import TempFile
from reolinkfw.typedefs import Buffer, Files, StrPath, StrPathURL
from reolinkfw.ubifs import UBIFS
from reolinkfw.uboot import get_arch_name, get_uboot_version, get_uimage_header
from reolinkfw.uboot import LegacyImageHeader, get_arch_name
from reolinkfw.util import (
ONEMIB,
FileType,
SectionFile,
closing_ubifile,
get_cache_file,
get_fs_from_ubi,
has_cache,
make_cache_file,
sha256_pak
)

__version__ = "1.1.0"
Expand All @@ -49,6 +60,7 @@ def __init__(self, fd: BinaryIO, offset: int = 0, closefd: bool = True) -> None:
self._kernel_section_name = self._get_kernel_section_name()
self._sdict = {s.name: s for s in self}
self._open_files = 1
self._fs_sections = [s for s in self if s.name in FS_SECTIONS]

def __del__(self) -> None:
self.close()
Expand Down Expand Up @@ -93,6 +105,117 @@ def close(self) -> None:
self._fdclose(self._fd)
self._fd = None

def sha256(self) -> str:
sha = hashlib.sha256()
self._fd.seek(0)
for block in iter(partial(self._fd.read, ONEMIB), b''):
sha.update(block)
return sha.hexdigest()

def get_uboot_version(self) -> Optional[str]:
for section in self:
if section.len and "uboot" in section.name.lower():
# This section is always named 'uboot' or 'uboot1'.
with self.open(section) as f:
if f.peek(len(pybcl.BCL_MAGIC_BYTES)) == pybcl.BCL_MAGIC_BYTES:
# Sometimes section.len - sizeof(hdr) is 1 to 3 bytes larger
# than hdr.size. The extra bytes are 0xff (padding?). This
# could explain why the compressed size is added to the header.
hdr = pybcl.HeaderVariant.from_fd(f)
data = pybcl.decompress(f.read(hdr.size), hdr.algo, hdr.outsize)
else:
data = f.read(section.len)
match = re.search(b"U-Boot [0-9]{4}\.[0-9]{2}.*? \(.*?\)", data)
return match.group().decode() if match is not None else None
return None

def get_uimage_header(self) -> LegacyImageHeader:
for section in self:
with self.open(section) as f:
if section.len and FileType.from_magic(f.peek(4)) == FileType.UIMAGE:
# This section is always named 'KERNEL' or 'kernel'.
return LegacyImageHeader.from_fd(f)
raise Exception("No kernel section found")

def get_fs_info(self) -> list[dict[str, str]]:
result = []
for section in self._fs_sections:
with self.open(section) as f:
fs = FileType.from_magic(f.read(4))
if fs == FileType.UBI:
f.seek(266240)
fs = FileType.from_magic(f.read(4))
result.append({
"name": section.name,
"type": fs.name.lower() if fs is not None else "unknown"
})
return result

async def get_info_from_pak(self) -> dict[str, Any]:
ha = await asyncio.to_thread(self.sha256)
app = self._fs_sections[-1]
with self.open(app) as f:
fs = FileType.from_magic(f.read(4))
if fs == FileType.CRAMFS:
files = await asyncio.to_thread(get_files_from_cramfs, f, 0, False)
elif fs == FileType.UBI:
files = await asyncio.to_thread(get_files_from_ubi, f, app.len, 0)
elif fs == FileType.SQUASHFS:
files = await asyncio.to_thread(get_files_from_squashfs, f, 0, False)
else:
return {"error": "Unrecognized image type", "sha256": ha}
uimage = self.get_uimage_header()
return {
**get_info_from_files(files),
"os": "Linux" if uimage.os == 5 else "Unknown",
"architecture": get_arch_name(uimage.arch),
"kernel_image_name": uimage.name,
"uboot_version": self.get_uboot_version(),
"filesystems": self.get_fs_info(),
"sha256": ha
}

def extract_file_system(self, section: Section, dest: Optional[Path] = None) -> None:
dest = (Path.cwd() / "reolink_fs") if dest is None else dest
dest.mkdir(parents=True, exist_ok=True)
with self.open(section) as f:
fs = FileType.from_magic(f.read(4))
if fs == FileType.UBI:
fs_bytes = get_fs_from_ubi(f, section.len, 0)
fs = FileType.from_magic(fs_bytes[:4])
if fs == FileType.UBIFS:
with TempFile(fs_bytes) as file:
block_size = guess_leb_size(file)
with closing_ubifile(ubi_file(file, block_size)) as ubifile:
with redirect_stdout(io.StringIO()):
# Files that already exist are not written again.
extract_ubifs(ubifs_(ubifile), dest)
elif fs == FileType.SQUASHFS:
with SquashFsImage.from_bytes(fs_bytes) as image:
extract_squashfs(image.root, dest, True)
else:
raise Exception("Unknown file system in UBI")
elif fs == FileType.SQUASHFS:
with SquashFsImage(f, 0, False) as image:
extract_squashfs(image.root, dest, True)
elif fs == FileType.CRAMFS:
with Cramfs.from_fd(f, 0, False) as image:
extract_cramfs(image.rootdir, dest, True)
else:
raise Exception("Unknown file system")

def extract_pak(self, dest: Optional[Path] = None, force: bool = False) -> None:
dest = (Path.cwd() / "reolink_firmware") if dest is None else dest
dest.mkdir(parents=True, exist_ok=force)
rootfsdir = [s.name for s in self if s.name in ROOTFS_SECTIONS][0]
for section in self:
if section.name in FS_SECTIONS:
if section.name == "app":
outpath = dest / rootfsdir / "mnt" / "app"
else:
outpath = dest / rootfsdir
self.extract_file_system(section, outpath)


async def download(url: StrOrURL) -> Union[bytes, int]:
"""Return resource as bytes.
Expand Down Expand Up @@ -187,47 +310,6 @@ def is_local_file(string: StrPath) -> bool:
return Path(string).is_file()


def get_fs_info(fw: ReolinkFirmware, fs_sections: Iterable[Section]) -> list[dict[str, str]]:
result = []
for section in fs_sections:
with fw.open(section) as f:
fs = FileType.from_magic(f.read(4))
if fs == FileType.UBI:
f.seek(266240)
fs = FileType.from_magic(f.read(4))
result.append({
"name": section.name,
"type": fs.name.lower() if fs is not None else "unknown"
})
return result


async def get_info_from_pak(fw: ReolinkFirmware) -> dict[str, Any]:
ha = await asyncio.to_thread(sha256_pak, fw)
fs_sections = [s for s in fw if s.name in FS_SECTIONS]
app = fs_sections[-1]
with fw.open(app) as f:
fs = FileType.from_magic(f.read(4))
if fs == FileType.CRAMFS:
files = await asyncio.to_thread(get_files_from_cramfs, f, 0, False)
elif fs == FileType.UBI:
files = await asyncio.to_thread(get_files_from_ubi, f, app.len, 0)
elif fs == FileType.SQUASHFS:
files = await asyncio.to_thread(get_files_from_squashfs, f, 0, False)
else:
return {"error": "Unrecognized image type", "sha256": ha}
uimage = get_uimage_header(fw)
return {
**get_info_from_files(files),
"os": "Linux" if uimage.os == 5 else "Unknown",
"architecture": get_arch_name(uimage.arch),
"kernel_image_name": uimage.name,
"uboot_version": get_uboot_version(fw),
"filesystems": get_fs_info(fw, fs_sections),
"sha256": ha
}


async def direct_download_url(url: str) -> str:
if url.startswith("https://drive.google.com/file/d/"):
return f"https://drive.google.com/uc?id={url.split('/')[5]}&confirm=t"
Expand Down Expand Up @@ -290,7 +372,7 @@ async def get_info(file_or_url: StrPathURL, use_cache: bool = True) -> list[dict
return [{"file": file_or_url, "error": str(e)}]
if not paks:
return [{"file": file_or_url, "error": "No PAKs found in ZIP file"}]
info = [{**await get_info_from_pak(pakfile), "file": file_or_url, "pak": pakname} for pakname, pakfile in paks]
info = [{**await pakfile.get_info_from_pak(), "file": file_or_url, "pak": pakname} for pakname, pakfile in paks]
for _, pakfile in paks:
pakfile.close()
return info
6 changes: 2 additions & 4 deletions reolinkfw/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
from pathlib import Path, PurePath

from reolinkfw import __version__, get_info, get_paks
from reolinkfw.extract import extract_pak
from reolinkfw.util import sha256_pak

HW_FIELDS = ("board_type", "detail_machine_type", "board_name")

Expand Down Expand Up @@ -50,8 +48,8 @@ async def extract(args: Namespace) -> None:
raise Exception("No PAKs found in ZIP file")
dest = Path.cwd() if args.dest is None else args.dest
for pakname, pakfile in paks:
name = sha256_pak(pakfile) if pakname is None else PurePath(pakname).stem
await asyncio.to_thread(extract_pak, pakfile, dest / name, args.force)
name = pakfile.sha256() if pakname is None else PurePath(pakname).stem
await asyncio.to_thread(pakfile.extract_pak, dest / name, args.force)
pakfile.close()


Expand Down
61 changes: 0 additions & 61 deletions reolinkfw/extract.py

This file was deleted.

36 changes: 1 addition & 35 deletions reolinkfw/uboot.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
from __future__ import annotations

import re
from ctypes import BigEndianStructure, c_char, c_uint32, c_uint8, sizeof
from enum import IntEnum
from typing import TYPE_CHECKING, BinaryIO, Optional

import pybcl

if TYPE_CHECKING:
from reolinkfw import ReolinkFirmware
from reolinkfw.util import FileType
from typing import BinaryIO

UBOOT_MAGIC = 0x27051956

Expand Down Expand Up @@ -102,33 +95,6 @@ def from_fd(cls, fd: BinaryIO) -> LegacyImageHeader:
return cls.from_buffer_copy(fd.read(sizeof(cls)))


def get_uboot_version(fw: ReolinkFirmware) -> Optional[str]:
for section in fw:
if section.len and "uboot" in section.name.lower():
# This section is always named 'uboot' or 'uboot1'.
with fw.open(section) as f:
if f.peek(len(pybcl.BCL_MAGIC_BYTES)) == pybcl.BCL_MAGIC_BYTES:
# Sometimes section.len - sizeof(hdr) is 1 to 3 bytes larger
# than hdr.size. The extra bytes are 0xff (padding?). This
# could explain why the compressed size is added to the header.
hdr = pybcl.HeaderVariant.from_fd(f)
data = pybcl.decompress(f.read(hdr.size), hdr.algo, hdr.outsize)
else:
data = f.read(section.len)
match = re.search(b"U-Boot [0-9]{4}\.[0-9]{2}.*? \(.*?\)", data)
return match.group().decode() if match is not None else None
return None


def get_uimage_header(fw: ReolinkFirmware) -> LegacyImageHeader:
for section in fw:
with fw.open(section) as f:
if section.len and FileType.from_magic(f.peek(4)) == FileType.UIMAGE:
# This section is always named 'KERNEL' or 'kernel'.
return LegacyImageHeader.from_fd(f)
raise Exception("No kernel section found")


def get_arch_name(arch: Arch) -> str:
if arch == Arch.ARM:
return "ARM"
Expand Down
Loading

0 comments on commit 6841490

Please sign in to comment.