Commit 5846fc59 authored by AlexStocks's avatar AlexStocks Committed by watermelo

Add: writev

parent 96ceb767
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
# LICENCE : Apache License 2.0 # LICENCE : Apache License 2.0
# EMAIL : alexstocks@foxmail.com # EMAIL : alexstocks@foxmail.com
# MOD : 2016-08-17 11:21 # MOD : 2016-08-17 11:21
# FILE : conn.go # FILE : connection.go
******************************************************/ ******************************************************/
package getty package getty
...@@ -89,7 +89,7 @@ func (c *gettyConn) GetActive() time.Time { ...@@ -89,7 +89,7 @@ func (c *gettyConn) GetActive() time.Time {
return launchTime.Add(time.Duration(atomic.LoadInt64(&(c.active)))) return launchTime.Add(time.Duration(atomic.LoadInt64(&(c.active))))
} }
func (c *gettyConn) Write(interface{}) (int, error) { func (c *gettyConn) send(interface{}) (int, error) {
return 0, nil return 0, nil
} }
...@@ -228,7 +228,7 @@ func (t *gettyTCPConn) SetCompressType(c CompressType) { ...@@ -228,7 +228,7 @@ func (t *gettyTCPConn) SetCompressType(c CompressType) {
} }
// tcp connection read // tcp connection read
func (t *gettyTCPConn) read(p []byte) (int, error) { func (t *gettyTCPConn) recv(p []byte) (int, error) {
var ( var (
err error err error
currentTime time.Time currentTime time.Time
...@@ -258,7 +258,7 @@ func (t *gettyTCPConn) read(p []byte) (int, error) { ...@@ -258,7 +258,7 @@ func (t *gettyTCPConn) read(p []byte) (int, error) {
} }
// tcp connection write // tcp connection write
func (t *gettyTCPConn) Write(pkg interface{}) (int, error) { func (t *gettyTCPConn) send(pkg interface{}) (int, error) {
var ( var (
err error err error
currentTime time.Time currentTime time.Time
...@@ -379,7 +379,7 @@ func (u *gettyUDPConn) SetCompressType(c CompressType) { ...@@ -379,7 +379,7 @@ func (u *gettyUDPConn) SetCompressType(c CompressType) {
} }
// udp connection read // udp connection read
func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) { func (u *gettyUDPConn) recv(p []byte) (int, *net.UDPAddr, error) {
var ( var (
err error err error
currentTime time.Time currentTime time.Time
...@@ -411,7 +411,7 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) { ...@@ -411,7 +411,7 @@ func (u *gettyUDPConn) read(p []byte) (int, *net.UDPAddr, error) {
} }
// write udp packet, @ctx should be of type UDPContext // write udp packet, @ctx should be of type UDPContext
func (u *gettyUDPConn) Write(udpCtx interface{}) (int, error) { func (u *gettyUDPConn) send(udpCtx interface{}) (int, error) {
var ( var (
err error err error
currentTime time.Time currentTime time.Time
...@@ -539,7 +539,7 @@ func (w *gettyWSConn) handlePong(string) error { ...@@ -539,7 +539,7 @@ func (w *gettyWSConn) handlePong(string) error {
} }
// websocket connection read // websocket connection read
func (w *gettyWSConn) read() ([]byte, error) { func (w *gettyWSConn) recv() ([]byte, error) {
// Pls do not set read deadline when using ReadMessage. AlexStocks 20180310 // Pls do not set read deadline when using ReadMessage. AlexStocks 20180310
// gorilla/websocket/conn.go:NextReader will always fail when got a timeout error. // gorilla/websocket/conn.go:NextReader will always fail when got a timeout error.
_, b, e := w.conn.ReadMessage() // the first return value is message type. _, b, e := w.conn.ReadMessage() // the first return value is message type.
...@@ -578,7 +578,7 @@ func (w *gettyWSConn) updateWriteDeadline() error { ...@@ -578,7 +578,7 @@ func (w *gettyWSConn) updateWriteDeadline() error {
} }
// websocket connection write // websocket connection write
func (w *gettyWSConn) Write(pkg interface{}) (int, error) { func (w *gettyWSConn) send(pkg interface{}) (int, error) {
var ( var (
err error err error
ok bool ok bool
......
...@@ -22,11 +22,12 @@ func (h *PackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, ...@@ -22,11 +22,12 @@ func (h *PackageHandler) Read(ss getty.Session, data []byte) (interface{}, int,
return s, len(s), nil return s, len(s), nil
} }
func (h *PackageHandler) Write(ss getty.Session, pkg interface{}) error { func (h *PackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) {
s, ok := pkg.(string) s, ok := pkg.(string)
if !ok { if !ok {
log.Infof("illegal pkg:%+v", pkg) log.Infof("illegal pkg:%+v", pkg)
return errors.New("invalid package") return nil, errors.New("invalid package")
} }
return ss.WriteBytes([]byte(s))
return []byte(s), nil
} }
...@@ -34,7 +34,7 @@ var ( ...@@ -34,7 +34,7 @@ var (
) )
var ( var (
taskPool *gxsync.TaskPool taskPool *gxsync.TaskPool
) )
func main() { func main() {
...@@ -44,13 +44,13 @@ func main() { ...@@ -44,13 +44,13 @@ func main() {
util.Profiling(*pprofPort) util.Profiling(*pprofPort)
if *taskPoolMode { if *taskPoolMode {
taskPool = gxsync.NewTaskPool( taskPool = gxsync.NewTaskPool(
gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength), gxsync.WithTaskPoolTaskQueueLength(*taskPoolQueueLength),
gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber), gxsync.WithTaskPoolTaskQueueNumber(*taskPoolQueueNumber),
gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize), gxsync.WithTaskPoolTaskPoolSize(*taskPoolSize),
) )
} }
client := getty.NewTCPClient( client := getty.NewTCPClient(
getty.WithServerAddress(*ip+":8090"), getty.WithServerAddress(*ip+":8090"),
...@@ -63,4 +63,3 @@ func main() { ...@@ -63,4 +63,3 @@ func main() {
util.WaitCloseSignals(client) util.WaitCloseSignals(client)
} }
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
import ( import (
"github.com/dubbogo/getty" "github.com/dubbogo/getty"
"github.com/dubbogo/gost/sync"
) )
import ( import (
...@@ -30,12 +31,12 @@ func NewHelloClientSession(session getty.Session, taskPool *gxsync.TaskPool) (er ...@@ -30,12 +31,12 @@ func NewHelloClientSession(session getty.Session, taskPool *gxsync.TaskPool) (er
eventListener.SessionOnOpen = func(session getty.Session) { eventListener.SessionOnOpen = func(session getty.Session) {
hello.Sessions = append(hello.Sessions, session) hello.Sessions = append(hello.Sessions, session)
} }
err = InitialSession(session) err = InitialSession(session)
if err != nil { if err != nil {
return return
} }
session.SetTaskPool(taskPool) session.SetTaskPool(taskPool)
return return
} }
func InitialSession(session getty.Session) (err error) { func InitialSession(session getty.Session) (err error) {
......
...@@ -38,7 +38,7 @@ func main() { ...@@ -38,7 +38,7 @@ func main() {
util.SetLimit() util.SetLimit()
util.Profiling(*pprofPort) util.Profiling(*pprofPort)
options := []getty.ServerOption{getty.WithLocalAddress(":8090")} options := []getty.ServerOption{getty.WithLocalAddress(":8090")}
......
...@@ -8,13 +8,13 @@ ...@@ -8,13 +8,13 @@
package util package util
import ( import (
"fmt" "fmt"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
) )
func Profiling(port int) { func Profiling(port int) {
go func() { go func() {
http.ListenAndServe(fmt.Sprintf(":%d", port), nil) http.ListenAndServe(fmt.Sprintf(":%d", port), nil)
}() }()
} }
...@@ -46,10 +46,10 @@ type Reader interface { ...@@ -46,10 +46,10 @@ type Reader interface {
// Writer is used to marshal pkg and write to session // Writer is used to marshal pkg and write to session
type Writer interface { type Writer interface {
// if @Session is udpGettySession, the second parameter is UDPContext. // if @Session is udpGettySession, the second parameter is UDPContext.
Write(Session, interface{}) error Write(Session, interface{}) ([]byte, error)
} }
// tcp package handler interface // package handler interface
type ReadWriter interface { type ReadWriter interface {
Reader Reader
Writer Writer
...@@ -120,7 +120,7 @@ type Connection interface { ...@@ -120,7 +120,7 @@ type Connection interface {
writeTimeout() time.Duration writeTimeout() time.Duration
// SetWriteTimeout sets deadline for the future read calls. // SetWriteTimeout sets deadline for the future read calls.
SetWriteTimeout(time.Duration) SetWriteTimeout(time.Duration)
Write(interface{}) (int, error) send(interface{}) (int, error)
// don't distinguish between tcp connection and websocket connection. Because // don't distinguish between tcp connection and websocket connection. Because
// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close // gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close
close(int) close(int)
......
...@@ -33,6 +33,7 @@ const ( ...@@ -33,6 +33,7 @@ const (
period = 60 * 1e9 // 1 minute period = 60 * 1e9 // 1 minute
pendingDuration = 3e9 pendingDuration = 3e9
defaultQLen = 1024 defaultQLen = 1024
maxIovecNum = 10
defaultSessionName = "session" defaultSessionName = "session"
defaultTCPSessionName = "tcp-session" defaultTCPSessionName = "tcp-session"
defaultUDPSessionName = "udp-session" defaultUDPSessionName = "udp-session"
...@@ -108,7 +109,6 @@ func newSession(endPoint EndPoint, conn Connection) *session { ...@@ -108,7 +109,6 @@ func newSession(endPoint EndPoint, conn Connection) *session {
wait: pendingDuration, wait: pendingDuration,
attrs: NewValuesContext(nil), attrs: NewValuesContext(nil),
rDone: make(chan struct{}), rDone: make(chan struct{}),
grNum: 0,
} }
ss.Connection.setSession(ss) ss.Connection.setSession(ss)
...@@ -355,6 +355,9 @@ func (s *session) sessionToken() string { ...@@ -355,6 +355,9 @@ func (s *session) sessionToken() string {
} }
func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error { func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
if pkg == nil {
return fmt.Errorf("@pkg is nil")
}
if s.IsClosed() { if s.IsClosed() {
return ErrSessionClosed return ErrSessionClosed
} }
...@@ -368,12 +371,31 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error { ...@@ -368,12 +371,31 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
} }
}() }()
var err error
if timeout <= 0 { if timeout <= 0 {
if err = s.writer.Write(s, pkg); err != nil { pkgBytes, err := s.writer.Write(s, pkg)
if err != nil {
log.Warnf("%s, [session.WritePkg] session.writer.Write(@pkg:%#v) = error:%v", s.Stat(), pkg, err)
return perrors.WithStack(err)
}
var udpCtxPtr *UDPContext
if udpCtx, ok := pkg.(UDPContext); ok {
udpCtxPtr = &udpCtx
} else if udpCtxP, ok := pkg.(*UDPContext); ok {
udpCtxPtr = udpCtxP
}
if udpCtxPtr != nil {
udpCtxPtr.Pkg = pkgBytes
pkg = *udpCtxPtr
} else {
pkg = pkgBytes
}
_, err = s.Connection.send(pkg)
if err != nil {
log.Warn("%s, [session.WritePkg] @s.Connection.Write(pkg:%#v) = err:%v", s.Stat(), pkg, err)
return perrors.WithStack(err) return perrors.WithStack(err)
} }
s.incWritePkgNum() s.incWritePkgNum()
return nil
} }
select { select {
case s.wQ <- pkg: case s.wQ <- pkg:
...@@ -394,7 +416,7 @@ func (s *session) WriteBytes(pkg []byte) error { ...@@ -394,7 +416,7 @@ func (s *session) WriteBytes(pkg []byte) error {
} }
// s.conn.SetWriteTimeout(time.Now().Add(s.wTimeout)) // s.conn.SetWriteTimeout(time.Now().Add(s.wTimeout))
if _, err := s.Connection.Write(pkg); err != nil { if _, err := s.Connection.send(pkg); err != nil {
return perrors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg)) return perrors.Wrapf(err, "s.Connection.Write(pkg len:%d)", len(pkg))
} }
...@@ -403,7 +425,7 @@ func (s *session) WriteBytes(pkg []byte) error { ...@@ -403,7 +425,7 @@ func (s *session) WriteBytes(pkg []byte) error {
return nil return nil
} }
// Write multiple packages at once // Write multiple packages at once. so we invoke write sys.call just one time.
func (s *session) WriteBytesArray(pkgs ...[]byte) error { func (s *session) WriteBytesArray(pkgs ...[]byte) error {
if s.IsClosed() { if s.IsClosed() {
return ErrSessionClosed return ErrSessionClosed
...@@ -438,7 +460,6 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { ...@@ -438,7 +460,6 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
l += len(pkgs[i]) l += len(pkgs[i])
} }
// return s.Connection.Write(arr)
if err = s.WriteBytes(arr); err != nil { if err = s.WriteBytes(arr); err != nil {
return perrors.WithStack(err) return perrors.WithStack(err)
} }
...@@ -446,7 +467,6 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error { ...@@ -446,7 +467,6 @@ func (s *session) WriteBytesArray(pkgs ...[]byte) error {
num := len(pkgs) - 1 num := len(pkgs) - 1
for i := 0; i < num; i++ { for i := 0; i < num; i++ {
s.incWritePkgNum() s.incWritePkgNum()
// gxlog.CError("after write, ss:%s", s.Stat())
} }
return nil return nil
...@@ -468,7 +488,7 @@ func (s *session) run() { ...@@ -468,7 +488,7 @@ func (s *session) run() {
// call session opened // call session opened
s.UpdateActive() s.UpdateActive()
if err := s.listener.OnOpen(s); err != nil { if err := s.listener.OnOpen(s); err != nil {
log.Errorf("[OnOpen] error: %#v", err) log.Errorf("[OnOpen] session %s, error: %#v", s.Stat(), err)
s.Close() s.Close()
return return
} }
...@@ -481,13 +501,17 @@ func (s *session) run() { ...@@ -481,13 +501,17 @@ func (s *session) run() {
func (s *session) handleLoop() { func (s *session) handleLoop() {
var ( var (
err error err error
flag bool ok bool
wsFlag bool flag bool
wsConn *gettyWSConn wsFlag bool
// start time.Time udpFlag bool
counter gxtime.CountWatch loopFlag bool
outPkg interface{} wsConn *gettyWSConn
counter gxtime.CountWatch
outPkg interface{}
pkgBytes []byte
iovec [][]byte
) )
defer func() { defer func() {
...@@ -506,6 +530,8 @@ func (s *session) handleLoop() { ...@@ -506,6 +530,8 @@ func (s *session) handleLoop() {
flag = true // do not do any read/Write/cron operation while got Write error flag = true // do not do any read/Write/cron operation while got Write error
wsConn, wsFlag = s.Connection.(*gettyWSConn) wsConn, wsFlag = s.Connection.(*gettyWSConn)
_, udpFlag = s.Connection.(*gettyUDPConn)
iovec = make([][]byte, 0, maxIovecNum)
LOOP: LOOP:
for { for {
// A select blocks until one of its cases is ready to run. // A select blocks until one of its cases is ready to run.
...@@ -519,21 +545,67 @@ LOOP: ...@@ -519,21 +545,67 @@ LOOP:
break LOOP break LOOP
} }
counter.Start() counter.Start()
// if time.Since(start).Nanoseconds() >= s.wait.Nanoseconds() {
if counter.Count() > s.wait.Nanoseconds() { if counter.Count() > s.wait.Nanoseconds() {
log.Infof("%s, [session.handleLoop] got done signal ", s.Stat()) log.Infof("%s, [session.handleLoop] got done signal ", s.Stat())
break LOOP break LOOP
} }
case outPkg = <-s.wQ: case outPkg, ok = <-s.wQ:
if flag { if !ok {
if err = s.writer.Write(s, outPkg); err != nil { continue
}
if !flag {
log.Warn("[session.handleLoop] drop write out package %#v", outPkg)
continue
}
if udpFlag || wsFlag {
err = s.WritePkg(outPkg, 0)
if err != nil {
log.Errorf("%s, [session.handleLoop] = error:%+v", s.sessionToken(), err) log.Errorf("%s, [session.handleLoop] = error:%+v", s.sessionToken(), err)
s.stop() s.stop()
// break LOOP
flag = false flag = false
}
continue
}
iovec = iovec[:0]
for idx := 0; idx < maxIovecNum; idx++ {
pkgBytes, err = s.writer.Write(s, outPkg)
if err != nil {
log.Errorf("%s, [session.handleLoop] = error:%+v", s.sessionToken(), err)
s.stop()
// break LOOP // break LOOP
flag = false
break
}
iovec = append(iovec, pkgBytes)
if idx < maxIovecNum-1 {
loopFlag = true
select {
case outPkg, ok = <-s.wQ:
if !ok {
loopFlag = false
}
default:
loopFlag = false
break
}
if !loopFlag {
break // break for-idx loop
}
} }
} else { }
log.Infof("[session.handleLoop] drop writeout package{%#v}", outPkg) err = s.WriteBytesArray(iovec[:]...)
if err != nil {
log.Errorf("%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v",
s.sessionToken(), len(iovec), err)
s.stop()
// break LOOP
flag = false
} }
case <-wheel.After(s.period): case <-wheel.After(s.period):
...@@ -643,7 +715,7 @@ func (s *session) handleTCPPackage() error { ...@@ -643,7 +715,7 @@ func (s *session) handleTCPPackage() error {
for { for {
// for clause for the network timeout condition check // for clause for the network timeout condition check
// s.conn.SetReadTimeout(time.Now().Add(s.rTimeout)) // s.conn.SetReadTimeout(time.Now().Add(s.rTimeout))
bufLen, err = conn.read(buf) bufLen, err = conn.recv(buf)
if err != nil { if err != nil {
if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() { if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() {
break break
...@@ -664,7 +736,6 @@ func (s *session) handleTCPPackage() error { ...@@ -664,7 +736,6 @@ func (s *session) handleTCPPackage() error {
if pktBuf.Len() <= 0 { if pktBuf.Len() <= 0 {
break break
} }
// pkg, err = s.pkgHandler.Read(s, pktBuf)
pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes()) pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes())
// for case 3/case 4 // for case 3/case 4
if err == nil && s.maxMsgLen > 0 && pkgLen > int(s.maxMsgLen) { if err == nil && s.maxMsgLen > 0 && pkgLen > int(s.maxMsgLen) {
...@@ -724,7 +795,7 @@ func (s *session) handleUDPPackage() error { ...@@ -724,7 +795,7 @@ func (s *session) handleUDPPackage() error {
break break
} }
bufLen, addr, err = conn.read(buf) bufLen, addr, err = conn.recv(buf)
log.Debugf("conn.read() = bufLen:%d, addr:%#v, err:%+v", bufLen, addr, err) log.Debugf("conn.read() = bufLen:%d, addr:%#v, err:%+v", bufLen, addr, err)
if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() { if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() {
continue continue
...@@ -785,7 +856,7 @@ func (s *session) handleWSPackage() error { ...@@ -785,7 +856,7 @@ func (s *session) handleWSPackage() error {
if s.IsClosed() { if s.IsClosed() {
break break
} }
pkg, err = conn.read() pkg, err = conn.recv()
if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() { if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() {
continue continue
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment