C0 code coverage information

Generated on 2009-08-09 21:22:55 with etap 0.3.4.

Name Total lines Lines of code Total coverage Code coverage
couch_rep ?? ?? ??
0% 
....1 % Licensed under the Apache License, Version 2.0 (the "License"); you may not
....2 % use this file except in compliance with the License. You may obtain a copy of
....3 % the License at
....4 %
....5 %   http://www.apache.org/licenses/LICENSE-2.0
....6 %
....7 % Unless required by applicable law or agreed to in writing, software
....8 % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
....9 % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
...10 % License for the specific language governing permissions and limitations under
...11 % the License.
...12 
...13 -module(couch_rep).
...14 -behaviour(gen_server).
...15 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
...16     code_change/3]).
...17 
...18 -export([replicate/2]).
...19 
...20 -define(BUFFER_NDOCS, 1000).
...21 -define(BUFFER_NATTACHMENTS, 50).
...22 -define(BUFFER_MEMORY, 10000000). %% bytes
...23 
...24 -include("couch_db.hrl").
...25 -include("../ibrowse/ibrowse.hrl").
...26 
%% @spec replicate(Source, Target) -> {ok, Stats} | {error, Reason}
%%      Source = Target = {remote, Url, Headers, Auth} | {local, DbName, UserCtx}
%% @doc Triggers a replication and blocks until it reports a result.
%%      The replication runs as a transient child of couch_rep_sup, keyed by
%%      the md5 of [HostName, Source, Target].  Asking for the same pair again
%%      attaches to the running replicator instead of starting a second one.
%%
%%      Stats is a JSON Object with the keys session_id (UUID),
%%      source_last_seq (integer) and history (array).  Each history element
%%      is an Object with keys start_time, end_time, start_last_seq,
%%      end_last_seq, missing_checked, missing_found, docs_read and
%%      docs_written.
%%
%%      The supervisor will try to restart the replication in case of any
%%      error other than shutdown.  Just call this function again to listen
%%      for the result of the retry.  When the replicator answers `retry' this
%%      function calls itself to wait on a fresh pass.
replicate(Source, Target) ->
    {ok, HostName} = inet:gethostname(),
    Digest = erlang:md5(term_to_binary([HostName, Source, Target])),
    RepId = couch_util:to_hex(Digest),
    ChildSpec = {RepId,
        {gen_server, start_link, [?MODULE, [RepId, Source, Target], []]},
        transient, 1, worker, [?MODULE]},
    Server = rep_server_pid(RepId, ChildSpec),
    Outcome = gen_server:call(Server, get_result, infinity),
    if
    Outcome =:= retry ->
        replicate(Source, Target);
    true ->
        Outcome
    end.

%% Returns the pid of the replicator for RepId.  The child is started if it
%% is unknown to couch_rep_sup, restarted if its spec is present but not
%% running, or simply looked up if it is already running.
rep_server_pid(RepId, ChildSpec) ->
    case supervisor:start_child(couch_rep_sup, ChildSpec) of
    {ok, Pid} ->
        ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
        Pid;
    {error, {already_started, Pid}} ->
        ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]),
        Pid;
    {error, already_present} ->
        rep_restart_child(RepId, ChildSpec)
    end.

%% Restarts a replication child whose spec is still registered with
%% couch_rep_sup and returns its pid.
rep_restart_child(RepId, ChildSpec) ->
    case supervisor:restart_child(couch_rep_sup, RepId) of
    {ok, Pid} ->
        ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
        Pid;
    {error, running} ->
        %% Several callers raced to restart it and another one won.
        %% start_child reports the winner's pid.
        {error, {already_started, Pid}} =
            supervisor:start_child(couch_rep_sup, ChildSpec),
        ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]),
        Pid
    end.
...80 
...81 %%=============================================================================
...82 %% gen_server callbacks
...83 %%=============================================================================
...84 
%% Handle for a remote (HTTP) database, built by open_db/1.
%%   uri     - database URL as a string; request paths are appended to it
%%   headers - extra HTTP headers sent with every request
%%   oauth   - auth term; a {Props} value enables OAuth signing of requests
-record(old_http_db, {uri, headers, oauth}).

%% gen_server state of one replication.
-record(state, {
    context,          % checkpoint context property list built in init/1
    current_seq,      % last source update seq reported by the enumerator
    source,           % source handle from open_db/1
    target,           % target handle from open_db/1
    stats,            % private ets table holding the replication counters
    enum_pid,         % pid of the enum_docs_since/4 process
    docs_buffer = [], % nested list of docs not yet written to the target
    listeners = [],   % From tags of get_result callers, newest first
    done = false      % true once the enumerator finished before any listener
}).
..103 
..104 
%% @doc gen_server init callback.  It opens both databases, decides where to
%%      resume from, creates the stats table and starts the enumerator.
%%
%%      A replication record (a local doc keyed by ?LOCAL_DOC_PREFIX ++ RepId)
%%      is read from both source and target.  The recorded source_last_seq and
%%      history are trusted only when both records carry the same session_id.
%%      Otherwise replication restarts from sequence 0.
%%
%%      Exits are trapped so that linked helpers (the enumerator, attachment
%%      receivers, commit helpers) arrive as 'EXIT' messages in handle_info/2.
init([RepId, Source, Target]) ->
    process_flag(trap_exit, true),

    {ok, DbSrc, SrcName} = open_db(Source),
    {ok, DbTgt, TgtName} = open_db(Target),

    DocKey = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),

    {ok, InfoSrc} = get_db_info(DbSrc),
    {ok, InfoTgt} = get_db_info(DbTgt),

    %% the instance start times are compared again in do_checkpoint/5 to
    %% detect a server restart during the replication
    ReplicationStartTime = httpd_util:rfc1123_date(),
    SrcInstanceStartTime = proplists:get_value(instance_start_time, InfoSrc),
    TgtInstanceStartTime = proplists:get_value(instance_start_time, InfoTgt),

    RepRecDocSrc =
    case open_doc(DbSrc, DocKey, []) of
    {ok, SrcDoc} ->
        ?LOG_DEBUG("Found existing replication record on source", []),
        SrcDoc;
    _ -> #doc{id=DocKey}
    end,

    RepRecDocTgt =
    case open_doc(DbTgt, DocKey, []) of
    {ok, TgtDoc} ->
        ?LOG_DEBUG("Found existing replication record on target", []),
        TgtDoc;
    _ -> #doc{id=DocKey}
    end,

    #doc{body={RepRecProps}} = RepRecDocSrc,
    #doc{body={RepRecPropsTgt}} = RepRecDocTgt,

    case proplists:get_value(<<"session_id">>, RepRecProps) ==
            proplists:get_value(<<"session_id">>, RepRecPropsTgt) of
    true ->
        % if the records have the same session id,
        % then we have a valid replication history
        OldSeqNum = proplists:get_value(<<"source_last_seq">>, RepRecProps, 0),
        OldHistory = proplists:get_value(<<"history">>, RepRecProps, []);
    false ->
        ?LOG_INFO("Replication records differ. "
                "Performing full replication instead of incremental.", []),
        ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
                [RepRecProps, RepRecPropsTgt]),
        OldSeqNum = 0,
        OldHistory = []
    end,

    %% do_checkpoint/5 destructures this list positionally; keep the order
    Context = [
        {start_seq, OldSeqNum},
        {history, OldHistory},
        {rep_starttime, ReplicationStartTime},
        {src_starttime, SrcInstanceStartTime},
        {tgt_starttime, TgtInstanceStartTime},
        {src_record, RepRecDocSrc},
        {tgt_record, RepRecDocTgt}
    ],

    Stats = ets:new(replication_stats, [set, private]),
    ets:insert(Stats, {total_revs,0}),
    ets:insert(Stats, {missing_revs, 0}),
    ets:insert(Stats, {docs_read, 0}),
    ets:insert(Stats, {docs_written, 0}),
    ets:insert(Stats, {doc_write_failures, 0}),

    %% task description reads "<source> -> <target>"
    couch_task_status:add_task("Replication",
        <<SrcName/binary, " -> ", TgtName/binary>>, "Starting"),

    Parent = self(),
    Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{OldSeqNum,0}) end),

    State = #state{
        context = Context,
        current_seq = OldSeqNum,
        enum_pid = Pid,
        source = DbSrc,
        target = DbTgt,
        stats = Stats
    },

    {ok, State}.
%% get_result: remember the caller; the answer is sent from terminate/2.
%% If the enumerator already finished (done = true), stop now so that
%% terminate/2 runs and replies.
handle_call(get_result, From, #state{listeners = Listeners, done = Done} = State) ->
    WithCaller = State#state{listeners = [From | Listeners]},
    case Done of
    true ->
        {stop, normal, WithCaller};
    _ ->
        {noreply, WithCaller}
    end;

%% replicate_doc: accepted only from the current enumerator.  It reads the
%% missing revisions of one document from the source and buffers them.  When
%% should_flush/1 approves, it writes the whole buffer to the target and
%% records a checkpoint.
handle_call({replicate_doc, {Id, Revs}}, {Pid, _}, #state{enum_pid = Pid} = State) ->
    #state{
        context = Context,
        current_seq = Seq,
        docs_buffer = Buffer,
        source = Source,
        target = Target,
        stats = Stats
    } = State,
    ets:update_counter(Stats, missing_revs, length(Revs)),

    %% anything open_doc_revs returns that is not {ok, Doc} is skipped
    {ok, DocResults} = open_doc_revs(Source, Id, Revs, [latest]),
    Docs = [Doc || {ok, Doc} <- DocResults],
    ets:update_counter(Stats, docs_read, length(Docs)),

    Pending = [Docs | Buffer],
    {NewBuffer, NewContext} =
    case should_flush(lists:flatlength(Pending)) of
    false ->
        {Pending, Context};
    true ->
        ToWrite = lists:flatten(Pending),
        try update_docs(Target, ToWrite, [], replicated_changes) of
        {ok, Errors} ->
            dump_update_errors(Errors),
            FailureCount = length(Errors),
            ets:update_counter(Stats, doc_write_failures, FailureCount),
            ets:update_counter(Stats, docs_written,
                length(ToWrite) - FailureCount),
            {ok, _, CheckpointCtx} =
                do_checkpoint(Source, Target, Context, Seq, Stats),
            {[], CheckpointCtx}
        catch
        throw:attachment_write_failed ->
            ?LOG_ERROR("attachment request failed during write to disk", []),
            exit({internal_server_error, replication_link_failure})
        end
    end,
    {reply, ok, State#state{context = NewContext, docs_buffer = NewBuffer}};

%% fin: the enumerator reached the end of the source.  Stop and reply if
%% someone is waiting.  Otherwise mark the state done and hold the
%% enumerator's call until the first get_result arrives.
handle_call({fin, {LastSeq, RevsCount}}, {Pid, _},
        #state{enum_pid = Pid, stats = Stats, listeners = Listeners} = State) ->
    ets:update_counter(Stats, total_revs, RevsCount),
    case Listeners of
    [] ->
        {noreply, State#state{current_seq = LastSeq, done = true}};
    _ ->
        {stop, normal, ok, State#state{current_seq = LastSeq}}
    end.
..243 
%% Progress report from the enumerator: publish it to the task status
%% and remember the latest source sequence.
handle_cast({increment_update_seq, Seq}, State) ->
    couch_task_status:update("Processed source update #~p", [Seq]),
    Updated = State#state{current_seq = Seq},
    {noreply, Updated}.
..247 
%% The enumerator exited (for any reason, including normal): start a new
%% one from the last sequence it reported, with a fresh revs count.
handle_info({'EXIT', Pid, Reason},
        #state{enum_pid = Pid, current_seq = Seq, source = Src,
            target = Tgt} = State) ->
    ?LOG_ERROR("replication enumerator exited with ~p .. respawning", [Reason]),
    Parent = self(),
    Enumerator = fun() -> enum_docs_since(Parent, Src, Tgt, {Seq, 0}) end,
    {noreply, State#state{enum_pid = spawn_link(Enumerator)}};

%% Any other linked process finishing normally needs no action.
handle_info({'EXIT', _From, normal}, State) ->
    {noreply, State};

%% if any linked process dies, respawn the enumerator to get things going
%% again: killing it here lands in the first clause, which starts a new one
handle_info({'EXIT', From, Reason}, #state{enum_pid = EnumPid} = State) ->
    ?LOG_ERROR("replicator-linked pid ~p exited with ~p", [From, Reason]),
    exit(EnumPid, pls_restart_kthxbye),
    {noreply, State};

handle_info(_Msg, State) ->
    {noreply, State}.
..270 
%% Normal stop: flush the remaining buffer, write a final checkpoint, answer
%% the listeners and close both databases.  The head of Listeners (the most
%% recently registered caller) always gets the checkpoint result.  The others
%% get the same result if the source is still up to date, or `retry' so that
%% replicate/2 starts another pass.
terminate(normal, State) ->
    #state{
        context = Context,
        current_seq = Seq,
        docs_buffer = Buffer,
        listeners = Listeners,
        source = Source,
        target = Target,
        stats = Stats
    } = State,

    Remaining = lists:flatten(Buffer),
    try update_docs(Target, Remaining, [], replicated_changes) of
    {ok, Errors} ->
        dump_update_errors(Errors),
        ets:update_counter(Stats, doc_write_failures, length(Errors)),
        ets:update_counter(Stats, docs_written,
            length(Remaining) - length(Errors))
    catch
    throw:attachment_write_failed ->
        ?LOG_ERROR("attachment request failed during final write", []),
        exit({internal_server_error, replication_link_failure})
    end,

    couch_task_status:update("Finishing"),

    {ok, NewRepHistory, _} = do_checkpoint(Source, Target, Context, Seq, Stats),
    ets:delete(Stats),
    close_db(Target),

    [Newest | Others] = Listeners,
    gen_server:reply(Newest, {ok, NewRepHistory}),

    %% maybe trigger another replication. If this replicator uses a local
    %% source Db, changes to that Db since we started will not be included in
    %% this pass.
    OthersReply =
    case up_to_date(Source, Seq) of
    true ->
        {ok, NewRepHistory};
    false ->
        retry
    end,
    lists:foreach(fun(Listener) -> gen_server:reply(Listener, OthersReply) end,
        Others),
    close_db(Source);

