Skip to content

Commit

Permalink
[Notion] reset state after restart (#14241)
Browse files Browse the repository at this point in the history
* add underscore to private methods

* activate and deactivate hooks to restart state

* remove deploy hook

* add logs

* save as timestamp in db

* bump versions
  • Loading branch information
andrewjschuang authored Oct 9, 2024
1 parent c6c7db2 commit 4c9475f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 26 deletions.
2 changes: 1 addition & 1 deletion components/notion/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pipedream/notion",
"version": "0.2.3",
"version": "0.2.4",
"description": "Pipedream Notion Components",
"main": "notion.app.mjs",
"keywords": [
Expand Down
58 changes: 33 additions & 25 deletions components/notion/sources/updated-page/updated-page.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export default {
key: "notion-updated-page",
name: "Updated Page in Database", /* eslint-disable-line pipedream/source-name */
description: "Emit new event when a page in a database is updated. To select a specific page, use `Updated Page ID` instead",
version: "0.1.2",
version: "0.1.3",
type: "source",
dedupe: "unique",
props: {
Expand Down Expand Up @@ -40,35 +40,37 @@ export default {
},
},
hooks: {
async deploy() {
const propertiesToCheck = await this.getPropertiesToCheck();
async activate() {
console.log("Activating: fetching pages and properties");
this._setLastUpdatedTimestamp(Date.now());
const propertyValues = {};
const propertiesToCheck = await this._getPropertiesToCheck();
const params = this.lastUpdatedSortParam();
const pagesStream = this.notion.getPages(this.databaseId, params);
let count = 0;
let lastUpdatedTimestamp = 0;
for await (const page of pagesStream) {
for (const propertyName of propertiesToCheck) {
const currentValue = this.maybeRemoveFileSubItems(page.properties[propertyName]);
const currentValue = this._maybeRemoveFileSubItems(page.properties[propertyName]);
propertyValues[page.id] = {
...propertyValues[page.id],
[propertyName]: currentValue,
};
}
lastUpdatedTimestamp = Math.max(
lastUpdatedTimestamp,
Date.parse(page.last_edited_time),
);
if (count++ < 25) {
this.emitEvent(page);
}
}
this._setPropertyValues(propertyValues);
this.setLastUpdatedTimestamp(lastUpdatedTimestamp);
},
async deactivate() {
console.log("Deactivating: clearing states");
this._setLastUpdatedTimestamp(null);
},
},
methods: {
...base.methods,
_getLastUpdatedTimestamp() {
return this.db.get(constants.timestamps.LAST_EDITED_TIME);
},
_setLastUpdatedTimestamp(ts) {
this.db.set(constants.timestamps.LAST_EDITED_TIME, ts);
},
_getPropertyValues() {
const compressed = this.db.get("propertyValues");
const buffer = Buffer.from(compressed, "base64");
Expand All @@ -80,14 +82,14 @@ export default {
const compressed = zlib.deflateSync(string).toString("base64");
this.db.set("propertyValues", compressed);
},
async getPropertiesToCheck() {
async _getPropertiesToCheck() {
if (this.properties?.length) {
return this.properties;
}
const { properties } = await this.notion.retrieveDatabase(this.databaseId);
return Object.keys(properties);
},
maybeRemoveFileSubItems(property) {
_maybeRemoveFileSubItems(property) {
// Files & Media type:
// `url` and `expiry_time` are constantly updated by Notion, so ignore these fields
if (property.type === "files") {
Expand All @@ -101,7 +103,7 @@ export default {
}
return property;
},
generateMeta(obj, summary) {
_generateMeta(obj, summary) {
const { id } = obj;
const title = this.notion.extractPageTitle(obj);
const ts = Date.now();
Expand All @@ -111,10 +113,10 @@ export default {
ts,
};
},
emitEvent(page, changes = [], isNewPage = true) {
_emitEvent(page, changes = [], isNewPage = true) {
const meta = isNewPage
? this.generateMeta(page, constants.summaries.PAGE_ADDED)
: this.generateMeta(page, constants.summaries.PAGE_UPDATED);
? this._generateMeta(page, constants.summaries.PAGE_ADDED)
: this._generateMeta(page, constants.summaries.PAGE_UPDATED);
const event = {
page,
changes,
Expand All @@ -123,9 +125,15 @@ export default {
},
},
async run() {
const lastCheckedTimestamp = this.getLastUpdatedTimestamp();
const lastCheckedTimestamp = this._getLastUpdatedTimestamp();
const propertyValues = this._getPropertyValues();

if (!lastCheckedTimestamp) {
// recently updated (deactivated / activated), skip execution
console.log("Awaiting restart completion: skipping execution");
return;
}

const params = {
...this.lastUpdatedSortParam(),
filter: {
Expand All @@ -136,7 +144,7 @@ export default {
},
};
let newLastUpdatedTimestamp = lastCheckedTimestamp;
const propertiesToCheck = await this.getPropertiesToCheck();
const propertiesToCheck = await this._getPropertiesToCheck();
const pagesStream = this.notion.getPages(this.databaseId, params);

for await (const page of pagesStream) {
Expand All @@ -156,7 +164,7 @@ export default {
for (const propertyName of propertiesToCheck) {
const previousValue = structuredClone(propertyValues[page.id]?.[propertyName]);
// value used to compare and to save to this.db
const currentValueToSave = this.maybeRemoveFileSubItems(page.properties[propertyName]);
const currentValueToSave = this._maybeRemoveFileSubItems(page.properties[propertyName]);
// (unmodified) value that should be emitted
const currentValueToEmit = page.properties[propertyName];

Expand Down Expand Up @@ -197,11 +205,11 @@ export default {
}

if (propertyHasChanged) {
this.emitEvent(page, changes, isNewPage);
this._emitEvent(page, changes, isNewPage);
}
}

this.setLastUpdatedTimestamp(newLastUpdatedTimestamp);
this._setLastUpdatedTimestamp(newLastUpdatedTimestamp);
this._setPropertyValues(propertyValues);
},
sampleEmit,
Expand Down

0 comments on commit 4c9475f

Please sign in to comment.