Behaviours: riak_pipe_vnode_worker.
Proof of concept for recursive input (fitting sending output
to itself). When this fitting receives an input, it passes
that input to its output, and also passes Input-1 to itself
as input until the input is 0. Thus, sending 3 as the input
to this fitting would result in the outputs 3, 2, 1, and 0.
That is:
    Spec = [#fitting_spec{name=counter,
                          module=riak_pipe_w_rec_countdown}],
    {ok, Pipe} = riak_pipe:exec(Spec, []),
    riak_pipe:queue_work(Pipe, 3),
    riak_pipe:eoi(Pipe),
    {eoi, Results, []} = riak_pipe:collect_results(Pipe).
    [{counter,0},{counter,1},{counter,2},{counter,3}] = Results.
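The recursion described above can be modelled without a running pipe. The following is a minimal sketch, not the riak_pipe implementation: the module name rec_countdown_model and its functions are hypothetical, the worker's input queue is approximated with the stdlib queue module, and the delays of the testeoi variant are not modelled.

```erlang
-module(rec_countdown_model).
-export([run/1, run/2]).

%% Hypothetical model of the countdown fitting; it does not call riak_pipe.
%% Returns the outputs in the order this model emits them.

%% Run the countdown for one initial input, without the testeoi argument.
run(Input) ->
    run(Input, undefined).

%% Arg stands in for the fitting argument (testeoi or anything else).
run(Input, Arg) when is_integer(Input), Input >= 0 ->
    loop(queue:from_list([Input]), Arg, []).

%% Take the next input, emit it as output, and requeue its successor(s).
loop(Queue, Arg, Outputs) ->
    case queue:out(Queue) of
        {empty, _} ->
            lists:reverse(Outputs);
        {{value, In}, Rest} ->
            loop(requeue(In, Arg, Rest), Arg, [In | Outputs])
    end.

%% An input of 0 ends the recursion.
requeue(0, _Arg, Queue) ->
    Queue;
%% With testeoi, the final recursive input (0) is sent three times.
requeue(1, testeoi, Queue) ->
    lists:foldl(fun queue:in/2, Queue, [0, 0, 0]);
%% Otherwise Input-1 goes back to the fitting as new input.
requeue(In, _Arg, Queue) ->
    queue:in(In - 1, Queue).
```

In this model, rec_countdown_model:run(3) returns [3,2,1,0], and rec_countdown_model:run(3, testeoi) returns [3,2,1,0,0,0].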
This fitting should work with any consistent-hash function. It requires no archiving for handoff.
If the argument is the atom testeoi, then the final
recursive input (0) will be sent three times, with no delay
before the second case and a 1-second delay before the third.
These two sends should test the behavior of vnode enqueueing
while attempting to force a worker to done. If all eoi
handling is done properly, then 0 should appear three times
in the result list. The testeoi case should go like this:
    Spec = [#fitting_spec{name=counter,
                          module=riak_pipe_w_rec_countdown,
                          arg=testeoi}],
    Options = [{trace,[restart]},{log,sink}],
    {ok, Pipe} = riak_pipe:exec(Spec, Options),
    riak_pipe:queue_work(Pipe, 3),
    riak_pipe:eoi(Pipe),
    {eoi, Results, Trace} = riak_pipe:collect_results(Pipe).
    [{counter,0},{counter,0},{counter,0},
     {counter,1},{counter,2},{counter,3}] = Results.
    [{counter,{trace,[restart],{vnode,{restart,_}}}}] = Trace.
If Results contains fewer than three instances of {counter,0},
then the test failed. If Trace is empty, the done/eoi race was
not triggered, and the test should be re-run.
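The two checks just described could be automated along the following lines. This is only a sketch: testeoi_check and verdict/2 are hypothetical names that are not part of riak_pipe, the fitting is assumed to be named counter as in the example above, and the sketch checks nothing beyond the two stated conditions.

```erlang
-module(testeoi_check).
-export([verdict/2]).

%% Hypothetical helper, not part of riak_pipe.
%% Results and Trace are the lists bound by the collect_results call
%% in the testeoi example; the fitting is assumed to be named counter.
verdict(Results, Trace) ->
    %% Count the {counter,0} entries in the result list.
    Zeros = length([ok || {counter, 0} <- Results]),
    if
        Zeros < 3    -> failed;  % an input was lost: the test failed
        Trace =:= [] -> rerun;   % race not triggered: run the test again
        true         -> ok
    end.
```

A return value of rerun means only that the run was inconclusive, not that eoi handling is correct.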
abstract datatype: state()
done/1 | Unused. |
init/2 | Initialization just stows the partition and fitting details in the module's state, for sending outputs in process/3. |
process/3 | Process sends Input directly to the next fitting, and also Input-1 back to this fitting as new input. |
done(State::state()) -> ok
Unused.
init(Partition::riak_pipe_vnode:partition(), FittingDetails::riak_pipe_fitting:details()) -> {ok, state()}
Initialization just stows the partition and fitting details in
the module's state, for sending outputs in process/3.
Process sends Input directly to the next fitting, and also
Input-1 back to this fitting as new input.
Generated by EDoc, Oct 20 2012, 17:51:48.