Coverage Report

Created: 2020-05-07 18:36

/proc/self/cwd/c/merged.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
Copyright 2020 Google LLC
3
4
Use of this source code is governed by a BSD-style
5
license that can be found in the LICENSE file or at
6
https://developers.google.com/open-source/licenses/bsd
7
*/
8
9
#include "merged.h"
10
11
#include "system.h"
12
13
#include "constants.h"
14
#include "iter.h"
15
#include "pq.h"
16
#include "reader.h"
17
18
static int merged_iter_init(struct merged_iter *mi)
19
364
{
20
364
  int i = 0;
21
1.36k
  for (i = 0; i < mi->stack_len; i++) {
22
1.00k
    struct record rec = new_record(mi->typ);
23
1.00k
    int err = iterator_next(mi->stack[i], rec);
24
1.00k
    if (err < 0) {
25
0
      return err;
26
0
    }
27
1.00k
28
1.00k
    if (err > 0) {
29
494
      reftable_iterator_destroy(&mi->stack[i]);
30
494
      record_destroy(&rec);
31
510
    } else {
32
510
      struct pq_entry e = {
33
510
        .rec = rec,
34
510
        .index = i,
35
510
      };
36
510
      merged_iter_pqueue_add(&mi->pq, e);
37
510
    }
38
1.00k
  }
39
364
40
364
  return 0;
41
364
}
42
43
static void merged_iter_close(void *p)
44
364
{
45
364
  struct merged_iter *mi = (struct merged_iter *)p;
46
364
  int i = 0;
47
364
  merged_iter_pqueue_clear(&mi->pq);
48
1.36k
  for (i = 0; i < mi->stack_len; i++) {
49
1.00k
    reftable_iterator_destroy(&mi->stack[i]);
50
1.00k
  }
51
364
  reftable_free(mi->stack);
52
364
}
53
54
static int merged_iter_advance_subiter(struct merged_iter *mi, size_t idx)
55
670
{
56
670
  if (iterator_is_null(mi->stack[idx])) {
57
0
    return 0;
58
0
  }
59
670
60
670
  {
61
670
    struct record rec = new_record(mi->typ);
62
670
    struct pq_entry e = {
63
670
      .rec = rec,
64
670
      .index = idx,
65
670
    };
66
670
    int err = iterator_next(mi->stack[idx], rec);
67
670
    if (err < 0) {
68
0
      return err;
69
0
    }
70
670
71
670
    if (err > 0) {
72
204
      reftable_iterator_destroy(&mi->stack[idx]);
73
204
      record_destroy(&rec);
74
204
      return 0;
75
204
    }
76
466
77
466
    merged_iter_pqueue_add(&mi->pq, e);
78
466
  }
79
466
  return 0;
80
466
}
81
82
static int merged_iter_next_entry(struct merged_iter *mi, struct record rec)
83
666
{
84
666
  struct slice entry_key = { 0 };
85
666
  struct pq_entry entry = { 0 };
86
666
  int err = 0;
87
666
88
666
  if (merged_iter_pqueue_is_empty(mi->pq)) {
89
2
    return 1;
90
2
  }
91
664
92
664
  entry = merged_iter_pqueue_remove(&mi->pq);
93
664
  err = merged_iter_advance_subiter(mi, entry.index);
94
664
  if (err < 0) {
95
0
    return err;
96
0
  }
97
664
98
664
  /*
99
664
    One can also use reftable as datacenter-local storage, where the ref
100
664
    database is maintained in globally consistent database (eg.
101
664
    CockroachDB or Spanner). In this scenario, replication delays together
102
664
    with compaction may cause newer tables to contain older entries. In
103
664
    such a deployment, the loop below must be changed to collect all
104
664
    entries for the same key, and return new the newest one.
105
664
  */
106
664
  record_key(entry.rec, &entry_key);
107
670
  while (!merged_iter_pqueue_is_empty(mi->pq)) {
108
592
    struct pq_entry top = merged_iter_pqueue_top(mi->pq);
109
592
    struct slice k = { 0 };
110
592
    int err = 0, cmp = 0;
111
592
112
592
    record_key(top.rec, &k);
113
592
114
592
    cmp = slice_compare(k, entry_key);
115
592
    slice_clear(&k);
116
592
117
592
    if (cmp > 0) {
118
586
      break;
119
586
    }
120
6
121
6
    merged_iter_pqueue_remove(&mi->pq);
122
6
    err = merged_iter_advance_subiter(mi, top.index);
123
6
    if (err < 0) {
124
0
      return err;
125
0
    }
126
6
    record_destroy(&top.rec);
127
6
  }
128
664
129
664
  record_copy_from(rec, entry.rec, hash_size(mi->hash_id));
130
664
  record_destroy(&entry.rec);
131
664
  slice_clear(&entry_key);
132
664
  return 0;
133
664
}
134
135
static int merged_iter_next(struct merged_iter *mi, struct record rec)
136
664
{
137
666
  while (true) {
138
666
    int err = merged_iter_next_entry(mi, rec);
139
666
    if (err == 0 && mi->suppress_deletions &&
140
666
        record_is_deletion(rec)) {
141
2
      continue;
142
2
    }
143
664
144
664
    return err;
145
664
  }
146
664
}
147
148
static int merged_iter_next_void(void *p, struct record rec)
149
912
{
150
912
  struct merged_iter *mi = (struct merged_iter *)p;
151
912
  if (merged_iter_pqueue_is_empty(mi->pq)) {
152
248
    return 1;
153
248
  }
154
664
155
664
  return merged_iter_next(mi, rec);
156
664
}
157
158
struct reftable_iterator_vtable merged_iter_vtable = {
159
  .next = &merged_iter_next_void,
160
  .close = &merged_iter_close,
161
};
162
163
static void iterator_from_merged_iter(struct reftable_iterator *it,
164
              struct merged_iter *mi)
165
364
{
166
364
  assert(it->ops == NULL);
167
364
  it->iter_arg = mi;
168
364
  it->ops = &merged_iter_vtable;
169
364
}
170
171
int reftable_new_merged_table(struct reftable_merged_table **dest,
172
            struct reftable_reader **stack, int n,
173
            uint32_t hash_id)
174
276
{
175
276
  uint64_t last_max = 0;
176
276
  uint64_t first_min = 0;
177
276
  int i = 0;
178
1.08k
  for (i = 0; i < n; i++) {
179
812
    struct reftable_reader *r = stack[i];
180
812
    if (r->hash_id != hash_id) {
181
1
      return REFTABLE_FORMAT_ERROR;
182
1
    }
183
811
    if (i > 0 && last_max >= reftable_reader_min_update_index(r)) {
184
0
      return REFTABLE_FORMAT_ERROR;
185
0
    }
186
811
    if (i == 0) {
187
262
      first_min = reftable_reader_min_update_index(r);
188
262
    }
189
811
190
811
    last_max = reftable_reader_max_update_index(r);
191
811
  }
192
276
193
276
  {
194
275
    struct reftable_merged_table m = {
195
275
      .stack = stack,
196
275
      .stack_len = n,
197
275
      .min = first_min,
198
275
      .max = last_max,
199
275
      .hash_id = hash_id,
200
275
    };
201
275
202
275
    *dest = reftable_calloc(sizeof(struct reftable_merged_table));
203
275
    **dest = m;
204
275
  }
205
275
  return 0;
206
276
}
207
208
void reftable_merged_table_close(struct reftable_merged_table *mt)
209
15
{
210
15
  int i = 0;
211
29
  for (i = 0; i < mt->stack_len; i++) {
212
14
    reftable_reader_free(mt->stack[i]);
213
14
  }
214
15
  FREE_AND_NULL(mt->stack);
215
15
  mt->stack_len = 0;
216
15
}
217
218
/* clears the list of subtable, without affecting the readers themselves. */
219
void merged_table_clear(struct reftable_merged_table *mt)
220
535
{
221
535
  FREE_AND_NULL(mt->stack);
222
535
  mt->stack_len = 0;
223
535
}
224
225
/*
 * Releases `mt` and its reader array via merged_table_clear(), which leaves
 * the readers themselves alone. Passing NULL is a no-op.
 */
void reftable_merged_table_free(struct reftable_merged_table *mt)
{
  if (!mt) {
    return;
  }

  merged_table_clear(mt);
  reftable_free(mt);
}
233
234
uint64_t
235
reftable_merged_table_max_update_index(struct reftable_merged_table *mt)
236
0
{
237
0
  return mt->max;
238
0
}
239
240
uint64_t
241
reftable_merged_table_min_update_index(struct reftable_merged_table *mt)
242
0
{
243
0
  return mt->min;
244
0
}
245
246
int merged_table_seek_record(struct reftable_merged_table *mt,
247
           struct reftable_iterator *it, struct record rec)
248
364
{
249
364
  struct reftable_iterator *iters = reftable_calloc(
250
364
    sizeof(struct reftable_iterator) * mt->stack_len);
251
364
  struct merged_iter merged = {
252
364
    .stack = iters,
253
364
    .typ = record_type(rec),
254
364
    .hash_id = mt->hash_id,
255
364
    .suppress_deletions = mt->suppress_deletions,
256
364
  };
257
364
  int n = 0;
258
364
  int err = 0;
259
364
  int i = 0;
260
1.36k
  for (i = 0; i < mt->stack_len && err == 0; i++) {
261
1.00k
    int e = reader_seek(mt->stack[i], &iters[n], rec);
262
1.00k
    if (e < 0) {
263
0
      err = e;
264
0
    }
265
1.00k
    if (e == 0) {
266
1.00k
      n++;
267
1.00k
    }
268
1.00k
  }
269
364
  if (err < 0) {
270
0
    int i = 0;
271
0
    for (i = 0; i < n; i++) {
272
0
      reftable_iterator_destroy(&iters[i]);
273
0
    }
274
0
    reftable_free(iters);
275
0
    return err;
276
0
  }
277
364
278
364
  merged.stack_len = n;
279
364
  err = merged_iter_init(&merged);
280
364
  if (err < 0) {
281
0
    merged_iter_close(&merged);
282
0
    return err;
283
0
  }
284
364
285
364
  {
286
364
    struct merged_iter *p =
287
364
      reftable_malloc(sizeof(struct merged_iter));
288
364
    *p = merged;
289
364
    iterator_from_merged_iter(it, p);
290
364
  }
291
364
  return 0;
292
364
}
293
294
int reftable_merged_table_seek_ref(struct reftable_merged_table *mt,
295
           struct reftable_iterator *it,
296
           const char *name)
297
66
{
298
66
  struct reftable_ref_record ref = {
299
66
    .ref_name = (char *)name,
300
66
  };
301
66
  struct record rec = { 0 };
302
66
  record_from_ref(&rec, &ref);
303
66
  return merged_table_seek_record(mt, it, rec);
304
66
}
305
306
int reftable_merged_table_seek_log_at(struct reftable_merged_table *mt,
307
              struct reftable_iterator *it,
308
              const char *name, uint64_t update_index)
309
72
{
310
72
  struct reftable_log_record log = {
311
72
    .ref_name = (char *)name,
312
72
    .update_index = update_index,
313
72
  };
314
72
  struct record rec = { 0 };
315
72
  record_from_log(&rec, &log);
316
72
  return merged_table_seek_record(mt, it, rec);
317
72
}
318
319
/*
 * Seeks `it` to the log records for `name`, using the largest representable
 * update index as the seek position.
 */
int reftable_merged_table_seek_log(struct reftable_merged_table *mt,
           struct reftable_iterator *it,
           const char *name)
{
  return reftable_merged_table_seek_log_at(mt, it, name, ~((uint64_t)0));
}