Commit 4953a83c authored by Tsaiilin's avatar Tsaiilin

goroutine 池切换为 ants

parent 77a127be
...@@ -5,7 +5,6 @@ import ( ...@@ -5,7 +5,6 @@ import (
"encoding/json" "encoding/json"
"flag" "flag"
"fmt" "fmt"
"github.com/pkg/errors"
"math/rand" "math/rand"
"net" "net"
"net/http" "net/http"
...@@ -18,15 +17,17 @@ import ( ...@@ -18,15 +17,17 @@ import (
"strings" "strings"
"syscall" "syscall"
"time" "time"
"virjar.com/majora-go/infra"
"virjar.com/majora-go/trace" "github.com/pkg/errors"
"virjar.com/majora-go/client" "virjar.com/majora-go/client"
"virjar.com/majora-go/daemon" "virjar.com/majora-go/daemon"
"virjar.com/majora-go/global" "virjar.com/majora-go/global"
"virjar.com/majora-go/infra"
"virjar.com/majora-go/initialize" "virjar.com/majora-go/initialize"
"virjar.com/majora-go/log" "virjar.com/majora-go/log"
"virjar.com/majora-go/safe" "virjar.com/majora-go/safe"
"virjar.com/majora-go/trace"
) )
var ( var (
...@@ -108,7 +109,6 @@ func main() { ...@@ -108,7 +109,6 @@ func main() {
d.MaxCount = 20 //最大重启次数 d.MaxCount = 20 //最大重启次数
d.Run() d.Run()
} }
initialize.InitLogger() initialize.InitLogger()
initial() initial()
cli() cli()
......
...@@ -18,7 +18,7 @@ type Client struct { ...@@ -18,7 +18,7 @@ type Client struct {
natTunnel getty.Client natTunnel getty.Client
session getty.Session session getty.Session
transferStore sync.Map transferStore sync.Map
dnsCache *freecache.Cache dnsCache *freecache.Cache
} }
func NewClientWithConf(cfg *model.Configure, host string, port int) *Client { func NewClientWithConf(cfg *model.Configure, host string, port int) *Client {
......
...@@ -22,7 +22,7 @@ type ClusterClient struct { ...@@ -22,7 +22,7 @@ type ClusterClient struct {
func (c *ClusterClient) Start() { func (c *ClusterClient) Start() {
c.check() c.check()
taskPool.AddTaskAlways(func() { _ = taskPool.Submit(func() {
var timer = time.NewTimer(5 * time.Minute) var timer = time.NewTimer(5 * time.Minute)
for { for {
c.connectNatServers() c.connectNatServers()
...@@ -31,7 +31,7 @@ func (c *ClusterClient) Start() { ...@@ -31,7 +31,7 @@ func (c *ClusterClient) Start() {
} }
}) })
if global.Config.Redial.Valid() { if global.Config.Redial.Valid() {
taskPool.AddTaskAlways(func() { _ = taskPool.Submit(func() {
// 加上随机 防止vps在同时间重启 // 加上随机 防止vps在同时间重启
duration := c.randomDuration() duration := c.randomDuration()
log.Run().Infof("Redial interval %+v", duration) log.Run().Infof("Redial interval %+v", duration)
...@@ -141,7 +141,7 @@ func (c *ClusterClient) check() { ...@@ -141,7 +141,7 @@ func (c *ClusterClient) check() {
url = global.Config.NetCheckUrl url = global.Config.NetCheckUrl
} }
taskPool.AddTaskAlways(func() { _ = taskPool.Submit(func() {
var timer = time.NewTimer(interval) var timer = time.NewTimer(interval)
for { for {
timer.Reset(interval) timer.Reset(interval)
......
...@@ -31,14 +31,14 @@ func (m *MajoraEventListener) OnOpen(session getty.Session) error { ...@@ -31,14 +31,14 @@ func (m *MajoraEventListener) OnOpen(session getty.Session) error {
} }
func (m *MajoraEventListener) OnClose(session getty.Session) { func (m *MajoraEventListener) OnClose(session getty.Session) {
taskPool.AddTaskAlways(func() { _ = taskPool.Submit(func() {
log.Error().Errorf("OnClose-> session closed %v", session.IsClosed()) log.Error().Errorf("OnClose-> session closed %v", session.IsClosed())
m.client.CloseAll() m.client.CloseAll()
}) })
} }
func (m *MajoraEventListener) OnError(session getty.Session, err error) { func (m *MajoraEventListener) OnError(session getty.Session, err error) {
taskPool.AddTaskAlways(func() { _ = taskPool.Submit(func() {
log.Error().Errorf("OnError %s", err.Error()) log.Error().Errorf("OnError %s", err.Error())
m.client.CloseAll() m.client.CloseAll()
}) })
...@@ -50,7 +50,7 @@ func (m *MajoraEventListener) OnCron(session getty.Session) { ...@@ -50,7 +50,7 @@ func (m *MajoraEventListener) OnCron(session getty.Session) {
} }
func (m *MajoraEventListener) OnMessage(session getty.Session, input interface{}) { func (m *MajoraEventListener) OnMessage(session getty.Session, input interface{}) {
taskPool.AddTask(func() { _ = taskPool.Submit(func() {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
log.Error().Errorf("OnMessage panic %+v", err) log.Error().Errorf("OnMessage panic %+v", err)
......
...@@ -3,7 +3,7 @@ package client ...@@ -3,7 +3,7 @@ package client
import ( import (
"fmt" "fmt"
"github.com/adamweixuan/getty" "github.com/adamweixuan/getty"
gxsync "github.com/adamweixuan/gostnops/sync" "github.com/panjf2000/ants/v2"
"net" "net"
"runtime" "runtime"
...@@ -12,7 +12,7 @@ import ( ...@@ -12,7 +12,7 @@ import (
) )
var ( var (
taskPool = gxsync.NewTaskPoolSimple(runtime.GOMAXPROCS(-1) * 100) taskPool, _ = ants.NewPool(runtime.GOMAXPROCS(-1) * 100, ants.WithNonblocking(true))
) )
func (client *Client) connect() { func (client *Client) connect() {
......
...@@ -57,7 +57,7 @@ func (t *Transfer) Start() { ...@@ -57,7 +57,7 @@ func (t *Transfer) Start() {
panic(errors.New("transferToDownstreamFunc is nil")) panic(errors.New("transferToDownstreamFunc is nil"))
} }
taskPool.AddTaskAlways(func() { _ = taskPool.Submit(func() {
for { for {
select { select {
case p := <-t.transferChan: case p := <-t.transferChan:
...@@ -68,7 +68,7 @@ func (t *Transfer) Start() { ...@@ -68,7 +68,7 @@ func (t *Transfer) Start() {
} }
}) })
taskPool.AddTaskAlways(func() { _ = taskPool.Submit(func() {
traceRecorder := t.recorder traceRecorder := t.recorder
traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("Ready read from upstream (sn:%d)", t.serialNumber)) traceRecorder.RecordEvent(trace.UpStreamEvent, fmt.Sprintf("Ready read from upstream (sn:%d)", t.serialNumber))
log.Run().Debugf("[handleUpStream] %d-> handleUpStream start...", t.serialNumber) log.Run().Debugf("[handleUpStream] %d-> handleUpStream start...", t.serialNumber)
......
...@@ -4,7 +4,6 @@ go 1.17 ...@@ -4,7 +4,6 @@ go 1.17
require ( require (
github.com/adamweixuan/getty v0.0.1 github.com/adamweixuan/getty v0.0.1
github.com/adamweixuan/gostnops v0.0.1
github.com/google/uuid v1.3.0 github.com/google/uuid v1.3.0
) )
...@@ -13,6 +12,7 @@ require ( ...@@ -13,6 +12,7 @@ require (
github.com/coocood/freecache v1.2.0 github.com/coocood/freecache v1.2.0
github.com/fsnotify/fsnotify v1.5.1 github.com/fsnotify/fsnotify v1.5.1
github.com/natefinch/lumberjack v2.0.0+incompatible github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/panjf2000/ants/v2 v2.4.7
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/spf13/viper v1.10.1 github.com/spf13/viper v1.10.1
go.uber.org/atomic v1.9.0 go.uber.org/atomic v1.9.0
......
...@@ -286,6 +286,8 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY ...@@ -286,6 +286,8 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/natefinch/lumberjack v2.0.0+incompatible h1:4QJd3OLAMgj7ph+yZTuX13Ld4UpgHp07nNdFX7mqFfM= github.com/natefinch/lumberjack v2.0.0+incompatible h1:4QJd3OLAMgj7ph+yZTuX13Ld4UpgHp07nNdFX7mqFfM=
github.com/natefinch/lumberjack v2.0.0+incompatible/go.mod h1:Wi9p2TTF5DG5oU+6YfsmYQpsTIOm0B1VNzQg9Mw6nPk= github.com/natefinch/lumberjack v2.0.0+incompatible/go.mod h1:Wi9p2TTF5DG5oU+6YfsmYQpsTIOm0B1VNzQg9Mw6nPk=
github.com/panjf2000/ants/v2 v2.4.7 h1:MZnw2JRyTJxFwtaMtUJcwE618wKD04POWk2gwwP4E2M=
github.com/panjf2000/ants/v2 v2.4.7/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM= github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
...@@ -778,6 +780,7 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= ...@@ -778,6 +780,7 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment