Skip to content

Commit

Permalink
Problem: test transactions can't be saved and reused (#1575)
Browse files Browse the repository at this point in the history
* remove unused function

* Problem: test transactions can't be saved and reused

Solution:
- support saving test transactions to files and reuse to speedup local test

Co-authored-by: mmsqe <[email protected]>

* use constant

* add log

* fix

* fix log

* log

* fix

* add log

* log

---------

Co-authored-by: huangyi <[email protected]>
  • Loading branch information
mmsqe and yihuang authored Sep 20, 2024
1 parent 3c8be96 commit 670fd8d
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 52 deletions.
48 changes: 42 additions & 6 deletions testground/benchmark/benchmark/stateless.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import click
import tomlkit

from . import transaction
from .cli import ChainCommand
from .echo import run_echo_server
from .peer import (
Expand All @@ -23,7 +24,6 @@
init_node,
patch_configs,
)
from .sendtx import prepare_txs, send_txs
from .stats import dump_block_stats
from .topology import connect_all
from .types import PeerPacket
Expand Down Expand Up @@ -184,7 +184,7 @@ def run(outdir: str, datadir: str, cronosd, global_seq):
home = datadir / group / str(group_seq)

try:
return do_run(home, cronosd, group, global_seq, cfg)
return do_run(datadir, home, cronosd, group, global_seq, cfg)
finally:
# collect outputs
output = Path("/data.tar.bz2")
Expand All @@ -198,10 +198,45 @@ def run(outdir: str, datadir: str, cronosd, global_seq):
shutil.copy(output, filename)


def do_run(home: str, cronosd: str, group: str, global_seq: int, cfg: dict):
@cli.command()
@click.argument("outdir")
@click.option("--nodes", default=10)
@click.option("--num-accounts", default=10)
@click.option("--num-txs", default=1000)
def gen_txs(**kwargs):
return _gen_txs(**kwargs)


@cli.command()
@click.argument("options", callback=validate_json)
def generic_gen_txs(options: dict):
return _gen_txs(**options)


def _gen_txs(
outdir: str,
nodes: int = 10,
num_accounts: int = 10,
num_txs: int = 1000,
):
outdir = Path(outdir)
for global_seq in range(nodes):
print("generating", num_accounts * num_txs, "txs for node", global_seq)
txs = transaction.gen(global_seq, num_accounts, num_txs)
transaction.save(txs, outdir, global_seq)
print("saved", len(txs), "txs for node", global_seq)


def do_run(
datadir: Path, home: Path, cronosd: str, group: str, global_seq: int, cfg: dict
):
if group == FULLNODE_GROUP or cfg.get("validator-generate-load", True):
print("preparing", cfg["num_accounts"] * cfg["num_txs"], "txs")
txs = prepare_txs(global_seq, cfg["num_accounts"], cfg["num_txs"])
txs = transaction.load(datadir, global_seq)
if txs:
print("loaded", len(txs), "txs")
else:
print("generating", cfg["num_accounts"] * cfg["num_txs"], "txs")
txs = transaction.gen(global_seq, cfg["num_accounts"], cfg["num_txs"])
else:
txs = []

Expand All @@ -222,7 +257,8 @@ def do_run(home: str, cronosd: str, group: str, global_seq: int, cfg: dict):
wait_for_block(cli, 3)

if txs:
asyncio.run(send_txs(txs))
asyncio.run(transaction.send(txs))
print("sent", len(txs), "txs")

# node quit when the chain is idle or halted for a while
detect_idle_halted(20, 20)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import asyncio
import time
from pathlib import Path

import aiohttp
import ujson
import web3
from eth_account import Account

from .utils import gen_account, send_transaction
from .utils import gen_account

GAS_PRICE = 1000000000
CHAIN_ID = 777
LOCAL_JSON_RPC = "http://localhost:8545"
CONNECTION_POOL_SIZE = 1024
TXS_DIR = "txs"


def test_tx(nonce: int):
Expand All @@ -25,57 +24,35 @@ def test_tx(nonce: int):
}


def sendtx(w3: web3.Web3, acct: Account, tx_amount: int):
initial_nonce = w3.eth.get_transaction_count(acct.address)
print(
"test begin, address:",
acct.address,
"balance:",
w3.eth.get_balance(acct.address),
"nonce:",
initial_nonce,
)

nonce = initial_nonce
while nonce < initial_nonce + tx_amount:
try:
send_transaction(w3, test_tx(nonce), acct, wait=False)
except ValueError as e:
msg = str(e)
if "invalid nonce" in msg:
print("invalid nonce and retry", nonce)
time.sleep(1)
continue
if "tx already in mempool" not in msg:
raise

nonce += 1

if nonce % 100 == 0:
print(f"{acct.address} sent {nonce} transactions")

print(
"test end, address:",
acct.address,
"balance:",
w3.eth.get_balance(acct.address),
"nonce:",
w3.eth.get_transaction_count(acct.address),
)


def prepare_txs(global_seq, num_accounts, num_txs):
def gen(global_seq, num_accounts, num_txs) -> [str]:
accounts = [gen_account(global_seq, i + 1) for i in range(num_accounts)]
txs = []
for i in range(num_txs):
for acct in accounts:
txs.append(acct.sign_transaction(test_tx(i)).rawTransaction.hex())
if len(txs) % 1000 == 0:
print("prepared", len(txs), "txs")
print("generated", len(txs), "txs for node", global_seq)

return txs


def save(txs: [str], datadir: Path, global_seq: int):
d = datadir / TXS_DIR
d.mkdir(parents=True, exist_ok=True)
path = d / f"{global_seq}.json"
with path.open("w") as f:
ujson.dump(txs, f)


def load(datadir: Path, global_seq: int) -> [str]:
path = datadir / TXS_DIR / f"{global_seq}.json"
if not path.exists():
return

with path.open("r") as f:
return ujson.load(f)


async def async_sendtx(session, raw):
async with session.post(
LOCAL_JSON_RPC,
Expand All @@ -91,7 +68,7 @@ async def async_sendtx(session, raw):
print("send tx error", data["error"])


async def send_txs(txs):
async def send(txs):
connector = aiohttp.TCPConnector(limit=1024)
async with aiohttp.ClientSession(
connector=connector, json_serialize=ujson.dumps
Expand Down

0 comments on commit 670fd8d

Please sign in to comment.