Commit 9f1daf17 authored by Randy's avatar Randy

do with revision

parent 755fff02
...@@ -37,6 +37,10 @@ var ( ...@@ -37,6 +37,10 @@ var (
// ErrKVPairNotFound not found key // ErrKVPairNotFound not found key
ErrKVPairNotFound = perrors.New("k/v pair not found") ErrKVPairNotFound = perrors.New("k/v pair not found")
ErrKVListSizeIllegal = perrors.New("k/v List is empty or kList size not equal to vList size") ErrKVListSizeIllegal = perrors.New("k/v List is empty or kList size not equal to vList size")
// ErrCompareFail txn compare fail
ErrCompareFail = perrors.New("txn compare fail")
// ErrRevision revision when error
ErrRevision int64 = -1
) )
// NewConfigClient create new Client // NewConfigClient create new Client
...@@ -212,23 +216,30 @@ func (c *Client) GetEndPoints() []string { ...@@ -212,23 +216,30 @@ func (c *Client) GetEndPoints() []string {
return c.endpoints return c.endpoints
} }
// if k not exist will put k/v in etcd, otherwise return error // if k not exist will put k/v in etcd, otherwise return ErrCompareFail
func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error { func (c *Client) create(k string, v string, opts ...clientv3.OpOption) error {
rawClient := c.GetRawClient() rawClient := c.GetRawClient()
if rawClient == nil { if rawClient == nil {
return ErrNilETCDV3Client return ErrNilETCDV3Client
} }
_, err := rawClient.Txn(c.ctx). resp, err := c.rawClient.Txn(c.ctx).
If(clientv3.Compare(clientv3.Version(k), "<", 1)). If(clientv3.Compare(clientv3.CreateRevision(k), "=", 0)).
Then(clientv3.OpPut(k, v, opts...)). Then(clientv3.OpPut(k, v, opts...)).
Commit() Commit()
if err != nil {
return err return err
}
if !resp.Succeeded {
return ErrCompareFail
}
return nil
} }
// if k in bulk insertion not exist all, then put all k/v in etcd, otherwise return error // if k in bulk insertion not exist all, then put all k/v in etcd, otherwise return error
func (c *Client) batchPut(kList []string, vList []string, opts ...clientv3.OpOption) error { func (c *Client) batchCreate(kList []string, vList []string, opts ...clientv3.OpOption) error {
rawClient := c.GetRawClient() rawClient := c.GetRawClient()
if rawClient == nil { if rawClient == nil {
...@@ -246,7 +257,7 @@ func (c *Client) batchPut(kList []string, vList []string, opts ...clientv3.OpOpt ...@@ -246,7 +257,7 @@ func (c *Client) batchPut(kList []string, vList []string, opts ...clientv3.OpOpt
for i, k := range kList { for i, k := range kList {
v := vList[i] v := vList[i]
cs = append(cs, clientv3.Compare(clientv3.Version(k), "<", 1)) cs = append(cs, clientv3.Compare(clientv3.CreateRevision(k), "=", 0))
ops = append(ops, clientv3.OpPut(k, v, opts...)) ops = append(ops, clientv3.OpPut(k, v, opts...))
} }
...@@ -257,20 +268,47 @@ func (c *Client) batchPut(kList []string, vList []string, opts ...clientv3.OpOpt ...@@ -257,20 +268,47 @@ func (c *Client) batchPut(kList []string, vList []string, opts ...clientv3.OpOpt
return err return err
} }
// if k not exist will put k/v in etcd // put k v into etcd
// if k is already exist in etcd, replace it func (c *Client) put(k string, v string, opts ...clientv3.OpOption) error {
func (c *Client) update(k string, v string, opts ...clientv3.OpOption) error {
rawClient := c.GetRawClient() rawClient := c.GetRawClient()
if rawClient == nil { if rawClient == nil {
return ErrNilETCDV3Client return ErrNilETCDV3Client
} }
_, err := rawClient.Txn(c.ctx). if c.rawClient == nil {
If(clientv3.Compare(clientv3.Version(k), "!=", -1)). return ErrNilETCDV3Client
}
_, err := c.rawClient.Put(c.ctx, k, v, opts...)
if err != nil {
return err
}
return nil
}
// if k not exist will put k/v in etcd
// if k is already exist in etcd and revision <= revision, replace it, otherwise return ErrCompareFail
func (c *Client) updateWithRev(k string, v string, rev int64, opts ...clientv3.OpOption) error {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return ErrNilETCDV3Client
}
resp, err := c.rawClient.Txn(c.ctx).
If(clientv3.Compare(clientv3.ModRevision(k), "=", rev)).
Then(clientv3.OpPut(k, v, opts...)). Then(clientv3.OpPut(k, v, opts...)).
Commit() Commit()
if err != nil {
return err return err
}
if !resp.Succeeded {
return ErrCompareFail
}
return nil
} }
func (c *Client) delete(k string) error { func (c *Client) delete(k string) error {
...@@ -303,6 +341,27 @@ func (c *Client) get(k string) (string, error) { ...@@ -303,6 +341,27 @@ func (c *Client) get(k string) (string, error) {
return string(resp.Kvs[0].Value), nil return string(resp.Kvs[0].Value), nil
} }
// getValAndRev get value and revision
func (c *Client) getValAndRev(k string) (string, int64, error) {
c.lock.RLock()
defer c.lock.RUnlock()
if c.rawClient == nil {
return "", ErrRevision, ErrNilETCDV3Client
}
resp, err := c.rawClient.Get(c.ctx, k)
if err != nil {
return "", ErrRevision, err
}
if len(resp.Kvs) == 0 {
return "", ErrRevision, ErrKVPairNotFound
}
return string(resp.Kvs[0].Value), resp.Header.Revision, nil
}
// CleanKV delete all key and value // CleanKV delete all key and value
func (c *Client) CleanKV() error { func (c *Client) CleanKV() error {
rawClient := c.GetRawClient() rawClient := c.GetRawClient()
...@@ -341,24 +400,16 @@ func (c *Client) GetChildren(k string) ([]string, []string, error) { ...@@ -341,24 +400,16 @@ func (c *Client) GetChildren(k string) ([]string, []string, error) {
return kList, vList, nil return kList, vList, nil
} }
func (c *Client) watchWithPrefix(prefix string) (clientv3.WatchChan, error) { // watchWithOption watch
rawClient := c.GetRawClient() func (c *Client) watchWithOption(k string, opts ...clientv3.OpOption) (clientv3.WatchChan, error) {
c.lock.RLock()
if rawClient == nil { defer c.lock.RUnlock()
return nil, ErrNilETCDV3Client
}
return rawClient.Watch(c.ctx, prefix, clientv3.WithPrefix()), nil
}
func (c *Client) watch(k string) (clientv3.WatchChan, error) {
rawClient := c.GetRawClient()
if rawClient == nil { if c.rawClient == nil {
return nil, ErrNilETCDV3Client return nil, ErrNilETCDV3Client
} }
return rawClient.Watch(c.ctx, k), nil return c.rawClient.Watch(c.ctx, k, opts...), nil
} }
func (c *Client) keepAliveKV(k string, v string) error { func (c *Client) keepAliveKV(k string, v string) error {
...@@ -407,19 +458,25 @@ func (c *Client) Valid() bool { ...@@ -407,19 +458,25 @@ func (c *Client) Valid() bool {
// Create key value ... // Create key value ...
func (c *Client) Create(k string, v string) error { func (c *Client) Create(k string, v string) error {
err := c.put(k, v) err := c.create(k, v)
return perrors.WithMessagef(err, "put k/v (key: %s value %s)", k, v) return perrors.WithMessagef(err, "put k/v (key: %s value %s)", k, v)
} }
// BatchCreate bulk insertion // BatchCreate bulk insertion
func (c *Client) BatchCreate(kList []string, vList []string) error { func (c *Client) BatchCreate(kList []string, vList []string) error {
err := c.batchPut(kList, vList) err := c.batchCreate(kList, vList)
return perrors.WithMessagef(err, "batch put k/v error ") return perrors.WithMessagef(err, "batch put k/v error ")
} }
// Update key value ... // Update key value ...
func (c *Client) Update(k, v string) error { func (c *Client) Update(k, v string) error {
err := c.update(k, v) err := c.put(k, v)
return perrors.WithMessagef(err, "Update k/v (key: %s value %s)", k, v)
}
// Update key value ...
func (c *Client) UpdateWithRev(k, v string, rev int64, opts ...clientv3.OpOption) error {
err := c.updateWithRev(k, v, rev, opts...)
return perrors.WithMessagef(err, "Update k/v (key: %s value %s)", k, v) return perrors.WithMessagef(err, "Update k/v (key: %s value %s)", k, v)
} }
...@@ -449,12 +506,18 @@ func (c *Client) Get(k string) (string, error) { ...@@ -449,12 +506,18 @@ func (c *Client) Get(k string) (string, error) {
// Watch watches on spec key // Watch watches on spec key
func (c *Client) Watch(k string) (clientv3.WatchChan, error) { func (c *Client) Watch(k string) (clientv3.WatchChan, error) {
wc, err := c.watch(k) wc, err := c.watchWithOption(k)
return wc, perrors.WithMessagef(err, "watch prefix (key %s)", k) return wc, perrors.WithMessagef(err, "watch prefix (key %s)", k)
} }
// WatchWithPrefix watches on spec prefix // WatchWithPrefix watches on spec prefix
func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) { func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) {
wc, err := c.watchWithPrefix(prefix) wc, err := c.watchWithOption(prefix, clientv3.WithPrefix())
return wc, perrors.WithMessagef(err, "watch prefix (key %s)", prefix) return wc, perrors.WithMessagef(err, "watch prefix (key %s)", prefix)
} }
// Watch watches on spc key with OpOption
func (c *Client) WatchWithOption(k string, opts ...clientv3.OpOption) (clientv3.WatchChan, error) {
wc, err := c.watchWithOption(k, opts...)
return wc, perrors.WithMessagef(err, "watch prefix (key %s)", k)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment