Commit 3f0589fc authored by wei.xuan's avatar wei.xuan

feat:gops依赖

parent 4bebd896
......@@ -18,13 +18,10 @@
package gxchan
import (
"github.com/adamweixuan/gostnops/container/queue"
"go.uber.org/atomic"
)
import (
"github.com/dubbogo/gost/container/queue"
)
// UnboundedChan is a chan that could grow if the number of elements exceeds the capacity.
type UnboundedChan struct {
in chan interface{}
......
This diff is collapsed.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gxetcd
import (
"net/url"
"os"
"path"
"reflect"
"strings"
"testing"
"time"
)
import (
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/server/v3/embed"
"google.golang.org/grpc/connectivity"
)
const defaultEtcdV3WorkDir = "/tmp/default-dubbo-go-remote.etcd"
// tests dataset
var tests = []struct {
input struct {
k string
v string
}
}{
{input: struct {
k string
v string
}{k: "name/name", v: "scott.wang"}},
{input: struct {
k string
v string
}{k: "name/namePrefix", v: "prefix.scott.wang"}},
{input: struct {
k string
v string
}{k: "name/namePrefix1", v: "prefix1.scott.wang"}},
{input: struct {
k string
v string
}{k: "age", v: "27"}},
}
// test dataset prefix
const prefixKey = "name/"
const keyPrefix = "name/name"
type ClientTestSuite struct {
suite.Suite
etcdConfig struct {
name string
endpoints []string
timeout time.Duration
heartbeat int
}
etcd *embed.Etcd
client *Client
}
// start etcd server
func (suite *ClientTestSuite) SetupSuite() {
t := suite.T()
DefaultListenPeerURLs := "http://localhost:2382"
DefaultListenClientURLs := "http://localhost:2381"
lpurl, _ := url.Parse(DefaultListenPeerURLs)
lcurl, _ := url.Parse(DefaultListenClientURLs)
cfg := embed.NewConfig()
cfg.LPUrls = []url.URL{*lpurl}
cfg.LCUrls = []url.URL{*lcurl}
cfg.Dir = defaultEtcdV3WorkDir
e, err := embed.StartEtcd(cfg)
if err != nil {
t.Fatal(err)
}
select {
case <-e.Server.ReadyNotify():
t.Log("Server is ready!")
case <-time.After(60 * time.Second):
e.Server.Stop() // trigger a shutdown
t.Logf("Server took too long to start!")
}
suite.etcd = e
return
}
// stop etcd server
func (suite *ClientTestSuite) TearDownSuite() {
suite.etcd.Close()
if err := os.RemoveAll(defaultEtcdV3WorkDir); err != nil {
suite.FailNow(err.Error())
}
}
func (suite *ClientTestSuite) setUpClient() *Client {
c, err := NewConfigClientWithErr(WithName(suite.etcdConfig.name),
WithEndpoints(suite.etcdConfig.endpoints...),
WithTimeout(suite.etcdConfig.timeout),
WithHeartbeat(suite.etcdConfig.heartbeat))
if err != nil {
suite.T().Fatal(err)
}
return c
}
// set up a client for suite
func (suite *ClientTestSuite) SetupTest() {
c := suite.setUpClient()
c.CleanKV()
suite.client = c
return
}
func (suite *ClientTestSuite) TestClientClose() {
c := suite.client
t := suite.T()
defer c.Close()
if c.rawClient.ActiveConnection().GetState() != connectivity.Ready {
t.Fatal(suite.client.rawClient.ActiveConnection().GetState())
}
}
func (suite *ClientTestSuite) TestClientValid() {
c := suite.client
t := suite.T()
if !c.Valid() {
t.Fatal("client is not valid")
}
c.Close()
if suite.client.Valid() != false {
t.Fatal("client is valid")
}
}
func (suite *ClientTestSuite) TestClientDone() {
c := suite.client
go func() {
time.Sleep(2 * time.Second)
c.Close()
}()
c.Wait.Wait()
if c.Valid() {
suite.T().Fatal("client should be invalid then")
}
}
func (suite *ClientTestSuite) TestClientCreateKV() {
tests := tests
c := suite.client
t := suite.T()
defer suite.client.Close()
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
expect := tc.input.v
if err := c.Create(k, v); err != nil {
t.Fatal(err)
}
value, err := c.Get(k)
if err != nil {
t.Fatal(err)
}
if value != expect {
t.Fatalf("expect %v but get %v", expect, value)
}
}
}
func (suite *ClientTestSuite) TestBatchClientCreateKV() {
tests := tests
c := suite.client
t := suite.T()
defer suite.client.Close()
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
expect := tc.input.v
kList := make([]string, 0, 1)
vList := make([]string, 0, 1)
kList = append(kList, k)
vList = append(vList, v)
if err := c.BatchCreate(kList, vList); err != nil {
t.Fatal(err)
}
value, err := c.Get(k)
if err != nil {
t.Fatal(err)
}
if value != expect {
t.Fatalf("expect %v but get %v", expect, value)
}
}
}
func (suite *ClientTestSuite) TestBatchClientGetValAndRevKV() {
tests := tests
c := suite.client
t := suite.T()
defer suite.client.Close()
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
expect := tc.input.v
kList := make([]string, 0, 1)
vList := make([]string, 0, 1)
kList = append(kList, k)
vList = append(vList, v)
if err := c.BatchCreate(kList, vList); err != nil {
t.Fatal(err)
}
value, revision, err := c.getValAndRev(k)
if err != nil {
t.Fatal(err)
}
err = c.UpdateWithRev(k, k, revision)
if err != nil {
t.Fatal(err)
}
err = c.Update(k, k)
if err != nil {
t.Fatal(err)
}
if value != expect {
t.Fatalf("expect %v but get %v", expect, value)
}
}
}
func (suite *ClientTestSuite) TestClientDeleteKV() {
tests := tests
c := suite.client
t := suite.T()
defer c.Close()
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
expect := ErrKVPairNotFound
if err := c.Put(k, v); err != nil {
t.Fatal(err)
}
if err := c.Delete(k); err != nil {
t.Fatal(err)
}
_, err := c.Get(k)
if perrors.Cause(err) == expect {
continue
}
if err != nil {
t.Fatal(err)
}
}
}
func (suite *ClientTestSuite) TestClientGetChildrenKVList() {
tests := tests
c := suite.client
t := suite.T()
var expectKList []string
var expectVList []string
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
if strings.Contains(k, prefixKey) {
expectKList = append(expectKList, k)
expectVList = append(expectVList, v)
}
if err := c.Create(k, v); err != nil {
t.Fatal(err)
}
}
kList, vList, err := c.GetChildrenKVList(prefixKey)
if err != nil {
t.Fatal(err)
}
if reflect.DeepEqual(expectKList, kList) && reflect.DeepEqual(expectVList, vList) {
return
}
t.Fatalf("expect keylist %v but got %v expect valueList %v but got %v ", expectKList, kList, expectVList, vList)
}
func (suite *ClientTestSuite) TestClientWatch() {
tests := tests
c := suite.client
t := suite.T()
go func() {
time.Sleep(time.Second)
for _, tc := range tests {
k := tc.input.k
v := tc.input.v
if err := c.Create(k, v); err != nil {
assert.Error(t, err)
}
if err := c.delete(k); err != nil {
assert.Error(t, err)
}
}
c.Close()
}()
wc, err := c.WatchWithOption(keyPrefix)
if err != nil {
assert.Error(t, err)
}
events := make([]mvccpb.Event, 0)
var eCreate, eDelete mvccpb.Event
for e := range wc {
for _, event := range e.Events {
events = append(events, (mvccpb.Event)(*event))
if event.Type == mvccpb.PUT {
eCreate = (mvccpb.Event)(*event)
}
if event.Type == mvccpb.DELETE {
eDelete = (mvccpb.Event)(*event)
}
t.Logf("type IsCreate %v k %s v %s", event.IsCreate(), event.Kv.Key, event.Kv.Value)
}
}
assert.Equal(t, 2, len(events))
assert.Contains(t, events, eCreate)
assert.Contains(t, events, eDelete)
}
func (suite *ClientTestSuite) TestClientRegisterTemp() {
c := suite.client
observeC := suite.setUpClient()
t := suite.T()
go func() {
time.Sleep(2 * time.Second)
err := c.RegisterTemp("scott/wang", "test")
if err != nil {
assert.Error(t, err)
}
c.Close()
}()
completePath := path.Join("scott", "wang")
wc, err := observeC.watchWithOption(completePath)
if err != nil {
assert.Error(t, err)
}
events := make([]mvccpb.Event, 0)
var eCreate, eDelete mvccpb.Event
for e := range wc {
for _, event := range e.Events {
events = append(events, (mvccpb.Event)(*event))
if event.Type == mvccpb.DELETE {
eDelete = (mvccpb.Event)(*event)
t.Logf("complete key (%s) is delete", completePath)
observeC.Close()
break
}
eCreate = (mvccpb.Event)(*event)
t.Logf("type IsCreate %v k %s v %s", event.IsCreate(), event.Kv.Key, event.Kv.Value)
}
}
assert.Equal(t, 2, len(events))
assert.Contains(t, events, eCreate)
assert.Contains(t, events, eDelete)
}
func TestClientSuite(t *testing.T) {
suite.Run(t, &ClientTestSuite{
etcdConfig: struct {
name string
endpoints []string
timeout time.Duration
heartbeat int
}{
name: "test",
endpoints: []string{"localhost:2381"},
timeout: time.Second,
heartbeat: 1,
},
})
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gxetcd
import (
"time"
)
const (
// ConnDelay connection delay
ConnDelay = 3
// MaxFailTimes max failure times
MaxFailTimes = 15
// RegistryETCDV3Client client Name
RegistryETCDV3Client = "etcd registry"
// MetadataETCDV3Client client Name
MetadataETCDV3Client = "etcd metadata"
)
// Options client configuration
type Options struct {
// Name etcd server name
Name string
// Endpoints etcd endpoints
Endpoints []string
// Client etcd client
Client *Client
// Timeout timeout
Timeout time.Duration
// Heartbeat second
Heartbeat int
}
// Option will define a function of handling Options
type Option func(*Options)
// WithEndpoints sets etcd client endpoints
func WithEndpoints(endpoints ...string) Option {
return func(opt *Options) {
opt.Endpoints = endpoints
}
}
// WithName sets etcd client name
func WithName(name string) Option {
return func(opt *Options) {
opt.Name = name
}
}
// WithTimeout sets etcd client timeout
func WithTimeout(timeout time.Duration) Option {
return func(opt *Options) {
opt.Timeout = timeout
}
}
// WithHeartbeat sets etcd client heartbeat
func WithHeartbeat(heartbeat int) Option {
return func(opt *Options) {
opt.Heartbeat = heartbeat
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nacos
import (
"sync"
"sync/atomic"
)
import (
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo"
)
var (
configClientPool nacosConfigClientPool
configClientPoolOnce sync.Once
)
type nacosConfigClientPool struct {
sync.Mutex
configClient map[string]*NacosConfigClient
}
type NacosConfigClient struct {
name string
clientLock sync.Mutex // for Client
client config_client.IConfigClient
config vo.NacosClientParam //conn config
valid uint32
activeCount uint32
share bool
}
func initNacosConfigClientPool() {
configClientPool.configClient = make(map[string]*NacosConfigClient)
}
func (n *NacosConfigClient) newConfigClient() error {
client, err := clients.NewConfigClient(n.config)
if err != nil {
return err
}
n.activeCount++
atomic.StoreUint32(&n.valid, 1)
n.client = client
return nil
}
// NewNacosConfigClient create config client
func NewNacosConfigClient(name string, share bool, sc []constant.ServerConfig,
cc constant.ClientConfig) (*NacosConfigClient, error) {
configClient := &NacosConfigClient{
name: name,
activeCount: 0,
share: share,
config: vo.NacosClientParam{ClientConfig: &cc, ServerConfigs: sc},
}
if !share {
return configClient, configClient.newConfigClient()
}
configClientPoolOnce.Do(initNacosConfigClientPool)
configClientPool.Lock()
defer configClientPool.Unlock()
if client, ok := configClientPool.configClient[name]; ok {
client.activeCount++
return client, nil
}
err := configClient.newConfigClient()
if err == nil {
configClientPool.configClient[name] = configClient
}
return configClient, err
}
// Client Get NacosConfigClient
func (n *NacosConfigClient) Client() config_client.IConfigClient {
return n.client
}
// SetClient Set NacosConfigClient
func (n *NacosConfigClient) SetClient(client config_client.IConfigClient) {
n.clientLock.Lock()
n.client = client
n.clientLock.Unlock()
}
// NacosClientValid Get nacos client valid status
func (n *NacosConfigClient) NacosClientValid() bool {
return atomic.LoadUint32(&n.valid) == 1
}
// Close close client
func (n *NacosConfigClient) Close() {
configClientPool.Lock()
defer configClientPool.Unlock()
if n.client == nil {
return
}
n.activeCount--
if n.share {
if n.activeCount == 0 {
n.client = nil
atomic.StoreUint32(&n.valid, 0)
delete(configClientPool.configClient, n.name)
}
} else {
n.client = nil
atomic.StoreUint32(&n.valid, 0)
delete(configClientPool.configClient, n.name)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nacos
import (
"fmt"
"reflect"
"testing"
"time"
)
import (
"github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo"
"github.com/stretchr/testify/assert"
)
func TestStructAlign(t *testing.T) {
typ := reflect.TypeOf(NacosConfigClient{})
fmt.Printf("Struct is %d bytes long\n", typ.Size())
n := typ.NumField()
for i := 0; i < n; i++ {
field := typ.Field(i)
fmt.Printf("%s: at offset %v, size=%d, align=%d\n",
field.Name, field.Offset, field.Type.Size(),
field.Type.Align())
}
}
//TestNewNacosConfigClient config client
func TestNewNacosConfigClient(t *testing.T) {
scs := []constant.ServerConfig{*constant.NewServerConfig("console.nacos.io", 80)}
cc := constant.ClientConfig{TimeoutMs: 5 * 1000, NotLoadCacheAtStart: true}
client1, err := NewNacosConfigClient("nacos", true, scs, cc)
assert.Nil(t, err)
client2, err := NewNacosConfigClient("nacos", true, scs, cc)
assert.Nil(t, err)
client3, err := NewNacosConfigClient("nacos", false, scs, cc)
assert.Nil(t, err)
client4, err := NewNacosConfigClient("test", true, scs, cc)
assert.Nil(t, err)
assert.Equal(t, client1, client2)
assert.Equal(t, client1.activeCount, uint32(2))
assert.Equal(t, client1.NacosClientValid(), true)
assert.Equal(t, client3.activeCount, uint32(1))
assert.Equal(t, client4.activeCount, uint32(1))
client1.Close()
assert.Equal(t, client1.activeCount, uint32(1))
client1.Close()
assert.Equal(t, client1.NacosClientValid(), false)
assert.Nil(t, client1.Client())
client1.Close()
assert.Equal(t, client1.NacosClientValid(), false)
assert.Nil(t, client1.Client())
}
func TestPublishConfig(t *testing.T) {
scs := []constant.ServerConfig{*constant.NewServerConfig("console.nacos.io", 80)}
cc := constant.ClientConfig{
AppName: "nacos",
NamespaceId: "14e01fa8-a4aa-44cc-ad5b-c768f3c62bd5", //namespace id
TimeoutMs: 5000,
NotLoadCacheAtStart: true,
LogDir: "/tmp/nacos/log",
CacheDir: "/tmp/nacos/cache",
RotateTime: "1h",
MaxAge: 3,
LogLevel: "debug",
}
client, err := NewNacosConfigClient("nacos", true, scs, cc)
assert.Nil(t, err)
//publish config
//config key=dataId+group+namespaceId
push, err := client.Client().PublishConfig(vo.ConfigParam{
DataId: "nacos-config",
Group: "dubbo",
Content: "dubbo-go nb",
})
assert.Nil(t, err)
assert.Equal(t, push, true)
//get config
cfg, err := client.Client().GetConfig(vo.ConfigParam{
DataId: "nacos-config",
Group: "dubbo",
})
assert.Nil(t, err)
fmt.Println("GetConfig,config :", cfg)
//Listen config change,key=dataId+group+namespaceId.
err = client.Client().ListenConfig(vo.ConfigParam{
DataId: "nacos-config",
Group: "dubbo",
OnChange: func(namespace, group, dataId, data string) {
assert.Equal(t, data, "test-listen")
fmt.Println("config changed group:" + group + ", dataId:" + dataId + ", content:" + data)
},
})
assert.Nil(t, err)
_, err = client.Client().PublishConfig(vo.ConfigParam{
DataId: "nacos-config",
Group: "dubbo",
Content: "test-listen",
})
assert.Nil(t, err)
searchPage, _ := client.Client().SearchConfig(vo.SearchConfigParam{
Search: "accurate",
DataId: "",
Group: "dubbo",
PageNo: 1,
PageSize: 10,
})
fmt.Printf("Search config:%+v \n", searchPage)
time.Sleep(2 * time.Second)
client.Close()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nacos
import (
"sync"
"sync/atomic"
)
import (
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo"
)
var (
namingClientPool nacosClientPool
clientPoolOnce sync.Once
)
type nacosClientPool struct {
sync.Mutex
namingClient map[string]*NacosNamingClient
}
type NacosNamingClient struct {
name string
clientLock sync.Mutex // for Client
client naming_client.INamingClient
config vo.NacosClientParam //conn config
valid uint32
activeCount uint32
share bool
}
func initNacosClientPool() {
namingClientPool.namingClient = make(map[string]*NacosNamingClient)
}
// NewNacosNamingClient create nacos client
func NewNacosNamingClient(name string, share bool, sc []constant.ServerConfig,
cc constant.ClientConfig) (*NacosNamingClient, error) {
namingClient := &NacosNamingClient{
name: name,
activeCount: 0,
share: share,
config: vo.NacosClientParam{ClientConfig: &cc, ServerConfigs: sc},
}
if !share {
return namingClient, namingClient.newNamingClient()
}
clientPoolOnce.Do(initNacosClientPool)
namingClientPool.Lock()
defer namingClientPool.Unlock()
if client, ok := namingClientPool.namingClient[name]; ok {
client.activeCount++
return client, nil
}
err := namingClient.newNamingClient()
if err == nil {
namingClientPool.namingClient[name] = namingClient
}
return namingClient, err
}
// newNamingClient create NamingClient
func (n *NacosNamingClient) newNamingClient() error {
client, err := clients.NewNamingClient(n.config)
if err != nil {
return err
}
n.activeCount++
atomic.StoreUint32(&n.valid, 1)
n.client = client
return nil
}
// Client Get NacosNamingClient
func (n *NacosNamingClient) Client() naming_client.INamingClient {
return n.client
}
// SetClient Set NacosNamingClient
func (n *NacosNamingClient) SetClient(client naming_client.INamingClient) {
n.clientLock.Lock()
n.client = client
n.clientLock.Unlock()
}
// NacosClientValid Get nacos client valid status
func (n *NacosNamingClient) NacosClientValid() bool {
return atomic.LoadUint32(&n.valid) == 1
}
// Close close client
func (n *NacosNamingClient) Close() {
namingClientPool.Lock()
defer namingClientPool.Unlock()
if n.client == nil {
return
}
n.activeCount--
if n.share {
if n.activeCount == 0 {
n.client = nil
atomic.StoreUint32(&n.valid, 0)
delete(namingClientPool.namingClient, n.name)
}
} else {
n.client = nil
atomic.StoreUint32(&n.valid, 0)
delete(namingClientPool.namingClient, n.name)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nacos
import (
"testing"
)
import (
"github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/stretchr/testify/assert"
)
func TestNewNacosClient(t *testing.T) {
scs := []constant.ServerConfig{
*constant.NewServerConfig("console.nacos.io", 80),
}
cc := constant.ClientConfig{
TimeoutMs: 5 * 1000,
NotLoadCacheAtStart: true,
}
client1, err := NewNacosNamingClient("nacos", true, scs, cc)
assert.Nil(t, err)
client2, err := NewNacosNamingClient("nacos", true, scs, cc)
assert.Nil(t, err)
client3, err := NewNacosNamingClient("nacos", false, scs, cc)
assert.Nil(t, err)
client4, err := NewNacosNamingClient("test", true, scs, cc)
assert.Nil(t, err)
assert.Equal(t, client1, client2)
assert.Equal(t, client1.activeCount, uint32(2))
assert.NotEqual(t, client1, client3)
assert.NotEqual(t, client1, client4)
}
This diff is collapsed.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gxzookeeper
import (
"strconv"
"testing"
"time"
)
import (
"github.com/dubbogo/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
)
func verifyEventStateOrder(t *testing.T, c <-chan zk.Event, expectedStates []zk.State, source string) {
for _, state := range expectedStates {
for {
event, ok := <-c
if !ok {
t.Fatalf("unexpected channel close for %s", source)
}
if event.Type != zk.EventSession {
continue
}
if event.State != state {
t.Fatalf("mismatched state order from %s, expected %v, received %v", source, state, event.State)
}
break
}
}
}
func Test_getZookeeperClient(t *testing.T) {
var err error
var tc *zk.TestCluster
var address []string
tc, err = zk.StartTestCluster(1, nil, nil, zk.WithRetryTimes(40))
assert.NoError(t, err)
assert.NotNil(t, tc.Servers[0])
address = append(address, "127.0.0.1:"+strconv.Itoa(tc.Servers[0].Port))
client1, err := NewZookeeperClient("test1", address, true, WithZkTimeOut(3*time.Second))
assert.Nil(t, err)
client2, err := NewZookeeperClient("test1", address, true, WithZkTimeOut(3*time.Second))
assert.Nil(t, err)
client3, err := NewZookeeperClient("test2", address, true, WithZkTimeOut(3*time.Second))
assert.Nil(t, err)
client4, err := NewZookeeperClient("test2", address, false, WithZkTimeOut(3*time.Second))
assert.Nil(t, err)
if client1 != client2 {
t.Fatalf("NewZookeeperClient failed")
}
if client1 == client3 {
t.Fatalf("NewZookeeperClient failed")
}
if client3 == client4 {
t.Fatalf("NewZookeeperClient failed")
}
client1.Close()
client2.Close()
client3.Close()
client4.Close()
tc.Stop()
}
func Test_Close(t *testing.T) {
var err error
var tc *zk.TestCluster
var address []string
tc, err = zk.StartTestCluster(1, nil, nil, zk.WithRetryTimes(40))
assert.NoError(t, err)
assert.NotNil(t, tc.Servers[0])
address = append(address, "127.0.0.1:"+strconv.Itoa(tc.Servers[0].Port))
client1, err := NewZookeeperClient("test1", address, true, WithZkTimeOut(3*time.Second))
assert.Nil(t, err)
client2, err := NewZookeeperClient("test1", address, true, WithZkTimeOut(3*time.Second))
assert.Nil(t, err)
if client1 != client2 {
t.Fatalf("NewZookeeperClient failed")
}
client1.Close()
client3, err := NewZookeeperClient("test1", address, true, WithZkTimeOut(3*time.Second))
assert.Nil(t, err)
if client2 != client3 {
t.Fatalf("NewZookeeperClient failed")
}
client2.Close()
assert.Equal(t, client1.activeNumber, uint32(1))
client1.Close()
assert.Equal(t, client1.activeNumber, uint32(0))
client4, err := NewZookeeperClient("test1", address, true, WithZkTimeOut(3*time.Second))
assert.Nil(t, err)
assert.Equal(t, client4.activeNumber, uint32(1))
if client4 == client3 {
t.Fatalf("NewZookeeperClient failed")
}
client5, err := NewZookeeperClient("test1", address, false, WithZkTimeOut(3*time.Second))
assert.Nil(t, err)
client6, err := NewZookeeperClient("test1", address, false, WithZkTimeOut(3*time.Second))
assert.Nil(t, err)
if client5 == client6 {
t.Fatalf("NewZookeeperClient failed")
}
client5.Close()
assert.Equal(t, client5.activeNumber, uint32(0))
assert.Equal(t, client5.Conn, (*zk.Conn)(nil))
assert.NotEqual(t, client6.Conn, nil)
client6.Close()
assert.Equal(t, client6.activeNumber, uint32(0))
assert.Equal(t, client6.Conn, (*zk.Conn)(nil))
tc.Stop()
}
func Test_newMockZookeeperClient(t *testing.T) {
ts, _, event, err := NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
defer func() {
err := ts.Stop()
assert.Nil(t, err)
}()
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
}
func TestCreate(t *testing.T) {
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
defer func() {
_ = ts.Stop()
assert.Nil(t, err)
}()
err = z.Create("test1/test2/test3/test4")
assert.NoError(t, err)
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
}
func TestCreateDelete(t *testing.T) {
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
defer func() {
_ = ts.Stop()
assert.Nil(t, err)
}()
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
err = z.Create("/test1/test2/test3/test4")
assert.NoError(t, err)
err = z.Delete("/test1/test2/test3/test4")
assert.NoError(t, err)
// verifyEventOrder(t, event, []zk.EventType{zk.EventNodeCreated}, "event channel")
}
func TestRegisterTemp(t *testing.T) {
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
defer func() {
_ = ts.Stop()
assert.Nil(t, err)
}()
err = z.Create("/test1/test2/test3")
assert.NoError(t, err)
tmpath, err := z.RegisterTemp("/test1/test2/test3", "test4")
assert.NoError(t, err)
assert.Equal(t, "/test1/test2/test3/test4", tmpath)
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
}
func TestRegisterTempSeq(t *testing.T) {
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
defer func() {
_ = ts.Stop()
assert.Nil(t, err)
}()
err = z.Create("/test1/test2/test3")
assert.NoError(t, err)
tmpath, err := z.RegisterTempSeq("/test1/test2/test3", []byte("test"))
assert.NoError(t, err)
assert.Equal(t, "/test1/test2/test3/0000000000", tmpath)
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
}
func Test_UnregisterEvent(t *testing.T) {
client := &ZookeeperClient{}
client.eventRegistry = make(map[string][]*chan struct{})
array := []*chan struct{}{}
array = append(array, new(chan struct{}))
client.eventRegistry["test"] = array
client.UnregisterEvent("test", new(chan struct{}))
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gxzookeeper
import (
"time"
)
import (
"github.com/dubbogo/go-zookeeper/zk"
)
// nolint
type options struct {
ZkName string
Client *ZookeeperClient
Ts *zk.TestCluster
}
// Option will define a function of handling Options
type Option func(*options)
// WithZkName sets zk Client name
func WithZkName(name string) Option {
return func(opt *options) {
opt.ZkName = name
}
}
type zkClientOption func(*ZookeeperClient)
// WithZkEventHandler sets zk Client event
func WithZkEventHandler(handler ZkEventHandler) zkClientOption {
return func(opt *ZookeeperClient) {
opt.zkEventHandler = handler
}
}
// WithZkTimeOut sets zk Client timeout
func WithZkTimeOut(t time.Duration) zkClientOption {
return func(opt *ZookeeperClient) {
opt.Timeout = t
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package json
import (
"io/ioutil"
"log"
"reflect"
"strconv"
"strings"
"time"
)
import (
"github.com/dubbogo/jsonparser"
perrors "github.com/pkg/errors"
)
// HessianRegisterPair define the pair to register to hessian
type HessianRegisterPair struct {
JavaClassName string
Obj interface{}
}
// jsonStructParser can use reflect to create arbitrary interface{} of go, from user defined json file.
type jsonStructParser struct {
structFields []reflect.StructField
hessianRegisterPair []HessianRegisterPair
valueMap map[string]string
subObjValueMap map[string]reflect.Value
}
// newJSONStructParser create a new json struct parser
func newJSONStructParser() *jsonStructParser {
return &jsonStructParser{
structFields: make([]reflect.StructField, 0, 16),
valueMap: make(map[string]string, 8),
hessianRegisterPair: make([]HessianRegisterPair, 0, 16),
subObjValueMap: make(map[string]reflect.Value, 8),
}
}
// File2Interface first read json byte from @filePath, and parse it to interface
func File2Interface(filePath string) ([]HessianRegisterPair, interface{}, error) {
defer func() {
defaultJSONStructParser = newJSONStructParser()
}()
return defaultJSONStructParser.jsonFilePath2Struct(filePath)
}
func init() {
defaultJSONStructParser = newJSONStructParser()
}
var defaultJSONStructParser *jsonStructParser
// RemoveTargetNameField remove target file in @v
func RemoveTargetNameField(v interface{}, targetName string) interface{} {
defer func() {
defaultJSONStructParser = newJSONStructParser()
}()
return defaultJSONStructParser.removeTargetNameField(v, targetName)
}
func (jsp *jsonStructParser) cb(key []byte, value []byte, dataType jsonparser.ValueType, offset int) error {
switch dataType {
case jsonparser.Object:
// parse sub interface, use a new parser to deal with it the same way
newParser := newJSONStructParser()
subObj := newParser.json2Struct(value)
javaClassName, err := getJavaClassName(subObj)
if err != nil {
return err
}
jsp.hessianRegisterPair = append(jsp.hessianRegisterPair, HessianRegisterPair{
JavaClassName: javaClassName,
Obj: subObj,
})
jsp.structFields = append(jsp.structFields, reflect.StructField{
Name: string(key),
Type: reflect.TypeOf(subObj),
})
jsp.subObjValueMap[string(key)] = reflect.ValueOf(subObj)
case jsonparser.Array: // TODO slice parse
case jsonparser.String: // normal struct parse
// "type@value"
arr := strings.Split(string(value), "@")
var userDefinedType reflect.Type
switch arr[0] {
case "int":
userDefinedType = reflect.TypeOf(0)
case "string":
userDefinedType = reflect.TypeOf("")
case "uint64":
userDefinedType = reflect.TypeOf(uint64(0))
case "time.Time":
userDefinedType = reflect.TypeOf(time.Time{})
case "float32":
userDefinedType = reflect.TypeOf(float32(0))
case "float64":
userDefinedType = reflect.TypeOf(float64(0))
case "bool":
userDefinedType = reflect.TypeOf(false)
default:
log.Printf("error: dataType %s in json is not supported\n", string(value))
return perrors.Errorf("dataType %s in json is not supported", string(value))
}
if len(arr) > 1 {
jsp.valueMap[string(key)] = arr[1]
}
jsp.structFields = append(jsp.structFields, reflect.StructField{
Name: string(key),
Type: userDefinedType,
})
default:
log.Printf("error: dataType %s in json is not supported\n", string(value))
return perrors.Errorf("dataType %s in json is not supported", string(value))
}
return nil
}
// json2Struct parse data from json file to user defined interface
func (jsp *jsonStructParser) json2Struct(jsonData []byte) interface{} {
// first: call ObjectEach to parse jsonData to reflect.StructField item
if err := jsonparser.ObjectEach(jsonData, jsp.cb); err != nil {
log.Println("jsonparser.ObjectEach error = ", err)
}
// second: parse structField to reflectType
typ := reflect.StructOf(jsp.structFields)
v := reflect.New(typ).Elem()
newty := reflect.TypeOf(v.Addr().Interface()).Elem()
// finally: traverse each json field, and set user defined value
for i := 0; i < typ.NumField(); i++ {
valStr, ok1 := jsp.valueMap[newty.Field(i).Name]
subObj, ok2 := jsp.subObjValueMap[newty.Field(i).Name]
if !ok1 && !ok2 {
continue
}
if newty.Field(i).Type.Kind() == reflect.Ptr {
v.Field(i).Set(subObj)
continue
}
switch newty.Field(i).Type {
case reflect.TypeOf(0), reflect.TypeOf(uint64(0)):
if parsedInt, err := strconv.Atoi(valStr); err == nil {
v.Field(i).SetInt(int64(parsedInt))
break
}
v.Field(i).SetInt(0)
case reflect.TypeOf(""):
v.Field(i).SetString(valStr)
case reflect.TypeOf(time.Time{}):
// todo time support v.Field(i).
case reflect.TypeOf(float64(0)), reflect.TypeOf(float32(0)):
if parsedFloat, err := strconv.ParseFloat(valStr, 64); err == nil {
v.Field(i).SetFloat(parsedFloat)
break
}
v.Field(i).SetFloat(0)
case reflect.TypeOf(false):
if valStr == "true" || valStr == "1" {
v.Field(i).SetBool(true)
}
default:
log.Printf("error: val %s in value is not supported\n", valStr)
return perrors.Errorf("val %s in value is not supported", valStr)
}
}
s := v.Addr().Interface()
return s
}
// jsonFilePath2Struct read file from @filePath and parse data to interface
func (jsp *jsonStructParser) jsonFilePath2Struct(filePath string) ([]HessianRegisterPair, interface{}, error) {
jsonData, err := ioutil.ReadFile(filePath)
if err != nil {
return []HessianRegisterPair{}, nil, err
}
return jsp.hessianRegisterPair, jsp.json2Struct(jsonData), nil
}
// removeTargetNameField remove origin interface @v's target field by @targetName
func (jsp *jsonStructParser) removeTargetNameField(v interface{}, targetName string) interface{} {
typ := reflect.TypeOf(v).Elem()
val := reflect.ValueOf(v).Elem()
nums := val.NumField()
structFields := make([]reflect.StructField, 0)
fieldMap := make(map[string]reflect.Value)
for i := 0; i < nums; i++ {
if typ.Field(i).Name != targetName {
structFields = append(structFields, reflect.StructField{
Name: typ.Field(i).Name,
Type: typ.Field(i).Type,
})
fieldMap[typ.Field(i).Name] = val.Field(i)
}
}
newtyp := reflect.StructOf(structFields)
newi := reflect.New(newtyp).Elem()
newty := reflect.TypeOf(newi.Addr().Interface()).Elem()
for i := 0; i < nums-1; i++ {
newi.Field(i).Set(fieldMap[newty.Field(i).Name])
}
return newi.Addr().Interface()
}
// getJavaClassName can read field JavaClassName of interface{}, used in cli-tool to do hessian registry
func getJavaClassName(pkg interface{}) (string, error) {
val := reflect.ValueOf(pkg).Elem()
typ := reflect.TypeOf(pkg).Elem()
nums := val.NumField()
for i := 0; i < nums; i++ {
if typ.Field(i).Name == "JavaClassName" {
return val.Field(i).String(), nil
}
}
return "", perrors.Errorf("JavaClassName field not found error")
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package json
import (
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
func Test_newJsonStructParser(t *testing.T) {
path := "./user.json"
hessianPair, _, err := File2Interface(path)
assert.NotEmpty(t, hessianPair)
assert.Nil(t, err)
}
{
"ID": "string",
"Name": "string",
"Age": "int",
"JavaClassName": "string@com.ikurento.user.User",
"SubInfo": {
"SubID": "string",
"SubMale": "bool",
"SubAge": "int",
"JavaClassName":"string@com.ikurento.user.SubInfo"
}
}
module github.com/dubbogo/gost
module github.com/adamweixuan/gostnops
go 1.17
require (
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/davecgh/go-spew v1.1.1
github.com/dubbogo/go-zookeeper v1.0.3
github.com/dubbogo/jsonparser v1.0.1
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 // indirect
github.com/k0kubun/pp v3.0.1+incompatible
github.com/mattn/go-colorable v0.1.7 // indirect
github.com/mattn/go-isatty v0.0.12
github.com/nacos-group/nacos-sdk-go v1.0.8
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.9.0 // indirect
github.com/shirou/gopsutil v3.20.11+incompatible
github.com/stretchr/testify v1.7.0
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
go.etcd.io/etcd/api/v3 v3.5.0-alpha.0
go.etcd.io/etcd/client/v3 v3.5.0-alpha.0
go.etcd.io/etcd/server/v3 v3.5.0-alpha.0
go.uber.org/atomic v1.7.0
golang.org/x/sys v0.0.0-20201223074533-0d417f636930 // indirect
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect
google.golang.org/genproto v0.0.0-20200806141610-86f49bd18e98 // indirect
google.golang.org/grpc v1.36.0
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
go.uber.org/atomic v1.9.0
)
go 1.13
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
This diff is collapsed.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// package gxlog is based on log4go.
// color.go provides colorful terminal log output functions.
package gxlog
import (
"fmt"
"os"
"path/filepath"
"runtime"
"time"
)
import (
"github.com/mattn/go-isatty"
)
var (
// Normal colors
// NORMAL = []byte{'\033', '0', 'm'}
NORMAL = []byte{'\033', '0'}
NBlack = []byte{'\033', '[', '3', '0', 'm'}
NRed = []byte{'\033', '[', '3', '1', 'm'}
NGreen = []byte{'\033', '[', '3', '2', 'm'}
NYellow = []byte{'\033', '[', '3', '3', 'm'}
NBlue = []byte{'\033', '[', '3', '4', 'm'}
NMagenta = []byte{'\033', '[', '3', '5', 'm'}
NCyan = []byte{'\033', '[', '3', '6', 'm'}
NWhite = []byte{'\033', '[', '3', '7', 'm'}
// Bright colors
BBlack = []byte{'\033', '[', '3', '0', ';', '1', 'm'}
BRed = []byte{'\033', '[', '3', '1', ';', '1', 'm'}
BGreen = []byte{'\033', '[', '3', '2', ';', '1', 'm'}
BYellow = []byte{'\033', '[', '3', '3', ';', '1', 'm'}
BBlue = []byte{'\033', '[', '3', '4', ';', '1', 'm'}
BMagenta = []byte{'\033', '[', '3', '5', ';', '1', 'm'}
BCyan = []byte{'\033', '[', '3', '6', ';', '1', 'm'}
BWhite = []byte{'\033', '[', '3', '7', ';', '1', 'm'}
UnderlineTwinkleHighLight = []byte{'\033', '[', '1', ';', '6', ';', '4', '0', 'm'}
reset = []byte{'\033', '[', '0', 'm'}
)
func funcFileLine() string {
tm := time.Unix(time.Now().Unix(), 0)
funcName, file, line, _ := runtime.Caller(3)
return "[" + tm.Format("2006-01-02/15:04:05 ") +
runtime.FuncForPC(funcName).Name() +
": " + filepath.Base(file) +
": " + fmt.Sprintf("%d", line) +
"] "
}
func CPrintf(color []byte, format string, args ...interface{}) {
logStr := fmt.Sprintf(format, args...)
if isatty.IsTerminal(os.Stdout.Fd()) {
fmt.Fprintf(os.Stdout, string(color)+funcFileLine()+"%s"+string(reset), logStr)
} else {
fmt.Fprintf(os.Stdout, "%s", logStr)
}
}
func CPrintfln(color []byte, format string, args ...interface{}) {
logStr := fmt.Sprintf(format, args...)
if isatty.IsTerminal(os.Stdout.Fd()) {
fmt.Fprintf(os.Stdout, string(color)+funcFileLine()+"%s"+string(reset)+"\n", logStr)
} else {
fmt.Fprintf(os.Stdout, "%s\n", logStr)
}
}
func CEPrintf(color []byte, format string, args ...interface{}) {
logStr := fmt.Sprintf(format, args...)
if isatty.IsTerminal(os.Stdout.Fd()) {
fmt.Fprintf(os.Stderr, string(color)+funcFileLine()+"%s"+string(reset), logStr)
} else {
fmt.Fprintf(os.Stderr, "%s", logStr)
}
}
func CEPrintfln(color []byte, format string, args ...interface{}) {
logStr := fmt.Sprintf(format, args...)
if isatty.IsTerminal(os.Stdout.Fd()) {
fmt.Fprintf(os.Stderr, string(color)+funcFileLine()+"%s"+string(reset)+"\n", logStr)
} else {
fmt.Fprintf(os.Stderr, "%s\n", logStr)
}
}
func CDebug(format string, args ...interface{}) {
CPrintfln(NORMAL, format, args...)
}
func CInfo(format string, args ...interface{}) {
CPrintfln(NGreen, format, args...)
}
func CWarn(format string, args ...interface{}) {
CEPrintfln(BMagenta, format, args...)
}
func CError(format string, args ...interface{}) {
CEPrintfln(NRed, format, args...)
}
func CFatal(format string, args ...interface{}) {
CEPrintfln(BRed, format, args...)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* log_test.go - test for log.go */
package gxlog
import (
"testing"
)
func TestColorLog(t *testing.T) {
CDebug("Debug")
CInfo("Info")
// CWarn("Warn")
CWarn("%s", "/test/group%3Dbjtelecom%26protocol%3Dpb%26role%3DSRT_Provider%26service%3Dshopping%26version%3D1.0.1")
CError("Error")
}
func TestCPrintfln(t *testing.T) {
CPrintfln(NRed, "%s", "/test/group%3Dbjtelecom%26protocol%3Dpb%26role%3DSRT_Provider%26service%3Dshopping%26version%3D1.0.1")
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gxlog
type Logger interface {
Info(args ...interface{})
Warn(args ...interface{})
Error(args ...interface{})
Debug(args ...interface{})
Infof(fmt string, args ...interface{})
Warnf(fmt string, args ...interface{})
Errorf(fmt string, args ...interface{})
Debugf(fmt string, args ...interface{})
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// package gxlog is based on log4go.
// pretty.go provides pretty format string
package gxlog
import (
"github.com/davecgh/go-spew/spew"
"github.com/k0kubun/pp"
)
func PrettyString(i interface{}) string {
return spew.Sdump(i)
}
func ColorSprint(i interface{}) string {
return pp.Sprint(i)
}
func ColorSprintln(i interface{}) string {
return pp.Sprintln(i)
}
func ColorSprintf(fmt string, args ...interface{}) string {
return pp.Sprintf(fmt, args...)
}
func ColorPrint(i interface{}) {
pp.Print(i)
}
func ColorPrintln(i interface{}) {
pp.Println(i)
}
func ColorPrintf(fmt string, args ...interface{}) {
pp.Printf(fmt, args...)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* log_test.go - test for log.go */
package gxlog
import (
"fmt"
"testing"
)
type info struct {
name string
age float32
m map[string]string
}
func TestPrettyString(t *testing.T) {
i := info{name: "hello", age: 23.5, m: map[string]string{"h": "w", "hello": "world"}}
fmt.Println(PrettyString(i))
}
func TestColorPrint(t *testing.T) {
i := info{name: "hello", age: 23.5, m: map[string]string{"h": "w", "hello": "world"}}
ColorPrintln(i)
}
func TestColorPrintf(t *testing.T) {
i := info{name: "hello", age: 23.5, m: map[string]string{"h": "w", "hello": "world"}}
ColorPrintf("exapmle format:%s\n", i)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gxruntime
import (
"io/ioutil"
"os"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"time"
)
import (
"github.com/shirou/gopsutil/mem"
"github.com/shirou/gopsutil/process"
)
import (
"github.com/dubbogo/gost/path/filepath"
)
// CurrentPID returns the process id of the caller.
var CurrentPID = os.Getpid()
const (
cgroupMemLimitPath = "/sys/fs/cgroup/memory/memory.limit_in_bytes"
)
// GetCPUNum gets current os's cpu number
func GetCPUNum() int {
return runtime.NumCPU()
}
// GetMemoryStat gets current os's memory size in bytes
func GetMemoryStat() (total, used, free uint64, usedPercent float64) {
stat, err := mem.VirtualMemory()
if err != nil {
return 0, 0, 0, 0
}
return stat.Total, stat.Used, stat.Free, stat.UsedPercent
}
// IsCgroup checks whether current os is a container or not
func IsCgroup() bool {
ok, _ := gxfilepath.Exists(cgroupMemLimitPath)
if ok {
return true
}
return false
}
// GetCgroupMemoryLimit returns a container's total memory in bytes
func GetCgroupMemoryLimit() (uint64, error) {
return readUint(cgroupMemLimitPath)
}
// GetThreadNum gets current process's thread number
func GetThreadNum() int {
return pprof.Lookup("threadcreate").Count()
}
// GetGoroutineNum gets current process's goroutine number
func GetGoroutineNum() int {
return runtime.NumGoroutine()
}
// GetProcessCPUStat gets current process's cpu stat
func GetProcessCPUStat() (float64, error) {
p, err := process.NewProcess(int32(CurrentPID))
if err != nil {
return 0, err
}
cpuPercent, err := p.Percent(time.Second)
if err != nil {
return 0, err
}
// The default percent is if you use one core, then 100%, two core, 200%
// but it's inconvenient to calculate the proper percent
// here we multiply by core number, so we can set a percent bar more intuitively
cpuPercent = cpuPercent / float64(runtime.GOMAXPROCS(-1))
return cpuPercent, nil
}
// GetProcessMemoryStat gets current process's memory usage percent
func GetProcessMemoryPercent() (float32, error) {
p, err := process.NewProcess(int32(CurrentPID))
if err != nil {
return 0, err
}
memPercent, err := p.MemoryPercent()
if err != nil {
return 0, err
}
return memPercent, nil
}
// GetProcessMemoryStat gets current process's memory usage in Byte
func GetProcessMemoryStat() (uint64, error) {
p, err := process.NewProcess(int32(CurrentPID))
if err != nil {
return 0, err
}
memInfo, err := p.MemoryInfo()
if err != nil {
return 0, err
}
return memInfo.RSS, nil
}
// copied from https://github.com/containerd/cgroups/blob/318312a373405e5e91134d8063d04d59768a1bff/utils.go#L251
func parseUint(s string, base, bitSize int) (uint64, error) {
v, err := strconv.ParseUint(s, base, bitSize)
if err != nil {
intValue, intErr := strconv.ParseInt(s, base, bitSize)
// 1. Handle negative values greater than MinInt64 (and)
// 2. Handle negative values lesser than MinInt64
if intErr == nil && intValue < 0 {
return 0, nil
} else if intErr != nil &&
intErr.(*strconv.NumError).Err == strconv.ErrRange &&
intValue < 0 {
return 0, nil
}
return 0, err
}
return v, nil
}
// copied from https://github.com/containerd/cgroups/blob/318312a373405e5e91134d8063d04d59768a1bff/utils.go#L243
func readUint(path string) (uint64, error) {
v, err := ioutil.ReadFile(path)
if err != nil {
return 0, err
}
return parseUint(strings.TrimSpace(string(v)), 10, 64)
}
// GetCgroupProcessMemoryPercent gets current process's memory usage percent in cgroup env
func GetCgroupProcessMemoryPercent() (float64, error) {
p, err := process.NewProcess(int32(os.Getpid()))
if err != nil {
return 0, err
}
mem, err := p.MemoryInfo()
if err != nil {
return 0, err
}
memLimit, err := GetCgroupMemoryLimit()
if err != nil {
return 0, err
}
// mem.RSS / cgroup limit in bytes
memPercent := float64(mem.RSS) * 100 / float64(memLimit)
return memPercent, nil
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gxruntime
import (
"testing"
"time"
)
func TestSysStat(t *testing.T) {
t.Logf("current os cpu number %d", GetCPUNum())
total, used, free, usedPercent := GetMemoryStat()
t.Logf("memory: limit %d bytes, used %d bytes, free %d bytes, usedPercent %f", total, used, free, usedPercent)
t.Logf("current prcess thread number %d", GetThreadNum())
go func() {
time.Sleep(10e9)
}()
grNum := GetGoroutineNum()
if grNum < 2 {
t.Errorf("current prcess goroutine number %d", grNum)
}
cpu, err := GetProcessCPUStat()
if err != nil {
t.Errorf("GetProcessCPUStat() = error %+v", err)
}
t.Logf("process cpu stat %v", cpu)
size := 100 * 1024 * 1024
arr := make([]byte, size)
for idx := range arr {
arr[idx] = byte(idx / 255)
}
memoryStat, err := GetProcessMemoryStat()
if err != nil {
t.Errorf("GetProcessMemoryStat() = error %+v", err)
}
// t.Logf("process memory usage stat %v", memoryStat)
if memoryStat <= uint64(size) {
t.Errorf("memory usage stat %d < %d", memoryStat, size)
}
memoryUsage, err := GetProcessMemoryPercent()
if err != nil {
t.Errorf("GetProcessMemoryPercent() = error %+v", err)
}
t.Logf("process memory usage percent %v", memoryUsage)
if IsCgroup() {
memoryLimit, err := GetCgroupMemoryLimit()
if err != nil {
t.Errorf("GetCgroupMemoryLimit() = error %+v", err)
}
t.Logf("CGroupMemoryLimit() = %d", memoryLimit)
memoryPercent, err := GetCgroupProcessMemoryPercent()
if err != nil {
t.Errorf("GetCgroupProcessMemoryPercent(ps:%d) = error %+v", CurrentPID, err)
}
t.Logf("GetCgroupProcessMemoryPercent(ps:%d) = %+v", CurrentPID, memoryPercent)
}
}
......@@ -19,6 +19,7 @@ package gxsync
import (
"fmt"
"log"
"runtime/debug"
"sync"
)
......@@ -27,15 +28,10 @@ import (
"go.uber.org/atomic"
)
import (
gxlog "github.com/dubbogo/gost/log"
)
type WorkerPoolConfig struct {
NumWorkers int
NumQueues int
QueueSize int
Logger gxlog.Logger
Enable bool
}
......@@ -64,8 +60,6 @@ type WorkerPoolConfig struct {
// │worker1│ │worker3│ │worker5│ └──│ taskId % NumQueues == 1 │
// └───────┘ └───────┘ └───────┘ └─────────────────────────┘
type baseWorkerPool struct {
logger gxlog.Logger
taskId uint32
taskQueues []chan task
......@@ -92,7 +86,6 @@ func newBaseWorkerPool(config WorkerPoolConfig) *baseWorkerPool {
}
p := &baseWorkerPool{
logger: config.Logger,
taskQueues: taskQueues,
numWorkers: new(atomic.Int32),
wg: new(sync.WaitGroup),
......@@ -109,10 +102,7 @@ func newBaseWorkerPool(config WorkerPoolConfig) *baseWorkerPool {
p.dispatch(config.NumWorkers, initWg)
initWg.Wait()
if p.logger != nil {
p.logger.Infof("all %d workers are started", p.NumWorkers())
}
log.Printf("all %d workers are started", p.NumWorkers())
return p
}
......@@ -139,9 +129,7 @@ func (p *baseWorkerPool) Close() {
close(q)
}
p.wg.Wait()
if p.logger != nil {
p.logger.Infof("there are %d workers remained, all workers are closed", p.NumWorkers())
}
log.Printf("there are %d workers remained, all workers are closed", p.NumWorkers())
}
func (p *baseWorkerPool) IsClosed() bool {
......@@ -180,9 +168,7 @@ func (p *baseWorkerPool) worker(workerId int, wg *sync.WaitGroup) {
// prevent from goroutine panic
defer func() {
if r := recover(); r != nil {
if p.logger != nil {
p.logger.Errorf("goroutine panic: %v\n%s", r, string(debug.Stack()))
}
log.Printf("goroutine panic: %v\n%s", r, string(debug.Stack()))
}
}()
// execute task
......
......@@ -20,9 +20,7 @@ package gxsync
import (
"math/rand"
"sync/atomic"
)
import (
perrors "github.com/pkg/errors"
)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gxsync
import (
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
)
func TestConnectionPool(t *testing.T) {
t.Run("Count", func(t *testing.T) {
p := NewConnectionPool(WorkerPoolConfig{
NumWorkers: 100,
NumQueues: runtime.NumCPU(),
QueueSize: 10,
Logger: nil,
Enable: true,
})
var count int64
wg := new(sync.WaitGroup)
for i := 1; i <= 100; i++ {
wg.Add(1)
value := i
err := p.Submit(func() {
defer wg.Done()
atomic.AddInt64(&count, int64(value))
})
assert.Nil(t, err)
}
wg.Wait()
assert.Equal(t, int64(5050), count)
p.Close()
})
t.Run("PoolBusyErr", func(t *testing.T) {
p := NewConnectionPool(WorkerPoolConfig{
NumWorkers: 1,
NumQueues: 1,
QueueSize: 0,
Logger: nil,
Enable: true,
})
wg := new(sync.WaitGroup)
wg.Add(1)
err := p.Submit(func() {
wg.Wait()
})
assert.Nil(t, err)
err = p.Submit(func() {})
assert.Equal(t, PoolBusyErr, err)
wg.Done()
time.Sleep(100 * time.Millisecond)
err = p.Submit(func() {})
assert.Nil(t, err)
p.Close()
})
t.Run("Close", func(t *testing.T) {
p := NewConnectionPool(WorkerPoolConfig{
NumWorkers: runtime.NumCPU(),
NumQueues: runtime.NumCPU(),
QueueSize: 100,
Enable: true,
Logger: nil,
})
assert.Equal(t, runtime.NumCPU(), int(p.NumWorkers()))
p.Close()
assert.True(t, p.IsClosed())
assert.Panics(t, func() {
_ = p.Submit(func() {})
})
})
t.Run("BorderCondition", func(t *testing.T) {
p := NewConnectionPool(WorkerPoolConfig{
NumWorkers: 0,
NumQueues: runtime.NumCPU(),
Enable: true,
QueueSize: 100,
Logger: nil,
})
assert.Equal(t, 1, int(p.NumWorkers()))
p.Close()
p = NewConnectionPool(WorkerPoolConfig{
NumWorkers: 1,
NumQueues: 0,
Enable: true,
QueueSize: 0,
Logger: nil,
})
err := p.Submit(func() {})
assert.Nil(t, err)
p.Close()
p = NewConnectionPool(WorkerPoolConfig{
NumWorkers: 1,
NumQueues: 1,
QueueSize: -1,
Logger: nil,
Enable: true,
})
err = p.Submit(func() {})
assert.Nil(t, err)
p.Close()
})
t.Run("NilTask", func(t *testing.T) {
p := NewConnectionPool(WorkerPoolConfig{
NumWorkers: 1,
NumQueues: 1,
Enable: true,
QueueSize: 0,
Logger: nil,
})
err := p.Submit(nil)
assert.NotNil(t, err)
p.Close()
})
t.Run("CountTask", func(t *testing.T) {
p := NewConnectionPool(WorkerPoolConfig{
NumWorkers: runtime.NumCPU(),
NumQueues: runtime.NumCPU(),
QueueSize: 10,
Logger: nil,
Enable: true,
})
task, v := newCountTask()
wg := new(sync.WaitGroup)
wg.Add(100)
for i := 0; i < 100; i++ {
if err := p.Submit(func() {
defer wg.Done()
task()
}); err != nil {
i--
}
}
wg.Wait()
assert.Equal(t, 100, int(*v))
p.Close()
})
t.Run("CountTaskSync", func(t *testing.T) {
p := NewConnectionPool(WorkerPoolConfig{
NumWorkers: runtime.NumCPU(),
NumQueues: runtime.NumCPU(),
QueueSize: 10,
Logger: nil,
Enable: true,
})
task, v := newCountTask()
for i := 0; i < 100; i++ {
err := p.SubmitSync(task)
assert.Nil(t, err)
}
assert.Equal(t, 100, int(*v))
p.Close()
})
}
func BenchmarkConnectionPool(b *testing.B) {
p := NewConnectionPool(WorkerPoolConfig{
NumWorkers: 100,
NumQueues: runtime.NumCPU(),
QueueSize: 100,
Enable: true,
Logger: nil,
})
b.Run("CountTask", func(b *testing.B) {
task, _ := newCountTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_ = p.Submit(task)
}
})
})
b.Run("CPUTask", func(b *testing.B) {
task, _ := newCPUTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_ = p.Submit(task)
}
})
})
b.Run("IOTask", func(b *testing.B) {
task, _ := newIOTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_ = p.Submit(task)
}
})
})
b.Run("RandomTask", func(b *testing.B) {
task, _ := newRandomTask()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_ = p.Submit(task)
}
})
})
}
......@@ -30,7 +30,7 @@ import (
)
import (
gxruntime "github.com/dubbogo/gost/runtime"
gxruntime "github.com/adamweixuan/gostnops/runtime"
)
type task func()
......
This diff is collapsed.
This diff is collapsed.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package gxtime encapsulates some golang.time functions
package gxtime
import (
"testing"
"time"
)
import (
gxlog "github.com/dubbogo/gost/log"
)
func TestTickFunc(t *testing.T) {
// num int
var cw CountWatch // xassert *assert.Assertions
InitDefaultTimerWheel()
f := func() {
gxlog.CInfo("timer costs:%dms", cw.Count()/1e6)
}
// num = 3
// xassert = assert.New(t)
cw.Start()
TickFunc(TimeSecondDuration(0.5), f)
TickFunc(TimeSecondDuration(1.3), f)
TickFunc(TimeSecondDuration(6.5), f)
time.Sleep(6e9)
// xassert.Equal(defaultTimerWheel.TimerNumber(), num, "") // just equal in this ut
}
func TestTicker_Reset(t *testing.T) {
//var (
// ticker *Ticker
// wg sync.WaitGroup
// cw CountWatch
// xassert *assert.Assertions
//)
//
//Init()
//
//f := func() {
// defer wg.Done()
// gxlog.CInfo("timer costs:%dms", cw.Count()/1e6)
// gxlog.CInfo("in timer func, timer number:%d", defaultTimerWheel.TimerNumber())
//}
//
//xassert = assert.New(t)
//wg.Add(1)
//cw.Start()
//ticker = TickFunc(TimeSecondDuration(1.5), f)
//ticker.Reset(TimeSecondDuration(3.5))
//time.Sleep(TimeSecondDuration(0.001))
//xassert.Equal(defaultTimerWheel.TimerNumber(), 1, "") // just equal on this ut
//wg.Wait()
}
func TestTicker_Stop(t *testing.T) {
var (
ticker *Ticker
cw CountWatch
// xassert assert.Assertions
)
InitDefaultTimerWheel()
f := func() {
gxlog.CInfo("timer costs:%dms", cw.Count()/1e6)
}
cw.Start()
ticker = TickFunc(TimeSecondDuration(4.5), f)
// 添加是异步进行的,所以sleep一段时间再去检测timer number
time.Sleep(TimeSecondDuration(0.001))
// timerNumber := defaultTimerWheel.TimerNumber()
// xassert.Equal(timerNumber, 1, "")
time.Sleep(TimeSecondDuration(5))
ticker.Stop()
// 删除是异步进行的,所以sleep一段时间再去检测timer number
// time.Sleep(TimeSecondDuration(0.001))
// timerNumber = defaultTimerWheel.TimerNumber()
// xassert.Equal(timerNumber, 0, "")
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment