Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…t-python into feat_per_tenant_logging
  • Loading branch information
shaangill025 committed Oct 20, 2023
2 parents c46c4d0 + 900b1d8 commit 37e1e82
Show file tree
Hide file tree
Showing 42 changed files with 1,033 additions and 801 deletions.
12 changes: 12 additions & 0 deletions UpgradingACA-Py.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,19 @@ In case, running multiple tags [say test1 & test2]:
./scripts/run_docker upgrade --force-upgrade --named-tag test1 --named-tag test2
```

## Subwallet upgrades
With multitenant enabled, there is a subwallet associated with each tenant profile, so there is a need to upgrade those sub wallets in addition to the base wallet associated with root profile.

There are 2 options to perform such upgrades:
- `--upgrade-all-subwallets`

This will apply the upgrade steps to all sub wallets [tenant profiles] and the base wallet [root profiles].

- `--upgrade-subwallet`

This will apply the upgrade steps to specified sub wallets [identified by wallet id] and the base wallet.

Note: multiple specification allowed

## Exceptions

Expand Down
84 changes: 82 additions & 2 deletions aries_cloudagent/commands/tests/test_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

from ...core.in_memory import InMemoryProfile
from ...connections.models.conn_record import ConnRecord
from ...storage.base import BaseStorage
from ...storage.base import BaseStorage, BaseStorageSearch
from ...storage.in_memory import InMemoryStorage
from ...storage.record import StorageRecord
from ...version import __version__
from ...wallet.models.wallet_record import WalletRecord

from .. import upgrade as test_module
from ..upgrade import UpgradeError
Expand All @@ -16,12 +18,33 @@ class TestUpgrade(AsyncTestCase):
async def setUp(self):
self.session = InMemoryProfile.test_session()
self.profile = self.session.profile
self.profile.context.injector.bind_instance(
BaseStorageSearch, InMemoryStorage(self.profile)
)
self.storage = self.session.inject(BaseStorage)
record = StorageRecord(
"acapy_version",
"v0.7.2",
)
await self.storage.add_record(record)
recs = [
WalletRecord(
key_management_mode=[
WalletRecord.MODE_UNMANAGED,
WalletRecord.MODE_MANAGED,
][i],
settings={
"wallet.name": f"my-wallet-{i}",
"wallet.type": "indy",
"wallet.key": f"dummy-wallet-key-{i}",
},
wallet_name=f"my-wallet-{i}",
)
for i in range(2)
]
async with self.profile.session() as session:
for rec in recs:
await rec.save(session)

def test_bad_calls(self):
with self.assertRaises(SystemExit):
Expand Down Expand Up @@ -85,6 +108,63 @@ async def test_upgrade_from_version(self):
profile=self.profile,
)

async def test_upgrade_all_subwallets(self):
self.profile.settings.extend(
{
"upgrade.from_version": "v0.7.2",
"upgrade.upgrade_all_subwallets": True,
"upgrade.force_upgrade": True,
"upgrade.page_size": 1,
}
)
with async_mock.patch.object(
ConnRecord,
"query",
async_mock.CoroutineMock(return_value=[ConnRecord()]),
), async_mock.patch.object(ConnRecord, "save", async_mock.CoroutineMock()):
await test_module.upgrade(
profile=self.profile,
)

async def test_upgrade_specified_subwallets(self):
wallet_ids = []
async with self.profile.session() as session:
wallet_recs = await WalletRecord.query(session, tag_filter={})
for wallet_rec in wallet_recs:
wallet_ids.append(wallet_rec.wallet_id)
self.profile.settings.extend(
{
"upgrade.named_tags": "fix_issue_rev_reg",
"upgrade.upgrade_subwallets": [wallet_ids[0]],
"upgrade.force_upgrade": True,
}
)
with async_mock.patch.object(
ConnRecord,
"query",
async_mock.CoroutineMock(return_value=[ConnRecord()]),
), async_mock.patch.object(ConnRecord, "save", async_mock.CoroutineMock()):
await test_module.upgrade(
profile=self.profile,
)

self.profile.settings.extend(
{
"upgrade.named_tags": "fix_issue_rev_reg",
"upgrade.upgrade_subwallets": wallet_ids,
"upgrade.force_upgrade": True,
"upgrade.page_size": 1,
}
)
with async_mock.patch.object(
ConnRecord,
"query",
async_mock.CoroutineMock(return_value=[ConnRecord()]),
), async_mock.patch.object(ConnRecord, "save", async_mock.CoroutineMock()):
await test_module.upgrade(
profile=self.profile,
)

async def test_upgrade_callable(self):
version_storage_record = await self.storage.find_record(
type_filter="acapy_version", tag_query={}
Expand Down Expand Up @@ -412,7 +492,7 @@ async def test_upgrade_x_invalid_config(self):
async_mock.MagicMock(return_value={}),
):
with self.assertRaises(UpgradeError) as ctx:
await test_module.upgrade(settings={})
await test_module.upgrade(profile=self.profile)
assert "No version configs found in" in str(ctx.exception)

async def test_upgrade_x_params(self):
Expand Down
159 changes: 143 additions & 16 deletions aries_cloudagent/commands/upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,26 @@

from ..core.profile import Profile, ProfileSession
from ..config import argparse as arg
from ..config.injection_context import InjectionContext
from ..config.default_context import DefaultContextBuilder
from ..config.base import BaseError, BaseSettings
from ..config.util import common_config
from ..config.wallet import wallet_config
from ..messaging.models.base import BaseModelError
from ..messaging.models.base_record import BaseRecord, RecordType
from ..storage.base import BaseStorage
from ..storage.base import BaseStorage, BaseStorageSearch
from ..storage.error import StorageNotFoundError
from ..storage.record import StorageRecord
from ..revocation.models.issuer_rev_reg_record import IssuerRevRegRecord
from ..utils.classloader import ClassLoader, ClassNotFoundError
from ..version import __version__, RECORD_TYPE_ACAPY_VERSION
from ..wallet.models.wallet_record import WalletRecord

from . import PROG

DEFAULT_UPGRADE_CONFIG_FILE_NAME = "default_version_upgrade_config.yml"
LOGGER = logging.getLogger(__name__)
BATCH_SIZE = 25


class ExplicitUpgradeOption(Enum):
Expand Down Expand Up @@ -239,21 +242,129 @@ def _perform_upgrade(
return resave_record_path_sets, executables_call_set


def get_webhook_urls(
base_context: InjectionContext,
wallet_record: WalletRecord,
) -> list:
"""Get the webhook urls according to dispatch_type."""
wallet_id = wallet_record.wallet_id
dispatch_type = wallet_record.wallet_dispatch_type
subwallet_webhook_urls = wallet_record.wallet_webhook_urls or []
base_webhook_urls = base_context.settings.get("admin.webhook_urls", [])

if dispatch_type == "both":
webhook_urls = list(set(base_webhook_urls) | set(subwallet_webhook_urls))
if not webhook_urls:
LOGGER.warning(
"No webhook URLs in context configuration "
f"nor wallet record {wallet_id}, but wallet record "
f"configures dispatch type {dispatch_type}"
)
elif dispatch_type == "default":
webhook_urls = subwallet_webhook_urls
if not webhook_urls:
LOGGER.warning(
f"No webhook URLs in nor wallet record {wallet_id}, but "
f"wallet record configures dispatch type {dispatch_type}"
)
else:
webhook_urls = base_webhook_urls
return webhook_urls


async def get_wallet_profile(
base_context: InjectionContext,
wallet_record: WalletRecord,
extra_settings: dict = {},
) -> Profile:
"""Get profile for a wallet record."""
context = base_context.copy()
reset_settings = {
"wallet.recreate": False,
"wallet.seed": None,
"wallet.rekey": None,
"wallet.name": None,
"wallet.type": None,
"mediation.open": None,
"mediation.invite": None,
"mediation.default_id": None,
"mediation.clear": None,
}
extra_settings["admin.webhook_urls"] = get_webhook_urls(base_context, wallet_record)

context.settings = (
context.settings.extend(reset_settings)
.extend(wallet_record.settings)
.extend(extra_settings)
)

profile, _ = await wallet_config(context, provision=False)
return profile


async def upgrade(
settings: Optional[Union[Mapping[str, Any], BaseSettings]] = None,
profile: Optional[Profile] = None,
):
"""Invoke upgradation process for each applicable profile."""
profiles_to_upgrade = []
if settings:
batch_size = settings.get("upgrade.page_size", BATCH_SIZE)
else:
batch_size = BATCH_SIZE
if profile and (settings or settings == {}):
raise UpgradeError("upgrade requires either profile or settings, not both.")
if profile:
root_profile = profile
settings = profile.settings
else:
context_builder = DefaultContextBuilder(settings)
context = await context_builder.build_context()
root_profile, _ = await wallet_config(context)
profiles_to_upgrade.append(root_profile)
base_storage_search_inst = root_profile.inject(BaseStorageSearch)
if "upgrade.upgrade_all_subwallets" in settings and settings.get(
"upgrade.upgrade_all_subwallets"
):
search_session = base_storage_search_inst.search_records(
type_filter=WalletRecord.RECORD_TYPE, page_size=batch_size
)
while search_session._done is False:
wallet_storage_records = await search_session.fetch()
for wallet_storage_record in wallet_storage_records:
wallet_record = WalletRecord.from_storage(
wallet_storage_record.id,
json.loads(wallet_storage_record.value),
)
wallet_profile = await get_wallet_profile(
base_context=root_profile.context, wallet_record=wallet_record
)
profiles_to_upgrade.append(wallet_profile)
del settings["upgrade.upgrade_all_subwallets"]
if (
"upgrade.upgrade_subwallets" in settings
and len(settings.get("upgrade.upgrade_subwallets")) >= 1
):
for _wallet_id in settings.get("upgrade.upgrade_subwallets"):
async with root_profile.session() as session:
wallet_record = await WalletRecord.retrieve_by_id(
session, record_id=_wallet_id
)
wallet_profile = await get_wallet_profile(
base_context=root_profile.context, wallet_record=wallet_record
)
profiles_to_upgrade.append(wallet_profile)
del settings["upgrade.upgrade_subwallets"]
for _profile in profiles_to_upgrade:
await upgrade_per_profile(profile=_profile, settings=settings)


async def upgrade_per_profile(
profile: Profile,
settings: Optional[Union[Mapping[str, Any], BaseSettings]] = None,
):
"""Perform upgradation steps."""
try:
if profile and (settings or settings == {}):
raise UpgradeError("upgrade requires either profile or settings, not both.")
if profile:
root_profile = profile
settings = profile.settings
else:
context_builder = DefaultContextBuilder(settings)
context = await context_builder.build_context()
root_profile, _ = await wallet_config(context)
version_upgrade_config_inst = VersionUpgradeConfig(
settings.get("upgrade.config_path")
)
Expand All @@ -273,7 +384,7 @@ async def upgrade(
upgrade_from_version_storage = None
upgrade_from_version_config = None
upgrade_from_version = None
async with root_profile.session() as session:
async with profile.session() as session:
storage = session.inject(BaseStorage)
try:
version_storage_record = await storage.find_record(
Expand Down Expand Up @@ -391,8 +502,24 @@ async def upgrade(
raise UpgradeError(
f"Only BaseRecord can be resaved, found: {str(rec_type)}"
)
async with root_profile.session() as session:
all_records = await rec_type.query(session)
all_records = []
if settings:
batch_size = settings.get("upgrade.page_size", BATCH_SIZE)
else:
batch_size = BATCH_SIZE
base_storage_search_inst = profile.inject(BaseStorageSearch)
search_session = base_storage_search_inst.search_records(
type_filter=rec_type.RECORD_TYPE, page_size=batch_size
)
while search_session._done is False:
storage_records = await search_session.fetch()
for storage_record in storage_records:
_record = rec_type.from_storage(
storage_record.id,
json.loads(storage_record.value),
)
all_records.append(_record)
async with profile.session() as session:
for record in all_records:
await record.save(
session,
Expand All @@ -406,11 +533,11 @@ async def upgrade(
_callable = version_upgrade_config_inst.get_callable(callable_name)
if not _callable:
raise UpgradeError(f"No function specified for {callable_name}")
await _callable(root_profile)
await _callable(profile)

# Update storage version
if to_update_flag:
async with root_profile.session() as session:
async with profile.session() as session:
storage = session.inject(BaseStorage)
if not version_storage_record:
await storage.add_record(
Expand All @@ -428,7 +555,7 @@ async def upgrade(
f"set to {upgrade_to_version}"
)
if not profile:
await root_profile.close()
await profile.close()
except BaseError as e:
raise UpgradeError(f"Error during upgrade: {e}")

Expand Down
Loading

0 comments on commit 37e1e82

Please sign in to comment.