1 package org.apache.labs.bananadb.store;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 import org.apache.labs.bananadb.store.lock.Lock;
22 import org.apache.labs.bananadb.store.data.bananatrie.HashCodesPartition;
23 import org.apache.labs.bananadb.store.data.bananatrie.Hashtable;
24 import org.apache.labs.bananadb.store.data.bananatrie.KeysPartition;
25 import org.apache.labs.bananadb.store.data.bananatrie.ValuesPartition;
26 import org.apache.labs.bananadb.store.data.Metadata;
27 import org.apache.labs.bananadb.store.data.FileHandler;
28 import org.apache.labs.bananadb.store.sequence.SequenceManager;
29 import org.apache.labs.bananadb.store.sequence.FilebasedSequenceManager;
30
31 import java.io.File;
32 import java.io.IOException;
33 import java.util.ArrayList;
34 import java.util.Arrays;
35 import java.util.List;
36 import java.util.NoSuchElementException;
37
38 import org.apache.labs.bananadb.store.data.FileHandler.Posting;
39
40
41
42
43
44
45
46
47
48
49
50
51
52 public class Store {
53
54 private static final Log log = new Log(Store.class);
55
56 private Configuration configuration;
57 private SequenceManager sequenceManager;
58 private List<Accessor> accessors = new ArrayList<Accessor>();
59
/**
 * Creates a store for the given data directory by wrapping the path in a
 * new {@link Configuration} and delegating to {@link #Store(Configuration)}.
 * The store is not opened here; call {@link #open()} before using it.
 *
 * @param dataPath directory handed to the {@link Configuration} constructor
 * @throws IOException declared for the {@link Configuration} constructor
 */
public Store(File dataPath) throws IOException {
    this(new Configuration(dataPath));
}
63
/**
 * Creates a store that uses the given configuration. Only the reference is
 * kept; no files are touched until {@link #open()} is called.
 *
 * @param configuration settings (data path, lock factory, lock timeout) used by this store
 */
public Store(Configuration configuration) {
    this.configuration = configuration;
}
67
68
69 public void open() throws IOException {
70 if (!getConfiguration().getDataPath().exists()) {
71 if (log.isInfo()) {
72 log.info("Creating directory " + getConfiguration().getDataPath().getAbsolutePath());
73 }
74 if (!getConfiguration().getDataPath().mkdirs()) {
75 throw new IOException("Could not create directory " + getConfiguration().getDataPath().getAbsolutePath());
76 }
77 }
78 File sequencePath = new File(configuration.getDataPath(), "seq");
79 sequenceManager = new FilebasedSequenceManager(sequencePath, configuration.getLockFactory(), configuration.getLockWaitTimeoutMilliseconds());
80 }
81
82 public SequenceManager getSequenceManager() {
83 return sequenceManager;
84 }
85
86
87 List<Accessor> getAccessors() {
88 return accessors;
89 }
90
91 public Accessor createAccessor(boolean readOnly) throws IOException {
92 Accessor accessor = new Accessor(this, readOnly);
93 accessors.add(accessor);
94 return accessor;
95 }
96
97 public void close() throws IOException {
98 log.info("Closing store..");
99 if (accessors.size() > 0) {
100 log.warn("There are " + accessors.size() + " open accessors. They will be closed.");
101 }
102 for (Accessor accessor : new ArrayList<Accessor>(accessors)) {
103 accessor.close();
104 }
105 sequenceManager.close();
106 log.info("Store has been closed.");
107
108 }
109
110 private void validateKey(byte[] key) {
111 if (key == null || key.length == 0) {
112 throw new IllegalArgumentException("Null key is not allowed");
113 }
114 }
115
116 public byte[] get(Accessor accessor, byte[] key, long hashCode) throws IOException {
117 return get(accessor, key, hashCode, Long.MAX_VALUE);
118 }
119
120 public byte[] get(Accessor accessor, byte[] key, long hashCode, long revision) throws IOException {
121
122 validateKey(key);
123
124 Hashtable hashtable = accessor.getHashtable();
125 Hashtable.Posting hashtablePosting = new Hashtable.Posting();
126 hashtable.readPosting(hashtablePosting, hashtable.calculateHashCodePostingOffset(hashCode));
127
128
129
130
131 HashCodesPartition hashCodesPartition = accessor.getHashCodesPartition(hashtablePosting.getHashCodePostingPartition());
132 HashCodesPartition.Posting hashCodePosting = new HashCodesPartition.Posting();
133 hashCodesPartition.readPosting(hashCodePosting, hashtablePosting.getHashCodePostingPartitionOffset());
134 if (hashCodePosting.getFlagForRevision(revision) != Posting.FLAG_IN_USE) {
135 return null;
136 }
137 while (true) {
138
139 if (hashCodePosting.getFlagForRevision(revision) == Posting.FLAG_IN_USE
140 && hashCode == hashCodePosting.getKeyHashCode()) {
141 break;
142 }
143
144 if (hashCodePosting.getNextPostingPartition() < 0) {
145 return null;
146 }
147 if (hashCodePosting.getNextPostingPartition() != hashCodesPartition.getPartitionId()) {
148 hashCodesPartition = accessor.getHashCodesPartition(hashCodePosting.getNextPostingPartition());
149 }
150 hashCodesPartition.readPosting(hashCodePosting, hashCodePosting.getNextPostingPartitionOffset());
151 }
152
153
154
155
156
157 KeysPartition.Posting keyPosting = new KeysPartition.Posting();
158
159 KeysPartition keysPartition = accessor.getKeysPartition(hashCodePosting.getFirstKeyPostingPartition());
160 keysPartition.readPosting(keyPosting, hashCodePosting.getFirstKeyPostingPartitionOffset());
161 while (true) {
162
163 if (keyPosting.getFlagForRevision(revision) == Posting.FLAG_IN_USE
164 && Arrays.equals(key, keyPosting.getBytes())) {
165 break;
166 }
167
168 if (keyPosting.getNextKeyPostingPartition() < 0) {
169 return null;
170 }
171 if (keyPosting.getNextKeyPostingPartition() != keysPartition.getPartitionId()) {
172 keysPartition = accessor.getKeysPartition(keyPosting.getNextKeyPostingPartition());
173 }
174 keysPartition.readPosting(keyPosting, keyPosting.getNextKeyPostingPartitionOffset());
175 }
176
177
178
179
180
181 ValuesPartition.Posting valuePosting = new ValuesPartition.Posting();
182
183 ValuesPartition valuesPartition = accessor.getValuesPartition(keyPosting.getValuePostingPartition());
184 valuesPartition.readPosting(valuePosting, keyPosting.getValuePostingPartitionOffset());
185
186 if (valuePosting.getBytesLength() == 0) {
187 return null;
188 }
189
190 return valuePosting.getBytes();
191 }
192
193
194
195
196
197
198
199
200
201
202
203
204
205 public byte[] put(final Accessor accessor, final byte[] key, final long hashCode, final byte[] value, final long revision) throws IOException {
206
207 validateKey(key);
208
209 Lock.With<byte[]> with = new Lock.With<byte[]>(accessor.getStoreWriteLock(), getConfiguration().getLockWaitTimeoutMilliseconds()) {
210 public byte[] doBody() throws IOException {
211 return doPut(accessor, key, hashCode, value, revision);
212 }
213 };
214 return with.run();
215 }
216
217
218
219
220
221
222
223
224
225
226
227
228
229 private byte[] doPut(final Accessor accessor, final byte[] key, final long hashCode, final byte[] value, final long revision) throws IOException {
230
231
232
233
234
235
236 int newValuePostingPartitionNumber;
237 int newValuePostingPartitionOffset;
238
239 ValuesPartition.Posting valuePosting = new ValuesPartition.Posting();
240 valuePosting.setFlag(FileHandler.Posting.FLAG_IN_USE);
241
242 valuePosting.setCreatedRevision(revision);
243 if (value == null || value.length == 0) {
244 valuePosting.setBytesLength(0);
245 valuePosting.setBytes(null);
246
247 newValuePostingPartitionNumber = -1;
248 newValuePostingPartitionOffset = -1;
249
250 } else {
251 valuePosting.setBytesLength(value.length);
252 valuePosting.setBytes(value);
253
254 Accessor.RequestPartitionWriterResponse<ValuesPartition> valueReservation = accessor.requestValueWrite(valuePosting);
255 newValuePostingPartitionNumber = valueReservation.getFileHandler().getPartitionId();
256 ValuesPartition valuesPartition = valueReservation.getFileHandler();
257 newValuePostingPartitionOffset = valueReservation.getStartOffset();
258
259
260 valuesPartition.writePosting(valuePosting, newValuePostingPartitionOffset);
261
262 }
263
264
265
266
267
268
269 KeysPartition.Posting newKeyPosting = new KeysPartition.Posting();
270 newKeyPosting.setCreatedRevision(revision);
271 newKeyPosting.setFlag(Posting.FLAG_IN_USE);
272 newKeyPosting.setBytes(key);
273 newKeyPosting.setBytesLength(key.length);
274 newKeyPosting.setKeyHashCode(hashCode);
275 newKeyPosting.setNextKeyPostingPartition(-1);
276 newKeyPosting.setNextKeyPostingPartitionOffset(-1);
277 newKeyPosting.setValuePostingPartition(newValuePostingPartitionNumber);
278 newKeyPosting.setValuePostingPartitionOffset(newValuePostingPartitionOffset);
279
280 Accessor.RequestPartitionWriterResponse<KeysPartition> keyReservation = accessor.requestValueWrite(newKeyPosting);
281 int newKeyPostingPartitionNumber = keyReservation.getFileHandler().getPartitionId();
282 KeysPartition newKeyPostingPartition = keyReservation.getFileHandler();
283 int newKeyPostingPartitionOffset = keyReservation.getStartOffset();
284
285
286
287
288
289
290
291
292
293 Hashtable hashtable = accessor.getHashtable();
294 Hashtable.Posting hashtablePosting = new Hashtable.Posting();
295
296 HashCodesPartition.Posting hashCodePosting = new HashCodesPartition.Posting();
297
298 int hashtablePostingOffset = hashtable.calculateHashCodePostingOffset(hashCode);
299 hashtable.getRAF().seek(hashtablePostingOffset);
300 byte flag = hashtable.getRAF().readByte();
301 if (flag == Posting.FLAG_NEVER_USED) {
302
303
304
305
306 newKeyPostingPartition.writePosting(newKeyPosting, newKeyPostingPartitionOffset);
307
308
309 hashCodePosting.setCreatedRevision(revision);
310 hashCodePosting.setFlag(Posting.FLAG_IN_USE);
311 hashCodePosting.setFirstKeyPostingPartition(newKeyPostingPartitionNumber);
312 hashCodePosting.setFirstKeyPostingPartitionOffset(newKeyPostingPartitionOffset);
313 hashCodePosting.setKeyHashCode(hashCode);
314 hashCodePosting.setNextPostingPartition(-1);
315 hashCodePosting.setNextPostingPartitionOffset(-1);
316
317 Accessor.RequestPartitionWriterResponse<HashCodesPartition> hashCodeReservation = accessor.requestValueWrite(hashCodePosting);
318 int newHashCodePostingPatition = hashCodeReservation.getFileHandler().getPartitionId();
319 HashCodesPartition hashCodesPartition = hashCodeReservation.getFileHandler();
320 int newHashCodePostingPatitionOffset = hashCodeReservation.getStartOffset();
321
322 hashCodesPartition.writePosting(hashCodePosting, newHashCodePostingPatitionOffset);
323
324
325
326 hashtablePosting.setFlag(Posting.FLAG_IN_USE);
327 hashtablePosting.setHashCodePostingPartition(newHashCodePostingPatition);
328 hashtablePosting.setHashCodePostingPartitionOffset(newHashCodePostingPatitionOffset);
329 hashtablePosting.setCreatedRevision(revision);
330 hashtable.writePosting(hashtablePosting, hashtablePostingOffset);
331
332
333 return null;
334
335 } else {
336
337
338
339 hashtable.readPosting(hashtablePosting, hashtablePostingOffset);
340
341
342
343
344
345 HashCodesPartition hashCodesPartition = accessor.getHashCodesPartition(hashtablePosting.getHashCodePostingPartition());
346 int currentHashCodesPostingPartitionOffset = hashtablePosting.getHashCodePostingPartitionOffset();
347 hashCodesPartition.readPosting(hashCodePosting, hashtablePosting.getHashCodePostingPartitionOffset());
348 while (true) {
349
350 if (hashCodePosting.getFlagForRevision(revision) == Posting.FLAG_IN_USE
351 && hashCode == hashCodePosting.getKeyHashCode()) {
352 break;
353 }
354
355 if (hashCodePosting.getNextPostingPartition() < 0) {
356
357
358
359 newKeyPostingPartition.writePosting(newKeyPosting, newKeyPostingPartitionOffset);
360
361
362
363
364
365
366 HashCodesPartition.Posting newHashCodePosting = new HashCodesPartition.Posting();
367 newHashCodePosting.setCreatedRevision(revision);
368 newHashCodePosting.setFlag((byte) 1);
369 newHashCodePosting.setKeyHashCode(hashCode);
370 newHashCodePosting.setNextPostingPartition(-1);
371 newHashCodePosting.setNextPostingPartitionOffset(-1);
372 newHashCodePosting.setFirstKeyPostingPartition(newKeyPostingPartitionNumber);
373 newHashCodePosting.setFirstKeyPostingPartitionOffset(newKeyPostingPartitionOffset);
374
375 Accessor.RequestPartitionWriterResponse<HashCodesPartition> hashCodeReservation = accessor.requestValueWrite(newHashCodePosting);
376
377 int newHashCodePostingPartition = hashCodeReservation.getFileHandler().getPartitionId();
378 int newHashCodePostingPartitionOffset = hashCodeReservation.getStartOffset();
379
380 hashCodeReservation.getFileHandler().writePosting(newHashCodePosting, newHashCodePostingPartitionOffset);
381
382
383
384
385
386
387 hashCodePosting.setNextPostingPartition(newHashCodePostingPartition);
388 hashCodePosting.setNextPostingPartitionOffset(newHashCodePostingPartitionOffset);
389 hashCodesPartition.writePosting(hashCodePosting, currentHashCodesPostingPartitionOffset);
390
391 return null;
392
393 }
394
395
396 if (hashCodePosting.getNextPostingPartition() != hashCodesPartition.getPartitionId()) {
397 hashCodesPartition = accessor.getHashCodesPartition(hashCodePosting.getNextPostingPartition());
398 }
399 currentHashCodesPostingPartitionOffset = hashCodePosting.getNextPostingPartitionOffset();
400 hashCodesPartition.readPosting(hashCodePosting, hashCodePosting.getNextPostingPartitionOffset());
401 }
402
403
404
405
406
407 KeysPartition currentKeyPostingPartition = accessor.getKeysPartition(hashCodePosting.getFirstKeyPostingPartition());
408 KeysPartition previousKeyPostingPartition = null;
409
410 KeysPartition.Posting currentKeyPosting = new KeysPartition.Posting();
411 KeysPartition.Posting previousKeyPosting = new KeysPartition.Posting();
412
413
414 int previousKeyPostingPartitionOffset = -1;
415
416 int currentKeyPostingPartitionOffset = hashCodePosting.getFirstKeyPostingPartitionOffset();
417 currentKeyPostingPartition.readPosting(currentKeyPosting, hashCodePosting.getFirstKeyPostingPartitionOffset());
418 while (true) {
419
420 if (currentKeyPosting.getFlagForRevision(revision) == Posting.FLAG_IN_USE
421 && Arrays.equals(key, currentKeyPosting.getBytes())) {
422 break;
423 }
424 if (currentKeyPosting.getNextKeyPostingPartition() < 0) {
425
426
427
428 currentKeyPosting.setNextKeyPostingPartition(newKeyPostingPartitionNumber);
429 currentKeyPosting.setNextKeyPostingPartitionOffset(newKeyPostingPartitionOffset);
430 currentKeyPostingPartition.writePosting(currentKeyPosting, currentKeyPostingPartitionOffset);
431
432 newKeyPostingPartition.writePosting(newKeyPosting, newKeyPostingPartitionOffset);
433
434 return null;
435 }
436
437 if (currentKeyPosting.getNextKeyPostingPartition() != currentKeyPostingPartition.getPartitionId()) {
438 currentKeyPostingPartition = accessor.getKeysPartition(currentKeyPosting.getNextKeyPostingPartition());
439 }
440
441 previousKeyPostingPartition = currentKeyPostingPartition;
442 previousKeyPostingPartitionOffset = currentKeyPostingPartitionOffset;
443
444 currentKeyPostingPartitionOffset = currentKeyPosting.getNextKeyPostingPartitionOffset();
445
446 currentKeyPostingPartition.readPosting(previousKeyPosting, currentKeyPosting.getNextKeyPostingPartitionOffset());
447 KeysPartition.Posting tmp = previousKeyPosting;
448 previousKeyPosting = currentKeyPosting;
449 currentKeyPosting = tmp;
450 }
451
452
453
454
455
456
457 if (previousKeyPostingPartition == null) {
458
459
460
461
462 if (configuration.isUsingDurablePostingLinks()) {
463 newKeyPosting.setNextKeyPostingPartition(currentKeyPostingPartition.getPartitionId());
464 newKeyPosting.setNextKeyPostingPartitionOffset(currentKeyPostingPartitionOffset);
465 }
466 newKeyPostingPartition.writePosting(newKeyPosting, newKeyPostingPartitionOffset);
467
468
469
470
471 currentKeyPosting.setFlag(Posting.FLAG_DELETED);
472 currentKeyPosting.setDeletedRevision(revision);
473 assert currentKeyPosting.getNextKeyPostingPartition() == -1;
474 assert currentKeyPosting.getNextKeyPostingPartitionOffset() == -1;
475 currentKeyPostingPartition.writePosting(currentKeyPosting, currentKeyPostingPartitionOffset);
476
477 if (hashCodePosting.getFirstKeyPostingPartition() == currentKeyPostingPartition.getPartitionId()
478 && hashCodePosting.getFirstKeyPostingPartitionOffset() == currentKeyPostingPartitionOffset) {
479
480
481
482
483 hashCodePosting.setFirstKeyPostingPartition(newKeyPostingPartitionNumber);
484 hashCodePosting.setFirstKeyPostingPartitionOffset(newKeyPostingPartitionOffset);
485 hashCodesPartition.writePosting(hashCodePosting, currentHashCodesPostingPartitionOffset);
486 }
487
488 } else {
489
490
491
492 if (configuration.isUsingDurablePostingLinks()) {
493
494
495
496
497
498
499
500
501
502
503
504 newKeyPosting.setNextKeyPostingPartition(currentKeyPostingPartition.getPartitionId());
505 newKeyPosting.setNextKeyPostingPartitionOffset(currentHashCodesPostingPartitionOffset);
506
507
508 previousKeyPosting.setNextKeyPostingPartition(newKeyPostingPartition.getPartitionId());
509 previousKeyPosting.setNextKeyPostingPartitionOffset(newKeyPostingPartitionOffset);
510
511
512 currentKeyPosting.setFlag(Posting.FLAG_DELETED);
513 currentKeyPosting.setDeletedRevision(revision);
514
515
516
517
518
519
520
521
522
523
524 if (previousKeyPosting.getFlag() == Posting.FLAG_DELETED
525 && hashCodePosting.getFirstKeyPostingPartition() == previousKeyPostingPartition.getPartitionId()
526 && hashCodePosting.getFirstKeyPostingPartitionOffset() == previousKeyPostingPartitionOffset) {
527
528
529 newKeyPosting.setNextKeyPostingPartition(previousKeyPostingPartition.getPartitionId());
530 newKeyPosting.setNextKeyPostingPartitionOffset(previousKeyPostingPartitionOffset);
531 newKeyPostingPartition.writePosting(newKeyPosting, newKeyPostingPartitionOffset);
532
533
534 currentKeyPostingPartition.markPostingAsDeleted(currentHashCodesPostingPartitionOffset, revision);
535
536
537 hashCodePosting.setFirstKeyPostingPartition(newKeyPostingPartitionNumber);
538 hashCodePosting.setFirstKeyPostingPartitionOffset(newKeyPostingPartitionOffset);
539 hashCodesPartition.writePosting(hashCodePosting, hashtablePostingOffset);
540
541 } else {
542
543
544 newKeyPostingPartition.writePosting(newKeyPosting, newKeyPostingPartitionOffset);
545 currentKeyPostingPartition.writePosting(currentKeyPosting, currentKeyPostingPartitionOffset);
546 previousKeyPostingPartition.writePosting(previousKeyPosting, previousKeyPostingPartitionOffset);
547 }
548
549 } else {
550
551
552
553
554
555 if (previousKeyPosting.getFlag() == Posting.FLAG_DELETED
556 && hashCodePosting.getFirstKeyPostingPartition() == previousKeyPostingPartition.getPartitionId()
557 && hashCodePosting.getFirstKeyPostingPartitionOffset() == previousKeyPostingPartitionOffset) {
558
559
560
561
562
563
564
565
566 newKeyPosting.setNextKeyPostingPartition(previousKeyPostingPartition.getPartitionId());
567 newKeyPosting.setNextKeyPostingPartitionOffset(previousKeyPostingPartitionOffset);
568 newKeyPostingPartition.writePosting(newKeyPosting, newKeyPostingPartitionOffset);
569
570
571
572
573
574
575 hashCodePosting.setFirstKeyPostingPartition(newKeyPostingPartitionNumber);
576 hashCodePosting.setFirstKeyPostingPartitionOffset(newKeyPostingPartitionOffset);
577 hashCodesPartition.writePosting(hashCodePosting, currentHashCodesPostingPartitionOffset);
578
579
580
581 currentKeyPostingPartition.markPostingAsDeleted(currentKeyPostingPartitionOffset, revision);
582
583
584 previousKeyPosting.setNextKeyPostingPartitionOffset(-1);
585 previousKeyPosting.setNextKeyPostingPartitionOffset(-1);
586 previousKeyPostingPartition.writePosting(previousKeyPosting, previousKeyPostingPartitionOffset);
587
588 } else {
589
590
591
592
593
594
595 newKeyPosting.setNextKeyPostingPartition(currentKeyPosting.getNextKeyPostingPartition());
596 newKeyPosting.setNextKeyPostingPartitionOffset(currentKeyPosting.getNextKeyPostingPartitionOffset());
597 newKeyPostingPartition.writePosting(newKeyPosting, newKeyPostingPartitionOffset);
598
599
600
601 previousKeyPosting.setNextKeyPostingPartition(newKeyPostingPartitionNumber);
602 previousKeyPosting.setNextKeyPostingPartitionOffset(newKeyPostingPartitionOffset);
603 previousKeyPostingPartition.writePosting(previousKeyPosting, previousKeyPostingPartitionOffset);
604
605
606
607 currentKeyPostingPartition.markPostingAsDeleted(currentKeyPostingPartitionOffset, revision);
608
609
610 }
611 }
612 }
613
614
615
616 ValuesPartition valuesPartition = accessor.getValuesPartition(currentKeyPosting.getValuePostingPartition());
617 valuesPartition.readPosting(valuePosting, currentKeyPosting.getValuePostingPartitionOffset());
618 byte[] oldValue;
619 if (valuePosting.getBytesLength() > 0) {
620 oldValue = valuePosting.getBytes();
621 } else {
622 oldValue = null;
623 }
624
625
626 valuesPartition.markPostingAsDeleted(currentKeyPosting.getValuePostingPartitionOffset(), revision);
627
628 return oldValue;
629
630 }
631
632 }
633
634
635
636
637
638
639
640
641
642
643
644 public byte[] remove(final Accessor accessor, final byte[] key, final long hashCode, final long revision) throws IOException {
645
646 validateKey(key);
647
648 Lock.With<byte[]> with = new Lock.With<byte[]>(accessor.getStoreWriteLock(), getConfiguration().getLockWaitTimeoutMilliseconds()) {
649 public byte[] doBody() throws IOException {
650
651 return doRemove(accessor, key, hashCode, revision);
652
653 }
654 };
655 return with.run();
656 }
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673 private byte[] doRemove(final Accessor accessor, final byte[] key, final long hashCode, final long revision) throws IOException {
674
675 Hashtable.Posting hashtablePosting = new Hashtable.Posting();
676
677
678
679
680
681 Hashtable hashtable = accessor.getHashtable();
682 int hashtablePostingOffset = hashtable.calculateHashCodePostingOffset(hashCode);
683 accessor.getHashtable().readPosting(hashtablePosting, hashtablePostingOffset);
684
685 byte hashtableFlagForRevision = hashtablePosting.getFlagForRevision(revision);
686 if (hashtableFlagForRevision != FileHandler.Posting.FLAG_IN_USE) {
687 throw new NoSuchElementException();
688 }
689
690 if (hashtablePosting.getHashCodePostingPartition() < 0) {
691 throw new StoreInconsistencyException("There is a hash table posting in the store at offset " + hashtablePostingOffset + ", hash code " + hashCode + ", but there is no pointer to a hash codes partition!");
692 }
693
694
695
696
697
698
699 HashCodesPartition.Posting hashCodePosting = new HashCodesPartition.Posting();
700
701 HashCodesPartition hashCodesPartition = accessor.getHashCodesPartition(hashtablePosting.getHashCodePostingPartition());
702 int currentHashCodePostingPartitionOffset = hashtablePosting.getHashCodePostingPartitionOffset();
703 int currentHashCodePostingPartition = hashtablePosting.getHashCodePostingPartition();
704 hashCodesPartition.readPosting(hashCodePosting, hashtablePosting.getHashCodePostingPartitionOffset());
705 while (hashCode != hashCodePosting.getKeyHashCode()) {
706 if (hashCodePosting.getNextPostingPartition() < 0) {
707
708 throw new NoSuchElementException();
709 }
710
711 if (hashCodePosting.getNextPostingPartition() != hashCodesPartition.getPartitionId()) {
712 hashCodesPartition = accessor.getHashCodesPartition(hashCodePosting.getNextPostingPartition());
713 }
714
715 currentHashCodePostingPartitionOffset = hashCodePosting.getNextPostingPartitionOffset();
716 currentHashCodePostingPartition = hashCodePosting.getNextPostingPartition();
717 hashCodesPartition.readPosting(hashCodePosting, hashCodePosting.getNextPostingPartitionOffset());
718 }
719
720
721
722
723
724
725 KeysPartition currentKeyLinkPostingPartition = accessor.getKeysPartition(hashCodePosting.getFirstKeyPostingPartition());
726 KeysPartition.Posting currentKeyLinkPosting = new KeysPartition.Posting();
727 int currentKeyPostingPartitionOffset = hashCodePosting.getFirstKeyPostingPartitionOffset();
728
729 KeysPartition previousKeyPostingPartition = null;
730 KeysPartition.Posting previousKeyLinkPosting = new KeysPartition.Posting();
731 int previousKeyPostingPartitionOffset = -1;
732
733 currentKeyLinkPostingPartition.readPosting(currentKeyLinkPosting, hashCodePosting.getFirstKeyPostingPartitionOffset());
734 while (true) {
735
736 byte flagForKeyInRevision = currentKeyLinkPosting.getFlagForRevision(revision);
737
738 if (flagForKeyInRevision == Posting.FLAG_IN_USE
739 && Arrays.equals(key, currentKeyLinkPosting.getBytes())) {
740 break;
741 }
742
743 if (currentKeyLinkPosting.getNextKeyPostingPartition() < 0) {
744
745 throw new NoSuchElementException();
746 }
747
748 if (currentKeyLinkPosting.getNextKeyPostingPartition() != currentKeyLinkPostingPartition.getPartitionId()) {
749 currentKeyLinkPostingPartition = accessor.getKeysPartition(currentKeyLinkPosting.getNextKeyPostingPartition());
750 }
751 previousKeyPostingPartition = currentKeyLinkPostingPartition;
752 previousKeyPostingPartitionOffset = currentKeyPostingPartitionOffset;
753
754 currentKeyPostingPartitionOffset = currentKeyLinkPosting.getNextKeyPostingPartitionOffset();
755
756 KeysPartition.Posting tmp = currentKeyLinkPosting;
757 currentKeyLinkPostingPartition.readPosting(previousKeyLinkPosting, currentKeyLinkPosting.getNextKeyPostingPartitionOffset());
758 currentKeyLinkPosting = previousKeyLinkPosting;
759 previousKeyLinkPosting = tmp;
760
761 }
762
763
764
765
766
767
768
769
770 if (hashCodePosting.getFirstKeyPostingPartition() == currentKeyLinkPostingPartition.getPartitionId()
771 && hashCodePosting.getFirstKeyPostingPartitionOffset() == currentKeyPostingPartitionOffset) {
772
773
774
775
776
777 if (hashtablePosting.getHashCodePostingPartition() == currentHashCodePostingPartition
778 && hashtablePosting.getHashCodePostingPartitionOffset() == currentHashCodePostingPartitionOffset) {
779
780
781
782
783
784
785
786 hashCodesPartition.markPostingAsDeleted(currentHashCodePostingPartitionOffset, revision);
787
788 hashtable.markPostingAsDeleted(hashtablePostingOffset, revision);
789
790 }
791
792 } else {
793
794
795
796
797
798
799 if (configuration.isUsingDurablePostingLinks()) {
800
801
802
803
804
805 KeysPartition.Posting lastKeyLinkPosting = new KeysPartition.Posting();
806 int lastKeyLinkPartitionPostingOffset = -1;
807
808 lastKeyLinkPosting.setNextKeyPostingPartition(currentKeyLinkPostingPartition.getPartitionId());
809 lastKeyLinkPosting.setNextKeyPostingPartitionOffset(currentKeyPostingPartitionOffset);
810 KeysPartition lastKeyLinkPartition;
811 while (true) {
812 lastKeyLinkPartitionPostingOffset = lastKeyLinkPosting.getNextKeyPostingPartitionOffset();
813 lastKeyLinkPartition = accessor.getKeysPartition(lastKeyLinkPosting.getNextKeyPostingPartition());
814 lastKeyLinkPartition.readPosting(lastKeyLinkPosting, lastKeyLinkPosting.getNextKeyPostingPartitionOffset());
815 if (lastKeyLinkPosting.getNextKeyPostingPartition() < 0) {
816 break;
817 }
818 }
819
820 lastKeyLinkPosting.setNextKeyPostingPartition(currentKeyLinkPostingPartition.getPartitionId());
821 lastKeyLinkPosting.setNextKeyPostingPartitionOffset(currentHashCodePostingPartitionOffset);
822 lastKeyLinkPartition.writePosting(lastKeyLinkPosting, lastKeyLinkPartitionPostingOffset);
823
824 currentKeyLinkPosting.setNextKeyPostingPartition(-1);
825 currentKeyLinkPosting.setNextKeyPostingPartitionOffset(-1);
826 currentKeyLinkPostingPartition.writePosting(currentKeyLinkPosting, currentHashCodePostingPartitionOffset);
827
828
829 }
830
831
832
833
834
835
836 if (previousKeyPostingPartition != null) {
837 previousKeyLinkPosting.setNextKeyPostingPartition(currentKeyLinkPosting.getNextKeyPostingPartition());
838 previousKeyLinkPosting.setNextKeyPostingPartitionOffset(currentKeyLinkPosting.getNextKeyPostingPartitionOffset());
839 previousKeyPostingPartition.writePosting(previousKeyLinkPosting, previousKeyPostingPartitionOffset);
840 }
841
842 }
843
844
845
846 currentKeyLinkPostingPartition.markPostingAsDeleted(currentKeyPostingPartitionOffset, revision);
847
848
849
850 ValuesPartition.Posting valuePosting = new ValuesPartition.Posting();
851
852 ValuesPartition oldValuePartition = accessor.getValuesPartition(currentKeyLinkPosting.getValuePostingPartition());
853 oldValuePartition.readPosting(valuePosting, currentKeyLinkPosting.getValuePostingPartitionOffset());
854 byte[] oldValue;
855 if (valuePosting.getBytesLength() > 0) {
856 oldValue = valuePosting.getBytes();
857 } else {
858 oldValue = null;
859 }
860
861
862 oldValuePartition.markPostingAsDeleted(currentKeyLinkPosting.getValuePostingPartitionOffset(), revision);
863
864 return oldValue;
865
866 }
867
868 public boolean containsKey(Accessor accessor, byte[] key, long hashCode) throws IOException {
869 return containsKey(accessor, key, hashCode, Long.MAX_VALUE);
870 }
871
872 public boolean containsKey(Accessor accessor, byte[] key, long hashCode, long revision) throws IOException {
873
874 validateKey(key);
875
876 Hashtable hashtable = accessor.getHashtable();
877 Hashtable.Posting hashtablePosting = new Hashtable.Posting();
878 hashtable.readPosting(hashtablePosting, hashtable.calculateHashCodePostingOffset(hashCode));
879
880
881
882
883
884 HashCodesPartition.Posting hashCodePosting = new HashCodesPartition.Posting();
885
886 HashCodesPartition hashCodesPartition = accessor.getHashCodesPartition(hashtablePosting.getHashCodePostingPartition());
887 hashCodesPartition.readPosting(hashCodePosting, hashtablePosting.getHashCodePostingPartitionOffset());
888
889
890 if (hashCodePosting.getFlag() == Posting.FLAG_NEVER_USED) {
891 return false;
892 }
893
894 while (true) {
895
896 if (hashCode == hashCodePosting.getKeyHashCode()
897 && hashCodePosting.getFlagForRevision(revision) == Posting.FLAG_IN_USE) {
898 break;
899 }
900
901 if (hashCodePosting.getNextPostingPartition() < 0) {
902
903 return false;
904 }
905 if (hashCodePosting.getNextPostingPartition() != hashCodesPartition.getPartitionId()) {
906 hashCodesPartition = accessor.getHashCodesPartition(hashCodePosting.getNextPostingPartition());
907 }
908 hashCodesPartition.readPosting(hashCodePosting, hashCodePosting.getNextPostingPartitionOffset());
909 }
910
911
912
913
914
915 KeysPartition.Posting keyPosting = new KeysPartition.Posting();
916
917 KeysPartition keysPartition = accessor.getKeysPartition(hashCodePosting.getFirstKeyPostingPartition());
918 keysPartition.readPosting(keyPosting, hashCodePosting.getFirstKeyPostingPartitionOffset());
919 while (true) {
920 if (keyPosting.getFlagForRevision(revision) == Posting.FLAG_IN_USE
921 && keyPosting.getCreatedRevision() <= revision
922 && Arrays.equals(key, keyPosting.getBytes())) {
923 break;
924 }
925 if (keyPosting.getNextKeyPostingPartition() < 0) {
926 return false;
927 }
928 if (keyPosting.getNextKeyPostingPartition() != keysPartition.getPartitionId()) {
929 keysPartition = accessor.getKeysPartition(keyPosting.getNextKeyPostingPartition());
930 }
931 keysPartition.readPosting(keyPosting, keyPosting.getNextKeyPostingPartitionOffset());
932 }
933
934 return true;
935
936 }
937
938 public Cursor values() {
939
940 throw new UnsupportedOperationException();
941 }
942
943 public Cursor keys() {
944
945 throw new UnsupportedOperationException();
946
947 }
948
949
950
951
952
953 public void optimize() {
954 throw new UnsupportedOperationException();
955
956 }
957
958
959
960
961
962
963
964
965 public void rehash(final Accessor accessor, final int resolution) throws IOException {
966 Lock.With with = new Lock.With(accessor.getStoreWriteLock(), getConfiguration().getLockWaitTimeoutMilliseconds()) {
967 public Object doBody() throws IOException {
968
969 log.info("Rehashing hashtable capacity ??? -> " + resolution);
970
971 Metadata metadata = accessor.getMetadata();
972 Metadata.Header mdh = new Metadata.Header();
973 metadata.readHeader(mdh);
974 int topOldHashCodesPartition = mdh.getCurrentHashCodesPartition();
975 mdh.setCurrentHashCodesPartition(mdh.getCurrentHashCodesPartition() + 1);
976 metadata.writeHeader(mdh);
977
978 Hashtable rehashedTable = new Hashtable(getConfiguration().getDataPath(), mdh.getCurrentHashtableId() + 1, accessor.getAccess(), getConfiguration().getLockFactory());
979 rehashedTable.format((resolution * Hashtable.Posting.POSTING_BYTE_SIZE) + rehashedTable.getHeaderByteSize());
980 rehashedTable.open();
981
982 Hashtable.Header rehashedTableHeader = new Hashtable.Header();
983 rehashedTableHeader.setPostingsCapacity(resolution);
984 rehashedTable.writeHeader(rehashedTableHeader);
985
986
987 HashCodesPartition rehashCodesPartition = accessor.getHashCodesPartition(mdh.getCurrentHashCodesPartition());
988
989 Hashtable.Posting rehashedTablePosting = new Hashtable.Posting();
990 HashCodesPartition.Posting rehashCodePosting = new HashCodesPartition.Posting();
991 HashCodesPartition.Header rehashCodeHeader = new HashCodesPartition.Header();
992 rehashCodesPartition.readHeader(rehashCodeHeader);
993
994 for (int currentOldHashCodePostingsPartitionId = 0; currentOldHashCodePostingsPartitionId <= topOldHashCodesPartition; currentOldHashCodePostingsPartitionId++) {
995 HashCodesPartition currentOldHashCodesPartition = new HashCodesPartition(getConfiguration().getDataPath(), currentOldHashCodePostingsPartitionId, accessor.getAccess(), getConfiguration().getLockFactory());
996 if (currentOldHashCodesPartition.exists()) {
997 currentOldHashCodesPartition.open();
998 HashCodesPartition.Header hcph = new HashCodesPartition.Header();
999 currentOldHashCodesPartition.readHeader(hcph);
1000
1001 HashCodesPartition.Posting hcpp = new HashCodesPartition.Posting();
1002
1003 int hcpStartOffset = currentOldHashCodesPartition.getHeaderByteSize();
1004 while (hcpStartOffset < hcph.getNextPostingOffset()) {
1005 currentOldHashCodesPartition.readPosting(hcpp, hcpStartOffset);
1006 hcpStartOffset += hcpp.getPostingByteSize();
1007
1008 if (hcpp.getFlag() == (byte) 2) {
1009 continue;
1010 } else if (hcpp.getFlag() == (byte) 0) {
1011 break;
1012 } else if (hcpp.getFlag() != (byte) 1) {
1013 log.warn("Unknown flag at offset " + hcpStartOffset + ": " + hcpp);
1014 }
1015
1016
1017
1018
1019 rehash(accessor, hcpp, rehashedTable, rehashedTablePosting, rehashCodesPartition, rehashCodeHeader, rehashCodePosting);
1020
1021
1022
1023
1024 rehashCodeHeader.setNextPostingOffset(rehashCodeHeader.getNextPostingOffset() + rehashCodePosting.getPostingByteSize());
1025 rehashCodeHeader.setBytesLeft(rehashCodeHeader.getBytesLeft() - rehashCodePosting.getPostingByteSize());
1026
1027
1028 if (rehashCodeHeader.getBytesLeft() < rehashCodePosting.getPostingByteSize()) {
1029 System.currentTimeMillis();
1030 }
1031
1032 }
1033
1034 currentOldHashCodesPartition.close();
1035 }
1036 }
1037
1038
1039
1040 metadata.readHeader(mdh);
1041 mdh.setCurrentHashtableId(rehashedTable.getVersionId());
1042 accessor.getMetadata().writeHeader(mdh);
1043
1044 return null;
1045 }
1046 };
1047 with.run();
1048 }
1049
1050 public Configuration getConfiguration() {
1051 return configuration;
1052 }
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065 private void rehash(Accessor accessor,
1066 HashCodesPartition.Posting hashCodePosting,
1067 Hashtable rehashedtable, Hashtable.Posting rehashedtablePosting,
1068 HashCodesPartition rehashCodePartition, HashCodesPartition.Header rehashCodeHeader, HashCodesPartition.Posting rehashCodePosting) throws IOException {
1069
1070
1071
1072
1073 int rehashedtablePostingOffset = rehashedtable.calculateHashCodePostingOffset(hashCodePosting.getKeyHashCode());
1074 rehashedtable.readPosting(rehashedtablePosting, rehashedtablePostingOffset);
1075 if (rehashedtablePosting.getFlag() != (byte) 1) {
1076
1077
1078
1079
1080 rehashCodePosting.setFlag((byte) 1);
1081 rehashCodePosting.setFirstKeyPostingPartition(hashCodePosting.getFirstKeyPostingPartition());
1082 rehashCodePosting.setFirstKeyPostingPartitionOffset(hashCodePosting.getFirstKeyPostingPartitionOffset());
1083 rehashCodePosting.setKeyHashCode(hashCodePosting.getKeyHashCode());
1084 rehashCodePosting.setNextPostingPartition(-1);
1085 rehashCodePosting.setNextPostingPartitionOffset(-1);
1086 rehashCodePosting.setCreatedRevision(hashCodePosting.getCreatedRevision());
1087 rehashCodePosting.setDeletedRevision(hashCodePosting.getDeletedRevision());
1088
1089 rehashCodePartition.writePosting(rehashCodePosting, rehashCodeHeader.getNextPostingOffset());
1090
1091
1092
1093 rehashedtablePosting.setFlag((byte) 1);
1094 rehashedtablePosting.setHashCodePostingPartition(rehashCodePartition.getPartitionId());
1095 rehashedtablePosting.setHashCodePostingPartitionOffset(rehashCodeHeader.getNextPostingOffset());
1096 rehashedtable.writePosting(rehashedtablePosting, rehashedtablePostingOffset);
1097
1098 return;
1099
1100 } else {
1101
1102
1103
1104
1105
1106
1107
1108 HashCodesPartition currentRehashedCodesPartition = rehashCodePartition;
1109 int currentRehashCodesPostingPartitionOffset = rehashedtablePosting.getHashCodePostingPartitionOffset();
1110 rehashCodePartition.readPosting(rehashCodePosting, rehashedtablePosting.getHashCodePostingPartitionOffset());
1111 while (hashCodePosting.getKeyHashCode() != rehashCodePosting.getKeyHashCode()) {
1112 if (rehashCodePosting.getNextPostingPartition() < 0) {
1113
1114
1115
1116
1117
1118
1119
1120 HashCodesPartition.Posting newHashCodePosting = new HashCodesPartition.Posting();
1121 newHashCodePosting.setFlag((byte) 1);
1122 newHashCodePosting.setKeyHashCode(hashCodePosting.getKeyHashCode());
1123 newHashCodePosting.setNextPostingPartition(-1);
1124 newHashCodePosting.setNextPostingPartitionOffset(-1);
1125 newHashCodePosting.setFirstKeyPostingPartition(hashCodePosting.getFirstKeyPostingPartition());
1126 newHashCodePosting.setFirstKeyPostingPartitionOffset(hashCodePosting.getFirstKeyPostingPartitionOffset());
1127
1128 rehashCodePartition.writePosting(newHashCodePosting, rehashCodeHeader.getNextPostingOffset());
1129
1130
1131
1132
1133
1134
1135 rehashCodePosting.setNextPostingPartition(rehashCodePartition.getPartitionId());
1136 rehashCodePosting.setNextPostingPartitionOffset(rehashCodeHeader.getNextPostingOffset());
1137 currentRehashedCodesPartition.writePosting(rehashCodePosting, currentRehashCodesPostingPartitionOffset);
1138
1139 return;
1140
1141 }
1142 if (rehashCodePosting.getNextPostingPartition() != currentRehashedCodesPartition.getPartitionId()) {
1143 currentRehashedCodesPartition = accessor.getHashCodesPartition(rehashCodePosting.getNextPostingPartition());
1144 }
1145 currentRehashCodesPostingPartitionOffset = rehashCodePosting.getNextPostingPartitionOffset();
1146 currentRehashedCodesPartition.readPosting(rehashCodePosting, rehashCodePosting.getNextPostingPartitionOffset());
1147 }
1148
1149 }
1150
1151
1152 }
1153
1154 public void setSequenceManager(SequenceManager sequenceManager) {
1155 this.sequenceManager = sequenceManager;
1156 }
1157 }