Skip to content

Commit

Permalink
chore: update ipfs (now with typing) dependencies (#1337)
Browse files Browse the repository at this point in the history
Co-authored-by: Rod Vagg <[email protected]>
Co-authored-by: Spencer T Brody <[email protected]>
  • Loading branch information
3 people committed Jun 2, 2021
1 parent a598c10 commit c9438bd
Show file tree
Hide file tree
Showing 26 changed files with 20,138 additions and 3,054 deletions.
22,945 changes: 20,020 additions & 2,925 deletions package-lock.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions packages/cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,19 @@
"@ceramicnetwork/streamid": "^1.0.1",
"@stablelib/random": "^1.0.0",
"aws-sdk": "^2.902.0",
"blockcodec-to-ipld-format": "^1.0.0",
"commander": "^7.0.0",
"cors": "^2.8.5",
"dag-jose": "^0.3.0",
"did-resolver": "^3.1.0",
"dids": "^2.1.0",
"express": "^4.17.1",
"flat": "^5.0.2",
"ipfs-http-client": "~49.0.3",
"ipfs-http-client": "^50.1.0",
"key-did-provider-ed25519": "^1.1.0",
"key-did-resolver": "^1.2.1",
"levelup": "^4.4.0",
"morgan": "^1.10.0",
"multiformats": "~4.6.1",
"s3leveldown": "^2.2.1",
"stream-to-array": "^2.3.0",
"uint8arrays": "^2.0.5"
Expand All @@ -68,7 +68,7 @@
"@types/node": "^13.13.15",
"@types/stream-to-array": "^2.3.0",
"get-port": "^5.1.1",
"ipfs-core": "~0.5.2",
"ipfs-core": "^0.7.0",
"rxjs": "^7.0.0",
"tmp-promise": "^2.0.2"
},
Expand Down
8 changes: 2 additions & 6 deletions packages/cli/src/__tests__/create-ipfs.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { IpfsApi } from '@ceramicnetwork/common';
import getPort from 'get-port';
import { sha256 } from 'multiformats/hashes/sha2';
import legacy from 'multiformats/legacy';
import { convert } from 'blockcodec-to-ipld-format'
import dagJose from 'dag-jose';
import IPFS from 'ipfs-core';

Expand All @@ -10,9 +9,7 @@ import IPFS from 'ipfs-core';
*/
export async function createIPFS(path: string): Promise<IpfsApi> {
const port = await getPort();
const hasher = {};
hasher[sha256.code] = sha256;
const format = legacy(dagJose, { hashes: hasher });
const format = convert(dagJose);

const config = {
ipld: { formats: [format] },
Expand All @@ -24,6 +21,5 @@ export async function createIPFS(path: string): Promise<IpfsApi> {
},
};

// @ts-ignore
return IPFS.create(config);
}
9 changes: 3 additions & 6 deletions packages/cli/src/build-ipfs-connection.util.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
import {IpfsDaemon} from "@ceramicnetwork/ipfs-daemon";

import dagJose from 'dag-jose'
import legacy from 'multiformats/legacy'
import { convert } from 'blockcodec-to-ipld-format'
import ipfsClient from "ipfs-http-client"
import { IpfsApi } from "@ceramicnetwork/common"
import { DiagnosticsLogger } from "@ceramicnetwork/common";
import { sha256 } from 'multiformats/hashes/sha2'

const hasher = {}
hasher[sha256.code] = sha256
const dagJoseFormat = legacy(dagJose, {hashes: hasher})
const dagJoseFormat = convert(dagJose)


const IPFS_DHT_SERVER_MODE = process.env.IPFS_DHT_SERVER_MODE === 'true'
const IPFS_GET_TIMEOUT = 60000 // 1 minute

export async function buildIpfsConnection(network: string, logger: DiagnosticsLogger, ipfsEndpoint?: string): Promise<IpfsApi>{
if (ipfsEndpoint) {
return ipfsClient({ url: ipfsEndpoint, ipld: { formats: [dagJoseFormat] }, timeout: IPFS_GET_TIMEOUT })
return ipfsClient.create({ url: ipfsEndpoint, ipld: { formats: [dagJoseFormat] }, timeout: IPFS_GET_TIMEOUT })
} else {
const ipfsDaemon = await IpfsDaemon.create({
ipfsDhtServerMode: IPFS_DHT_SERVER_MODE,
Expand Down
3 changes: 2 additions & 1 deletion packages/common/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@
"@types/logfmt": "^1.2.1",
"@types/node": "^13.13.15",
"dids": "^2.1.0",
"ipfs-core": "~0.5.2",
"ipfs-core": "^0.7.0",
"ipfs-core-types": "^0.5.0",
"json-schema-to-typescript": "^9.1.1",
"typescript-json-schema": "^0.42.0"
},
Expand Down
6 changes: 2 additions & 4 deletions packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,5 @@ export * from './running-state-like'
export * from './stream-state-subject'
export * from './subscription-set'

// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
import type { IPFSAPI as IpfsApi } from 'ipfs-core/dist/src/components'
export type IpfsApi = typeof IpfsApi
import type { IPFS } from 'ipfs-core-types'
export type IpfsApi = IPFS
3 changes: 1 addition & 2 deletions packages/common/src/utils/stream-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,9 @@ export class StreamUtils {
static async convertCommitToSignedCommitContainer(commit: CeramicCommit, ipfs: IpfsApi): Promise<CeramicCommit> {
if (StreamUtils.isSignedCommit(commit)) {
const block = await ipfs.block.get((commit as DagJWS).link)
const linkedBlock = block.data instanceof Uint8Array ? block.data : new Uint8Array(block.data.buffer)
return {
jws: commit as DagJWS,
linkedBlock,
linkedBlock: block.data,
}
}
return commit
Expand Down
8 changes: 3 additions & 5 deletions packages/core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@ $ npm install @ceramicnetwork/core
import Ceramic from '@ceramicnetwork/core'
import TileDocument from '@ceramicnetwork/stream-tile'
import IPFS from 'ipfs'
import IPFS from 'ipfs-core'
import dagJose from 'dag-jose'
import basicsImport from 'multiformats/cjs/src/basics-import.js'
import legacy from 'multiformats/cjs/src/legacy.js'
import { convert } from 'blockcodec-to-ipld-format'
basicsImport.multicodec.add(dagJose)
const format = legacy(basicsImport, dagJose.name)
const format = convert(dagJose)
const ipfs = Ipfs.create({
ipld: { formats: [format] },
Expand Down
8 changes: 4 additions & 4 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@
"ajv": "^8.1.0",
"ajv-formats": "^2.0.2",
"await-semaphore": "^0.1.3",
"blockcodec-to-ipld-format": "^1.0.0",
"cids": "~1.1.6",
"dids": "^2.1.0",
"ipld-dag-cbor": "^0.17.0",
"ipld-dag-cbor": "^1.0.0",
"level-ts": "^2.0.5",
"lodash.clonedeep": "^4.5.0",
"multihashes": "^3.1.2",
"multihashes": "^4.0.2",
"p-queue": "^6.6.1",
"rxjs": "^7.0.0",
"uint8arrays": "^2.0.5"
Expand All @@ -61,11 +62,10 @@
"dag-jose": "^0.3.0",
"did-resolver": "^3.1.0",
"get-port": "^5.1.1",
"ipfs": "~0.54.2",
"ipfs-core": "^0.7.0",
"key-did-provider-ed25519": "^1.1.0",
"key-did-resolver": "^1.2.1",
"mockdate": "^3.0.5",
"multiformats": "~4.6.1",
"tmp-promise": "^2.0.2"
},
"jest": {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/__tests__/dispatcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ describe('Dispatcher', () => {
// Store the query ID sent when the stream is registered so we can use it as the response ID later
const publishArgs = ipfs.pubsub.publish.mock.calls[0];
expect(publishArgs[0]).toEqual(TOPIC);
const queryMessageSent = JSON.parse(publishArgs[1]);
const queryMessageSent = JSON.parse(new TextDecoder().decode(publishArgs[1]));
const queryID = queryMessageSent.id;

// Handle UPDATE message
Expand Down
13 changes: 4 additions & 9 deletions packages/core/src/__tests__/ipfs-util.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import dagJose from 'dag-jose';
import { sha256 } from 'multiformats/hashes/sha2'
import legacy from 'multiformats/legacy'
import IPFS from 'ipfs';
import { convert } from 'blockcodec-to-ipld-format'
import IPFS from 'ipfs-core';
import { IpfsApi } from '@ceramicnetwork/common';
import tmp from 'tmp-promise';
import getPort from 'get-port';
Expand All @@ -11,9 +10,7 @@ import getPort from 'get-port';
* @param overrideConfig - IFPS config for override
*/
export async function createIPFS(overrideConfig: Record<string, unknown> = {}): Promise<IpfsApi> {
const hasher = {}
hasher[sha256.code] = sha256
const format = legacy(dagJose, {hashes: hasher})
const format = convert(dagJose)
const tmpFolder = await tmp.dir({ unsafeCleanup: true });

const port = await getPort();
Expand All @@ -27,8 +24,6 @@ export async function createIPFS(overrideConfig: Record<string, unknown> = {}):
};

const config = { ...defaultConfig, ...overrideConfig };
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
const instance = await IPFS.create(config);

// IPFS does not notify you when it stops.
Expand All @@ -51,6 +46,6 @@ export async function createIPFS(overrideConfig: Record<string, unknown> = {}):
* @param b - Receives connection
*/
export async function swarmConnect(a: IpfsApi, b: IpfsApi) {
const addressB = (await b.id()).addresses[0].toString();
const addressB = (await b.id()).addresses[0];
await a.swarm.connect(addressB);
}
6 changes: 4 additions & 2 deletions packages/core/src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ export class Dispatcher {
*/
async retrieveCommit (cid: CID | string): Promise<any> {
try {
const record = await this._ipfs.dag.get(cid, {timeout: IPFS_GET_TIMEOUT})
const asCid = typeof cid === 'string' ? new CID(cid) : cid
const record = await this._ipfs.dag.get(asCid, {timeout: IPFS_GET_TIMEOUT})
await this._restrictRecordSize(cid)
return cloneDeep(record.value)
} catch (e) {
Expand All @@ -91,7 +92,8 @@ export class Dispatcher {
*/
async retrieveFromIPFS (cid: CID | string, path?: string): Promise<any> {
try {
const record = await this._ipfs.dag.get(cid, {timeout: IPFS_GET_TIMEOUT, path})
const asCid = typeof cid === 'string' ? new CID(cid) : cid
const record = await this._ipfs.dag.get(asCid, {timeout: IPFS_GET_TIMEOUT, path})
return cloneDeep(record.value)
} catch (e) {
this._logger.err(`Error while loading CID ${cid.toString()} from IPFS: ${e}`)
Expand Down
3 changes: 1 addition & 2 deletions packages/core/src/pubsub/__tests__/as-ipfs-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import * as random from '@stablelib/random';
* @param from - Peer ID that ostensibly sent the message.
*/
export function asIpfsMessage(data: PubsubMessage, from?: string): IPFSPubsubMessage {
const asBytes = uint8arrays.fromString(serialize(data));
return {
from: from || 'outer-space',
data: asBytes,
data: serialize(data),
topicIDs: ['topic'],
seqno: random.randomBytes(10),
signature: random.randomBytes(10),
Expand Down
17 changes: 10 additions & 7 deletions packages/core/src/pubsub/pubsub-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { UnreachableCaseError } from '@ceramicnetwork/common';
import dagCBOR from 'ipld-dag-cbor';
import * as multihashes from 'multihashes';
import * as sha256 from '@stablelib/sha256';
import { TextDecoder } from 'util';
import { TextDecoder, TextEncoder } from 'util';
import * as uint8arrays from 'uint8arrays';

/**
Expand Down Expand Up @@ -36,6 +36,9 @@ export type ResponseMessage = {

export type PubsubMessage = UpdateMessage | QueryMessage | ResponseMessage;

const textEncoder = new TextEncoder()
const textDecoder = new TextDecoder('utf-8')

function messageHash(message: any): string {
// DAG-CBOR encoding
const encoded = dagCBOR.util.serialize(message);
Expand All @@ -59,14 +62,14 @@ export function buildQueryMessage(streamId: StreamID): QueryMessage {
};
}

export function serialize(message: PubsubMessage): string {
export function serialize(message: PubsubMessage): Uint8Array {
switch (message.typ) {
case MsgType.QUERY: {
return JSON.stringify({
return textEncoder.encode(JSON.stringify({
...message,
doc: message.stream.toString(), // todo remove once we no longer support interop with nodes older than v1.0.0
stream: message.stream.toString(),
});
}));
}
case MsgType.RESPONSE: {
const tips = {};
Expand All @@ -75,20 +78,20 @@ export function serialize(message: PubsubMessage): string {
...message,
tips: tips,
};
return JSON.stringify(payload);
return textEncoder.encode(JSON.stringify(payload));
}
case MsgType.UPDATE: {
// todo remove 'doc' once we no longer support interop with nodes older than v1.0.0
const payload = { typ: MsgType.UPDATE, doc: message.stream.toString(), stream: message.stream.toString(), tip: message.tip.toString() };
return JSON.stringify(payload);
return textEncoder.encode(JSON.stringify(payload));
}
default:
throw new UnreachableCaseError(message, 'Unknown message type');
}
}

export function deserialize(message: any): PubsubMessage {
const asString = new TextDecoder('utf-8').decode(message.data);
const asString = textDecoder.decode(message.data);
const parsed = JSON.parse(asString);

const typ = parsed.typ as MsgType;
Expand Down
7 changes: 5 additions & 2 deletions packages/core/src/pubsub/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import { IpfsApi } from '@ceramicnetwork/common';
import { map, catchError, mergeMap, withLatestFrom } from 'rxjs/operators';
import { IncomingChannel, filterExternal, IPFSPubsubMessage } from './incoming-channel';
import { DiagnosticsLogger, ServiceLogger } from '@ceramicnetwork/common';
import { TextDecoder } from 'util';

const textDecoder = new TextDecoder('utf-8')

/**
* Deserialize incoming message in an internal observable that does not emit if error happens.
Expand All @@ -25,7 +28,7 @@ function ipfsToPubsub(
map((incoming) => {
const message = deserialize(incoming);
const serializedMessage = serialize(message);
const logMessage = { ...incoming, ...JSON.parse(serializedMessage) };
const logMessage = { ...incoming, ...JSON.parse(textDecoder.decode(serializedMessage)) };
delete logMessage.key;
delete logMessage.signature;
pubsubLogger.log({ peer: peerId, event: 'received', topic: topic, message: logMessage });
Expand Down Expand Up @@ -79,7 +82,7 @@ export class Pubsub extends Observable<PubsubMessage> {
)
.subscribe({
next: ({ peerId, serializedMessage }) => {
const logMessage = { ...message, ...JSON.parse(serializedMessage) };
const logMessage = { ...message, ...JSON.parse(textDecoder.decode(serializedMessage)) };
this.pubsubLogger.log({ peer: peerId, event: 'published', topic: this.topic, message: logMessage });
},
error: (error) => {
Expand Down
9 changes: 5 additions & 4 deletions packages/ipfs-daemon/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,19 @@
"@ceramicnetwork/common": "^1.0.4",
"@ceramicnetwork/ipfs-topology": "^1.1.3",
"aws-sdk": "^2.902.0",
"blockcodec-to-ipld-format": "^1.0.0",
"dag-jose": "^0.3.0",
"datastore-fs": "^4.0.0",
"datastore-level": "^5.0.1",
"datastore-s3": "^5.0.0",
"express": "^4.17.1",
"ipfs": "~0.54.2",
"ipfs-http-gateway": "~0.3.2",
"ipfs-http-server": "~0.3.3",
"multiformats": "~4.6.1"
"ipfs-core-types": "^0.5.0",
"ipfs-http-gateway": "^0.4.1",
"ipfs-http-server": "^0.5.0"
},
"devDependencies": {
"@types/express": "^4.17.8",
"@types/jest": "^26.0.23",
"@types/node": "^13.13.15"
}
}
5 changes: 3 additions & 2 deletions packages/ipfs-daemon/src/create-repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ const notALock = {
getLockfilePath: () => {
// Do Nothing
},
lock: () => notALock.getCloser(),
lock: () => Promise.resolve(notALock.getCloser()),
getCloser: () => ({
close: () => {
// Do Nothing
return Promise.resolve()
},
}),
locked: () => false,
locked: () => Promise.resolve(false),
};

export enum StorageBackend {
Expand Down
Loading

0 comments on commit c9438bd

Please sign in to comment.