Commit ee78d98f authored by wei.xuan's avatar wei.xuan

Merge branch 'v0.0.4' into 'master'

V0.0.4

See merge request !3
parents 01f4e7be ace39c8e
......@@ -3,7 +3,7 @@ export GOPROXY="https://goproxy.cn,https://goproxy.io,direct"
LDFLAGS := -s -w
DATE=$(shell date +"%Y-%m-%d")
BUILDINFO := -X main.Version=v0.0.3 -X main.Date=$(DATE)
BUILDINFO := -X main.Version=v0.0.4 -X main.Date=$(DATE)
all:
env CGO_ENABLED=0 go build -trimpath -ldflags '-w -s $(BUILDINFO)' -o bin/majora
......
......@@ -9,7 +9,7 @@ import (
"math/rand"
"net"
"net/http"
_ "net/http/pprof" //nolint:gosec
_ "net/http/pprof"
"os"
"runtime"
"time"
......@@ -53,10 +53,11 @@ func init() {
func initial(cfg *model.Configure) {
logger.SetLogLevel(cfg.LogLevel)
addr := fmt.Sprintf("127.0.0.1:%d", cfg.PprofPort)
logger.Info().Msgf("enable pprof: %s", addr)
if cfg.PprofPort > 0 {
go func() {
log.Printf("enable pprof: %s", common.PprofAddr)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", cfg.PprofPort), nil))
log.Fatal(http.ListenAndServe(addr, nil))
}()
}
......
......@@ -5,7 +5,6 @@ import (
"errors"
"io"
"net"
"os"
"sync"
"sync/atomic"
......@@ -22,7 +21,6 @@ type Client struct {
natTunnel atomic.Value
Codec protocol.ICodec
connStore sync.Map
cleanup chan struct{}
}
func NewClientWithConf(cfg *model.Configure) {
......@@ -44,7 +42,6 @@ func NewCli(cfg *model.Configure) *Client {
natTunnel: atomic.Value{},
Codec: protocol.Codec,
connStore: sync.Map{},
cleanup: make(chan struct{}),
}
return client
......@@ -53,19 +50,7 @@ func NewCli(cfg *model.Configure) *Client {
func (client *Client) StartUp() {
client.connect()
client.register()
if client.config.Redial.Invalid() {
client.Redial()
}
go func() {
client.handleNatEvent()
}()
// 退出旧的进程
for range client.cleanup {
os.Exit(0)
}
}
func (client *Client) register() {
......@@ -79,26 +64,36 @@ func (client *Client) register() {
} else {
logger.Info().Msgf("client %s register to nat server %s success", client.config.ClientID, client.config.TunnelAddr)
}
if client.config.Redial.Invalid() {
client.Redial()
}
}
func (client *Client) handleNatEvent() {
for {
reader := bufio.NewReader(client.natTunnel.Load().(net.Conn))
// todo 支持 timeout检测
conn, ok := client.natTunnel.Load().(net.Conn)
if !ok {
logger.Error().Msgf("[core] tunnel server is invalid, reconnect...")
client.reConnect()
continue
}
reader := bufio.NewReader(conn)
majoraPacket, err := client.Codec.Decode(reader)
if errors.Is(err, io.EOF) {
// 清理本地session
client.connStore = sync.Map{}
client.reConnect()
continue
}
if majoraPacket == nil || err != nil {
logger.Error().Msgf("decode error %+v", err)
if err != nil {
logger.Error().Msgf("decode_error %s->%+v", conn.RemoteAddr(), err)
}
client.reConnect()
continue
}
logger.Debug().Msgf("receive HeartbeatPacket type %s", majoraPacket.Ttype.ToString())
logger.Debug().Msgf("receive packet type %s", majoraPacket.Ttype.ToString())
switch majoraPacket.Ttype {
case protocol.TypeHeartbeat:
client.handleHeartbeatMessage()
......
......@@ -3,6 +3,7 @@ package client
import (
"fmt"
"net"
"runtime"
"sync"
"time"
......@@ -41,6 +42,7 @@ func (client *Client) connect() {
}
func (client *Client) reConnect() {
client.cleanSession()
// 已经check 过
hostPort := client.config.TunnelAddr
......@@ -67,7 +69,7 @@ func (client *Client) reConnect() {
}
}
logger.Info().Msgf("reconnect to nathost %s success ...", hostPort)
logger.Info().Msgf("%s reconnect to nathost %s success ...", client.config.ClientID, hostPort)
client.natTunnel.Store(conn)
client.connStore = sync.Map{}
client.register()
......@@ -83,12 +85,32 @@ func (client *Client) Redial() {
timer.Reset(client.config.Redial.RedialDuration)
}
<-timer.C
majoraPacket := protocol.TypeDisconnect.CreatePacket()
majoraPacket := protocol.TypeOffline.CreatePacket()
if err := client.WriteAndFlush(majoraPacket); err != nil {
logger.Warn().Msgf("flush to nat server error %s", err.Error())
}
logger.Info().Msgf("====================redial start ==============")
time.Sleep(client.config.Redial.WaitTime)
infra.Redial(client.config, client.cleanup)
infra.Redial(client.config)
client.reConnect()
}
}()
}
func (client *Client) cleanSession() {
logger.Info().Msgf("cleanSession [before] thread cnt %d", runtime.NumGoroutine())
client.connStore.Range(func(key, value interface{}) bool {
serialId, _ := key.(int64)
conn, ok := value.(net.Conn)
if ok && conn != nil {
if err := conn.Close(); err != nil {
logger.Error().Msgf("cleanSession close %s with error %s", conn.RemoteAddr(), err)
} else {
logger.Error().Msgf("cleanSession close %d->s with success", serialId, conn.RemoteAddr())
}
}
client.connStore.Delete(serialId)
return true
})
logger.Info().Msgf("cleanSession [after] thread cnt %d", runtime.NumGoroutine())
}
......@@ -2,9 +2,7 @@ package client
import (
"bufio"
"errors"
"fmt"
"io"
"net"
"strings"
......@@ -21,7 +19,7 @@ var (
// todo 心跳超时检测
func (client *Client) handleHeartbeatMessage() {
go func() {
//logger.Debug().Msg("receive heartbeat message from nat server")
logger.Debug().Msg("receive heartbeat message from nat server")
if err := client.WriteAndFlush(HeartbeatPacket); err != nil {
logger.Error().Msgf("flush heart beat message error %s", err.Error())
}
......@@ -41,7 +39,6 @@ func (client *Client) handleConnect(packet *protocol.MajoraPacket) {
client.closeVirtualConnection(packet, "invalid extra "+packet.Extra)
return
}
logger.Info().Msgf("handleConnect to %s", hostPort)
var (
conn net.Conn
......@@ -72,7 +69,7 @@ func (client *Client) handleConnect(packet *protocol.MajoraPacket) {
majoraPacket.Extra = client.config.ClientID
if err := client.WriteAndFlush(majoraPacket); err != nil {
logger.Error().Msgf("handleConnect message error %s", err.Error())
logger.Error().Msgf("handleConnect message error %d->%s", packet.SerialNumber, err.Error())
// close && clean
_ = conn.Close()
client.removeConnection(packet, "client:"+err.Error())
......@@ -105,23 +102,24 @@ func (client *Client) handleTransfer(packet *protocol.MajoraPacket) {
go func(packet *protocol.MajoraPacket) {
conn, ok := client.GetConnection(packet, "handleTransfer")
if !ok {
client.closeVirtualConnection(packet, "")
client.closeVirtualConnection(packet, "handleTransfer")
return
}
writer := bufio.NewWriter(conn)
flush(client, writer, packet)
logger.Debug().Msgf("handleTransfer success %d->%+v", packet.SerialNumber, string(packet.Data))
}(packet)
}
func flush(client *Client, writer *bufio.Writer, packet *protocol.MajoraPacket) {
if cnt, err := writer.Write(packet.Data); err != nil {
logger.Warn().Msgf("write with error cnt=%d|err=%+v", cnt, err)
client.removeConnection(packet, "write_error")
client.removeConnection(packet, "write_error:"+err.Error())
}
if err := writer.Flush(); err != nil {
logger.Warn().Msgf("flush with error err=%+v", err)
client.removeConnection(packet, "write_error")
client.removeConnection(packet, "write_error:"+err.Error())
}
}
......@@ -130,13 +128,8 @@ func (client *Client) handleConnection(conn net.Conn, packet *protocol.MajoraPac
reader := bufio.NewReader(conn)
for {
if _, err := reader.Peek(1); err != nil {
if !errors.Is(err, net.ErrClosed) && !errors.Is(err, io.EOF) {
logger.Error().Msgf("%d -> handleConnection peek with error:%+v", packet.SerialNumber, err)
}
bufsize := reader.Buffered()
client.removeConnection(packet, fmt.Sprintf("%d->peek_with_error:%s,bufferSize:%d",
packet.SerialNumber, err, bufsize))
client.removeConnection(packet, fmt.Sprintf("peek:%s", err))
break
}
bufsize := reader.Buffered()
......@@ -145,15 +138,16 @@ func (client *Client) handleConnection(conn net.Conn, packet *protocol.MajoraPac
buf := make([]byte, bufsize)
_, err := reader.Read(buf)
if err != nil {
logger.Error().Msgf("handleConnection read with error:%+v", err)
logger.Error().Msgf("handleConnection read with error:%d->%+v", packet.SerialNumber, err)
break
}
pack := protocol.TypeTransfer.CreatePacket()
pack.Data = buf
pack.SerialNumber = packet.SerialNumber
if err = client.WriteAndFlush(pack); err != nil {
logger.Error().Msgf("write to nat server error %+v", err)
logger.Error().Msgf("write to nat server error %d->%+v", packet.SerialNumber, err)
}
logger.Debug().Msgf("handleConnection success %d->%+v", packet.SerialNumber, string(packet.Data))
}
}
......@@ -192,12 +186,12 @@ func (client *Client) removeConnection(packet *protocol.MajoraPacket, reason str
}
}()
// delete from local cache
conn, ok := client.GetConnection(packet, "removeConnection")
conn, ok := client.GetConnection(packet, "remove:"+reason)
if ok {
client.connStore.Delete(packet.SerialNumber)
// 直接关闭是否就可以 主动断开 是否有剩余数据已经意义不大了
_ = conn.Close()
logger.Debug().Msgf("removeConnection target connection %d, reason %s", packet.SerialNumber, reason)
logger.Info().Msgf("removeConnection target connection %d, reason %s", packet.SerialNumber, reason)
majoraPacket := protocol.TypeDisconnect.CreatePacket()
majoraPacket.SerialNumber = packet.SerialNumber
majoraPacket.Data = []byte(client.config.ClientID)
......@@ -214,8 +208,7 @@ func (client *Client) GetConnection(packet *protocol.MajoraPacket, step string)
// 没有的话 可能是服务端未感知到端上的连接已断开了
// 是否需要主动创建一个
if !ok || load == nil {
logger.Warn().Msgf("can not find connection for %s->%d", step, packet.SerialNumber)
client.closeVirtualConnection(packet, "GetConnection with empty")
client.closeVirtualConnection(packet, fmt.Sprintf("get %d->%s", packet.SerialNumber, step))
return nil, false
}
conn, ok = load.(net.Conn)
......@@ -223,10 +216,14 @@ func (client *Client) GetConnection(packet *protocol.MajoraPacket, step string)
}
func (client *Client) closeVirtualConnection(packet *protocol.MajoraPacket, msg string) {
logger.Warn().Msgf("disconnect to server %s", msg)
logger.Warn().Msgf("disconnect to server %d->%s", packet.SerialNumber, msg)
majoraPacket := protocol.TypeDisconnect.CreatePacket()
majoraPacket.SerialNumber = packet.SerialNumber
majoraPacket.Extra = client.config.ClientID
ll := len(msg)
if ll >= 127 {
ll = 127
}
majoraPacket.Extra = msg[:ll]
if err := client.WriteAndFlush(packet); err != nil {
logger.Error().Msgf("closeVirtualConnection with error %+v", err)
......
......@@ -6,6 +6,8 @@ import (
"errors"
"io"
"time"
"virjar.com/majora-go/logger"
)
const (
......@@ -53,7 +55,6 @@ const (
)
var (
ErrNilPacket = errors.New("packet is nil")
ErrInvalidSize = errors.New("invalid size")
ErrInvalidMagic = errors.New("invalid magic")
)
......@@ -113,5 +114,9 @@ func ReadN(size int, conn io.Reader) ([]byte, error) {
}
func ReadMagic(buf []byte) bool {
return int64(binary.BigEndian.Uint64(buf)) == MAGIC
magic := int64(binary.BigEndian.Uint64(buf))
if magic != MAGIC {
logger.Error().Msgf("magic not equal %d|%d|%+v", magic, MAGIC, buf)
}
return magic == MAGIC
}
tunnel_addr = aoba.vip:5879
tunnel_addr = 127.0.0.1:5879
dns_server = 114.114.114.114:53
;bind to local ip
;local_ip = 192.168.0.100
;for performance pprof 0 is close
;pprof_port = 0
pprof_port = 16666
disable_update = false
; default is info
log_level = 0
......
;tunnel_addr = majora.virjar.com:5879
tunnel_addr = majora-dev.virjar.com:5879
dns_server = 114.114.114.114:53
;bind to local ip
......@@ -17,5 +16,5 @@ account = superman
[redial]
command = /bin/bash
exec_path = /root/ppp_auto_with_auth.sh
redial_duration = 5m
redial_duration = 6m
wait_time = 10s
......@@ -11,7 +11,7 @@ cd `dirname $0`
script_dir=`pwd`
function getPid(){
echo `ps -ef | grep "majora" | grep -v "grep" | grep -v "startup.sh" | awk '{print $2}'`
echo `ps -ef | grep "majora" | grep "majoro.ini" | grep -v "grep" | grep -v "startup.sh" | awk '{print $2}'`
}
remote_pid=`getPid`
......
......@@ -7,12 +7,32 @@ import (
"virjar.com/majora-go/model"
)
func Redial(cfg *model.Configure, cleanup chan struct{}) {
redial(cfg)
RestartBySignal(cleanup)
}
//func Redial(cfg *model.Configure, cleanup chan struct{}) {
//redial(cfg)
//RestartBySignal(cleanup)
//}
//func redial(cfg *model.Configure) {
// execPath := cfg.Redial.ExecPath
// if len(execPath) == 0 {
// logger.Error().Msgf("redial exec file is empty")
// return
// }
// command := cfg.Redial.Command
// if len(command) == 0 {
// logger.Error().Msgf("redial command is empty")
// return
// }
// cmd := exec.Command(command, "-c", execPath)
// output, err := cmd.Output()
// if err != nil {
// logger.Error().Msgf("Execute Shell:%s failed with error:%s", command, err.Error())
// return
// }
// logger.Info().Msgf("[redial] redial success %+v resp:%s", cmd, string(output))
//}
func redial(cfg *model.Configure) {
func Redial(cfg *model.Configure) {
execPath := cfg.Redial.ExecPath
if len(execPath) == 0 {
logger.Error().Msgf("redial exec file is empty")
......
......@@ -13,7 +13,7 @@ var (
func init() {
zerolog.SetGlobalLevel(zerolog.InfoLevel)
output := zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.RFC3339, NoColor: true}
output := zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.RFC3339Nano, NoColor: true}
logger = zerolog.New(output).With().Timestamp().Logger()
}
......
This diff is collapsed.
......@@ -12,7 +12,6 @@ type Decoder interface {
}
type MajoraPacketDecoder struct {
consumeHeader bool
}
func (mpd *MajoraPacketDecoder) Decode(reader *bufio.Reader) (pack *MajoraPacket, err error) {
......
......@@ -20,6 +20,7 @@ const (
TypeControl MajoraPacketType = 0x06
TypeConnectReady MajoraPacketType = 0x07
TypeDestroy MajoraPacketType = 0x08
TypeOffline MajoraPacketType = 0x09
)
func (mpt MajoraPacketType) CreatePacket() *MajoraPacket {
......@@ -44,6 +45,8 @@ func (mpt MajoraPacketType) ToString() string {
return "TypeConnectReady"
case TypeDestroy:
return "TypeDestroy"
case TypeOffline:
return "TypeOffline"
}
return "Unknown"
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment