Commit 234e107d authored by alexstocks's avatar alexstocks

add websocket client&connection&server

parent e56a1388
......@@ -11,12 +11,14 @@ package getty
import (
"net"
"strings"
"sync"
"time"
)
import (
log "github.com/AlexStocks/log4go"
"github.com/gorilla/websocket"
)
const (
......@@ -25,6 +27,10 @@ const (
maxTimes = 10
)
/////////////////////////////////////////
// getty tcp client
/////////////////////////////////////////
type Client struct {
// net
sync.Mutex
......@@ -60,7 +66,7 @@ func NewClient(connNum int, connInterval time.Duration, serverAddr string) *Clie
}
}
func (this *Client) dial() net.Conn {
func (this *Client) dialTCP() *Session {
var (
err error
conn net.Conn
......@@ -75,15 +81,48 @@ func (this *Client) dial() net.Conn {
err = errSelfConnect
}
if err == nil {
return conn
return NewTCPSession(conn)
}
log.Info("net.Connect(addr:%s, timeout:%v) = error{%v}", this.addr, err)
log.Info("net.DialTimeout(addr:%s, timeout:%v) = error{%v}", this.addr, err)
time.Sleep(this.interval)
continue
}
}
func (this *Client) dialWS() *Session {
var (
err error
conn websocket.Conn
dialer websocket.Dialer
)
for {
if this.IsClosed() {
return nil
}
conn, _, err = dialer.Dial(this.addr, nil)
if err == nil && conn.LocalAddr().String() == conn.RemoteAddr().String() {
err = errSelfConnect
}
if err == nil {
return NewWSSession(conn)
}
log.Info("websocket.dialer.Dial(addr:%s) = error{%v}", this.addr, err)
time.Sleep(this.interval)
continue
}
}
func (this *Client) dial() *Session {
if strings.HasPrefix(this.addr, "ws") {
this.dialWS()
}
return this.dialTCP()
}
func (this *Client) sessionNum() int {
var num int
......@@ -102,17 +141,15 @@ func (this *Client) sessionNum() int {
func (this *Client) connect() {
var (
err error
conn net.Conn
session *Session
)
for {
conn = this.dial()
if conn == nil {
session = this.dial()
if session == nil {
// client has been closed
break
}
session = NewSession(conn)
err = this.newSession(session)
if err == nil {
session.RunEventLoop()
......@@ -121,7 +158,9 @@ func (this *Client) connect() {
this.Unlock()
break
}
conn.Close()
// don't distinguish between tcp connection and websocket connection. Because
// gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close()
session.Conn().Close()
}
}
......
/******************************************************
# DESC : tcp/websocket connection
# MAINTAINER : Alex Stocks
# LICENCE : Apache License 2.0
# EMAIL : alexstocks@foxmail.com
# MOD : 2016-08-17 11:21
# FILE : conn.go
******************************************************/
package getty
import (
// "errors"
"net"
"sync/atomic"
)
import (
"github.com/gorilla/websocket"
)
var (
// ErrInvalidConnection = errors.New("connection has been closed.")
)
type conn interface {
incReadPkgCount()
incWritePkgCount()
write(p []byte) error
close(int)
}
/////////////////////////////////////////
// getty connection
/////////////////////////////////////////
var (
connID uint32
)
type gettyConn struct {
ID uint32
readCount uint32 // read() count
writeCount uint32 // write() count
readPkgCount uint32 // send pkg count
writePkgCount uint32 // recv pkg count
local string // local address
peer string // peer address
}
func (this *gettyConn) incReadPkgCount() {
atomic.AddUint32(&this.readPkgCount, 1)
}
func (this *gettyConn) incWritePkgCount() {
atomic.AddUint32(&this.writePkgCount, 1)
}
func (this *gettyConn) write([]byte) error {
return nil
}
func (this *gettyConn) close(int) {}
/////////////////////////////////////////
// getty tcp connection
/////////////////////////////////////////
type gettyTCPConn struct {
gettyConn
conn net.Conn
}
// create gettyTCPConn
func newGettyTCPConn(conn net.Conn) *gettyConn {
if conn == nil {
panic("newGettyTCPConn(conn):@conn is nil")
}
var localAddr, peerAddr string
// check conn.LocalAddr or conn.RemoetAddr is nil to defeat panic on 2016/09/27
if conn.LocalAddr() != nil {
localAddr = conn.LocalAddr().String()
}
if conn.RemoteAddr() != nil {
peerAddr = conn.RemoteAddr().String()
}
return gettyTCPConn{
conn: conn,
gettyConn: gettyConn{
ID: atomic.AddUint32(&connID, 1),
local: localAddr,
peer: peerAddr,
},
}
}
// tcp connection read
func (this *gettyTCPConn) read(p []byte) (int, error) {
// if this.conn == nil {
// return 0, ErrInvalidConnection
// }
// atomic.AddUint32(&this.readCount, 1)
atomic.AddUint32(&this.readCount, (uint32)(len(p)))
return this.conn.Read(p)
}
// tcp connection write
func (this *gettyTCPConn) write(p []byte) error {
// if this.conn == nil {
// return 0, ErrInvalidConnection
// }
// atomic.AddUint32(&this.writeCount, 1)
atomic.AddUint32(&this.writeCount, (uint32)(len(p)))
_, err := this.conn.Write(p)
return err
}
// close tcp connection
func (this *gettyTCPConn) close(waitSec int) {
// if tcpConn, ok := this.conn.(*net.TCPConn); ok {
// tcpConn.SetLinger(0)
// }
if this.conn != nil {
this.conn.(*net.TCPConn).SetLinger(waitSec)
this.conn.Close()
this.conn = nil
}
}
/////////////////////////////////////////
// getty websocket connection
/////////////////////////////////////////
type gettyWSConn struct {
gettyConn
conn websocket.Conn
}
// create websocket connection
func newGettyWSConn(conn websocket.Conn) *gettyWSConn {
if conn == nil {
panic("newGettyWSConn(conn):@conn is nil")
}
var localAddr, peerAddr string
// check conn.LocalAddr or conn.RemoetAddr is nil to defeat panic on 2016/09/27
if conn.LocalAddr() != nil {
localAddr = conn.LocalAddr().String()
}
if conn.RemoteAddr() != nil {
peerAddr = conn.RemoteAddr().String()
}
return gettyWSConn{
conn: conn,
gettyConn: gettyConn{
ID: atomic.AddUint32(&connID, 1),
local: localAddr,
peer: peerAddr,
},
}
}
// websocket connection read
func (this *gettyWSConn) read() ([]byte, error) {
// l, b, e := this.conn.ReadMessage()
_, b, e := this.conn.ReadMessage()
if e == nil {
// atomic.AddUint32(&this.readCount, (uint32)(l))
atomic.AddUint32(&this.readPkgCount, 1)
}
return b, e
}
// websocket connection write
func (this *gettyWSConn) write(p []byte) error {
// atomic.AddUint32(&this.writeCount, 1)
atomic.AddUint32(&this.writeCount, (uint32)(len(p)))
return this.conn.WriteMessage(len(p), p)
}
// close websocket connection
func (this *gettyWSConn) close(waitSec int) {
this.conn.UnderlyingConn().(*net.TCPConn).SetLinger(waitSec)
this.conn.Close()
}
......@@ -11,6 +11,11 @@
## develop history ##
---
- 2016/10/08
> 1 add websocket connection & client & server
>
> 3 version: 0.4.0
- 2016/10/01
> 1 remark SetReadDeadline & SetWriteDeadline in session.go (ref: https://github.com/golang/go/issues/15133)
>
......
......@@ -12,6 +12,7 @@ package getty
import (
"errors"
"net"
"net/http"
"sync"
"time"
)
......@@ -19,6 +20,7 @@ import (
import (
"github.com/AlexStocks/goext/net"
log "github.com/AlexStocks/log4go"
"github.com/gorilla/websocket"
)
var (
......@@ -102,7 +104,7 @@ func (this *Server) RunEventloop(newSession NewSessionCallback) {
if delay != 0 {
time.Sleep(delay)
}
client, err = this.Accept(newSession)
client, err = this.accept(newSession)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
if delay == 0 {
......@@ -124,20 +126,83 @@ func (this *Server) RunEventloop(newSession NewSessionCallback) {
}()
}
type WSHandler struct {
server *Server
newSession NewSessionCallback
upgrader websocket.Upgrader
}
func NewWSHandler(server *Server) *WSHandler {
return &WSHandler{
server: server,
upgrader: websocket.Upgrader{
// HandshakeTimeout: server.HTTPTimeout,
CheckOrigin: func(_ *http.Request) bool { return true },
},
}
}
func (this *WSHandler) ServeHTTPServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
http.Error(w, "Method not allowed", 405)
return
}
if this.server.IsClosed() {
http.Error(w, "HTTP server is closed(code:500-11).", 500)
log.Warn("Server{%s} stop acceptting client connect request.", this.server.addr)
return
}
conn, err := this.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Warn("upgrader.Upgrader(http.Request{%#v}) = error{%#v}", r, err)
return
}
if conn.RemoteAddr().String() == conn.LocalAddr().String() {
return nil, errSelfConnect
log.Warn("conn.localAddr{%s} == conn.RemoteAddr", conn.LocalAddr().String(), conn.RemoteAddr().String())
return
}
// conn.SetReadLimit(int64(handler.maxMsgLen))
session := NewWSSession(conn)
err = this.newSession(session)
if err != nil {
conn.Close()
log.Warn("Server{%s}.newSession(session{%#v}) = err {%#v}", this.server.addr, session, err)
return nil
}
session.RunEventLoop()
}
func (this *Server) RunWSEventLoop(newSession NewSessionCallback) {
this.wg.Add(1)
go func() {
defer this.wg.Done()
http.Server{
Addr: this.addr,
Handler: NewWSHandler(this),
// ReadTimeout: server.HTTPTimeout,
// WriteTimeout: server.HTTPTimeout,
}.Serve(this.listener)
}()
}
func (this *Server) Listener() net.Listener {
return this.listener
}
func (this *Server) Accept(newSession NewSessionCallback) (*Session, error) {
func (this *Server) accept(newSession NewSessionCallback) (*Session, error) {
conn, err := this.listener.Accept()
if err != nil {
return nil, err
}
if conn.RemoteAddr().String() == conn.LocalAddr().String() {
log.Warn("conn.localAddr{%s} == conn.RemoteAddr", conn.LocalAddr().String(), conn.RemoteAddr().String())
return nil, errSelfConnect
}
session := NewSession(conn)
session := NewTCPSession(conn)
err = newSession(session)
if err != nil {
conn.Close()
......
This diff is collapsed.
......@@ -10,8 +10,6 @@
package getty
const (
Version = "0.3.14"
GETTY_MAJOR = 0
GETTY_MINOR = 3
GETTY_BUILD = 14
Version = "0.4.0"
DATE = "2016/10/08"
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment