Commit 5266865f authored by Tsaiilin(蔡依林)'s avatar Tsaiilin(蔡依林)

Merge branch 'transfer' into 'master'

Transfer

See merge request !13
parents f5abc227 ec9ec7e2
......@@ -46,7 +46,7 @@ func init() {
func initial() {
if global.Config.PprofPort > 0 {
safe.SafeGo(func() {
safe.Go(func() {
addr := fmt.Sprintf("127.0.0.1:%d", global.Config.PprofPort)
log.Run().Infof("enable pprof: %s", addr)
log.Run().Error(http.ListenAndServe(addr, nil))
......
......@@ -12,14 +12,13 @@ import (
type Client struct {
config *model.Configure
host string
port int
localAddr net.Addr
natTunnel getty.Client
session getty.Session
connStore sync.Map
sessionStore sync.Map
dnsCache *freecache.Cache
host string
port int
localAddr net.Addr
natTunnel getty.Client
session getty.Session
transferStore sync.Map
dnsCache *freecache.Cache
}
func NewClientWithConf(cfg *model.Configure, host string, port int) *Client {
......@@ -35,13 +34,12 @@ func NewCli(cfg *model.Configure, host string, port int) *Client {
}
}
client := &Client{
config: cfg,
host: host,
port: port,
localAddr: localAddr,
connStore: sync.Map{},
sessionStore: sync.Map{},
dnsCache: freecache.NewCache(1024),
config: cfg,
host: host,
port: port,
localAddr: localAddr,
transferStore: sync.Map{},
dnsCache: freecache.NewCache(1024),
}
return client
......@@ -50,4 +48,3 @@ func NewCli(cfg *model.Configure, host string, port int) *Client {
func (client *Client) StartUp() {
client.connect()
}
......@@ -23,7 +23,7 @@ type ClusterClient struct {
func (c *ClusterClient) Start() {
c.check()
safe.SafeGo(func() {
safe.Go(func() {
var timer = time.NewTimer(5 * time.Minute)
for {
c.connectNatServers()
......@@ -32,7 +32,7 @@ func (c *ClusterClient) Start() {
}
})
if global.Config.Redial.Valid() {
safe.SafeGo(func() {
safe.Go(func() {
// 加上随机 防止vps在同时间重启
duration := c.randomDuration()
log.Run().Infof("Redial interval %+v", duration)
......@@ -142,7 +142,7 @@ func (c *ClusterClient) check() {
url = global.Config.NetCheckUrl
}
safe.SafeGo(func() {
safe.Go(func() {
var timer = time.NewTimer(interval)
for {
timer.Reset(interval)
......
......@@ -4,18 +4,15 @@ import (
"bytes"
"encoding/binary"
"fmt"
"github.com/adamweixuan/getty"
"net"
"strings"
"sync"
"time"
"github.com/adamweixuan/getty"
"virjar.com/majora-go/common"
"virjar.com/majora-go/global"
"virjar.com/majora-go/log"
"virjar.com/majora-go/protocol"
"virjar.com/majora-go/safe"
"virjar.com/majora-go/trace"
)
......@@ -49,21 +46,20 @@ func (client *Client) handleConnect(packet *protocol.MajoraPacket, session getty
if !ok {
log.Error().Errorf("Get user from connect packet failed (sn:%d)", packet.SerialNumber)
}
traceSession := trace.NewSession(sessionId, client.host, user, enableTrace == "true")
client.AddSession(packet, traceSession)
traceSession.Recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("Start handle connect to %s (sn:%d)",
recorder := trace.NewRecorder(sessionId, client.host, user, enableTrace == "true")
recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("Start handle connect to %s (sn:%d)",
packet.Extra, packet.SerialNumber))
if session.IsClosed() {
log.Run().Warnf("[handleConnect] %d -> nat server is closed", packet.SerialNumber)
traceSession.Recorder.RecordErrorEvent(trace.ConnectEvent, fmt.Sprintf("NatServer is closed (sn:%d)", packet.SerialNumber), nil)
recorder.RecordErrorEvent(trace.ConnectEvent, fmt.Sprintf("NatServer is closed (sn:%d)", packet.SerialNumber), nil)
client.closeVirtualConnection(session, packet.SerialNumber)
return
}
hostPort := strings.Split(packet.Extra, ":")
if len(packet.Extra) == 0 || len(hostPort) != 2 {
log.Error().Errorf("[handleConnect] invalid extra %s", packet.Extra)
traceSession.Recorder.RecordErrorEvent(trace.ConnectEvent,
recorder.RecordErrorEvent(trace.ConnectEvent,
fmt.Sprintf("Connect extra invalid %s (%d)", packet.Extra, packet.SerialNumber), nil)
client.closeVirtualConnection(session, packet.SerialNumber)
return
......@@ -77,27 +73,27 @@ func (client *Client) handleConnect(packet *protocol.MajoraPacket, session getty
target := hostPort[0]
ip, err := client.dnsCache.Get([]byte(hostPort[0]))
if err != nil {
traceSession.Recorder.RecordEvent(trace.DnsResolveEvent, fmt.Sprintf("Dns cache miss %s ", hostPort[0]))
recorder.RecordEvent(trace.DnsResolveEvent, fmt.Sprintf("Dns cache miss %s ", hostPort[0]))
hosts, dnsErr := net.LookupHost(hostPort[0])
if dnsErr != nil {
traceSession.Recorder.RecordErrorEvent(trace.DnsResolveEvent, fmt.Sprintf("Resolve %s ip error", hostPort[0]), dnsErr)
recorder.RecordErrorEvent(trace.DnsResolveEvent, fmt.Sprintf("Resolve %s ip error", hostPort[0]), dnsErr)
client.closeVirtualConnection(session, packet.SerialNumber)
return
}
err := client.dnsCache.Set([]byte(hostPort[0]), []byte(hosts[0]), int(global.Config.DnsCacheDuration.Seconds()))
if err != nil {
traceSession.Recorder.RecordErrorEvent(trace.DnsResolveEvent, fmt.Sprintf("Dns cache set error %s", hostPort[0]), err)
recorder.RecordErrorEvent(trace.DnsResolveEvent, fmt.Sprintf("Dns cache set error %s", hostPort[0]), err)
}
target = hosts[0]
} else {
target = string(ip)
recorder.RecordEvent(trace.DnsResolveEvent, fmt.Sprintf("Dns cache hit %s -> %s", hostPort[0], target))
}
traceSession.Recorder.RecordEvent(trace.DnsResolveEvent, fmt.Sprintf("Dns cache hit %s -> %s", hostPort[0], target))
conn, err := dialer.Dial(common.TCP, fmt.Sprintf("%s:%s", target, hostPort[1]))
if err != nil {
log.Error().Errorf("[handleConnect] %d->connect to %s->%s", packet.SerialNumber, packet.Extra, err.Error())
traceSession.Recorder.RecordErrorEvent(trace.ConnectEvent,
recorder.RecordErrorEvent(trace.ConnectEvent,
fmt.Sprintf("Connect to %s failed (sn:%d)", packet.Extra, packet.SerialNumber), err)
client.closeVirtualConnection(session, packet.SerialNumber)
return
......@@ -105,17 +101,21 @@ func (client *Client) handleConnect(packet *protocol.MajoraPacket, session getty
tcpConn := conn.(*net.TCPConn)
_ = tcpConn.SetNoDelay(true)
_ = tcpConn.SetKeepAlive(true)
client.AddConnection(packet, tcpConn, packet.Extra)
traceSession.Recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("Connect to %s success, local: %s -> remote:%s (sn:%d)",
t := NewTransfer(packet.SerialNumber, client, tcpConn, recorder)
t.SetTransferToUpstreamFunc(client.transferToUpstream)
t.SetTransferToDownstreamFunc(client.transferToDownstream)
client.AddTransfer(packet.SerialNumber, t, packet.Extra)
recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("Connect to %s success, local: %s -> remote:%s (aaasn:%d)",
packet.Extra, tcpConn.LocalAddr(), tcpConn.RemoteAddr(), packet.SerialNumber))
traceSession.Recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("Start replay natServer connect ready (sn:%d)", packet.SerialNumber))
recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("Start replay natServer connect ready (sn:%d)", packet.SerialNumber))
majoraPacket := protocol.TypeConnectReady.CreatePacket()
majoraPacket.SerialNumber = packet.SerialNumber
majoraPacket.Extra = client.config.ClientID
if session.IsClosed() {
log.Run().Warnf("[handleConnect] %d -> nat server is closed", packet.SerialNumber)
traceSession.Recorder.RecordErrorEvent(trace.ConnectEvent, fmt.Sprintf("NatServer is closed (sn:%d)", packet.SerialNumber),
recorder.RecordErrorEvent(trace.ConnectEvent, fmt.Sprintf("NatServer is closed (sn:%d)", packet.SerialNumber),
nil)
client.closeVirtualConnection(session, packet.SerialNumber)
return
......@@ -123,21 +123,19 @@ func (client *Client) handleConnect(packet *protocol.MajoraPacket, session getty
if _, _, err := session.WritePkg(majoraPacket, 0); err != nil {
log.Error().Errorf("[handleConnect] %d->write pkg to nat server with error %s", packet.SerialNumber,
err.Error())
traceSession.Recorder.RecordErrorEvent(trace.ConnectEvent, fmt.Sprintf("Write pkg to natServer failed (sn:%d)",
recorder.RecordErrorEvent(trace.ConnectEvent, fmt.Sprintf("Write pkg to natServer failed (sn:%d)",
packet.SerialNumber), err)
client.closeVirtualConnection(session, packet.SerialNumber)
return
} else {
safe.SafeGo(func() {
client.handleUpStream(tcpConn, packet, session)
})
t.Start()
log.Run().Debugf("[handleConnect] %d->connect success to %s ", packet.SerialNumber, packet.Extra)
traceSession.Recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("Replay natServer connect ready success (sn:%d)", packet.SerialNumber))
recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("Replay natServer connect ready success (sn:%d)", packet.SerialNumber))
}
}
func decodeMap(data []byte) map[string]string {
result := make(map[string]string, 1)
result := make(map[string]string, 2)
var headerSize int8
err := binary.Read(bytes.NewBuffer(data[:1]), binary.BigEndian, &headerSize)
data = data[1:]
......@@ -168,105 +166,25 @@ func decodeMap(data []byte) map[string]string {
}
func (client *Client) handleTransfer(packet *protocol.MajoraPacket, session getty.Session) {
traceRecorder := client.GetRecorderFromSession(packet.SerialNumber)
traceRecorder.RecordEvent(trace.TransferEvent,
fmt.Sprintf("Receive transfer packet from natServer,start to be forward to target, len:%d (%d)", len(packet.Data), packet.SerialNumber))
load, ok := client.connStore.Load(packet.SerialNumber)
t, ok := client.GetTransfer(packet.SerialNumber)
if !ok {
log.Error().Errorf("[handleTransfer] %d-> can not find connection", packet.SerialNumber)
traceMessage := fmt.Sprintf("Find upsteam connection failed (%d)", packet.SerialNumber)
traceRecorder.RecordErrorEvent(trace.TransferEvent, traceMessage, nil)
client.closeVirtualConnection(session, packet.SerialNumber)
log.Error().Errorf("can't find transfer")
return
}
conn := load.(*net.TCPConn)
cnt, err := conn.Write(packet.Data)
if err != nil {
log.Error().Errorf("[handleTransfer] %d->write to upstream fail for %s", packet.SerialNumber, err)
traceMessage := fmt.Sprintf("Write to upstream failed (%d)", packet.SerialNumber)
traceRecorder.RecordErrorEvent(trace.TransferEvent, traceMessage, err)
client.closeVirtualConnection(session, packet.SerialNumber)
return
}
if cnt != len(packet.Data) {
log.Error().Errorf("[handleTransfer] %d-> write not all data for expect->%d/%d",
packet.SerialNumber, len(packet.Data), cnt)
traceMessage := fmt.Sprintf("Write not all data for expect -> %d/%d (sn:%d)", len(packet.Data), cnt, packet.SerialNumber)
traceRecorder.RecordErrorEvent(trace.TransferEvent, traceMessage, nil)
client.closeVirtualConnection(session, packet.SerialNumber)
return
}
log.Run().Debugf("[handleTransfer] %d-> success dataLen: %d", packet.SerialNumber, len(packet.Data))
traceMessage := fmt.Sprintf("transfer data success (%d)", packet.SerialNumber)
traceRecorder.RecordEvent(trace.TransferEvent, traceMessage)
}
func (client *Client) handleUpStream(conn *net.TCPConn, packet *protocol.MajoraPacket, session getty.Session) {
traceRecorder := client.GetRecorderFromSession(packet.SerialNumber)
traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("Ready read from upstream (sn:%d)", packet.SerialNumber))
log.Run().Debugf("[handleUpStream] %d-> handleUpStream start...", packet.SerialNumber)
for {
buf := make([]byte, common.BufSize)
cnt, err := conn.Read(buf)
if err != nil {
opErr, ok := err.(*net.OpError)
if ok && opErr.Err.Error() == "i/o timeout" {
recorderMessage := fmt.Sprintf("Upstream deadDeadline start close (sn:%d)", packet.SerialNumber)
traceRecorder.RecordEvent(trace.UpStreamEvent, recorderMessage)
} else {
log.Run().Debugf("[handleUpStream] %d->read with error:%+v,l:%s->r:%s",
packet.SerialNumber, err, conn.LocalAddr(), conn.RemoteAddr())
recorderMessage := fmt.Sprintf("Read with l:%s->r:%s (sn:%d) ",
conn.LocalAddr(), conn.RemoteAddr(), packet.SerialNumber)
traceRecorder.RecordErrorEvent(trace.UpStreamEvent, recorderMessage, err)
}
client.OnClose(session, conn, packet.SerialNumber)
break
}
traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("read count: %d (sn:%d)",
cnt, packet.SerialNumber))
traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("Start write to natServer (sn:%d)", packet.SerialNumber))
pack := protocol.TypeTransfer.CreatePacket()
pack.Data = buf[0:cnt]
pack.SerialNumber = packet.SerialNumber
if _, _, err := session.WritePkg(pack, 0); err != nil {
log.Error().Errorf("[handleUpStream] %d-> write to server fail %+v", packet.SerialNumber, err.Error())
traceRecorder.RecordErrorEvent(trace.UpStreamEvent,
fmt.Sprintf("Write to natServer failed (sn:%d)", packet.SerialNumber), err)
client.OnClose(session, conn, packet.SerialNumber)
break
} else {
log.Run().Debugf("[handleUpStream] %d->success dataLen:%d", packet.SerialNumber, len(packet.Data))
traceRecorder.RecordEvent(trace.UpStreamEvent,
fmt.Sprintf("Write to natServer success (sn:%d)", packet.SerialNumber))
}
}
t.recorder.RecordEvent(trace.TransferEvent,
fmt.Sprintf("Receive transfer packet from natServer,start to be forward to target, len:%d (%d)", len(packet.Data), packet.SerialNumber))
t.TransferToUpstream(packet)
}
func (client *Client) handleDisconnectMessage(session getty.Session, packet *protocol.MajoraPacket) {
traceRecorder := client.GetRecorderFromSession(packet.SerialNumber)
traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("Start close upstream extra:%s (sn:%d)",
packet.Extra, packet.SerialNumber))
log.Run().Debugf("[handleDisconnectMessage] %d->session closed %v extra:%s", packet.SerialNumber, session.IsClosed())
if conn, ok := client.connStore.Load(packet.SerialNumber); ok {
upstreamConn := conn.(*net.TCPConn)
readDeadLine := time.Now().Add(3 * time.Millisecond)
traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("Set upstream read deadline:%s (sn:%d)",
readDeadLine.Format("2006-01-02 15:04:05.000000"), packet.SerialNumber))
err := upstreamConn.SetReadDeadline(readDeadLine)
if err != nil {
traceRecorder.RecordErrorEvent(trace.DisconnectEvent,
fmt.Sprintf("Set upstream read deadline failed (sn:%d)", packet.SerialNumber), err)
client.OnClose(session, upstreamConn, packet.SerialNumber)
}
} else {
traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("The upstream connection is closed, do nothing (sn:%d)", packet.SerialNumber))
t, ok := client.GetTransfer(packet.SerialNumber)
if !ok {
log.Error().Errorf("handleDisconnectMessage can't find transfer")
return
}
t.Close()
}
func (client *Client) handleControlMessage(_ *protocol.MajoraPacket) {
......@@ -278,31 +196,33 @@ func (client *Client) handleDestroyMessage() {
client.natTunnel.Close()
}
func (client *Client) AddSession(packet *protocol.MajoraPacket, session *trace.Session) {
if _, ok := client.sessionStore.Load(packet.SerialNumber); ok {
log.Error().Errorf("[AddSession] %d->error, has one", packet.SerialNumber)
func (client *Client) GetRecorder(sn int64) trace.Recorder {
t, ok := client.transferStore.Load(sn)
if !ok {
log.Run().Warnf("[GetRecorder] get session failed, maybe already closed (%d)", sn)
return trace.NewRecorder("", "", "", true)
}
client.sessionStore.Store(packet.SerialNumber, session)
log.Run().Debugf("[AddSession] %d-> success", packet.SerialNumber)
transfer := t.(*Transfer)
return transfer.recorder
}
func (client *Client) GetRecorderFromSession(sn int64) trace.Recorder {
session, ok := client.sessionStore.Load(sn)
func (client *Client) GetTransfer(sn int64) (*Transfer, bool) {
t, ok := client.transferStore.Load(sn)
if !ok {
log.Run().Warnf("[GetRecorderFromSession] get session failed, maybe already closed (%d)", sn)
session = trace.NewSession("", "", "", false)
log.Error().Errorf("[GetTransfer] error, not exist (sn:%d)", sn)
return nil, false
}
traceSession := session.(*trace.Session)
return traceSession.Recorder
transfer := t.(*Transfer)
return transfer, true
}
func (client *Client) AddConnection(packet *protocol.MajoraPacket, conn *net.TCPConn, addr string) {
if _, ok := client.connStore.Load(packet.SerialNumber); ok {
log.Error().Errorf("[AddConnection] %d->error, has one", packet.SerialNumber)
func (client *Client) AddTransfer(sn int64, transfer *Transfer, addr string) {
if _, ok := client.transferStore.Load(sn); ok {
log.Error().Errorf("[AddTransfer] %d->error, has one", sn)
}
client.connStore.Store(packet.SerialNumber, conn)
log.Run().Debugf("[AddConnection] %d->%s success", packet.SerialNumber, addr)
client.transferStore.Store(sn, transfer)
log.Run().Debugf("[AddTransfer] %d->%s success", sn, addr)
}
// OnClose 1. 本地缓存删除 2. 关闭连接 3. 通知natserver
......@@ -318,7 +238,7 @@ func (client *Client) OnClose(natSession getty.Session, upStreamSession net.Conn
//closeVirtualConnection disconnect to server
func (client *Client) closeVirtualConnection(session getty.Session, serialNumber int64) {
traceRecorder := client.GetRecorderFromSession(serialNumber)
traceRecorder := client.GetRecorder(serialNumber)
log.Run().Debugf("[closeVirtualConnection] %d->session closed %v", serialNumber, session.IsClosed())
if session.IsClosed() {
......@@ -334,12 +254,11 @@ func (client *Client) closeVirtualConnection(session getty.Session, serialNumber
log.Run().Warnf("[closeVirtualConnection] ->%d error %s session closed %v allCnt %d sendCnt %d",
serialNumber, err.Error(), session.IsClosed(), allCnt, sendCnt)
traceRecorder.RecordErrorEvent(trace.DisconnectEvent,
fmt.Sprintf("Send disconnect to natServer failed closed:%v allCnt %d sendCnt %d (sn:%d)",
fmt.Sprintf("send disconnect to natServer failed closed:%v allCnt %d sendCnt %d (sn:%d)",
session.IsClosed(), allCnt, sendCnt, serialNumber), err)
session.Close()
}
client.connStore.Delete(serialNumber)
client.sessionStore.Delete(serialNumber)
client.transferStore.Delete(serialNumber)
traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("Send disconnect to natServer success (sn:%d)", serialNumber))
}
......@@ -349,13 +268,70 @@ func (client *Client) CloseAll() {
log.Error().Errorf("OnClose %+v", err)
}
}()
client.connStore.Range(func(key, value interface{}) bool {
client.transferStore.Range(func(key, value interface{}) bool {
serialNumber := key.(int64)
conn, _ := value.(*net.TCPConn)
t, _ := value.(*Transfer)
log.Run().Debugf("[CloseAll] close serialNumber -> %d", serialNumber)
client.OnClose(client.session, conn, serialNumber)
client.OnClose(client.session, t.upstreamConn, serialNumber)
return true
})
client.connStore = sync.Map{}
client.sessionStore = sync.Map{}
client.transferStore = sync.Map{}
}
func (client *Client) transferToUpstream(t *Transfer, p *protocol.MajoraPacket) {
cnt, err := t.upstreamConn.Write(p.Data)
if err != nil {
log.Error().Errorf("[handleTransfer] %d->write to upstream fail for %s", p.SerialNumber, err)
traceMessage := fmt.Sprintf("Write to upstream failed (%d)", p.SerialNumber)
t.recorder.RecordErrorEvent(trace.TransferEvent, traceMessage, err)
client.closeVirtualConnection(client.session, p.SerialNumber)
return
}
if cnt != len(p.Data) {
log.Error().Errorf("[handleTransfer] %d-> write not all data for expect->%d/%d",
p.SerialNumber, len(p.Data), cnt)
traceMessage := fmt.Sprintf("Write not all data for expect -> %d/%d (sn:%d)", len(p.Data), cnt, p.SerialNumber)
t.recorder.RecordErrorEvent(trace.TransferEvent, traceMessage, nil)
client.closeVirtualConnection(client.session, p.SerialNumber)
return
}
log.Run().Debugf("[handleTransfer] %d-> success dataLen: %d", p.SerialNumber, len(p.Data))
traceMessage := fmt.Sprintf("transfer data success (%d)", p.SerialNumber)
t.recorder.RecordEvent(trace.TransferEvent, traceMessage)
}
func (client *Client) transferToDownstream(t *Transfer, data []byte, err error) {
if err != nil {
opErr, ok := err.(*net.OpError)
if ok && opErr.Err.Error() == "i/o timeout" {
recorderMessage := fmt.Sprintf("Upstream deadDeadline start close (sn:%d)", t.serialNumber)
t.recorder.RecordEvent(trace.UpStreamEvent, recorderMessage)
} else {
log.Run().Debugf("[handleUpStream] %d->read with error:%+v,l:%s->r:%s",
t.serialNumber, err, t.upstreamConn.LocalAddr(), t.upstreamConn.RemoteAddr())
recorderMessage := fmt.Sprintf("Read with l:%s->r:%s (sn:%d) ",
t.upstreamConn.LocalAddr(), t.upstreamConn.RemoteAddr(), t.serialNumber)
t.recorder.RecordErrorEvent(trace.UpStreamEvent, recorderMessage, err)
}
t.client.OnClose(t.client.session, t.upstreamConn, t.serialNumber)
} else {
t.recorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("read count: %d (sn:%d)",
len(data), t.serialNumber))
t.recorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("Start write to natServer (sn:%d)", t.serialNumber))
pack := protocol.TypeTransfer.CreatePacket()
pack.Data = data
pack.SerialNumber = t.serialNumber
if _, _, err := t.client.session.WritePkg(pack, 0); err != nil {
log.Error().Errorf("[handleUpStream] %d-> write to server fail %+v", t.client, err.Error())
t.recorder.RecordErrorEvent(trace.UpStreamEvent,
fmt.Sprintf("Write to natServer failed (sn:%d)", t.serialNumber), err)
t.client.OnClose(t.client.session, t.upstreamConn, t.serialNumber)
} else {
log.Run().Debugf("[handleUpStream] %d->success dataLen:%d ", t.serialNumber, len(pack.Data))
t.recorder.RecordEvent(trace.UpStreamEvent,
fmt.Sprintf("Write to natServer success(sn:%d)", pack.SerialNumber))
}
}
}
......@@ -75,7 +75,7 @@ func (client *Client) Redial(tag string) {
if !client.config.Redial.Valid() {
return
}
log.Run().Infof("[Redial %s] Send offline message", tag)
log.Run().Infof("[Redial %s] seed offline message", tag)
if _, _, err := client.session.WritePkg(OfflinePacket, 0); err != nil {
log.Run().Errorf("[Redial %s] write offline to server error %s", tag, err.Error())
}
......
package client
import (
"errors"
"fmt"
"net"
"sync"
"time"
"virjar.com/majora-go/common"
"virjar.com/majora-go/log"
"virjar.com/majora-go/protocol"
"virjar.com/majora-go/safe"
"virjar.com/majora-go/trace"
)
type Transfer struct {
serialNumber int64
client *Client
upstreamConn *net.TCPConn
recorder trace.Recorder
transferChan chan *protocol.MajoraPacket
transferToUpstreamFunc func(t *Transfer, p *protocol.MajoraPacket)
transferToDownstreamFunc func(t *Transfer, data []byte, err error)
once sync.Once
cancel chan struct{}
}
func NewTransfer(serialNumber int64, client *Client, conn *net.TCPConn, recorder trace.Recorder) *Transfer {
return &Transfer{
serialNumber: serialNumber,
client: client,
upstreamConn: conn,
recorder: recorder,
transferChan: make(chan *protocol.MajoraPacket, 10),
once: sync.Once{},
cancel: make(chan struct{}, 0),
}
}
func (t *Transfer) SetTransferToUpstreamFunc(f func(t *Transfer, p *protocol.MajoraPacket)) {
t.transferToUpstreamFunc = f
}
func (t *Transfer) SetTransferToDownstreamFunc(f func(t *Transfer, data []byte, err error)) {
t.transferToDownstreamFunc = f
}
func (t *Transfer) TransferToUpstream(p *protocol.MajoraPacket) {
t.transferChan <- p
}
func (t *Transfer) Start() {
if t.transferToUpstreamFunc == nil {
panic(errors.New("transferToUpstreamFunc is nil"))
}
if t.transferToDownstreamFunc == nil {
panic(errors.New("transferToDownstreamFunc is nil"))
}
safe.Go(func() {
for {
select {
case p := <-t.transferChan:
t.transferToUpstreamFunc(t, p)
case <-t.cancel:
return
}
}
})
safe.Go(func() {
traceRecorder := t.recorder
traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("Ready read from upstream (sn:%d)", t.serialNumber))
log.Run().Debugf("[handleUpStream] %d-> handleUpStream start...", t.serialNumber)
for {
buf := make([]byte, common.BufSize)
cnt, err := t.upstreamConn.Read(buf)
t.transferToDownstreamFunc(t, buf[0:cnt], err)
if err != nil {
break
}
}
})
}
func (t *Transfer) Close() {
t.once.Do(func() {
readDeadLine := time.Now().Add(3 * time.Millisecond)
t.recorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("Set upstream read deadline:%s (sn:%d)",
readDeadLine.Format("2006-01-02 15:04:05.000000"), t.serialNumber))
err := t.upstreamConn.SetReadDeadline(readDeadLine)
if err != nil {
t.recorder.RecordErrorEvent(trace.DisconnectEvent,
fmt.Sprintf("Set upstream read deadline failed (sn:%d)", t.serialNumber), err)
_ = t.upstreamConn.Close()
}
close(t.cancel)
})
}
tunnel_addr: majora-vps-zj.virjar.com:5879
tunnel_addr: 127.0.0.1:5879
dns_server: 114.114.114.114:53
daemon: true
#daemon: true
log_level: debug
log_path: ./majora-log/
reconn_intervalz: 5s
......@@ -8,11 +8,11 @@ net_check_interval: 5s
dns_cache_duration: 10m
net_check_url: https://www.baidu.com[extra]
redial:
command: /bin/bash
exec_path: /root/ppp_redial.sh
redial_duration: 5m
wait_time: 10s
#redial:
# command: /bin/bash
# exec_path: /root/ppp_redial.sh
# redial_duration: 5m
# wait_time: 10s
extra:
account: superman
......@@ -2,7 +2,7 @@ package safe
import "virjar.com/majora-go/log"
func SafeGo(f func()) {
func Go(f func()) {
go func() {
defer func() {
if err := recover(); err != nil {
......
......@@ -22,7 +22,7 @@ var (
)
func init() {
safe.SafeGo(func() {
safe.Go(func() {
for {
e := <-sessionEventChan
if e.Err != nil {
......@@ -133,15 +133,9 @@ func acquireRecorder(sessionId string, host, user string, enable bool) Recorder
}
type Session struct {
Recorder Recorder
}
func NewSession(sessionId string, host string, user string, enable bool) *Session {
func NewRecorder(sessionId string, host string, user string, enable bool) Recorder {
if len(sessionId) == 0 {
sessionId = sessionIdNop
}
return &Session{
Recorder: acquireRecorder(sessionId, host, user, enable),
}
return acquireRecorder(sessionId, host, user, enable)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment