Cannot make OpenTelemetry work with GCP PubSub: how to separate traces for different events in an EventEmitter listener? #3993

Closed
1 of 2 tasks
reith opened this issue Jul 13, 2023 · 6 comments


reith commented Jul 13, 2023

I'm using OpenTelemetry to create traces for messages I get from a PubSub subscription. I found two ways to do this:

  1. call tracer.startActiveSpan to wrap my actual message handler function,
  2. start span using tracer.startSpan in my message handler.

Using the first approach, subsequent messages may get assigned as children of previously processed messages. I say "may" because it probably depends on the workload of the PubSub subscription, but I reproduced the problem without using an actual PubSub:

fake-queue.js

'use strict'

import { tracer, enableConsoleSpanExporter } from './trace.js'
import { EventEmitter } from 'events'
import * as otel from '@opentelemetry/api'
import * as winston from 'winston'
import * as assert from 'assert'

async function sleep(amount) {
    await new Promise((resolve, reject) => {
        setTimeout(resolve, amount)
    })
}

function createLogger({ logLevel = 'info' } = {}) {
    return winston.createLogger({
        level: logLevel,
        exitOnError: false,
        format: winston.format.combine(winston.format.timestamp(), winston.format.json(), winston.format.colorize()),
        transports: [
            new winston.transports.Console()
        ]
    })
}

export class FakeQueue {
    constructor() {
        this.logger = createLogger({ logLevel: 'debug' })
        this.subscription = new EventEmitter()
    }

    async run(channel) {
        // First approach
        this.subscription.on('message', async(message) => {
             await tracer.startActiveSpan('consumeMessage', { kind: otel.SpanKind.CONSUMER }, async(span) => {
                 await this.consumeMessage(message, { span })
                 // Q1: can I do something so that the next function call don't use the same span context?
             })
         })

        // This is the second approach. comment the previous lines
        // this.subscription.on('message', this.consumeMessageInDetachedSpan.bind(this))

        channel.emit('subscribed')
    }

    async publishMessage(message, id, retryCount = 0) {
        this.subscription.emit('message', {
            ack: () => { },
            nack: () => {
                if (retryCount < 2) {
                    process.nextTick(() => this.publishMessage(message, id, retryCount + 1))
                }
            },
            id,
            retryCount,
            data: JSON.stringify(message)
        })
    }

    async consumeMessage(message, { span = tracer.startSpan('consumeMessage') } = {}) {
        let payload
        const logger = this.logger
        span.setAttribute('messageId', message.id)
        try {
            payload = JSON.parse(message.data)
            logger.info('received message', { payload })
            span.setAttributes({ pageURL: payload.url })
            const { response } = await tracer.startActiveSpan('fakeDownload', async(childSpan) => this.fakeDownload(payload.url, { span: childSpan }))
            span.setAttribute('responseStatus', response.status)
            if (response.status < 400) {
                message.ack()
                span.setStatus({ code: otel.SpanStatusCode.OK })
            } else {
                message.nack()
                span.setStatus({ code: otel.SpanStatusCode.ERROR })
            }
        } catch (e) {
            span.recordException(e)
            span.setStatus({ code: otel.SpanStatusCode.ERROR })
        } finally {
            span.end()
        }
    }

    async consumeMessageInDetachedSpan(message) {
        let payload
        const logger = this.logger
        const span = tracer.startSpan('consumeMessage', { kind: otel.SpanKind.CONSUMER })
        const contextWithSpan = otel.trace.setSpan(otel.context.active(), span)
        span.setAttribute('messageId', message.id)
        try {
            payload = JSON.parse(message.data)
            logger.info('received message', { payload })
            span.setAttributes({ pageURL: payload.url })
            const { response } = await tracer.startActiveSpan('fakeDownload', undefined, contextWithSpan, async(childSpan) => await this.fakeDownload(payload.url, { span: childSpan }))
            span.setAttribute('responseStatus', response.status)
            if (response.status < 400) {
                message.ack()
                span.setStatus({ code: otel.SpanStatusCode.OK })
            } else {
                message.nack()
                span.setStatus({ code: otel.SpanStatusCode.ERROR })
            }
        } catch (e) {
            span.recordException(e)
            span.setStatus({ code: otel.SpanStatusCode.ERROR })
        } finally {
            span.end()
        }
    }

    async fakeDownload(urlStr, { span = tracer.startSpan('fakeDownload') } = {}) {
        await sleep(Math.random() * 2000)
        try {
            const url = new URL(urlStr)
            assert.ok(['http', 'https'].indexOf(url.protocol.split(':')[0]) > -1)
            span.setStatus({ code: otel.SpanStatusCode.OK })
            return { response: { status: 200 } }
        } catch (e) {
            span.recordException(e)
            span.setStatus({ code: otel.SpanStatusCode.ERROR })
            return { response: { status: 500 } }
        } finally {
            span.end()
        }
    }
}

enableConsoleSpanExporter()
const processor = new FakeQueue()
const runChannel = new EventEmitter()
runChannel.on('subscribed', () => {
    processor.publishMessage({ url: 'http://google.com', id: 1 })
    processor.publishMessage({ url: 'httpXX://google.com', id: 3 })
})
await processor.run(runChannel)

trace.js

'use strict'

import { NodeTracerProvider, SimpleSpanProcessor, ConsoleSpanExporter } from '@opentelemetry/sdk-trace-node'
import { WinstonInstrumentation } from '@opentelemetry/instrumentation-winston'
import { registerInstrumentations } from '@opentelemetry/instrumentation'
import { Resource } from '@opentelemetry/resources'
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions'
import { diag, DiagConsoleLogger, DiagLogLevel } from '@opentelemetry/api'

const resource = new Resource({ [SemanticResourceAttributes.SERVICE_NAME]: 'fake-queue' })
export const provider = new NodeTracerProvider({ resource })
export const tracer = provider.getTracer('root')

diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG)
provider.register()

registerInstrumentations({
    instrumentations: [
        new WinstonInstrumentation()
    ]
})

export function enableConsoleSpanExporter() {
    const exporter = new ConsoleSpanExporter()
    provider.addSpanProcessor(new SimpleSpanProcessor(exporter))
}

The above code mimics the behavior of the Node.js client for PubSub, at least for the nack() part. When I run the code using consumeMessage as the message handler, only two traces are created. The second trace, f5732dff30bc02790c75c793852abb73 in my logs, is repeated three times. I'm looking for a way to make OpenTelemetry create a separate trace for each message being processed. Does OpenTelemetry provide a function that I can use at the line I marked with the Q1 comment?

The other approach, using consumeMessageInDetachedSpan as the message handler, creates four traces, which is what I expect. There are two problems, though. First, it can't extract context from a PubSub message, but that's not a big deal since the PubSub client doesn't actually propagate context yet, as far as I know. Second, instrumentation for external libraries no longer works. For example, using the second approach I see logs like this:

{"level":"info","message":"received message","payload":{"id":1,"url":"http://google.com"},"timestamp":"2023-07-14T00:11:55.921Z"}

While using the first approach, span and trace ID are injected into the logs:

{"level":"info","message":"received message","payload":{"id":1,"url":"http://google.com"},"span_id":"3a98357837d3bd61","timestamp":"2023-07-14T00:09:53.677Z","trace_flags":"01","trace_id":"dff9d9a5a08f06911b8328c9d82d3ec6"}

Since my code calls other HTTP endpoints, I need HTTP instrumentation, which doesn't work with the second approach.

Logs for first approach:
fake-1.log

and second approach:
fake-2.log

Dependency versions:

{
  "dependencies": {
    "@google-cloud/pubsub": "^3.1.1",
    "@opentelemetry/instrumentation-dns": "^0.31.5",
    "@opentelemetry/instrumentation-http": "^0.40.0",
    "@opentelemetry/instrumentation-net": "^0.31.4",
    "@opentelemetry/instrumentation-winston": "^0.31.4",
    "@opentelemetry/sdk-node": "^0.40.0",
    "winston": "^3.8.2"
  }
}
  • This only affects the JavaScript OpenTelemetry library
  • This may affect other libraries, but I would like to get opinions here first
@reith reith changed the title Cannot make OpenTelemetry work with GCP PubSub, how to separate traces in different messages? Cannot make OpenTelemetry work with GCP PubSub: how to separate traces for different events in an EventEmitter listener? Jul 14, 2023
Member

Flarna commented Jul 14, 2023

This is a context propagation topic.
For a real message queue the producer has to inject the span to be used as parent via OTel propagation API into the outgoing message.
The consumer extracts this parent span context and uses it as parent for the consumer span.

This explicit inject/extract propagation is not done above. Therefore the Node.js built-in context propagation via the AsyncLocalStorage context manager kicks in. Because everything runs in a single process, the context propagation is completely different than between two separate Node.js processes communicating via some queue/TCP/...

In your sample the messages/message operations are not encapsulated in an AsyncResource, so AsyncLocalStorage can't track them. Instead it tracks the underlying code flow via promises, setTimeout, ..., which is completely different.
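The effect described above can be reproduced without OpenTelemetry. The following is a minimal sketch that uses Node's AsyncLocalStorage directly as a stand-in for the context manager; `activeTrace`, `withSpan`, and `runDemo` are hypothetical names, and a "trace" is just a string. It assumes, as in the issue's fake queue, that a nacked message is re-published via process.nextTick from inside the handler.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks')
const { EventEmitter } = require('node:events')

// Stand-in for the active OTel context: the store holds a "trace id" string.
const activeTrace = new AsyncLocalStorage()
let nextTraceId = 1

// Rough model of startActiveSpan: join the active trace if there is one,
// otherwise start a new trace, then run fn with that trace active.
function withSpan(fn) {
    const traceId = activeTrace.getStore() ?? `trace-${nextTraceId++}`
    return activeTrace.run(traceId, () => fn(traceId))
}

function runDemo() {
    return new Promise((resolve) => {
        const seen = []
        const queue = new EventEmitter()
        queue.on('message', (message) => {
            withSpan((traceId) => {
                seen.push({ attempt: message.attempt, traceId })
                if (message.attempt < 2) {
                    // "nack": re-publish from inside the active context.
                    // process.nextTick carries the store into its callback, and the
                    // listener then runs synchronously inside that callback.
                    process.nextTick(() => queue.emit('message', { attempt: message.attempt + 1 }))
                } else {
                    resolve(seen)
                }
            })
        })
        queue.emit('message', { attempt: 0 })
    })
}

// All three attempts report the same trace id.
runDemo().then((seen) => console.log(seen))
```

The first attempt starts a trace because nothing is active; every retry is scheduled from inside that trace, so it inherits it instead of starting its own.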

You could take a look at one of the instrumentations in https://github.com/open-telemetry/opentelemetry-js-contrib/ to get better insight into these details.

Regarding the difference between startSpan and startActiveSpan: startActiveSpan is just a small helper on top of startSpan. It starts a span, puts it into a context, and runs a callback with this new context set as active (see here for details).
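To make the distinction concrete, here is a minimal sketch that models both calls on top of Node's AsyncLocalStorage. It is not the OpenTelemetry implementation: `activeSpan`, `tracer`, and `currentSpanName` are hypothetical stand-ins, and a span is just a plain object. It illustrates that a span created with startSpan alone is never the active span, which is probably why the second approach in the original post lost the trace fields in its logs.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks')

// Stand-in for the active context; the store is the "current span".
const activeSpan = new AsyncLocalStorage()

const tracer = {
    // Creates a span parented to the active span, but does NOT activate it.
    startSpan(name) {
        const parent = activeSpan.getStore()
        return { name, parentName: parent ? parent.name : null }
    },
    // Helper on top of startSpan: create the span, make it active, run the callback.
    startActiveSpan(name, fn) {
        const span = this.startSpan(name)
        return activeSpan.run(span, () => fn(span))
    }
}

// What an instrumentation (for example a logger hook) would see as the current span.
const currentSpanName = () => activeSpan.getStore()?.name ?? null

// Second approach from the issue: the span exists but was never made active.
const detached = tracer.startSpan('consumeMessage')
const seenDetached = currentSpanName()

// First approach: the callback runs with the span active.
const seenActive = tracer.startActiveSpan('consumeMessage', () => currentSpanName())

// Second approach repaired: activate the manually created span around the work.
const seenRepaired = activeSpan.run(detached, () => currentSpanName())

console.log({ seenDetached, seenActive, seenRepaired })
```

In OTel API terms, the repaired variant would correspond to running the handler body inside context.with(contextWithSpan, ...), reusing the contextWithSpan that consumeMessageInDetachedSpan already builds.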

Author

reith commented Jul 14, 2023

This is a context propagation topic. For a real message queue the producer has to inject the span to be used as parent via OTel propagation API into the outgoing message. The consumer extracts this parent span context and uses it as parent for the consumer span.

Let's say I don't care about producer-side context; I just want each call of the event emitter handler to get a separate trace, not nested under any previous message (i.e. a separate span context).

In your sample the messages/message operations are not encapsulated in an AsyncResource, so AsyncLocalStorage can't track them. Instead it tracks the underlying code flow via promises, setTimeout, ..., which is completely different.

Thanks for pointing that out. This is the new code:

    // Requires: import { AsyncResource, executionAsyncId, triggerAsyncId } from 'async_hooks'
    async run(channel) {
        this.subscription.on('message', async(message) => {
            const asyncResource = new AsyncResource('consumeMessage', { triggerAsyncId: message.id * 1000 + Math.trunc(Math.random() * 1000) })
            // runInAsyncScope(fn, thisArg, ...args): the span name is the first argument to startActiveSpan
            await asyncResource.runInAsyncScope(tracer.startActiveSpan, tracer, 'consumeMessage', { kind: otel.SpanKind.CONSUMER }, async(span) => {
                console.log('executionId', executionAsyncId(), 'triggerId', triggerAsyncId())
                await this.consumeMessage(message, { span })
            })
            asyncResource.emitDestroy()
        })
        channel.emit('subscribed')
    }

(I also had a bug passing message IDs, so processor.publishMessage({ url: 'http://google.com', id: 1 }) should be replaced with processor.publishMessage({ url: 'http://google.com' }, 1) in the original post.)

While each handler invocation gets a separate executionAsyncId and triggerAsyncId, I still get a single trace for all spans created for the second message:

executionId 18 triggerId 1017
{"level":"info","message":"received message","payload":{"url":"http://google.com"},"span_id":"6de9af4545838659","timestamp":"2023-07-14T18:13:11.288Z","trace_flags":"01","trace_id":"75adc0bb524342c48381bf484a144337"}
executionId 38 triggerId 2817
{"level":"info","message":"received message","payload":{"url":"httpXX://google.com"},"span_id":"6f016ee3c8bf40ed","timestamp":"2023-07-14T18:13:11.290Z","trace_flags":"01","trace_id":"46a0a7c2ad56904675a6ef3cc141d433"}
executionId 73 triggerId 2473
{"level":"info","message":"received message","payload":{"url":"httpXX://google.com"},"span_id":"e235a5518988a225","timestamp":"2023-07-14T18:13:11.706Z","trace_flags":"01","trace_id":"46a0a7c2ad56904675a6ef3cc141d433"}
executionId 102 triggerId 2561
{"level":"info","message":"received message","payload":{"url":"httpXX://google.com"},"span_id":"8ed9f0bfa443794a","timestamp":"2023-07-14T18:13:12.450Z","trace_flags":"01","trace_id":"46a0a7c2ad56904675a6ef3cc141d433"}

I also tried calling span.end() and asyncResource.emitDestroy() before message.ack or message.nack. It didn't change the outcome.

You could take a look at one of the instrumentations in https://github.com/open-telemetry/opentelemetry-js-contrib/ to get better insight into these details.

I couldn't find any code directly using async_hooks in that repo. Can you please share an instance of code using it?

Regarding the difference between startSpan and startActiveSpan: startActiveSpan is just a small helper on top of startSpan. It starts a span, puts it into a context, and runs a callback with this new context set as active (see here for details).

Yes, and I tried to mimic that in my second approach but external libraries won't get instrumented...

Author

reith commented Jul 14, 2023

So, I got it to work by moving the message.ack() and message.nack() calls out of the function that receives the span:

'use strict'

import { tracer, enableConsoleSpanExporter } from './trace.js'
import { EventEmitter } from 'events'
import * as otel from '@opentelemetry/api'
import * as winston from 'winston'
import * as assert from 'assert'

async function sleep(amount) {
    await new Promise((resolve, reject) => {
        setTimeout(resolve, amount)
    })
}

function createLogger({ logLevel = 'info' } = {}) {
    return winston.createLogger({
        level: logLevel,
        exitOnError: false,
        format: winston.format.combine(winston.format.timestamp(), winston.format.json(), winston.format.colorize()),
        transports: [
            new winston.transports.Console()
        ]
    })
}

export class FakeQueue {
    constructor() {
        this.logger = createLogger({ logLevel: 'debug' })
        this.subscription = new EventEmitter()
    }

    async run(channel) {
        // this works as I expect.
        this.subscription.on('message', async(message) => {
            const ok = await tracer.startActiveSpan('consumeMessage', { kind: otel.SpanKind.CONSUMER }, async(span) => {
                const ok = await this.consumeMessage(message, { span })
                span.end() // I'm not sure if we need this
                return ok
            })
            console.log('requeue', message.id, !ok)
            ok ? message.ack() : message.nack()
        })
        channel.emit('subscribed')
    }

    async publishMessage(message, id, retryCount = 0) {
        this.subscription.emit('message', {
            ack: () => { },
            nack: () => {
                if (retryCount < 2) {
                    process.nextTick(() => this.publishMessage(message, id, retryCount + 1))
                }
            },
            id,
            retryCount,
            data: JSON.stringify(message)
        })
    }

    async consumeMessage(message, { span = tracer.startSpan('consumeMessage') } = {}) {
        // returns false for requeue
        let payload
        const logger = this.logger
        span.setAttribute('messageId', message.id)
        try {
            payload = JSON.parse(message.data)
            logger.info('received message', { payload })
            span.setAttributes({ pageURL: payload.url })
            const { response } = await tracer.startActiveSpan('fakeDownload', async(childSpan) => this.fakeDownload(payload.url, { span: childSpan }))
            span.setAttribute('responseStatus', response.status)
            if (response.status < 400) {
                span.setStatus({ code: otel.SpanStatusCode.OK })
                return true
            } else {
                span.setStatus({ code: otel.SpanStatusCode.ERROR })
                return false
            }
        } catch (e) {
            span.recordException(e)
            span.setStatus({ code: otel.SpanStatusCode.ERROR })
            return false
        }
    }

    async fakeDownload(urlStr, { span = tracer.startSpan('fakeDownload') } = {}) {
        await sleep(Math.random() * 2000)
        try {
            const url = new URL(urlStr)
            assert.ok(['http', 'https'].indexOf(url.protocol.split(':')[0]) > -1)
            span.setStatus({ code: otel.SpanStatusCode.OK })
            return { response: { status: 200 } }
        } catch (e) {
            span.recordException(e)
            span.setStatus({ code: otel.SpanStatusCode.ERROR })
            return { response: { status: 500 } }
        } finally {
            span.end()
        }
    }
}

enableConsoleSpanExporter()
const processor = new FakeQueue()
const runChannel = new EventEmitter()
runChannel.on('subscribed', () => {
    processor.publishMessage({ url: 'http://google.com' }, 1)
    processor.publishMessage({ url: 'httpXX://google.com' }, 2)
})
await processor.run(runChannel)

Author

reith commented Jul 14, 2023

My questions:

  1. Do I need the explicit span.end() in the previous code? Update: it seems I do, since context.with doesn't touch spans. Good.
  2. If I keep message.ack() or message.nack() inside consumeMessage and call span.end() before them, I still get the previous behavior: only one trace is created for all EventEmitter listener calls for the same nacked message. Does OpenTelemetry have an API to say "don't consider the rest of the code in this function as part of the current span"?
  3. This is not about OpenTelemetry, but I'll ask anyway :) I'm not using AsyncResource in the previous code, but I get the expected behavior. Can I be sure that every time my EventEmitter listener gets called I'll get a fresh execution context?

Contributor

weyert commented Jul 20, 2023

@reith Can I ask you to +1 the PR in the pubsub repo? googleapis/nodejs-pubsub#1659 (comment)

Member

Flarna commented Jul 24, 2023

Let's say I don't care about producer-side context; I just want each call of the event emitter handler to get a separate trace, not nested under any previous message (i.e. a separate span context).

You can use the ROOT_CONTEXT exported by the OTel API as the parent context (either explicitly, by passing it directly to tracer.startSpan(), or implicitly, by setting it as active via context.with()).

2. If I keep message.ack() or message.nack() inside consumeMessage and call span.end() before them, I still get the previous behavior: only one trace is created for all EventEmitter listener calls for the same nacked message. Does OpenTelemetry have an API to say "don't consider the rest of the code in this function as part of the current span"?

I haven't followed all your code because I think the in-process messaging system is a bit off topic here. As long as you are in one process, AsyncLocalStorage will track the context. This is a major difference from two processes, where context propagation needs to happen via inject/extract, which serializes the span context into the message and deserializes it from the message.

There is no dedicated API to say "don't consider the rest of the code in this function as part of the current span". There is only context.with(), which lets you set a context for the provided callback. You could use context.with(ROOT_CONTEXT, cb) to explicitly run some function with an empty active context. I guess if you place this at the right places in your single-process message queue sample, it would be possible to break the automatic propagation happening via AsyncLocalStorage.
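As a rough illustration of that idea, the sketch below models it with Node's AsyncLocalStorage instead of the OTel API. `ROOT`, `activeContext`, `withSpan`, and `runDemo` are hypothetical stand-ins (for ROOT_CONTEXT, the context manager, and startActiveSpan respectively), and the retry via process.nextTick mirrors the fake queue from the issue. The listener resets to the root context before starting its span, so each delivery gets its own trace.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks')
const { EventEmitter } = require('node:events')

const ROOT = Object.freeze({ traceId: null })   // stand-in for ROOT_CONTEXT
const activeContext = new AsyncLocalStorage()   // stand-in for the context manager
let nextTraceId = 1

// Rough model of startActiveSpan: child of the active trace, or a new trace at the root.
function withSpan(fn) {
    const parent = activeContext.getStore() ?? ROOT
    const ctx = { traceId: parent.traceId ?? `trace-${nextTraceId++}` }
    return activeContext.run(ctx, () => fn(ctx.traceId))
}

function runDemo() {
    return new Promise((resolve) => {
        const seen = []
        const queue = new EventEmitter()
        queue.on('message', (message) => {
            // Reset to the root context first, so a retry emitted from inside a
            // previous handler does not inherit that handler's trace.
            activeContext.run(ROOT, () => withSpan((traceId) => {
                seen.push(traceId)
                if (message.attempt < 2) {
                    process.nextTick(() => queue.emit('message', { attempt: message.attempt + 1 }))
                } else {
                    resolve(seen)
                }
            }))
        })
        queue.emit('message', { attempt: 0 })
    })
}

// Three deliveries, three different trace ids.
runDemo().then((seen) => console.log(seen))
```

In the real handler the equivalent would be wrapping the listener body in context.with(ROOT_CONTEXT, ...) before calling tracer.startActiveSpan; how that behaves with a real PubSub subscription is not tested here. The API's SpanOptions also include a root flag that asks for a span without a parent, which may be a simpler fit.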

3. This is not about OpenTelemetry, but I'll ask anyway :) I'm not using AsyncResource in the previous code, but I get the expected behavior. Can I be sure that every time my EventEmitter listener gets called I'll get a fresh execution context?

No, EventEmitter is synchronous, so the callbacks run in the context of the caller of emit. There is EventEmitterAsyncResource, which is an EventEmitter that uses an AsyncResource internally.
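The difference can be observed with a small sketch using only Node built-ins (EventEmitterAsyncResource needs a reasonably recent Node.js version). `als` and `contextSeenByListener` are hypothetical names; the AsyncLocalStorage store stands in for the active OTel context.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks')
const { EventEmitter, EventEmitterAsyncResource } = require('node:events')

const als = new AsyncLocalStorage()

// Emits one event from inside a "publisher" context and reports
// which context the listener observed.
function contextSeenByListener(emitter) {
    let seen
    emitter.once('message', () => { seen = als.getStore() })
    als.run('publisher-context', () => emitter.emit('message'))
    return seen
}

// Plain EventEmitter: the listener runs synchronously in the emit caller's context.
const plain = contextSeenByListener(new EventEmitter())

// EventEmitterAsyncResource: listeners run in the scope of the emitter's own
// AsyncResource, which was created here, outside any als.run().
const scoped = contextSeenByListener(new EventEmitterAsyncResource({ name: 'FakeQueue' }))

console.log({ plain, scoped })
```

With the plain emitter the listener sees 'publisher-context'; with the async-resource emitter it sees whatever was active when the emitter was constructed, which in this sketch is nothing.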

I couldn't find any code directly using async_hooks in that repo. Can you please share an instance of code using it?

That's not used here. It should be used by e.g. database drivers to encapsulate individual operations (e.g. a DB request) sent over a shared/reused resource (e.g. a network connection).
I found this place in Node.js core: the HTTP agent uses it to give each request a dedicated resource, because the underlying socket (which is also an AsyncResource) may get reused but HTTP requests should be separated.
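A minimal sketch of that pattern, assuming a made-up `SharedConnection` class that queues callbacks and invokes them later from a single flush call (this is not taken from any real driver). With `track` enabled, each request gets its own AsyncResource created in the caller's context, and the callback is later run in that resource's scope.

```javascript
const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks')

const als = new AsyncLocalStorage()

// Hypothetical shared connection: callbacks are queued and invoked from flush().
class SharedConnection {
    constructor() {
        this.pending = []
    }

    send(callback, { track }) {
        // With tracking, the AsyncResource captures the context active right now.
        const resource = track ? new AsyncResource('SharedConnection.request') : null
        this.pending.push({ callback, resource })
    }

    flush() {
        for (const { callback, resource } of this.pending.splice(0)) {
            if (resource) {
                resource.runInAsyncScope(callback) // re-enter the request's context
            } else {
                callback()                         // runs in the flusher's context
            }
        }
    }
}

// Sends two requests from two different contexts, flushes from a third,
// and returns the context each callback observed.
function contextsSeen(track) {
    const seen = []
    const connection = new SharedConnection()
    for (const request of ['request-A', 'request-B']) {
        als.run(request, () => connection.send(() => seen.push(als.getStore()), { track }))
    }
    als.run('flusher', () => connection.flush())
    return seen
}

console.log(contextsSeen(false)) // callbacks see the flusher's context
console.log(contextsSeen(true))  // callbacks see their own request's context
```

Without the per-request resource, every callback is attributed to whoever happens to flush the connection, which is the kind of mix-up the comment above describes for reused sockets.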
