Commit 7d1057a6 authored by Tsaiilin's avatar Tsaiilin

增加 DNS 缓存,修改重播逻辑

parent ce6782d9
......@@ -6,6 +6,7 @@ import (
"time"
"github.com/adamweixuan/getty"
"github.com/coocood/freecache"
"virjar.com/majora-go/infra"
"virjar.com/majora-go/log"
......@@ -18,8 +19,11 @@ type Client struct {
localAddr net.Addr
natTunnel getty.Client
session getty.Session
redial *infra.PPPRedial
connStore sync.Map
sessionStore sync.Map
dnsCache *freecache.Cache
}
func NewClientWithConf(cfg *model.Configure) *Client {
......@@ -39,6 +43,8 @@ func NewCli(cfg *model.Configure) *Client {
localAddr: localAddr,
connStore: sync.Map{},
sessionStore: sync.Map{},
dnsCache: freecache.NewCache(1024),
redial: infra.NewPPPRedial(),
}
return client
......@@ -70,12 +76,21 @@ func (client *Client) check() {
for {
timer.Reset(interval)
<-timer.C
success := infra.Ping(url)
success := false
for i := 0; i < 3; i++ {
success = infra.Ping(url)
if success {
break
}
}
if success {
continue
}
log.Run().Warnf("net check fail, redial...")
infra.RedialByCheck(cfg)
log.Run().Warnf("Redial net check fail, redial...")
client.redial.RedialByCheck(cfg)
if client.sessiong != nil {
client.session.Close()
}
}
})
}
......@@ -12,6 +12,7 @@ import (
"github.com/adamweixuan/getty"
"virjar.com/majora-go/common"
"virjar.com/majora-go/global"
"virjar.com/majora-go/log"
"virjar.com/majora-go/protocol"
"virjar.com/majora-go/trace"
......@@ -64,7 +65,27 @@ func (client *Client) handleConnect(packet *protocol.MajoraPacket, session getty
LocalAddr: client.localAddr,
}
conn, err := dialer.Dial(common.TCP, packet.Extra)
target := hostPort[0]
ip, err := client.dnsCache.Get([]byte(hostPort[0]))
if err != nil {
traceSession.Recorder.RecordEvent(trace.DnsResolveEvent, fmt.Sprintf("Dns cache miss %s ", hostPort[0]))
host, dnsErr := net.LookupHost(hostPort[0])
if dnsErr != nil {
traceSession.Recorder.RecordErrorEvent(trace.DnsResolveEvent, fmt.Sprintf("Resolve %s ip error", hostPort[0]), dnsErr)
client.closeVirtualConnection(session, packet.SerialNumber)
return
}
err := client.dnsCache.Set([]byte(hostPort[0]), []byte(host[0]), int(global.Config.DnsCacheDuration.Seconds()))
if err != nil {
traceSession.Recorder.RecordErrorEvent(trace.DnsResolveEvent, fmt.Sprintf("Dns cache set error %s", hostPort[0]), err)
}
target = host[0]
} else {
target = string(ip)
}
traceSession.Recorder.RecordEvent(trace.DnsResolveEvent, fmt.Sprintf("Dns cache hit %s -> %s", hostPort[0], target))
conn, err := dialer.Dial(common.TCP, fmt.Sprintf("%s:%s", target, hostPort[1]))
if err != nil {
log.Error().Errorf("[handleConnect] %d->connect to %s->%s", packet.SerialNumber, packet.Extra, err.Error())
traceSession.Recorder.RecordErrorEvent(trace.ConnectEvent,
......@@ -76,8 +97,8 @@ func (client *Client) handleConnect(packet *protocol.MajoraPacket, session getty
_ = tcpConn.SetNoDelay(true)
_ = tcpConn.SetKeepAlive(true)
client.AddConnection(packet, tcpConn, packet.Extra)
traceSession.Recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("Connect to %s success (sn:%d)",
packet.Extra, packet.SerialNumber))
traceSession.Recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("Connect to %s success, local: %s -> remote:%s (sn:%d)",
packet.Extra, tcpConn.LocalAddr(), tcpConn.RemoteAddr(), packet.SerialNumber))
traceSession.Recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("Start replay natServer connect ready (sn:%d)", packet.SerialNumber))
majoraPacket := protocol.TypeConnectReady.CreatePacket()
......@@ -139,7 +160,7 @@ func (client *Client) handleTransfer(packet *protocol.MajoraPacket, session gett
traceRecorder := client.GetRecorderFromSession(packet.SerialNumber)
traceRecorder.RecordEvent(trace.TransferEvent,
fmt.Sprintf("Receive transfer packet from natServer,start to be forward to target (%d)", packet.SerialNumber))
fmt.Sprintf("Receive transfer packet from natServer,start to be forward to target, len:%d (%d)", len(packet.Data), packet.SerialNumber))
load, ok := client.connStore.Load(packet.SerialNumber)
if !ok {
......@@ -304,6 +325,7 @@ func (client *Client) closeVirtualConnection(session getty.Session, serialNumber
traceRecorder.RecordErrorEvent(trace.DisconnectEvent,
fmt.Sprintf("Send disconnect to natServer failed closed:%v allCnt %d sendCnt %d (sn:%d)",
session.IsClosed(), allCnt, sendCnt, serialNumber), err)
session.Close()
}
traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("Send disconnect to natServer success (sn:%d)", serialNumber))
client.connStore.Delete(serialNumber)
......@@ -324,4 +346,5 @@ func (client *Client) CloseAll(session getty.Session) {
return true
})
client.connStore = sync.Map{}
client.sessionStore = sync.Map{}
}
package client
import (
"runtime"
"time"
"github.com/adamweixuan/getty"
......@@ -16,6 +15,7 @@ type MajoraEventListener struct {
}
func (m *MajoraEventListener) OnOpen(session getty.Session) error {
m.client.session = session
packet := protocol.TypeRegister.CreatePacket()
packet.Extra = m.client.config.ClientID
extraMap := make(map[string]string, 1)
......@@ -26,6 +26,7 @@ func (m *MajoraEventListener) OnOpen(session getty.Session) error {
return err
}
log.Run().Infof("[OnOpen] register to %s success", m.client.config.TunnelAddr)
return nil
}
......@@ -40,7 +41,7 @@ func (m *MajoraEventListener) OnError(session getty.Session, err error) {
}
func (m *MajoraEventListener) OnCron(session getty.Session) {
log.Run().Warnf("thread:%d session closed %v", runtime.NumGoroutine(), session.IsClosed())
log.Run().Infof("[OnCorn] Redial, session closed:%v", session.IsClosed())
m.client.Redial(session)
}
......
......@@ -11,7 +11,6 @@ import (
gxsync "github.com/adamweixuan/gostnops/sync"
"virjar.com/majora-go/common"
"virjar.com/majora-go/infra"
"virjar.com/majora-go/log"
)
......@@ -76,7 +75,7 @@ func InitialSession(session getty.Session, client *Client) (err error) {
// 加上随机 防止vps在同时间重启
randDuration := rand.Int63n(time.Minute.Milliseconds() * 5)
interval := randDuration + client.config.Redial.RedialDuration.Milliseconds()
log.Run().Infof("redial interval %+v", time.Duration(interval)*time.Millisecond)
log.Run().Infof("Redial interval %+v", time.Duration(interval)*time.Millisecond)
session.SetCronPeriod(int(interval))
}
session.SetPkgHandler(PkgCodec)
......@@ -87,16 +86,17 @@ func InitialSession(session getty.Session, client *Client) (err error) {
}
func (client *Client) Redial(session getty.Session) {
log.Run().Infof("redial start ...%v", client.config.Redial.Valid())
log.Run().Infof("[Redial] start can redial ? %v", client.config.Redial.Valid())
if !client.config.Redial.Valid() {
return
}
log.Run().Warn("redial send offline message ...")
log.Run().Info("[Redial] send offline message ...")
if _, _, err := session.WritePkg(OfflinePacket, 0); err != nil {
log.Run().Warnf("write offline to server error %s", err.Error())
log.Run().Errorf("[Redial] write offline to server error %s", err.Error())
}
log.Run().Warn("redial close local session")
client.CloseAll(session)
time.Sleep(client.config.Redial.WaitTime)
infra.Redial(client.config, session)
log.Run().Info("[Redial] start close local session")
client.CloseAll(session)
client.redial.Redial(client.config)
session.Close()
}
env: debug
tunnel_addr: 127.0.0.1:5879
tunnel_addr: majora-vps-zj.virjar.com:5879
dns_server: 114.114.114.114:53
pprof_port: 16666
log_level: debug
log_path: ./output/log/
reconn_intervalz: 5s
net_check_interval: 5s
dns_cache_duration: 10m
net_check_url: https://www.baidu.com[extra]
account: superman
#redial:
# command: /bin/bash
# exec_path: /root/ppp_redial.sh
# redial_duration: 5m
# wait_time: 10s
......@@ -4,4 +4,10 @@ dns_server: 114.114.114.114:53
log_level: info
reconn_interval: 5s
net_check_interval: 5s
net_check_url: https://www.baidu.com
\ No newline at end of file
net_check_url: https://www.baidu.com
dns_cache_duration: 10m
redial:
command: /bin/bash
exec_path: /root/ppp_redial.sh
redial_duration: 5m
wait_time: 10s
......@@ -6,14 +6,16 @@ require (
github.com/adamweixuan/getty v0.0.1
github.com/adamweixuan/gostnops v0.0.1
github.com/google/uuid v1.3.0
gopkg.in/ini.v1 v1.66.2
)
require (
github.com/BurntSushi/toml v0.4.1 // indirect
github.com/coocood/freecache v1.2.0
github.com/fsnotify/fsnotify v1.5.1
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/spf13/viper v1.10.1
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.19.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
......
......@@ -50,6 +50,7 @@ github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw
github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/adamweixuan/getty v0.0.1 h1:ze1++8zi9yF7QJnp9p1pbCn/7n5Ex5xrJKlFzpOu5q8=
github.com/adamweixuan/getty v0.0.1/go.mod h1:SnxNiA40Am3aSxaDnAYBBbd7iz9BtKUign8x1jNh2TU=
......@@ -73,6 +74,7 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
......@@ -92,6 +94,8 @@ github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWH
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211130200136-a8f946100490/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/coocood/freecache v1.2.0 h1:p8RhjN6Y4DRBIMzdRlm1y+M7h7YJxye3lGW8/VvzCz0=
github.com/coocood/freecache v1.2.0/go.mod h1:OKrEjkGVoxZhyWAJoeFi5BMLUJm2Tit0kpGkIr7NGYY=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
......@@ -284,6 +288,8 @@ github.com/natefinch/lumberjack v2.0.0+incompatible h1:4QJd3OLAMgj7ph+yZTuX13Ld4
github.com/natefinch/lumberjack v2.0.0+incompatible/go.mod h1:Wi9p2TTF5DG5oU+6YfsmYQpsTIOm0B1VNzQg9Mw6nPk=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
......@@ -314,6 +320,7 @@ github.com/sagikazarmark/crypt v0.4.0/go.mod h1:ALv2SRj7GxYV4HO9elxH9nS6M9gW+xDN
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.3.3/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4=
github.com/spf13/afero v1.6.0 h1:xoax2sJ2DT8S8xA2paPFjDCScCNeWsg75VG0DLRreiY=
......
......@@ -12,7 +12,7 @@ import (
// 网络检测
var (
httpcli *http.Client
httpCli *http.Client
)
var (
......@@ -32,7 +32,7 @@ const (
func init() {
rand.Seed(time.Now().UnixNano())
httpcli = &http.Client{
httpCli = &http.Client{
Transport: &http.Transport{
TLSHandshakeTimeout: defTimeout,
TLSClientConfig: &tls.Config{
......@@ -44,7 +44,7 @@ func init() {
}
func Ping(url string) bool {
resp, err := httpcli.Head(url)
resp, err := httpCli.Head(url)
if err != nil {
log.Run().Warnf("ping %s with error %+v", url, err)
return false
......
......@@ -5,7 +5,8 @@ import (
"runtime"
"time"
"github.com/adamweixuan/getty"
"go.uber.org/atomic"
"virjar.com/majora-go/log"
"virjar.com/majora-go/model"
)
......@@ -15,44 +16,54 @@ const (
cmdUnix = "-c"
)
func Redial(cfg *model.Configure, session getty.Session) {
log.Run().Infof("[redial] start, session is close :%d", session.IsClosed())
beforeIp := GetPPP()
retry := 0
defer func(start time.Time) {
newIp := GetPPP()
log.Run().Infof("[redial] retry %d, cost %v, ip change %s -> %s, session is close:%v",
retry, time.Since(start), beforeIp, newIp, session.IsClosed())
}(time.Now())
for {
retry++
status := command(cfg)
pingBaidu := RandomPing()
log.Run().Infof("[redial] net check: %d->%v", retry, pingBaidu)
if pingBaidu && status {
break
type PPPRedial struct {
inRedialing *atomic.Bool
}
func NewPPPRedial() *PPPRedial {
return &PPPRedial{
inRedialing: atomic.NewBool(false),
}
}
func (p *PPPRedial) Redial(cfg *model.Configure) {
if p.inRedialing.CAS(false, true) {
log.Run().Infof("[Redial] start")
beforeIp := GetPPP()
retry := 0
defer func(start time.Time) {
newIp := GetPPP()
log.Run().Infof("[Redial] retry %d, cost %v, ip change %s -> %s ",
retry, time.Since(start), beforeIp, newIp)
}(time.Now())
for {
retry++
status := command(cfg)
pingBaidu := RandomPing()
log.Run().Infof("[Redial] net check: %d->%v", retry, pingBaidu)
if pingBaidu && status {
break
}
}
p.inRedialing.CAS(true, false)
} else {
log.Run().Infof("[Redial] inRedialing ignore this")
}
}
func RedialByCheck(cfg *model.Configure) bool {
beforeIp := GetPPP()
defer func(start time.Time) {
newIp := GetPPP()
log.Run().Infof("[RedialByCheck] cost %v, ip change %s -> %s", time.Since(start), beforeIp, newIp)
}(time.Now())
return command(cfg)
func (p *PPPRedial) RedialByCheck(cfg *model.Configure) {
p.Redial(cfg)
}
func command(cfg *model.Configure) bool {
execPath := cfg.Redial.ExecPath
if len(execPath) == 0 {
log.Run().Warn("[redial] exec file is empty")
log.Run().Warn("[Redial] exec file is empty")
return true
}
command := cfg.Redial.Command
if len(command) == 0 {
log.Run().Warn("[redial] command is empty")
log.Run().Warn("[Redial] command is empty")
return true
}
......@@ -64,9 +75,9 @@ func command(cfg *model.Configure) bool {
cmd := exec.Command(command, args, execPath)
output, err := cmd.Output()
if err != nil {
log.Run().Errorf("[redial] Execute Shell:%s failed with error:%s", command, err.Error())
log.Run().Errorf("[Redial] Execute Shell:%s failed with error:%s", command, err.Error())
return false
}
log.Run().Infof("[redial] success %+v resp:%s", cmd, string(output))
log.Run().Infof("[Redial] success %+v resp:%s", cmd, string(output))
return true
}
......@@ -4,7 +4,6 @@ import (
"time"
"github.com/google/uuid"
"gopkg.in/ini.v1"
"virjar.com/majora-go/common"
)
......@@ -31,6 +30,7 @@ type Configure struct {
ClientID string `mapstructure:"client_id"`
NetCheckInterval time.Duration `mapstructure:"net_check_interval"`
NetCheckUrl string `mapstructure:"net_check_url"`
DnsCacheDuration time.Duration `mapstructure:"dns_cache_duration"`
Extra Extra `mapstructure:"extra"`
Redial Redial `mapstructure:"redial"`
}
......@@ -57,14 +57,6 @@ func NewDefMajoraConf() *Configure {
}
}
func InitConf(path string) *Configure {
conf := NewDefMajoraConf()
if err := ini.MapTo(conf, path); err != nil {
panic(err)
}
return conf
}
func (r Redial) Valid() bool {
if len(r.Command) == 0 {
return false
......
......@@ -18,6 +18,7 @@ var (
MajoraSessionName = "MajoraSessionId"
UpStreamEvent = "ReadUpStream"
DisconnectEvent = "Disconnect"
DnsResolveEvent = "DnsResolve"
sessionIdNop = "session_id_not_set"
)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment