jobs - a Job scheduler for load regulation

Copyright (c) 2014 Ulf Wiger

Version: 0.1

JOBS

Jobs is a job scheduler for load regulation of Erlang applications. It provides a queueing framework where each queue can be configured for throughput rate, credit pool and feedback compensation. Queues can be added and modified at runtime, and customizable "samplers" propagate load status across all nodes in the system.

Specifically, jobs provides three features:

Examples

The following examples are fetched from the EUC 2013 presentation on Jobs.

Regulate incoming HTTP requests (e.g. JSON-RPC)


%% @doc Handle a JSON-RPC request.
handler_session(Arg) ->
    jobs:run(
        rpc_from_web,
        fun() ->
               try
                  yaws_rpc:handler_session(
                    maybe_multipart(Arg),{?MODULE, web_rpc})
               catch
                   error:E ->
                       ...
               end
    end).

From Riak prototype, using explicit ask/done


case jobs:ask(riak_kv_fsm) of
  {ok, JobId} ->
    try
      {ok, Pid} = riak_kv_get_fsm_sup:start_get_fsm(...),
      Timeout = recv_timeout(Options),
      wait_for_reqid(ReqId, Timeout)
    after
      jobs:done(JobId)  %% Only needed if process stays alive
    end;
  {error, rejected} ->  %% Overload!
    {error, timeout}
end

Shell demo - simple rate-limited queue


2> jobs:add_queue(q, [{standard_rate,1}]).
ok
3> jobs:run(q, fun() -> io:fwrite("job: ~p~n", [time()]) end).
job: {14,37,7}
ok
4> jobs:run(q, fun() -> io:fwrite("job: ~p~n", [time()]) end).
job: {14,37,8}
ok
...
5> jobs:run(q, fun() -> io:fwrite("job: ~p~n", [time()]) end).
job: {14,37,10}
ok
6> jobs:run(q, fun() -> io:fwrite("job: ~p~n", [time()]) end).
job: {14,37,11}
ok

Shell demo - "stateful" queues


Eshell V5.9.2 (abort with ^G)
1> application:start(jobs).
ok
2> jobs:add_queue(q,
     [{standard_rate,1},
      {stateful,fun(init,_) -> {0,5};
                   ({call,{size,Sz},_,_},_) -> {reply,ok,{0,Sz}};
                   ({N,Sz},_) -> {N, {(N+1) rem Sz,Sz}}
                end}]).
ok
3> jobs:run(q,fun(Opaque) -> jobs:job_info(Opaque) end).
0
4> jobs:run(q,fun(Opaque) -> jobs:job_info(Opaque) end).
1
5> jobs:run(q,fun(Opaque) -> jobs:job_info(Opaque) end).
2
6> jobs:run(q,fun(Opaque) -> jobs:job_info(Opaque) end).
3
7> jobs:run(q,fun(Opaque) -> jobs:job_info(Opaque) end).
4
8> jobs:run(q,fun(Opaque) -> jobs:job_info(Opaque) end).
0
9> jobs:run(q,fun(Opaque) -> jobs:job_info(Opaque) end).
1
%% Resize the 'pool'
10> jobs:ask_queue(q, {size,3}).
ok
11> jobs:run(q,fun(Opaque) -> jobs:job_info(Opaque) end).
0
12> jobs:run(q,fun(Opaque) -> jobs:job_info(Opaque) end).
1
13> jobs:run(q,fun(Opaque) -> jobs:job_info(Opaque) end).
2
14> jobs:run(q,fun(Opaque) -> jobs:job_info(Opaque) end).
0
...

Demo - producers


Eshell V5.9.2 (abort with ^G)
1> application:start(jobs).
ok
2> jobs:add_queue(p,
  [{producer, fun() -> io:fwrite("job: ~p~n",[time()]) end},
   {standard_rate,1}]).
job: {14,33,51}
ok
3> job: {14,33,52}
job: {14,33,53}
job: {14,33,54}
job: {14,33,55}
...

Demo - passive queues


2> jobs:add_queue(q,[passive]).
ok
3> Fun = fun() -> io:fwrite("~p starting...~n",[self()]),
3>                Res = jobs:dequeue(q, 3),
3>                io:fwrite("Res = ~p~n", [Res])
3>       end.
#Fun<erl_eval.20.82930912>
4> jobs:add_queue(p, [{standard_counter,3},{producer,Fun}]).
<0.47.0> starting...
<0.48.0> starting...
<0.49.0> starting...
ok
5> jobs:enqueue(q, job1).
Res = [{113214444910647,job1}]
ok
<0.54.0> starting...

Demo - queue status


(a@uwair)1> jobs:queue_info(q).
{queue,[{name,q},
        {mod,jobs_queue},
        {type,fifo},
        {group,undefined},
        {regulators,[{rr,[{name,{rate,q,1}},
                          {rate,{rate,[{limit,1},
                          {preset_limit,1},
                          {interval,1.0e3},
                          {modifiers,
                           [{cpu,10},{memory,10}]},
                          {active_modifiers,[]}
                         ]}}]}]},
        {max_time,undefined},
        {max_size,undefined},
        {latest_dispatch,113216378663298},
        {approved,4},
        {queued,0},
        ...,
        {stateful,undefined},
        {st,{st,45079}}]}

##Scenarios and Corresponding Configuration Examples

####EXAMPLE 1:

Configuration:


{ jobs, [
    { queues, [
        { heavy_crunches, [ { regulators, [{ counter, [{ limit, 7 }] } ] }] }
      ]
    }
  ]
}

Anywhere in your code wrap cpu-intensive work in a call to jobs server and-- voilĂ ! --it is counter-regulated:


jobs:run( heavy_crunches,fun()->my_cpu_intensive_calculation() end)

####EXAMPLE 2:

Configuration:


{ jobs, [
      { queues, [
            { http_requests, [ { max_size, 10000},  {regulators, [{ rate, [{limit, 1000}]}]}]}
        ]
      }
  ]
}

Wrap your request entry point in a call to jobs server and it will end up being rate-regulated.


jobs:run(http_requests,fun()->handle_http_request() end)

NOTE: with the config above, once 10,000 requests accumulates in the queue any incoming requests are dropped on the floor.

####EXAMPLE 3:


{ patient_user_requests, [
    { max_time, 10000},
    { regulators, [{rate, [ { limit, 1000 } ] }
  ]
}

{ impatient_user_requests, [
    { max_time, 200},
    { type, lifo},
    { regulators, [{rate, [ { limit, 1000 } ] }
  ]
}

NOTE: In order to pace requests from both queues at 1000 per second, use group_rate regulation (EXAMPLE 4)

####EXAMPLE 4:

Create group_rates regulator called http_request_rate and assign it to both impatient_user_requests and patient_user_requests


{ jobs, [
    { group_rates,[{ http_request_rate, [{limit,1000}] }] },
    { queues, [
        { impatient_user_requests,
            [ {max_time, 200},
              {type, lifo},
              {regulators,[{ group_rate, http_request_rate}]}
            ]
        },
        { patient_user_requests,
            [ {max_time, 10000},
              {regulators,[{ group_rate, http_request_rate}
            ]
        }
      ]
    }
  ]
}

####EXAMPLE 5:


 -module(my_persistent_queue).
 -behaviour(jobs_queue).
 -export([  new/2,
            delete/1,
            in/3,
            out/2,
            peek/1,
            info/2,
            all/1]).

 ## implementation
 ...

Configuration:


{ jobs, [
    { queues, [
        { http_requests, [
            { mod, my_persistent_queue},
            { max_size, 10000 },
            { regulators, [ { rate, [ { limit, 1000 } ] } ] }
          ]
        }
      ]
    }
  ]
}

###The use of sampler framework

  1. Get a sampler running and sending feedback to the jobs server.
  2. Apply its feedback to a regulator limit.

####EXAMPLE 6:


{ jobs, [
    { samplers, [{ cpu_feedback, jobs_sampler_cpu, [] } ] },
    { queues, [
        { http_requests, [
            { regulators,   [ { rate, [ { limit,1000 } ]  },
            { modifiers,    [ { cpu_feedback,  10} ] } %% 10 = % increment by which to modify the limit
          ]
        }
      ]
    }
  ]
}

Prerequisites

This application requires 'exprecs'. The 'exprecs' module is part of http://github.com/uwiger/parse_trans

Contribute

For issues, comments or feedback please create an issue!

Modules

jobs
jobs_app
jobs_info
jobs_lib
jobs_prod_simple
jobs_queue
jobs_queue_list
jobs_sampler
jobs_sampler_cpu
jobs_sampler_history
jobs_sampler_mnesia
jobs_server
jobs_stateful_simple