Generated on 2009-08-09 21:22:55 with etap 0.3.4.
| Name | Total lines | Lines of code | Total coverage | Code coverage |
| couch_rep | ?? | ?? | ?? |
|
....1 % Licensed under the Apache License, Version 2.0 (the "License"); you may not ....2 % use this file except in compliance with the License. You may obtain a copy of ....3 % the License at ....4 % ....5 % http://www.apache.org/licenses/LICENSE-2.0 ....6 % ....7 % Unless required by applicable law or agreed to in writing, software ....8 % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT ....9 % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the ...10 % License for the specific language governing permissions and limitations under ...11 % the License. ...12 ...13 -module(couch_rep). ...14 -behaviour(gen_server). ...15 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, ...16 code_change/3]). ...17 ...18 -export([replicate/2]). ...19 ...20 -define(BUFFER_NDOCS, 1000). ...21 -define(BUFFER_NATTACHMENTS, 50). ...22 -define(BUFFER_MEMORY, 10000000). %% bytes ...23 ...24 -include("couch_db.hrl"). ...25 -include("../ibrowse/ibrowse.hrl"). ...26 ...27 %% @spec replicate(Source::binary(), Target::binary()) -> ...28 %% {ok, Stats} | {error, Reason} ...29 %% @doc Triggers a replication. Stats is a JSON Object with the following ...30 %% keys: session_id (UUID), source_last_seq (integer), and history (array). ...31 %% Each element of the history is an Object with keys start_time, end_time, ...32 %% start_last_seq, end_last_seq, missing_checked, missing_found, docs_read, ...33 %% and docs_written. ...34 %% ...35 %% The supervisor will try to restart the replication in case of any error ...36 %% other than shutdown. Just call this function again to listen for the ...37 %% result of the retry. 
replicate(Source, Target) ->
    %% The id depends only on (host, source, target), so every caller asking
    %% for the same replication talks to the same supervised child.
    {ok, Host} = inet:gethostname(),
    RepId = couch_util:to_hex(erlang:md5(term_to_binary([Host, Source, Target]))),
    ChildSpec = {RepId,
                 {gen_server, start_link, [?MODULE, [RepId, Source, Target], []]},
                 transient, 1, worker, [?MODULE]},
    ServerPid = replicator_pid(RepId, ChildSpec),
    case gen_server:call(ServerPid, get_result, infinity) of
        retry ->
            %% the replicator asked us to run again (see terminate/2)
            replicate(Source, Target);
        Result ->
            Result
    end.

%% Start the replicator child for RepId, or find the one already running,
%% and return its pid.
replicator_pid(RepId, ChildSpec) ->
    case supervisor:start_child(couch_rep_sup, ChildSpec) of
        {ok, Pid} ->
            ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
            Pid;
        {error, {already_started, Pid}} ->
            ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]),
            Pid;
        {error, already_present} ->
            restart_replicator(RepId, ChildSpec)
    end.

%% The child spec exists but its process is not running: restart it. If a
%% racing caller restarted it first ({error, running}), fetch the winner's
%% pid by calling start_child again.
restart_replicator(RepId, ChildSpec) ->
    case supervisor:restart_child(couch_rep_sup, RepId) of
        {ok, Pid} ->
            ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
            Pid;
        {error, running} ->
            {error, {already_started, Pid}} =
                supervisor:start_child(couch_rep_sup, ChildSpec),
            ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]),
            Pid
    end.

%%=============================================================================
%% gen_server callbacks
%%=============================================================================

%% Handle for a remote (HTTP) database, built by open_db/1.
-record(old_http_db, {
    uri,        % database URL as a string (open_db/1 applies ?b2l)
    headers,    % headers sent with every request
    oauth       % Auth term from {remote, ...}; see do_http_request/5
}).


%% Replicator process state.
-record(state, {
    context,            % checkpoint context proplist built in init/1
    current_seq,        % last source sequence reported by the enumerator
    source,             % source db (local couch_db handle or #old_http_db{})
    target,             % target db (local couch_db handle or #old_http_db{})
    stats,              % private ets table holding replication counters
    enum_pid,           % pid of the linked enum_docs_since/4 process
    docs_buffer = [],   % unwritten docs: a list of per-request doc lists
    listeners = [],     % gen_server From tags waiting for get_result
    done = false        % set once the enumerator reports fin
}).


%% @doc gen_server init callback.
%% Opens source and target, loads the replication checkpoint document
%% (id ?LOCAL_DOC_PREFIX ++ RepId) from both sides, and decides whether the
%% run can resume from the recorded sequence. It then creates the private
%% stats table, registers a task status entry and spawns the linked change
%% enumerator.
init([RepId, Source, Target]) ->
    %% trap exits so that deaths of linked helpers (the enumerator, attachment
    %% receivers, commit processes) arrive as 'EXIT' messages in handle_info/2
    process_flag(trap_exit, true),

    {ok, DbSrc, SrcName} = open_db(Source),
    {ok, DbTgt, TgtName} = open_db(Target),

    DocKey = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),

    {ok, InfoSrc} = get_db_info(DbSrc),
    {ok, InfoTgt} = get_db_info(DbTgt),

    ReplicationStartTime = httpd_util:rfc1123_date(),
    SrcInstanceStartTime = proplists:get_value(instance_start_time, InfoSrc),
    TgtInstanceStartTime = proplists:get_value(instance_start_time, InfoTgt),

    RepRecDocSrc =
        case open_doc(DbSrc, DocKey, []) of
            {ok, SrcDoc} ->
                ?LOG_DEBUG("Found existing replication record on source", []),
                SrcDoc;
            _ -> #doc{id=DocKey}
        end,

    RepRecDocTgt =
        case open_doc(DbTgt, DocKey, []) of
            {ok, TgtDoc} ->
                ?LOG_DEBUG("Found existing replication record on target", []),
                TgtDoc;
            _ -> #doc{id=DocKey}
        end,

    #doc{body={RepRecProps}} = RepRecDocSrc,
    #doc{body={RepRecPropsTgt}} = RepRecDocTgt,

    %% The recorded history is only trusted when both sides carry the same
    %% session id; otherwise replicate everything from sequence 0.
    {OldSeqNum, OldHistory} =
        case proplists:get_value(<<"session_id">>, RepRecProps) ==
                proplists:get_value(<<"session_id">>, RepRecPropsTgt) of
            true ->
                {proplists:get_value(<<"source_last_seq">>, RepRecProps, 0),
                 proplists:get_value(<<"history">>, RepRecProps, [])};
            false ->
                ?LOG_INFO("Replication records differ. "
                        "Performing full replication instead of incremental.", []),
                ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
                        [RepRecProps, RepRecPropsTgt]),
                {0, []}
        end,

    Context = [
        {start_seq, OldSeqNum},
        {history, OldHistory},
        {rep_starttime, ReplicationStartTime},
        {src_starttime, SrcInstanceStartTime},
        {tgt_starttime, TgtInstanceStartTime},
        {src_record, RepRecDocSrc},
        {tgt_record, RepRecDocTgt}
    ],

    Stats = ets:new(replication_stats, [set, private]),
    ets:insert(Stats, [{total_revs, 0},
                       {missing_revs, 0},
                       {docs_read, 0},
                       {docs_written, 0},
                       {doc_write_failures, 0}]),

    %% NOTE(review): this literal was mangled in the coverage report
    %% ("<\", TgtName/binary>>"); reconstructed as "Source -> Target".
    couch_task_status:add_task("Replication", <<SrcName/binary, " -> ",
        TgtName/binary>>, "Starting"),

    Parent = self(),
    Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{OldSeqNum,0}) end),

    State = #state{
        context = Context,
        current_seq = OldSeqNum,
        enum_pid = Pid,
        source = DbSrc,
        target = DbTgt,
        stats = Stats
    },

    {ok, State}.
%% @doc Synchronous requests.
%%  - get_result: registers the caller as a listener. When the enumerator has
%%    already finished (done=true), the server stops right away and
%%    terminate(normal, ...) sends the replies. Otherwise the caller waits.
%%  - {replicate_doc, {Id, Revs}}: sent by the enumerator for each doc with
%%    missing revs. It fetches those revs from the source and buffers them,
%%    flushing to the target and checkpointing when should_flush/1 says so.
%%  - {fin, {LastSeq, RevsCount}}: sent by the enumerator when no changes
%%    remain.
%% The replicate_doc and fin clauses only match calls from the current
%% enum_pid; the same requests from any other process crash with
%% function_clause.
handle_call(get_result, From, #state{listeners=L,done=true} = State) ->
    {stop, normal, State#state{listeners=[From|L]}};
handle_call(get_result, From, #state{listeners=L} = State) ->
    {noreply, State#state{listeners=[From|L]}};

handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) ->
    #state{
        context = Context,
        current_seq = Seq,
        docs_buffer = Buffer,
        source = Source,
        target = Target,
        stats = Stats
    } = State,

    ets:update_counter(Stats, missing_revs, length(Revs)),

    %% get document(s); only successfully opened revisions are kept
    {ok, DocResults} = open_doc_revs(Source, Id, Revs, [latest]),
    Docs = [RevDoc || {ok, RevDoc} <- DocResults],
    ets:update_counter(Stats, docs_read, length(Docs)),

    %% save them (maybe in a buffer)
    {NewBuffer, NewContext} =
    case should_flush(lists:flatlength([Docs|Buffer])) of
        true ->
            Docs2 = lists:flatten([Docs|Buffer]),
            try update_docs(Target, Docs2, [], replicated_changes) of
            {ok, Errors} ->
                dump_update_errors(Errors),
                ets:update_counter(Stats, doc_write_failures, length(Errors)),
                ets:update_counter(Stats, docs_written, length(Docs2) -
                        length(Errors)),
                %% checkpoint at the last sequence reported via
                %% increment_update_seq; the updated context carries the new
                %% checkpoint doc revisions
                {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats),
                {[], Ctxt}
            catch
                %% thrown by the attachment data fun built in
                %% make_att_stub_receiver/7 when its receiver fails
                throw:attachment_write_failed ->
                    ?LOG_ERROR("attachment request failed during write to disk", []),
                    exit({internal_server_error, replication_link_failure})
            end;
        false ->
            {[Docs | Buffer], Context}
    end,

    {reply, ok, State#state{context=NewContext, docs_buffer=NewBuffer}};

handle_call({fin, {LastSeq, RevsCount}}, {Pid,_}, #state{enum_pid=Pid} = State) ->
    ets:update_counter(State#state.stats, total_revs, RevsCount),
    case State#state.listeners of
    [] ->
        % still waiting for the first listener to send a request.
        % NOTE(review): the enumerator is deliberately left without a reply.
        % If it got one, enum_docs_since/4 would return, the process would
        % exit normally, and the first handle_info clause (which matches any
        % exit reason of enum_pid) would respawn it.
        {noreply, State#state{current_seq=LastSeq,done=true}};
    _ ->
        {stop, normal, ok, State#state{current_seq=LastSeq}}
    end.

%% @doc Progress report from the enumerator: the last source sequence it
%% has fully processed.
handle_cast({increment_update_seq, Seq}, State) ->
    couch_task_status:update("Processed source update #~p", [Seq]),
    {noreply, State#state{current_seq=Seq}}.

%% @doc Exit signals (trapped in init/1) and stray messages.
%% The enumerator exiting for any reason, including normal, respawns it from
%% current_seq with a fresh revs count.
handle_info({'EXIT', Pid, Reason}, #state{enum_pid=Pid} = State) ->
    ?LOG_ERROR("replication enumerator exited with ~p .. respawning", [Reason]),
    #state{
        current_seq = Seq,
        source = Src,
        target = Tgt,
        enum_pid = Pid
    } = State,
    Parent = self(),
    NewPid = spawn_link(fun() -> enum_docs_since(Parent,Src,Tgt,{Seq,0}) end),
    {noreply, State#state{enum_pid=NewPid}};

%% other linked processes exiting normally are ignored
handle_info({'EXIT', _From, normal}, State) ->
    {noreply, State};
%% if any other linked process dies abnormally, kill the enumerator. Its
%% 'EXIT' then reaches the first clause, which respawns it to get things
%% going again.
handle_info({'EXIT', From, Reason}, #state{enum_pid=EnumPid} = State) ->
    ?LOG_ERROR("replicator-linked pid ~p exited with ~p", [From, Reason]),
    exit(EnumPid, pls_restart_kthxbye),
    {noreply, State};

handle_info(_Msg, State) ->
    {noreply, State}.

%% @doc Normal termination: flush the remaining buffered docs, record a final
%% checkpoint, release the stats table and databases, and answer every
%% listener waiting in get_result.
%% The head of Listeners (the most recently registered caller, since
%% handle_call prepends) always gets {ok, History}. The remaining listeners
%% get the same answer if the local source has not changed since Seq.
%% Otherwise they get `retry`, which makes replicate/2 start another pass.
%% An empty buffer costs nothing: update_docs/4 returns {ok, []} for [].
terminate(normal, State) ->
    #state{
        context = Context,
        current_seq = Seq,
        docs_buffer = Buffer,
        listeners = Listeners,
        source = Source,
        target = Target,
        stats = Stats
    } = State,

    try update_docs(Target, lists:flatten(Buffer), [], replicated_changes) of
    {ok, Errors} ->
        dump_update_errors(Errors),
        ets:update_counter(Stats, doc_write_failures, length(Errors)),
        ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) -
            length(Errors))
    catch
        throw:attachment_write_failed ->
            ?LOG_ERROR("attachment request failed during final write", []),
            exit({internal_server_error, replication_link_failure})
    end,

    couch_task_status:update("Finishing"),

    {ok, NewRepHistory, _} = do_checkpoint(Source, Target, Context, Seq, Stats),
    ets:delete(Stats),
    close_db(Target),

    %% Listeners is never empty here: both paths that stop with reason
    %% normal (get_result with done=true, fin with listeners) have a listener.
    [Original|Rest] = Listeners,
    gen_server:reply(Original, {ok, NewRepHistory}),

    %% maybe trigger another replication. If this replicator uses a local
    %% source Db, changes to that Db since we started will not be included in
    %% this pass.
    case up_to_date(Source, Seq) of
        true ->
            [gen_server:reply(R, {ok, NewRepHistory}) || R <- Rest];
        false ->
            [gen_server:reply(R, retry) || R <- Rest]
    end,
    close_db(Source);
%% Abnormal termination: report {error, Reason} to every listener, drop the
%% stats table, and close both databases. No checkpoint is written.
terminate(Reason, State) ->
    ?LOG_ERROR("replicator terminating with reason ~p", [Reason]),
    #state{
        listeners = Listeners,
        source = Source,
        target = Target,
        stats = Stats
    } = State,

    [gen_server:reply(L, {error, Reason}) || L <- Listeners],

    ets:delete(Stats),
    close_db(Target),
    close_db(Source).

%% @doc No state migration is needed between versions.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%=============================================================================
%% internal functions
%%=============================================================================


% we should probably write these to a special replication log
% or have a callback where the caller decides what to do with replication
% errors.
dump_update_errors([]) -> ok;
dump_update_errors([{{Id, Rev}, Error}|Rest]) ->
    ?LOG_INFO("error replicating document \"~s\" rev \"~s\":~p",
        [Id, couch_doc:rev_to_str(Rev), Error]),
    dump_update_errors(Rest).

%% @doc Receiver process for one streamed attachment request.
%% It learns the ibrowse request id from its controller (set_req_id). It then
%% hands the response status and headers to the controller (gimme_status),
%% which answers continue, fail or stop_ok. Body chunks are handed over one
%% at a time (gimme_data).
%% Terminates with exit(normal) at the end of the response, with
%% exit(attachment_request_failed) on failure, or by returning stop_ok when
%% the controller abandons this request (e.g. to follow a redirect).
attachment_loop(ReqId, Conn) ->
    couch_util:should_flush(),
    receive
    {From, {set_req_id, NewId}} ->
        %% we learn the ReqId to listen for
        From ! {self(), {ok, NewId}},
        attachment_loop(NewId, Conn);
    {ibrowse_async_headers, ReqId, Status, Headers} ->
        %% we got header, give the controlling process a chance to react
        receive
        {From, gimme_status} ->
            %% send status/headers to controller
            From ! {self(), {status, Status, Headers}},
            receive
            {From, continue} ->
                %% normal case: keep streaming (tail call)
                attachment_loop(ReqId, Conn);
            {From, fail} ->
                %% error, failure code
                ?LOG_ERROR(
                    "streaming attachment failed with status ~p",
                    [Status]),
                catch ibrowse:stop_worker_process(Conn),
                exit(attachment_request_failed);
            {From, stop_ok} ->
                %% the controller starts a new receiver for the next
                %% request; this one is done and must end here. Looping again
                %% would leave a process linked to the replicator blocked
                %% forever on a stopped connection.
                catch ibrowse:stop_worker_process(Conn),
                stop_ok
            end
        end;
    {ibrowse_async_response, ReqId, {chunk_start,_}} ->
        attachment_loop(ReqId, Conn);
    {ibrowse_async_response, ReqId, chunk_end} ->
        attachment_loop(ReqId, Conn);
    {ibrowse_async_response, ReqId, {error, Err}} ->
        ?LOG_ERROR("streaming attachment failed with ~p", [Err]),
        catch ibrowse:stop_worker_process(Conn),
        exit(attachment_request_failed);
    {ibrowse_async_response, ReqId, Data} ->
        %% hand over one chunk only when the consumer asks for it
        receive {From, gimme_data} -> From ! {self(), Data} end,
        attachment_loop(ReqId, Conn);
    {ibrowse_async_response_end, ReqId} ->
        catch ibrowse:stop_worker_process(Conn),
        exit(normal)
    end.

%% @doc Replace an attachment stub from a remote doc revision with an #att{}
%% whose data is a fun streaming the attachment body from the source.
att_stub_converter(DbS, Id, Rev,
        #att{name=Name,data=stub,type=Type,len=Length}=Att) ->
    #old_http_db{uri=DbUrl, headers=Headers} = DbS,
    {Pos, [RevId|_]} = Rev,
    Url = lists:flatten([DbUrl, couch_util:url_encode(Id), "/", couch_util:url_encode(?b2l(Name)),
        "?rev=", ?b2l(couch_doc:rev_to_str({Pos,RevId}))]),
    ?LOG_DEBUG("Attachment URL ~s", [Url]),
    {ok, RcvFun} = make_att_stub_receiver(Url, Headers, Name,
        Type, Length),
    Att#att{name=Name,type=Type,data=RcvFun,len=Length}.

%% @doc Start a streamed attachment request with 10 retries and an initial
%% back-off of 1000 ms.
make_att_stub_receiver(Url, Headers, Name, Type, Length) ->
    make_att_stub_receiver(Url, Headers, Name, Type, Length, 10, 1000).

%% @doc Issue a streamed GET for an attachment and return {ok, Fun}, where
%% each Fun() call yields the next chunk of the body. Redirects are
%% followed. 5xx responses and transport errors are retried with a doubling
%% Pause (ms). 4xx responses exit with attachment_request_failed. When
%% Retries reaches 0 the process exits with
%% {attachment_request_failed, Msg}.
%% Every retry returns its own result, so a failed first attempt never leaves
%% the caller holding a dead or stale receiver.
make_att_stub_receiver(Url, _Headers, _Name, _Type, _Length, 0, _Pause) ->
    ?LOG_ERROR("streaming attachment request failed after 10 retries: ~s",
        [Url]),
    exit({attachment_request_failed, ?l2b(["failed to replicate ", Url])});

make_att_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause) ->
    %% start the process that receives attachment data from ibrowse
    #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
    {ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port),
    Pid = spawn_link(fun() -> attachment_loop(nil, Conn) end),

    %% the next attempt, against NextUrl, with NextPause ms of back-off
    Retry = fun(NextUrl, NextPause) ->
        make_att_stub_receiver(NextUrl, Headers, Name, Type, Length,
            Retries - 1, NextPause)
    end,

    %% make the async request
    Opts = [{stream_to, Pid}, {response_format, binary}],
    case ibrowse:send_req_direct(Conn, Url, Headers, get, [], Opts, infinity) of
    {ibrowse_req_id, ReqId} ->
        att_stub_set_req_id(Url, Conn, Pid, ReqId, Pause, Retry);
    {error, Reason} ->
        ?LOG_INFO("retrying couch_rep attachment request in ~p " ++
            "seconds due to {error, ~p}: ~s", [Pause/1000, Reason, Url]),
        catch ibrowse:stop_worker_process(Conn),
        stop_att_receiver(Pid),
        timer:sleep(Pause),
        Retry(Url, 2*Pause)
    end.

%% Tell the receiver which ReqId to listen for, then wait for the headers.
att_stub_set_req_id(Url, Conn, Pid, ReqId, Pause, Retry) ->
    Pid ! {self(), {set_req_id, ReqId}},
    receive
    {Pid, {ok, ReqId}} ->
        att_stub_await_status(Url, Conn, Pid, Pause, Retry);
    {'EXIT', Pid, _Reason} ->
        catch ibrowse:stop_worker_process(Conn),
        timer:sleep(Pause),
        Retry(Url, 2*Pause)
    end.

%% Wait for the response headers so we can act on the status code; this is
%% where redirects etc. are followed.
att_stub_await_status(Url, Conn, Pid, Pause, Retry) ->
    Pid ! {self(), gimme_status},
    receive
    {'EXIT', Pid, attachment_request_failed} ->
        catch ibrowse:stop_worker_process(Conn),
        Retry(Url, Pause);
    {Pid, {status, StreamStatus, StreamHeaders}} ->
        ?LOG_DEBUG("streaming attachment Status ~p Headers ~p",
            [StreamStatus, StreamHeaders]),
        att_stub_handle_status(list_to_integer(StreamStatus), StreamHeaders,
            Url, Conn, Pid, Pause, Retry)
    end.

%% Dispatch on the HTTP status of the attachment response.
att_stub_handle_status(Code, _StreamHeaders, _Url, _Conn, Pid, _Pause, _Retry)
        when Code >= 200, Code < 300 ->
    % the normal case
    Pid ! {self(), continue},
    %% this function goes into the streaming attachment code.
    %% It gets executed by the replication gen_server, so it can't
    %% be the one to actually receive the ibrowse data.
    {ok, fun() ->
        Pid ! {self(), gimme_data},
        receive
        {Pid, Data} ->
            Data;
        {'EXIT', Pid, attachment_request_failed} ->
            throw(attachment_write_failed)
        end
    end};
att_stub_handle_status(Code, StreamHeaders, _Url, Conn, Pid, Pause, Retry)
        when Code >= 300, Code < 400 ->
    % follow the redirect
    Pid ! {self(), stop_ok},
    RedirectUrl = mochiweb_headers:get_value("Location",
        mochiweb_headers:make(StreamHeaders)),
    catch ibrowse:stop_worker_process(Conn),
    Retry(RedirectUrl, Pause);
att_stub_handle_status(Code, _StreamHeaders, Url, _Conn, Pid, _Pause, _Retry)
        when Code >= 400, Code < 500 ->
    % an error... log and fail
    ?LOG_ERROR("streaming attachment failed with code ~p: ~s",
        [Code, Url]),
    Pid ! {self(), fail},
    exit(attachment_request_failed);
att_stub_handle_status(Code, _StreamHeaders, Url, Conn, Pid, Pause, Retry)
        when Code >= 500 ->
    % a server-side error (any 5xx, not only 500)... log and retry
    ?LOG_INFO("retrying couch_rep attachment request in ~p " ++
        "seconds due to ~p response: ~s", [Pause/1000, Code, Url]),
    Pid ! {self(), fail},
    catch ibrowse:stop_worker_process(Conn),
    timer:sleep(Pause),
    Retry(Url, 2*Pause).

%% Stop a receiver that never got a request id; otherwise it would block
%% forever while linked to us. Unlinking first keeps its death from arriving
%% as an 'EXIT' message, which handle_info/2 would treat as a crash and
%% answer by restarting the enumerator.
stop_att_receiver(Pid) ->
    unlink(Pid),
    exit(Pid, kill),
    ok.


%% @doc Open a database descriptor.
%% {remote, Url, Headers, Auth} becomes an #old_http_db{} (Url is a binary,
%% stored as a string) and is returned with Url as its name.
%% {local, DbName, UserCtx} is opened with couch_db:open/2. Any non-ok
%% result of that call is returned unchanged.
open_db({remote, Url, Headers, Auth})->
    {ok, #old_http_db{uri=?b2l(Url), headers=Headers, oauth=Auth}, Url};
open_db({local, DbName, UserCtx})->
    case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
    {ok, Db} -> {ok, Db, DbName};
    Error -> Error
    end.


%% @doc Release a database opened by open_db/1 (a no-op for remote dbs).
close_db(#old_http_db{})->
    ok;
close_db(Db)->
    couch_db:close(Db).

%% @doc Commit both sides and write a replication history record to the
%% checkpoint document on source and target.
%% Returns {ok, History, NewContext}; NewContext carries the new revisions
%% of both checkpoint docs. When nothing changed since the start of an
%% incremental run, nothing is written and {ok, LastRecord + no_changes,
%% Context} is returned.
do_checkpoint(Source, Target, Context, NewSeqNum, Stats) ->
    ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
    [
        {start_seq, StartSeqNum},
        {history, OldHistory},
        {rep_starttime, ReplicationStartTime},
        {src_starttime, SrcInstanceStartTime},
        {tgt_starttime, TgtInstanceStartTime},
        {src_record, #doc{body={LastRepRecord}}=RepRecDocSrc},
        {tgt_record, RepRecDocTgt}
    ] = Context,

    case NewSeqNum == StartSeqNum andalso OldHistory /= [] of
    true ->
        % nothing changed, don't record results
        {ok, {[{<<"no_changes">>, true} | LastRepRecord]}, Context};
    false ->
        % something changed, record results for incremental replication.

        % commit changes to both src and tgt. The src because if changes
        % we replicated are lost, we'll record a seq number ahead
        % of what was committed. If those changes are lost and the seq number
        % reverts to a previous committed value, we will skip future changes
        % when new doc updates are given our already replicated seq nums.

        % commit the src async
        ParentPid = self(),
        SrcCommitPid = spawn_link(fun() ->
                ParentPid ! {self(), ensure_full_commit(Source)} end),

        % commit tgt sync
        {ok, TgtInstanceStartTime2} = ensure_full_commit(Target),

        %% The committer sends its result before it exits, so a normal
        %% 'EXIT' can never overtake the reply. Any other outcome must end
        %% this wait: previously a non-HTTP crash left the replicator blocked
        %% here forever.
        SrcInstanceStartTime2 =
        receive
        {SrcCommitPid, {ok, Timestamp}} ->
            Timestamp;
        {SrcCommitPid, CommitError} ->
            exit({source_commit_failed, CommitError});
        {'EXIT', SrcCommitPid, {http_request_failed, _}} ->
            exit(replication_link_failure);
        {'EXIT', SrcCommitPid, Reason} when Reason =/= normal ->
            exit({source_commit_failed, Reason})
        end,

        %% if either server restarted, its uncommitted writes may be lost, so
        %% keep the old sequence and let the next run re-examine those docs
        RecordSeqNum =
        if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
                TgtInstanceStartTime2 == TgtInstanceStartTime ->
            NewSeqNum;
        true ->
            ?LOG_INFO("A server has restarted sinced replication start. "
                "Not recording the new sequence number to ensure the "
                "replication is redone and documents reexamined.", []),
            StartSeqNum
        end,

        NewHistoryEntry = {
            [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
            {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
            {<<"start_last_seq">>, StartSeqNum},
            {<<"end_last_seq">>, NewSeqNum},
            {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
            {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
            {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
            {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
            {<<"doc_write_failures">>, ets:lookup_element(Stats, doc_write_failures, 2)}
        ]},
        % limit history to 50 entries
        HistEntries =lists:sublist([NewHistoryEntry | OldHistory], 50),

        NewRepHistory =
                {[{<<"session_id">>, couch_util:new_uuid()},
                  {<<"source_last_seq">>, RecordSeqNum},
                  {<<"history">>, HistEntries}]},

        {ok, {SrcRevPos,SrcRevId}} = update_doc(Source,
                RepRecDocSrc#doc{body=NewRepHistory}, []),
        {ok, {TgtRevPos,TgtRevId}} = update_doc(Target,
                RepRecDocTgt#doc{body=NewRepHistory}, []),

        %% start_seq and history stay as they were at the start of the run,
        %% so later checkpoints in this run rebuild the same history entry
        NewContext = [
            {start_seq, StartSeqNum},
            {history, OldHistory},
            {rep_starttime, ReplicationStartTime},
            {src_starttime, SrcInstanceStartTime},
            {tgt_starttime, TgtInstanceStartTime},
            {src_record, RepRecDocSrc#doc{revs={SrcRevPos,[SrcRevId]}}},
            {tgt_record, RepRecDocTgt#doc{revs={TgtRevPos,[TgtRevId]}}}
        ],

        {ok, NewRepHistory, NewContext}

    end.

%% @doc HTTP request without a body.
do_http_request(Url, Action, Headers, Auth) ->
    do_http_request(Url, Action, Headers, Auth, []).

%% @doc HTTP request with a JSON body ([] means no body). When Auth is an
%% EJSON object {Props} containing an <<"oauth">> object, an OAuth
%% (hmac_sha1) Authorization header is added. Starts with 10 retries and a
%% 1000 ms back-off.
do_http_request(Url, Action, Headers, Auth, JsonBody) ->
    Headers0 = case Auth of
    {Props} ->
        % Add OAuth header
        {OAuth} = proplists:get_value(<<"oauth">>, Props),
        ConsumerKey = ?b2l(proplists:get_value(<<"consumer_key">>, OAuth)),
        Token = ?b2l(proplists:get_value(<<"token">>, OAuth)),
        TokenSecret = ?b2l(proplists:get_value(<<"token_secret">>, OAuth)),
        ConsumerSecret = ?b2l(proplists:get_value(<<"consumer_secret">>, OAuth)),
        Consumer = {ConsumerKey, ConsumerSecret, hmac_sha1},
        Method = case Action of
            get -> "GET";
            post -> "POST";
            put -> "PUT"
        end,
        Params = oauth:signed_params(Method, Url, [], Consumer, Token, TokenSecret),
        [{"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)} | Headers];
    _Else ->
        Headers
    end,
    do_http_request0(Url, Action, Headers0, JsonBody, 10, 1000).

%% @doc Perform one HTTP request with retries and return the decoded JSON
%% body.
%% 2xx and 4xx bodies are decoded and returned. 3xx responses follow the
%% Location header. 5xx responses and transport errors are retried after
%% Pause ms, with the pause doubled each time. When Retries reaches 0 the
%% process exits with {http_request_failed, Msg}.
do_http_request0(Url, Action, Headers, Body, Retries, Pause) when is_binary(Url) ->
    do_http_request0(?b2l(Url), Action, Headers, Body, Retries, Pause);
do_http_request0(Url, Action, _Headers, _JsonBody, 0, _Pause) ->
    ?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s",
        [Action, Url]),
    exit({http_request_failed, ?l2b(["failed to replicate ", Url])});
do_http_request0(Url, Action, Headers, JsonBody, Retries, Pause) ->
    ?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]),
    Body =
    case JsonBody of
    [] ->
        <<>>;
    _ ->
        iolist_to_binary(?JSON_ENCODE(JsonBody))
    end,
    Options = case Action of
        get -> [];
        _ -> [{transfer_encoding, {chunked, 65535}}]
    end ++ [
        {content_type, "application/json; charset=utf-8"},
        {max_pipeline_size, 101},
        {response_format, binary}
    ],
    case ibrowse:send_req(Url, Headers, Action, Body, Options, infinity) of
    {ok, Status, ResponseHeaders, ResponseBody} ->
        ResponseCode = list_to_integer(Status),
        if
        ResponseCode >= 200, ResponseCode < 300 ->
            ?JSON_DECODE(ResponseBody);
        ResponseCode >= 300, ResponseCode < 400 ->
            RedirectUrl = mochiweb_headers:get_value("Location",
                mochiweb_headers:make(ResponseHeaders)),
            do_http_request0(RedirectUrl, Action, Headers, JsonBody, Retries-1,
                Pause);
        ResponseCode >= 400, ResponseCode < 500 ->
            ?JSON_DECODE(ResponseBody);
        ResponseCode >= 500 ->
            %% any 5xx (502/503/... as well as 500) is treated as transient;
            %% previously everything but 500 crashed with if_clause
            ?LOG_INFO("retrying couch_rep HTTP ~p request in ~p seconds " ++
                "due to ~p error: ~s", [Action, Pause/1000, ResponseCode, Url]),
            timer:sleep(Pause),
            do_http_request0(Url, Action, Headers, JsonBody, Retries - 1, 2*Pause)
        end;
    {error, Reason} ->
        ?LOG_INFO("retrying couch_rep HTTP ~p request in ~p seconds due to " ++
            "{error, ~p}: ~s", [Action, Pause/1000, Reason, Url]),
        timer:sleep(Pause),
        do_http_request0(Url, Action, Headers, JsonBody, Retries - 1, 2*Pause)
    end.

%% @doc Flush a database to disk and return {ok, InstanceStartTime}.
ensure_full_commit(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) ->
    {ResultProps} = do_http_request(DbUrl ++ "_ensure_full_commit", post,
        Headers, OAuth, true),
    true = proplists:get_value(<<"ok">>, ResultProps),
    {ok, proplists:get_value(<<"instance_start_time">>, ResultProps)};
ensure_full_commit(Db) ->
    couch_db:ensure_full_commit(Db).

%% @doc Enumerator process body. It walks source changes after StartSeq in
%% pages (see get_doc_info_list/2) and asks the target which revisions it
%% lacks. Each doc with missing revs goes to the replicator (Pid) as a
%% replicate_doc call, and progress is reported with increment_update_seq.
%% When no changes remain it sends {fin, {LastSeq, RevsCount}}.
enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) ->
    case get_doc_info_list(DbSource, StartSeq) of
    [] ->
        gen_server:call(Pid, {fin, {StartSeq, RevsCount}}, infinity);
    DocInfoList ->
        SrcRevsList = lists:map(fun(#doc_info{id=Id,revs=RevInfos}) ->
            SrcRevs = [Rev || #rev_info{rev=Rev} <- RevInfos],
            {Id, SrcRevs}
        end, DocInfoList),
        {ok, MissingRevs} = get_missing_revs(DbTarget, SrcRevsList),

        %% do we need to check for success here?
        [gen_server:call(Pid, {replicate_doc, Info}, infinity)
            || Info <- MissingRevs ],

        #doc_info{high_seq=LastSeq} = lists:last(DocInfoList),
        RevsCount2 = RevsCount + length(SrcRevsList),
        gen_server:cast(Pid, {increment_update_seq, LastSeq}),

        enum_docs_since(Pid, DbSource, DbTarget, {LastSeq, RevsCount2})
    end.



%% @doc Database info as a proplist with atom keys.
%% For remote databases only keys that already exist as atoms are kept.
%% The keys come from another server, and creating atoms from them
%% (list_to_atom/1) could exhaust the never-collected atom table. Keys this
%% module relies on, such as instance_start_time, exist because the module
%% itself uses them.
get_db_info(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) ->
    {DbProps} = do_http_request(DbUrl, get, Headers, OAuth),
    {ok, [{Key, V} || {K, V} <- DbProps, Key <- existing_atom_key(K)]};
get_db_info(Db) ->
    couch_db:get_db_info(Db).

%% [Atom] when Bin names an existing atom, [] otherwise.
existing_atom_key(Bin) ->
    try
        [list_to_existing_atom(?b2l(Bin))]
    catch
        error:badarg -> []
    end.

%% @doc Fetch the next page (at most 100 entries) of #doc_info{} records for
%% source updates after StartSeq, in the order the source enumerates them.
get_doc_info_list(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, StartSeq) ->
    Query = "_all_docs_by_seq?limit=100&startkey=" ++ integer_to_list(StartSeq),
    {Results} = do_http_request(DbUrl ++ Query, get, Headers, OAuth),
    lists:map(fun http_row_to_doc_info/1, proplists:get_value(<<"rows">>, Results));
get_doc_info_list(DbSource, StartSeq) ->
    %% stop as soon as 100 entries have been collected
    TakeUpTo100 = fun
        (_DocInfo, _Extra, {100, _Infos} = Acc) ->
            {stop, Acc};
        (DocInfo, _Extra, {Count, Infos}) ->
            {ok, {Count + 1, [DocInfo | Infos]}}
    end,
    {ok, {_Count, Collected}} =
        couch_db:enum_docs_since(DbSource, StartSeq, TakeUpTo100, {0, []}),
    lists:reverse(Collected).

%% Convert one _all_docs_by_seq row into a #doc_info{}. The revs list holds
%% the row's "rev" (with its deleted flag) first, then "conflicts" (not
%% deleted), then "deleted_conflicts" (deleted).
http_row_to_doc_info({Row}) ->
    {Value} = proplists:get_value(<<"value">>, Row),
    Seq = proplists:get_value(<<"key">>, Row),
    Current = #rev_info{
        rev = couch_doc:parse_rev(proplists:get_value(<<"rev">>, Value)),
        deleted = proplists:get_value(<<"deleted">>, Value, false)},
    LiveConflicts =
        [#rev_info{rev=R, deleted=false}
            || R <- couch_doc:parse_revs(proplists:get_value(<<"conflicts">>, Value, []))],
    DeadConflicts =
        [#rev_info{rev=R, deleted=true}
            || R <- couch_doc:parse_revs(proplists:get_value(<<"deleted_conflicts">>, Value, []))],
    #doc_info{
        id = proplists:get_value(<<"id">>, Row),
        high_seq = Seq,
        revs = [Current | LiveConflicts ++ DeadConflicts]
    }.

%% @doc Ask the target which of the given revisions it does not have.
%% DocIdRevsList is a list of {DocId, Revs}. For remote targets the result
%% is {ok, [{DocId, MissingRevs}]}; local targets return whatever
%% couch_db:get_missing_revs/2 returns.
get_missing_revs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocIdRevsList) ->
    Request = {[{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- DocIdRevsList]},
    MissingUrl = DbUrl ++ "_missing_revs",
    {Response} = do_http_request(MissingUrl, post, Headers, OAuth, Request),
    {MissingJson} = proplists:get_value(<<"missing_revs">>, Response),
    Missing = [{Id, couch_doc:parse_revs(RevStrs)} || {Id, RevStrs} <- MissingJson],
    {ok, Missing};
get_missing_revs(Db, DocIdRevsList) ->
    couch_db:get_missing_revs(Db, DocIdRevsList).


%% @doc Open one document. Remote lookups accept no options. A JSON reply
%% of the exact form {"error": E, "reason": R} becomes
%% {couch_util:to_existing_atom(E), R}; any other reply is parsed as a doc.
open_doc(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocId, Options) ->
    [] = Options,
    DocUrl = DbUrl ++ couch_util:url_encode(DocId),
    Reply = do_http_request(DocUrl, get, Headers, OAuth),
    case Reply of
        {[{<<"error">>, ErrId}, {<<"reason">>, Reason}]} ->
            {couch_util:to_existing_atom(ErrId), Reason};
        JsonDoc ->
            {ok, couch_doc:from_json_obj(JsonDoc)}
    end;
open_doc(Db, DocId, Options) ->
    couch_db:open_doc(Db, DocId, Options).

%% @doc Open specific revisions of a document (only the [latest] option is
%% supported for remote sources). Remote attachment stubs are converted into
%% streaming receivers via att_stub_converter/4. When the rev list would make
%% the URL too long, the request is split into several requests.
open_doc_revs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth} = DbS, DocId, Revs0,
        [latest]) ->
    Revs = couch_doc:rev_to_strs(Revs0),
    BaseUrl = DbUrl ++ couch_util:url_encode(DocId) ++ "?revs=true&latest=true",

    %% MochiWeb expects URLs < 8KB long, so maybe split into multiple requests.
    %% NOTE(review): 14 appears to be a per-rev URL size estimate - confirm.
    MaxN = trunc((8192 - length(BaseUrl))/14),

    JsonResults = case length(Revs) > MaxN of
    false ->
        Url = ?l2b(BaseUrl ++ "&open_revs=" ++ ?JSON_ENCODE(Revs)),
        do_http_request(Url, get, Headers, OAuth);
    true ->
        {_, Rest, Acc} = lists:foldl(
        fun(Rev, {Count, RevsAcc, AccResults}) when Count =:= MaxN ->
            QSRevs = ?JSON_ENCODE(lists:reverse(RevsAcc)),
            Url = ?l2b(BaseUrl ++ "&open_revs=" ++ QSRevs),
            {1, [Rev], AccResults++do_http_request(Url, get, Headers, OAuth)};
        (Rev, {Count, RevsAcc, AccResults}) ->
            {Count+1, [Rev|RevsAcc], AccResults}
        end, {0, [], []}, Revs),
        Acc ++ do_http_request(?l2b(BaseUrl ++ "&open_revs=" ++
            ?JSON_ENCODE(lists:reverse(Rest))), get, Headers, OAuth)
    end,

    Results =
    lists:map(
        fun({[{<<"missing">>, Rev}]}) ->
            {{not_found, missing}, couch_doc:parse_rev(Rev)};
        ({[{<<"ok">>, JsonDoc}]}) ->
            #doc{id=Id, revs=Rev, atts=Atts} = Doc =
                couch_doc:from_json_obj(JsonDoc),
            {ok, Doc#doc{atts=[att_stub_converter(DbS,Id,Rev,A) || A <- Atts]}}
        end, JsonResults),
    {ok, Results};
open_doc_revs(Db, DocId, Revs, Options) ->
    couch_db:open_doc_revs(Db, DocId, Revs, Options).

%% @spec should_flush(integer()) -> true | false
%% @doc Calculates whether it's time to flush the document buffer. Considers
%%   - number of pending document writes (DocCount > ?BUFFER_NDOCS)
%%   - approximate number of pending attachment writes (half the number of
%%     pids linked to this process)
%%   - memory utilization of this process and its linked processes, checked
%%     again after forcing a garbage collection
should_flush(DocCount) when DocCount > ?BUFFER_NDOCS ->
    true;
should_flush(_DocCount) ->
    MeAndMyLinks = [self()|
        [P || P <- element(2,process_info(self(),links)), is_pid(P)]],

    case length(MeAndMyLinks)/2 > ?BUFFER_NATTACHMENTS of
    true -> true;
    false ->
        case memory_footprint(MeAndMyLinks) > 2*?BUFFER_MEMORY of
        true ->
            [garbage_collect(Pid) || Pid <- MeAndMyLinks],
            memory_footprint(MeAndMyLinks) > ?BUFFER_MEMORY;
        false -> false
        end
    end.

%% @spec memory_footprint([pid()]) -> integer()
%% @doc Sum of process and binary memory utilization for all processes in
%% the list. Processes that are no longer alive contribute 0.
memory_footprint(PidList) ->
    memory_footprint(PidList, {0,0}).

memory_footprint([], {ProcessMemory, BinaryMemory}) ->
    ?LOG_DEBUG("ProcessMem ~p BinaryMem ~p", [ProcessMemory, BinaryMemory]),
    ProcessMemory + BinaryMemory;
memory_footprint([Pid|Rest], {ProcAcc, BinAcc}) ->
    %% A single process_info/2 call replaces is_process_alive/1 followed by
    %% process_info/2. A linked process can die between those two calls,
    %% and element(2, undefined) would then crash the replicator.
    case process_info(Pid, memory) of
    {memory, ProcMem} ->
        BinMem = binary_memory(Pid),
        memory_footprint(Rest, {ProcMem + ProcAcc, BinMem + BinAcc});
    undefined ->
        memory_footprint(Rest, {ProcAcc, BinAcc})
    end.

%% @spec binary_memory(pid()) -> integer()
%% @doc Memory utilization of all binaries referenced by this process
%% (0 if the process has exited).
binary_memory(Pid) ->
    case process_info(Pid, binary) of
    {binary, Bins} ->
        lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end, 0, Bins);
    undefined ->
        0
    end.

%% @doc Save one document and return {ok, Rev}. Remote saves accept no
%% options and PUT the doc, including attachments.
update_doc(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, #doc{id=DocId}=Doc, Options) ->
    [] = Options,
    DocUrl = DbUrl ++ couch_util:url_encode(DocId),
    JsonDoc = couch_doc:to_json_obj(Doc, [attachments]),
    {Reply} = do_http_request(DocUrl, put, Headers, OAuth, JsonDoc),
    NewRev = proplists:get_value(<<"rev">>, Reply),
    {ok, couch_doc:parse_rev(NewRev)};
update_doc(Db, Doc, Options) ->
    couch_db:update_doc(Db, Doc, Options).

%% @doc Write a batch of docs and return {ok, Errors}. Each error has the
%% form {{Id, Rev}, {ErrorAtom, Reason}}. An empty batch is a no-op.
%% Remote writes go through _bulk_docs with new_edits=false and support
%% only replicated_changes with no options.
update_docs(_, [], _, _) ->
    {ok, []};
update_docs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, Docs, [], replicated_changes) ->
    Payload = {[{new_edits, false},
                {docs, [couch_doc:to_json_obj(D, [revs,attachments]) || D <- Docs]}]},
    ErrorsJson = do_http_request(DbUrl ++ "_bulk_docs", post, Headers, OAuth, Payload),
    ToError = fun({Props}) ->
        Field = fun(Key) -> proplists:get_value(Key, Props) end,
        DocId = Field(<<"id">>),
        Rev = couch_doc:parse_rev(Field(<<"rev">>)),
        ErrId = couch_util:to_existing_atom(Field(<<"error">>)),
        {{DocId, Rev}, {ErrId, Field(<<"reason">>)}}
    end,
    {ok, lists:map(ToError, ErrorsJson)};
update_docs(Db, Docs, Options, UpdateType) ->
    couch_db:update_docs(Db, Docs, Options, UpdateType).

%% @doc true when the source's update_seq still equals Seq. The local db is
%% reopened to see its current sequence. Remote sources are always reported
%% as up to date.
up_to_date(#old_http_db{}, _Seq) ->
    true;
up_to_date(Source, Seq) ->
    {ok, Fresh} = couch_db:open(Source#db.name, []),
    CurrentSeq = Fresh#db.update_seq,
    couch_db:close(Fresh),
    CurrentSeq == Seq.
Generated using etap 0.3.4.