Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[💡 FEATURE REQUEST]: NSQ Jobs driver #1948

Open
1 task done
shellphy opened this issue Jun 21, 2024 · 4 comments
Open
1 task done

[💡 FEATURE REQUEST]: NSQ Jobs driver #1948

shellphy opened this issue Jun 21, 2024 · 4 comments
Assignees
Labels
C-feature-accepted Category: Feature discussed and accepted W-waiting on response Wait: waiting for the issue creator response

Comments

@shellphy
Copy link

No duplicates 🥲.

  • I have searched for a similar issue.

What should be improved or cleaned up?

Nsq is a realtime distributed messaging platform created by golang, The protocol is very simple, very lightweight, and very popular, and hopefully supported

@shellphy shellphy added the C-enhancement Category: enhancement. Meaning improvements of current module, transport, etc.. label Jun 21, 2024
@rustatian rustatian added C-feature-accepted Category: Feature discussed and accepted and removed C-enhancement Category: enhancement. Meaning improvements of current module, transport, etc.. labels Jun 21, 2024
@rustatian
Copy link
Member

Hey @shellphy 👋
Thanks for your feature request 👍

@rustatian rustatian changed the title [🧹 CHORE]: add nsq job driver [🧹 FEATURE REQUEST]: add nsq job driver Jun 21, 2024
@rustatian rustatian changed the title [🧹 FEATURE REQUEST]: add nsq job driver [💡 FEATURE REQUEST]: NSQ Jobs driver Jun 21, 2024
@rustatian
Copy link
Member

@shellphy
Here you may track progress: https://github.com/roadrunner-server/nsq
But since I didn't use previously this driver, it would be nice if you update the description of this ticket and explain your use case, options you use, environment. As much info as you may provide to include your feedback in the initial release of this driver.

@rustatian rustatian added the W-waiting on response Wait: waiting for the issue creator response label Jun 22, 2024
@shellphy
Copy link
Author

shellphy commented Jun 24, 2024

@shellphy Here you may track progress: https://github.com/roadrunner-server/nsq But since I didn't use previously this driver, it would be nice if you update the description of this ticket and explain your use case, options you use, environment. As much info as you may provide to include your feedback in the initial release of this driver.

Thank you very much for accepting my request. I maintain a NSQ wrapper: https://github.com/zhimaAi/go_tools/blob/master/mq/nsq_pro.go

There are specific ways to use it in this project:

func StartConsumer() {
	common.RunTask(define.ConvertPdfTopic, define.ConvertPdfChannel, 1, business.ConvertPdf)
	common.RunTask(define.ConvertVectorTopic, define.ConvertVectorChannel, 2, business.ConvertVector)
	common.RunTask(lib_define.PushMessage, lib_define.PushChannel, 10, business.AppPush)
	common.RunTask(lib_define.PushEvent, lib_define.PushChannel, 5, business.AppPush)
}
if err := common.AddJobs(define.ConvertVectorTopic, message); err != nil {
    logs.Error(err.Error())
}
func AddJobs(topic, message string, delay ...time.Duration) error {
	topic = define.Env + `_` + topic
	return define.ProducerHandle.AddJobs(topic, message, delay...)
}

func RunTask(topic, channel string, workNum uint, callback func(msg string, args ...string) error) {
	topic = define.Env + `_` + topic
	err := define.ConsumerHandle.PushZero(define.Config.Nsqd[`host`]+`:`+define.Config.Nsqd[`port`], topic)
	if err != nil {
		logs.Error(`PushZero Error:%s`, err.Error())
	}
	err = define.ConsumerHandle.Run(topic, channel, workNum, callback)
	if err != nil {
		logs.Error(`Consumer Run Error:%s`, err.Error())
	}
}

It is important to note that if the producer has not set up the topic and the consumer starts listening, there will be an error, so I will usually send an empty message containing 0 when the consumer starts, as seen in the PushZero() function

If you want to quickly deploy nsq for easy testing, you can use the following docker compose configuration:

services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    restart: always
    environment:
      TZ: ${TIMEZONE}
    ports:
      - "${NSQLOOKUPD_PORT}:4161"

  nsqd:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 --broadcast-address=121.40.109.241 --broadcast-tcp-port=${NSQD_TCP_PORT} --broadcast-http-port=${NSQD_HTTP_PORT}
    restart: always
    depends_on:
      - nsqlookupd
    environment:
      TZ: ${TIMEZONE}
    ports:
      - "${NSQD_TCP_PORT}:4150"
      - "${NSQD_HTTP_PORT}:4151"

  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    restart: always
    depends_on:
      - nsqlookupd
    environment:
      TZ: ${TIMEZONE}
    ports:
      - ${NSQADMIN_PORT}:4171

All those can be seen at my new project: https://github.com/zhimaAi/chatwiki

In my project, the task model is that one tcp connection corresponds to one go coroutine task, which is simpler and recommended for the initial release.

In nsq, only one tcp connection can be established, and multiple messages can be received per communication. This is achieved through rdy. It is recommended to implement this function in the second version

@rustatian
Copy link
Member

Got u, thanks @shellphy 👍
I'll try to push this plugin to the v2024.2.0.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
C-feature-accepted Category: Feature discussed and accepted W-waiting on response Wait: waiting for the issue creator response
Projects
Status: 📋 Backlog
Development

No branches or pull requests

2 participants