Generated on 2009-08-09 21:22:57 with etap 0.3.4.
| Name | Total lines | Lines of code | Total coverage | Code coverage | ||||
| couch_db_updater | ?? | ?? | ?? |
|
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_db_updater).
-behaviour(gen_server).

-export([btree_by_id_reduce/2,btree_by_seq_reduce/2]).
-export([less_docid/2]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).

-include("couch_db.hrl").


% gen_server init callback.
%
% Obtains the database header (a fresh one when `create` is among Options,
% otherwise the one stored in the file), builds the #db{} state from it,
% loads the validation funs and records MainPid in the state.
init({MainPid, DbName, Filepath, Fd, Options}) ->
    Header =
        case lists:member(create, Options) of
            true ->
                % brand new database: write an empty header to the file
                FreshHeader = #db_header{},
                ok = couch_file:write_header(Fd, FreshHeader),
                % remove any stale compaction file; the result is ignored
                file:delete(Filepath ++ ".compact"),
                FreshHeader;
            false ->
                % 09 UPGRADE CODE
                ok = couch_file:upgrade_old_header(Fd, <<$g, $m, $k, 0>>),
                {ok, StoredHeader} = couch_file:read_header(Fd),
                StoredHeader
        end,

    Db = init_db(DbName, Filepath, Fd, Header),
    Db2 = refresh_validate_doc_funs(Db),
    {ok, Db2#db{main_pid=MainPid}}.


% gen_server terminate callback: hands the exit reason to
% couch_util:terminate_linked/1 and returns ok.
terminate(Reason, _Srv) ->
    couch_util:terminate_linked(Reason),
    ok.

% gen_server call handler. The state is the #db{} record; every clause that
% changes it also pushes the new record to the main db process with a
% {db_updated, Db} call.
handle_call(get_db, _From, Db) ->
    {reply, {ok, Db}, Db};
handle_call({update_docs, GroupedDocs, NonRepDocs, Options}, _From, Db) ->
    % only update_docs_int/4 is protected by the catch; a `retry` throw is
    % turned into a `retry` reply and the state is left as it was.
    try update_docs_int(Db, GroupedDocs, NonRepDocs, Options) of
        {ok, Failures, Db2} ->
            ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
            % notify listeners only when the update sequence actually moved
            if Db2#db.update_seq /= Db#db.update_seq ->
                    couch_db_update_notifier:notify({updated, Db2#db.name});
               true ->
                    ok
            end,
            {reply, {ok, Failures}, Db2}
    catch
        throw:retry ->
            {reply, retry, Db}
    end;
handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
    % no delayed commit is pending, reply right away
    {reply, ok, Db};
handle_call(full_commit, _From, Db) ->
    % write the pending data out before replying
    {reply, ok, commit_data(Db)};
handle_call(increment_update_seq, _From, Db) ->
    BumpedSeq = Db#db.update_seq + 1,
    Db2 = commit_data(Db#db{update_seq=BumpedSeq}),
    ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
    couch_db_update_notifier:notify({updated, Db#db.name}),
    {reply, {ok, Db2#db.update_seq}, Db2};

handle_call({set_admins, NewAdmins}, _From, Db) ->
    % the admins term is appended to the file and referenced by pointer
    {ok, Ptr} = couch_file:append_term(Db#db.fd, NewAdmins),
    WithAdmins = Db#db{admins=NewAdmins, admins_ptr=Ptr,
        update_seq=Db#db.update_seq+1},
    Db2 = commit_data(WithAdmins),
    ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
    {reply, ok, Db2};

handle_call({set_revs_limit, Limit}, _From, Db) ->
    WithLimit = Db#db{revs_limit=Limit, update_seq=Db#db.update_seq+1},
    Db2 = commit_data(WithLimit),
    ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
    {reply, ok, Db2};

handle_call({purge_docs, _IdRevs}, _From,
        #db{compactor_pid=Pid}=Db) when Pid /= nil ->
    % purging is refused while a compactor pid is set
    {reply, {error, purge_during_compaction}, Db};
handle_call({purge_docs, IdRevs}, _From, Db) ->
    #db{
        fd = Fd,
        fulldocinfo_by_id_btree = ByIdBtree,
        docinfo_by_seq_btree = BySeqBtree,
        update_seq = LastSeq,
        header = Header = #db_header{purge_seq=PurgeSeq}
    } = Db,
    Lookups = couch_btree:lookup(ByIdBtree, [Id || {Id, _Revs} <- IdRevs]),

    % Per requested id: `nil` when the doc is missing or no leaf was removed,
    % otherwise {FullDocInfoWithNewTree, RemovedRevs}.
    RemoveLeafs =
        fun({_Id, Revs}, {ok, #full_doc_info{rev_tree=Tree}=FullDocInfo}) ->
                case couch_key_tree:remove_leafs(Tree, Revs) of
                    {_, []} -> % no change
                        nil;
                    {NewTree, RemovedRevs} ->
                        {FullDocInfo#full_doc_info{rev_tree=NewTree}, RemovedRevs}
                end;
           (_, not_found) ->
                nil
        end,
    NewDocInfos = lists:zipwith(RemoveLeafs, IdRevs, Lookups),

    % The pattern generators below skip the `nil` entries.
    SeqsToRemove =
        [Seq || {#full_doc_info{update_seq=Seq}, _} <- NewDocInfos],

    FullDocInfoToUpdate =
        [FullInfo || {#full_doc_info{rev_tree=Tree}=FullInfo, _} <- NewDocInfos,
            Tree /= []],

    IdRevsPurged =
        [{Id, Revs} || {#full_doc_info{id=Id}, Revs} <- NewDocInfos],

    % Every surviving document gets the next sequence number stamped on its
    % leaf rev infos.
    Resequence =
        fun(#full_doc_info{rev_tree=Tree}=FullInfo, SeqAcc) ->
            NextSeq = SeqAcc + 1,
            Tree2 = couch_key_tree:map_leafs(
                fun(RevInfo) -> RevInfo#rev_info{seq=NextSeq} end, Tree),
            {couch_doc:to_doc_info(FullInfo#full_doc_info{rev_tree=Tree2}),
                NextSeq}
        end,
    {DocInfoToUpdate, NewSeq} =
        lists:mapfoldl(Resequence, LastSeq, FullDocInfoToUpdate),

    % documents whose rev tree became empty are dropped from the by-id index
    IdsToRemove =
        [Id || {#full_doc_info{id=Id,rev_tree=[]}, _} <- NewDocInfos],

    {ok, BySeqBtree2} = couch_btree:add_remove(BySeqBtree,
        DocInfoToUpdate, SeqsToRemove),
    {ok, ByIdBtree2} = couch_btree:add_remove(ByIdBtree,
        FullDocInfoToUpdate, IdsToRemove),
    {ok, Pointer} = couch_file:append_term(Fd, IdRevsPurged),

    PurgedHeader = Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer},
    Db2 = commit_data(
        Db#db{
            fulldocinfo_by_id_btree = ByIdBtree2,
            docinfo_by_seq_btree = BySeqBtree2,
            update_seq = NewSeq + 1,
            header = PurgedHeader}),

    ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
    couch_db_update_notifier:notify({updated, Db#db.name}),
    {reply, {ok, Db2#db.update_seq, IdRevsPurged}, Db2}.


% gen_server cast handler: starts a compaction, or finishes one when the
% compactor process reports {compact_done, CompactFilepath}.
handle_cast(start_compact, Db) ->
    case Db#db.compactor_pid of
        nil ->
            ?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]),
            Pid = spawn_link(fun() -> start_copy_compact(Db) end),
            Db2 = Db#db{compactor_pid=Pid},
            ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
            {noreply, Db2};
        _ ->
            % a compaction is already running, nothing to do
            {noreply, Db}
    end;
handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
    {ok, NewFd} = couch_file:open(CompactFilepath),
    {ok, NewHeader} = couch_file:read_header(NewFd),
    NewDb = init_db(Db#db.name, Filepath, NewFd, NewHeader),
    #db{update_seq=NewSeq} = NewDb,
    unlink(NewFd),
    case Db#db.update_seq == NewSeq of
        true ->
            % the compacted file has caught up: read every local doc into
            % memory and add them to the new db
            {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
                fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
            {ok, NewLocalBtree} =
                couch_btree:add(NewDb#db.local_docs_btree, LocalDocs),

            NewDb2 = commit_data(NewDb#db{local_docs_btree=NewLocalBtree,
                main_pid = Db#db.main_pid, filepath = Filepath}),

            ?LOG_DEBUG("CouchDB swapping files ~s and ~s.",
                [Filepath, CompactFilepath]),
            file:delete(Filepath),
            ok = file:rename(CompactFilepath, Filepath),
            close_db(Db),
            ok = gen_server:call(Db#db.main_pid, {db_updated, NewDb2}),
            ?LOG_INFO("Compaction for db \"~s\" completed.", [Db#db.name]),
            {noreply, NewDb2#db{compactor_pid=nil}};
        false ->
            % the main file received updates in the meantime; run the
            % compactor again against the current state
            ?LOG_INFO("Compaction file still behind main file "
                "(update seq=~p. compact update seq=~p). Retrying.",
                [Db#db.update_seq, NewSeq]),
            close_db(NewDb),
            Pid = spawn_link(fun() -> start_copy_compact(Db) end),
            Db2 = Db#db{compactor_pid=Pid},
            {noreply, Db2}
    end.

% The delayed_commit message is sent by the timer armed in commit_data/2.
handle_info(delayed_commit, Db) ->
    {noreply, commit_data(Db)}.

% No state conversion is performed on code change.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


% Converts a #doc_info{} into the {Key, Value} pair stored in the by-seq
% btree: live and deleted revisions go into two separate lists of
% {Rev, Seq, BodyPointer}.
btree_by_seq_split(#doc_info{id=Id, high_seq=KeySeq, revs=Revs}) ->
    LiveRevs =
        [{Rev, Seq, Bp} ||
            #rev_info{rev=Rev,seq=Seq,deleted=false,body_sp=Bp} <- Revs],
    DeletedRevs =
        [{Rev, Seq, Bp} ||
            #rev_info{rev=Rev,seq=Seq,deleted=true,body_sp=Bp} <- Revs],
    {KeySeq, {Id, LiveRevs, DeletedRevs}}.

% Rebuilds a #doc_info{} from a by-seq btree entry; the second clause reads
% the older on-disk layout.
btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
    LiveRevs =
        [#rev_info{rev=Rev,seq=Seq,deleted=false,body_sp = Bp} ||
            {Rev, Seq, Bp} <- RevInfos],
    DeletedRevs =
        [#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} ||
            {Rev, Seq, Bp} <- DeletedRevInfos],
    #doc_info{
        id = Id,
        high_seq = KeySeq,
        revs = LiveRevs ++ DeletedRevs};
btree_by_seq_join(KeySeq,{Id, Rev, Bp, Conflicts, DelConflicts, Deleted}) ->
    % 09 UPGRADE CODE
    % this is the 0.9.0 and earlier by_seq record. It's missing the body pointers
    % and individual seq nums for conflicts that are currently in the index,
    % meaning the filtered _changes api will not work except for on main docs.
    % Simply compact a 0.9.0 database to upgrade the index.
    MainRev = [#rev_info{rev=Rev,seq=KeySeq,deleted=Deleted,body_sp=Bp}],
    ConflictRevs =
        [#rev_info{rev=Rev1,seq=KeySeq,deleted=false} || Rev1 <- Conflicts],
    DeletedConflictRevs =
        [#rev_info{rev=Rev2,seq=KeySeq,deleted=true} || Rev2 <- DelConflicts],
    #doc_info{
        id = Id,
        high_seq = KeySeq,
        revs = MainRev ++ ConflictRevs ++ DeletedConflictRevs}.

% Converts a #full_doc_info{} into the {Key, Value} pair stored in the by-id
% btree. Deleted flags are stored as 1/0 rather than true/false.
btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
        deleted=Deleted, rev_tree=Tree}) ->
    ToDisk =
        fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) ->
                DiskFlag = case IsDeleted of true -> 1; _ -> 0 end,
                {DiskFlag, BodyPointer, UpdateSeq};
           (_RevId, ?REV_MISSING) ->
                ?REV_MISSING
        end,
    DiskTree = couch_key_tree:map(ToDisk, Tree),
    DeletedFlag = case Deleted of true -> 1; _ -> 0 end,
    {Id, {Seq, DeletedFlag, DiskTree}}.

% Rebuilds a #full_doc_info{} from a by-id btree entry, turning the stored
% 1/0 flags back into booleans.
btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
    FromDisk =
        fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) ->
                {IsDeleted == 1, BodyPointer, UpdateSeq};
           (_RevId, ?REV_MISSING) ->
                ?REV_MISSING;
           (_RevId, {IsDeleted, BodyPointer}) ->
                % 09 UPGRADE CODE
                % this is the 0.9.0 and earlier rev info record. It's missing the seq
                % nums, which means couchdb will sometimes reexamine unchanged
                % documents with the _changes API.
                % This is fixed by compacting the database.
                {IsDeleted, BodyPointer, HighSeq}
        end,
    Tree = couch_key_tree:map(FromDisk, DiskTree),
    #full_doc_info{id=Id, update_seq=HighSeq, deleted=Deleted==1, rev_tree=Tree}.

% Reduction for the by-id btree: {NotDeletedCount, DeletedCount}.
btree_by_id_reduce(reduce, FullDocInfos) ->
    % one pass over the entries; anything that is neither a live nor a
    % deleted #full_doc_info{} is not counted
    lists:foldl(
        fun(#full_doc_info{deleted=false}, {Live, Gone}) -> {Live + 1, Gone};
           (#full_doc_info{deleted=true}, {Live, Gone}) -> {Live, Gone + 1};
           (_Other, Counts) -> Counts
        end, {0, 0}, FullDocInfos);
btree_by_id_reduce(rereduce, Reds) ->
    % add up the partial {Count, DelCount} pairs
    lists:foldl(
        fun({Count, DelCount}, {Live, Gone}) -> {Live + Count, Gone + DelCount};
           (_Other, Counts) -> Counts
        end, {0, 0}, Reds).

% Reduction for the by-seq btree: the number of entries.
btree_by_seq_reduce(reduce, DocInfos) ->
    length(DocInfos);
btree_by_seq_reduce(rereduce, Reds) ->
    lists:sum(Reds).

% Pads the tuple Old with the trailing elements of New so that both have the
% same size. Old is returned untouched when the sizes already agree.
simple_upgrade_record(Old, New) when size(Old) == size(New) ->
    Old;
simple_upgrade_record(Old, New) ->
    % everything in New beyond the first size(Old) elements
    MissingTail = lists:nthtail(size(Old), tuple_to_list(New)),
    list_to_tuple(tuple_to_list(Old) ++ MissingTail).

% Ordering function for document ids, used for doc insertion and also for the
% PassedEndFun on the all_docs view. `nil` sorts before every key and `{}`
% after every key.
less_docid(A, B) when A==B -> false;
less_docid(nil, _) -> true;
less_docid({}, _) -> false;
less_docid(A, B) -> A < B.


% Builds the #db{} state for the file behind Fd from the given header:
% upgrades the header record, optionally syncs the file, opens the three
% btrees, reads the admins term and starts a ref counter for Fd.
init_db(DbName, Filepath, Fd, Header0) ->
    Header1 = simple_upgrade_record(Header0, #db_header{}),
    % element 2 of the header tuple is matched against the known disk versions
    Header =
        case element(2, Header1) of
            1 -> Header1#db_header{unused = 0}; % 0.9
            2 -> Header1#db_header{unused = 0}; % post 0.9 and pre 0.10
            3 -> Header1; % post 0.9 and pre 0.10
            ?LATEST_DISK_VERSION -> Header1;
            _ -> throw({database_disk_version_error, "Incorrect disk header version"})
        end,
    Less = fun less_docid/2,

    {ok, FsyncOptions} = couch_util:parse_term(
        couch_config:get("couchdb", "fsync_options",
            "[before_header, after_header, on_file_open]")),

    case lists:member(on_file_open, FsyncOptions) of
        true -> ok = couch_file:sync(Fd);
        _ -> ok
    end,

    IdBtreeOptions =
        [{split, fun(X) -> btree_by_id_split(X) end},
         {join, fun(X,Y) -> btree_by_id_join(X,Y) end},
         {reduce, fun(X,Y) -> btree_by_id_reduce(X,Y) end},
         {less, Less}],
    {ok, IdBtree} = couch_btree:open(
        Header#db_header.fulldocinfo_by_id_btree_state, Fd, IdBtreeOptions),

    SeqBtreeOptions =
        [{split, fun(X) -> btree_by_seq_split(X) end},
         {join, fun(X,Y) -> btree_by_seq_join(X,Y) end},
         {reduce, fun(X,Y) -> btree_by_seq_reduce(X,Y) end}],
    {ok, SeqBtree} = couch_btree:open(
        Header#db_header.docinfo_by_seq_btree_state, Fd, SeqBtreeOptions),

    {ok, LocalDocsBtree} =
        couch_btree:open(Header#db_header.local_docs_btree_state, Fd),

    % a nil pointer means no admins term has been stored
    AdminsPtr = Header#db_header.admins_ptr,
    Admins =
        case AdminsPtr of
            nil ->
                [];
            _ ->
                {ok, StoredAdmins} = couch_file:pread_term(Fd, AdminsPtr),
                StoredAdmins
        end,

    % convert start time tuple to microsecs and store as a binary string
    {MegaSecs, Secs, MicroSecs} = now(),
    StartTime = ?l2b(io_lib:format("~p",
        [(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
    {ok, RefCntr} = couch_ref_counter:start([Fd]),
    #db{
        update_pid = self(),
        fd = Fd,
        fd_ref_counter = RefCntr,
        header = Header,
        fulldocinfo_by_id_btree = IdBtree,
        docinfo_by_seq_btree = SeqBtree,
        local_docs_btree = LocalDocsBtree,
        committed_update_seq = Header#db_header.update_seq,
        update_seq = Header#db_header.update_seq,
        name = DbName,
        filepath = Filepath,
        admins = Admins,
        admins_ptr = AdminsPtr,
        instance_start_time = StartTime,
        revs_limit = Header#db_header.revs_limit,
        fsync_options = FsyncOptions
    }.


% Drops this db's reference on its file ref counter.
close_db(#db{fd_ref_counter = RefCntr}) ->
    couch_ref_counter:drop(RefCntr).


% Reloads the validation funs from the design documents; design docs for
% which couch_doc:get_validate_doc_fun/1 returns nil contribute nothing.
refresh_validate_doc_funs(Db) ->
    {ok, DesignDocs} = couch_db:get_design_docs(Db),
    ValidateFuns =
        [Fun || DesignDoc <- DesignDocs,
            Fun <- [couch_doc:get_validate_doc_fun(DesignDoc)],
            Fun =/= nil],
    Db#db{validate_doc_funs=ValidateFuns}.

% rev tree functions

% Writes the unwritten document summaries of every rev tree to the file.
% Returns {ok, Infos}: the entries already in AccFlushedTrees (reversed)
% followed by the flushed infos in input order. Throws `retry` when an
% attachment was written to a different file.
flush_trees(Db, UnflushedInfos, AccFlushedTrees) ->
    Flushed = [flush_doc_info(Db, Info) || Info <- UnflushedInfos],
    {ok, lists:reverse(AccFlushedTrees, Flushed)}.

% Flushes the rev tree of a single #full_doc_info{}.
flush_doc_info(#db{fd=Fd,header=Header}, InfoUnflushed) ->
    #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
    FlushNode =
        fun(_Rev, #doc{atts=Atts,deleted=IsDeleted}=Doc) ->
                % this node value is actually an unwritten document summary,
                % write to disk.
                % make sure the Fd in the written bins is the same Fd we are
                % and convert bins, removing the FD.
                % All bins should have been written to disk already.
                DiskAtts =
                    case Atts of
                        [] ->
                            [];
                        [#att{data={BinFd, _Sp}} | _ ] when BinFd == Fd ->
                            [{N,T,P,L,R,M}
                                || #att{name=N,type=T,data={_,P},md5=M,revpos=R,len=L}
                                    <- Atts];
                        _ ->
                            % BinFd must not equal our Fd. This can happen when a database
                            % is being switched out during a compaction
                            ?LOG_DEBUG("File where the attachments are written has"
                                " changed. Possibly retrying.", []),
                            throw(retry)
                    end,
                % the md5 variant of the append is used from disk version 4 on
                {ok, NewSummaryPointer} =
                    case Header#db_header.disk_version < 4 of
                        true ->
                            couch_file:append_term(Fd, {Doc#doc.body, DiskAtts});
                        false ->
                            couch_file:append_term_md5(Fd, {Doc#doc.body, DiskAtts})
                    end,
                {IsDeleted, NewSummaryPointer, UpdateSeq};
           (_Rev, Value) ->
                % any other node value is left as it is
                Value
        end,
    InfoUnflushed#full_doc_info{
        rev_tree = couch_key_tree:map(FlushNode, Unflushed)}.

% Merges each group of new docs into the rev tree of the matching old doc
% info. Returns {ok, NewInfos, RemoveSeqs, Conflicts, LastSeq}; only documents
% whose tree changed get a new info and a new sequence number.
merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccRemoveSeqs, AccConflicts, AccSeq) ->
    {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccConflicts, AccSeq};
merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
        [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccConflicts, AccSeq) ->
    #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq}
        = OldDocInfo,
    MergeDoc =
        fun(#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, {AccTree, AccConflicts2}) ->
            if not MergeConflicts ->
                case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of
                    {_NewTree, conflicts} when (not OldDeleted) ->
                        {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]};
                    {NewTree, no_conflicts} when AccTree == NewTree ->
                        % the tree didn't change at all
                        % meaning we are saving a rev that's already
                        % been edited again.
                        if (Pos == 1) and OldDeleted ->
                            % this means we are recreating a brand new document
                            % into a state that already existed before.
                            % put the rev into a subsequent edit of the deletion
                            #doc_info{revs=[#rev_info{rev={OldPos,OldRev}}|_]} =
                                couch_doc:to_doc_info(OldDocInfo),
                            NewRevId = couch_db:new_revid(
                                NewDoc#doc{revs={OldPos, [OldRev]}}),
                            NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
                            {NewTree2, _} = couch_key_tree:merge(AccTree,
                                [couch_db:doc_to_tree(NewDoc2)]),
                            % we changed the rev id, this tells the caller we did.
                            {NewTree2, [{{Id, {Pos-1,PrevRevs}}, {ok, {OldPos + 1, NewRevId}}}
                                | AccConflicts2]};
                        true ->
                            {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]}
                        end;
                    {NewTree, _} ->
                        {NewTree, AccConflicts2}
                end;
            true ->
                % conflicts are merged in without being reported
                {NewTree, _} = couch_key_tree:merge(AccTree,
                    [couch_db:doc_to_tree(NewDoc)]),
                {NewTree, AccConflicts2}
            end
        end,
    {NewRevTree, NewConflicts} =
        lists:foldl(MergeDoc, {OldTree, AccConflicts}, NewDocs),
    if NewRevTree == OldTree ->
        % nothing changed
        merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, AccNewInfos,
            AccRemoveSeqs, NewConflicts, AccSeq);
    true ->
        % we have updated the document, give it a new seq #
        NextSeq = AccSeq + 1,
        NewInfo = #full_doc_info{id=Id,update_seq=NextSeq,rev_tree=NewRevTree},
        % an old seq of 0 is not scheduled for removal from the by-seq index
        RemoveSeqs =
            case OldSeq of
                0 -> AccRemoveSeqs;
                _ -> [OldSeq | AccRemoveSeqs]
            end,
        merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo,
            [NewInfo|AccNewInfos], RemoveSeqs, NewConflicts, NextSeq)
    end.



% Produces the entries for both indexes from a list of #full_doc_info{}:
% the by-id entries carry the deleted flag of the first rev info of the
% corresponding doc info. Each entry is pushed onto the front of its
% accumulator, so the results come out in reverse input order.
new_index_entries(FullDocInfos, AccById, AccBySeq) ->
    lists:foldl(
        fun(FullDocInfo, {ById, BySeq}) ->
            #doc_info{revs=[#rev_info{deleted=Deleted}|_]} = DocInfo =
                couch_doc:to_doc_info(FullDocInfo),
            {[FullDocInfo#full_doc_info{deleted=Deleted} | ById],
                [DocInfo | BySeq]}
        end, {AccById, AccBySeq}, FullDocInfos).


% Stems the rev tree of every doc info to the db's revs_limit.
stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
    [FullInfo#full_doc_info{rev_tree=couch_key_tree:stem(RevTree, Limit)} ||
        #full_doc_info{rev_tree=RevTree}=FullInfo <- DocInfos].

% Applies one batch of document updates: merges the new revisions into the
% existing rev trees, writes local docs and document summaries, updates both
% indexes and commits. Returns {ok, Conflicts, NewDb}.
update_docs_int(Db, DocsList, NonRepDocs, Options) ->
    #db{
        fulldocinfo_by_id_btree = ByIdBtree,
        docinfo_by_seq_btree = BySeqBtree,
        update_seq = LastSeq
    } = Db,
    Ids = [Id || [#doc{id=Id}|_] <- DocsList],
    % look up the old documents, if they exist; unknown ids start out with an
    % empty doc info
    OldDocLookups = couch_btree:lookup(ByIdBtree, Ids),
    OldDocInfos = lists:zipwith(
        fun(_Id, {ok, FullDocInfo}) -> FullDocInfo;
           (Id, not_found) -> #full_doc_info{id=Id}
        end,
        Ids, OldDocLookups),

    % Merge the new docs into the revision trees.
    MergeConflicts = lists:member(merge_conflicts, Options),
    {ok, NewDocInfos0, RemoveSeqs, Conflicts, NewSeq} =
        merge_rev_trees(MergeConflicts, DocsList, OldDocInfos, [], [], [], LastSeq),

    NewFullDocInfos = stem_full_doc_infos(Db, NewDocInfos0),

    % All documents are now ready to write.

    {ok, LocalConflicts, Db2} = update_local_docs(Db, NonRepDocs),

    % Write out the document summaries (the bodies are stored in the nodes of
    % the trees, the attachments are already written to disk)
    {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),

    {IndexFullDocInfos, IndexDocInfos} =
        new_index_entries(FlushedFullDocInfos, [], []),

    % and the indexes
    {ok, ByIdBtree2} =
        couch_btree:add_remove(ByIdBtree, IndexFullDocInfos, []),
    {ok, BySeqBtree2} =
        couch_btree:add_remove(BySeqBtree, IndexDocInfos, RemoveSeqs),

    Db3 = Db2#db{
        fulldocinfo_by_id_btree = ByIdBtree2,
        docinfo_by_seq_btree = BySeqBtree2,
        update_seq = NewSeq},

    % Check if we just updated any design documents, and update the validation
    % funs if we did.
    IsDesignDocId =
        fun(<<"_design/",_/binary>>) -> true;
           (_) -> false
        end,
    Db4 =
        case lists:any(IsDesignDocId, Ids) of
            true -> refresh_validate_doc_funs(Db3);
            false -> Db3
        end,

    % the commit is delayed unless full_commit was requested
    DelayCommit = not lists:member(full_commit, Options),
    {ok, LocalConflicts ++ Conflicts, commit_data(Db4, DelayCommit)}.


% Applies updates and deletions of local documents to the local docs btree.
% An update or deletion is accepted only when the revision given by the
% caller equals the stored revision (0 when the doc does not exist yet);
% otherwise a conflict is reported for that doc.
% Returns {ok, Results, NewDb}.
update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
    Ids = [Id || #doc{id=Id} <- Docs],
    OldDocLookups = couch_btree:lookup(Btree, Ids),
    Classify =
        fun(#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}, OldDocLookup) ->
            % revision the caller based this change on
            PrevRev =
                case PrevRevs of
                    [RevStr|_] -> list_to_integer(?b2l(RevStr));
                    [] -> 0
                end,
            % revision currently stored in the btree
            OldRev =
                case OldDocLookup of
                    {ok, {_, {OldRev0, _}}} -> OldRev0;
                    not_found -> 0
                end,
            case {OldRev == PrevRev, Delete} of
                {true, false} -> {update, {Id, {PrevRev + 1, PrevRevs, Body}}};
                {true, true} -> {remove, Id, PrevRevs};
                {false, _} -> {conflict, {Id, {0, PrevRevs}}}
            end
        end,
    BtreeEntries = lists:zipwith(Classify, Docs, OldDocLookups),

    BtreeIdsRemove = [Id || {remove, Id, _PrevRevs} <- BtreeEntries],
    BtreeIdsUpdate =
        [{Id, {NewRev, Body}} ||
            {update, {Id, {NewRev, _OldRevs, Body}}} <- BtreeEntries],

    % results are listed as removals, then updates, then conflicts
    Removed =
        [{{Id, {0, PrevRevs}}, {ok, {0, <<"0">>}}}
            || {remove, Id, PrevRevs} <- BtreeEntries],
    Updated =
        [{{Id, {0, PrevRevs}}, {ok, {0, ?l2b(integer_to_list(NewRev))}}}
            || {update, {Id, {NewRev, PrevRevs, _Body}}} <- BtreeEntries],
    Conflicted =
        [{IdRevs, conflict} || {conflict, IdRevs} <- BtreeEntries],
    Results = Removed ++ Updated ++ Conflicted,

    {ok, Btree2} =
        couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),

    {ok, Results, Db#db{local_docs_btree = Btree2}}.


% Commits without delay.
commit_data(Db) ->
    commit_data(Db, false).

% Copies the current btree states, update seq, admins pointer and revs limit
% from the #db{} record into the given header.
db_to_header(Db, Header) ->
    #db{
        docinfo_by_seq_btree = BySeqBtree,
        fulldocinfo_by_id_btree = ByIdBtree,
        local_docs_btree = LocalBtree
    } = Db,
    Header#db_header{
        update_seq = Db#db.update_seq,
        docinfo_by_seq_btree_state = couch_btree:get_state(BySeqBtree),
        fulldocinfo_by_id_btree_state = couch_btree:get_state(ByIdBtree),
        local_docs_btree_state = couch_btree:get_state(LocalBtree),
        admins_ptr = Db#db.admins_ptr,
        revs_limit = Db#db.revs_limit}.

% Writes the header for the current state of Db, or arranges for that to
% happen later.
%
% Nothing is done when the header would not change. With Delay set, a
% delayed_commit message is scheduled one second ahead unless one is already
% pending. Without Delay, any pending timer is cancelled and the header is
% written, with a file sync before and/or after as configured in
% fsync_options.
commit_data(#db{fd=Fd,header=OldHeader,fsync_options=FsyncOptions}=Db, Delay) ->
    Header = db_to_header(Db, OldHeader),
    if OldHeader == Header ->
            Db;
       Delay and (Db#db.waiting_delayed_commit == nil) ->
            Timer = erlang:send_after(1000, self(), delayed_commit),
            Db#db{waiting_delayed_commit=Timer};
       Delay ->
            Db;
       true ->
            cancel_delayed_commit(Db#db.waiting_delayed_commit),
            sync_when_configured(before_header, FsyncOptions, Fd),

            ok = couch_file:write_header(Fd, Header),

            sync_when_configured(after_header, FsyncOptions, Fd),

            Db#db{waiting_delayed_commit=nil,
                header=Header,
                committed_update_seq=Db#db.update_seq}
    end.

% Cancels a pending delayed-commit timer. When the timer can no longer be
% cancelled, a delayed_commit message already in the mailbox is removed.
cancel_delayed_commit(nil) ->
    ok;
cancel_delayed_commit(Timer) ->
    case erlang:cancel_timer(Timer) of
        false -> receive delayed_commit -> ok after 0 -> ok end;
        _ -> ok
    end.

% Syncs the file when Option is listed in the fsync options.
sync_when_configured(Option, FsyncOptions, Fd) ->
    case lists:member(Option, FsyncOptions) of
        true -> ok = couch_file:sync(Fd);
        _ -> ok
    end.


% Reads the document summary at SrcSp from the source db and copies each of
% its attachments into DestFd. Returns {BodyData, NewBinInfos}, where every
% attachment entry is in the {Name, Type, Sp, Len, RevPos, Md5} form.
copy_doc_attachments(#db{fd=SrcFd}=SrcDb, {Pos,_RevId}, SrcSp, DestFd) ->
    {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcDb, SrcSp),
    CopyBin =
        fun({Name, {Type, BinSp, Len}}) when is_tuple(BinSp) orelse BinSp == null ->
                % 09 UPGRADE CODE
                {NewBinSp, Len, Md5} =
                    couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
                {Name, Type, NewBinSp, Len, Pos, Md5};
           ({Name, {Type, BinSp, Len}}) ->
                % 09 UPGRADE CODE
                {NewBinSp, Len, Md5} =
                    couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
                {Name, Type, NewBinSp, Len, Pos, Md5};
           ({Name, Type, BinSp, Len, RevPos, Md5}) ->
                % Len and Md5 are already bound, so this match also asserts
                % that the copy reports the same length and md5
                {NewBinSp, Len, Md5} =
                    couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
                {Name, Type, NewBinSp, Len, RevPos, Md5}
        end,
    % copy the bin values
    NewBinInfos = [CopyBin(BinInfo) || BinInfo <- BinInfos],
    {BodyData, NewBinInfos}.

% Copies the attachments of every leaf revision into DestFd. A leaf value
% becomes {IsDel, {Body, NewBinInfos}, Seq}; branch nodes are replaced by
% ?REV_MISSING.
copy_rev_tree_attachments(SrcDb, DestFd, Tree) ->
    couch_key_tree:map(
        fun(Rev, {IsDel, Sp, Seq}, leaf) ->
                DocBody = copy_doc_attachments(SrcDb, Rev, Sp, DestFd),
                {IsDel, DocBody, Seq};
           (_, _, branch) ->
                ?REV_MISSING
        end, Tree).


% Copies the documents listed in InfoBySeq from Db into NewDb and returns
% NewDb with both indexes updated. Retry tells whether an earlier,
% unfinished compaction file is being continued.
copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
    Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
    LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),

    % write out the attachments
    NewFullDocInfos0 = lists:map(
        fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
            Info#full_doc_info{
                rev_tree=copy_rev_tree_attachments(Db, DestFd, RevTree)}
        end, LookupResults),
    % write out the docs
    % we do this in 2 stages so the docs are written out contiguously, making
    % view indexing and replication faster.
    WriteBody =
        fun(_Key, {IsDel, DocBody, Seq}) ->
            {ok, Pos} = couch_file:append_term_md5(DestFd, DocBody),
            {IsDel, Pos, Seq}
        end,
    NewFullDocInfos1 = lists:map(
        fun(#full_doc_info{rev_tree=RevTree}=Info) ->
            Info#full_doc_info{
                rev_tree=couch_key_tree:map_leafs(WriteBody, RevTree)}
        end, NewFullDocInfos0),

    NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos1),
    NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos],
    RemoveSeqs =
        case Retry of
            false ->
                [];
            true ->
                % We are retrying a compaction, meaning the documents we are copying may
                % already exist in our file and must be removed from the by_seq index.
                Existing = couch_btree:lookup(
                    NewDb#db.fulldocinfo_by_id_btree, Ids),
                [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
        end,

    {ok, DocInfoBTree} = couch_btree:add_remove(
        NewDb#db.docinfo_by_seq_btree, NewDocInfos, RemoveSeqs),
    {ok, FullDocInfoBTree} = couch_btree:add_remove(
        NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []),
    NewDb#db{fulldocinfo_by_id_btree=FullDocInfoBTree,
        docinfo_by_seq_btree=DocInfoBTree}.



% Copies every change made to Db after NewDb0's update_seq into the
% compaction db, reporting progress through couch_task_status, and returns
% the committed compaction db.
copy_compact(Db, NewDb0, Retry) ->
    % of the configured fsync options only before_header is kept for the
    % compaction file
    FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header],
    NewDb = NewDb0#db{fsync_options=FsyncOptions},
    TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
    EnumBySeqFun =
        fun(#doc_info{high_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
            couch_task_status:update("Copied ~p of ~p changes (~p%)",
                [TotalCopied, TotalChanges, (TotalCopied*100) div TotalChanges]),
            % docs are buffered and copied in batches, with a commit on
            % every tenth batch
            if TotalCopied rem 1000 == 0 ->
                NewDb2 = copy_docs(Db, AccNewDb,
                    lists:reverse([DocInfo | AccUncopied]), Retry),
                if TotalCopied rem 10000 == 0 ->
                    {ok, {commit_data(NewDb2#db{update_seq=Seq}), [], TotalCopied + 1}};
                true ->
                    {ok, {NewDb2#db{update_seq=Seq}, [], TotalCopied + 1}}
                end;
            true ->
                {ok, {AccNewDb, [DocInfo | AccUncopied], TotalCopied + 1}}
            end
        end,

    couch_task_status:set_update_frequency(500),

    % TotalChanges is already bound: the match asserts that the fold visited
    % exactly the number of changes counted above
    {ok, {NewDb2, Uncopied, TotalChanges}} =
        couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1,
            EnumBySeqFun, {NewDb, [], 0}),

    couch_task_status:update("Flushing"),

    NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),

    % copy misc header values
    NewDb4 =
        if NewDb3#db.admins /= Db#db.admins ->
            {ok, Ptr} = couch_file:append_term(NewDb3#db.fd, Db#db.admins),
            NewDb3#db{admins=Db#db.admins, admins_ptr=Ptr};
        true ->
            NewDb3
        end,

    commit_data(NewDb4#db{update_seq=Db#db.update_seq}).

% Entry point of the compactor process. Opens the ".compact" file next to the
% database file (continuing an existing one, or creating it with an empty
% header), copies the database into it and then casts
% {compact_done, CompactFile} to the updater process.
start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
    CompactFile = Filepath ++ ".compact",
    ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
    {Fd, Retry, Header} =
        case couch_file:open(CompactFile) of
            {ok, ExistingFd} ->
                % a compaction file is already there: continue from it and
                % mark the task as a retry
                couch_task_status:add_task(<<"Database Compaction">>,
                    <<Name/binary, " retry">>, <<"Starting">>),
                {ok, ExistingHeader} = couch_file:read_header(ExistingFd),
                {ExistingFd, true, ExistingHeader};
            {error, enoent} ->
                couch_task_status:add_task(<<"Database Compaction">>,
                    Name, <<"Starting">>),
                {ok, CreatedFd} = couch_file:open(CompactFile, [create]),
                EmptyHeader = #db_header{},
                ok = couch_file:write_header(CreatedFd, EmptyHeader),
                {CreatedFd, false, EmptyHeader}
        end,
    NewDb = init_db(Name, CompactFile, Fd, Header),
    NewDb2 = copy_compact(Db, NewDb, Retry),

    gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}),
    close_db(NewDb2).
Generated using etap 0.3.4.