FlowBuilder

portals.api.builder.FlowBuilder
See theFlowBuilder companion object
trait FlowBuilder[T, U, CT, CU]

Flow Builder

Type parameters

CT

the current input type of the latest task

CU

the current output type of the latest task

T

the input type of the flow

U

the output type of the flow

Attributes

Companion
object
Graph
Supertypes
class Object
trait Matchable
class Any

Members list

Value members

Abstract methods

def allWithOnAtomComplete[WT, WU](onAtomComplete: ProcessorTaskContext[WT, WU] ?=> Unit): FlowBuilder[T, U, CT, CU]

Apply the provided onAtomComplete handler to all tasks of the workflow.

Apply the provided onAtomComplete handler to all tasks of the workflow.

Value parameters

onAtomComplete

the onAtomComplete handler to be applied to all tasks

Attributes

Returns

the transformed flow

def allWithWrapper[WT, WU](f: ProcessorTaskContext[WT, WU] ?=> (ProcessorTaskContext[WT, WU] ?=> WT => Unit) => WT => Unit): FlowBuilder[T | WT, U | WU, CT, CU]

Apply and wrap the provided wrapper f to all tasks of the workflow.

Apply and wrap the provided wrapper f to all tasks of the workflow.

Value parameters

f

the wrapper to be applied to all tasks

Attributes

Returns

the transformed flow

def asker[CCU, Req, Rep](portals: AtomicPortalRefKind[Req, Rep]*)(f: AskerTaskContext[CU, CCU, Req, Rep] ?=> CU => Unit): FlowBuilder[T, U, CU, CCU]

Transform the flow by an asker task, binds to portal p and executes the asker handler f.

Transform the flow by an asker task, binds to portal p and executes the asker handler f.

Value parameters

f

the asker handler

portals

the portals to bind to

Attributes

def askerreplier[CCU, Req, Rep](askerportals: AtomicPortalRefKind[Req, Rep]*)(replierportals: AtomicPortalRefKind[Req, Rep]*)(f1: AskerTaskContext[CU, CCU, Req, Rep] ?=> CU => Unit)(f2: AskerReplierTaskContext[CU, CCU, Req, Rep] ?=> Req => Unit): FlowBuilder[T, U, CU, CCU]

Transform the flow by an askerReplier task, binds to portals askerportals and replierportals, executes the handler f1 on regular events, and executes handler f2 on requests.

Transform the flow by an askerReplier task, binds to portals askerportals and replierportals, executes the handler f1 on regular events, and executes handler f2 on requests.

Value parameters

askerportals

the portals to bind to for asker

f1

the event handler

f2

the request handler

replierportals

the portals to bind to for replier

Attributes

def checkExpectedType[CCU = CU](): FlowBuilder[T, U, CT, CU]

Check the current type against the provided expected type.

Check the current type against the provided expected type.

Compares FlowBuilder[T, U, CT, CU] with CCU, succeeds if CU <: CCU <: CU.

Type parameters

CCU

the expected type

Attributes

def filter(p: CU => Boolean): FlowBuilder[T, U, CU, CU]

Use predicate p to filter the events of the flow.

Use predicate p to filter the events of the flow.

Value parameters

p

the predicate to filter the flow

Attributes

Returns

the filtered flow

def flatMap[CCU](f: MapTaskContext[CU, CCU] ?=> CU => Seq[CCU]): FlowBuilder[T, U, CU, CCU]

Transform the flow by flatMapping by the provided function f.

Transform the flow by flatMapping by the provided function f.

Note, you cannot emit events from the provided flatMap function, instead the events produced by the flatMap function will be emitted by the returned flow.

Type parameters

CCU

the new output type of the mapped flow

Value parameters

f

the function to flatMap the current flow

Attributes

Returns

the flatMapped flow

def freeze(): Workflow[T, U]

Freeze the flow builder, returns the frozen workflow.

Freeze the flow builder, returns the frozen workflow.

Attributes

Returns

the frozen workflow

Note

The workflow cannot be modified once frozen.

def identity(): FlowBuilder[T, U, CU, CU]

Transform by the identity function (i.e. do nothing).

Transform by the identity function (i.e. do nothing).

Can be used to create actual transforming behavior using the modifiers such as withOnNext.

Attributes

Returns

the transformed flow

def init[CCU](initFactory: ProcessorTaskContext[CU, CCU] ?=> GenericTask[CU, CCU, Nothing, Nothing]): FlowBuilder[T, U, CU, CCU]

Provide an initialization factory for a task that transforms this flow.