%% Abnormal stop: tell every listener why, then release resources.
terminate(Reason, State) ->
    ?LOG_ERROR("replicator terminating with reason ~p", [Reason]),
    #state{
        listeners = Listeners,
        source = Source,
        target = Target,
        stats = Stats
    } = State,
    lists:foreach(fun(Listener) -> gen_server:reply(Listener, {error, Reason}) end,
        Listeners),
    ets:delete(Stats),
    close_db(Target),
    close_db(Source).
..327 
%% Hot code upgrade: the state layout is unchanged, so pass it through.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
..330 
..331 %%=============================================================================
..332 %% internal functions
..333 %%=============================================================================
..334 
..335 
%% Logs every {{DocId, Rev}, Error} returned by update_docs at info level
%% and returns ok.
%% We should probably write these to a special replication log, or have a
%% callback where the caller decides what to do with replication errors.
dump_update_errors(Errors) ->
    lists:foreach(
        fun({{Id, Rev}, Error}) ->
            ?LOG_INFO("error replicating document \"~s\" rev \"~s\":~p",
                [Id, couch_doc:rev_to_str(Rev), Error])
        end,
        Errors).
..344 
%% Receiver process for one streamed attachment download.  It is the
%% `stream_to' target of an ibrowse request on the dedicated worker Conn.
%% The controlling process drives it with {From, Msg} messages:
%%   {set_req_id, Id} - start listening for ibrowse request Id (ack {ok, Id})
%%   gimme_status     - once headers arrive, reply {status, Status, Headers}
%%                      and wait for continue | fail | stop_ok
%%   gimme_data       - hand over the next chunk of body data
%% Outcomes:
%%   - the end of the response, or stop_ok, stops Conn and ends the process
%%     normally;
%%   - `fail' or an ibrowse error exits with attachment_request_failed.
%% Every branch that keeps going is a tail call, so a long download does not
%% grow the stack.  After stop_ok (a redirect is being followed with a new
%% receiver) this process returns instead of looping on a stopped connection.
attachment_loop(ReqId, Conn) ->
    %% NOTE(review): presumably a memory/GC check; semantics live in couch_util
    couch_util:should_flush(),
    receive
        {From, {set_req_id, NewId}} ->
            %% we learn the ReqId to listen for
            From ! {self(), {ok, NewId}},
            attachment_loop(NewId, Conn);
        {ibrowse_async_headers, ReqId, Status, Headers} ->
            %% we got headers, give the controlling process a chance to react
            receive
                {From, gimme_status} ->
                    From ! {self(), {status, Status, Headers}},
                    receive
                        {From, continue} ->
                            %% normal case: keep streaming
                            attachment_loop(ReqId, Conn);
                        {From, fail} ->
                            %% error, failure code
                            ?LOG_ERROR(
                                "streaming attachment failed with status ~p",
                                [Status]),
                            catch ibrowse:stop_worker_process(Conn),
                            exit(attachment_request_failed);
                        {From, stop_ok} ->
                            %% the controller starts a new receiver; this
                            %% one is finished and must not keep looping
                            catch ibrowse:stop_worker_process(Conn),
                            stop_ok
                    end
            end;
        {ibrowse_async_response, ReqId, {chunk_start,_}} ->
            attachment_loop(ReqId, Conn);
        {ibrowse_async_response, ReqId, chunk_end} ->
            attachment_loop(ReqId, Conn);
        {ibrowse_async_response, ReqId, {error, Err}} ->
            ?LOG_ERROR("streaming attachment failed with ~p", [Err]),
            catch ibrowse:stop_worker_process(Conn),
            exit(attachment_request_failed);
        {ibrowse_async_response, ReqId, Data} ->
            receive {From, gimme_data} -> From ! {self(), Data} end,
            attachment_loop(ReqId, Conn);
        {ibrowse_async_response_end, ReqId} ->
            catch ibrowse:stop_worker_process(Conn),
            exit(normal)
    end.
..391 
%% Turns an attachment stub from a remote source into an attachment whose
%% data is a receive fun streaming the content from
%% <DbUrl><DocId>/<AttName>?rev=<Rev>.  Only the data field changes; name,
%% type and length stay as they were.
att_stub_converter(DbS, Id, Rev,
        #att{name=Name,data=stub,type=Type,len=Length}=Att) ->
    #old_http_db{uri=DbUrl, headers=Headers} = DbS,
    {Pos, [RevId|_]} = Rev,
    DocPart = couch_util:url_encode(Id),
    AttPart = couch_util:url_encode(?b2l(Name)),
    RevPart = ?b2l(couch_doc:rev_to_str({Pos, RevId})),
    Url = lists:flatten([DbUrl, DocPart, "/", AttPart, "?rev=", RevPart]),
    ?LOG_DEBUG("Attachment URL ~s", [Url]),
    {ok, RcvFun} = make_att_stub_receiver(Url, Headers, Name, Type, Length),
    Att#att{data=RcvFun}.
..402 
%% Opens a streaming GET for an attachment and returns {ok, RcvFun}.  Each
%% call to RcvFun returns the next chunk of body data.  RcvFun throws
%% attachment_write_failed if the receiver process fails mid-stream.
%%
%% Retry behaviour (at most 10 attempts):
%%   - connection errors and 5xx responses: retry, starting with a 1 second
%%     pause that doubles each time;
%%   - redirects: followed via the Location header;
%%   - 4xx responses: the calling process exits.
%% After the last attempt the caller exits with
%% {attachment_request_failed, Msg}.
make_att_stub_receiver(Url, Headers, Name, Type, Length) ->
    make_att_stub_receiver(Url, Headers, Name, Type, Length, 10, 1000).

make_att_stub_receiver(Url, _Headers, _Name, _Type, _Length, 0, _Pause) ->
    ?LOG_ERROR("streaming attachment request failed after 10 retries: ~s",
        [Url]),
    exit({attachment_request_failed, ?l2b(["failed to replicate ", Url])});

make_att_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause) ->
    %% every retry path returns this fun's result directly
    Retry = fun(NextUrl, NextPause) ->
        make_att_stub_receiver(NextUrl, Headers, Name, Type, Length,
            Retries - 1, NextPause)
    end,

    %% start the process that receives attachment data from ibrowse
    #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
    {ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port),
    Pid = spawn_link(fun() -> attachment_loop(nil, Conn) end),

    %% make the async request
    Opts = [{stream_to, Pid}, {response_format, binary}],
    case ibrowse:send_req_direct(Conn, Url, Headers, get, [], Opts, infinity) of
    {ibrowse_req_id, ReqId} ->
        att_stub_set_req_id(Pid, Conn, ReqId, Url, Pause, Retry);
    {error, Reason} ->
        ?LOG_INFO("retrying couch_rep attachment request in ~p " ++
            "seconds due to {error, ~p}: ~s", [Pause/1000, Reason, Url]),
        %% The receiver never got a request id and would wait forever.
        %% Unlink it first so that killing it does not reach this process
        %% as an 'EXIT' message.
        unlink(Pid),
        exit(Pid, kill),
        catch ibrowse:stop_worker_process(Conn),
        timer:sleep(Pause),
        Retry(Url, 2*Pause)
    end.

%% Tells the receiver which ibrowse request id to listen for, then goes on to
%% check the response status.  If the receiver dies first, the request is
%% retried.
att_stub_set_req_id(Pid, Conn, ReqId, Url, Pause, Retry) ->
    Pid ! {self(), {set_req_id, ReqId}},
    receive
    {Pid, {ok, ReqId}} ->
        att_stub_check_status(Pid, Conn, Url, Pause, Retry);
    {'EXIT', Pid, _Reason} ->
        catch ibrowse:stop_worker_process(Conn),
        timer:sleep(Pause),
        Retry(Url, 2*Pause)
    end.

%% Waits for the response headers and acts on the status code:
%%   2xx - hand back the receive fun
%%   3xx - follow the Location header
%%   4xx - fail
%%   5xx - retry after a pause
att_stub_check_status(Pid, Conn, Url, Pause, Retry) ->
    Pid ! {self(), gimme_status},
    receive
    {'EXIT', Pid, attachment_request_failed} ->
        catch ibrowse:stop_worker_process(Conn),
        Retry(Url, Pause);
    {Pid, {status, StreamStatus, StreamHeaders}} ->
        ?LOG_DEBUG("streaming attachment Status ~p Headers ~p",
            [StreamStatus, StreamHeaders]),

        ResponseCode = list_to_integer(StreamStatus),
        if
        ResponseCode >= 200, ResponseCode < 300 ->
            % the normal case
            Pid ! {self(), continue},
            %% This fun goes into the streaming attachment code.  It is run
            %% by the replication gen_server, so that process can't be the
            %% one that actually receives the ibrowse data.
            {ok, fun() ->
                Pid ! {self(), gimme_data},
                receive
                    {Pid, Data} ->
                        Data;
                    {'EXIT', Pid, attachment_request_failed} ->
                        throw(attachment_write_failed)
                end
            end};
        ResponseCode >= 300, ResponseCode < 400 ->
            % follow the redirect
            Pid ! {self(), stop_ok},
            RedirectUrl = mochiweb_headers:get_value("Location",
                mochiweb_headers:make(StreamHeaders)),
            catch ibrowse:stop_worker_process(Conn),
            Retry(RedirectUrl, Pause);
        ResponseCode >= 400, ResponseCode < 500 ->
            % an error... log and fail
            ?LOG_ERROR("streaming attachment failed with code ~p: ~s",
                [ResponseCode, Url]),
            Pid ! {self(), fail},
            exit(attachment_request_failed);
        ResponseCode >= 500 ->
            % a server-side error (500, 502, 503, ...): log and retry
            ?LOG_INFO("retrying couch_rep attachment request in ~p " ++
                "seconds due to ~p response: ~s",
                [Pause/1000, ResponseCode, Url]),
            Pid ! {self(), fail},
            catch ibrowse:stop_worker_process(Conn),
            timer:sleep(Pause),
            Retry(Url, 2*Pause)
        end
    end.
..498 
..499 
%% Opens a replication endpoint and returns {ok, Handle, DisplayName}.
%%   {remote, Url, Headers, Auth} - no connection is made; an #old_http_db{}
%%       handle is built and Url doubles as the display name.
%%   {local, DbName, UserCtx}     - opened with couch_db:open/2 under UserCtx;
%%       any result other than {ok, Db} is returned unchanged.
open_db({remote, Url, Headers, Auth})->
    HttpDb = #old_http_db{uri = ?b2l(Url), headers = Headers, oauth = Auth},
    {ok, HttpDb, Url};
open_db({local, DbName, UserCtx})->
    Opened = couch_db:open(DbName, [{user_ctx, UserCtx}]),
    case Opened of
    {ok, Db} ->
        {ok, Db, DbName};
    _ ->
        Opened
    end.

%% Releases a handle returned by open_db/1.  Remote handles hold nothing.
close_db(#old_http_db{})->
    ok;
close_db(Db)->
    couch_db:close(Db).
..513 
%% Records a replication checkpoint at source update_seq NewSeqNum.
%%
%% Context is the positional property list built in init/1.
%%
%% Nothing changed (NewSeqNum equals the start seq and there is history):
%%   returns {ok, {[{<<"no_changes">>, true} | LastRecord]}, Context} and
%%   writes nothing.
%%
%% Otherwise:
%%   1. commits the source (in a linked helper) and the target (inline);
%%   2. adds a history entry built from the ets Stats counters (history is
%%      capped at 50 entries);
%%   3. writes the new record, with a new session_id, to both databases;
%%   4. returns {ok, NewRepHistory, NewContext}.  NewContext carries the new
%%      record revisions.
%%
%% If either instance start time differs from the one captured in init/1,
%% the start seq is recorded instead of NewSeqNum so that the changes are
%% re-examined next time.
do_checkpoint(Source, Target, Context, NewSeqNum, Stats) ->
    ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
    [
        {start_seq, StartSeqNum},
        {history, OldHistory},
        {rep_starttime, ReplicationStartTime},
        {src_starttime, SrcInstanceStartTime},
        {tgt_starttime, TgtInstanceStartTime},
        {src_record, #doc{body={LastRepRecord}}=RepRecDocSrc},
        {tgt_record, RepRecDocTgt}
    ] = Context,

    case NewSeqNum == StartSeqNum andalso OldHistory /= [] of
    true ->
        % nothing changed, don't record results
        {ok, {[{<<"no_changes">>, true} | LastRepRecord]}, Context};
    false ->
        % something changed, record results for incremental replication,

        % Commit changes to both src and tgt.  The src matters because if
        % the changes we replicated are lost, we'd record a seq number ahead
        % of what was committed.  If those changes are lost and the seq
        % number reverts to a previous committed value, we will skip future
        % changes when new doc updates are given our already replicated seq
        % nums.

        % commit the src async
        ParentPid = self(),
        SrcCommitPid = spawn_link(fun() ->
                ParentPid ! {self(), ensure_full_commit(Source)} end),

        % commit tgt sync
        {ok, TgtInstanceStartTime2} = ensure_full_commit(Target),

        SrcInstanceStartTime2 =
        receive
        {SrcCommitPid, {ok, Timestamp}} ->
            Timestamp;
        {'EXIT', SrcCommitPid, {http_request_failed, _}} ->
            exit(replication_link_failure);
        {'EXIT', SrcCommitPid, Reason} when Reason =/= normal ->
            % any other crash of the committer would otherwise leave this
            % receive waiting forever
            ?LOG_ERROR("source commit failed with ~p", [Reason]),
            exit(Reason)
        end,

        RecordSeqNum =
        if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
                TgtInstanceStartTime2 == TgtInstanceStartTime ->
            NewSeqNum;
        true ->
            ?LOG_INFO("A server has restarted since replication start. "
                "Not recording the new sequence number to ensure the "
                "replication is redone and documents reexamined.", []),
            StartSeqNum
        end,

        NewHistoryEntry = {
            [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
            {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
            {<<"start_last_seq">>, StartSeqNum},
            {<<"end_last_seq">>, NewSeqNum},
            {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
            {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
            {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
            {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
            {<<"doc_write_failures">>, ets:lookup_element(Stats, doc_write_failures, 2)}
            ]},
        % limit history to 50 entries
        HistEntries = lists:sublist([NewHistoryEntry | OldHistory], 50),

        NewRepHistory =
                {[{<<"session_id">>, couch_util:new_uuid()},
                  {<<"source_last_seq">>, RecordSeqNum},
                  {<<"history">>, HistEntries}]},

        {ok, {SrcRevPos,SrcRevId}} = update_doc(Source,
                RepRecDocSrc#doc{body=NewRepHistory}, []),
        {ok, {TgtRevPos,TgtRevId}} = update_doc(Target,
                RepRecDocTgt#doc{body=NewRepHistory}, []),

        % keep the run's original start data; only the record revisions move
        NewContext = [
            {start_seq, StartSeqNum},
            {history, OldHistory},
            {rep_starttime, ReplicationStartTime},
            {src_starttime, SrcInstanceStartTime},
            {tgt_starttime, TgtInstanceStartTime},
            {src_record, RepRecDocSrc#doc{revs={SrcRevPos,[SrcRevId]}}},
            {tgt_record, RepRecDocTgt#doc{revs={TgtRevPos,[TgtRevId]}}}
        ],

        {ok, NewRepHistory, NewContext}

    end.
..603 
%% Sends an HTTP request to a remote CouchDB with no body.
do_http_request(Url, Action, Headers, Auth) ->
    do_http_request(Url, Action, Headers, Auth, []).

%% Sends an HTTP request, JSON-encoding JsonBody (when it is not []).  When
%% Auth is {Props}, an OAuth Authorization header built from
%% Props' <<"oauth">> object is put in front of Headers.  Retries are done by
%% do_http_request0/6 (10 attempts, starting with a 1000 ms pause).
do_http_request(Url, Action, Headers, Auth, JsonBody) ->
    RequestHeaders =
    case Auth of
    {Props} ->
        [rep_oauth_header(Url, Action, Props) | Headers];
    _Else ->
        Headers
    end,
    do_http_request0(Url, Action, RequestHeaders, JsonBody, 10, 1000).

%% Builds the OAuth (HMAC-SHA1) "Authorization" header for one request.
%% Only get, post and put actions are supported.
rep_oauth_header(Url, Action, Props) ->
    {OAuth} = proplists:get_value(<<"oauth">>, Props),
    Field = fun(Key) -> ?b2l(proplists:get_value(Key, OAuth)) end,
    ConsumerKey = Field(<<"consumer_key">>),
    Token = Field(<<"token">>),
    TokenSecret = Field(<<"token_secret">>),
    ConsumerSecret = Field(<<"consumer_secret">>),
    Method =
    case Action of
    get -> "GET";
    post -> "POST";
    put -> "PUT"
    end,
    Consumer = {ConsumerKey, ConsumerSecret, hmac_sha1},
    Params = oauth:signed_params(Method, Url, [], Consumer, Token, TokenSecret),
    {"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)}.
..628 
%% Performs an HTTP request with retries.  Binary URLs are converted to
%% strings first.  Response handling:
%%   2xx - body returned as decoded JSON
%%   3xx - Location header followed (uses up one retry)
%%   4xx - body returned as decoded JSON (CouchDB error objects)
%%   5xx and transport errors - retried after Pause ms; the pause doubles
%%         each time
%% When Retries reaches 0 the calling process exits with
%% {http_request_failed, Msg}.
do_http_request0(Url, Action, Headers, Body, Retries, Pause) when is_binary(Url) ->
    do_http_request0(?b2l(Url), Action, Headers, Body, Retries, Pause);
do_http_request0(Url, Action, _Headers, _JsonBody, 0, _Pause) ->
    ?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s",
        [Action, Url]),
    exit({http_request_failed, ?l2b(["failed to replicate ", Url])});
do_http_request0(Url, Action, Headers, JsonBody, Retries, Pause) ->
    ?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]),
    Body =
    case JsonBody of
    [] ->
        <<>>;
    _ ->
        iolist_to_binary(?JSON_ENCODE(JsonBody))
    end,
    Options = case Action of
        get -> [];
        _ -> [{transfer_encoding, {chunked, 65535}}]
    end ++ [
        {content_type, "application/json; charset=utf-8"},
        {max_pipeline_size, 101},
        {response_format, binary}
    ],
    case ibrowse:send_req(Url, Headers, Action, Body, Options, infinity) of
    {ok, Status, ResponseHeaders, ResponseBody} ->
        ResponseCode = list_to_integer(Status),
        if
        ResponseCode >= 200, ResponseCode < 300 ->
            ?JSON_DECODE(ResponseBody);
        ResponseCode >= 300, ResponseCode < 400 ->
            RedirectUrl = mochiweb_headers:get_value("Location",
                mochiweb_headers:make(ResponseHeaders)),
            do_http_request0(RedirectUrl, Action, Headers, JsonBody, Retries-1,
                Pause);
        ResponseCode >= 400, ResponseCode < 500 ->
            ?JSON_DECODE(ResponseBody);
        ResponseCode >= 500 ->
            % 500, 502, 503, ...: the server (or a proxy) may recover
            ?LOG_INFO("retrying couch_rep HTTP ~p request in ~p seconds " ++
                "due to ~p error: ~s", [Action, Pause/1000, ResponseCode, Url]),
            timer:sleep(Pause),
            do_http_request0(Url, Action, Headers, JsonBody, Retries - 1, 2*Pause)
        end;
    {error, Reason} ->
        ?LOG_INFO("retrying couch_rep HTTP ~p request in ~p seconds due to " ++
            "{error, ~p}: ~s", [Action, Pause/1000, Reason, Url]),
        timer:sleep(Pause),
        do_http_request0(Url, Action, Headers, JsonBody, Retries - 1, 2*Pause)
    end.
..677 
%% Forces the database to commit to disk and returns
%% {ok, InstanceStartTime}.  Remote databases are asked via POST
%% _ensure_full_commit, and the response must carry "ok": true.
ensure_full_commit(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) ->
    CommitUrl = DbUrl ++ "_ensure_full_commit",
    {ResultProps} = do_http_request(CommitUrl, post, Headers, OAuth, true),
    true = proplists:get_value(<<"ok">>, ResultProps),
    StartTime = proplists:get_value(<<"instance_start_time">>, ResultProps),
    {ok, StartTime};
ensure_full_commit(Db) ->
    couch_db:ensure_full_commit(Db).
..685 
%% Enumerator loop, run in its own process linked to the replicator Pid.
%% For each batch of source doc infos after StartSeq, it:
%%   1. asks the target which revisions are missing;
%%   2. sends one synchronous replicate_doc call per document that has
%%      missing revisions;
%%   3. reports progress via an increment_update_seq cast.
%% When no more changes are found it sends {fin, {LastSeq, RevsCount}},
%% where RevsCount is the number of doc entries checked.
enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) ->
    case get_doc_info_list(DbSource, StartSeq) of
    [] ->
        gen_server:call(Pid, {fin, {StartSeq, RevsCount}}, infinity);
    DocInfoList ->
        ToIdRevs = fun(#doc_info{id = Id, revs = RevInfos}) ->
            {Id, [Rev || #rev_info{rev = Rev} <- RevInfos]}
        end,
        SrcRevsList = lists:map(ToIdRevs, DocInfoList),
        {ok, MissingRevs} = get_missing_revs(DbTarget, SrcRevsList),

        %% do we need to check for success here?
        lists:foreach(
            fun(Info) -> gen_server:call(Pid, {replicate_doc, Info}, infinity) end,
            MissingRevs),

        #doc_info{high_seq = LastSeq} = lists:last(DocInfoList),
        gen_server:cast(Pid, {increment_update_seq, LastSeq}),
        enum_docs_since(Pid, DbSource, DbTarget,
            {LastSeq, RevsCount + length(SrcRevsList)})
    end.
..707 
..708 
..709 
%% Returns {ok, InfoProps} for a database.  For a remote database the JSON
%% info object's keys are converted to atoms (e.g. instance_start_time), as
%% done locally by couch_db:get_db_info/1.
get_db_info(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) ->
    {DbProps} = do_http_request(DbUrl, get, Headers, OAuth),
    {ok, [{remote_db_info_key(K), V} || {K, V} <- DbProps]};
get_db_info(Db) ->
    couch_db:get_db_info(Db).

%% Maps a db-info key received from a remote server to an atom, but only to
%% an atom that already exists.  The keys come from the network, and atoms
%% are never garbage collected, so list_to_atom/1 would let a remote server
%% grow the atom table without bound.  Unknown keys are kept as binaries.
remote_db_info_key(Key) ->
    KeyStr = ?b2l(Key),
    try
        list_to_existing_atom(KeyStr)
    catch
        error:badarg -> Key
    end.
..715 
%% Returns up to 100 #doc_info{} records for documents changed after StartSeq,
%% in sequence order.  For a remote source the rows of _all_docs_by_seq are
%% converted; for a local source couch_db:enum_docs_since/4 is folded.
get_doc_info_list(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, StartSeq) ->
    Url = DbUrl ++ "_all_docs_by_seq?limit=100&startkey="
        ++ integer_to_list(StartSeq),
    {Results} = do_http_request(Url, get, Headers, OAuth),
    lists:map(fun seq_row_to_doc_info/1, proplists:get_value(<<"rows">>, Results));
get_doc_info_list(DbSource, StartSeq) ->
    %% collect doc infos until 100 are held, then stop the enumeration
    Collect = fun
        (_, _, {100, _} = Full) ->
            {stop, Full};
        (DocInfo, _, {Count, Acc}) ->
            {ok, {Count + 1, [DocInfo | Acc]}}
    end,
    {ok, {_Count, Collected}} =
        couch_db:enum_docs_since(DbSource, StartSeq, Collect, {0, []}),
    lists:reverse(Collected).

%% Converts one _all_docs_by_seq row into a #doc_info{}.  The revision list
%% is ordered as:
%%   1. the row's "rev", flagged with its "deleted" value;
%%   2. the live "conflicts" (deleted = false);
%%   3. the "deleted_conflicts" (deleted = true).
seq_row_to_doc_info({RowInfoList}) ->
    {RowValueProps} = proplists:get_value(<<"value">>, RowInfoList),
    Seq = proplists:get_value(<<"key">>, RowInfoList),
    Value = fun(Key, Default) -> proplists:get_value(Key, RowValueProps, Default) end,
    MainRev = #rev_info{
        rev = couch_doc:parse_rev(proplists:get_value(<<"rev">>, RowValueProps)),
        deleted = Value(<<"deleted">>, false)},
    Conflicts = [#rev_info{rev = Rev, deleted = false}
        || Rev <- couch_doc:parse_revs(Value(<<"conflicts">>, []))],
    DeletedConflicts = [#rev_info{rev = Rev, deleted = true}
        || Rev <- couch_doc:parse_revs(Value(<<"deleted_conflicts">>, []))],
    #doc_info{
        id = proplists:get_value(<<"id">>, RowInfoList),
        high_seq = Seq,
        revs = [MainRev | Conflicts ++ DeletedConflicts]
    }.
..741 
%% @doc Asks the target which of the given revisions it does not have.
%%      Takes a list of {DocId, Revs} pairs and returns {ok, [{DocId, Revs}]}
%%      listing only the missing revisions. Remote targets are queried via a
%%      POST to _missing_revs, and revs are converted to and from their string
%%      form around the request. Local targets use couch_db:get_missing_revs/2.
get_missing_revs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocIdRevsList) ->
    RequestBody = {[{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- DocIdRevsList]},
    {Response} = do_http_request(DbUrl ++ "_missing_revs", post, Headers, OAuth,
            RequestBody),
    {MissingJson} = proplists:get_value(<<"missing_revs">>, Response),
    {ok, [{Id, couch_doc:parse_revs(RevStrs)} || {Id, RevStrs} <- MissingJson]};
get_missing_revs(Db, DocIdRevsList) ->
    couch_db:get_missing_revs(Db, DocIdRevsList).
..751 
..752 
%% @doc Opens the current revision of DocId.
%%      For a remote database, Options must be [] (anything else is a
%%      badmatch). An error body of the exact shape {error, reason} is returned
%%      as {ErrorAtom, Reason}, and any other body is parsed into a #doc{} and
%%      returned as {ok, Doc}. Local databases use couch_db:open_doc/3.
open_doc(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocId, Options) ->
    [] = Options,
    DocUrl = DbUrl ++ couch_util:url_encode(DocId),
    Json = do_http_request(DocUrl, get, Headers, OAuth),
    case Json of
    {[{<<"error">>, ErrId}, {<<"reason">>, Reason}]} ->
        {couch_util:to_existing_atom(ErrId), Reason};
    _ ->
        {ok, couch_doc:from_json_obj(Json)}
    end;
open_doc(Db, DocId, Options) ->
    couch_db:open_doc(Db, DocId, Options).
..763 
%% @doc Opens the requested revisions (with latest=true) of DocId.
%%      For a remote database the revisions are fetched with one or more GET
%%      requests carrying an open_revs JSON array. Each result is either
%%      {{not_found, missing}, Rev} or {ok, Doc}, where the attachments of Doc
%%      are passed through att_stub_converter/4. Returns {ok, Results}.
%%      Local databases use couch_db:open_doc_revs/4.
open_doc_revs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth} = DbS, DocId, Revs0,
        [latest]) ->
    Revs = couch_doc:rev_to_strs(Revs0),
    BaseUrl = DbUrl ++ couch_util:url_encode(DocId) ++ "?revs=true&latest=true",
    RevsUrlPrefix = BaseUrl ++ "&open_revs=",

    %% MochiWeb expects URLs < 8KB long, so split the requested revisions into
    %% batches whose encoded open_revs array fits in the remaining space. Each
    %% rev's actual encoded size is measured rather than assuming a fixed width
    %% per rev, so long rev strings cannot push a batch past the limit.
    Batches = split_revs_for_url(Revs, 8192 - length(RevsUrlPrefix)),
    JsonResults = lists:append(
        [do_http_request(?l2b(RevsUrlPrefix ++ ?JSON_ENCODE(Batch)), get,
            Headers, OAuth) || Batch <- Batches]),

    Results =
    lists:map(
        fun({[{<<"missing">>, Rev}]}) ->
            {{not_found, missing}, couch_doc:parse_rev(Rev)};
        ({[{<<"ok">>, JsonDoc}]}) ->
            #doc{id=Id, revs=Rev, atts=Atts} = Doc =
                couch_doc:from_json_obj(JsonDoc),
            {ok, Doc#doc{atts=[att_stub_converter(DbS,Id,Rev,A) || A <- Atts]}}
        end, JsonResults),
    {ok, Results};
open_doc_revs(Db, DocId, Revs, Options) ->
    couch_db:open_doc_revs(Db, DocId, Revs, Options).

%% Splits rev strings into consecutive batches, preserving order, so that the
%% JSON array encoding of each batch is at most MaxLen bytes. Every batch holds
%% at least one rev, so a single oversized rev gets a batch of its own. An empty
%% input yields one empty batch, so exactly one request is still made.
split_revs_for_url(Revs, MaxLen) ->
    %% 2 accounts for the enclosing "[" and "]".
    split_revs_for_url(Revs, MaxLen, 2, [], []).

split_revs_for_url([], _MaxLen, _Len, Batch, Batches) ->
    lists:reverse([lists:reverse(Batch) | Batches]);
split_revs_for_url([Rev | Rest], MaxLen, Len, Batch, Batches) ->
    %% Encoded rev plus one separating comma.
    Cost = iolist_size(?JSON_ENCODE(Rev)) + 1,
    case Batch =/= [] andalso Len + Cost > MaxLen of
    true ->
        split_revs_for_url(Rest, MaxLen, 2 + Cost, [Rev],
            [lists:reverse(Batch) | Batches]);
    false ->
        split_revs_for_url(Rest, MaxLen, Len + Cost, [Rev | Batch], Batches)
    end.
..801 
%% @spec should_flush(DocCount::integer()) -> true | false
%% @doc Decides whether the pending document buffer should be written out now.
%%      Returns true when any of the following holds:
%%        - more than ?BUFFER_NDOCS documents are buffered;
%%        - half the count of this process plus its linked processes (used as
%%          an approximation of pending attachment writes) exceeds
%%          ?BUFFER_NATTACHMENTS;
%%        - the combined memory of those processes exceeds twice
%%          ?BUFFER_MEMORY and, after each is garbage collected, still
%%          exceeds ?BUFFER_MEMORY.
should_flush(DocCount) when DocCount > ?BUFFER_NDOCS ->
    true;
should_flush(_DocCount) ->
    {links, Links} = process_info(self(), links),
    Procs = [self() | [P || P <- Links, is_pid(P)]],
    if
    length(Procs) / 2 > ?BUFFER_NATTACHMENTS ->
        true;
    true ->
        memory_footprint(Procs) > 2 * ?BUFFER_MEMORY andalso begin
            %% Only pay for a collection pass when well over budget, then
            %% judge by what remains afterwards.
            lists:foreach(fun erlang:garbage_collect/1, Procs),
            memory_footprint(Procs) > ?BUFFER_MEMORY
        end
    end.
..823 
%% @spec memory_footprint([pid()]) -> integer()
%% @doc Sum of process memory and referenced-binary memory for every process
%%      in the list that is still alive. Processes that have exited are
%%      skipped. The two partial totals are logged at debug level.
memory_footprint(PidList) ->
    memory_footprint(PidList, {0,0}).

memory_footprint([], {ProcessMemory, BinaryMemory}) ->
    ?LOG_DEBUG("ProcessMem ~p BinaryMem ~p", [ProcessMemory, BinaryMemory]),
    ProcessMemory + BinaryMemory;
memory_footprint([Pid|Rest], {ProcAcc, BinAcc}) ->
    %% Query process_info/2 directly instead of checking is_process_alive/1
    %% first. The process can exit between the two calls, and process_info/2
    %% would then return undefined, making element(2, undefined) crash.
    case process_info(Pid, memory) of
    {memory, ProcMem} ->
        BinMem = binary_memory(Pid),
        memory_footprint(Rest, {ProcMem + ProcAcc, BinMem + BinAcc});
    undefined ->
        memory_footprint(Rest, {ProcAcc, BinAcc})
    end.
..841 
%% @spec binary_memory(pid()) -> integer()
%% @doc Total size of all binaries referenced by this process, as reported by
%%      process_info(Pid, binary). Returns 0 if the process is no longer alive,
%%      so that a process exiting mid-measurement does not crash the caller.
binary_memory(Pid) ->
    case process_info(Pid, binary) of
    {binary, BinInfo} ->
        lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size + Acc end, 0, BinInfo);
    undefined ->
        0
    end.
..847 
%% @doc Writes a single document and returns {ok, NewRev}.
%%      For a remote database, Options must be [] (anything else is a
%%      badmatch). The document, including its attachments, is PUT to its own
%%      URL and the rev from the response is parsed. Local databases use
%%      couch_db:update_doc/3.
update_doc(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, #doc{id=DocId}=Doc, Options) ->
    [] = Options,
    DocUrl = DbUrl ++ couch_util:url_encode(DocId),
    Body = couch_doc:to_json_obj(Doc, [attachments]),
    {Response} = do_http_request(DocUrl, put, Headers, OAuth, Body),
    NewRev = couch_doc:parse_rev(proplists:get_value(<<"rev">>, Response)),
    {ok, NewRev};
update_doc(Db, Doc, Options) ->
    couch_db:update_doc(Db, Doc, Options).
..857 
%% @doc Writes a batch of documents.
%%      An empty batch returns {ok, []} without touching the database. For a
%%      remote database with no options in replicated_changes mode, the docs
%%      (with revs and attachments) are POSTed to _bulk_docs with
%%      new_edits=false. Every returned entry becomes
%%      {{Id, Rev}, {ErrorAtom, Reason}} in the {ok, List} result. All other
%%      cases use couch_db:update_docs/4.
update_docs(_, [], _, _) ->
    {ok, []};
update_docs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, Docs, [], replicated_changes) ->
    Body = {[{new_edits, false},
        {docs, [couch_doc:to_json_obj(D, [revs,attachments]) || D <- Docs]}]},
    ErrorsJson = do_http_request(DbUrl ++ "_bulk_docs", post, Headers, OAuth, Body),
    ToError = fun({Props}) ->
        Field = fun(Key) -> proplists:get_value(Key, Props) end,
        Rev = couch_doc:parse_rev(Field(<<"rev">>)),
        ErrId = couch_util:to_existing_atom(Field(<<"error">>)),
        {{Field(<<"id">>), Rev}, {ErrId, Field(<<"reason">>)}}
    end,
    {ok, lists:map(ToError, ErrorsJson)};
update_docs(Db, Docs, Options, UpdateType) ->
    couch_db:update_docs(Db, Docs, Options, UpdateType).
..879 
%% @doc Reports whether the source has not advanced past Seq.
%%      Remote (#old_http_db) sources always report true. For a local source,
%%      the database is reopened by name and its current update_seq is
%%      compared with Seq, and the reopened handle is closed before returning.
up_to_date(#old_http_db{}, _Seq) ->
    true;
up_to_date(Source, Seq) ->
    {ok, Fresh} = couch_db:open(Source#db.name, []),
    IsCurrent = (Fresh#db.update_seq == Seq),
    couch_db:close(Fresh),
    IsCurrent.
..887 

Generated using etap 0.3.4.