C0 code coverage information

Generated on 2009-08-09 21:22:57 with etap 0.3.4.

Name Total lines Lines of code Total coverage Code coverage
couch_db_updater ?? ?? ??
39% 
....1 % Licensed under the Apache License, Version 2.0 (the "License"); you may not
....2 % use this file except in compliance with the License. You may obtain a copy of
....3 % the License at
....4 %
....5 %   http://www.apache.org/licenses/LICENSE-2.0
....6 %
....7 % Unless required by applicable law or agreed to in writing, software
....8 % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
....9 % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
...10 % License for the specific language governing permissions and limitations under
...11 % the License.
...12 
...13 -module(couch_db_updater).
...14 -behaviour(gen_server).
...15 
...16 -export([btree_by_id_reduce/2,btree_by_seq_reduce/2]).
...17 -export([less_docid/2]).
...18 -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
...19 
...20 -include("couch_db.hrl").
...21 
...22 
%% gen_server init callback.
%% When Options contains the atom `create`, a default #db_header{} is written
%% to Fd and any "<Filepath>.compact" file is deleted (the result of the delete
%% is ignored). Otherwise couch_file:upgrade_old_header/2 is run on Fd and the
%% existing header is read back.
%% The #db{} state is then built by init_db/4, its validation funs are loaded
%% by refresh_validate_doc_funs/1 and MainPid is stored as main_pid.
%% Returns {ok, #db{}}; any failed `ok`/`{ok, _}` match crashes the init.
...23 init({MainPid, DbName, Filepath, Fd, Options}) ->
...24     case lists:member(create, Options) of
...25     true ->
...26         % create a new header and write it to the file
...27         Header =  #db_header{},
...28         ok = couch_file:write_header(Fd, Header),
...29         % delete any old compaction files that might be hanging around
...30         file:delete(Filepath ++ ".compact");
...31     false ->
...32         ok = couch_file:upgrade_old_header(Fd, <<$g, $m, $k, 0>>), % 09 UPGRADE CODE
...33         {ok, Header} = couch_file:read_header(Fd)
...34     end,
...35 
...36     Db = init_db(DbName, Filepath, Fd, Header),
...37     Db2 = refresh_validate_doc_funs(Db),
...38     {ok, Db2#db{main_pid=MainPid}}.
...39 
...40 
%% gen_server terminate callback: hands Reason to
%% couch_util:terminate_linked/1 and returns ok. The server state is unused.
...41 terminate(Reason, _Srv) ->
...42     couch_util:terminate_linked(Reason),
...43     ok.
...44 
%% gen_server handle_call callback. Supported requests:
%%   get_db
%%       replies {ok, Db} with the current state.
%%   {update_docs, GroupedDocs, NonRepDocs, Options}
%%       runs update_docs_int/4. On success the new #db{} is pushed to
%%       main_pid with {db_updated, Db2}, an {updated, Name} notification is
%%       sent when update_seq changed, and the reply is {ok, Failures}.
%%       A thrown `retry` is turned into the reply `retry` with the state
%%       left untouched.
%%   full_commit
%%       replies ok; commits via commit_data/1 unless waiting_delayed_commit
%%       is nil.
%%   increment_update_seq
%%       bumps update_seq by one, commits, informs main_pid, notifies and
%%       replies {ok, NewUpdateSeq}.
%%   {set_admins, NewAdmins} / {set_revs_limit, Limit}
%%       store the new value (admins are appended to the file and referenced
%%       by pointer), bump update_seq, commit, inform main_pid, reply ok.
%%   {purge_docs, IdRevs}
%%       rejected with {error, purge_during_compaction} while compactor_pid
%%       is not nil; otherwise removes the listed leaf revisions and replies
%%       {ok, NewUpdateSeq, IdRevsPurged}.
%% Any other request fails with function_clause.
...45 handle_call(get_db, _From, Db) ->
...46     {reply, {ok, Db}, Db};
...47 handle_call({update_docs, GroupedDocs, NonRepDocs, Options}, _From, Db) ->
...48     try update_docs_int(Db, GroupedDocs, NonRepDocs, Options) of
...49     {ok, Failures, Db2} ->
...50         ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
...51         if Db2#db.update_seq /= Db#db.update_seq ->
...52             couch_db_update_notifier:notify({updated, Db2#db.name});
...53         true -> ok
...54         end,
...55         {reply, {ok, Failures}, Db2}
...56     catch
...57         throw: retry ->
...58             {reply, retry, Db}
...59     end;
...60 handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
...61     {reply, ok, Db}; % no data waiting, return ok immediately
...62 handle_call(full_commit, _From,  Db) ->
...63     {reply, ok, commit_data(Db)}; % commit the data and return ok
...64 handle_call(increment_update_seq, _From, Db) ->
...65     Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
...66     ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
...67     couch_db_update_notifier:notify({updated, Db#db.name}),
...68     {reply, {ok, Db2#db.update_seq}, Db2};
...69 
...70 handle_call({set_admins, NewAdmins}, _From, Db) ->
...71     {ok, Ptr} = couch_file:append_term(Db#db.fd, NewAdmins),
...72     Db2 = commit_data(Db#db{admins=NewAdmins, admins_ptr=Ptr,
...73             update_seq=Db#db.update_seq+1}),
...74     ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
...75     {reply, ok, Db2};
...76 
...77 handle_call({set_revs_limit, Limit}, _From, Db) ->
...78     Db2 = commit_data(Db#db{revs_limit=Limit,
...79             update_seq=Db#db.update_seq+1}),
...80     ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
...81     {reply, ok, Db2};
...82 
...83 handle_call({purge_docs, _IdRevs}, _From,
...84         #db{compactor_pid=Pid}=Db) when Pid /= nil ->
...85     {reply, {error, purge_during_compaction}, Db};
...86 handle_call({purge_docs, IdRevs}, _From, Db) ->
...87     #db{
...88         fd=Fd,
...89         fulldocinfo_by_id_btree = DocInfoByIdBTree,
...90         docinfo_by_seq_btree = DocInfoBySeqBTree,
...91         update_seq = LastSeq,
...92         header = Header = #db_header{purge_seq=PurgeSeq}
...93         } = Db,
...94     DocLookups = couch_btree:lookup(DocInfoByIdBTree,
...95             [Id || {Id, _Revs} <- IdRevs]),
...96 
          % One entry per requested id: `nil` when the doc is missing or none
          % of the requested revs were leafs, otherwise
          % {UpdatedFullDocInfo, RemovedRevs}.
...97     NewDocInfos = lists:zipwith(
...98         fun({_Id, Revs}, {ok, #full_doc_info{rev_tree=Tree}=FullDocInfo}) ->
...99             case couch_key_tree:remove_leafs(Tree, Revs) of
..100             {_, []=_RemovedRevs} -> % no change
..101                 nil;
..102             {NewTree, RemovedRevs} ->
..103                 {FullDocInfo#full_doc_info{rev_tree=NewTree},RemovedRevs}
..104             end;
..105         (_, not_found) ->
..106             nil
..107         end,
..108         IdRevs, DocLookups),
..109 
          % old by-seq keys of every doc that lost at least one revision
..110     SeqsToRemove = [Seq
..111             || {#full_doc_info{update_seq=Seq},_} <- NewDocInfos],
..112 
          % docs that still have revisions left after the purge
..113     FullDocInfoToUpdate = [FullInfo
..114             || {#full_doc_info{rev_tree=Tree}=FullInfo,_}
..115             <- NewDocInfos, Tree /= []],
..116 
..117     IdRevsPurged = [{Id, Revs}
..118             || {#full_doc_info{id=Id}, Revs} <- NewDocInfos],
..119 
          % Build the by-seq entries, handing out LastSeq+1, LastSeq+2, ... to
          % the surviving docs. NOTE(review): the re-sequenced tree (Tree2) is
          % only used for the by-seq doc_info; the by-id tree below receives
          % FullDocInfoToUpdate unchanged - confirm this is intended.
..120     {DocInfoToUpdate, NewSeq} = lists:mapfoldl(
..121         fun(#full_doc_info{rev_tree=Tree}=FullInfo, SeqAcc) ->
..122             Tree2 = couch_key_tree:map_leafs( fun(RevInfo) ->
..123                     RevInfo#rev_info{seq=SeqAcc + 1}
..124                 end, Tree),
..125             {couch_doc:to_doc_info(FullInfo#full_doc_info{rev_tree=Tree2}),
..126                 SeqAcc + 1}
..127         end, LastSeq, FullDocInfoToUpdate),
..128 
          % docs whose revision tree became empty are dropped from the by-id tree
..129     IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=[]},_}
..130             <- NewDocInfos],
..131 
..132     {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree,
..133             DocInfoToUpdate, SeqsToRemove),
..134     {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree,
..135             FullDocInfoToUpdate, IdsToRemove),
          % the list of purged id/revs is stored in the file and referenced
          % from the header's purged_docs field
..136     {ok, Pointer} = couch_file:append_term(Fd, IdRevsPurged),
..137 
..138     Db2 = commit_data(
..139         Db#db{
..140             fulldocinfo_by_id_btree = DocInfoByIdBTree2,
..141             docinfo_by_seq_btree = DocInfoBySeqBTree2,
..142             update_seq = NewSeq + 1,
..143             header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}),
..144 
..145     ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
..146     couch_db_update_notifier:notify({updated, Db#db.name}),
..147     {reply, {ok, Db2#db.update_seq, IdRevsPurged}, Db2}.
..148 
..149 
%% gen_server handle_cast callback. Supported messages:
%%   start_compact
%%       when no compactor is recorded (compactor_pid is nil), spawns a linked
%%       process running start_copy_compact/1, stores its pid in the state and
%%       pushes the new state to main_pid. Otherwise does nothing.
%%   {compact_done, CompactFilepath}
%%       sent by the compactor when it has finished a pass. The compacted file
%%       is opened and its update_seq compared with the live database:
%%       - equal: local docs are copied over, the compacted db is committed,
%%         the old file is deleted and the compacted file renamed into place,
%%         the old db's ref counter is dropped, main_pid is informed and the
%%         new #db{} (compactor_pid reset to nil) becomes the state;
%%       - different: the compacted db is closed and another compaction pass
%%         is spawned against the current state.
..150 handle_cast(start_compact, Db) ->
..151     case Db#db.compactor_pid of
..152     nil ->
..153         ?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]),
..154         Pid = spawn_link(fun() -> start_copy_compact(Db) end),
..155         Db2 = Db#db{compactor_pid=Pid},
..156         ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
..157         {noreply, Db2};
..158     _ ->
..159         % compact currently running, this is a no-op
..160         {noreply, Db}
..161     end;
..162 handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
..163     {ok, NewFd} = couch_file:open(CompactFilepath),
..164     {ok, NewHeader} = couch_file:read_header(NewFd),
..165     #db{update_seq=NewSeq} = NewDb =
..166             init_db(Db#db.name, Filepath, NewFd, NewHeader),
..167     unlink(NewFd),
..168     case Db#db.update_seq == NewSeq of
..169     true ->
..170         % suck up all the local docs into memory and write them to the new db
..171         {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
..172                 fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
..173         {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_docs_btree, LocalDocs),
..174 
..175         NewDb2 = commit_data( NewDb#db{local_docs_btree=NewLocalBtree,
..176                 main_pid = Db#db.main_pid,filepath = Filepath}),
..177 
..178         ?LOG_DEBUG("CouchDB swapping files ~s and ~s.",
..179                 [Filepath, CompactFilepath]),
              % the result of the delete is ignored; the rename must succeed
..180         file:delete(Filepath),
..181         ok = file:rename(CompactFilepath, Filepath),
..182         close_db(Db),
..183         ok = gen_server:call(Db#db.main_pid, {db_updated, NewDb2}),
..184         ?LOG_INFO("Compaction for db \"~s\" completed.", [Db#db.name]),
..185         {noreply, NewDb2#db{compactor_pid=nil}};
..186     false ->
..187         ?LOG_INFO("Compaction file still behind main file "
..188             "(update seq=~p. compact update seq=~p). Retrying.",
..189             [Db#db.update_seq, NewSeq]),
..190         close_db(NewDb),
..191         Pid = spawn_link(fun() -> start_copy_compact(Db) end),
..192         Db2 = Db#db{compactor_pid=Pid},
..193         {noreply, Db2}
..194     end.
..195 
%% gen_server handle_info callback. Handles the `delayed_commit` message that
%% commit_data/2 schedules with erlang:send_after/3 by committing immediately.
%% No other message is matched.
..196 handle_info(delayed_commit, Db) ->
..197     {noreply, commit_data(Db)}.
..198 
%% gen_server code_change callback: the state is carried over unchanged.
..199 code_change(_OldVsn, State, _Extra) ->
..200     {ok, State}.
..201 
..202 
%% Split fun for the by-seq btree: turns a #doc_info{} into the stored
%% {Key, Value} pair {HighSeq, {Id, RevInfos, DeletedRevInfos}}, where each
%% list holds {Rev, Seq, BodyPointer} tuples for the non-deleted and the
%% deleted revisions respectively.
..203 btree_by_seq_split(#doc_info{id=Id, high_seq=KeySeq, revs=Revs}) ->
..204     RevInfos = [{Rev, Seq, Bp} ||
..205         #rev_info{rev=Rev,seq=Seq,deleted=false,body_sp=Bp} <- Revs],
..206     DeletedRevInfos = [{Rev, Seq, Bp} ||
..207         #rev_info{rev=Rev,seq=Seq,deleted=true,body_sp=Bp} <- Revs],
..208     {KeySeq,{Id, RevInfos, DeletedRevInfos}}.
..209 
%% Join fun for the by-seq btree: rebuilds a #doc_info{} from a stored
%% key/value pair. The first clause is the inverse of btree_by_seq_split/1
%% (non-deleted revisions are listed before deleted ones). The second clause
%% accepts the older 6-tuple value layout; every revision it produces gets the
%% key sequence as its seq, and conflict revisions carry no body pointer.
..210 btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
..211     #doc_info{
..212         id = Id,
..213         high_seq=KeySeq,
..214         revs =
..215             [#rev_info{rev=Rev,seq=Seq,deleted=false,body_sp = Bp} ||
..216                 {Rev, Seq, Bp} <- RevInfos] ++
..217             [#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} ||
..218                 {Rev, Seq, Bp} <- DeletedRevInfos]};
..219 btree_by_seq_join(KeySeq,{Id, Rev, Bp, Conflicts, DelConflicts, Deleted}) ->
..220     % 09 UPGRADE CODE
..221     % this is the 0.9.0 and earlier by_seq record. It's missing the body pointers
..222     % and individual seq nums for conflicts that are currently in the index,
..223     % meaning the filtered _changes api will not work except for on main docs.
..224     % Simply compact a 0.9.0 database to upgrade the index.
..225     #doc_info{
..226         id=Id,
..227         high_seq=KeySeq,
..228         revs = [#rev_info{rev=Rev,seq=KeySeq,deleted=Deleted,body_sp=Bp}] ++
..229             [#rev_info{rev=Rev1,seq=KeySeq,deleted=false} || Rev1 <- Conflicts] ++
..230             [#rev_info{rev=Rev2,seq=KeySeq,deleted=true} || Rev2 <- DelConflicts]}.
..231 
%% Split fun for the by-id btree: turns a #full_doc_info{} into the stored
%% pair {Id, {UpdateSeq, DeletedFlag, DiskTree}}. Boolean deleted flags, both
%% on the document and on every {IsDeleted, BodyPointer, UpdateSeq} tree node,
%% are written as 1/0; ?REV_MISSING nodes are kept as they are.
..232 btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
..233         deleted=Deleted, rev_tree=Tree}) ->
..234     DiskTree =
..235     couch_key_tree:map(
..236         fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) ->
..237             {if IsDeleted -> 1; true -> 0 end, BodyPointer, UpdateSeq};
..238         (_RevId, ?REV_MISSING) ->
..239             ?REV_MISSING
..240         end, Tree),
..241     {Id, {Seq, if Deleted -> 1; true -> 0 end, DiskTree}}.
..242 
%% Join fun for the by-id btree: rebuilds a #full_doc_info{} from the stored
%% {HighSeq, Deleted, DiskTree} value, converting 1/0 flags back to booleans.
%% Two-element {IsDeleted, BodyPointer} nodes (older on-disk layout) are
%% passed through with IsDeleted untouched and HighSeq filled in as their
%% sequence number.
..243 btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
..244     Tree =
..245     couch_key_tree:map(
..246         fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) ->
..247             {IsDeleted == 1, BodyPointer, UpdateSeq};
..248         (_RevId, ?REV_MISSING) ->
..249             ?REV_MISSING;
..250         (_RevId, {IsDeleted, BodyPointer}) ->
..251             % 09 UPGRADE CODE
..252             % this is the 0.9.0 and earlier rev info record. It's missing the seq
..253             % nums, which means couchdb will sometimes reexamine unchanged
..254             % documents with the _changes API.
..255             % This is fixed by compacting the database.
..256             {IsDeleted, BodyPointer, HighSeq}
..257         end, DiskTree),
..258 
..259     #full_doc_info{id=Id, update_seq=HighSeq, deleted=Deleted==1, rev_tree=Tree}.
..260 
%% Reduce fun for the by-id btree. The reduction value is the pair
%% {NotDeletedCount, DeletedCount}: `reduce` counts #full_doc_info{} records
%% by their deleted flag, `rereduce` sums previously computed pairs
%% component-wise.
..261 btree_by_id_reduce(reduce, FullDocInfos) ->
..262     % count the number of not deleted documents
..263     {length([1 || #full_doc_info{deleted=false} <- FullDocInfos]),
..264         length([1 || #full_doc_info{deleted=true} <- FullDocInfos])};
..265 btree_by_id_reduce(rereduce, Reds) ->
..266     {lists:sum([Count || {Count,_} <- Reds]),
..267         lists:sum([DelCount || {_, DelCount} <- Reds])}.
..268 
%% Reduce fun for the by-seq btree. The reduction value is an entry count:
%% `reduce` returns the length of the given list, `rereduce` sums earlier
%% counts.
..269 btree_by_seq_reduce(reduce, DocInfos) ->
..270     % count the number of documents
..271     length(DocInfos);
..272 btree_by_seq_reduce(rereduce, Reds) ->
..273     lists:sum(Reds).
..274 
%% Brings an old record tuple up to the size of a newer record definition.
%% Old: the tuple read from disk; New: a record of the current definition.
%% When both have the same size Old is returned unchanged. Otherwise the
%% trailing elements of New (positions size(Old)+1 .. size(New)) are appended
%% to Old, so the added fields take New's values.
%% Assumes Old is not larger than New; a larger Old would make the sublist
%% length negative and crash.
..275 simple_upgrade_record(Old, New) when size(Old) == size(New)->
..276     Old;
..277 simple_upgrade_record(Old, New) ->
..278     NewValuesTail =
..279         lists:sublist(tuple_to_list(New), size(Old) + 1, size(New)-size(Old)),
..280     list_to_tuple(tuple_to_list(Old) ++ NewValuesTail).
..281 
%% Ordering fun for document ids; returns true when A sorts strictly before B.
%% Equal keys (compared with ==) are never "less". The atom `nil` sorts before
%% every other key and the empty tuple `{}` sorts after every other key;
%% everything else uses Erlang term order. Clause order matters: the equality
%% clause must come first so that nil/nil and {}/{} yield false.
..282 % used for doc insertion, also for the PassedEndFun on all_docs view
..283 less_docid(A, B) when A==B -> false;
..284 less_docid(nil, _) -> true; % nil - special key sorts before all
..285 less_docid({}, _) -> false; % {} -> special key sorts after all
..286 less_docid(A, B) -> A < B.
..287 
..288 
%% Builds the #db{} state for an opened database file.
%%   DbName, Filepath - stored verbatim in the record
%%   Fd               - open couch_file handle of the database
%%   Header0          - header as read from disk (possibly an older, shorter
%%                      record)
%% Steps: upgrade the header tuple to the current record size, validate the
%% version stored in its second tuple element (presumably the disk_version
%% field - confirm against couch_db.hrl), read the "fsync_options" setting,
%% optionally sync the file, open the by-id, by-seq and local-docs btrees,
%% load the admins term if the header points to one, record a start time and
%% start a ref counter for Fd.
%% Throws {database_disk_version_error, _} for an unknown header version.
..289 init_db(DbName, Filepath, Fd, Header0) ->
..290     Header1 = simple_upgrade_record(Header0, #db_header{}),
..291     Header =
..292     case element(2, Header1) of
..293     1 -> Header1#db_header{unused = 0}; % 0.9
..294     2 -> Header1#db_header{unused = 0}; % post 0.9 and pre 0.10
..295     3 -> Header1; % post 0.9 and pre 0.10
..296     ?LATEST_DISK_VERSION -> Header1;
..297     _ -> throw({database_disk_version_error, "Incorrect disk header version"})
..298     end,
..299     Less = fun less_docid/2,
..300 
..301     {ok, FsyncOptions} = couch_util:parse_term(
..302             couch_config:get("couchdb", "fsync_options",
..303                     "[before_header, after_header, on_file_open]")),
..304 
..305     case lists:member(on_file_open, FsyncOptions) of
..306     true -> ok = couch_file:sync(Fd);
..307     _ -> ok
..308     end,
..309 
..310     {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
..311         [{split, fun(X) -> btree_by_id_split(X) end},
..312         {join, fun(X,Y) -> btree_by_id_join(X,Y) end},
..313         {reduce, fun(X,Y) -> btree_by_id_reduce(X,Y) end},
..314         {less, Less}]),
..315     {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
..316             [{split, fun(X) -> btree_by_seq_split(X) end},
..317             {join, fun(X,Y) -> btree_by_seq_join(X,Y) end},
..318             {reduce, fun(X,Y) -> btree_by_seq_reduce(X,Y) end}]),
..319     {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
..320     case Header#db_header.admins_ptr of
..321     nil ->
..322         Admins = [],
..323         AdminsPtr = nil;
..324     AdminsPtr ->
..325         {ok, Admins} = couch_file:pread_term(Fd, AdminsPtr)
..326     end,
..327     % convert start time tuple to microsecs and store as a binary string
..328     {MegaSecs, Secs, MicroSecs} = now(),
..329     StartTime = ?l2b(io_lib:format("~p",
..330             [(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
..331     {ok, RefCntr} = couch_ref_counter:start([Fd]),
..332     #db{
..333         update_pid=self(),
..334         fd=Fd,
..335         fd_ref_counter = RefCntr,
..336         header=Header,
..337         fulldocinfo_by_id_btree = IdBtree,
..338         docinfo_by_seq_btree = SeqBtree,
..339         local_docs_btree = LocalDocsBtree,
..340         committed_update_seq = Header#db_header.update_seq,
..341         update_seq = Header#db_header.update_seq,
..342         name = DbName,
..343         filepath = Filepath,
..344         admins = Admins,
..345         admins_ptr = AdminsPtr,
..346         instance_start_time = StartTime,
..347         revs_limit = Header#db_header.revs_limit,
..348         fsync_options = FsyncOptions
..349         }.
..350 
..351 
%% Releases this process's reference on the database file by calling
%% couch_ref_counter:drop/1 on the db's fd_ref_counter.
..352 close_db(#db{fd_ref_counter = RefCntr}) ->
..353     couch_ref_counter:drop(RefCntr).
..354 
..355 
%% Recomputes the validate_doc_funs field of Db: fetches all design documents
%% and keeps, in order, every fun returned by
%% couch_doc:get_validate_doc_fun/1, skipping design docs for which it
%% returns nil. Returns the updated #db{}.
..356 refresh_validate_doc_funs(Db) ->
..357     {ok, DesignDocs} = couch_db:get_design_docs(Db),
..358     ProcessDocFuns = lists:flatmap(
..359         fun(DesignDoc) ->
..360             case couch_doc:get_validate_doc_fun(DesignDoc) of
..361             nil -> [];
..362             Fun -> [Fun]
..363             end
..364         end, DesignDocs),
..365     Db#db{validate_doc_funs=ProcessDocFuns}.
..366 
..367 % rev tree functions
..368 
%% Writes every not-yet-persisted document in the given revision trees to Fd.
%%   Db            - supplies the file handle and the header
%%   second arg    - list of #full_doc_info{} whose trees may hold #doc{} values
%%   third arg     - accumulator of already flushed infos (in reverse order)
%% Each #doc{} node value is replaced by {IsDeleted, SummaryPointer, UpdateSeq}
%% where SummaryPointer is the position returned when appending
%% {Body, DiskAtts}; other node values are left untouched.
%% Returns {ok, FlushedInfos} in the original order.
%% Throws `retry` when a document's first attachment was written to a
%% different file descriptor than Fd.
..369 flush_trees(_Db, [], AccFlushedTrees) ->
..370     {ok, lists:reverse(AccFlushedTrees)};
..371 flush_trees(#db{fd=Fd,header=Header}=Db,
..372         [InfoUnflushed | RestUnflushed], AccFlushed) ->
..373     #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
..374     Flushed = couch_key_tree:map(
..375         fun(_Rev, Value) ->
..376             case Value of
..377             #doc{atts=Atts,deleted=IsDeleted}=Doc ->
..378                 % this node value is actually an unwritten document summary,
..379                 % write to disk.
..380                 % make sure the Fd in the written bins is the same Fd as this
..381                 % db's, and convert bins, removing the Fd.
..382                 % All bins should have been written to disk already.
..383                 DiskAtts =
..384                 case Atts of
..385                 [] -> [];
..386                 [#att{data={BinFd, _Sp}} | _ ] when BinFd == Fd ->
..387                     [{N,T,P,L,R,M}
..388                         || #att{name=N,type=T,data={_,P},md5=M,revpos=R,len=L}
..389                         <- Atts];
..390                 _ ->
..391                     % BinFd must not equal our Fd. This can happen when a database
..392                     % is being switched out during a compaction
..393                     ?LOG_DEBUG("File where the attachments are written has"
..394                             " changed. Possibly retrying.", []),
..395                     throw(retry)
..396                 end,
                      % headers with disk_version below 4 use the plain append,
                      % newer ones the md5 variant
..397                 {ok, NewSummaryPointer} =
..398                 case Header#db_header.disk_version < 4 of
..399                 true ->
..400                     couch_file:append_term(Fd, {Doc#doc.body, DiskAtts});
..401                 false ->
..402                     couch_file:append_term_md5(Fd, {Doc#doc.body, DiskAtts})
..403                 end,
..404                 {IsDeleted, NewSummaryPointer, UpdateSeq};
..405             _ ->
..406                 Value
..407             end
..408         end, Unflushed),
..409     flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
..410 
%% Merges groups of new documents into the revision trees of their existing
%% #full_doc_info{} records. The two lists are walked in lock-step, one group
%% of new docs per old doc info.
%%   MergeConflicts - when true every new doc is merged unconditionally;
%%                    when false edits that conflict are rejected (see below)
%%   AccNewInfos    - accumulated #full_doc_info{} of changed documents
%%   AccRemoveSeqs  - accumulated old update_seq values to drop from by-seq
%%   AccConflicts   - accumulated {{Id, {Pos-1, PrevRevs}}, Result} entries
%%   AccSeq         - last sequence number handed out so far
%% Returns {ok, NewInfos, RemoveSeqs, Conflicts, LastSeq}. Documents whose
%% tree did not change are left out of NewInfos and consume no sequence number.
%% With MergeConflicts false a new doc is reported as `conflict` when the merge
%% yields conflicts on a non-deleted document, or when it does not change the
%% tree at all; the one exception is re-creating a deleted document at
%% position 1, which is re-parented onto the deleted revision and reported as
%% {ok, {NewPos, NewRevId}}.
..411 merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccRemoveSeqs, AccConflicts, AccSeq) ->
..412     {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccConflicts, AccSeq};
..413 merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
..414         [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccConflicts, AccSeq) ->
..415     #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq}
..416             = OldDocInfo,
..417     {NewRevTree, NewConflicts} = lists:foldl(
..418         fun(#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, {AccTree, AccConflicts2}) ->
..419             if not MergeConflicts ->
..420                 case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of
..421                 {_NewTree, conflicts} when (not OldDeleted) ->
..422                     {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]};
..423                 {NewTree, no_conflicts} when  AccTree == NewTree ->
..424                     % the tree didn't change at all
..425                     % meaning we are saving a rev that's already
..426                     % been edited again.
..427                     if (Pos == 1) and OldDeleted ->
..428                         % this means we are recreating a brand new document
..429                         % into a state that already existed before.
..430                         % put the rev into a subsequent edit of the deletion
..431                         #doc_info{revs=[#rev_info{rev={OldPos,OldRev}}|_]} = 
..432                                 couch_doc:to_doc_info(OldDocInfo),
..433                         NewRevId = couch_db:new_revid(
..434                                 NewDoc#doc{revs={OldPos, [OldRev]}}),
..435                         NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
..436                         {NewTree2, _} = couch_key_tree:merge(AccTree,
..437                                 [couch_db:doc_to_tree(NewDoc2)]),
..438                         % we changed the rev id, this tells the caller we did.
..439                         {NewTree2, [{{Id, {Pos-1,PrevRevs}}, {ok, {OldPos + 1, NewRevId}}}
..440                                 | AccConflicts2]};
..441                     true ->
..442                         {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]}
..443                     end;
..444                 {NewTree, _} ->
..445                     {NewTree, AccConflicts2}
..446                 end;
..447             true ->
..448                 {NewTree, _} = couch_key_tree:merge(AccTree,
..449                             [couch_db:doc_to_tree(NewDoc)]),
..450                 {NewTree, AccConflicts2}
..451             end 
..452         end,
..453         {OldTree, AccConflicts}, NewDocs),
..454     if NewRevTree == OldTree ->
..455         % nothing changed
..456         merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, AccNewInfos,
..457                 AccRemoveSeqs, NewConflicts, AccSeq);
..458     true ->
..459         % we have updated the document, give it a new seq #
..460         NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
              % an old update_seq of 0 is not queued for removal from by-seq
..461         RemoveSeqs = case OldSeq of
..462             0 -> AccRemoveSeqs;
..463             _ -> [OldSeq | AccRemoveSeqs]
..464         end,
..465         merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo,
..466                 [NewInfo|AccNewInfos], RemoveSeqs, NewConflicts, AccSeq+1)
..467     end.
..468 
..469 
..470 
%% Derives the entries for both indexes from a list of #full_doc_info{}.
%% For each info the #doc_info{} is computed with couch_doc:to_doc_info/1; the
%% deleted flag of its first revision is copied onto the #full_doc_info{}.
%% Returns {ByIdEntries, BySeqEntries}. Both lists come out in reverse input
%% order because entries are prepended to the accumulators.
..471 new_index_entries([], AccById, AccBySeq) ->
..472     {AccById, AccBySeq};
..473 new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq) ->
..474     #doc_info{revs=[#rev_info{deleted=Deleted}|_]} = DocInfo =
..475             couch_doc:to_doc_info(FullDocInfo),
..476     new_index_entries(RestInfos,
..477         [FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
..478         [DocInfo|AccBySeq]).
..479 
..480 
%% Applies couch_key_tree:stem/2 with the database's revs_limit to the
%% revision tree of every #full_doc_info{} in DocInfos and returns the updated
%% list in the same order.
..481 stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
..482     [Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} ||
..483             #full_doc_info{rev_tree=Tree}=Info <- DocInfos].
..484 
%% Applies one batch of document updates to the database.
%%   Db         - current #db{} state
%%   DocsList   - list of non-empty lists of #doc{}; the id is taken from the
%%                first doc of each group
%%   NonRepDocs - docs handed to update_local_docs/2 (stored in the local-docs
%%                btree)
%%   Options    - `merge_conflicts` is passed on to merge_rev_trees/7;
%%                `full_commit` forces an immediate commit, otherwise the
%%                commit is delayed
%% Returns {ok, Conflicts, NewDb} where Conflicts is the local-doc results
%% followed by the results reported by merge_rev_trees/7.
%% A `retry` thrown by flush_trees/3 propagates to the caller.
..485 update_docs_int(Db, DocsList, NonRepDocs, Options) ->
..486     #db{
..487         fulldocinfo_by_id_btree = DocInfoByIdBTree,
..488         docinfo_by_seq_btree = DocInfoBySeqBTree,
..489         update_seq = LastSeq
..490         } = Db,
..491     Ids = [Id || [#doc{id=Id}|_] <- DocsList],
..492     % look up the old documents, if they exist.
..493     OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
..494     OldDocInfos = lists:zipwith(
..495         fun(_Id, {ok, FullDocInfo}) ->
..496             FullDocInfo;
..497         (Id, not_found) ->
..498             #full_doc_info{id=Id}
..499         end,
..500         Ids, OldDocLookups),
..501 
..502     % Merge the new docs into the revision trees.
..503     {ok, NewDocInfos0, RemoveSeqs, Conflicts, NewSeq} = merge_rev_trees(
..504             lists:member(merge_conflicts, Options),
..505             DocsList, OldDocInfos, [], [], [], LastSeq),
..506 
..507     NewFullDocInfos = stem_full_doc_infos(Db, NewDocInfos0),
..508 
..509     % All documents are now ready to write.
..510 
..511     {ok, LocalConflicts, Db2}  = update_local_docs(Db, NonRepDocs),
..512 
..513     % Write out the document summaries (the bodies are stored in the nodes of
..514     % the trees, the attachments are already written to disk)
..515     {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
..516 
..517     {IndexFullDocInfos, IndexDocInfos} =
..518             new_index_entries(FlushedFullDocInfos, [], []),
..519 
..520     % and the indexes
..521     {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []),
..522     {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexDocInfos, RemoveSeqs),
..523 
..524     Db3 = Db2#db{
..525         fulldocinfo_by_id_btree = DocInfoByIdBTree2,
..526         docinfo_by_seq_btree = DocInfoBySeqBTree2,
..527         update_seq = NewSeq},
..528 
..529     % Check if we just updated any design documents, and update the validation
..530     % funs if we did.
..531     case [1 || <<"_design/",_/binary>> <- Ids] of
..532     [] ->
..533         Db4 = Db3;
..534     _ ->
..535         Db4 = refresh_validate_doc_funs(Db3)
..536     end,
..537 
..538     {ok, LocalConflicts ++ Conflicts,
..539             commit_data(Db4, not lists:member(full_commit, Options))}.
..540 
..541 
%% Applies updates to the local-docs btree.
%% Each doc must have revs = {0, PrevRevs}. The previous revision is the
%% integer parsed from the first element of PrevRevs (0 when the list is
%% empty) and must equal the stored revision (0 when the doc is not found):
%%   - match, not deleted: stored as {Id, {PrevRev + 1, Body}}
%%   - match, deleted:     removed from the btree
%%   - mismatch:           reported as a conflict, btree untouched
%% Returns {ok, Results, NewDb}. Results lists removals first, then updates,
%% then conflicts, each keyed by {Id, {0, PrevRevs}}; successful entries carry
%% {ok, {0, NewRevBinary}} (<<"0">> for removals).
..542 update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
..543     Ids = [Id || #doc{id=Id} <- Docs],
..544     OldDocLookups = couch_btree:lookup(Btree, Ids),
..545     BtreeEntries = lists:zipwith(
..546         fun(#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}, OldDocLookup) ->
..547             case PrevRevs of
..548             [RevStr|_] ->
..549                 PrevRev = list_to_integer(?b2l(RevStr));
..550             [] ->
..551                 PrevRev = 0
..552             end,
..553             OldRev =
..554             case OldDocLookup of
..555                 {ok, {_, {OldRev0, _}}} -> OldRev0;
..556                 not_found -> 0
..557             end,
..558             case OldRev == PrevRev of
..559             true ->
..560                 case Delete of
..561                     false -> {update, {Id, {PrevRev + 1, PrevRevs, Body}}};
..562                     true  -> {remove, Id, PrevRevs}
..563                 end;
..564             false ->
..565                 {conflict, {Id, {0, PrevRevs}}}
..566             end
..567         end, Docs, OldDocLookups),
..568 
..569     BtreeIdsRemove = [Id || {remove, Id, _PrevRevs} <- BtreeEntries],
..570     BtreeIdsUpdate = [{Id, {NewRev, Body}} || {update, {Id, {NewRev, _OldRevs, Body}}} <- BtreeEntries],
..571     Results = 
..572             [{{Id, {0, PrevRevs}}, {ok, {0, <<"0">>}}}
..573                 || {remove, Id, PrevRevs} <- BtreeEntries] ++
..574             [{{Id, {0, PrevRevs}}, {ok, {0, ?l2b(integer_to_list(NewRev))}}}
..575                 || {update, {Id, {NewRev, PrevRevs, _Body}}} <- BtreeEntries] ++
..576             [{IdRevs, conflict}
..577                 || {conflict, IdRevs} <- BtreeEntries],
..578 
..579     {ok, Btree2} =
..580         couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
..581 
..582     {ok, Results, Db#db{local_docs_btree = Btree2}}.
..583 
..584 
%% Commits the database header immediately; shorthand for
%% commit_data(Db, false).
..585 commit_data(Db) ->
..586     commit_data(Db, false).
..587 
%% Returns Header updated from the in-memory state Db: the update sequence,
%% the current state of the three btrees, the admins pointer and the revs
%% limit. All other header fields are kept as they are in Header.
..588 db_to_header(Db, Header) ->
..589     Header#db_header{
..590         update_seq = Db#db.update_seq,
..591         docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
..592         fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
..593         local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
..594         admins_ptr = Db#db.admins_ptr,
..595         revs_limit = Db#db.revs_limit}.
..596 
%% Writes the database header if it differs from the last written one.
%%   Delay - true to defer the write, false to write now
%% Behaviour, in order of the `if` branches:
%%   1. the header derived from Db equals the stored one: Db is returned as is;
%%   2. Delay and no timer pending: a `delayed_commit` message to self() is
%%      scheduled in 1000 ms and its timer reference stored in
%%      waiting_delayed_commit;
%%   3. Delay and a timer already pending: Db is returned as is;
%%   4. otherwise: a pending timer is cancelled (if it had already fired, the
%%      `delayed_commit` message is removed from the mailbox), the file is
%%      synced before and/or after the header write according to
%%      fsync_options, and the returned Db carries the new header, the
%%      committed_update_seq and waiting_delayed_commit = nil.
..597 commit_data(#db{fd=Fd,header=OldHeader,fsync_options=FsyncOptions}=Db, Delay) ->
..598     Header = db_to_header(Db, OldHeader),
..599     if OldHeader == Header ->
..600         Db;
..601     Delay and (Db#db.waiting_delayed_commit == nil) ->
..602         Db#db{waiting_delayed_commit=
..603                 erlang:send_after(1000, self(), delayed_commit)};
..604     Delay ->
..605         Db;
..606     true ->
..607         if Db#db.waiting_delayed_commit /= nil ->
..608             case erlang:cancel_timer(Db#db.waiting_delayed_commit) of
..609             false -> receive delayed_commit -> ok after 0 -> ok end;
..610             _ -> ok
..611             end;
..612         true -> ok
..613         end,
..614         case lists:member(before_header, FsyncOptions) of
..615         true -> ok = couch_file:sync(Fd);
..616         _    -> ok
..617         end,
..618 
..619         ok = couch_file:write_header(Fd, Header),
..620 
..621         case lists:member(after_header, FsyncOptions) of
..622         true -> ok = couch_file:sync(Fd);
..623         _    -> ok
..624         end,
..625 
..626         Db#db{waiting_delayed_commit=nil,
..627             header=Header,
..628             committed_update_seq=Db#db.update_seq}
..629     end.
..630 
..631 
%% Reads the document stored at SrcSp in SrcDb and copies each of its
%% attachment streams into DestFd.
%% Returns {BodyData, NewBinInfos}; every entry of NewBinInfos is a 6-tuple
%% {Name, Type, NewStreamPointer, Len, RevPos, Md5}.
%% Three on-disk attachment layouts are accepted:
%%   - {Name, {Type, BinSp, Len}} with BinSp a tuple or `null`: copied with
%%     couch_stream:old_copy_to_new_stream/4;
%%   - {Name, {Type, BinSp, Len}} otherwise: copied with copy_to_new_stream/3;
%%   - the 6-tuple layout itself: copied with copy_to_new_stream/3, keeping
%%     the stored RevPos.
%% For the first two layouts RevPos is taken from the Pos of the revision
%% passed in. Because Len (and, for the 6-tuple layout, Md5) are already bound
%% when the copy result is matched, a copy reporting a different value
%% crashes with badmatch.
..632 copy_doc_attachments(#db{fd=SrcFd}=SrcDb, {Pos,_RevId}, SrcSp, DestFd) ->
..633     {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcDb, SrcSp),
..634     % copy the bin values
..635     NewBinInfos = lists:map(
..636         fun({Name, {Type, BinSp, Len}}) when is_tuple(BinSp) orelse BinSp == null ->
..637             % 09 UPGRADE CODE
..638             {NewBinSp, Len, Md5} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
..639             {Name, Type, NewBinSp, Len, Pos, Md5};
..640         ({Name, {Type, BinSp, Len}}) ->
..641             % 09 UPGRADE CODE
..642             {NewBinSp, Len, Md5} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
..643             {Name, Type, NewBinSp, Len, Pos, Md5};
..644         ({Name, Type, BinSp, Len, RevPos, Md5}) ->
..645             {NewBinSp, Len, Md5} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
..646             {Name, Type, NewBinSp, Len, RevPos, Md5}
..647         end, BinInfos),
..648     {BodyData, NewBinInfos}.
..649 
%% Maps over a revision tree, copying the attachments of every leaf revision
%% into DestFd. In the result each leaf value {IsDel, Sp, Seq} has its pointer
%% replaced by the document data returned by copy_doc_attachments/4, and every
%% branch node value becomes ?REV_MISSING.
..650 copy_rev_tree_attachments(SrcDb, DestFd, Tree) ->
..651     couch_key_tree:map(
..652         fun(Rev, {IsDel, Sp, Seq}, leaf) ->
..653             DocBody = copy_doc_attachments(SrcDb, Rev, Sp, DestFd),
..654             {IsDel, DocBody, Seq};
..655         (_, _, branch) ->
..656             ?REV_MISSING
..657         end, Tree).
..658             
..659 
%% Copies the documents named by InfoBySeq from Db into the compaction target
%% NewDb and returns NewDb with both btrees updated.
%%   Db        - source database
%%   NewDb     - destination database (its fd receives all writes)
%%   InfoBySeq - list of #doc_info{}; only the ids are used
%%   Retry     - true when resuming an earlier compaction: sequence numbers of
%%               documents already present in NewDb are then removed from its
%%               by-seq btree
%% Every id must exist in Db's by-id btree; a not_found lookup result does not
%% match the mapping fun and crashes.
..660 copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
..661     Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
..662     LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
..663 
..664     % write out the attachments
..665     NewFullDocInfos0 = lists:map(
..666         fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
..667             Info#full_doc_info{rev_tree=copy_rev_tree_attachments(Db, DestFd, RevTree)}
..668         end, LookupResults),
..669     % write out the docs
..670     % we do this in 2 stages so the docs are written out contiguously, making
..671     % view indexing and replication faster.
..672     NewFullDocInfos1 = lists:map(
..673         fun(#full_doc_info{rev_tree=RevTree}=Info) ->
..674             Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
..675                 fun(_Key, {IsDel, DocBody, Seq}) ->
..676                     {ok, Pos} = couch_file:append_term_md5(DestFd, DocBody),
..677                     {IsDel, Pos, Seq}
..678                 end, RevTree)}
..679         end, NewFullDocInfos0),
..680 
          % the revs_limit used for stemming is the source Db's
..681     NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos1),
..682     NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos],
..683     RemoveSeqs =
..684     case Retry of
..685     false ->
..686         [];
..687     true ->
..688         % We are retrying a compaction, meaning the documents we are copying may
..689         % already exist in our file and must be removed from the by_seq index.
..690         Existing = couch_btree:lookup(NewDb#db.fulldocinfo_by_id_btree, Ids),
..691         [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
..692     end,
..693 
..694     {ok, DocInfoBTree} = couch_btree:add_remove(
..695             NewDb#db.docinfo_by_seq_btree, NewDocInfos, RemoveSeqs),
..696     {ok, FullDocInfoBTree} = couch_btree:add_remove(
..697             NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []),
..698     NewDb#db{ fulldocinfo_by_id_btree=FullDocInfoBTree,
..699               docinfo_by_seq_btree=DocInfoBTree}.
..700 
..701 
..702 
%% Copies every change made to Db after NewDb0's update_seq into NewDb0 and
%% returns the committed result.
%%   Db     - source database
%%   NewDb0 - compaction target; only `before_header` is kept from its
%%            fsync_options while copying
%%   Retry  - passed through to copy_docs/4
%% The by-seq btree of Db is folded starting at NewDb's update_seq + 1. The
%% task status is updated for every entry. Entries are buffered and handed to
%% copy_docs/4 whenever the running count is a multiple of 1000 (this includes
%% the very first entry, at count 0); on multiples of 10000 the target is also
%% committed. After the fold the remaining buffer is flushed, the admins term
%% is copied if it differs, and the target is committed with Db's update_seq.
%% The fold result is matched against TotalChanges, so a mismatch between the
%% counted and the visited number of changes crashes with badmatch.
..703 copy_compact(Db, NewDb0, Retry) ->
..704     FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header],
..705     NewDb = NewDb0#db{fsync_options=FsyncOptions},
..706     TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
..707     EnumBySeqFun =
..708     fun(#doc_info{high_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
..709         couch_task_status:update("Copied ~p of ~p changes (~p%)",
..710                 [TotalCopied, TotalChanges, (TotalCopied*100) div TotalChanges]),
..711         if TotalCopied rem 1000 == 0 ->
..712             NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
..713             if TotalCopied rem 10000 == 0 ->
..714                 {ok, {commit_data(NewDb2#db{update_seq=Seq}), [], TotalCopied + 1}};
..715             true ->
..716                 {ok, {NewDb2#db{update_seq=Seq}, [], TotalCopied + 1}}
..717             end;
..718         true ->
..719             {ok, {AccNewDb, [DocInfo | AccUncopied], TotalCopied + 1}}
..720         end
..721     end,
..722 
..723     couch_task_status:set_update_frequency(500),
..724 
..725     {ok, {NewDb2, Uncopied, TotalChanges}} =
..726         couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1, EnumBySeqFun, {NewDb, [], 0}),
..727 
..728     couch_task_status:update("Flushing"),
..729 
..730     NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
..731 
..732     % copy misc header values
..733     if NewDb3#db.admins /= Db#db.admins ->
..734         {ok, Ptr} = couch_file:append_term(NewDb3#db.fd, Db#db.admins),
..735         NewDb4 = NewDb3#db{admins=Db#db.admins, admins_ptr=Ptr};
..736     true ->
..737         NewDb4 = NewDb3
..738     end,
..739 
..740     commit_data(NewDb4#db{update_seq=Db#db.update_seq}).
..741 
%% Body of the compaction process spawned by handle_cast/2.
%% Opens "<Filepath>.compact": when the file already exists the compaction is
%% resumed from its header (Retry = true); when it does not exist (enoent) it
%% is created with a default #db_header{} (Retry = false). A task-status entry
%% is registered in both cases. The documents are then copied with
%% copy_compact/3, the updater process is told {compact_done, CompactFile} and
%% this process's reference on the compact file is dropped.
%% Any other result from couch_file:open/1 crashes with case_clause.
..742 start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
..743     CompactFile = Filepath ++ ".compact",
..744     ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
..745     case couch_file:open(CompactFile) of
..746     {ok, Fd} ->
              % The second argument had been reduced to the invalid token `<>`
              % in this listing; it is restored as the database name with a
              % " retry" suffix. Assumes Name is a binary, as the enoent branch
              % passes it next to binary literals - TODO confirm.
..747         couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>),
..748         Retry = true,
..749         {ok, Header} = couch_file:read_header(Fd);
..750     {error, enoent} ->
..751         couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>),
..752         {ok, Fd} = couch_file:open(CompactFile, [create]),
..753         Retry = false,
..754         ok = couch_file:write_header(Fd, Header=#db_header{})
..755     end,
..756     NewDb = init_db(Name, CompactFile, Fd, Header),
..757     NewDb2 = copy_compact(Db, NewDb, Retry),
..758 
..759     gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}),
..760     close_db(NewDb2).
..761 

Generated using etap 0.3.4.