C0 code coverage information

Generated on 2009-08-09 21:22:57 with etap 0.3.4.

Name Total lines Lines of code Total coverage Code coverage
couch_db ?? ?? ??
28% 
....1 % Licensed under the Apache License, Version 2.0 (the "License"); you may not
....2 % use this file except in compliance with the License. You may obtain a copy of
....3 % the License at
....4 %
....5 %   http://www.apache.org/licenses/LICENSE-2.0
....6 %
....7 % Unless required by applicable law or agreed to in writing, software
....8 % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
....9 % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
...10 % License for the specific language governing permissions and limitations under
...11 % the License.
...12 
...13 -module(couch_db).
...14 -behaviour(gen_server).
...15 
...16 -export([open/2,close/1,create/2,start_compact/1,get_db_info/1,get_design_docs/1]).
...17 -export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]).
...18 -export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
...19 -export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
...20 -export([set_revs_limit/2,get_revs_limit/1,register_update_notifier/3]).
...21 -export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1,get_committed_update_seq/1]).
...22 -export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]).
...23 -export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
...24 -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
...25 -export([start_link/3,open_doc_int/3,set_admins/2,get_admins/1,ensure_full_commit/1]).
...26 -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
...27 -export([changes_since/5,read_doc/2,new_revid/1]).
...28 
...29 -include("couch_db.hrl").
...30 
...31 
...32 start_link(DbName, Filepath, Options) -> % Open the db file, then start the couch_db gen_server on top of it.
...33     case open_db_file(Filepath, Options) of
...34     {ok, Fd} ->
...35         StartResult = gen_server:start_link(couch_db, {DbName, Filepath, Fd, Options}, []),
...36         unlink(Fd), % drop the caller's link to Fd (presumably the couch_file process -- confirm)
...37         StartResult; % whatever gen_server:start_link/3 returned
...38     Else ->
...39         Else % a failed open is handed back to the caller unchanged
...40     end.
...41 
...42 open_db_file(Filepath, Options) -> % Returns {ok, Fd}, {not_found, no_db_file}, or couch_file:open/2's other result as-is.
...43     case couch_file:open(Filepath, Options) of
...44     {ok, Fd} ->
...45         {ok, Fd};
...46     {error, enoent} ->
...47         % Couldn't find the file. Is there a compacted version? This can happen
...48         % if we crashed during the compaction file switch.
...49         case couch_file:open(Filepath ++ ".compact") of % NOTE(review): Options are not passed here; any result other than the two below raises case_clause
...50         {ok, Fd} ->
...51             ?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
...52             ok = file:rename(Filepath ++ ".compact", Filepath), % promote the compaction file to the primary path
...53             ok = couch_file:sync(Fd),
...54             {ok, Fd};
...55         {error, enoent} ->
...56             {not_found, no_db_file} % neither the db file nor a .compact file exists
...57         end;
...58     Error ->
...59         Error
...60     end.
...61 
...62 
...63 create(DbName, Options) -> % Thin wrapper: delegates to couch_server:create/2.
...64     couch_server:create(DbName, Options).
...65 
...66 open(DbName, Options) -> % Thin wrapper: delegates to couch_server:open/2.
...67     couch_server:open(DbName, Options).
...68 
...69 ensure_full_commit(#db{update_pid=UpdatePid,instance_start_time=StartTime}) -> % Synchronously ask the updater for a full commit.
...70     ok = gen_server:call(UpdatePid, full_commit, infinity), % no timeout; crashes (badmatch) on any reply other than ok
...71     {ok, StartTime}. % returns the instance_start_time stored in the #db record
...72 
...73 close(#db{fd_ref_counter=RefCntr}) -> % Release this handle by dropping one reference on the db's ref counter.
...74     couch_ref_counter:drop(RefCntr).
...75 
...76 open_ref_counted(MainPid, UserCtx) -> % Get the current #db from the main process, registering the caller as a referrer.
...77     {ok, Db} = gen_server:call(MainPid, {open_ref_count, self()}),
...78     {ok, Db#db{user_ctx=UserCtx}}. % the returned copy carries the caller's user context
...79 
...80 is_idle(MainPid) -> % Ask the db's main process whether it is idle (answered by the is_idle handle_call clause).
...81     gen_server:call(MainPid, is_idle).
...82 
...83 monitor(#db{main_pid=MainPid}) -> % Monitor the db's main process; returns the monitor reference.
...84     erlang:monitor(process, MainPid).
...85 
...86 register_update_notifier(#db{main_pid=Pid}, Seq, Fun) -> % Forward {register_update_notifier, Seq, Fun} to the main process.
...87     gen_server:call(Pid, {register_update_notifier, Seq, Fun}). % NOTE(review): no handle_call clause for this message is visible in this module -- confirm
...88 
...89 start_compact(#db{update_pid=Pid}) -> % Asynchronously (cast) ask the updater process to start compaction.
...90     gen_server:cast(Pid, start_compact).
...91 
...92 delete_doc(Db, Id, Revisions) -> % Write one deleted=true stub doc per revision in Revisions.
...93     DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
...94     {ok, [Result]} = update_docs(Db, DeletedDocs, []), % NOTE(review): matches exactly one result; more than one revision would presumably fail with badmatch
...95     {ok, Result}.
...96 
...97 open_doc(Db, IdOrDocInfo) -> % open_doc/3 with no options.
...98     open_doc(Db, IdOrDocInfo, []).
...99 
..100 open_doc(Db, Id, Options) -> % Open a doc; deleted docs are reported as {not_found, deleted} unless Options contains 'deleted'.
..101     couch_stats_collector:increment({couchdb, database_reads}),
..102     case open_doc_int(Db, Id, Options) of
..103     {ok, #doc{deleted=true}=Doc} ->
..104         case lists:member(deleted, Options) of
..105         true ->
..106             {ok, Doc}; % caller explicitly asked to see deleted docs
..107         false ->
..108             {not_found, deleted}
..109         end;
..110     Else ->
..111         Else % live docs and lookup failures pass through unchanged
..112     end.
..113 
..114 open_doc_revs(Db, Id, Revs, Options) -> % Single-document front end for open_doc_revs_int/3.
..115     couch_stats_collector:increment({couchdb, database_reads}),
..116     [Result] = open_doc_revs_int(Db, [{Id, Revs}], Options), % one {Id, Revs} in, exactly one result expected out
..117     Result.
..118 
..119 get_missing_revs(Db, IdRevsList) -> % Returns {ok, [{Id, MissingRevs}]}, listing only ids that have at least one missing rev.
..120     Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
..121     FullDocInfoResults = get_full_doc_infos(Db, Ids),
..122     Results = lists:zipwith( % relies on the lookup returning one result per id, in the same order
..123         fun({Id, Revs}, FullDocInfoResult) ->
..124             case FullDocInfoResult of
..125             {ok, #full_doc_info{rev_tree=RevisionTree}} ->
..126                 {Id, couch_key_tree:find_missing(RevisionTree, Revs)};
..127             not_found ->
..128                 {Id, Revs} % unknown doc: every requested rev is missing
..129             end
..130         end,
..131         IdRevsList, FullDocInfoResults),
..132     % strip out the ids that have no missing revs
..133     Missing = [{Id, Revs} || {Id, Revs} <- Results, Revs /= []],
..134     {ok, Missing}.
..135 
..136 get_doc_info(Db, Id) -> % Look up Id and convert the full doc info with couch_doc:to_doc_info/1.
..137     case get_full_doc_info(Db, Id) of
..138     {ok, DocInfo} ->
..139         {ok, couch_doc:to_doc_info(DocInfo)};
..140     Else ->
..141         Else % lookup miss is returned unchanged
..142     end.
..143 
..144 % Single-id lookup in the by-id btree; returns {ok, DocInfo} or not_found.
..145 get_full_doc_info(Db, Id) ->
..146     [Result] = get_full_doc_infos(Db, [Id]),
..147     Result.
..148 
..149 get_full_doc_infos(Db, Ids) -> % Batch lookup of Ids in the fulldocinfo_by_id btree.
..150     couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids).
..151 
..152 increment_update_seq(#db{update_pid=UpdatePid}) -> % Synchronous increment_update_seq request to the updater process.
..153     gen_server:call(UpdatePid, increment_update_seq).
..154 
..155 purge_docs(#db{update_pid=UpdatePid}, IdsRevs) -> % Synchronous {purge_docs, IdsRevs} request to the updater process.
..156     gen_server:call(UpdatePid, {purge_docs, IdsRevs}).
..157 
..158 get_committed_update_seq(#db{committed_update_seq=Seq}) -> % Accessor: committed_update_seq field of the #db record.
..159     Seq.
..160 
..161 get_update_seq(#db{update_seq=Seq})-> % Accessor: update_seq field of the #db record.
..162     Seq.
..163 
..164 get_purge_seq(#db{header=#db_header{purge_seq=PurgeSeq}})-> % Accessor: purge_seq stored in the db header.
..165     PurgeSeq.
..166 
..167 get_last_purged(#db{header=#db_header{purged_docs=nil}}) -> % No purged_docs pointer in the header: nothing recorded.
..168     {ok, []};
..169 get_last_purged(#db{fd=Fd, header=#db_header{purged_docs=PurgedPointer}}) -> % Otherwise read the term the pointer refers to from the db file.
..170     couch_file:pread_term(Fd, PurgedPointer).
..171 
..172 get_db_info(Db) -> % Returns {ok, InfoList}: a property list describing the database.
..173     #db{fd=Fd,
..174         header=#db_header{disk_version=DiskVersion},
..175         compactor_pid=Compactor,
..176         update_seq=SeqNum,
..177         name=Name,
..178         fulldocinfo_by_id_btree=FullDocBtree,
..179         instance_start_time=StartTime} = Db,
..180     {ok, Size} = couch_file:bytes(Fd), % size of the db file as reported by couch_file
..181     {ok, {Count, DelCount}} = couch_btree:full_reduce(FullDocBtree), % the by-id btree reduction yields {Count, DelCount}
..182     InfoList = [
..183         {db_name, Name},
..184         {doc_count, Count},
..185         {doc_del_count, DelCount},
..186         {update_seq, SeqNum},
..187         {purge_seq, couch_db:get_purge_seq(Db)},
..188         {compact_running, Compactor/=nil}, % true whenever a compactor pid is recorded
..189         {disk_size, Size},
..190         {instance_start_time, StartTime},
..191         {disk_format_version, DiskVersion}
..192         ],
..193     {ok, InfoList}.
..194 
..195 get_design_docs(#db{fulldocinfo_by_id_btree=Btree}=Db) -> % Collect all docs whose id starts with "_design/"; result is whatever couch_btree:foldl/4 returns.
..196     couch_btree:foldl(Btree, <<"_design/">>, % start the fold at the first key >= <<"_design/">> (assumed start-key semantics -- confirm)
..197         fun(#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, AccDocs) ->
..198             {ok, Doc} = couch_db:open_doc_int(Db, FullDocInfo, []),
..199             {ok, [Doc | AccDocs]}; % docs are prepended, so the accumulator is in reverse fold order
..200         (_, _Reds, AccDocs) ->
..201             {stop, AccDocs} % first id without the prefix ends the scan
..202         end,
..203         []).
..204 
..205 check_is_admin(#db{admins=Admins, user_ctx=#user_ctx{name=Name,roles=Roles}}) -> % ok if the user's name or a role is <<"_admin">> or in Admins; otherwise throws {unauthorized, Msg}.
..206     DbAdmins = [<<"_admin">> | Admins],
..207     case DbAdmins -- [Name | Roles] of
..208     DbAdmins -> % subtraction removed nothing: no name/role matched an admin entry
..209         throw({unauthorized, <<"You are not a db or server admin.">>});
..210     _ ->
..211         ok
..212     end.
..213 
..214 get_admins(#db{admins=Admins}) -> % Accessor: admins field of the #db record.
..215     Admins.
..216 
..217 set_admins(#db{update_pid=Pid}=Db, Admins) when is_list(Admins) -> % Admin-only: replace the admin list via the updater process.
..218     check_is_admin(Db), % throws {unauthorized, _} for non-admins
..219     gen_server:call(Pid, {set_admins, Admins}, infinity).
..220 
..221 
..222 get_revs_limit(#db{revs_limit=Limit}) -> % Accessor: revs_limit field of the #db record.
..223     Limit.
..224 
..225 set_revs_limit(#db{update_pid=Pid}=Db, Limit) when Limit > 0 -> % Admin-only: set the revs limit via the updater process.
..226     check_is_admin(Db), % throws {unauthorized, _} for non-admins
..227     gen_server:call(Pid, {set_revs_limit, Limit}, infinity);
..228 set_revs_limit(_Db, _Limit) -> % any Limit failing the "> 0" guard is rejected
..229     throw(invalid_revs_limit).
..230 
..231 name(#db{name=Name}) -> % Accessor: name field of the #db record.
..232     Name.
..233 
..234 update_doc(Db, Doc, Options) -> % Single-doc update: returns {ok, NewRev} or throws the per-doc error.
..235     case update_docs(Db, [Doc], Options) of
..236     {ok, [{ok, NewRev}]} ->
..237         {ok, NewRev};
..238     {ok, [Error]} ->
..239         throw(Error) % any non-{ok, _} per-doc result is raised as a throw
..240     end.
..241 
..242 update_docs(Db, Docs) -> % update_docs/3 with no options.
..243     update_docs(Db, Docs, []).
..244 
..245 % group_alike_docs sorts the documents by id and groups them into sublist
..246 % buckets, one bucket per id: [DocA, DocA, DocB, DocC] -> [[DocA, DocA], [DocB], [DocC]]
..247 group_alike_docs(Docs) ->
..248     Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs),
..249     group_alike_docs(Sorted, []).
..250 
..251 group_alike_docs([], Buckets) -> % Worker: Buckets is built newest-first, so reverse it once at the end.
..252     lists:reverse(Buckets);
..253 group_alike_docs([Doc|Rest], []) -> % first doc opens the first bucket
..254     group_alike_docs(Rest, [[Doc]]);
..255 group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
..256     [#doc{id=BucketId}|_] = Bucket,
..257     case Doc#doc.id == BucketId of
..258     true ->
..259         % add to existing bucket (prepended, so a bucket holds its docs in reverse sorted order)
..260         group_alike_docs(Rest, [[Doc|Bucket]|RestBuckets]);
..261     false ->
..262         % add to new bucket
..263        group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]])
..264     end.
..265 
..266 
..267 validate_doc_update(#db{user_ctx=UserCtx, admins=Admins}, % Returns ok or an error term; errors are returned, not thrown.
..268         #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) -> % design docs: only server/db admins may save
..269     UserNames = [UserCtx#user_ctx.name | UserCtx#user_ctx.roles],
..270     % if the user is a server admin or db admin, allow the save
..271     case length(UserNames -- [<<"_admin">> | Admins]) == length(UserNames) of
..272     true ->
..273         % nothing was subtracted: not an admin
..274         {unauthorized, <<"You are not a server or database admin.">>};
..275     false ->
..276         ok
..277     end;
..278 validate_doc_update(#db{validate_doc_funs=[]}, _Doc, _GetDiskDocFun) -> % no validation funs registered
..279     ok;
..280 validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) -> % local docs are never validated
..281     ok;
..282 validate_doc_update(Db, Doc, GetDiskDocFun) ->
..283     DiskDoc = GetDiskDocFun(), % the on-disk doc is only loaded when validation funs will actually run
..284     JsonCtx = couch_util:json_user_ctx(Db),
..285     try [case Fun(Doc, DiskDoc, JsonCtx) of
..286             ok -> ok;
..287             Error -> throw(Error) % stop at the first fun that does not return ok
..288         end || Fun <- Db#db.validate_doc_funs],
..289         ok
..290     catch
..291         throw:Error -> % also catches anything thrown from inside a validation fun
..292             Error
..293     end.
..294 
..295 
..296 prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc, % Returns {Result, Doc'} where Result is ok, conflict or a validation error.
..297         OldFullDocInfo, LeafRevsDict, AllowConflict) ->
..298     case Revs of
..299     [PrevRev|_] ->
..300         case dict:find({RevStart, PrevRev}, LeafRevsDict) of % is the edited revision a current leaf?
..301         {ok, {Deleted, DiskSp, DiskRevs}} ->
..302             case couch_doc:has_stubs(Doc) of
..303             true ->
..304                 DiskDoc = make_doc(Db, Id, Deleted, DiskSp, DiskRevs), % needed right away to merge the stubs
..305                 Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
..306                 {validate_doc_update(Db, Doc2, fun() -> DiskDoc end), Doc2};
..307             false ->
..308                 LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,DiskSp,DiskRevs) end, % deferred: only run if validation calls it
..309                 {validate_doc_update(Db, Doc, LoadDiskDoc), Doc}
..310             end;
..311         error when AllowConflict ->
..312             {validate_doc_update(Db, Doc, fun() -> nil end), Doc}; % not a leaf, but conflicts are allowed: validate with no disk doc
..313         error ->
..314             {conflict, Doc}
..315         end;
..316     [] ->
..317         % new doc, and we have existing revs.
..318         % reuse existing deleted doc
..319         if OldFullDocInfo#full_doc_info.deleted orelse AllowConflict ->
..320             {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
..321         true ->
..322             {conflict, Doc}
..323         end
..324     end.
..325 
..326 
..327 
..328 prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped, % Walks doc buckets and by-id lookup results in lockstep.
..329         AccFatalErrors) ->
..330    {AccPrepped, AccFatalErrors}; % neither accumulator is reversed here: buckets come back in reverse input order
..331 prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups], 
..332         AllowConflict, AccPrepped, AccErrors) ->
..333     [#doc{id=Id}|_]=DocBucket,
..334     % no existing revs are known,
..335     {PreppedBucket, AccErrors3} = lists:foldl(
..336         fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
..337             case Revs of
..338             {0, []} -> % a brand-new doc: only validation can reject it
..339                 case validate_doc_update(Db, Doc, fun() -> nil end) of
..340                 ok ->
..341                     {[Doc | AccBucket], AccErrors2};
..342                 Error ->
..343                     {AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]}
..344                 end;
..345             _ ->
..346                 % old revs specified but none exist, a conflict
..347                 {AccBucket, [{{Id, Revs}, conflict} | AccErrors2]}
..348             end
..349         end,
..350         {[], AccErrors}, DocBucket),
..351 
..352     prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
..353             [PreppedBucket | AccPrepped], AccErrors3);
..354 prep_and_validate_updates(Db, [DocBucket|RestBuckets],
..355         [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups],
..356         AllowConflict, AccPrepped, AccErrors) ->
..357     Leafs = couch_key_tree:get_all_leafs(OldRevTree),
..358     LeafRevsDict = dict:from_list([{{Start, RevId}, {Deleted, Sp, Revs}} || % index current leaves by {Pos, RevId}
..359             {{Deleted, Sp, _Seq}, {Start, [RevId|_]}=Revs} <- Leafs]),
..360     {PreppedBucket, AccErrors3} = lists:foldl(
..361         fun(Doc, {Docs2Acc, AccErrors2}) ->
..362             case prep_and_validate_update(Db, Doc, OldFullDocInfo,
..363                     LeafRevsDict, AllowConflict) of
..364             {ok, Doc2} ->
..365                 {[Doc2 | Docs2Acc], AccErrors2};
..366             {Error, #doc{id=Id,revs=Revs}} ->
..367                 % Record the error, keyed by the doc's id and incoming revs
..368                 {Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]}
..369             end
..370         end,
..371         {[], AccErrors}, DocBucket),
..372     prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict, [PreppedBucket | AccPrepped], AccErrors3).
..373 
..374 
..375 update_docs(#db{}=Db, Docs, Options) -> % update_docs/4 with update type interactive_edit; Db must be a #db record.
..376     update_docs(Db, Docs, Options, interactive_edit).
..377 
..378 
..379 prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) -> % Returns {ValidatedBuckets, Errors}, both reversed back into processing order.
..380     Errors2 = [{{Id, {Pos, Rev}}, Error} || % NOTE(review): entries whose doc has an empty rev list do not match the generator pattern and are silently dropped
..381             {#doc{id=Id,revs={Pos,[Rev|_]}}, Error} <- AccErrors],
..382     {lists:reverse(AccPrepped), lists:reverse(Errors2)};
..383 prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldInfo], AccPrepped, AccErrors) ->
..384     case OldInfo of
..385     not_found -> % doc unknown locally: validate every incoming rev with no disk doc
..386         {ValidatedBucket, AccErrors3} = lists:foldl(
..387             fun(Doc, {AccPrepped2, AccErrors2}) ->
..388                 case validate_doc_update(Db, Doc, fun() -> nil end) of
..389                 ok ->
..390                     {[Doc | AccPrepped2], AccErrors2};
..391                 Error ->
..392                     {AccPrepped2, [{Doc, Error} | AccErrors2]}
..393                 end
..394             end,
..395             {[], AccErrors}, Bucket),
..396         prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
..397     {ok, #full_doc_info{rev_tree=OldTree}} ->
..398         NewRevTree = lists:foldl( % merge every incoming doc's rev path into the existing tree
..399             fun(NewDoc, AccTree) ->
..400                 {NewTree, _} = couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]),
..401                 NewTree
..402             end,
..403             OldTree, Bucket),
..404         Leafs = couch_key_tree:get_all_leafs_full(NewRevTree),
..405         LeafRevsFullDict = dict:from_list( [{{Start, RevId}, FullPath} || {Start, [{RevId, _}|_]}=FullPath <- Leafs]), % leaves of the merged tree, keyed by {Pos, RevId}
..406         {ValidatedBucket, AccErrors3} =
..407         lists:foldl(
..408             fun(#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, {AccValidated, AccErrors2}) ->
..409                 case dict:find({Pos, RevId}, LeafRevsFullDict) of
..410                 {ok, {Start, Path}} ->
..411                     % our unflushed doc is a leaf node. Go back on the path
..412                     % to find the previous rev that's on disk.
..413                     LoadPrevRevFun = fun() ->
..414                                 make_first_doc_on_disk(Db,Id,Start-1, tl(Path)) % tl/1 skips the new leaf itself
..415                             end,
..416                     case validate_doc_update(Db, Doc, LoadPrevRevFun) of
..417                     ok ->
..418                         {[Doc | AccValidated], AccErrors2};
..419                     Error ->
..420                         {AccValidated, [{Doc, Error} | AccErrors2]}
..421                     end;
..422                 _ ->
..423                     % this doc isn't a leaf or already exists in the tree.
..424                     % ignore but consider it a success.
..425                     {AccValidated, AccErrors2}
..426                 end
..427             end,
..428             {[], AccErrors}, Bucket),
..429         prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo,
..430                 [ValidatedBucket | AccPrepped], AccErrors3)
..431     end.
..432 
..433 
..434 
..435 new_revid(#doc{body=Body,revs={OldStart,OldRevs}, % Computes the revision id for a new edit of the doc.
..436         atts=Atts,deleted=Deleted}) ->
..437     case [{N, T, M} || #att{name=N,type=T,md5=M} <- Atts, M /= <<>>] of % keep only attachments that carry an md5
..438     Atts2 when length(Atts) /= length(Atts2) ->
..439         % We must have old style non-md5 attachments, so use a random rev id
..440         ?l2b(integer_to_list(couch_util:rand32()));
..441     Atts2 ->
..442         OldRev = case OldRevs of [] -> 0; [OldRev0|_] -> OldRev0 end, % 0 stands in for "no previous revision"
..443         erlang:md5(term_to_binary([Deleted, OldStart, OldRev, Body, Atts2])) % rev id is derived from the doc content
..444     end.
..445 
..446 new_revs([], OutBuckets, IdRevsAcc) -> % Returns {BucketsWithNewRevs, IdRevs}; only the bucket list is reversed back.
..447     {lists:reverse(OutBuckets), IdRevsAcc};
..448 new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
..449     {NewBucket, IdRevsAcc3} = lists:mapfoldl(
..450         fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)->
..451         NewRevId = new_revid(Doc),
..452         {Doc#doc{revs={Start+1, [NewRevId | RevIds]}}, % push the new rev id on the front of the doc's rev path
..453             [{{Id, {Start, RevIds}}, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]} % keyed by the doc's ORIGINAL {Id, Revs}
..454     end, IdRevsAcc, Bucket),
..455     new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).
..456 
..457 check_dup_atts([#att{name=N1}, #att{name=N2} | _]) when N1 == N2 -> % Expects attachments sorted by name, so duplicates are adjacent; throws on the first duplicate.
..458     throw({bad_request, <<"Duplicate attachments">>});
..459 check_dup_atts([_ | Rest]) -> % advance ONE element so every adjacent pair is compared (skipping two missed e.g. [a,b,b])
..460     check_dup_atts(Rest);
..461 check_dup_atts(_) -> % empty list: no duplicates found
..462     ok.
..463 
..464 sort_and_check_atts(#doc{atts=Atts}=Doc) -> % Returns Doc with its attachments sorted by name; check_dup_atts/1 may throw.
..465     Atts2 = lists:sort(fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end, Atts),
..466     check_dup_atts(Atts2), % run on the sorted list
..467     Doc#doc{atts=Atts2}.
..468 
..469 
..470 update_docs(Db, Docs, Options, replicated_changes) -> % Replication path: docs keep their incoming revs; returns {ok, DocErrors}.
..471     couch_stats_collector:increment({couchdb, database_writes}),
..472     DocBuckets = group_alike_docs(Docs),
..473 
..474     case (Db#db.validate_doc_funs /= []) orelse % validation is needed if funs exist or any design doc is present
..475         lists:any(
..476             fun(#doc{id= <<"_design/",_/binary>>}) -> true; % pattern restored from a garbled "<>"; the source may have used a prefix macro
..477             (_) -> false
..478             end, Docs) of
..479     true ->
..480         Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
..481         ExistingDocs = get_full_doc_infos(Db, Ids),
..482 
..483         {DocBuckets2, DocErrors} =
..484                 prep_and_validate_replicated_updates(Db, DocBuckets, ExistingDocs, [], []),
..485         DocBuckets3 = [Bucket || [_|_]=Bucket <- DocBuckets2]; % remove empty buckets
..486     false ->
..487         DocErrors = [],
..488         DocBuckets3 = DocBuckets
..489     end,
..490     DocBuckets4 = [[doc_flush_atts(sort_and_check_atts(Doc), Db#db.fd) % write attachment data to the db file before committing
..491             || Doc <- Bucket] || Bucket <- DocBuckets3],
..492     {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
..493     {ok, DocErrors};
..494 
..495 update_docs(Db, Docs, Options, interactive_edit) -> % Interactive path: new rev ids are generated; returns {ok, Results} or {aborted, Failures}.
..496     couch_stats_collector:increment({couchdb, database_writes}),
..497     AllOrNothing = lists:member(all_or_nothing, Options),
..498     % go ahead and generate the new revision ids for the documents.
..499     % separate out the NonRep documents from the rest of the documents
..500     {Docs2, NonRepDocs} = lists:foldl(
..501         fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) ->
..502             case Id of
..503             <<"_local/",_/binary>> -> % pattern restored from a garbled "<>"; local docs are the non-replicated ones
..504                 {DocsAcc, [Doc | NonRepDocsAcc]};
..505             Id->
..506                 {[Doc | DocsAcc], NonRepDocsAcc}
..507             end
..508         end, {[], []}, Docs),
..509         
..510     DocBuckets = group_alike_docs(Docs2),
..511 
..512     case (Db#db.validate_doc_funs /= []) orelse % prep is needed for validation funs, design docs, or docs with attachments
..513         lists:any(
..514             fun(#doc{id= <<"_design/",_/binary>>}) -> % pattern restored from a garbled "<>"
..515                 true;
..516             (#doc{atts=Atts}) ->
..517                 Atts /= []
..518             end, Docs2) of
..519     true ->
..520         % lookup the doc by id and get the most recent
..521         Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
..522         ExistingDocInfos = get_full_doc_infos(Db, Ids),
..523 
..524         {DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db,
..525                 DocBuckets, ExistingDocInfos, AllOrNothing, [], []),
..526 
..527         % strip out any empty buckets
..528         DocBuckets2 = [Bucket || [_|_] = Bucket <- DocBucketsPrepped];
..529     false ->
..530         PreCommitFailures = [],
..531         DocBuckets2 = DocBuckets
..532     end,
..533 
..534     if (AllOrNothing) and (PreCommitFailures /= []) -> % all_or_nothing: any pre-commit failure aborts the whole batch
..535         {aborted, lists:map(
..536             fun({{Id,{Pos, [RevId|_]}}, Error}) ->
..537                 {{Id, {Pos, RevId}}, Error};
..538             ({{Id,{0, []}}, Error}) ->
..539                 {{Id, {0, <<>>}}, Error}
..540             end, PreCommitFailures)};
..541     true ->
..542         Options2 = if AllOrNothing -> [merge_conflicts];
..543                 true -> [] end ++ Options,
..544         DocBuckets3 = [[
..545                 doc_flush_atts(set_new_att_revpos(
..546                         sort_and_check_atts(Doc)), Db#db.fd)
..547                 || Doc <- B] || B <- DocBuckets2],
..548         {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
..549         
..550         {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
..551         
..552         ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures), % later entries win, so failures override the optimistic {ok, NewRev}
..553         {ok, lists:map(
..554             fun(#doc{id=Id,revs={Pos, RevIds}}) ->
..555                 {ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict), % results come back in the order of the input Docs
..556                 Result
..557             end, Docs)}
..558     end.
..559 
..560 % Returns the first available document on disk, or nil when the path holds
..561 % none. Input list is a full rev path for the doc.
..562 make_first_doc_on_disk(_Db, _Id, _Pos, []) ->
..563     nil;
..564 make_first_doc_on_disk(Db, Id, Pos, [{_Rev, ?REV_MISSING}|RestPath]) -> % rev body is missing: step one position further along the path
..565     make_first_doc_on_disk(Db, Id, Pos - 1, RestPath);
..566 make_first_doc_on_disk(Db, Id, Pos, [{_Rev, {IsDel, Sp, _Seq}} |_]=DocPath) ->
..567     Revs = [Rev || {Rev, _} <- DocPath], % rev ids from this entry to the end of the path
..568     make_doc(Db, Id, IsDel, Sp, {Pos, Revs}).
..569 
..570 
..571 write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets, % Sends the buckets to the updater; returns {ok, Results} or throws after one failed retry.
..572         NonRepDocs, Options) ->
..573     case gen_server:call(UpdatePid, 
..574             {update_docs, DocBuckets, NonRepDocs, Options}, infinity) of
..575     {ok, Results} -> {ok, Results};
..576     retry ->
..577         % This can happen if the db file we wrote to was swapped out by
..578         % compaction. Retry by reopening the db and writing to the current file
..579         {ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx),
..580         DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets], % re-flush attachments into the current file
..581         % We only retry once
..582         close(Db2), % drop the reference taken by open_ref_counted/2 above
..583         case gen_server:call(UpdatePid, {update_docs, DocBuckets2, NonRepDocs, Options}, infinity) of
..584         {ok, Results} -> {ok, Results};
..585         retry -> throw({update_error, compaction_retry})
..586         end
..587     end.
..588 
..589 set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) -> % Stamp attachments not yet on disk with the doc's next rev position.
..590     Doc#doc{atts= lists:map(fun(#att{data={_Fd,_Sp}}=Att) ->
..591             % already committed to disk, do not set new rev
..592             Att;
..593         (Att) ->
..594             Att#att{revpos=RevPos+1} % RevPos+1 is the position the new revision will get
..595         end, Atts)}.
..596         
..597 
..598 doc_flush_atts(Doc, Fd) -> % Run flush_att/2 over every attachment of Doc, targeting file Fd.
..599     Doc#doc{atts=[flush_att(Fd, Att) || Att <- Doc#doc.atts]}.
..600 
..601 check_md5(_NewSig, <<>>) -> ok; % no expected md5 was supplied: nothing to verify
..602 check_md5(Sig1, Sig2) when Sig1 == Sig2 -> ok; % computed and expected md5 agree
..603 check_md5(_, _) -> throw(data_corruption). % mismatch
..604 
..605 flush_att(Fd, #att{data={Fd0, _}}=Att) when Fd0 == Fd -> % Ensure the attachment's data lives in file Fd; returns the (possibly updated) #att.
..606     % already written to our file, nothing to write
..607     Att;
..608 
..609 flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5}=Att) -> % data lives in another file: copy the stream across
..610     {NewStreamData, Len, Md5} = 
..611             couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
..612     check_md5(Md5, InMd5), % throws data_corruption on mismatch
..613     Att#att{data={Fd, NewStreamData}, md5=Md5, len=Len};
..614 
..615 flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) -> % in-memory binary: write it in one go
..616     with_stream(Fd, Att, fun(OutputStream) ->
..617         couch_stream:write(OutputStream, Data)
..618     end);
..619 
..620 flush_att(Fd, #att{data=Fun,len=undefined}=Att) when is_function(Fun) -> % unknown length: Fun drives the writer callback
..621     with_stream(Fd, Att, fun(OutputStream) ->
..622         % Fun(MaxChunkSize, WriterFun) must call WriterFun
..623         % once for each chunk of the attachment,
..624         Fun(4096,
..625             % WriterFun({Length, Binary}, State)
..626             % WriterFun({0, _Footers}, State)
..627             % Called with Length == 0 on the last time.
..628             % WriterFun returns NewState.
..629             fun({0, _Footers}, _) ->
..630                 ok;
..631             ({_Length, Chunk}, _) ->
..632                 couch_stream:write(OutputStream, Chunk)
..633             end, ok)
..634     end);
..635 
..636 flush_att(Fd, #att{data=Fun,len=Len}=Att) when is_function(Fun) -> % known length: pull chunks from Fun until Len bytes are written
..637     with_stream(Fd, Att, fun(OutputStream) ->
..638         write_streamed_attachment(OutputStream, Fun, Len)
..639     end).
..640 
..641 with_stream(Fd, #att{md5=InMd5}=Att, Fun) -> % Open a stream on Fd, let Fun write to it, then record the stream info, length and md5 on Att.
..642     {ok, OutputStream} = couch_stream:open(Fd),
..643     Fun(OutputStream), % Fun's return value is ignored
..644     {StreamInfo, Len, Md5} = couch_stream:close(OutputStream),
..645     check_md5(Md5, InMd5), % throws data_corruption on mismatch
..646     Att#att{data={Fd,StreamInfo},len=Len,md5=Md5}.
..647 
..648 
..649 write_streamed_attachment(_Stream, _F, 0) -> % Pull chunks from F() and write them until exactly LenLeft bytes have been written.
..650     ok;
..651 write_streamed_attachment(Stream, F, LenLeft) ->
..652     Bin = F(),
..653     TruncatedBin = check_bin_length(LenLeft, Bin), % exits if the chunk is longer than what is still expected; never actually truncates
..654     ok = couch_stream:write(Stream, TruncatedBin),
..655     write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin)).
..656 
..657 %% There was a bug in ibrowse 1.4.1 that would cause it to append a CR to a
..658 %% chunked response when the CR and LF terminating the last data chunk were
..659 %% split across packets.  The bug was fixed in version 1.5.0, but we still
..660 %% check for it just in case.
..661 check_bin_length(LenLeft, Bin) when size(Bin) > LenLeft -> % chunk is longer than the bytes still expected
..662     <<_ValidData:LenLeft/binary, Crap/binary>> = Bin, % Crap is the unexpected tail, kept only for the log message
..663     ?LOG_ERROR("write_streamed_attachment has written too much expected: ~p" ++
..664         " got: ~p tail: ~p", [LenLeft, size(Bin), Crap]),
..665     exit(replicated_attachment_too_large);
..666 check_bin_length(_, Bin) -> Bin. % chunk fits: returned unchanged
..667 
..668 enum_docs_since_reduce_to_count(Reds) -> % Final-reduce partial reductions with the by-seq btree reduce fun.
..669     couch_btree:final_reduce(
..670             fun couch_db_updater:btree_by_seq_reduce/2, Reds).
..671 
..672 enum_docs_reduce_to_count(Reds) -> % Final-reduce with the by-id btree reduce fun and return only Count.
..673     {Count, _DelCount} = couch_btree:final_reduce(
..674             fun couch_db_updater:btree_by_id_reduce/2, Reds),
..675     Count. % the deleted-doc count is discarded
..676 
..677 changes_since(Db, Style, StartSeq, Fun, Acc) -> % Fold Fun(DocInfos, Acc) over the docs enumerated after StartSeq, in forward order.
..678     enum_docs_since(Db, StartSeq, fwd,
..679         fun(DocInfo, _Offset, Acc2) ->
..680             #doc_info{revs=Revs} = DocInfo,
..681             case Style of % any Style other than main_only / all_docs raises case_clause
..682             main_only ->
..683                 Infos = [DocInfo];
..684             all_docs ->
..685                 % make each rev its own doc info, keeping only revs newer than StartSeq
..686                 Infos = [DocInfo#doc_info{revs=[RevInfo]} ||
..687                     #rev_info{seq=RevSeq}=RevInfo <- Revs, StartSeq < RevSeq]
..688             end,
..689             Fun(Infos, Acc2)
..690         end, Acc).
..691 
..692 count_changes_since(Db, SinceSeq) -> % Returns the reduction of the by-seq btree over keys from SinceSeq + 1 on.
..693     {ok, Changes} =
..694     couch_btree:fold_reduce(Db#db.docinfo_by_seq_btree,
..695         SinceSeq + 1, % startkey
..696         ok, % endkey
..697         fun(_,_) -> true end, % groupkeys: every key compares as the same group
..698         fun(_SeqStart, PartialReds, 0) -> % only matches the initial accumulator 0, i.e. expects a single call
..699             {ok, couch_btree:final_reduce(Db#db.docinfo_by_seq_btree, PartialReds)}
..700         end,
..701         0),
..702     Changes.
..703 
..704 enum_docs_since(Db, SinceSeq, Direction, InFun, Acc) -> % Fold InFun over the by-seq btree starting at key SinceSeq + 1.
..705     couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Acc).
..706 
..707 enum_docs_since(Db, SinceSeq, InFun, Acc) -> % enum_docs_since/5 in the forward (fwd) direction.
..708     enum_docs_since(Db, SinceSeq, fwd, InFun, Acc).
..709 
..710 enum_docs(Db, StartId, Direction, InFun, InAcc) -> % Fold InFun over the by-id btree starting at key StartId.
..711     couch_btree:fold(Db#db.fulldocinfo_by_id_btree, StartId, Direction, InFun, InAcc).
..712 
..713 enum_docs(Db, StartId, InFun, Ctx) -> % enum_docs/5 in the forward (fwd) direction.
..714     enum_docs(Db, StartId, fwd, InFun, Ctx).
..715 
..716 % server functions
..717 
..718 init({DbName, Filepath, Fd, Options}) -> % gen_server init: the #db record obtained from the updater becomes this server's state.
..719     {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {self(), DbName, Filepath, Fd, Options}, []), % updater is linked to this process
..720     {ok, #db{fd_ref_counter=RefCntr}=Db} = gen_server:call(UpdaterPid, get_db),
..721     couch_ref_counter:add(RefCntr), % this process registers itself as a referrer
..722     couch_stats_collector:track_process_count({couchdb, open_databases}),
..723     {ok, Db}.
..724 
..725 terminate(Reason, _Db) -> % gen_server terminate: hands Reason to couch_util:terminate_linked/1 (presumably stops linked processes -- confirm).
..726     couch_util:terminate_linked(Reason),
..727     ok.
..728 
..729 handle_call({open_ref_count, OpenerPid}, _, #db{fd_ref_counter=RefCntr}=Db) -> % Register OpenerPid as a referrer and reply with the current #db.
..730     ok = couch_ref_counter:add(RefCntr, OpenerPid),
..731     {reply, {ok, Db}, Db};
..732 handle_call(is_idle, _From, #db{fd_ref_counter=RefCntr, compactor_pid=Compact,
..733             waiting_delayed_commit=Delay}=Db) ->
..734     % Idle means no referrers. Unless in the middle of a compaction file switch,
..735     % there are always at least 2 referrers, couch_db_updater and us.
..736     {reply, (Delay == nil) and (Compact == nil) and (couch_ref_counter:count(RefCntr) == 2), Db};
..737 handle_call({db_updated, NewDb}, _From, #db{fd_ref_counter=OldRefCntr}) -> % Replace the server state with NewDb.
..738     #db{fd_ref_counter=NewRefCntr}=NewDb,
..739     case NewRefCntr == OldRefCntr of
..740     true -> ok;
..741     false -> % the ref counter changed: reference the new one before dropping the old
..742         couch_ref_counter:add(NewRefCntr),
..743         couch_ref_counter:drop(OldRefCntr)
..744     end,
..745     {reply, ok, NewDb}.
..746 
..747 
% gen_server handle_cast callback. This server accepts no casts: any cast is
% logged as an error (with the database name) and then the process exits
% with {error, Msg}.
..748 handle_cast(Msg, Db) ->
..749     ?LOG_ERROR("Bad cast message received for db ~s: ~p", [Db#db.name, Msg]),
..750     exit({error, Msg}).
..751 
% gen_server code_change callback. No state migration is performed; the
% existing state is returned unchanged.
..752 code_change(_OldVsn, State, _Extra) ->
..753     {ok, State}.
..754 
% gen_server handle_info callback. No out-of-band messages are expected: any
% message is logged as an error (with the database name) and then the process
% exits with {error, Msg}.
..755 handle_info(Msg, Db) ->
..756     ?LOG_ERROR("Bad message received for db ~s: ~p", [Db#db.name, Msg]),
..757     exit({error, Msg}).
..758 
..759 
..760 %%% Internal function %%%
% Opens specific revisions for a batch of documents.
%
% IdRevs is a list of {Id, Revs} pairs, where Revs is either the atom all or
% a list of revisions. All ids are looked up in one get_full_doc_infos/2 call
% and each pair is then zipped with its lookup result (lists:zipwith/3
% requires both lists to have the same length).
%
% Returns one {ok, Results} per input pair. Each element of Results is either
% {ok, #doc{}} for a revision whose body information is available, or
% {{not_found, missing}, Rev} for a revision that is unknown or only present
% as ?REV_MISSING in the tree.
..761 open_doc_revs_int(Db, IdRevs, Options) ->
..762     Ids = [Id || {Id, _Revs} <- IdRevs],
..763     LookupResults = get_full_doc_infos(Db, Ids),
..764     lists:zipwith(
..765         fun({Id, Revs}, Lookup) ->
..766             case Lookup of
..767             {ok, #full_doc_info{rev_tree=RevTree}} ->
..768                 {FoundRevs, MissingRevs} =
..769                 case Revs of
..770                 all ->
                          % every leaf of the tree, nothing can be missing
..771                     {couch_key_tree:get_all_leafs(RevTree), []};
..772                 _ ->
                          % with the 'latest' option the leaf-oriented lookup
                          % (get_key_leafs) is used instead of an exact get
..773                     case lists:member(latest, Options) of
..774                     true ->
..775                         couch_key_tree:get_key_leafs(RevTree, Revs);
..776                     false ->
..777                         couch_key_tree:get(RevTree, Revs)
..778                     end
..779                 end,
..780                 FoundResults =
..781                 lists:map(fun({Value, {Pos, [Rev|_]}=FoundRevPath}) ->
..782                     case Value of
..783                     ?REV_MISSING ->
..784                         % we have the rev in our list but know nothing about it
..785                         {{not_found, missing}, {Pos, Rev}};
..786                     {IsDeleted, SummaryPtr, _UpdateSeq} ->
..787                         {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)}
..788                     end
..789                 end, FoundRevs),
                      % found revisions first, then one not_found entry per
                      % requested revision that was absent from the tree
..790                 Results = FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs],
..791                 {ok, Results};
                  % unknown document: 'all' yields no results, an explicit
                  % revision list yields one not_found entry per revision
..792             not_found when Revs == all ->
..793                 {ok, []};
..794             not_found ->
..795                 {ok, [{{not_found, missing}, Rev} || Rev <- Revs]}
..796             end
..797         end,
..798         IdRevs, LookupResults).
..799 
% Opens a single document. The second argument selects the strategy:
%
%   * a binary id starting with the local-document prefix: the document is
%     read from local_docs_btree; its stored integer Rev is rendered as a
%     decimal binary and returned as revs = {0, [RevBin]}. Options are
%     ignored for local documents.
%   * a #doc_info{}: the first rev_info is opened.
%   * a #full_doc_info{}: converted with couch_doc:to_doc_info/1, then the
%     first rev_info is opened together with its full path in the rev tree.
%   * any other id: looked up with get_full_doc_info/2 and re-dispatched to
%     the #full_doc_info{} clause.
%
% Returns {ok, #doc{}} or {not_found, missing}.
%
% The binary pattern in the first clause head had been reduced to the invalid
% token "<>" (markup stripping removed the inner "<?...>" part); it is
% restored here. ?LOCAL_DOC_PREFIX is expected to come from couch_db.hrl,
% which this module includes.
..800 open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, _Options) ->
..801     case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of
..802     [{ok, {_, {Rev, BodyData}}}] ->
..803         {ok, #doc{id=Id, revs={0, [list_to_binary(integer_to_list(Rev))]}, body=BodyData}};
..804     [not_found] ->
..805         {not_found, missing}
..806     end;
..807 open_doc_int(Db, #doc_info{id=Id,revs=[RevInfo|_]}=DocInfo, Options) ->
..808     #rev_info{deleted=IsDeleted,rev={Pos,RevId},body_sp=Bp} = RevInfo,
..809     Doc = make_doc(Db, Id, IsDeleted, Bp, {Pos,[RevId]}),
          % no rev tree is available from a #doc_info{}, so [] is passed
..810     {ok, Doc#doc{meta=doc_meta_info(DocInfo, [], Options)}};
..811 open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) ->
..812     #doc_info{revs=[#rev_info{deleted=IsDeleted,rev=Rev,body_sp=Bp}|_]} =
..813         DocInfo = couch_doc:to_doc_info(FullDocInfo),
          % assertive match: exactly one hit and nothing missing for Rev
..814     {[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]),
..815     Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath),
..816     {ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}};
..817 open_doc_int(Db, Id, Options) ->
..818     case get_full_doc_info(Db, Id) of
..819     {ok, FullDocInfo} ->
..820         open_doc_int(Db, FullDocInfo, Options);
..821     not_found ->
..822         {not_found, missing}
..823     end.
..824 
% Builds the meta list attached to an opened #doc{}.
%
% The result is the concatenation of up to four optional sections, each
% included only when the corresponding atom is a member of Options:
%   revs_info          -> {revs_info, Pos, [{Rev, deleted|available|missing}]}
%   conflicts          -> {conflicts, Revs} for the non-deleted rev_infos
%                         after the first one (omitted when there are none)
%   deleted_conflicts  -> {deleted_conflicts, Revs} for the deleted rev_infos
%                         after the first one (omitted when there are none)
%   local_seq          -> {local_seq, Seq}, Seq being the doc_info's high_seq
%
% NOTE(review): the revs_info branch asserts that get_full_key_paths/2 finds
% exactly one path and nothing missing. The #doc_info{} clause of
% open_doc_int/3 passes [] as RevTree; whether that match can succeed for an
% empty tree is not visible here -- confirm against couch_key_tree.
..825 doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTree, Options) ->
..826     case lists:member(revs_info, Options) of
..827     false -> [];
..828     true ->
..829         {[{Pos, RevPath}],[]} =
..830             couch_key_tree:get_full_key_paths(RevTree, [Rev]),
..831 
              % classify every revision on the path by its stored value
..832         [{revs_info, Pos, lists:map(
..833             fun({Rev1, {true, _Sp, _UpdateSeq}}) ->
..834                 {Rev1, deleted};
..835             ({Rev1, {false, _Sp, _UpdateSeq}}) ->
..836                 {Rev1, available};
..837             ({Rev1, ?REV_MISSING}) ->
..838                 {Rev1, missing}
..839             end, RevPath)}]
..840     end ++
..841     case lists:member(conflicts, Options) of
..842     false -> [];
..843     true ->
..844         case [Rev1 || #rev_info{rev=Rev1,deleted=false} <- RestInfo] of
..845         [] -> [];
..846         ConflictRevs -> [{conflicts, ConflictRevs}]
..847         end
..848     end ++
..849     case lists:member(deleted_conflicts, Options) of
..850     false -> [];
..851     true ->
..852         case [Rev1 || #rev_info{rev=Rev1,deleted=true} <- RestInfo] of
..853         [] -> [];
..854         DelConflictRevs -> [{deleted_conflicts, DelConflictRevs}]
..855         end
..856     end ++
..857     case lists:member(local_seq, Options) of
..858     false -> [];
..859     true -> [{local_seq, Seq}]
..860     end.
..861 
% Reads a stored document term from the database file Fd.
% A tuple pointer is treated as an old-style stream pointer and read with
% couch_stream:old_read_term/2 (kept for upgrades, see the marker below);
% any other pointer is read as a file position with couch_file:pread_term/2.
% The return value is whatever the chosen reader returns; make_doc/5 expects
% it to be {ok, {Body, Atts}}.
..862 read_doc(#db{fd=Fd}, OldStreamPointer) when is_tuple(OldStreamPointer) ->
..863     % 09 UPGRADE CODE
..864     couch_stream:old_read_term(Fd, OldStreamPointer);
..865 read_doc(#db{fd=Fd}, Pos) ->
..866     couch_file:pread_term(Fd, Pos).
..867 
..868 
% Converts a #doc{} with revs = {Start, RevIds} into {StartPos, Tree}.
% Tree is a single-branch tree built from RevIds in reverse order: the last
% element of RevIds is the root and the first element is the leaf that holds
% Doc (see doc_to_tree_simple/2). StartPos is Start - length(RevIds) + 1,
% i.e. Start moved back by one for every revision beyond the first.
% An empty RevIds list is not handled and crashes in doc_to_tree_simple/2.
..869 doc_to_tree(#doc{revs={Start, RevIds}}=Doc) ->
..870     [Tree] = doc_to_tree_simple(Doc, lists:reverse(RevIds)),
..871     {Start - length(RevIds) + 1, Tree}.
..872 
..873 
% Builds a one-element list containing a linear chain of {RevId, Value, Children}
% nodes from a non-empty list of revision ids. Every node except the last
% carries ?REV_MISSING as its value; the final revision id becomes a leaf
% whose value is Doc. There is no clause for [], so an empty list crashes
% with function_clause.
..874 doc_to_tree_simple(Doc, [RevId]) ->
..875     [{RevId, Doc, []}];
..876 doc_to_tree_simple(Doc, [RevId | Rest]) ->
..877     [{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}].
..878 
..879 
% Assembles a #doc{} record for document Id.
%
%   Deleted       - stored as the record's deleted field
%   Bp            - body pointer; nil means nothing is stored, in which case
%                   both body and attachments are empty lists
%   RevisionPath  - stored as the record's revs field
%
% When Bp is not nil the term is read with read_doc/2 (asserting an
% {ok, {Body, Atts}} result) and every stored attachment entry is turned into
% an #att{} whose data field is {Fd, Sp}, i.e. a reference into the database
% file rather than the attachment bytes themselves.
..880 make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
..881     {BodyData, Atts} =
..882     case Bp of
..883     nil ->
..884         {[], []};
..885     _ ->
..886         {ok, {BodyData0, Atts0}} = read_doc(Db, Bp),
..887         {BodyData0,
..888             lists:map(
                      % six-element entries carry revpos and md5 explicitly
..889                 fun({Name,Type,Sp,Len,RevPos,Md5}) ->
..890                     #att{name=Name,
..891                         type=Type,
..892                         len=Len,
..893                         md5=Md5,
..894                         revpos=RevPos,
..895                         data={Fd,Sp}};
                      % {Name, {Type, Sp, Len}} entries have neither, so md5 is
                      % set to an empty binary and revpos to 0 (presumably an
                      % older on-disk format -- confirm)
..896                 ({Name,{Type,Sp,Len}}) ->
..897                     #att{name=Name,
..898                         type=Type,
..899                         len=Len,
..900                         md5= <<>>,
..901                         revpos=0,
..902                         data={Fd,Sp}}
..903                 end, Atts0)}
..904     end,
..905     #doc{
..906         id = Id,
..907         revs = RevisionPath,
..908         body = BodyData,
..909         atts = Atts,
..910         deleted = Deleted
..911         }.
..912 
..913 
..914 

Generated using etap 0.3.4.