Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

paho golang client cannot keep stable connection #293

Closed
cyclamen opened this issue Sep 7, 2023 · 3 comments
Closed

paho golang client cannot keep stable connection #293

cyclamen opened this issue Sep 7, 2023 · 3 comments

Comments

@cyclamen
Copy link

cyclamen commented Sep 7, 2023

I use the paho golang client like the codes. The client prints:

2023/09/07 12:36:17 ERROR Connect lost: error=EOF
2023/09/07 12:36:17 INFO Connected
2023/09/07 12:36:27 ERROR Connect lost: error=EOF
2023/09/07 12:36:27 INFO Connected
2023/09/07 12:37:14 ERROR Connect lost: error=EOF
2023/09/07 12:37:14 INFO Connected
2023/09/07 12:37:22 ERROR Connect lost: error=EOF
2023/09/07 12:37:22 INFO Connected
2023/09/07 12:38:58 ERROR Connect lost: error=EOF
2023/09/07 12:38:58 INFO Connected
2023/09/07 12:39:23 ERROR Connect lost: error=EOF
2023/09/07 12:39:23 INFO Connected
2023/09/07 12:39:44 ERROR Connect lost: error=EOF
2023/09/07 12:39:44 INFO Connected
2023/09/07 12:42:52 ERROR Connect lost: error=EOF
2023/09/07 12:42:52 INFO Connected
2023/09/07 12:43:51 ERROR Connect lost: error=EOF
2023/09/07 12:43:51 INFO Connected
2023/09/07 12:43:59 ERROR Connect lost: error=EOF
2023/09/07 12:43:59 INFO Connected
.......

package main

import (
	"fmt"
	"github.com/eclipse/paho.mqtt.golang"
	"log/slog"
	"os"
	"time"
)

var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("TOPIC: %s\n", msg.Topic())
	fmt.Printf("MSG: %s\n", msg.Payload())
}

func main() {
	//mqtt.DEBUG = log.New(os.Stdout, "", 0)
	//mqtt.ERROR = log.New(os.Stdout, "", 0)
	opts := mqtt.NewClientOptions().AddBroker("tcp://10.1.0.213:1883").SetClientID("abcdefghi1")
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler
	//opts.SetDefaultPublishHandler(messagePubHandler)
	opts.SetKeepAlive(2 * time.Second)
	opts.SetDefaultPublishHandler(f)
	opts.SetPingTimeout(1 * time.Second)

	c := mqtt.NewClient(opts)
	if token := c.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	if token := c.Subscribe("go-mqtt/sample", 0, nil); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	for i := 0; i < 5; i++ {
		text := fmt.Sprintf("this is msg #%d!", i)
		token := c.Publish("go-mqtt/sample", 0, false, text)
		token.Wait()
	}

	time.Sleep(600 * time.Second)

	if token := c.Unsubscribe("go-mqtt/sample"); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	c.Disconnect(250)

	time.Sleep(1 * time.Second)
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	slog.Info("Connected")

}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	slog.Error("Connect lost:", "error", err)

}

and the server log :

12:35PM INF added hook hook=allow-all-auth
12:35PM INF attached listener address=:1883 id=t1 protocol=tcp
12:35PM INF mochi mqtt starting version=2.3.0
12:35PM INF mochi mqtt server started
12:36PM WRN error=EOF listener=t1
12:36PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:54389: i/o timeout" listener=t1
12:36PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:54431: i/o timeout" listener=t1
12:37PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:54474: i/o timeout" listener=t1
12:37PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:54643: i/o timeout" listener=t1
12:38PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:54671: i/o timeout" listener=t1
12:39PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:55054: i/o timeout" listener=t1
12:39PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:55139: i/o timeout" listener=t1
12:42PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:55217: i/o timeout" listener=t1
12:43PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:55913: i/o timeout" listener=t1
12:43PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:56168: i/o timeout" listener=t1
12:44PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:56196: i/o timeout" listener=t1
12:45PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:56332: i/o timeout" listener=t1
12:45PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:56539: i/o timeout" listener=t1

so, how to make the client be stable.

@werbenhu werbenhu closed this as completed Sep 7, 2023
@werbenhu werbenhu reopened this Sep 7, 2023
@werbenhu
Copy link
Member

werbenhu commented Sep 7, 2023

@cyclamen It's caused by a too short keepalive setting. For more information about keepalive, please refer to MQTT v3 spec 3.1.2.10

opts.SetKeepAlive(60 * time.Second)

@cyclamen
Copy link
Author

cyclamen commented Sep 7, 2023

@werbenhu

It works!Thanks!

@lucasjinreal
Copy link

@werbenhu hi, if just set 60s, does it too small?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants