Commit 1cf3079f authored by fangyincheng's avatar fangyincheng

Add:add time wheel

parent 78e88386
......@@ -152,7 +152,7 @@ func (c *client) dialTCP() Session {
}
log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, err)
time.Sleep(connectInterval)
<-wheel.After(connectInterval)
}
}
......@@ -180,7 +180,7 @@ func (c *client) dialUDP() Session {
}
if err != nil {
log.Warnf("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, err)
time.Sleep(connectInterval)
<-wheel.After(connectInterval)
continue
}
......@@ -189,7 +189,7 @@ func (c *client) dialUDP() Session {
if length, err = conn.Write(connectPingPackage[:]); err != nil {
conn.Close()
log.Warnf("conn.Write(%s) = {length:%d, err:%+v}", string(connectPingPackage), length, err)
time.Sleep(connectInterval)
<-wheel.After(connectInterval)
continue
}
conn.SetReadDeadline(time.Now().Add(1e9))
......@@ -200,7 +200,7 @@ func (c *client) dialUDP() Session {
if err != nil {
log.Infof("conn{%#v}.Read() = {length:%d, err:%+v}", conn, length, err)
conn.Close()
time.Sleep(connectInterval)
<-wheel.After(connectInterval)
continue
}
//if err == nil {
......@@ -238,7 +238,7 @@ func (c *client) dialWS() Session {
}
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
time.Sleep(connectInterval)
<-wheel.After(connectInterval)
}
}
......@@ -316,7 +316,7 @@ func (c *client) dialWSS() Session {
}
log.Infof("websocket.dialer.Dial(addr:%s) = error:%+v", c.addr, err)
time.Sleep(connectInterval)
<-wheel.After(connectInterval)
}
}
......@@ -421,7 +421,7 @@ func (c *client) reConnect() {
if maxTimes < times {
times = maxTimes
}
time.Sleep(time.Duration(int64(times) * int64(interval)))
<-wheel.After(time.Duration(int64(times) * int64(interval)))
}
}
......
module github.com/dubbogo/getty
require (
github.com/dubbogo/gostd v0.0.0-20190625030817-87d8669125b7
github.com/golang/snappy v0.0.1
github.com/gorilla/websocket v1.4.0
github.com/pkg/errors v0.8.1
......
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dubbogo/gostd v0.0.0-20190625030817-87d8669125b7 h1:Wmt8yQMGkNx4GDUvU4CA+dwIsDwgi+DbP28NZV2ruqQ=
github.com/dubbogo/gostd v0.0.0-20190625030817-87d8669125b7/go.mod h1:lQ7PmKvs6xplvjzEEMKw8XmP20D9raD+wFfzxkKaBd4=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
......
......@@ -243,7 +243,7 @@ func (s *server) runTcpEventLoop(newSession NewSessionCallback) {
return
}
if delay != 0 {
time.Sleep(delay)
<-wheel.After(delay)
}
client, err = s.accept(newSession)
if err != nil {
......
......@@ -20,6 +20,7 @@ import (
)
import (
gstime "github.com/dubbogo/gostd/time"
"github.com/gorilla/websocket"
perrors "github.com/pkg/errors"
)
......@@ -42,6 +43,14 @@ const (
// session
/////////////////////////////////////////
var (
wheel = gstime.NewWheel(gstime.TimeMillisecondDuration(100), 1200) // wheel longest span is 2 minute
)
func GetTimeWheel() *gstime.Wheel {
return wheel
}
// getty base session
type session struct {
name string
......@@ -374,7 +383,7 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
case s.wQ <- pkg:
break // for possible gen a new pkg
case <-time.After(timeout):
case <-wheel.After(timeout):
log.Warnf("%s, [session.WritePkg] wQ{len:%d, cap:%d}", s.Stat(), len(s.wQ), cap(s.wQ))
return ErrSessionBlocked
}
......@@ -480,7 +489,7 @@ func (s *session) handleLoop() {
wsFlag bool
wsConn *gettyWSConn
// start time.Time
counter CountWatch
counter gstime.CountWatch
inPkg interface{}
outPkg interface{}
)
......@@ -548,7 +557,7 @@ LOOP:
log.Infof("[session.handleLoop] drop writeout package{%#v}", outPkg)
}
case <-time.After(s.period):
case <-wheel.After(s.period):
if flag {
if wsFlag {
err := wsConn.writePing()
......
package getty
import (
"context"
"net"
"strings"
"context"
"sync"
"time"
)
// refers from https://github.com/facebookgo/grace/blob/master/gracenet/net.go#L180:6
......@@ -30,7 +28,6 @@ func IsSameAddr(a1, a2 net.Addr) bool {
return a1s == a2s
}
var (
defaultCtxKey int = 1
)
......@@ -81,116 +78,3 @@ func (c *ValuesContext) Delete(key interface{}) {
func (c *ValuesContext) Set(key interface{}, value interface{}) {
c.Context.Value(defaultCtxKey).(Values).Set(key, value)
}
type Wheel struct {
sync.RWMutex
span time.Duration
period time.Duration
ticker *time.Ticker
index int
ring []chan struct{}
once sync.Once
now time.Time
}
func NewWheel(span time.Duration, buckets int) *Wheel {
var (
w *Wheel
)
if span == 0 {
panic("@span == 0")
}
if buckets == 0 {
panic("@bucket == 0")
}
w = &Wheel{
span: span,
period: span * (time.Duration(buckets)),
ticker: time.NewTicker(span),
index: 0,
ring: make([](chan struct{}), buckets),
now: time.Now(),
}
go func() {
var notify chan struct{}
// var cw CountWatch
// cw.Start()
for t := range w.ticker.C {
w.Lock()
w.now = t
// fmt.Println("index:", w.index, ", value:", w.bitmap.Get(w.index))
notify = w.ring[w.index]
w.ring[w.index] = nil
w.index = (w.index + 1) % len(w.ring)
w.Unlock()
if notify != nil {
close(notify)
}
}
// fmt.Println("timer costs:", cw.Count()/1e9, "s")
}()
return w
}
func (w *Wheel) Stop() {
w.once.Do(func() { w.ticker.Stop() })
}
func (w *Wheel) After(timeout time.Duration) <-chan struct{} {
if timeout >= w.period {
panic("@timeout over ring's life period")
}
var pos = int(timeout / w.span)
if 0 < pos {
pos--
}
w.Lock()
pos = (w.index + pos) % len(w.ring)
if w.ring[pos] == nil {
w.ring[pos] = make(chan struct{})
}
// fmt.Println("pos:", pos)
c := w.ring[pos]
w.Unlock()
return c
}
func (w *Wheel) Now() time.Time {
w.RLock()
now := w.now
w.RUnlock()
return now
}
type CountWatch struct {
start time.Time
}
func (w *CountWatch) Start() {
var t time.Time
if t.Equal(w.start) {
w.start = time.Now()
}
}
func (w *CountWatch) Reset() {
w.start = time.Now()
}
func (w *CountWatch) Count() int64 {
return time.Since(w.start).Nanoseconds()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment