Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

使用ctx.Done代替isClose #279

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 46 additions & 56 deletions znet/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@ type Connection struct {
// (有缓冲管道,用于读、写两个goroutine之间的消息通信)
msgBuffChan chan []byte

// Lock for user message reception and transmission
// (用户收发消息的Lock)
msgLock sync.RWMutex

// Connection properties
// (链接属性)
property map[string]interface{}
Expand All @@ -66,10 +62,6 @@ type Connection struct {
// (保护当前property的锁)
propertyLock sync.Mutex

// The current connection's close state
// (当前连接的关闭状态)
isClosed bool

// Which Connection Manager the current connection belongs to
// (当前链接是属于哪个Connection Manager的)
connManager ziface.IConnManager
Expand Down Expand Up @@ -120,7 +112,6 @@ func newServerConn(server ziface.IServer, conn net.Conn, connID uint64) ziface.I
conn: conn,
connID: connID,
connIdStr: strconv.FormatUint(connID, 10),
isClosed: false,
msgBuffChan: nil,
property: nil,
name: server.ServerName(),
Expand Down Expand Up @@ -157,7 +148,6 @@ func newClientConn(client ziface.IClient, conn net.Conn) ziface.IConnection {
conn: conn,
connID: 0, // client ignore
connIdStr: "", // client ignore
isClosed: false,
msgBuffChan: nil,
property: nil,
name: client.GetName(),
Expand Down Expand Up @@ -309,7 +299,10 @@ func (c *Connection) Start() {
// Stop stops the connection and ends the current connection state.
// (停止连接,结束当前连接状态)
func (c *Connection) Stop() {
c.cancel()
if c.cancel != nil {
c.cancel()
}

}

func (c *Connection) GetConnection() net.Conn {
Expand Down Expand Up @@ -346,12 +339,15 @@ func (c *Connection) LocalAddr() net.Addr {
}

func (c *Connection) Send(data []byte) error {
c.msgLock.RLock()
defer c.msgLock.RUnlock()
if c.isClosed == true {
return errors.New("connection closed when send msg")
if c.ctx == nil {
return errors.New("connection not start when send msg")
}
select {
case <-c.ctx.Done():
return errors.New("Send error data = %+v, ctx is done")
default:

}
_, err := c.conn.Write(data)
if err != nil {
zlog.Ins().ErrorF("SendMsg err data = %+v, err = %+v", data, err)
Expand All @@ -362,9 +358,14 @@ func (c *Connection) Send(data []byte) error {
}

func (c *Connection) SendToQueue(data []byte) error {
c.msgLock.RLock()
defer c.msgLock.RUnlock()

if c.ctx == nil {
return errors.New("connection not start when send msg")
}
select {
case <-c.ctx.Done():
return errors.New("Connection closed when send buff msg")
default:
}
if c.msgBuffChan == nil {
c.msgBuffChan = make(chan []byte, zconf.GlobalObject.MaxMsgChanLen)
// Start a Goroutine to write data back to the client
Expand All @@ -374,34 +375,29 @@ func (c *Connection) SendToQueue(data []byte) error {
go c.StartWriter()
}

idleTimeout := time.NewTimer(5 * time.Millisecond)
defer idleTimeout.Stop()

if c.isClosed == true {
return errors.New("Connection closed when send buff msg")
}

if data == nil {
zlog.Ins().ErrorF("Pack data is nil")
return errors.New("Pack data is nil")
}

// Send timeout
select {
case <-idleTimeout.C:
return errors.New("send buff msg timeout")
case c.msgBuffChan <- data:
return nil
}
c.msgBuffChan <- data
return nil

}

// SendMsg directly sends Message data to the remote TCP client.
// (直接将Message数据发送数据给远程的TCP客户端)
func (c *Connection) SendMsg(msgID uint32, data []byte) error {
if c.isClosed == true {
if c.ctx == nil {
return errors.New("connection not start when send msg")
}
select {
case <-c.ctx.Done():
return errors.New("connection closed when send msg")
default:
// Pack data and send it

}
// Pack data and send it
msg, err := c.packet.Pack(zpack.NewMsgPackage(msgID, data))
if err != nil {
zlog.Ins().ErrorF("Pack error msg ID = %d", msgID)
Expand All @@ -418,8 +414,13 @@ func (c *Connection) SendMsg(msgID uint32, data []byte) error {
}

func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error {
if c.isClosed == true {
if c.ctx == nil {
return errors.New("connection not start when send msg")
}
select {
case <-c.ctx.Done():
return errors.New("connection closed when send buff msg")
default:
}
if c.msgBuffChan == nil {
c.msgBuffChan = make(chan []byte, zconf.GlobalObject.MaxMsgChanLen)
Expand All @@ -430,22 +431,14 @@ func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error {
go c.StartWriter()
}

idleTimeout := time.NewTimer(5 * time.Millisecond)
defer idleTimeout.Stop()

msg, err := c.packet.Pack(zpack.NewMsgPackage(msgID, data))
if err != nil {
zlog.Ins().ErrorF("Pack error msg ID = %d", msgID)
return errors.New("Pack error msg ")
}
c.msgBuffChan <- msg
return nil

// send timeout
select {
case <-idleTimeout.C:
return errors.New("send buff msg timeout")
case c.msgBuffChan <- msg:
return nil
}
}

func (c *Connection) SetProperty(key string, value interface{}) {
Expand Down Expand Up @@ -485,14 +478,6 @@ func (c *Connection) finalizer() {
// (如果用户注册了该链接的 关闭回调业务,那么在此刻应该显示调用)
c.callOnConnStop()

c.msgLock.Lock()
defer c.msgLock.Unlock()

// If the connection has already been closed
if c.isClosed == true {
return
}

// Stop the heartbeat detector associated with the connection
if c.hc != nil {
c.hc.Stop()
Expand All @@ -511,8 +496,6 @@ func (c *Connection) finalizer() {
close(c.msgBuffChan)
}

c.isClosed = true

zlog.Ins().InfoF("Conn Stop()...ConnID = %d", c.connID)
}

Expand All @@ -531,13 +514,20 @@ func (c *Connection) callOnConnStop() {
}

func (c *Connection) IsAlive() bool {
if c.isClosed {
if c.ctx == nil {
return false
}
select {
case <-c.ctx.Done():
return false
default:

}
// Check the last activity time of the connection. If it's beyond the heartbeat interval,
// then the connection is considered dead.
// (检查连接最后一次活动时间,如果超过心跳间隔,则认为连接已经死亡)
return time.Now().Sub(c.lastActivityTime) < zconf.GlobalObject.HeartbeatMaxDuration()

}

func (c *Connection) updateActivity() {
Expand Down
Loading