/proc/self/cwd/c/merged.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | Copyright 2020 Google LLC |
3 | | |
4 | | Use of this source code is governed by a BSD-style |
5 | | license that can be found in the LICENSE file or at |
6 | | https://developers.google.com/open-source/licenses/bsd |
7 | | */ |
8 | | |
9 | | #include "merged.h" |
10 | | |
11 | | #include "system.h" |
12 | | |
13 | | #include "constants.h" |
14 | | #include "iter.h" |
15 | | #include "pq.h" |
16 | | #include "reader.h" |
17 | | |
18 | | static int merged_iter_init(struct merged_iter *mi) |
19 | 364 | { |
20 | 364 | int i = 0; |
21 | 1.36k | for (i = 0; i < mi->stack_len; i++) { |
22 | 1.00k | struct record rec = new_record(mi->typ); |
23 | 1.00k | int err = iterator_next(mi->stack[i], rec); |
24 | 1.00k | if (err < 0) { |
25 | 0 | return err; |
26 | 0 | } |
27 | 1.00k | |
28 | 1.00k | if (err > 0) { |
29 | 494 | reftable_iterator_destroy(&mi->stack[i]); |
30 | 494 | record_destroy(&rec); |
31 | 510 | } else { |
32 | 510 | struct pq_entry e = { |
33 | 510 | .rec = rec, |
34 | 510 | .index = i, |
35 | 510 | }; |
36 | 510 | merged_iter_pqueue_add(&mi->pq, e); |
37 | 510 | } |
38 | 1.00k | } |
39 | 364 | |
40 | 364 | return 0; |
41 | 364 | } |
42 | | |
43 | | static void merged_iter_close(void *p) |
44 | 364 | { |
45 | 364 | struct merged_iter *mi = (struct merged_iter *)p; |
46 | 364 | int i = 0; |
47 | 364 | merged_iter_pqueue_clear(&mi->pq); |
48 | 1.36k | for (i = 0; i < mi->stack_len; i++) { |
49 | 1.00k | reftable_iterator_destroy(&mi->stack[i]); |
50 | 1.00k | } |
51 | 364 | reftable_free(mi->stack); |
52 | 364 | } |
53 | | |
54 | | static int merged_iter_advance_subiter(struct merged_iter *mi, size_t idx) |
55 | 670 | { |
56 | 670 | if (iterator_is_null(mi->stack[idx])) { |
57 | 0 | return 0; |
58 | 0 | } |
59 | 670 | |
60 | 670 | { |
61 | 670 | struct record rec = new_record(mi->typ); |
62 | 670 | struct pq_entry e = { |
63 | 670 | .rec = rec, |
64 | 670 | .index = idx, |
65 | 670 | }; |
66 | 670 | int err = iterator_next(mi->stack[idx], rec); |
67 | 670 | if (err < 0) { |
68 | 0 | return err; |
69 | 0 | } |
70 | 670 | |
71 | 670 | if (err > 0) { |
72 | 204 | reftable_iterator_destroy(&mi->stack[idx]); |
73 | 204 | record_destroy(&rec); |
74 | 204 | return 0; |
75 | 204 | } |
76 | 466 | |
77 | 466 | merged_iter_pqueue_add(&mi->pq, e); |
78 | 466 | } |
79 | 466 | return 0; |
80 | 466 | } |
81 | | |
82 | | static int merged_iter_next_entry(struct merged_iter *mi, struct record rec) |
83 | 666 | { |
84 | 666 | struct slice entry_key = { 0 }; |
85 | 666 | struct pq_entry entry = { 0 }; |
86 | 666 | int err = 0; |
87 | 666 | |
88 | 666 | if (merged_iter_pqueue_is_empty(mi->pq)) { |
89 | 2 | return 1; |
90 | 2 | } |
91 | 664 | |
92 | 664 | entry = merged_iter_pqueue_remove(&mi->pq); |
93 | 664 | err = merged_iter_advance_subiter(mi, entry.index); |
94 | 664 | if (err < 0) { |
95 | 0 | return err; |
96 | 0 | } |
97 | 664 | |
98 | 664 | /* |
99 | 664 | One can also use reftable as datacenter-local storage, where the ref |
100 | 664 | database is maintained in globally consistent database (eg. |
101 | 664 | CockroachDB or Spanner). In this scenario, replication delays together |
102 | 664 | with compaction may cause newer tables to contain older entries. In |
103 | 664 | such a deployment, the loop below must be changed to collect all |
104 | 664 | entries for the same key, and return new the newest one. |
105 | 664 | */ |
106 | 664 | record_key(entry.rec, &entry_key); |
107 | 670 | while (!merged_iter_pqueue_is_empty(mi->pq)) { |
108 | 592 | struct pq_entry top = merged_iter_pqueue_top(mi->pq); |
109 | 592 | struct slice k = { 0 }; |
110 | 592 | int err = 0, cmp = 0; |
111 | 592 | |
112 | 592 | record_key(top.rec, &k); |
113 | 592 | |
114 | 592 | cmp = slice_compare(k, entry_key); |
115 | 592 | slice_clear(&k); |
116 | 592 | |
117 | 592 | if (cmp > 0) { |
118 | 586 | break; |
119 | 586 | } |
120 | 6 | |
121 | 6 | merged_iter_pqueue_remove(&mi->pq); |
122 | 6 | err = merged_iter_advance_subiter(mi, top.index); |
123 | 6 | if (err < 0) { |
124 | 0 | return err; |
125 | 0 | } |
126 | 6 | record_destroy(&top.rec); |
127 | 6 | } |
128 | 664 | |
129 | 664 | record_copy_from(rec, entry.rec, hash_size(mi->hash_id)); |
130 | 664 | record_destroy(&entry.rec); |
131 | 664 | slice_clear(&entry_key); |
132 | 664 | return 0; |
133 | 664 | } |
134 | | |
135 | | static int merged_iter_next(struct merged_iter *mi, struct record rec) |
136 | 664 | { |
137 | 666 | while (true) { |
138 | 666 | int err = merged_iter_next_entry(mi, rec); |
139 | 666 | if (err == 0 && mi->suppress_deletions && |
140 | 666 | record_is_deletion(rec)) { |
141 | 2 | continue; |
142 | 2 | } |
143 | 664 | |
144 | 664 | return err; |
145 | 664 | } |
146 | 664 | } |
147 | | |
148 | | static int merged_iter_next_void(void *p, struct record rec) |
149 | 912 | { |
150 | 912 | struct merged_iter *mi = (struct merged_iter *)p; |
151 | 912 | if (merged_iter_pqueue_is_empty(mi->pq)) { |
152 | 248 | return 1; |
153 | 248 | } |
154 | 664 | |
155 | 664 | return merged_iter_next(mi, rec); |
156 | 664 | } |
157 | | |
158 | | struct reftable_iterator_vtable merged_iter_vtable = { |
159 | | .next = &merged_iter_next_void, |
160 | | .close = &merged_iter_close, |
161 | | }; |
162 | | |
163 | | static void iterator_from_merged_iter(struct reftable_iterator *it, |
164 | | struct merged_iter *mi) |
165 | 364 | { |
166 | 364 | assert(it->ops == NULL); |
167 | 364 | it->iter_arg = mi; |
168 | 364 | it->ops = &merged_iter_vtable; |
169 | 364 | } |
170 | | |
171 | | int reftable_new_merged_table(struct reftable_merged_table **dest, |
172 | | struct reftable_reader **stack, int n, |
173 | | uint32_t hash_id) |
174 | 276 | { |
175 | 276 | uint64_t last_max = 0; |
176 | 276 | uint64_t first_min = 0; |
177 | 276 | int i = 0; |
178 | 1.08k | for (i = 0; i < n; i++) { |
179 | 812 | struct reftable_reader *r = stack[i]; |
180 | 812 | if (r->hash_id != hash_id) { |
181 | 1 | return REFTABLE_FORMAT_ERROR; |
182 | 1 | } |
183 | 811 | if (i > 0 && last_max >= reftable_reader_min_update_index(r)) { |
184 | 0 | return REFTABLE_FORMAT_ERROR; |
185 | 0 | } |
186 | 811 | if (i == 0) { |
187 | 262 | first_min = reftable_reader_min_update_index(r); |
188 | 262 | } |
189 | 811 | |
190 | 811 | last_max = reftable_reader_max_update_index(r); |
191 | 811 | } |
192 | 276 | |
193 | 276 | { |
194 | 275 | struct reftable_merged_table m = { |
195 | 275 | .stack = stack, |
196 | 275 | .stack_len = n, |
197 | 275 | .min = first_min, |
198 | 275 | .max = last_max, |
199 | 275 | .hash_id = hash_id, |
200 | 275 | }; |
201 | 275 | |
202 | 275 | *dest = reftable_calloc(sizeof(struct reftable_merged_table)); |
203 | 275 | **dest = m; |
204 | 275 | } |
205 | 275 | return 0; |
206 | 276 | } |
207 | | |
208 | | void reftable_merged_table_close(struct reftable_merged_table *mt) |
209 | 15 | { |
210 | 15 | int i = 0; |
211 | 29 | for (i = 0; i < mt->stack_len; i++) { |
212 | 14 | reftable_reader_free(mt->stack[i]); |
213 | 14 | } |
214 | 15 | FREE_AND_NULL(mt->stack); |
215 | 15 | mt->stack_len = 0; |
216 | 15 | } |
217 | | |
218 | | /* clears the list of subtable, without affecting the readers themselves. */ |
219 | | void merged_table_clear(struct reftable_merged_table *mt) |
220 | 535 | { |
221 | 535 | FREE_AND_NULL(mt->stack); |
222 | 535 | mt->stack_len = 0; |
223 | 535 | } |
224 | | |
225 | | void reftable_merged_table_free(struct reftable_merged_table *mt) |
226 | 275 | { |
227 | 275 | if (mt == NULL) { |
228 | 0 | return; |
229 | 0 | } |
230 | 275 | merged_table_clear(mt); |
231 | 275 | reftable_free(mt); |
232 | 275 | } |
233 | | |
234 | | uint64_t |
235 | | reftable_merged_table_max_update_index(struct reftable_merged_table *mt) |
236 | 0 | { |
237 | 0 | return mt->max; |
238 | 0 | } |
239 | | |
240 | | uint64_t |
241 | | reftable_merged_table_min_update_index(struct reftable_merged_table *mt) |
242 | 0 | { |
243 | 0 | return mt->min; |
244 | 0 | } |
245 | | |
246 | | int merged_table_seek_record(struct reftable_merged_table *mt, |
247 | | struct reftable_iterator *it, struct record rec) |
248 | 364 | { |
249 | 364 | struct reftable_iterator *iters = reftable_calloc( |
250 | 364 | sizeof(struct reftable_iterator) * mt->stack_len); |
251 | 364 | struct merged_iter merged = { |
252 | 364 | .stack = iters, |
253 | 364 | .typ = record_type(rec), |
254 | 364 | .hash_id = mt->hash_id, |
255 | 364 | .suppress_deletions = mt->suppress_deletions, |
256 | 364 | }; |
257 | 364 | int n = 0; |
258 | 364 | int err = 0; |
259 | 364 | int i = 0; |
260 | 1.36k | for (i = 0; i < mt->stack_len && err == 0; i++) { |
261 | 1.00k | int e = reader_seek(mt->stack[i], &iters[n], rec); |
262 | 1.00k | if (e < 0) { |
263 | 0 | err = e; |
264 | 0 | } |
265 | 1.00k | if (e == 0) { |
266 | 1.00k | n++; |
267 | 1.00k | } |
268 | 1.00k | } |
269 | 364 | if (err < 0) { |
270 | 0 | int i = 0; |
271 | 0 | for (i = 0; i < n; i++) { |
272 | 0 | reftable_iterator_destroy(&iters[i]); |
273 | 0 | } |
274 | 0 | reftable_free(iters); |
275 | 0 | return err; |
276 | 0 | } |
277 | 364 | |
278 | 364 | merged.stack_len = n; |
279 | 364 | err = merged_iter_init(&merged); |
280 | 364 | if (err < 0) { |
281 | 0 | merged_iter_close(&merged); |
282 | 0 | return err; |
283 | 0 | } |
284 | 364 | |
285 | 364 | { |
286 | 364 | struct merged_iter *p = |
287 | 364 | reftable_malloc(sizeof(struct merged_iter)); |
288 | 364 | *p = merged; |
289 | 364 | iterator_from_merged_iter(it, p); |
290 | 364 | } |
291 | 364 | return 0; |
292 | 364 | } |
293 | | |
294 | | int reftable_merged_table_seek_ref(struct reftable_merged_table *mt, |
295 | | struct reftable_iterator *it, |
296 | | const char *name) |
297 | 66 | { |
298 | 66 | struct reftable_ref_record ref = { |
299 | 66 | .ref_name = (char *)name, |
300 | 66 | }; |
301 | 66 | struct record rec = { 0 }; |
302 | 66 | record_from_ref(&rec, &ref); |
303 | 66 | return merged_table_seek_record(mt, it, rec); |
304 | 66 | } |
305 | | |
306 | | int reftable_merged_table_seek_log_at(struct reftable_merged_table *mt, |
307 | | struct reftable_iterator *it, |
308 | | const char *name, uint64_t update_index) |
309 | 72 | { |
310 | 72 | struct reftable_log_record log = { |
311 | 72 | .ref_name = (char *)name, |
312 | 72 | .update_index = update_index, |
313 | 72 | }; |
314 | 72 | struct record rec = { 0 }; |
315 | 72 | record_from_log(&rec, &log); |
316 | 72 | return merged_table_seek_record(mt, it, rec); |
317 | 72 | } |
318 | | |
319 | | int reftable_merged_table_seek_log(struct reftable_merged_table *mt, |
320 | | struct reftable_iterator *it, |
321 | | const char *name) |
322 | 72 | { |
323 | 72 | uint64_t max = ~((uint64_t)0); |
324 | 72 | return reftable_merged_table_seek_log_at(mt, it, name, max); |
325 | 72 | } |