Generated on 2009-08-09 21:22:54 with etap 0.3.4.
| Name | Total lines | Lines of code | Total coverage | Code coverage | ||||
| couch_rep_missing_revs | ?? | ?? | ?? |
|
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License.  You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_rep_missing_revs).
-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
    code_change/3]).

-export([start_link/4, next/1, stop/1]).

%% Number of buffered rows at which the changes loop is left waiting for its
%% reply until a consumer drains the buffer.
-define(BUFFER_SIZE, 1000).

-include("couch_db.hrl").

%% Server state.
%%   changes_loop       - pid of the linked process running changes_loop/3
%%   changes_from       - gen_server From of an add_missing_revs call whose
%%                        reply is being withheld (nil when none)
%%   target             - target database handle passed to start_link/4
%%   parent             - process that receives checkpoint messages
%%   complete           - true once the changes loop has exited normally
%%   count              - number of rows currently buffered in `rows`
%%   reply_to           - gen_server From of a next/1 caller waiting for rows
%%                        (nil when none)
%%   rows               - queue of buffered missing-revision rows
%%   high_source_seq    - highest sequence reported by the changes loop
%%   high_missing_seq   - highest sequence that came with missing revisions
%%   high_committed_seq - value last set through update_committed_seq
-record (state, {
    changes_loop,
    changes_from = nil,
    target,
    parent,
    complete = false,
    count = 0,
    reply_to = nil,
    rows = queue:new(),
    high_source_seq = 0,
    high_missing_seq = 0,
    high_committed_seq = 0
}).

%% Starts the server linked to the caller. PostProps is accepted but not used
%% by init/1.
start_link(Parent, Target, ChangesFeed, PostProps) ->
    InitArgs = [Parent, Target, ChangesFeed, PostProps],
    gen_server:start_link(?MODULE, InitArgs, []).

%% Requests the next batch of missing revisions, waiting without a timeout.
next(Server) ->
    gen_server:call(Server, next_missing_revs, infinity).

%% Sends a synchronous stop request to the server.
stop(Server) ->
    gen_server:call(Server, stop).

%% gen_server callback. Traps exits so the termination of the changes loop
%% arrives as an 'EXIT' message, then spawns that loop linked to this server.
init([Parent, Target, ChangesFeed, _PostProps]) ->
    process_flag(trap_exit, true),
    Server = self(),
    LoopFun = fun() -> changes_loop(Server, ChangesFeed, Target) end,
    InitialState = #state{
        changes_loop = spawn_link(LoopFun),
        target = Target,
        parent = Parent
    },
    {ok, InitialState}.
%% gen_server callback for synchronous requests.
%%   {add_missing_revs, {HighSeq, Revs}} - sent by the changes loop with the
%%       rows the target lacks; the reply may be withheld to throttle it.
%%   next_missing_revs                   - sent by next/1 to fetch a batch.
%%   {update_committed_seq, N}           - records N as the committed sequence.
%%   stop                                - sent by stop/1; shuts the server down.
handle_call({add_missing_revs, {HighSeq, Revs}}, From, State) ->
    handle_add_missing_revs(HighSeq, Revs, From, State);

handle_call(next_missing_revs, From, State) ->
    handle_next_missing_revs(From, State);

handle_call({update_committed_seq, N}, _From, State) ->
    %% NOTE(review): the log only fires when N is higher, but N is stored
    %% unconditionally, so a lower N would move the value backwards. Confirm
    %% whether callers can ever send a lower N.
    if State#state.high_committed_seq < N ->
        ?LOG_DEBUG("missing_revs updating committed seq to ~p", [N]);
    true -> ok end,
    {reply, ok, State#state{high_committed_seq=N}};

%% stop/1 issues this request. Without this clause the call would crash the
%% server with function_clause instead of shutting it down cleanly.
%% terminate/2 takes care of the changes loop.
handle_call(stop, _From, State) ->
    {stop, normal, ok, State}.

%% gen_server callback; no casts are expected, so they are ignored.
handle_cast(_Msg, State) ->
    {noreply, State}.

%% gen_server callback. The exit of the changes loop is handled separately;
%% any other message is logged and dropped.
handle_info({'EXIT', Pid, Reason}, #state{changes_loop=Pid} = State) ->
    handle_changes_loop_exit(Reason, State);

handle_info(Msg, State) ->
    ?LOG_INFO("unexpected message ~p", [Msg]),
    {noreply, State}.

%% gen_server callback. Sends a shutdown exit signal to the changes loop if
%% its pid is still recorded in the state.
terminate(_Reason, #state{changes_loop=Pid}) when is_pid(Pid) ->
    exit(Pid, shutdown),
    ok;
terminate(_Reason, _State) ->
    ok.

%% gen_server callback; the state needs no conversion.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%internal funs

%% Handles a batch reported by the changes loop.
%%   - No missing revs: only the source sequence advances, and a checkpoint
%%     is attempted.
%%   - No consumer waiting (reply_to = nil): the rows are buffered. Once the
%%     buffer reaches ?BUFFER_SIZE the reply is withheld (From is remembered
%%     in changes_from) so the changes loop blocks until the buffer drains.
%%   - A consumer is waiting and nothing is buffered: the rows are handed
%%     straight to that consumer.
handle_add_missing_revs(HighSeq, [], _From, State) ->
    %% Checkpoint with the updated state so the parent is told the sequence
    %% that was just scanned, not the one from the previous batch.
    NewState = State#state{high_source_seq=HighSeq},
    maybe_checkpoint(NewState),
    {reply, ok, NewState};
handle_add_missing_revs(HighSeq, Revs, From, #state{reply_to=nil} = State) ->
    #state{rows=Rows, count=Count} = State,
    NewState = State#state{
        rows = queue:join(Rows, queue:from_list(Revs)),
        count = Count + length(Revs),
        high_source_seq = HighSeq,
        high_missing_seq = HighSeq
    },
    if NewState#state.count < ?BUFFER_SIZE ->
        {reply, ok, NewState};
    true ->
        {noreply, NewState#state{changes_from=From}}
    end;
handle_add_missing_revs(HighSeq, Revs, _From, #state{count=0} = State) ->
    gen_server:reply(State#state.reply_to, {HighSeq, Revs}),
    NewState = State#state{
        high_source_seq = HighSeq,
        high_missing_seq = HighSeq,
        reply_to = nil
    },
    {reply, ok, NewState}.
%% Serves a next/1 request.
%%   - Buffer empty and changes loop finished: reply `complete` and stop.
%%   - Buffer empty otherwise: remember the caller in reply_to and reply later.
%%   - Buffer non-empty: release a withheld add_missing_revs reply (if any)
%%     and return {HighMissingSeq, Rows}, emptying the buffer.
handle_next_missing_revs(_From, #state{count=0, complete=true} = State) ->
    {stop, normal, complete, State};
handle_next_missing_revs(From, #state{count=0} = State) ->
    {noreply, State#state{reply_to=From}};
handle_next_missing_revs(_From, State) ->
    case State#state.changes_from of
        nil ->
            ok;
        BlockedFeeder ->
            gen_server:reply(BlockedFeeder, ok)
    end,
    Batch = {State#state.high_missing_seq, queue:to_list(State#state.rows)},
    Drained = State#state{count=0, changes_from=nil, rows=queue:new()},
    {reply, Batch, Drained}.

%% Reacts to the exit of the changes loop.
%%   - Normal exit, no consumer waiting: mark the feed as complete.
%%   - Normal exit, consumer waiting: tell it `complete` and stop.
%%   - Any other reason: log it and stop with changes_loop_died.
handle_changes_loop_exit(normal, #state{reply_to=nil} = State) ->
    {noreply, State#state{complete=true, changes_loop=nil}};
handle_changes_loop_exit(normal, #state{reply_to=Waiting} = State) ->
    gen_server:reply(Waiting, complete),
    {stop, normal, State};
handle_changes_loop_exit(Reason, State) ->
    ?LOG_ERROR("changes_loop died with reason ~p", [Reason]),
    {stop, changes_loop_died, State#state{changes_loop=nil}}.

%% Body of the linked worker process: pulls batches from the source changes
%% feed, computes what the target lacks, and hands the result to OurServer.
%% Exits normally once the feed reports `complete`.
changes_loop(OurServer, SourceChangesServer, Target) ->
    case couch_rep_changes_feed:next(SourceChangesServer) of
        complete ->
            exit(normal);
        Changes ->
            Missing = get_missing_revs(Target, Changes),
            %% Blocks while the server withholds its reply.
            gen_server:call(OurServer, {add_missing_revs, Missing}, infinity),
            changes_loop(OurServer, SourceChangesServer, Target)
    end.
%% Determines which of the revisions listed in Changes the target lacks.
%% Returns {HighSeq, MissingRevs}, where HighSeq is the "seq" of the last row
%% in Changes. Each row must be {[{<<"seq">>,_}, {<<"id">>,_}, {<<"changes">>,_}]}
%% in exactly that key order. An #http_db{} target is queried by POSTing to
%% its _missing_revs resource; any other target goes through couch_db.
get_missing_revs(#http_db{}=Target, Changes) ->
    ToIdRevStrs =
        fun({[{<<"seq">>,_}, {<<"id">>,DocId}, {<<"changes">>,RevRows}]}) ->
            {DocId, [couch_doc:rev_to_str(Rev) || {[{<<"rev">>, Rev}]} <- RevRows]}
        end,
    PostBody = {[ToIdRevStrs(Row) || Row <- Changes]},
    {[{<<"seq">>, LastSeq}, _, _]} = lists:last(Changes),
    {RespProps} = couch_rep_httpc:request(Target#http_db{
        resource = "_missing_revs",
        method = post,
        body = PostBody
    }),
    {Missing} = proplists:get_value(<<"missing_revs">>, RespProps),
    Parsed = [{MissingId, couch_doc:parse_revs(Strs)} || {MissingId, Strs} <- Missing],
    {LastSeq, Parsed};

get_missing_revs(Target, Changes) ->
    ToIdRevs =
        fun({[{<<"seq">>,_}, {<<"id">>,DocId}, {<<"changes">>,RevRows}]}) ->
            {DocId, [Rev || {[{<<"rev">>, Rev}]} <- RevRows]}
        end,
    IdRevs = [ToIdRevs(Row) || Row <- Changes],
    {[{<<"seq">>, LastSeq}, _, _]} = lists:last(Changes),
    {ok, Missing} = couch_db:get_missing_revs(Target, IdRevs),
    {LastSeq, Missing}.

%% When the highest sequence that came with missing revs equals the committed
%% sequence, tell the parent the current source sequence so it can save a
%% checkpoint and avoid rescanning metadata unnecessarily. Otherwise do
%% nothing.
maybe_checkpoint(#state{high_missing_seq=Seq, high_committed_seq=Seq} = State) ->
    State#state.parent ! {missing_revs_checkpoint, State#state.high_source_seq};
maybe_checkpoint(_Other) ->
    ok.
Generated using etap 0.3.4.