Commit 42bbe721 authored by Tsaiilin's avatar Tsaiilin

trace format

parent 941337a6
......@@ -13,7 +13,6 @@ import (
"virjar.com/majora-go/common"
"virjar.com/majora-go/log"
"virjar.com/majora-go/protocol"
"virjar.com/majora-go/safe"
"virjar.com/majora-go/trace"
)
......@@ -33,19 +32,19 @@ func (client *Client) handleHeartbeat(session getty.Session) {
func (client *Client) handleConnect(packet *protocol.MajoraPacket, session getty.Session) {
m := decodeMap(packet.Data)
if len(m) <= 0 {
log.Error().Errorf("SN: %d -> get connect map failed", packet.SerialNumber)
log.Error().Errorf("SN:%d -> get connect map failed", packet.SerialNumber)
}
sessionId, ok := m[trace.MajoraSessionName]
if !ok {
log.Error().Errorf("SN: %d -> get sessionId failed", packet.SerialNumber)
log.Error().Errorf("SN:%d -> get sessionId failed", packet.SerialNumber)
}
traceSession := trace.NewSession(sessionId)
session.SetAttribute(trace.MajoraSessionName, traceSession)
traceSession.Recorder.RecordEvent(trace.ConnectEvent, "start handle connect")
traceSession.Recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("SN:%d -> start handle connect", packet.SerialNumber))
if session.IsClosed() {
log.Run().Warnf("[handleConnect] %d -> nat server is closed", packet.SerialNumber)
traceSession.Recorder.RecordEvent("[handleConnect]", fmt.Sprintf("SerialNumber: %d -> nat server is closed", packet.SerialNumber))
traceSession.Recorder.RecordErrorEvent(trace.ConnectEvent, fmt.Sprintf("SerialNumber: %d -> natServer is closed", packet.SerialNumber), nil)
client.closeVirtualConnection(session, packet.SerialNumber)
return
}
......@@ -64,6 +63,8 @@ func (client *Client) handleConnect(packet *protocol.MajoraPacket, session getty
conn, err := dialer.Dial(common.TCP, packet.Extra)
if err != nil {
log.Error().Errorf("[handleConnect] %d->connect to %s->%s", packet.SerialNumber, packet.Extra, err.Error())
traceSession.Recorder.RecordErrorEvent(trace.ConnectEvent,
fmt.Sprintf("SN:%d -> connect to %s failed", packet.SerialNumber, packet.Extra), err)
client.closeVirtualConnection(session, packet.SerialNumber)
return
}
......@@ -77,17 +78,20 @@ func (client *Client) handleConnect(packet *protocol.MajoraPacket, session getty
majoraPacket.Extra = client.config.ClientID
if session.IsClosed() {
log.Run().Warnf("[handleConnect] %d -> nat server is closed", packet.SerialNumber)
traceSession.Recorder.RecordErrorEvent(trace.ConnectEvent, fmt.Sprintf("SerialNumber: %d -> natServer is closed", packet.SerialNumber), nil)
client.closeVirtualConnection(session, packet.SerialNumber)
return
}
if _, _, err := session.WritePkg(majoraPacket, 0); err != nil {
log.Error().Errorf("[handleConnect] %d->write pkg to nat server with error %s", packet.SerialNumber,
err.Error())
traceSession.Recorder.RecordErrorEvent(trace.ConnectEvent, fmt.Sprintf("SerialNumber: %d -> write pkg natServer failed",
packet.SerialNumber), err)
client.closeVirtualConnection(session, packet.SerialNumber)
return
} else {
log.Run().Debugf("[handleConnect] %d->connect success to %s ", packet.SerialNumber, packet.Extra)
traceSession.Recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("connect success to %s", packet.Extra))
traceSession.Recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("connect to upstream %s success", packet.Extra))
}
client.handleUpStream(tcpConn, packet, session)
}
......@@ -125,12 +129,13 @@ func decodeMap(data []byte) map[string]string {
func (client *Client) handleTransfer(packet *protocol.MajoraPacket, session getty.Session) {
traceRecorder := trace.GetTraceRecorderFromSession(session)
traceRecorder.RecordEvent(trace.TransferEvent, "receive transfer packet from natServer,need to be forward to target")
traceRecorder.RecordEvent(trace.TransferEvent,
fmt.Sprintf("SN:%d - > receive transfer packet from natServer,start to be forward to target", packet.SerialNumber))
load, ok := client.connStore.Load(packet.SerialNumber)
if !ok {
log.Error().Errorf("[handleTransfer] %d-> can not find connection", packet.SerialNumber)
traceMessage := fmt.Sprintf("SerialNumber:%d -> can not find connection", packet.SerialNumber)
traceMessage := fmt.Sprintf("SN:%d -> can not find upsteam connection", packet.SerialNumber)
traceRecorder.RecordErrorEvent(trace.TransferEvent, traceMessage, nil)
client.closeVirtualConnection(session, packet.SerialNumber)
return
......@@ -140,7 +145,7 @@ func (client *Client) handleTransfer(packet *protocol.MajoraPacket, session gett
if err != nil {
log.Error().Errorf("[handleTransfer] %d->write to upstream fail for %s", packet.SerialNumber, err)
traceMessage := fmt.Sprintf("SerialNumber:%d -> write to upstream fail", packet.SerialNumber)
traceMessage := fmt.Sprintf("SN:%d -> write to upstream failed", packet.SerialNumber)
traceRecorder.RecordErrorEvent(trace.TransferEvent, traceMessage, nil)
client.closeVirtualConnection(session, packet.SerialNumber)
return
......@@ -149,20 +154,19 @@ func (client *Client) handleTransfer(packet *protocol.MajoraPacket, session gett
if cnt != len(packet.Data) {
log.Error().Errorf("[handleTransfer] %d-> write not all data for expect->%d/%d",
packet.SerialNumber, len(packet.Data), cnt)
traceMessage := fmt.Sprintf("SerialNumber: %d -> write not all data for expect -> %d/%d", packet.SerialNumber, len(packet.Data), cnt)
traceMessage := fmt.Sprintf("SN: %d -> write not all data for expect -> %d/%d", packet.SerialNumber, len(packet.Data), cnt)
traceRecorder.RecordErrorEvent(trace.TransferEvent, traceMessage, nil)
client.closeVirtualConnection(session, packet.SerialNumber)
return
}
log.Error().Debugf("[handleTransfer] %d-> success %+v", packet.SerialNumber, string(packet.Data))
traceMessage := fmt.Sprintf("SerialNumber: %d -> complete transfer", packet.SerialNumber)
log.Run().Debugf("[handleTransfer] %d-> success dataLen: %d", packet.SerialNumber, len(packet.Data))
traceMessage := fmt.Sprintf("SN: %d -> complete transfer", packet.SerialNumber)
traceRecorder.RecordEvent(trace.TransferEvent, traceMessage)
}
func (client *Client) handleUpStream(conn *net.TCPConn, packet *protocol.MajoraPacket, session getty.Session) {
safe.SageGo(func() {
traceRecorder := trace.GetTraceRecorderFromSession(session)
traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("SerialNumber:%d -> handleUpStream start", packet.SerialNumber))
traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("SN:%d -> handleUpStream start", packet.SerialNumber))
log.Run().Debugf("[handleUpStream] %d-> handleUpStream start...", packet.SerialNumber)
for {
buf := make([]byte, common.BufSize) // 4k
......@@ -170,13 +174,13 @@ func (client *Client) handleUpStream(conn *net.TCPConn, packet *protocol.MajoraP
if err != nil {
log.Run().Debugf("[handleUpStream] %d->read with error:%+v,l:%s->r:%s",
packet.SerialNumber, err, conn.LocalAddr(), conn.RemoteAddr())
recorderMessage := fmt.Sprintf("SerialNumber: %d -> read with l:%s->r:%s",
recorderMessage := fmt.Sprintf("SN: %d -> read with l:%s->r:%s",
packet.SerialNumber, conn.LocalAddr(), conn.RemoteAddr())
traceRecorder.RecordErrorEvent(trace.UpStreamEvent, recorderMessage, err)
client.OnClose(session, conn, packet.SerialNumber)
break
}
traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("SerialNumber: %d read count: %d",
traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("SN: %d read count: %d",
packet.SerialNumber, cnt))
pack := protocol.TypeTransfer.CreatePacket()
pack.Data = buf[:cnt]
......@@ -185,22 +189,21 @@ func (client *Client) handleUpStream(conn *net.TCPConn, packet *protocol.MajoraP
if _, _, err := session.WritePkg(pack, 0); err != nil {
log.Error().Errorf("[handleUpStream] %d-> write to server fail %+v", packet.SerialNumber, err.Error())
traceRecorder.RecordErrorEvent(trace.UpStreamEvent,
fmt.Sprintf("SerialNumber: %d -> write to natServer failed", packet.SerialNumber), err)
fmt.Sprintf("SN: %d -> write to natServer failed", packet.SerialNumber), err)
client.OnClose(session, conn, packet.SerialNumber)
break
} else {
log.Run().Debugf("[handleUpStream] %d->success %+v", packet.SerialNumber, string(packet.Data))
log.Run().Debugf("[handleUpStream] %d->success dataLen:%d", packet.SerialNumber, len(packet.Data))
traceRecorder.RecordEvent(trace.UpStreamEvent,
fmt.Sprintf("SerialNumber: %d write to natServer success", packet.SerialNumber))
fmt.Sprintf("SN: %d write to natServer success", packet.SerialNumber))
}
}
})
}
func (client *Client) handleDisconnectMessage(session getty.Session, packet *protocol.MajoraPacket) {
traceRecorder := trace.GetTraceRecorderFromSession(session)
traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("SN: %d start close session", packet.SerialNumber))
log.Run().Debugf("[handleDisconnectMessage] %d->session closesd %v", packet.SerialNumber, session.IsClosed())
log.Run().Debugf("[handleDisconnectMessage] %d->session closed %v", packet.SerialNumber, session.IsClosed())
if conn, ok := client.connStore.Load(packet.SerialNumber); ok {
client.OnClose(session, conn.(net.Conn), packet.SerialNumber)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment