DSL

portals.api.dsl.DSL$
object DSL

DSL extensions for Portals, convenient shorthands for building applications.

The preferred way of building Portals applications is via the PortalsApp in portals.api.dsl.DSL.PortalsApp.

Attributes

See also
Graph
Supertypes
class Object
trait Matchable
class Any
Self type
DSL.type

Members list

Type members

Classlikes

class FlowBuilderAsker[T, U, CT, CU, CCU](fb: FlowBuilder[T, U, CT, CU])

Attributes

Supertypes
class Object
trait Matchable
class Any
class FlowBuilderReplier[T, U, CT, CU, CCU](fb: FlowBuilder[T, U, CT, CU])

Attributes

Supertypes
class Object
trait Matchable
class Any

Value members

Concrete methods

Summon the ConnectionBuilder.

The connection builder is used to connect an atomic stream to a sequencer.

Use this method to create a connection within the context of an ApplicationBuilder.

Attributes

Returns

The connection builder.

Example
val myApp = PortalsApp("myApp") {
 val gen1 = Generators.signal[String]("hello")
 val gen2 = Generators.signal[String]("world")
 val sequencer = Sequencers.random[String]()
 val _ = Connections.connect(gen1.stream, sequencer)
 val _ = Connections.connect(gen2.stream, sequencer)
}
def Generators(name: String)(using ab: ApplicationBuilder): GeneratorBuilder

Summon the GeneratorBuilder.

Use this method to create a generator within the context of an ApplicationBuilder.

Value parameters

name

The name of the generator.

Attributes

Returns

The generator builder.

Example
val myApp = PortalsApp("myApp") {
 val generator = Generators("name").fromList(List("hello", "world"))
}

Summon the GeneratorBuilder.

Use this method to create a generator within the context of an ApplicationBuilder.

If the name is omitted, then the generator will be anonymous and a generated identifier will be used instead.

Attributes

Returns

The generator builder.

Example
val myApp = PortalsApp("myApp") {
 val generator = Generators.fromList(List("hello", "world"))
}
def Portal[T, R](name: String)(using ab: ApplicationBuilder): AtomicPortalRef[T, R]

Summon the PortalBuilder.

The portal builder is used to create portals. A portal has a name, a request type, and a reply type. One workflow must be designated to handle the requests of a portal; this is done by creating a replier task. The replier task has two event handlers, one for regular inputs to the task and one for the requests. Another workflow may connect to the portal to send requests; this is done via an asker task. The asker task has a single event handler for the regular incoming events. Sending a request with the ask method creates a future. The asking task can await the completion of this future, which registers a continuation that is executed when the reply is received.

Use this method to create a portal within the context of an ApplicationBuilder.

Attributes

Example
// Example 1
val myApp = PortalsApp("myApp") {
  // Create a Portal with name "portal" which has request type Int, and Reply type String
  val portal = Portal[Int, String]("portal")
}
// Example 2
/////////////////////////////////////////////////////////////////////////
// Queries
/////////////////////////////////////////////////////////////////////////
case class Query()
case class QueryReply(x: Int)
val myApp = PortalsApp("myApp") {
  ///////////////////////////////////////////////////////////////////////
  // Aggregator
  ///////////////////////////////////////////////////////////////////////
  // Create a Portal which has request type Query, and Reply type QueryReply
  val portal = Portal[Query, QueryReply]("portal")
  // Atom(1, 2, 3, 4), Atom(1, 2, 3, 4), ...
  val generator = Generators.fromListOfLists(List.fill(10)(List(1, 2, 3, 4)))
  // The replying workflow
  val aggregator = Workflows[Int, Nothing]("aggregator")
    .source(generator.stream)
    // The replier consumes the stream of Ints, aggregates it. It replies to
    // requests with the latest aggregated value.
    .replier[Nothing](portal) { x => // regular input event (Int)
      // Aggregate the value
      val sum = PerTaskState("sum", 0)
      sum.set(sum.get() + x)
    } { case Query() => // handle portal request
      // Reply with the latest aggregated value
      reply(QueryReply(PerTaskState("sum", 0).get()))
    }
    .sink()
    .freeze()
  ///////////////////////////////////////////////////////////////////////
  // Query
  ///////////////////////////////////////////////////////////////////////
  // Atom(0), Atom(0), ...
  val queryTrigger = Generators.fromListOfLists(List.fill(10)(List(0)))
  // The requesting workflow
  val queryWorkflow = Workflows[Int, Nothing]("queryWorkflow")
    .source(queryTrigger.stream)
    // The asker consumes a stream of 0s, for each event it will send a Query
    // to the portal, and wait for and print the reply.
    .asker[Nothing](portal) { x =>
      // query the replier
      val future: Future[QueryReply] = ask(portal)(Query())
      future.await { // wait for the reply to complete
        future.value.get match
          case QueryReply(x) =>
            // print the aggregate to log
            ctx.log.info(x.toString())
      }
    }
    .sink()
    .freeze()
}
val system = Systems.test()
system.launch(myApp)
system.stepUntilComplete()
system.shutdown()
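
The ask/await behaviour described above (asking returns a future, and awaiting it registers a continuation that runs once the reply arrives) can be illustrated with a small single-threaded model. This is only a sketch in plain Scala 3: ToyFuture and demoAskAwait are hypothetical names, and nothing here is the Portals Future implementation.

```scala
import scala.collection.mutable

// Hypothetical, single-threaded model of a reply future; not the Portals Future.
final class ToyFuture[Rep]:
  var value: Option[Rep] = None
  private var continuation: Option[() => Unit] = None

  // Register the continuation; run it right away if the reply is already there.
  def await(f: => Unit): Unit =
    if value.isDefined then f
    else continuation = Some(() => f)

  // Called when the reply is received: store it, then run the continuation.
  def complete(rep: Rep): Unit =
    value = Some(rep)
    continuation.foreach(k => k())
    continuation = None

// Shows that await does not block: the handler returns before the reply is logged.
def demoAskAwait(): List[String] =
  val log = mutable.ListBuffer.empty[String]
  val future = ToyFuture[Int]()
  future.await { log += s"reply: ${future.value.get}" }
  log += "handler returned"
  future.complete(42) // the reply arrives later
  log.toList
```

In this model the event handler finishes first, and the continuation runs only when complete is called.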
def Portal[T, R](name: String, f: T => Long)(using ab: ApplicationBuilder): AtomicPortalRef[T, R]

Summon the PortalBuilder.

The portal builder is used to create portals. A portal has a name, a request type, and a reply type. One workflow must be designated to handle the requests of a portal; this is done by creating a replier task. The replier task has two event handlers, one for regular inputs to the task and one for the requests. Another workflow may connect to the portal to send requests; this is done via an asker task. The asker task has a single event handler for the regular incoming events. Sending a request with the ask method creates a future. The asking task can await the completion of this future, which registers a continuation that is executed when the reply is received.

Use this method to create a portal within the context of an ApplicationBuilder.

Attributes

Example
// Example 1
val myApp = PortalsApp("myApp") {
  // Create a Portal with name "portal" which has request type Int, and Reply type String
  val portal = Portal[Int, String]("portal")
}
// Example 2
/////////////////////////////////////////////////////////////////////////
// Queries
/////////////////////////////////////////////////////////////////////////
case class Query()
case class QueryReply(x: Int)
val myApp = PortalsApp("myApp") {
  ///////////////////////////////////////////////////////////////////////
  // Aggregator
  ///////////////////////////////////////////////////////////////////////
  // Create a Portal which has request type Query, and Reply type QueryReply
  val portal = Portal[Query, QueryReply]("portal")
  // Atom(1, 2, 3, 4), Atom(1, 2, 3, 4), ...
  val generator = Generators.fromListOfLists(List.fill(10)(List(1, 2, 3, 4)))
  // The replying workflow
  val aggregator = Workflows[Int, Nothing]("aggregator")
    .source(generator.stream)
    // The replier consumes the stream of Ints, aggregates it. It replies to
    // requests with the latest aggregated value.
    .replier[Nothing](portal) { x => // regular input event (Int)
      // Aggregate the value
      val sum = PerTaskState("sum", 0)
      sum.set(sum.get() + x)
    } { case Query() => // handle portal request
      // Reply with the latest aggregated value
      reply(QueryReply(PerTaskState("sum", 0).get()))
    }
    .sink()
    .freeze()
  ///////////////////////////////////////////////////////////////////////
  // Query
  ///////////////////////////////////////////////////////////////////////
  // Atom(0), Atom(0), ...
  val queryTrigger = Generators.fromListOfLists(List.fill(10)(List(0)))
  // The requesting workflow
  val queryWorkflow = Workflows[Int, Nothing]("queryWorkflow")
    .source(queryTrigger.stream)
    // The asker consumes a stream of 0s, for each event it will send a Query
    // to the portal, and wait for and print the reply.
    .asker[Nothing](portal) { x =>
      // query the replier
      val future: Future[QueryReply] = ask(portal)(Query())
      future.await { // wait for the reply to complete
        future.value.get match
          case QueryReply(x) =>
            // print the aggregate to log
            ctx.log.info(x.toString())
      }
    }
    .sink()
    .freeze()
}
val system = Systems.test()
system.launch(myApp)
system.stepUntilComplete()
system.shutdown()
def PortalsApp(name: String)(app: ApplicationBuilder ?=> Unit): Application

Build a new PortalsApplication.

Use this method to create a new Portals application. Define the application as the second parameter to this method, using the contextual application builder. Within this environment you can use the methods of the contextual application builder, and other related DSL methods that use it, including Sequencers, Connections, Splitters, Portal, Workflows, Generators, Splits, and Registry.

Value parameters

app

The application factory (see example).

name

The name of the application.

Attributes

Returns

The Portals application.

Example
// define app
val myApp = PortalsApp("myApp") {
 val generator = Generators.signal[String]("hello world")
 val workflow = Workflows[String, String]()
   .source(generator.stream)
   .logger()
   .sink()
   .freeze()
}
// launch app
val system = Systems.test()
system.launch(myApp)
system.stepUntilComplete()
system.shutdown()
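
The second parameter of PortalsApp has the type ApplicationBuilder ?=> Unit, which is a Scala 3 context function. The sketch below shows the general mechanism with hypothetical stand-ins (ToyAppBuilder, toyApp, component, demoApp); it is a plain-Scala illustration of how a block can see a contextual builder, not the Portals implementation.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an application builder; not the Portals class.
final class ToyAppBuilder(val name: String):
  private val parts = mutable.ListBuffer.empty[String]
  def register(part: String): Unit = parts += part
  def result: List[String] = parts.toList

// Same shape as PortalsApp(name)(app: ApplicationBuilder ?=> Unit).
def toyApp(name: String)(body: ToyAppBuilder ?=> Unit): List[String] =
  val builder = ToyAppBuilder(name)
  body(using builder) // run the block with the builder in contextual scope
  builder.result

// A DSL-style method that picks up the contextual builder via a using clause.
def component(label: String)(using ab: ToyAppBuilder): Unit =
  ab.register(s"${ab.name}/$label")

// Inside the block, component finds the builder without it being passed explicitly.
def demoApp(): List[String] =
  toyApp("myApp") {
    component("generator")
    component("workflow")
  }
```

Calling a method such as component outside the block would fail to compile, because no contextual builder is in scope there.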

Summon the RegistryBuilder.

Use this method to create a registry within the context of an ApplicationBuilder.

Attributes

Returns

The registry builder.

Example
val myApp = PortalsApp("myApp") {
 val externalStream =
   Registry.streams.get[String]("/path/to/external/stream")
 val externalSequencer =
   Registry.sequencers.get[String]("/path/to/external/sequencer")
 val externalPortal =
   Registry.portals.get[String, String]("/path/to/external/portal")
 val externalSplitter =
   Registry.splitters.get[String]("/path/to/external/splitter")
}
def Sequencers(name: String)(using ab: ApplicationBuilder): SequencerBuilder

Summon the SequencerBuilder.

A sequencer sequences a set of (mutable) input streams to a single output stream. New inputs to the sequencer can be added via the Connections connect method. You can access the output stream of a sequencer via its stream member.

Use this method to create a sequencer within the context of an ApplicationBuilder.

Value parameters

name

The name of the sequencer.

Attributes

Returns

The sequencer builder.

Example
val myApp = PortalsApp("myApp") {
 val gen1 = Generators.signal[String]("hello")
 val gen2 = Generators.signal[String]("world")
 val sequencer = Sequencers.random[String]()
 val _ = Connections.connect(gen1.stream, sequencer)
 val _ = Connections.connect(gen2.stream, sequencer)
}
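
To make the idea of sequencing concrete, the following plain-Scala sketch models a sequencer as a set of input queues of atoms that are merged into one output order. ToySequencer and demoSequencer are hypothetical names. The model always takes from the first non-empty input, which is a deterministic simplification and not the policy of Sequencers.random.

```scala
import scala.collection.mutable

// Hypothetical model: an atom is a List[T], an input stream is a queue of atoms.
final class ToySequencer[T]:
  private val inputs = mutable.ArrayBuffer.empty[mutable.Queue[List[T]]]

  // Inputs are mutable: another stream can be connected at any time.
  def connect(stream: mutable.Queue[List[T]]): Unit = inputs += stream

  // Emit the next atom from the first input that has one, if any.
  def next(): Option[List[T]] =
    inputs.find(_.nonEmpty).map(_.dequeue())

// Two single-atom inputs are sequenced into one output order.
def demoSequencer(): List[Option[List[String]]] =
  val sequencer = ToySequencer[String]()
  sequencer.connect(mutable.Queue(List("hello")))
  sequencer.connect(mutable.Queue(List("world")))
  List(sequencer.next(), sequencer.next(), sequencer.next())
```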

Summon the SequencerBuilder.

A sequencer sequences a set of (mutable) input streams to a single output stream. New inputs to the sequencer can be added via the Connections connect method. You can access the output stream of a sequencer via its stream member.

Use this method to create a sequencer within the context of an ApplicationBuilder.

If the name is omitted, then the sequencer will be anonymous and a generated identifier will be used instead.

Attributes

Returns

The sequencer builder.

Example
val myApp = PortalsApp("myApp") {
 val gen1 = Generators.signal[String]("hello")
 val gen2 = Generators.signal[String]("world")
 val sequencer = Sequencers.random[String]()
 val _ = Connections.connect(gen1.stream, sequencer)
 val _ = Connections.connect(gen2.stream, sequencer)
}

Summon the SplitBuilder.

The split builder is used to create a new stream (split) from a splitter. The splitter consumes an atomic stream, and splits can be added to this splitter via the Splits split method. The split method takes a predicate to filter out the events for the split. The method returns an atomic stream of the new split.

Use this method to create a split within the context of an ApplicationBuilder.

Attributes

Returns

The split builder.

Example
val myApp = PortalsApp("myApp") {
  val generator = Generators.fromList[String](List("Hello", "World"))
  // Create a splitter that splits the generator stream
  val splitter = Splitters.empty[String](generator.stream)
  // Add a split that only contains the "Hello" elements
  val helloSplit = Splits.split(splitter, _ == "Hello")
  // Add a split that only contains the "World" elements
  val worldSplit = Splits.split(splitter, _ == "World")
}
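
The relationship between a splitter and its splits can be sketched in plain Scala as a list of predicates, each paired with an output buffer. ToySplitter and demoSplits are hypothetical names used only for illustration; the sketch ignores atoms and is not how Portals implements splitting.

```scala
import scala.collection.mutable

// Hypothetical model: each split is a predicate plus the events it has kept.
final class ToySplitter[T]:
  private val splits =
    mutable.ArrayBuffer.empty[(T => Boolean, mutable.ArrayBuffer[T])]

  // Add a new split and return its output buffer.
  def split(p: T => Boolean): mutable.ArrayBuffer[T] =
    val out = mutable.ArrayBuffer.empty[T]
    splits += ((p, out))
    out

  // Route an incoming event to every split whose predicate accepts it.
  def push(event: T): Unit =
    splits.foreach { case (p, out) => if p(event) then out += event }

// Mirrors the example above: one split per word.
def demoSplits(): (List[String], List[String]) =
  val splitter = ToySplitter[String]()
  val helloSplit = splitter.split(_ == "Hello")
  val worldSplit = splitter.split(_ == "World")
  List("Hello", "World", "Hello").foreach(splitter.push)
  (helloSplit.toList, worldSplit.toList)
```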
def Splitters(name: String)(using ab: ApplicationBuilder): SplitterBuilder

Summon the SplitterBuilder.

The splitter splits an input atomic stream into multiple output streams. The splits are mutable: a new split can be added via the Splits split method, which takes the splitter and a predicate that determines which elements are kept for the split, and returns a stream of the new split.

Use this method to create a splitter within the context of an ApplicationBuilder.

Value parameters

name

The name of the splitter.

Attributes

Returns

The splitter builder.

Example
val myApp = PortalsApp("myApp") {
  val generator = Generators.fromList[String](List("Hello", "World"))
  // Create a splitter that splits the generator stream
  val splitter = Splitters.empty[String](generator.stream)
  // Add a split that only contains the "Hello" elements
  val helloSplit = Splits.split(splitter, _ == "Hello")
  // Add a split that only contains the "World" elements
  val worldSplit = Splits.split(splitter, _ == "World")
  // Log the output
  val wf = Workflows[String, String]()
    .source(helloSplit)
    .logger()
    .sink()
    .freeze()
}

Summon the SplitterBuilder.

The splitter splits an input atomic stream into multiple output streams. The splits are mutable: a new split can be added via the Splits split method, which takes the splitter and a predicate that determines which elements are kept for the split, and returns a stream of the new split.

Use this method to create a splitter within the context of an ApplicationBuilder.

If the name is omitted, then the splitter will be anonymous and a generated identifier will be used instead.

Attributes

Returns

The splitter builder.

Example
val myApp = PortalsApp("myApp") {
  val generator = Generators.fromList[String](List("Hello", "World"))
  // Create a splitter that splits the generator stream
  val splitter = Splitters.empty[String](generator.stream)
  // Add a split that only contains the "Hello" elements
  val helloSplit = Splits.split(splitter, _ == "Hello")
  // Add a split that only contains the "World" elements
  val worldSplit = Splits.split(splitter, _ == "World")
  // Log the output
  val wf = Workflows[String, String]()
    .source(helloSplit)
    .logger()
    .sink()
    .freeze()
}
def Tasks: TaskBuilder.type

Summon the TaskBuilder.

Use this method to create a task behavior with the summoned TaskBuilder.

Attributes

Returns

The task builder.

Example
Tasks.processor[Int, Int] { ctx ?=> x =>
  ctx.emit(x)
}

def Workflows[T, U]()(using ab: ApplicationBuilder): WorkflowBuilder[T, U]

Summon the WorkflowBuilder.

Use this method to create a workflow within the context of an ApplicationBuilder.

If the name is omitted, then the workflow will be anonymous and a generated identifier will be used instead.

Attributes

Returns

The workflow builder.

Example
val myApp = PortalsApp("myApp") {
 val generator = Generators.signal[String]("hello world")
 val workflow = Workflows[String, String]()
   .source(generator.stream)
   .logger()
   .sink()
   .freeze()
}
def Workflows[T, U](name: String)(using ab: ApplicationBuilder): WorkflowBuilder[T, U]

Summon the WorkflowBuilder.

Use this method to create a workflow within the context of an ApplicationBuilder.

Value parameters

name

The name of the workflow.

Attributes

Returns

The workflow builder.

Example
val myApp = PortalsApp("myApp") {
 val generator = Generators.signal[String]("hello world")
 val workflow = Workflows[String, String]("name")
   .source(generator.stream)
   .logger()
   .sink()
   .freeze()
}
def ask[Req, Rep](using ctx: AskingTaskContext[Req, Rep])(portal: AtomicPortalRefKind[Req, Rep])(req: Req): Future[Rep]

Ask the portal with req, returns a future of the reply.

Type parameters

Rep

The Portal's type of the reply.

Req

The Portal's type of the request.

Value parameters

portal

The portal to ask.

req

The request to send.

Attributes

Returns

A future of the reply.

Example
val myApp = PortalsApp("myApp") {
 val portal = Registry.portals.get[String, String]("/path/to/external/portal")
 val asker = TaskBuilder.asker[Int, Int, String, String](portal) { x =>
   val future = ask(portal)(x.toString())
   await(future) {
     log.info(future.value.get)
   }
 }
}
def await[U, Req, Rep](future: Future[Rep])(f: AskerTaskContext[_, U, Req, Rep] ?=> Unit)(using ctx: AwaitingTaskContext[U, Req, Rep]): Unit

Await the completion of the future and then execute continuation f.

Type parameters

Rep

The Portal's type of the reply.

Req

The Portal's type of the request.

U

The Task's type of the output value.

Value parameters

f

The function to execute.

future

The future to await.

Attributes

Example
// Example 1
await(future) { log.info(future.value.get) }
// Example 2
val myApp = PortalsApp("myApp") {
 val portal = Registry.portals.get[String, String]("/path/to/external/portal")
 val asker = TaskBuilder.asker[Int, Int, String, String](portal) { x =>
   val future = ask(portal)(x.toString())
   await(future) {
     log.info(future.value.get)
   }
 }
}
def ctx(using gctx: GenericGenericTaskContext): GenericGenericTaskContext

Summon the given TaskContext.

This method is generic, so it can be used with any TaskContext. It takes a generic task context as its contextual parameter, and its return type depends on that parameter, so the caller obtains the more specific task context type. For example, this can return a MapTaskContext when used within a MapTask, or a regular ProcessorTaskContext when used within a ProcessorTask.

Attributes

Returns

The contextual task context.

Example
val task = TaskBuilder.map[Int, Int]{ x =>
 // access the ctx to log a message
 ctx.log.info("Hello from map task")
 // access the ctx to access the task state
 ctx.state.get(0)
 x
}
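
One common Scala 3 way to let a summoning method return the precise type of whichever context is in scope is a dependent result type such as c.type. The sketch below uses hypothetical names (ToyTaskContext, ToyMapContext, toyCtx, insideMapTask, demoCtx); whether Portals achieves the effect in exactly this way is an assumption.

```scala
// Hypothetical context hierarchy; not the Portals task contexts.
trait ToyTaskContext:
  def kind: String

final class ToyMapContext extends ToyTaskContext:
  def kind: String = "map"
  def mapOnly: String = "only on map contexts"

// The result type c.type keeps the precise type of the given that was found.
def toyCtx(using c: ToyTaskContext): c.type = c

// The given in scope is a ToyMapContext, so its extra member is reachable.
def insideMapTask(using ToyMapContext): String =
  toyCtx.mapOnly

def demoCtx(): String = insideMapTask(using ToyMapContext())
```

If toyCtx were declared to return plain ToyTaskContext, the call to mapOnly would not compile.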
def emit[U](event: U)(using EmittingTaskContext[U]): Unit

Emit an event.

To be accessed within a task.

Type parameters

U

The Task's type of the output value.

Value parameters

event

The event to emit.

Attributes

Example
val task = TaskBuilder.processor[Int, Int]{ x =>
 // emit an event
 emit(x + 1)
}
def log(using LoggingTaskContext): Logger

Logger.

To be accessed within a task.

Attributes

Returns

The logger.

Example
val task = TaskBuilder.map[Int, Int]{ x =>
 // log the message
 log.info(x.toString())
 x
}
def reply[Rep](using ctx: ReplyingTaskContext[Rep])(rep: Rep): Unit

Reply with rep to the handled request.

Type parameters

Rep

The Portal's type of the reply.

Value parameters

rep

The reply to send.

Attributes

Example
val myApp = PortalsApp("myApp") {
 val portal = Registry.portals.get[String, String]("/path/to/external/portal")
 val workflow = Workflows[Int, Int]("workflow")
   .source( ... )
   .replier[Int](portal) { x => // input handler
     emit(x)
   } { req => // request handler
     // reply to the request that is currently being handled
     reply("hello" + req)
   }
   .sink()
   .freeze()
}
def state(using StatefulTaskContext): TaskState[Any, Any]

Task state.

To be accessed within a task.

See also: Portals TaskStates, PerKeyState, and PerTaskState for more convenient, typed state access.

Attributes

Returns

The state of the task.

Example
val task = TaskBuilder.processor[Int, Int] { x =>
 state.set(0, state.get(0).getOrElse(0) + x)
 emit(x)
}

Extensions

extension [T, U, CT, CU](fb: FlowBuilder[T, U, CT, CU])
def asker[CCU]: FlowBuilderAsker[T, U, CT, CU, CCU]

Create an asker task.

An asker task processes regular inputs and outputs, and may also send requests to a portal and await the replies. While handling an event, the asker can ask the portal, which returns a future of the reply, and can await the completion of this future.

Type parameters

CCU

The output type of the asker task.

Attributes

Returns

The new flow builder.

Example
val myApp = PortalsApp("myApp") {
 val portal = Registry.portals.get[String, String]("/path/to/external/portal")
 val workflow = Workflows[Int, Nothing]("workflow")
   .source( ... )
   // Create the asker task
   .asker[Nothing](portal) { x =>
     val future = ask(portal)(x.toString())
     await(future) {
       log.info(future.value.get)
     }
   }
   .sink()
   .freeze()
}
def replier[CCU]: FlowBuilderReplier[T, U, CT, CU, CCU]

Create a replier task.

A replier task is a task that processes regular inputs and outputs, and processes requests from a portal. It establishes a connection to a portal, and becomes the main task that handles requests from this portal. The request handler may reply to the request.

Type parameters

CCU

The output type of the replier task.

Attributes

Returns

The new flow builder.

Example
val myApp = PortalsApp("myApp") {
 val portal = Registry.portals.get[String, String]("/path/to/external/portal")
 val workflow = Workflows[Int, Int]("workflow")
   .source( ... )
   // Create the replier task
   .replier[Int](portal) { x => // input handler
     emit(x)
   } { req => // request handler
     reply("hello" + req)
   }
   .sink()
   .freeze()
}
extension [Rep](future: Future[Rep])
def onComplete[T, U, Req](f: Try[Rep] => Unit)(using AskerTaskContext[T, U, Req, Rep]): Unit

Execute the provided function f when the future is completed.

Note: The onComplete method is not a blocking method, it is executed when the future is completed (this happens when the reply is received). The execution is ordered serially with any other events, ensuring that there are no concurrently processed events for the task.

Value parameters

Rep

The type of the expected reply

f

The function to execute when the future completes

Attributes

Example
future.onComplete:
 case Success(value) => println(value)
 case Failure(exception) => println(exception)
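
The handler passed to onComplete receives a Try[Rep], so it should cover both the success and the failure case. As a small standalone illustration using only the Scala standard library (describeReply is a hypothetical helper, not part of the DSL), the same pattern match can be written as a function:

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper: turn the outcome of a reply into a log message.
def describeReply(result: Try[Int]): String = result match
  case Success(value)     => s"reply: $value"
  case Failure(exception) => s"failed: ${exception.getMessage}"
```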
extension (gb: GeneratorBuilder)
def signal[T](sig: T): AtomicGeneratorRef[T]

Generator with a single output value sig.

Type parameters

T

The type of the output value.

Value parameters

sig

The output value.

Attributes

Example
val sig = Generators.signal[Int](1)