This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Retry queue #325

Merged: 26 commits from retry-queue into master on May 3, 2021

Conversation

@mariusandra (Collaborator) commented Apr 20, 2021

Changes

  • Implements abstract retrying logic in plugins via scheduled tasks/jobs with this syntax:
export async function onRetry (type, payload, meta) {
    if (type === 'processEvent') {
        console.log('retrying event!', type)
    }
}
export async function processEvent (event, meta) {
    if (event.properties?.hi === 'ha') {
        meta.retry('processEvent', event, 30) // try again in 30 seconds
    }
    return event
}
  • The backbone for this is an abstract "retry queue" system
  • We can have one or more retry queue providers, with failover (e.g. if Postgres is full, use the slow S3 queue)
  • Implements the first concrete queue adapter (Postgres, via Graphile Worker)
  • Retries are consumed by a separate redlocked worker (so a guaranteed max concurrency of 1), which polls the queues and/or does what's needed. Using only one reader for retries will be slow for now, but it avoids a lot of complex concurrency code that might not be needed anyway. Data integrity first, speed second. A rough sketch of such a consumer loop is shown below.
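
A minimal sketch of what that single redlocked consumer loop could look like; the helper names here (acquireRedlock, retryQueueManager, runRetry, delay) are illustrative assumptions, not the actual implementation:

// Hypothetical sketch only: one plugin server at a time holds the lock and drains retries.
async function startRetryConsumer(server: PluginsServer): Promise<void> {
    const lock = await acquireRedlock(server.redis, 'retry-consumer') // only one server wins this
    try {
        while (!server.stopping) {
            const retry = await server.retryQueueManager.dequeue() // polls the configured queues in order
            if (!retry) {
                await delay(1000) // nothing due yet, poll again shortly
                continue
            }
            await runRetry(server, retry.pluginConfigId, retry.type, retry.payload) // calls the plugin's onRetry
        }
    } finally {
        await lock.release()
    }
}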

Still needs a lot of testing....

Checklist

  • Updated Settings section in README.md, if settings are affected
  • Jest tests

@mariusandra mariusandra marked this pull request as ready for review April 29, 2021 13:16
@mariusandra mariusandra added the bump minor Bump minor version when this PR gets merged label Apr 29, 2021
@mariusandra (Collaborator, Author):

I realised there was a small issue: the Postgres adapter wouldn't work when deploying with Helm. So I now pass along the existing Postgres connection pool instead of DATABASE_URL, and added another env variable to specify a custom retry database if needed.
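
Roughly, the idea with graphile-worker is that its runner options accept either a connection string or an existing pg Pool. A hedged sketch, in which RETRY_QUEUE_DATABASE_URL and retryTask are illustrative placeholders (not the actual names used in this PR):

import { Pool } from 'pg'
import { run } from 'graphile-worker'

// Sketch only: the option names follow graphile-worker's documented RunnerOptions,
// but RETRY_QUEUE_DATABASE_URL and retryTask are illustrative placeholders.
export async function startGraphileQueue(existingPool: Pool) {
    const customUrl = process.env.RETRY_QUEUE_DATABASE_URL
    return await run({
        // reuse the server's existing pool unless a dedicated retry database is configured
        ...(customUrl ? { connectionString: customUrl } : { pgPool: existingPool }),
        taskList: {
            retryTask: async (payload) => {
                // hand the payload back to the plugin's onRetry here
            },
        },
    })
}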

@mariusandra (Collaborator, Author):

@yakkomajuri regarding your questions, you understand correctly. This is just the first PR and the simplest system that lets us run code in the background. Next we'll build abstractions on top of this thing that will retry processEvent as well. However we'll still need this system for the async export plugins that batch things, so it made sense to build it first.

@yakkomajuri (Contributor):

@mariusandra Right, yes, it completely makes sense to build this backbone first.

I guess that helps me answer your question though:

I think the reason this feels like a scheduler is in large part because we don't yet have the processEvent support. It will feel more like a retry system once that's in place I think.

As for exposing a scheduler, I think it could make sense, but I'd go through this retry stuff first, with a more limited API. With all the tooling we already have in place (storage, cache, buffer, runEveryMinute), one can already accomplish most scheduling tasks if needed.

So I'd focus on retries, but I think we'll end up with a scheduler of this nature at some point.

@mariusandra (Collaborator, Author) commented Apr 29, 2021

Alternatively, we could get creative and morph this into something like this:

export const tasks = {
    checkSessionEnd: async ({ distinct_id }, { cache, tasks }) => {
        const ping = await cache.get(`session_${distinct_id}`, null)
        if (!ping) {
            posthog.capture("session end", { distinct_id })
        } else {
            await tasks.runIn(60, 'seconds').checkSessionEnd({ distinct_id })
        }
    },
}

export async function processEvent(event, { cache, tasks }) {
    if ((await cache.incr(`session_${event.distinct_id}`)) === 1) {
        posthog.capture("session start", { distinct_id: event.distinct_id })
        await tasks.runIn(30, 'minutes').checkSessionEnd({ distinct_id: event.distinct_id })
    }
    await cache.expire(`session_${event.distinct_id}`, 30 * 60)
}

This would work with async retries as well:

import { createBuffer } from '@posthog/plugin-contrib'
import fetch from 'node-fetch'

export const tasks = {
    flushBatch: async ({ batch, retryCount = 0 }, { tasks }) => {
        const resp = await fetch('https://httpbin.org/post', {
            method: 'post',
            body: JSON.stringify(batch),
            headers: { 'Content-Type': 'application/json' },
        })
        if (resp.status !== 200) {
            if (retryCount > 5) {
                console.error('Could not post batch', batch)
                return
            }
            await tasks.runIn(30, 'seconds').flushBatch({ batch, retryCount: retryCount + 1 })
        }
    }
}

export function setupPlugin({ global, tasks }) {
    global.buffer = createBuffer({
        limit: 10 * 1024 * 1024, // 10 MB
        timeoutSeconds: 10 * 60, // 10 minutes
        onFlush: async (batch) => {
            await tasks.runDirectly().flushBatch({ batch }) 
        },
    })
}

export function teardownPlugin({ global }) {
    global.buffer.flush()
}

export function processEvent(event, { config, global }) {
    global.buffer.add(event, JSON.stringify(event).length)
    return event
}

The TypeScript story there seems ambitious though :D
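
Purely as an illustration of the kind of typing that would be involved (this is not the approach taken in plugin-scaffold#16), mapped types could make tasks.runIn(...) expose only the task names a plugin actually defines:

// Illustrative typing sketch only; names and shapes do not reflect plugin-scaffold#16.
type TaskMap = Record<string, (payload: any, meta: any) => void | Promise<void>>

type TaskRunner<T extends TaskMap> = {
    [K in keyof T]: (payload: Parameters<T[K]>[0]) => Promise<void>
}

interface TasksApi<T extends TaskMap> {
    runIn(amount: number, unit: 'seconds' | 'minutes' | 'hours'): TaskRunner<T>
    runDirectly(): TaskRunner<T>
}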

@mariusandra mentioned this pull request Apr 30, 2021
@mariusandra (Collaborator, Author):

Having slept on it, I think we should go with some "task" or "job" naming scheme. I don't yet know what the best API for this is, though, and I suggest tackling that in another PR. All retry queues are disabled by default, so merging this in should have no effect for anyone; the new retry syntax in plugins simply won't do anything until a queue is enabled. It will already let us play with it, though.

Thus, I'd suggest:

  • merging this in (unless some bugs are found)
  • merging in PR Big refactor #350, which is based on this one and renames "retry queue" to "job queue" without affecting the code in the VMs (it'll still be meta.retry)
  • creating a new PR that refactors further and gives plugins access to tasks or jobs instead of retry

@mariusandra
Copy link
Collaborator Author

The TypeScript story for the job/task system turned out to be quite fine at the end of the day: PostHog/plugin-scaffold#16

@mariusandra mentioned this pull request Apr 30, 2021
const consoleFile = path.join(process.cwd(), 'tmp', 'test-console.txt')

export const writeToFile = {
    console: {
Collaborator:

Shouldn't this be plugin logs instead of a custom tests-only solution?

@mariusandra (Author):

Yeah, this should be removed as soon as plugin logs land... ⌛


const minRetry = process.env.NODE_ENV === 'test' ? 1 : 30

// TODO: add type to scaffold
Collaborator:

If this is already ready in PostHog/plugin-scaffold#16, then I think the TODO can be deleted

@mariusandra (Author):

To be fair, it'll be deleted in the next PR (or the one after that?); in any case, it's already gone in #351.

    pluginConfig: PluginConfig
): (type: string, payload: any, retry_in?: number) => Promise<void> {
    return async (type: string, payload: any, retry_in = 30) => {
        if (retry_in < minRetry || retry_in > 86400) {
Collaborator:

Nit: retry_in → retryIn

@mariusandra (Author):

Agreed, though can we keep it as is? #351 will refactor this completely; we'd get into bad merge conflicts otherwise...
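
For context, the diff hunk above belongs to a factory that builds the per-plugin retry function; a simplified, illustrative reconstruction (createRetry, the error message, and retryQueueManager.enqueue are assumptions, not verbatim source):

// Simplified sketch of the factory the diff above belongs to; names are illustrative.
export function createRetry(
    server: PluginsServer,
    pluginConfig: PluginConfig
): (type: string, payload: any, retry_in?: number) => Promise<void> {
    return async (type: string, payload: any, retry_in = 30) => {
        if (retry_in < minRetry || retry_in > 86400) {
            throw new Error(`Retries must be scheduled between ${minRetry} seconds and 24 hours from now`)
        }
        await server.retryQueueManager.enqueue({
            type,
            payload,
            timestamp: Date.now() + retry_in * 1000, // when the retry becomes due
            pluginConfigId: pluginConfig.id, // hypothetical field name
        })
    }
}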

src/main/services/redlock.ts (outdated review thread, resolved)
@mariusandra mariusandra removed the bump minor Bump minor version when this PR gets merged label Apr 30, 2021
@Twixes (Collaborator) commented Apr 30, 2021

Hm, I tried this out as a user trying to retry in 3 seconds, and I managed to crash the plugin server, which seems pretty dangerous 🤔

[screenshot: plugin server crash]

@mariusandra (Collaborator, Author) commented Apr 30, 2021

What's the code? I can't reproduce locally (with clickhouse/kafka). I get the error properly:

[screenshot of the error message]

The plugin I used was something like this:

async function processEvent(event, { config, retry }) {  
    await retry('haha', { bla: 'asd' }, -1)
    return event
}

@Twixes (Collaborator) commented Apr 30, 2021

I just modified the earlier example slightly:

export async function onRetry (type, payload, meta) {
    if (type === 'processEvent') {
        console.log('retrying event!', type)
    }
}
export async function processEvent (event, meta) {
    console.log('queuing retry')
    meta.retry('processEvent', event, 3)
    return event
}

@fuziontech (Member):

> What queue do we want on cloud? I'd start with the postgres/graphile adapter, as I expect rather low traffic (at least for now), but should this be in our main database or should we provision a separate RDS cluster for this? We can always switch later as well.

I would definitely start up a new RDS instance just for the resource isolation and one less thing to migrate from Heroku to AWS eventually.

> What queue do we want as the default and for OSS? Postgres with the main database?

This seems reasonable to me. Keep things simple.

> Should this be called a "retry queue" in the first place? I assume this is what it'll mostly be used for... and hence it's a good name to indicate its intent and capabilities. However we can use it to run arbitrary tasks in the future.

deferred task queue?

> For example here's a session tracker plugin that sends "session start" for the user's first event (consistency guaranteed) and "session end" once the user has not sent any event for about 30-31 minutes.
>
> First, just the fact that we are able to write something like this with such ease is amazing in my book. However we are clearly not using the system to "retry" anything here, but as a task queue:

I love this idea, but there is a heavy cost to be paid here. If we go down this route we will no longer be able to split the task queue to the worker level, meaning we won't be able to localize these retry/task queues to the plugin servers themselves. In the normal case, each worker is responsible for the tasks it is working on and can have its own locally managed queue to retry from later. In this example, every worker that has seen a certain distinct_id is going to schedule a 'checkIfSessionOver' every minute. This also means a race condition on figuring out if a session is over, and other tasks similar to this.

I think we should have two separate task queues:

  1. one for retrying tasks, which will eventually live locally on the plugin servers
  2. one for handling deferred tasks, which will be picked up by whichever worker is available (like Celery)

@mariusandra (Collaborator, Author) commented Apr 30, 2021

👍 for the RDS instance. The Postgres "graphile" worker is configured to automatically write to a graphile_worker schema (IIRC), not the default public schema. Not sure it makes a difference, but putting it out there.

> In this example, every worker that has seen a certain distinct_id is going to schedule a 'checkIfSessionOver' every minute

Nope, they won't 😉. This is a nifty piece of code. We're using cache.incr(), which is Redis INCR. Redis executes commands on a single thread, so the increment is atomic and always returns a new number. That means we can guarantee the if block runs only the first time this distinct_id has been seen in the last 30 minutes, globally.

We also guarantee the 30min timing because that's when the key we incr'd expires. If another event with the same distinct_id comes in within the 30min window, we just reset the expiration to 30min from now.

export async function processEvent(event, { cache, tasks }) {
    if ((await cache.incr(`session_${event.distinct_id}`)) === 1) {
        posthog.capture("session start", { distinct_id: event.distinct_id })
        await tasks.runIn(30, 'minutes').checkSessionEnd({ distinct_id: event.distinct_id })
    }
    await cache.expire(`session_${event.distinct_id}`, 30 * 60)
}

Inside the task we check if the key is expired and if not, schedule another task to check it a bit later.

export const tasks = {
    checkSessionEnd: async ({ distinct_id }, { cache, tasks }) => {
        const ping = await cache.get(`session_${distinct_id}`, null)
        if (!ping) {
            posthog.capture("session end", { distinct_id })
        } else {
            await tasks.runIn(1, 'minute').checkSessionEnd({ distinct_id })
        }
    },
}

This task will be picked up by just one worker, meaning that for the entire duration this user is on the website, there is just one pending job for them in the job queue.

This could even be extended to a function runEveryMinuteForEachUserCurrentlyBrowsingYourSite(distinct_id) { /* you fill this part in */ } that we conveniently expose to plugin authors ;).

If that's not uniquely powerful, I'm not sure what is :).

> If we go down this route we will no longer be able to split the task queue to the worker level. Meaning that we won't be able to localize these retry/task queues to the plugin-servers themselves.

Also nope 😉 and I'm 10^6% with you on this!

Imagine the plugin server's job queue as an array of queues. Currently we have just one adapter, ["postgres"], but soon it'll be: ["memory", "postgres", "s3"]. Each queue is just a boring class that implements an interface and has an enqueue function (plus consumer functions). If the enqueue function throws, the next queue in the list gets tried.

As the first queue I'd like to have an in-memory "do this soon" queue, with some sensible maximum number of items it can hold. This would be used for all the "try this again in 5 seconds" jobs. The second level is Postgres, for all the "run this next hour" jobs. Finally, if Postgres gets full or throws other errors, we have a backup S3 queue, which just saves text files with a timestamp in the filename. It'll be horribly slow to read from, but it'll be durable. Unless the network is down and we can't write anywhere (and then we will retry for a long time).

What's more, when we get a SIGTERM, we can tell the memory queue to quickly empty itself into postgres.
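
A rough sketch of that failover idea, assuming each queue implements a common interface; the interface and class names here are illustrative, not the PR's actual code:

// Illustrative failover sketch: try each queue in order, fall through to slower/more durable ones.
type EnqueuedRetry = { type: string; payload: any; timestamp: number; pluginConfigId: number }

interface RetryQueue {
    enqueue(retry: EnqueuedRetry): Promise<void>
    drain?(): Promise<EnqueuedRetry[]> // only the in-memory queue needs this, for shutdown
}

class RetryQueueManager {
    constructor(private queues: RetryQueue[]) {} // e.g. [memory, postgres, s3]

    async enqueue(retry: EnqueuedRetry): Promise<void> {
        for (const queue of this.queues) {
            try {
                await queue.enqueue(retry)
                return // the first queue that accepts the job wins
            } catch {
                // this queue is full or erroring; try the next one in the list
            }
        }
        throw new Error('No retry queue accepted the job')
    }

    // On SIGTERM: empty the in-memory queue into the remaining (durable) queues.
    async flushMemoryQueue(): Promise<void> {
        const [memory, ...durable] = this.queues
        if (!memory?.drain) return
        const durableManager = new RetryQueueManager(durable)
        for (const retry of await memory.drain()) {
            await durableManager.enqueue(retry)
        }
    }
}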

@mariusandra (Collaborator, Author):

@Twixes I managed to catch this error locally and will fix it on Monday. In the third PR this function will no longer throw, so the problem is... um... avoided? :P

Assuming I can fix this here on Monday (when you're off), 👍 for merging this and at least the next one in?

@fuziontech (Member):

After reading your comment and re-reading the code, this looks awesome 👍

🚢 it

@Twixes (Collaborator) left a review comment:

I was told to be slightly less code cowboyish, but if you want to go for it then I guess… yeehaw?

@fuziontech (Member):

🤠

@Twixes (Collaborator) commented Apr 30, 2021

Ah, but if that means merging after the fix, then yeehaw for sure.

@mariusandra (Collaborator, Author):

I figured out the issue. You're most likely running Node.js 15 or 16. Node 15 changed the default behaviour for unhandled promise rejections: they now crash the entire process. So guess what happened:

export async function processEvent (event, meta) {
    console.log('queuing retry')
    meta.retry('processEvent', event, 3) // not awaited, so throws somewhere outside of this block
    return event
}

Boom 💥

The fix is here: #352

Since we're mostly using Node.js 14, including in all Dockerfiles and thus on cloud, the fix is not that urgent to get in now. Hence I'll merge this PR before 😱 the fix, and add the refactoring PR or PRs on top. I'll also merge the fix somewhere in there.
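
Put differently, once the returned promise is awaited (or caught) inside the plugin, the rejection stays contained; a minimal illustration of the same example with the await added:

export async function processEvent (event, meta) {
    console.log('queuing retry')
    try {
        await meta.retry('processEvent', event, 3) // awaited, so a thrown error stays in this block
    } catch (error) {
        console.error('could not queue retry', error)
    }
    return event
}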

@mariusandra mariusandra merged commit b83c844 into master May 3, 2021
@mariusandra mariusandra deleted the retry-queue branch May 3, 2021 10:20
fuziontech pushed a commit to PostHog/posthog that referenced this pull request Oct 12, 2021
* extract redlock from schedule

* implement generic retrying

* capture console.log in tests via a temp file

* add graphile queue

* make it prettier and safe

* style fixes

* fix some tests

* release if there

* split postgres tests

* don't make a graphile worker in all tests

* revert "split postgres tests"

* skip retries if pluginConfig not found

* reset graphile schema before test

* fix failing tests by clearing the retry consumer redlock

* bust github actions cache

* slight cleanup

* fix github/eslint complaining about an `any`

* separate url for graphile retry queue, otherwise use existing postgres pool (fixes helm connection string issue)

* convert startRedlock params to options object

* move type around

* use an enum

* update typo in comment

Co-authored-by: Michael Matloka <[email protected]>