From e7520422cb39b5f477e89cdc59a0a2378a5917e9 Mon Sep 17 00:00:00 2001 From: relipa Date: Tue, 5 Mar 2024 14:31:46 +0700 Subject: [PATCH] feat: retry when error. start at the last data fetched --- src/cmd/cmdStakerReward.ts | 2 +- src/cmd/cmdValidatorReward.ts | 2 +- src/contants/common.ts | 2 + src/execute/executeStakerReward.ts | 208 +++++++++++++++++--------- src/execute/executeValidatorReward.ts | 207 ++++++++++++++++--------- src/module/RewardStakes.ts | 69 ++++++--- 6 files changed, 330 insertions(+), 160 deletions(-) create mode 100644 src/contants/common.ts diff --git a/src/cmd/cmdStakerReward.ts b/src/cmd/cmdStakerReward.ts index acae0f3..4a7fbdf 100755 --- a/src/cmd/cmdStakerReward.ts +++ b/src/cmd/cmdStakerReward.ts @@ -1,5 +1,5 @@ import { Arguments, Argv } from 'yargs'; -import { main } from '../execute/executeStakerReward'; +import main from '../execute/executeStakerReward'; import { stakerRewardArgs } from '../types'; import { LogUtils } from '../utils/Logger'; diff --git a/src/cmd/cmdValidatorReward.ts b/src/cmd/cmdValidatorReward.ts index 3314533..5babcba 100755 --- a/src/cmd/cmdValidatorReward.ts +++ b/src/cmd/cmdValidatorReward.ts @@ -1,5 +1,5 @@ import { Arguments, Argv } from 'yargs'; -import { main } from '../execute/executeValidatorReward'; +import main from '../execute/executeValidatorReward'; import { validatorRewardArgs } from '../types'; import { LogUtils } from '../utils/Logger'; diff --git a/src/contants/common.ts b/src/contants/common.ts new file mode 100644 index 0000000..d50e8c2 --- /dev/null +++ b/src/contants/common.ts @@ -0,0 +1,2 @@ +export const MAX_RETRIES = 5 +export const RETRY_INTERVAL_MS = 5000 diff --git a/src/execute/executeStakerReward.ts b/src/execute/executeStakerReward.ts index fffcb23..617205b 100755 --- a/src/execute/executeStakerReward.ts +++ b/src/execute/executeStakerReward.ts @@ -1,18 +1,14 @@ import moment = require('moment-timezone'); import { GoogleSpreadsheet } from 'google-spreadsheet'; +import { MAX_RETRIES, RETRY_INTERVAL_MS } from '../contants/common'; import { exportCsv, getAdditionalDataForStakerReward, getEpoches, + getLastDataFetchedByEpoch, getOasPricesForEpoch, } from '../module/RewardStakes'; -import { - DataExport, - PrepareData, - TimeData, - Verse, - stakerRewardArgs, -} from '../types'; +import { PrepareData, TimeData, Verse, stakerRewardArgs } from '../types'; import { generateNumberArray, isValidAddresses, sleep } from '../utils'; import { convertAddressesToArray } from '../utils/convert'; import { getTotalSecondProcess } from '../utils/date'; @@ -23,48 +19,13 @@ import { } from '../utils/google'; import { Subgraph } from '../utils/subgraph'; -export const main = async (argv: stakerRewardArgs) => { - const startTimeProcess = Date.now(); - - // validate address - const addresses = convertAddressesToArray(argv.staker_addresses); - if (!isValidAddresses(addresses)) { - return; - } - const subgraph = new Subgraph(argv.chain as Verse); - // header for staker reward - const header: string[] = getHeader(argv); - - // get the list of epoches based on the passed options - const epoches = await getEpoches(argv, subgraph); - - let doc: GoogleSpreadsheet; - if (argv.export_csv_online) { - doc = await getSpreadSheet(); - await doc.loadInfo(); - } - - const loopAsync: number[] = generateNumberArray(epoches.from, epoches.to); - - const prepareData: PrepareData[] = await getPrepareData( - loopAsync, - subgraph, - argv, - ); - // data to export - await handleExport(prepareData, subgraph, argv, header); - - const totalSecondsProcess = getTotalSecondProcess(startTimeProcess); - console.log(`==> Total: ${totalSecondsProcess} seconds`); -}; - const getHeader = (argv: stakerRewardArgs): string[] => { let header: string[] = HEADER_FOR_STAKING_REWARD; // if API_KEY exists and price option exists => export that price otherwise export default price if (process.env.COINGECKO_API_KEY) { header = argv.price - ? [...header, 'Price timestamp UTC', 'Oas price'] - : [...header, 'Price timestamp UTC', ...DEFAULT_LIST_PRICE]; + ? [...header, 'Oas price', 'Price timestamp UTC'] + : [...header, ...DEFAULT_LIST_PRICE, 'Price timestamp UTC']; } return header; }; @@ -113,57 +74,160 @@ const handleExport = async ( subgraph: Subgraph, argv: stakerRewardArgs, header: string[], -): Promise => { +) => { // set the address to lowercase const addresses = convertAddressesToArray(argv.staker_addresses); + let numberOfRetries = 0; - const results: DataExport[] = []; + let doc: GoogleSpreadsheet; + if (Boolean(argv.export_csv_online)) { + doc = await getSpreadSheet(); + await doc.loadInfo(); + } + for (let i = 0; i < prepareData.length; i++) { + try { + await processExportByEpoch( + prepareData[i], + addresses, + subgraph, + argv, + header, + doc, + ); + } catch (error) { + console.log(error); + numberOfRetries += 1; + await sleep(RETRY_INTERVAL_MS); + + console.log('\n----------Trying again----------'); + console.log('\n----------Please wait!----------'); + + if (numberOfRetries > MAX_RETRIES) { + throw error; + } + i = -1; // Reset the loop counter to 0 after error + } + } +}; - for (const item of prepareData) { +const processExportByEpoch = async ( + item: PrepareData, + addresses: string[], + subgraph: Subgraph, + argv: stakerRewardArgs, + header: string[], + doc: GoogleSpreadsheet, +) => { + try { const { oasPrices, timeData, priceTime } = item; const { block, epoch, timestamp } = timeData; - const startTimeProcess = Date.now(); - console.log('PROCESSING WITH EPOCH', epoch); - const promises = addresses?.map(async (address: string) => { - const listStakerStake = await subgraph.getListStakerStake( - block, - address, - epoch, - ); + const result = await getLastDataFetchedByEpoch( + doc, + header, + argv, + timestamp, + 'Staker Address', + 'staker-reward', + argv.output + ); - // format data - const { rowData } = getAdditionalDataForStakerReward( - oasPrices, - listStakerStake, - timeData, - argv.price, - address, - priceTime, - ); + if (epoch < result.epoch) { + return; + } - await sleep(100); + const startTimeProcess = Date.now(); + console.log('PROCESSING WITH EPOCH', epoch); - return { - rowData, - timestamp, - }; + const promises = []; + + addresses.forEach(async (address: string) => { + const validatorAddress = address; + + if ( + !(result.epoch == epoch && result.addresses.includes(validatorAddress)) + ) { + const promise = (async () => { + const listStakerStake = await subgraph.getListStakerStake( + block, + address, + epoch, + ); + + // format data + const { rowData } = getAdditionalDataForStakerReward( + oasPrices, + listStakerStake, + timeData, + argv.price, + address, + priceTime, + ); + + await sleep(100); + + return { + rowData, + timestamp, + }; + })(); + + promises.push(promise); + } }); + if (promises?.length == 0) { + return; + } + const dataExport = await Promise.all(promises); + // process export await exportCsv( dataExport, Boolean(argv.export_csv_online), argv.output, - `staker-reward`, + 'staker-reward', header, + doc, ); - results.push(...dataExport); + const totalSecondsEpoch = getTotalSecondProcess(startTimeProcess); console.info( `-->Export at Epoch ${epoch} took ${totalSecondsEpoch} seconds`, ); + } catch (error) { + throw error; } - return results; }; + +const main = async (argv: stakerRewardArgs) => { + const startTimeProcess = Date.now(); + + // validate address + const addresses = convertAddressesToArray(argv.staker_addresses); + if (!isValidAddresses(addresses)) { + return; + } + const subgraph = new Subgraph(argv.chain as Verse); + // header for staker reward + const header: string[] = getHeader(argv); + + // get the list of epoches based on the passed options + const epoches = await getEpoches(argv, subgraph); + + const loopAsync: number[] = generateNumberArray(epoches.from, epoches.to); + + const prepareData: PrepareData[] = await getPrepareData( + loopAsync, + subgraph, + argv, + ); + // data to export + await handleExport(prepareData, subgraph, argv, header); + + const totalSecondsProcess = getTotalSecondProcess(startTimeProcess); + console.log(`==> Total: ${totalSecondsProcess} seconds`); +}; + +export default main; diff --git a/src/execute/executeValidatorReward.ts b/src/execute/executeValidatorReward.ts index 953626d..e7c2d18 100755 --- a/src/execute/executeValidatorReward.ts +++ b/src/execute/executeValidatorReward.ts @@ -1,57 +1,23 @@ import moment = require('moment-timezone'); +import { GoogleSpreadsheet } from 'google-spreadsheet'; +import { MAX_RETRIES, RETRY_INTERVAL_MS } from '../contants/common'; import { exportCsv, getAdditionalDataForCommissionReward, getEpoches, + getLastDataFetchedByEpoch, getOasPricesForEpoch, } from '../module/RewardStakes'; -import { - DataExport, - PrepareData, - TimeData, - Verse, - validatorRewardArgs, -} from '../types'; +import { PrepareData, TimeData, Verse, validatorRewardArgs } from '../types'; import { generateNumberArray, isValidAddresses, sleep } from '../utils'; import { convertAddressesToArray } from '../utils/convert'; import { getTotalSecondProcess } from '../utils/date'; import { DEFAULT_LIST_PRICE, HEADER_FOR_VALIDATOR_REWARD, + getSpreadSheet, } from '../utils/google'; import { Subgraph } from '../utils/subgraph'; -// main process -export const main = async (argv: validatorRewardArgs) => { - const startTimeProcess = Date.now(); - - // validate address - const addresses = convertAddressesToArray(argv.validator_addresses); - if (!isValidAddresses(addresses)) { - return; - } - - const subgraph = new Subgraph(argv.chain as Verse); - - // header for validator reward - const header: string[] = getHeader(argv); - - // get the list of epochs based on the passed options - const epoches = await getEpoches(argv, subgraph); - - const loopAsync: number[] = generateNumberArray(epoches.from, epoches.to); - - const prepareData: PrepareData[] = await getPrepareData( - loopAsync, - subgraph, - argv, - ); - - // data to export - await handleExport(prepareData, subgraph, argv, header); - - const totalSecondsProcess = getTotalSecondProcess(startTimeProcess); - console.log(`==> Total: ${totalSecondsProcess} seconds`); -}; const getHeader = (argv: validatorRewardArgs): string[] => { let header: string[] = HEADER_FOR_VALIDATOR_REWARD; @@ -59,8 +25,8 @@ const getHeader = (argv: validatorRewardArgs): string[] => { if (process.env.COINGECKO_API_KEY) { header = argv.price - ? [...header, 'Price timestamp UTC', 'Oas price'] - : [...header, 'Price timestamp UTC', ...DEFAULT_LIST_PRICE]; + ? [...header, 'Oas price', 'Price timestamp UTC'] + : [...header, ...DEFAULT_LIST_PRICE, 'Price timestamp UTC']; } return header; }; @@ -110,43 +76,114 @@ const handleExport = async ( subgraph: Subgraph, argv: validatorRewardArgs, header: string[], -): Promise => { +): Promise => { const validator_addresses = convertAddressesToArray(argv.validator_addresses); + let numberOfRetries = 0; + + let doc: GoogleSpreadsheet; + if (Boolean(argv.export_csv_online)) { + doc = await getSpreadSheet(); + await doc.loadInfo(); + } + + for (let i = 0; i < prepareData.length; i++) { - const results: DataExport[] = []; + try { + await processExportByEpoch( + prepareData[i], + validator_addresses, + subgraph, + argv, + header, + doc, + ); + } catch (error) { + console.log(error); + numberOfRetries += 1; + await sleep(RETRY_INTERVAL_MS); + + console.log('\n----------Trying again----------'); + console.log('\n----------Please wait!----------'); + + if (numberOfRetries > MAX_RETRIES) { + throw error; + } + i = -1; // Reset the loop counter to 0 after error + } + } +}; - for (const item of prepareData) { +const processExportByEpoch = async ( + item: PrepareData, + validator_addresses: string[], + subgraph: Subgraph, + argv: validatorRewardArgs, + header: string[], + doc: GoogleSpreadsheet, +) => { + try { const { oasPrices, timeData, priceTime } = item; const { block, epoch, timestamp } = timeData; + + const result = await getLastDataFetchedByEpoch( + doc, + header, + argv, + timestamp, + 'Validator address', + 'commission-reward', + argv.output, + ); + + if (epoch < result.epoch) { + return; + } + const startTimeProcess = Date.now(); console.log('PROCESSING WITH EPOCH', epoch); - const promises = validator_addresses.map(async (address: string) => { - const validatorAddress = address; - const validatorStake = await subgraph.getValidatorTotalStake( - epoch, - block, - validatorAddress, - ); - - const { rowData } = getAdditionalDataForCommissionReward( - oasPrices, - validatorStake, - timeData, - argv.price, - validatorAddress, - priceTime, - ); + const promises = []; - await sleep(100); + validator_addresses.forEach(async (address: string) => { + const validatorAddress = address; - return { - rowData, - timestamp, - }; + if ( + !(result.epoch == epoch && result.addresses.includes(validatorAddress)) + ) { + const promise = (async () => { + const validatorStake = await subgraph.getValidatorTotalStake( + epoch, + block, + validatorAddress, + ); + + const { rowData } = getAdditionalDataForCommissionReward( + oasPrices, + validatorStake, + timeData, + argv.price, + validatorAddress, + priceTime, + ); + + await sleep(100); + + return { + rowData, + timestamp, + }; + })(); + + promises.push(promise); + } }); + if (promises?.length == 0) { + return; + } + const dataExport = await Promise.all(promises); + // process export await exportCsv( dataExport, @@ -154,14 +191,48 @@ const handleExport = async ( argv.output, 'commission-reward', header, + doc, ); - results.push(...dataExport); - const totalSecondsEpoch = getTotalSecondProcess(startTimeProcess); console.info( `-->Export at Epoch ${epoch} took ${totalSecondsEpoch} seconds`, ); + } catch (error) { + throw error; } +}; + +// main process +const main = async (argv: validatorRewardArgs) => { + const startTimeProcess = Date.now(); - return results; + // validate address + const addresses = convertAddressesToArray(argv.validator_addresses); + if (!isValidAddresses(addresses)) { + return; + } + + const subgraph = new Subgraph(argv.chain as Verse); + + // header for validator reward + const header: string[] = getHeader(argv); + + // get the list of epochs based on the passed options + const epoches = await getEpoches(argv, subgraph); + + const loopAsync: number[] = generateNumberArray(epoches.from, epoches.to); + + const prepareData: PrepareData[] = await getPrepareData( + loopAsync, + subgraph, + argv, + ); + + // data to export + await handleExport(prepareData, subgraph, argv, header); + + const totalSecondsProcess = getTotalSecondProcess(startTimeProcess); + console.log(`==> Total: ${totalSecondsProcess} seconds`); }; + +export default main; diff --git a/src/module/RewardStakes.ts b/src/module/RewardStakes.ts index 1448418..62bc884 100755 --- a/src/module/RewardStakes.ts +++ b/src/module/RewardStakes.ts @@ -15,8 +15,8 @@ import { validatorTotalStake, } from '../types'; import { convertArrayToObject } from '../utils/convert'; -import { writeFile } from '../utils/file'; -import { getDataSheet, getSpreadSheet } from '../utils/google'; +import { isExists, writeFile } from '../utils/file'; +import { getDataSheet } from '../utils/google'; import { Subgraph } from '../utils/subgraph'; export const getEpoches = async ( @@ -106,10 +106,10 @@ export async function getOasPricesForEpoch(argv, epochData) { } export const exportCsvOnline = async ( - doc, + doc: GoogleSpreadsheet, rowData: string[][], timestamp: moment.Moment, - header, + header: string[], ) => { const dataSheet = await getDataSheet(doc, timestamp, header); await dataSheet.addRows(rowData, { @@ -120,7 +120,7 @@ export const exportCsvOnline = async ( export const exportCsvLocal = async ( rowData: string[][], - header, + header: string[], fileName: string, outPut: string, ) => { @@ -132,6 +132,7 @@ export const exportCsvLocal = async ( } const csvContent = await fsPromise.readFile(output_csv, 'utf-8'); const result = await Papa.parse(csvContent, { header: true }); + rowData.forEach((item) => { result.data.push(convertArrayToObject(item, header)); }); @@ -148,26 +149,58 @@ export const exportCsv = async ( output: string, fileName: string, header: string[], + doc: GoogleSpreadsheet, ): Promise => { - // console.info(`Start handle export`); - - let doc: GoogleSpreadsheet; - if (isOnline) { - doc = await getSpreadSheet(); - await doc.loadInfo(); - } - for (let i = 0; i < array.length; i++) { const { rowData, timestamp } = array[i]; isOnline ? await exportCsvOnline(doc, rowData, timestamp, header) : await exportCsvLocal(rowData, header, fileName, output); - // await sleep(1500); } - // console.log('Export process complete!'); return true; }; +export async function getLastDataFetchedByEpoch( + doc: GoogleSpreadsheet, + header: string[], + argv, + timestamp: moment.Moment, + addressKey: string, + csvFileName: string, + output: string, +) { + const isOnlineExport = Boolean(argv.export_csv_online); + let output_csv: string; + let addresses = []; + let epoch = 0; + let lastRow; + let rows; + + if (isOnlineExport) { + const dataSheet = await getDataSheet(doc, timestamp, header); + rows = await dataSheet.getRows(); + } else { + output_csv = output || `output_csv/${csvFileName}.csv`; + const exist = await isExists(output_csv); + + if (exist) { + const csvContent = await fsPromise.readFile(output_csv, 'utf-8'); + const result = Papa.parse(csvContent, { header: true }); + rows = result?.data; + } + } + + lastRow = rows?.[rows.length - 1]; + if (lastRow && lastRow['Epoch']) { + epoch = lastRow['Epoch']; + addresses = rows + .filter((row) => row['Epoch'] == epoch) + .map((row) => row[addressKey]); + } + return { epoch, addresses }; +} + + export const getAdditionalDataForCommissionReward = ( oasPrices: OasPrices, stakeData: validatorTotalStake[], @@ -214,8 +247,8 @@ export const getAdditionalDataForCommissionReward = ( timestamp.format('YYYY/MM/DD HH:mm:ss'), utils.formatEther(validatorTotalStake).toString(), utils.formatEther(stake.dailyCommission).toString(), - moment(priceTime).utc().format('YYYY/MM/DD HH:mm:ss'), ...prices, + priceTime && moment(priceTime).utc().format('YYYY/MM/DD HH:mm:ss'), ]; }); @@ -260,11 +293,11 @@ export const getAdditionalDataForStakerReward = ( timestamp.format('YYYY-MM-DD HH:mm:ss'), utils.formatEther(stakeData.totalStake).toString(), utils.formatEther(stakeData.stakerReward).toString(), - moment(priceTime).utc().format('YYYY/MM/DD HH:mm:ss'), ...prices, + priceTime && moment(priceTime).utc().format('YYYY/MM/DD HH:mm:ss'), ], ]; - return { + return { rowData: rowData, }; };