Commit b3e7c618 authored by AlexStocks's avatar AlexStocks

add zip/snappy compress

parent 5d50a01f
...@@ -11,12 +11,17 @@ ...@@ -11,12 +11,17 @@
## develop history ## ## develop history ##
--- ---
- 2016/11/16
> 1 add zip/snappy compress for tcp/ws connection
>
> 2 version: 0.6.01
- 2016/11/02 - 2016/11/02
> 1 add session.go:Session{ID(), LocalAddr(), RemoteAddr()} > 1 add session.go:Session{ID(), LocalAddr(), RemoteAddr()}
> >
> 2 add conn.go:iConn{id(), localAddr(), remoteAddr()} > 2 add conn.go:iConn{id(), localAddr(), remoteAddr()}
> >
> 2 version: 0.4.08 > 3 version: 0.4.08
- 2016/11/01 - 2016/11/01
> 1 session.go:Session{maxPkgLen(int)} -> Session{maxPkgLen(int32)} > 1 session.go:Session{maxPkgLen(int)} -> Session{maxPkgLen(int32)}
......
...@@ -11,6 +11,9 @@ package getty ...@@ -11,6 +11,9 @@ package getty
import ( import (
// "errors" // "errors"
"compress/flate"
"fmt"
"io"
"net" "net"
"sync/atomic" "sync/atomic"
"time" "time"
...@@ -18,6 +21,8 @@ import ( ...@@ -18,6 +21,8 @@ import (
import ( import (
log "github.com/AlexStocks/log4go" log "github.com/AlexStocks/log4go"
"github.com/golang/snappy"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
...@@ -28,11 +33,24 @@ var ( ...@@ -28,11 +33,24 @@ var (
) )
///////////////////////////////////////// /////////////////////////////////////////
// compress
/////////////////////////////////////////
type CompressType byte
const (
CompressNone CompressType = 0x00
CompressZip = 0x01
CompressSnappy = 0x02
)
/////////////////////////////////////////
// connection interfacke // connection interfacke
///////////////////////////////////////// /////////////////////////////////////////
type iConn interface { type iConn interface {
id() uint32 id() uint32
setCompressType(t CompressType)
localAddr() string localAddr() string
remoteAddr() string remoteAddr() string
incReadPkgCount() incReadPkgCount()
...@@ -59,12 +77,14 @@ var ( ...@@ -59,12 +77,14 @@ var (
type gettyConn struct { type gettyConn struct {
ID uint32 ID uint32
padding uint32 // last active, in milliseconds compress CompressType
padding1 uint8
padding2 uint16
readCount uint32 // read() count readCount uint32 // read() count
writeCount uint32 // write() count writeCount uint32 // write() count
readPkgCount uint32 // send pkg count readPkgCount uint32 // send pkg count
writePkgCount uint32 // recv pkg count writePkgCount uint32 // recv pkg count
active int64 // active active int64 // last active, in milliseconds
rDeadline time.Duration // network current limiting rDeadline time.Duration // network current limiting
wDeadline time.Duration wDeadline time.Duration
local string // local address local string // local address
...@@ -138,7 +158,9 @@ func (this *gettyConn) setWriteDeadline(wDeadline time.Duration) { ...@@ -138,7 +158,9 @@ func (this *gettyConn) setWriteDeadline(wDeadline time.Duration) {
type gettyTCPConn struct { type gettyTCPConn struct {
gettyConn gettyConn
conn net.Conn reader io.Reader
writer io.Writer
conn net.Conn
} }
// create gettyTCPConn // create gettyTCPConn
...@@ -156,15 +178,58 @@ func newGettyTCPConn(conn net.Conn) *gettyTCPConn { ...@@ -156,15 +178,58 @@ func newGettyTCPConn(conn net.Conn) *gettyTCPConn {
} }
return &gettyTCPConn{ return &gettyTCPConn{
conn: conn, conn: conn,
reader: io.Reader(conn),
writer: io.Writer(conn),
gettyConn: gettyConn{ gettyConn: gettyConn{
ID: atomic.AddUint32(&connID, 1), ID: atomic.AddUint32(&connID, 1),
local: localAddr, local: localAddr,
peer: peerAddr, peer: peerAddr,
compress: CompressNone,
}, },
} }
} }
// for zip compress
type writeFlusher struct {
flusher *flate.Writer
}
func (this *writeFlusher) Write(p []byte) (int, error) {
var (
n int
err error
)
n, err = this.flusher.Write(p)
if err != nil {
return n, err
}
if err := this.flusher.Flush(); err != nil {
return 0, err
}
return n, nil
}
// set compress type(tcp: zip/snappy, websocket:zip)
func (this *gettyTCPConn) setCompressType(t CompressType) {
switch {
case t == CompressZip:
this.reader = flate.NewReader(this.conn)
w, err := flate.NewWriter(this.conn, flate.DefaultCompression)
if err != nil {
panic(fmt.Sprintf("flate.NewReader(flate.DefaultCompress) = err(%s)", err))
}
this.writer = &writeFlusher{flusher: w}
case t == CompressSnappy:
this.reader = snappy.NewReader(this.conn)
this.writer = snappy.NewWriter(this.conn)
}
}
// tcp connection read // tcp connection read
func (this *gettyTCPConn) read(p []byte) (int, error) { func (this *gettyTCPConn) read(p []byte) (int, error) {
// if this.conn == nil { // if this.conn == nil {
...@@ -172,7 +237,8 @@ func (this *gettyTCPConn) read(p []byte) (int, error) { ...@@ -172,7 +237,8 @@ func (this *gettyTCPConn) read(p []byte) (int, error) {
// } // }
// atomic.AddUint32(&this.readCount, 1) // atomic.AddUint32(&this.readCount, 1)
l, e := this.conn.Read(p) // l, e := this.conn.Read(p)
l, e := this.reader.Read(p)
atomic.AddUint32(&this.readCount, uint32(l)) atomic.AddUint32(&this.readCount, uint32(l))
return l, e return l, e
} }
...@@ -185,7 +251,9 @@ func (this *gettyTCPConn) write(p []byte) error { ...@@ -185,7 +251,9 @@ func (this *gettyTCPConn) write(p []byte) error {
// atomic.AddUint32(&this.writeCount, 1) // atomic.AddUint32(&this.writeCount, 1)
atomic.AddUint32(&this.writeCount, (uint32)(len(p))) atomic.AddUint32(&this.writeCount, (uint32)(len(p)))
_, err := this.conn.Write(p) // _, err := this.conn.Write(p)
_, err := this.writer.Write(p)
return err return err
} }
...@@ -239,6 +307,16 @@ func newGettyWSConn(conn *websocket.Conn) *gettyWSConn { ...@@ -239,6 +307,16 @@ func newGettyWSConn(conn *websocket.Conn) *gettyWSConn {
return gettyWSConn return gettyWSConn
} }
// set compress type(tcp: zip/snappy, websocket:zip)
func (this *gettyWSConn) setCompressType(t CompressType) {
switch {
case t == CompressZip:
this.conn.EnableWriteCompression(true)
case t == CompressSnappy:
this.conn.EnableWriteCompression(true)
}
}
func (this *gettyWSConn) handlePing(message string) error { func (this *gettyWSConn) handlePing(message string) error {
err := this.conn.WriteMessage(websocket.PongMessage, []byte(message)) err := this.conn.WriteMessage(websocket.PongMessage, []byte(message))
if err == websocket.ErrCloseSent { if err == websocket.ErrCloseSent {
......
...@@ -168,6 +168,10 @@ func (this *Session) ID() uint32 { ...@@ -168,6 +168,10 @@ func (this *Session) ID() uint32 {
return this.iConn.id() return this.iConn.id()
} }
func (this *Session) SetCompressType(t CompressType) {
this.iConn.setCompressType(t)
}
// get local address // get local address
func (this *Session) LocalAddr() string { func (this *Session) LocalAddr() string {
return this.iConn.localAddr() return this.iConn.localAddr()
......
...@@ -10,9 +10,9 @@ ...@@ -10,9 +10,9 @@
package getty package getty
const ( const (
Version = "0.4.08" Version = "0.6.01"
DATE = "2016/11/02" DATE = "2016/11/16"
GETTY_MAJOR = 0 GETTY_MAJOR = 0
GETTY_MINOR = 4 GETTY_MINOR = 6
GETTY_BUILD = 8 GETTY_BUILD = 1
) )
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment