1 | /* |
2 | |
3 | Derby - Class org.apache.derby.impl.store.access.sort.MergeScan |
4 | |
5 | Copyright 1997, 2004 The Apache Software Foundation or its licensors, as applicable. |
6 | |
7 | Licensed under the Apache License, Version 2.0 (the "License"); |
8 | you may not use this file except in compliance with the License. |
9 | You may obtain a copy of the License at |
10 | |
11 | http://www.apache.org/licenses/LICENSE-2.0 |
12 | |
13 | Unless required by applicable law or agreed to in writing, software |
14 | distributed under the License is distributed on an "AS IS" BASIS, |
15 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | See the License for the specific language governing permissions and |
17 | limitations under the License. |
18 | |
19 | */ |
20 | |
21 | package org.apache.derby.impl.store.access.sort; |
22 | |
23 | import java.util.Enumeration; |
24 | import java.util.Vector; |
25 | |
26 | import org.apache.derby.iapi.services.sanity.SanityManager; |
27 | import org.apache.derby.iapi.services.io.Storable; |
28 | import org.apache.derby.iapi.error.StandardException; |
29 | import org.apache.derby.iapi.store.access.conglomerate.TransactionManager; |
30 | import org.apache.derby.iapi.store.access.conglomerate.ScanManager; |
31 | import org.apache.derby.iapi.store.access.ScanController; |
32 | import org.apache.derby.iapi.store.access.SortObserver; |
33 | import org.apache.derby.iapi.store.access.TransactionController; |
34 | import org.apache.derby.iapi.store.raw.StreamContainerHandle; |
35 | import org.apache.derby.iapi.store.raw.Transaction; |
36 | |
37 | import org.apache.derby.iapi.types.DataValueDescriptor; |
38 | |
39 | /** |
40 | A sort scan that is capable of merging as many merge runs |
41 | as will fit in the passed-in sort buffer. |
42 | **/ |
43 | |
44 | public class MergeScan extends SortScan |
45 | { |
46 | /** |
47 | The sort buffer we will use. |
48 | **/ |
49 | protected SortBuffer sortBuffer; |
50 | |
51 | /** |
52 | The merge runs. |
53 | **/ |
54 | protected Vector mergeRuns; |
55 | |
56 | /** |
57 | Array of scan controllers for the merge runs. |
58 | Entries in the array become null as the last |
59 | row is pulled out and the scan is closed. |
60 | **/ |
61 | protected StreamContainerHandle openScans[]; |
62 | |
63 | private SortObserver sortObserver; |
64 | |
65 | /* |
66 | * Constructors. |
67 | */ |
68 | |
69 | MergeScan( |
70 | MergeSort sort, |
71 | TransactionManager tran, |
72 | SortBuffer sortBuffer, |
73 | Vector mergeRuns, |
74 | SortObserver sortObserver, |
75 | boolean hold) |
76 | { |
77 | super(sort, tran, hold); |
78 | this.sortBuffer = sortBuffer; |
79 | this.mergeRuns = mergeRuns; |
80 | this.tran = tran; |
81 | this.sortObserver = sortObserver; |
82 | } |
83 | |
84 | /* |
85 | * Methods of MergeSortScan |
86 | */ |
87 | |
88 | /** |
89 | Move to the next position in the scan. |
90 | @see ScanController#next |
91 | **/ |
92 | public boolean next() |
93 | throws StandardException |
94 | { |
95 | current = sortBuffer.removeFirst(); |
96 | if (current != null) |
97 | mergeARow(sortBuffer.getLastAux()); |
98 | return (current != null); |
99 | } |
100 | |
101 | /** |
102 | Close the scan. |
103 | @see ScanController#close |
104 | **/ |
105 | public void close() |
106 | { |
107 | if (openScans != null) |
108 | { |
109 | for (int i = 0; i < openScans.length; i++) |
110 | { |
111 | if (openScans[i] != null) |
112 | { |
113 | openScans[i].close(); |
114 | } |
115 | openScans[i] = null; |
116 | } |
117 | openScans = null; |
118 | } |
119 | |
120 | // Hand sort buffer and remaining merge runs to sort. |
121 | if (super.sort != null) |
122 | { |
123 | sort.doneScanning(this, sortBuffer, mergeRuns); |
124 | sortBuffer = null; |
125 | mergeRuns = null; |
126 | } |
127 | |
128 | // Sets sort to null |
129 | super.close(); |
130 | } |
131 | |
132 | /** |
133 | Close the scan. |
134 | @see ScanManager#closeForEndTransaction |
135 | **/ |
136 | public boolean closeForEndTransaction(boolean closeHeldScan) |
137 | { |
138 | if (!hold || closeHeldScan) |
139 | { |
140 | close(); |
141 | return(true); |
142 | } |
143 | else |
144 | { |
145 | return(false); |
146 | } |
147 | } |
148 | |
149 | /* |
150 | * Methods of MergeScan |
151 | */ |
152 | |
153 | /** |
154 | Initialize the scan, returning false if there |
155 | was some error. |
156 | **/ |
157 | public boolean init(TransactionManager tran) |
158 | throws StandardException |
159 | { |
160 | if (SanityManager.DEBUG) |
161 | { |
162 | // We really expect to have at least one |
163 | // merge run. |
164 | SanityManager.ASSERT(mergeRuns != null); |
165 | SanityManager.ASSERT(mergeRuns.size() > 0); |
166 | |
167 | // This sort scan also expects that the |
168 | // caller has ensured that the sort buffer |
169 | // capacity will hold a row from all the |
170 | // merge runs. |
171 | SanityManager.ASSERT(sortBuffer.capacity() >= mergeRuns.size()); |
172 | } |
173 | |
174 | // Clear the sort buffer. |
175 | sortBuffer.reset(); |
176 | |
177 | // Create an array to hold a scan controller |
178 | // for each merge run. |
179 | openScans = new StreamContainerHandle[mergeRuns.size()]; |
180 | if (openScans == null) |
181 | return false; |
182 | |
183 | // Open a scan on each merge run. |
184 | int scanindex = 0; |
185 | Enumeration e = mergeRuns.elements(); |
186 | while (e.hasMoreElements()) |
187 | { |
188 | // get the container id |
189 | long id = ((Long) e.nextElement()).longValue(); |
190 | |
191 | Transaction rawTran = tran.getRawStoreXact(); // get raw transaction |
192 | int segmentId = StreamContainerHandle.TEMPORARY_SEGMENT; |
193 | openScans[scanindex++] = |
194 | rawTran.openStreamContainer(segmentId, id, hold); |
195 | } |
196 | |
197 | // Load the initial rows. |
198 | for (scanindex = 0; scanindex < openScans.length; scanindex++) |
199 | mergeARow(scanindex); |
200 | |
201 | // Success! |
202 | return true; |
203 | } |
204 | |
205 | /** |
206 | Insert rows while we keep getting duplicates |
207 | from the merge run whose scan is in the |
208 | open scan array entry indexed by scanindex. |
209 | **/ |
210 | void mergeARow(int scanindex) |
211 | throws StandardException |
212 | { |
213 | if (SanityManager.DEBUG) |
214 | { |
215 | // Unless there's a bug, the scan index will refer |
216 | // to an open scan. That's because we never put |
217 | // a scan index for a closed scan into the sort |
218 | // buffer (via setNextAux). |
219 | SanityManager.ASSERT(openScans[scanindex] != null); |
220 | } |
221 | |
222 | DataValueDescriptor[] row; |
223 | |
224 | // Read rows from the merge run and stuff them into the |
225 | // sort buffer for as long as we encounter duplicates. |
226 | do |
227 | { |
228 | row = sortObserver.getArrayClone(); |
229 | |
230 | // Fetch the row from the merge run. |
231 | if (!openScans[scanindex].fetchNext(row)) |
232 | { |
233 | // If we're out of rows in the merge run, close the scan. |
234 | |
235 | openScans[scanindex].close(); |
236 | openScans[scanindex] = null; |
237 | return; |
238 | } |
239 | |
240 | // Save the index of this merge run with |
241 | // the row we're putting in the sort buffer. |
242 | sortBuffer.setNextAux(scanindex); |
243 | } |
244 | while (sortBuffer.insert(row) == SortBuffer.INSERT_DUPLICATE); |
245 | } |
246 | } |