ETCD

概述

Etcd是一个golang编写的分布式、高可用的一致性键值存储系统,用于配置共享和服务发现等。它使用Raft一致性算法来保持集群数据的一致性,且客户端通过长连接watch功能,能够及时收到数据变化通知,相较于Zookeeper框架更加轻量化。以下是关于etcd的安装与使用方法的详细介绍。

节点配置

如果是单节点集群其实就可以不用进行配置,默认etcd的集群节点通信端口为2380,客户端访问端口为2379
若需要修改,则可以配置:/etc/default/etcd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 节点名称,默认为 "default" 
ETCD_NAME="etcd1"

# 数据目录,默认为 "${name}.etcd"
ETCD_DATA_DIR="/var/lib/etcd/default.etcd"

# 用于客户端连接的 URL。
ETCD_LISTEN_CLIENT_URLS="http://192.168.65.132:2379,http://127.0.0.1:2379"

# 用于客户端访问的公开,也就是提供服务的 URL
ETCD_ADVERTISE_CLIENT_URLS="http://192.168.65.132:2379,http://127.0.0.1:2379"

# 用于集群节点间通信的 URL。
ETCD_LISTEN_PEER_URLS="http://192.168.65.132:2380"
ETCD_INITIAL_ADVERTISE_PEER_URLS="http://192.168.65.132:2380"

# 心跳间隔时间-毫秒
ETCD_HEARTBEAT_INTERVAL=100

# 选举超时时间-毫秒
ETCD_ELECTION_TIMEOUT=1000
# 以下为集群配置,若无集群则需要注销
# 初始集群状态和配置--集群中所有节点
# ETCD_INITIAL_CLUSTER="etcd1=http://192.168.65.132:2380,etcd2=http://192.168.65.132:2381,etcd3=http://192.168.65.132:2382"

# 初始集群令牌-集群的ID
# ETCD_INITIAL_CLUSTER_TOKEN="etcd-cluster"
# ETCD_INITIAL_CLUSTER_STATE="new"

# 以下为安全配置,如果要求SSL连接etcd的话,把下面的配置启用,并修改文件路径
# ETCD_CERT_FILE="/etc/ssl/client.pem" #ETCD_KEY_FILE="/etc/ssl/client-key.pem" #ETCD_CLIENT_CERT_AUTH="true"
# ETCD_TRUSTED_CA_FILE="/etc/ssl/ca.pem"
# ETCD_AUTO_TLS="true" #ETCD_PEER_CERT_FILE="/etc/ssl/member.pem" #ETCD_PEER_KEY_FILE="/etc/ssl/member-key.pem" # ETCD_PEER_CLIENT_CERT_AUTH="false"
# ETCD_PEER_TRUSTED_CA_FILE="/etc/ssl/ca.pem"
# ETCD_PEER_AUTO_TLS="true"

搭建服务注册发现中心

使用Etcd作为服务注册发现中心,你需要定义服务的注册和发现逻辑。这通常涉及到以下几个操作:

  1. 服务注册:服务启动时,向Etcd注册自己的地址和端口。
  2. 服务发现:客户端通过Etcd获取服务的地址和端口,用于远程调用。
  3. 健康检查:服务定期向Etcd发送心跳,以维持其注册信息的有效性。
    etcd采用golang编写,v3版本通信采用grpc API,即(HTTP2+protobuf)

官方只维护了go语言版本的client库, 因此需要找到C/C++ 非官方的client 开发库:

etcd-cpp-apiv3

etcd-cpp-apiv3是一个etcd的C++版本客户端API。它依赖于mipsasm, boost, protobuf, gRPC, cpprestsdk等库。

命名空间:

1
namespace etcd 

客户端类与接口介绍

alt text

Value对象:存放键值对数据的对象。

1
2
3
4
5
6
7
class Value 
{
bool is_dir(); //判断是否是一个目录
std::string const& key() //键值对的key值
std::string const& as_string() //键值对的val值
int64_t lease() //用于创建租约的响应中,返回租约ID
}

etcd会监控所管理的数据的变化,一旦数据产生变化会通知客户端。在通知客户端的时候,会返回改变前的数据和改变后的数据

1
2
3
4
5
6
7
8
9
10
11
12
class Event 
{
enum class EventType
{
PUT, //键值对新增或数据发生改变
DELETE_,//键值对被删除
INVALID,
};
enum EventType event_type()
const Value& kv()
const Value& prev_kv()
}

Response对象:针对请求进行响应。

1
2
3
4
5
6
7
8
9
class Response 
{
bool is_ok()
std::string const& error_message()
Value const& value() //当前的数值或者一个请求的处理结果
Value const& prev_value() //之前的数值
Value const& value(int index)
std::vector<Event> const& events();//触发的事件
}

KeepAlive保活对象:一旦被析构,就无发保活,则租约数据失效被删除。

  • 本身提供一个获取租约的接口
  • 作用:针对一个租约可以不断进行续租,一直维护租约数据的有效性。
1
2
3
4
5
6
7
8
9
10
class KeepAlive 
{
KeepAlive(Client const& client, int ttl, int64_t lease_id = 0);

//返回租约ID
int64_t Lease();

//停止保活动作
void Cancel();
}

Client对象:客户端操作句柄对象

  • 提供了新增,获取数据的接口。
  • 提供了获取保活对象的接口,以及租约的接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
pplx::task 并行库异步结果对象 
阻塞方式 get(): 阻塞直到任务执行完成,并获取任务结果
非阻塞方式 wait(): 等待任务到达终止状态,然后返回任务状态

class Client
{
// etcd_url: "http://127.0.0.1:2379"
Client(std::string const& etcd_url, std::string const& load_balancer = "round_robin");

//Put a new key-value pair 新增一个键值对
pplx::task<Response> put(std::string const& key, std::string const& value);

//新增带有租约的键值对 (一定时间后,如果没有续租,数据自动删除)
pplx::task<Response> put(std::string const& key, std::string const& value, const int64_t leaseId);

//获取一个指定key目录下的数据列表
pplx::task<Response> ls(std::string const& key);

//创建并获取一个存活ttl时间的租约
pplx::task<Response> leasegrant(int ttl);

//获取一个租约保活对象,其参数ttl表示租约有效时间
pplx::task<std::shared_ptr<KeepAlive>> leasekeepalive(int ttl);

//撤销一个指定的租约
pplx::task<Response> leaserevoke(int64_t lease_id);

//数据锁
pplx::task<Response> lock(std::string const& key);
}

Watcher对象:进行数据变化通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Watcher 
{
Watcher(Client const& client, std::string const& key, std::function<void(Response)> callback, bool recursive = false);

参数:
Client const& client:监控的客户端对象
std::string const& key:要监控的键值对key
std::function<void(Response)> callback:发生改变后的回调
bool recursive = false:是否递归监控目录下的所有数据改变

Watcher(std::string const& address, std::string const& key, std::function<void(Response)> callback, bool recursive = false);

//阻塞等待,直到监控任务被停止
bool Wait();
bool Cancel();
}

举例

get.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <iostream>
#include <thread>

#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Watcher.hpp>
#include <etcd/Value.hpp>

void callback(const etcd::Response& resp)
{
if(resp.is_ok() == false)
{
std::cout << "收到未知事件错误通知" << resp.error_message() << std::endl;
return;
}
for(auto const& ev : resp.events())
{
if(ev.event_type() == etcd::Event::EventType::PUT)
{
std::cout << "服务信息发生改变" << std::endl;
std::cout << "当前的值:" << ev.kv().key() << "-" << ev.kv().as_string() << std::endl;
std::cout << "原来的值:" << ev.prev_kv().key() << "-" << ev.prev_kv().as_string() << std::endl;
}
else if(ev.event_type() == etcd::Event::EventType::DELETE_)
{
std::cout << "服务信息下线被删除" << std::endl;
std::cout << "当前的值:" << ev.kv().key() << "-" << ev.kv().as_string() << std::endl;
std::cout << "原来的值:" << ev.prev_kv().key() << "-" << ev.prev_kv().as_string() << std::endl;
}
}
}

int main(int argc, char* argv[])
{
std::string etcd_host = "http://127.0.0.1:2379";
// 实例化客户端对象
etcd::Client client(etcd_host);
// 或取指定的键值对信息
auto resp = client.ls("/service").get();
if(resp.is_ok() == false)
{
std::cout << "获取数据失败" << resp.error_message() << std::endl;
return -1;
}
int sz = resp.keys().size();
for(int i = 0 ; i < sz ; i++)
std::cout << resp.value(i).as_string() << "可以提供" << resp.key(i) << "服务" << endl;

auto watcher = etcd::Watcher(client, "/service", callback, true);
watcher.Wait();

return 0;
}

put.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <iostream>
#include <thread>

#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>

int main(int argc, char* argv[])
{
std::string etcd_host = "http://127.0.0.1:2379";
// 实例化客户端对象
etcd::Client client(etcd_host);
// 获取租约保活对象,伴随着创建指定一个较长的租约
auto keep_alive = client.leasekeepalive(3).get();
// 获取租约ID
auto lease_id = keep_alive->Lease();
// 向Etcd新增数据
auto resp1 = client.put("/service/user", "127.0.0.1:8080", lease_id).get();
if(resp1.is_ok() == false)
{
std::cout << "新增数据失败" << resp1.error_message() << std::endl;
return -1;
}

// 向Etcd新增数据
auto resp2 = client.put("/service/friend", "127.0.0.1:9090", lease_id).get();
if(resp2.is_ok() == false)
{
std::cout << "新增数据失败" << resp2.error_message() << std::endl;
return -1;
}

std::this_thread::sleep_for(std::chrono::seconds(10));
return 0;
}

运行结果

10ms之后新增的键值对没有人再续租过期销毁,销毁过程中调用了回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
user@Ubuntu:~/build$ ./get.exe 
127.0.0.1:9090可以提供/service/friend服务
127.0.0.1:8080可以提供/service/user服务
服务信息下线被删除
当前的值:/service/friend-
原来的值:/service/friend-127.0.0.1:9090
服务信息下线被删除
当前的值:/service/user-
原来的值:/service/user-127.0.0.1:8080

user@Ubuntu:~/build$ ./put.exe
user@Ubuntu:~/build$