From aaa7f6b1a7163f5abbb0ac2067c7ad66a9f3134c Mon Sep 17 00:00:00 2001 From: wzrtzt Date: Thu, 7 Sep 2023 17:12:54 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BD=BF=E7=94=A8ctx.Done=E4=BB=A3?= =?UTF-8?q?=E6=9B=BFisCLose=201.=E5=8F=AF=E4=BB=A5=E5=87=8F=E5=B0=91?= =?UTF-8?q?=E9=94=81=E7=9A=84=E4=BD=BF=E7=94=A8=202.=E5=8F=AF=E4=BB=A5?= =?UTF-8?q?=E9=81=BF=E5=85=8D=E6=B6=88=E6=81=AF=E9=87=8F=E5=A4=A7=E6=97=B6?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E5=8C=85=E8=A2=AB=E4=B8=A2=E5=BC=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- znet/connection.go | 84 ++++++++++++++--------------------------- znet/kcp_connection.go | 79 +++++++++++++-------------------------- znet/ws_connection.go | 85 +++++++++++++----------------------------- 3 files changed, 78 insertions(+), 170 deletions(-) diff --git a/znet/connection.go b/znet/connection.go index eb6ccb21..f61155af 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -54,10 +54,6 @@ type Connection struct { // (有缓冲管道,用于读、写两个goroutine之间的消息通信) msgBuffChan chan []byte - // Lock for user message reception and transmission - // (用户收发消息的Lock) - msgLock sync.RWMutex - // Connection properties // (链接属性) property map[string]interface{} @@ -66,10 +62,6 @@ type Connection struct { // (保护当前property的锁) propertyLock sync.Mutex - // The current connection's close state - // (当前连接的关闭状态) - isClosed bool - // Which Connection Manager the current connection belongs to // (当前链接是属于哪个Connection Manager的) connManager ziface.IConnManager @@ -120,7 +112,6 @@ func newServerConn(server ziface.IServer, conn net.Conn, connID uint64) ziface.I conn: conn, connID: connID, connIdStr: strconv.FormatUint(connID, 10), - isClosed: false, msgBuffChan: nil, property: nil, name: server.ServerName(), @@ -157,7 +148,6 @@ func newClientConn(client ziface.IClient, conn net.Conn) ziface.IConnection { conn: conn, connID: 0, // client ignore connIdStr: "", // client ignore - isClosed: false, msgBuffChan: nil, property: nil, name: client.GetName(), @@ -346,12 +336,12 @@ func (c *Connection) LocalAddr() net.Addr { } func (c *Connection) Send(data []byte) error { - c.msgLock.RLock() - defer c.msgLock.RUnlock() - if c.isClosed == true { - return errors.New("connection closed when send msg") - } + select { + case <-c.ctx.Done(): + return errors.New("Send error data = %+v, ctx is done") + default: + } _, err := c.conn.Write(data) if err != nil { zlog.Ins().ErrorF("SendMsg err data = %+v, err = %+v", data, err) @@ -362,9 +352,11 @@ func (c *Connection) Send(data []byte) error { } func (c *Connection) SendToQueue(data []byte) error { - c.msgLock.RLock() - defer c.msgLock.RUnlock() - + select { + case <-c.ctx.Done(): + return errors.New("Connection closed when send buff msg") + default: + } if c.msgBuffChan == nil { c.msgBuffChan = make(chan []byte, zconf.GlobalObject.MaxMsgChanLen) // Start a Goroutine to write data back to the client @@ -374,34 +366,26 @@ func (c *Connection) SendToQueue(data []byte) error { go c.StartWriter() } - idleTimeout := time.NewTimer(5 * time.Millisecond) - defer idleTimeout.Stop() - - if c.isClosed == true { - return errors.New("Connection closed when send buff msg") - } - if data == nil { zlog.Ins().ErrorF("Pack data is nil") return errors.New("Pack data is nil") } - // Send timeout - select { - case <-idleTimeout.C: - return errors.New("send buff msg timeout") - case c.msgBuffChan <- data: - return nil - } + c.msgBuffChan <- data + return nil + } // SendMsg directly sends Message data to the remote TCP client. // (直接将Message数据发送数据给远程的TCP客户端) func (c *Connection) SendMsg(msgID uint32, data []byte) error { - if c.isClosed == true { + select { + case <-c.ctx.Done(): return errors.New("connection closed when send msg") + default: + // Pack data and send it + } - // Pack data and send it msg, err := c.packet.Pack(zpack.NewMsgPackage(msgID, data)) if err != nil { zlog.Ins().ErrorF("Pack error msg ID = %d", msgID) @@ -418,8 +402,10 @@ func (c *Connection) SendMsg(msgID uint32, data []byte) error { } func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error { - if c.isClosed == true { + select { + case <-c.ctx.Done(): return errors.New("connection closed when send buff msg") + default: } if c.msgBuffChan == nil { c.msgBuffChan = make(chan []byte, zconf.GlobalObject.MaxMsgChanLen) @@ -430,22 +416,14 @@ func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error { go c.StartWriter() } - idleTimeout := time.NewTimer(5 * time.Millisecond) - defer idleTimeout.Stop() - msg, err := c.packet.Pack(zpack.NewMsgPackage(msgID, data)) if err != nil { zlog.Ins().ErrorF("Pack error msg ID = %d", msgID) return errors.New("Pack error msg ") } + c.msgBuffChan <- msg + return nil - // send timeout - select { - case <-idleTimeout.C: - return errors.New("send buff msg timeout") - case c.msgBuffChan <- msg: - return nil - } } func (c *Connection) SetProperty(key string, value interface{}) { @@ -485,14 +463,6 @@ func (c *Connection) finalizer() { // (如果用户注册了该链接的 关闭回调业务,那么在此刻应该显示调用) c.callOnConnStop() - c.msgLock.Lock() - defer c.msgLock.Unlock() - - // If the connection has already been closed - if c.isClosed == true { - return - } - // Stop the heartbeat detector associated with the connection if c.hc != nil { c.hc.Stop() @@ -511,8 +481,6 @@ func (c *Connection) finalizer() { close(c.msgBuffChan) } - c.isClosed = true - zlog.Ins().InfoF("Conn Stop()...ConnID = %d", c.connID) } @@ -531,13 +499,17 @@ func (c *Connection) callOnConnStop() { } func (c *Connection) IsAlive() bool { - if c.isClosed { + select { + case <-c.ctx.Done(): return false + default: + } // Check the last activity time of the connection. If it's beyond the heartbeat interval, // then the connection is considered dead. // (检查连接最后一次活动时间,如果超过心跳间隔,则认为连接已经死亡) return time.Now().Sub(c.lastActivityTime) < zconf.GlobalObject.HeartbeatMaxDuration() + } func (c *Connection) updateActivity() { diff --git a/znet/kcp_connection.go b/znet/kcp_connection.go index 0272596c..1c52ed0d 100644 --- a/znet/kcp_connection.go +++ b/znet/kcp_connection.go @@ -55,10 +55,6 @@ type KcpConnection struct { // (有缓冲管道,用于读、写两个goroutine之间的消息通信) msgBuffChan chan []byte - // Lock for user message reception and transmission - // (用户收发消息的Lock) - msgLock sync.RWMutex - // Connection properties // (链接属性) property map[string]interface{} @@ -67,10 +63,6 @@ type KcpConnection struct { // (保护当前property的锁) propertyLock sync.Mutex - // The current connection's close state - // (当前连接的关闭状态) - isClosed bool - // Which Connection Manager the current connection belongs to // (当前链接是属于哪个Connection Manager的) connManager ziface.IConnManager @@ -120,7 +112,6 @@ func newKcpServerConn(server ziface.IServer, conn *kcp.UDPSession, connID uint64 conn: conn, connID: connID, connIdStr: strconv.FormatUint(connID, 10), - isClosed: false, msgBuffChan: nil, property: nil, name: server.ServerName(), @@ -157,7 +148,6 @@ func newKcpClientConn(client ziface.IClient, conn *kcp.UDPSession) ziface.IConne conn: conn, connID: 0, // client ignore connIdStr: "", // client ignore - isClosed: false, msgBuffChan: nil, property: nil, name: client.GetName(), @@ -344,10 +334,10 @@ func (c *KcpConnection) LocalAddr() net.Addr { } func (c *KcpConnection) Send(data []byte) error { - c.msgLock.RLock() - defer c.msgLock.RUnlock() - if c.isClosed == true { + select { + case <-c.ctx.Done(): return errors.New("connection closed when send msg") + default: } _, err := c.conn.Write(data) @@ -360,8 +350,11 @@ func (c *KcpConnection) Send(data []byte) error { } func (c *KcpConnection) SendToQueue(data []byte) error { - c.msgLock.RLock() - defer c.msgLock.RUnlock() + select { + case <-c.ctx.Done(): + return errors.New("Connection closed when send buff msg") + default: + } if c.msgBuffChan == nil { c.msgBuffChan = make(chan []byte, zconf.GlobalObject.MaxMsgChanLen) @@ -372,32 +365,23 @@ func (c *KcpConnection) SendToQueue(data []byte) error { go c.StartWriter() } - idleTimeout := time.NewTimer(5 * time.Millisecond) - defer idleTimeout.Stop() - - if c.isClosed == true { - return errors.New("Connection closed when send buff msg") - } - if data == nil { zlog.Ins().ErrorF("Pack data is nil") return errors.New("Pack data is nil") } - // Send timeout - select { - case <-idleTimeout.C: - return errors.New("send buff msg timeout") - case c.msgBuffChan <- data: - return nil - } + c.msgBuffChan <- data + return nil + } // SendMsg directly sends Message data to the remote KCP client. // (直接将Message数据发送数据给远程的KCP客户端) func (c *KcpConnection) SendMsg(msgID uint32, data []byte) error { - if c.isClosed == true { - return errors.New("connection closed when send msg") + select { + case <-c.ctx.Done(): + return errors.New("Connection closed when send buff msg") + default: } // Pack data and send it msg, err := c.packet.Pack(zpack.NewMsgPackage(msgID, data)) @@ -416,8 +400,10 @@ func (c *KcpConnection) SendMsg(msgID uint32, data []byte) error { } func (c *KcpConnection) SendBuffMsg(msgID uint32, data []byte) error { - if c.isClosed == true { - return errors.New("connection closed when send buff msg") + select { + case <-c.ctx.Done(): + return errors.New("Connection closed when send buff msg") + default: } if c.msgBuffChan == nil { c.msgBuffChan = make(chan []byte, zconf.GlobalObject.MaxMsgChanLen) @@ -428,22 +414,15 @@ func (c *KcpConnection) SendBuffMsg(msgID uint32, data []byte) error { go c.StartWriter() } - idleTimeout := time.NewTimer(5 * time.Millisecond) - defer idleTimeout.Stop() - msg, err := c.packet.Pack(zpack.NewMsgPackage(msgID, data)) if err != nil { zlog.Ins().ErrorF("Pack error msg ID = %d", msgID) return errors.New("Pack error msg ") } - // send timeout - select { - case <-idleTimeout.C: - return errors.New("send buff msg timeout") - case c.msgBuffChan <- msg: - return nil - } + c.msgBuffChan <- msg + return nil + } func (c *KcpConnection) SetProperty(key string, value interface{}) { @@ -483,14 +462,6 @@ func (c *KcpConnection) finalizer() { // (如果用户注册了该链接的 关闭回调业务,那么在此刻应该显示调用) c.callOnConnStop() - c.msgLock.Lock() - defer c.msgLock.Unlock() - - // If the connection has already been closed - if c.isClosed == true { - return - } - // Stop the heartbeat detector associated with the connection if c.hc != nil { c.hc.Stop() @@ -509,8 +480,6 @@ func (c *KcpConnection) finalizer() { close(c.msgBuffChan) } - c.isClosed = true - zlog.Ins().InfoF("Conn Stop()...ConnID = %d", c.connID) } @@ -529,8 +498,10 @@ func (c *KcpConnection) callOnConnStop() { } func (c *KcpConnection) IsAlive() bool { - if c.isClosed { + select { + case <-c.ctx.Done(): return false + default: } // Check the last activity time of the connection. If it's beyond the heartbeat interval, // then the connection is considered dead. diff --git a/znet/ws_connection.go b/znet/ws_connection.go index 825a7f9d..42d5bcf4 100644 --- a/znet/ws_connection.go +++ b/znet/ws_connection.go @@ -52,19 +52,12 @@ type WsConnection struct { // (有缓冲管道,用于读、写两个goroutine之间的消息通信) msgBuffChan chan []byte - // msgLock is used for locking when users send and receive messages. - // (用户收发消息的Lock) - msgLock sync.RWMutex - // property is the connection attribute. (链接属性) property map[string]interface{} // propertyLock protects the current property lock. (保护当前property的锁) propertyLock sync.Mutex - // isClosed is the current connection's closed state. (当前连接的关闭状态) - isClosed bool - // connManager is the Connection Manager to which the current connection belongs. (当前链接是属于哪个Connection Manager的) connManager ziface.IConnManager @@ -112,7 +105,6 @@ func newWebsocketConn(server ziface.IServer, conn *websocket.Conn, connID uint64 conn: conn, connID: connID, connIdStr: strconv.FormatUint(connID, 10), - isClosed: false, msgBuffChan: nil, property: nil, name: server.ServerName(), @@ -147,7 +139,6 @@ func newWsClientConn(client ziface.IClient, conn *websocket.Conn) ziface.IConnec conn: conn, connID: 0, // client ignore connIdStr: "", // client ignore - isClosed: false, msgBuffChan: nil, property: nil, name: client.GetName(), @@ -332,10 +323,10 @@ func (c *WsConnection) LocalAddr() net.Addr { } func (c *WsConnection) Send(data []byte) error { - c.msgLock.RLock() - defer c.msgLock.RUnlock() - if c.isClosed == true { + select { + case <-c.ctx.Done(): return errors.New("WsConnection closed when send msg") + default: } err := c.conn.WriteMessage(websocket.BinaryMessage, data) @@ -348,8 +339,11 @@ func (c *WsConnection) Send(data []byte) error { } func (c *WsConnection) SendToQueue(data []byte) error { - c.msgLock.RLock() - defer c.msgLock.RUnlock() + select { + case <-c.ctx.Done(): + return errors.New("WsConnection closed when send msg") + default: + } if c.msgBuffChan == nil { c.msgBuffChan = make(chan []byte, zconf.GlobalObject.MaxMsgChanLen) @@ -360,33 +354,23 @@ func (c *WsConnection) SendToQueue(data []byte) error { go c.StartWriter() } - idleTimeout := time.NewTimer(5 * time.Millisecond) - defer idleTimeout.Stop() - - if c.isClosed == true { - return errors.New("WsConnection closed when send buff msg") - } - if data == nil { zlog.Ins().ErrorF("Pack data is nil") return errors.New("Pack data is nil ") } - select { - case <-idleTimeout.C: - return errors.New("send buff msg timeout") - case c.msgBuffChan <- data: - return nil - } + c.msgBuffChan <- data + return nil + } // SendMsg directly sends the Message data to the remote TCP client. // (直接将Message数据发送数据给远程的TCP客户端) func (c *WsConnection) SendMsg(msgID uint32, data []byte) error { - c.msgLock.RLock() - defer c.msgLock.RUnlock() - if c.isClosed == true { + select { + case <-c.ctx.Done(): return errors.New("WsConnection closed when send msg") + default: } // Package data and send @@ -409,9 +393,11 @@ func (c *WsConnection) SendMsg(msgID uint32, data []byte) error { // SendBuffMsg sends BuffMsg func (c *WsConnection) SendBuffMsg(msgID uint32, data []byte) error { - c.msgLock.RLock() - defer c.msgLock.RUnlock() - + select { + case <-c.ctx.Done(): + return errors.New("WsConnection closed when send msg") + default: + } if c.msgBuffChan == nil { c.msgBuffChan = make(chan []byte, zconf.GlobalObject.MaxMsgChanLen) // Start the Goroutine for writing back to the client data stream @@ -421,13 +407,6 @@ func (c *WsConnection) SendBuffMsg(msgID uint32, data []byte) error { go c.StartWriter() } - idleTimeout := time.NewTimer(5 * time.Millisecond) - defer idleTimeout.Stop() - - if c.isClosed == true { - return errors.New("WsConnection closed when send buff msg") - } - // Package data and send // (将data封包,并且发送) msg, err := c.packet.Pack(zpack.NewMsgPackage(msgID, data)) @@ -436,13 +415,9 @@ func (c *WsConnection) SendBuffMsg(msgID uint32, data []byte) error { return errors.New("Pack error msg ") } - // Send timeout - select { - case <-idleTimeout.C: - return errors.New("send buff msg timeout") - case c.msgBuffChan <- msg: - return nil - } + c.msgBuffChan <- msg + return nil + } func (c *WsConnection) SetProperty(key string, value interface{}) { @@ -484,15 +459,6 @@ func (c *WsConnection) finalizer() { // (如果用户注册了该链接的 关闭回调业务,那么在此刻应该显示调用) c.callOnConnStop() - c.msgLock.Lock() - defer c.msgLock.Unlock() - - // If the current connection is already closed. - // (如果当前链接已经关闭) - if c.isClosed == true { - return - } - // Stop the heartbeat detector bound to the connection. // (关闭链接绑定的心跳检测器) if c.hc != nil { @@ -515,9 +481,6 @@ func (c *WsConnection) finalizer() { close(c.msgBuffChan) } - // Set the flag to indicate that the connection is closed. (设置标志位) - c.isClosed = true - zlog.Ins().InfoF("Conn Stop()...ConnID = %d", c.connID) } @@ -536,8 +499,10 @@ func (c *WsConnection) callOnConnStop() { } func (c *WsConnection) IsAlive() bool { - if c.isClosed { + select { + case <-c.ctx.Done(): return false + default: } // Check the time duration since the last activity of the connection, if it exceeds the maximum heartbeat interval, // then the connection is considered dead From 7c82f3016178db485bb78d22d897e650644e5abe Mon Sep 17 00:00:00 2001 From: wzrtzt Date: Mon, 11 Sep 2023 09:14:05 +0800 Subject: [PATCH 2/2] =?UTF-8?q?fix:=20=E6=B7=BB=E5=8A=A0=E7=A9=BA=E6=8C=87?= =?UTF-8?q?=E9=92=88=E5=88=A4=E6=96=AD=EF=BC=8C=E6=8F=90=E5=8D=87=E7=A8=B3?= =?UTF-8?q?=E5=AE=9A=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- znet/connection.go | 20 +++++++++++++++++++- znet/kcp_connection.go | 17 ++++++++++++++++- znet/ws_connection.go | 20 +++++++++++++++++++- 3 files changed, 54 insertions(+), 3 deletions(-) diff --git a/znet/connection.go b/znet/connection.go index f61155af..355aac1e 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -299,7 +299,10 @@ func (c *Connection) Start() { // Stop stops the connection and ends the current connection state. // (停止连接,结束当前连接状态) func (c *Connection) Stop() { - c.cancel() + if c.cancel != nil { + c.cancel() + } + } func (c *Connection) GetConnection() net.Conn { @@ -336,6 +339,9 @@ func (c *Connection) LocalAddr() net.Addr { } func (c *Connection) Send(data []byte) error { + if c.ctx == nil { + return errors.New("connection not start when send msg") + } select { case <-c.ctx.Done(): return errors.New("Send error data = %+v, ctx is done") @@ -352,6 +358,9 @@ func (c *Connection) Send(data []byte) error { } func (c *Connection) SendToQueue(data []byte) error { + if c.ctx == nil { + return errors.New("connection not start when send msg") + } select { case <-c.ctx.Done(): return errors.New("Connection closed when send buff msg") @@ -379,6 +388,9 @@ func (c *Connection) SendToQueue(data []byte) error { // SendMsg directly sends Message data to the remote TCP client. // (直接将Message数据发送数据给远程的TCP客户端) func (c *Connection) SendMsg(msgID uint32, data []byte) error { + if c.ctx == nil { + return errors.New("connection not start when send msg") + } select { case <-c.ctx.Done(): return errors.New("connection closed when send msg") @@ -402,6 +414,9 @@ func (c *Connection) SendMsg(msgID uint32, data []byte) error { } func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error { + if c.ctx == nil { + return errors.New("connection not start when send msg") + } select { case <-c.ctx.Done(): return errors.New("connection closed when send buff msg") @@ -499,6 +514,9 @@ func (c *Connection) callOnConnStop() { } func (c *Connection) IsAlive() bool { + if c.ctx == nil { + return false + } select { case <-c.ctx.Done(): return false diff --git a/znet/kcp_connection.go b/znet/kcp_connection.go index 1c52ed0d..15dbc77a 100644 --- a/znet/kcp_connection.go +++ b/znet/kcp_connection.go @@ -297,7 +297,10 @@ func (c *KcpConnection) Start() { // Stop stops the connection and ends the current connection state. // (停止连接,结束当前连接状态) func (c *KcpConnection) Stop() { - c.cancel() + if c.cancel != nil { + c.cancel() + } + } func (c *KcpConnection) GetConnection() net.Conn { @@ -334,6 +337,9 @@ func (c *KcpConnection) LocalAddr() net.Addr { } func (c *KcpConnection) Send(data []byte) error { + if c.ctx == nil { + return errors.New("connection not start when send msg") + } select { case <-c.ctx.Done(): return errors.New("connection closed when send msg") @@ -350,6 +356,9 @@ func (c *KcpConnection) Send(data []byte) error { } func (c *KcpConnection) SendToQueue(data []byte) error { + if c.ctx == nil { + return errors.New("connection not start when send msg") + } select { case <-c.ctx.Done(): return errors.New("Connection closed when send buff msg") @@ -378,6 +387,9 @@ func (c *KcpConnection) SendToQueue(data []byte) error { // SendMsg directly sends Message data to the remote KCP client. // (直接将Message数据发送数据给远程的KCP客户端) func (c *KcpConnection) SendMsg(msgID uint32, data []byte) error { + if c.ctx == nil { + return errors.New("connection not start when send msg") + } select { case <-c.ctx.Done(): return errors.New("Connection closed when send buff msg") @@ -400,6 +412,9 @@ func (c *KcpConnection) SendMsg(msgID uint32, data []byte) error { } func (c *KcpConnection) SendBuffMsg(msgID uint32, data []byte) error { + if c.ctx == nil { + return errors.New("connection not start when send msg") + } select { case <-c.ctx.Done(): return errors.New("Connection closed when send buff msg") diff --git a/znet/ws_connection.go b/znet/ws_connection.go index 42d5bcf4..2652668d 100644 --- a/znet/ws_connection.go +++ b/znet/ws_connection.go @@ -286,7 +286,10 @@ func (c *WsConnection) Start() { // Stop stops the connection and ends its current state. // (停止连接,结束当前连接状态) func (c *WsConnection) Stop() { - c.cancel() + if c.cancel != nil { + c.cancel() + } + } func (c *WsConnection) GetConnection() net.Conn { @@ -323,6 +326,9 @@ func (c *WsConnection) LocalAddr() net.Addr { } func (c *WsConnection) Send(data []byte) error { + if c.ctx == nil { + return errors.New("connection not start when send msg") + } select { case <-c.ctx.Done(): return errors.New("WsConnection closed when send msg") @@ -339,6 +345,9 @@ func (c *WsConnection) Send(data []byte) error { } func (c *WsConnection) SendToQueue(data []byte) error { + if c.ctx == nil { + return errors.New("connection not start when send msg") + } select { case <-c.ctx.Done(): return errors.New("WsConnection closed when send msg") @@ -367,6 +376,9 @@ func (c *WsConnection) SendToQueue(data []byte) error { // SendMsg directly sends the Message data to the remote TCP client. // (直接将Message数据发送数据给远程的TCP客户端) func (c *WsConnection) SendMsg(msgID uint32, data []byte) error { + if c.ctx == nil { + return errors.New("connection not start when send msg") + } select { case <-c.ctx.Done(): return errors.New("WsConnection closed when send msg") @@ -393,6 +405,9 @@ func (c *WsConnection) SendMsg(msgID uint32, data []byte) error { // SendBuffMsg sends BuffMsg func (c *WsConnection) SendBuffMsg(msgID uint32, data []byte) error { + if c.ctx == nil { + return errors.New("connection not start when send msg") + } select { case <-c.ctx.Done(): return errors.New("WsConnection closed when send msg") @@ -499,6 +514,9 @@ func (c *WsConnection) callOnConnStop() { } func (c *WsConnection) IsAlive() bool { + if c.ctx == nil { + return false + } select { case <-c.ctx.Done(): return false