Skip to content

Commit

Permalink
fix: add retry to getting offers info from indexer when offer is crea…
Browse files Browse the repository at this point in the history
…ted (#998)

* fix: add retry to getting offers info from indexer when offer is created

* use dbg for ipfs operations logging
  • Loading branch information
shamsartem authored Aug 14, 2024
1 parent 253eff0 commit 909d8cb
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 46 deletions.
2 changes: 1 addition & 1 deletion cli/src/commands/workers/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export default class Deploy extends BaseCommand<typeof Deploy> {
);

await initFluenceClient(flags);
await doRegisterIpfsClient(true);
await doRegisterIpfsClient();
const { Fluence } = await import("@fluencelabs/js-client");
const relayId = Fluence.getClient().getRelayPeerId();
const initPeerId = Fluence.getClient().getPeerId();
Expand Down
2 changes: 1 addition & 1 deletion cli/src/commands/workers/upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export default class Upload extends BaseCommand<typeof Upload> {
);

await initFluenceClient(flags);
await doRegisterIpfsClient(true);
await doRegisterIpfsClient();
const { Fluence } = await import("@fluencelabs/js-client");
const initPeerId = Fluence.getClient().getPeerId();

Expand Down
49 changes: 35 additions & 14 deletions cli/src/lib/chain/offer/offer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import { numToStr } from "../../helpers/typesafeStringify.js";
import { uint8ArrayToHex } from "../../helpers/typesafeStringify.js";
import {
commaSepStrToArr,
setTryTimeout,
splitErrorsAndResults,
stringifyUnknown,
} from "../../helpers/utils.js";
Expand Down Expand Up @@ -200,34 +201,54 @@ export async function createOffers(flags: OffersArgs) {
);
}

const [offerInfoErrors, offersInfo] = await getOffersInfo(offerIds);
const providerAddress = await getSignerAddress();
type GetOffersInfoReturnType = Awaited<
ReturnType<typeof getOffersInfo<(typeof offerIds)[number]>>
>;

offersInfo.forEach(({ offerName, offerIndexerInfo }) => {
const offerPerEnv = providerArtifactsConfig.offers[fluenceEnv] ?? {};
let offerInfoErrors: GetOffersInfoReturnType[0] = [];
let offersInfo: GetOffersInfoReturnType[1] = [];

offerPerEnv[offerName] = {
id: offerIndexerInfo.id,
providerAddress,
};
const getOffersInfoRes = await setTryTimeout(
"Getting offers info from indexer",
async () => {
[offerInfoErrors, offersInfo] = await getOffersInfo(offerIds);

if (offerInfoErrors.length > 0) {
throw new Error("Not all offers info received");
}

return { ok: true };
},
(error) => {
return { error };
},
20_000,
2_000,
);

if ("error" in getOffersInfoRes && offerInfoErrors.length === 0) {
commandObj.warn(stringifyUnknown(getOffersInfoRes.error));
}

const providerAddress = await getSignerAddress();

offerIds.forEach(({ offerName, offerId }) => {
const offerPerEnv = providerArtifactsConfig.offers[fluenceEnv] ?? {};
offerPerEnv[offerName] = { id: offerId, providerAddress };
providerArtifactsConfig.offers[fluenceEnv] = offerPerEnv;
});

await providerArtifactsConfig.$commit();

const offersStr = offersInfo
const offersStr = offerIds
.map(({ offerName }) => {
return offerName;
})
.join(", ");

commandObj.logToStderr(`
${
offersInfo.length === 0
? "No offers where created!"
: `Offers ${color.yellow(offersStr)} successfully created!`
}
Offers ${color.yellow(offersStr)} successfully created!
${await offersInfoToString([offerInfoErrors, offersInfo])}
`);
}
Expand Down
2 changes: 1 addition & 1 deletion cli/src/lib/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ async function uploadWorkerConfig(
ipfs: string,
config: Upload_deployArgConfig["workers"][number]["config"],
) {
const ipfsClient = getIpfsClient(false);
const ipfsClient = getIpfsClient();

const services = await Promise.all(
config.services.map(async ({ name, total_memory_limit, modules }) => {
Expand Down
49 changes: 20 additions & 29 deletions cli/src/lib/localServices/ipfs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import type { IPFSHTTPClient } from "ipfs-http-client";

import { commandObj } from "../commandObj.js";
import { FS_OPTIONS } from "../const.js";
import { dbg } from "../dbg.js";
import { setTryTimeout, stringifyUnknown } from "../helpers/utils.js";

// !IMPORTANT for some reason when in tsconfig.json "moduleResolution" is set to "nodenext" - "ipfs-http-client" types all become "any"
Expand All @@ -45,11 +46,10 @@ export async function createIPFSClient(multiaddrString: string) {
async function upload(
multiaddr: string,
content: Parameters<IPFSHTTPClient["add"]>[0],
log: (msg: unknown) => void,
): Promise<string> {
try {
const ipfsClient = await createIPFSClient(multiaddr);
log(`created ipfs client`);
dbg(`created ipfs client`);

const { cid } = await ipfsClient.add(content, {
pin: true,
Expand All @@ -70,16 +70,16 @@ async function upload(
5000,
);

log(`did pin ${cidString} to ${multiaddr}`);
dbg(`did pin ${cidString} to ${multiaddr}`);

try {
const pinned = ipfsClient.pin.ls({ paths: cidString, type: "all" });

for await (const r of pinned) {
if (r.type === "recursive") {
log(`file ${cidString} pinned to ${multiaddr}`);
dbg(`file ${cidString} pinned to ${multiaddr}`);
} else {
log(`pin result type is not recursive. ${stringifyUnknown(r)}`);
dbg(`pin result type is not recursive. ${stringifyUnknown(r)}`);
}
}
} catch (error) {
Expand All @@ -101,7 +101,6 @@ async function upload(
const dagUpload = async (
multiaddr: string,
content: Parameters<IPFSHTTPClient["dag"]["put"]>[0],
log: (msg: unknown) => void,
) => {
const ipfsClient = await createIPFSClient(multiaddr);

Expand All @@ -114,16 +113,16 @@ const dagUpload = async (
const cidString = cid.toString();

await ipfsClient.pin.add(cidString);
log(`did pin ${cidString} to ${multiaddr}`);
dbg(`did pin ${cidString} to ${multiaddr}`);

try {
const pinned = ipfsClient.pin.ls({ paths: cidString, type: "all" });

for await (const r of pinned) {
if (r.type === "recursive") {
log(`file ${cidString} pinned to ${multiaddr}`);
dbg(`file ${cidString} pinned to ${multiaddr}`);
} else {
log(`pin result type is not recursive. ${stringifyUnknown(r)}`);
dbg(`pin result type is not recursive. ${stringifyUnknown(r)}`);
}
}
} catch (error) {
Expand All @@ -142,16 +141,10 @@ const dagUpload = async (
}
};

export function getIpfsClient(offAquaLogs: boolean) {
const log = (msg: unknown) => {
if (!offAquaLogs) {
commandObj.logToStderr(`ipfs: ${stringifyUnknown(msg)}`);
}
};

export function getIpfsClient() {
return {
async upload(multiaddr: string, absolutePath: string) {
log(`uploading ${absolutePath} to ${multiaddr}`);
dbg(`uploading ${absolutePath} to ${multiaddr}`);

try {
await access(absolutePath);
Expand All @@ -161,13 +154,13 @@ export function getIpfsClient(offAquaLogs: boolean) {
);
}

log(`reading ${absolutePath}`);
dbg(`reading ${absolutePath}`);
const data = await readFile(absolutePath);
log(`uploading ${absolutePath} to ${multiaddr}`);
return upload(multiaddr, data, log);
dbg(`uploading ${absolutePath} to ${multiaddr}`);
return upload(multiaddr, data);
},
async upload_string(multiaddr: string, string: string) {
return upload(multiaddr, Buffer.from(string), log);
return upload(multiaddr, Buffer.from(string));
},
async dag_upload(multiaddr: string, absolutePath: string) {
try {
Expand All @@ -179,10 +172,10 @@ export function getIpfsClient(offAquaLogs: boolean) {
}

const data = await readFile(absolutePath, FS_OPTIONS);
return dagUpload(multiaddr, data, log);
return dagUpload(multiaddr, data);
},
async dag_upload_string(multiaddr: string, string: string) {
return dagUpload(multiaddr, string, log);
return dagUpload(multiaddr, string);
},
async id(multiaddr: string): Promise<string> {
const ipfsClient = await createIPFSClient(multiaddr);
Expand Down Expand Up @@ -225,25 +218,23 @@ export function getIpfsClient(offAquaLogs: boolean) {

for await (const r of rm) {
if (r.error !== undefined) {
log(`block rm failed. ${stringifyUnknown(r.error)}`);
dbg(`block rm failed. ${stringifyUnknown(r.error)}`);
}
}

return "Success";
} catch (err) {
log(`remove failed. ${stringifyUnknown(err)}`);
dbg(`remove failed. ${stringifyUnknown(err)}`);
return "Error: remove failed";
}
},
};
}

export async function doRegisterIpfsClient(
offAquaLogs: boolean,
): Promise<void> {
export async function doRegisterIpfsClient(): Promise<void> {
const { registerIpfsClient } = await import(
"../compiled-aqua/installation-spell/files.js"
);

registerIpfsClient(getIpfsClient(offAquaLogs));
registerIpfsClient(getIpfsClient());
}

0 comments on commit 909d8cb

Please sign in to comment.