Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in / Register
Toggle navigation
G
gostnops
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
wei.xuan
gostnops
Commits
62b6716d
Commit
62b6716d
authored
Feb 06, 2021
by
ztelur
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add remoting/etcd3 encapsulation
parent
e7ba5790
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
912 additions
and
0 deletions
+912
-0
go.mod
go.mod
+2
-0
go.sum
go.sum
+42
-0
client.go
remoting/etcd3/client.go
+456
-0
client_test.go
remoting/etcd3/client_test.go
+412
-0
No files found.
go.mod
View file @
62b6716d
...
@@ -13,6 +13,8 @@ require (
...
@@ -13,6 +13,8 @@ require (
github.com/shirou/gopsutil v3.20.11-0.20201116082039-2fb5da2f2449+incompatible
github.com/shirou/gopsutil v3.20.11-0.20201116082039-2fb5da2f2449+incompatible
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.6.1
go.uber.org/atomic v1.7.0
go.uber.org/atomic v1.7.0
github.com/coreos/etcd v3.3.25+incompatible
google.golang.org/grpc v1.26.0
)
)
go 1.13
go 1.13
go.sum
View file @
62b6716d
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/coreos/etcd v3.3.25+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dubbogo/jsonparser v1.0.1 h1:sAIr8gk+gkahkIm6CnUxh9wTCkbgwLEQ8dTXTnAXyzo=
github.com/dubbogo/jsonparser v1.0.1 h1:sAIr8gk+gkahkIm6CnUxh9wTCkbgwLEQ8dTXTnAXyzo=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
github.com/dubbogo/jsonparser v1.0.1/go.mod h1:tYAtpctvSP/tWw4MeelsowSPgXQRVHHWbqL6ynps8jU=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQF+M0ao65imhwqKnz3Q2z/d8PWZRMQvDM=
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQF+M0ao65imhwqKnz3Q2z/d8PWZRMQvDM=
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k=
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k=
github.com/k0kubun/pp v3.0.1+incompatible h1:3tqvf7QgUnZ5tXO6pNAZlrvHgl6DvifjDrd9g2S9Z40=
github.com/k0kubun/pp v3.0.1+incompatible h1:3tqvf7QgUnZ5tXO6pNAZlrvHgl6DvifjDrd9g2S9Z40=
...
@@ -20,6 +32,7 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
...
@@ -20,6 +32,7 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/shirou/gopsutil v3.20.11-0.20201116082039-2fb5da2f2449+incompatible h1:Wll9sV8SqrD0cSI17l1L1Q2ZcqhhoDb1CUN+6TarZ3I=
github.com/shirou/gopsutil v3.20.11-0.20201116082039-2fb5da2f2449+incompatible h1:Wll9sV8SqrD0cSI17l1L1Q2ZcqhhoDb1CUN+6TarZ3I=
github.com/shirou/gopsutil v3.20.11-0.20201116082039-2fb5da2f2449+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil v3.20.11-0.20201116082039-2fb5da2f2449+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
...
@@ -29,11 +42,40 @@ github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd
...
@@ -29,11 +42,40 @@ github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42 h1:vEOn+mP2zCOVzKckCZy6YsCtDblrpj/w7B9nxGNELpg=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42 h1:vEOn+mP2zCOVzKckCZy6YsCtDblrpj/w7B9nxGNELpg=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
remoting/etcd3/client.go
0 → 100644
View file @
62b6716d
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
etcdv3
import
(
"context"
"sync"
"time"
"log"
)
import
(
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
perrors
"github.com/pkg/errors"
"google.golang.org/grpc"
)
const
(
// ConnDelay connection delay
ConnDelay
=
3
// MaxFailTimes max failure times
MaxFailTimes
=
15
// RegistryETCDV3Client client name
RegistryETCDV3Client
=
"etcd registry"
// MetadataETCDV3Client client name
MetadataETCDV3Client
=
"etcd metadata"
)
var
(
// ErrNilETCDV3Client raw client nil
ErrNilETCDV3Client
=
perrors
.
New
(
"etcd raw client is nil"
)
// full describe the ERR
// ErrKVPairNotFound not found key
ErrKVPairNotFound
=
perrors
.
New
(
"k/v pair not found"
)
)
// Options client configuration
type
Options
struct
{
name
string
endpoints
[]
string
client
*
Client
timeout
time
.
Duration
heartbeat
int
// heartbeat second
}
// Option will define a function of handling Options
type
Option
func
(
*
Options
)
// WithEndpoints sets etcd client endpoints
func
WithEndpoints
(
endpoints
...
string
)
Option
{
return
func
(
opt
*
Options
)
{
opt
.
endpoints
=
endpoints
}
}
// WithName sets etcd client name
func
WithName
(
name
string
)
Option
{
return
func
(
opt
*
Options
)
{
opt
.
name
=
name
}
}
// WithTimeout sets etcd client timeout
func
WithTimeout
(
timeout
time
.
Duration
)
Option
{
return
func
(
opt
*
Options
)
{
opt
.
timeout
=
timeout
}
}
// WithHeartbeat sets etcd client heartbeat
func
WithHeartbeat
(
heartbeat
int
)
Option
{
return
func
(
opt
*
Options
)
{
opt
.
heartbeat
=
heartbeat
}
}
// NewConfigClient create new Client
func
NewConfigClient
(
opts
...
Option
)
*
Client
{
options
:=
&
Options
{
heartbeat
:
1
,
// default heartbeat
}
for
_
,
opt
:=
range
opts
{
opt
(
options
)
}
newClient
,
err
:=
NewClient
(
options
.
name
,
options
.
endpoints
,
options
.
timeout
,
options
.
heartbeat
)
if
err
!=
nil
{
log
.
Printf
(
"new etcd client (name{%s}, etcd addresses{%v}, timeout{%d}) = error{%v}"
,
options
.
name
,
options
.
endpoints
,
options
.
timeout
,
err
)
}
return
newClient
}
// Client represents etcd client Configuration
type
Client
struct
{
lock
sync
.
RWMutex
// these properties are only set once when they are started.
name
string
endpoints
[]
string
timeout
time
.
Duration
heartbeat
int
ctx
context
.
Context
// if etcd server connection lose, the ctx.Done will be sent msg
cancel
context
.
CancelFunc
// cancel the ctx, all watcher will stopped
rawClient
*
clientv3
.
Client
exit
chan
struct
{}
Wait
sync
.
WaitGroup
}
// NewClient create a client instance with name, endpoints etc.
func
NewClient
(
name
string
,
endpoints
[]
string
,
timeout
time
.
Duration
,
heartbeat
int
)
(
*
Client
,
error
)
{
ctx
,
cancel
:=
context
.
WithCancel
(
context
.
Background
())
rawClient
,
err
:=
clientv3
.
New
(
clientv3
.
Config
{
Context
:
ctx
,
Endpoints
:
endpoints
,
DialTimeout
:
timeout
,
DialOptions
:
[]
grpc
.
DialOption
{
grpc
.
WithBlock
()},
})
if
err
!=
nil
{
return
nil
,
perrors
.
WithMessage
(
err
,
"new raw client block connect to server"
)
}
c
:=
&
Client
{
name
:
name
,
timeout
:
timeout
,
endpoints
:
endpoints
,
heartbeat
:
heartbeat
,
ctx
:
ctx
,
cancel
:
cancel
,
rawClient
:
rawClient
,
exit
:
make
(
chan
struct
{}),
}
if
err
:=
c
.
maintenanceStatus
();
err
!=
nil
{
return
nil
,
perrors
.
WithMessage
(
err
,
"client maintenance status"
)
}
return
c
,
nil
}
// NOTICE: need to get the lock before calling this method
func
(
c
*
Client
)
clean
()
{
// close raw client
c
.
rawClient
.
Close
()
// cancel ctx for raw client
c
.
cancel
()
// clean raw client
c
.
rawClient
=
nil
}
func
(
c
*
Client
)
stop
()
bool
{
select
{
case
<-
c
.
exit
:
return
true
default
:
close
(
c
.
exit
)
}
return
false
}
// Close close client
func
(
c
*
Client
)
Close
()
{
if
c
==
nil
{
return
}
// stop the client
c
.
stop
()
// wait client maintenance status stop
c
.
Wait
.
Wait
()
c
.
lock
.
Lock
()
defer
c
.
lock
.
Unlock
()
if
c
.
rawClient
!=
nil
{
c
.
clean
()
}
log
.
Printf
(
"etcd client{name:%s, endpoints:%s} exit now."
,
c
.
name
,
c
.
endpoints
)
}
func
(
c
*
Client
)
maintenanceStatus
()
error
{
s
,
err
:=
concurrency
.
NewSession
(
c
.
rawClient
,
concurrency
.
WithTTL
(
c
.
heartbeat
))
if
err
!=
nil
{
return
perrors
.
WithMessage
(
err
,
"new session with server"
)
}
// must add wg before go maintenance status goroutine
c
.
Wait
.
Add
(
1
)
go
c
.
maintenanceStatusLoop
(
s
)
return
nil
}
func
(
c
*
Client
)
maintenanceStatusLoop
(
s
*
concurrency
.
Session
)
{
defer
func
()
{
c
.
Wait
.
Done
()
log
.
Printf
(
"etcd client {endpoints:%v, name:%s} maintenance goroutine game over."
,
c
.
endpoints
,
c
.
name
)
}()
for
{
select
{
case
<-
c
.
Done
()
:
// Client be stopped, will clean the client hold resources
return
case
<-
s
.
Done
()
:
log
.
Print
(
"etcd server stopped"
)
c
.
lock
.
Lock
()
// when etcd server stopped, cancel ctx, stop all watchers
c
.
clean
()
// when connection lose, stop client, trigger reconnect to etcd
c
.
stop
()
c
.
lock
.
Unlock
()
return
}
}
}
// if k not exist will put k/v in etcd, otherwise return nil
func
(
c
*
Client
)
put
(
k
string
,
v
string
,
opts
...
clientv3
.
OpOption
)
error
{
c
.
lock
.
RLock
()
defer
c
.
lock
.
RUnlock
()
if
c
.
rawClient
==
nil
{
return
ErrNilETCDV3Client
}
_
,
err
:=
c
.
rawClient
.
Txn
(
c
.
ctx
)
.
If
(
clientv3
.
Compare
(
clientv3
.
Version
(
k
),
"<"
,
1
))
.
Then
(
clientv3
.
OpPut
(
k
,
v
,
opts
...
))
.
Commit
()
return
err
}
// if k not exist will put k/v in etcd
// if k is already exist in etcd, replace it
func
(
c
*
Client
)
update
(
k
string
,
v
string
,
opts
...
clientv3
.
OpOption
)
error
{
c
.
lock
.
RLock
()
defer
c
.
lock
.
RUnlock
()
if
c
.
rawClient
==
nil
{
return
ErrNilETCDV3Client
}
_
,
err
:=
c
.
rawClient
.
Txn
(
c
.
ctx
)
.
If
(
clientv3
.
Compare
(
clientv3
.
Version
(
k
),
"!="
,
-
1
))
.
Then
(
clientv3
.
OpPut
(
k
,
v
,
opts
...
))
.
Commit
()
return
err
}
func
(
c
*
Client
)
delete
(
k
string
)
error
{
c
.
lock
.
RLock
()
defer
c
.
lock
.
RUnlock
()
if
c
.
rawClient
==
nil
{
return
ErrNilETCDV3Client
}
_
,
err
:=
c
.
rawClient
.
Delete
(
c
.
ctx
,
k
)
return
err
}
func
(
c
*
Client
)
get
(
k
string
)
(
string
,
error
)
{
c
.
lock
.
RLock
()
defer
c
.
lock
.
RUnlock
()
if
c
.
rawClient
==
nil
{
return
""
,
ErrNilETCDV3Client
}
resp
,
err
:=
c
.
rawClient
.
Get
(
c
.
ctx
,
k
)
if
err
!=
nil
{
return
""
,
err
}
if
len
(
resp
.
Kvs
)
==
0
{
return
""
,
ErrKVPairNotFound
}
return
string
(
resp
.
Kvs
[
0
]
.
Value
),
nil
}
// CleanKV delete all key and value
func
(
c
*
Client
)
CleanKV
()
error
{
c
.
lock
.
RLock
()
defer
c
.
lock
.
RUnlock
()
if
c
.
rawClient
==
nil
{
return
ErrNilETCDV3Client
}
_
,
err
:=
c
.
rawClient
.
Delete
(
c
.
ctx
,
""
,
clientv3
.
WithPrefix
())
return
err
}
func
(
c
*
Client
)
getChildren
(
k
string
)
([]
string
,
[]
string
,
error
)
{
c
.
lock
.
RLock
()
defer
c
.
lock
.
RUnlock
()
if
c
.
rawClient
==
nil
{
return
nil
,
nil
,
ErrNilETCDV3Client
}
resp
,
err
:=
c
.
rawClient
.
Get
(
c
.
ctx
,
k
,
clientv3
.
WithPrefix
())
if
err
!=
nil
{
return
nil
,
nil
,
err
}
if
len
(
resp
.
Kvs
)
==
0
{
return
nil
,
nil
,
ErrKVPairNotFound
}
kList
:=
make
([]
string
,
0
,
len
(
resp
.
Kvs
))
vList
:=
make
([]
string
,
0
,
len
(
resp
.
Kvs
))
for
_
,
kv
:=
range
resp
.
Kvs
{
kList
=
append
(
kList
,
string
(
kv
.
Key
))
vList
=
append
(
vList
,
string
(
kv
.
Value
))
}
return
kList
,
vList
,
nil
}
func
(
c
*
Client
)
watchWithPrefix
(
prefix
string
)
(
clientv3
.
WatchChan
,
error
)
{
c
.
lock
.
RLock
()
defer
c
.
lock
.
RUnlock
()
if
c
.
rawClient
==
nil
{
return
nil
,
ErrNilETCDV3Client
}
return
c
.
rawClient
.
Watch
(
c
.
ctx
,
prefix
,
clientv3
.
WithPrefix
()),
nil
}
func
(
c
*
Client
)
watch
(
k
string
)
(
clientv3
.
WatchChan
,
error
)
{
c
.
lock
.
RLock
()
defer
c
.
lock
.
RUnlock
()
if
c
.
rawClient
==
nil
{
return
nil
,
ErrNilETCDV3Client
}
return
c
.
rawClient
.
Watch
(
c
.
ctx
,
k
),
nil
}
func
(
c
*
Client
)
keepAliveKV
(
k
string
,
v
string
)
error
{
c
.
lock
.
RLock
()
defer
c
.
lock
.
RUnlock
()
if
c
.
rawClient
==
nil
{
return
ErrNilETCDV3Client
}
// make lease time longer, since 1 second is too short
lease
,
err
:=
c
.
rawClient
.
Grant
(
c
.
ctx
,
int64
(
30
*
time
.
Second
.
Seconds
()))
if
err
!=
nil
{
return
perrors
.
WithMessage
(
err
,
"grant lease"
)
}
keepAlive
,
err
:=
c
.
rawClient
.
KeepAlive
(
c
.
ctx
,
lease
.
ID
)
if
err
!=
nil
||
keepAlive
==
nil
{
c
.
rawClient
.
Revoke
(
c
.
ctx
,
lease
.
ID
)
if
err
!=
nil
{
return
perrors
.
WithMessage
(
err
,
"keep alive lease"
)
}
return
perrors
.
New
(
"keep alive lease"
)
}
_
,
err
=
c
.
rawClient
.
Put
(
c
.
ctx
,
k
,
v
,
clientv3
.
WithLease
(
lease
.
ID
))
return
perrors
.
WithMessage
(
err
,
"put k/v with lease"
)
}
// Done return exit chan
func
(
c
*
Client
)
Done
()
<-
chan
struct
{}
{
return
c
.
exit
}
// Valid check client
func
(
c
*
Client
)
Valid
()
bool
{
select
{
case
<-
c
.
exit
:
return
false
default
:
}
c
.
lock
.
RLock
()
defer
c
.
lock
.
RUnlock
()
return
c
.
rawClient
!=
nil
}
// Create key value ...
func
(
c
*
Client
)
Create
(
k
string
,
v
string
)
error
{
err
:=
c
.
put
(
k
,
v
)
return
perrors
.
WithMessagef
(
err
,
"put k/v (key: %s value %s)"
,
k
,
v
)
}
// Update key value ...
func
(
c
*
Client
)
Update
(
k
,
v
string
)
error
{
err
:=
c
.
update
(
k
,
v
)
return
perrors
.
WithMessagef
(
err
,
"Update k/v (key: %s value %s)"
,
k
,
v
)
}
// Delete key
func
(
c
*
Client
)
Delete
(
k
string
)
error
{
err
:=
c
.
delete
(
k
)
return
perrors
.
WithMessagef
(
err
,
"delete k/v (key %s)"
,
k
)
}
// RegisterTemp registers a temporary node
func
(
c
*
Client
)
RegisterTemp
(
k
,
v
string
)
error
{
err
:=
c
.
keepAliveKV
(
k
,
v
)
return
perrors
.
WithMessagef
(
err
,
"keepalive kv (key %s)"
,
k
)
}
// GetChildrenKVList gets children kv list by @k
func
(
c
*
Client
)
GetChildrenKVList
(
k
string
)
([]
string
,
[]
string
,
error
)
{
kList
,
vList
,
err
:=
c
.
getChildren
(
k
)
return
kList
,
vList
,
perrors
.
WithMessagef
(
err
,
"get key children (key %s)"
,
k
)
}
// Get gets value by @k
func
(
c
*
Client
)
Get
(
k
string
)
(
string
,
error
)
{
v
,
err
:=
c
.
get
(
k
)
return
v
,
perrors
.
WithMessagef
(
err
,
"get key value (key %s)"
,
k
)
}
// Watch watches on spec key
func
(
c
*
Client
)
Watch
(
k
string
)
(
clientv3
.
WatchChan
,
error
)
{
wc
,
err
:=
c
.
watch
(
k
)
return
wc
,
perrors
.
WithMessagef
(
err
,
"watch prefix (key %s)"
,
k
)
}
// WatchWithPrefix watches on spec prefix
func
(
c
*
Client
)
WatchWithPrefix
(
prefix
string
)
(
clientv3
.
WatchChan
,
error
)
{
wc
,
err
:=
c
.
watchWithPrefix
(
prefix
)
return
wc
,
perrors
.
WithMessagef
(
err
,
"watch prefix (key %s)"
,
prefix
)
}
remoting/etcd3/client_test.go
0 → 100644
View file @
62b6716d
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
etcdv3
import
(
"net/url"
"os"
"path"
"reflect"
"strings"
"sync"
"testing"
"time"
)
import
(
"github.com/coreos/etcd/embed"
"github.com/coreos/etcd/mvcc/mvccpb"
perrors
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc/connectivity"
)
const
defaultEtcdV3WorkDir
=
"/tmp/default-dubbo-go-remote.etcd"
// tests dataset
var
tests
=
[]
struct
{
input
struct
{
k
string
v
string
}
}{
{
input
:
struct
{
k
string
v
string
}{
k
:
"name"
,
v
:
"scott.wang"
}},
{
input
:
struct
{
k
string
v
string
}{
k
:
"namePrefix"
,
v
:
"prefix.scott.wang"
}},
{
input
:
struct
{
k
string
v
string
}{
k
:
"namePrefix1"
,
v
:
"prefix1.scott.wang"
}},
{
input
:
struct
{
k
string
v
string
}{
k
:
"age"
,
v
:
"27"
}},
}
// test dataset prefix
const
prefix
=
"name"
type
ClientTestSuite
struct
{
suite
.
Suite
etcdConfig
struct
{
name
string
endpoints
[]
string
timeout
time
.
Duration
heartbeat
int
}
etcd
*
embed
.
Etcd
client
*
Client
}
// start etcd server
func
(
suite
*
ClientTestSuite
)
SetupSuite
()
{
t
:=
suite
.
T
()
DefaultListenPeerURLs
:=
"http://localhost:2382"
DefaultListenClientURLs
:=
"http://localhost:2381"
lpurl
,
_
:=
url
.
Parse
(
DefaultListenPeerURLs
)
lcurl
,
_
:=
url
.
Parse
(
DefaultListenClientURLs
)
cfg
:=
embed
.
NewConfig
()
cfg
.
LPUrls
=
[]
url
.
URL
{
*
lpurl
}
cfg
.
LCUrls
=
[]
url
.
URL
{
*
lcurl
}
cfg
.
Dir
=
defaultEtcdV3WorkDir
e
,
err
:=
embed
.
StartEtcd
(
cfg
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
select
{
case
<-
e
.
Server
.
ReadyNotify
()
:
t
.
Log
(
"Server is ready!"
)
case
<-
time
.
After
(
60
*
time
.
Second
)
:
e
.
Server
.
Stop
()
// trigger a shutdown
t
.
Logf
(
"Server took too long to start!"
)
}
suite
.
etcd
=
e
return
}
// stop etcd server
func
(
suite
*
ClientTestSuite
)
TearDownSuite
()
{
suite
.
etcd
.
Close
()
if
err
:=
os
.
RemoveAll
(
defaultEtcdV3WorkDir
);
err
!=
nil
{
suite
.
FailNow
(
err
.
Error
())
}
}
func
(
suite
*
ClientTestSuite
)
setUpClient
()
*
Client
{
c
,
err
:=
NewClient
(
suite
.
etcdConfig
.
name
,
suite
.
etcdConfig
.
endpoints
,
suite
.
etcdConfig
.
timeout
,
suite
.
etcdConfig
.
heartbeat
)
if
err
!=
nil
{
suite
.
T
()
.
Fatal
(
err
)
}
return
c
}
// set up a client for suite
func
(
suite
*
ClientTestSuite
)
SetupTest
()
{
c
:=
suite
.
setUpClient
()
c
.
CleanKV
()
suite
.
client
=
c
return
}
func
(
suite
*
ClientTestSuite
)
TestClientClose
()
{
c
:=
suite
.
client
t
:=
suite
.
T
()
defer
c
.
Close
()
if
c
.
rawClient
.
ActiveConnection
()
.
GetState
()
!=
connectivity
.
Ready
{
t
.
Fatal
(
suite
.
client
.
rawClient
.
ActiveConnection
()
.
GetState
())
}
}
func
(
suite
*
ClientTestSuite
)
TestClientValid
()
{
c
:=
suite
.
client
t
:=
suite
.
T
()
if
!
c
.
Valid
()
{
t
.
Fatal
(
"client is not valid"
)
}
c
.
Close
()
if
suite
.
client
.
Valid
()
!=
false
{
t
.
Fatal
(
"client is valid"
)
}
}
func
(
suite
*
ClientTestSuite
)
TestClientDone
()
{
c
:=
suite
.
client
go
func
()
{
time
.
Sleep
(
2
*
time
.
Second
)
c
.
Close
()
}()
c
.
Wait
.
Wait
()
if
c
.
Valid
()
{
suite
.
T
()
.
Fatal
(
"client should be invalid then"
)
}
}
func
(
suite
*
ClientTestSuite
)
TestClientCreateKV
()
{
tests
:=
tests
c
:=
suite
.
client
t
:=
suite
.
T
()
defer
suite
.
client
.
Close
()
for
_
,
tc
:=
range
tests
{
k
:=
tc
.
input
.
k
v
:=
tc
.
input
.
v
expect
:=
tc
.
input
.
v
if
err
:=
c
.
Create
(
k
,
v
);
err
!=
nil
{
t
.
Fatal
(
err
)
}
value
,
err
:=
c
.
Get
(
k
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
value
!=
expect
{
t
.
Fatalf
(
"expect %v but get %v"
,
expect
,
value
)
}
}
}
func
(
suite
*
ClientTestSuite
)
TestClientDeleteKV
()
{
tests
:=
tests
c
:=
suite
.
client
t
:=
suite
.
T
()
defer
c
.
Close
()
for
_
,
tc
:=
range
tests
{
k
:=
tc
.
input
.
k
v
:=
tc
.
input
.
v
expect
:=
ErrKVPairNotFound
if
err
:=
c
.
Create
(
k
,
v
);
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
err
:=
c
.
Delete
(
k
);
err
!=
nil
{
t
.
Fatal
(
err
)
}
_
,
err
:=
c
.
Get
(
k
)
if
perrors
.
Cause
(
err
)
==
expect
{
continue
}
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
}
}
func
(
suite
*
ClientTestSuite
)
TestClientGetChildrenKVList
()
{
tests
:=
tests
c
:=
suite
.
client
t
:=
suite
.
T
()
var
expectKList
[]
string
var
expectVList
[]
string
for
_
,
tc
:=
range
tests
{
k
:=
tc
.
input
.
k
v
:=
tc
.
input
.
v
if
strings
.
Contains
(
k
,
prefix
)
{
expectKList
=
append
(
expectKList
,
k
)
expectVList
=
append
(
expectVList
,
v
)
}
if
err
:=
c
.
Create
(
k
,
v
);
err
!=
nil
{
t
.
Fatal
(
err
)
}
}
kList
,
vList
,
err
:=
c
.
GetChildrenKVList
(
prefix
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
reflect
.
DeepEqual
(
expectKList
,
kList
)
&&
reflect
.
DeepEqual
(
expectVList
,
vList
)
{
return
}
t
.
Fatalf
(
"expect keylist %v but got %v expect valueList %v but got %v "
,
expectKList
,
kList
,
expectVList
,
vList
)
}
func
(
suite
*
ClientTestSuite
)
TestClientWatch
()
{
tests
:=
tests
c
:=
suite
.
client
t
:=
suite
.
T
()
wg
:=
sync
.
WaitGroup
{}
wg
.
Add
(
1
)
go
func
()
{
defer
wg
.
Done
()
wc
,
err
:=
c
.
watch
(
prefix
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
events
:=
make
([]
mvccpb
.
Event
,
0
)
var
eCreate
,
eDelete
mvccpb
.
Event
for
e
:=
range
wc
{
for
_
,
event
:=
range
e
.
Events
{
events
=
append
(
events
,
(
mvccpb
.
Event
)(
*
event
))
if
event
.
Type
==
mvccpb
.
PUT
{
eCreate
=
(
mvccpb
.
Event
)(
*
event
)
}
if
event
.
Type
==
mvccpb
.
DELETE
{
eDelete
=
(
mvccpb
.
Event
)(
*
event
)
}
t
.
Logf
(
"type IsCreate %v k %s v %s"
,
event
.
IsCreate
(),
event
.
Kv
.
Key
,
event
.
Kv
.
Value
)
}
}
assert
.
Equal
(
t
,
2
,
len
(
events
))
assert
.
Contains
(
t
,
events
,
eCreate
)
assert
.
Contains
(
t
,
events
,
eDelete
)
}()
for
_
,
tc
:=
range
tests
{
k
:=
tc
.
input
.
k
v
:=
tc
.
input
.
v
if
err
:=
c
.
Create
(
k
,
v
);
err
!=
nil
{
t
.
Fatal
(
err
)
}
if
err
:=
c
.
delete
(
k
);
err
!=
nil
{
t
.
Fatal
(
err
)
}
}
c
.
Close
()
wg
.
Wait
()
}
func
(
suite
*
ClientTestSuite
)
TestClientRegisterTemp
()
{
c
:=
suite
.
client
observeC
:=
suite
.
setUpClient
()
t
:=
suite
.
T
()
wg
:=
sync
.
WaitGroup
{}
wg
.
Add
(
1
)
go
func
()
{
defer
wg
.
Done
()
completePath
:=
path
.
Join
(
"scott"
,
"wang"
)
wc
,
err
:=
observeC
.
watch
(
completePath
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
events
:=
make
([]
mvccpb
.
Event
,
0
)
var
eCreate
,
eDelete
mvccpb
.
Event
for
e
:=
range
wc
{
for
_
,
event
:=
range
e
.
Events
{
events
=
append
(
events
,
(
mvccpb
.
Event
)(
*
event
))
if
event
.
Type
==
mvccpb
.
DELETE
{
eDelete
=
(
mvccpb
.
Event
)(
*
event
)
t
.
Logf
(
"complete key (%s) is delete"
,
completePath
)
observeC
.
Close
()
break
}
eCreate
=
(
mvccpb
.
Event
)(
*
event
)
t
.
Logf
(
"type IsCreate %v k %s v %s"
,
event
.
IsCreate
(),
event
.
Kv
.
Key
,
event
.
Kv
.
Value
)
}
}
assert
.
Equal
(
t
,
2
,
len
(
events
))
assert
.
Contains
(
t
,
events
,
eCreate
)
assert
.
Contains
(
t
,
events
,
eDelete
)
}()
err
:=
c
.
RegisterTemp
(
"scott/wang"
,
"test"
)
if
err
!=
nil
{
t
.
Fatal
(
err
)
}
time
.
Sleep
(
2
*
time
.
Second
)
c
.
Close
()
wg
.
Wait
()
}
func
TestClientSuite
(
t
*
testing
.
T
)
{
suite
.
Run
(
t
,
&
ClientTestSuite
{
etcdConfig
:
struct
{
name
string
endpoints
[]
string
timeout
time
.
Duration
heartbeat
int
}{
name
:
"test"
,
endpoints
:
[]
string
{
"localhost:2381"
},
timeout
:
time
.
Second
,
heartbeat
:
1
,
},
})
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment