Commit f67153eb authored by AlexStocks's avatar AlexStocks

use gxnet.IsSameAddr, send out pkg asap in WritePkg, delete Chinese commenting

parent a8f2e95a
...@@ -16,6 +16,12 @@ ...@@ -16,6 +16,12 @@
- 2018/03/18 - 2018/03/18
> improvement > improvement
* use gxnet.IsSameAddr
* send out pkg asap in WritePkg when the second parameter @timeout is not greater then 0.
* delete Chinese commenting
- 2018/03/18
> improvement
* nerr -> netError * nerr -> netError
* check udp connection alive after connect() * check udp connection alive after connect()
* use ReadFromUDP as the uniform UDP read interface * use ReadFromUDP as the uniform UDP read interface
......
...@@ -22,6 +22,7 @@ import ( ...@@ -22,6 +22,7 @@ import (
) )
import ( import (
"github.com/AlexStocks/goext/net"
"github.com/AlexStocks/goext/sync" "github.com/AlexStocks/goext/sync"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
...@@ -126,7 +127,7 @@ func (c *client) dialTCP() Session { ...@@ -126,7 +127,7 @@ func (c *client) dialTCP() Session {
return nil return nil
} }
conn, err = net.DialTimeout("tcp", c.addr, connectTimeout) conn, err = net.DialTimeout("tcp", c.addr, connectTimeout)
if err == nil && conn.LocalAddr().String() == conn.RemoteAddr().String() { if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close() conn.Close()
err = errSelfConnect err = errSelfConnect
} }
...@@ -158,7 +159,7 @@ func (c *client) dialUDP() Session { ...@@ -158,7 +159,7 @@ func (c *client) dialUDP() Session {
return nil return nil
} }
conn, err = net.DialUDP("udp", localAddr, peerAddr) conn, err = net.DialUDP("udp", localAddr, peerAddr)
if err == nil && conn.LocalAddr().String() == conn.RemoteAddr().String() { if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close() conn.Close()
err = errSelfConnect err = errSelfConnect
} }
...@@ -212,7 +213,7 @@ func (c *client) dialWS() Session { ...@@ -212,7 +213,7 @@ func (c *client) dialWS() Session {
} }
conn, _, err = dialer.Dial(c.addr, nil) conn, _, err = dialer.Dial(c.addr, nil)
log.Info("websocket.dialer.Dial(addr:%s) = error:%s", c.addr, err) log.Info("websocket.dialer.Dial(addr:%s) = error:%s", c.addr, err)
if err == nil && conn.LocalAddr().String() == conn.RemoteAddr().String() { if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close() conn.Close()
err = errSelfConnect err = errSelfConnect
} }
...@@ -290,7 +291,7 @@ func (c *client) dialWSS() Session { ...@@ -290,7 +291,7 @@ func (c *client) dialWSS() Session {
return nil return nil
} }
conn, _, err = dialer.Dial(c.addr, nil) conn, _, err = dialer.Dial(c.addr, nil)
if err == nil && conn.LocalAddr().String() == conn.RemoteAddr().String() { if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
conn.Close() conn.Close()
err = errSelfConnect err = errSelfConnect
} }
......
...@@ -71,9 +71,9 @@ type ClientOptions struct { ...@@ -71,9 +71,9 @@ type ClientOptions struct {
addr string addr string
number int number int
// for wss client // the cert file of wss server which may contain server domain, server ip, the starting effective date, effective
// 服务端的证书文件(包含了公钥以及服务端其他一些验证信息:服务端域名、 // duration, the hash alg, the len of the private key.
// 服务端ip、起始有效日期、有效时长、hash算法、秘钥长度等) // wss client will use it.
cert string cert string
} }
......
...@@ -22,6 +22,7 @@ import ( ...@@ -22,6 +22,7 @@ import (
) )
import ( import (
"github.com/AlexStocks/goext/net"
"github.com/AlexStocks/goext/sync" "github.com/AlexStocks/goext/sync"
"github.com/AlexStocks/goext/time" "github.com/AlexStocks/goext/time"
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
...@@ -117,16 +118,15 @@ func (s *server) stop() { ...@@ -117,16 +118,15 @@ func (s *server) stop() {
if s.server != nil { if s.server != nil {
ctx, _ = context.WithTimeout(context.Background(), serverFastFailTimeout) ctx, _ = context.WithTimeout(context.Background(), serverFastFailTimeout)
if err = s.server.Shutdown(ctx); err != nil { if err = s.server.Shutdown(ctx); err != nil {
// 如果下面内容输出为:server shutdown ctx: context deadline exceeded, // if the log output is "shutdown ctx: context deadline exceeded", it means that
// 则说明有未处理完的active connections。 // there are still some active connections.
log.Error("server shutdown ctx:%s error:%s", ctx, err) log.Error("server shutdown ctx:%s error:%s", ctx, err)
} }
} }
s.server = nil s.server = nil
s.lock.Unlock() s.lock.Unlock()
if s.streamListener != nil { if s.streamListener != nil {
// 把streamListener.Close放在这里,既能防止多次关闭调用, // let the server exit asap when got error from RunEventLoop.
// 又能及时让Server因accept返回错误而从RunEventLoop退出
s.streamListener.Close() s.streamListener.Close()
s.streamListener = nil s.streamListener = nil
} }
...@@ -207,7 +207,7 @@ func (s *server) accept(newSession NewSessionCallback) (Session, error) { ...@@ -207,7 +207,7 @@ func (s *server) accept(newSession NewSessionCallback) (Session, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if conn.RemoteAddr().String() == conn.LocalAddr().String() { if gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) {
log.Warn("conn.localAddr{%s} == conn.RemoteAddr", conn.LocalAddr().String(), conn.RemoteAddr().String()) log.Warn("conn.localAddr{%s} == conn.RemoteAddr", conn.LocalAddr().String(), conn.RemoteAddr().String())
return nil, errSelfConnect return nil, errSelfConnect
} }
...@@ -383,7 +383,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) { ...@@ -383,7 +383,7 @@ func (s *server) runWSSEventLoop(newSession NewSessionCallback) {
return return
} }
config = &tls.Config{ config = &tls.Config{
InsecureSkipVerify: true, // 不对对端的证书进行校验 InsecureSkipVerify: true, // do not verify peer cert
ClientAuth: tls.NoClientCert, ClientAuth: tls.NoClientCert,
NextProtos: []string{"http/1.1"}, NextProtos: []string{"http/1.1"},
Certificates: []tls.Certificate{certificate}, Certificates: []tls.Certificate{certificate},
......
...@@ -297,7 +297,7 @@ func (s *session) sessionToken() string { ...@@ -297,7 +297,7 @@ func (s *session) sessionToken() string {
return fmt.Sprintf("{%s:%s:%d:%s<->%s}", s.name, s.EndPoint().EndPointType(), s.ID(), s.LocalAddr(), s.RemoteAddr()) return fmt.Sprintf("{%s:%s:%d:%s<->%s}", s.name, s.EndPoint().EndPointType(), s.ID(), s.LocalAddr(), s.RemoteAddr())
} }
// Queued Write, for handler. // Queued Write, for handler. Pls attention that if timeout is less than 0, WritePkg will send @pkg asap.
// For udp session, the @pkg should be UDPContext. // For udp session, the @pkg should be UDPContext.
func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error { func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
if s.IsClosed() { if s.IsClosed() {
...@@ -314,7 +314,8 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error { ...@@ -314,7 +314,8 @@ func (s *session) WritePkg(pkg interface{}, timeout time.Duration) error {
}() }()
if timeout <= 0 { if timeout <= 0 {
timeout = netIOTimeout _, err := s.Connection.Write(pkg)
return err
} }
select { select {
case s.wQ <- pkg: case s.wQ <- pkg:
...@@ -442,7 +443,7 @@ LOOP: ...@@ -442,7 +443,7 @@ LOOP:
// It choose one at random if multiple are ready. Otherwise it choose default branch if none is ready. // It choose one at random if multiple are ready. Otherwise it choose default branch if none is ready.
select { select {
case <-s.done: case <-s.done:
// 这个分支确保(session)handleLoop gr在(session)handlePackage gr之后退出 // this case branch assure the (session)handleLoop gr will exit before (session)handlePackage gr.
// once.Do(func() { ticker.Stop() }) // use wheel instead, 2016/09/26 // once.Do(func() { ticker.Stop() }) // use wheel instead, 2016/09/26
if atomic.LoadInt32(&(s.grNum)) == 1 { // make sure @(session)handlePackage goroutine has been closed. if atomic.LoadInt32(&(s.grNum)) == 1 { // make sure @(session)handlePackage goroutine has been closed.
if len(s.rQ) == 0 && len(s.wQ) == 0 { if len(s.rQ) == 0 && len(s.wQ) == 0 {
...@@ -458,7 +459,7 @@ LOOP: ...@@ -458,7 +459,7 @@ LOOP:
} }
case inPkg = <-s.rQ: case inPkg = <-s.rQ:
// 这个条件分支通过(session)rQ排空确保(session)handlePackage gr不会阻塞在(session)rQ上 // read the s.rQ and assure (session)handlePackage gr will not block by (session)rQ.
if flag { if flag {
log.Debug("%#v <-s.rQ", inPkg) log.Debug("%#v <-s.rQ", inPkg)
s.listener.OnMessage(s, inPkg) s.listener.OnMessage(s, inPkg)
...@@ -558,7 +559,9 @@ func (s *session) handleTCPPackage() error { ...@@ -558,7 +559,9 @@ func (s *session) handleTCPPackage() error {
for { for {
if s.IsClosed() { if s.IsClosed() {
err = nil err = nil
break // 退出前不再读取任何packet,buf中剩余的stream bytes也不可能凑够一个package, 所以直接退出 // do not handle the left stream in pktBuf and exit asap.
// it is impossible packing a package by the left stream.
break
} }
bufLen = 0 bufLen = 0
......
...@@ -10,9 +10,9 @@ ...@@ -10,9 +10,9 @@
package getty package getty
const ( const (
Version = "0.8.2" Version = "0.8.3"
DATE = "2018/03/17" DATE = "2018/03/18"
GETTY_MAJOR = 0 GETTY_MAJOR = 0
GETTY_MINOR = 8 GETTY_MINOR = 8
GETTY_BUILD = 2 GETTY_BUILD = 3
) )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment