Skip to content

Commit

Permalink
Naive implementation for issue alphazero#2. Added a simple connection…
Browse files Browse the repository at this point in the history
… pool using channels for concurrency
  • Loading branch information
Ryan Day committed Aug 1, 2012
1 parent 57ae451 commit b9a9fb1
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 30 deletions.
102 changes: 76 additions & 26 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
"log"
"net"
// "os"
"sync"
"time"
)

const (
TCP = "tcp"
UNIX = "unix"
UNIX = "unix"
LOCALHOST = "127.0.0.1"
// ns1MSec = 1000000
//// ns1Sec = ns1MSec * 1000
Expand Down Expand Up @@ -148,6 +149,7 @@ type connHdl struct {
spec *ConnectionSpec
conn net.Conn // may want to change this to TCPConn
reader *bufio.Reader
m sync.Mutex
}

func (chdl *connHdl) String() string {
Expand All @@ -165,19 +167,19 @@ func newConnHdl(spec *ConnectionSpec) (hdl *connHdl, err Error) {
return nil, NewError(SYSTEM_ERR, fmt.Sprintf("%s(): failed to allocate connHdl", here))
}

var mode, addr string
if (spec.port == 0) {
mode = UNIX
addr = spec.host
} else {
mode = TCP
addr = fmt.Sprintf("%s:%d", spec.host, spec.port)
_, e := net.ResolveTCPAddr(TCP, addr)
if e != nil {
msg := fmt.Sprintf("%s(): failed to resolve remote address %s", here, addr)
return nil, NewErrorWithCause(SYSTEM_ERR, msg, e)
}
}
var mode, addr string
if spec.port == 0 {
mode = UNIX
addr = spec.host
} else {
mode = TCP
addr = fmt.Sprintf("%s:%d", spec.host, spec.port)
_, e := net.ResolveTCPAddr(TCP, addr)
if e != nil {
msg := fmt.Sprintf("%s(): failed to resolve remote address %s", here, addr)
return nil, NewErrorWithCause(SYSTEM_ERR, msg, e)
}
}

conn, e := net.Dial(mode, addr)
switch {
Expand All @@ -195,6 +197,7 @@ func newConnHdl(spec *ConnectionSpec) (hdl *connHdl, err Error) {
hdl.reader = bufio.NewReaderSize(conn, bufsize)
log.Printf("<INFO> Connected to %s", hdl)
}
hdl.m = sync.Mutex{}
return hdl, nil
}

Expand All @@ -204,12 +207,12 @@ func configureConn(conn net.Conn, spec *ConnectionSpec) {
// but we absolutely need to be able to use timeouts.
// conn.SetReadTimeout(spec.rTimeout);
// conn.SetWriteTimeout(spec.wTimeout);
if tcp, ok := conn.(*net.TCPConn); ok {
tcp.SetLinger(spec.lingerspec)
tcp.SetKeepAlive(spec.keepalive)
tcp.SetReadBuffer(spec.rBufSize)
tcp.SetWriteBuffer(spec.wBufSize)
}
if tcp, ok := conn.(*net.TCPConn); ok {
tcp.SetLinger(spec.lingerspec)
tcp.SetKeepAlive(spec.keepalive)
tcp.SetReadBuffer(spec.rBufSize)
tcp.SetWriteBuffer(spec.wBufSize)
}
}

// onConnect event handler will issue AUTH/SELECT on new connection
Expand Down Expand Up @@ -259,15 +262,62 @@ type SyncConnection interface {
Close() error
}

type connPool struct {
active int
count int
pool []*connHdl // Connection pool
conn chan *connHdl // To handle concurrency
}

func (p *connPool) Close() error {
var err error

for i := 0; i < p.count; i++ {
err = p.pool[i].Close()
// XXX handle err
}

return err
}

func (p *connPool) AddConnection(conn *connHdl) {
// Put the connection in the pool, and add it to the channel
p.pool[p.active] = conn
p.active = p.active + 1
p.conn <- conn
}

func (p *connPool) ServiceRequest(cmd *Command, args [][]byte) (Response, Error) {
// Wait for a connection to become available
var conn = <-p.conn
res, err := conn.ServiceRequest(cmd, args)

// Return our connection to the pool
p.conn <- conn
return res, err
}

func newConnPool(count int) *connPool {
return &connPool{0, count, make([]*connHdl, count), make(chan *connHdl, count)}
}

// Creates a new SyncConnection using the provided ConnectionSpec
func NewSyncConnection(spec *ConnectionSpec) (c SyncConnection, err Error) {
connHdl, e := newConnHdl(spec)
if e != nil {
return nil, e
}
var e Error

// XXX In order to allow backwards compatability
// the pool size is hardcoded for now.
pool := newConnPool(4)
for i := 0; i < 4; i++ {
connHdl, e := newConnHdl(spec)
if e != nil {
return nil, e
}

e = connHdl.onConnect()
return connHdl, e
e = connHdl.onConnect()
pool.AddConnection(connHdl)
}
return pool, e
}

// Implementation of SyncConnection.ServiceRequest.
Expand Down
7 changes: 3 additions & 4 deletions synchclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,10 @@ func NewSynchClient() (c Client, err Error) {
//
func NewSynchClientWithSpec(spec *ConnectionSpec) (c Client, err Error) {
_c := new(syncClient)
_c.conn, err = NewSyncConnection(spec)
_c.conn, err = NewSyncConnection(spec)
if err != nil {
return nil, withError (err)
}
// _c.conn = conn
return nil, withError (err)
}
return _c, nil
}

Expand Down

0 comments on commit b9a9fb1

Please sign in to comment.