Generated on 2009-08-09 21:22:57 with etap 0.3.4.
| Name | Total lines | Lines of code | Total coverage | Code coverage | ||||
| couch_db | ?? | ?? | ?? |
|
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_db).
-behaviour(gen_server).

-export([open/2,close/1,create/2,start_compact/1,get_db_info/1,get_design_docs/1]).
-export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]).
-export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
-export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
-export([set_revs_limit/2,get_revs_limit/1,register_update_notifier/3]).
-export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1,get_committed_update_seq/1]).
-export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]).
-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
-export([start_link/3,open_doc_int/3,set_admins/2,get_admins/1,ensure_full_commit/1]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
-export([changes_since/5,read_doc/2,new_revid/1]).

-include("couch_db.hrl").


% Opens the database file and starts the couch_db gen_server for it.
% Returns whatever gen_server:start_link/3 returns, or the failure reported by
% open_db_file/2 when the file cannot be opened.
start_link(DbName, Filepath, Options) ->
    case open_db_file(Filepath, Options) of
        {ok, Fd} ->
            InitArg = {DbName, Filepath, Fd, Options},
            Started = gen_server:start_link(couch_db, InitArg, []),
            % The caller is unlinked from Fd whether or not the server
            % started. NOTE(review): presumably couch_file:open/2 linked the
            % file process to us -- confirm.
            unlink(Fd),
            Started;
        OpenFailure ->
            OpenFailure
    end.

% Opens the database file at Filepath. When the file does not exist, falls
% back to a leftover "<Filepath>.compact" file (see open_compaction_file/1).
% Returns {ok, Fd}, {not_found, no_db_file}, or the error from couch_file.
open_db_file(Filepath, Options) ->
    case couch_file:open(Filepath, Options) of
    {ok, Fd} ->
        {ok, Fd};
    {error, enoent} ->
        % couldn't find file. is there a compact version? This can happen if
        % crashed during the file switch.
        open_compaction_file(Filepath);
    Error ->
        Error
    end.

% Promotes "<Filepath>.compact" to be the primary database file.
% Errors other than enoent are returned to the caller, exactly as the primary
% open does, instead of crashing with a case_clause.
open_compaction_file(Filepath) ->
    CompactPath = Filepath ++ ".compact",
    case couch_file:open(CompactPath) of
    {ok, Fd} ->
        ?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
        ok = file:rename(CompactPath, Filepath),
        ok = couch_file:sync(Fd),
        {ok, Fd};
    {error, enoent} ->
        {not_found, no_db_file};
    Error ->
        Error
    end.


% Creates a database through couch_server.
create(DbName, Options) ->
    couch_server:create(DbName, Options).

% Opens a database through couch_server.
open(DbName, Options) ->
    couch_server:open(DbName, Options).

% Synchronously asks the updater process for a full commit and returns the
% instance start time recorded in the #db record.
ensure_full_commit(#db{update_pid=UpdatePid,instance_start_time=StartTime}) ->
    ok = gen_server:call(UpdatePid, full_commit, infinity),
    {ok, StartTime}.

% Drops this process's reference on the database's fd ref counter.
close(#db{fd_ref_counter=RefCntr}) ->
    couch_ref_counter:drop(RefCntr).

% Fetches the current #db record from the main process, registering the
% calling process as a referrer, and stamps it with the given user context.
open_ref_counted(MainPid, UserCtx) ->
    {ok, Db} = gen_server:call(MainPid, {open_ref_count, self()}),
    {ok, Db#db{user_ctx=UserCtx}}.

% Asks the main process whether the database is idle (see handle_call/3).
is_idle(MainPid) ->
    gen_server:call(MainPid, is_idle).

% Monitors the database's main process; returns the monitor reference.
monitor(#db{main_pid=MainPid}) ->
    erlang:monitor(process, MainPid).

% Forwards an update-notifier registration to the main process.
% NOTE(review): no handle_call/3 clause for this message is visible in this
% module -- confirm where it is handled.
register_update_notifier(#db{main_pid=Pid}, Seq, Fun) ->
    gen_server:call(Pid, {register_update_notifier, Seq, Fun}).

% Asynchronously asks the updater process to start compaction.
start_compact(#db{update_pid=Pid}) ->
    gen_server:cast(Pid, start_compact).

% Writes a deleted revision for each entry in Revisions.
% The result match requires update_docs/3 to return exactly one result.
% NOTE(review): revs is set to a bare list here, while the rest of this module
% matches revs as {Start, RevIds} -- confirm the expected shape of Revisions.
delete_doc(Db, Id, Revisions) ->
    DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
    {ok, [Result]} = update_docs(Db, DeletedDocs, []),
    {ok, Result}.

% open_doc/3 with no options.
open_doc(Db, IdOrDocInfo) ->
    open_doc(Db, IdOrDocInfo, []).

% Opens a document and counts the read. A deleted document is returned only
% when Options contains the atom 'deleted'; otherwise the result is
% {not_found, deleted}. Any other outcome of open_doc_int/3 is passed through.
open_doc(Db, Id, Options) ->
    couch_stats_collector:increment({couchdb, database_reads}),
    case open_doc_int(Db, Id, Options) of
        {ok, #doc{deleted=true}} = Opened ->
            case lists:member(deleted, Options) of
                true -> Opened;
                false -> {not_found, deleted}
            end;
        Other ->
            Other
    end.

% Opens the requested revisions of a single document and counts the read.
open_doc_revs(Db, Id, Revs, Options) ->
    couch_stats_collector:increment({couchdb, database_reads}),
    [ForThisDoc] = open_doc_revs_int(Db, [{Id, Revs}], Options),
    ForThisDoc.

% For each {Id, Revs} pair, works out which of Revs are absent from the stored
% revision tree. Returns {ok, [{Id, MissingRevs}]}, omitting ids with nothing
% missing. Ids that are not stored at all report every requested rev.
get_missing_revs(Db, IdRevsList) ->
    DocIds = [DocId || {DocId, _} <- IdRevsList],
    Lookups = get_full_doc_infos(Db, DocIds),
    PerDoc = lists:zipwith(
        fun({Id, Revs}, Lookup) ->
            case Lookup of
                {ok, #full_doc_info{rev_tree=Tree}} ->
                    {Id, couch_key_tree:find_missing(Tree, Revs)};
                not_found ->
                    {Id, Revs}
            end
        end,
        IdRevsList, Lookups),
    % keep only the ids that actually miss something
    {ok, [{Id, Revs} || {Id, Revs} <- PerDoc, Revs /= []]}.

% Returns {ok, DocInfo} for the id, or passes through the lookup failure.
get_doc_info(Db, Id) ->
    case get_full_doc_info(Db, Id) of
        {ok, FullInfo} ->
            {ok, couch_doc:to_doc_info(FullInfo)};
        Miss ->
            Miss
    end.

% returns {ok, DocInfo} or not_found
get_full_doc_info(Db, Id) ->
    [Lookup] = get_full_doc_infos(Db, [Id]),
    Lookup.

% Looks up the ids in the by-id btree; one result per id.
get_full_doc_infos(Db, Ids) ->
    couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids).

% Asks the updater process to increment the update sequence.
increment_update_seq(#db{update_pid=Updater}) ->
    gen_server:call(Updater, increment_update_seq).

% Asks the updater process to purge the given id/revision pairs.
purge_docs(#db{update_pid=Updater}, IdsRevs) ->
    gen_server:call(Updater, {purge_docs, IdsRevs}).

% Accessor for the committed update sequence held in the #db record.
get_committed_update_seq(#db{committed_update_seq=CommittedSeq}) ->
    CommittedSeq.

% Accessor for the update sequence held in the #db record.
get_update_seq(#db{update_seq=CurrentSeq}) ->
    CurrentSeq.

% Accessor for the purge sequence stored in the database header.
get_purge_seq(#db{header=#db_header{purge_seq=Seq}}) ->
    Seq.

% Returns {ok, []} when the header has no purged-docs pointer, otherwise reads
% the term stored at that pointer.
get_last_purged(#db{header=#db_header{purged_docs=nil}}) ->
    {ok, []};
get_last_purged(#db{fd=Fd, header=#db_header{purged_docs=Pointer}}) ->
    couch_file:pread_term(Fd, Pointer).

% Builds the property list describing the database: name, document counts,
% sequences, compaction state, file size, start time and disk format version.
get_db_info(Db) ->
    #db{name=DbName,
        fd=Fd,
        update_seq=UpdateSeq,
        compactor_pid=CompactorPid,
        instance_start_time=StartedAt,
        fulldocinfo_by_id_btree=ByIdBtree,
        header=#db_header{disk_version=DiskVersion}} = Db,
    {ok, FileSize} = couch_file:bytes(Fd),
    {ok, {DocCount, DelDocCount}} = couch_btree:full_reduce(ByIdBtree),
    {ok, [
        {db_name, DbName},
        {doc_count, DocCount},
        {doc_del_count, DelDocCount},
        {update_seq, UpdateSeq},
        {purge_seq, couch_db:get_purge_seq(Db)},
        {compact_running, CompactorPid/=nil},
        {disk_size, FileSize},
        {instance_start_time, StartedAt},
        {disk_format_version, DiskVersion}
    ]}.

% Folds over the by-id btree starting at the key <<"_design/">> and opens
% every entry whose id has that prefix; the fold stops at the first entry
% that does not. Docs are accumulated by prepending.
get_design_docs(#db{fulldocinfo_by_id_btree=ByIdBtree}=Db) ->
    Collect =
        fun(#full_doc_info{id= <<"_design/",_/binary>>}=Info, _Reds, Acc) ->
            {ok, DesignDoc} = couch_db:open_doc_int(Db, Info, []),
            {ok, [DesignDoc | Acc]};
        (_, _Reds, Acc) ->
            {stop, Acc}
        end,
    couch_btree:foldl(ByIdBtree, <<"_design/">>, Collect, []).

% Throws {unauthorized, _} unless the user's name or one of the user's roles
% is <<"_admin">> or appears in the database's admin list.
check_is_admin(#db{admins=Admins, user_ctx=#user_ctx{name=Name,roles=Roles}}) ->
    Allowed = [<<"_admin">> | Admins],
    % If subtracting the user's identities removes nothing, none matched.
    case Allowed -- [Name | Roles] of
        Allowed ->
            throw({unauthorized, <<"You are not a db or server admin.">>});
        _Shorter ->
            ok
    end.

% Accessor for the admin list held in the #db record.
get_admins(#db{admins=AdminList}) ->
    AdminList.

% Replaces the admin list via the updater process. Requires the caller's user
% context to be an admin (see check_is_admin/1).
set_admins(#db{update_pid=Updater}=Db, Admins) when is_list(Admins) ->
    check_is_admin(Db),
    gen_server:call(Updater, {set_admins, Admins}, infinity).


% Accessor for the revision limit held in the #db record.
get_revs_limit(#db{revs_limit=Limit}) ->
    Limit.

% Sets the revision limit via the updater process. Requires admin rights.
% Throws invalid_revs_limit unless Limit is a positive integer. The
% is_integer/1 test is needed because in Erlang term ordering every atom,
% list, tuple and binary compares greater than 0, so "Limit > 0" alone would
% accept them.
set_revs_limit(#db{update_pid=Pid}=Db, Limit) when is_integer(Limit), Limit > 0 ->
    check_is_admin(Db),
    gen_server:call(Pid, {set_revs_limit, Limit}, infinity);
set_revs_limit(_Db, _Limit) ->
    throw(invalid_revs_limit).

% Accessor for the database name held in the #db record.
name(#db{name=Name}) ->
    Name.

% Updates a single document. Returns {ok, NewRev} on success and throws the
% per-document error otherwise.
update_doc(Db, Doc, Options) ->
    case update_docs(Db, [Doc], Options) of
    {ok, [{ok, NewRev}]} ->
        {ok, NewRev};
    {ok, [Error]} ->
        throw(Error)
    end.

% update_docs/3 with no options.
update_docs(Db, Docs) ->
    update_docs(Db, Docs, []).

% group_alike_docs groups the sorted documents into sublist buckets, by id.
% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
% The ordering fun and the prepend-based bucketing are kept exactly as they
% were, because they determine the order of docs inside a bucket.
group_alike_docs(Docs) ->
    Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs),
    group_alike_docs(Sorted, []).

group_alike_docs([], Buckets) ->
    lists:reverse(Buckets);
group_alike_docs([Doc|Rest], []) ->
    group_alike_docs(Rest, [[Doc]]);
group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
    [#doc{id=BucketId}|_] = Bucket,
    case Doc#doc.id == BucketId of
    true ->
        % add to existing bucket
        group_alike_docs(Rest, [[Doc|Bucket]|RestBuckets]);
    false ->
        % add to new bucket
        group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]])
    end.


% Decides whether Doc may be written. Returns ok or an error term.
%  - design docs: only a user whose name or role is <<"_admin">> or is listed
%    in the db admins may save; validation funs are not run for them.
%  - no validation funs, or a _local doc: always ok.
%  - otherwise every validation fun is called with the new doc, the doc
%    produced by GetDiskDocFun(), and the JSON user context; the first non-ok
%    result is returned.
validate_doc_update(#db{user_ctx=UserCtx, admins=Admins},
        #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) ->
    UserNames = [UserCtx#user_ctx.name | UserCtx#user_ctx.roles],
    % if the user is a server admin or db admin, allow the save
    case length(UserNames -- [<<"_admin">> | Admins]) == length(UserNames) of
    true ->
        % not an admin
        {unauthorized, <<"You are not a server or database admin.">>};
    false ->
        ok
    end;
validate_doc_update(#db{validate_doc_funs=[]}, _Doc, _GetDiskDocFun) ->
    ok;
validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) ->
    ok;
validate_doc_update(Db, Doc, GetDiskDocFun) ->
    DiskDoc = GetDiskDocFun(),
    JsonCtx = couch_util:json_user_ctx(Db),
    % throw is used to leave the comprehension at the first failing fun
    try [case Fun(Doc, DiskDoc, JsonCtx) of
            ok -> ok;
            Error -> throw(Error)
        end || Fun <- Db#db.validate_doc_funs],
        ok
    catch
        throw:Error ->
            Error
    end.


% Validates one interactive edit against the leaf revisions already stored
% for the document. Returns {ValidationResult, Doc}; the result is conflict
% when the edit's parent rev is not a current leaf and conflicts are not
% allowed. Attachment stubs are merged from the on-disk doc before validation.
prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
        OldFullDocInfo, LeafRevsDict, AllowConflict) ->
    case Revs of
    [PrevRev|_] ->
        case dict:find({RevStart, PrevRev}, LeafRevsDict) of
        {ok, {Deleted, DiskSp, DiskRevs}} ->
            case couch_doc:has_stubs(Doc) of
            true ->
                DiskDoc = make_doc(Db, Id, Deleted, DiskSp, DiskRevs),
                Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
                {validate_doc_update(Db, Doc2, fun() -> DiskDoc end), Doc2};
            false ->
                % load the disk doc lazily; validation may never need it
                LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,DiskSp,DiskRevs) end,
                {validate_doc_update(Db, Doc, LoadDiskDoc), Doc}
            end;
        error when AllowConflict ->
            {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
        error ->
            {conflict, Doc}
        end;
    [] ->
        % new doc, and we have existing revs.
        % reuse existing deleted doc
        if OldFullDocInfo#full_doc_info.deleted orelse AllowConflict ->
            {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
        true ->
            {conflict, Doc}
        end
    end.



% Walks the doc buckets in step with their by-id lookup results and validates
% each doc. Returns {PreppedBuckets, Errors}: buckets hold the docs that passed
% (accumulated by prepending, so in reverse bucket order) and Errors is a list
% of {{Id, Revs}, Error}.
prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped,
        AccFatalErrors) ->
   {AccPrepped, AccFatalErrors};
prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
        AllowConflict, AccPrepped, AccErrors) ->
    [#doc{id=Id}|_]=DocBucket,
    % no existing revs are known,
    {PreppedBucket, AccErrors3} = lists:foldl(
        fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
            case Revs of
            {0, []} ->
                case validate_doc_update(Db, Doc, fun() -> nil end) of
                ok ->
                    {[Doc | AccBucket], AccErrors2};
                Error ->
                    {AccBucket, [{{Id, {0, []}}, Error} | AccErrors2]}
                end;
            _ ->
                % old revs specified but none exist, a conflict
                {AccBucket, [{{Id, Revs}, conflict} | AccErrors2]}
            end
        end,
        {[], AccErrors}, DocBucket),

    prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
            [PreppedBucket | AccPrepped], AccErrors3);
prep_and_validate_updates(Db, [DocBucket|RestBuckets],
        [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups],
        AllowConflict, AccPrepped, AccErrors) ->
    Leafs = couch_key_tree:get_all_leafs(OldRevTree),
    % index the stored leaves by {Pos, RevId} for parent-rev lookups
    LeafRevsDict = dict:from_list([{{Start, RevId}, {Deleted, Sp, Revs}} ||
            {{Deleted, Sp, _Seq}, {Start, [RevId|_]}=Revs} <- Leafs]),
    {PreppedBucket, AccErrors3} = lists:foldl(
        fun(Doc, {Docs2Acc, AccErrors2}) ->
            case prep_and_validate_update(Db, Doc, OldFullDocInfo,
                    LeafRevsDict, AllowConflict) of
            {ok, Doc2} ->
                {[Doc2 | Docs2Acc], AccErrors2};
            {Error, #doc{id=Id,revs=Revs}} ->
                % Record the error
                {Docs2Acc, [{{Id, Revs}, Error} |AccErrors2]}
            end
        end,
        {[], AccErrors}, DocBucket),
    prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict, [PreppedBucket | AccPrepped], AccErrors3).


% update_docs/4 with the interactive_edit update type. The head still asserts
% that the first argument is a #db record.
update_docs(#db{}=Db, Docs, Options) ->
    update_docs(Db, Docs, Options, interactive_edit).


% Validation pass for replicated changes. Returns {Buckets, Errors} where
% Errors is a list of {{Id, {Pos, Rev}}, Error}. For documents already stored,
% the incoming docs are merged into the existing rev tree and only those that
% end up as leaves are validated; the others are silently treated as success.
prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) ->
    Errors2 = [{{Id, {Pos, Rev}}, Error} ||
            {#doc{id=Id,revs={Pos,[Rev|_]}}, Error} <- AccErrors],
    {lists:reverse(AccPrepped), lists:reverse(Errors2)};
prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldInfo], AccPrepped, AccErrors) ->
    case OldInfo of
    not_found ->
        {ValidatedBucket, AccErrors3} = lists:foldl(
            fun(Doc, {AccPrepped2, AccErrors2}) ->
                case validate_doc_update(Db, Doc, fun() -> nil end) of
                ok ->
                    {[Doc | AccPrepped2], AccErrors2};
                Error ->
                    {AccPrepped2, [{Doc, Error} | AccErrors2]}
                end
            end,
            {[], AccErrors}, Bucket),
        prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
    {ok, #full_doc_info{rev_tree=OldTree}} ->
        NewRevTree = lists:foldl(
            fun(NewDoc, AccTree) ->
                {NewTree, _} = couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]),
                NewTree
            end,
            OldTree, Bucket),
        Leafs = couch_key_tree:get_all_leafs_full(NewRevTree),
        LeafRevsFullDict = dict:from_list( [{{Start, RevId}, FullPath} || {Start, [{RevId, _}|_]}=FullPath <- Leafs]),
        {ValidatedBucket, AccErrors3} =
        lists:foldl(
            fun(#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, {AccValidated, AccErrors2}) ->
                case dict:find({Pos, RevId}, LeafRevsFullDict) of
                {ok, {Start, Path}} ->
                    % our unflushed doc is a leaf node. Go back on the path
                    % to find the previous rev that's on disk.
                    LoadPrevRevFun = fun() ->
                                make_first_doc_on_disk(Db,Id,Start-1, tl(Path))
                            end,
                    case validate_doc_update(Db, Doc, LoadPrevRevFun) of
                    ok ->
                        {[Doc | AccValidated], AccErrors2};
                    Error ->
                        {AccValidated, [{Doc, Error} | AccErrors2]}
                    end;
                _ ->
                    % this doc isn't a leaf or already exists in the tree.
                    % ignore but consider it a success.
                    {AccValidated, AccErrors2}
                end
            end,
            {[], AccErrors}, Bucket),
        prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo,
                [ValidatedBucket | AccPrepped], AccErrors3)
    end.



% Computes the id of the next revision of a document. When every attachment
% carries an md5 the id is the md5 of the deleted flag, previous revision,
% body and attachment signatures; if any attachment has an empty md5 a random
% 32-bit value is used instead.
new_revid(#doc{body=Body,revs={OldStart,OldRevs},
        atts=Atts,deleted=Deleted}) ->
    case [{N, T, M} || #att{name=N,type=T,md5=M} <- Atts, M /= <<>>] of
    Atts2 when length(Atts) /= length(Atts2) ->
        % We must have old style non-md5 attachments
        ?l2b(integer_to_list(couch_util:rand32()));
    Atts2 ->
        OldRev = case OldRevs of [] -> 0; [OldRev0|_] -> OldRev0 end,
        erlang:md5(term_to_binary([Deleted, OldStart, OldRev, Body, Atts2]))
    end.

% Assigns a new revision to every doc in every bucket. Returns the updated
% buckets plus an accumulator of
% {{Id, {OldStart, OldRevIds}}, {ok, {NewStart, NewRevId}}} entries.
new_revs([], OutBuckets, IdRevsAcc) ->
    {lists:reverse(OutBuckets), IdRevsAcc};
new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
    {NewBucket, IdRevsAcc3} = lists:mapfoldl(
        fun(#doc{id=Id,revs={Start, RevIds}}=Doc, IdRevsAcc2)->
        NewRevId = new_revid(Doc),
        {Doc#doc{revs={Start+1, [NewRevId | RevIds]}},
            [{{Id, {Start, RevIds}}, {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
    end, IdRevsAcc, Bucket),
    new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).

% Throws {bad_request, _} when two neighbouring attachments in a name-sorted
% list share a name. The walk advances ONE element at a time so that every
% adjacent pair is compared; stepping two at a time would miss a duplicate
% at positions 2 and 3 (e.g. names [a, b, b]).
check_dup_atts([#att{name=N1}, #att{name=N2} | _]) when N1 == N2 ->
    throw({bad_request, <<"Duplicate attachments">>});
check_dup_atts([_ | Rest]) ->
    check_dup_atts(Rest);
check_dup_atts(_) ->
    ok.

% Sorts the doc's attachments by name and rejects duplicate names.
sort_and_check_atts(#doc{atts=Atts}=Doc) ->
    Atts2 = lists:sort(fun(#att{name=N1}, #att{name=N2}) -> N1 < N2 end, Atts),
    check_dup_atts(Atts2),
    Doc#doc{atts=Atts2}.


% Writes a batch of documents.
%
% replicated_changes: docs already carry their revisions. Validation runs only
%   when the db has validation funs or the batch contains a design doc.
%   Returns {ok, ValidationErrors}.
%
% interactive_edit: new revision ids are generated here. _local docs are split
%   off as non-replicated docs. With the all_or_nothing option any
%   pre-commit failure aborts the batch with {aborted, Failures}; otherwise
%   the result is {ok, Results} with one entry per input doc, in input order.
%
% The binary prefix patterns below were unreadable ("<>") in the extracted
% source. They are restored as the literal "_design/" and "_local/" prefixes
% that validate_doc_update/3 matches on.
update_docs(Db, Docs, Options, replicated_changes) ->
    couch_stats_collector:increment({couchdb, database_writes}),
    DocBuckets = group_alike_docs(Docs),

    case (Db#db.validate_doc_funs /= []) orelse
        lists:any(
            fun(#doc{id= <<"_design/", _/binary>>}) -> true;
            (_) -> false
            end, Docs) of
    true ->
        Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
        ExistingDocs = get_full_doc_infos(Db, Ids),

        {DocBuckets2, DocErrors} =
                prep_and_validate_replicated_updates(Db, DocBuckets, ExistingDocs, [], []),
        DocBuckets3 = [Bucket || [_|_]=Bucket <- DocBuckets2]; % remove empty buckets
    false ->
        DocErrors = [],
        DocBuckets3 = DocBuckets
    end,
    DocBuckets4 = [[doc_flush_atts(sort_and_check_atts(Doc), Db#db.fd)
            || Doc <- Bucket] || Bucket <- DocBuckets3],
    {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
    {ok, DocErrors};

update_docs(Db, Docs, Options, interactive_edit) ->
    couch_stats_collector:increment({couchdb, database_writes}),
    AllOrNothing = lists:member(all_or_nothing, Options),
    % go ahead and generate the new revision ids for the documents.
    % separate out the NonRep documents from the rest of the documents
    {Docs2, NonRepDocs} = lists:foldl(
        fun(#doc{id=Id}=Doc, {DocsAcc, NonRepDocsAcc}) ->
            case Id of
            <<"_local/", _/binary>> ->
                {DocsAcc, [Doc | NonRepDocsAcc]};
            Id->
                {[Doc | DocsAcc], NonRepDocsAcc}
            end
        end, {[], []}, Docs),

    DocBuckets = group_alike_docs(Docs2),

    % Existing doc infos are needed when there is anything to validate or
    % when any doc carries attachments.
    case (Db#db.validate_doc_funs /= []) orelse
        lists:any(
            fun(#doc{id= <<"_design/", _/binary>>}) ->
                true;
            (#doc{atts=Atts}) ->
                Atts /= []
            end, Docs2) of
    true ->
        % lookup the doc by id and get the most recent
        Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
        ExistingDocInfos = get_full_doc_infos(Db, Ids),

        {DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db,
                DocBuckets, ExistingDocInfos, AllOrNothing, [], []),

        % strip out any empty buckets
        DocBuckets2 = [Bucket || [_|_] = Bucket <- DocBucketsPrepped];
    false ->
        PreCommitFailures = [],
        DocBuckets2 = DocBuckets
    end,

    if (AllOrNothing) and (PreCommitFailures /= []) ->
        % report each failure against the single rev the caller supplied
        {aborted, lists:map(
            fun({{Id,{Pos, [RevId|_]}}, Error}) ->
                {{Id, {Pos, RevId}}, Error};
            ({{Id,{0, []}}, Error}) ->
                {{Id, {0, <<>>}}, Error}
            end, PreCommitFailures)};
    true ->
        Options2 = if AllOrNothing -> [merge_conflicts];
                true -> [] end ++ Options,
        DocBuckets3 = [[
                doc_flush_atts(set_new_att_revpos(
                        sort_and_check_atts(Doc)), Db#db.fd)
                || Doc <- B] || B <- DocBuckets2],
        {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),

        {ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),

        % Later entries win in dict:from_list/1, so commit results and
        % pre-commit failures override the optimistic {ok, NewRev} entries.
        ResultsDict = dict:from_list(IdRevs ++ CommitResults ++ PreCommitFailures),
        {ok, lists:map(
            fun(#doc{id=Id,revs={Pos, RevIds}}) ->
                {ok, Result} = dict:find({Id, {Pos, RevIds}}, ResultsDict),
                Result
            end, Docs)}
    end.

% Walks a full revision path and builds a doc from the first entry that has
% on-disk data, skipping ?REV_MISSING entries (the position is decremented
% for each skipped entry). Yields nil when the path is exhausted.
make_first_doc_on_disk(_Db, _Id, _Pos, []) ->
    nil;
make_first_doc_on_disk(Db, Id, Pos, [{_Rev, ?REV_MISSING} | Older]) ->
    make_first_doc_on_disk(Db, Id, Pos - 1, Older);
make_first_doc_on_disk(Db, Id, Pos, [{_Rev, {Deleted, SummaryPtr, _Seq}} | _] = PathFromHere) ->
    RevIds = [RevId || {RevId, _} <- PathFromHere],
    make_doc(Db, Id, Deleted, SummaryPtr, {Pos, RevIds}).


% Hands the prepared buckets to the updater process. If the updater answers
% 'retry', the attachments are flushed again to the file of a freshly opened
% db and the write is attempted once more; a second 'retry' is thrown as
% {update_error, compaction_retry}.
write_and_commit(#db{update_pid=Updater, user_ctx=Ctx}=Db, DocBuckets,
        NonRepDocs, Options) ->
    FirstAttempt = {update_docs, DocBuckets, NonRepDocs, Options},
    case gen_server:call(Updater, FirstAttempt, infinity) of
        {ok, Results} ->
            {ok, Results};
        retry ->
            % The file we wrote to was swapped out by compaction: reopen the
            % db and rewrite the attachments to the current file.
            {ok, Reopened} = open_ref_counted(Db#db.main_pid, Ctx),
            Reflushed = [[doc_flush_atts(Doc, Reopened#db.fd) || Doc <- Bucket]
                    || Bucket <- DocBuckets],
            % a single retry only
            close(Reopened),
            SecondAttempt = {update_docs, Reflushed, NonRepDocs, Options},
            case gen_server:call(Updater, SecondAttempt, infinity) of
                {ok, Results} -> {ok, Results};
                retry -> throw({update_error, compaction_retry})
            end
    end.

% Sets revpos to the doc's next revision position on every attachment whose
% data is not already a 2-tuple (i.e. not already stored in a file).
set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) ->
    Stamp =
        fun(#att{data={_Fd,_Sp}}=Stored) ->
            % already on disk, keep its revpos
            Stored;
        (Fresh) ->
            Fresh#att{revpos=RevPos+1}
        end,
    Doc#doc{atts=lists:map(Stamp, Atts)}.


% Flushes every attachment of the doc to the file Fd.
doc_flush_atts(#doc{}=Doc, Fd) ->
    Flushed = [flush_att(Fd, Att) || Att <- Doc#doc.atts],
    Doc#doc{atts=Flushed}.

% Compares a computed md5 with the expected one; an empty expected signature
% always passes. Throws data_corruption on mismatch.
check_md5(_Computed, <<>>) -> ok;
check_md5(Computed, Expected) when Computed == Expected -> ok;
check_md5(_Computed, _Expected) -> throw(data_corruption).

% Ensures the attachment's data lives in the file Fd and returns the updated
% #att. Clauses, in order:
%   1. data already points into Fd              -> returned unchanged
%   2. data points into another file            -> stream is copied over
%   3. data is a binary                         -> written to a new stream
%   4. data is a fun and len is undefined       -> chunked writer protocol
%   5. data is a fun with a known length        -> read until Len bytes written
flush_att(Fd, #att{data={SameFd, _}}=Att) when SameFd == Fd ->
    % nothing to do, the bytes are already in our file
    Att;

flush_att(Fd, #att{data={SourceFd,SourcePtr}, md5=ExpectedMd5}=Att) ->
    {CopiedStream, CopiedLen, CopiedMd5} =
        couch_stream:copy_to_new_stream(SourceFd, SourcePtr, Fd),
    check_md5(CopiedMd5, ExpectedMd5),
    Att#att{data={Fd, CopiedStream}, md5=CopiedMd5, len=CopiedLen};

flush_att(Fd, #att{data=Bytes}=Att) when is_binary(Bytes) ->
    WriteAll = fun(Out) -> couch_stream:write(Out, Bytes) end,
    with_stream(Fd, Att, WriteAll);

flush_att(Fd, #att{data=Reader,len=undefined}=Att) when is_function(Reader) ->
    with_stream(Fd, Att, fun(Out) ->
        % Reader(MaxChunkSize, WriterFun, InitState) must invoke WriterFun
        % once per chunk as WriterFun({Length, Binary}, State), and a final
        % time as WriterFun({0, _Footers}, State). WriterFun returns the new
        % state.
        Writer =
            fun({0, _Footers}, _State) ->
                ok;
            ({_Length, Chunk}, _State) ->
                couch_stream:write(Out, Chunk)
            end,
        Reader(4096, Writer, ok)
    end);

flush_att(Fd, #att{data=Reader,len=Len}=Att) when is_function(Reader) ->
    with_stream(Fd, Att, fun(Out) ->
        write_streamed_attachment(Out, Reader, Len)
    end).

% Opens a new stream on Fd, lets Fun write to it, closes it, verifies the md5
% against the one carried by the attachment, and returns the attachment
% pointing at the new stream.
with_stream(Fd, #att{md5=ExpectedMd5}=Att, Fun) ->
    {ok, Out} = couch_stream:open(Fd),
    Fun(Out),
    {StreamInfo, WrittenLen, WrittenMd5} = couch_stream:close(Out),
    check_md5(WrittenMd5, ExpectedMd5),
    Att#att{data={Fd,StreamInfo},len=WrittenLen,md5=WrittenMd5}.


% Repeatedly calls F() for the next binary and writes it to Stream until
% exactly LenLeft bytes have been written.
write_streamed_attachment(_Stream, _F, 0) ->
    ok;
write_streamed_attachment(Stream, F, LenLeft) ->
    Chunk = check_bin_length(LenLeft, F()),
    ok = couch_stream:write(Stream, Chunk),
    Remaining = LenLeft - size(Chunk),
    write_streamed_attachment(Stream, F, Remaining).

%% ibrowse 1.4.1 had a bug that appended a CR to a chunked response when the
%% CR and LF ending the last data chunk arrived in separate packets. It was
%% fixed in 1.5.0, but the check is kept just in case.
%% A binary longer than the remaining length is logged and the process exits
%% with replicated_attachment_too_large; otherwise the binary is returned.
check_bin_length(LenLeft, Bin) when size(Bin) > LenLeft ->
    <<_Expected:LenLeft/binary, Excess/binary>> = Bin,
    ?LOG_ERROR("write_streamed_attachment has written too much expected: ~p" ++
        " got: ~p tail: ~p", [LenLeft, size(Bin), Excess]),
    exit(replicated_attachment_too_large);
check_bin_length(_LenLeft, Bin) ->
    Bin.

% Final-reduces by-seq btree reductions with the updater's by-seq reduce fun.
enum_docs_since_reduce_to_count(Reds) ->
    ReduceFun = fun couch_db_updater:btree_by_seq_reduce/2,
    couch_btree:final_reduce(ReduceFun, Reds).

% Final-reduces by-id btree reductions and returns only the first element of
% the resulting pair (the second, the deleted count, is dropped).
enum_docs_reduce_to_count(Reds) ->
    ReduceFun = fun couch_db_updater:btree_by_id_reduce/2,
    {Count, _DelCount} = couch_btree:final_reduce(ReduceFun, Reds),
    Count.

% Folds Fun over the changes after StartSeq. With Style main_only, Fun gets a
% one-element list holding the doc info as is; with all_docs it gets one doc
% info per revision whose seq is greater than StartSeq.
changes_since(Db, Style, StartSeq, Fun, Acc) ->
    PerChange =
        fun(DocInfo, _Offset, FoldAcc) ->
            #doc_info{revs=Revs} = DocInfo,
            Infos =
                case Style of
                    main_only ->
                        [DocInfo];
                    all_docs ->
                        % split every rev into its own doc info
                        [DocInfo#doc_info{revs=[RevInfo]} ||
                            #rev_info{seq=RevSeq}=RevInfo <- Revs,
                            StartSeq < RevSeq]
                end,
            Fun(Infos, FoldAcc)
        end,
    enum_docs_since(Db, StartSeq, fwd, PerChange, Acc).

% Reduces the by-seq btree from SinceSeq + 1 onwards into a single value and
% returns it.
count_changes_since(Db, SinceSeq) ->
    BySeq = Db#db.docinfo_by_seq_btree,
    SameGroup = fun(_, _) -> true end, % groupkeys
    Finish =
        fun(_SeqStart, PartialReds, 0) ->
            {ok, couch_btree:final_reduce(BySeq, PartialReds)}
        end,
    {ok, Changes} = couch_btree:fold_reduce(
        BySeq,
        SinceSeq + 1, % startkey
        ok, % endkey
        SameGroup,
        Finish,
        0),
    Changes.

% Folds InFun over the by-seq btree starting at SinceSeq + 1 in Direction.
enum_docs_since(Db, SinceSeq, Direction, InFun, Acc) ->
    StartKey = SinceSeq + 1,
    couch_btree:fold(Db#db.docinfo_by_seq_btree, StartKey, Direction, InFun, Acc).

% enum_docs_since/5 in the forward direction.
enum_docs_since(Db, SinceSeq, InFun, Acc) ->
    enum_docs_since(Db, SinceSeq, fwd, InFun, Acc).

% Folds InFun over the by-id btree starting at StartId in Direction.
enum_docs(Db, StartId, Direction, InFun, InAcc) ->
    ByIdBtree = Db#db.fulldocinfo_by_id_btree,
    couch_btree:fold(ByIdBtree, StartId, Direction, InFun, InAcc).

% enum_docs/5 in the forward direction.
enum_docs(Db, StartId, InFun, Ctx) ->
    enum_docs(Db, StartId, fwd, InFun, Ctx).

% server functions

% gen_server init: starts the linked updater process, fetches the initial #db
% record from it (which becomes this server's state), takes a reference on the
% fd ref counter and registers the process with the stats collector.
init({DbName, Filepath, Fd, Options}) ->
    UpdaterArg = {self(), DbName, Filepath, Fd, Options},
    {ok, Updater} = gen_server:start_link(couch_db_updater, UpdaterArg, []),
    {ok, #db{fd_ref_counter=Counter}=InitialDb} = gen_server:call(Updater, get_db),
    couch_ref_counter:add(Counter),
    couch_stats_collector:track_process_count({couchdb, open_databases}),
    {ok, InitialDb}.

% gen_server terminate: delegates to couch_util:terminate_linked/1.
terminate(Reason, _Db) ->
    couch_util:terminate_linked(Reason),
    ok.

% gen_server calls:
%   {open_ref_count, Pid} - registers Pid on the fd ref counter and replies
%                           with the current #db record.
%   is_idle               - replies true when no delayed commit is pending, no
%                           compaction is running and the ref count is 2.
%   {db_updated, NewDb}   - installs NewDb as the state, moving this process's
%                           reference when the fd ref counter changed.
handle_call({open_ref_count, OpenerPid}, _From, #db{fd_ref_counter=Counter}=Db) ->
    ok = couch_ref_counter:add(Counter, OpenerPid),
    {reply, {ok, Db}, Db};
handle_call(is_idle, _From, #db{fd_ref_counter=Counter, compactor_pid=Compactor,
        waiting_delayed_commit=Delayed}=Db) ->
    % Idle means no referrers. Outside of a compaction file switch there are
    % always at least 2 referrers: couch_db_updater and this process.
    NoDelayedCommit = (Delayed == nil),
    NotCompacting = (Compactor == nil),
    OnlyBaseRefs = (couch_ref_counter:count(Counter) == 2),
    {reply, NoDelayedCommit and NotCompacting and OnlyBaseRefs, Db};
handle_call({db_updated, NewDb}, _From, #db{fd_ref_counter=OldCounter}) ->
    #db{fd_ref_counter=NewCounter} = NewDb,
    if
        NewCounter == OldCounter ->
            ok;
        true ->
            couch_ref_counter:add(NewCounter),
            couch_ref_counter:drop(OldCounter)
    end,
    {reply, ok, NewDb}.


% No casts are expected: log and exit.
handle_cast(Msg, Db) ->
    ?LOG_ERROR("Bad cast message received for db ~s: ~p", [Db#db.name, Msg]),
    exit({error, Msg}).

% gen_server code_change: state is carried over unchanged.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

% No raw messages are expected: log and exit.
handle_info(Msg, Db) ->
    ?LOG_ERROR("Bad message received for db ~s: ~p", [Db#db.name, Msg]),
    exit({error, Msg}).


%%% Internal function %%%

% For each {Id, Revs} pair returns {ok, Results}, where each result is either
% {ok, Doc} or {{not_found, missing}, Rev}. Revs may be the atom 'all' (every
% leaf) or a list of revisions; with the 'latest' option the leaves descending
% from the requested revisions are returned instead of the revisions
% themselves.
open_doc_revs_int(Db, IdRevs, Options) ->
    DocIds = [DocId || {DocId, _} <- IdRevs],
    Lookups = get_full_doc_infos(Db, DocIds),
    lists:zipwith(
        fun({Id, Revs}, Lookup) ->
            case Lookup of
                {ok, #full_doc_info{rev_tree=Tree}} ->
                    {Found, NotInTree} =
                        case Revs of
                            all ->
                                {couch_key_tree:get_all_leafs(Tree), []};
                            _ ->
                                case lists:member(latest, Options) of
                                    true ->
                                        couch_key_tree:get_key_leafs(Tree, Revs);
                                    false ->
                                        couch_key_tree:get(Tree, Revs)
                                end
                        end,
                    ToResult =
                        fun({Value, {Pos, [Rev|_]}=RevPath}) ->
                            case Value of
                                ?REV_MISSING ->
                                    % the rev is in the tree but carries no data
                                    {{not_found, missing}, {Pos, Rev}};
                                {Deleted, SummaryPtr, _UpdateSeq} ->
                                    {ok, make_doc(Db, Id, Deleted, SummaryPtr, RevPath)}
                            end
                        end,
                    FoundResults = lists:map(ToResult, Found),
                    MissingResults = [{{not_found, missing}, Rev} || Rev <- NotInTree],
                    {ok, FoundResults ++ MissingResults};
                not_found when Revs == all ->
                    {ok, []};
                not_found ->
                    {ok, [{{not_found, missing}, Rev} || Rev <- Revs]}
            end
        end,
        IdRevs, Lookups).

% Opens a document without touching the read statistics. The second argument
% selects the lookup:
%   - an id with the "_local/" prefix : read from the local docs btree; the
%     integer revision is rendered as a binary at position 0.
%   - a #doc_info record             : built from its first rev_info.
%   - a #full_doc_info record        : built from the winning revision as
%     chosen by couch_doc:to_doc_info/1.
%   - any other id                   : looked up in the by-id btree first.
% Returns {ok, Doc} or {not_found, missing}.
%
% The pattern of the first clause was unreadable ("<>") in the extracted
% source. It is restored as the "_local/" prefix, consistent with the lookup
% in local_docs_btree and with validate_doc_update/3.
open_doc_int(Db, <<"_local/", _/binary>> = Id, _Options) ->
    case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of
    [{ok, {_, {Rev, BodyData}}}] ->
        {ok, #doc{id=Id, revs={0, [list_to_binary(integer_to_list(Rev))]}, body=BodyData}};
    [not_found] ->
        {not_found, missing}
    end;
open_doc_int(Db, #doc_info{id=Id,revs=[RevInfo|_]}=DocInfo, Options) ->
    #rev_info{deleted=IsDeleted,rev={Pos,RevId},body_sp=Bp} = RevInfo,
    Doc = make_doc(Db, Id, IsDeleted, Bp, {Pos,[RevId]}),
    % no rev tree is available here, so an empty one is passed along
    {ok, Doc#doc{meta=doc_meta_info(DocInfo, [], Options)}};
open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) ->
    #doc_info{revs=[#rev_info{deleted=IsDeleted,rev=Rev,body_sp=Bp}|_]} =
        DocInfo = couch_doc:to_doc_info(FullDocInfo),
    {[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]),
    Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath),
    {ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}};
open_doc_int(Db, Id, Options) ->
    case get_full_doc_info(Db, Id) of
    {ok, FullDocInfo} ->
        open_doc_int(Db, FullDocInfo, Options);
    not_found ->
        {not_found, missing}
    end.

% Builds the meta list of a doc from the requested options, in this order:
%   revs_info         -> {revs_info, Pos, [{Rev, deleted|available|missing}]}
%   conflicts         -> {conflicts, Revs} of the other non-deleted revisions
%   deleted_conflicts -> {deleted_conflicts, Revs} of the other deleted ones
%   local_seq         -> {local_seq, HighSeq}
% Conflict entries are omitted when there are none.
doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|OtherRevs]}, RevTree, Options) ->
    RevsInfo =
        case lists:member(revs_info, Options) of
            false ->
                [];
            true ->
                {[{Pos, RevPath}],[]} =
                    couch_key_tree:get_full_key_paths(RevTree, [Rev]),
                Status =
                    fun({RevId, {true, _Sp, _UpdateSeq}}) ->
                        {RevId, deleted};
                    ({RevId, {false, _Sp, _UpdateSeq}}) ->
                        {RevId, available};
                    ({RevId, ?REV_MISSING}) ->
                        {RevId, missing}
                    end,
                [{revs_info, Pos, lists:map(Status, RevPath)}]
        end,
    Conflicts =
        case lists:member(conflicts, Options) of
            false ->
                [];
            true ->
                case [R || #rev_info{rev=R,deleted=false} <- OtherRevs] of
                    [] -> [];
                    Live -> [{conflicts, Live}]
                end
        end,
    DeletedConflicts =
        case lists:member(deleted_conflicts, Options) of
            false ->
                [];
            true ->
                case [R || #rev_info{rev=R,deleted=true} <- OtherRevs] of
                    [] -> [];
                    Dead -> [{deleted_conflicts, Dead}]
                end
        end,
    LocalSeq =
        case lists:member(local_seq, Options) of
            false -> [];
            true -> [{local_seq, Seq}]
        end,
    RevsInfo ++ Conflicts ++ DeletedConflicts ++ LocalSeq.

% Reads a stored doc summary. A tuple pointer is read through the old
% couch_stream term reader; anything else is read with couch_file:pread_term/2.
read_doc(#db{fd=Fd}, LegacyPointer) when is_tuple(LegacyPointer) ->
    % 09 UPGRADE CODE
    couch_stream:old_read_term(Fd, LegacyPointer);
read_doc(#db{fd=Fd}, FilePos) ->
    couch_file:pread_term(Fd, FilePos).


% Converts a doc's revision path into a single-branch tree
% {StartPosOfOldestRev, Tree} with the doc itself at the leaf.
doc_to_tree(#doc{revs={Start, RevIds}}=Doc) ->
    OldestFirst = lists:reverse(RevIds),
    [Branch] = doc_to_tree_simple(Doc, OldestFirst),
    {Start - length(RevIds) + 1, Branch}.


% Builds the nested branch: every ancestor is ?REV_MISSING, the last rev id
% carries the doc.
doc_to_tree_simple(Doc, [LeafRev]) ->
    [{LeafRev, Doc, []}];
doc_to_tree_simple(Doc, [AncestorRev | Younger]) ->
    [{AncestorRev, ?REV_MISSING, doc_to_tree_simple(Doc, Younger)}].


% Builds a #doc record from a stored summary pointer. A nil pointer yields an
% empty body and no attachments. Stored attachments come in two shapes: the
% 6-tuple {Name,Type,Sp,Len,RevPos,Md5}, and the 2-tuple {Name,{Type,Sp,Len}},
% which gets an empty md5 and revpos 0. Attachment data is left as a
% {Fd, Sp} reference into the db file.
make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
    ToAtt =
        fun({Name,Type,Sp,Len,RevPos,Md5}) ->
            #att{name=Name,
                type=Type,
                len=Len,
                md5=Md5,
                revpos=RevPos,
                data={Fd,Sp}};
        ({Name,{Type,Sp,Len}}) ->
            #att{name=Name,
                type=Type,
                len=Len,
                md5= <<>>,
                revpos=0,
                data={Fd,Sp}}
        end,
    {Body, Attachments} =
        case Bp of
            nil ->
                {[], []};
            _ ->
                {ok, {StoredBody, StoredAtts}} = read_doc(Db, Bp),
                {StoredBody, lists:map(ToAtt, StoredAtts)}
        end,
    #doc{
        id = Id,
        revs = RevisionPath,
        body = Body,
        atts = Attachments,
        deleted = Deleted
    }.


Generated using etap 0.3.4.