Commit 3a45121d authored by Tsaiilin(蔡依林)'s avatar Tsaiilin(蔡依林)

Merge branch 'dev' into 'master'

Dev

See merge request !10
parents 48003a52 8c316f22
......@@ -10,27 +10,23 @@ import (
"net/http"
_ "net/http/pprof"
"os"
"os/exec"
"os/signal"
"path/filepath"
"runtime"
"syscall"
"time"
"virjar.com/majora-go/client"
"virjar.com/majora-go/common"
"virjar.com/majora-go/daemon"
"virjar.com/majora-go/global"
"virjar.com/majora-go/initialize"
"virjar.com/majora-go/log"
"virjar.com/majora-go/model"
"virjar.com/majora-go/safe"
)
var (
configure string
logLevel int
pprofPort int
natServer string
account string
dnsServer string
localAddr string
daemon bool
)
var (
......@@ -38,79 +34,44 @@ var (
Date string
)
var (
cmd *exec.Cmd
)
func init() {
rand.Seed(time.Now().UnixNano())
flag.StringVar(&configure, "conf", "", "./majora -c path/to/your/majora.ini")
flag.IntVar(&logLevel, "log", 1, "log logLevel")
flag.IntVar(&pprofPort, "pprof", 0, "enable pprof")
flag.StringVar(&natServer, "natServer", common.DefNatAddr, "natServer")
flag.StringVar(&account, "account", "unknown", "account")
flag.StringVar(&dnsServer, "dnsServer", common.DNSServer, "custom dns server")
flag.StringVar(&localAddr, "localIp", "", "bind local ip")
flag.BoolVar(&daemon, "daemon", false, "daemon")
flag.Parse()
}
func initial(cfg *model.Configure) {
log.Init(cfg.LogLevel - 1)
if cfg.PprofPort > 0 {
go func() {
addr := fmt.Sprintf("127.0.0.1:%d", cfg.PprofPort)
func initial() {
if global.Config.PprofPort > 0 {
safe.SageGo(func() {
addr := fmt.Sprintf("127.0.0.1:%d", global.Config.PprofPort)
log.Run().Infof("enable pprof: %s", addr)
log.Run().Error(http.ListenAndServe(addr, nil))
}()
})
}
if len(cfg.DNSServer) > 0 {
if len(global.Config.DNSServer) > 0 {
net.DefaultResolver = &net.Resolver{
PreferGo: true,
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
return net.Dial("udp", dnsServer)
return net.Dial("udp", global.Config.DNSServer)
},
}
}
}
func parseFromCmd(cfg *model.Configure) {
cfg.TunnelAddr = natServer
cfg.LogLevel = logLevel
cfg.DNSServer = dnsServer
cfg.LocalAddr = localAddr
// 先兼容吧
cfg.Extra.Account = account
cfg.PprofPort = pprofPort
}
func cli(cfg *model.Configure) {
func cli() {
defer func() {
if err := recover(); err != nil {
log.Run().Errorf("cli panic %+v", err)
log.Error().Errorf("cli panic %+v", err)
}
}()
log.Run().Infof("cpu count %d proc %d", runtime.NumCPU(), runtime.NumCPU()*2)
log.Run().Infof("current Version %s, build at %s", Version, Date)
log.Run().Infof("hostinfo os:%s, arch:%s", runtime.GOOS, runtime.GOARCH)
cfgInfo, _ := json.Marshal(cfg)
log.Run().Infof("hostInfo os:%s, arch:%s", runtime.GOOS, runtime.GOARCH)
cfgInfo, _ := json.Marshal(global.Config)
log.Run().Infof("config info:%s", string(cfgInfo))
client.NewClientWithConf(cfg)
}
func initConf() *model.Configure {
cfg := model.NewDefMajoraConf()
if len(configure) > 0 {
cfg = model.InitConf(configure)
} else {
parseFromCmd(cfg)
}
//runtime.GOMAXPROCS(runtime.NumCPU() * 2)
//debug.SetGCPercent(200)
initial(cfg)
return cfg
client.NewClientWithConf(global.Config).StartUp()
}
//main start
......@@ -119,26 +80,21 @@ func main() {
fmt.Println(Version)
os.Exit(0)
}
cfg := initConf()
if daemon {
var args []string
for _, arg := range os.Args[1:] {
if arg != "-daemon" {
args = append(args, arg)
}
}
cmd = exec.Command(os.Args[0], args...)
cmd.Env = os.Environ()
if err := cmd.Start(); err != nil {
panic(err)
}
log.Run().Infof("%s [pid-%d] running...\n", os.Args[0], cmd.Process.Pid)
os.Exit(0)
initialize.MustInitConfig(configure, global.Config)
if global.Config.Daemon {
logFile := filepath.Join(global.Config.LogPath, "daemon.log")
d := daemon.NewDaemon(logFile)
d.MaxCount = 20 //最大重启次数
d.Run()
}
initialize.InitLogger()
initial()
cli()
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
cli(cfg)
select {
case <-signalChan:
time.Sleep(time.Second * 3)
......
......@@ -6,21 +6,28 @@ import (
"time"
"github.com/adamweixuan/getty"
"github.com/coocood/freecache"
"virjar.com/majora-go/infra"
"virjar.com/majora-go/log"
"virjar.com/majora-go/model"
"virjar.com/majora-go/safe"
)
type Client struct {
config *model.Configure
localAddr net.Addr
natTunnel getty.Client
connStore sync.Map
localAddr net.Addr
natTunnel getty.Client
session getty.Session
redial *infra.PPPRedial
connStore sync.Map
sessionStore sync.Map
dnsCache *freecache.Cache
}
func NewClientWithConf(cfg *model.Configure) {
NewCli(cfg).StartUp()
func NewClientWithConf(cfg *model.Configure) *Client {
return NewCli(cfg)
}
func NewCli(cfg *model.Configure) *Client {
......@@ -32,9 +39,12 @@ func NewCli(cfg *model.Configure) *Client {
}
}
client := &Client{
config: cfg,
localAddr: localAddr,
connStore: sync.Map{},
config: cfg,
localAddr: localAddr,
connStore: sync.Map{},
sessionStore: sync.Map{},
dnsCache: freecache.NewCache(1024),
redial: infra.NewPPPRedial(),
}
return client
......@@ -61,18 +71,26 @@ func (client *Client) check() {
url = cfg.NetCheckUrl
}
go func() {
safe.SageGo(func() {
var timer = time.NewTimer(interval)
for {
timer.Reset(interval)
<-timer.C
success := infra.Ping(url)
success := false
for i := 0; i < 3; i++ {
success = infra.Ping(url)
if success {
break
}
}
if success {
continue
}
log.Run().Warnf("net check fail, redial...")
infra.RedialByCheck(cfg)
log.Run().Warnf("Redial net check fail, redial...")
if client.redial.RedialByCheck(cfg) {
client.natTunnel.Close()
client.connect()
}
}
}()
})
}
......@@ -5,6 +5,7 @@ import (
"encoding/binary"
"github.com/adamweixuan/getty"
"virjar.com/majora-go/common"
"virjar.com/majora-go/log"
"virjar.com/majora-go/protocol"
......@@ -18,17 +19,17 @@ type PacketCodec struct {
}
func (p *PacketCodec) Read(session getty.Session, data []byte) (interface{}, int, error) {
log.Run().Debugf("[PacketCodec] %s->%d", string(data), len(data))
log.Run().Debugf("[PacketCodec] length:%d", len(data))
if len(data) < common.MagicSize+common.FrameSize {
return nil, 0, nil
}
log.Run().Debugf("[PacketCodec] read magic %+v", binary.BigEndian.Uint64(data[0:8]))
readmagic := data[0:common.MagicSize]
readMagic := data[0:common.MagicSize]
if !common.ReadMagic(readmagic) {
if !common.ReadMagic(readMagic) {
log.Run().Errorf("[PacketCodec] invalid magic %d|%s",
binary.BigEndian.Uint64(readmagic), string(readmagic))
binary.BigEndian.Uint64(readMagic), string(readMagic))
return nil, 0, common.ErrInvalidMagic
}
......@@ -36,7 +37,7 @@ func (p *PacketCodec) Read(session getty.Session, data []byte) (interface{}, int
frameLen, err := common.ReadInt32(reader)
if err != nil {
log.Run().Errorf("[PacketCodec] frameLen error %+v", err)
log.Error().Errorf("[PacketCodec] frameLen error %+v", err)
return nil, 0, err
}
......@@ -51,7 +52,7 @@ func (p *PacketCodec) Read(session getty.Session, data []byte) (interface{}, int
// type
msgType, err := common.ReadByte(reader)
if err != nil {
log.Run().Errorf("[PacketCodec] read type error %+v", err)
log.Error().Errorf("[PacketCodec] read type error %+v", err)
return nil, 0, err
}
......@@ -63,7 +64,7 @@ func (p *PacketCodec) Read(session getty.Session, data []byte) (interface{}, int
// num
pack.SerialNumber, err = common.ReadInt64(reader)
if err != nil {
log.Run().Errorf("[PacketCodec] read num error %+v", err)
log.Error().Errorf("[PacketCodec] read num error %+v", err)
return nil, len(data), nil
}
......@@ -72,13 +73,13 @@ func (p *PacketCodec) Read(session getty.Session, data []byte) (interface{}, int
// extra size
extraSize, err := common.ReadByte(reader)
if err != nil {
log.Run().Errorf("[PacketCodec] read extra size error %+v", err)
log.Error().Errorf("[PacketCodec] read extra size error %+v", err)
return nil, len(data), nil
}
extra, err := common.ReadN(int(extraSize), reader)
if err != nil {
log.Run().Errorf("[PacketCodec] read extra error %+v", err)
log.Error().Errorf("[PacketCodec] read extra error %+v", err)
return nil, len(data), nil
}
pack.Extra = string(extra)
......@@ -87,14 +88,14 @@ func (p *PacketCodec) Read(session getty.Session, data []byte) (interface{}, int
// dataFrame
dataSize := int(frameLen) - common.TypeSize - common.SerialNumberSize - common.ExtraSize - int(extraSize)
if dataSize < 0 {
log.Run().Errorf("[PacketCodec] read frameLen error %+v", err)
log.Error().Errorf("[PacketCodec] read frameLen error %+v", err)
return nil, len(data), common.ErrInvalidSize
}
if dataSize > 0 {
data, err := common.ReadN(dataSize, reader)
if err != nil {
log.Run().Errorf("[PacketCodec] read data error %+v", err)
log.Error().Errorf("[PacketCodec] read data error %+v", err)
return nil, len(data), nil
}
pack.Data = data
......
This diff is collapsed.
package client
import (
"runtime"
"time"
"github.com/adamweixuan/getty"
"virjar.com/majora-go/common"
"virjar.com/majora-go/log"
"virjar.com/majora-go/protocol"
......@@ -15,37 +15,45 @@ type MajoraEventListener struct {
}
func (m *MajoraEventListener) OnOpen(session getty.Session) error {
m.client.session = session
packet := protocol.TypeRegister.CreatePacket()
packet.Extra = m.client.config.ClientID
extraMap := make(map[string]string, 1)
extraMap[common.ExtrakeyUser] = m.client.config.Extra.Account
packet.Data = protocol.EncodeExtra(extraMap)
if _, _, err := session.WritePkg(packet, time.Second*10); err != nil {
log.Event().Errorf("register to server error %+v", err)
log.Error().Errorf("register to server error %+v", err)
return err
}
log.Event().Infof("[OnOpen] registe to %s success", m.client.config.TunnelAddr)
log.Run().Infof("[OnOpen] register to %s success", m.client.config.TunnelAddr)
return nil
}
func (m *MajoraEventListener) OnClose(session getty.Session) {
log.Event().Errorf("OnClose-> session closed %v", session.IsClosed())
log.Error().Errorf("OnClose-> session closed %v", session.IsClosed())
m.client.CloseAll(session)
}
func (m *MajoraEventListener) OnError(session getty.Session, err error) {
log.Event().Errorf("OnError %s", err.Error())
log.Error().Errorf("OnError %s", err.Error())
m.client.CloseAll(session)
}
func (m *MajoraEventListener) OnCron(session getty.Session) {
log.Event().Warnf("thread:%d session closed %v", runtime.NumGoroutine(), session.IsClosed())
m.client.Redial(session)
log.Run().Infof("[OnCorn] Redial, session closed:%v", session.IsClosed())
m.client.Redial(session, "corn")
}
func (m *MajoraEventListener) OnMessage(session getty.Session, input interface{}) {
defer func() {
if err := recover(); err != nil {
log.Error().Errorf("OnMessage panic %+v", err)
}
}()
majoraPacket := input.(*protocol.MajoraPacket)
log.Event().Debugf("receive packet from server %d->%s", majoraPacket.SerialNumber, majoraPacket.Ttype.ToString())
log.Run().Debugf("receive packet from server %d->%s", majoraPacket.SerialNumber, majoraPacket.Ttype.ToString())
switch majoraPacket.Ttype {
case protocol.TypeHeartbeat:
......
......@@ -4,17 +4,18 @@ import (
"fmt"
"math/rand"
"net"
"runtime"
"time"
"github.com/adamweixuan/getty"
gxsync "github.com/adamweixuan/gostnops/sync"
"virjar.com/majora-go/common"
"virjar.com/majora-go/infra"
"virjar.com/majora-go/log"
)
var (
taskPool = gxsync.NewTaskPoolSimple(10000)
taskPool = gxsync.NewTaskPoolSimple(runtime.GOMAXPROCS(-1) * 100)
)
func (client *Client) connect() {
......@@ -74,7 +75,7 @@ func InitialSession(session getty.Session, client *Client) (err error) {
// 加上随机 防止vps在同时间重启
randDuration := rand.Int63n(time.Minute.Milliseconds() * 5)
interval := randDuration + client.config.Redial.RedialDuration.Milliseconds()
log.Run().Infof("redial interval %+v", time.Duration(interval)*time.Millisecond)
log.Run().Infof("Redial interval %+v", time.Duration(interval)*time.Millisecond)
session.SetCronPeriod(int(interval))
}
session.SetPkgHandler(PkgCodec)
......@@ -84,17 +85,19 @@ func InitialSession(session getty.Session, client *Client) (err error) {
return nil
}
func (client *Client) Redial(session getty.Session) {
log.Run().Infof("redial start ...%v", client.config.Redial.Valid())
func (client *Client) Redial(session getty.Session, tag string) {
log.Run().Infof("[Redial %s] start, can redial? %v", tag, client.config.Redial.Valid())
if !client.config.Redial.Valid() {
return
}
log.Run().Warn("redial send offline message ...")
log.Run().Infof("[Redial %s] Send offline message", tag)
if _, _, err := session.WritePkg(OfflinePacket, 0); err != nil {
log.Run().Warnf("write offline to server error %s", err.Error())
log.Run().Errorf("[Redial %s] write offline to server error %s", tag, err.Error())
}
log.Run().Warn("redial close local session")
client.CloseAll(session)
time.Sleep(client.config.Redial.WaitTime)
infra.Redial(client.config, session)
log.Run().Info("[Redial %s] start close local session", tag)
client.CloseAll(session)
client.redial.Redial(client.config, tag)
client.natTunnel.Close()
client.connect()
}
tunnel_addr = 127.0.0.1:5879
;tunnel_addr = aoba.vip:5879
dns_server = 114.114.114.114:53
;bind to local ip
;local_ip = 192.168.0.100
;for performance pprof 0 is close
pprof_port = 16666
log_level = 1
reconn_interval = 5s
net_check_interval = 5s
net_check_url = https://www.baidu.com
[extra]
account = superman
[redial]
; on windows is cmd.exe
; on *nix is /bin/bash
;command = /bin/bash
; windows bat 脚本的绝对路径
; *nix shell脚本的绝对路径 D:\redial\redial.bat
;exec_path = ls
;redial_duration = 30s
;wait_time = 10s
tunnel_addr: majora-vps-zj.virjar.com:5879
dns_server: 114.114.114.114:53
daemon: true
log_level: debug
log_path: ./majora-log/
reconn_intervalz: 5s
net_check_interval: 5s
dns_cache_duration: 10m
net_check_url: https://www.baidu.com[extra]
account: superman
redial:
command: /bin/bash
exec_path: /root/ppp_redial.sh
redial_duration: 5m
wait_time: 10s
tunnel_addr = majora-vps-zj.virjar.com:5879
dns_server = 114.114.114.114:53
;bind to local ip
;local_ip = 192.168.0.100
; default is info
log_level = 1
reconn_interval = 5s
net_check_interval = 5s
net_check_url = https://www.baidu.com
[extra]
account = superman
[redial]
;command = /bin/bash
;exec_path = /root/ppp_redial.sh
;redial_duration = 10m
;wait_time = 10s
env: debug
tunnel_addr: majora-vps-zj.virjar.com:5879
dns_server: 114.114.114.114:53
log_level: info
log_path: ./majora-log/
daemon: true
reconn_interval: 5s
net_check_interval: 5s
net_check_url: https://www.baidu.com
dns_cache_duration: 10m
redial:
command: /bin/bash
exec_path: /root/ppp_redial.sh
redial_duration: 5m
wait_time: 15s
......@@ -12,7 +12,6 @@ echo "old pid is ${old_pid}"
echo "clean old ..."
`ps -ef | grep majora | grep -v grep | awk '{print $2}'| xargs kill -9`
mkdir -p "majora-log"
mkdir -p output/log
exec ./majora -daemon -conf majora.ini
\ No newline at end of file
exec ./majora -conf majora.yaml
\ No newline at end of file
......@@ -5,15 +5,15 @@ wget https://oss.virjar.com/majora/bin/latest/majora-cli_latest_linux_amd64.tar.
rm -fr majora
rm -fr exec.sh
rm -fr start.sh
rm -fr majora.ini
rm -fr majora.yaml
rm -fr majora.service
rm -fr majora-dev.ini
rm -fr majora-dev.yaml
rm -fr majora.log
rm -fr output
rm -fr majora-log
rm -fr std.log
tar -zxvf majora-cli.tar.gz
mv -f majora-cli*/* .
exec bash ./start.sh
exec bash ./start.sh
\ No newline at end of file
// +build !windows,!plan9
package daemon
import "syscall"
func NewSysProcAttr() *syscall.SysProcAttr {
return &syscall.SysProcAttr{
Setsid: true,
}
}
// +build windows
package daemon
import "syscall"
func NewSysProcAttr() *syscall.SysProcAttr {
return &syscall.SysProcAttr{
HideWindow: true,
}
}
package daemon
import (
"fmt"
"log"
"os"
"os/exec"
"strconv"
"time"
)
const EnvName = "XW_DAEMON_IDX"
var runIdx = 0
type Daemon struct {
LogFile string
MaxCount int
MaxError int
MinExitTime int64
}
func Background(logFile string, isExit bool) (*exec.Cmd, error) {
runIdx++
envIdx, err := strconv.Atoi(os.Getenv(EnvName))
if err != nil {
envIdx = 0
}
if runIdx <= envIdx {
return nil, nil
}
env := os.Environ()
env = append(env, fmt.Sprintf("%s=%d", EnvName, runIdx))
cmd, err := startProc(os.Args, env, logFile)
if err != nil {
log.Println(os.Getpid(), " Start child process error:", err)
return nil, err
} else {
log.Println(os.Getpid(), " Start child process success:", cmd.Process.Pid)
}
if isExit {
os.Exit(0)
}
return cmd, nil
}
func NewDaemon(logFile string) *Daemon {
return &Daemon{
LogFile: logFile,
MaxCount: 0,
MaxError: 3,
MinExitTime: 10,
}
}
func (d *Daemon) Run() {
_, _ = Background(d.LogFile, true)
var t int64
count := 1
errNum := 0
for {
dInfo := fmt.Sprintf("daemon process(pid:%d; count:%d/%d; errNum:%d/%d):",
os.Getpid(), count, d.MaxCount, errNum, d.MaxError)
if errNum > d.MaxError {
log.Println(dInfo, "Start child process error too many,exit")
os.Exit(1)
}
if d.MaxCount > 0 && count > d.MaxCount {
log.Println(dInfo, "Too many restarts")
os.Exit(0)
}
count++
t = time.Now().Unix()
cmd, err := Background(d.LogFile, false)
if err != nil {
log.Println(dInfo, "Start child process err:", err)
errNum++
continue
}
if cmd == nil {
log.Printf("child process pid=%d: start", os.Getpid())
break
}
err = cmd.Wait()
dat := time.Now().Unix() - t
if dat < d.MinExitTime {
errNum++
} else {
errNum = 0
}
log.Printf("%s child process(%d)exit, Ran for %d seconds: %v\n", dInfo, cmd.ProcessState.Pid(), dat, err)
}
}
func startProc(args, env []string, logFile string) (*exec.Cmd, error) {
cmd := &exec.Cmd{
Path: args[0],
Args: args,
Env: env,
SysProcAttr: NewSysProcAttr(),
}
if logFile != "" {
stdout, err := os.OpenFile(logFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
log.Println(os.Getpid(), ": Open log file error", err)
return nil, err
}
cmd.Stderr = stdout
cmd.Stdout = stdout
}
err := cmd.Start()
if err != nil {
return nil, err
}
return cmd, nil
}
package env
import (
"bytes"
"errors"
"fmt"
)
var errUnmarshalEnv = errors.New("can't unmarshal a nil *Level")
type Env int8
const (
Debug Env = iota - 1
Product
)
func (e *Env) Set(s string) error {
return e.UnmarshalText([]byte(s))
}
func (e *Env) UnmarshalText(text []byte) error {
if e == nil {
return errUnmarshalEnv
}
if !e.unmarshalText(text) && !e.unmarshalText(bytes.ToLower(text)) {
return fmt.Errorf("unrecognized env: %q", text)
}
return nil
}
func (e *Env) unmarshalText(text []byte) bool {
switch string(text) {
case "debug", "DEBUG":
*e = Debug
case "product", "PRODUCT", "": // make the zero value useful
*e = Product
default:
return false
}
return true
}
package global
import (
"virjar.com/majora-go/env"
"virjar.com/majora-go/model"
)
var (
Config = model.NewDefMajoraConf()
CurrentEnv = env.Product
)
......@@ -6,13 +6,17 @@ require (
github.com/adamweixuan/getty v0.0.1
github.com/adamweixuan/gostnops v0.0.1
github.com/google/uuid v1.3.0
gopkg.in/ini.v1 v1.63.2
)
require (
github.com/gorilla/websocket v1.4.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/BurntSushi/toml v0.4.1 // indirect
github.com/coocood/freecache v1.2.0
github.com/fsnotify/fsnotify v1.5.1
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/spf13/viper v1.10.1
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.19.1 // indirect
go.uber.org/zap v1.19.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
)
This diff is collapsed.
......@@ -12,7 +12,7 @@ import (
// 网络检测
var (
httpcli *http.Client
httpCli *http.Client
)
var (
......@@ -31,8 +31,7 @@ const (
)
func init() {
rand.Seed(time.Now().UnixNano())
httpcli = &http.Client{
httpCli = &http.Client{
Transport: &http.Transport{
TLSHandshakeTimeout: defTimeout,
TLSClientConfig: &tls.Config{
......@@ -44,7 +43,7 @@ func init() {
}
func Ping(url string) bool {
resp, err := httpcli.Head(url)
resp, err := httpCli.Head(url)
if err != nil {
log.Run().Warnf("ping %s with error %+v", url, err)
return false
......
......@@ -5,7 +5,8 @@ import (
"runtime"
"time"
"github.com/adamweixuan/getty"
"go.uber.org/atomic"
"virjar.com/majora-go/log"
"virjar.com/majora-go/model"
)
......@@ -15,44 +16,56 @@ const (
cmdUnix = "-c"
)
func Redial(cfg *model.Configure, session getty.Session) {
log.Run().Infof("[redial] start, session is close :%d", session.IsClosed())
beforeIp := GetPPP()
retry := 0
defer func(start time.Time) {
newIp := GetPPP()
log.Run().Infof("[redial] retry %d, cost %v, ip change %s -> %s, session is close:%v",
retry, time.Since(start), beforeIp, newIp, session.IsClosed())
}(time.Now())
for {
retry++
status := command(cfg)
pingBaidu := RandomPing()
log.Run().Infof("[redial] net check: %d->%v", retry, pingBaidu)
if pingBaidu && status {
break
type PPPRedial struct {
inRedialing *atomic.Bool
}
func NewPPPRedial() *PPPRedial {
return &PPPRedial{
inRedialing: atomic.NewBool(false),
}
}
func (p *PPPRedial) Redial(cfg *model.Configure, tag string) bool{
if p.inRedialing.CAS(false, true) {
log.Run().Infof("[PPPRedial %s] start", tag)
beforeIp := GetPPP()
retry := 0
defer func(start time.Time) {
newIp := GetPPP()
log.Run().Infof("[PPPRedial %s] retry %d, cost %v, ip change %s -> %s ",
tag, retry, time.Since(start), beforeIp, newIp)
}(time.Now())
for {
retry++
status := command(cfg)
pingBaidu := RandomPing()
log.Run().Infof("[PPPRedial %s] net check: %d->%v", tag, retry, pingBaidu)
if pingBaidu && status {
break
}
}
p.inRedialing.CAS(true, false)
return true
} else {
log.Run().Infof("[PPPRedial %s] inRedialing ignore this", tag)
return false
}
}
func RedialByCheck(cfg *model.Configure) bool {
beforeIp := GetPPP()
defer func(start time.Time) {
newIp := GetPPP()
log.Run().Infof("[RedialByCheck] cost %v, ip change %s -> %s", time.Since(start), beforeIp, newIp)
}(time.Now())
return command(cfg)
func (p *PPPRedial) RedialByCheck(cfg *model.Configure) bool {
return p.Redial(cfg, "check")
}
func command(cfg *model.Configure) bool {
execPath := cfg.Redial.ExecPath
if len(execPath) == 0 {
log.Run().Warn("[redial] exec file is empty")
log.Run().Warn("[Redial] exec file is empty")
return true
}
command := cfg.Redial.Command
if len(command) == 0 {
log.Run().Warn("[redial] command is empty")
log.Run().Warn("[Redial] command is empty")
return true
}
......@@ -64,9 +77,9 @@ func command(cfg *model.Configure) bool {
cmd := exec.Command(command, args, execPath)
output, err := cmd.Output()
if err != nil {
log.Run().Errorf("[redial] Execute Shell:%s failed with error:%s", command, err.Error())
log.Run().Errorf("[Redial] Execute Shell:%s failed with error:%s", command, err.Error())
return false
}
log.Run().Infof("[redial] success %+v resp:%s", cmd, string(output))
log.Run().Infof("[Redial] success %+v resp:%s", cmd, string(output))
return true
}
package initialize
import (
"github.com/fsnotify/fsnotify"
"github.com/spf13/viper"
)
// MustInitConfigAndWatch 第一次初始化 config 时必须成功,否则 panic;
func MustInitConfigAndWatch(configFileName string, config interface{}, watch func(config interface{}, err error)) {
v, err := initConfig(configFileName, config)
if err != nil {
panic(err)
}
watchConfigFile(v, func(in fsnotify.Event) {
err = readAndUnmarshalConfig(v, config)
watch(config, err)
})
}
// MustInitConfig 初始化 config 时必须成功,否则 panic;
func MustInitConfig(configFileName string, config interface{}) {
_, err := initConfig(configFileName, config)
if err != nil {
panic(err)
}
}
func initConfig(configFileName string, config interface{}) (*viper.Viper, error) {
if configFileName == "" {
configFileName = "./conf/majora-dev.yaml"
}
v := viper.New()
v.SetConfigFile(configFileName)
err := readAndUnmarshalConfig(v, config)
if err != nil {
return nil, err
}
return v, nil
}
func watchConfigFile(v *viper.Viper, run func(in fsnotify.Event)) {
v.WatchConfig()
v.OnConfigChange(run)
}
func readAndUnmarshalConfig(v *viper.Viper, config interface{}) error {
err := v.ReadInConfig()
if err != nil {
return err
}
err = v.Unmarshal(config)
if err != nil {
return err
}
return nil
}
package initialize
import (
"fmt"
"testing"
"virjar.com/majora-go/global"
)
func TestMustInitConfig(t *testing.T) {
MustInitConfigAndWatch("/Users/tsaiilin/src/go/majora-go/conf/majora-dev.yaml", global.Config, func(config interface{}) {
fmt.Printf("config: %+v", global.Config)
})
select {
}
}
package initialize
import (
"virjar.com/majora-go/global"
"virjar.com/majora-go/log"
)
func InitLogger() {
// 暂时在这里初始化环境
_ = global.CurrentEnv.Set(global.Config.Env)
log.Init(global.Config.LogLevel, global.Config.LogPath)
}
\ No newline at end of file
package initialize
import (
"testing"
"virjar.com/majora-go/log"
)
func TestInitLogger(t *testing.T) {
log.Init("debug", "")
log.Run().Info("adfdfsfsf")
}
......@@ -3,67 +3,107 @@ package log
import (
"os"
"path/filepath"
"time"
"github.com/adamweixuan/getty"
"github.com/natefinch/lumberjack"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"virjar.com/majora-go/env"
"virjar.com/majora-go/global"
)
var (
runLogger getty.Logger
eventLogger getty.Logger
heartLogger getty.Logger
latencyLogger getty.Logger
runLogger getty.Logger
traceLogger getty.Logger
errorLogger getty.Logger
)
const (
logDir = "./output/log/"
run = "run.log"
event = "event.log"
heart = "heart.log"
latency = "latency.log"
logDir = "./output/log/"
run = "run.log"
trace = "trace.log"
errorLog = "error.log"
logTmFmtWithMS = "2006-01-02 15:04:05.000"
)
func getCurPath() string {
exePath, err := os.Executable()
if err != nil {
panic(err)
// debug 模式下会将日志输出到控制台和文件,其他模式只输出到文件
func getLogWriter(path string) zapcore.WriteSyncer {
lumberJackLogger := &lumberjack.Logger{
Filename: path,
MaxSize: 100,
MaxBackups: 5,
MaxAge: 30,
Compress: false,
}
if global.CurrentEnv == env.Debug {
return zapcore.NewMultiWriteSyncer(zapcore.AddSync(os.Stdout), zapcore.AddSync(lumberJackLogger))
}
return filepath.Dir(exePath)
return zapcore.AddSync(lumberJackLogger)
}
func Init(level int) {
curPath := getCurPath()
base := filepath.Join(curPath, logDir)
if _, err := os.Stat(base); err != nil {
if os.IsNotExist(err) {
if err := os.MkdirAll(base, os.ModePerm); err != nil {
panic(err)
}
} else {
panic(err)
}
func getEncoder() zapcore.Encoder {
customTimeEncoder := func(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
enc.AppendString("[" + t.Format(logTmFmtWithMS) + "]")
}
customLevelEncoder := func(level zapcore.Level, enc zapcore.PrimitiveArrayEncoder) {
enc.AppendString("[" + level.CapitalString() + "]")
}
if len(curPath) == 0 {
panic("invalid current path")
customCallerEncoder := func(caller zapcore.EntryCaller, enc zapcore.PrimitiveArrayEncoder) {
enc.AppendString("[" + caller.TrimmedPath() + "]")
}
runLogger = getty.NewLogger(filepath.Join(base, run), getty.LoggerLevel(level))
eventLogger = getty.NewLogger(filepath.Join(base, event), getty.LoggerLevel(level))
heartLogger = getty.NewLogger(filepath.Join(base, heart), getty.LoggerLevel(level))
latencyLogger = getty.NewLogger(filepath.Join(base, latency), getty.LoggerLevel(level))
encoderConfig := zap.NewDevelopmentEncoderConfig()
encoderConfig.EncodeTime = customTimeEncoder
encoderConfig.EncodeLevel = customLevelEncoder
encoderConfig.EncodeCaller = customCallerEncoder
return zapcore.NewConsoleEncoder(encoderConfig)
}
func Run() getty.Logger {
return runLogger
func Init(level string, logPath string) {
if len(logPath) == 0 {
logPath = logDir
}
runLogger = initSugaredLogger(filepath.Join(logPath, run), true, level)
errorLogger = initSugaredLogger(filepath.Join(logPath, errorLog), true, level)
traceLogger = initSugaredLogger(filepath.Join(logPath, trace), false, level)
// 框架的日志也输入到 run.log 中
getty.SetLogger(runLogger)
}
func initLogger(path string, caller bool, level string) *zap.Logger {
zapLevel := zapcore.InfoLevel
// 忽略错误,如果传入的字符串有误默认 info 级别
_ = zapLevel.Set(level)
encoder := getEncoder()
writeSyncer := getLogWriter(path)
core := zapcore.NewCore(encoder, writeSyncer, zapLevel)
if caller {
return zap.New(core, zap.AddCaller())
} else {
return zap.New(core)
}
}
func Event() getty.Logger {
return eventLogger
func initSugaredLogger(path string, caller bool, level string) *zap.SugaredLogger {
logger := initLogger(path, caller, level)
return logger.Sugar()
}
func Run() getty.Logger {
return runLogger
}
func Heart() getty.Logger {
return heartLogger
func Trace() getty.Logger {
return traceLogger
}
func Latency() getty.Logger {
return latencyLogger
func Error() getty.Logger {
return errorLogger
}
......@@ -4,33 +4,36 @@ import (
"time"
"github.com/google/uuid"
"gopkg.in/ini.v1"
"virjar.com/majora-go/common"
)
type Redial struct {
Command string `ini:"command" json:"command"`
ExecPath string `ini:"exec_path" json:"exec_path"`
RedialDuration time.Duration `ini:"redial_duration" json:"redial_duration"`
WaitTime time.Duration `ini:"wait_time" json:"wait_time"`
Command string `mapstructure:"command"`
ExecPath string `mapstructure:"exec_path"`
RedialDuration time.Duration `mapstructure:"redial_duration"`
WaitTime time.Duration `mapstructure:"wait_time"`
}
type Extra struct {
Account string `ini:"account" json:"account"`
Account string `mapstructure:"account"`
}
type Configure struct {
LogLevel int `ini:"log_level" json:"log_level"`
PprofPort int `ini:"pprof_port" json:"pprof_port"`
TunnelAddr string `ini:"tunnel_addr" json:"tunnel_addr"`
DNSServer string `ini:"dns_server" json:"dns_server"`
LocalAddr string `ini:"local_ip" json:"local_ip"`
ReconnInterval time.Duration `ini:"reconn_interval" json:"reconn_interval"`
ClientID string `ini:"client_id" json:"client_id"`
NetCheckInterval time.Duration `ini:"net_check_interval" json:"net_check_interval"`
NetCheckUrl string `ini:"net_check_url" json:"net_check_url"`
Extra Extra `ini:"extra" json:"extra"`
Redial Redial `ini:"redial" json:"redial"`
Env string `mapstructure:"env"`
LogLevel string `mapstructure:"log_level"`
LogPath string `mapstructure:"log_path"`
Daemon bool `mapstructure:"daemon"`
PprofPort int `mapstructure:"pprof_port"`
TunnelAddr string `mapstructure:"tunnel_addr"`
DNSServer string `mapstructure:"dns_server"`
LocalAddr string `mapstructure:"local_ip"`
ReconnInterval time.Duration `mapstructure:"reconn_interval"`
ClientID string `mapstructure:"client_id"`
NetCheckInterval time.Duration `mapstructure:"net_check_interval"`
NetCheckUrl string `mapstructure:"net_check_url"`
DnsCacheDuration time.Duration `mapstructure:"dns_cache_duration"`
Extra Extra `mapstructure:"extra"`
Redial Redial `mapstructure:"redial"`
}
const (
......@@ -39,7 +42,9 @@ const (
func NewDefMajoraConf() *Configure {
return &Configure{
LogLevel: 1,
Env: "product",
LogLevel: "info",
Daemon: false,
PprofPort: 0,
TunnelAddr: common.DefNatAddr,
DNSServer: common.DNSServer, //nolint:typecheck
......@@ -54,14 +59,6 @@ func NewDefMajoraConf() *Configure {
}
}
func InitConf(path string) *Configure {
conf := NewDefMajoraConf()
if err := ini.MapTo(conf, path); err != nil {
panic(err)
}
return conf
}
func (r Redial) Valid() bool {
if len(r.Command) == 0 {
return false
......
package model
import (
"testing"
"github.com/google/uuid"
)
func TestInitConf(t *testing.T) {
conf := InitConf("/Users/weixuan/code/gcode/majora-go/conf/majora.ini")
t.Logf("conf %+v", conf)
t.Logf("%s", uuid.NewString())
}
package safe
import "virjar.com/majora-go/log"
func SageGo(f func()) {
go func() {
defer func() {
if err := recover(); err != nil {
log.Error().Errorf("goroutine panic %+v", err)
}
}()
f()
}()
}
package trace
import (
"runtime"
"sync/atomic"
"time"
"virjar.com/majora-go/env"
"virjar.com/majora-go/global"
"virjar.com/majora-go/log"
"virjar.com/majora-go/safe"
)
var (
sessionEventChan = make(chan *sessionEvent, runtime.GOMAXPROCS(-1)*100)
ConnectEvent = "ConnectEvent"
TransferEvent = "TransferEvent"
MajoraSessionName = "MajoraSessionId"
UpStreamEvent = "ReadUpStream"
DisconnectEvent = "Disconnect"
DnsResolveEvent = "DnsResolve"
sessionIdNop = "session_id_not_set"
)
func init() {
safe.SageGo(func() {
for {
e := <-sessionEventChan
if e.Err != nil {
log.Trace().Errorf("[%s] [%s] [%s] %s error:%+v",
e.sessionId, e.Timestamp.Format("2006-01-02 15:04:05.000000"), e.EventName, e.Message, e.Err)
} else {
log.Trace().Infof("[%s] [%s] [%s] %s",
e.sessionId, e.Timestamp.Format("2006-01-02 15:04:05.000000"), e.EventName, e.Message)
}
}
})
}
// Event 事件
type Event struct {
// 发生时间
Timestamp time.Time
// 事件名称
EventName string
// 事件消息
Message string
// 错误,如果存在
Err error
}
type sessionEvent struct {
sessionId string
*Event
}
type Recorder interface {
RecordEvent(eventName string, message string)
RecordErrorEvent(eventName string, message string, err error)
Enable() bool
}
type nopRecorder struct{}
func (n *nopRecorder) RecordEvent(eventName string, message string) {
}
func (n *nopRecorder) RecordErrorEvent(eventName string, message string, err error) {
}
func (n *nopRecorder) Enable() bool {
return false
}
type recorderImpl struct {
sessionId string
}
func (r *recorderImpl) RecordEvent(eventName string, message string) {
r.RecordErrorEvent(eventName, message, nil)
}
func (r *recorderImpl) RecordErrorEvent(eventName string, message string, err error) {
event := &Event{
Timestamp: time.Now(),
EventName: eventName,
Message: message,
Err: err,
}
sessionEvent := &sessionEvent{
sessionId: r.sessionId,
Event: event,
}
// 当 trace 日志 channel 超过 90% 时放弃 trace 记录,防止阻塞主业务
sessionChanCap := cap(sessionEventChan)
sessionChanLen := len(sessionEventChan)
if sessionChanLen > sessionChanCap*9/10 {
log.Run().Errorf("sessionEventChan data to many -> cap:%d len:%d", sessionChanCap, sessionChanLen)
return
}
sessionEventChan <- sessionEvent
}
func (r *recorderImpl) Enable() bool {
return true
}
var defaultNopRecorder = nopRecorder{}
var slots = make([]int64, 30)
func acquireRecorder(sessionId string) Recorder {
if global.CurrentEnv == env.Debug {
return &recorderImpl{sessionId: sessionId}
}
now := time.Now()
slotIndex := now.Minute() / 2
timeMinute := now.Unix() / 60
slot := &slots[slotIndex]
slotTime := atomic.LoadInt64(slot)
if slotTime == timeMinute {
return &defaultNopRecorder
}
if atomic.CompareAndSwapInt64(slot, slotTime, timeMinute) {
return &recorderImpl{sessionId: sessionId}
}
return &defaultNopRecorder
}
type Session struct {
Recorder Recorder
}
func NewSession(sessionId string) *Session {
if len(sessionId) == 0 {
sessionId = sessionIdNop
}
return &Session{
Recorder: acquireRecorder(sessionId),
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment