Unverified Commit fd9b88f3 authored by 赵云兴's avatar 赵云兴 Committed by GitHub

Naming Client and Config Client (#59)

* add:config client

* up:config client and naming client

add:nacos client

add:nacos client

add:ci

add: assert err

up:nacos address

* up:lock name

* add:note
parent 944d0be7
...@@ -19,90 +19,114 @@ package nacos ...@@ -19,90 +19,114 @@ package nacos
import ( import (
"sync" "sync"
"sync/atomic"
) )
import ( import (
"github.com/nacos-group/nacos-sdk-go/clients" "github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/config_client" "github.com/nacos-group/nacos-sdk-go/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/common/constant" "github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo" "github.com/nacos-group/nacos-sdk-go/vo"
) )
var ( var (
clientPool nacosClientPool
clientPoolOnce sync.Once
)
var (
configClientPool nacosConfigClientPool configClientPool nacosConfigClientPool
configClientPoolOnce sync.Once configClientPoolOnce sync.Once
) )
type nacosClientPool struct {
sync.Mutex
namingClient map[string]naming_client.INamingClient
}
type nacosConfigClientPool struct { type nacosConfigClientPool struct {
sync.Mutex sync.Mutex
configClient map[string]config_client.IConfigClient configClient map[string]*NacosConfigClient
} }
func initNacosClientPool() { type NacosConfigClient struct {
clientPool.namingClient = make(map[string]naming_client.INamingClient) name string
clientLock sync.Mutex // for Client
client config_client.IConfigClient
config vo.NacosClientParam //conn config
valid uint32
activeCount uint32
share bool
} }
func initNacosConfigClientPool() { func initNacosConfigClientPool() {
configClientPool.configClient = make(map[string]config_client.IConfigClient) configClientPool.configClient = make(map[string]*NacosConfigClient)
} }
// NewNacosNamingClient create nacos client func (n *NacosConfigClient) newConfigClient() error {
func NewNacosNamingClient(name string, share bool, sc []constant.ServerConfig, client, err := clients.NewConfigClient(n.config)
cc constant.ClientConfig) (naming_client.INamingClient, error) { if err != nil {
if !share { return err
return newNamingClient(sc, cc)
}
clientPoolOnce.Do(initNacosClientPool)
clientPool.Lock()
defer clientPool.Unlock()
if client, ok := clientPool.namingClient[name]; ok {
return client, nil
}
client, err := newNamingClient(sc, cc)
if err == nil {
clientPool.namingClient[name] = client
} }
return client, err n.activeCount++
atomic.StoreUint32(&n.valid, 1)
n.client = client
return nil
} }
// NewNacosConfigClient create config client // NewNacosConfigClient create config client
func NewNacosConfigClient(name string, share bool, sc []constant.ServerConfig, func NewNacosConfigClient(name string, share bool, sc []constant.ServerConfig,
cc constant.ClientConfig) (config_client.IConfigClient, error) { cc constant.ClientConfig) (*NacosConfigClient, error) {
configClient := &NacosConfigClient{
name: name,
activeCount: 0,
share: share,
config: vo.NacosClientParam{ClientConfig: &cc, ServerConfigs: sc},
}
if !share { if !share {
return newConfigClient(sc, cc) return configClient, configClient.newConfigClient()
} }
configClientPoolOnce.Do(initNacosConfigClientPool) configClientPoolOnce.Do(initNacosConfigClientPool)
configClientPool.Lock() configClientPool.Lock()
defer configClientPool.Unlock() defer configClientPool.Unlock()
if client, ok := configClientPool.configClient[name]; ok { if client, ok := configClientPool.configClient[name]; ok {
client.activeCount++
return client, nil return client, nil
} }
client, err := newConfigClient(sc, cc) err := configClient.newConfigClient()
if err == nil { if err == nil {
configClientPool.configClient[name] = client configClientPool.configClient[name] = configClient
} }
return client, err return configClient, err
}
// Client Get NacosConfigClient
func (n *NacosConfigClient) Client() config_client.IConfigClient {
return n.client
} }
func newNamingClient(sc []constant.ServerConfig, cc constant.ClientConfig) (naming_client.INamingClient, error) { // SetClient Set NacosConfigClient
cfg := vo.NacosClientParam{ClientConfig: &cc, ServerConfigs: sc} func (n *NacosConfigClient) SetClient(client config_client.IConfigClient) {
return clients.NewNamingClient(cfg) n.clientLock.Lock()
n.client = client
n.clientLock.Unlock()
} }
func newConfigClient(sc []constant.ServerConfig, cc constant.ClientConfig) (config_client.IConfigClient, error) { // NacosClientValid Get nacos client valid status
cfg := vo.NacosClientParam{ClientConfig: &cc, ServerConfigs: sc} func (n *NacosConfigClient) NacosClientValid() bool {
return clients.NewConfigClient(cfg)
return atomic.LoadUint32(&n.valid) == 1
}
// Close close client
func (n *NacosConfigClient) Close() {
configClientPool.Lock()
defer configClientPool.Unlock()
if n.client == nil {
return
}
n.activeCount--
if n.share {
if n.activeCount == 0 {
n.client = nil
atomic.StoreUint32(&n.valid, 0)
delete(configClientPool.configClient, n.name)
}
} else {
n.client = nil
atomic.StoreUint32(&n.valid, 0)
delete(configClientPool.configClient, n.name)
}
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nacos
import (
"fmt"
"reflect"
"testing"
"time"
)
import (
"github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo"
"github.com/stretchr/testify/assert"
)
func TestStructAlign(t *testing.T) {
typ := reflect.TypeOf(NacosConfigClient{})
fmt.Printf("Struct is %d bytes long\n", typ.Size())
n := typ.NumField()
for i := 0; i < n; i++ {
field := typ.Field(i)
fmt.Printf("%s: at offset %v, size=%d, align=%d\n",
field.Name, field.Offset, field.Type.Size(),
field.Type.Align())
}
}
//TestNewNacosConfigClient config client
func TestNewNacosConfigClient(t *testing.T) {
scs := []constant.ServerConfig{*constant.NewServerConfig("console.nacos.io", 80)}
cc := constant.ClientConfig{TimeoutMs: 5 * 1000, NotLoadCacheAtStart: true}
client1, err := NewNacosConfigClient("nacos", true, scs, cc)
assert.Nil(t, err)
client2, err := NewNacosConfigClient("nacos", true, scs, cc)
assert.Nil(t, err)
client3, err := NewNacosConfigClient("nacos", false, scs, cc)
assert.Nil(t, err)
client4, err := NewNacosConfigClient("test", true, scs, cc)
assert.Nil(t, err)
assert.Equal(t, client1, client2)
assert.Equal(t, client1.activeCount, uint32(2))
assert.Equal(t, client1.NacosClientValid(), true)
assert.Equal(t, client3.activeCount, uint32(1))
assert.Equal(t, client4.activeCount, uint32(1))
client1.Close()
assert.Equal(t, client1.activeCount, uint32(1))
client1.Close()
assert.Equal(t, client1.NacosClientValid(), false)
assert.Nil(t, client1.Client())
client1.Close()
assert.Equal(t, client1.NacosClientValid(), false)
assert.Nil(t, client1.Client())
}
func TestPublishConfig(t *testing.T) {
scs := []constant.ServerConfig{*constant.NewServerConfig("console.nacos.io", 80)}
cc := constant.ClientConfig{
AppName: "nacos",
NamespaceId: "14e01fa8-a4aa-44cc-ad5b-c768f3c62bd5", //namespace id
TimeoutMs: 5000,
NotLoadCacheAtStart: true,
LogDir: "/tmp/nacos/log",
CacheDir: "/tmp/nacos/cache",
RotateTime: "1h",
MaxAge: 3,
LogLevel: "debug",
}
client, err := NewNacosConfigClient("nacos", true, scs, cc)
assert.Nil(t, err)
//publish config
//config key=dataId+group+namespaceId
push, err := client.Client().PublishConfig(vo.ConfigParam{
DataId: "nacos-config",
Group: "dubbo",
Content: "dubbo-go nb",
})
assert.Nil(t, err)
assert.Equal(t, push, true)
//get config
cfg, err := client.Client().GetConfig(vo.ConfigParam{
DataId: "nacos-config",
Group: "dubbo",
})
assert.Nil(t, err)
fmt.Println("GetConfig,config :", cfg)
//Listen config change,key=dataId+group+namespaceId.
err = client.Client().ListenConfig(vo.ConfigParam{
DataId: "nacos-config",
Group: "dubbo",
OnChange: func(namespace, group, dataId, data string) {
assert.Equal(t, data, "test-listen")
fmt.Println("config changed group:" + group + ", dataId:" + dataId + ", content:" + data)
},
})
assert.Nil(t, err)
_, err = client.Client().PublishConfig(vo.ConfigParam{
DataId: "nacos-config",
Group: "dubbo",
Content: "test-listen",
})
assert.Nil(t, err)
searchPage, _ := client.Client().SearchConfig(vo.SearchConfigParam{
Search: "accurate",
DataId: "",
Group: "dubbo",
PageNo: 1,
PageSize: 10,
})
fmt.Printf("Search config:%+v \n", searchPage)
time.Sleep(2 * time.Second)
client.Close()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nacos
import (
"sync"
"sync/atomic"
)
import (
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/vo"
)
var (
namingClientPool nacosClientPool
clientPoolOnce sync.Once
)
type nacosClientPool struct {
sync.Mutex
namingClient map[string]*NacosNamingClient
}
type NacosNamingClient struct {
name string
clientLock sync.Mutex // for Client
client naming_client.INamingClient
config vo.NacosClientParam //conn config
valid uint32
activeCount uint32
share bool
}
func initNacosClientPool() {
namingClientPool.namingClient = make(map[string]*NacosNamingClient)
}
// NewNacosNamingClient create nacos client
func NewNacosNamingClient(name string, share bool, sc []constant.ServerConfig,
cc constant.ClientConfig) (*NacosNamingClient, error) {
namingClient := &NacosNamingClient{
name: name,
activeCount: 0,
share: share,
config: vo.NacosClientParam{ClientConfig: &cc, ServerConfigs: sc},
}
if !share {
return namingClient, namingClient.newNamingClient()
}
clientPoolOnce.Do(initNacosClientPool)
namingClientPool.Lock()
defer namingClientPool.Unlock()
if client, ok := namingClientPool.namingClient[name]; ok {
client.activeCount++
return client, nil
}
err := namingClient.newNamingClient()
if err == nil {
namingClientPool.namingClient[name] = namingClient
}
return namingClient, err
}
// newNamingClient create NamingClient
func (n *NacosNamingClient) newNamingClient() error {
client, err := clients.NewNamingClient(n.config)
if err != nil {
return err
}
n.activeCount++
atomic.StoreUint32(&n.valid, 1)
n.client = client
return nil
}
// Client Get NacosNamingClient
func (n *NacosNamingClient) Client() naming_client.INamingClient {
return n.client
}
// SetClient Set NacosNamingClient
func (n *NacosNamingClient) SetClient(client naming_client.INamingClient) {
n.clientLock.Lock()
n.client = client
n.clientLock.Unlock()
}
// NacosClientValid Get nacos client valid status
func (n *NacosNamingClient) NacosClientValid() bool {
return atomic.LoadUint32(&n.valid) == 1
}
// Close close client
func (n *NacosNamingClient) Close() {
namingClientPool.Lock()
defer namingClientPool.Unlock()
if n.client == nil {
return
}
n.activeCount--
if n.share {
if n.activeCount == 0 {
n.client = nil
atomic.StoreUint32(&n.valid, 0)
delete(namingClientPool.namingClient, n.name)
}
} else {
n.client = nil
atomic.StoreUint32(&n.valid, 0)
delete(namingClientPool.namingClient, n.name)
}
}
...@@ -37,32 +37,17 @@ func TestNewNacosClient(t *testing.T) { ...@@ -37,32 +37,17 @@ func TestNewNacosClient(t *testing.T) {
NotLoadCacheAtStart: true, NotLoadCacheAtStart: true,
} }
t.Run("naming_client", func(t *testing.T) { client1, err := NewNacosNamingClient("nacos", true, scs, cc)
client1, err := NewNacosNamingClient("nacos", true, scs, cc) assert.Nil(t, err)
assert.Nil(t, err) client2, err := NewNacosNamingClient("nacos", true, scs, cc)
client2, err := NewNacosNamingClient("nacos", true, scs, cc) assert.Nil(t, err)
assert.Nil(t, err) client3, err := NewNacosNamingClient("nacos", false, scs, cc)
client3, err := NewNacosNamingClient("nacos", false, scs, cc) assert.Nil(t, err)
assert.Nil(t, err) client4, err := NewNacosNamingClient("test", true, scs, cc)
client4, err := NewNacosNamingClient("test", true, scs, cc) assert.Nil(t, err)
assert.Nil(t, err)
assert.Equal(t, client1, client2) assert.Equal(t, client1, client2)
assert.NotEqual(t, client1, client3) assert.Equal(t, client1.activeCount, uint32(2))
assert.NotEqual(t, client1, client4) assert.NotEqual(t, client1, client3)
}) assert.NotEqual(t, client1, client4)
t.Run("config_client", func(t *testing.T) {
client1, err := NewNacosConfigClient("nacos", true, scs, cc)
assert.Nil(t, err)
client2, err := NewNacosConfigClient("nacos", true, scs, cc)
assert.Nil(t, err)
client3, err := NewNacosConfigClient("nacos", false, scs, cc)
assert.Nil(t, err)
client4, err := NewNacosConfigClient("test", true, scs, cc)
assert.Nil(t, err)
assert.Equal(t, client1, client2)
assert.Equal(t, client1, client3)
assert.Equal(t, client1, client4)
})
} }
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment