
Support read-after-write consistency for NodeGraph bucket operations #244

Closed
joshuakarp opened this issue Sep 21, 2021 · 22 comments · Fixed by #419
Labels: bug, development, epic, r&d:polykey:core activity 4 (End to End Networking behind Consumer NAT Devices)

Comments


joshuakarp commented Sep 21, 2021

Specification

There's currently an issue of "read-after-write" consistency with sequences of batch operations in the NodeGraph buckets database, as setNode operations rely on the state of the bucket on insertion.

Recall that a bucket has a fixed capacity of k nodes. If we attempt to add a new node to a full bucket, we find the least active node in the bucket and remove it, in favour of the new node.

However, we come across an issue if we want to create a batch operation of multiple setNodeOps within the same bucket.

Because the ops are created before any additions, the bucket value is statically defined, based on the initial state of the bucket.

For example, if I were to batch add 3 nodes (A, B, C) into the same bucket (that already has some node Z), we'd have the following array of ops:

    [
      {
        type: 'put',
        domain: this.nodeGraphBucketsDbDomain,
        key: bucketIndex,
        value: {'Z': '9.9.9.9', 'A': '1.1.1.1'},
      },
      {
        type: 'put',
        domain: this.nodeGraphBucketsDbDomain,
        key: bucketIndex,
        value: {'Z': '9.9.9.9', 'B': '2.2.2.2'},
      },
      {
        type: 'put',
        domain: this.nodeGraphBucketsDbDomain,
        key: bucketIndex,
        value: {'Z': '9.9.9.9', 'C': '3.3.3.3'},
      },
    ]

That is, the final setNode op would overwrite the entire bucket with only nodes Z and C in the bucket.

This problem can be solved by utilising the same system as in js-encryptedfs. We'd need to use:

  • separate sublevels for each individual bucket index
  • Transaction snapshot system

In this process, it would also be worthwhile to change our keys to be buffers.
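
As a rough sketch of what that would look like (the level/put call shapes here are assumptions, mirroring the snippets further down in this thread), each node becomes its own key under the bucket's sublevel, so a batch of setNode ops in the same bucket no longer overwrites the whole bucket value:

  // Sketch only: per-bucket sublevel with one key per node.
  // Each setNode now writes a single node entry rather than the whole
  // bucket POJO, so batched ops in the same bucket cannot clobber each other.
  const bucketDomain = [
    ...this.nodeGraphBucketsDbDomain,
    lexi.pack(bucketIndex, 'hex'), // bucket key keeps numeric ordering
  ];
  await this.db.put(bucketDomain, nodeId.toBuffer(), {
    address: nodeAddress,
    lastUpdated: new Date(),
  });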

Additional context

Tasks

  1. Investigate the snapshot system in the EFS Transaction class
  2. Change buckets to use individual sublevels for each bucket index (instead of a single sublevel indexed by bucket indexes - see how EFS does this)
  3. Add "higher" metadata level for storing the size of a particular bucket. See INodeManager sublevel vs INodeManager dir sublevel. The INodeManager sublevel is capable of storing metadata.
  4. Incorporate transaction system into NodeGraph operations
  5. Remove lexicographic-integer dependency from PK (see original comments regarding this here https://gitlab.com/MatrixAI/Engineering/Polykey/js-polykey/-/merge_requests/205#note_707121227)

CMCDragonkai commented Sep 21, 2021 via email

@joshuakarp added the epic label on Nov 29, 2021
@joshuakarp

This is currently within #291, but it should be considered low priority. The only function that requires this concurrent operation is refreshBuckets, and I've already implemented a work-around to resolve it within the function itself:

  public async refreshBuckets(): Promise<void> {
    return await this._transaction(async () => {
      const ops: Array<DBOp> = [];
      // Get a local copy of all the buckets
      const buckets = await this.getAllBuckets();
      // Wrap as a batch operation. We want to rollback if we encounter any
      // errors (such that we don't clear the DB without re-adding the nodes)
      // 1. Delete every bucket
      for await (const k of this.nodeGraphBucketsDb.createKeyStream()) {
        const hexBucketIndex = k as string;
        ops.push({
          type: 'del',
          domain: this.nodeGraphBucketsDbDomain,
          key: hexBucketIndex,
        });
      }
      const tempBuckets: Record<string, NodeBucket> = {};
      // 2. Re-add all the nodes from all buckets
      for (const b of buckets) {
        for (const n of Object.keys(b)) {
          const nodeId = n as NodeId;
          const newIndex = this.getBucketIndex(nodeId);
          let expectedBucket = tempBuckets[newIndex];
          // The following is more or less copied from setNodeOps
          if (expectedBucket == null) {
            expectedBucket = {};
          }
          const bucketEntries = Object.entries(expectedBucket);
          // Add the old node
          expectedBucket[nodeId] = {
            address: b[nodeId].address,
            lastUpdated: b[nodeId].lastUpdated,
          };
          // If, with the old node added, we exceed the limit...
          if (bucketEntries.length > this.maxNodesPerBucket) {
            // Then, with the old node added, find the least active and remove
            const leastActive = bucketEntries.reduce((prev, curr) => {
              return prev[1].lastUpdated < curr[1].lastUpdated ? prev : curr;
            });
            delete expectedBucket[leastActive[0]];
          }
          // Add this reconstructed bucket (with old node) into the temp storage
          tempBuckets[newIndex] = expectedBucket;
        }
      }
      // Now that we've reconstructed all the buckets, perform batch operations
      // on a bucket level (i.e. per bucket, instead of per node)
      for (const bucketIndex in tempBuckets) {
        ops.push({
          type: 'put',
          domain: this.nodeGraphBucketsDbDomain,
          key: bucketIndex,
          value: tempBuckets[bucketIndex],
        });
      }
      await this.db.batch(ops);
    });
  }

That is to say, this issue is only needed to make any potential future concurrent operations on the NodeGraph safe.

@CMCDragonkai

The sublevels used in INodeManager in EFS do not care about the order of the sublevels. This is why we could just do this:

    const dataDb = await this.db.level(ino.toString(), this.dataDb);

But in the buckets of the NodeGraph, the order of the sublevels does matter. So we want to ensure that the sublevel prefix ordering is the same as the numeric order of the bucket index.

To do this, you need to lexi int encode the bucket index before you call this.db.level.

Note that iteration over buckets is not efficient, but iteration over key-values of all buckets will be efficient.

// this becomes difficult (you need a way to indicate every possible bucket)
for (const bucket of ...) {
}
// this is easy
for (const o of ALLBUCKETS) {
}

// this is also easy
const bucket = await this.db.level(lexint(bucketIndex), nodeGraphDb);
for (const o of bucket) {
}
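
As a small sketch of why the lexi int encoding matters (using the lexicographic-integer package referenced above; the values are just for illustration): hex-packed keys sort lexicographically in the same order as the numeric bucket indexes, so sublevel prefixes derived from them keep buckets in bucket order.

  import lexi from 'lexicographic-integer';

  // Hex-packed integers sort lexicographically in numeric order,
  // so sublevel prefixes derived from them keep buckets ordered.
  const packed = [0, 9, 10, 255, 256].map((i) => lexi.pack(i, 'hex'));
  console.log(packed);                  // packed bucket keys
  console.log([...packed].sort());      // same order after a lexicographic sort
  console.log(lexi.unpack(packed[3]));  // 255: round-trips back to the index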

@CMCDragonkai

Fixing this is also about correctness, not just performance. Read-after-write consistency can be ensured by using the DBTransaction.
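
As a sketch of the idea (the transact/get/put method names are illustrative, not the exact js-db API, and newEntries, bucketDomain and bucketKey are placeholder names): each setNode in the batch reads the bucket through the transaction, so it sees the writes made by earlier ops in the same batch instead of the stale on-disk value.

  // Sketch only: read-after-write within a single DBTransaction.
  await this.db.transact(async (tran) => {
    for (const [nodeId, address] of newEntries) {
      // This read observes puts made earlier in this same transaction
      const bucket = (await tran.get<NodeBucket>(bucketDomain, bucketKey)) ?? {};
      bucket[nodeId] = { address, lastUpdated: new Date() };
      // (bucket capacity eviction omitted for brevity)
      await tran.put(bucketDomain, bucketKey, bucket);
    }
  });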


joshuakarp commented Feb 17, 2022

Something that's also arisen as I've been writing the getAll command for getting all buckets in #326. A bucket at the moment has the following structure:

bucket {
  'nodeId1': {
    address: { host: ..., port: ... },
    lastUpdated: ...,
  },
  'nodeId2': {
    address: { host: ..., port: ... },
    lastUpdated: ...,
  },
  ...
}

The NodeBucket type we currently use for this is as follows (from src/nodes/types):

type NodeBucket = {
  [key: string]: {
    address: NodeAddress;
    lastUpdated: Date;
  };
};

I found it interesting that the key is a string. So, taking a look at a specific example of a bucket:

    bucket {
      '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x05': {
        address: { host: '227.1.1.1', port: 4567 },
        lastUpdated: '2022-02-17T23:04:27.678Z'
      }
    }

We can see that we're getting the NodeId "buffer" as a string. (We can just translate this into the raw NodeId buffer with IdInternal.fromString<NodeId>(nodeId).) I can't quite remember if this was the intention when we refactored the node ID. This may just be how leveldb stores the buffer, but I'm not 100% sure.

Anyway, this may be worthwhile to revisit and investigate when looking into separating the buckets into separate leveldb instances.
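
For reference, converting those string keys back looks something like this (a minimal sketch; IdInternal.fromString is the call mentioned above, bucket is the POJO shown above, and the import path is assumed):

  import { IdInternal } from '@matrixai/id';

  // The POJO keys come back as raw binary strings (NodeIdString);
  // convert them back to NodeId and then to the encoded form for display.
  for (const nodeIdString of Object.keys(bucket)) {
    const nodeId = IdInternal.fromString<NodeId>(nodeIdString);
    console.log(nodesUtils.encodeNodeId(nodeId));
  }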

@CMCDragonkai

That problem becomes a non-problem once you move to using sublevels.

However, in the meantime, any POJO keys have to be strings, so it may be using NodeIdString and not NodeIdEncoded.

I usually prefer that anything stored externally, whether in the DB or sent over the network, uses the encoded version, not the string version.


joshuakarp commented Feb 18, 2022

I've added the CommandGetAll in #326 for retrieving all of the buckets at the CLI level: 366c9dd.

Once the buckets are moved to their own sublevels, it would also be worthwhile to take a look at that command in this issue. It was a bit of a quick and dirty addition (the outputting of the buckets is very rough, and the getAllBuckets call doesn't retain the bucket index nicely, so I've had to recalculate the bucket index after retrieval), and it doesn't currently have any tests either.

The getAllBuckets function in NodeGraph that it calls is likely to change in this issue anyway, so we can throw that all together.

CMCDragonkai mentioned this issue on Feb 21, 2022

CMCDragonkai commented Feb 21, 2022

The first method to change over is this:

  /**
   * Gets NodeBucket POJO, where keys are NodeIdString
   */
  @ready(new nodesErrors.ErrorNodeGraphNotRunning())
  public async getBucket(bucketIndex: number): Promise<NodeBucket> {
    if (bucketIndex < 0 || bucketIndex >= this.nodeIdBits) {
      throw new nodesErrors.ErrorNodeGraphBucketIndex(`bucketIndex must be between 0 and ${this.nodeIdBits - 1} inclusive`);
    }
    const bucketDb = await this.db.level(
      lexi.pack(bucketIndex, 'hex'),
      this.nodeGraphBucketsDb
    );
    const nodeBucket: NodeBucket = {};
    for await (const o of bucketDb.createReadStream()) {
      const nodeId = IdInternal.fromBuffer<NodeId>((o as any).key);
      const data = (o as any).value as Buffer;
      const nodeBucketEntry = await this.db.deserializeDecrypt<NodeBucketEntry>(data, false)
      nodeBucket[nodeId] = nodeBucketEntry;
    }
    return nodeBucket;
  }

Note how we are now checking if the bucketIndex is in the correct range.

Furthermore, the bucket db is just a sublevel of nodeGraphBucketsDb; it then streams out the entries and puts them into the object.

Note that {} is the sentinel value. One can use utils.isEmptyObject to check whether there are no entries in the bucket.

I'm expecting that NodeBucket is primarily used internally, but if this were to be exposed to the outside world, the keys should be re-encoded with nodesUtils.encodeNodeId so that they are using NodeIdEncoded.

Also, no locking is required here, because this cannot be affected by other things; so no read lock nor transactions are needed here.

@CMCDragonkai

If we are to use DBTransaction, we would need to expose this as a parameter in all the methods so that operations can be chained up within a DB transaction. This is why it would be unnecessary to have in-memory locking.

In any case, right now I'm removing the transaction wrappers as a WIP to figure out how to do this properly.

  @ready(new nodesErrors.ErrorNodeGraphNotRunning())
  public async getBucket(bucketIndex: number): Promise<NodeBucket> {
    if (bucketIndex < 0 || bucketIndex >= this.nodeIdBits) {
      throw new nodesErrors.ErrorNodeGraphBucketIndex(`bucketIndex must be between 0 and ${this.nodeIdBits - 1} inclusive`);
    }
    const bucketDb = await this.db.level(
      lexi.pack(bucketIndex, 'hex'),
      this.nodeGraphBucketsDb
    );
    const nodeBucket: NodeBucket = {};
    for await (const o of bucketDb.createReadStream()) {
      const nodeId = IdInternal.fromBuffer<NodeId>((o as any).key);
      const data = (o as any).value as Buffer;
      const nodeBucketEntry = await this.db.deserializeDecrypt<NodeData>(data, false)
      nodeBucket[nodeId] = nodeBucketEntry;
    }
    return nodeBucket;
  }

  @ready(new nodesErrors.ErrorNodeGraphNotRunning())
  public async getNode(nodeId: NodeId): Promise<NodeData | undefined> {
    const bucketIndex = nodesUtils.calculateBucketIndex(
      this.keyManager.getNodeId(),
      nodeId,
    );
    const bucketDomain = [...this.nodeGraphBucketsDbDomain, lexi.pack(bucketIndex, 'hex')];
    return await this.db.get<NodeData>(
      bucketDomain,
      nodeId.toBuffer()
    );
  }

A transactional API would be very different from the rest of the application, since no other parts of the system expose the transaction system; at least the high-level domains don't.

The question is how exposed the transactional logic of NodeGraph is to other parts of the codebase. If we expose db transactions, we're saying the internal state changes can be made consistent with the other db levels in PK. If we only expose the public methods, then we are saying that other parts of the code should never consider the DB at all, and only think about the NodeGraph's public methods.
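
A rough sketch of what that exposure could look like (the transact wrapper and exact signatures are assumptions, not the final API; the point is that callers can pass their own tran to compose NodeGraph operations with other DB state changes):

  // Sketch only: optional DBTransaction parameter on a public method.
  @ready(new nodesErrors.ErrorNodeGraphNotRunning())
  public async getNode(
    nodeId: NodeId,
    tran?: DBTransaction,
  ): Promise<NodeData | undefined> {
    if (tran == null) {
      // No caller-supplied transaction: wrap ourselves in one
      return this.db.transact(async (tran) => this.getNode(nodeId, tran));
    }
    const bucketIndex = nodesUtils.calculateBucketIndex(
      this.keyManager.getNodeId(),
      nodeId,
    );
    const bucketDomain = [
      ...this.nodeGraphBucketsDbDomain,
      lexi.pack(bucketIndex, 'hex'),
    ];
    return tran.get<NodeData>(bucketDomain, nodeId.toBuffer());
  }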


CMCDragonkai commented Feb 22, 2022

We may not need transactions or locking once the entries in the node graph are no longer buckets, and are just NodeId -> NodeData. So the above discussions about transactions may be less important.

But if transactions are necessary, then advisory locking is also necessary. And different locks can be created along with different transactional contexts.


We do need locks because we are doing operations on the bucket meta count as we are adding nodes and whatnot.

@CMCDragonkai

So right now we have 2 sublevels:

  • nodeGraphMetaDbDomain
  • nodeGraphBucketsDbDomain

Both are further indexed by bucket keys.

!NodeGraph!meta!BUCKETKEY!PROP
!NodeGraph!buckets!BUCKETKEY!NODEID

The BUCKETKEY is lexi.pack(bucketIndex, 'hex').

The PROP is currently only count, which keeps track of how many entries are in each bucket.

The NODEID is the buffer version of NodeId, i.e. nodeId.toBuffer().

This mirrors the EFS design, where metadata are individual properties rather than being put into the JSON.

I'm going to look into passing tran around the public and protected methods. However, public methods that call each other can waste cycles doing checks that have already been done, like checking that the bucket index is in range.

@CMCDragonkai

All of the ops methods will be replaced with the DBTransaction.

@CMCDragonkai

Also I think updateNode shouldn't need to exist. It seems like something that can just be done with setNode.

@CMCDragonkai

Just realised that since the k-buckets are meant to be kept sorted, instead of sorting upon streaming in a bucket, we should try to keep the entries sorted by last updated. This has a similar problem to #329, where we want the entries to be sorted by last updated time.

In #329 a suggestion was to use a compound index, and to store it as such. However, here we just want an index that maintains order, so the sorting would have to be updated when the keys change. I can imagine that the lastUpdated timestamp number becomes a key in this case, and when the lastUpdated is changed, it would need to update the relevant entry as well. However, we don't have indexing yet, so this requires a dynamic sort of the k-bucket. Another reason to get MatrixAI/js-db#1 done.
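
As a sketch of that direction (the key layout and calls here are assumptions, not the implemented design; oldLastUpdated/newLastUpdated are placeholder timestamp numbers and tran is a transaction as above): a second per-bucket sublevel keyed by the packed lastUpdated timestamp would keep entries in activity order, at the cost of rewriting the index key whenever a node's lastUpdated changes.

  // Sketch only: a "lastUpdated" index sublevel per bucket.
  // Key = lexi-packed timestamp + NodeId buffer, so streaming this
  // sublevel yields entries oldest-first; the first key is the least
  // active node, which is exactly what eviction needs.
  const lastUpdatedDomain = [...bucketDomain, 'lastUpdated'];
  const oldIndexKey = Buffer.concat([
    Buffer.from(lexi.pack(oldLastUpdated, 'hex')),
    nodeId.toBuffer(),
  ]);
  const newIndexKey = Buffer.concat([
    Buffer.from(lexi.pack(newLastUpdated, 'hex')),
    nodeId.toBuffer(),
  ]);
  await tran.del(lastUpdatedDomain, oldIndexKey);
  await tran.put(lastUpdatedDomain, newIndexKey, nodesUtils.encodeNodeId(nodeId));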


CMCDragonkai commented Feb 23, 2022

[Quoting joshuakarp's earlier comment above about the NodeBucket keys coming back as strings.]

The new getAllBuckets is now getBuckets.

This call now has to use streaming iteration over each node entry as discussed in #244 (comment).

The only issue is that when doing this:

    for await (const o of this.nodeGraphBucketsDb.createReadStream()) {
      // o.key is not just `NodeId` as a buffer
      // it has the bucket key as a sublevel prefix
    }

So we have to extract the BucketIndex out of o.key.

@CMCDragonkai

We use ! as the sublevel prefix separator.

When outputting the keys as a string we see:

!fb04!5cf7212ef26a716d34501c6c45d4f501

So to parse this buffer, we can try to split it on the !.

However, I don't think that's very efficient. So instead I want to work with the buffer version of !, and since I know the bucket key is a prefix, it should be sufficient to extract out the bucket key and be left with the NodeId.
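
A sketch of that extraction (the !BUCKETKEY!NODEID layout is taken from the example above; the helper shape matches the parseBucketNodeKey used in the next comment, but the body here is only illustrative, using the same lexi and IdInternal imports as the other snippets):

  // Sketch only: find the two '!' separators in the raw key buffer,
  // lexi-unpack the bucket key between them, and treat the remainder
  // as the raw NodeId. NodeId bytes may themselves contain 0x21, which
  // is fine because we only look for the first two separators.
  function parseBucketNodeKey(key: Buffer): [number, NodeId] {
    const sep = 0x21; // '!'
    const first = key.indexOf(sep);
    const second = key.indexOf(sep, first + 1);
    const bucketKey = key.slice(first + 1, second).toString();
    const bucketIndex = lexi.unpack(bucketKey);
    const nodeId = IdInternal.fromBuffer<NodeId>(key.slice(second + 1));
    return [bucketIndex, nodeId];
  }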

@CMCDragonkai

The getBuckets now looks like this:

  @ready(new nodesErrors.ErrorNodeGraphNotRunning())
  public async *getBuckets(): AsyncGenerator<[NodeBucketIndex, NodeBucket]> {
    let bucketIndex: NodeBucketIndex | undefined;
    let bucket: NodeBucket = {};
    // Iterating over all entries across all buckets
    for await (const o of this.nodeGraphBucketsDb.createReadStream()) {
      // The key is a combined bucket key and node ID
      // It needs to be parsed into `BucketIndex` and `NodeId`
      const [bucketIndex_, nodeId] = nodesUtils.parseBucketNodeKey(o.key as Buffer);
      const data = (o as any).value as Buffer;
      const nodeData = await this.db.deserializeDecrypt<NodeData>(data, false)
      if (bucketIndex == null) {
        // First entry of the first bucket
        bucketIndex = bucketIndex_
        bucket[nodeId] = nodeData;
      } else if (bucketIndex === bucketIndex_) {
        // Subsequent entries of the same bucket
        bucket[nodeId] = nodeData;
      } else if (bucketIndex !== bucketIndex_) {
        // New bucket
        yield [bucketIndex, bucket];
        bucketIndex = bucketIndex_;
        bucket = { [nodeId]: nodeData };
      }
    }
    // Yield the last bucket if it exists
    if (bucketIndex != null) {
      yield [bucketIndex, bucket];
    }
  }

This will stream the buckets as well as the bucket index. The buckets will also be streamed in order of the bucket indexes, since we know lower buckets will be ordered first in lexicographic order.

@CMCDragonkai

This can only be fully fixed after #326 and #366 are merged.

@tegefaulkes

All tasks except for 5 are completed. It should be a quick fix; we just need to know what to replace it with.

@CMCDragonkai

I'm not sure about 5. We are now using buckets as sublevels anyway, but do we need to maintain bucket order according to the index? If so, then we have to continue using lexi int.

@tegefaulkes

Yes, order still matters since we're iterating over nodes in bucket order for getClosestNodes. So if we're keeping it, then I think we can consider this issue done.

@CMCDragonkai

Still need the SI (snapshot isolation) transactions for this to be done.
