-
Notifications
You must be signed in to change notification settings - Fork 26
/
txn.h
164 lines (132 loc) · 4.47 KB
/
txn.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
#pragma once
#include <stdint.h>
#include <sys/types.h>
#include <vector>
#include "dbcore/xid.h"
#include "dbcore/sm-config.h"
#include "dbcore/sm-oid.h"
#include "dbcore/sm-log.h"
#include "dbcore/sm-object.h"
#include "dbcore/sm-rc.h"
#include "masstree/masstree_btree.h"
#include "macros.h"
#include "str_arena.h"
#include "tuple.h"
#include <sparsehash/dense_hash_map>
using google::dense_hash_map;
namespace ermia {
// A write-set entry is essentially a pointer to the OID array entry
// begin updated. The write-set is naturally de-duplicated: repetitive
// updates will leave only one entry by the first update. Dereferencing
// the entry pointer results a fat_ptr to the new object.
struct write_record_t {
fat_ptr *entry;
write_record_t(fat_ptr *entry) : entry(entry) {}
write_record_t() : entry(nullptr) {}
inline Object *get_object() { return (Object *)entry->offset(); }
};
struct write_set_t {
static const uint32_t kMaxEntries = 256;
uint32_t num_entries;
write_record_t entries[kMaxEntries];
write_set_t() : num_entries(0) {}
inline void emplace_back(fat_ptr *oe) {
ALWAYS_ASSERT(num_entries < kMaxEntries);
new (&entries[num_entries]) write_record_t(oe);
++num_entries;
ASSERT(entries[num_entries - 1].entry == oe);
}
inline uint32_t size() { return num_entries; }
inline void clear() { num_entries = 0; }
inline write_record_t &operator[](uint32_t idx) { return entries[idx]; }
};
class transaction {
friend class ConcurrentMasstreeIndex;
friend class sm_oid_mgr;
public:
typedef TXN::txn_state txn_state;
#if defined(SSN) || defined(SSI) || defined(MVOCC)
typedef std::vector<dbtuple *> read_set_t;
#endif
enum {
// use the low-level scan protocol for checking scan consistency,
// instead of keeping track of absent ranges
TXN_FLAG_LOW_LEVEL_SCAN = 0x1,
// true to mark a read-only transaction- if a txn marked read-only
// does a write, it is aborted. SSN uses it to implement to safesnap.
// No bookeeping is done with SSN if this is enable for a tx.
TXN_FLAG_READ_ONLY = 0x2,
TXN_FLAG_READ_MOSTLY = 0x3,
// A redo transaction running on a backup server using command logging.
TXN_FLAG_CMD_REDO = 0x4,
};
inline bool is_read_mostly() { return flags & TXN_FLAG_READ_MOSTLY; }
inline bool is_read_only() { return flags & TXN_FLAG_READ_ONLY; }
protected:
inline txn_state state() const { return xc->state; }
// the absent set is a mapping from (masstree node -> version_number).
typedef dense_hash_map<const ConcurrentMasstree::node_opaque_t *, uint64_t > MasstreeAbsentSet;
MasstreeAbsentSet masstree_absent_set;
public:
transaction(uint64_t flags, str_arena &sa);
~transaction();
void initialize_read_write();
inline void ensure_active() {
volatile_write(xc->state, TXN::TXN_ACTIVE);
ASSERT(state() == TXN::TXN_ACTIVE);
}
rc_t commit();
#ifdef SSN
rc_t parallel_ssn_commit();
rc_t ssn_read(dbtuple *tuple);
#elif defined SSI
rc_t parallel_ssi_commit();
rc_t ssi_read(dbtuple *tuple);
#elif defined MVOCC
rc_t mvocc_commit();
rc_t mvocc_read(dbtuple *tuple);
#else
rc_t si_commit();
#endif
bool MasstreeCheckPhantom();
void Abort();
OID PrepareInsert(OrderedIndex *index, varstr *value, dbtuple **out_tuple);
void FinishInsert(OrderedIndex *index, OID oid, const varstr *key, varstr *value, dbtuple *tuple);
bool TryInsertNewTuple(OrderedIndex *index, const varstr *key,
varstr *value, OID *inserted_oid);
rc_t Update(IndexDescriptor *index_desc, OID oid, const varstr *k, varstr *v);
public:
// Reads the contents of tuple into v within this transaction context
rc_t DoTupleRead(dbtuple *tuple, varstr *out_v);
// expected public overrides
inline str_arena &string_allocator() { return *sa; }
#if defined(SSN) || defined(SSI) || defined(MVOCC)
inline read_set_t &GetReadSet() {
thread_local read_set_t read_set;
return read_set;
}
#endif
inline write_set_t &GetWriteSet() {
thread_local write_set_t write_set;
return write_set;
}
inline void add_to_write_set(fat_ptr *entry) {
#ifndef NDEBUG
auto &write_set = GetWriteSet();
for (uint32_t i = 0; i < write_set.size(); ++i) {
auto &w = write_set[i];
ASSERT(w.entry);
ASSERT(w.entry != entry);
}
#endif
GetWriteSet().emplace_back(entry);
}
inline TXN::xid_context *GetXIDContext() { return xc; }
protected:
const uint64_t flags;
XID xid;
TXN::xid_context *xc;
sm_tx_log *log;
str_arena *sa;
};
} // namespace ermia