Skip to content

Commit

Permalink
Add support for replacing any partition image
Browse files Browse the repository at this point in the history
This commit adds support for replacing any partition image within
the payload with a custom image. This is useful, for example, to add a
custom kernel to an OTA, which may involve partitions that wouldn't
normally be touched (eg. `vendor_dlkm`).

Any image specified via `--replace` will have its corresponding
descriptor in the vbmeta image updated. This is handled recursively. For
example, replacing `vendor_dlkm` would update both `vbmeta_vendor` and
`vbmeta`. This requires all vbmeta images to be extracted during the
patching process so that a complete dependency graph can be computed.
The performance hit in doing so is negligible, but does require the
checksums of the stripped images to be updated for the tests.

Fixes: #102

Signed-off-by: Andrew Gunnerson <[email protected]>
  • Loading branch information
chenxiaolong committed Jun 28, 2023
1 parent 2bc69f5 commit d1f096e
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 115 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,12 @@ Note that avbroot will validate that the prepatched image is compatible with the

avbroot can be used for just resigning an OTA by specifying `--rootless` instead of `--magisk`/`--prepatched`. With this option, the patched OTA will not be rooted. The only modification applied is the replacement of the OTA verification certificate so that the OS can be upgraded with future (patched) OTAs.

### Replacing partitions

avbroot supports replacing entire partitions in the OTA, even partitions that are not boot images (eg. `vendor_dlkm`). A partition can be replaced by passing in `--replace <partition name> /path/to/partition.img`. The replacement images must have a valid vbmeta footer. avbroot will automatically update the appropriate descriptors in the corresponding vbmeta partitions.

Note that the partition image replacement is processed before other steps in the patching process. If, for example, `--replace boot /path/to/boot.img` is passed in, avbroot will first take the replacement `boot.img` and apply the root patches to it before it is added to the patched OTA.

### Clearing vbmeta flags

Some Android builds may ship with a root `vbmeta` image with the flags set such that AVB is effectively disabled. When avbroot encounters these images, the patching process will fail with a message like:
Expand Down
217 changes: 155 additions & 62 deletions avbroot/main.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import argparse
import concurrent.futures
import copy
import dataclasses
import graphlib
import io
import os
import shutil
import struct
import tempfile
import time
import typing
import zipfile

import avbtool
Expand All @@ -28,7 +31,6 @@
PATH_PROPERTIES = 'payload_properties.txt'

PARTITION_PRIORITIES = {
'@vbmeta': ('vbmeta',),
# The kernel is always in boot
'@gki_kernel': ('boot',),
# Devices launching with Android 13 use a GKI init_boot ramdisk
Expand All @@ -38,6 +40,19 @@
}


@dataclasses.dataclass
class PatchContext:
replace_images: dict[str, os.PathLike[str]]
boot_partition: str
root_patch: typing.Optional[boot.BootImagePatch]
clear_vbmeta_flags: bool
privkey_avb: os.PathLike[str]
passphrase_avb: str
privkey_ota: os.PathLike[str]
passphrase_ota: str
cert_ota: os.PathLike[str]


def print_status(*args, **kwargs):
print('\x1b[1m*****', *args, '*****\x1b[0m', **kwargs)

Expand All @@ -57,14 +72,18 @@ def get_partitions_by_type(manifest):

by_type[t] = partition

for partition in all_partitions:
if 'vbmeta' in partition:
by_type[f'@vbmeta:{partition}'] = partition

return by_type


def get_required_images(manifest, boot_partition, with_root):
all_partitions = set(p.partition_name for p in manifest.partitions)
by_type = get_partitions_by_type(manifest)
images = {k: v for k, v in by_type.items()
if k in {'@otacerts', '@vbmeta'}}
if k == '@otacerts' or k.startswith('@vbmeta:')}

if with_root:
if boot_partition in by_type:
Expand All @@ -77,9 +96,33 @@ def get_required_images(manifest, boot_partition, with_root):
return images


def patch_ota_payload(f_in, open_more_f_in, f_out, file_size, boot_partition,
root_patch, clear_vbmeta_flags, privkey_avb,
passphrase_avb, privkey_ota, passphrase_ota, cert_ota):
def get_vbmeta_patch_order(avb, image_paths, vbmeta_images):
dep_graph = vbmeta.get_vbmeta_deps(
avb, {n: image_paths[n] for n in vbmeta_images})

# Only keep dependencies among the subset of images we're working with
dep_graph = {n: {d for d in deps if d in image_paths}
for n, deps in dep_graph.items() if n in image_paths}

# Avoid patching vbmeta images that don't need changes
while True:
unneeded_vbmeta = set(n for n, d in dep_graph.items()
if n in vbmeta_images and not d)
if not unneeded_vbmeta:
break

dep_graph = {n: {d for d in deps if d not in unneeded_vbmeta}
for n, deps in dep_graph.items()
if n not in unneeded_vbmeta}

full_order = graphlib.TopologicalSorter(dep_graph).static_order()
order = [n for n in full_order if n in vbmeta_images]

return dep_graph, order


def patch_ota_payload(f_in, open_more_f_in, f_out, file_size,
context: PatchContext):
with tempfile.TemporaryDirectory() as temp_dir:
extract_dir = os.path.join(temp_dir, 'extract')
patch_dir = os.path.join(temp_dir, 'patch')
Expand All @@ -89,70 +132,105 @@ def patch_ota_payload(f_in, open_more_f_in, f_out, file_size, boot_partition,
os.mkdir(payload_dir)

version, manifest, blob_offset = ota.parse_payload(f_in)
images = get_required_images(manifest, boot_partition,
root_patch is not None)
unique_images = set(images.values())

print_status('Extracting', ', '.join(sorted(unique_images)),
'from the payload')
ota.extract_images(open_more_f_in, manifest, blob_offset,
extract_dir, unique_images)
all_partitions = set(p.partition_name for p in manifest.partitions)
image_paths = {}

# Use user-provided partition images if provided. This may be a larger
# set than what's needed for our patches.
for name, path in context.replace_images.items():
if name not in all_partitions:
raise ValueError(
f'Cannot replace non-existent partition: {name}')

image_paths[name] = path

# Extract remaining required partition images from the original payload.
required_images = get_required_images(manifest, context.boot_partition,
context.root_patch is not None)
vbmeta_images = set(p for n, p in required_images.items()
if n.startswith('@vbmeta:'))

to_extract = required_images.values() - image_paths.keys()
for name in to_extract:
image_paths[name] = os.path.join(extract_dir, f'{name}.img')

if to_extract:
print_status('Extracting', ', '.join(sorted(to_extract)),
'from the payload')
ota.extract_images(open_more_f_in, manifest, blob_offset,
extract_dir, to_extract)

image_patches = {}
if root_patch is not None:
image_patches.setdefault(images['@rootpatch'], []).append(
root_patch)
image_patches.setdefault(images['@otacerts'], []).append(
boot.OtaCertPatch(cert_ota))
if context.root_patch is not None:
image_patches.setdefault(required_images['@rootpatch'], []).append(
context.root_patch)
image_patches.setdefault(required_images['@otacerts'], []).append(
boot.OtaCertPatch(context.cert_ota))

avb = avbtool.Avb()

print_status('Patching', ', '.join(sorted(image_patches)))
with concurrent.futures.ThreadPoolExecutor(
max_workers=len(image_patches)) as executor:
def apply_patches(image, patches):
patched_path = os.path.join(patch_dir, f'{image}.img')

boot.patch_boot(
avb,
os.path.join(extract_dir, f'{image}.img'),
os.path.join(patch_dir, f'{image}.img'),
privkey_avb,
passphrase_avb,
image_paths[image],
patched_path,
context.privkey_avb,
context.passphrase_avb,
True,
patches,
)

image_paths[image] = patched_path

futures = [executor.submit(apply_patches, i, p)
for i, p in image_patches.items()]

for future in concurrent.futures.as_completed(futures):
future.result()

print_status('Building new root vbmeta image')
vbmeta_image = images['@vbmeta']
vbmeta.patch_vbmeta_root(
avb,
[os.path.join(patch_dir, f'{i}.img')
for i in unique_images if i != vbmeta_image],
os.path.join(extract_dir, f'{vbmeta_image}.img'),
os.path.join(patch_dir, f'{vbmeta_image}.img'),
privkey_avb,
passphrase_avb,
manifest.block_size,
clear_vbmeta_flags,
)
vbmeta_deps, vbmeta_order = \
get_vbmeta_patch_order(avb, image_paths, vbmeta_images)
print_status('Building', ', '.join(vbmeta_order))

for image in vbmeta_order:
patched_path = os.path.join(patch_dir, f'{image}.img')

vbmeta.patch_vbmeta_image(
avb,
{n: p for n, p in image_paths.items()
if n in vbmeta_deps[image]},
image_paths[image],
patched_path,
context.privkey_avb,
context.passphrase_avb,
manifest.block_size,
context.clear_vbmeta_flags,
)

image_paths[image] = patched_path

print_status('Updating OTA payload to reference patched images')
# Don't replace untouched vbmeta images
for image in vbmeta_images - set(vbmeta_order):
del image_paths[image]

print_status('Updating OTA payload to reference new',
', '.join(sorted(image_paths)))
return ota.patch_payload(
f_in,
f_out,
version,
manifest,
blob_offset,
payload_dir,
{i: os.path.join(patch_dir, f'{i}.img') for i in unique_images},
image_paths,
file_size,
privkey_ota,
passphrase_ota,
context.privkey_ota,
context.passphrase_ota,
)


Expand All @@ -175,9 +253,7 @@ def strip_alignment_extra_field(extra):
return new_extra


def patch_ota_zip(f_zip_in, f_zip_out, boot_partition, root_patch,
clear_vbmeta_flags, privkey_avb, passphrase_avb, privkey_ota,
passphrase_ota, cert_ota):
def patch_ota_zip(f_zip_in, f_zip_out, context: PatchContext):
with (
zipfile.ZipFile(f_zip_in, 'r') as z_in,
zipfile.ZipFile(f_zip_out, 'w') as z_out,
Expand Down Expand Up @@ -242,7 +318,7 @@ def patch_ota_zip(f_zip_in, f_zip_out, boot_partition, root_patch,
print_status('Replacing', info.filename)

with (
open(cert_ota, 'rb') as f_cert,
open(context.cert_ota, 'rb') as f_cert,
z_out.open(out_info, 'w') as f_out,
):
shutil.copyfileobj(f_cert, f_out)
Expand All @@ -266,14 +342,7 @@ def patch_ota_zip(f_zip_in, f_zip_out, boot_partition, root_patch,
lambda: z_in.open(info, 'r'),
f_out,
info.file_size,
boot_partition,
root_patch,
clear_vbmeta_flags,
privkey_avb,
passphrase_avb,
privkey_ota,
passphrase_ota,
cert_ota,
context,
)

elif info.filename == PATH_PROPERTIES:
Expand Down Expand Up @@ -350,19 +419,20 @@ def patch_subcommand(args):
passphrase_ota, args.cert_ota) as temp,
ota.match_android_zip64_limit(),
):
metadata = patch_ota_zip(
args.input,
temp,
args.boot_partition,
root_patch,
args.clear_vbmeta_flags,
args.privkey_avb,
passphrase_avb,
args.privkey_ota,
passphrase_ota,
args.cert_ota,
context = PatchContext(
replace_images=args.replace or {},
boot_partition=args.boot_partition,
root_patch=root_patch,
clear_vbmeta_flags=args.clear_vbmeta_flags,
privkey_avb=args.privkey_avb,
passphrase_avb=passphrase_avb,
privkey_ota=args.privkey_ota,
passphrase_ota=passphrase_ota,
cert_ota=args.cert_ota,
)

metadata = patch_ota_zip(args.input, temp, context)

# We do a lot of low-level hackery. Reopen and verify offsets
print_status('Verifying metadata offsets')
with zipfile.ZipFile(temp_raw, 'r') as z:
Expand Down Expand Up @@ -431,6 +501,22 @@ def uint64_arg(arg):
return value


class KeyValuePairAction(argparse.Action):
def __init__(self, option_strings, dest, nargs=None, **kwargs):
if nargs != 2:
raise ValueError('nargs must be 2')

super().__init__(option_strings, dest, nargs=nargs, **kwargs)

def __call__(self, parser, namespace, values, option_string=None):
data = getattr(namespace, self.dest, None)
if data is None:
data = {}

data[values[0]] = values[1]
setattr(namespace, self.dest, data)


def parse_args(argv=None):
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(
Expand Down Expand Up @@ -480,6 +566,13 @@ def parse_args(argv=None):
help=f'File containing {arg} private key passphrase',
)

patch.add_argument(
'--replace',
nargs=2,
action=KeyValuePairAction,
help='Use partition image from a file instead of the original payload',
)

boot_group = patch.add_mutually_exclusive_group(required=True)
boot_group.add_argument(
'--magisk',
Expand Down
Loading

0 comments on commit d1f096e

Please sign in to comment.