Commit e4aac5d3 authored by Tsaiilin's avatar Tsaiilin

集群版开发完成

parent ad3d0c1c
......@@ -5,6 +5,7 @@ import (
"encoding/json"
"flag"
"fmt"
"github.com/pkg/errors"
"math/rand"
"net"
"net/http"
......@@ -13,8 +14,11 @@ import (
"os/signal"
"path/filepath"
"runtime"
"strconv"
"strings"
"syscall"
"time"
"virjar.com/majora-go/infra"
"virjar.com/majora-go/client"
"virjar.com/majora-go/daemon"
......@@ -26,7 +30,6 @@ import (
var (
configure string
)
var (
......@@ -43,7 +46,7 @@ func init() {
func initial() {
if global.Config.PprofPort > 0 {
safe.SageGo(func() {
safe.SafeGo(func() {
addr := fmt.Sprintf("127.0.0.1:%d", global.Config.PprofPort)
log.Run().Infof("enable pprof: %s", addr)
log.Run().Error(http.ListenAndServe(addr, nil))
......@@ -71,7 +74,22 @@ func cli() {
log.Run().Infof("hostInfo os:%s, arch:%s", runtime.GOOS, runtime.GOARCH)
cfgInfo, _ := json.Marshal(global.Config)
log.Run().Infof("config info:%s", string(cfgInfo))
client.NewClientWithConf(global.Config).StartUp()
domainAndPort := strings.Split(global.Config.TunnelAddr, ":")
if len(domainAndPort) != 2 {
panic(errors.Errorf("TunnelAddr Error: %s", global.Config.TunnelAddr))
}
domain := domainAndPort[0]
port, err := strconv.Atoi(domainAndPort[1])
if err != nil {
panic(errors.Errorf("Parse tunnel port error: %s", domainAndPort[1]))
}
clusterClient := client.ClusterClient{
Domain: domain,
Port: port,
Redial: infra.NewPPPRedial(),
}
clusterClient.Start()
}
//main start
......
package client
import (
"net"
"sync"
"time"
"github.com/adamweixuan/getty"
"github.com/coocood/freecache"
"net"
"sync"
"virjar.com/majora-go/infra"
"virjar.com/majora-go/log"
"virjar.com/majora-go/model"
"virjar.com/majora-go/safe"
)
type Client struct {
config *model.Configure
host string
port int
localAddr net.Addr
natTunnel getty.Client
session getty.Session
redial *infra.PPPRedial
connStore sync.Map
sessionStore sync.Map
dnsCache *freecache.Cache
}
func NewClientWithConf(cfg *model.Configure) *Client {
return NewCli(cfg)
func NewClientWithConf(cfg *model.Configure, host string, port int) *Client {
return NewCli(cfg, host, port)
}
func NewCli(cfg *model.Configure) *Client {
func NewCli(cfg *model.Configure, host string, port int) *Client {
var localAddr net.Addr
if len(cfg.LocalAddr) > 0 {
localAddr = &net.TCPAddr{
......@@ -40,57 +36,18 @@ func NewCli(cfg *model.Configure) *Client {
}
client := &Client{
config: cfg,
host: host,
port: port,
localAddr: localAddr,
connStore: sync.Map{},
sessionStore: sync.Map{},
dnsCache: freecache.NewCache(1024),
redial: infra.NewPPPRedial(),
}
return client
}
func (client *Client) StartUp() {
client.check()
client.connect()
}
func (client *Client) check() {
cfg := client.config
if !cfg.Redial.Valid() {
return
}
interval := cfg.NetCheckInterval
if interval <= 0 {
interval = time.Second * 5
}
url := infra.RandUrl()
if len(cfg.NetCheckUrl) > 0 {
url = cfg.NetCheckUrl
}
safe.SageGo(func() {
var timer = time.NewTimer(interval)
for {
timer.Reset(interval)
<-timer.C
success := false
for i := 0; i < 3; i++ {
success = infra.Ping(url)
if success {
break
}
}
if success {
continue
}
log.Run().Warnf("Redial net check fail, redial...")
if client.redial.RedialByCheck(cfg) {
client.natTunnel.Close()
client.connect()
}
}
})
}
package client
import (
"math/rand"
"net"
"sync"
"time"
"gopkg.in/fatih/set.v0"
"virjar.com/majora-go/global"
"virjar.com/majora-go/infra"
"virjar.com/majora-go/log"
"virjar.com/majora-go/safe"
)
type ClusterClient struct {
Domain string
Port int
Redial *infra.PPPRedial
clients sync.Map
}
func (c *ClusterClient) Start() {
c.check()
safe.SafeGo(func() {
var timer = time.NewTimer(5 * time.Minute)
for {
c.connectNatServers()
<-timer.C
timer.Reset(5 * time.Minute)
}
})
if global.Config.Redial.Valid() {
safe.SafeGo(func() {
// 加上随机 防止vps在同时间重启
duration := c.randomDuration()
log.Run().Infof("Redial interval %+v", duration)
var timer = time.NewTimer(duration)
for {
<-timer.C
c.StartRedial("cron", true)
duration = c.randomDuration()
log.Run().Infof("Redial interval %+v", duration)
timer.Reset(duration)
}
})
}
}
func (c *ClusterClient) connectNatServers() {
hosts, err := net.LookupHost(c.Domain)
if err != nil {
log.Error().Errorf("[connectNatServers] lookup domain host error, %+v", err)
return
}
log.Run().Infof("[connectNatServers] LookupHost from %s result: %+v", c.Domain, hosts)
dnsSet := set.New(set.ThreadSafe)
for _, v := range hosts {
dnsSet.Add(v)
}
existSet := set.New(set.ThreadSafe)
c.clients.Range(func(key, value interface{}) bool {
existSet.Add(key)
return true
})
needConnectSet := set.Difference(dnsSet, existSet)
log.Run().Infof("[connectNatServers] NeedConnectSet: %v", needConnectSet.List())
needConnectSet.Each(func(i interface{}) bool {
t := i.(string)
if _, ok := c.clients.Load(t); ok {
log.Error().Error("[connectNatServers] client already exist")
} else {
client := NewClientWithConf(global.Config, t, c.Port)
client.StartUp()
c.clients.Store(t, client)
}
return true
})
needRemoveSet := set.Difference(existSet, dnsSet)
log.Run().Infof("[connectNatServers] needRemoveSet: %v", needRemoveSet.List())
needRemoveSet.Each(func(i interface{}) bool {
t := i.(string)
load, loaded := c.clients.LoadAndDelete(t)
if !loaded {
log.Error().Error("[connectNatServers] client already remove")
}
needCloseClient := load.(*Client)
needCloseClient.CloseAll()
needCloseClient.natTunnel.Close()
return true
})
}
func (c *ClusterClient) randomDuration() time.Duration {
rand.Seed(time.Now().UnixNano())
randDuration := rand.Int63n(time.Minute.Milliseconds() * 5)
interval := randDuration + global.Config.Redial.RedialDuration.Milliseconds()
return time.Duration(interval) * time.Millisecond
}
func (c *ClusterClient) StartRedial(tag string, replay bool) {
defer func(startTime time.Time) {
log.Run().Infof("StartRedial cost %v", time.Since(startTime))
}(time.Now())
if replay {
c.clients.Range(func(host, c interface{}) bool {
client, _ := c.(*Client)
client.Redial(tag)
return true
})
time.Sleep(global.Config.Redial.WaitTime)
}
c.Redial.Redial(tag)
c.clients.Range(func(host, c interface{}) bool {
client, _ := c.(*Client)
client.CloseAll()
client.natTunnel.Close()
client.connect()
return true
})
}
func (c *ClusterClient) check() {
if !global.Config.Redial.Valid() {
return
}
interval := global.Config.NetCheckInterval
if interval <= 0 {
interval = time.Second * 5
}
url := infra.RandUrl()
if len(global.Config.NetCheckUrl) > 0 {
url = global.Config.NetCheckUrl
}
safe.SafeGo(func() {
var timer = time.NewTimer(interval)
for {
timer.Reset(interval)
<-timer.C
success := false
for i := 0; i < 3; i++ {
success = infra.Ping(url)
if success {
break
}
}
if success {
continue
}
log.Run().Warnf("Redial net check fail, redial...")
c.StartRedial("check", false)
}
})
}
......@@ -49,7 +49,7 @@ func (client *Client) handleConnect(packet *protocol.MajoraPacket, session getty
if !ok {
log.Error().Errorf("Get user from connect packet failed (sn:%d)", packet.SerialNumber)
}
traceSession := trace.NewSession(sessionId, user, enableTrace == "true")
traceSession := trace.NewSession(sessionId, client.host, user, enableTrace == "true")
client.AddSession(packet, traceSession)
traceSession.Recorder.RecordEvent(trace.ConnectEvent, fmt.Sprintf("Start handle connect to %s (sn:%d)",
packet.Extra, packet.SerialNumber))
......@@ -128,7 +128,7 @@ func (client *Client) handleConnect(packet *protocol.MajoraPacket, session getty
client.closeVirtualConnection(session, packet.SerialNumber)
return
} else {
safe.SageGo(func() {
safe.SafeGo(func() {
client.handleUpStream(tcpConn, packet, session)
})
log.Run().Debugf("[handleConnect] %d->connect success to %s ", packet.SerialNumber, packet.Extra)
......@@ -290,7 +290,7 @@ func (client *Client) GetRecorderFromSession(sn int64) trace.Recorder {
session, ok := client.sessionStore.Load(sn)
if !ok {
log.Run().Warnf("[GetRecorderFromSession] get session failed, maybe already closed (%d)", sn)
session = trace.NewSession("", "", false)
session = trace.NewSession("", "", "", false)
}
traceSession := session.(*trace.Session)
return traceSession.Recorder
......@@ -343,7 +343,7 @@ func (client *Client) closeVirtualConnection(session getty.Session, serialNumber
traceRecorder.RecordEvent(trace.DisconnectEvent, fmt.Sprintf("Send disconnect to natServer success (sn:%d)", serialNumber))
}
func (client *Client) CloseAll(session getty.Session) {
func (client *Client) CloseAll() {
defer func() {
if err := recover(); err != nil {
log.Error().Errorf("OnClose %+v", err)
......@@ -353,7 +353,7 @@ func (client *Client) CloseAll(session getty.Session) {
serialNumber := key.(int64)
conn, _ := value.(*net.TCPConn)
log.Run().Debugf("[CloseAll] close serialNumber -> %d", serialNumber)
client.OnClose(session, conn, serialNumber)
client.OnClose(client.session, conn, serialNumber)
return true
})
client.connStore = sync.Map{}
......
......@@ -32,17 +32,17 @@ func (m *MajoraEventListener) OnOpen(session getty.Session) error {
func (m *MajoraEventListener) OnClose(session getty.Session) {
log.Error().Errorf("OnClose-> session closed %v", session.IsClosed())
m.client.CloseAll(session)
m.client.CloseAll()
}
func (m *MajoraEventListener) OnError(session getty.Session, err error) {
log.Error().Errorf("OnError %s", err.Error())
m.client.CloseAll(session)
m.client.CloseAll()
}
func (m *MajoraEventListener) OnCron(session getty.Session) {
log.Run().Infof("[OnCorn] Redial, session closed:%v", session.IsClosed())
m.client.Redial(session, "corn")
//log.Run().Infof("[OnCorn] Redial, session closed:%v", session.IsClosed())
//m.client.Redial(session, "corn")
}
func (m *MajoraEventListener) OnMessage(session getty.Session, input interface{}) {
......
......@@ -2,13 +2,10 @@ package client
import (
"fmt"
"math/rand"
"net"
"runtime"
"time"
"github.com/adamweixuan/getty"
gxsync "github.com/adamweixuan/gostnops/sync"
"net"
"runtime"
"virjar.com/majora-go/common"
"virjar.com/majora-go/log"
......@@ -19,15 +16,11 @@ var (
)
func (client *Client) connect() {
hostPort := client.config.TunnelAddr
if len(hostPort) == 0 {
panic("invalid nat host/port info")
}
reConnect := client.config.ReconnInterval
gettyCli := getty.NewTCPClient(
getty.WithServerAddress(hostPort),
getty.WithServerAddress(fmt.Sprintf("%s:%d", client.host, client.port)),
getty.WithConnectionNumber(1),
getty.WithClientTaskPool(taskPool),
getty.WithReconnectInterval(int(reConnect.Milliseconds())),
......@@ -70,14 +63,6 @@ func InitialSession(session getty.Session, client *Client) (err error) {
session.SetWriteTimeout(common.WriteTimeout)
session.SetWaitTime(common.WaitTimeout)
if client.config.Redial.Valid() {
rand.Seed(time.Now().UnixNano())
// 加上随机 防止vps在同时间重启
randDuration := rand.Int63n(time.Minute.Milliseconds() * 5)
interval := randDuration + client.config.Redial.RedialDuration.Milliseconds()
log.Run().Infof("Redial interval %+v", time.Duration(interval)*time.Millisecond)
session.SetCronPeriod(int(interval))
}
session.SetPkgHandler(PkgCodec)
session.SetEventListener(&MajoraEventListener{
client: client,
......@@ -85,19 +70,15 @@ func InitialSession(session getty.Session, client *Client) (err error) {
return nil
}
func (client *Client) Redial(session getty.Session, tag string) {
func (client *Client) Redial(tag string) {
log.Run().Infof("[Redial %s] start, can redial? %v", tag, client.config.Redial.Valid())
if !client.config.Redial.Valid() {
return
}
log.Run().Infof("[Redial %s] Send offline message", tag)
if _, _, err := session.WritePkg(OfflinePacket, 0); err != nil {
if _, _, err := client.session.WritePkg(OfflinePacket, 0); err != nil {
log.Run().Errorf("[Redial %s] write offline to server error %s", tag, err.Error())
}
time.Sleep(client.config.Redial.WaitTime)
log.Run().Info("[Redial %s] start close local session", tag)
client.CloseAll(session)
client.redial.Redial(client.config, tag)
client.natTunnel.Close()
client.connect()
log.Run().Info("[Redial %s %s] start close local session", client.host, tag)
}
......@@ -13,10 +13,11 @@ require (
github.com/coocood/freecache v1.2.0
github.com/fsnotify/fsnotify v1.5.1
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pkg/errors v0.9.1
github.com/spf13/viper v1.10.1
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/atomic v1.9.0
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.19.1
gopkg.in/fatih/set.v0 v0.2.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
)
......@@ -288,8 +288,6 @@ github.com/natefinch/lumberjack v2.0.0+incompatible h1:4QJd3OLAMgj7ph+yZTuX13Ld4
github.com/natefinch/lumberjack v2.0.0+incompatible/go.mod h1:Wi9p2TTF5DG5oU+6YfsmYQpsTIOm0B1VNzQg9Mw6nPk=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
......@@ -769,6 +767,8 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fatih/set.v0 v0.2.1 h1:Xvyyp7LXu34P0ROhCyfXkmQCAoOUKb1E2JS9I7SE5CY=
gopkg.in/fatih/set.v0 v0.2.1/go.mod h1:5eLWEndGL4zGGemXWrKuts+wTJR0y+w+auqUJZbmyBg=
gopkg.in/ini.v1 v1.66.2 h1:XfR1dOYubytKy4Shzc2LHrrGhU0lDCfDGG1yLPmpgsI=
gopkg.in/ini.v1 v1.66.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
......
......@@ -4,11 +4,11 @@ import (
"os/exec"
"runtime"
"time"
"virjar.com/majora-go/global"
"go.uber.org/atomic"
"virjar.com/majora-go/log"
"virjar.com/majora-go/model"
)
const (
......@@ -26,7 +26,7 @@ func NewPPPRedial() *PPPRedial {
}
}
func (p *PPPRedial) Redial(cfg *model.Configure, tag string) bool{
func (p *PPPRedial) Redial(tag string) bool {
if p.inRedialing.CAS(false, true) {
log.Run().Infof("[PPPRedial %s] start", tag)
beforeIp := GetPPP()
......@@ -38,7 +38,7 @@ func (p *PPPRedial) Redial(cfg *model.Configure, tag string) bool{
}(time.Now())
for {
retry++
status := command(cfg)
status := command()
pingBaidu := RandomPing()
log.Run().Infof("[PPPRedial %s] net check: %d->%v", tag, retry, pingBaidu)
if pingBaidu && status {
......@@ -53,17 +53,17 @@ func (p *PPPRedial) Redial(cfg *model.Configure, tag string) bool{
}
}
func (p *PPPRedial) RedialByCheck(cfg *model.Configure) bool {
return p.Redial(cfg, "check")
func (p *PPPRedial) RedialByCheck() bool {
return p.Redial("check")
}
func command(cfg *model.Configure) bool {
execPath := cfg.Redial.ExecPath
func command() bool {
execPath := global.Config.Redial.ExecPath
if len(execPath) == 0 {
log.Run().Warn("[Redial] exec file is empty")
return true
}
command := cfg.Redial.Command
command := global.Config.Redial.Command
if len(command) == 0 {
log.Run().Warn("[Redial] command is empty")
return true
......
......@@ -2,7 +2,7 @@ package safe
import "virjar.com/majora-go/log"
func SageGo(f func()) {
func SafeGo(f func()) {
go func() {
defer func() {
if err := recover(); err != nil {
......
......@@ -22,15 +22,15 @@ var (
)
func init() {
safe.SageGo(func() {
safe.SafeGo(func() {
for {
e := <-sessionEventChan
if e.Err != nil {
log.Trace().Errorf("[%s] [%s] [%s] [%s] %s error:%+v",
e.sessionId, e.user, e.Timestamp.Format("2006-01-02 15:04:05.000000"), e.EventName, e.Message, e.Err)
log.Trace().Errorf("[%s] [%s] [%s] [%s] [%s] %s error:%+v",
e.natHost, e.sessionId, e.user, e.Timestamp.Format("2006-01-02 15:04:05.000000"), e.EventName, e.Message, e.Err)
} else {
log.Trace().Infof("[%s] [%s] [%s] [%s] %s",
e.sessionId, e.user, e.Timestamp.Format("2006-01-02 15:04:05.000000"), e.EventName, e.Message)
log.Trace().Infof("[%s] [%s] [%s] [%s] [%s] %s",
e.natHost, e.sessionId, e.user, e.Timestamp.Format("2006-01-02 15:04:05.000000"), e.EventName, e.Message)
}
}
})
......@@ -54,6 +54,7 @@ type Event struct {
type sessionEvent struct {
user string
sessionId string
natHost string
*Event
}
......@@ -82,6 +83,7 @@ func (n *nopRecorder) Enable() bool {
type recorderImpl struct {
user string
sessionId string
host string
}
func (r *recorderImpl) RecordEvent(eventName string, message string) {
......@@ -98,6 +100,7 @@ func (r *recorderImpl) RecordErrorEvent(eventName string, message string, err er
sessionEvent := &sessionEvent{
user: r.user,
sessionId: r.sessionId,
natHost: r.host,
Event: event,
}
......@@ -117,11 +120,12 @@ func (r *recorderImpl) Enable() bool {
var defaultNopRecorder = nopRecorder{}
func acquireRecorder(sessionId string, user string, enable bool) Recorder {
func acquireRecorder(sessionId string, host, user string, enable bool) Recorder {
if enable {
return &recorderImpl{
user: user,
sessionId: sessionId,
host: host,
}
} else {
return &defaultNopRecorder
......@@ -133,11 +137,11 @@ type Session struct {
Recorder Recorder
}
func NewSession(sessionId string, user string, enable bool) *Session {
func NewSession(sessionId string, host string, user string, enable bool) *Session {
if len(sessionId) == 0 {
sessionId = sessionIdNop
}
return &Session{
Recorder: acquireRecorder(sessionId, user, enable),
Recorder: acquireRecorder(sessionId, host, user, enable),
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment