C0 code coverage information

Generated on 2009-08-09 21:22:54 with etap 0.3.4.

Name Total lines Lines of code Total coverage Code coverage
couch_view_group ?? ?? ??
0% 
....1 % Licensed under the Apache License, Version 2.0 (the "License"); you may not
....2 % use this file except in compliance with the License. You may obtain a copy of
....3 % the License at
....4 %
....5 %   http://www.apache.org/licenses/LICENSE-2.0
....6 %
....7 % Unless required by applicable law or agreed to in writing, software
....8 % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
....9 % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
...10 % License for the specific language governing permissions and limitations under
...11 % the License.
...12 
...13 -module(couch_view_group).
...14 -behaviour(gen_server).
...15 
...16 %% API
...17 -export([start_link/1, request_group/2, request_group_info/1]).
...18 -export([open_db_group/2, open_temp_group/5, design_doc_to_view_group/1,design_root/2]).
...19 
...20 %% gen_server callbacks
...21 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
...22          terminate/2, code_change/3]).
...23 
...24 -include("couch_db.hrl").
...25 
...26 -record(group_state, {
...27     type,
...28     db_name,
...29     init_args,
...30     group,
...31     updater_pid=nil,
...32     compactor_pid=nil,
...33     waiting_commit=false,
...34     waiting_list=[],
...35     ref_counter=nil
...36 }).
...37 
...38 % api methods
...39 request_group(Pid, Seq) ->
...40     ?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]),
...41     case gen_server:call(Pid, {request_group, Seq}, infinity) of
...42     {ok, Group, RefCounter} ->
...43         couch_ref_counter:add(RefCounter),
...44         {ok, Group};
...45     Error ->
...46         ?LOG_DEBUG("request_group Error ~p", [Error]),
...47         throw(Error)
...48     end.
...49 
...50 request_group_info(Pid) ->
...51     case gen_server:call(Pid, request_group_info) of
...52     {ok, GroupInfoList} ->
...53         {ok, GroupInfoList};
...54     Error ->
...55         throw(Error)
...56     end.
...57 
...58 % from template
...59 start_link(InitArgs) ->
...60     case gen_server:start_link(couch_view_group,
...61             {InitArgs, self(), Ref = make_ref()}, []) of
...62     {ok, Pid} ->
...63         {ok, Pid};
...64     ignore ->
...65         receive
...66         {Ref, Pid, Error} ->
...67             case process_info(self(), trap_exit) of
...68             {trap_exit, true} -> receive {'EXIT', Pid, _} -> ok end;
...69             {trap_exit, false} -> ok
...70             end,
...71             Error
...72         end;
...73     Error ->
...74         Error
...75     end.
...76 
...77 % init creates a closure which spawns the appropriate view_updater.
...78 init({InitArgs, ReturnPid, Ref}) ->
...79     process_flag(trap_exit, true),
...80     case prepare_group(InitArgs, false) of
...81     {ok, #group{db=Db, fd=Fd}=Group} ->
...82         couch_db:monitor(Db),
...83         Owner = self(),
...84         Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end),
...85         {ok, RefCounter} = couch_ref_counter:start([Fd]),
...86         {ok, #group_state{
...87                 db_name=couch_db:name(Db),
...88                 init_args=InitArgs,
...89                 updater_pid = Pid,
...90                 group=Group,
...91                 ref_counter=RefCounter}};
...92     Error ->
...93         ReturnPid ! {Ref, self(), Error},
...94         ignore
...95     end.
...96 
...97 
...98 
...99 
..100 % There are two sources of messages: couch_view, which requests an up to date
..101 % view group, and the couch_view_updater, which when spawned, updates the
..102 % group and sends it back here. We employ a caching mechanism, so that between
..103 % database writes, we don't have to spawn a couch_view_updater with every view
..104 % request.
..105 
..106 % The caching mechanism: each request is submitted with a seq_id for the
..107 % database at the time it was read. We guarantee to return a view from that
..108 % sequence or newer.
..109 
..110 % If the request sequence is higher than our current high_target seq, we set
..111 % that as the highest seqence. If the updater is not running, we launch it.
..112 
..113 handle_call({request_group, RequestSeq}, From,
..114         #group_state{
..115             db_name=DbName,
..116             group=#group{current_seq=Seq}=Group,
..117             updater_pid=nil,
..118             waiting_list=WaitList
..119             }=State) when RequestSeq > Seq ->
..120     {ok, Db} = couch_db:open(DbName, []),
..121     Group2 = Group#group{db=Db},
..122     Owner = self(),
..123     Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group2) end),
..124 
..125     {noreply, State#group_state{
..126         updater_pid=Pid,
..127         group=Group2,
..128         waiting_list=[{From,RequestSeq}|WaitList]
..129         }, infinity};
..130 
..131 
..132 % If the request seqence is less than or equal to the seq_id of a known Group,
..133 % we respond with that Group.
..134 handle_call({request_group, RequestSeq}, _From, #group_state{
..135             group = #group{current_seq=GroupSeq} = Group,
..136             ref_counter = RefCounter
..137         } = State) when RequestSeq =< GroupSeq  ->
..138     {reply, {ok, Group, RefCounter}, State};
..139 
..140 % Otherwise: TargetSeq => RequestSeq > GroupSeq
..141 % We've already initiated the appropriate action, so just hold the response until the group is up to the RequestSeq
..142 handle_call({request_group, RequestSeq}, From,
..143         #group_state{waiting_list=WaitList}=State) ->
..144     {noreply, State#group_state{
..145         waiting_list=[{From, RequestSeq}|WaitList]
..146         }, infinity};
..147 
..148 handle_call(request_group_info, _From, #group_state{
..149             group = Group,
..150             compactor_pid = CompactorPid
..151         } = State) ->
..152     GroupInfo = get_group_info(Group, CompactorPid),
..153     {reply, {ok, GroupInfo}, State}.
..154 
..155 handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil}
..156         = State) ->
..157     #group_state{
..158         group = #group{name = GroupId, sig = GroupSig} = Group,
..159         init_args = {RootDir, DbName, _}
..160     } = State,
..161     ?LOG_INFO("View index compaction starting for ~s ~s", [DbName, GroupId]),
..162     {ok, Db} = couch_db:open(DbName, []),
..163     {ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig),
..164     NewGroup = reset_file(Db, Fd, DbName, Group),
..165     Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end),
..166     {noreply, State#group_state{compactor_pid = Pid}};
..167 handle_cast({start_compact, _}, State) ->
..168     %% compact already running, this is a no-op
..169     {noreply, State};
..170 
..171 handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
..172         #group_state{group = #group{current_seq=OldSeq}} = State)
..173         when NewSeq >= OldSeq ->
..174     #group_state{
..175         group = #group{name=GroupId, fd=OldFd, sig=GroupSig} = Group,
..176         init_args = {RootDir, DbName, _}, 
..177         updater_pid = UpdaterPid,
..178         ref_counter = RefCounter
..179     } = State,
..180 
..181     ?LOG_INFO("View index compaction complete for ~s ~s", [DbName, GroupId]),
..182     FileName = index_file_name(RootDir, DbName, GroupSig),
..183     CompactName = index_file_name(compact, RootDir, DbName, GroupSig),
..184     file:delete(FileName),
..185     ok = file:rename(CompactName, FileName),
..186 
..187     %% if an updater is running, kill it and start a new one
..188     NewUpdaterPid =
..189     if is_pid(UpdaterPid) ->
..190         unlink(UpdaterPid),
..191         exit(UpdaterPid, view_compaction_complete),
..192         Owner = self(),
..193         spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup) end);
..194     true ->
..195         nil
..196     end,
..197 
..198     %% cleanup old group
..199     unlink(OldFd),
..200     couch_ref_counter:drop(RefCounter),
..201     {ok, NewRefCounter} = couch_ref_counter:start([NewGroup#group.fd]),
..202     case Group#group.db of
..203         nil -> ok;
..204         Else -> couch_db:close(Else)
..205     end,
..206 
..207     self() ! delayed_commit,
..208     {noreply, State#group_state{
..209         group=NewGroup,
..210         ref_counter=NewRefCounter,
..211         compactor_pid=nil,
..212         updater_pid=NewUpdaterPid
..213     }};
..214 handle_cast({compact_done, NewGroup}, State) ->
..215     #group_state{
..216         group = #group{name = GroupId, current_seq = CurrentSeq},
..217         init_args={_RootDir, DbName, _}
..218     } = State,
..219     ?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++
..220         "compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]),
..221     couch_db:close(NewGroup#group.db),
..222     {ok, Db} = couch_db:open(DbName, []),
..223     Pid = spawn_link(fun() ->
..224         {_,Ref} = erlang:spawn_monitor(fun() ->
..225             couch_view_updater:update(nil, NewGroup#group{db = Db})
..226         end),
..227         receive
..228             {'DOWN', Ref, _, _, {new_group, NewGroup2}} ->
..229                 #group{name=GroupId} = NewGroup2,
..230                 Pid2 = couch_view:get_group_server(DbName, GroupId),
..231                 gen_server:cast(Pid2, {compact_done, NewGroup2})
..232         end
..233     end),
..234     {noreply, State#group_state{compactor_pid = Pid}};
..235 
..236 handle_cast({partial_update, Pid, NewGroup}, #group_state{updater_pid=Pid}
..237         = State) ->
..238     #group_state{
..239         db_name = DbName,
..240         waiting_commit = WaitingCommit
..241     } = State,
..242     NewSeq = NewGroup#group.current_seq,
..243     ?LOG_INFO("checkpointing view update at seq ~p for ~s ~s", [NewSeq,
..244         DbName, NewGroup#group.name]),
..245     if not WaitingCommit ->
..246         erlang:send_after(1000, self(), delayed_commit);
..247     true -> ok
..248     end,
..249     {noreply, State#group_state{group=NewGroup, waiting_commit=true}};
..250 handle_cast({partial_update, _, _}, State) ->
..251     %% message from an old (probably pre-compaction) updater; ignore
..252     {noreply, State}.
..253 
..254 handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
..255     {ok, Db} = couch_db:open(DbName, []),
..256     CommittedSeq = couch_db:get_committed_update_seq(Db),
..257     couch_db:close(Db),
..258     if CommittedSeq >= Group#group.current_seq ->
..259         % save the header
..260         Header = {Group#group.sig, get_index_header_data(Group)},
..261         ok = couch_file:write_header(Group#group.fd, Header),
..262         {noreply, State#group_state{waiting_commit=false}};
..263     true ->
..264         % We can't commit the header because the database seq that's fully
..265         % committed to disk is still behind us. If we committed now and the
..266         % database lost those changes our view could be forever out of sync
..267         % with the database. But a crash before we commit these changes, no big
..268         % deal, we only lose incremental changes since last committal.
..269         erlang:send_after(1000, self(), delayed_commit),
..270         {noreply, State#group_state{waiting_commit=true}}
..271     end;
..272 
..273 handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
..274         #group_state{db_name=DbName,
..275             updater_pid=UpPid,
..276             ref_counter=RefCounter,
..277             waiting_list=WaitList,
..278             waiting_commit=WaitingCommit}=State) when UpPid == FromPid ->
..279     ok = couch_db:close(Db),
..280     if not WaitingCommit ->
..281         erlang:send_after(1000, self(), delayed_commit);
..282     true -> ok
..283     end,
..284     case reply_with_group(Group, WaitList, [], RefCounter) of
..285     [] ->
..286         {noreply, State#group_state{waiting_commit=true, waiting_list=[],
..287                 group=Group#group{db=nil}, updater_pid=nil}};
..288     StillWaiting ->
..289         % we still have some waiters, reopen the database and reupdate the index
..290         {ok, Db2} = couch_db:open(DbName, []),
..291         Group2 = Group#group{db=Db2},
..292         Owner = self(),
..293         Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group2) end),
..294         {noreply, State#group_state{waiting_commit=true,
..295                 waiting_list=StillWaiting, group=Group2, updater_pid=Pid}}
..296     end;
..297 handle_info({'EXIT', _, {new_group, _}}, State) ->
..298     %% message from an old (probably pre-compaction) updater; ignore
..299     {noreply, State};
..300 
..301 handle_info({'EXIT', FromPid, reset},
..302         #group_state{
..303             init_args=InitArgs,
..304             updater_pid=UpPid,
..305             group=Group}=State) when UpPid == FromPid ->
..306     ok = couch_db:close(Group#group.db),
..307     case prepare_group(InitArgs, true) of
..308     {ok, ResetGroup} ->
..309         Owner = self(),
..310         Pid = spawn_link(fun()-> couch_view_updater:update(Owner, ResetGroup) end),
..311         {noreply, State#group_state{
..312                 updater_pid=Pid,
..313                 group=ResetGroup}};
..314     Error ->
..315         {stop, normal, reply_all(State, Error)}
..316     end;
..317 handle_info({'EXIT', _, reset}, State) ->
..318     %% message from an old (probably pre-compaction) updater; ignore
..319     {noreply, State};
..320     
..321 handle_info({'EXIT', _FromPid, normal}, State) ->
..322     {noreply, State};
..323 
..324 handle_info({'EXIT', FromPid, {{nocatch, Reason}, _Trace}}, State) ->
..325     ?LOG_DEBUG("Uncaught throw() in linked pid: ~p", [{FromPid, Reason}]),
..326     {stop, Reason, State};
..327 
..328 handle_info({'EXIT', FromPid, Reason}, State) ->
..329     ?LOG_DEBUG("Exit from linked pid: ~p", [{FromPid, Reason}]),
..330     {stop, Reason, State};
..331 
..332 handle_info({'DOWN',_,_,_,_}, State) ->
..333     ?LOG_INFO("Shutting down view group server, monitored db is closing.", []),
..334     {stop, normal, reply_all(State, shutdown)}.
..335 
..336 
..337 terminate(Reason, #group_state{updater_pid=Update, compactor_pid=Compact}=S) ->
..338     reply_all(S, Reason),
..339     catch exit(Update, Reason),
..340     catch exit(Compact, Reason),
..341     ok.
..342 
..343 code_change(_OldVsn, State, _Extra) ->
..344     {ok, State}.
..345 
..346 %% Local Functions
..347 
..348 % reply_with_group/3
..349 % for each item in the WaitingList {Pid, Seq}
..350 % if the Seq is =< GroupSeq, reply
..351 reply_with_group(Group=#group{current_seq=GroupSeq}, [{Pid, Seq}|WaitList],
..352         StillWaiting, RefCounter) when Seq =< GroupSeq ->
..353     gen_server:reply(Pid, {ok, Group, RefCounter}),
..354     reply_with_group(Group, WaitList, StillWaiting, RefCounter);
..355 
..356 % else
..357 % put it in the continuing waiting list
..358 reply_with_group(Group, [{Pid, Seq}|WaitList], StillWaiting, RefCounter) ->
..359     reply_with_group(Group, WaitList, [{Pid, Seq}|StillWaiting], RefCounter);
..360 
..361 % return the still waiting list
..362 reply_with_group(_Group, [], StillWaiting, _RefCounter) ->
..363     StillWaiting.
..364 
..365 reply_all(#group_state{waiting_list=WaitList}=State, Reply) ->
..366     [catch gen_server:reply(Pid, Reply) || {Pid, _} <- WaitList],
..367     State#group_state{waiting_list=[]}.
..368 
..369 prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)->
..370     case couch_db:open(DbName, []) of
..371     {ok, Db} ->
..372         case open_index_file(RootDir, DbName, Sig) of
..373         {ok, Fd} ->
..374             if ForceReset ->
..375                 % this can happen if we missed a purge
..376                 {ok, reset_file(Db, Fd, DbName, Group)};
..377             true ->
..378                 % 09 UPGRADE CODE
..379                 ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>),
..380                 case (catch couch_file:read_header(Fd)) of
..381                 {ok, {Sig, HeaderInfo}} ->
..382                     % sigs match!
..383                     {ok, init_group(Db, Fd, Group, HeaderInfo)};
..384                 _ ->
..385                     % this happens on a new file
..386                     {ok, reset_file(Db, Fd, DbName, Group)}
..387                 end
..388             end;
..389         Error ->
..390             catch delete_index_file(RootDir, DbName, Sig),
..391             Error
..392         end;
..393     Else ->
..394         Else
..395     end.
..396 
..397 get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq,
..398             id_btree=IdBtree,views=Views}) ->
..399     ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views],
..400     #index_header{seq=Seq,
..401             purge_seq=PurgeSeq,
..402             id_btree_state=couch_btree:get_state(IdBtree),
..403             view_states=ViewStates}.
..404 
..405 hex_sig(GroupSig) ->
..406     couch_util:to_hex(?b2l(GroupSig)).
..407 
..408 design_root(RootDir, DbName) ->
..409     RootDir ++ "/." ++ ?b2l(DbName) ++ "_design/".
..410 
..411 index_file_name(RootDir, DbName, GroupSig) ->
..412     design_root(RootDir, DbName) ++ hex_sig(GroupSig) ++".view".
..413 
..414 index_file_name(compact, RootDir, DbName, GroupSig) ->
..415     design_root(RootDir, DbName) ++ hex_sig(GroupSig) ++".compact.view".
..416 
..417 
..418 open_index_file(RootDir, DbName, GroupSig) ->
..419     FileName = index_file_name(RootDir, DbName, GroupSig),
..420     case couch_file:open(FileName) of
..421     {ok, Fd}        -> {ok, Fd};
..422     {error, enoent} -> couch_file:open(FileName, [create]);
..423     Error           -> Error
..424     end.
..425 
..426 open_index_file(compact, RootDir, DbName, GroupSig) ->
..427     FileName = index_file_name(compact, RootDir, DbName, GroupSig),
..428     case couch_file:open(FileName) of
..429     {ok, Fd}        -> {ok, Fd};
..430     {error, enoent} -> couch_file:open(FileName, [create]);
..431     Error           -> Error
..432     end.
..433 
..434 open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc) ->
..435     case couch_db:open(DbName, []) of
..436     {ok, Db} ->
..437         View = #view{map_names=[<<"_temp">>],
..438             id_num=0,
..439             btree=nil,
..440             def=MapSrc,
..441             reduce_funs= if RedSrc==[] -> []; true -> [{<<"_temp">>, RedSrc}] end},
..442 
..443         {ok, Db, #group{
..444             name = <<"_temp">>,
..445             db=Db,
..446             views=[View],
..447             def_lang=Language,
..448             design_options=DesignOptions,
..449             sig = erlang:md5(term_to_binary({[View], Language, DesignOptions}))
..450         }};
..451     Error ->
..452         Error
..453     end.
..454 
..455 open_db_group(DbName, GroupId) ->
..456     case couch_db:open(DbName, []) of
..457     {ok, Db} ->
..458         case couch_db:open_doc(Db, GroupId) of
..459         {ok, Doc} ->
..460             {ok, Db, design_doc_to_view_group(Doc)};
..461         Else ->
..462             couch_db:close(Db),
..463             Else
..464         end;
..465     Else ->
..466         Else
..467     end.
..468 
..469 get_group_info(#group{
..470         fd = Fd,
..471         sig = GroupSig,
..472         def_lang = Lang
..473     }, CompactorPid) ->
..474     {ok, Size} = couch_file:bytes(Fd),
..475     [
..476         {signature, ?l2b(hex_sig(GroupSig))},
..477         {language, Lang},
..478         {disk_size, Size},
..479         {compact_running, CompactorPid /= nil}
..480     ].
..481 
..482 % maybe move to another module
..483 design_doc_to_view_group(#doc{id=Id,body={Fields}}) ->
..484     Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>),
..485     {DesignOptions} = proplists:get_value(<<"options">>, Fields, {[]}),
..486     {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}),
..487     % sort the views by name to avoid spurious signature changes
..488     SortedRawViews = lists:sort(fun({Name1, _}, {Name2, _}) ->
..489             Name1 >= Name2
..490         end, RawViews),
..491     % add the views to a dictionary object, with the map source as the key
..492     DictBySrc =
..493     lists:foldl(
..494         fun({Name, {MRFuns}}, DictBySrcAcc) ->
..495             MapSrc = proplists:get_value(<<"map">>, MRFuns),
..496             RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null),
..497             View =
..498             case dict:find(MapSrc, DictBySrcAcc) of
..499                 {ok, View0} -> View0;
..500                 error -> #view{def=MapSrc} % create new view object
..501             end,
..502             View2 =
..503             if RedSrc == null ->
..504                 View#view{map_names=[Name|View#view.map_names]};
..505             true ->
..506                 View#view{reduce_funs=[{Name,RedSrc}|View#view.reduce_funs]}
..507             end,
..508             dict:store(MapSrc, View2, DictBySrcAcc)
..509         end, dict:new(), SortedRawViews),
..510     % number the views
..511     {Views, _N} = lists:mapfoldl(
..512         fun({_Src, View}, N) ->
..513             {View#view{id_num=N},N+1}
..514         end, 0, dict:to_list(DictBySrc)),
..515 
..516     Group = #group{name=Id, views=Views, def_lang=Language, design_options=DesignOptions},
..517     Group#group{sig=erlang:md5(term_to_binary({Views, Language, DesignOptions}))}.
..518 
..519 reset_group(#group{views=Views}=Group) ->
..520     Views2 = [View#view{btree=nil} || View <- Views],
..521     Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,
..522             id_btree=nil,views=Views2}.
..523 
..524 reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
..525     ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
..526     ok = couch_file:truncate(Fd, 0),
..527     ok = couch_file:write_header(Fd, {Sig, nil}),
..528     init_group(Db, Fd, reset_group(Group), nil).
..529 
..530 delete_index_file(RootDir, DbName, GroupSig) ->
..531     file:delete(index_file_name(RootDir, DbName, GroupSig)).
..532 
..533 init_group(Db, Fd, #group{views=Views}=Group, nil) ->
..534     init_group(Db, Fd, Group,
..535         #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),
..536             id_btree_state=nil, view_states=[nil || _ <- Views]});
..537 init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) ->
..538      #index_header{seq=Seq, purge_seq=PurgeSeq,
..539             id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,
..540     {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd),
..541     Views2 = lists:zipwith(
..542         fun(BtreeState, #view{reduce_funs=RedFuns}=View) ->
..543             FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns],
..544             ReduceFun =
..545                 fun(reduce, KVs) ->
..546                     KVs2 = couch_view:expand_dups(KVs,[]),
..547                     KVs3 = couch_view:detuple_kvs(KVs2,[]),
..548                     {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs,
..549                         KVs3),
..550                     {length(KVs3), Reduced};
..551                 (rereduce, Reds) ->
..552                     Count = lists:sum([Count0 || {Count0, _} <- Reds]),
..553                     UserReds = [UserRedsList || {_, UserRedsList} <- Reds],
..554                     {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs,
..555                         UserReds),
..556                     {Count, Reduced}
..557                 end,
..558             {ok, Btree} = couch_btree:open(BtreeState, Fd,
..559                         [{less, fun couch_view:less_json_keys/2},
..560                             {reduce, ReduceFun}]),
..561             View#view{btree=Btree}
..562         end,
..563         ViewStates, Views),
..564     Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
..565         id_btree=IdBtree, views=Views2}.
..566 
..567 

Generated using etap 0.3.4.