-
Notifications
You must be signed in to change notification settings - Fork 5
/
rsm_client.cc
118 lines (98 loc) · 2.83 KB
/
rsm_client.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#include "rsm_client.h"
#include <vector>
#include <string>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include "handle.h"
#include "slock.h"
#include "lang/verify.h"
// Builds a client that initially treats dst as the primary and then asks
// it for the member list through init_members(). Both the mutex creation
// and the initial member fetch are checked with VERIFY.
rsm_client::rsm_client(std::string dst)
{
  printf("create rsm_client\n");
  // Check the result the same way the lock/unlock calls in this file are
  // checked, instead of silently continuing with an unusable mutex.
  VERIFY(pthread_mutex_init(&rsm_client_mutex, NULL) == 0);
  // dstsock is not read afterwards; the call is kept because whatever
  // checking make_sockaddr performs on dst is not visible from here.
  sockaddr_in dstsock;
  make_sockaddr(dst.c_str(), &dstsock);
  primary = dst;
  {
    // init_members() unlocks and relocks rsm_client_mutex around its RPC,
    // so the mutex has to be held when it is called.
    ScopedLock ml(&rsm_client_mutex);
    VERIFY(init_members());
  }
  printf("rsm_client: done\n");
}
// Called when the current primary is not responding, i.e. the primary
// is dead or in another network partition. In either case, we select
// another primary to contact with.
// Assumes caller holds rsm_client_mutex.
void
rsm_client::primary_failure()
{
primary = known_mems.back();
known_mems.pop_back();
}
// Sends request `req` for procedure `proc` to the current primary and
// stores the reply in `rep`. The call is retried until it returns
// rsm_client_protocol::OK:
//   - BUSY:        wait 3 seconds and retry the same primary;
//   - NOTPRIMARY:  refresh the member list via init_members() and retry;
//   - anything else (including a failed bind or a failed refresh):
//                  switch to another member via primary_failure() and retry.
// rsm_client_mutex is held for the whole function except while binding and
// performing the RPC. The loop only exits on OK, so that is the value returned.
rsm_protocol::status
rsm_client::invoke(int proc, std::string req, std::string &rep)
{
  ScopedLock ml(&rsm_client_mutex);
  // Placeholder used when no RPC could be issued; it is only ever logged.
  const int no_rpc_issued = -1;
  int ret = no_rpc_issued;
  while (1) {
    printf("rsm_client::invoke proc %x primary %s\n", proc, primary.c_str());
    handle h(primary);
    // Release the mutex for the duration of the bind and the RPC.
    VERIFY(pthread_mutex_unlock(&rsm_client_mutex) == 0);
    rpcc *cl = h.safebind();
    // Reset on every attempt so a failed bind never reports an
    // uninitialised value or the result of a previous iteration.
    ret = no_rpc_issued;
    if (cl) {
      ret = cl->call(rsm_client_protocol::invoke, proc, req, rep, rpcc::to(5000));
    }
    VERIFY(pthread_mutex_lock(&rsm_client_mutex) == 0);

    if (cl) {
      printf("rsm_client::invoke proc %x primary %s ret %d\n", proc, primary.c_str(), ret);
      if (ret == rsm_client_protocol::OK) {
        break;
      }
      if (ret == rsm_client_protocol::BUSY) {
        printf("rsm is busy %s\n", primary.c_str());
        // NOTE(review): this sleeps with rsm_client_mutex held, as the
        // original code did - confirm that is intended.
        sleep(3);
        continue;
      }
      if (ret == rsm_client_protocol::NOTPRIMARY) {
        printf("primary %s isn't the primary--let's get a complete list of mems\n", primary.c_str());
        if (init_members())
          continue;
      }
    }

    // Reached when the bind failed, the member refresh failed, or the RPC
    // returned some other status: give up on this primary.
    printf("primary %s failed ret %d\n", primary.c_str(), ret);
    primary_failure();
    printf("rsm_client::invoke: retry new primary %s\n", primary.c_str());
  }
  return ret;
}
// Asks the current primary for the member list and, on success, replaces
// known_mems with it; the last entry of the returned list becomes the new
// primary and is removed from known_mems.
// Must be called with rsm_client_mutex held: the mutex is released around
// the bind and the RPC and re-acquired before any member is touched.
// Returns true on success, false when the bind fails or the RPC does not
// return rsm_protocol::OK. An empty member list trips VERIFY(0).
bool
rsm_client::init_members()
{
  printf("rsm_client::init_members get members!\n");
  handle h(primary);
  std::vector<std::string> new_view;
  // Initialised so the failure message below never prints an indeterminate
  // value when the bind fails and no RPC is issued.
  int ret = -1;
  VERIFY(pthread_mutex_unlock(&rsm_client_mutex) == 0);
  rpcc *cl = h.safebind();
  if (cl) {
    ret = cl->call(rsm_client_protocol::members, 0, new_view, rpcc::to(1000));
  }
  VERIFY(pthread_mutex_lock(&rsm_client_mutex) == 0);
  if (cl == 0 || ret != rsm_protocol::OK) {
    // Convert to long explicitly so the argument matches %ld even where
    // intptr_t is not a long.
    printf("rsm_client::init_members failed, cl %ld, ret %d\n",
           static_cast<long>(reinterpret_cast<intptr_t>(cl)), ret);
    return false;
  }
  if (new_view.size() < 1) {
    printf("rsm_client::init_members do not know any members!\n");
    VERIFY(0);
  }
  known_mems = new_view;
  primary = known_mems.back();
  known_mems.pop_back();
  printf("rsm_client::init_members: primary %s\n", primary.c_str());
  return true;
}