Provide an initialization factory for a task that transforms this flow.

The initialization factory is called at runtime to create the task. This can be used to initial various objects, such as state, to be used by the processing behaviors.

Note: this may be re-executed more than once, every time that the task is restarted (e.g. after a failure).

Type parameters

CCU

the new output type of the transformed flow

Value parameters

initFactory

the task initialization factory

Attributes

Returns

the transformed flow

def key(f: CU => Long): FlowBuilder[T, U, CU, CU]

Compute and shuffle the flow according to a key extracted by f.

Compute and shuffle the flow according to a key extracted by f.

Value parameters

f

key extractor function

Attributes

Returns

flow shuffled by new key

def logger(prefix: String): FlowBuilder[T, U, CU, CU]

Log the current flow with the provided prefix.

Log the current flow with the provided prefix.

Value parameters

prefix

the prefix for logged messages

Attributes

Returns

the logged flow

def map[CCU](f: MapTaskContext[CU, CCU] ?=> CU => CCU): FlowBuilder[T, U, CU, CCU]

Map the current flow with the provided function f.

Map the current flow with the provided function f.

Type parameters

CCU

the new output type of the mapped flow

Value parameters

f

the function to map the current flow

Attributes

Returns

the map flow

def processor[CCU](f: ProcessorTaskContext[CU, CCU] ?=> CU => Unit): FlowBuilder[T, U, CU, CCU]

Transform the flow by a provided processor f.

Transform the flow by a provided processor f.

The processor function f may access the ProcessorTaskContext, which allows it to emit events, access state, log messages, etc.

Type parameters

CCU

the new output type of the mapped flow

Value parameters

f

the processor function

Attributes

Returns

the transformed flow by the processor function

def replier[CCU, Req, Rep](portals: AtomicPortalRefKind[Req, Rep]*)(f1: ProcessorTaskContext[CU, CCU] ?=> CU => Unit)(f2: ReplierTaskContext[CU, CCU, Req, Rep] ?=> Req => Unit): FlowBuilder[T, U, CU, CCU]

Transform the flow by a replier task, binds to portal p, executes the handler f1 on regular events, executes the replier handler f2 on requests.

Transform the flow by a replier task, binds to portal p, executes the handler f1 on regular events, executes the replier handler f2 on requests.

Value parameters

f1

the processor handler

f2

the replier handler

portals

the portals to bind to

Attributes

def sink[CC >: CU | U <: CU & U](): FlowBuilder[T, U, U, U]

End the flow in the sink.

End the flow in the sink.

Attributes

Returns

a new flow builder

Note

The output type of the current flowbuilder shas to have the same type as the sink / workflow output.

def task[CCU](taskBehavior: GenericTask[CU, CCU, _, _]): FlowBuilder[T, U, CU, CCU]

Transform the flow by the provided taskBehavior.

Transform the flow by the provided taskBehavior.

Type parameters

CCU

the new output type of the mapped flow

Value parameters

taskBehavior

the task behavior to transform the flow

Attributes

Returns

the transformed flow by the provided task

def union(others: List[FlowBuilder[T, U, _, CU]]): FlowBuilder[T, U, CU, CU]

Union the flow of the current flow with the others flows.

Union the flow of the current flow with the others flows.

Value parameters

others

list of other flows which will be unioned with the current flow

Attributes

Returns

a new flow builder

def vsm[CCU](defaultTask: VSMTask[CU, CCU]): FlowBuilder[T, U, CU, CCU]

Start an instance of a VSM task.

Start an instance of a VSM task.

Starts an instance of a VSM task. The VSM task is a stateful task behavior which manages the task in a per-key context. To use this you must provide an initial VSMTask behavior, which can be created using VSMTasks.processor. The VSMTask can still access state, emit events, and more as can the normal tasks.

Value parameters

defaultTask

the default task to use as an initial task behavior

Attributes

See also

portals.api.builder.TaskExtensions.VSMTask

Example
// VSMTasks
val init = VSMExtension.processor { event => started }
val started = VSMExtension.processor { event => init }
val vsm = VSMExtension.vsm[Int, Int] { init }
// flow
val flow: FlowBuilder[...] = ...
flow.vsm(init)
def withAndThen[CCU](task: GenericTask[CU, CCU, Nothing, Nothing]): FlowBuilder[T, U, CT, CCU]

Chaining task after the current task.

Chaining task after the current task.

This is used to explicitly chain tasks as a single task behavior.

Value parameters

task

the task to chain after the current task

Attributes

Returns

