DSL
DSL extensions for Portals, convenient shorthands for building applications.
The preferred way of building Portals applications is via PortalsApp, found at portals.api.dsl.DSL.PortalsApp.
Attributes
- Supertypes: class Object, trait Matchable, class Any
- Self type: DSL.type
Type members
Classlikes
Attributes
- Supertypes: class Object, trait Matchable, class Any
Attributes
- Supertypes: class Object, trait Matchable, class Any
Value members
Concrete methods
Summon the ConnectionBuilder.
The connection builder is used to connect an atomic stream to a sequencer.
Use this method to create a connection within the context of an application builder.
Attributes
- Returns: The connection builder.
- Example:
    val myApp = PortalsApp("myApp") {
      val gen1 = Generators.signal[String]("hello")
      val gen2 = Generators.signal[String]("world")
      val sequencer = Sequencers.random[String]()
      val _ = Connections.connect(gen1.stream, sequencer)
      val _ = Connections.connect(gen2.stream, sequencer)
    }
Summon the GeneratorBuilder.
Use this method to create a generator within the context of an application builder.
Value parameters
- name: The name of the generator.
Attributes
- Returns: The generator builder.
- Example:
    val myApp = PortalsApp("myApp") {
      val generator = Generators("name").fromList(List("hello", "world"))
    }
Summon the GeneratorBuilder.
Use this method to create a generator within the context of an application builder.
If the name is omitted, the generator is anonymous and a generated identifier is used instead.
Attributes
- Returns: The generator builder.
- Example:
    val myApp = PortalsApp("myApp") {
      val generator = Generators.fromList(List("hello", "world"))
    }
Summon the PortalBuilder.
The portal builder is used to create portals. A portal has a name, a request type, and a reply type. One workflow must be designated to handle the requests of a portal; this is done by creating a replier task. The replier task has two event handlers: one for the regular inputs to the task, and one for the requests. Another workflow may connect to the portal to send requests; this is done by creating an asker task. The asker task has a single event handler for the regular incoming events. Sending a request with the ask method creates a future. The asking task can await the completion of this future, which registers a continuation that is executed when the reply is received.
Use this method to create a portal within the context of an application builder.
Attributes
- Example:
    // Example 1
    val myApp = PortalsApp("myApp") {
      // Create a Portal with name "portal" which has request type Int, and Reply type String
      val portal = Portal[Int, String]("portal")
    }

    // Example 2
    /////////////////////////////////////////////////////////////////////////
    // Queries
    /////////////////////////////////////////////////////////////////////////
    case class Query()
    case class QueryReply(x: Int)

    val myApp = PortalsApp("myApp") {
      ///////////////////////////////////////////////////////////////////////
      // Aggregator
      ///////////////////////////////////////////////////////////////////////
      // Create a Portal which has request type Query, and Reply type QueryReply
      val portal = Portal[Query, QueryReply]("portal")

      // Atom(1, 2, 3, 4), Atom(1, 2, 3, 4), ...
      val generator = Generators.fromListOfLists(List.fill(10)(List(1, 2, 3, 4)))

      // The replying workflow
      val aggregator = Workflows[Int, Nothing]("aggregator")
        .source(generator.stream)
        // The replier consumes the stream of Ints, aggregates it. It replies to
        // requests with the latest aggregated value.
        .replier[Nothing](portal) { x => // regular input event (Int)
          // Aggregate the value
          val sum = PerTaskState("sum", 0)
          sum.set(sum.get() + x)
        } { case Query() => // handle portal request
          // Reply with the latest aggregated value
          reply(QueryReply(PerTaskState("sum", 0).get()))
        }
        .sink()
        .freeze()

      ///////////////////////////////////////////////////////////////////////
      // Query
      ///////////////////////////////////////////////////////////////////////
      // Atom(0), Atom(0), ...
      val queryTrigger = Generators.fromListOfLists(List.fill(10)(List(0)))

      // The requesting workflow
      val queryWorkflow = Workflows[Int, Nothing]("queryWorkflow")
        .source(queryTrigger.stream)
        // The asker consumes a stream of 0s, for each event it will send a Query
        // to the portal, and wait for and print the reply.
        .asker[Nothing](portal) { x =>
          // query the replier
          val future: Future[QueryReply] = ask(portal)(Query())
          future.await { // wait for the reply to complete
            future.value.get match
              case QueryReply(x) =>
                // print the aggregate to log
                ctx.log.info(x.toString())
          }
        }
        .sink()
        .freeze()
    }

    val system = Systems.test()
    system.launch(myApp)
    system.stepUntilComplete()
    system.shutdown()
Build a new PortalsApplication.
Use this method to create a new Portals application. Define the application as the second parameter to this method, using the contextual application builder. Within this environment you can use the methods of the contextual application builder, as well as the other DSL methods that rely on it, including Sequencers, Connections, Splitters, Portal, Workflows, Generators, Splits, and Registry.
Value parameters
- app: The application factory (see example).
- name: The name of the application.
Attributes
- Returns: The Portals application.
- Example:
    // define app
    val myApp = PortalsApp("myApp") {
      val generator = Generators.signal[String]("hello world")
      val workflow = Workflows[String, String]()
        .source(generator.stream)
        .logger()
        .sink()
        .freeze()
    }

    // launch app
    val system = Systems.test()
    system.launch(myApp)
    system.stepUntilComplete()
    system.shutdown()
Summon the RegistryBuilder.
Use this method to create a registry within the context of an application builder.
Attributes
- Returns: The registry builder.
- Example:
    val myApp = PortalsApp("myApp") {
      val externalStream = Registry.streams.get[String]("/path/to/external/stream")
      val externalSequencer = Registry.sequencers.get[String]("/path/to/external/sequencer")
      val externalPortal = Registry.portals.get[String, String]("/path/to/external/portal")
      val externalSplitter = Registry.splitters.get[String]("/path/to/external/splitter")
    }
Summon the SequencerBuilder.
A sequencer sequences a (mutable) set of input streams into a single output stream. New inputs can be added to the sequencer via the Connections.connect method. The output stream of a sequencer is available via its stream member.
Use this method to create a sequencer within the context of an application builder.
Value parameters
- name: The name of the sequencer.
Attributes
- Returns: The sequencer builder.
- Example:
    val myApp = PortalsApp("myApp") {
      val gen1 = Generators.signal[String]("hello")
      val gen2 = Generators.signal[String]("world")
      val sequencer = Sequencers.random[String]()
      val _ = Connections.connect(gen1.stream, sequencer)
      val _ = Connections.connect(gen2.stream, sequencer)
    }
Summon the SequencerBuilder.
A sequencer sequences a (mutable) set of input streams into a single output stream. New inputs can be added to the sequencer via the Connections.connect method. The output stream of a sequencer is available via its stream member.
Use this method to create a sequencer within the context of an application builder.
If the name is omitted, the sequencer is anonymous and a generated identifier is used instead.
Attributes
- Returns: The sequencer builder.
- Example:
    val myApp = PortalsApp("myApp") {
      val gen1 = Generators.signal[String]("hello")
      val gen2 = Generators.signal[String]("world")
      val sequencer = Sequencers.random[String]()
      val _ = Connections.connect(gen1.stream, sequencer)
      val _ = Connections.connect(gen2.stream, sequencer)
    }
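The sequencer examples connect inputs to a sequencer but never read from it. As a minimal sketch, the sequencer's output can be consumed like any other atomic stream by passing its stream member to a workflow source. Only calls that already appear on this page are used; the import path portals.system.Systems and the names buildSequencerApp and sequencerDemo are assumptions made for illustration.

```scala
import portals.api.dsl.DSL.*
import portals.system.Systems // assumed import path, not stated on this page

// Hypothetical helper: builds an app whose workflow reads the sequencer's output.
def buildSequencerApp() =
  PortalsApp("sequencerDemo") {
    val gen1 = Generators.signal[String]("hello")
    val gen2 = Generators.signal[String]("world")

    // Sequence both generator streams into one output stream.
    val sequencer = Sequencers.random[String]()
    val _ = Connections.connect(gen1.stream, sequencer)
    val _ = Connections.connect(gen2.stream, sequencer)

    // Consume the sequenced output via the sequencer's stream member.
    val _ = Workflows[String, String]()
      .source(sequencer.stream)
      .logger()
      .sink()
      .freeze()
  }

@main def sequencerDemo(): Unit =
  val system = Systems.test()
  system.launch(buildSequencerApp())
  system.stepUntilComplete()
  system.shutdown()
```

Because the sequencer is created with Sequencers.random, the order in which "hello" and "world" are logged should not be relied upon.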
Summon the SplitBuilder.
The split builder is used to create a new stream (a split) from a splitter. The splitter consumes an atomic stream, and splits can be added to it via the Splits.split method. The split method takes a predicate that selects the events kept in the split, and returns an atomic stream of the new split.
Use this method to create a split within the context of an application builder.
Attributes
- Returns: The split builder.
- Example:
    val myApp = PortalsApp("myApp") {
      val generator = Generators.fromList[String](List("Hello", "World"))
      // Create a splitter that splits the generator stream
      val splitter = Splitters.empty[String](generator.stream)
      // Add a split that only contains the "Hello" elements
      val helloSplit = Splits.split(splitter, _ == "Hello")
      // Add a split that only contains the "World" elements
      val worldSplit = Splits.split(splitter, _ == "World")
    }
Summon the SplitterBuilder.
The splitter splits an input atomic stream into multiple output streams. The set of splits is mutable: a new split can be added via the Splits.split method, which takes the splitter and a predicate that determines which elements are kept for the split, and returns a stream of the new split.
Use this method to create a splitter within the context of an application builder.
Value parameters
- name: The name of the splitter.
Attributes
- Returns: The splitter builder.
- Example:
    val myApp = PortalsApp("myApp") {
      val generator = Generators.fromList[String](List("Hello", "World"))
      // Create a splitter that splits the generator stream
      val splitter = Splitters.empty[String](generator.stream)
      // Add a split that only contains the "Hello" elements
      val helloSplit = Splits.split(splitter, _ == "Hello")
      // Add a split that only contains the "World" elements
      val worldSplit = Splits.split(splitter, _ == "World")
      // Log the output
      val wf = Workflows[String, String]()
        .source(helloSplit)
        .logger()
        .sink()
        .freeze()
    }
Summon the SplitterBuilder.
The splitter splits an input atomic stream into multiple output streams. The set of splits is mutable: a new split can be added via the Splits.split method, which takes the splitter and a predicate that determines which elements are kept for the split, and returns a stream of the new split.
Use this method to create a splitter within the context of an application builder.
If the name is omitted, the splitter is anonymous and a generated identifier is used instead.
Attributes
- Returns: The splitter builder.
- Example:
    val myApp = PortalsApp("myApp") {
      val generator = Generators.fromList[String](List("Hello", "World"))
      // Create a splitter that splits the generator stream
      val splitter = Splitters.empty[String](generator.stream)
      // Add a split that only contains the "Hello" elements
      val helloSplit = Splits.split(splitter, _ == "Hello")
      // Add a split that only contains the "World" elements
      val worldSplit = Splits.split(splitter, _ == "World")
      // Log the output
      val wf = Workflows[String, String]()
        .source(helloSplit)
        .logger()
        .sink()
        .freeze()
    }
Summon the TaskBuilder.
Use this method to create a task behavior with the summoned TaskBuilder.
Attributes
- Returns: The task builder.
- Example:
    Tasks.processor[Int, Int] { ctx ?=> x =>
      ctx.emit(x)
    }
Summon the WorkflowBuilder.
Use this method to create a workflow within the context of an application builder.
If the name is omitted, the workflow is anonymous and a generated identifier is used instead.
Attributes
- Returns: The workflow builder.
- Example:
    val myApp = PortalsApp("myApp") {
      val generator = Generators.signal[String]("hello world")
      val workflow = Workflows[String, String]()
        .source(generator.stream)
        .logger()
        .sink()
        .freeze()
    }
Summon the WorkflowBuilder.
Use this method to create a workflow within the context of an application builder.
Value parameters
- name: The name of the workflow.
Attributes
- Returns: The workflow builder.
- Example:
    val myApp = PortalsApp("myApp") {
      val generator = Generators.signal[String]("hello world")
      val workflow = Workflows[String, String]("name")
        .source(generator.stream)
        .logger()
        .sink()
        .freeze()
    }
Ask the portal with req; returns a future of the reply.
Type parameters
- Rep: The Portal's type of the reply.
- Req: The Portal's type of the request.
Value parameters
- portal: The portal to ask.
- req: The request to send.
Attributes
- Returns: A future of the reply.
- Example:
    val myApp = PortalsApp("myApp") {
      val portal = Registry.portals.get[String, String]("/path/to/external/portal")
      val asker = TaskBuilder.asker[Int, Int, String, String](portal) { x =>
        val future = ask(portal)(x.toString())
        await(future) {
          log.info(future.value.get)
        }
      }
    }
Await the completion of the future and then execute continuation f.
Type parameters
- Rep: The Portal's type of the reply.
- Req: The Portal's type of the request.
- U: The Task's type of the output value.
Value parameters
- f: The function to execute.
- future: The future to await.
Attributes
- Example:
    // Example 1
    await(future) { log.info(future.value.get) }

    // Example 2
    val myApp = PortalsApp("myApp") {
      val portal = Registry.portals.get[String, String]("/path/to/external/portal")
      val asker = TaskBuilder.asker[Int, Int, String, String](portal) { x =>
        val future = ask(portal)(x.toString())
        await(future) {
          log.info(future.value.get)
        }
      }
    }
Summon the given TaskContext.
This method is generic, so it can be used with any TaskContext. It takes the generic contextual task context and returns it with its more specific, dependent type. For example, it returns a MapTaskContext when used within a MapTask, and a regular ProcessorTaskContext when used within a ProcessorTask.
Attributes
- Returns: The contextual task context.
- Example:
    val task = TaskBuilder.map[Int, Int] { x =>
      // access the ctx to log a message
      ctx.log.info("Hello from map task")
      // access the ctx to access the task state
      ctx.state.get(0)
      x
    }
Emit an event.
To be accessed within a task.
Type parameters
- U: The Task's type of the output value.
Value parameters
- event: The event to emit.
Attributes
- Example:
    val task = TaskBuilder.processor[Int, Int] { x =>
      // emit an event
      emit(x + 1)
    }
Logger.
To be accessed within a task.
Attributes
- Returns: The logger.
- Example:
    val task = TaskBuilder.map[Int, Int] { x =>
      // log the message
      log.info(x.toString())
      x
    }
Reply with rep to the handled request.
Type parameters
- Rep: The Portal's type of the reply.
Value parameters
- rep: The reply to send.
Attributes
- Example:
    val myApp = PortalsApp("myApp") {
      val portal = Registry.portals.get[String, String]("/path/to/external/portal")
      val asker = TaskBuilder.asker[Int, Int, String, String](portal) { x =>
        val future = ask(portal)(x.toString())
        await(future) {
          log.info(future.value.get)
        }
      }
    }
Task state.
To be accessed within a task.
See also: Portals TaskStates, PerKeyState, and PerTaskState for more convenient, typed state access.
Attributes
- Returns: The state of the task.
- Example:
    val task = TaskBuilder.processor[Int, Int] { x =>
      state.set(0, state.get(0).getOrElse(0) + x)
      emit(x)
    }
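For comparison with the untyped state access in the task state example, here is a minimal sketch of the same running sum written with PerTaskState, following the way it is used in the Portal aggregator example on this page. The import path for PerTaskState and the name runningSum are assumptions, not taken from this page.

```scala
import portals.api.dsl.DSL.*
import portals.application.task.PerTaskState // assumed import path

// Hypothetical task: keeps a running sum in typed per-task state and emits it.
val runningSum = Tasks.processor[Int, Int] { x =>
  // Typed state handle with the name "sum" and default value 0.
  val sum = PerTaskState("sum", 0)
  sum.set(sum.get() + x)
  emit(sum.get())
}
```

Compared with state.get(0).getOrElse(0), the default value is supplied once when the handle is created, so reads return an Int directly.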
Extensions
Create an asker task.
An asker task processes regular inputs and outputs, and may also send requests to a portal and await the replies. While handling an event, the asker can ask the portal, which returns a future of the reply, and can await the completion of this future.
Type parameters
- CCU: The output type of the asker task.
Attributes
- Returns: The new flow builder.
- Example:
    val myApp = PortalsApp("myApp") {
      val portal = Registry.portals.get[String, String]("/path/to/external/portal")
      val workflow = Workflows[Int, Nothing]("workflow")
        .source( ... )
        // Create the asker task
        .asker[Nothing](portal) { x =>
          val future = ask(portal)(x.toString())
          await(future) {
            log.info(future.value.get)
          }
        }
        .sink()
        .freeze()
    }
Create a replier task.
A replier task processes regular inputs and outputs, and also processes requests from a portal. It establishes a connection to a portal and becomes the main task that handles requests from this portal. The request handler may reply to the request.
Type parameters
- CCU: The output type of the replier task.
Attributes
- Returns: The new flow builder.
- Example:
    val myApp = PortalsApp("myApp") {
      val portal = Registry.portals.get[String, String]("/path/to/external/portal")
      val workflow = Workflows[Int, Int]("workflow")
        .source( ... )
        // Create the replier task
        .replier[Int](portal) { x => // input handler
          emit(x)
        } { req => // request handler
          reply("hello" + req)
        }
        .sink()
        .freeze()
    }
Execute the provided function f when the future is completed.
Note: onComplete does not block. The function f is executed when the future is completed, which happens when the reply is received. The execution is ordered serially with any other events, ensuring that no events are processed concurrently for the task.
Type parameters
- Rep: The type of the expected reply.
Value parameters
- f: The function to execute when the future completes.
Attributes
- Example:
    future.onComplete:
      case Success(value) => println(value)
      case Failure(exception) => println(exception)
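To illustrate the non-blocking behavior described in the note, here is a toy model in plain Scala. ToyFuture, complete, and demo are hypothetical names, and this is not how Portals implements its futures. The sketch only shows that onComplete returns immediately and that the callback runs once the reply arrives; it does not model the serial ordering with other events.

```scala
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

// Toy stand-in for a portal future; NOT the Portals implementation.
final class ToyFuture[Rep]:
  private var result: Option[Try[Rep]] = None
  private val callbacks = mutable.ListBuffer.empty[Try[Rep] => Unit]

  // Returns immediately: the callback is stored, or run at once if already completed.
  def onComplete(f: Try[Rep] => Unit): Unit =
    result match
      case Some(r) => f(r)
      case None    => callbacks += f

  // Called by the toy "runtime" when the reply arrives.
  def complete(r: Try[Rep]): Unit =
    result = Some(r)
    callbacks.foreach(cb => cb(r))
    callbacks.clear()

// Records the order in which things happen.
def demo(): List[String] =
  val trace = mutable.ListBuffer.empty[String]
  val future = ToyFuture[Int]()
  future.onComplete {
    case Success(value)     => trace += s"reply: $value"
    case Failure(exception) => trace += s"failed: ${exception.getMessage}"
  }
  trace += "handler returned" // reached before any reply exists
  future.complete(Success(42)) // the reply arrives later
  trace.toList

@main def runToyFuture(): Unit =
  demo().foreach(println)
```

The trace records "handler returned" before "reply: 42", which is the point of the note: registering the callback does not wait for the reply.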
Generator with a single output value sig.
Type parameters
- T: The type of the output value.
Value parameters
- sig: The output value.
Attributes
- Example:
    val sig = Generators.signal[Int](1)