C0 code coverage information

Generated on 2009-08-09 21:22:54 with etap 0.3.4.

Name Total lines Lines of code Total coverage Code coverage
couch_rep_missing_revs ?? ?? ?? 58%
....1 % Licensed under the Apache License, Version 2.0 (the "License"); you may not
....2 % use this file except in compliance with the License. You may obtain a copy of
....3 % the License at
....4 %
....5 %   http://www.apache.org/licenses/LICENSE-2.0
....6 %
....7 % Unless required by applicable law or agreed to in writing, software
....8 % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
....9 % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
...10 % License for the specific language governing permissions and limitations under
...11 % the License.
...12 
...13 -module(couch_rep_missing_revs).
...14 -behaviour(gen_server).
...15 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, 
...16     code_change/3]).
...17 
...18 -export([start_link/4, next/1, stop/1]).
...19 
...20 -define(BUFFER_SIZE, 1000).
...21 
...22 -include("couch_db.hrl").
...23 
...24 -record (state, {
...25     changes_loop,               % pid of the linked changes_loop process; nil once its exit is handled
...26     changes_from = nil,         % From of an add_missing_revs call whose reply is withheld
...27     target,                     % Target argument given to start_link/4
...28     parent,                     % process that is sent missing_revs_checkpoint messages
...29     complete = false,           % true once the changes loop has exited normally
...30     count = 0,                  % number of entries currently buffered in rows
...31     reply_to = nil,             % From of a next_missing_revs caller waiting for rows
...32     rows = queue:new(),         % buffered missing-revs entries awaiting next/1
...33     high_source_seq = 0,        % HighSeq of the latest batch reported by the changes loop
...34     high_missing_seq = 0,       % HighSeq of the latest batch that contained missing revs
...35     high_committed_seq = 0      % value last stored via {update_committed_seq, N}
...36 }).
...37 %% Start a linked, unregistered gen_server for this module; the four arguments go to init/1.
...38 start_link(Parent, Target, ChangesFeed, PostProps) ->
...39     gen_server:start_link(?MODULE, [Parent, Target, ChangesFeed, PostProps], []).
...40 
...41 %% Request the next batch from Server; the timeout is infinity, so this blocks until a reply.
...42 next(Server) -> gen_server:call(Server, next_missing_revs, infinity).
...43 
...44 %% Synchronously send `stop` to Server, using gen_server:call/2's default timeout.
...45 stop(Server) -> gen_server:call(Server, stop).
...46 %% gen_server init: trap exits so the linked changes loop's exit arrives in handle_info/2.
...47 init([Parent, Target, ChangesFeed, _PostProps]) ->
...48     process_flag(trap_exit, true),
...49     Server = self(), % taken here: inside the fun, self() would be the spawned process
...50     Loop = fun() -> changes_loop(Server, ChangesFeed, Target) end,
...51     {ok, #state{parent=Parent, target=Target, changes_loop=spawn_link(Loop)}}.
...52 %% Call dispatch. The `stop` clause serves stop/1, which would otherwise hit function_clause.
...53 handle_call({add_missing_revs, {HighSeq, Revs}}, From, State) ->
...54     handle_add_missing_revs(HighSeq, Revs, From, State);
...55 handle_call(next_missing_revs, From, State) ->
...56     handle_next_missing_revs(From, State);
...57 handle_call(stop, _From, State) ->
...58     {stop, normal, ok, State}; % reply ok, then terminate/2 runs
...59 handle_call({update_committed_seq, N}, _From, State) ->
...60     if State#state.high_committed_seq < N ->
...61         ?LOG_DEBUG("missing_revs updating committed seq to ~p", [N]);
...62     true -> ok end, % only the log is conditional; N is stored either way
...63     {reply, ok, State#state{high_committed_seq=N}}.
...64 
...65 %% Casts are not part of this server's protocol: any cast is ignored, state is unchanged.
...66 handle_cast(_Ignored, State) -> {noreply, State}.
...67 
...68 %% The exit of the linked changes loop gets its own handler; anything else is logged, dropped.
...69 handle_info({'EXIT', From, Why}, State) when From =:= State#state.changes_loop ->
...70     handle_changes_loop_exit(Why, State);
...71 handle_info(Other, State) ->
...72     ?LOG_INFO("unexpected message ~p", [Other]),
...73     {noreply, State}.
...74 
...75 %% Send `shutdown` to the changes loop when the state still holds its pid; always returns ok.
...76 terminate(_Reason, #state{changes_loop=Loop}) when is_pid(Loop) ->
...77     exit(Loop, shutdown), ok;
...78 terminate(_Reason, _Other) ->
...79     ok.
...80 
...81 %% No state migration on code upgrade: the state is handed back unchanged.
...82 code_change(_OldVsn, State, _Extra) -> {ok, State}.
...83 
...84 %internal funs
...85 
...86 %% Accept a batch from the changes loop. HighSeq is the seq of the batch's last change and
...87 %% Revs the entries missing on the target (possibly none).
...88 handle_add_missing_revs(HighSeq, [], _From, State) ->
...89     %% Store HighSeq before checkpointing: maybe_checkpoint/1 reads high_source_seq from
...90     %% the state it is given, so it must see this batch's seq rather than the previous one.
...91     NewState = State#state{high_source_seq=HighSeq},
...92     maybe_checkpoint(NewState),
...93     {reply, ok, NewState};
...94 %% Nobody is waiting in next/1: buffer the entries. Once ?BUFFER_SIZE is reached the reply
...95 %% is withheld (From is kept in changes_from), which blocks the changes loop.
...96 handle_add_missing_revs(HighSeq, Revs, From, #state{reply_to=nil} = State) ->
...97     #state{rows=Rows, count=Count} = State,
...98     NewState = State#state{
...99         rows = queue:join(Rows, queue:from_list(Revs)),
..100         count = Count + length(Revs),
..101         high_source_seq = HighSeq, high_missing_seq = HighSeq
..102     },
..103     if NewState#state.count < ?BUFFER_SIZE -> {reply, ok, NewState};
..104     true -> {noreply, NewState#state{changes_from=From}} end;
..105 %% A next/1 caller is parked and the buffer is empty: pass the batch straight to it.
..106 handle_add_missing_revs(HighSeq, Revs, _From, #state{count=0} = State) ->
..107     gen_server:reply(State#state.reply_to, {HighSeq, Revs}),
..108     NewState = State#state{high_source_seq=HighSeq, high_missing_seq=HighSeq, reply_to=nil},
..109     {reply, ok, NewState}.
..110 
..111 %% Serve a next/1 request. With an empty buffer: stop with `complete` once the changes loop
..112 %% has finished, otherwise park the caller in reply_to until rows arrive.
..113 handle_next_missing_revs(_From, #state{count=0, complete=true} = State) ->
..114     {stop, normal, complete, State};
..115 handle_next_missing_revs(From, #state{count=0} = State) ->
..116     {noreply, State#state{reply_to=From}};
..117 %% Rows are buffered: release the changes loop if its reply was withheld, then flush them all.
..118 handle_next_missing_revs(_From, #state{changes_from=Blocked} = State) ->
..119     case Blocked of
..120         nil -> ok;
..121         _ -> gen_server:reply(Blocked, ok)
..122     end,
..123     #state{high_missing_seq=Seq, rows=Buffered} = State,
..124     Drained = State#state{count=0, changes_from=nil, rows=queue:new()},
..125     {reply, {Seq, queue:to_list(Buffered)}, Drained}.
..126 
..127 %% Changes loop exited normally: with nobody waiting, mark the feed complete and keep
..128 %% serving buffered rows; with a parked next/1 caller, tell it `complete` and stop.
..129 handle_changes_loop_exit(normal, #state{reply_to=nil} = State) ->
..130     {noreply, State#state{complete=true, changes_loop=nil}};
..131 handle_changes_loop_exit(normal, #state{reply_to=Waiting} = State) ->
..132     gen_server:reply(Waiting, complete),
..133     {stop, normal, State};
..134 handle_changes_loop_exit(Reason, State) -> % any other exit reason: log it, stop with an error
..135     ?LOG_ERROR("changes_loop died with reason ~p", [Reason]),
..136     {stop, changes_loop_died, State#state{changes_loop=nil}}.
..137 
..138 %% Worker loop: take a batch from the source feed, find what Target lacks, push it, repeat.
..139 changes_loop(OurServer, SourceChangesServer, Target) ->
..140     case couch_rep_changes_feed:next(SourceChangesServer) of
..141         complete -> exit(normal); % the server sees this as {'EXIT', Pid, normal}
..142         Batch ->
..143             Missing = get_missing_revs(Target, Batch),
..144             gen_server:call(OurServer, {add_missing_revs, Missing}, infinity),
..145             changes_loop(OurServer, SourceChangesServer, Target)
..146     end.
..147 
..148 %% Determine which revisions in a batch of change rows Target lacks. Returns
..149 %% {HighSeq, Missing}, HighSeq being the seq of the last row; crashes on an empty batch.
..150 %% #http_db target: POST an id -> rev-strings object to _missing_revs and parse the reply.
..151 get_missing_revs(#http_db{}=Target, Changes) ->
..152     ToIdRevs = fun({[{<<"seq">>,_}, {<<"id">>,DocId}, {<<"changes">>,Revs}]}) ->
..153         {DocId, [couch_doc:rev_to_str(Rev) || {[{<<"rev">>, Rev}]} <- Revs]} end,
..154     Body = {lists:map(ToIdRevs, Changes)},
..155     {[{<<"seq">>, HighSeq}, _, _]} = lists:last(Changes),
..156     Request = Target#http_db{resource = "_missing_revs", method = post, body = Body},
..157     {Resp} = couch_rep_httpc:request(Request),
..158     {Listed} = proplists:get_value(<<"missing_revs">>, Resp),
..159     Parsed = [{DocId, couch_doc:parse_revs(Strs)} || {DocId, Strs} <- Listed],
..160     {HighSeq, Parsed};
..161 
..162 %% Any other target: query couch_db:get_missing_revs/2 with the revs passed through as-is.
..163 get_missing_revs(Target, Changes) ->
..164     ToIdRevs = fun({[{<<"seq">>,_}, {<<"id">>,DocId}, {<<"changes">>,Revs}]}) ->
..165         {DocId, [Rev || {[{<<"rev">>, Rev}]} <- Revs]} end,
..166     IdRevs = lists:map(ToIdRevs, Changes),
..167     {[{<<"seq">>, HighSeq}, _, _]} = lists:last(Changes),
..168     {ok, Missing} = couch_db:get_missing_revs(Target, IdRevs),
..169     {HighSeq, Missing}.
..170 
..171 %% Ask the parent to checkpoint when no revs are outstanding on the target, so metadata
..172 %% is not rescanned unnecessarily. That is the case when the highest seq with missing revs
..173 %% equals the highest committed seq. Returns the message sent, or ok when nothing is sent.
..174 maybe_checkpoint(#state{high_missing_seq=Missing, high_committed_seq=Committed} = State)
..175         when Missing =:= Committed ->
..176     Parent = State#state.parent,
..177     SourceSeq = State#state.high_source_seq,
..178     Parent ! {missing_revs_checkpoint, SourceSeq};
..179 maybe_checkpoint(_State) ->
..180     ok.

Generated using etap 0.3.4.