Values

In the following, let’s look at the structures Kolibri provides to simplify the definition of values.

OrderedValues

trait OrderedValues[+T] extends KolibriSerializable {
  val name: String
  val totalValueCount: Int
  def getNthZeroBased(n: Int): Option[T]
  def getNFromPositionZeroBased(position: Int, n: Int): Seq[T]
  def getAll: Seq[T]
}

Two distinct implementations:

  • Using explicitly passed values:
    case class DistinctValues[+T](name: String, values: Seq[T]) extends OrderedValues[T]
    
  • Range with defined start, end and stepSize:
    case class RangeValues[T:Fractional](name: String, start:T, end:T, stepSize:T)(implicit v: Numeric[T])
      extends OrderedValues[T]
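To illustrate how the two implementations can answer getNthZeroBased, here is a minimal sketch. It is not Kolibri's code: KolibriSerializable is omitted, RangeValues only takes a Fractional instead of the additional Numeric parameter, and it assumes that a range includes end whenever end lies on the step grid.

```scala
// Minimal stand-in for the OrderedValues trait (KolibriSerializable omitted)
trait OrderedValues[+T] {
  val name: String
  val totalValueCount: Int
  def getNthZeroBased(n: Int): Option[T]
  // At most n values starting at `position`, stopping early at the end of the values
  def getNFromPositionZeroBased(position: Int, n: Int): Seq[T] =
    (position until math.min(position + n, totalValueCount)).flatMap(i => getNthZeroBased(i))
  def getAll: Seq[T] = getNFromPositionZeroBased(0, totalValueCount)
}

case class DistinctValues[+T](name: String, values: Seq[T]) extends OrderedValues[T] {
  val totalValueCount: Int = values.size
  def getNthZeroBased(n: Int): Option[T] = values.lift(n)
}

// Assumption: `end` is included if it lies on the step grid; the small tolerance absorbs rounding errors
case class RangeValues[T](name: String, start: T, end: T, stepSize: T)(implicit num: Fractional[T])
    extends OrderedValues[T] {
  val totalValueCount: Int =
    math.floor(num.toDouble(num.minus(end, start)) / num.toDouble(stepSize) + 1e-9).toInt + 1
  def getNthZeroBased(n: Int): Option[T] =
    if (n < 0 || n >= totalValueCount) None
    else Some(num.plus(start, num.times(num.fromInt(n), stepSize)))
}

val thresholds = RangeValues("threshold", 0.0, 1.0, 0.25)
val modes = DistinctValues("mode", Seq("a", "b", "c"))
```

Computing the n-th value directly from start and stepSize avoids materializing the range, which matters when many ranges are combined.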
    

OrderedMultiValues

Container for multiple OrderedValues, with methods to edit (add, remove) the contained values and methods to retrieve the n-th element out of all combinations (the Cartesian product) of the contained OrderedValues.

trait OrderedMultiValues extends KolibriSerializable {

  def values: Seq[OrderedValues[Any]]
  def removeValue(valueName: String): (OrderedMultiValues, Boolean)
  def originalValueIndexOf(n: Int): Int = n
  def addValue(values: OrderedValues[Any], prepend: Boolean): OrderedMultiValues
  def addValues(values: Seq[OrderedValues[Any]], prepend: Boolean): OrderedMultiValues
  def addValues(values: OrderedMultiValues, prepend: Boolean): OrderedMultiValues
  def stepsForNthElementStartingFromFirstParam(n: Int): List[(Int, Int)]
  def getParameterNameSequence: Seq[String]
  def numberOfCombinations: Int
  def findNthElement(n: Int): Option[Seq[Any]]
  def findNNextElementsFromPosition(startElement: Int, nrOfElements: Int): Seq[Seq[Any]]
}
/**
 * Implementation of OrderedMultiValues assuming a value grid, providing the methods to find the n-th combination of values
 * @param values - Seq of OrderedValues of any type
 */
case class GridOrderedMultiValues(values: Seq[OrderedValues[Any]]) extends OrderedMultiValues
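Finding the n-th element of a value grid amounts to a mixed-radix decomposition of n, where the number of values of each parameter acts as the radix of one digit. The sketch below works on plain Seq[Seq[Any]] instead of OrderedValues; its helper names mirror the trait's methods but are hypothetical, and the assumption that the first parameter varies fastest may not match Kolibri's actual ordering.

```scala
// Hypothetical helpers mirroring numberOfCombinations / findNthElement on plain Seqs
def numberOfCombinations(values: Seq[Seq[Any]]): Int = values.map(_.size).product

// Assumption: the first parameter varies fastest
def findNthElement(values: Seq[Seq[Any]], n: Int): Option[Seq[Any]] =
  if (n < 0 || n >= numberOfCombinations(values)) None
  else {
    // Each parameter consumes one "digit" of n, with its number of values as radix
    val (picked, _) = values.foldLeft((Vector.empty[Any], n)) { case ((acc, rest), v) =>
      (acc :+ v(rest % v.size), rest / v.size)
    }
    Some(picked)
  }

// Consecutive elements starting at `start`; indices past the end are skipped
def findNNextElementsFromPosition(values: Seq[Seq[Any]], start: Int, count: Int): Seq[Seq[Any]] =
  (start until start + count).flatMap(i => findNthElement(values, i))

val grid: Seq[Seq[Any]] = Seq(Seq(1, 2), Seq("a", "b", "c"))
```

Because any element can be computed from its index alone, the combinations never need to be enumerated up front, which is what makes batching by index ranges cheap.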

Batching

/**
 * Trait adding batchSize, batchNr and batchStartElement (shift of indices relative to the original OrderedMultiValues,
 * specifying at which element of the original the batch starts) to OrderedMultiValues  
 */
trait OrderedMultiValuesBatch extends OrderedMultiValues  {
  val batchSize: Int
  val batchNr: Int
  val batchStartElement: Int

  override def originalValueIndexOf(n: Int): Int = {
    batchStartElement + n
  }
}
/**
 * Batch representing at most batchSize elements of its input values. Starts at position batchSize * (batchNr - 1) + 1
 * of the original values.
 *
 * @param multiValues The original values this batch represents a part of
 * @param batchSize   The maximum size of this batch (the last batch of values might contain fewer elements)
 * @param batchNr     The number of this batch. Used to determine which part of the values this batch represents. 1-based
 */
case class GridOrderedMultiValuesBatch(multiValues: OrderedMultiValues, batchSize: Int, batchNr: Int) 
  extends OrderedMultiValuesBatch
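The index arithmetic behind batching can be sketched as follows, assuming zero-based indices, so that the 1-based position batchSize * (batchNr - 1) + 1 from the comment above corresponds to index batchSize * (batchNr - 1). BatchBounds and batchBounds are hypothetical helpers, not part of Kolibri.

```scala
// Hypothetical helper type: where a batch starts in the original values and how many elements it holds
final case class BatchBounds(batchStartElement: Int, size: Int) {
  // Same shift as OrderedMultiValuesBatch.originalValueIndexOf
  def originalValueIndexOf(n: Int): Int = batchStartElement + n
}

// batchNr is 1-based, indices are zero-based
def batchBounds(totalElements: Int, batchSize: Int, batchNr: Int): Option[BatchBounds] = {
  require(batchSize > 0 && batchNr >= 1, "batchSize must be positive and batchNr is 1-based")
  val start = batchSize * (batchNr - 1)
  if (start >= totalElements) None
  else Some(BatchBounds(start, math.min(batchSize, totalElements - start)))
}

// 10 combinations in batches of 4: the last batch only holds 2 elements
val batches: List[BatchBounds] =
  Iterator.from(1).map(nr => batchBounds(10, 4, nr)).takeWhile(_.isDefined).collect { case Some(b) => b }.toList
```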

Implicit Conversions

Further conversions are defined in the OrderedMultiValuesImplicits class by means of an implicit class. Once imported, these functions are directly available on any instance of type OrderedMultiValues.

Among them are the following (for the complete list, see the class mentioned above):

/**
  * from OrderedMultiValues generate iterator over sequence of named values (_._1 is name of parameter, _._2 is value)
  */
def toNamedParamValuesIterator: Iterator[Seq[(String, Any)]]

/**
  * from OrderedMultiValues generate iterator over Map of key-values pairs, where key is the name of
  * the parameter in OrderedMultiValues and values is a Seq over all values for that name (a Seq is needed
  * in case any parameter name is contained multiple times, as might be the case e.g. for non-unique URL parameters)
  *
  * @return
  */
def toParamNameValuesMapIterator: Iterator[Map[String, Seq[Any]]]

/**
  * from OrderedMultiValues generate iterator over sequences of values (the corresponding parameter name
  * for a value is given by the element with the same index in the
  * multiValues.getParameterNameSequence sequence of parameter names)
  *
  * @return
  */
def toParamValuesIterator: Iterator[Seq[Any]]

/**
  * transform OrderedMultiValues instance to IndexedGenerator of value sequences (without parameter names)
  *
  * @return
  */
def toParamValuesIndexedGenerator: IndexedGenerator[Seq[Any]]

/**
  * transform OrderedMultiValues instance to IndexedGenerator of mappings from parameter name
  * to the values for that parameter
  *
  * @return
  */
def toParamNameValuesMapIndexedGenerator: IndexedGenerator[Map[String, Seq[Any]]]

/**
  * transform OrderedMultiValues instance to IndexedGenerator of Seqs of (parameterName, parameterValue) pairs
  *
  * @return
  */
def toNamedParamValuesIndexedGenerator: IndexedGenerator[Seq[(String, Any)]]
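The mechanism is the standard Scala implicit class pattern. The sketch below applies it to a single named combination, Seq[(String, Any)], and shows how repeated parameter names end up as a Seq of values in a Map[String, Seq[Any]], as described for toParamNameValuesMapIterator. NamedValuesImplicits and toParamNameValuesMap are hypothetical names.

```scala
object NamedValuesImplicits {
  // Hypothetical implicit class: after importing, every Seq[(String, Any)] gains toParamNameValuesMap
  implicit class NamedValuesOps(named: Seq[(String, Any)]) {
    // Group values by parameter name; values of a repeated name keep their original order
    def toParamNameValuesMap: Map[String, Seq[Any]] =
      named.foldLeft(Map.empty[String, Vector[Any]]) { case (acc, (name, value)) =>
        acc.updated(name, acc.getOrElse(name, Vector.empty) :+ value)
      }
  }
}

import NamedValuesImplicits._

// One combination with a repeated (URL-like) parameter name
val combination: Seq[(String, Any)] = Seq("q" -> "shoes", "filter" -> "red", "filter" -> "size:42")
val asMap = combination.toParamNameValuesMap
```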

Data Samples and Aggregations

DataPoint

trait DataPoint[+T] extends KolibriSerializable {
  def weight: Double
  def data: T
}

case class DataSample[+T](weight: Double, data: T) extends DataPoint[T]

AggregateValues

The AggregateValue trait provides a general interface for values that are computed from multiple single values. It keeps track of the number of samples used to calculate the value and of the weight of the value, and provides functions to add other values as well as a function to retrieve a weighted version of the AggregateValue.

trait AggregateValue[A] extends KolibriSerializable {
  def numSamples: Int
  def weight: Double
  def value: A
  def weighted(weight: Double): AggregateValue[A]
  def add(other: AggregateValue[A]): AggregateValue[A]
  def add(otherValue: DataPoint[A]): AggregateValue[A]
}

RunningValue is an implementation of AggregateValue that provides methods to combine separate values.

case class RunningValue[A](weight: Double,
                           numSamples: Int,
                           value: A,
                           weightFunction: (Double, Double) => Double,
                           addFunc: (AggregateValue[A], AggregateValue[A]) => A) extends AggregateValue[A]
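A minimal sketch of how a RunningValue can implement a weighted running average through its weightFunction and addFunc. The traits are simplified copies of the ones above (KolibriSerializable omitted), avgRunningValue is a hypothetical helper, and the semantics chosen for weighted (return a copy with the given weight) are an assumption.

```scala
// Simplified copies of the traits above (KolibriSerializable omitted)
trait DataPoint[+T] {
  def weight: Double
  def data: T
}
case class DataSample[+T](weight: Double, data: T) extends DataPoint[T]

trait AggregateValue[A] {
  def numSamples: Int
  def weight: Double
  def value: A
  def weighted(weight: Double): AggregateValue[A]
  def add(other: AggregateValue[A]): AggregateValue[A]
  def add(otherValue: DataPoint[A]): AggregateValue[A]
}

case class RunningValue[A](weight: Double,
                           numSamples: Int,
                           value: A,
                           weightFunction: (Double, Double) => Double,
                           addFunc: (AggregateValue[A], AggregateValue[A]) => A) extends AggregateValue[A] {
  // Assumption: `weighted` returns a copy carrying the given weight
  def weighted(newWeight: Double): AggregateValue[A] = copy(weight = newWeight)

  def add(other: AggregateValue[A]): AggregateValue[A] =
    RunningValue(weightFunction(weight, other.weight), numSamples + other.numSamples,
      addFunc(this, other), weightFunction, addFunc)

  // A single data point is treated as a running value with one sample
  def add(otherValue: DataPoint[A]): AggregateValue[A] =
    add(RunningValue(otherValue.weight, 1, otherValue.data, weightFunction, addFunc))
}

// Hypothetical helper: running value computing a weighted average of doubles
def avgRunningValue(weight: Double, numSamples: Int, value: Double): RunningValue[Double] =
  RunningValue[Double](weight, numSamples, value,
    weightFunction = (w1, w2) => w1 + w2,
    addFunc = (a, b) => {
      val totalWeight = a.weight + b.weight
      if (totalWeight == 0.0) 0.0 else (a.value * a.weight + b.value * b.weight) / totalWeight
    })

val avg = avgRunningValue(weight = 0.0, numSamples = 0, value = 0.0)
  .add(DataSample(1.0, 2.0))
  .add(DataSample(3.0, 4.0))
```

Passing the combination logic as functions keeps RunningValue generic: the same class can average doubles or, with other functions, merge maps of counts.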

BiRunningValue is a running value over two distinct types. It can be used, e.g., to record occurring errors and successful computation values in a single record, for instance when your computation returns Either[SomeFailType, SomeComputationValue], or in similar settings where two values are connected in some way. Each contained AggregateValue keeps the count of aggregated samples and the current value of the aggregation.

case class BiRunningValue[A, B](value1: AggregateValue[A], value2: AggregateValue[B])

Metric Representations

A MetricValue is a simple container whose state is a BiRunningValue: one part keeps track of the occurring error types and their respective counts, the other holds an aggregated value of the successful computations.

case class MetricValue[A](name: String, biValue: BiRunningValue[Map[ComputeFailReason, Int], A])

Helper functions are provided in the MetricValue object to create frequently used values:

def createAvgFailSample(metricName: String, failMap: Map[ComputeFailReason, Int]): MetricValue[Double]
def createAvgSuccessSample(metricName: String, value: Double, weight: Double): MetricValue[Double]
def createEmptyAveragingMetricValue(name: String): MetricValue[Double]
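The interplay of fail counts and averaged successes can be sketched with a simplified, hypothetical SimpleMetricValue, in which String stands in for ComputeFailReason and the success side is a plain weighted average rather than an AggregateValue. The example folds Either-based computation results, the use case mentioned for BiRunningValue, into a single value; failSample and successSample are hypothetical counterparts of createAvgFailSample and createAvgSuccessSample.

```scala
// Hypothetical, simplified stand-in for MetricValue
final case class SimpleMetricValue(name: String,
                                   failCounts: Map[String, Int],
                                   successCount: Int,
                                   successWeight: Double,
                                   successAvg: Double) {
  def add(other: SimpleMetricValue): SimpleMetricValue = {
    // Fail side: sum up counts per fail reason
    val mergedFails = other.failCounts.foldLeft(failCounts) { case (acc, (reason, count)) =>
      acc.updated(reason, acc.getOrElse(reason, 0) + count)
    }
    // Success side: weighted average of both running averages
    val totalWeight = successWeight + other.successWeight
    val mergedAvg =
      if (totalWeight == 0.0) 0.0
      else (successAvg * successWeight + other.successAvg * other.successWeight) / totalWeight
    SimpleMetricValue(name, mergedFails, successCount + other.successCount, totalWeight, mergedAvg)
  }
}

def failSample(name: String, fails: Map[String, Int]): SimpleMetricValue =
  SimpleMetricValue(name, fails, successCount = 0, successWeight = 0.0, successAvg = 0.0)

def successSample(name: String, value: Double, weight: Double): SimpleMetricValue =
  SimpleMetricValue(name, Map.empty, successCount = 1, successWeight = weight, successAvg = value)

// Fold Either-based computation results into one metric value
val results: Seq[Either[String, Double]] = Seq(Right(0.5), Left("NO_RESULTS"), Right(1.0), Left("NO_RESULTS"))
val metric = results.map {
  case Left(reason) => failSample("ndcg", Map(reason -> 1))
  case Right(v)     => successSample("ndcg", v, weight = 1.0)
}.reduce(_ add _)
```

Failures do not distort the average here, since they contribute no weight to the success side.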

Metrics are represented by MetricRecord instances.

trait MetricRecord[A, B] {
  def getMetricsValue(key: A): Option[MetricValue[B]]
  def addMetricDontChangeCountStore(metric: MetricValue[B]): MetricRecord[A, B]
  def addFullMetricsSampleAndIncreaseSampleCount(metrics: MetricValue[B]*): MetricRecord[A, B]
  def addRecordAndIncreaseSampleCount(record: MetricRecord[A, B]): MetricRecord[A, B]
  def metricNames: Seq[A]
  def metricValues: Seq[MetricValue[B]]
  def containsMetric(key: A): Boolean
}

A MetricRow is identified by a set of parameters and holds the metric values computed for these parameters.

case class MetricRow(countStore: ResultCountStore, params: Map[String, Seq[String]], 
  metrics: Map[String, MetricValue[Double]]) extends MetricRecord[String, Double] 

The MetricRow companion object provides multiple helper methods for easier composition.

A MetricDocument maps parameter sets (ParamMap) to MetricRows. The implementation uses a mutable map; this is acceptable since a single document is only modified within a single actor, and thus by a single thread at a time.

case class MetricDocument[A <: AnyRef](id: A, rows: mutable.Map[ParamMap, MetricRow])

Usually the Tag type is used as type A, to group the results by some grouping criterion. MetricDocument comes with methods such as adding other results or generating a weighted copy.

Metric Aggregation

/**
  * MetricAggregation that keeps track of full MetricDocuments for keys of defined type.
  * Each key stands for a separate aggregation, which can be used for selectively aggregating subsets of results
  *
  * @param aggregationStateMap - map with key = key of defined type A, value = MetricDocument, which maps a ParamMap to
  *                            a MetricRow, which carries all relevant parameters and corresponding metrics
  * @param keyMapFunction      - optional function mapping result keys before they are added to the aggregation.
  *                            E.g. can be used in case all incoming results shall only be aggregated under a
  *                            single "ALL" aggregation instead of keeping track of distinct results per key
  * @tparam A - type of the keys that describe the aggregation groups
  */
case class MetricAggregation[A <: AnyRef](aggregationStateMap: mutable.Map[A, MetricDocument[A]] = 
  mutable.Map.empty[A, MetricDocument[A]], keyMapFunction: SerializableFunction1[A, A] = identity) extends WithCount
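The effect of keyMapFunction can be sketched with a hypothetical KeyedAggregation that keeps a running (sum, count) per key instead of full MetricDocuments: with identity every key gets its own aggregation, while a constant function collects everything under "ALL".

```scala
import scala.collection.mutable

// Hypothetical simplification of MetricAggregation: per (mapped) key a running (sum, count)
final class KeyedAggregation[A](keyMapFunction: A => A) {
  val state: mutable.Map[A, (Double, Int)] = mutable.Map.empty

  def add(key: A, value: Double): Unit = {
    val mappedKey = keyMapFunction(key) // keys are mapped before being added to the aggregation
    val (sum, count) = state.getOrElse(mappedKey, (0.0, 0))
    state(mappedKey) = (sum + value, count + 1)
  }
}

val perKey = new KeyedAggregation[String](identity)     // one aggregation per key
val allInOne = new KeyedAggregation[String](_ => "ALL") // everything under a single "ALL" key

Seq("q1" -> 1.0, "q2" -> 0.0, "q1" -> 0.5).foreach { case (key, value) =>
  perKey.add(key, value)
  allInOne.add(key, value)
}
```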

Aggregators

The aggregation of partial results is performed by instances of type Aggregator.

abstract class Aggregator[-U: TypeTag, V: TypeTag] extends KolibriSerializable {
    def add(sample: U): Unit
    def aggregation: V
    def addAggregate(aggregatedValue: V): Unit
}
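The contract can be illustrated with a sketch in the spirit of BaseAggregator below: the behavior is defined by an aggregation function, a start value supplier and a merge function, and addAggregate merges partial aggregations, e.g. ones computed by different workers. TypeTags and the serializable function types are left out, and SimpleBaseAggregator is a hypothetical name.

```scala
// Minimal stand-in for the Aggregator contract (TypeTags and KolibriSerializable omitted)
abstract class Aggregator[-U, V] {
  def add(sample: U): Unit
  def aggregation: V
  def addAggregate(aggregatedValue: V): Unit
}

// Hypothetical simplification of BaseAggregator using plain Scala functions
class SimpleBaseAggregator[U, V](aggFunc: (U, V) => V,
                                 startValueGen: () => V,
                                 mergeFunc: (V, V) => V) extends Aggregator[U, V] {
  private var current: V = startValueGen()
  def add(sample: U): Unit = { current = aggFunc(sample, current) }
  def aggregation: V = current
  def addAggregate(aggregatedValue: V): Unit = { current = mergeFunc(current, aggregatedValue) }
}

// Aggregation of type (sum, count)
def sumCountAggregator(): SimpleBaseAggregator[Double, (Double, Int)] =
  new SimpleBaseAggregator[Double, (Double, Int)](
    aggFunc = (x, acc) => (acc._1 + x, acc._2 + 1),
    startValueGen = () => (0.0, 0),
    mergeFunc = (a, b) => (a._1 + b._1, a._2 + b._2))

// Two partial aggregations, merged at the end
val worker1 = sumCountAggregator()
val worker2 = sumCountAggregator()
Seq(1.0, 2.0).foreach(worker1.add)
Seq(3.0).foreach(worker2.add)
worker1.addAggregate(worker2.aggregation)
```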

The Aggregators object (see the Aggregators class) contains the distinct aggregator types. Some examples are given below:

/**
   * Aggregator taking start value generator, aggregation and merge functions to define the aggregation behavior
   * @param aggFunc - Given sample of type U and aggregation of type V, generate new value of type V
   * @param startValueGen - supplier of start value of aggregation, type V
   * @param mergeFunc - merge function of aggregations, taking two values of type V and providing new value of type V
   * @tparam U - type of single data points
   * @tparam V - type of aggregation
   */
  class BaseAggregator[U: TypeTag, V: TypeTag](aggFunc: SerializableFunction2[U, V, V], 
                                               startValueGen: SerializableSupplier[V], 
                                               mergeFunc: SerializableFunction2[V, V, V]) extends Aggregator[U, V]
/**
   * Aggregator keeping a separate aggregation per class (Tag) of the incoming data points
   *
   * @param aggFunc - aggregation function, taking single data point of type TT, aggregated value of type V,
   *                providing new aggregation of type V
   * @param startValueForKey - function giving an initial aggregation value, given a Tag
   * @param mergeFunc - merge function of aggregation values
   * @param keyMapFunction - function mapping value of type Tag to value of type Tag (in case the Tag shall not be
   *                       mapped, just use identity)
   * @tparam TT - type of single data point, needs to extend TaggedWithType
   * @tparam V - type of aggregation
   */
  class BasePerClassAggregator[TT <: TaggedWithType : TypeTag, V: TypeTag](aggFunc: SerializableFunction2[TT, V, V],
                                                                           startValueForKey: SerializableFunction1[Tag, V],
                                                                           mergeFunc: SerializableFunction2[V, V, V],
                                                                           keyMapFunction: SerializableFunction1[Tag, Tag]) 
                                                                           extends Aggregator[TT, Map[Tag, V]]
/**
   * Aggregator that aggregates (running) averages per class
   * @param keyMapFunction - function mapping value of type Tag to value of type Tag (in case the Tag shall not be
   *                       mapped, just use identity)
   */
  class TagKeyRunningDoubleAvgPerClassAggregator(keyMapFunction: SerializableFunction1[Tag, Tag]) extends 
    BasePerClassAggregator[TaggedWithType with DataPoint[Double], AggregateValue[Double]](
      aggFunc = (x, y) => y.add(x),
      startValueForKey = _ => doubleAvgRunningValue(weightedCount = 0.0, count = 0, value = 0.0),
      mergeFunc = (x, y) => x.add(y),
      keyMapFunction) {
  }
/**
   * Aggregator aggregating to (running) averages overall
   */
  class TagKeyRunningDoubleAvgAggregator() extends BaseAggregator[DataPoint[Double], AggregateValue[Double]](
    aggFunc = (x, y) => y.add(x),
    startValueGen = () => doubleAvgRunningValue(weightedCount = 0.0, count = 0, value = 0.0),
    mergeFunc = (x, y) => x.add(y)) {
  }
/**
    * In case of a mapping function that alters original tags, ignoreIdDiff would need to be true to avoid conflicts.
    * Setting this attribute to true enables aggregating data for the original tag into data for the mapped tag.
    *
    * @param keyMapFunction - mapping function of Tag of input sample data
    * @param ignoreIdDiff - determines whether merging aggregations for different IDs is allowed
    */
  class TagKeyMetricDocumentPerClassAggregator(keyMapFunction: SerializableFunction1[Tag, Tag], 
    ignoreIdDiff: Boolean = false) extends 
      BasePerClassAggregator[TaggedWithType with DataPoint[MetricRow], MetricDocument[Tag]](
        aggFunc = (x, y) => {
          y.add(x.data)
          y
        },
        startValueForKey = x => MetricDocument.empty[Tag](x),
        mergeFunc = (x, y) => {
          x.add(y, ignoreIdDiff = ignoreIdDiff)
          x
        },
        keyMapFunction) {}
/**
    * In case of a mapping function that alters original tags, ignoreIdDiff would need to be true to avoid conflicts.
    * Setting this attribute to true enables aggregating data for the original tag into data for the mapped tag.
    *
    * @param keyMapFunction - mapping function of Tag of input sample data
    * @param ignoreIdDiff - determines whether merging aggregations for different IDs is allowed
    */
class TagKeyMetricAggregationPerClassAggregator(keyMapFunction: SerializableFunction1[Tag, Tag], 
  ignoreIdDiff: Boolean = false) extends Aggregator[TaggedWithType with DataPoint[MetricRow], MetricAggregation[Tag]]
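/**
    * Wrapper for typed aggregators to accept any message and aggregate only those matching the type
    *
    * @param aggregator - the typed aggregator to which matching messages are passed
    * @tparam T - type of the messages the wrapped aggregator accepts
    * @tparam V - type of aggregation
    */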
case class BaseAnyAggregator[T: TypeTag, V: TypeTag](aggregator: Aggregator[T, V]) extends Aggregator[Any, V]
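A per-tag aggregator combined with the BaseAnyAggregator idea might look like the following sketch. It substitutes ClassTag for TypeTag (so only the runtime class is checked and generic type arguments are erased), uses plain String tags, and keeps a (weight sum, weighted value sum) pair per tag. TaggedSample, PerTagAvgAggregator and AnyAggregator are hypothetical names.

```scala
import scala.reflect.ClassTag

abstract class Aggregator[-U, V] {
  def add(sample: U): Unit
  def aggregation: V
  def addAggregate(aggregatedValue: V): Unit
}

// Hypothetical tagged data point (stands in for TaggedWithType with DataPoint[Double])
final case class TaggedSample(tag: String, weight: Double, data: Double)

// Per-tag weighted averages; tags pass through keyMapFunction before aggregation
class PerTagAvgAggregator(keyMapFunction: String => String)
    extends Aggregator[TaggedSample, Map[String, (Double, Double)]] {
  private var state = Map.empty[String, (Double, Double)]

  def add(sample: TaggedSample): Unit = {
    val key = keyMapFunction(sample.tag)
    val (w, s) = state.getOrElse(key, (0.0, 0.0))
    state = state.updated(key, (w + sample.weight, s + sample.weight * sample.data))
  }
  def aggregation: Map[String, (Double, Double)] = state
  def addAggregate(other: Map[String, (Double, Double)]): Unit =
    other.foreach { case (key, (w, s)) =>
      val (w0, s0) = state.getOrElse(key, (0.0, 0.0))
      state = state.updated(key, (w0 + w, s0 + s))
    }
  def averages: Map[String, Double] =
    state.map { case (k, (w, s)) => k -> (if (w == 0.0) 0.0 else s / w) }
}

// Accepts Any and forwards only messages whose runtime class matches T
final case class AnyAggregator[T: ClassTag, V](aggregator: Aggregator[T, V]) extends Aggregator[Any, V] {
  def add(sample: Any): Unit = sample match {
    case t: T => aggregator.add(t)
    case _    => () // other message types are ignored
  }
  def aggregation: V = aggregator.aggregation
  def addAggregate(aggregatedValue: V): Unit = aggregator.addAggregate(aggregatedValue)
}

// Map all "q*" tags onto a single "QUERIES" group
val perTag = new PerTagAvgAggregator(tag => if (tag.startsWith("q")) "QUERIES" else tag)
val anyAggregator = AnyAggregator[TaggedSample, Map[String, (Double, Double)]](perTag)
Seq[Any](TaggedSample("q1", 1.0, 0.2), "unrelated message", TaggedSample("q2", 1.0, 0.6),
  TaggedSample("misc", 2.0, 1.0)).foreach(anyAggregator.add)
```

Keeping sums rather than averages in the state makes addAggregate a plain addition, so partial aggregations can be merged in any order.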

Typed Maps

Typed maps make assumptions about the types of their values, and thus allow using a single map structure for values of distinct types while keeping type guarantees. The type assumptions are either baked into the keys or specified in the get call that retrieves a value.

The provided implementations vary mainly along two dimensions: mutable/immutable and strong/weak typing. The strongly typed maps use TypeTag, which relies on reflection and thus might slow down execution if invoked frequently. Weakly typed maps return an empty result (None) on get if the value cannot be cast to the expected type.

Strongly typed (available as mutable and immutable implementations; refer to the implementation for details):

trait TypeTaggedMap extends KolibriSerializable {
  def isOfType[T: TypeTag](data: T, typeInstance: Type): Boolean = {
    typeOf[T] =:= typeInstance
  }
  def put[T: TypeTag, V](key: ClassTyped[V], value: T): (Option[T], TypeTaggedMap)
  def remove[T](key: ClassTyped[T]): (Option[T], TypeTaggedMap)
  def get[V](key: ClassTyped[V]): Option[V]
  def keys: Iterable[ClassTyped[Any]]
  def keySet: collection.Set[ClassTyped[Any]]
}
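The idea of typed keys, where the expected value type travels with the key, can be sketched as follows. The sketch uses a runtime Class instead of a TypeTag (so generic type arguments are not checked) and sticks to reference types; ClassTypedKey and SimpleTypedMap are hypothetical stand-ins for ClassTyped and the immutable TypeTaggedMap.

```scala
// Hypothetical typed key: name plus the runtime class of the expected value
// (a runtime Class instead of a TypeTag, so generic type arguments are not checked)
final case class ClassTypedKey[V](name: String, valueClass: Class[V])

// Immutable map; put and remove return the previous value together with the updated map
final case class SimpleTypedMap(underlying: Map[ClassTypedKey[_], Any] = Map.empty) {
  def put[V](key: ClassTypedKey[V], value: V): (Option[V], SimpleTypedMap) =
    (get(key), SimpleTypedMap(underlying.updated(key, value)))
  def remove[V](key: ClassTypedKey[V]): (Option[V], SimpleTypedMap) =
    (get(key), SimpleTypedMap(underlying - key))
  def get[V](key: ClassTypedKey[V]): Option[V] =
    underlying.get(key).collect { case v if key.valueClass.isInstance(v) => v.asInstanceOf[V] }
  def keys: Iterable[ClassTypedKey[_]] = underlying.keys
}

final case class Owner(name: String)
val NameKey = ClassTypedKey("name", classOf[String])
val OwnerKey = ClassTypedKey("owner", classOf[Owner])

val (previous, withName) = SimpleTypedMap().put(NameKey, "kolibri")
val (_, withOwner) = withName.put(OwnerKey, Owner("alice"))
```

Since the key carries the type, put only accepts values of that type and get needs no type argument from the caller.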

Weakly typed (refer to implementation for details):

trait WeaklyTypedMap[T] extends KolibriSerializable {
  def put[U](key: T, value: U): Unit
  def remove(key: T): Option[Any]
  def get[U](key: T): Option[U]
  def keys: Iterable[T]
  def keySet: collection.Set[T]
}
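A weakly typed map could check the cast at retrieval time roughly as follows. Note that this sketch's get additionally requires a ClassTag, which the trait above does not declare, and that only the runtime class is checked (a List[Int] would pass as a List[String]). SimpleWeaklyTypedMap is a hypothetical name.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

// Hypothetical mutable weakly typed map; get takes a ClassTag so that a failing
// cast yields None instead of a later ClassCastException
class SimpleWeaklyTypedMap[K] {
  private val underlying = mutable.Map.empty[K, Any]
  def put[U](key: K, value: U): Unit = underlying.update(key, value)
  def remove(key: K): Option[Any] = underlying.remove(key)
  // ClassTag#unapply returns None if the value is not an instance of U's runtime class
  def get[U](key: K)(implicit tag: ClassTag[U]): Option[U] = underlying.get(key).flatMap(v => tag.unapply(v))
  def keys: Iterable[K] = underlying.keys
  def keySet: collection.Set[K] = underlying.keySet
}

val settings = new SimpleWeaklyTypedMap[String]
settings.put("jobName", "exp-1")
settings.put("tags", List("a", "b"))
```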

Other useful structures

PriorityStores

Given some ordering, a PriorityStore keeps the top n elements, backed by a priority queue. Elements can be added continuously, and the queue state is updated accordingly.

/**
  * Priority store backed by PriorityQueue. Allows adding elements and, by some ordering criterion,
  * only keeps the n elements that come first according to the ordering
  */
abstract class PriorityStore[T, U] {
    def keep_n: Int
    def ordering: Ordering[U]
    def elementToKey: U => T
    def queueReversed: mutable.PriorityQueue[U]
    def addEntry(entry: U): Unit 
  }
  
case class BasePriorityStore[T, U](keep_n: Int,
                                   ordering: Ordering[U],
                                   elementToKey: U => T) extends PriorityStore[T, U]
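The reversed queue suggested by the queueReversed member can be sketched as follows: ordering the queue by the reversed ordering puts the weakest kept element at its head, so it can be evicted as soon as more than keep_n elements are stored. TopNStore is a hypothetical name and leaves out elementToKey.

```scala
import scala.collection.mutable

// Hypothetical top-n store: the queue uses the *reversed* ordering, so its head is the
// weakest element kept so far and can be dropped once more than keepN elements are stored
final class TopNStore[U](keepN: Int)(implicit ordering: Ordering[U]) {
  private val queueReversed = mutable.PriorityQueue.empty[U](ordering.reverse)

  def addEntry(entry: U): Unit = {
    queueReversed.enqueue(entry)
    if (queueReversed.size > keepN) queueReversed.dequeue()
  }

  // Kept elements, best first
  def result: List[U] = queueReversed.toList.sorted(ordering.reverse)
}

val store = new TopNStore[Int](keepN = 3)
Seq(2, 9, 5, 1, 7).foreach(store.addEntry)
```

The queue never holds more than keep_n + 1 elements, so each addition costs O(log n) regardless of how many elements have been seen.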

AtomicMapPromiseStore

Thread-safe store in which the first request for a resource triggers loading of the data. Loading may be time-consuming, so each retrieve call is answered with a Promise that is fulfilled as soon as the resource has finished loading.

/**
  * Implementation ensuring thread safety of the value storage and also ensuring
  * that no more than one request leads to the creation of the stored resource, which
  * could potentially be expensive (e.g. in case multiple experiment batch processing
  * actors on a single node try to request the resource at once).
  * Used e.g. within an object to load data that is expensive to load, so that only one
  * data instance exists per node
  *
  * @tparam U - the key used to identify a value
  * @tparam V - the corresponding value
  */
trait AtomicMapPromiseStore[U,V]
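One way to obtain this behavior is sketched below with ConcurrentHashMap.computeIfAbsent: only the request that inserts the Promise starts the load, and every request receives the same Future. The load itself runs outside computeIfAbsent so that an expensive load does not block the map. PromiseStore and retrieve are hypothetical names, not Kolibri's API.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Hypothetical sketch: computeIfAbsent guarantees that only one request creates the Promise for a key
final class PromiseStore[U, V](load: U => V)(implicit ec: ExecutionContext) {
  private val store = new ConcurrentHashMap[U, Promise[V]]()

  def retrieve(key: U): Future[V] = {
    var created = false
    val promise = store.computeIfAbsent(key, _ => { created = true; Promise[V]() })
    // Only the creating request starts the (possibly expensive) load
    if (created) promise.completeWith(Future(load(key)))
    promise.future
  }
}

implicit val ec: ExecutionContext = ExecutionContext.global
val loadCount = new AtomicInteger(0)
val resources = new PromiseStore[String, Int](key => { loadCount.incrementAndGet(); Thread.sleep(50); key.length })

// Ten concurrent requests for the same key
val requests = (1 to 10).map(_ => Future(resources.retrieve("resource")).flatten)
val loaded = Await.result(Future.sequence(requests), 5.seconds)
```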