First, each batch is represented by an ActorRunnable object that, on execution, provides a tuple of a KillSwitch and a Future[Done] which completes when the execution completes. The KillSwitch allows terminating the execution if needed. The criteria that determine whether an execution is to be stopped are represented by an ExecutionExpectation instance, which can contain multiple failedWhenMetExpectations, e.g. that a certain rate or number of failed data sample processings or a timeout is exceeded.
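The concrete expectation classes live in Kolibri's own execution package; purely to illustrate the "failed when met" idea, a minimal, hypothetical sketch could look like the following (names and signatures here are assumptions for illustration, not the actual Kolibri API):

```scala
import scala.concurrent.duration._

// Hypothetical, simplified model of the "failed when met" idea; the real
// ExecutionExpectation in Kolibri offers a richer API, this only shows the concept.
sealed trait StopCriterion {
  def met(processedCount: Int, failedCount: Int, elapsed: FiniteDuration): Boolean
}

// met if the fraction of failed data samples exceeds a threshold
case class MaxFailRate(maxRate: Double) extends StopCriterion {
  def met(processedCount: Int, failedCount: Int, elapsed: FiniteDuration): Boolean =
    processedCount > 0 && failedCount.toDouble / processedCount > maxRate
}

// met if the overall execution time exceeds a limit
case class MaxDuration(limit: FiniteDuration) extends StopCriterion {
  def met(processedCount: Int, failedCount: Int, elapsed: FiniteDuration): Boolean =
    elapsed > limit
}

// the execution counts as failed as soon as any of the criteria is met
case class SimpleExecutionExpectation(failedWhenMetExpectations: Seq[StopCriterion]) {
  def failed(processedCount: Int, failedCount: Int, elapsed: FiniteDuration): Boolean =
    failedWhenMetExpectations.exists(_.met(processedCount, failedCount, elapsed))
}
```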
The RunnableGraph is provided by an ActorRunnable instance, which is the representation of a single batch execution. An ActorRunnable has the following signature:
```scala
case class ActorRunnable[U, V, V1, Y <: WithCount](jobId: String,
                                                   batchNr: Int,
                                                   supplier: IndexedGenerator[U],
                                                   transformer: Flow[U, ProcessingMessage[V], NotUsed],
                                                   processingActorProps: Option[Props],
                                                   aggregatorConfig: AggregatorConfig[V1, Y],
                                                   expectationGenerator: Int => ExecutionExpectation,
                                                   sinkType: job.ActorRunnableSinkType.Value,
                                                   waitTimePerElement: FiniteDuration,
                                                   maxExecutionDuration: FiniteDuration,
                                                   sendResultsBack: Boolean) extends KolibriSerializable with WithBatchNr
```
The arguments are as follows:
Name: Type | What for? |
---|---|
jobId: String | identifier of the job of which the ActorRunnable represents a batch |
batchNr: Int | number of the batch of the job identified by jobId |
supplier: IndexedGenerator[U] | generator providing the single elements to process, each of which is of type U (covariant) |
transformer: Flow[U, ProcessingMessage[V], NotUsed] | a processing flow from the initial element of type U (covariant) to ProcessingMessage[V] holding the data of type V (covariant). ProcessingMessage is essentially a data container that allows specifying a weight for the data sample and typed application of tags (e.g. for grouping and group-wise aggregations) |
processingActorProps: Option[Props] | if specified, the output of the transformer is sent via ask to an actor created from the provided Props, expecting back an element of type V1 (with the specified ask timeout; processing of a single data point counts as failed if the response type doesn't match V1). If not specified, the ProcessingMessage[V] coming from the transformer is not modified, thus type V = V1 |
aggregatorConfig: AggregatorConfig[V1, Y] | The AggregatorConfig provides an AggregationSupplier and distinct FilteringMapper instances: one specifying which single elements (type V1) are aggregated and how they are modified (if at all) before aggregating, another one deciding this for the partial aggregations (type Y), and another one acting as modifier / decider for how / whether to send a particular partial aggregation to the JobManager (e.g. for an overall job aggregation). Note that such an aggregation doesn't need to be composed like this; it is usually more effective to write partial results directly from the nodes where they are composed and later execute another aggregation on them, instead of sending all serialized partial results to the JobManager |
expectationGenerator: Int => ExecutionExpectation | Given the number of single data points in the batch, provides an ExecutionExpectation that determines the success/fail criteria for the respective batch |
sinkType: job.ActorRunnableSinkType.Value | Either 'IGNORE_SINK' or 'REPORT_TO_ACTOR_SINK'. If the former, results are ignored. If the latter, the ActorRef is picked from the JobActorConfig by key ActorType.ACTOR_SINK; this is the actor the results are sent to, usually an AggregatingActor. Sending happens either per element or in grouped fashion (if useResultElementGrouping=true) |
waitTimePerElement: FiniteDuration | Determines the timeout of the ask to the actor created from processingActorProps; it only applies if processingActorProps is not empty and only refers to the processing time of that specific ask |
maxExecutionDuration: FiniteDuration | Not used in the runnable itself, but utilized in creation of RunnableExecutionActor that effectively runs the RunnableGraph that is provided by the ActorRunnable |
sendResultsBack: Boolean | Not used in the runnable itself, but utilized in the creation of the AggregatingActor created by the RunnableExecutionActor. If set to false, no results will be provided back to the RunnableExecutionActor and via this actor to the jobSender (JobManagerActor) |
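To make the interplay of supplier, transformer, optional processing actor and sink more concrete, the following is a minimal, simplified Akka Streams sketch of the kind of graph an ActorRunnable assembles. The function name and the exact staging are assumptions for illustration, not Kolibri's actual implementation:

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorRef
import akka.pattern.ask
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration

// Simplified illustration (not the Kolibri implementation): build a graph from
// a supply of elements, a transforming Flow, an optional processing actor asked
// per element, and a sink actor receiving the results.
def illustrativeGraph[U, V](
    elements: List[U],
    transformer: Flow[U, V, NotUsed],
    processingActor: Option[ActorRef],
    sinkActor: ActorRef,
    waitTimePerElement: FiniteDuration
): RunnableGraph[(UniqueKillSwitch, Future[Done])] = {
  implicit val timeout: Timeout = Timeout(waitTimePerElement)
  val process: Flow[V, Any, NotUsed] = processingActor match {
    // if a processing actor is given, ask it per element and use its answer
    case Some(actor) => Flow[V].mapAsyncUnordered(1)(v => actor ? v)
    // otherwise pass the transformed element through unchanged (the V = V1 case)
    case None => Flow[V].map(identity)
  }
  Source(elements)
    .via(transformer)
    .via(process)
    .viaMat(KillSwitches.single)(Keep.right)                       // materialize the KillSwitch
    .toMat(Sink.foreach(result => sinkActor ! result))(Keep.both)  // and the Future[Done]
}
```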
Logic in the ActorRunnable:
The RunnableGraph is executed within a RunnableExecutionActor, which starts the execution and creates the AggregatingActor to handle the single results.
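As a rough, assumed sketch (not the actual RunnableExecutionActor code, and omitting the creation of the AggregatingActor, the ExecutionExpectation checks and the maxExecutionDuration handling), running such a graph and reacting to completion could look like this:

```scala
import akka.Done
import akka.actor.{Actor, Status}
import akka.pattern.pipe
import akka.stream.{Materializer, UniqueKillSwitch}
import akka.stream.scaladsl.RunnableGraph
import scala.concurrent.Future

// Assumed, simplified sketch of an actor running the materialized graph,
// keeping the KillSwitch to be able to abort and reacting to completion.
class IllustrativeRunnableExecutionActor(
    graph: RunnableGraph[(UniqueKillSwitch, Future[Done])]) extends Actor {
  import context.dispatcher
  private implicit val mat: Materializer = Materializer(context)

  // materializing yields the KillSwitch (to abort) and a Future completing on Done
  private val (killSwitch, doneFuture) = graph.run()
  // turn stream completion / failure into messages to this actor
  doneFuture.pipeTo(self)

  override def receive: Receive = {
    case Done =>
      context.parent ! Done // e.g. notify the job sender that the batch finished
      context.stop(self)
    case Status.Failure(e) =>
      killSwitch.abort(e)   // stop the stream and propagate the failure
      context.stop(self)
  }
}
```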
The job definition is received by the SupervisorActor, which creates a JobManagerActor per job and sends the job definition to it. Note that both SupervisorActor and JobManagerActor run on the same node, which is the node marked with the httpserver role. The JobManagerActor takes care of batch distribution, sending new batches to process as previous ones finish. The distribution is handled by a Distributor, and batches are sent via a router across all nodes that have the role compute (note that each node can have multiple roles, yet there should only be one node with the role httpserver).
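The routing across compute nodes can be pictured with a standard Akka cluster-aware router; the following is an assumed, minimal example of creating such a router restricted to nodes carrying the compute role (not necessarily the exact router type or settings Kolibri uses):

```scala
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.cluster.routing.{ClusterRouterPool, ClusterRouterPoolSettings}
import akka.routing.RoundRobinPool

// Assumed, illustrative creation of a cluster-aware router whose routees are
// deployed on nodes carrying the "compute" role; Kolibri's actual router type
// and settings may differ.
def computeNodeRouter(system: ActorSystem, workerProps: Props): ActorRef =
  system.actorOf(
    ClusterRouterPool(
      RoundRobinPool(nrOfInstances = 0),
      ClusterRouterPoolSettings(
        totalInstances = 100,     // upper bound of routees across the whole cluster
        maxInstancesPerNode = 2,  // routees created per compute node
        allowLocalRoutees = true, // the local node also gets routees if it carries the compute role
        useRoles = Set("compute") // restrict routee deployment to compute nodes
      )
    ).props(workerProps),
    name = "batchWorkerRouter"
  )
```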
The basic schema is as follows:
In Progress: More coming shortly