Unverified Commit 3797f77d authored by 望哥's avatar 望哥 Committed by GitHub

Merge pull request #10 from fangyincheng/dubbogo-master

Add:add time wheel
parents 0aabe35d 1cf3079f
...@@ -152,7 +152,7 @@ func (c *client) dialTCP() Session { ...@@ -152,7 +152,7 @@ func (c *client) dialTCP() Session {
} }
log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, err) log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, err)
time.Sleep(connectInterval) <-wheel.After(connectInterval)
} }
} }
...@@ -180,7 +180,7 @@ func (c *client) dialUDP() Session { ...@@ -180,7 +180,7 @@ func (c *client) dialUDP() Session {
} }
if err != nil { if err != nil {
log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, err) log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, err)
time.Sleep(connectInterval) <-wheel.After(connectInterval)
continue continue
} }
...@@ -189,7 +189,7 @@ func (c *client) dialUDP() Session { ...@@ -189,7 +189,7 @@ func (c *client) dialUDP() Session {
if length, err = conn.Write(connectPingPackage[:]); err != nil { if length, err = conn.Write(connectPingPackage[:]); err != nil {
conn.Close() conn.Close()
log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, err) log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, err)
time.Sleep(connectInterval) <-wheel.After(connectInterval)
continue continue
} }
conn.SetReadDeadline(time.Now().Add(1e9)) conn.SetReadDeadline(time.Now().Add(1e9))
...@@ -200,7 +200,7 @@ func (c *client) dialUDP() Session { ...@@ -200,7 +200,7 @@ func (c *client) dialUDP() Session {
if err != nil { if err != nil {
log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, err) log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, err)
conn.Close() conn.Close()
time.Sleep(connectInterval) <-wheel.After(connectInterval)
continue continue
} }
//if err == nil { //if err == nil {
...@@ -238,7 +238,7 @@ func (c *client) dialWS() Session { ...@@ -238,7 +238,7 @@ func (c *client) dialWS() Session {
} }
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err) log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
time.Sleep(connectInterval) <-wheel.After(connectInterval)
} }
} }
...@@ -316,7 +316,7 @@ func (c *client) dialWSS() Session { ...@@ -316,7 +316,7 @@ func (c *client) dialWSS() Session {
} }
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err) log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
time.Sleep(connectInterval) <-wheel.After(connectInterval)
} }
} }
...@@ -421,7 +421,7 @@ func (c *client) reConnect() { ...@@ -421,7 +421,7 @@ func (c *client) reConnect() {
if maxTimes < times { if maxTimes < times {
times = maxTimes times = maxTimes
} }
time.Sleep(time.Duration(int64(times) * int64(interval))) <-wheel.After(time.Duration(int64(times) * int64(interval)))
} }
} }
......
module github.com/dubbogo/getty module github.com/dubbogo/getty
require ( require (
github.com/dubbogo/gostd v0.0.0-20190625030817-87d8669125b7
github.com/golang/snappy v0.0.1 github.com/golang/snappy v0.0.1
github.com/gorilla/websocket v1.4.0 github.com/gorilla/websocket v1.4.0
github.com/pkg/errors v0.8.1 github.com/pkg/errors v0.8.1
......
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dubbogo/gostd v0.0.0-20190625030817-87d8669125b7 h1:Wmt8yQMGkNx4GDUvU4CA+dwIsDwgi+DbP28NZV2ruqQ=
github.com/dubbogo/gostd v0.0.0-20190625030817-87d8669125b7/go.mod h1:lQ7PmKvs6xplvjzEEMKw8XmP20D9raD+wFfzxkKaBd4=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
......
...@@ -243,7 +243,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) { ...@@ -243,7 +243,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
return return
} }
if delay != 0 { if delay != 0 {
time.Sleep(delay) <-wheel.After(delay)
} }
client, err = s.accept(newSession) client, err = s.accept(newSession)
if err != nil { if err != nil {
......
...@@ -20,6 +20,7 @@ import ( ...@@ -20,6 +20,7 @@ import (
) )
import ( import (
gstime "github.com/dubbogo/gostd/time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
perrors "github.com/pkg/errors" perrors "github.com/pkg/errors"
) )
...@@ -42,6 +43,14 @@ const ( ...@@ -42,6 +43,14 @@ const (
// session // session
///////////////////////////////////////// /////////////////////////////////////////
var (
wheel = gstime.NewWheel(gstime.TimeMillisecondDuration(100), 1200) // wheel longest span is 2 minute
)
func GetTimeWheel() *gstime.Wheel {
return wheel
}
// getty base session // getty base session
type session struct { type session struct {
name string name string
...@@ -374,7 +383,7 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error { ...@@ -374,7 +383,7 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
case s.wQ <- pkg: case s.wQ <- pkg:
break // for possible gen a new pkg break // for possible gen a new pkg
case <-time.After(timeout): case <-wheel.After(timeout):
log.Warnf("%s, [session.WritePkg] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ)) log.Warnf("%s, [session.WritePkg] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ))
return ErrSessionBlocked return ErrSessionBlocked
} }
...@@ -480,7 +489,7 @@ func (s *session) handleLoop() { ...@@ -480,7 +489,7 @@ func (s *session) handleLoop() {
wsFlag bool wsFlag bool
wsConn *gettyWSConn wsConn *gettyWSConn
// start time.Time // start time.Time
counter CountWatch counter gstime.CountWatch
inPkg interface{} inPkg interface{}
outPkg interface{} outPkg interface{}
) )
...@@ -548,7 +557,7 @@ LOOP: ...@@ -548,7 +557,7 @@ LOOP:
log.Infof("[session.handleLoop] drop writeout package{%#v}", outPkg) log.Infof("[session.handleLoop] drop writeout package{%#v}", outPkg)
} }
case <-time.After(s.period): case <-wheel.After(s.period):
if flag { if flag {
if wsFlag { if wsFlag {
err := wsConn.writePing() err := wsConn.writePing()
......
package getty package getty
import ( import (
"context"
"net" "net"
"strings" "strings"
"context"
"sync"
"time"
) )
// refers from https://github.com/facebookgo/grace/blob/master/gracenet/net.go#L180:6 // refers from https://github.com/facebookgo/grace/blob/master/gracenet/net.go#L180:6
...@@ -30,7 +28,6 @@ func IsSameAddr(a1, a2 net.Addr) bool { ...@@ -30,7 +28,6 @@ func IsSameAddr(a1, a2 net.Addr) bool {
return a1s == a2s return a1s == a2s
} }
var ( var (
defaultCtxKey int = 1 defaultCtxKey int = 1
) )
...@@ -81,116 +78,3 @@ func (c *ValuesContext) Delete(key interface{}) { ...@@ -81,116 +78,3 @@ func (c *ValuesContext) Delete(key interface{}) {
func (c *ValuesContext) Set(key interface{}, value interface{}) { func (c *ValuesContext) Set(key interface{}, value interface{}) {
c.Context.Value(defaultCtxKey).(Values).Set(key, value) c.Context.Value(defaultCtxKey).(Values).Set(key, value)
} }
type Wheel struct {
sync.RWMutex
span time.Duration
period time.Duration
ticker *time.Ticker
index int
ring []chan struct{}
once sync.Once
now time.Time
}
func NewWheel(span time.Duration, buckets int) *Wheel {
var (
w *Wheel
)
if span == 0 {
panic("@span == 0")
}
if buckets == 0 {
panic("@bucket == 0")
}
w = &Wheel{
span: span,
period: span * (time.Duration(buckets)),
ticker: time.NewTicker(span),
index: 0,
ring: make([](chan struct{}), buckets),
now: time.Now(),
}
go func() {
var notify chan struct{}
// var cw CountWatch
// cw.Start()
for t := range w.ticker.C {
w.Lock()
w.now = t
// fmt.Println("index:", w.index, ", value:", w.bitmap.Get(w.index))
notify = w.ring[w.index]
w.ring[w.index] = nil
w.index = (w.index + 1) % len(w.ring)
w.Unlock()
if notify != nil {
close(notify)
}
}
// fmt.Println("timer costs:", cw.Count()/1e9, "s")
}()
return w
}
func (w *Wheel) Stop() {
w.once.Do(func() { w.ticker.Stop() })
}
func (w *Wheel) After(timeout time.Duration) <-chan struct{} {
if timeout >= w.period {
panic("@timeout over ring's life period")
}
var pos = int(timeout / w.span)
if 0 < pos {
pos--
}
w.Lock()
pos = (w.index + pos) % len(w.ring)
if w.ring[pos] == nil {
w.ring[pos] = make(chan struct{})
}
// fmt.Println("pos:", pos)
c := w.ring[pos]
w.Unlock()
return c
}
func (w *Wheel) Now() time.Time {
w.RLock()
now := w.now
w.RUnlock()
return now
}
type CountWatch struct {
start time.Time
}
func (w *CountWatch) Start() {
var t time.Time
if t.Equal(w.start) {
w.start = time.Now()
}
}
func (w *CountWatch) Reset() {
w.start = time.Now()
}
func (w *CountWatch) Count() int64 {
return time.Since(w.start).Nanoseconds()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment