1 | /* |
2 | |
3 | Derby - Class org.apache.derby.impl.store.access.sort.MergeSort |
4 | |
5 | Copyright 1997, 2004 The Apache Software Foundation or its licensors, as applicable. |
6 | |
7 | Licensed under the Apache License, Version 2.0 (the "License"); |
8 | you may not use this file except in compliance with the License. |
9 | You may obtain a copy of the License at |
10 | |
11 | http://www.apache.org/licenses/LICENSE-2.0 |
12 | |
13 | Unless required by applicable law or agreed to in writing, software |
14 | distributed under the License is distributed on an "AS IS" BASIS, |
15 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | See the License for the specific language governing permissions and |
17 | limitations under the License. |
18 | |
19 | */ |
20 | |
21 | package org.apache.derby.impl.store.access.sort; |
22 | |
23 | import org.apache.derby.iapi.reference.SQLState; |
24 | |
25 | import org.apache.derby.iapi.services.io.FormatableBitSet; |
26 | |
27 | import org.apache.derby.iapi.services.io.Storable; |
28 | import org.apache.derby.iapi.services.sanity.SanityManager; |
29 | import org.apache.derby.iapi.error.StandardException; |
30 | import org.apache.derby.iapi.store.access.conglomerate.ScanControllerRowSource; |
31 | import org.apache.derby.iapi.store.access.conglomerate.Sort; |
32 | import org.apache.derby.iapi.store.access.conglomerate.SortFactory; |
33 | import org.apache.derby.iapi.store.access.conglomerate.TransactionManager; |
34 | import org.apache.derby.iapi.types.CloneableObject; |
35 | import org.apache.derby.iapi.store.access.ColumnOrdering; |
36 | import org.apache.derby.iapi.store.access.ConglomerateController; |
37 | import org.apache.derby.iapi.store.access.Qualifier; |
38 | import org.apache.derby.iapi.store.access.RowUtil; |
39 | import org.apache.derby.iapi.store.access.ScanController; |
40 | import org.apache.derby.iapi.store.access.SortObserver; |
41 | import org.apache.derby.iapi.store.access.SortController; |
42 | import org.apache.derby.iapi.store.access.TransactionController; |
43 | |
44 | import org.apache.derby.iapi.store.raw.StreamContainerHandle; |
45 | import org.apache.derby.iapi.store.raw.RawStoreFactory; |
46 | import org.apache.derby.iapi.store.raw.Transaction; |
47 | |
48 | import org.apache.derby.iapi.types.DataValueDescriptor; |
49 | |
50 | import org.apache.derby.iapi.types.Orderable; |
51 | import org.apache.derby.iapi.types.RowLocation; |
52 | |
53 | import java.util.Enumeration; |
54 | import java.util.Properties; |
55 | import java.util.Vector; |
56 | |
57 | /** |
58 | |
59 | A sort implementation which does the sort in-memory if it can, |
60 | but which can do an external merge sort so that it can sort an |
61 | arbitrary number of rows. |
62 | |
63 | **/ |
64 | |
65 | public final class MergeSort implements Sort |
66 | { |
67 | |
68 | /* |
69 | * Fields |
70 | */ |
71 | |
72 | /** |
73 | **/ |
74 | static final int STATE_CLOSED = 0; |
75 | |
76 | /** |
77 | **/ |
78 | static final int STATE_INITIALIZED = 1; |
79 | |
80 | /** |
81 | **/ |
82 | static final int STATE_INSERTING = 2; |
83 | |
84 | /** |
85 | **/ |
86 | static final int STATE_DONE_INSERTING = 3; |
87 | |
88 | /** |
89 | **/ |
90 | static final int STATE_SCANNING = 4; |
91 | |
92 | /** |
93 | **/ |
94 | static final int STATE_DONE_SCANNING = 5; |
95 | |
96 | /** |
97 | Maintains the current state of the sort as defined in |
98 | the preceding values. Sorts start off and end up closed. |
99 | **/ |
100 | protected int state = STATE_CLOSED; |
101 | |
102 | /** |
103 | The template as passed in on create. Valid when the state |
104 | is INITIALIZED through SCANNING, null otherwise. |
105 | **/ |
106 | protected DataValueDescriptor[] template; |
107 | |
108 | /** |
109 | The column ordering as passed in on create. Valid when |
110 | the state is INITIALIZED through SCANNING, null otherwise. |
111 | May be null if there is no column ordering - this means |
112 | that all rows are considered to be duplicates, and the |
113 | sort will only emit a single row. |
114 | **/ |
115 | protected ColumnOrdering columnOrdering[]; |
116 | |
117 | /** |
118 | A lookup table to speed up lookup of a column associated with the i'th |
119 | column to compare. To find the column id to compare as the i'th column |
120 | look in columnOrderingMap[i]. |
121 | **/ |
122 | protected int columnOrderingMap[]; |
123 | |
124 | /** |
125 | A lookup table to speed up lookup of Ascending state of a column, |
126 | **/ |
127 | protected boolean columnOrderingAscendingMap[]; |
128 | |
129 | /** |
130 | The sort observer. May be null. Used as a callback. |
131 | **/ |
132 | protected SortObserver sortObserver; |
133 | |
134 | /** |
135 | Whether the rows are expected to be in order on insert, |
136 | as passed in on create. |
137 | **/ |
138 | protected boolean alreadyInOrder; |
139 | |
140 | /** |
141 | The inserter that's being used to insert rows into the sort. |
142 | This field is only valid when the state is INSERTING. |
143 | **/ |
144 | protected MergeInserter inserter = null; |
145 | |
146 | /** |
147 | The scan that's being used to return rows from the sort. |
148 | This field is only valid when the state is SCANNING. |
149 | **/ |
150 | protected Scan scan = null; |
151 | |
152 | /** |
153 | A vector of merge runs, produced by the MergeInserter. |
154 | Might be null if no merge runs were produced. |
155 | It is a vector of container ids. |
156 | **/ |
157 | protected Vector mergeRuns = null; |
158 | |
159 | /** |
160 | An ordered set of the leftover rows that didn't go |
161 | in the last merge run (might be all the rows if there |
162 | are no merge runs). |
163 | **/ |
164 | protected SortBuffer sortBuffer = null; |
165 | |
166 | /** |
167 | The maximum number of entries a sort buffer can hold. |
168 | **/ |
169 | protected int sortBufferMax; |
170 | |
171 | /** |
172 | The minimum number of entries a sort buffer can hold. |
173 | **/ |
174 | protected int sortBufferMin; |
175 | |
176 | /** |
177 | Properties for mergeSort |
178 | **/ |
179 | static Properties properties = null; |
180 | |
181 | /** |
182 | Static initializer for MergeSort, to initialize once the properties |
183 | for the sortBuffer. |
184 | **/ |
185 | static |
186 | { |
187 | properties = new Properties(); |
188 | properties.put(RawStoreFactory.STREAM_FILE_BUFFER_SIZE_PARAMETER, "16384"); |
189 | } |
190 | |
191 | /* |
192 | * Methods of Sort |
193 | */ |
194 | |
195 | /** |
196 | Open a sort controller. |
197 | <p> |
198 | This implementation only supports a single sort controller |
199 | per sort. |
200 | @see Sort#open |
201 | **/ |
202 | public SortController open(TransactionManager tran) |
203 | throws StandardException |
204 | { |
205 | if (SanityManager.DEBUG) |
206 | SanityManager.ASSERT(state == STATE_INITIALIZED); |
207 | |
208 | // Ready to start inserting rows. |
209 | state = STATE_INSERTING; |
210 | |
211 | // Create and initialize an inserter. When the caller |
212 | // closes it, it will call back to inserterIsClosed(). |
213 | this.inserter = new MergeInserter(); |
214 | if (this.inserter.initialize(this, tran) == false) |
215 | { |
216 | throw StandardException.newException(SQLState.SORT_COULD_NOT_INIT); |
217 | } |
218 | |
219 | return this.inserter; |
220 | } |
221 | |
222 | /** |
223 | Open a scan controller. |
224 | @see Sort#openSortScan |
225 | **/ |
226 | |
227 | public ScanController openSortScan( |
228 | TransactionManager tran, |
229 | boolean hold) |
230 | throws StandardException |
231 | { |
232 | if (SanityManager.DEBUG) |
233 | SanityManager.ASSERT(state == STATE_DONE_INSERTING); |
234 | |
235 | if (mergeRuns == null || mergeRuns.size() == 0) |
236 | { |
237 | // There were no merge runs so we can just return |
238 | // the rows from the sort buffer. |
239 | scan = new SortBufferScan(this, tran, sortBuffer, hold); |
240 | |
241 | // The scan now owns the sort buffer |
242 | sortBuffer = null; |
243 | } |
244 | else |
245 | { |
246 | // Dump the rows in the sort buffer to a merge run. |
247 | long containerId = createMergeRun(tran, sortBuffer); |
248 | mergeRuns.addElement(new Long(containerId)); |
249 | |
250 | // If there are more merge runs than we can sort |
251 | // at once with our sort buffer, we have to reduce |
252 | // the number of merge runs |
253 | if (mergeRuns.size() > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN || |
254 | mergeRuns.size() > sortBuffer.capacity()) |
255 | multiStageMerge(tran); |
256 | |
257 | // There are now few enough merge runs to sort |
258 | // at once, so create a scan for them. |
259 | MergeScan mscan = |
260 | new MergeScan( |
261 | this, tran, sortBuffer, mergeRuns, sortObserver, hold); |
262 | |
263 | if (!mscan.init(tran)) |
264 | { |
265 | throw StandardException.newException( |
266 | SQLState.SORT_COULD_NOT_INIT); |
267 | } |
268 | scan = mscan; |
269 | |
270 | // The scan now owns the sort buffer and merge runs. |
271 | sortBuffer = null; |
272 | mergeRuns = null; |
273 | } |
274 | |
275 | // Ready to start retrieving rows. |
276 | this.state = STATE_SCANNING; |
277 | |
278 | return scan; |
279 | } |
280 | |
281 | /** |
282 | Open a row source to get rows out of the sorter. |
283 | @see Sort#openSortRowSource |
284 | **/ |
285 | public ScanControllerRowSource openSortRowSource(TransactionManager tran) |
286 | throws StandardException |
287 | { |
288 | if (SanityManager.DEBUG) |
289 | SanityManager.ASSERT(state == STATE_DONE_INSERTING); |
290 | |
291 | ScanControllerRowSource rowSource = null; |
292 | |
293 | if (mergeRuns == null || mergeRuns.size() == 0) |
294 | { |
295 | // There were no merge runs so we can just return |
296 | // the rows from the sort buffer. |
297 | scan = new SortBufferRowSource(sortBuffer, tran, sortObserver, false, sortBufferMax); |
298 | rowSource = (ScanControllerRowSource)scan; |
299 | |
300 | // The scan now owns the sort buffer |
301 | sortBuffer = null; |
302 | } |
303 | else |
304 | { |
305 | // Dump the rows in the sort buffer to a merge run. |
306 | long containerId = createMergeRun(tran, sortBuffer); |
307 | mergeRuns.addElement(new Long(containerId)); |
308 | |
309 | // If there are more merge runs than we can sort |
310 | // at once with our sort buffer, we have to reduce |
311 | // the number of merge runs |
312 | if (mergeRuns.size() > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN || |
313 | mergeRuns.size() > sortBuffer.capacity()) |
314 | multiStageMerge(tran); |
315 | |
316 | // There are now few enough merge runs to sort |
317 | // at once, so create a rowSource for them. |
318 | MergeScanRowSource msRowSource = |
319 | new MergeScanRowSource(this, tran, sortBuffer, mergeRuns, sortObserver, false); |
320 | if (!msRowSource.init(tran)) |
321 | { |
322 | throw StandardException.newException( |
323 | SQLState.SORT_COULD_NOT_INIT); |
324 | } |
325 | scan = msRowSource; |
326 | rowSource = msRowSource; |
327 | |
328 | // The scan now owns the sort buffer and merge runs. |
329 | sortBuffer = null; |
330 | mergeRuns = null; |
331 | } |
332 | |
333 | // Ready to start retrieving rows. |
334 | this.state = STATE_SCANNING; |
335 | |
336 | return rowSource; |
337 | } |
338 | |
339 | |
340 | |
341 | /** |
342 | Drop the sort. |
343 | @see Sort#drop |
344 | **/ |
345 | public void drop(TransactionController tran) |
346 | throws StandardException |
347 | { |
348 | // Make sure the inserter is closed. Note this |
349 | // will cause the callback to doneInserting() |
350 | // which will give us any in-progress merge |
351 | // runs, if there are any. |
352 | if (inserter != null) |
353 | inserter.close(); |
354 | inserter = null; |
355 | |
356 | // Make sure the scan is closed, if there is one. |
357 | // This will cause the callback to doneScanning(). |
358 | if (scan != null) |
359 | { |
360 | scan.close(); |
361 | scan = null; |
362 | } |
363 | |
364 | // If we have a row set, get rid of it. |
365 | if (sortBuffer != null) |
366 | { |
367 | sortBuffer.close(); |
368 | sortBuffer = null; |
369 | } |
370 | |
371 | // Clean out the rest of the objects. |
372 | template = null; |
373 | columnOrdering = null; |
374 | sortObserver = null; |
375 | |
376 | // If there are any merge runs, drop them. |
377 | dropMergeRuns((TransactionManager)tran); |
378 | |
379 | // Whew! |
380 | state = STATE_CLOSED; |
381 | } |
382 | |
383 | |
384 | /* |
385 | * Methods of MergeSort. Arranged alphabetically. |
386 | */ |
387 | |
388 | /** |
389 | Check the column ordering against the template, making |
390 | sure that each column is present in the template, |
391 | implements Orderable, and is not mentioned more than |
392 | once. Intended to be called as part of a sanity check. |
393 | **/ |
394 | protected boolean checkColumnOrdering( |
395 | DataValueDescriptor[] template, |
396 | ColumnOrdering columnOrdering[]) |
397 | { |
398 | // Allocate an array to check that each column mentioned only once. |
399 | int templateNColumns = template.length; |
400 | boolean seen[] = new boolean[templateNColumns]; |
401 | |
402 | // Check each column ordering. |
403 | for (int i = 0; i < columnOrdering.length; i++) |
404 | { |
405 | int colid = columnOrdering[i].getColumnId(); |
406 | |
407 | // Check that the column id is valid. |
408 | if (colid < 0 || colid >= templateNColumns) |
409 | return false; |
410 | |
411 | // Check that the column isn't mentioned more than once. |
412 | if (seen[colid]) |
413 | return false; |
414 | seen[colid] = true; |
415 | |
416 | Object columnVal = |
417 | RowUtil.getColumn(template, (FormatableBitSet) null, colid); |
418 | |
419 | if (!(columnVal instanceof Orderable)) |
420 | return false; |
421 | } |
422 | |
423 | return true; |
424 | } |
425 | |
426 | /** |
427 | Check that the columns in the row agree with the columns |
428 | in the template, both in number and in type. |
429 | <p> |
430 | XXX (nat) Currently checks that the classes implementing |
431 | each column are the same -- is this right? |
432 | **/ |
433 | void checkColumnTypes(DataValueDescriptor[] row) |
434 | throws StandardException |
435 | { |
436 | int nCols = row.length; |
437 | if (template.length != nCols) |
438 | { |
439 | if (SanityManager.DEBUG) |
440 | { |
441 | SanityManager.THROWASSERT( |
442 | "template.length (" + template.length + |
443 | ") expected to be = to nCols (" + |
444 | nCols + ")"); |
445 | } |
446 | throw StandardException.newException( |
447 | SQLState.SORT_TYPE_MISMATCH); |
448 | } |
449 | |
450 | if (SanityManager.DEBUG) |
451 | { |
452 | for (int colid = 0; colid < nCols; colid++) |
453 | { |
454 | Object col1 = row[colid]; |
455 | Object col2 = template[colid]; |
456 | if (col1 == null) |
457 | { |
458 | SanityManager.THROWASSERT( |
459 | "col[" + colid + "] is null"); |
460 | } |
461 | |
462 | if (!(col1 instanceof CloneableObject)) |
463 | { |
464 | SanityManager.THROWASSERT( |
465 | "col[" + colid + "] (" +col1.getClass().getName()+ |
466 | ") is not a CloneableObject."); |
467 | } |
468 | |
469 | if (col1.getClass() != col2.getClass()) |
470 | { |
471 | SanityManager.THROWASSERT( |
472 | "col1.getClass() (" + col1.getClass() + |
473 | ") expected to be the same as col2.getClass() (" + |
474 | col2.getClass() + ")"); |
475 | } |
476 | } |
477 | } |
478 | } |
479 | |
480 | int compare( |
481 | DataValueDescriptor[] r1, |
482 | DataValueDescriptor[] r2) |
483 | throws StandardException |
484 | { |
485 | // Get the number of columns we have to compare. |
486 | int colsToCompare = this.columnOrdering.length; |
487 | int r; |
488 | |
489 | // Compare the columns specified in the column |
490 | // ordering array. |
491 | for (int i = 0; i < colsToCompare; i++) |
492 | { |
493 | // Get columns to compare. |
494 | int colid = this.columnOrderingMap[i]; |
495 | |
496 | // If the columns don't compare equal, we're done. |
497 | // Return the sense of the comparison. |
498 | if ((r = r1[colid].compare(r2[colid])) |
499 | != 0) |
500 | { |
501 | if (this.columnOrderingAscendingMap[i]) |
502 | return r; |
503 | else |
504 | return -r; |
505 | } |
506 | } |
507 | |
508 | // We made it through all the columns, and they must have |
509 | // all compared equal. So return that the rows compare equal. |
510 | return 0; |
511 | } |
512 | |
513 | /** |
514 | Go from the CLOSED to the INITIALIZED state. |
515 | **/ |
516 | public void initialize( |
517 | DataValueDescriptor[] template, |
518 | ColumnOrdering columnOrdering[], |
519 | SortObserver sortObserver, |
520 | boolean alreadyInOrder, |
521 | long estimatedRows, |
522 | int sortBufferMax) |
523 | throws StandardException |
524 | { |
525 | if (SanityManager.DEBUG) |
526 | { |
527 | SanityManager.ASSERT(state == STATE_CLOSED); |
528 | } |
529 | |
530 | // Make sure the column ordering makes sense |
531 | if (SanityManager.DEBUG) |
532 | { |
533 | SanityManager.ASSERT(checkColumnOrdering(template, columnOrdering), |
534 | "column ordering error"); |
535 | } |
536 | |
537 | // Set user-defined parameters. |
538 | this.template = template; |
539 | this.columnOrdering = columnOrdering; |
540 | this.sortObserver = sortObserver; |
541 | this.alreadyInOrder = alreadyInOrder; |
542 | |
543 | // Cache results of columnOrdering calls, results are not allowed |
544 | // to change throughout a sort. |
545 | columnOrderingMap = new int[columnOrdering.length]; |
546 | columnOrderingAscendingMap = new boolean[columnOrdering.length]; |
547 | for (int i = 0; i < columnOrdering.length; i++) |
548 | { |
549 | columnOrderingMap[i] = columnOrdering[i].getColumnId(); |
550 | columnOrderingAscendingMap[i] = columnOrdering[i].getIsAscending(); |
551 | } |
552 | |
553 | // No inserter or scan yet. |
554 | this.inserter = null; |
555 | this.scan = null; |
556 | |
557 | // We don't have any merge runs. |
558 | this.mergeRuns = null; |
559 | this.sortBuffer = null; |
560 | this.sortBufferMax = sortBufferMax; |
561 | |
562 | if (estimatedRows > sortBufferMax) |
563 | sortBufferMin = sortBufferMax; |
564 | else |
565 | sortBufferMin = (int)estimatedRows; |
566 | if (SanityManager.DEBUG) |
567 | { |
568 | if (SanityManager.DEBUG_ON("testSort")) |
569 | sortBufferMin = sortBufferMax; |
570 | } |
571 | |
572 | this.state = STATE_INITIALIZED; |
573 | } |
574 | |
575 | /** |
576 | An inserter is closing. |
577 | **/ |
578 | void doneInserting(MergeInserter inserter, |
579 | SortBuffer sortBuffer, Vector mergeRuns) |
580 | { |
581 | if (SanityManager.DEBUG) |
582 | { |
583 | SanityManager.ASSERT(state == STATE_INSERTING); |
584 | } |
585 | |
586 | this.sortBuffer = sortBuffer; |
587 | this.mergeRuns = mergeRuns; |
588 | this.inserter = null; |
589 | |
590 | this.state = STATE_DONE_INSERTING; |
591 | } |
592 | |
593 | void doneScanning(Scan scan, SortBuffer sortBuffer) |
594 | { |
595 | if (SanityManager.DEBUG) |
596 | { |
597 | // Make sure the scan we're getting back is the one we gave out |
598 | |
599 | if (this.scan != scan) |
600 | SanityManager.THROWASSERT("this.scan = " + this.scan |
601 | + " scan = " + scan); |
602 | } |
603 | |
604 | this.sortBuffer = sortBuffer; |
605 | this.scan = null; |
606 | |
607 | this.state = STATE_DONE_SCANNING; |
608 | } |
609 | |
610 | void doneScanning(Scan scan, SortBuffer sortBuffer, |
611 | Vector mergeRuns) |
612 | { |
613 | this.mergeRuns = mergeRuns; |
614 | |
615 | doneScanning(scan, sortBuffer); |
616 | } |
617 | |
618 | |
619 | /** |
620 | Get rid of the merge runs, if there are any. |
621 | Must not cause any errors because it's called |
622 | during error processing. |
623 | **/ |
624 | void dropMergeRuns(TransactionManager tran) |
625 | { |
626 | if (mergeRuns != null) |
627 | { |
628 | Enumeration e = mergeRuns.elements(); |
629 | |
630 | try |
631 | { |
632 | Transaction rawTran = tran.getRawStoreXact(); |
633 | long segmentId = StreamContainerHandle.TEMPORARY_SEGMENT; |
634 | |
635 | while (e.hasMoreElements()) |
636 | { |
637 | long containerId = ((Long) e.nextElement()).longValue(); |
638 | rawTran.dropStreamContainer(segmentId, containerId); |
639 | } |
640 | } |
641 | catch (StandardException se) |
642 | { |
643 | // Ignore problems with dropping, worst case |
644 | // the raw store will clean up at reboot. |
645 | } |
646 | mergeRuns = null; |
647 | } |
648 | } |
649 | |
650 | /* DEBUG (nat) |
651 | void printRunInfo(TransactionController tran) |
652 | throws StandardException |
653 | { |
654 | java.util.Enumeration e = mergeRuns.elements(); |
655 | while (e.hasMoreElements()) |
656 | { |
657 | long conglomid = ((Long) e.nextElement()).longValue(); |
658 | ScanController sc = tran.openScan(conglomid, false, |
659 | false, null, null, 0, null, |
660 | null, 0); |
661 | System.out.println("Merge run: conglomid=" + conglomid); |
662 | while (sc.next()) |
663 | { |
664 | sc.fetch(template); |
665 | System.out.println(template); |
666 | } |
667 | sc.close(); |
668 | } |
669 | } |
670 | */ |
671 | |
672 | private void multiStageMerge(TransactionManager tran) |
673 | throws StandardException |
674 | { |
675 | Enumeration e; |
676 | //int iterations = 0; // DEBUG (nat) |
677 | int maxMergeRuns = sortBuffer.capacity(); |
678 | |
679 | if (maxMergeRuns > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN) |
680 | maxMergeRuns = ExternalSortFactory.DEFAULT_MAX_MERGE_RUN; |
681 | |
682 | Vector subset; |
683 | Vector leftovers; |
684 | |
685 | while (mergeRuns.size() > maxMergeRuns) |
686 | { |
687 | // Move maxMergeRuns elements from the merge runs |
688 | // vector into a subset, leaving the rest. |
689 | subset = new Vector(maxMergeRuns); |
690 | leftovers = new Vector(mergeRuns.size() - maxMergeRuns); |
691 | e = mergeRuns.elements(); |
692 | while (e.hasMoreElements()) |
693 | { |
694 | Long containerId = (Long) e.nextElement(); |
695 | if (subset.size() < maxMergeRuns) |
696 | subset.addElement(containerId); |
697 | else |
698 | leftovers.addElement(containerId); |
699 | } |
700 | |
701 | /* DEBUG (nat) |
702 | iterations++; |
703 | System.out.println(subset.size() + " elements in subset"); |
704 | System.out.println(leftovers.size() + " elements in leftovers"); |
705 | System.out.println(mergeRuns.size() + " elements in mergeRuns"); |
706 | System.out.println("maxMergeRuns is " + maxMergeRuns); |
707 | System.out.println("iterations = " + iterations); |
708 | if (subset.size() == 0) |
709 | { |
710 | System.exit(1); |
711 | } |
712 | */ |
713 | |
714 | mergeRuns = leftovers; |
715 | |
716 | // Open a merge scan on the subset. |
717 | MergeScanRowSource msRowSource = |
718 | new MergeScanRowSource(this, tran, sortBuffer, subset, sortObserver, false); |
719 | |
720 | if (!msRowSource.init(tran)) |
721 | { |
722 | throw StandardException.newException( |
723 | SQLState.SORT_COULD_NOT_INIT); |
724 | } |
725 | |
726 | // Create and open another temporary stream conglomerate |
727 | // which will become |
728 | // a merge run made up with the merged runs from the subset. |
729 | Transaction rawTran = tran.getRawStoreXact(); |
730 | int segmentId = StreamContainerHandle.TEMPORARY_SEGMENT; |
731 | long id = rawTran.addAndLoadStreamContainer(segmentId, |
732 | properties, msRowSource); |
733 | |
734 | mergeRuns.addElement(new Long(id)); |
735 | |
736 | // Drop the conglomerates in the merge subset |
737 | e = subset.elements(); |
738 | while (e.hasMoreElements()) |
739 | { |
740 | Long containerId = (Long) e.nextElement(); |
741 | rawTran.dropStreamContainer(segmentId, containerId.longValue()); |
742 | } |
743 | } |
744 | } |
745 | |
746 | /** |
747 | Remove all the rows from the sort buffer and store them |
748 | in a temporary conglomerate. The temporary conglomerate |
749 | is a "merge run". Returns the container id of the |
750 | merge run. |
751 | **/ |
752 | long createMergeRun(TransactionManager tran, SortBuffer sortBuffer) |
753 | throws StandardException |
754 | { |
755 | // this sort buffer is not a scan and is not tracked by any |
756 | // TransactionManager. |
757 | SortBufferRowSource rowSource = |
758 | new SortBufferRowSource(sortBuffer, (TransactionManager)null, sortObserver, true, sortBufferMax); |
759 | |
760 | // Create a temporary stream conglomerate... |
761 | Transaction rawTran = tran.getRawStoreXact(); // get raw transaction |
762 | int segmentId = StreamContainerHandle.TEMPORARY_SEGMENT; |
763 | long id = rawTran.addAndLoadStreamContainer(segmentId, |
764 | properties, rowSource); |
765 | |
766 | // Don't close the sortBuffer, we just emptied it, the caller may reuse |
767 | // that sortBuffer for the next run. |
768 | rowSource = null; |
769 | |
770 | return id; |
771 | } |
772 | } |