Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
G
getty
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
wei.xuan
getty
Commits
6664cf2e
Commit
6664cf2e
authored
Jul 22, 2019
by
wongoo
Browse files
Options
Browse Files
Download
Plain Diff
Merge remote-tracking branch 'origin/master' into feature-demo
parents
de0b1b51
1d700118
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
16 additions
and
196 deletions
+16
-196
getty.go
getty.go
+2
-1
go.mod
go.mod
+1
-1
go.sum
go.sum
+2
-0
options.go
options.go
+0
-55
session.go
session.go
+11
-7
task_pool.go
task_pool.go
+0
-132
No files found.
getty.go
View file @
6664cf2e
...
...
@@ -16,6 +16,7 @@ import (
)
import
(
gxsync
"github.com/dubbogo/gost/sync"
perrors
"github.com/pkg/errors"
)
...
...
@@ -156,7 +157,7 @@ type Session interface {
SetRQLen
(
int
)
SetWQLen
(
int
)
SetWaitTime
(
time
.
Duration
)
SetTaskPool
(
*
TaskPool
)
SetTaskPool
(
*
gxsync
.
TaskPool
)
GetAttribute
(
interface
{})
interface
{}
SetAttribute
(
interface
{},
interface
{})
...
...
go.mod
View file @
6664cf2e
module github.com/dubbogo/getty
require (
github.com/dubbogo/gost v1.
0.1-0.20190706005735-65c3ecbba418
github.com/dubbogo/gost v1.
1.1
github.com/golang/snappy v0.0.1
github.com/gorilla/websocket v1.4.0
github.com/pkg/errors v0.8.1
...
...
go.sum
View file @
6664cf2e
...
...
@@ -2,6 +2,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dubbogo/gost v1.0.1-0.20190706005735-65c3ecbba418 h1:7OsAjhWpX0m6o1b/fcO47IF7FgpVv/qMSgHNk2orvIM=
github.com/dubbogo/gost v1.0.1-0.20190706005735-65c3ecbba418/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg=
github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI=
github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
...
...
options.go
View file @
6664cf2e
...
...
@@ -10,10 +10,6 @@
package
getty
import
(
"fmt"
)
/////////////////////////////////////////
// Server Options
/////////////////////////////////////////
...
...
@@ -113,54 +109,3 @@ func WithRootCertificateFile(cert string) ClientOption {
o
.
cert
=
cert
}
}
/////////////////////////////////////////
// Task Pool Options
/////////////////////////////////////////
type
TaskPoolOptions
struct
{
tQLen
int
// task queue length
tQNumber
int
// task queue number
tQPoolSize
int
// task pool size
}
func
(
o
*
TaskPoolOptions
)
validate
()
{
if
o
.
tQPoolSize
<
1
{
panic
(
fmt
.
Sprintf
(
"[getty][task_pool] illegal pool size %d"
,
o
.
tQPoolSize
))
}
if
o
.
tQLen
<
1
{
o
.
tQLen
=
defaultTaskQLen
}
if
o
.
tQNumber
<
1
{
o
.
tQNumber
=
defaultTaskQNumber
}
if
o
.
tQNumber
>
o
.
tQPoolSize
{
o
.
tQNumber
=
o
.
tQPoolSize
}
}
type
TaskPoolOption
func
(
*
TaskPoolOptions
)
// @size is the task queue pool size
func
WithTaskPoolTaskPoolSize
(
size
int
)
TaskPoolOption
{
return
func
(
o
*
TaskPoolOptions
)
{
o
.
tQPoolSize
=
size
}
}
// @length is the task queue length
func
WithTaskPoolTaskQueueLength
(
length
int
)
TaskPoolOption
{
return
func
(
o
*
TaskPoolOptions
)
{
o
.
tQLen
=
length
}
}
// @number is the task queue number
func
WithTaskPoolTaskQueueNumber
(
number
int
)
TaskPoolOption
{
return
func
(
o
*
TaskPoolOptions
)
{
o
.
tQNumber
=
number
}
}
session.go
View file @
6664cf2e
...
...
@@ -20,7 +20,8 @@ import (
)
import
(
gstime
"github.com/dubbogo/gost/time"
gxsync
"github.com/dubbogo/gost/sync"
gxtime
"github.com/dubbogo/gost/time"
"github.com/gorilla/websocket"
perrors
"github.com/pkg/errors"
)
...
...
@@ -44,10 +45,10 @@ const (
/////////////////////////////////////////
var
(
wheel
=
g
stime
.
NewWheel
(
gs
time
.
TimeMillisecondDuration
(
100
),
1200
)
// wheel longest span is 2 minute
wheel
=
g
xtime
.
NewWheel
(
gx
time
.
TimeMillisecondDuration
(
100
),
1200
)
// wheel longest span is 2 minute
)
func
GetTimeWheel
()
*
g
s
time
.
Wheel
{
func
GetTimeWheel
()
*
g
x
time
.
Wheel
{
return
wheel
}
...
...
@@ -72,7 +73,7 @@ type session struct {
// handle logic
maxMsgLen
int32
// task queue
tPool
*
TaskPool
tPool
*
gxsync
.
TaskPool
// heartbeat
period
time
.
Duration
...
...
@@ -310,7 +311,7 @@ func (s *session) SetWaitTime(waitTime time.Duration) {
}
// set task pool
func
(
s
*
session
)
SetTaskPool
(
p
*
TaskPool
)
{
func
(
s
*
session
)
SetTaskPool
(
p
*
gxsync
.
TaskPool
)
{
s
.
lock
.
Lock
()
defer
s
.
lock
.
Unlock
()
...
...
@@ -490,7 +491,7 @@ func (s *session) handleLoop() {
wsFlag
bool
wsConn
*
gettyWSConn
// start time.Time
counter
g
s
time
.
CountWatch
counter
g
x
time
.
CountWatch
inPkg
interface
{}
outPkg
interface
{}
)
...
...
@@ -573,7 +574,10 @@ LOOP:
func
(
s
*
session
)
addTask
(
pkg
interface
{})
{
if
s
.
tPool
!=
nil
{
s
.
tPool
.
AddTask
(
task
{
session
:
s
,
pkg
:
pkg
})
s
.
tPool
.
AddTask
(
func
()
{
s
.
listener
.
OnMessage
(
s
,
pkg
)
s
.
incReadPkgNum
()
})
}
else
{
s
.
rQ
<-
pkg
}
...
...
task_pool.go
deleted
100644 → 0
View file @
de0b1b51
package
getty
import
(
"sync"
"sync/atomic"
)
const
(
defaultTaskQNumber
=
10
defaultTaskQLen
=
128
)
// task t
type
task
struct
{
session
*
session
pkg
interface
{}
}
// task pool: manage task ts
type
TaskPool
struct
{
TaskPoolOptions
idx
uint32
// round robin index
qArray
[]
chan
task
wg
sync
.
WaitGroup
once
sync
.
Once
done
chan
struct
{}
}
// build a task pool
func
NewTaskPool
(
opts
...
TaskPoolOption
)
*
TaskPool
{
var
tOpts
TaskPoolOptions
for
_
,
opt
:=
range
opts
{
opt
(
&
tOpts
)
}
tOpts
.
validate
()
p
:=
&
TaskPool
{
TaskPoolOptions
:
tOpts
,
qArray
:
make
([]
chan
task
,
tOpts
.
tQNumber
),
done
:
make
(
chan
struct
{}),
}
for
i
:=
0
;
i
<
p
.
tQNumber
;
i
++
{
p
.
qArray
[
i
]
=
make
(
chan
task
,
p
.
tQLen
)
}
p
.
start
()
return
p
}
// start task pool
func
(
p
*
TaskPool
)
start
()
{
for
i
:=
0
;
i
<
p
.
tQPoolSize
;
i
++
{
p
.
wg
.
Add
(
1
)
workerID
:=
i
q
:=
p
.
qArray
[
workerID
%
p
.
tQNumber
]
go
p
.
run
(
int
(
workerID
),
q
)
}
}
// worker
func
(
p
*
TaskPool
)
run
(
id
int
,
q
chan
task
)
{
defer
p
.
wg
.
Done
()
var
(
ok
bool
t
task
)
for
{
select
{
case
<-
p
.
done
:
if
0
<
len
(
q
)
{
log
.
Warn
(
"[getty][task_pool] task worker %d exit now while its task buffer length %d is greater than 0"
,
id
,
len
(
q
))
}
else
{
log
.
Info
(
"[getty][task_pool] task worker %d exit now"
,
id
)
}
return
case
t
,
ok
=
<-
q
:
if
ok
{
t
.
session
.
listener
.
OnMessage
(
t
.
session
,
t
.
pkg
)
}
}
}
}
// add task
func
(
p
*
TaskPool
)
AddTask
(
t
task
)
{
id
:=
atomic
.
AddUint32
(
&
p
.
idx
,
1
)
%
uint32
(
p
.
tQNumber
)
select
{
case
<-
p
.
done
:
return
case
p
.
qArray
[
id
]
<-
t
:
}
}
// stop all tasks
func
(
p
*
TaskPool
)
stop
()
{
select
{
case
<-
p
.
done
:
return
default
:
p
.
once
.
Do
(
func
()
{
close
(
p
.
done
)
})
}
}
// check whether the session has been closed.
func
(
p
*
TaskPool
)
IsClosed
()
bool
{
select
{
case
<-
p
.
done
:
return
true
default
:
return
false
}
}
func
(
p
*
TaskPool
)
Close
()
{
p
.
stop
()
p
.
wg
.
Wait
()
for
i
:=
range
p
.
qArray
{
close
(
p
.
qArray
[
i
])
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment