Basics

First, all batches are represented by an ActorRunnable object, which on execution provides a tuple of a KillSwitch and a Future[Done] that completes when the execution completes. The KillSwitch allows the execution to be killed when required. The criteria for stopping an execution are represented by an ExecutionExpectation instance, which can contain multiple failedWhenMetExpectations, e.g. exceeding a certain rate or number of failed data sample processings, or exceeding a timeout.
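
To make the idea of stop criteria more concrete, here is a minimal sketch in plain Scala. It is not Kolibri's ExecutionExpectation API: the names FailCriterion, MaxFailCount, MaxFailRate, Timeout and SimpleExpectation are hypothetical, and the rule "stop as soon as any single criterion is met" is an assumption made for illustration.

```scala
// Hypothetical, simplified model of stop criteria; none of these names are Kolibri's actual API.
sealed trait FailCriterion {
  def isMet(failed: Int, total: Int, elapsedMillis: Long): Boolean
}

// Met when more than maxFailed elements have failed.
final case class MaxFailCount(maxFailed: Int) extends FailCriterion {
  def isMet(failed: Int, total: Int, elapsedMillis: Long): Boolean =
    failed > maxFailed
}

// Met when the share of failed elements exceeds maxRate.
final case class MaxFailRate(maxRate: Double) extends FailCriterion {
  def isMet(failed: Int, total: Int, elapsedMillis: Long): Boolean =
    total > 0 && failed.toDouble / total > maxRate
}

// Met when the execution has been running longer than maxMillis.
final case class Timeout(maxMillis: Long) extends FailCriterion {
  def isMet(failed: Int, total: Int, elapsedMillis: Long): Boolean =
    elapsedMillis > maxMillis
}

final case class SimpleExpectation(failedWhenMet: Seq[FailCriterion]) {
  // Assumption: the execution is to be stopped as soon as any single criterion is met.
  def shouldStop(failed: Int, total: Int, elapsedMillis: Long): Boolean =
    failedWhenMet.exists(_.isMet(failed, total, elapsedMillis))
}
```

In such a model, a caller would periodically evaluate shouldStop with the current counts and elapsed time, and trigger the KillSwitch once it returns true.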

The RunnableGraph is provided by an ActorRunnable instance, which is the representation of each batch execution. An ActorRunnable has the following signature:

case class ActorRunnable[U, V, V1, Y <: WithCount](jobId: String,
                                                   batchNr: Int,
                                                   supplier: IndexedGenerator[U],
                                                   transformer: Flow[U, ProcessingMessage[V], NotUsed],
                                                   processingActorProps: Option[Props],
                                                   aggregatorConfig: AggregatorConfig[V1, Y],
                                                   expectationGenerator: Int => ExecutionExpectation,
                                                   sinkType: job.ActorRunnableSinkType.Value,
                                                   waitTimePerElement: FiniteDuration,
                                                   maxExecutionDuration: FiniteDuration,
                                                   sendResultsBack: Boolean) extends KolibriSerializable with WithBatchNr 

The arguments are as follows:

| Name: Type | What for? |
|---|---|
| jobId: String | Identifier of the job of which the ActorRunnable represents a batch. |
| batchNr: Int | Number of the batch within the job identified by jobId. |
| supplier: IndexedGenerator[U] | Generator providing the single elements to process, each of which is of type U (covariant). |
| transformer: Flow[U, ProcessingMessage[V], NotUsed] | A processing flow from the initial element of type U (covariant) to a ProcessingMessage[V] holding the data of type V (covariant). ProcessingMessage is essentially a data container that allows specifying a weight for the data sample and typed application of tags (e.g. for grouping and group-wise aggregations). |
| processingActorProps: Option[Props] | If specified, the output of the transformer is sent via ask to an actor created from the provided Props, and an element of type V1 is expected back. The ask uses the specified timeout, and the processing of a single data point counts as failed if the returned type does not match V1. If not specified, the ProcessingMessage[V] coming from the transformer is not modified, thus V = V1. |
| aggregatorConfig: AggregatorConfig[V1, Y] | The AggregatorConfig provides an AggregationSupplier and distinct FilteringMapper instances: one specifying which single elements (type V1) are aggregated and how they are modified (if at all) before aggregating, one deciding the same for the partial aggregations (type Y), and one deciding whether and how a particular partial aggregation is sent to the JobManager (e.g. for an overall job aggregation). Note that an aggregation does not need to be composed like this: it is usually more efficient to write results directly from the nodes where the partial results are composed and later execute another aggregation on those partial results, instead of serializing all partial results and sending them to the JobManager. |
| expectationGenerator: Int => ExecutionExpectation | Given the number of single data points in the batch, provides an ExecutionExpectation that determines the success/fail criteria for the respective batch. |
| sinkType: job.ActorRunnableSinkType.Value | Either ‘IGNORE_SINK’ or ‘REPORT_TO_ACTOR_SINK’. If the former, results are ignored. If the latter, the ActorRef is picked from JobActorConfig by the key ActorType.ACTOR_SINK; usually this should refer to an AggregatingActor. This is the actor the results are sent to, either per element or in grouped fashion (if useResultElementGrouping=true). |
| waitTimePerElement: FiniteDuration | Determines the timeout. It only applies if processingActorProps is not empty and only refers to the processing time of the ask to that specific actor. |
| maxExecutionDuration: FiniteDuration | Not used in the runnable itself, but in the creation of the RunnableExecutionActor that effectively runs the RunnableGraph provided by the ActorRunnable. |
| sendResultsBack: Boolean | Not used in the runnable itself, but in the creation of the AggregatingActor created by the RunnableExecutionActor. If set to false, no results are provided back to the RunnableExecutionActor and, via this actor, to the jobSender (JobManagerActor). |
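
The role of weights and tags in a group-wise aggregation can be illustrated with a small, Akka-free sketch. TaggedSample and GroupWiseAggregation are hypothetical stand-ins and not Kolibri's ProcessingMessage or aggregator types; the weighted mean is just one assumed example of an aggregation, and letting a sample with several tags contribute to every one of its groups is likewise an assumption.

```scala
// Hypothetical stand-in for a weighted, tagged data container.
// Kolibri's ProcessingMessage has its own API; this only mirrors the idea.
final case class TaggedSample(data: Double, weight: Double = 1.0, tags: Set[String] = Set.empty)

object GroupWiseAggregation {
  // Weighted mean per tag. Assumption: a sample carrying several tags
  // contributes to each of the corresponding groups.
  def weightedMeanByTag(samples: Seq[TaggedSample]): Map[String, Double] = {
    // Expand every sample into one (tag, sample) pair per tag it carries.
    val byTag: Seq[(String, TaggedSample)] =
      for (s <- samples; tag <- s.tags.toSeq) yield tag -> s

    byTag.groupBy(_._1).collect {
      // Groups whose weights sum to zero are skipped to avoid division by zero.
      case (tag, entries) if entries.map(_._2.weight).sum > 0 =>
        val weightSum = entries.map(_._2.weight).sum
        tag -> entries.map { case (_, s) => s.data * s.weight }.sum / weightSum
    }
  }
}
```

Calling GroupWiseAggregation.weightedMeanByTag on a sequence of samples yields one aggregated value per tag, which is the kind of partial result a batch could hand on for a later overall aggregation.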

[Figure: Logic in the ActorRunnable]

The RunnableGraph is executed within a RunnableExecutionActor, which starts the execution and creates the AggregatingActor to handle the single results.

The job definition is received by the SupervisorActor, which creates a JobManagerActor per job and sends the job definition to it. Note that both SupervisorActor and JobManagerActor run on the same node, which is the node marked as httpserver. The JobManagerActor takes care of distributing the batches, sending new batches for processing when previous ones finish. The distribution is done via a Distributor, and batches are sent via a router across all nodes that have the role compute (note that each node can have multiple roles, yet there should be only one node with the role httpserver).
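
The "send a new batch when a previous one finishes" behaviour can be sketched without any actors. This is a hypothetical model, not Kolibri's Distributor: DistributorState, SimpleBatchDistributor and the maxInFlight limit are assumptions introduced purely for illustration.

```scala
// Hypothetical, actor-free model of batch distribution; not Kolibri's Distributor.
final case class DistributorState(pending: List[Int], inFlight: Set[Int], done: Set[Int])

object SimpleBatchDistributor {
  // All batch numbers 0 until totalBatches start out as pending.
  def initial(totalBatches: Int): DistributorState =
    DistributorState((0 until totalBatches).toList, Set.empty, Set.empty)

  // Hand out as many pending batches as there are free slots (maxInFlight is an assumed limit).
  def next(state: DistributorState, maxInFlight: Int): (List[Int], DistributorState) = {
    val free = math.max(0, maxInFlight - state.inFlight.size)
    val (toSend, rest) = state.pending.splitAt(free)
    (toSend, state.copy(pending = rest, inFlight = state.inFlight ++ toSend))
  }

  // Mark a batch as finished, which frees a slot for the following call to `next`.
  // Batch numbers that are not in flight are ignored.
  def finished(state: DistributorState, batchNr: Int): DistributorState =
    if (state.inFlight.contains(batchNr))
      state.copy(inFlight = state.inFlight - batchNr, done = state.done + batchNr)
    else state

  // The job is complete once nothing is pending or in flight.
  def isComplete(state: DistributorState): Boolean =
    state.pending.isEmpty && state.inFlight.isEmpty
}
```

In a real setup the batches returned by next would be routed to the compute nodes, and finished would be called when a node reports back; keeping the state immutable makes each step easy to test in isolation.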

The basic schema is as follows:

[Figure: basic schema]

In Progress: More coming shortly