Commit 7cf17969 authored by ztelur's avatar ztelur

quitOnce & keepSession

parent bedf9d59
...@@ -57,7 +57,8 @@ func NewConfigClient(opts ...Option) *Client { ...@@ -57,7 +57,8 @@ func NewConfigClient(opts ...Option) *Client {
// Client represents etcd client Configuration // Client represents etcd client Configuration
type Client struct { type Client struct {
lock sync.RWMutex lock sync.RWMutex
quitOnce sync.Once
// these properties are only set once when they are started. // these properties are only set once when they are started.
name string name string
...@@ -102,9 +103,9 @@ func NewClient(name string, endpoints []string, timeout time.Duration, heartbeat ...@@ -102,9 +103,9 @@ func NewClient(name string, endpoints []string, timeout time.Duration, heartbeat
exit: make(chan struct{}), exit: make(chan struct{}),
} }
if err := c.maintenanceStatus(); err != nil { if err := c.keepSession(); err != nil {
cancel() cancel()
return nil, perrors.WithMessage(err, "client maintenance status") return nil, perrors.WithMessage(err, "client keep session")
} }
return c, nil return c, nil
} }
...@@ -124,11 +125,15 @@ func (c *Client) clean() { ...@@ -124,11 +125,15 @@ func (c *Client) clean() {
func (c *Client) stop() bool { func (c *Client) stop() bool {
select { select {
case <-c.exit: case <-c.exit:
return true return false
default: default:
close(c.exit) ret := false
c.quitOnce.Do(func() {
ret = true
close(c.exit)
})
return ret
} }
return false
} }
// Close close client // Close close client
...@@ -138,9 +143,11 @@ func (c *Client) Close() { ...@@ -138,9 +143,11 @@ func (c *Client) Close() {
} }
// stop the client // stop the client
c.stop() if ret := c.stop(); !ret {
return
}
// wait client maintenance status stop // wait client keep session stop
c.Wait.Wait() c.Wait.Wait()
c.lock.Lock() c.lock.Lock()
...@@ -151,22 +158,22 @@ func (c *Client) Close() { ...@@ -151,22 +158,22 @@ func (c *Client) Close() {
log.Printf("etcd client{name:%s, endpoints:%s} exit now.", c.name, c.endpoints) log.Printf("etcd client{name:%s, endpoints:%s} exit now.", c.name, c.endpoints)
} }
func (c *Client) maintenanceStatus() error { func (c *Client) keepSession() error {
s, err := concurrency.NewSession(c.rawClient, concurrency.WithTTL(c.heartbeat)) s, err := concurrency.NewSession(c.rawClient, concurrency.WithTTL(c.heartbeat))
if err != nil { if err != nil {
return perrors.WithMessage(err, "new session with server") return perrors.WithMessage(err, "new session with server")
} }
// must add wg before go maintenance status goroutine // must add wg before go keep session goroutine
c.Wait.Add(1) c.Wait.Add(1)
go c.maintenanceStatusLoop(s) go c.keepSessionLoop(s)
return nil return nil
} }
func (c *Client) maintenanceStatusLoop(s *concurrency.Session) { func (c *Client) keepSessionLoop(s *concurrency.Session) {
defer func() { defer func() {
c.Wait.Done() c.Wait.Done()
log.Printf("etcd client {endpoints:%v, name:%s} maintenance goroutine game over.", c.endpoints, c.name) log.Printf("etcd client {endpoints:%v, name:%s} keep goroutine game over.", c.endpoints, c.name)
}() }()
for { for {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment