TaskBuilder
Core Task Factories.
Create tasks using these factories. A task is a behavior that is executed on one a task of the workflow. It transforms an input T
to an output U
. In doing so, the task can emit
events, and perform other actions as described in its task context.
Attributes
- See also
-
portals.api.builder.TaskExtensions
for more extensions. - Example
-
TaskBuilder.processor[String, Int](event => emit(event.length())
TaskBuilder.map[String, Int](x => x.length())
The created tasks can be used directly for the workflow like the following example.
val taskBehavior = TaskBuilder.processor[String, Int](event => emit(event.length())) builder.workflows[String, Int]("workflowName") .source(stream) .task(taskBehavior) .sink() .freeze()
- Graph
-
- Supertypes
-
trait Sumtrait Mirrorclass Objecttrait Matchableclass Any
- Self type
-
TaskBuilder.type
Members list
Type members
Classlikes
Attributes
- Supertypes
-
class Objecttrait Matchableclass Any
Inherited types
The names of the product elements
The names of the product elements
Attributes
- Inherited from:
- Mirror
The name of the type
The name of the type
Attributes
- Inherited from:
- Mirror
Value members
Concrete methods
Behavior factory for an asker task.
Behavior factory for an asker task.
The asker task transforms events from type T
to type U
. In addition to this, the asker task binds to a portal, and can send requests to the portal. It may also await the returned future from this.
Type parameters
- Rep
-
type of the replies
- Req
-
type of the requests
- T
-
type of the input events
- U
-
type of the output events
Value parameters
- f
-
the task behavior
- portals
-
the portals to which the task will send requests
Attributes
- Returns
-
the asker task behavior
- Example
-
TaskBuilder .asker[T, U, Req, Rep](portal)( event => val request = Req(event) val future = ask(portal, request) await(future) { future.value.get match case Rep(value) => log.info(value.toString()) emit(value) } )
Behavior factory for an asker-replier task.
Behavior factory for an asker-replier task.
The asker-replier can both send requests and receive requests. In particular, it can also nest asks within continuations. A common pattern is to use the same portal for both asker and replier.
The asker binds to portals askerportals
, the replier binds to the replierportals
, it executes the handler f1
on regular events, and executes handler f2
on requests.
Type parameters
- Rep
-
type of the replies
- Req
-
type of the requests
- T
-
type of the input events
- U
-
type of the output events
Value parameters
- askerportals
-
the portals to which the task will send requests
- f1
-
the handler for regular input events
- f2
-
the handler for requests
- replierportals
-
the portals from which the task will receive requests
Attributes
- Returns
-
asker-replier task behavior
- Example
-
TaskBuilder .askerreplier[T, U, Req, Rep](askerportal)(replierportal)( event => // handle event as normal val future = ask(askerportal, event) await(future) { ctx.log.info(future.value.get.toString()) } )(request => // nested ask from within the request handler val future = ask(askerportal, event) await(future) { val reply = Rep(future.value.get) // reply from the continuation ctx.reply(reply)
Behavior factory for emitting the same values as ingested.
Behavior factory for emitting the same values as ingested.
Type parameters
- T
-
type of the input events
Attributes
- Returns
-
the identity task behavior
Behavior factory for initializing the task before any events.
Behavior factory for initializing the task before any events.
Note: this may be re-executed more than once, every time that the task is restarted (e.g. after a failure). Emitted events during the initialization phase are ignored.
Type parameters
- T
-
type of the input events
- U
-
type of the output events
Value parameters
- initFactory
-
the initialization factory
Attributes
- Returns
-
the initialized task behavior
- Example
-
val task = TaskBuilder.init { val state = new PerKeyState[Int]("state", 0) TaskBuilder.processor[String, (String, Int)] { s => val count = state.get() + 1 state.set(count) emit( (s, count) )
Behavior factory for map.
Behavior factory for map.
Map events of type T
to events of type U
. Cannot emit events using context, can access state and log.
Type parameters
- T
-
type of the input events
- U
-
type of the output events
Value parameters
- f
-
the map function
Attributes
- Returns
-
the map task behavior
- Example
-
TaskBuilder.map[String, Int](x => x.length())
Behavior factory for processing incoming events.
Behavior factory for processing incoming events.
The ProcessorTaskContext allows the task to: emit events; acceess state; log messages.
Type parameters
- T
-
type of the input events
- U
-
type of the output events
Value parameters
- onNext
-
handler for the incoming events
Attributes
- Returns
-
a task behavior
- Example
-
TaskBuilder.processor[String, Int](event => emit(event.length())
Behavior factory for a replier task.
Behavior factory for a replier task.
The replier task transforms events from type T
to type U
. In addition to this, the replier task binds to a portal, and can receive requests from the portal to which it may reply.
Type parameters
- Rep
-
type of the replies
- Req
-
type of the requests
- T
-
type of the input events
- U
-
type of the output events
Value parameters
- f1
-
the handler for regular input events
- f2
-
the handler for requests
- portals
-
the portals from which the task will receive requests
Attributes
- Returns
-
replier task behavior
- Example
-
TaskBuilder .replier[T, U, Req, Rep](portal)( event => // handle event as normal emit(event) )(request => // handle request val reply = Rep(request) reply(request) // reply )