TaskBuilder

portals.api.builder.TaskBuilder$
object TaskBuilder

Core Task Factories.

Create tasks using these factories. A task is a behavior that is executed on one a task of the workflow. It transforms an input T to an output U. In doing so, the task can emit events, and perform other actions as described in its task context.

Attributes

See also

portals.api.builder.TaskExtensions for more extensions.

Example
TaskBuilder.processor[String, Int](event => emit(event.length())
TaskBuilder.map[String, Int](x => x.length())

The created tasks can be used directly for the workflow like the following example.

val taskBehavior = TaskBuilder.processor[String, Int](event => emit(event.length()))
builder.workflows[String, Int]("workflowName")
.source(stream)
.task(taskBehavior)
.sink()
.freeze()
Graph
Supertypes
trait Sum
trait Mirror
class Object
trait Matchable
class Any
Self type

Members list

Type members

Classlikes

class PortalsTasks[Req, Rep](portals: AtomicPortalRefKind[Req, Rep]*)

Attributes

Supertypes
class Object
trait Matchable
class Any

Inherited types

type MirroredElemLabels <: Tuple

The names of the product elements

The names of the product elements

Attributes

Inherited from:
Mirror
type MirroredLabel <: String

The name of the type

The name of the type

Attributes

Inherited from:
Mirror

Value members

Concrete methods

def asker[T, U, Req, Rep](portals: AtomicPortalRefKind[Req, Rep]*)(f: AskerTaskContext[T, U, Req, Rep] ?=> T => Unit): GenericTask[T, U, Req, Rep]

Behavior factory for an asker task.

Behavior factory for an asker task.

The asker task transforms events from type T to type U. In addition to this, the asker task binds to a portal, and can send requests to the portal. It may also await the returned future from this.

Type parameters

Rep

type of the replies

Req

type of the requests

T

type of the input events

U

type of the output events

Value parameters

f

the task behavior

portals

the portals to which the task will send requests

Attributes

Returns

the asker task behavior

Example
TaskBuilder
 .asker[T, U, Req, Rep](portal)( event =>
   val request = Req(event)
   val future = ask(portal, request)
   await(future) { future.value.get match
     case Rep(value) =>
       log.info(value.toString())
       emit(value)
   }
 )
def askerreplier[T, U, Req, Rep](askerportals: AtomicPortalRefKind[Req, Rep]*)(replierportals: AtomicPortalRefKind[Req, Rep]*)(f1: AskerTaskContext[T, U, Req, Rep] ?=> T => Unit)(f2: AskerReplierTaskContext[T, U, Req, Rep] ?=> Req => Unit): GenericTask[T, U, Req, Rep]

Behavior factory for an asker-replier task.

Behavior factory for an asker-replier task.

The asker-replier can both send requests and receive requests. In particular, it can also nest asks within continuations. A common pattern is to use the same portal for both asker and replier.

The asker binds to portals askerportals, the replier binds to the replierportals, it executes the handler f1 on regular events, and executes handler f2 on requests.

Type parameters

Rep

type of the replies

Req

type of the requests

T

type of the input events

U

type of the output events

Value parameters

askerportals

the portals to which the task will send requests

f1

the handler for regular input events

f2

the handler for requests

replierportals

the portals from which the task will receive requests

Attributes

Returns

asker-replier task behavior

Example
TaskBuilder
 .askerreplier[T, U, Req, Rep](askerportal)(replierportal)( event =>
   // handle event as normal
   val future = ask(askerportal, event)
   await(future) { ctx.log.info(future.value.get.toString()) }
)(request =>
  // nested ask from within the request handler
  val future = ask(askerportal, event)
  await(future) {
    val reply = Rep(future.value.get)
    // reply from the continuation
    ctx.reply(reply)
def identity[T]: GenericTask[T, T, _, _]

Behavior factory for emitting the same values as ingested.

Behavior factory for emitting the same values as ingested.

Type parameters

T

type of the input events

Attributes

Returns

the identity task behavior

def init[T, U](initFactory: ProcessorTaskContext[T, U] ?=> GenericTask[T, U, Nothing, Nothing]): GenericTask[T, U, Nothing, Nothing]

Behavior factory for initializing the task before any events.

Behavior factory for initializing the task before any events.

Note: this may be re-executed more than once, every time that the task is restarted (e.g. after a failure). Emitted events during the initialization phase are ignored.

Type parameters

T

type of the input events

U

type of the output events

Value parameters

initFactory

the initialization factory

Attributes

Returns

the initialized task behavior

Example
val task = TaskBuilder.init {
 val state = new PerKeyState[Int]("state", 0)
 TaskBuilder.processor[String, (String, Int)] { s =>
   val count = state.get() + 1
   state.set(count)
   emit( (s, count) )
def map[T, U](f: MapTaskContext[T, U] ?=> T => U): GenericTask[T, U, Nothing, Nothing]

Behavior factory for map.

Behavior factory for map.

Map events of type T to events of type U. Cannot emit events using context, can access state and log.

Type parameters

T

type of the input events

U

type of the output events

Value parameters

f

the map function

Attributes

Returns

the map task behavior

Example
TaskBuilder.map[String, Int](x => x.length())
def processor[T, U](onNext: ProcessorTaskContext[T, U] ?=> T => Unit): GenericTask[T, U, Nothing, Nothing]

Behavior factory for processing incoming events.

Behavior factory for processing incoming events.

The ProcessorTaskContext allows the task to: emit events; acceess state; log messages.

Type parameters

T

type of the input events

U

type of the output events

Value parameters

onNext

handler for the incoming events

Attributes

Returns

a task behavior

Example
TaskBuilder.processor[String, Int](event => emit(event.length())
def replier[T, U, Req, Rep](portals: AtomicPortalRefKind[Req, Rep]*)(f1: ProcessorTaskContext[T, U] ?=> T => Unit)(f2: ReplierTaskContext[T, U, Req, Rep] ?=> Req => Unit): GenericTask[T, U, Req, Rep]

Behavior factory for a replier task.

Behavior factory for a replier task.

The replier task transforms events from type T to type U. In addition to this, the replier task binds to a portal, and can receive requests from the portal to which it may reply.

Type parameters

Rep

type of the replies

Req

type of the requests

T

type of the input events

U

type of the output events

Value parameters

f1

the handler for regular input events

f2

the handler for requests

portals

the portals from which the task will receive requests

Attributes

Returns

replier task behavior

Example
TaskBuilder
  .replier[T, U, Req, Rep](portal)( event =>
    // handle event as normal
    emit(event)
  )(request =>
    // handle request
    val reply = Rep(request)
    reply(request) // reply
  )

Extensions

Extensions

extension (t: TaskBuilder)(t: TaskBuilder)
def portal[Req, Rep](portals: AtomicPortalRefKind[Req, Rep]*): PortalsTasks[Req, Rep]