Commit 28de3e20 authored by Tsaiilin's avatar Tsaiilin

修复 trace 日志不全问题

parent 42bbe721
...@@ -16,9 +16,10 @@ import ( ...@@ -16,9 +16,10 @@ import (
type Client struct { type Client struct {
config *model.Configure config *model.Configure
localAddr net.Addr localAddr net.Addr
natTunnel getty.Client natTunnel getty.Client
connStore sync.Map connStore sync.Map
sessionStore sync.Map
} }
func NewClientWithConf(cfg *model.Configure) *Client { func NewClientWithConf(cfg *model.Configure) *Client {
......
...@@ -39,7 +39,7 @@ func (client *Client) handleConnect(packet *protocol.MajoraPacket, session getty ...@@ -39,7 +39,7 @@ func (client *Client) handleConnect(packet *protocol.MajoraPacket, session getty
log.Error().Errorf("SN:%d -> get sessionId failed", packet.SerialNumber) log.Error().Errorf("SN:%d -> get sessionId failed", packet.SerialNumber)
} }
traceSession := trace.NewSession(sessionId) traceSession := trace.NewSession(sessionId)
session.SetAttribute(trace.MajoraSessionName, traceSession) client.AddSession(packet, traceSession)
traceSession.Recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("SN:%d -> start handle connect", packet.SerialNumber)) traceSession.Recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("SN:%d -> start handle connect", packet.SerialNumber))
if session.IsClosed() { if session.IsClosed() {
...@@ -128,7 +128,8 @@ func decodeMap(data []byte) map[string]string { ...@@ -128,7 +128,8 @@ func decodeMap(data []byte) map[string]string {
} }
func (client *Client) handleTransfer(packet *protocol.MajoraPacket, session getty.Session) { func (client *Client) handleTransfer(packet *protocol.MajoraPacket, session getty.Session) {
traceRecorder := trace.GetTraceRecorderFromSession(session)
traceRecorder := client.GetRecorderFromSession(packet.SerialNumber)
traceRecorder.RecordEvent(trace.TransferEvent, traceRecorder.RecordEvent(trace.TransferEvent,
fmt.Sprintf("SN:%d - > receive transfer packet from natServer,start to be forward to target", packet.SerialNumber)) fmt.Sprintf("SN:%d - > receive transfer packet from natServer,start to be forward to target", packet.SerialNumber))
...@@ -165,45 +166,45 @@ func (client *Client) handleTransfer(packet *protocol.MajoraPacket, session gett ...@@ -165,45 +166,45 @@ func (client *Client) handleTransfer(packet *protocol.MajoraPacket, session gett
} }
func (client *Client) handleUpStream(conn *net.TCPConn, packet *protocol.MajoraPacket, session getty.Session) { func (client *Client) handleUpStream(conn *net.TCPConn, packet *protocol.MajoraPacket, session getty.Session) {
traceRecorder := trace.GetTraceRecorderFromSession(session) traceRecorder := client.GetRecorderFromSession(packet.SerialNumber)
traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("SN:%d -> handleUpStream start", packet.SerialNumber)) traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("SN:%d -> handleUpStream start", packet.SerialNumber))
log.Run().Debugf("[handleUpStream] %d-> handleUpStream start...", packet.SerialNumber) log.Run().Debugf("[handleUpStream] %d-> handleUpStream start...", packet.SerialNumber)
for { for {
buf := make([]byte, common.BufSize) // 4k buf := make([]byte, common.BufSize) // 4k
cnt, err := conn.Read(buf) cnt, err := conn.Read(buf)
if err != nil { if err != nil {
log.Run().Debugf("[handleUpStream] %d->read with error:%+v,l:%s->r:%s", log.Run().Debugf("[handleUpStream] %d->read with error:%+v,l:%s->r:%s",
packet.SerialNumber, err, conn.LocalAddr(), conn.RemoteAddr()) packet.SerialNumber, err, conn.LocalAddr(), conn.RemoteAddr())
recorderMessage := fmt.Sprintf("SN: %d -> read with l:%s->r:%s", recorderMessage := fmt.Sprintf("SN: %d -> read with l:%s->r:%s",
packet.SerialNumber, conn.LocalAddr(), conn.RemoteAddr()) packet.SerialNumber, conn.LocalAddr(), conn.RemoteAddr())
traceRecorder.RecordErrorEvent(trace.UpStreamEvent, recorderMessage, err) traceRecorder.RecordErrorEvent(trace.UpStreamEvent, recorderMessage, err)
client.OnClose(session, conn, packet.SerialNumber) client.OnClose(session, conn, packet.SerialNumber)
break break
} }
traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("SN: %d read count: %d", traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("SN: %d read count: %d",
packet.SerialNumber, cnt)) packet.SerialNumber, cnt))
pack := protocol.TypeTransfer.CreatePacket() pack := protocol.TypeTransfer.CreatePacket()
pack.Data = buf[:cnt] pack.Data = buf[:cnt]
pack.SerialNumber = packet.SerialNumber pack.SerialNumber = packet.SerialNumber
if _, _, err := session.WritePkg(pack, 0); err != nil { if _, _, err := session.WritePkg(pack, 0); err != nil {
log.Error().Errorf("[handleUpStream] %d-> write to server fail %+v", packet.SerialNumber, err.Error()) log.Error().Errorf("[handleUpStream] %d-> write to server fail %+v", packet.SerialNumber, err.Error())
traceRecorder.RecordErrorEvent(trace.UpStreamEvent, traceRecorder.RecordErrorEvent(trace.UpStreamEvent,
fmt.Sprintf("SN: %d -> write to natServer failed", packet.SerialNumber), err) fmt.Sprintf("SN: %d -> write to natServer failed", packet.SerialNumber), err)
client.OnClose(session, conn, packet.SerialNumber) client.OnClose(session, conn, packet.SerialNumber)
break break
} else { } else {
log.Run().Debugf("[handleUpStream] %d->success dataLen:%d", packet.SerialNumber, len(packet.Data)) log.Run().Debugf("[handleUpStream] %d->success dataLen:%d", packet.SerialNumber, len(packet.Data))
traceRecorder.RecordEvent(trace.UpStreamEvent, traceRecorder.RecordEvent(trace.UpStreamEvent,
fmt.Sprintf("SN: %d write to natServer success", packet.SerialNumber)) fmt.Sprintf("SN: %d write to natServer success", packet.SerialNumber))
}
} }
}
} }
func (client *Client) handleDisconnectMessage(session getty.Session, packet *protocol.MajoraPacket) { func (client *Client) handleDisconnectMessage(session getty.Session, packet *protocol.MajoraPacket) {
traceRecorder := trace.GetTraceRecorderFromSession(session) traceRecorder := client.GetRecorderFromSession(packet.SerialNumber)
traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("SN: %d start close session", packet.SerialNumber)) traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("SN: %d start close session extra:%s", packet.SerialNumber, packet.Extra))
log.Run().Debugf("[handleDisconnectMessage] %d->session closed %v", packet.SerialNumber, session.IsClosed()) log.Run().Debugf("[handleDisconnectMessage] %d->session closed %v extra:%s", packet.SerialNumber, session.IsClosed())
if conn, ok := client.connStore.Load(packet.SerialNumber); ok { if conn, ok := client.connStore.Load(packet.SerialNumber); ok {
client.OnClose(session, conn.(net.Conn), packet.SerialNumber) client.OnClose(session, conn.(net.Conn), packet.SerialNumber)
} }
...@@ -218,6 +219,25 @@ func (client *Client) handleDestroyMessage() { ...@@ -218,6 +219,25 @@ func (client *Client) handleDestroyMessage() {
client.natTunnel.Close() client.natTunnel.Close()
} }
func (client *Client) AddSession(packet *protocol.MajoraPacket, session *trace.Session) {
if _, ok := client.sessionStore.Load(packet.SerialNumber); ok {
log.Error().Errorf("[AddSession] %d->error, has one", packet.SerialNumber)
}
client.sessionStore.Store(packet.SerialNumber, session)
log.Run().Debugf("[AddSession] %d-> success", packet.SerialNumber)
}
func (client *Client) GetRecorderFromSession(sn int64) trace.Recorder {
session, ok := client.sessionStore.Load(sn)
if !ok {
log.Error().Errorf("[GetSession] get session failed, has one", sn)
session = trace.NewSession("")
}
traceSession := session.(*trace.Session)
return traceSession.Recorder
}
func (client *Client) AddConnection(packet *protocol.MajoraPacket, conn *net.TCPConn, addr string) { func (client *Client) AddConnection(packet *protocol.MajoraPacket, conn *net.TCPConn, addr string) {
if _, ok := client.connStore.Load(packet.SerialNumber); ok { if _, ok := client.connStore.Load(packet.SerialNumber); ok {
log.Error().Errorf("[AddConnection] %d->error, has one", packet.SerialNumber) log.Error().Errorf("[AddConnection] %d->error, has one", packet.SerialNumber)
...@@ -233,8 +253,9 @@ func (client *Client) OnClose(natSession getty.Session, upStreamSession net.Conn ...@@ -233,8 +253,9 @@ func (client *Client) OnClose(natSession getty.Session, upStreamSession net.Conn
log.Error().Errorf("OnClose %+v", err) log.Error().Errorf("OnClose %+v", err)
} }
}() }()
traceRecorder := trace.GetTraceRecorderFromSession(natSession) traceRecorder := client.GetRecorderFromSession(serialNumber)
client.connStore.Delete(serialNumber) client.connStore.Delete(serialNumber)
client.sessionStore.Delete(serialNumber)
_ = upStreamSession.Close() _ = upStreamSession.Close()
client.closeVirtualConnection(natSession, serialNumber) client.closeVirtualConnection(natSession, serialNumber)
traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("SN: %d close success", serialNumber)) traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("SN: %d close success", serialNumber))
...@@ -242,7 +263,7 @@ func (client *Client) OnClose(natSession getty.Session, upStreamSession net.Conn ...@@ -242,7 +263,7 @@ func (client *Client) OnClose(natSession getty.Session, upStreamSession net.Conn
//closeVirtualConnection disconnect to server //closeVirtualConnection disconnect to server
func (client *Client) closeVirtualConnection(session getty.Session, serialNumber int64) { func (client *Client) closeVirtualConnection(session getty.Session, serialNumber int64) {
traceRecorder := trace.GetTraceRecorderFromSession(session) traceRecorder := client.GetRecorderFromSession(serialNumber)
log.Run().Debugf("[closeVirtualConnection] %d->session closed %v", serialNumber, session.IsClosed()) log.Run().Debugf("[closeVirtualConnection] %d->session closed %v", serialNumber, session.IsClosed())
if session.IsClosed() { if session.IsClosed() {
......
...@@ -28,7 +28,7 @@ const ( ...@@ -28,7 +28,7 @@ const (
func getLogWriter(path string) zapcore.WriteSyncer { func getLogWriter(path string) zapcore.WriteSyncer {
lumberJackLogger := &lumberjack.Logger{ lumberJackLogger := &lumberjack.Logger{
Filename: path, Filename: path,
MaxSize: 10, MaxSize: 100,
MaxBackups: 5, MaxBackups: 5,
MaxAge: 30, MaxAge: 30,
Compress: false, Compress: false,
...@@ -40,7 +40,7 @@ func getLogWriter(path string) zapcore.WriteSyncer { ...@@ -40,7 +40,7 @@ func getLogWriter(path string) zapcore.WriteSyncer {
} }
func getEncoder() zapcore.Encoder { func getEncoder() zapcore.Encoder {
encoderConfig := zap.NewProductionEncoderConfig() encoderConfig := zap.NewDevelopmentEncoderConfig()
encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
return zapcore.NewConsoleEncoder(encoderConfig) return zapcore.NewConsoleEncoder(encoderConfig)
......
File added
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/adamweixuan/getty"
"go.uber.org/zap" "go.uber.org/zap"
"virjar.com/majora-go/env" "virjar.com/majora-go/env"
...@@ -13,7 +12,7 @@ import ( ...@@ -13,7 +12,7 @@ import (
) )
var ( var (
sessionEventChan = make(chan *sessionEvent, 100) sessionEventChan = make(chan *sessionEvent, 1000)
ConnectEvent = "ConnectEvent" ConnectEvent = "ConnectEvent"
TransferEvent = "TransferEvent" TransferEvent = "TransferEvent"
MajoraSessionName = "MajoraSessionId" MajoraSessionName = "MajoraSessionId"
...@@ -150,13 +149,4 @@ func NewSession(sessionId string) *Session { ...@@ -150,13 +149,4 @@ func NewSession(sessionId string) *Session {
return &Session{ return &Session{
Recorder: acquireRecorder(sessionId), Recorder: acquireRecorder(sessionId),
} }
} }
\ No newline at end of file
func GetTraceRecorderFromSession(session getty.Session) Recorder {
majoraSession := session.GetAttribute(MajoraSessionName)
traceSession, ok := majoraSession.(*Session)
if !ok {
return NewSession(sessionIdNop).Recorder
}
return traceSession.Recorder
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment