Commit c4e44e4c authored by Tsaiilin's avatar Tsaiilin

重播问题修复

parent 7a5a79cb
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"net" "net"
"sync" "sync"
"time" "time"
"virjar.com/majora-go/common"
"gopkg.in/fatih/set.v0" "gopkg.in/fatih/set.v0"
...@@ -107,22 +108,33 @@ func (c *ClusterClient) StartRedial(tag string, replay bool) { ...@@ -107,22 +108,33 @@ func (c *ClusterClient) StartRedial(tag string, replay bool) {
defer func(startTime time.Time) { defer func(startTime time.Time) {
log.Run().Infof("StartRedial cost %v", time.Since(startTime)) log.Run().Infof("StartRedial cost %v", time.Since(startTime))
}(time.Now()) }(time.Now())
if replay { if c.Redial.CanRedial() {
if replay {
c.clients.Range(func(host, c interface{}) bool {
client, _ := c.(*Client)
client.Redial(tag)
return true
})
time.Sleep(global.Config.Redial.WaitTime)
}
log.Run().Info("[Redial %s ] start close local session", tag)
c.clients.Range(func(host, c interface{}) bool { c.clients.Range(func(host, c interface{}) bool {
client, _ := c.(*Client) client, _ := c.(*Client)
client.Redial(tag) client.CloseAll()
client.natTunnel.Close()
return true return true
}) })
time.Sleep(global.Config.Redial.WaitTime) c.Redial.Redial(tag)
//client.natTunnel.Close 的实现不是立即生效,需要等待读写超时
time.Sleep(common.ReadTimeout)
c.clients.Range(func(host, c interface{}) bool {
client, _ := c.(*Client)
client.connect()
return true
})
} else {
log.Run().Info("inRedialing ignore")
} }
c.Redial.Redial(tag)
c.clients.Range(func(host, c interface{}) bool {
client, _ := c.(*Client)
client.CloseAll()
client.natTunnel.Close()
client.connect()
return true
})
} }
......
...@@ -177,10 +177,11 @@ func (client *Client) handleTransfer(packet *protocol.MajoraPacket, session gett ...@@ -177,10 +177,11 @@ func (client *Client) handleTransfer(packet *protocol.MajoraPacket, session gett
} }
func (client *Client) handleDisconnectMessage(session getty.Session, packet *protocol.MajoraPacket) { func (client *Client) handleDisconnectMessage(session getty.Session, packet *protocol.MajoraPacket) {
log.Run().Debugf("[handleDisconnectMessage] %d->session closed %v extra:%s", packet.SerialNumber, session.IsClosed()) log.Run().Debugf("[handleDisconnectMessage] %d->session closed %v extra:%s", packet.SerialNumber, session.IsClosed(), packet.Extra)
// disconnect 会多回复一个关闭的包,暂时忽略,后续更改关闭交互流程
t, ok := client.GetTransfer(packet.SerialNumber) t, ok := client.GetTransfer(packet.SerialNumber)
if !ok { if !ok {
log.Error().Errorf("handleDisconnectMessage can't find transfer") log.Run().Debug("handleDisconnectMessage can't find transfer")
return return
} }
t.Close() t.Close()
...@@ -210,7 +211,6 @@ func (client *Client) GetRecorder(sn int64) trace.Recorder { ...@@ -210,7 +211,6 @@ func (client *Client) GetRecorder(sn int64) trace.Recorder {
func (client *Client) GetTransfer(sn int64) (*Transfer, bool) { func (client *Client) GetTransfer(sn int64) (*Transfer, bool) {
t, ok := client.transferStore.Load(sn) t, ok := client.transferStore.Load(sn)
if !ok { if !ok {
log.Error().Errorf("[GetTransfer] error, not exist (sn:%d)", sn)
return nil, false return nil, false
} }
transfer := t.(*Transfer) transfer := t.(*Transfer)
...@@ -220,6 +220,7 @@ func (client *Client) GetTransfer(sn int64) (*Transfer, bool) { ...@@ -220,6 +220,7 @@ func (client *Client) GetTransfer(sn int64) (*Transfer, bool) {
func (client *Client) AddTransfer(sn int64, transfer *Transfer, addr string) { func (client *Client) AddTransfer(sn int64, transfer *Transfer, addr string) {
if _, ok := client.transferStore.Load(sn); ok { if _, ok := client.transferStore.Load(sn); ok {
log.Error().Errorf("[AddTransfer] %d->error, has one", sn) log.Error().Errorf("[AddTransfer] %d->error, has one", sn)
return
} }
client.transferStore.Store(sn, transfer) client.transferStore.Store(sn, transfer)
log.Run().Debugf("[AddTransfer] %d->%s success", sn, addr) log.Run().Debugf("[AddTransfer] %d->%s success", sn, addr)
......
...@@ -78,6 +78,4 @@ func (client *Client) Redial(tag string) { ...@@ -78,6 +78,4 @@ func (client *Client) Redial(tag string) {
if _, _, err := client.session.WritePkg(OfflinePacket, 0); err != nil { if _, _, err := client.session.WritePkg(OfflinePacket, 0); err != nil {
log.Run().Errorf("[Redial %s] write offline to server error %s", tag, err.Error()) log.Run().Errorf("[Redial %s] write offline to server error %s", tag, err.Error())
} }
log.Run().Info("[Redial %s %s] start close local session", client.host, tag)
} }
...@@ -45,9 +45,9 @@ const ( ...@@ -45,9 +45,9 @@ const (
) )
const ( const (
ReadTimeout = time.Minute ReadTimeout = 30 * time.Second
WriteTimeout = time.Minute WriteTimeout = 30 * time.Second
WaitTimeout = time.Minute WaitTimeout = 30 * time.Second
KeepAliveTimeout = time.Second * 10 KeepAliveTimeout = time.Second * 10
SessionTimeout = time.Minute * 5 SessionTimeout = time.Minute * 5
......
tunnel_addr: 127.0.0.1:5879 tunnel_addr: 127.0.0.1:5879
dns_server: 114.114.114.114:53 dns_server: 114.114.114.114:53
#daemon: true #daemon: true
log_level: debug log_level: info
log_path: ./majora-log/ log_path: ./majora-log/
reconn_intervalz: 5s reconn_intervalz: 5s
net_check_interval: 5s net_check_interval: 5s
dns_cache_duration: 10m dns_cache_duration: 10m
net_check_url: https://www.baidu.com[extra] net_check_url: https://www.baidu.com
#redial: #redial:
# command: /bin/bash # command: /bin/bash
......
tunnel_addr: majora-vps-zj.virjar.com:5879 tunnel_addr: majora-vps.virjar.com:5879
dns_server: 114.114.114.114:53 dns_server: 114.114.114.114:53
log_level: info log_level: info
log_path: ./majora-log/ log_path: ./majora-log/
......
...@@ -26,6 +26,10 @@ func NewPPPRedial() *PPPRedial { ...@@ -26,6 +26,10 @@ func NewPPPRedial() *PPPRedial {
} }
} }
func (p *PPPRedial) CanRedial() bool {
return !p.inRedialing.Load()
}
func (p *PPPRedial) Redial(tag string) bool { func (p *PPPRedial) Redial(tag string) bool {
if p.inRedialing.CAS(false, true) { if p.inRedialing.CAS(false, true) {
log.Run().Infof("[PPPRedial %s] start", tag) log.Run().Infof("[PPPRedial %s] start", tag)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment