Generated on 2009-08-09 21:22:54 with etap 0.3.4.
| Name | Total lines | Lines of code | Total coverage | Code coverage | ||||
| couch_view_group | ?? | ?? | ?? |
|
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_view_group).
-behaviour(gen_server).

%% API
-export([start_link/1, request_group/2, request_group_info/1]).
-export([open_db_group/2, open_temp_group/5, design_doc_to_view_group/1,design_root/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-include("couch_db.hrl").

% Server state for one view group.
-record(group_state, {
    type,
    db_name,
    init_args,
    group,
    updater_pid=nil,
    compactor_pid=nil,
    waiting_commit=false,
    waiting_list=[],
    ref_counter=nil
    }).

% api methods

% Ask the group server Pid for a view group that is at least as new as Seq.
% Blocks without timeout until the server answers. On success the caller is
% added to the group's ref counter and {ok, Group} is returned; any other
% reply from the server is thrown.
request_group(Pid, Seq) ->
    ?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]),
    Reply = gen_server:call(Pid, {request_group, Seq}, infinity),
    case Reply of
        {ok, Group, RefCounter} ->
            couch_ref_counter:add(RefCounter),
            {ok, Group};
        Failure ->
            ?LOG_DEBUG("request_group Error ~p", [Failure]),
            throw(Failure)
    end.

% Fetch the info list of the view group served by Pid.
% Returns {ok, GroupInfoList}; any other reply from the server is thrown.
request_group_info(Pid) ->
    case gen_server:call(Pid, request_group_info) of
        {ok, _GroupInfoList} = Success ->
            Success;
        Failure ->
            throw(Failure)
    end.
% from template
% Start a group server for InitArgs. When init/1 declines to start (returns
% ignore) it first sends {Ref, Pid, Error} to this process; that Error is
% what gets returned to the caller instead of the bare 'ignore'.
start_link(InitArgs) ->
    Ref = make_ref(),
    StartArg = {InitArgs, self(), Ref},
    case gen_server:start_link(couch_view_group, StartArg, []) of
        {ok, _ServerPid} = Started ->
            Started;
        ignore ->
            receive
                {Ref, ServerPid, InitError} ->
                    % if we trap exits, also consume the 'EXIT' of the
                    % server that has just given up
                    case process_info(self(), trap_exit) of
                        {trap_exit, true} ->
                            receive {'EXIT', ServerPid, _} -> ok end;
                        {trap_exit, false} ->
                            ok
                    end,
                    InitError
            end;
        Other ->
            Other
    end.

% init prepares the group, monitors its database and spawns the first
% view updater. If the group cannot be prepared, the error is reported to
% ReturnPid (tagged with Ref) and the server is not started.
init({InitArgs, ReturnPid, Ref}) ->
    process_flag(trap_exit, true),
    case prepare_group(InitArgs, false) of
        {ok, #group{db = Db, fd = Fd} = Group} ->
            couch_db:monitor(Db),
            Self = self(),
            UpdaterPid = spawn_link(fun() ->
                couch_view_updater:update(Self, Group)
            end),
            {ok, RefCounter} = couch_ref_counter:start([Fd]),
            InitialState = #group_state{
                db_name = couch_db:name(Db),
                init_args = InitArgs,
                updater_pid = UpdaterPid,
                group = Group,
                ref_counter = RefCounter
            },
            {ok, InitialState};
        Failure ->
            ReturnPid ! {Ref, self(), Failure},
            ignore
    end.




% There are two sources of messages: couch_view, which requests an up to date
% view group, and the couch_view_updater, which when spawned, updates the
% group and sends it back here. We employ a caching mechanism, so that between
% database writes, we don't have to spawn a couch_view_updater with every view
% request.

% The caching mechanism: each request is submitted with a seq_id for the
% database at the time it was read. We guarantee to return a view from that
% sequence or newer.

% If the request sequence is higher than our current high_target seq, we set
% that as the highest sequence. If the updater is not running, we launch it.
% The request is newer than the cached group and no updater is running:
% reopen the database, launch an updater and park the caller on the
% waiting list until the group has caught up.
handle_call({request_group, RequestSeq}, From,
        #group_state{
            group = #group{current_seq = CachedSeq},
            updater_pid = nil
        } = State) when RequestSeq > CachedSeq ->
    #group_state{
        db_name = DbName,
        group = Group,
        waiting_list = Waiters
    } = State,
    {ok, Db} = couch_db:open(DbName, []),
    FreshGroup = Group#group{db = Db},
    Self = self(),
    UpdaterPid = spawn_link(fun() ->
        couch_view_updater:update(Self, FreshGroup)
    end),
    NewState = State#group_state{
        updater_pid = UpdaterPid,
        group = FreshGroup,
        waiting_list = [{From, RequestSeq} | Waiters]
    },
    {noreply, NewState, infinity};

% If the request sequence is less than or equal to the seq_id of a known
% Group, we respond with that Group.
handle_call({request_group, RequestSeq}, _From,
        #group_state{group = #group{current_seq = CachedSeq}} = State)
        when RequestSeq =< CachedSeq ->
    #group_state{group = Group, ref_counter = RefCounter} = State,
    {reply, {ok, Group, RefCounter}, State};

% Otherwise: TargetSeq => RequestSeq > GroupSeq
% We've already initiated the appropriate action, so just hold the response
% until the group is up to the RequestSeq
handle_call({request_group, RequestSeq}, From, State) ->
    #group_state{waiting_list = Waiters} = State,
    NewState = State#group_state{
        waiting_list = [{From, RequestSeq} | Waiters]
    },
    {noreply, NewState, infinity};

% Report the group info list, including whether a compaction is running.
handle_call(request_group_info, _From, State) ->
    #group_state{group = Group, compactor_pid = CompactorPid} = State,
    {reply, {ok, get_group_info(Group, CompactorPid)}, State}.
% handle_cast/2: compaction control and checkpoint notifications.

% {start_compact, CompactFun}: start compacting the view index. A fresh
% ".compact.view" file is opened and reset, and CompactFun(Group, NewGroup)
% runs in a linked process that is remembered as compactor_pid.
handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil}
        = State) ->
    #group_state{
        group = #group{name = GroupId, sig = GroupSig} = Group,
        init_args = {RootDir, DbName, _}
    } = State,
    ?LOG_INFO("View index compaction starting for ~s ~s", [DbName, GroupId]),
    {ok, Db} = couch_db:open(DbName, []),
    {ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig),
    NewGroup = reset_file(Db, Fd, DbName, Group),
    Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end),
    {noreply, State#group_state{compactor_pid = Pid}};
handle_cast({start_compact, _}, State) ->
    %% compact already running, this is a no-op
    {noreply, State};

% {compact_done, NewGroup} where the compacted group has caught up with the
% live one: swap the compacted file into place and switch over to NewGroup.
handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
        #group_state{group = #group{current_seq=OldSeq}} = State)
        when NewSeq >= OldSeq ->
    #group_state{
        group = #group{name=GroupId, fd=OldFd, sig=GroupSig} = Group,
        init_args = {RootDir, DbName, _},
        updater_pid = UpdaterPid,
        ref_counter = RefCounter
    } = State,

    ?LOG_INFO("View index compaction complete for ~s ~s", [DbName, GroupId]),
    FileName = index_file_name(RootDir, DbName, GroupSig),
    CompactName = index_file_name(compact, RootDir, DbName, GroupSig),
    % the result of the delete is deliberately not checked; the rename
    % below is the step that must succeed
    file:delete(FileName),
    ok = file:rename(CompactName, FileName),

    %% if an updater is running, kill it and start a new one
    NewUpdaterPid =
    if is_pid(UpdaterPid) ->
        % unlink first so the kill does not come back to us as an 'EXIT'
        unlink(UpdaterPid),
        exit(UpdaterPid, view_compaction_complete),
        Owner = self(),
        spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup) end);
    true ->
        nil
    end,

    %% cleanup old group
    unlink(OldFd),
    couch_ref_counter:drop(RefCounter),
    {ok, NewRefCounter} = couch_ref_counter:start([NewGroup#group.fd]),
    case Group#group.db of
        nil -> ok;
        Else -> couch_db:close(Else)
    end,

    % request a header commit for the new file right away
    self() ! delayed_commit,
    {noreply, State#group_state{
        group=NewGroup,
        ref_counter=NewRefCounter,
        compactor_pid=nil,
        updater_pid=NewUpdaterPid
    }};
% {compact_done, NewGroup} where the compacted group is still behind the
% live one: run a catch-up update on the compacted group in a helper
% process, which re-casts compact_done once the update finishes.
handle_cast({compact_done, NewGroup}, State) ->
    #group_state{
        group = #group{name = GroupId, current_seq = CurrentSeq},
        init_args={_RootDir, DbName, _}
    } = State,
    ?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++
        "compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]),
    couch_db:close(NewGroup#group.db),
    {ok, Db} = couch_db:open(DbName, []),
    Pid = spawn_link(fun() ->
        {_,Ref} = erlang:spawn_monitor(fun() ->
            couch_view_updater:update(nil, NewGroup#group{db = Db})
        end),
        receive
            {'DOWN', Ref, _, _, {new_group, NewGroup2}} ->
                #group{name=GroupId} = NewGroup2,
                Pid2 = couch_view:get_group_server(DbName, GroupId),
                gen_server:cast(Pid2, {compact_done, NewGroup2});
            {'DOWN', Ref, _, _, Reason} ->
                % The catch-up updater ended without delivering a group.
                % Exit with its reason instead of blocking in this receive
                % forever; this process is linked to the group server, which
                % traps exits and sees the reason in handle_info/2.
                exit(Reason)
        end
    end),
    {noreply, State#group_state{compactor_pid = Pid}};

% {partial_update, Pid, NewGroup} from the current updater: adopt the
% checkpointed group and make sure a delayed commit is scheduled.
handle_cast({partial_update, Pid, NewGroup}, #group_state{updater_pid=Pid}
        = State) ->
    #group_state{
        db_name = DbName,
        waiting_commit = WaitingCommit
    } = State,
    NewSeq = NewGroup#group.current_seq,
    ?LOG_INFO("checkpointing view update at seq ~p for ~s ~s", [NewSeq,
        DbName, NewGroup#group.name]),
    if not WaitingCommit ->
        erlang:send_after(1000, self(), delayed_commit);
    true -> ok
    end,
    {noreply, State#group_state{group=NewGroup, waiting_commit=true}};
handle_cast({partial_update, _, _}, State) ->
    %% message from an old (probably pre-compaction) updater; ignore
    {noreply, State}.
% delayed_commit: write the index header, but only once the database itself
% has committed at least up to the group's sequence.
handle_info(delayed_commit, State) ->
    #group_state{db_name = DbName, group = Group} = State,
    {ok, Db} = couch_db:open(DbName, []),
    CommittedSeq = couch_db:get_committed_update_seq(Db),
    couch_db:close(Db),
    case CommittedSeq >= Group#group.current_seq of
        true ->
            % save the header
            Header = {Group#group.sig, get_index_header_data(Group)},
            ok = couch_file:write_header(Group#group.fd, Header),
            {noreply, State#group_state{waiting_commit = false}};
        false ->
            % We can't commit the header because the database seq that's
            % fully committed to disk is still behind us. If we committed
            % now and the database lost those changes our view could be
            % forever out of sync with the database. But a crash before we
            % commit these changes, no big deal, we only lose incremental
            % changes since last committal.
            erlang:send_after(1000, self(), delayed_commit),
            {noreply, State#group_state{waiting_commit = true}}
    end;

% The current updater finished and hands over the updated group as its
% exit reason.
handle_info({'EXIT', UpdaterPid, {new_group, #group{db = Db} = Group}},
        #group_state{updater_pid = UpdaterPid} = State) ->
    #group_state{
        db_name = DbName,
        ref_counter = RefCounter,
        waiting_list = Waiters,
        waiting_commit = WaitingCommit
    } = State,
    ok = couch_db:close(Db),
    case WaitingCommit of
        false -> erlang:send_after(1000, self(), delayed_commit);
        _ -> ok
    end,
    case reply_with_group(Group, Waiters, [], RefCounter) of
        [] ->
            {noreply, State#group_state{
                waiting_commit = true,
                waiting_list = [],
                group = Group#group{db = nil},
                updater_pid = nil
            }};
        StillWaiting ->
            % we still have some waiters, reopen the database and reupdate
            % the index
            {ok, ReopenedDb} = couch_db:open(DbName, []),
            NextGroup = Group#group{db = ReopenedDb},
            Self = self(),
            NextUpdater = spawn_link(fun() ->
                couch_view_updater:update(Self, NextGroup)
            end),
            {noreply, State#group_state{
                waiting_commit = true,
                waiting_list = StillWaiting,
                group = NextGroup,
                updater_pid = NextUpdater
            }}
    end;
handle_info({'EXIT', _, {new_group, _}}, State) ->
    %% message from an old (probably pre-compaction) updater; ignore
    {noreply, State};

% The current updater asked for a reset: rebuild the group from scratch and
% start over, or stop the server if the group cannot be prepared.
handle_info({'EXIT', UpdaterPid, reset},
        #group_state{updater_pid = UpdaterPid} = State) ->
    #group_state{init_args = InitArgs, group = Group} = State,
    ok = couch_db:close(Group#group.db),
    case prepare_group(InitArgs, true) of
        {ok, ResetGroup} ->
            Self = self(),
            NewUpdater = spawn_link(fun() ->
                couch_view_updater:update(Self, ResetGroup)
            end),
            {noreply, State#group_state{
                updater_pid = NewUpdater,
                group = ResetGroup
            }};
        Failure ->
            {stop, normal, reply_all(State, Failure)}
    end;
handle_info({'EXIT', _, reset}, State) ->
    %% message from an old (probably pre-compaction) updater; ignore
    {noreply, State};

% A linked process finished normally; nothing to do.
handle_info({'EXIT', _FromPid, normal}, State) ->
    {noreply, State};

% A linked process died from an uncaught throw: stop with the thrown term.
handle_info({'EXIT', FromPid, {{nocatch, Reason}, _Trace}}, State) ->
    ?LOG_DEBUG("Uncaught throw() in linked pid: ~p", [{FromPid, Reason}]),
    {stop, Reason, State};

% Any other exit of a linked process stops the server with the same reason.
handle_info({'EXIT', FromPid, Reason}, State) ->
    ?LOG_DEBUG("Exit from linked pid: ~p", [{FromPid, Reason}]),
    {stop, Reason, State};

% The monitored database went down: tell every waiter and stop.
handle_info({'DOWN', _, _, _, _}, State) ->
    ?LOG_INFO("Shutting down view group server, monitored db is closing.", []),
    {stop, normal, reply_all(State, shutdown)}.


% Answer all pending callers with Reason and signal the updater and the
% compactor to exit. Either pid may be nil, in which case exit/2 fails and
% the failure is swallowed by catch.
terminate(Reason, State) ->
    #group_state{updater_pid = UpdaterPid, compactor_pid = CompactorPid} = State,
    reply_all(State, Reason),
    catch exit(UpdaterPid, Reason),
    catch exit(CompactorPid, Reason),
    ok.

% No state migration is needed between code versions.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%% Local Functions

% reply_with_group/4
% Walk the waiting list of {From, Seq} entries. Every waiter whose Seq is
% =< the group's current_seq gets {ok, Group, RefCounter} as its reply; the
% others are pushed onto StillWaiting, which is returned at the end.
reply_with_group(_Group, [], StillWaiting, _RefCounter) ->
    % nothing left to look at: return the still waiting list
    StillWaiting;
reply_with_group(#group{current_seq = GroupSeq} = Group,
        [{From, Seq} | Rest], StillWaiting, RefCounter)
        when Seq =< GroupSeq ->
    gen_server:reply(From, {ok, Group, RefCounter}),
    reply_with_group(Group, Rest, StillWaiting, RefCounter);
reply_with_group(Group, [{From, Seq} | Rest], StillWaiting, RefCounter) ->
    % not satisfied yet: keep it in the continuing waiting list
    reply_with_group(Group, Rest, [{From, Seq} | StillWaiting], RefCounter).

% Send Reply to every waiter (a failing reply is swallowed by catch) and
% return the state with an emptied waiting list.
reply_all(#group_state{waiting_list = Waiters} = State, Reply) ->
    [catch gen_server:reply(From, Reply) || {From, _} <- Waiters],
    State#group_state{waiting_list = []}.

% Open the database and the index file for a group and return
% {ok, InitialisedGroup}. If the index file cannot be opened, an attempt is
% made to delete it and the open error is returned; a database open error
% is returned unchanged.
prepare_group({RootDir, DbName, #group{sig = Sig} = Group}, ForceReset) ->
    case couch_db:open(DbName, []) of
        {ok, Db} ->
            case open_index_file(RootDir, DbName, Sig) of
                {ok, Fd} ->
                    {ok, prepare_group_index(ForceReset, Db, Fd, DbName, Group)};
                IndexError ->
                    catch delete_index_file(RootDir, DbName, Sig),
                    IndexError
            end;
        DbError ->
            DbError
    end.

% Build the group on top of an already opened index file: reset the file
% when forced, otherwise reuse it if the stored signature matches.
prepare_group_index(true, Db, Fd, DbName, Group) ->
    % this can happen if we missed a purge
    reset_file(Db, Fd, DbName, Group);
prepare_group_index(_NoReset, Db, Fd, DbName, #group{sig = Sig} = Group) ->
    % 09 UPGRADE CODE
    ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>),
    case (catch couch_file:read_header(Fd)) of
        {ok, {Sig, HeaderInfo}} ->
            % sigs match!
            init_group(Db, Fd, Group, HeaderInfo);
        _ ->
            % this happens on a new file
            reset_file(Db, Fd, DbName, Group)
    end.
% Collect what is persisted in the index header: the current and purge
% sequences plus the btree states of the id btree and of every view.
get_index_header_data(#group{} = Group) ->
    #group{
        current_seq = Seq,
        purge_seq = PurgeSeq,
        id_btree = IdBtree,
        views = Views
    } = Group,
    ViewStates = [couch_btree:get_state(Bt) || #view{btree = Bt} <- Views],
    IdBtreeState = couch_btree:get_state(IdBtree),
    #index_header{
        seq = Seq,
        purge_seq = PurgeSeq,
        id_btree_state = IdBtreeState,
        view_states = ViewStates
    }.

% Hex string form of a group signature.
hex_sig(GroupSig) ->
    SigChars = ?b2l(GroupSig),
    couch_util:to_hex(SigChars).

% Directory holding the view index files of database DbName.
design_root(RootDir, DbName) ->
    RootDir ++ "/." ++ ?b2l(DbName) ++ "_design/".

% File name of the live index of a group.
index_file_name(RootDir, DbName, GroupSig) ->
    Dir = design_root(RootDir, DbName),
    Dir ++ hex_sig(GroupSig) ++ ".view".

% File name of the compaction target of a group.
index_file_name(compact, RootDir, DbName, GroupSig) ->
    Dir = design_root(RootDir, DbName),
    Dir ++ hex_sig(GroupSig) ++ ".compact.view".


% Open the live index file of a group, creating it if it does not exist.
open_index_file(RootDir, DbName, GroupSig) ->
    open_or_create_index(index_file_name(RootDir, DbName, GroupSig)).

% Open the compaction target of a group, creating it if it does not exist.
open_index_file(compact, RootDir, DbName, GroupSig) ->
    open_or_create_index(index_file_name(compact, RootDir, DbName, GroupSig)).

% Open FileName; only when it is missing (enoent) retry with the create
% option. Every other result is handed back unchanged.
open_or_create_index(FileName) ->
    case couch_file:open(FileName) of
        {error, enoent} ->
            couch_file:open(FileName, [create]);
        Result ->
            Result
    end.

% Build the group for a temporary view: a single view named "_temp" with
% the given map source and, unless RedSrc is empty, one reduce function.
% Returns {ok, Db, Group}, or the error from opening the database.
open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc) ->
    case couch_db:open(DbName, []) of
        {ok, Db} ->
            ReduceFuns =
                case RedSrc == [] of
                    true -> [];
                    false -> [{<<"_temp">>, RedSrc}]
                end,
            View = #view{
                map_names = [<<"_temp">>],
                id_num = 0,
                btree = nil,
                def = MapSrc,
                reduce_funs = ReduceFuns
            },
            Sig = erlang:md5(term_to_binary({[View], Language, DesignOptions})),
            TempGroup = #group{
                name = <<"_temp">>,
                db = Db,
                views = [View],
                def_lang = Language,
                design_options = DesignOptions,
                sig = Sig
            },
            {ok, Db, TempGroup};
        Failure ->
            Failure
    end.
% Open database DbName and build the view group described by design
% document GroupId. Returns {ok, Db, Group}. If the document cannot be
% opened, the database is closed again and that result is returned; a
% database open error is returned unchanged.
open_db_group(DbName, GroupId) ->
    case couch_db:open(DbName, []) of
        {ok, Db} ->
            case couch_db:open_doc(Db, GroupId) of
                {ok, Doc} ->
                    {ok, Db, design_doc_to_view_group(Doc)};
                DocFailure ->
                    couch_db:close(Db),
                    DocFailure
            end;
        DbFailure ->
            DbFailure
    end.

% Info list of a group: signature, language, index file size and whether a
% compactor is currently registered.
get_group_info(#group{} = Group, CompactorPid) ->
    #group{fd = Fd, sig = GroupSig, def_lang = Lang} = Group,
    {ok, Size} = couch_file:bytes(Fd),
    [
        {signature, ?l2b(hex_sig(GroupSig))},
        {language, Lang},
        {disk_size, Size},
        {compact_running, CompactorPid /= nil}
    ].

% maybe move to another module
% Turn a design document into a #group{}: views sharing the same map source
% are merged into one #view{}, the views are numbered, and the signature is
% the md5 of the resulting views, language and design options.
design_doc_to_view_group(#doc{id = Id, body = {Fields}}) ->
    Language = proplists:get_value(<<"language">>, Fields, <<"javascript">>),
    {DesignOptions} = proplists:get_value(<<"options">>, Fields, {[]}),
    {RawViews} = proplists:get_value(<<"views">>, Fields, {[]}),
    % sort the views by name to avoid spurious signature changes
    ByName = fun({Name1, _}, {Name2, _}) -> Name1 >= Name2 end,
    SortedRawViews = lists:sort(ByName, RawViews),
    % add the views to a dictionary object, with the map source as the key
    ViewsBySrc = lists:foldl(fun add_raw_view/2, dict:new(), SortedRawViews),
    % number the views
    Number = fun({_Src, View}, N) -> {View#view{id_num = N}, N + 1} end,
    {Views, _Next} = lists:mapfoldl(Number, 0, dict:to_list(ViewsBySrc)),
    Sig = erlang:md5(term_to_binary({Views, Language, DesignOptions})),
    Unsigned = #group{
        name = Id,
        views = Views,
        def_lang = Language,
        design_options = DesignOptions
    },
    Unsigned#group{sig = Sig}.

% Fold step for design_doc_to_view_group/1: register one raw view under
% its map source. Without a reduce function the name goes to map_names,
% otherwise {Name, RedSrc} goes to reduce_funs.
add_raw_view({Name, {MRFuns}}, ViewsBySrc) ->
    MapSrc = proplists:get_value(<<"map">>, MRFuns),
    RedSrc = proplists:get_value(<<"reduce">>, MRFuns, null),
    View =
        case dict:find(MapSrc, ViewsBySrc) of
            {ok, Existing} -> Existing;
            error -> #view{def = MapSrc} % create new view object
        end,
    Updated =
        case RedSrc == null of
            true ->
                View#view{map_names = [Name | View#view.map_names]};
            false ->
                View#view{reduce_funs = [{Name, RedSrc} | View#view.reduce_funs]}
        end,
    dict:store(MapSrc, Updated, ViewsBySrc).

% Strip everything tied to an open index from the group: db, fd, query
% server, id btree and the per-view btrees; the sequence goes back to 0.
reset_group(#group{views = Views} = Group) ->
    ClearedViews = [View#view{btree = nil} || View <- Views],
    Group#group{
        db = nil,
        fd = nil,
        query_server = nil,
        current_seq = 0,
        id_btree = nil,
        views = ClearedViews
    }.

% Truncate the index file, write a header carrying only the signature and
% return a freshly initialised, empty group on top of it.
reset_file(Db, Fd, DbName, #group{sig = Sig, name = Name} = Group) ->
    ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
    ok = couch_file:truncate(Fd, 0),
    ok = couch_file:write_header(Fd, {Sig, nil}),
    EmptyGroup = reset_group(Group),
    init_group(Db, Fd, EmptyGroup, nil).

% Remove the live index file of a group; returns the file:delete/1 result.
delete_index_file(RootDir, DbName, GroupSig) ->
    FileName = index_file_name(RootDir, DbName, GroupSig),
    file:delete(FileName).

% Attach Db and Fd to the group and open its btrees from an index header.
% A nil header stands for a new index: sequence 0, the database's current
% purge sequence and nil btree states.
init_group(Db, Fd, #group{views = Views} = Group, nil) ->
    EmptyHeader = #index_header{
        seq = 0,
        purge_seq = couch_db:get_purge_seq(Db),
        id_btree_state = nil,
        view_states = [nil || _ <- Views]
    },
    init_group(Db, Fd, Group, EmptyHeader);
init_group(Db, Fd, #group{def_lang = Lang, views = Views} = Group, IndexHeader) ->
    #index_header{
        seq = Seq,
        purge_seq = PurgeSeq,
        id_btree_state = IdBtreeState,
        view_states = ViewStates
    } = IndexHeader,
    {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd),
    OpenView =
        fun(BtreeState, #view{reduce_funs = RedFuns} = View) ->
            FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns],
            BtreeOptions = [
                {less, fun couch_view:less_json_keys/2},
                {reduce, view_reduce_fun(Lang, FunSrcs)}
            ],
            {ok, Btree} = couch_btree:open(BtreeState, Fd, BtreeOptions),
            View#view{btree = Btree}
        end,
    OpenedViews = lists:zipwith(OpenView, ViewStates, Views),
    Group#group{
        db = Db,
        fd = Fd,
        current_seq = Seq,
        purge_seq = PurgeSeq,
        id_btree = IdBtree,
        views = OpenedViews
    }.

% Reduce callback for a view btree. Both modes return {Count, Reduced}:
% reduce counts the expanded key/values it was given, rereduce sums the
% counts of the partial reductions.
view_reduce_fun(Lang, FunSrcs) ->
    fun(reduce, KVs) ->
            Expanded = couch_view:expand_dups(KVs, []),
            Detupled = couch_view:detuple_kvs(Expanded, []),
            {ok, Reduced} = couch_query_servers:reduce(Lang, FunSrcs, Detupled),
            {length(Detupled), Reduced};
       (rereduce, Reds) ->
            Count = lists:sum([PartCount || {PartCount, _} <- Reds]),
            UserReds = [PartReds || {_, PartReds} <- Reds],
            {ok, Reduced} = couch_query_servers:rereduce(Lang, FunSrcs, UserReds),
            {Count, Reduced}
    end.
Generated using etap 0.3.4.