Commit 0fcf0748 authored by Tsaiilin's avatar Tsaiilin

考虑特殊情况下 sessionid 获取不到的情况

parent 49627f30
...@@ -89,7 +89,7 @@ func parseFromCmd(cfg *model.Configure) { ...@@ -89,7 +89,7 @@ func parseFromCmd(cfg *model.Configure) {
func cli(cfg *model.Configure) { func cli(cfg *model.Configure) {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
log.Run().Errorf("cli panic %+v", err) log.Error().Errorf("cli panic %+v", err)
} }
}() }()
log.Run().Infof("cpu count %d proc %d", runtime.NumCPU(), runtime.NumCPU()*2) log.Run().Infof("cpu count %d proc %d", runtime.NumCPU(), runtime.NumCPU()*2)
......
...@@ -36,7 +36,7 @@ func (p *PacketCodec) Read(session getty.Session, data []byte) (interface{}, int ...@@ -36,7 +36,7 @@ func (p *PacketCodec) Read(session getty.Session, data []byte) (interface{}, int
frameLen, err := common.ReadInt32(reader) frameLen, err := common.ReadInt32(reader)
if err != nil { if err != nil {
log.Run().Errorf("[PacketCodec] frameLen error %+v", err) log.Error().Errorf("[PacketCodec] frameLen error %+v", err)
return nil, 0, err return nil, 0, err
} }
...@@ -51,7 +51,7 @@ func (p *PacketCodec) Read(session getty.Session, data []byte) (interface{}, int ...@@ -51,7 +51,7 @@ func (p *PacketCodec) Read(session getty.Session, data []byte) (interface{}, int
// type // type
msgType, err := common.ReadByte(reader) msgType, err := common.ReadByte(reader)
if err != nil { if err != nil {
log.Run().Errorf("[PacketCodec] read type error %+v", err) log.Error().Errorf("[PacketCodec] read type error %+v", err)
return nil, 0, err return nil, 0, err
} }
...@@ -63,7 +63,7 @@ func (p *PacketCodec) Read(session getty.Session, data []byte) (interface{}, int ...@@ -63,7 +63,7 @@ func (p *PacketCodec) Read(session getty.Session, data []byte) (interface{}, int
// num // num
pack.SerialNumber, err = common.ReadInt64(reader) pack.SerialNumber, err = common.ReadInt64(reader)
if err != nil { if err != nil {
log.Run().Errorf("[PacketCodec] read num error %+v", err) log.Error().Errorf("[PacketCodec] read num error %+v", err)
return nil, len(data), nil return nil, len(data), nil
} }
...@@ -72,13 +72,13 @@ func (p *PacketCodec) Read(session getty.Session, data []byte) (interface{}, int ...@@ -72,13 +72,13 @@ func (p *PacketCodec) Read(session getty.Session, data []byte) (interface{}, int
// extra size // extra size
extraSize, err := common.ReadByte(reader) extraSize, err := common.ReadByte(reader)
if err != nil { if err != nil {
log.Run().Errorf("[PacketCodec] read extra size error %+v", err) log.Error().Errorf("[PacketCodec] read extra size error %+v", err)
return nil, len(data), nil return nil, len(data), nil
} }
extra, err := common.ReadN(int(extraSize), reader) extra, err := common.ReadN(int(extraSize), reader)
if err != nil { if err != nil {
log.Run().Errorf("[PacketCodec] read extra error %+v", err) log.Error().Errorf("[PacketCodec] read extra error %+v", err)
return nil, len(data), nil return nil, len(data), nil
} }
pack.Extra = string(extra) pack.Extra = string(extra)
...@@ -87,14 +87,14 @@ func (p *PacketCodec) Read(session getty.Session, data []byte) (interface{}, int ...@@ -87,14 +87,14 @@ func (p *PacketCodec) Read(session getty.Session, data []byte) (interface{}, int
// dataFrame // dataFrame
dataSize := int(frameLen) - common.TypeSize - common.SerialNumberSize - common.ExtraSize - int(extraSize) dataSize := int(frameLen) - common.TypeSize - common.SerialNumberSize - common.ExtraSize - int(extraSize)
if dataSize < 0 { if dataSize < 0 {
log.Run().Errorf("[PacketCodec] read frameLen error %+v", err) log.Error().Errorf("[PacketCodec] read frameLen error %+v", err)
return nil, len(data), common.ErrInvalidSize return nil, len(data), common.ErrInvalidSize
} }
if dataSize > 0 { if dataSize > 0 {
data, err := common.ReadN(dataSize, reader) data, err := common.ReadN(dataSize, reader)
if err != nil { if err != nil {
log.Run().Errorf("[PacketCodec] read data error %+v", err) log.Error().Errorf("[PacketCodec] read data error %+v", err)
return nil, len(data), nil return nil, len(data), nil
} }
pack.Data = data pack.Data = data
......
...@@ -31,13 +31,20 @@ func (client *Client) handleHeartbeat(session getty.Session) { ...@@ -31,13 +31,20 @@ func (client *Client) handleHeartbeat(session getty.Session) {
func (client *Client) handleConnect(packet *protocol.MajoraPacket, session getty.Session) { func (client *Client) handleConnect(packet *protocol.MajoraPacket, session getty.Session) {
go func(packet *protocol.MajoraPacket, session getty.Session) { go func(packet *protocol.MajoraPacket, session getty.Session) {
defer func() {
if err := recover(); err != nil {
log.Error().Errorf("handleConnect panic %+v", err)
}
}()
m := decodeMap(packet.Data) m := decodeMap(packet.Data)
if len(m) <= 0 { if len(m) <= 0 {
log.Error().Errorf("SN: %d -> get connect map failed", packet.SerialNumber) log.Error().Errorf("SN: %d -> get connect map failed", packet.SerialNumber)
client.closeVirtualConnection(session, packet.SerialNumber)
return
} }
traceSession := trace.NewSession(m[trace.MajoraSessionName]) sessionId, ok := m[trace.MajoraSessionName]
if !ok {
log.Error().Errorf("SN: %d -> get sessionId failed", packet.SerialNumber)
}
traceSession := trace.NewSession(sessionId)
session.SetAttribute(trace.MajoraSessionName, traceSession) session.SetAttribute(trace.MajoraSessionName, traceSession)
traceSession.Recorder.RecordEvent(trace.ConnectEvent, "start handle connect") traceSession.Recorder.RecordEvent(trace.ConnectEvent, "start handle connect")
...@@ -93,6 +100,9 @@ func (client *Client) handleConnect(packet *protocol.MajoraPacket, session getty ...@@ -93,6 +100,9 @@ func (client *Client) handleConnect(packet *protocol.MajoraPacket, session getty
func decodeMap(data []byte) map[string]string { func decodeMap(data []byte) map[string]string {
result := make(map[string]string, 1) result := make(map[string]string, 1)
if len(data) <= 0 {
return result
}
var headerSize int8 var headerSize int8
err := binary.Read(bytes.NewBuffer(data[:1]), binary.BigEndian, &headerSize) err := binary.Read(bytes.NewBuffer(data[:1]), binary.BigEndian, &headerSize)
data = data[1:] data = data[1:]
...@@ -124,10 +134,7 @@ func decodeMap(data []byte) map[string]string { ...@@ -124,10 +134,7 @@ func decodeMap(data []byte) map[string]string {
func (client *Client) handleTransfer(packet *protocol.MajoraPacket, session getty.Session) { func (client *Client) handleTransfer(packet *protocol.MajoraPacket, session getty.Session) {
go func(packet *protocol.MajoraPacket, session getty.Session) { go func(packet *protocol.MajoraPacket, session getty.Session) {
traceRecorder, err := trace.GetTraceRecorderFromSession(session) traceRecorder := trace.GetTraceRecorderFromSession(session)
if err != nil {
log.Run().Errorf("%s SerialNumber: %s", err.Error(), packet.SerialNumber)
}
traceRecorder.RecordEvent(trace.TransferEvent, "receive transfer packet from natServer,need to be forward to target") traceRecorder.RecordEvent(trace.TransferEvent, "receive transfer packet from natServer,need to be forward to target")
load, ok := client.connStore.Load(packet.SerialNumber) load, ok := client.connStore.Load(packet.SerialNumber)
...@@ -165,10 +172,7 @@ func (client *Client) handleTransfer(packet *protocol.MajoraPacket, session gett ...@@ -165,10 +172,7 @@ func (client *Client) handleTransfer(packet *protocol.MajoraPacket, session gett
func (client *Client) handleUpStream(conn *net.TCPConn, packet *protocol.MajoraPacket, session getty.Session) { func (client *Client) handleUpStream(conn *net.TCPConn, packet *protocol.MajoraPacket, session getty.Session) {
go func() { go func() {
traceRecorder, err := trace.GetTraceRecorderFromSession(session) traceRecorder := trace.GetTraceRecorderFromSession(session)
if err != nil {
log.Run().Errorf("%s SerialNumber: %s", err.Error(), packet.SerialNumber)
}
traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("SerialNumber:%d -> handleUpStream start", packet.SerialNumber)) traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("SerialNumber:%d -> handleUpStream start", packet.SerialNumber))
log.Run().Debugf("[handleUpStream] %d-> handleUpStream start...", packet.SerialNumber) log.Run().Debugf("[handleUpStream] %d-> handleUpStream start...", packet.SerialNumber)
for { for {
...@@ -207,10 +211,7 @@ func (client *Client) handleUpStream(conn *net.TCPConn, packet *protocol.MajoraP ...@@ -207,10 +211,7 @@ func (client *Client) handleUpStream(conn *net.TCPConn, packet *protocol.MajoraP
func (client *Client) handleDisconnectMessage(session getty.Session, packet *protocol.MajoraPacket) { func (client *Client) handleDisconnectMessage(session getty.Session, packet *protocol.MajoraPacket) {
go func() { go func() {
traceRecorder, err := trace.GetTraceRecorderFromSession(session) traceRecorder := trace.GetTraceRecorderFromSession(session)
if err != nil {
log.Run().Errorf("%s SerialNumber: %s", err.Error(), packet.SerialNumber)
}
traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("SN: %d start close session", packet.SerialNumber)) traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("SN: %d start close session", packet.SerialNumber))
log.Run().Debugf("[handleDisconnectMessage] %d->session closesd %v", packet.SerialNumber, session.IsClosed()) log.Run().Debugf("[handleDisconnectMessage] %d->session closesd %v", packet.SerialNumber, session.IsClosed())
if conn, ok := client.connStore.Load(packet.SerialNumber); ok { if conn, ok := client.connStore.Load(packet.SerialNumber); ok {
...@@ -247,10 +248,7 @@ func (client *Client) OnClose(natSession getty.Session, upStreamSession net.Conn ...@@ -247,10 +248,7 @@ func (client *Client) OnClose(natSession getty.Session, upStreamSession net.Conn
log.Error().Errorf("OnClose %+v", err) log.Error().Errorf("OnClose %+v", err)
} }
}() }()
traceRecorder, err := trace.GetTraceRecorderFromSession(natSession) traceRecorder := trace.GetTraceRecorderFromSession(natSession)
if err != nil {
log.Run().Errorf("%s SerialNumber: %s", err.Error(), serialNumber)
}
client.connStore.Delete(serialNumber) client.connStore.Delete(serialNumber)
_ = upStreamSession.Close() _ = upStreamSession.Close()
client.closeVirtualConnection(natSession, serialNumber) client.closeVirtualConnection(natSession, serialNumber)
...@@ -259,10 +257,7 @@ func (client *Client) OnClose(natSession getty.Session, upStreamSession net.Conn ...@@ -259,10 +257,7 @@ func (client *Client) OnClose(natSession getty.Session, upStreamSession net.Conn
//closeVirtualConnection disconnect to server //closeVirtualConnection disconnect to server
func (client *Client) closeVirtualConnection(session getty.Session, serialNumber int64) { func (client *Client) closeVirtualConnection(session getty.Session, serialNumber int64) {
traceRecorder, err := trace.GetTraceRecorderFromSession(session) traceRecorder := trace.GetTraceRecorderFromSession(session)
if err != nil {
log.Run().Errorf("%s SerialNumber: %s", err.Error(), serialNumber)
}
log.Run().Debugf("[closeVirtualConnection] %d->session closed %v", serialNumber, session.IsClosed()) log.Run().Debugf("[closeVirtualConnection] %d->session closed %v", serialNumber, session.IsClosed())
if session.IsClosed() { if session.IsClosed() {
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"time" "time"
"github.com/adamweixuan/getty" "github.com/adamweixuan/getty"
"virjar.com/majora-go/common" "virjar.com/majora-go/common"
"virjar.com/majora-go/log" "virjar.com/majora-go/log"
"virjar.com/majora-go/protocol" "virjar.com/majora-go/protocol"
...@@ -24,7 +25,7 @@ func (m *MajoraEventListener) OnOpen(session getty.Session) error { ...@@ -24,7 +25,7 @@ func (m *MajoraEventListener) OnOpen(session getty.Session) error {
log.Error().Errorf("register to server error %+v", err) log.Error().Errorf("register to server error %+v", err)
return err return err
} }
log.Run().Infof("[OnOpen] registe to %s success", m.client.config.TunnelAddr) log.Run().Infof("[OnOpen] register to %s success", m.client.config.TunnelAddr)
return nil return nil
} }
......
package trace package trace
import ( import (
"errors"
"github.com/adamweixuan/getty" "github.com/adamweixuan/getty"
"go.uber.org/zap" "go.uber.org/zap"
"time" "time"
...@@ -15,6 +14,7 @@ var ( ...@@ -15,6 +14,7 @@ var (
MajoraSessionName = "MajoraSession" MajoraSessionName = "MajoraSession"
UpStreamEvent = "UpStream" UpStreamEvent = "UpStream"
DisconnectEvent = "Disconnect" DisconnectEvent = "Disconnect"
sessionIdNop = "session_id_not_set"
) )
func init() { func init() {
...@@ -128,16 +128,19 @@ type Session struct { ...@@ -128,16 +128,19 @@ type Session struct {
} }
func NewSession(sessionId string) *Session { func NewSession(sessionId string) *Session {
if len(sessionId) == 0 {
sessionId = sessionIdNop
}
return &Session{ return &Session{
Recorder: AcquireRecorder(sessionId), Recorder: AcquireRecorder(sessionId),
} }
} }
func GetTraceRecorderFromSession(session getty.Session) (Recorder, error) { func GetTraceRecorderFromSession(session getty.Session) Recorder {
majoraSession := session.GetAttribute(MajoraSessionName) majoraSession := session.GetAttribute(MajoraSessionName)
traceSession, ok := majoraSession.(*Session) traceSession, ok := majoraSession.(*Session)
if !ok { if !ok {
return nil, errors.New("Get trace session failed") return NewSession(sessionIdNop).Recorder
} }
return traceSession.Recorder, nil return traceSession.Recorder
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment