diff --git a/src/cli/load_votes.py b/src/cli/load_votes.py new file mode 100644 index 0000000..91d632a --- /dev/null +++ b/src/cli/load_votes.py @@ -0,0 +1,166 @@ +"""Load ballot, vote data for tallying +""" +import json +import logging +from urllib.request import OpenerDirector + +log = logging.getLogger(__name__) + +AGENDA = 'rho:id:5rcmyxwu8r7yywjz4qqg4ij3pox3d96joeky1gczdpf3fkaujejdxr' +VOTERS = 'rho:id:1ri71weozwuoanef9zit5p7ooafkmkzhkwo6phgaourcknbmi6ke7t' +INDEX_SVC = 'kc-strip.madmode.com:7070' +OBSERVER = 'https://observer.testnet.rchain.coop' + + +def main(argv, cwd, connect, build_opener): + [txf, db] = argv[1:3] + + node = Observer(build_opener(), OBSERVER) + voters = Registry.lookup(node, VOTERS) + ballot = Registry.lookup(node, AGENDA) + + log.info('in: %s out: %s', json, db) + with connect(db) as work: + initdb(work) + + loadChoices(work, ballot) + loadVoters(work, voters) + + with (cwd / txf).open() as infp: + txByAddr = json.load(infp) + loadTxs(work, txByAddr) + + +class Observer: + def __init__(self, ua: OpenerDirector, base: str): + self.base = base + self.__ua = ua + + def _fetchJSON(self, url: str, body: bytes): + ua = self.__ua + reply = ua.open(url, body) + return json.load(reply) + + def exploratoryDeploy(self, term: str): + addr = f'{self.base}/api/explore-deploy' + info = self._fetchJSON(addr, term.encode('utf-8')) + log.debug('exploratory deploy response: %s', info) + return info + + +class Registry: + @classmethod + def lookup(cls, node: Observer, target: str) -> str: + term = f''' + new return, lookup(`rho:registry:lookup`) in {{ + lookup!(`{target}`, *return) + }} + ''' + log.info('looking up %s', target) + info = node.exploratoryDeploy(term) + return RhoExpr.parse(info['expr'][0]) + + +def id(data): + return data + + +class Par(set): + pass + + +class RhoExpr: + + dispatch = { + 'ExprBool': id, + 'ExprInt': id, + 'ExprString': id, + 'ExprBytes': id, + 'ExprUri': id, + 'ExprUnforg': id, + 'ExprList': lambda items: [RhoExpr.parse(item) for item in items], + 'ExprTuple': lambda items: tuple([RhoExpr.parse(item) + for item in items]), + 'ExprPar': lambda items: Par([RhoExpr.parse(item) + for item in items]), + 'ExprMap': lambda data: {k: RhoExpr.parse(v) + for (k, v) in data.items()} + } + + @classmethod + def parse(cls, expr): + (ty, val) = next(iter(expr.items())) + decode = cls.dispatch[ty] + return decode(val['data']) + + +tables = { + 'tx': ''' + create table tx(fromAddr, toAddr, amount, timestamp, sig, deployer) + ''', + 'choice': ''' + create table choice(qid, addr, prop) + ''', + 'voter': ''' + create table voter(revAddr) + ''' +} + + +def initdb(work): + for (name, ddl) in tables.items(): + work.execute(f'drop table if exists {name}') + work.execute(ddl) + log.info(f'(re-)created table: {name}') + + +def loadTxs(work, txByAddr): + txs = [ + (tx['fromAddr'], tx['toAddr'], tx['amount'], + tx['deploy']['timestamp'], + tx['deploy']['sig'], + tx['deploy']['deployer']) + for txs in txByAddr.values() + for tx in txs + ] + work.executemany(''' + insert into tx(fromAddr, toAddr, amount, timestamp, sig, deployer) + values (?, ?, ?, ?, ?, ?) + ''', txs) + log.info('inserted %d records into tx', len(txs)) + + +def loadChoices(work, ballot): + choices = [ + (qid, info[prop], prop) + for (qid, info) in ballot.items() + for prop in ['yesAddr', 'noAddr', 'abstainAddr'] + if prop in info + ] + work.executemany(''' + insert into choice(qid, addr, prop) + values (?, ?, ?) + ''', choices) + log.info('inserted %d records into choice', len(choices)) + + +def loadVoters(work, voters): + work.executemany(''' + insert into voter(revAddr) + values (?) + ''', [(v,) for v in voters]) + log.info('inserted %d records into voter', len(voters)) + + +if __name__ == '__main__': + def _script_io(): + from pathlib import Path + from sqlite3 import connect + from sys import argv + from urllib.request import build_opener + + logging.basicConfig(level=logging.INFO) + main(argv[:], cwd=Path('.'), connect=connect, + build_opener=build_opener) + + _script_io() diff --git a/src/cli/tally.js b/src/cli/tally.js index 997bcad..9e0e2db 100644 --- a/src/cli/tally.js +++ b/src/cli/tally.js @@ -1,3 +1,6 @@ +/* eslint-disable no-multi-assign */ +/* eslint-disable no-await-in-loop */ +/* eslint-disable no-use-before-define */ // usage: ./tally.sh [ballotfile] [transaction-server:port] // https://github.com/rchain-community/rv2020/issues/35 // an account is counted only once for a choice. @@ -10,19 +13,26 @@ const { assert } = require('console'); const jq = JSON.parse; +/** @type { (items: string[]) => string[] } */ +const uniq = (items) => Array.from(new Set(items).values()); + /** * @param {string[]} argv - * @param {{ fsp: any, http: any, echo: (txt: string) => void }} powers + * @param {{ + * fsp: typeof import('fs').promises, + * http: typeof import('http'), + * echo: (txt: string) => void + * }} io */ async function main(argv, { fsp, http, echo }) { // console.log(argv); // TODO: consider docopt if this gets more complex - const ballot = argv.length >= 3 ? argv[2]: 'ballotexample.json'; - const server = argv.length >= 4 ? argv[3]: 'kc-strip.madmode.com:7070'; + const ballot = argv.length >= 3 ? argv[2] : '../web/ballotexample.json'; + const server = argv.length >= 4 ? argv[3] : 'kc-strip.madmode.com:7070'; const ballotData = JSON.parse(await fsp.readFile(ballot, 'utf8')); - let whichCurl = url => curl(url, { http }); + let whichCurl = (url) => nodeCurl(url, { http }); if (argv.includes('--test')) { runTests(ballotData, { fsp }); @@ -31,27 +41,46 @@ async function main(argv, { fsp, http, echo }) { whichCurl = cachingCurl(',cache', { fsp, http }); } - const perItem = await tally(ballotData, server, { curl: whichCurl, echo }); + const txByAddr = await download(ballotData, server, { + curl: whichCurl, + }); + + if (argv.includes('--save')) { + const dest = argv.slice(-1)[0]; + await fsp.writeFile(dest, JSON.stringify(txByAddr, null, 2)); + return; + } + + const perItem = tally(ballotData, txByAddr, { echo }); console.log(perItem); } function cachingCurl(dirname, { fsp, http }) { - const toCache = url => `${dirname}/${url.slice(-20)}`; + const toCache = (url) => `${dirname}/${url.slice(-20)}`; - return async (url, { http }) => { - const contents = await curl(url, { http }); - assert(url.match('/api/transfer/')); + return async (url) => { + console.log('cachingCurl', { url, fn: toCache(url) }); + const contents = await nodeCurl(url, { http }); + assert(url.match('/api/transfer/') && !url.match('undefined')); await fsp.writeFile(toCache(url), contents); return contents; - } + }; } function curlFromCache(dirname, { fsp }) { - const toCache = url => `../../test/${dirname}/${url.slice(-20)}`; + const toCache = (url) => `../../test/${dirname}/${url.slice(-20)}`; const curl = async (url, _powers) => { // console.log('look ma, no network!', url); - return await fsp.readFile(toCache(url), 'utf8'); - } + try { + const content = await fsp.readFile(toCache(url), 'utf8'); + return content; + } catch (err) { + if (err.code === 'ENOENT') { + return '[]'; + } + throw err; + } + }; return curl; } @@ -65,13 +94,13 @@ const testSuite = [ 'Board: DoD': { yes: 1, no: 2 }, 'Board: WEC': { yes: 2, no: 2 }, 'Board: RR': { yes: 2, no: 1 }, - } + }, }, ]; // TODO: move this from ./src to ./test /** - * @typedef {{[refID: string]: { shortDesc: string, docLink?: string, yesAddr: string, noAddr: string }}} QAs + * @typedef {{[refID: string]: { shortDesc: string, docLink?: string, yesAddr: string, noAddr: string, abstainAddr: string }}} QAs * @param {QAs} ballotData */ async function runTests(ballotData, { fsp }) { @@ -80,15 +109,32 @@ async function runTests(ballotData, { fsp }) { let result = 'pass'; const { dirname, expected } = testCase; const curl = curlFromCache(dirname, { fsp }); - const actual = await tally(ballotData, 'TEST_SERVER', { curl, echo: console.log }); + + const txByAddr = await download(ballotData, 'TEST_SERVER', { + curl, + }); + + const actual = tally(ballotData, txByAddr, { + echo: console.log, + }); // console.log(JSON.stringify({ actual, expected }, null, 2)); for (const [id, value] of Object.entries(expected)) { if (actual[id].yes !== value.yes) { - console.error({ id, field: 'yes', expected: value.yes, actual: actual[id].yes }); + console.error({ + id, + field: 'yes', + expected: value.yes, + actual: actual[id].yes, + }); result = 'FAIL'; } if (actual[id].no !== value.no) { - console.error({ id, field: 'no', expected: value.no, actual: actual[id].no }); + console.error({ + id, + field: 'no', + expected: value.no, + actual: actual[id].no, + }); result = 'FAIL'; } } @@ -99,44 +145,68 @@ async function runTests(ballotData, { fsp }) { /** * @param {QAs} ballotData * @param {string} server - * @param {{ curl: (url: string) => Promise, echo: (txt: string) => void }} powers + * @param {{ curl: (url: string) => Promise }} io + * @returns {Promise<{[revAddr: string]: TX[]}>} */ -async function tally(ballotData, server, { curl, echo }) { - // console.log('ballot:', ballotData); +async function download(ballotData, server, io) { + const choiceAddrs = Object.values(ballotData) + .map(({ yesAddr, noAddr, abstainAddr }) => [yesAddr, noAddr, abstainAddr]) + .flat(); + console.log( + `downloading transactions from ${choiceAddrs.length} choices listed in the ballot...`, + ); + const voteData = await getTransactions(choiceAddrs, server, io); + const voters = uniq( + Object.values(voteData) + .map((txs) => txs.map((tx) => [tx.fromAddr, tx.toAddr]).flat()) + .flat(), + ); + console.log(`downloading transactions from ${voters.length} voters...`); + const voterData = await getTransactions(voters, server, io); + return { ...voteData, ...voterData }; +} - const lastblock = '???????'; // when election is over +/** + * @param {QAs} ballotData + * @param {{[addr: string]: TX[]}} txByAddr + * @param {{ echo: (txt: string) => void }} io + */ +function tally(ballotData, txByAddr, { echo }) { + // console.log('ballot:', ballotData); - const voteData = await voteTransactions(ballotData, server, { curl }); + // const lastblock = '???????'; // when election is over const perItem = {}; - /** @type { (items: string[]) => string[] } */ - const uniq = items => Array.from(new Set(items).values()); /** @type { (as: string[], bs: string[]) => Set } */ - const intersection = (as, bs) => (bss => new Set(as.filter(x => bss.has(x))))(new Set(bs)); + const intersection = (as, bs) => + ((bss) => new Set(as.filter((x) => bss.has(x))))(new Set(bs)); for (const [id, item] of Object.entries(ballotData)) { const { shortDesc: desc, yesAddr, noAddr } = item; echo(desc); - const yesVotes = uniq(voteData[id].yes.map(tx => tx.fromAddr)); + const yesVotes = uniq(txByAddr[yesAddr].map((tx) => tx.fromAddr)); let yes = yesVotes.length; - const noVotes = uniq(voteData[id].no.map(tx => tx.fromAddr)); + const noVotes = uniq(txByAddr[noAddr].map((tx) => tx.fromAddr)); let no = noVotes.length; - perItem[id] = { yes: yes, no: no }; + perItem[id] = { yes, no }; echo(` ${yes} yes votes ${yesAddr}`); echo(` ${no} no votes ${noAddr}`); const double = Array.from(intersection(yesVotes, noVotes)); if (double.length !== 0) { echo(` ALERT: ${double} voted both yes and no.`); - const doubleVotes = await voterTransactions(double, server, { curl }); for (const voter of double) { - for (const acct of doubleVotes[voter].map(tx => tx.toAddr)) { - if (acct === yesAddr ) { // echo(`yes found`) - perItem[id].no = no = no - 1; break; - } else if (acct === noAddr) { // echo no found - perItem[id].yes = yes = yes - 1; break; + for (const acct of txByAddr[voter].map((tx) => tx.toAddr)) { + if (acct === yesAddr) { + // echo(`yes found`) + perItem[id].no = no -= 1; + break; + } else if (acct === noAddr) { + // echo no found + perItem[id].yes = yes -= 1; + break; } } } @@ -148,39 +218,24 @@ async function tally(ballotData, server, { curl, echo }) { } /** - * @typedef {{ fromAddr: string, toAddr: string }} TX - * @param {QAs} ballotData - * @param {string} server - * @param {{ curl: (url: string) => Promise }} powers - * @returns {Promise<{[id: string]: { yes: TX[], no: TX[] }}>} - */ -async function voteTransactions(ballotData, server, { curl }) { - /** @type { {[id: string]: { yes: TX[], no: TX[] }} } */ - const votes = {}; - for (const [id, item] of Object.entries(ballotData)) { - const { shortDesc, yesAddr, noAddr } = item; - - votes[id] = { - yes: jq(await curl(`http://${server}/api/transfer/${yesAddr}`)), - no: jq(await curl(`http://${server}/api/transfer/${noAddr}`)), - }; - } - return votes; -} - -/** - * @param {string[]} fromAddrs + * @param {string[]} revAddrs * @param {string} server * @param {{ curl: (url: string) => Promise }} powers - * @returns { Promise<{[voter: string]: TX[] }>} + * @returns { Promise<{[addr: string]: TX[] }>} + * + * @typedef {{ fromAddr: string, toAddr: string }} TX */ -async function voterTransactions(fromAddrs, server, { curl }) { - /** @type { {[voter: string]: TX[] } } */ - const byVoter = {}; - for (const voter of fromAddrs) { - byVoter[voter] = jq(await curl(`http://${server}/api/transfer/${voter}`)); - } - return byVoter; +async function getTransactions(revAddrs, server, { curl }) { + return Object.fromEntries( + await Promise.all( + revAddrs.map((addr) => + curl(`http://${server}/api/transfer/${addr}`).then((txt) => [ + addr, + jq(txt), + ]), + ), + ), + ); } /** @@ -188,26 +243,28 @@ async function voterTransactions(fromAddrs, server, { curl }) { * @param {{ http: any }} powers * @returns {Promise} */ -function curl(url, { http }) { +function nodeCurl(url, { http }) { // console.log('get', { url }); return new Promise((resolve, reject) => { - const req = http.get(url, response => { + const req = http.get(url, (response) => { let str = ''; // console.log('Response is ' + response.statusCode); - response.on('data', chunk => { - str += chunk; + response.on('data', (chunk) => { + str += chunk; }); response.on('end', () => resolve(str)); }); req.end(); req.on('error', reject); - }) + }); } if (require.main === module) { main(process.argv, { + // eslint-disable-next-line global-require fsp: require('fs').promises, + // eslint-disable-next-line global-require http: require('http'), echo: console.log, - }).catch(err => console.error(err)); + }).catch((err) => console.error(err)); } diff --git a/src/cli/tally.sql b/src/cli/tally.sql new file mode 100644 index 0000000..7b95818 --- /dev/null +++ b/src/cli/tally.sql @@ -0,0 +1,37 @@ +drop view if exists valid_votes; +create view valid_votes as +with valid_voter as ( + select * + from tx + where tx.fromAddr in (select revAddr from voter) +), +vote_cast as ( + select distinct qid, fromAddr, toAddr, choice.prop, amount, timestamp + from valid_voter tx + join choice on choice.addr = tx.toAddr +), +latest as ( + select qid, fromAddr, max(timestamp) max_ts + from vote_cast + group by qid, fromAddr +), +latest_vote as ( + select l.qid, l.fromAddr, c.toAddr, c.prop, c.amount, c.timestamp + from latest l + join vote_cast c on l.qid = c.qid and l.fromAddr = c.fromAddr and l.max_ts = c.timestamp +) +select * from latest_vote +; + +drop view if exists tally; +create view tally as +select choice.qid, replace(choice.prop, 'Addr', '') sentiment, count(distinct v.fromAddr) qty +from choice +join valid_votes v on v.toAddr = choice.addr +group by choice.qid, choice.prop +order by choice.qid, qty desc +; + +select qid, prop, addr from choice order by qid, prop; +select * from valid_votes order by fromAddr, qid; +select * from tally; diff --git a/src/cli/test_tally.py b/src/cli/test_tally.py new file mode 100644 index 0000000..5ef4563 --- /dev/null +++ b/src/cli/test_tally.py @@ -0,0 +1,204 @@ +import typing as py +from dataclasses import dataclass +from itertools import groupby, cycle +from datetime import datetime, timedelta +from pprint import pformat + +from hypothesis import given, infer, assume, settings, HealthCheck +# import hypothesis.strategies as st + +import load_votes + +DAY = 60 * 60 * 24 + +# KLUDGE. can't figure out how to use importlib.resources from main program +tally_sql = open('tally.sql').read() + + +class RevAddr: + pass + raw: bytes + role: bool + + def __repr__(self): + return f"111{['Src', 'Dest'][self.role]}{self.raw.hex()}" + + +@dataclass(frozen=True, order=True) +class SrcAddr(RevAddr): + raw: bytes + + def __repr__(self): + return f"111Src{self.raw.hex()}" + + +@dataclass(frozen=True, order=True) +class DestAddr(RevAddr): + raw: bytes + + def __repr__(self): + return f"111Dest{self.raw.hex()}" + + +@dataclass(order=True, frozen=True) +class Question: + ix: int + yesAddr: RevAddr + noAddr: RevAddr + abstainAddr: RevAddr + + @property + def id(self): + return f'Q{self.ix}' + + +@dataclass(frozen=True) +class Meeting: + roll: py.Set[SrcAddr] + choiceAddrs: py.Set[DestAddr] + + @property + def questions(self): + ch = sorted(self.choiceAddrs) + return [ + Question(ix, y, n, a) + for ix in list(range(len(ch) - 2))[::3] + for (y, n, a) in [(ch[ix], ch[ix + 1], ch[ix + 2])] + ] + + def voters(self): + return [str(addr) for addr in self.roll] + + def choices(self): + return { + q.id: { + 'yesAddr': str(q.yesAddr), + 'noAddr': str(q.noAddr), + 'abstainAddr': str(q.abstainAddr), + } + for q in self.questions + } + + +@dataclass(order=True, frozen=True) +class Tx: + fromAddr: RevAddr + toAddr: RevAddr + amount: int + time: datetime + + @classmethod + def byAddr(cls, txs: py.List['Tx']): + ea = [{ + 'fromAddr': str(tx.fromAddr), + 'toAddr': str(tx.toAddr), + 'amount': tx.amount, + 'deploy': { + 'timestamp': int(tx.time.timestamp() * 1000), + 'sig': 'TODO', + 'deployer': 'TODO'}} + for tx in txs] + byAddr = {} + for k, g in groupby(ea, lambda tx: tx['fromAddr']): + byAddr[k] = list(g) + return byAddr + + +@dataclass(order=True, frozen=True) +class Voter: + identity: py.Union[SrcAddr, int] + choices: py.List[int] + t0: datetime + d1: py.Optional[int] + d2: py.Optional[int] + + @property + def times(self): + return [self.t0] + [ + self.t0 + timedelta(seconds=abs(d) % DAY) + for d in [self.d1, self.d2] if d + ] + + def votes(self, roll: py.List[RevAddr], questions: py.List[Question]): + # print( + # f'votes(#ch={len(self.choices)}') + assume(len(self.choices) >= 1) + # print('... votes OK') + + if type(self.identity) is int: + fromAddr = roll[self.identity % len(roll)] + else: + fromAddr = self.identity + + ea = cycle(self.choices) + for t in self.times: + for q in questions: + toAddr = [q.yesAddr, q.noAddr, q.abstainAddr][next(ea) % 3] + yield Tx(fromAddr, toAddr, 1, t) + + +def records(cur): + cols = [d[0] for d in cur.description] + return [dict(zip(cols, row)) for row in cur.fetchall()] + + +@given(meeting=infer, voters=infer) +@settings(suppress_health_check=[ + HealthCheck.filter_too_much, HealthCheck.too_slow]) +def test_tally(conn, meeting: Meeting, + voters: py.List[Voter]): + # print('======== case') + # print('checking len(meeting.questions)', meeting.questions) + assume(len(meeting.questions) >= 1) + # print('checking roll disjoint choices', meeting.roll) + assume(len(meeting.roll) >= 1) + assume(meeting.roll.isdisjoint(meeting.choiceAddrs)) + # print("== In: Meeting agenda choices") + # print(pformat(meeting.choices())) + # print('checking votes', len(voters)) + assume(len(voters) >= 1) + roll = sorted(meeting.roll) + votes = [ + vote + for voter in voters + for vote in voter.votes(roll, meeting.questions)] + # print("== In: Votes") + # print(pformat(Tx.byAddr(votes))) + + load_votes.initdb(conn) + load_votes.loadTxs(conn, Tx.byAddr(votes)) + load_votes.loadVoters(conn, meeting.voters()) + load_votes.loadChoices(conn, meeting.choices()) + conn.executescript(tally_sql) + + q = conn.cursor() + q.execute( + 'select fromAddr, qid, prop from valid_votes order by fromAddr, qid') + print("== Out: Valid Votes") + print(pformat(records(q))) + q.execute('select * from tally order by qid, qty desc') + tally = records(q) + print("== Out: Tally") + print(pformat(tally)) + + qids = [q.id for q in meeting.questions] + for choice in tally: + assert choice['qid'] in qids + q.execute(''' + select (select count(*) from choice) choice_qty + , (select count(*) from voter) voter_qty + , (select count(*) from tx) tx_qty + , (select count(*) from valid_votes) vote_qty + , (select count(*) from tally) tally_qty + ''') + print(records(q)) + print('PASS!') + + +if __name__ == '__main__': + def _script_io(): + from sqlite3 import connect + + test_tally(connect(':memory:')) + + _script_io()