User Guide
Before you start
NeXosim-py provides both a conventional API and an asynchronous API.
The asynchronous API makes it possible to concurrently advance simulation time, schedule events and monitor simulation outputs.
Because the asynchronous API faithfully reflects the conventional API, however, most examples in this guide use the conventional API. An example of concurrent simulation management leveraging the asynchronous API is provided in a dedicated section.
Setting up the simulation
The connection with the server is established through instantiation of a
Simulation object.
The client can communicate with the server over either a local Unix Domain Socket or a HTTP/2 connection, depending on how the server is set up.
To connect over a unix socket the address provided to the
Simulation constructor should be the socket path
prefixed with the unix: scheme. The server should be started with the
run_local function.
For a regular remote HTTP connection the address should omit the url scheme and
the server should be started using the run function.
Starting the simulation
Before the server can accept requests, the simulation must be initialized using
the start() method. Attempting to send a request
before the simulation is initialized will raise a
SimulationNotStartedError.
The method accepts a configuration object as an argument that can be used by the bench initializer.
use nexosim::model::Model;
use nexosim::ports::QuerySource;
use nexosim::registry::EndpointRegistry;
use nexosim::simulation::{Mailbox, SimInit, Simulation, SimulationError};
use nexosim::time::MonotonicTime;
use nexosim::server;
pub(crate) struct MyModel {
value: u16
}
impl MyModel {
pub async fn my_replier(&mut self) -> u16 {
self.value
}
pub(crate) fn new(value: u16) -> Self {
Self {value}
}
}
impl Model for MyModel {}
fn bench(cfg: u16) -> Result<(Simulation, EndpointRegistry), SimulationError> {
// Pass the configuration object to the model constructor.
let model = MyModel::new(cfg);
// Mailboxes.
let model_mbox = Mailbox::new();
let model_addr = model_mbox.address();
// Endpoints.
let mut registry = EndpointRegistry::new();
let mut replier = QuerySource::new();
replier.connect(MyModel::my_replier, &model_addr);
registry.add_query_source(replier, "replier").unwrap();
// Assembly and initialization.
let sim = SimInit::new()
.add_model(model, model_mbox, "model")
.init(MonotonicTime::EPOCH)?
.0;
Ok((sim, registry))
}
fn main() {
server::run(bench, "0.0.0.0:41633".parse().unwrap()).unwrap();
}
The configuration object can be any serializable type:
use serde::Deserialize;
use nexosim::model::Model;
use nexosim::ports::QuerySource;
use nexosim::registry::EndpointRegistry;
use nexosim::simulation::{Mailbox, SimInit, Simulation, SimulationError};
use nexosim::time::MonotonicTime;
use nexosim::server;
#[derive(Deserialize)]
struct ModelConfig {
foo: u16,
bar: String,
}
#[derive(Default)]
pub(crate) struct MyModel {
foo: u16,
bar: String,
}
impl MyModel {
pub async fn my_replier(&mut self) -> String {
self.bar.clone()
}
pub(crate) fn new(cfg: ModelConfig) -> Self {
let ModelConfig {foo, bar} = cfg;
Self {foo, bar}
}
}
impl Model for MyModel {}
fn bench(cfg: ModelConfig) -> Result<(Simulation, EndpointRegistry), SimulationError> {
let model = MyModel::new(cfg);
// Mailboxes.
let model_mbox = Mailbox::new();
let model_addr = model_mbox.address();
// Endpoints.
let mut registry = EndpointRegistry::new();
let mut input = QuerySource::new();
input.connect(MyModel::my_replier, &model_addr);
registry.add_query_source(input, "replier").unwrap();
// Assembly and initialization.
let sim = SimInit::new()
.add_model(model, model_mbox, "model")
.init(MonotonicTime::EPOCH)?
.0;
Ok((sim, registry))
}
fn main() {
server::run(bench, "0.0.0.0:41633".parse().unwrap()).unwrap();
}
If start() is called again, the simulation is
reinitialized and its previous state is lost.
Opening and closing sinks
The initial state of the simulation's individual sinks may be either open or closed, depending on the bench initializer. Closed sinks do not receive new events.
The open_sink() and
close_sink() methods can be used to control
the state of individual sinks.
Interacting with the simulation
Interacting with a running simulation for the most part involves:
- broadcasting events and queries to an
EventSourceorQuerySourcerespectively, - scheduling events to occur at a later time,
- advancing the simulation time,
- reading events sent by the simulation to an
EventSink.
Processing events and queries
Events can be broadcast to an EventSource using the
process_event() method.
To broadcast a query to a QuerySource use the
process_query() method. The type of the
returned value can be set using the reply_type parameter.
with Simulation("0.0.0.0:41633") as sim:
sim.start()
output = sim.process_query("my_replier", 5, reply_type=str)
print(output)
# Prints out:
# ['5']
Note
Both the process_event() and
process_query() methods block
until completion and do not affect the simulation time.
Scheduling events
Events can be scheduled for a later simulation time with the
schedule_event() method. Use the period
argument to schedule a periodically recurring event.
To be able to cancel a scheduled event at a later time, the
schedule_event() method must be called
with with_key = True. The event can be then cancelled using the
cancel_event() method and the returned
event key.
with Simulation("0.0.0.0:41633") as sim:
sim.start()
event_key = sim.schedule_event(Duration(10), "my_event", with_key=True)
sim.cancel_event(event_key)
The time at which an event is scheduled can be an absolute simulation time using
MonotonicTime or relative to the current
simulation time using Duration.
Advancing the simulation
The current time of the simulation can be retrieved using the
time() method.
The simulation can be advanced to the time of the next scheduled events with the
step() method. All events scheduled for the same
time are processed as well. This method blocks until all of the relevant events
are processed.
from nexosim import Simulation
from nexosim.time import MonotonicTime
with Simulation("0.0.0.0:41633") as sim:
sim.start()
sim.schedule_event(MonotonicTime(1), "input", 1)
sim.step()
print(sim.time()) # 1970-01-01 00:00:01
To advance the simulation to the specified time, processing all events scheduled
up to that time, use the step_until()
method. This method blocks until all of the relevant events are processed or, if
the simulation is synchronized with a Clock, until the specified simulation
time is reached. The time can be an absolute simulation time using
MonotonicTime or relative to the current
simulation time using Duration.
from nexosim import Simulation
from nexosim.time import MonotonicTime, Duration
with Simulation("0.0.0.0:41633") as sim:
sim.start()
sim.step_until(MonotonicTime(1))
sim.step_until(MonotonicTime(2))
print(sim.time()) # 1970-01-01 00:00:02
sim.step_until(Duration(2))
print(sim.time()) # 1970-01-01 00:00:04
The step_unbounded() method processes all
of the scheduled events as if by calling the step()
method repeatedly. This method blocks until completed.
from nexosim import Simulation
from nexosim.time import MonotonicTime
with Simulation("0.0.0.0:41633") as sim:
sim.start()
sim.schedule_event(MonotonicTime(1), "input", 1)
sim.schedule_event(MonotonicTime(3), "input", 1)
sim.step_unbounded()
print(sim.time()) # 1970-01-01 00:00:03
The simulation can be stopped using the halt()
method. After receiving a halt request, the simulation will stop at the next
attempt by the simulator to advance simulation time.
The next attempt to advance the simulation time, including if performed as part
of a concurrently executing step_until() and step_unbounded() call, will
raise a SimulationHaltedError.
The following is an example using the asyncio API and a simulation bench synchronized with the system clock:
import asyncio
from nexosim.aio import Simulation
from nexosim.exceptions import SimulationHaltedError
from nexosim.time import MonotonicTime, Duration
async def run():
async with Simulation("0.0.0.0:41633") as sim:
await sim.start()
await sim.schedule_event(MonotonicTime(1), "input", 1)
await sim.schedule_event(MonotonicTime(3), "input", 2)
try:
await sim.step_until(Duration(5))
except SimulationHaltedError:
time = await sim.time()
print(f"Simulation halted at {time}")
print(await sim.read_events("output"))
async def halt():
async with Simulation("0.0.0.0:41633") as sim:
await asyncio.sleep(2)
await sim.halt()
async def main():
await asyncio.gather(run(), halt())
asyncio.run(main())
# Prints out:
# Simulation halted at 1970-01-01 00:00:03
# [1]
use nexosim::model::Model;
use nexosim::ports::{EventSource, EventBuffer, Output};
use nexosim::registry::EndpointRegistry;
use nexosim::simulation::{Mailbox, SimInit, Simulation, SimulationError};
use nexosim::time::{MonotonicTime, AutoSystemClock};
use nexosim::server;
#[derive(Default)]
pub(crate) struct MyModel {
pub(crate) output: Output<u16>
}
impl MyModel {
pub async fn my_input(&mut self, value: u16) {
self.output.send(value).await;
}
}
impl Model for MyModel {}
fn bench(_cfg: ()) -> Result<(Simulation, EndpointRegistry), SimulationError> {
let mut model = MyModel::default();
// Mailboxes.
let model_mbox = Mailbox::new();
let model_addr = model_mbox.address();
// Endpoints.
let mut registry = EndpointRegistry::new();
let output = EventBuffer::new();
model.output.connect_sink(&output);
registry.add_event_sink(output, "output").unwrap();
let mut input = EventSource::new();
input.connect(MyModel::my_input, &model_addr);
registry.add_event_source(input, "input").unwrap();
// Assembly and initialization.
let sim = SimInit::new()
.add_model(model, model_mbox, "model")
.set_clock(AutoSystemClock::new()) // Synchronize with the system clock.
.init(MonotonicTime::EPOCH)?
.0;
Ok((sim, registry))
}
fn main() {
server::run(bench, "0.0.0.0:41633".parse().unwrap()).unwrap();
}
In the above example the simulation is stopped after 2 seconds. After the first event is processed the simulation time jumps to the time of the next event, but, since the simulation is synchronized with a real-time clock, the simulation is stopped before the next event can be processed.
Reading events
Events sent to sinks can be read using the
read_events() method. The event_type
parameter controls the type the read event will be mapped to.
from dataclasses import dataclass
from nexosim import Simulation
@dataclass
class OutputEvent:
foo: int
bar: str
with Simulation(address='localhost:41633') as sim:
sim.start()
sim.process_event("input", 1)
outputs = sim.read_events("output", OutputEvent)
print(f"Events mapped to the OutputEvent class: {outputs}")
sim.process_event("input", 1)
outputs = sim.read_events("output")
print(f"Events without specifying the reply type: {outputs}")
# Prints out:
# Events mapped to the OutputEvent class: [OutputEvent(foo=1, bar='string')]
# Events without specifying the reply type: [{'foo': 1, 'bar': 'string'}]
use serde::Serialize;
use nexosim::model::Model;
use nexosim::ports::{EventSource, EventBuffer, Output};
use nexosim::registry::EndpointRegistry;
use nexosim::simulation::{Mailbox, SimInit, Simulation, SimulationError};
use nexosim::time::MonotonicTime;
use nexosim::server;
#[derive(Clone, Serialize)]
pub(crate) struct OutputEvent {
pub(crate) foo: u16,
pub(crate) bar: String,
}
#[derive(Default)]
pub(crate) struct MyModel {
pub(crate) output: Output<OutputEvent>
}
impl MyModel {
pub async fn my_input(&mut self, value: u16) {
let event = OutputEvent{foo: value, bar: String::from("string")};
self.output.send(event).await;
}
}
impl Model for MyModel {}
fn bench(_cfg: ()) -> Result<(Simulation, EndpointRegistry), SimulationError> {
let mut model = MyModel::default();
// Mailboxes.
let model_mbox = Mailbox::new();
let model_addr = model_mbox.address();
// Endpoints.
let mut registry = EndpointRegistry::new();
let output = EventBuffer::new();
model.output.connect_sink(&output);
registry.add_event_sink(output, "output").unwrap();
let mut input = EventSource::new();
input.connect(MyModel::my_input, &model_addr);
registry.add_event_source(input, "input").unwrap();
// Assembly and initialization.
let sim = SimInit::new()
.add_model(model, model_mbox, "model")
.init(MonotonicTime::EPOCH)?
.0;
Ok((sim, registry))
}
fn main() {
server::run(bench, "0.0.0.0:41633".parse().unwrap()).unwrap();
}
The EventSink must be open to receive events
from the simulation.
Serializable types
The NeXosim-py package provides a convenient API for constructing Python
counterparts to rust's struct and enum types that can be (de)serialized as
events, requests or replies within a Simulation.
A detailed description of how to use serializable types can be found in the types module reference.
Asyncio API
The aio module provides the asynchronous
Simulation class, with an interface mirroring that
of the regular Simulation class. The asynchronous
version can be used with asyncio to perform concurrent calls to the
simulation.
Note
Note that step* and process* requests are mutually blocking when using
the asynchronous Simulation.
Here's an example usage of the aio API with concurrent requests and a simulation synchronized with the system clock:
import asyncio
from nexosim.aio import Simulation
from nexosim.time import MonotonicTime, Duration
async def run():
async with Simulation("0.0.0.0:41633") as sim:
await sim.start()
await sim.schedule_event(MonotonicTime(1), "input", 1)
await sim.schedule_event(MonotonicTime(3), "input", 2)
print("step_until started")
await sim.step_until(Duration(4))
print("step_until finished")
async def read():
async with Simulation("0.0.0.0:41633") as sim:
await asyncio.sleep(2)
print(await sim.read_events("output"))
await asyncio.sleep(3)
print(await sim.read_events("output"))
async def main():
await asyncio.gather(run(), read())
asyncio.run(main())
# Prints out
# step_until started
# [1]
# step_until finished
# [2]
use nexosim::model::Model;
use nexosim::ports::{EventSource, EventBuffer, Output};
use nexosim::registry::EndpointRegistry;
use nexosim::simulation::{Mailbox, SimInit, Simulation, SimulationError};
use nexosim::time::{MonotonicTime, AutoSystemClock};
use nexosim::server;
#[derive(Default)]
pub(crate) struct MyModel {
pub(crate) output: Output<u16>
}
impl MyModel {
pub async fn my_input(&mut self, value: u16) {
self.output.send(value).await;
}
}
impl Model for MyModel {}
fn bench(_cfg: ()) -> Result<(Simulation, EndpointRegistry), SimulationError> {
let mut model = MyModel::default();
// Mailboxes.
let model_mbox = Mailbox::new();
let model_addr = model_mbox.address();
// Endpoints.
let mut registry = EndpointRegistry::new();
let output = EventBuffer::new();
model.output.connect_sink(&output);
registry.add_event_sink(output, "output").unwrap();
let mut input = EventSource::new();
input.connect(MyModel::my_input, &model_addr);
registry.add_event_source(input, "input").unwrap();
// Assembly and initialization.
let sim = SimInit::new()
.add_model(model, model_mbox, "model")
.set_clock(AutoSystemClock::new()) // Synchronize with the system clock.
.init(MonotonicTime::EPOCH)?
.0;
Ok((sim, registry))
}
fn main() {
server::run(bench, "0.0.0.0:41633".parse().unwrap()).unwrap();
}