Generated on 2009-08-09 21:22:55 with etap 0.3.4.
| Name | Total lines | Lines of code | Total coverage | Code coverage | ||||
| couch_query_servers | ?? | ?? | ?? |
|
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License.  You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_query_servers).
-behaviour(gen_server).

-export([start_link/0]).

-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]).
-export([start_doc_map/2, map_docs/2, stop_doc_map/1]).
-export([reduce/3, rereduce/3,validate_doc_update/5]).
-export([render_doc_show/6, start_view_list/2,
        render_list_head/4, render_list_row/4, render_list_tail/1]).
-export([start_filter/2, filter_doc/4, end_filter/1]).
% -export([test/0]).

-include("couch_db.hrl").

% Starts the query server gen_server, registered locally as
% couch_query_servers.
start_link() ->
    gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []).

% Sends an exit signal with reason `close` to the registered server process.
stop() ->
    exit(whereis(couch_query_servers), close).

% Checks out an OS process for Lang and registers every map function source
% in Functions with it through the "add_fun" command. Each prompt must
% answer `true`, otherwise the call crashes with badmatch.
% Returns {ok, {Lang, Pid}}; hand that tuple to map_docs/2 and finally to
% stop_doc_map/1 to give the process back.
start_doc_map(Lang, Functions) ->
    Pid = get_os_process(Lang),
    lists:foreach(fun(FunctionSource) ->
        true = couch_os_process:prompt(Pid, [<<"add_fun">>, FunctionSource])
    end, Functions),
    {ok, {Lang, Pid}}.

% Runs the previously registered map functions over every document in Docs.
% Returns {ok, Results} with one entry per document, in document order; each
% entry holds one list of {Key, Value} tuples per registered map function.
map_docs({_Lang, Pid}, Docs) ->
    % send the documents
    Results = lists:map(
        fun(Doc) ->
            Json = couch_doc:to_json_obj(Doc, []),

            FunsResults = couch_os_process:prompt(Pid, [<<"map_doc">>, Json]),
            % the results are a json array of function map yields like this:
            % [FunResults1, FunResults2 ...]
            % where each FunResults is a json array of key value pairs:
            % [[Key1, Value1], [Key2, Value2]]
            % Convert the key, value pairs to tuples like
            % [{Key1, Value1}, {Key2, Value2}]
            lists:map(
                fun(FunRs) ->
                    [list_to_tuple(FunResult) || FunResult <- FunRs]
                end,
            FunsResults)
        end,
        Docs),
    {ok, Results}.


% Returns the process obtained from start_doc_map/2 to the pool.
% `nil` (no map session was started) is accepted and ignored.
stop_doc_map(nil) ->
    ok;
stop_doc_map({Lang, Pid}) ->
    ok = ret_os_process(Lang, Pid).

% Transposes a list of rows (each row a list with one value per reduce
% function) into a list of groups, one group per reduce function.
% Heads and tails are accumulated by prepending, so the order of the values
% inside a group is not the input order for every group.
% Crashes with function_clause when a row is shorter than the first one
% processed.
group_reductions_results([]) ->
    [];
group_reductions_results(List) ->
    {Heads, Tails} = lists:foldl(
        fun([H|T], {HAcc,TAcc}) ->
            {[H|HAcc], [T|TAcc]}
        end, {[], []}, List),
    case Tails of
    [[]|_] -> % no tails left
        [Heads];
    _ ->
        [Heads | group_reductions_results(Tails)]
    end.

% Re-reduces previously reduced values, one result per source in RedSrcs.
% Sources whose name starts with "_" are handled by builtin_reduce/4; all
% others are sent to the OS process with the "rereduce" command.
% Returns {ok, Results} in the order of RedSrcs.
rereduce(_Lang, [], _ReducedValues) ->
    {ok, []};
rereduce(Lang, RedSrcs, ReducedValues) ->
    Pid = get_os_process(Lang),
    Results = try
        % Grouping is done inside the try so that a failure on malformed
        % ReducedValues still returns the checked-out process to the pool.
        Grouped = group_reductions_results(ReducedValues),
        lists:zipwith(
        fun
        (<<"_", _/binary>> = FunSrc, Values) ->
            % builtin_reduce/4 expects [Key, Value] rows; the key is unused
            % for rereduce, so an empty list stands in for it.
            {ok, [Result]} = builtin_reduce(rereduce, [FunSrc], [[[], V] || V <- Values], []),
            Result;
        (FunSrc, Values) ->
            [true, [Result]] =
                couch_os_process:prompt(Pid, [<<"rereduce">>, [FunSrc], Values]),
            Result
        end, RedSrcs, Grouped)
    after
        ok = ret_os_process(Lang, Pid)
    end,
    {ok, Results}.

% Reduces the key/value rows KVs with every source in RedSrcs.
% Builtin sources (name starting with "_") are evaluated in Erlang, the rest
% in the OS process; the two result lists are then merged back into the
% original order of RedSrcs. Returns {ok, Results}.
reduce(_Lang, [], _KVs) ->
    {ok, []};
reduce(Lang, RedSrcs, KVs) ->
    {OsRedSrcs, BuiltinReds} = lists:partition(fun
        (<<"_", _/binary>>) -> false;
        (_OsFun) -> true
    end, RedSrcs),
    {ok, OsResults} = os_reduce(Lang, OsRedSrcs, KVs),
    {ok, BuiltinResults} = builtin_reduce(reduce, BuiltinReds, KVs, []),
    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, []).

% Merges the OS-process results and the builtin results back into a single
% list that follows the order of the reduce sources. The merged list is
% appended to the reversed accumulator and returned as {ok, Results}.
recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, Acc) ->
    Merged = interleave_results(RedSrcs, OsResults, BuiltinResults),
    {ok, lists:reverse(Acc, Merged)}.

% Walks the sources and picks the next builtin result for a "_"-prefixed
% source (while builtin results remain) and the next OS result otherwise.
interleave_results([], [], []) ->
    [];
interleave_results([<<"_", _/binary>> | Srcs], OsRs, [Builtin | BuiltinRs]) ->
    [Builtin | interleave_results(Srcs, OsRs, BuiltinRs)];
interleave_results([_OsFun | Srcs], [Os | OsRs], BuiltinRs) ->
    [Os | interleave_results(Srcs, OsRs, BuiltinRs)].

% Runs the non-builtin reduce sources in an OS process for Lang.
% Returns {ok, Reductions}. The process is handed back to the pool whether
% or not the prompt succeeds.
os_reduce(_Lang, [], _KVs) ->
    {ok, []};
os_reduce(Lang, OsRedSrcs, KVs) ->
    Pid = get_os_process(Lang),
    Command = [<<"reduce">>, OsRedSrcs, KVs],
    Reductions =
        try couch_os_process:prompt(Pid, Command) of
            [true, Values] -> Values
        after
            ok = ret_os_process(Lang, Pid)
        end,
    {ok, Reductions}.

% Evaluates the builtin reduce functions named in BuiltinReds over KVs.
% Re is `reduce` or `rereduce`. Results are returned in the order of
% BuiltinReds, after the reversed contents of Acc, as {ok, Results}.
builtin_reduce(Re, BuiltinReds, KVs, Acc) ->
    Results = [apply_builtin(Re, Name, KVs) || Name <- BuiltinReds],
    {ok, lists:reverse(Acc, Results)}.

% One builtin applied to the rows:
%   _sum   - sum of the row values (reduce and rereduce alike)
%   _count - number of rows on reduce, sum of partial counts on rereduce
apply_builtin(_Re, <<"_sum">>, KVs) ->
    builtin_sum_rows(KVs);
apply_builtin(reduce, <<"_count">>, KVs) ->
    length(KVs);
apply_builtin(rereduce, <<"_count">>, KVs) ->
    builtin_sum_rows(KVs).

% Adds up the values of [Key, Value] rows, starting from 0.
% Throws {invalid_value, Message} for any row that is not a two-element
% list with a numeric value.
builtin_sum_rows(KVs) ->
    lists:foldl(fun add_row_value/2, 0, KVs).

add_row_value([_Key, Value], Total) when is_number(Value) ->
    Total + Value;
add_row_value(_Else, _Total) ->
    throw({invalid_value, <<"builtin _sum function requires map values to be numbers">>}).

% Runs the validation function FunSrc in an OS process for Lang against the
% edited document and the document currently on disk (`nil` when there is
% none, sent as JSON null).
% Returns ok when the process answers 1. Throws {forbidden, Message} or
% {unauthorized, Message} when the process answers with the corresponding
% single-member object. Any other answer raises a try_clause error.
validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) ->
    Pid = get_os_process(Lang),
    try
        % The JSON conversion lives inside the try so that a failure here
        % still returns the checked-out process to the pool.
        JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]),
        JsonDiskDoc =
        if DiskDoc == nil ->
            null;
        true ->
            couch_doc:to_json_obj(DiskDoc, [revs])
        end,
        couch_os_process:prompt(Pid,
            [<<"validate">>, FunSrc, JsonEditDoc, JsonDiskDoc, Ctx])
    of
    1 ->
        ok;
    {[{<<"forbidden">>, Message}]} ->
        throw({forbidden, Message});
    {[{<<"unauthorized">>, Message}]} ->
        throw({unauthorized, Message})
    after
        ok = ret_os_process(Lang, Pid)
    end.

% Prepends a "docId" member to the member list of a JSON request object.
append_docid(DocId, JsonReqIn) ->
    [{<<"docId">>, DocId} | JsonReqIn].

% Runs the show function ShowSrc in an OS process for Lang and returns
% whatever the process answers.
% The request object gets a "docId" member unless both DocId and Doc are
% `nil`; the document is sent as JSON null when Doc is `nil`.
render_doc_show(Lang, ShowSrc, DocId, Doc, Req, Db) ->
    Pid = get_os_process(Lang),
    try
        % Request/document preparation is inside the try so that a failure
        % here still returns the checked-out process to the pool.
        {JsonReqIn} = couch_httpd_external:json_req_obj(Req, Db),

        {JsonReq, JsonDoc} = case {DocId, Doc} of
            {nil, nil} -> {{JsonReqIn}, null};
            {DocId, nil} -> {{append_docid(DocId, JsonReqIn)}, null};
            _ -> {{append_docid(DocId, JsonReqIn)}, couch_doc:to_json_obj(Doc, [revs])}
        end,
        couch_os_process:prompt(Pid,
            [<<"show">>, ShowSrc, JsonDoc, JsonReq])
    after
        ok = ret_os_process(Lang, Pid)
    end.

% Checks out an OS process for Lang and registers the list function source
% with it through "add_fun" (the prompt must answer `true`).
% Returns {ok, {Lang, Pid}}; render_list_tail/1 gives the process back.
start_view_list(Lang, ListSrc) ->
    Pid = get_os_process(Lang),
    true = couch_os_process:prompt(Pid, [<<"add_fun">>, ListSrc]),
    {ok, {Lang, Pid}}.

% Sends the "list" command with the view head and the JSON request object
% to the list process and returns its answer.
render_list_head({_Lang, Pid}, Req, Db, Head) ->
    JsonReq = couch_httpd_external:json_req_obj(Req, Db),
    couch_os_process:prompt(Pid, [<<"list">>, Head, JsonReq]).

% Sends one view row to the list process with "list_row" and returns the
% answer. Rows keyed by {Key, DocId} are converted with
% couch_httpd_view:view_row_obj/3; plain {Key, Value} rows are sent as an
% object with `key` and `value` members.
render_list_row({_Lang, Pid}, Db, {{Key, DocId}, Value}, IncludeDoc) ->
    JsonRow = couch_httpd_view:view_row_obj(Db, {{Key, DocId}, Value}, IncludeDoc),
    couch_os_process:prompt(Pid, [<<"list_row">>, JsonRow]);

render_list_row({_Lang, Pid}, _, {Key, Value}, _IncludeDoc) ->
    JsonRow = {[{key, Key}, {value, Value}]},
    couch_os_process:prompt(Pid, [<<"list_row">>, JsonRow]).

% Finishes a list rendering: sends "list_end", returns the process to the
% pool and yields the answer of the process.
render_list_tail({Lang, Pid}) ->
    JsonResp = couch_os_process:prompt(Pid, [<<"list_end">>]),
    ok = ret_os_process(Lang, Pid),
    JsonResp.

% Checks out an OS process for Lang and registers the filter function source
% with it through "add_fun" (the prompt must answer `true`).
% Returns {ok, {Lang, Pid}}; end_filter/1 gives the process back.
start_filter(Lang, FilterSrc) ->
    Pid = get_os_process(Lang),
    true = couch_os_process:prompt(Pid, [<<"add_fun">>, FilterSrc]),
    {ok, {Lang, Pid}}.

% Runs the registered filter function on a single document.
% Returns {ok, Pass} where Pass is the single element the process reports
% for the document.
filter_doc({_Lang, Pid}, Doc, Req, Db) ->
    JsonReq = couch_httpd_external:json_req_obj(Req, Db),
    JsonDoc = couch_doc:to_json_obj(Doc, [revs]),
    JsonCtx = couch_util:json_user_ctx(Db),
    [true, [Pass]] = couch_os_process:prompt(Pid,
        [<<"filter">>, [JsonDoc], JsonReq, JsonCtx]),
    {ok, Pass}.

% Returns the process obtained from start_filter/2 to the pool.
end_filter({Lang, Pid}) ->
    ok = ret_os_process(Lang, Pid).


% gen_server callback. The state is a tuple of four private ets tables:
%   Langs    - language name (binary) -> command from the config
%   PidLangs - pid -> language, consulted when a linked process exits
%   Pids     - language -> list of idle pids
%   InUse    - language -> list of checked-out pids
init([]) ->

    % read config and register for configuration changes

    % just stop if one of the config settings change. couch_server_sup
    % will restart us and then we will pick up the new settings.

    ok = couch_config:register(
        fun("query_servers" ++ _, _) ->
            ?MODULE:stop()
        end),

    Langs = ets:new(couch_query_server_langs, [set, private]),
    PidLangs = ets:new(couch_query_server_pid_langs, [set, private]),
    Pids = ets:new(couch_query_server_procs, [set, private]),
    InUse = ets:new(couch_query_server_used, [set, private]),
    lists:foreach(fun({Lang, Command}) ->
        true = ets:insert(Langs, {?l2b(Lang), Command})
    end, couch_config:get("query_servers")),
    % exits of linked processes arrive as messages in handle_info/2
    process_flag(trap_exit, true),
    {ok, {Langs, PidLangs, Pids, InUse}}.

terminate(_Reason, _Server) ->
    ok.


% {get_proc, Lang}: hands out an idle process for Lang when one is pooled
% ({recycled, Pid, QueryServerConfig}), otherwise starts a new one
% ({new, Pid}). When no process can be started the reply is whatever
% new_process/2 returned or threw.
% {ret_proc, Lang, Pid}: moves Pid from the in-use list back to the idle
% list and replies `true`.
handle_call({get_proc, Lang}, _From, {Langs, PidLangs, Pids, InUse}=Server) ->
    % Note to future self. Add max process limit.
    case ets:lookup(Pids, Lang) of
    [{Lang, [Pid|_]}] ->
        add_value(PidLangs, Pid, Lang),
        rem_from_list(Pids, Lang, Pid),
        add_to_list(InUse, Lang, Pid),
        {reply, {recycled, Pid, get_query_server_config()}, Server};
    _ ->
        case (catch new_process(Langs, Lang)) of
        {ok, Pid} ->
            % Record the new pid as well: handle_info/2 stops the whole
            % server when an exiting pid is missing from PidLangs, so a
            % freshly started process that died before its first reuse
            % used to take the server down with it.
            add_value(PidLangs, Pid, Lang),
            add_to_list(InUse, Lang, Pid),
            {reply, {new, Pid}, Server};
        Error ->
            {reply, Error, Server}
        end
    end;
handle_call({ret_proc, Lang, Pid}, _From, {_, _, Pids, InUse}=Server) ->
    % Along with max process limit, here we should check
    % if we're over the limit and discard when we are.
    add_to_list(Pids, Lang, Pid),
    rem_from_list(InUse, Lang, Pid),
    {reply, true, Server}.

% Casts are not part of the protocol; they are ignored.
handle_cast(_Whatever, Server) ->
    {noreply, Server}.

% Exit of a linked process. A pid known from PidLangs is removed from all
% bookkeeping tables and the server keeps running; an unknown pid stops the
% server with the exit reason of that process.
handle_info({'EXIT', Pid, Status}, {_, PidLangs, Pids, InUse}=Server) ->
    case ets:lookup(PidLangs, Pid) of
        [] ->
            ?LOG_DEBUG("Unknown linked process died: ~p (reason: ~p)", [Pid, Status]),
            {stop, Status, Server};
        [{Pid, Lang}] ->
            log_linked_exit(Pid, Status),
            rem_value(PidLangs, Pid),
            catch rem_from_list(Pids, Lang, Pid),
            catch rem_from_list(InUse, Lang, Pid),
            {noreply, Server}
    end.

% Logs every exit reason except `normal`.
log_linked_exit(_Pid, normal) ->
    ok;
log_linked_exit(Pid, Status) ->
    ?LOG_DEBUG("Linked process died abnormally: ~p (reason: ~p)", [Pid, Status]).

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

% Private API

% Builds the configuration object sent along with "reset": a single
% "reduce_limit" member whose value is the config string turned into an
% atom (default "true").
get_query_server_config() ->
    Setting = couch_config:get("query_server_config", "reduce_limit", "true"),
    {[{<<"reduce_limit">>, list_to_atom(Setting)}]}.

% Starts an OS process with the command configured for Lang.
% Returns the result of couch_os_process:start_link/1, or
% {unknown_query_language, Lang} when Lang has no entry in Langs.
new_process(Langs, Lang) ->
    case ets:lookup(Langs, Lang) of
        [{Lang, Command}] -> couch_os_process:start_link(Command);
        _ -> {unknown_query_language, Lang}
    end.

% Checks out an OS process for Lang from the server.
% A recycled process is first sent "reset"; when that does not answer
% `true` the process is stopped and another checkout is attempted.
% Any other server reply is thrown.
get_os_process(Lang) ->
    case gen_server:call(couch_query_servers, {get_proc, Lang}) of
        {new, Pid} ->
            claim_os_process(Pid);
        {recycled, Pid, QueryConfig} ->
            Reset = (catch couch_os_process:prompt(Pid, [<<"reset">>, QueryConfig])),
            case Reset of
                true ->
                    claim_os_process(Pid);
                _ ->
                    catch couch_os_process:stop(Pid),
                    get_os_process(Lang)
            end;
        Error ->
            throw(Error)
    end.

% Applies the configured "os_process_timeout" (default "5000") to Pid,
% links the calling process to it and returns Pid.
claim_os_process(Pid) ->
    Timeout = list_to_integer(
        couch_config:get("couchdb", "os_process_timeout", "5000")),
    couch_os_process:set_timeout(Pid, Timeout),
    link(Pid),
    Pid.

% Gives Pid back to the server pool and removes the caller's link to it.
ret_os_process(Lang, Pid) ->
    Request = {ret_proc, Lang, Pid},
    true = gen_server:call(couch_query_servers, Request),
    catch unlink(Pid),
    ok.

% Stores Key -> Value in the table, replacing any previous value.
add_value(Tid, Key, Value) ->
    Entry = {Key, Value},
    true = ets:insert(Tid, Entry).

% Deletes the entry stored under Key, if any.
rem_value(Tid, Key) ->
    true = ets:delete(Tid, Key).

% Prepends Value to the list stored under Key, creating the entry when the
% key is not present yet.
add_to_list(Tid, Key, Value) ->
    Existing =
        case ets:lookup(Tid, Key) of
            [{Key, Vals}] -> Vals;
            [] -> []
        end,
    true = ets:insert(Tid, {Key, [Value | Existing]}).

% Removes every occurrence of Value from the list stored under Key.
% Returns `ok` when the key is absent, otherwise the result of the insert.
rem_from_list(Tid, Key, Value) ->
    case ets:lookup(Tid, Key) of
        [] ->
            ok;
        [{Key, Vals}] ->
            Remaining = lists:filter(fun(Val) -> Val /= Value end, Vals),
            ets:insert(Tid, {Key, Remaining})
    end.
Generated using etap 0.3.4.