View Javadoc

1   package org.apache.labs.bananadb.store;
2   
3   
4   /*
5    * Licensed to the Apache Software Foundation (ASF) under one or more
6    * contributor license agreements.  See the NOTICE file distributed with
7    * this work for additional information regarding copyright ownership.
8    * The ASF licenses this file to You under the Apache License, Version 2.0
9    * (the "License"); you may not use this file except in compliance with
10   * the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  import org.apache.labs.bananadb.store.lock.Lock;
22  import org.apache.labs.bananadb.store.data.bananatrie.HashCodesPartition;
23  import org.apache.labs.bananadb.store.data.bananatrie.Hashtable;
24  import org.apache.labs.bananadb.store.data.bananatrie.KeysPartition;
25  import org.apache.labs.bananadb.store.data.bananatrie.ValuesPartition;
26  import org.apache.labs.bananadb.store.data.Metadata;
27  import org.apache.labs.bananadb.store.data.FileHandler;
28  import org.apache.labs.bananadb.store.sequence.SequenceManager;
29  import org.apache.labs.bananadb.store.sequence.FilebasedSequenceManager;
30  
31  import java.io.File;
32  import java.io.IOException;
33  import java.util.ArrayList;
34  import java.util.Arrays;
35  import java.util.List;
36  import java.util.NoSuchElementException;
37  
38  import org.apache.labs.bananadb.store.data.FileHandler.Posting;
39  
40  /**
41   * This is the core Index<byte[], byte[]> that is stored on filesystem.
42   *
43   * @see org.apache.labs.bananadb.entity.EntityStore 
44   *
45   * todo make sure deleted postings are the last postings in all hash code chains!
46   * <p/>
47   * todo consider implementing a lock per file rather than a store-wide lock, but make sure it makes sense!
48   *
49   * @author kalle
50   * @since 2009-mar-16 15:34:56
51   */
52  public class Store {
53  
54    private static final Log log = new Log(Store.class);
55  
56    private Configuration configuration;
57    private SequenceManager sequenceManager;
58    private List<Accessor> accessors = new ArrayList<Accessor>();
59  
60    public Store(File dataPath) throws IOException {
61      this(new Configuration(dataPath));
62    }
63  
64    public Store(Configuration configuration) {
65      this.configuration = configuration;
66    }
67  
68  
69    public void open() throws IOException {
70      if (!getConfiguration().getDataPath().exists()) {
71        if (log.isInfo()) {
72          log.info("Creating directory " + getConfiguration().getDataPath().getAbsolutePath());
73        }
74        if (!getConfiguration().getDataPath().mkdirs()) {
75          throw new IOException("Could not create directory " + getConfiguration().getDataPath().getAbsolutePath());
76        }
77      }
78      File sequencePath = new File(configuration.getDataPath(), "seq");
79      sequenceManager = new FilebasedSequenceManager(sequencePath, configuration.getLockFactory(), configuration.getLockWaitTimeoutMilliseconds());
80    }
81  
82    public SequenceManager getSequenceManager() {
83      return sequenceManager;
84    }
85  
86  
87    List<Accessor> getAccessors() {
88      return accessors;
89    }
90  
91    public Accessor createAccessor(boolean readOnly) throws IOException {
92      Accessor accessor = new Accessor(this, readOnly);
93      accessors.add(accessor);
94      return accessor;
95    }
96  
97    public void close() throws IOException {
98      log.info("Closing store..");
99      if (accessors.size() > 0) {
100       log.warn("There are " + accessors.size() + " open accessors. They will be closed.");
101     }
102     for (Accessor accessor : new ArrayList<Accessor>(accessors)) {
103       accessor.close();
104     }
105     sequenceManager.close();
106     log.info("Store has been closed.");
107 
108   }
109 
110   private void validateKey(byte[] key) {
111     if (key == null || key.length == 0) {
112       throw new IllegalArgumentException("Null key is not allowed");
113     }
114   }
115 
116   public byte[] get(Accessor accessor, byte[] key, long hashCode) throws IOException {
117     return get(accessor, key, hashCode, Long.MAX_VALUE);
118   }
119 
120   public byte[] get(Accessor accessor, byte[] key, long hashCode, long revision) throws IOException {
121 
122     validateKey(key);
123 
124     Hashtable hashtable = accessor.getHashtable();
125     Hashtable.Posting hashtablePosting = new Hashtable.Posting();
126     hashtable.readPosting(hashtablePosting, hashtable.calculateHashCodePostingOffset(hashCode));
127 
128     //
129     // seek to the correct hash code posting
130     //
131     HashCodesPartition hashCodesPartition = accessor.getHashCodesPartition(hashtablePosting.getHashCodePostingPartition());
132     HashCodesPartition.Posting hashCodePosting = new HashCodesPartition.Posting();
133     hashCodesPartition.readPosting(hashCodePosting, hashtablePosting.getHashCodePostingPartitionOffset());
134     if (hashCodePosting.getFlagForRevision(revision) != Posting.FLAG_IN_USE) {
135       return null;
136     }
137     while (true) {
138 
139       if (hashCodePosting.getFlagForRevision(revision) == Posting.FLAG_IN_USE
140           && hashCode == hashCodePosting.getKeyHashCode()) {
141         break;
142       }
143 
144       if (hashCodePosting.getNextPostingPartition() < 0) {
145         return null;
146       }
147       if (hashCodePosting.getNextPostingPartition() != hashCodesPartition.getPartitionId()) {
148         hashCodesPartition = accessor.getHashCodesPartition(hashCodePosting.getNextPostingPartition());
149       }
150       hashCodesPartition.readPosting(hashCodePosting, hashCodePosting.getNextPostingPartitionOffset());
151     }
152 
153     //
154     // seek to the correct key posting
155     //
156 
157     KeysPartition.Posting keyPosting = new KeysPartition.Posting();
158 
159     KeysPartition keysPartition = accessor.getKeysPartition(hashCodePosting.getFirstKeyPostingPartition());
160     keysPartition.readPosting(keyPosting, hashCodePosting.getFirstKeyPostingPartitionOffset());
161     while (true) {
162 
163       if (keyPosting.getFlagForRevision(revision) == Posting.FLAG_IN_USE
164           && Arrays.equals(key, keyPosting.getBytes())) {
165         break;
166       }
167 
168       if (keyPosting.getNextKeyPostingPartition() < 0) {
169         return null;
170       }
171       if (keyPosting.getNextKeyPostingPartition() != keysPartition.getPartitionId()) {
172         keysPartition = accessor.getKeysPartition(keyPosting.getNextKeyPostingPartition());
173       }
174       keysPartition.readPosting(keyPosting, keyPosting.getNextKeyPostingPartitionOffset());
175     }
176 
177     //
178     // seek to the correct value posting
179     //
180 
181     ValuesPartition.Posting valuePosting = new ValuesPartition.Posting();
182 
183     ValuesPartition valuesPartition = accessor.getValuesPartition(keyPosting.getValuePostingPartition());
184     valuesPartition.readPosting(valuePosting, keyPosting.getValuePostingPartitionOffset());
185 
186     if (valuePosting.getBytesLength() == 0) {
187       return null;
188     }
189 
190     return valuePosting.getBytes();
191   }
192 
193 
194   /**
195    * Write locking.
196    *
197    * @param accessor
198    * @param key
199    * @param hashCode
200    * @param value
201    * @param revision
202    * @return
203    * @throws IOException
204    */
205   public byte[] put(final Accessor accessor, final byte[] key, final long hashCode, final byte[] value, final long revision) throws IOException {
206 
207     validateKey(key);
208 
209     Lock.With<byte[]> with = new Lock.With<byte[]>(accessor.getStoreWriteLock(), getConfiguration().getLockWaitTimeoutMilliseconds()) {
210       public byte[] doBody() throws IOException {
211         return doPut(accessor, key, hashCode, value, revision);
212       }
213     };
214     return with.run();
215   }
216 
217   /**
218    * Should be write locked at this time.
219    * <p/>
220    * revision must be the most recent revision in the store.
221    *
222    * @param accessor
223    * @param key
224    * @param hashCode
225    * @param value
226    * @return
227    * @throws IOException
228    */
229   private byte[] doPut(final Accessor accessor, final byte[] key, final long hashCode, final byte[] value, final long revision) throws IOException {
230 
231 
232     //
233     // create new value posting
234     //
235 
236     int newValuePostingPartitionNumber;
237     int newValuePostingPartitionOffset;
238 
239     ValuesPartition.Posting valuePosting = new ValuesPartition.Posting();
240     valuePosting.setFlag(FileHandler.Posting.FLAG_IN_USE);
241 
242     valuePosting.setCreatedRevision(revision);
243     if (value == null || value.length == 0) {
244       valuePosting.setBytesLength(0);
245       valuePosting.setBytes(null);
246 
247       newValuePostingPartitionNumber = -1;
248       newValuePostingPartitionOffset = -1;
249 
250     } else {
251       valuePosting.setBytesLength(value.length);
252       valuePosting.setBytes(value);
253 
254       Accessor.RequestPartitionWriterResponse<ValuesPartition> valueReservation = accessor.requestValueWrite(valuePosting);
255       newValuePostingPartitionNumber = valueReservation.getFileHandler().getPartitionId();
256       ValuesPartition valuesPartition = valueReservation.getFileHandler();
257       newValuePostingPartitionOffset = valueReservation.getStartOffset();
258 
259       // write new value posting, keep track of partition and offset
260       valuesPartition.writePosting(valuePosting, newValuePostingPartitionOffset);
261 
262     }
263 
264 
265     //
266     // create new key posting
267     //
268 
269     KeysPartition.Posting newKeyPosting = new KeysPartition.Posting();
270     newKeyPosting.setCreatedRevision(revision);
271     newKeyPosting.setFlag(Posting.FLAG_IN_USE);
272     newKeyPosting.setBytes(key);
273     newKeyPosting.setBytesLength(key.length);
274     newKeyPosting.setKeyHashCode(hashCode);
275     newKeyPosting.setNextKeyPostingPartition(-1);
276     newKeyPosting.setNextKeyPostingPartitionOffset(-1);
277     newKeyPosting.setValuePostingPartition(newValuePostingPartitionNumber);
278     newKeyPosting.setValuePostingPartitionOffset(newValuePostingPartitionOffset);
279 
280     Accessor.RequestPartitionWriterResponse<KeysPartition> keyReservation = accessor.requestValueWrite(newKeyPosting);
281     int newKeyPostingPartitionNumber = keyReservation.getFileHandler().getPartitionId();
282     KeysPartition newKeyPostingPartition = keyReservation.getFileHandler();
283     int newKeyPostingPartitionOffset = keyReservation.getStartOffset();
284 
285     // note that the key posting is written to disk later later as it might need updates
286     // due to durable posting links!
287 
288 
289     //
290     // find hashcode posting and hashtable posting for the new key
291     //
292 
293     Hashtable hashtable = accessor.getHashtable();
294     Hashtable.Posting hashtablePosting = new Hashtable.Posting();
295 
296     HashCodesPartition.Posting hashCodePosting = new HashCodesPartition.Posting();
297 
298     int hashtablePostingOffset = hashtable.calculateHashCodePostingOffset(hashCode);
299     hashtable.getRAF().seek(hashtablePostingOffset);
300     byte flag = hashtable.getRAF().readByte();
301     if (flag == Posting.FLAG_NEVER_USED) {
302 
303       // this is the first time we create a posting at this hashtable position
304       // that means there is no hash code posting either
305 
306       newKeyPostingPartition.writePosting(newKeyPosting, newKeyPostingPartitionOffset);
307 
308 
309       hashCodePosting.setCreatedRevision(revision);
310       hashCodePosting.setFlag(Posting.FLAG_IN_USE);
311       hashCodePosting.setFirstKeyPostingPartition(newKeyPostingPartitionNumber);
312       hashCodePosting.setFirstKeyPostingPartitionOffset(newKeyPostingPartitionOffset);
313       hashCodePosting.setKeyHashCode(hashCode);
314       hashCodePosting.setNextPostingPartition(-1);
315       hashCodePosting.setNextPostingPartitionOffset(-1);
316 
317       Accessor.RequestPartitionWriterResponse<HashCodesPartition> hashCodeReservation = accessor.requestValueWrite(hashCodePosting);
318       int newHashCodePostingPatition = hashCodeReservation.getFileHandler().getPartitionId();
319       HashCodesPartition hashCodesPartition = hashCodeReservation.getFileHandler();
320       int newHashCodePostingPatitionOffset = hashCodeReservation.getStartOffset();
321 
322       hashCodesPartition.writePosting(hashCodePosting, newHashCodePostingPatitionOffset);
323 
324       // update hashtable posting
325 
326       hashtablePosting.setFlag(Posting.FLAG_IN_USE);
327       hashtablePosting.setHashCodePostingPartition(newHashCodePostingPatition);
328       hashtablePosting.setHashCodePostingPartitionOffset(newHashCodePostingPatitionOffset);
329       hashtablePosting.setCreatedRevision(revision);
330       hashtable.writePosting(hashtablePosting, hashtablePostingOffset);
331 
332 
333       return null;
334     
335     } else {
336 
337       // there is a hashtable posting at the position for this hash code
338 
339       hashtable.readPosting(hashtablePosting, hashtablePostingOffset);
340 
341       //
342       // seek to the correct hash code posting
343       //      
344 
345       HashCodesPartition hashCodesPartition = accessor.getHashCodesPartition(hashtablePosting.getHashCodePostingPartition());
346       int currentHashCodesPostingPartitionOffset = hashtablePosting.getHashCodePostingPartitionOffset();
347       hashCodesPartition.readPosting(hashCodePosting, hashtablePosting.getHashCodePostingPartitionOffset());
348       while (true) {
349 
350         if (hashCodePosting.getFlagForRevision(revision) == Posting.FLAG_IN_USE
351             && hashCode == hashCodePosting.getKeyHashCode()) {
352           break;
353         }
354 
355         if (hashCodePosting.getNextPostingPartition() < 0) {
356 
357           // there is no hash code posting matching this hash code.
358 
359           newKeyPostingPartition.writePosting(newKeyPosting, newKeyPostingPartitionOffset);
360 
361 
362           //
363           //  create new hash code posting
364           //
365 
366           HashCodesPartition.Posting newHashCodePosting = new HashCodesPartition.Posting();
367           newHashCodePosting.setCreatedRevision(revision);
368           newHashCodePosting.setFlag((byte) 1);
369           newHashCodePosting.setKeyHashCode(hashCode);
370           newHashCodePosting.setNextPostingPartition(-1);
371           newHashCodePosting.setNextPostingPartitionOffset(-1);
372           newHashCodePosting.setFirstKeyPostingPartition(newKeyPostingPartitionNumber);
373           newHashCodePosting.setFirstKeyPostingPartitionOffset(newKeyPostingPartitionOffset);
374 
375           Accessor.RequestPartitionWriterResponse<HashCodesPartition> hashCodeReservation = accessor.requestValueWrite(newHashCodePosting);
376 
377           int newHashCodePostingPartition = hashCodeReservation.getFileHandler().getPartitionId();
378           int newHashCodePostingPartitionOffset = hashCodeReservation.getStartOffset();
379 
380           hashCodeReservation.getFileHandler().writePosting(newHashCodePosting, newHashCodePostingPartitionOffset);
381 
382 
383           //
384           // and update the current posting to point at the new one as next in chain.
385           //
386 
387           hashCodePosting.setNextPostingPartition(newHashCodePostingPartition);
388           hashCodePosting.setNextPostingPartitionOffset(newHashCodePostingPartitionOffset);
389           hashCodesPartition.writePosting(hashCodePosting, currentHashCodesPostingPartitionOffset);
390 
391           return null;
392 
393         }
394 
395         // keep seeking..
396         if (hashCodePosting.getNextPostingPartition() != hashCodesPartition.getPartitionId()) {
397           hashCodesPartition = accessor.getHashCodesPartition(hashCodePosting.getNextPostingPartition());
398         }
399         currentHashCodesPostingPartitionOffset = hashCodePosting.getNextPostingPartitionOffset();
400         hashCodesPartition.readPosting(hashCodePosting, hashCodePosting.getNextPostingPartitionOffset());
401       }
402 
403       //
404       // seek for the same key
405       //
406 
407       KeysPartition currentKeyPostingPartition = accessor.getKeysPartition(hashCodePosting.getFirstKeyPostingPartition());
408       KeysPartition previousKeyPostingPartition = null;
409 
410       KeysPartition.Posting currentKeyPosting = new KeysPartition.Posting();
411       KeysPartition.Posting previousKeyPosting = new KeysPartition.Posting();
412 
413 
414       int previousKeyPostingPartitionOffset = -1;
415 
416       int currentKeyPostingPartitionOffset = hashCodePosting.getFirstKeyPostingPartitionOffset();
417       currentKeyPostingPartition.readPosting(currentKeyPosting, hashCodePosting.getFirstKeyPostingPartitionOffset());
418       while (true) {
419 
420         if (currentKeyPosting.getFlagForRevision(revision) == Posting.FLAG_IN_USE
421             && Arrays.equals(key, currentKeyPosting.getBytes())) {
422           break;
423         }
424         if (currentKeyPosting.getNextKeyPostingPartition() < 0) {
425 
426           // the key did not exist
427           // update the current key posting to point at the new key posting partition and offset as next in chain.
428           currentKeyPosting.setNextKeyPostingPartition(newKeyPostingPartitionNumber);
429           currentKeyPosting.setNextKeyPostingPartitionOffset(newKeyPostingPartitionOffset);
430           currentKeyPostingPartition.writePosting(currentKeyPosting, currentKeyPostingPartitionOffset);
431 
432           newKeyPostingPartition.writePosting(newKeyPosting, newKeyPostingPartitionOffset);
433 
434           return null;
435         }
436 
437         if (currentKeyPosting.getNextKeyPostingPartition() != currentKeyPostingPartition.getPartitionId()) {
438           currentKeyPostingPartition = accessor.getKeysPartition(currentKeyPosting.getNextKeyPostingPartition());
439         }
440 
441         previousKeyPostingPartition = currentKeyPostingPartition;
442         previousKeyPostingPartitionOffset = currentKeyPostingPartitionOffset;
443 
444         currentKeyPostingPartitionOffset = currentKeyPosting.getNextKeyPostingPartitionOffset();
445 
446         currentKeyPostingPartition.readPosting(previousKeyPosting, currentKeyPosting.getNextKeyPostingPartitionOffset());
447         KeysPartition.Posting tmp = previousKeyPosting;
448         previousKeyPosting = currentKeyPosting;
449         currentKeyPosting = tmp;
450       }
451 
452       //
453       // a posting exists for this key.
454       //
455 
456 
457       if (previousKeyPostingPartition == null) {
458         // chain of keys with same hash code contains only two links
459 
460         // write new key posting that nothing yet points at
461         // but make that new posting point at the key link we are replacing
462         if (configuration.isUsingDurablePostingLinks()) {
463           newKeyPosting.setNextKeyPostingPartition(currentKeyPostingPartition.getPartitionId());
464           newKeyPosting.setNextKeyPostingPartitionOffset(currentKeyPostingPartitionOffset);
465         }
466         newKeyPostingPartition.writePosting(newKeyPosting, newKeyPostingPartitionOffset);
467 
468 
469         // making the replaced link last of the two.
470         // [nk] [ck deleted]
471         currentKeyPosting.setFlag(Posting.FLAG_DELETED);
472         currentKeyPosting.setDeletedRevision(revision);
473         assert currentKeyPosting.getNextKeyPostingPartition() == -1;
474         assert currentKeyPosting.getNextKeyPostingPartitionOffset() == -1;
475         currentKeyPostingPartition.writePosting(currentKeyPosting, currentKeyPostingPartitionOffset);
476 
477         if (hashCodePosting.getFirstKeyPostingPartition() == currentKeyPostingPartition.getPartitionId()
478             && hashCodePosting.getFirstKeyPostingPartitionOffset() == currentKeyPostingPartitionOffset) {
479           // update the hash code posting
480           // to point at new key posting as the first in chain
481           //  [hash code posting] ---first key link+-> [nk] [ck]
482           // if it used to be the current key that was the first.
483           hashCodePosting.setFirstKeyPostingPartition(newKeyPostingPartitionNumber);
484           hashCodePosting.setFirstKeyPostingPartitionOffset(newKeyPostingPartitionOffset);
485           hashCodesPartition.writePosting(hashCodePosting, currentHashCodesPostingPartitionOffset);
486         }
487 
488       } else {
489 
490         // there are at least three links in the chain of keys with the same hash code
491 
492         if (configuration.isUsingDurablePostingLinks()) {
493 
494           // todo this is sort of ugly code
495 
496           // we have
497           // [pk] [ck]
498 
499           // and now we replace current key with new key
500 
501           // [pk] [nk] [ck (deleted)]
502 
503           // new points at the current to be deleted
504           newKeyPosting.setNextKeyPostingPartition(currentKeyPostingPartition.getPartitionId());
505           newKeyPosting.setNextKeyPostingPartitionOffset(currentHashCodesPostingPartitionOffset);
506 
507           // previous points at new
508           previousKeyPosting.setNextKeyPostingPartition(newKeyPostingPartition.getPartitionId());
509           previousKeyPosting.setNextKeyPostingPartitionOffset(newKeyPostingPartitionOffset);
510 
511           // deleted points at whatever it used to point at.
512           currentKeyPosting.setFlag(Posting.FLAG_DELETED);
513           currentKeyPosting.setDeletedRevision(revision);
514 
515           // if first of three is deleted and it is the first of links in the whole chain
516           // then make new key point at previous key and previous key back at current key
517           // and point at new key as first key hash code posting link
518           //
519           // i.e. from:
520           //  [hash code posting] ---first key link+-> [pk deleted] [nk] [ck deleted]
521           // to:
522           //  [hash code posting] ---first key link+-> [nk] [pk deleted] [ck deleted]
523           //
524           if (previousKeyPosting.getFlag() == Posting.FLAG_DELETED
525               && hashCodePosting.getFirstKeyPostingPartition() == previousKeyPostingPartition.getPartitionId()
526               && hashCodePosting.getFirstKeyPostingPartitionOffset() == previousKeyPostingPartitionOffset) {
527 
528             // write new key
529             newKeyPosting.setNextKeyPostingPartition(previousKeyPostingPartition.getPartitionId());
530             newKeyPosting.setNextKeyPostingPartitionOffset(previousKeyPostingPartitionOffset);
531             newKeyPostingPartition.writePosting(newKeyPosting, newKeyPostingPartitionOffset);
532 
533             // todo short possible glitch here where old key is deleted and new key is not available
534             currentKeyPostingPartition.markPostingAsDeleted(currentHashCodesPostingPartitionOffset, revision);
535 
536             // update hash code posting to point at new key posting
537             hashCodePosting.setFirstKeyPostingPartition(newKeyPostingPartitionNumber);
538             hashCodePosting.setFirstKeyPostingPartitionOffset(newKeyPostingPartitionOffset);
539             hashCodesPartition.writePosting(hashCodePosting, hashtablePostingOffset);
540 
541           } else {
542 
543             // write the values
544             newKeyPostingPartition.writePosting(newKeyPosting, newKeyPostingPartitionOffset);
545             currentKeyPostingPartition.writePosting(currentKeyPosting, currentKeyPostingPartitionOffset);
546             previousKeyPostingPartition.writePosting(previousKeyPosting, previousKeyPostingPartitionOffset);
547           }
548 
549         } else {
550 
551           // no durable postings
552 
553 
554 
555           if (previousKeyPosting.getFlag() == Posting.FLAG_DELETED
556               && hashCodePosting.getFirstKeyPostingPartition() == previousKeyPostingPartition.getPartitionId()
557               && hashCodePosting.getFirstKeyPostingPartitionOffset() == previousKeyPostingPartitionOffset) {
558 
559             // if the previous key is the deleted
560             // and the first in chain of same hash code
561             // then make the new key the first link in chain, followed by the previous
562             // by updating the hash code posting
563 
564             // new points at previous
565             // [nk] [pk] [ck]
566             newKeyPosting.setNextKeyPostingPartition(previousKeyPostingPartition.getPartitionId());
567             newKeyPosting.setNextKeyPostingPartitionOffset(previousKeyPostingPartitionOffset);
568             newKeyPostingPartition.writePosting(newKeyPosting, newKeyPostingPartitionOffset);
569 
570 
571             // update hash code
572             // [hash code posting] ---first key link+-> [pk]
573             // to
574             // [hash code posting] ---first key link+-> [nk]
575             hashCodePosting.setFirstKeyPostingPartition(newKeyPostingPartitionNumber);
576             hashCodePosting.setFirstKeyPostingPartitionOffset(newKeyPostingPartitionOffset);
577             hashCodesPartition.writePosting(hashCodePosting, currentHashCodesPostingPartitionOffset);
578 
579             // mark old key as deleted
580             // [nk] [pk] [ck deleted]
581             currentKeyPostingPartition.markPostingAsDeleted(currentKeyPostingPartitionOffset, revision);
582 
583             // [pk] disconnects from [ck deleted]
584             previousKeyPosting.setNextKeyPostingPartitionOffset(-1);
585             previousKeyPosting.setNextKeyPostingPartitionOffset(-1);
586             previousKeyPostingPartition.writePosting(previousKeyPosting, previousKeyPostingPartitionOffset);
587             
588           } else {
589 
590             // we have [pk] [ck]
591 
592 
593             // create new key posting
594             // and make it point at what ever [ck] points at
595             newKeyPosting.setNextKeyPostingPartition(currentKeyPosting.getNextKeyPostingPartition());
596             newKeyPosting.setNextKeyPostingPartitionOffset(currentKeyPosting.getNextKeyPostingPartitionOffset());
597             newKeyPostingPartition.writePosting(newKeyPosting, newKeyPostingPartitionOffset);
598 
599             // replace current key as link from previous key with new key
600             // [pk] [nk]
601             previousKeyPosting.setNextKeyPostingPartition(newKeyPostingPartitionNumber);
602             previousKeyPosting.setNextKeyPostingPartitionOffset(newKeyPostingPartitionOffset);
603             previousKeyPostingPartition.writePosting(previousKeyPosting, previousKeyPostingPartitionOffset);
604 
605             // mark old disconnected key as deleted
606             // [ck deleted]
607             currentKeyPostingPartition.markPostingAsDeleted(currentKeyPostingPartitionOffset, revision);
608             
609 
610           }
611         }
612       }
613 
614 
615       // read the old value
616       ValuesPartition valuesPartition = accessor.getValuesPartition(currentKeyPosting.getValuePostingPartition());
617       valuesPartition.readPosting(valuePosting, currentKeyPosting.getValuePostingPartitionOffset());
618       byte[] oldValue;
619       if (valuePosting.getBytesLength() > 0) {
620         oldValue = valuePosting.getBytes();
621       } else {
622         oldValue = null;
623       }
624 
625       // mark the old value as deleted
626       valuesPartition.markPostingAsDeleted(currentKeyPosting.getValuePostingPartitionOffset(), revision);
627 
628       return oldValue;
629 
630     }
631 
632   }
633 
634   /**
635    * Write locking.
636    *
637    * @param accessor
638    * @param key
639    * @param hashCode
640    * @param revision revision flaggad as deletion revision
641    * @return
642    * @throws IOException
643    */
644   public byte[] remove(final Accessor accessor, final byte[] key, final long hashCode, final long revision) throws IOException {
645 
646     validateKey(key);
647 
648     Lock.With<byte[]> with = new Lock.With<byte[]>(accessor.getStoreWriteLock(), getConfiguration().getLockWaitTimeoutMilliseconds()) {
649       public byte[] doBody() throws IOException {
650 
651         return doRemove(accessor, key, hashCode, revision);
652 
653       }
654     };
655     return with.run();
656   }
657 
658   /**
659    * Marks all postings related with a key and its value as deleted using a specific revision.
660    * <p/>
661    * Should be write locked at this point!
662    * <p/>
663    * The code is written to handle only the current revision of the store as deletion revision,
664    * it might however work with earlier revisions too.
665    *
666    * @param accessor
667    * @param key
668    * @param hashCode
669    * @param revision
670    * @return
671    * @throws IOException
672    */
673   private byte[] doRemove(final Accessor accessor, final byte[] key, final long hashCode, final long revision) throws IOException {
674 
675     Hashtable.Posting hashtablePosting = new Hashtable.Posting();
676 
677     //
678     // find hashtable posting
679     //
680 
681     Hashtable hashtable = accessor.getHashtable();
682     int hashtablePostingOffset = hashtable.calculateHashCodePostingOffset(hashCode);
683     accessor.getHashtable().readPosting(hashtablePosting, hashtablePostingOffset);
684 
685     byte hashtableFlagForRevision = hashtablePosting.getFlagForRevision(revision);
686     if (hashtableFlagForRevision != FileHandler.Posting.FLAG_IN_USE) {
687       throw new NoSuchElementException();
688     }
689 
690     if (hashtablePosting.getHashCodePostingPartition() < 0) {
691       throw new StoreInconsistencyException("There is a hash table posting in the store at offset " + hashtablePostingOffset + ", hash code " + hashCode + ", but there is no pointer to a hash codes partition!");
692     }
693 
694 
695     //
696     // seek to the correct hash code posting
697     //
698 
699     HashCodesPartition.Posting hashCodePosting = new HashCodesPartition.Posting();
700 
701     HashCodesPartition hashCodesPartition = accessor.getHashCodesPartition(hashtablePosting.getHashCodePostingPartition());
702     int currentHashCodePostingPartitionOffset = hashtablePosting.getHashCodePostingPartitionOffset();
703     int currentHashCodePostingPartition = hashtablePosting.getHashCodePostingPartition();
704     hashCodesPartition.readPosting(hashCodePosting, hashtablePosting.getHashCodePostingPartitionOffset());
705     while (hashCode != hashCodePosting.getKeyHashCode()) {
706       if (hashCodePosting.getNextPostingPartition() < 0) {
707         // no hash code link postings in chain
708         throw new NoSuchElementException();
709       }
710 
711       if (hashCodePosting.getNextPostingPartition() != hashCodesPartition.getPartitionId()) {
712         hashCodesPartition = accessor.getHashCodesPartition(hashCodePosting.getNextPostingPartition());
713       }
714 
715       currentHashCodePostingPartitionOffset = hashCodePosting.getNextPostingPartitionOffset();
716       currentHashCodePostingPartition = hashCodePosting.getNextPostingPartition();
717       hashCodesPartition.readPosting(hashCodePosting, hashCodePosting.getNextPostingPartitionOffset());
718     }
719 
720     //
721     // seek for the same key
722     //
723 
724 
725     KeysPartition currentKeyLinkPostingPartition = accessor.getKeysPartition(hashCodePosting.getFirstKeyPostingPartition());
726     KeysPartition.Posting currentKeyLinkPosting = new KeysPartition.Posting();
727     int currentKeyPostingPartitionOffset = hashCodePosting.getFirstKeyPostingPartitionOffset();
728 
729     KeysPartition previousKeyPostingPartition = null;
730     KeysPartition.Posting previousKeyLinkPosting = new KeysPartition.Posting();
731     int previousKeyPostingPartitionOffset = -1;
732 
733     currentKeyLinkPostingPartition.readPosting(currentKeyLinkPosting, hashCodePosting.getFirstKeyPostingPartitionOffset());
734     while (true) {
735 
736       byte flagForKeyInRevision = currentKeyLinkPosting.getFlagForRevision(revision);
737 
738       if (flagForKeyInRevision == Posting.FLAG_IN_USE
739           && Arrays.equals(key, currentKeyLinkPosting.getBytes())) {
740         break;
741       }
742 
743       if (currentKeyLinkPosting.getNextKeyPostingPartition() < 0) {
744         // no more key link postings in chain
745         throw new NoSuchElementException();
746       }
747 
748       if (currentKeyLinkPosting.getNextKeyPostingPartition() != currentKeyLinkPostingPartition.getPartitionId()) {
749         currentKeyLinkPostingPartition = accessor.getKeysPartition(currentKeyLinkPosting.getNextKeyPostingPartition());
750       }
751       previousKeyPostingPartition = currentKeyLinkPostingPartition;
752       previousKeyPostingPartitionOffset = currentKeyPostingPartitionOffset;
753 
754       currentKeyPostingPartitionOffset = currentKeyLinkPosting.getNextKeyPostingPartitionOffset();
755 
756       KeysPartition.Posting tmp = currentKeyLinkPosting;
757       currentKeyLinkPostingPartition.readPosting(previousKeyLinkPosting, currentKeyLinkPosting.getNextKeyPostingPartitionOffset());
758       currentKeyLinkPosting = previousKeyLinkPosting;
759       previousKeyLinkPosting = tmp;
760 
761     }
762 
763     //
764     // a posting exists for this key.
765     //
766 
767 
768     // process key chain
769     //
770     if (hashCodePosting.getFirstKeyPostingPartition() == currentKeyLinkPostingPartition.getPartitionId()
771         && hashCodePosting.getFirstKeyPostingPartitionOffset() == currentKeyPostingPartitionOffset) {
772 
773       //
774       // no previous key in chain
775       //
776 
777       if (hashtablePosting.getHashCodePostingPartition() == currentHashCodePostingPartition
778           && hashtablePosting.getHashCodePostingPartitionOffset() == currentHashCodePostingPartitionOffset) {
779 
780         //
781         // no previous hash code posting in chain
782         // i.e. there are now no postings in the hashtable that match the hash code of this key
783         // so delete the hashtable and hash code posting
784         //
785 
786         hashCodesPartition.markPostingAsDeleted(currentHashCodePostingPartitionOffset, revision);
787 
788         hashtable.markPostingAsDeleted(hashtablePostingOffset, revision);
789 
790       }
791 
792     } else {
793 
794       //
795       // there is a previous key in the chain.
796       // update it to point at the next key in chain as defined by the current key
797       //
798 
799       if (configuration.isUsingDurablePostingLinks()) {
800 
801         //
802         // place deleted posting in end of key link chain for this hash code
803         //
804 
805         KeysPartition.Posting lastKeyLinkPosting = new KeysPartition.Posting();
806         int lastKeyLinkPartitionPostingOffset = -1;
807         // seek..
808         lastKeyLinkPosting.setNextKeyPostingPartition(currentKeyLinkPostingPartition.getPartitionId());
809         lastKeyLinkPosting.setNextKeyPostingPartitionOffset(currentKeyPostingPartitionOffset);
810         KeysPartition lastKeyLinkPartition;
811         while (true) {
812           lastKeyLinkPartitionPostingOffset = lastKeyLinkPosting.getNextKeyPostingPartitionOffset();
813           lastKeyLinkPartition = accessor.getKeysPartition(lastKeyLinkPosting.getNextKeyPostingPartition());
814           lastKeyLinkPartition.readPosting(lastKeyLinkPosting, lastKeyLinkPosting.getNextKeyPostingPartitionOffset());
815           if (lastKeyLinkPosting.getNextKeyPostingPartition() < 0) {
816             break;
817           }
818         }
819 
820         lastKeyLinkPosting.setNextKeyPostingPartition(currentKeyLinkPostingPartition.getPartitionId());
821         lastKeyLinkPosting.setNextKeyPostingPartitionOffset(currentHashCodePostingPartitionOffset);
822         lastKeyLinkPartition.writePosting(lastKeyLinkPosting, lastKeyLinkPartitionPostingOffset);
823 
824         currentKeyLinkPosting.setNextKeyPostingPartition(-1);
825         currentKeyLinkPosting.setNextKeyPostingPartitionOffset(-1);
826         currentKeyLinkPostingPartition.writePosting(currentKeyLinkPosting, currentHashCodePostingPartitionOffset);
827 
828         // todo handle internal break downs, attempt rollbacks et c
829       }
830 
831 
832       //
833       // link the previous posting with what ever the deleted posting was linked to.
834       //
835 
836       if (previousKeyPostingPartition != null) {
837         previousKeyLinkPosting.setNextKeyPostingPartition(currentKeyLinkPosting.getNextKeyPostingPartition());
838         previousKeyLinkPosting.setNextKeyPostingPartitionOffset(currentKeyLinkPosting.getNextKeyPostingPartitionOffset());
839         previousKeyPostingPartition.writePosting(previousKeyLinkPosting, previousKeyPostingPartitionOffset);
840       }
841 
842     }
843 
844 
845     // mark old key as deleted
846     currentKeyLinkPostingPartition.markPostingAsDeleted(currentKeyPostingPartitionOffset, revision);
847 
848 
849     // read the old value
850     ValuesPartition.Posting valuePosting = new ValuesPartition.Posting();
851 
852     ValuesPartition oldValuePartition = accessor.getValuesPartition(currentKeyLinkPosting.getValuePostingPartition());
853     oldValuePartition.readPosting(valuePosting, currentKeyLinkPosting.getValuePostingPartitionOffset());
854     byte[] oldValue;
855     if (valuePosting.getBytesLength() > 0) {
856       oldValue = valuePosting.getBytes();
857     } else {
858       oldValue = null;
859     }
860 
861     // mark the old value as deleted
862     oldValuePartition.markPostingAsDeleted(currentKeyLinkPosting.getValuePostingPartitionOffset(), revision);
863 
864     return oldValue;
865 
866   }
867 
868   public boolean containsKey(Accessor accessor, byte[] key, long hashCode) throws IOException {
869     return containsKey(accessor, key, hashCode, Long.MAX_VALUE);
870   }
871 
872   public boolean containsKey(Accessor accessor, byte[] key, long hashCode, long revision) throws IOException {
873 
874     validateKey(key);
875 
876     Hashtable hashtable = accessor.getHashtable();
877     Hashtable.Posting hashtablePosting = new Hashtable.Posting();
878     hashtable.readPosting(hashtablePosting, hashtable.calculateHashCodePostingOffset(hashCode));
879 
880     //
881     // seek to the correct hash code posting
882     //
883 
884     HashCodesPartition.Posting hashCodePosting = new HashCodesPartition.Posting();
885 
886     HashCodesPartition hashCodesPartition = accessor.getHashCodesPartition(hashtablePosting.getHashCodePostingPartition());
887     hashCodesPartition.readPosting(hashCodePosting, hashtablePosting.getHashCodePostingPartitionOffset());
888 
889 
890     if (hashCodePosting.getFlag() == Posting.FLAG_NEVER_USED) {
891       return false;
892     }
893 
894     while (true) {
895 
896       if (hashCode == hashCodePosting.getKeyHashCode()
897           && hashCodePosting.getFlagForRevision(revision) == Posting.FLAG_IN_USE) {
898         break;
899       }
900 
901       if (hashCodePosting.getNextPostingPartition() < 0) {
902         // no more hash code links in chain
903         return false;
904       }
905       if (hashCodePosting.getNextPostingPartition() != hashCodesPartition.getPartitionId()) {
906         hashCodesPartition = accessor.getHashCodesPartition(hashCodePosting.getNextPostingPartition());
907       }
908       hashCodesPartition.readPosting(hashCodePosting, hashCodePosting.getNextPostingPartitionOffset());
909     }
910 
911     //
912     // seek to the correct key posting
913     //
914 
915     KeysPartition.Posting keyPosting = new KeysPartition.Posting();
916 
917     KeysPartition keysPartition = accessor.getKeysPartition(hashCodePosting.getFirstKeyPostingPartition());
918     keysPartition.readPosting(keyPosting, hashCodePosting.getFirstKeyPostingPartitionOffset());
919     while (true) {
920       if (keyPosting.getFlagForRevision(revision) == Posting.FLAG_IN_USE
921           && keyPosting.getCreatedRevision() <= revision
922           && Arrays.equals(key, keyPosting.getBytes())) {
923         break;
924       }
925       if (keyPosting.getNextKeyPostingPartition() < 0) {
926         return false;
927       }
928       if (keyPosting.getNextKeyPostingPartition() != keysPartition.getPartitionId()) {
929         keysPartition = accessor.getKeysPartition(keyPosting.getNextKeyPostingPartition());
930       }
931       keysPartition.readPosting(keyPosting, keyPosting.getNextKeyPostingPartitionOffset());
932     }
933 
934     return true;
935 
936   }
937 
938   public Cursor values() {
939     // todo iterate all value partitions
940     throw new UnsupportedOperationException();
941   }
942 
943   public Cursor keys() {
944     // todo iterate all key partitions
945     throw new UnsupportedOperationException();
946 
947   }
948 
949   /**
950    * Removes all deleted postings from the store
951    * Write locking.
952    */
953   public void optimize() {
954     throw new UnsupportedOperationException();
955 
956   }
957 
958   /**
959    * Rehashes the hash table file.
960    *
961    * @param accessor
962    * @param resolution resolution in number of hashtable postings in new hashtable file.
963    * @throws IOException
964    */
965   public void rehash(final Accessor accessor, final int resolution) throws IOException {
966     Lock.With with = new Lock.With(accessor.getStoreWriteLock(), getConfiguration().getLockWaitTimeoutMilliseconds()) {
967       public Object doBody() throws IOException {
968 
969         log.info("Rehashing hashtable capacity ??? -> " + resolution);
970 
971         Metadata metadata = accessor.getMetadata();
972         Metadata.Header mdh = new Metadata.Header();
973         metadata.readHeader(mdh);
974         int topOldHashCodesPartition = mdh.getCurrentHashCodesPartition();
975         mdh.setCurrentHashCodesPartition(mdh.getCurrentHashCodesPartition() + 1);
976         metadata.writeHeader(mdh);
977 
978         Hashtable rehashedTable = new Hashtable(getConfiguration().getDataPath(), mdh.getCurrentHashtableId() + 1, accessor.getAccess(), getConfiguration().getLockFactory());
979         rehashedTable.format((resolution * Hashtable.Posting.POSTING_BYTE_SIZE) + rehashedTable.getHeaderByteSize());
980         rehashedTable.open();
981 
982         Hashtable.Header rehashedTableHeader = new Hashtable.Header();
983         rehashedTableHeader.setPostingsCapacity(resolution);
984         rehashedTable.writeHeader(rehashedTableHeader);
985 
986         // will data a new one!
987         HashCodesPartition rehashCodesPartition = accessor.getHashCodesPartition(mdh.getCurrentHashCodesPartition());
988 
989         Hashtable.Posting rehashedTablePosting = new Hashtable.Posting();
990         HashCodesPartition.Posting rehashCodePosting = new HashCodesPartition.Posting();
991         HashCodesPartition.Header rehashCodeHeader = new HashCodesPartition.Header();
992         rehashCodesPartition.readHeader(rehashCodeHeader);
993 
994         for (int currentOldHashCodePostingsPartitionId = 0; currentOldHashCodePostingsPartitionId <= topOldHashCodesPartition; currentOldHashCodePostingsPartitionId++) {
995           HashCodesPartition currentOldHashCodesPartition = new HashCodesPartition(getConfiguration().getDataPath(), currentOldHashCodePostingsPartitionId, accessor.getAccess(), getConfiguration().getLockFactory());
996           if (currentOldHashCodesPartition.exists()) {
997             currentOldHashCodesPartition.open();
998             HashCodesPartition.Header hcph = new HashCodesPartition.Header();
999             currentOldHashCodesPartition.readHeader(hcph);
1000 
1001             HashCodesPartition.Posting hcpp = new HashCodesPartition.Posting();
1002 
1003             int hcpStartOffset = currentOldHashCodesPartition.getHeaderByteSize();
1004             while (hcpStartOffset < hcph.getNextPostingOffset()) {
1005               currentOldHashCodesPartition.readPosting(hcpp, hcpStartOffset);
1006               hcpStartOffset += hcpp.getPostingByteSize();
1007 
1008               if (hcpp.getFlag() == (byte) 2) {
1009                 continue;
1010               } else if (hcpp.getFlag() == (byte) 0) {
1011                 break;
1012               } else if (hcpp.getFlag() != (byte) 1) {
1013                 log.warn("Unknown flag at offset " + hcpStartOffset + ": " + hcpp);
1014               }
1015 
1016               //
1017               // insert hashcode posting in rehashedtable and rehashcodes
1018               //
1019               rehash(accessor, hcpp, rehashedTable, rehashedTablePosting, rehashCodesPartition, rehashCodeHeader, rehashCodePosting);
1020 
1021               //
1022               // update rehashed codes partition header
1023               //
1024               rehashCodeHeader.setNextPostingOffset(rehashCodeHeader.getNextPostingOffset() + rehashCodePosting.getPostingByteSize());
1025               rehashCodeHeader.setBytesLeft(rehashCodeHeader.getBytesLeft() - rehashCodePosting.getPostingByteSize());
1026 
1027               // todo if there is no space for yet another rehased code in the partition, create new partition.
1028               if (rehashCodeHeader.getBytesLeft() < rehashCodePosting.getPostingByteSize()) {
1029                 System.currentTimeMillis();
1030               }
1031 
1032             }
1033 
1034             currentOldHashCodesPartition.close();
1035           }
1036         }
1037 
1038 
1039         // write meta data header so the new hash table will be used
1040         metadata.readHeader(mdh);
1041         mdh.setCurrentHashtableId(rehashedTable.getVersionId());
1042         accessor.getMetadata().writeHeader(mdh);
1043 
1044         return null;
1045       }
1046     };
1047     with.run();
1048   }
1049 
1050   public Configuration getConfiguration() {
1051     return configuration;
1052   }
1053 
1054 
1055   /**
1056    * @param accessor
1057    * @param hashCodePosting      posting to add to rehashed table
1058    * @param rehashedtable        rehaseed table
1059    * @param rehashedtablePosting reusable posting
1060    * @param rehashCodePartition  rehashed codes partition
1061    * @param rehashCodeHeader     current header
1062    * @param rehashCodePosting    reusable posting
1063    * @throws IOException
1064    */
1065   private void rehash(Accessor accessor,
1066                       HashCodesPartition.Posting hashCodePosting,
1067                       Hashtable rehashedtable, Hashtable.Posting rehashedtablePosting,
1068                       HashCodesPartition rehashCodePartition, HashCodesPartition.Header rehashCodeHeader, HashCodesPartition.Posting rehashCodePosting) throws IOException {
1069     //
1070     // find hashcode posting and hashtable posting for the new key
1071     //
1072 
1073     int rehashedtablePostingOffset = rehashedtable.calculateHashCodePostingOffset(hashCodePosting.getKeyHashCode());
1074     rehashedtable.readPosting(rehashedtablePosting, rehashedtablePostingOffset);
1075     if (rehashedtablePosting.getFlag() != (byte) 1) {
1076 
1077       // this is the first time we create a posting at this hashtable position
1078       // that means there is no hash code posting either
1079 
1080       rehashCodePosting.setFlag((byte) 1);
1081       rehashCodePosting.setFirstKeyPostingPartition(hashCodePosting.getFirstKeyPostingPartition());
1082       rehashCodePosting.setFirstKeyPostingPartitionOffset(hashCodePosting.getFirstKeyPostingPartitionOffset());
1083       rehashCodePosting.setKeyHashCode(hashCodePosting.getKeyHashCode());
1084       rehashCodePosting.setNextPostingPartition(-1);
1085       rehashCodePosting.setNextPostingPartitionOffset(-1);
1086       rehashCodePosting.setCreatedRevision(hashCodePosting.getCreatedRevision());
1087       rehashCodePosting.setDeletedRevision(hashCodePosting.getDeletedRevision());
1088 
1089       rehashCodePartition.writePosting(rehashCodePosting, rehashCodeHeader.getNextPostingOffset());
1090 
1091       // update hashtable posting
1092 
1093       rehashedtablePosting.setFlag((byte) 1);
1094       rehashedtablePosting.setHashCodePostingPartition(rehashCodePartition.getPartitionId());
1095       rehashedtablePosting.setHashCodePostingPartitionOffset(rehashCodeHeader.getNextPostingOffset());
1096       rehashedtable.writePosting(rehashedtablePosting, rehashedtablePostingOffset);
1097 
1098       return;
1099 
1100     } else {
1101 
1102       // there is a hashtable posting at the position for this hash code
1103 
1104       //
1105       // seek to the correct hash code posting
1106       //
1107 
1108       HashCodesPartition currentRehashedCodesPartition = rehashCodePartition;
1109       int currentRehashCodesPostingPartitionOffset = rehashedtablePosting.getHashCodePostingPartitionOffset();
1110       rehashCodePartition.readPosting(rehashCodePosting, rehashedtablePosting.getHashCodePostingPartitionOffset());
1111       while (hashCodePosting.getKeyHashCode() != rehashCodePosting.getKeyHashCode()) {
1112         if (rehashCodePosting.getNextPostingPartition() < 0) {
1113 
1114           // there is no hash code posting matching this hash code.
1115 
1116           //
1117           //  create new hash code posting
1118           //
1119 
1120           HashCodesPartition.Posting newHashCodePosting = new HashCodesPartition.Posting();
1121           newHashCodePosting.setFlag((byte) 1);
1122           newHashCodePosting.setKeyHashCode(hashCodePosting.getKeyHashCode());
1123           newHashCodePosting.setNextPostingPartition(-1);
1124           newHashCodePosting.setNextPostingPartitionOffset(-1);
1125           newHashCodePosting.setFirstKeyPostingPartition(hashCodePosting.getFirstKeyPostingPartition());
1126           newHashCodePosting.setFirstKeyPostingPartitionOffset(hashCodePosting.getFirstKeyPostingPartitionOffset());
1127 
1128           rehashCodePartition.writePosting(newHashCodePosting, rehashCodeHeader.getNextPostingOffset());
1129 
1130 
1131           //
1132           // and update the current posting to point at the new one as next in chain.
1133           //
1134 
1135           rehashCodePosting.setNextPostingPartition(rehashCodePartition.getPartitionId());
1136           rehashCodePosting.setNextPostingPartitionOffset(rehashCodeHeader.getNextPostingOffset());
1137           currentRehashedCodesPartition.writePosting(rehashCodePosting, currentRehashCodesPostingPartitionOffset);
1138 
1139           return;
1140 
1141         }
1142         if (rehashCodePosting.getNextPostingPartition() != currentRehashedCodesPartition.getPartitionId()) {
1143           currentRehashedCodesPartition = accessor.getHashCodesPartition(rehashCodePosting.getNextPostingPartition());
1144         }
1145         currentRehashCodesPostingPartitionOffset = rehashCodePosting.getNextPostingPartitionOffset();
1146         currentRehashedCodesPartition.readPosting(rehashCodePosting, rehashCodePosting.getNextPostingPartitionOffset());
1147       }
1148 
1149     }
1150 
1151 
1152   }
1153 
1154   public void setSequenceManager(SequenceManager sequenceManager) {
1155     this.sequenceManager = sequenceManager;
1156   }
1157 }