1 | /* |
2 | |
3 | Derby - Class org.apache.derby.impl.store.raw.log.FlushedScan |
4 | |
5 | Copyright 1997, 2004 The Apache Software Foundation or its licensors, as applicable. |
6 | |
7 | Licensed under the Apache License, Version 2.0 (the "License"); |
8 | you may not use this file except in compliance with the License. |
9 | You may obtain a copy of the License at |
10 | |
11 | http://www.apache.org/licenses/LICENSE-2.0 |
12 | |
13 | Unless required by applicable law or agreed to in writing, software |
14 | distributed under the License is distributed on an "AS IS" BASIS, |
15 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
16 | See the License for the specific language governing permissions and |
17 | limitations under the License. |
18 | |
19 | */ |
20 | |
21 | package org.apache.derby.impl.store.raw.log; |
22 | |
23 | import org.apache.derby.iapi.reference.SQLState; |
24 | import org.apache.derby.iapi.reference.MessageId; |
25 | |
26 | import org.apache.derby.impl.store.raw.log.LogCounter; |
27 | import org.apache.derby.impl.store.raw.log.LogRecord; |
28 | import org.apache.derby.impl.store.raw.log.StreamLogScan; |
29 | |
30 | import org.apache.derby.iapi.services.sanity.SanityManager; |
31 | import org.apache.derby.iapi.error.StandardException; |
32 | import org.apache.derby.iapi.services.i18n.MessageService; |
33 | import org.apache.derby.iapi.store.raw.log.LogInstant; |
34 | import org.apache.derby.iapi.store.raw.log.LogFactory; |
35 | import org.apache.derby.iapi.store.raw.xact.TransactionId; |
36 | import org.apache.derby.iapi.services.io.ArrayInputStream; |
37 | |
38 | import org.apache.derby.io.StorageRandomAccessFile; |
39 | |
40 | import java.io.IOException; |
41 | |
42 | /** |
43 | |
44 | Scan the the log which is implemented by a series of log files.n |
45 | This log scan knows how to move across log file if it is positioned at |
46 | the boundary of a log file and needs to getNextRecord. |
47 | |
48 | <PRE> |
49 | 4 bytes - length of user data, i.e. N |
50 | 8 bytes - long representing log instant |
51 | N bytes of supplied data |
52 | 4 bytes - length of user data, i.e. N |
53 | </PRE> |
54 | |
55 | */ |
56 | public class FlushedScan implements StreamLogScan { |
57 | |
58 | private StorageRandomAccessFile scan; // an output stream to the log file |
59 | LogToFile logFactory; // log factory knows how to to skip |
60 | // from log file to log file |
61 | |
62 | boolean open; // true if the scan is open |
63 | |
64 | long currentLogFileNumber; // the log file the scan is currently on |
65 | |
66 | long currentLogFileFirstUnflushedPosition; |
67 | // The length of the unflushed portion |
68 | // of the current log file. This is the |
69 | // length of the file for all but the |
70 | // last log file. |
71 | |
72 | long currentInstant; // the log instant the scan is |
73 | // currently on - only valid after a |
74 | // successful getNextRecord |
75 | |
76 | long firstUnflushed = -1; // scan until we reach the first |
77 | // unflushed byte in the log. |
78 | long firstUnflushedFileNumber; |
79 | long firstUnflushedFilePosition; |
80 | |
81 | //RESOLVE: This belongs in a shared place. |
82 | static final int LOG_REC_LEN_BYTE_LENGTH = 4; |
83 | |
84 | public FlushedScan(LogToFile logFactory, long startAt) |
85 | throws StandardException |
86 | { |
87 | if (SanityManager.DEBUG) |
88 | { |
89 | SanityManager.ASSERT(startAt != LogCounter.INVALID_LOG_INSTANT, |
90 | "cannot start scan on an invalid log instant"); |
91 | } |
92 | |
93 | try |
94 | { |
95 | currentLogFileNumber = LogCounter.getLogFileNumber(startAt); |
96 | this.logFactory = logFactory; |
97 | scan = logFactory.getLogFileAtPosition(startAt); |
98 | setFirstUnflushed(); |
99 | open = true; |
100 | currentInstant = LogCounter.INVALID_LOG_INSTANT; // set at getNextRecord |
101 | } |
102 | |
103 | catch (IOException ioe) |
104 | { |
105 | throw logFactory.markCorrupt( |
106 | StandardException.newException(SQLState.LOG_IO_ERROR, ioe)); |
107 | } |
108 | } |
109 | |
110 | /* |
111 | ** Methods of LogScan |
112 | */ |
113 | |
114 | /** |
115 | Read a log record into the byte array provided. Resize the input |
116 | stream byte array if necessary. |
117 | |
118 | @return the length of the data written into data, or -1 if the end of the |
119 | scan has been reached. |
120 | |
121 | @exception StandardException Standard Cloudscape error policy |
122 | */ |
123 | public LogRecord getNextRecord(ArrayInputStream input, |
124 | TransactionId tranId, |
125 | int groupmask) |
126 | throws StandardException |
127 | { |
128 | try |
129 | { |
130 | boolean candidate; |
131 | int peekAmount = LogRecord.formatOverhead() + LogRecord.maxGroupStoredSize(); |
132 | if (tranId != null) |
133 | peekAmount += LogRecord.maxTransactionIdStoredSize(tranId); |
134 | int readAmount; // the number of bytes actually read |
135 | |
136 | LogRecord lr; |
137 | |
138 | do |
139 | { |
140 | if (!open || !positionToNextRecord()) |
141 | return null; |
142 | |
143 | int checkLength; |
144 | |
145 | // this log record is a candidate unless proven otherwise |
146 | lr = null; |
147 | candidate = true; |
148 | readAmount = -1; |
149 | |
150 | currentInstant = scan.readLong(); |
151 | byte[] data = input.getData(); |
152 | if (data.length < nextRecordLength) |
153 | { |
154 | // make a new array of sufficient size and reset the arrary |
155 | // in the input stream |
156 | data = new byte[nextRecordLength]; |
157 | input.setData(data); |
158 | } |
159 | |
160 | if (logFactory.databaseEncrypted()) |
161 | { |
162 | scan.readFully(data, 0, nextRecordLength); |
163 | int len = logFactory.decrypt(data, 0, nextRecordLength, data, 0); |
164 | if (SanityManager.DEBUG) |
165 | SanityManager.ASSERT(len == nextRecordLength); |
166 | input.setLimit(0, len); |
167 | |
168 | } |
169 | else // no need to decrypt, only get the group and tid if we filter |
170 | { |
171 | if (groupmask == 0 && tranId == null) |
172 | { |
173 | // no filter, get the whole thing |
174 | scan.readFully(data, 0, nextRecordLength); |
175 | input.setLimit(0, nextRecordLength); |
176 | } |
177 | else |
178 | { |
179 | // Read only enough so that group and the tran id is in |
180 | // the data buffer. Group is stored as compressed int |
181 | // and tran id is stored as who knows what. read min |
182 | // of peekAmount or nextRecordLength |
183 | readAmount = (nextRecordLength > peekAmount) ? |
184 | peekAmount : nextRecordLength; |
185 | |
186 | // in the data buffer, we now have enough to peek |
187 | scan.readFully(data, 0, readAmount); |
188 | input.setLimit(0, readAmount); |
189 | |
190 | } |
191 | } |
192 | |
193 | lr = (LogRecord) input.readObject(); |
194 | |
195 | if (groupmask != 0 || tranId != null) |
196 | { |
197 | if (groupmask != 0 && (groupmask & lr.group()) == 0) |
198 | candidate = false; // no match, throw this log record out |
199 | |
200 | if (candidate && tranId != null) |
201 | { |
202 | TransactionId tid = lr.getTransactionId(); |
203 | if (!tid.equals(tranId)) // nomatch |
204 | candidate = false; // throw this log record out |
205 | } |
206 | |
207 | // if this log record is not filtered out, we need to read |
208 | // in the rest of the log record to the input buffer. |
209 | // Except if it is an encrypted database, in which case the |
210 | // entire log record have already be read in for |
211 | // decryption. |
212 | |
213 | if (candidate && !logFactory.databaseEncrypted()) |
214 | { |
215 | // read the rest of the log into the buffer |
216 | if (SanityManager.DEBUG) |
217 | SanityManager.ASSERT(readAmount > 0); |
218 | |
219 | if (readAmount < nextRecordLength) |
220 | { |
221 | // Need to remember where we are because the log |
222 | // record may have read part of it off the input |
223 | // stream already and that position is lost when we |
224 | // set limit again. |
225 | int inputPosition = input.getPosition(); |
226 | |
227 | scan.readFully(data, readAmount, |
228 | nextRecordLength-readAmount); |
229 | |
230 | input.setLimit(0, nextRecordLength); |
231 | input.setPosition(inputPosition); |
232 | } |
233 | } |
234 | } |
235 | |
236 | if (candidate || logFactory.databaseEncrypted()) |
237 | { |
238 | checkLength = scan.readInt(); |
239 | |
240 | if (SanityManager.DEBUG) |
241 | { |
242 | SanityManager.ASSERT(checkLength == nextRecordLength, "log currupted"); |
243 | } |
244 | } |
245 | else // chances are, we haven't read all of the log record, skip it |
246 | { |
247 | // the starting record position is in the currentInstant, |
248 | // calculate the next record starting position using that |
249 | // and the nextRecordLength |
250 | long nextRecordStartPosition = |
251 | LogCounter.getLogFilePosition(currentInstant) + |
252 | nextRecordLength + LogToFile.LOG_RECORD_OVERHEAD; |
253 | |
254 | scan.seek(nextRecordStartPosition); |
255 | } |
256 | |
257 | } while (candidate == false); |
258 | |
259 | return lr; |
260 | } |
261 | catch (ClassNotFoundException cnfe) |
262 | { |
263 | throw logFactory.markCorrupt( |
264 | StandardException.newException(SQLState.LOG_CORRUPTED, cnfe)); |
265 | } |
266 | catch (IOException ioe) |
267 | { |
268 | throw logFactory.markCorrupt( |
269 | StandardException.newException(SQLState.LOG_IO_ERROR, ioe)); |
270 | } |
271 | } |
272 | |
273 | /** |
274 | Reset the scan to the given LogInstant. |
275 | |
276 | @param instant the position to reset to |
277 | @exception IOException scan cannot access the log at the new position. |
278 | */ |
279 | public void resetPosition(LogInstant instant) throws IOException |
280 | { |
281 | if (SanityManager.DEBUG) |
282 | { |
283 | SanityManager.THROWASSERT("Unsupported feature"); |
284 | } |
285 | } |
286 | |
287 | /** |
288 | Get the log instant that is right after the record just retrived |
289 | @return INVALID_LOG_INSTANT if this is not a FORWARD scan or, no |
290 | record have been returned yet or the scan has completed. |
291 | */ |
292 | public long getLogRecordEnd() |
293 | { |
294 | if (SanityManager.DEBUG) |
295 | { |
296 | SanityManager.THROWASSERT("Unsupported feature"); |
297 | } |
298 | return LogCounter.INVALID_LOG_INSTANT; |
299 | } |
300 | |
301 | |
302 | /** |
303 | returns true if there is partially writen log records before the crash |
304 | in the last log file. Partiall wrires are identified during forward |
305 | scans for log recovery. |
306 | */ |
307 | public boolean isLogEndFuzzy() |
308 | { |
309 | if (SanityManager.DEBUG) |
310 | { |
311 | SanityManager.THROWASSERT("Unsupported feature"); |
312 | } |
313 | return false; |
314 | } |
315 | |
316 | /** |
317 | Return the log instant (as an integer) the scan is currently on - this is the log |
318 | instant of the log record that was returned by getNextRecord. |
319 | */ |
320 | public long getInstant() |
321 | { |
322 | return currentInstant; |
323 | } |
324 | |
325 | /** |
326 | Return the log instant the scan is currently on - this is the log |
327 | instant of the log record that was returned by getNextRecord. |
328 | */ |
329 | public LogInstant getLogInstant() |
330 | { |
331 | if (currentInstant == LogCounter.INVALID_LOG_INSTANT) |
332 | return null; |
333 | else |
334 | return new LogCounter(currentInstant); |
335 | } |
336 | |
337 | /** |
338 | Close the scan. |
339 | */ |
340 | public void close() |
341 | { |
342 | if (scan != null) |
343 | { |
344 | try |
345 | { |
346 | scan.close(); |
347 | } |
348 | catch (IOException ioe) |
349 | {} |
350 | |
351 | scan = null; |
352 | } |
353 | currentInstant = LogCounter.INVALID_LOG_INSTANT; |
354 | open = false; |
355 | } |
356 | |
357 | /* |
358 | Private methods. |
359 | */ |
360 | private void setFirstUnflushed() |
361 | throws StandardException, IOException |
362 | { |
363 | LogInstant firstUnflushedInstant = |
364 | logFactory.getFirstUnflushedInstant(); |
365 | firstUnflushed = ((LogCounter)firstUnflushedInstant).getValueAsLong(); |
366 | firstUnflushedFileNumber = LogCounter.getLogFileNumber(firstUnflushed); |
367 | firstUnflushedFilePosition = LogCounter.getLogFilePosition(firstUnflushed); |
368 | |
369 | setCurrentLogFileFirstUnflushedPosition(); |
370 | } |
371 | |
372 | private void setCurrentLogFileFirstUnflushedPosition() |
373 | throws IOException |
374 | { |
375 | /* |
376 | Note we get the currentLogFileLength without synchronization. |
377 | This is safe because one of the following cases apply: |
378 | |
379 | <OL> |
380 | <LI> The end of the flushed section of the log is in another file. |
381 | In this case the end of the current file will not change. |
382 | <LI> The end of the log is in this file. In this case we |
383 | end our scan at the firstUnflushedInstant and do not use |
384 | currentLogFileLength. |
385 | </OL> |
386 | */ |
387 | if (currentLogFileNumber == firstUnflushedFileNumber) |
388 | currentLogFileFirstUnflushedPosition = firstUnflushedFilePosition; |
389 | else if (currentLogFileNumber < firstUnflushedFileNumber) |
390 | currentLogFileFirstUnflushedPosition = scan.length(); |
391 | else |
392 | { |
393 | // RESOLVE |
394 | throw new IOException( |
395 | MessageService.getTextMessage(MessageId.LOG_BAD_START_INSTANT)); |
396 | } |
397 | } |
398 | |
399 | private void switchLogFile() |
400 | throws StandardException |
401 | { |
402 | try |
403 | { |
404 | readNextRecordLength = false; |
405 | scan.close(); |
406 | scan = null; |
407 | scan = logFactory.getLogFileAtBeginning(++currentLogFileNumber); |
408 | setCurrentLogFileFirstUnflushedPosition(); |
409 | } |
410 | |
411 | catch (IOException ioe) |
412 | { |
413 | throw logFactory.markCorrupt( |
414 | StandardException.newException(SQLState.LOG_IO_ERROR, ioe)); |
415 | } |
416 | } |
417 | |
418 | /** |
419 | The length of the next record. Read from scan and set by |
420 | currentLogFileHasUnflushedRecord. This is used to retain the length of a |
421 | log record in the case currentLogFileHasUnflushedRecord reads the length |
422 | and determines that some bytes in the log record are not yet flushed. |
423 | */ |
424 | int nextRecordLength; |
425 | |
426 | /** |
427 | Flag to indicate that the length of the next log record has been read by |
428 | currentLogFileHasUnflushedRecord. |
429 | |
430 | This flag gets reset in two ways: |
431 | |
432 | <OL> |
433 | <LI> currentLogFileHasUnflushedRecord determines that the entire log |
434 | record is flushed and returns true. In this case getNextRecord reads and |
435 | returns the log record. |
436 | <LI> we switch log files --due to a partial log record at the end of an |
437 | old log file. |
438 | </OL> |
439 | */ |
440 | boolean readNextRecordLength; |
441 | |
442 | private boolean currentLogFileHasUnflushedRecord() |
443 | throws IOException |
444 | { |
445 | if (SanityManager.DEBUG) |
446 | SanityManager.ASSERT(scan != null, "scan is null"); |
447 | long curPos = scan.getFilePointer(); |
448 | |
449 | if (!readNextRecordLength) |
450 | { |
451 | if (curPos + LOG_REC_LEN_BYTE_LENGTH > |
452 | currentLogFileFirstUnflushedPosition) |
453 | return false; |
454 | |
455 | nextRecordLength = scan.readInt(); |
456 | curPos+=4; |
457 | readNextRecordLength = true; |
458 | } |
459 | |
460 | if (nextRecordLength==0) return false; |
461 | |
462 | int bytesNeeded = |
463 | nextRecordLength + LOG_REC_LEN_BYTE_LENGTH; |
464 | |
465 | if (curPos + bytesNeeded > currentLogFileFirstUnflushedPosition) |
466 | { |
467 | return false; |
468 | } |
469 | else |
470 | { |
471 | readNextRecordLength = false; |
472 | return true; |
473 | } |
474 | } |
475 | |
476 | private boolean positionToNextRecord() |
477 | throws StandardException, IOException |
478 | { |
479 | //If the flushed section of the current log file contains our record we |
480 | //simply return. |
481 | if (currentLogFileHasUnflushedRecord()) return true; |
482 | |
483 | //Update our cached copy of the first unflushed instant. |
484 | setFirstUnflushed(); |
485 | |
486 | //In the call to setFirstUnflushed, we may have noticed that the current |
487 | //log file really does contain our record. If so we simply return. |
488 | if (currentLogFileHasUnflushedRecord()) return true; |
489 | |
490 | //Our final chance of finding a record is if we are not scanning the log |
491 | //file with the last flushed instant we can switch logfiles. Note that |
492 | //we do this in a loop to cope with empty log files. |
493 | while(currentLogFileNumber < firstUnflushedFileNumber) |
494 | { |
495 | switchLogFile(); |
496 | if (currentLogFileHasUnflushedRecord()) return true; |
497 | } |
498 | |
499 | //The log contains no more flushed log records so we return false. |
500 | currentInstant = LogCounter.INVALID_LOG_INSTANT; |
501 | return false; |
502 | } |
503 | } |