the transformed flow

def withLoop(count: Int)(task: GenericTask[CT, CU, Nothing, Nothing]): FlowBuilder[T, U, CT, CU]

Behavior modifier to loop over atoms for current task.

Behavior modifier to loop over atoms for current task.

This will loop and execute the provided task on the count next atoms.

Value parameters

count

the number of times to loop

task

the task to looped

Attributes

Returns

the transformed flow

def withName(name: String): FlowBuilder[T, U, CT, CU]

Give a name to the current task (latest task).

Give a name to the current task (latest task).

The current task is the latest task that was used to transform on this flow.

Value parameters

name

the new name of the current task

Attributes

Returns

the current flow

def withOnAtomComplete(onAtomComplete: ProcessorTaskContext[CT, CU] ?=> Unit): FlowBuilder[T, U, CT, CU]

Set the onAtomComplete handler of the current task to the provided handler.

Set the onAtomComplete handler of the current task to the provided handler.

Value parameters

onAtomComplete

the new onAtomComplete handler

Attributes

Returns

the current flow

def withOnComplete(onComplete: ProcessorTaskContext[CT, CU] ?=> Unit): FlowBuilder[T, U, CT, CU]

Set the onComplete handler of the current task to the provided handler.

Set the onComplete handler of the current task to the provided handler.

Value parameters

onComplete

the new onComplete handler

Attributes

Returns

the current flow

def withOnError(onError: ProcessorTaskContext[CT, CU] ?=> Throwable => Unit): FlowBuilder[T, U, CT, CU]

Set the onError handler of the current task to the provided handler.

Set the onError handler of the current task to the provided handler.

Value parameters

onError

the new onError handler

Attributes

Returns

the current flow

def withOnNext(onNext: ProcessorTaskContext[CT, CU] ?=> CT => Unit): FlowBuilder[T, U, CT, CU]

Set the onNext handler of the current task to the provided handler.

Set the onNext handler of the current task to the provided handler.

Value parameters

onNext

the new onNext handler

Attributes

Returns

the current flow

def withStep(task: GenericTask[CT, CU, Nothing, Nothing]): FlowBuilder[T, U, CT, CU]

Behavior modifier to step over atoms for current task.

Behavior modifier to step over atoms for current task.

This will execute the provided task on the subsequent atom.

Value parameters

task

the task to execute on the subsequent atom

Attributes

Returns

the transformed flow

def withWrapper[V](f: ProcessorTaskContext[CT, CU | V] ?=> (ProcessorTaskContext[CT, CU | V] ?=> CT => Unit) => CT => Unit): FlowBuilder[T, U, CT, CU | V]

Wrap around the behavior of the current task by the wrapping function f.

Wrap around the behavior of the current task by the wrapping function f.

Type parameters

V

additional type for widening the output type by union

Value parameters

f

the wrapping function

Attributes

Returns

the current flow

Example
FlowBuilder[_, _, Int, Int]
 .withWrapper[String]{ ctx ?=> wrapped => event =>
   if event < 3 then ctx.emit("0") else wrapped(event)
 }

Concrete methods

def asker[CCU]: FlowBuilderAsker[CCU]
def combineAllFrom[CU, CCU](flows: FlowBuilder[T, U, _, CU]*)(task: GenericTask[CU, CCU, _, _]): FlowBuilder[T, U, CU, CCU]

Combine and union all provided flows and process by task. Note, this does not union with the current flow.

Combine and union all provided flows and process by task. Note, this does not union with the current flow.

TODO: this should not be here, as it is confusing that it does not union with the current flow.

Value parameters

flows

the flows which will be unioned (NOT unioned with current flow)

task

the task to process the combined flows

Attributes

Returns

a new flow builder

def replier[CCU]: FlowBuilderReplier[CCU]
def split(p1: PartialFunction[CU, Boolean], p2: PartialFunction[CU, Boolean]): (FlowBuilder[T, U, CU, CU], FlowBuilder[T, U, CU, CU])

Split a flow into two disjoint flows by the provided predicates p1 and p2.

Split a flow into two disjoint flows by the provided predicates p1 and p2.

Value parameters

p1

the first splitting predicate

p2

the second splitting predicate

Attributes

Returns

two flow builders, one for each split

def union(others: FlowBuilder[T, U, _, CU]*): FlowBuilder[T, U, CU, CU]

Union the flow of the current flow with the others flows.

Union the flow of the current flow with the others flows.

Value parameters

others

the other flows which will be unioned with the current flow

Attributes

Returns

a new flow builder