Commit 70d1d8bc authored by alexstocks's avatar alexstocks

use new getty

parent 9e462daa
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
<input class="inputMessage" placeholder=":img url"/> <input class="inputMessage" placeholder=":img url"/>
</li> </li>
</ul> </ul>
<script src="https://code.jquery.com/jquery-1.10.2.min.js"></script> <script src="jquery-1.10.2.min.js"></script>
<script src="encoding.js"></script> <script src="encoding.js"></script>
<script src="struct.min.js"></script> <script src="struct.min.js"></script>
<script src="main.js"></script> <script src="main.js"></script>
......
This diff is collapsed.
...@@ -32,14 +32,6 @@ $(function() { ...@@ -32,14 +32,6 @@ $(function() {
extra1:'uint16', extra1:'uint16',
extra2:'int32' // reserved, maybe used as package md5 checksum extra2:'int32' // reserved, maybe used as package md5 checksum
}, 0, true); }, 0, true);
var echoPkgBody;
// echoPkgHeader.endianness = true; // true is Little-endian
// var echoPkg = new Struct({
// H:echoPkgHeader
// B:string,
// }, 0, true);
// Initialize varibles // Initialize varibles
var $window = $(window); var $window = $(window);
...@@ -52,7 +44,6 @@ $(function() { ...@@ -52,7 +44,6 @@ $(function() {
// // Setting binaryType to accept received binary as either 'blob' or 'arraybuffer'. In default it is 'blob'. // // Setting binaryType to accept received binary as either 'blob' or 'arraybuffer'. In default it is 'blob'.
// socket.binaryType = 'arraybuffer'; // socket.binaryType = 'arraybuffer';
// socket.binaryType = '' // socket.binaryType = ''
// var decoder = new TextDecoder('utf-8')
function socketSend (o, silence) { function socketSend (o, silence) {
if (socket.readyState != socket.OPEN) { if (socket.readyState != socket.OPEN) {
...@@ -72,7 +63,6 @@ $(function() { ...@@ -72,7 +63,6 @@ $(function() {
tmp[header.byteLength] = msg.length tmp[header.byteLength] = msg.length
var ma = new TextEncoder("utf-8").encode(msg); var ma = new TextEncoder("utf-8").encode(msg);
tmp[0] = msg.length;
tmp.set(ma, header.byteLength + 1); tmp.set(ma, header.byteLength + 1);
return tmp; return tmp;
...@@ -81,36 +71,21 @@ $(function() { ...@@ -81,36 +71,21 @@ $(function() {
function AB2Str(buf) { function AB2Str(buf) {
var decoder = new TextDecoder('utf-8'); var decoder = new TextDecoder('utf-8');
// var json = JSON.parse(decoder.decode(new DataView(buf)))
var msg = decoder.decode(new DataView(buf)); var msg = decoder.decode(new DataView(buf));
// console.log('response text msg: ' + msg);
return msg; return msg;
} }
function unmarshalEchoPkg(data) { function unmarshalEchoPkg(data) {
var dataAB = new Uint8Array(data); var dataAB = new Uint8Array(data);
var echoPkgHeader = new Struct ({
Magic:'uint32',
LogID:'uint32', // log id
Sequence: 'uint32', // request/response sequence
ServiceID:'uint32', // service id
Command:'uint32', // operation command code
Code :'int32', // error code
Len:'uint16', // body length
extra1:'uint16',
extra2:'int32' // reserved, maybe used as package md5 checksum
}, 0, true);
var hLen = echoPkgHeader.byteLength; var hLen = echoPkgHeader.byteLength;
var rspHeaderData = dataAB.subarray(0, hLen); var rspHeaderData = dataAB.subarray(0, hLen);
var rspHeader = echoPkgHeader.read(rspHeaderData.buffer); var rspHeader = echoPkgHeader.read(rspHeaderData.buffer);
// console.log("echo rsp package header:" + rspHeader.Magic); // console.log("echo rsp package header:" + rspHeader.Magic);
console.log('echo response{seq:' + rspHeader.Sequence + ', msg len:' + dataAB[hLen] + '}'); // console.log(rspHeader.Len)
// console.log('echo response{seq:' + rspHeader.Sequence + ', msg len:' + dataAB[hLen] + '}');
addChatMessage({Message: addChatMessage({Message:
'echo response{seq:' + rspHeader.Sequence + 'echo response{seq:' + rspHeader.Sequence +
', msg len:' + dataAB[hLen] + ', msg len:' + (dataAB[hLen]) +
', msg:' + AB2Str(new Uint8Array(dataAB.subarray(hLen + 1)).buffer) + ', msg:' + AB2Str(new Uint8Array(dataAB.subarray(hLen + 1)).buffer) +
'}' '}'
} }
...@@ -120,10 +95,8 @@ $(function() { ...@@ -120,10 +95,8 @@ $(function() {
// Sends a chat message // Sends a chat message
function sendMessage () { function sendMessage () {
var message = $inputMessage.val(); var message = $inputMessage.val();
// console.log("input mesage:" + message)
// Prevent markup from being injected into the message // Prevent markup from being injected into the message
message = cleanInput(message); message = cleanInput(message);
// console.log("after cleanInput mesage:" + message)
// Prevent markup from being injected into the message // Prevent markup from being injected into the message
// if there is a non-empty message and a socket connection // if there is a non-empty message and a socket connection
if (message) { if (message) {
...@@ -138,11 +111,10 @@ $(function() { ...@@ -138,11 +111,10 @@ $(function() {
Command:echoCommand, Command:echoCommand,
Sequence:seq++, Sequence:seq++,
Code:0, Code:0,
Len:message.length, Len:message.length + 1,
extra1:0, extra1:0,
extra2:0 extra2:0
}); });
// console.log(marshalEchoPkg(pkgHeaderArrayBuffer, message));
socket.send(marshalEchoPkg(pkgHeaderArrayBuffer, message)); socket.send(marshalEchoPkg(pkgHeaderArrayBuffer, message));
} }
} }
...@@ -244,11 +216,7 @@ $(function() { ...@@ -244,11 +216,7 @@ $(function() {
var fileReader = new FileReader(); // 用filereader来转blob为arraybuffer var fileReader = new FileReader(); // 用filereader来转blob为arraybuffer
fileReader.onload = function() { fileReader.onload = function() {
var arrayBuffer = this.result; // 得到arraybuffer var arrayBuffer = this.result; // 得到arraybuffer
// var decoder = new TextDecoder('utf-8') // 上面回复中给的encoding.js、encoding-indexes.js
// console.log('decoder data:' + decoder.decode(new DataView(arrayBuffer)))
var dv = new DataView(arrayBuffer); var dv = new DataView(arrayBuffer);
// console.log('decoder data:' + dv.buffer)
var dva = new Uint8Array(dv.buffer);
unmarshalEchoPkg(dv.buffer) unmarshalEchoPkg(dv.buffer)
}; };
fileReader.readAsArrayBuffer(e.data); // 此处读取blob fileReader.readAsArrayBuffer(e.data); // 此处读取blob
......
...@@ -58,9 +58,11 @@ type ( ...@@ -58,9 +58,11 @@ type (
ProfilePort int `default:"10086"` ProfilePort int `default:"10086"`
// session // session
SessionTimeout string `default:"60s"` HeartbeatPeriod string `default:"30s"`
sessionTimeout time.Duration heartbeatPeriod time.Duration
SessionNumber int `default:"1000"` SessionTimeout string `default:"60s"`
sessionTimeout time.Duration
SessionNumber int `default:"1000"`
// app // app
FailFastTimeout string `default:"5s"` FailFastTimeout string `default:"5s"`
...@@ -89,11 +91,20 @@ func initConf() { ...@@ -89,11 +91,20 @@ func initConf() {
} }
conf = new(Config) conf = new(Config)
config.MustLoadWithPath(confFile, conf) config.MustLoadWithPath(confFile, conf)
conf.heartbeatPeriod, err = time.ParseDuration(conf.HeartbeatPeriod)
if err != nil {
panic(fmt.Sprintf("time.ParseDuration(HeartbeatPeriod{%#v}) = error{%v}", conf.heartbeatPeriod, err))
return
}
conf.sessionTimeout, err = time.ParseDuration(conf.SessionTimeout) conf.sessionTimeout, err = time.ParseDuration(conf.SessionTimeout)
if err != nil { if err != nil {
panic(fmt.Sprintf("time.ParseDuration(SessionTimeout{%#v}) = error{%v}", conf.SessionTimeout, err)) panic(fmt.Sprintf("time.ParseDuration(SessionTimeout{%#v}) = error{%v}", conf.SessionTimeout, err))
return return
} }
if conf.sessionTimeout <= conf.heartbeatPeriod {
panic(fmt.Sprintf("SessionTimeout{%#v} <= HeartbeatPeriod{%#v}", conf.SessionTimeout, conf.HeartbeatPeriod))
return
}
conf.failFastTimeout, err = time.ParseDuration(conf.FailFastTimeout) conf.failFastTimeout, err = time.ParseDuration(conf.FailFastTimeout)
if err != nil { if err != nil {
panic(fmt.Sprintf("time.ParseDuration(FailFastTimeout{%#v}) = error{%v}", conf.FailFastTimeout, err)) panic(fmt.Sprintf("time.ParseDuration(FailFastTimeout{%#v}) = error{%v}", conf.FailFastTimeout, err))
......
...@@ -133,7 +133,7 @@ func (this *EchoPackage) Unmarshal(buf *bytes.Buffer) (int, error) { ...@@ -133,7 +133,7 @@ func (this *EchoPackage) Unmarshal(buf *bytes.Buffer) (int, error) {
return 0, ErrNotEnoughSteam return 0, ErrNotEnoughSteam
} }
// 防止恶意客户端把这个字段设置过大导致服务端死等或者服务端在准备对应的缓冲区时内存崩溃 // 防止恶意客户端把这个字段设置过大导致服务端死等或者服务端在准备对应的缓冲区时内存崩溃
if maxEchoStringLen < this.H.Len { if maxEchoStringLen < this.H.Len-1 {
return 0, ErrTooLargePackage return 0, ErrTooLargePackage
} }
...@@ -143,5 +143,5 @@ func (this *EchoPackage) Unmarshal(buf *bytes.Buffer) (int, error) { ...@@ -143,5 +143,5 @@ func (this *EchoPackage) Unmarshal(buf *bytes.Buffer) (int, error) {
} }
this.B = (string)(buf.Next((int)(len))) this.B = (string)(buf.Next((int)(len)))
return (int)(this.H.Len) + 1 + echoPkgHeaderLen, nil return (int)(this.H.Len) + echoPkgHeaderLen, nil
} }
...@@ -62,7 +62,6 @@ func (this *MessageHandler) Handle(session *getty.Session, pkg *EchoPackage) err ...@@ -62,7 +62,6 @@ func (this *MessageHandler) Handle(session *getty.Session, pkg *EchoPackage) err
type clientEchoSession struct { type clientEchoSession struct {
session *getty.Session session *getty.Session
active time.Time
reqNum int32 reqNum int32
} }
...@@ -97,7 +96,7 @@ func (this *EchoMessageHandler) OnOpen(session *getty.Session) error { ...@@ -97,7 +96,7 @@ func (this *EchoMessageHandler) OnOpen(session *getty.Session) error {
log.Info("got session:%s", session.Stat()) log.Info("got session:%s", session.Stat())
this.rwlock.Lock() this.rwlock.Lock()
this.sessionMap[session] = &clientEchoSession{session: session, active: time.Now()} this.sessionMap[session] = &clientEchoSession{session: session}
this.rwlock.Unlock() this.rwlock.Unlock()
return nil return nil
} }
...@@ -132,7 +131,6 @@ func (this *EchoMessageHandler) OnMessage(session *getty.Session, pkg interface{ ...@@ -132,7 +131,6 @@ func (this *EchoMessageHandler) OnMessage(session *getty.Session, pkg interface{
if err != nil { if err != nil {
this.rwlock.Lock() this.rwlock.Lock()
if _, ok := this.sessionMap[session]; ok { if _, ok := this.sessionMap[session]; ok {
this.sessionMap[session].active = time.Now()
this.sessionMap[session].reqNum++ this.sessionMap[session].reqNum++
} }
this.rwlock.Unlock() this.rwlock.Unlock()
...@@ -140,13 +138,17 @@ func (this *EchoMessageHandler) OnMessage(session *getty.Session, pkg interface{ ...@@ -140,13 +138,17 @@ func (this *EchoMessageHandler) OnMessage(session *getty.Session, pkg interface{
} }
func (this *EchoMessageHandler) OnCron(session *getty.Session) { func (this *EchoMessageHandler) OnCron(session *getty.Session) {
var flag bool var (
flag bool
active time.Time
)
this.rwlock.RLock() this.rwlock.RLock()
if _, ok := this.sessionMap[session]; ok { if _, ok := this.sessionMap[session]; ok {
if conf.sessionTimeout.Nanoseconds() < time.Since(this.sessionMap[session].active).Nanoseconds() { active = session.GetActive()
if conf.sessionTimeout.Nanoseconds() < time.Since(active).Nanoseconds() {
flag = true flag = true
log.Warn("session{%s} timeout{%s}, reqNum{%d}", log.Warn("session{%s} timeout{%s}, reqNum{%d}",
session.Stat(), time.Since(this.sessionMap[session].active).String(), this.sessionMap[session].reqNum) session.Stat(), time.Since(active).String(), this.sessionMap[session].reqNum)
} }
} }
this.rwlock.RUnlock() this.rwlock.RUnlock()
......
...@@ -96,7 +96,7 @@ func newSession(session *getty.Session) error { ...@@ -96,7 +96,7 @@ func newSession(session *getty.Session) error {
session.SetWQLen(conf.GettySessionParam.PkgWQSize) session.SetWQLen(conf.GettySessionParam.PkgWQSize)
session.SetReadDeadline(conf.GettySessionParam.tcpReadTimeout) session.SetReadDeadline(conf.GettySessionParam.tcpReadTimeout)
session.SetWriteDeadline(conf.GettySessionParam.tcpWriteTimeout) session.SetWriteDeadline(conf.GettySessionParam.tcpWriteTimeout)
session.SetCronPeriod((int)(conf.sessionTimeout.Nanoseconds() / 1e6)) session.SetCronPeriod((int)(conf.heartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(conf.GettySessionParam.waitTimeout) session.SetWaitTime(conf.GettySessionParam.waitTimeout)
log.Debug("app accepts new session:%s\n", session.Stat()) log.Debug("app accepts new session:%s\n", session.Stat())
......
...@@ -10,8 +10,11 @@ Paths = ["/echo", "/echo"] ...@@ -10,8 +10,11 @@ Paths = ["/echo", "/echo"]
ProfilePort = 10086 ProfilePort = 10086
# session # session
# client与server之间连接的心跳周期
HeartbeatPeriod = "10s"
# client与server之间连接的超时时间 # client与server之间连接的超时时间
SessionTimeout = "20s" SessionTimeout = "20s"
# client与server之间连接的最大连接数
SessionNumber = 700 SessionNumber = 700
# app # app
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment