Commit d134309b authored by Tsaiilin's avatar Tsaiilin

trace 日志调整

parent 352e80f3
......@@ -7,6 +7,7 @@ import (
"net"
"strings"
"sync"
"time"
"github.com/adamweixuan/getty"
......@@ -32,25 +33,28 @@ func (client *Client) handleHeartbeat(session getty.Session) {
func (client *Client) handleConnect(packet *protocol.MajoraPacket, session getty.Session) {
m := decodeMap(packet.Data)
if len(m) <= 0 {
log.Error().Errorf("SN:%d -> get connect map failed", packet.SerialNumber)
log.Error().Errorf("Get map data from connect packet failed (sn:%d)", packet.SerialNumber)
}
sessionId, ok := m[trace.MajoraSessionName]
if !ok {
log.Error().Errorf("SN:%d -> get sessionId failed", packet.SerialNumber)
log.Error().Errorf("Get sessionId from connect packet failed (sn:%d)", packet.SerialNumber)
}
traceSession := trace.NewSession(sessionId)
client.AddSession(packet, traceSession)
traceSession.Recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("SN:%d -> start handle connect", packet.SerialNumber))
traceSession.Recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("Start handle connect to %s (sn:%d)",
packet.Extra, packet.SerialNumber))
if session.IsClosed() {
log.Run().Warnf("[handleConnect] %d -> nat server is closed", packet.SerialNumber)
traceSession.Recorder.RecordErrorEvent(trace.ConnectEvent, fmt.Sprintf("SerialNumber: %d -> natServer is closed", packet.SerialNumber), nil)
traceSession.Recorder.RecordErrorEvent(trace.ConnectEvent, fmt.Sprintf("NatServer is closed (sn:%d)", packet.SerialNumber), nil)
client.closeVirtualConnection(session, packet.SerialNumber)
return
}
hostPort := strings.Split(packet.Extra, ":")
if len(packet.Extra) == 0 || len(hostPort) != 2 {
log.Error().Errorf("[handleConnect] invalid extra %s", packet.Extra)
traceSession.Recorder.RecordErrorEvent(trace.ConnectEvent,
fmt.Sprintf("Connect extra invalid %s (%d)", packet.Extra, packet.SerialNumber), nil)
client.closeVirtualConnection(session, packet.SerialNumber)
return
}
......@@ -64,36 +68,40 @@ func (client *Client) handleConnect(packet *protocol.MajoraPacket, session getty
if err != nil {
log.Error().Errorf("[handleConnect] %d->connect to %s->%s", packet.SerialNumber, packet.Extra, err.Error())
traceSession.Recorder.RecordErrorEvent(trace.ConnectEvent,
fmt.Sprintf("SN:%d -> connect to %s failed", packet.SerialNumber, packet.Extra), err)
fmt.Sprintf("Connect to %s failed (sn:%d)", packet.Extra, packet.SerialNumber), err)
client.closeVirtualConnection(session, packet.SerialNumber)
return
}
tcpConn := conn.(*net.TCPConn)
_ = tcpConn.SetNoDelay(true)
_ = tcpConn.SetKeepAlive(true)
client.AddConnection(packet, tcpConn, packet.Extra)
traceSession.Recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("Connect to %s success (sn:%d)",
packet.Extra, packet.SerialNumber))
traceSession.Recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("Start replay natServer connect ready (sn:%d)", packet.SerialNumber))
majoraPacket := protocol.TypeConnectReady.CreatePacket()
majoraPacket.SerialNumber = packet.SerialNumber
majoraPacket.Extra = client.config.ClientID
if session.IsClosed() {
log.Run().Warnf("[handleConnect] %d -> nat server is closed", packet.SerialNumber)
traceSession.Recorder.RecordErrorEvent(trace.ConnectEvent, fmt.Sprintf("SerialNumber: %d -> natServer is closed", packet.SerialNumber), nil)
traceSession.Recorder.RecordErrorEvent(trace.ConnectEvent, fmt.Sprintf("NatServer is closed (sn:%d)", packet.SerialNumber),
nil)
client.closeVirtualConnection(session, packet.SerialNumber)
return
}
if _, _, err := session.WritePkg(majoraPacket, 0); err != nil {
log.Error().Errorf("[handleConnect] %d->write pkg to nat server with error %s", packet.SerialNumber,
err.Error())
traceSession.Recorder.RecordErrorEvent(trace.ConnectEvent, fmt.Sprintf("SerialNumber: %d -> write pkg natServer failed",
traceSession.Recorder.RecordErrorEvent(trace.ConnectEvent, fmt.Sprintf("Write pkg to natServer failed (sn:%d)",
packet.SerialNumber), err)
client.closeVirtualConnection(session, packet.SerialNumber)
return
} else {
log.Run().Debugf("[handleConnect] %d->connect success to %s ", packet.SerialNumber, packet.Extra)
traceSession.Recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("connect to upstream %s success", packet.Extra))
traceSession.Recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("Replay natServer connect ready success (sn:%d)", packet.SerialNumber))
client.handleUpStream(tcpConn, packet, session)
}
client.handleUpStream(tcpConn, packet, session)
}
func decodeMap(data []byte) map[string]string {
......@@ -131,23 +139,22 @@ func (client *Client) handleTransfer(packet *protocol.MajoraPacket, session gett
traceRecorder := client.GetRecorderFromSession(packet.SerialNumber)
traceRecorder.RecordEvent(trace.TransferEvent,
fmt.Sprintf("SN:%d - > receive transfer packet from natServer,start to be forward to target", packet.SerialNumber))
fmt.Sprintf("Receive transfer packet from natServer,start to be forward to target (%d)", packet.SerialNumber))
load, ok := client.connStore.Load(packet.SerialNumber)
if !ok {
log.Error().Errorf("[handleTransfer] %d-> can not find connection", packet.SerialNumber)
traceMessage := fmt.Sprintf("SN:%d -> can not find upsteam connection", packet.SerialNumber)
traceMessage := fmt.Sprintf("Find upsteam connection failed (%d)", packet.SerialNumber)
traceRecorder.RecordErrorEvent(trace.TransferEvent, traceMessage, nil)
client.closeVirtualConnection(session, packet.SerialNumber)
return
}
conn := load.(*net.TCPConn)
cnt, err := conn.Write(packet.Data)
if err != nil {
log.Error().Errorf("[handleTransfer] %d->write to upstream fail for %s", packet.SerialNumber, err)
traceMessage := fmt.Sprintf("SN:%d -> write to upstream failed", packet.SerialNumber)
traceRecorder.RecordErrorEvent(trace.TransferEvent, traceMessage, nil)
traceMessage := fmt.Sprintf("Write to upstream failed (%d)", packet.SerialNumber)
traceRecorder.RecordErrorEvent(trace.TransferEvent, traceMessage, err)
client.closeVirtualConnection(session, packet.SerialNumber)
return
}
......@@ -155,58 +162,77 @@ func (client *Client) handleTransfer(packet *protocol.MajoraPacket, session gett
if cnt != len(packet.Data) {
log.Error().Errorf("[handleTransfer] %d-> write not all data for expect->%d/%d",
packet.SerialNumber, len(packet.Data), cnt)
traceMessage := fmt.Sprintf("SN: %d -> write not all data for expect -> %d/%d", packet.SerialNumber, len(packet.Data), cnt)
traceMessage := fmt.Sprintf("Write not all data for expect -> %d/%d (sn:%d)", len(packet.Data), cnt, packet.SerialNumber)
traceRecorder.RecordErrorEvent(trace.TransferEvent, traceMessage, nil)
client.closeVirtualConnection(session, packet.SerialNumber)
return
}
log.Run().Debugf("[handleTransfer] %d-> success dataLen: %d", packet.SerialNumber, len(packet.Data))
traceMessage := fmt.Sprintf("SN: %d -> complete transfer", packet.SerialNumber)
traceMessage := fmt.Sprintf("transfer data success (%d)", packet.SerialNumber)
traceRecorder.RecordEvent(trace.TransferEvent, traceMessage)
}
func (client *Client) handleUpStream(conn *net.TCPConn, packet *protocol.MajoraPacket, session getty.Session) {
traceRecorder := client.GetRecorderFromSession(packet.SerialNumber)
traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("SN:%d -> handleUpStream start", packet.SerialNumber))
traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("Ready read from upstream (sn:%d)", packet.SerialNumber))
log.Run().Debugf("[handleUpStream] %d-> handleUpStream start...", packet.SerialNumber)
for {
buf := make([]byte, common.BufSize) // 4k
cnt, err := conn.Read(buf)
if err != nil {
log.Run().Debugf("[handleUpStream] %d->read with error:%+v,l:%s->r:%s",
packet.SerialNumber, err, conn.LocalAddr(), conn.RemoteAddr())
recorderMessage := fmt.Sprintf("SN: %d -> read with l:%s->r:%s",
packet.SerialNumber, conn.LocalAddr(), conn.RemoteAddr())
traceRecorder.RecordErrorEvent(trace.UpStreamEvent, recorderMessage, err)
opErr, ok := err.(*net.OpError)
if ok && opErr.Err.Error() == "i/o timeout" {
recorderMessage := fmt.Sprintf("Upstream deadDeadline start close (sn:%d)", packet.SerialNumber)
traceRecorder.RecordEvent(trace.UpStreamEvent, recorderMessage)
} else {
log.Run().Debugf("[handleUpStream] %d->read with error:%+v,l:%s->r:%s",
packet.SerialNumber, err, conn.LocalAddr(), conn.RemoteAddr())
recorderMessage := fmt.Sprintf("Read with l:%s->r:%s (sn:%d) ",
conn.LocalAddr(), conn.RemoteAddr(), packet.SerialNumber)
traceRecorder.RecordErrorEvent(trace.UpStreamEvent, recorderMessage, err)
}
client.OnClose(session, conn, packet.SerialNumber)
break
}
traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("SN: %d read count: %d",
packet.SerialNumber, cnt))
traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("read count: %d (sn:%d)",
cnt, packet.SerialNumber))
traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("Start transfer data to natServer (sn:%d)", packet.SerialNumber))
pack := protocol.TypeTransfer.CreatePacket()
pack.Data = buf[:cnt]
pack.SerialNumber = packet.SerialNumber
if _, _, err := session.WritePkg(pack, 0); err != nil {
log.Error().Errorf("[handleUpStream] %d-> write to server fail %+v", packet.SerialNumber, err.Error())
traceRecorder.RecordErrorEvent(trace.UpStreamEvent,
fmt.Sprintf("SN: %d -> write to natServer failed", packet.SerialNumber), err)
fmt.Sprintf("Write to natServer failed (sn:%d)", packet.SerialNumber), err)
client.OnClose(session, conn, packet.SerialNumber)
break
} else {
log.Run().Debugf("[handleUpStream] %d->success dataLen:%d", packet.SerialNumber, len(packet.Data))
traceRecorder.RecordEvent(trace.UpStreamEvent,
fmt.Sprintf("SN: %d write to natServer success", packet.SerialNumber))
fmt.Sprintf("Write to natServer success (sn:%d)", packet.SerialNumber))
}
}
}
func (client *Client) handleDisconnectMessage(session getty.Session, packet *protocol.MajoraPacket) {
traceRecorder := client.GetRecorderFromSession(packet.SerialNumber)
traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("SN: %d start close session extra:%s", packet.SerialNumber, packet.Extra))
traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("Start close upstream extra:%s (sn:%d)",
packet.Extra, packet.SerialNumber))
log.Run().Debugf("[handleDisconnectMessage] %d->session closed %v extra:%s", packet.SerialNumber, session.IsClosed())
if conn, ok := client.connStore.Load(packet.SerialNumber); ok {
client.OnClose(session, conn.(net.Conn), packet.SerialNumber)
upstreamConn := conn.(*net.TCPConn)
readDeadLine := time.Now().Add(3 * time.Second)
traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("Set upstream read deadline:%s (sn:%d)",
readDeadLine.Format("2006-01-02 15:04:05.000000"), packet.SerialNumber))
err := upstreamConn.SetReadDeadline(readDeadLine)
if err != nil {
traceRecorder.RecordErrorEvent(trace.DisconnectEvent,
fmt.Sprintf("Set upstream read deadline failed (sn:%d)", packet.SerialNumber), err)
client.OnClose(session, upstreamConn, packet.SerialNumber)
}
} else {
traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("The upstream connection is closed, do nothing (sn:%d)", packet.SerialNumber))
}
}
......@@ -253,12 +279,8 @@ func (client *Client) OnClose(natSession getty.Session, upStreamSession net.Conn
log.Error().Errorf("OnClose %+v", err)
}
}()
traceRecorder := client.GetRecorderFromSession(serialNumber)
client.connStore.Delete(serialNumber)
client.sessionStore.Delete(serialNumber)
_ = upStreamSession.Close()
client.closeVirtualConnection(natSession, serialNumber)
traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("SN: %d close success", serialNumber))
}
//closeVirtualConnection disconnect to server
......@@ -271,7 +293,7 @@ func (client *Client) closeVirtualConnection(session getty.Session, serialNumber
return
}
traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("SN: %d -> start send disconnect to natServer", serialNumber))
traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("Start send disconnect to natServer (sn:%d)", serialNumber))
majoraPacket := protocol.TypeDisconnect.CreatePacket()
majoraPacket.SerialNumber = serialNumber
majoraPacket.Extra = client.config.ClientID
......@@ -279,10 +301,12 @@ func (client *Client) closeVirtualConnection(session getty.Session, serialNumber
log.Run().Warnf("[closeVirtualConnection] ->%d error %s session closed %v allCnt %d sendCnt %d",
serialNumber, err.Error(), session.IsClosed(), allCnt, sendCnt)
traceRecorder.RecordErrorEvent(trace.DisconnectEvent,
fmt.Sprintf("SN: %d -> send disconnect failed closed:%v allCnt %d sendCnt %d",
serialNumber, session.IsClosed(), allCnt, sendCnt), err)
fmt.Sprintf("Send disconnect to natServer failed closed:%v allCnt %d sendCnt %d (sn:%d)",
session.IsClosed(), allCnt, sendCnt, serialNumber), err)
}
traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("SN: %d -> send disconnect to natServer success", serialNumber))
traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("Send disconnect to natServer success (sn:%d)", serialNumber))
client.connStore.Delete(serialNumber)
client.sessionStore.Delete(serialNumber)
}
func (client *Client) CloseAll(session getty.Session) {
......
......@@ -15,7 +15,7 @@ var (
ConnectEvent = "ConnectEvent"
TransferEvent = "TransferEvent"
MajoraSessionName = "MajoraSessionId"
UpStreamEvent = "UpStream"
UpStreamEvent = "ReadUpStream"
DisconnectEvent = "Disconnect"
sessionIdNop = "session_id_not_set"
)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment