C0 code coverage information

Generated on 2009-08-09 21:22:55 with etap 0.3.4.

Name Total lines Lines of code Total coverage Code coverage
couch_query_servers ?? ?? ??
6% 
....1 % Licensed under the Apache License, Version 2.0 (the "License"); you may not
....2 % use this file except in compliance with the License. You may obtain a copy of
....3 % the License at
....4 %
....5 %   http://www.apache.org/licenses/LICENSE-2.0
....6 %
....7 % Unless required by applicable law or agreed to in writing, software
....8 % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
....9 % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
...10 % License for the specific language governing permissions and limitations under
...11 % the License.
...12 
...13 -module(couch_query_servers).
...14 -behaviour(gen_server).
...15 
...16 -export([start_link/0]).
...17 
...18 -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]).
...19 -export([start_doc_map/2, map_docs/2, stop_doc_map/1]).
...20 -export([reduce/3, rereduce/3,validate_doc_update/5]).
...21 -export([render_doc_show/6, start_view_list/2,
...22         render_list_head/4, render_list_row/4, render_list_tail/1]).
...23 -export([start_filter/2, filter_doc/4, end_filter/1]).
...24 % -export([test/0]).
...25 
...26 -include("couch_db.hrl").
...27 
...28 start_link() ->
...29     gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []).
...30 
...31 stop() ->
...32     exit(whereis(couch_query_servers), close).
...33 
...34 start_doc_map(Lang, Functions) ->
...35     Pid = get_os_process(Lang),
...36     lists:foreach(fun(FunctionSource) ->
...37         true = couch_os_process:prompt(Pid, [<<"add_fun">>, FunctionSource])
...38     end, Functions),
...39     {ok, {Lang, Pid}}.
...40 
...41 map_docs({_Lang, Pid}, Docs) ->
...42     % send the documents
...43     Results = lists:map(
...44         fun(Doc) ->
...45             Json = couch_doc:to_json_obj(Doc, []),
...46 
...47             FunsResults = couch_os_process:prompt(Pid, [<<"map_doc">>, Json]),
...48             % the results are a json array of function map yields like this:
...49             % [FunResults1, FunResults2 ...]
...50             % where funresults is are json arrays of key value pairs:
...51             % [[Key1, Value1], [Key2, Value2]]
...52             % Convert the key, value pairs to tuples like
...53             % [{Key1, Value1}, {Key2, Value2}]
...54             lists:map(
...55                 fun(FunRs) ->
...56                     [list_to_tuple(FunResult) || FunResult <- FunRs]
...57                 end,
...58             FunsResults)
...59         end,
...60         Docs),
...61     {ok, Results}.
...62 
...63 
...64 stop_doc_map(nil) ->
...65     ok;
...66 stop_doc_map({Lang, Pid}) ->
...67     ok = ret_os_process(Lang, Pid).
...68 
...69 group_reductions_results([]) ->
...70     [];
...71 group_reductions_results(List) ->
...72     {Heads, Tails} = lists:foldl(
...73         fun([H|T], {HAcc,TAcc}) ->
...74             {[H|HAcc], [T|TAcc]}
...75         end, {[], []}, List),
...76     case Tails of
...77     [[]|_] -> % no tails left
...78         [Heads];
...79     _ ->
...80      [Heads | group_reductions_results(Tails)]
...81     end.
...82 
...83 rereduce(_Lang, [], _ReducedValues) ->
...84     {ok, []};
...85 rereduce(Lang, RedSrcs, ReducedValues) ->
...86     Pid = get_os_process(Lang),
...87     Grouped = group_reductions_results(ReducedValues),
...88     Results = try lists:zipwith(
...89         fun
...90         (<<"_", _/binary>> = FunSrc, Values) ->
...91             {ok, [Result]} = builtin_reduce(rereduce, [FunSrc], [[[], V] || V <- Values], []),
...92             Result;
...93         (FunSrc, Values) ->
...94             [true, [Result]] =
...95                 couch_os_process:prompt(Pid, [<<"rereduce">>, [FunSrc], Values]),
...96             Result
...97         end, RedSrcs, Grouped)
...98     after
...99         ok = ret_os_process(Lang, Pid)
..100     end,
..101     {ok, Results}.
..102 
..103 reduce(_Lang, [], _KVs) ->
..104     {ok, []};
..105 reduce(Lang, RedSrcs, KVs) ->
..106     {OsRedSrcs, BuiltinReds} = lists:partition(fun
..107         (<<"_", _/binary>>) -> false;
..108         (_OsFun) -> true
..109     end, RedSrcs),
..110     {ok, OsResults} = os_reduce(Lang, OsRedSrcs, KVs),
..111     {ok, BuiltinResults} = builtin_reduce(reduce, BuiltinReds, KVs, []),
..112     recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, []).
..113 
..114 recombine_reduce_results([], [], [], Acc) ->
..115     {ok, lists:reverse(Acc)};
..116 recombine_reduce_results([<<"_", _/binary>>|RedSrcs], OsResults, [BRes|BuiltinResults], Acc) ->
..117     recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, [BRes|Acc]);
..118 recombine_reduce_results([_OsFun|RedSrcs], [OsR|OsResults], BuiltinResults, Acc) ->
..119     recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, [OsR|Acc]).
..120 
..121 os_reduce(_Lang, [], _KVs) ->
..122     {ok, []};
..123 os_reduce(Lang, OsRedSrcs, KVs) ->
..124     Pid = get_os_process(Lang),
..125     OsResults = try couch_os_process:prompt(Pid,
..126             [<<"reduce">>, OsRedSrcs, KVs]) of
..127         [true, Reductions] -> Reductions
..128     after
..129         ok = ret_os_process(Lang, Pid)
..130     end,
..131     {ok, OsResults}.
..132 
..133 builtin_reduce(_Re, [], _KVs, Acc) ->
..134     {ok, lists:reverse(Acc)};
..135 builtin_reduce(Re, [<<"_sum">>|BuiltinReds], KVs, Acc) ->
..136     Sum = builtin_sum_rows(KVs),
..137     builtin_reduce(Re, BuiltinReds, KVs, [Sum|Acc]);
..138 builtin_reduce(reduce, [<<"_count">>|BuiltinReds], KVs, Acc) ->
..139     Count = length(KVs),
..140     builtin_reduce(reduce, BuiltinReds, KVs, [Count|Acc]);
..141 builtin_reduce(rereduce, [<<"_count">>|BuiltinReds], KVs, Acc) ->
..142     Count = builtin_sum_rows(KVs),
..143     builtin_reduce(rereduce, BuiltinReds, KVs, [Count|Acc]).
..144 
..145 builtin_sum_rows(KVs) ->
..146     lists:foldl(fun
..147         ([_Key, Value], Acc) when is_number(Value) ->
..148             Acc + Value;
..149         (_Else, _Acc) ->
..150             throw({invalid_value, <<"builtin _sum function requires map values to be numbers">>})
..151     end, 0, KVs).
..152 
..153 validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) ->
..154     Pid = get_os_process(Lang),
..155     JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]),
..156     JsonDiskDoc =
..157     if DiskDoc == nil ->
..158         null;
..159     true ->
..160         couch_doc:to_json_obj(DiskDoc, [revs])
..161     end,
..162     try couch_os_process:prompt(Pid,
..163             [<<"validate">>, FunSrc, JsonEditDoc, JsonDiskDoc, Ctx]) of
..164     1 ->
..165         ok;
..166     {[{<<"forbidden">>, Message}]} ->
..167         throw({forbidden, Message});
..168     {[{<<"unauthorized">>, Message}]} ->
..169         throw({unauthorized, Message})
..170     after
..171         ok = ret_os_process(Lang, Pid)
..172     end.
..173 append_docid(DocId, JsonReqIn) ->
..174     [{<<"docId">>, DocId} | JsonReqIn].
..175 
..176 render_doc_show(Lang, ShowSrc, DocId, Doc, Req, Db) ->
..177     Pid = get_os_process(Lang),
..178     {JsonReqIn} = couch_httpd_external:json_req_obj(Req, Db),
..179 
..180     {JsonReq, JsonDoc} = case {DocId, Doc} of
..181         {nil, nil} -> {{JsonReqIn}, null};
..182         {DocId, nil} -> {{append_docid(DocId, JsonReqIn)}, null};
..183         _ -> {{append_docid(DocId, JsonReqIn)}, couch_doc:to_json_obj(Doc, [revs])}
..184     end,
..185     try couch_os_process:prompt(Pid,
..186         [<<"show">>, ShowSrc, JsonDoc, JsonReq]) of
..187     FormResp ->
..188         FormResp
..189     after
..190         ok = ret_os_process(Lang, Pid)
..191     end.
..192 
..193 start_view_list(Lang, ListSrc) ->
..194     Pid = get_os_process(Lang),
..195     true = couch_os_process:prompt(Pid, [<<"add_fun">>, ListSrc]),
..196     {ok, {Lang, Pid}}.
..197 
..198 render_list_head({_Lang, Pid}, Req, Db, Head) ->
..199     JsonReq = couch_httpd_external:json_req_obj(Req, Db),
..200     couch_os_process:prompt(Pid, [<<"list">>, Head, JsonReq]).
..201 
..202 render_list_row({_Lang, Pid}, Db, {{Key, DocId}, Value}, IncludeDoc) ->
..203     JsonRow = couch_httpd_view:view_row_obj(Db, {{Key, DocId}, Value}, IncludeDoc),
..204     couch_os_process:prompt(Pid, [<<"list_row">>, JsonRow]);
..205 
..206 render_list_row({_Lang, Pid}, _, {Key, Value}, _IncludeDoc) ->
..207     JsonRow = {[{key, Key}, {value, Value}]},
..208     couch_os_process:prompt(Pid, [<<"list_row">>, JsonRow]).
..209 
..210 render_list_tail({Lang, Pid}) ->
..211     JsonResp = couch_os_process:prompt(Pid, [<<"list_end">>]),
..212     ok = ret_os_process(Lang, Pid),
..213     JsonResp.
..214 
..215 start_filter(Lang, FilterSrc) ->
..216     Pid = get_os_process(Lang),
..217     true = couch_os_process:prompt(Pid, [<<"add_fun">>, FilterSrc]),
..218     {ok, {Lang, Pid}}.
..219 
..220 filter_doc({_Lang, Pid}, Doc, Req, Db) ->
..221     JsonReq = couch_httpd_external:json_req_obj(Req, Db),
..222     JsonDoc = couch_doc:to_json_obj(Doc, [revs]),
..223     JsonCtx = couch_util:json_user_ctx(Db),
..224     [true, [Pass]] = couch_os_process:prompt(Pid,
..225         [<<"filter">>, [JsonDoc], JsonReq, JsonCtx]),
..226     {ok, Pass}.
..227 
..228 end_filter({Lang, Pid}) ->
..229     ok = ret_os_process(Lang, Pid).
..230     
..231 
..232 init([]) ->
..233 
..234     % read config and register for configuration changes
..235 
..236     % just stop if one of the config settings change. couch_server_sup
..237     % will restart us and then we will pick up the new settings.
..238 
..239     ok = couch_config:register(
..240         fun("query_servers" ++ _, _) ->
..241             ?MODULE:stop()
..242         end),
..243 
..244     Langs = ets:new(couch_query_server_langs, [set, private]),
..245     PidLangs = ets:new(couch_query_server_pid_langs, [set, private]),
..246     Pids = ets:new(couch_query_server_procs, [set, private]),
..247     InUse = ets:new(couch_query_server_used, [set, private]),
..248     lists:foreach(fun({Lang, Command}) ->
..249         true = ets:insert(Langs, {?l2b(Lang), Command})
..250     end, couch_config:get("query_servers")),
..251     process_flag(trap_exit, true),
..252     {ok, {Langs, PidLangs, Pids, InUse}}.
..253 
..254 terminate(_Reason, _Server) ->
..255     ok.
..256 
..257 
..258 handle_call({get_proc, Lang}, _From, {Langs, PidLangs, Pids, InUse}=Server) ->
..259     % Note to future self. Add max process limit.
..260     case ets:lookup(Pids, Lang) of
..261     [{Lang, [Pid|_]}] ->
..262         add_value(PidLangs, Pid, Lang),
..263         rem_from_list(Pids, Lang, Pid),
..264         add_to_list(InUse, Lang, Pid),
..265         {reply, {recycled, Pid, get_query_server_config()}, Server};
..266     _ ->
..267         case (catch new_process(Langs, Lang)) of
..268         {ok, Pid} ->
..269             add_to_list(InUse, Lang, Pid),
..270             {reply, {new, Pid}, Server};
..271         Error ->
..272             {reply, Error, Server}
..273         end
..274     end;
..275 handle_call({ret_proc, Lang, Pid}, _From, {_, _, Pids, InUse}=Server) ->
..276     % Along with max process limit, here we should check
..277     % if we're over the limit and discard when we are.
..278     add_to_list(Pids, Lang, Pid),
..279     rem_from_list(InUse, Lang, Pid),
..280     {reply, true, Server}.
..281 
..282 handle_cast(_Whatever, Server) ->
..283     {noreply, Server}.
..284 
..285 handle_info({'EXIT', Pid, Status}, {_, PidLangs, Pids, InUse}=Server) ->
..286     case ets:lookup(PidLangs, Pid) of
..287     [{Pid, Lang}] ->
..288         case Status of
..289         normal -> ok;
..290         _ -> ?LOG_DEBUG("Linked process died abnormally: ~p (reason: ~p)", [Pid, Status])
..291         end,
..292         rem_value(PidLangs, Pid),
..293         catch rem_from_list(Pids, Lang, Pid),
..294         catch rem_from_list(InUse, Lang, Pid),
..295         {noreply, Server};
..296     [] ->
..297         ?LOG_DEBUG("Unknown linked process died: ~p (reason: ~p)", [Pid, Status]),
..298         {stop, Status, Server}
..299     end.
..300 
..301 code_change(_OldVsn, State, _Extra) ->
..302     {ok, State}.
..303 
..304 % Private API
..305 
..306 get_query_server_config() ->
..307     ReduceLimit = list_to_atom(
..308         couch_config:get("query_server_config","reduce_limit","true")),
..309     {[{<<"reduce_limit">>, ReduceLimit}]}.
..310 
..311 new_process(Langs, Lang) ->
..312     case ets:lookup(Langs, Lang) of
..313     [{Lang, Command}] ->
..314         couch_os_process:start_link(Command);
..315     _ ->
..316         {unknown_query_language, Lang}
..317     end.
..318 
..319 get_os_process(Lang) ->
..320     case gen_server:call(couch_query_servers, {get_proc, Lang}) of
..321     {new, Pid} ->
..322         couch_os_process:set_timeout(Pid, list_to_integer(couch_config:get(
..323                 "couchdb", "os_process_timeout", "5000"))),
..324         link(Pid),
..325         Pid;
..326     {recycled, Pid, QueryConfig} ->
..327         case (catch couch_os_process:prompt(Pid, [<<"reset">>, QueryConfig])) of
..328         true ->
..329             couch_os_process:set_timeout(Pid, list_to_integer(couch_config:get(
..330                 "couchdb", "os_process_timeout", "5000"))),
..331             link(Pid),
..332             Pid;
..333         _ ->
..334             catch couch_os_process:stop(Pid),
..335             get_os_process(Lang)
..336         end;
..337     Error ->
..338         throw(Error)
..339     end.
..340 
..341 ret_os_process(Lang, Pid) ->
..342     true = gen_server:call(couch_query_servers, {ret_proc, Lang, Pid}),
..343     catch unlink(Pid),
..344     ok.
..345 
..346 add_value(Tid, Key, Value) ->
..347     true = ets:insert(Tid, {Key, Value}).
..348 
..349 rem_value(Tid, Key) ->
..350     true = ets:delete(Tid, Key).
..351 
..352 add_to_list(Tid, Key, Value) ->
..353     case ets:lookup(Tid, Key) of
..354     [{Key, Vals}] ->
..355         true = ets:insert(Tid, {Key, [Value|Vals]});
..356     [] ->
..357         true = ets:insert(Tid, {Key, [Value]})
..358     end.
..359 
..360 rem_from_list(Tid, Key, Value) ->
..361     case ets:lookup(Tid, Key) of
..362     [{Key, Vals}] ->
..363         ets:insert(Tid, {Key, [Val || Val <- Vals, Val /= Value]});
..364     [] -> ok
..365     end.

Generated using etap 0.3.4.