summary | refs | log | tree | commit | diff | stats
path: root/vendor/github.com/xordataexchange/crypt/backend/etcd/etcd.go
blob: 18f35510dd25151e6253f7fde29b4892efd71450 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package etcd

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/xordataexchange/crypt/backend"

	goetcd "github.com/coreos/etcd/client"
)

// Client is a key/value store client backed by etcd. It wraps an etcd client
// together with the KeysAPI derived from it; see New for construction.
type Client struct {
	// client is the underlying etcd client created by New.
	client    goetcd.Client
	// keysAPI is the keys interface built from client; all reads, writes and
	// watches in this file go through it.
	keysAPI   goetcd.KeysAPI
	// waitIndex holds the ModifiedIndex of the most recent node delivered by a
	// watch. It is written from the watch goroutine without synchronization.
	waitIndex uint64
}

// New builds a Client that talks to the etcd endpoints listed in machines.
// It returns an error if the underlying etcd client cannot be created.
func New(machines []string) (*Client, error) {
	cfg := goetcd.Config{Endpoints: machines}

	etcdClient, err := goetcd.New(cfg)
	if err != nil {
		return nil, fmt.Errorf("creating new etcd client for crypt.backend.Client: %v", err)
	}

	// waitIndex is left at its zero value until a watch observes a change.
	return &Client{
		client:  etcdClient,
		keysAPI: goetcd.NewKeysAPI(etcdClient),
	}, nil
}

// Get returns the value stored at key. It is GetWithContext called with
// context.TODO().
func (c *Client) Get(key string) ([]byte, error) {
	ctx := context.TODO()
	return c.GetWithContext(ctx, key)
}

// GetWithContext fetches key from etcd using ctx and returns the node's value
// as bytes. Any error from the etcd request is returned unchanged.
func (c *Client) GetWithContext(ctx context.Context, key string) ([]byte, error) {
	res, getErr := c.keysAPI.Get(ctx, key, nil)
	if getErr != nil {
		return nil, getErr
	}

	value := res.Node.Value
	return []byte(value), nil
}

// addKVPairs appends the key/value pair for node to list and returns the
// extended list. Directory nodes contribute no pair of their own; instead each
// of their child nodes is visited recursively, in order.
func addKVPairs(node *goetcd.Node, list backend.KVPairs) backend.KVPairs {
	if !node.Dir {
		pair := &backend.KVPair{Key: node.Key, Value: []byte(node.Value)}
		return append(list, pair)
	}

	for _, child := range node.Nodes {
		list = addKVPairs(child, list)
	}
	return list
}

// List returns the key/value pairs found under the directory key. It is
// ListWithContext called with context.TODO().
func (c *Client) List(key string) (backend.KVPairs, error) {
	ctx := context.TODO()
	return c.ListWithContext(ctx, key)
}

// ListWithContext returns every key/value pair stored beneath the directory
// key, including pairs held in nested sub-directories.
//
// It returns the etcd request error unchanged if the lookup fails, and an
// error if key exists but is not a directory.
func (c *Client) ListWithContext(ctx context.Context, key string) (backend.KVPairs, error) {
	// The lookup must be recursive: a plain Get returns child directories
	// without their contents, which would leave addKVPairs nothing to descend
	// into and silently drop every key below the first level.
	resp, err := c.keysAPI.Get(ctx, key, &goetcd.GetOptions{Recursive: true})
	if err != nil {
		return nil, err
	}
	if !resp.Node.Dir {
		return nil, errors.New("key is not a directory")
	}
	return addKVPairs(resp.Node, nil), nil
}

// Set stores value at key. It is SetWithContext called with context.TODO().
func (c *Client) Set(key string, value []byte) error {
	ctx := context.TODO()
	return c.SetWithContext(ctx, key, value)
}

// SetWithContext writes value, converted to a string, to key in etcd using
// ctx. The etcd response is discarded; only the request error is returned.
func (c *Client) SetWithContext(ctx context.Context, key string, value []byte) error {
	if _, err := c.keysAPI.Set(ctx, key, string(value), nil); err != nil {
		return err
	}
	return nil
}

// Watch streams changes to key on the returned channel until stop fires. It is
// WatchWithContext called with context.Background().
func (c *Client) Watch(key string, stop chan bool) <-chan *backend.Response {
	ctx := context.Background()
	return c.WatchWithContext(ctx, key, stop)
}

// WatchWithContext watches key and delivers one backend.Response on the
// returned channel per event: the node's new value after a change, or the
// error if a watch request fails. After a failed request the watch pauses for
// five seconds before trying again.
//
// The watch runs in a background goroutine that ends when ctx is done or when
// stop receives a value or is closed. Once that happens nothing further is
// sent, including the resulting cancellation error. The returned channel is
// never closed, so consumers should also select on their own stop signal
// rather than rely on a close to terminate.
func (c *Client) WatchWithContext(ctx context.Context, key string, stop chan bool) <-chan *backend.Response {
	respChan := make(chan *backend.Response)
	go func() {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		// Turn the stop channel into a context cancellation. Selecting on
		// ctx.Done() as well lets this helper exit when the watch ends for any
		// other reason, instead of waiting on stop forever.
		go func() {
			select {
			case <-stop:
				cancel()
			case <-ctx.Done():
			}
		}()

		// send delivers r unless the watch is cancelled first, so a consumer
		// that has stopped reading cannot pin this goroutine. It reports
		// whether r was delivered.
		send := func(r *backend.Response) bool {
			select {
			case respChan <- r:
				return true
			case <-ctx.Done():
				return false
			}
		}

		watcher := c.keysAPI.Watcher(key, nil)
		for {
			resp, err := watcher.Next(ctx)
			if err != nil {
				// A cancelled context makes every later Next call fail at
				// once; stop here rather than reporting that error forever.
				if ctx.Err() != nil {
					return
				}
				if !send(&backend.Response{nil, err}) {
					return
				}
				// Back off before retrying, but stay responsive to cancel.
				select {
				case <-time.After(time.Second * 5):
				case <-ctx.Done():
					return
				}
				continue
			}
			c.waitIndex = resp.Node.ModifiedIndex
			if !send(&backend.Response{[]byte(resp.Node.Value), nil}) {
				return
			}
		}
	}()
	return respChan
}