nexosim.aio

Asyncio version of the simulation API.

This module defines an asynchronous version of the Simulation class.

Example usage

import asyncio
from nexosim.aio import Simulation
from nexosim.time import MonotonicTime, Duration
from nexosim.exceptions import SimulationHaltedError

async def run():
    async with Simulation("0.0.0.0:41633") as sim:
        await sim.start()

        await sim.schedule_event(MonotonicTime(1), "input", 1)
        await sim.schedule_event(MonotonicTime(3), "input", 2)
        try:
            await sim.step_until(Duration(5))
        except SimulationHaltedError:
            time = await sim.time()
            print(f"Simulation halted at {time}")
            print(await sim.read_events("output"))

async def halt():
    async with Simulation("0.0.0.0:41633") as sim:
        await asyncio.sleep(2)
        await sim.halt()

async def main():
    await asyncio.gather(run(), halt())

asyncio.run(main())

use nexosim::model::Model;
use nexosim::ports::{EventSource, EventBuffer, Output};
use nexosim::registry::EndpointRegistry;
use nexosim::simulation::{Mailbox, SimInit, Simulation, SimulationError};
use nexosim::time::{MonotonicTime, AutoSystemClock};
use nexosim::server;

#[derive(Default)]
pub(crate) struct MyModel {
    pub(crate) output: Output<u16>
}

impl MyModel {
    pub async fn my_input(&mut self, value: u16) {
        self.output.send(value).await;
    }
}

impl Model for MyModel {}

fn bench(_cfg: ()) -> Result<(Simulation, EndpointRegistry), SimulationError> {
    let mut model = MyModel::default();

    // Mailboxes.
    let model_mbox = Mailbox::new();
    let model_addr = model_mbox.address();

    // Endpoints.
    let mut registry = EndpointRegistry::new();

    let output = EventBuffer::new();
    model.output.connect_sink(&output);
    registry.add_event_sink(output, "output").unwrap();

    let mut input = EventSource::new();
    input.connect(MyModel::my_input, &model_addr);
    registry.add_event_source(input, "input").unwrap();

    // Assembly and initialization.
    let sim = SimInit::new()
        .add_model(model, model_mbox, "model")
        .set_clock(AutoSystemClock::new())
        .init(MonotonicTime::EPOCH)?
        .0;

    Ok((sim, registry))
}


fn main() {
    server::run(bench, "0.0.0.0:41633".parse().unwrap()).unwrap();
}

Simulation

An asynchronous handle to the remote simulation bench.

Creates a handle to the remote simulation bench running at the specified address.

A gRPC NeXosim server must be running at the specified address.

For a regular remote gRPC connection via HTTP/2, the address should omit the URL scheme and the double-slash prefix (e.g. localhost:41633).

For a local Unix Domain Socket connection, the address is the socket path prefixed with the unix: scheme (e.g. unix:relative/path/to/socket, unix:/absolute/path/to/socket or alternatively unix:///absolute/path/to/socket).
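
As a rough illustration of the two address forms, the sketch below normalizes a target string before it is handed to Simulation. The helper name strip_scheme is hypothetical and not part of nexosim; it only encodes the rules stated above (no scheme or double slash for HTTP/2 targets, unix: prefix kept for socket paths).

```python
def strip_scheme(target: str) -> str:
    """Return `target` in the form described above (hypothetical helper)."""
    # Unix Domain Socket paths keep their `unix:` prefix untouched.
    if target.startswith("unix:"):
        return target
    # For HTTP/2 targets, drop any `scheme://` prefix such as `http://`.
    _scheme, sep, rest = target.partition("://")
    return rest if sep else target


print(strip_scheme("http://localhost:41633"))   # localhost:41633
print(strip_scheme("localhost:41633"))          # localhost:41633
print(strip_scheme("unix:/absolute/path/to/socket"))  # unix:/absolute/path/to/socket
```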

Simulation is an asynchronous context manager. If not used in an async with statement, the aclose() method should be called after use.
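
When an async with block is not convenient, a try/finally pattern along the following lines can be used to make sure aclose() is always called. This is a minimal sketch: make_sim is a hypothetical zero-argument factory (for instance lambda: Simulation("localhost:41633")), and only the time() and aclose() methods documented on this page are assumed.

```python
async def query_time_and_close(make_sim):
    """Query the simulation time once, then close the handle explicitly.

    `make_sim` is a hypothetical factory returning a Simulation handle.
    """
    sim = make_sim()
    try:
        return await sim.time()
    finally:
        # Runs even if the request raised, so the gRPC channel is released.
        await sim.aclose()
```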

Note

While most requests are processed concurrently, step* and process* requests are mutually blocking.

Parameters:

Name Type Description Default
address str

The address at which the NeXosim server is running.

required

aclose() async

Closes the gRPC channel.

start(cfg=None) async

Creates a simulation bench.

If a simulation bench is already running, it is replaced by the newly initialized bench. In that case, events that have not yet been retrieved from the sinks are lost and the sinks are reset to their default open/closed state.

Parameters:

Name Type Description Default
cfg Any

A bench configuration object which can be serialized/deserialized to the expected bench configuration type. The None default is appropriate if the bench initializer expects the Rust type () or accepts an Option::None.

None

Raises:

Type Description
SimulationError

One of the exceptions derived from SimulationError may be raised.

terminate() async

Terminates a simulation.

halt() async

Requests the simulation to stop at the earliest opportunity.

Note that the request will only become effective on the next attempt by the simulator to advance the simulation time.

Raises:

Type Description
SimulationError

One of the exceptions derived from SimulationError may be raised.

time() async

Returns the current simulation time.

Returns:

Type Description
MonotonicTime

The current simulation time.

Raises:

Type Description
SimulationError

One of the exceptions derived from SimulationError may be raised, such as SimulationNotStartedError.

step() async

Advances simulation time to that of the next scheduled event, processing that event as well as all other events scheduled for the same time.

This method blocks other step* and process* requests until all newly processed events have completed and returns the final simulation time.

Returns:

Type Description
MonotonicTime

The final simulation time.

Raises:

Type Description
SimulationError

step_unbounded() async

Iteratively advances the simulation time until the simulation end, as if by calling step() repeatedly.

The request will complete when all scheduled events are processed or the simulation is halted.

This method blocks other step* and process* requests until completed. The simulation time upon completion is returned.

Returns:

Type Description
MonotonicTime

The final simulation time.

Raises:

Type Description
SimulationError

step_until(deadline) async

Iteratively advances the simulation time until the specified deadline, as if by calling Simulation.step repeatedly.

This method blocks other step* and process* requests until all events scheduled up to the specified target time have completed. The simulation time upon completion is returned and is always equal to the specified target time, whether or not an event was scheduled for that time.

Parameters:

Name Type Description Default
deadline MonotonicTime | Duration

The target time, specified either as an absolute time reference or as a positive duration relative to the current simulation time.

required

Returns:

Type Description
MonotonicTime

The final simulation time.

Raises:

Type Description
SimulationError
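
The relation between the deadline and the returned time can be illustrated with a toy model. The sketch below is not the library's implementation: it uses plain integers in place of MonotonicTime and a heap of (time, label) tuples in place of the scheduler queue, and the name toy_step_until is made up for this example.

```python
import heapq


def toy_step_until(queue, deadline):
    """Toy model of step_until on a heap of (time, label) events."""
    processed = []
    # Process every event scheduled at or before the deadline, in time order.
    while queue and queue[0][0] <= deadline:
        processed.append(heapq.heappop(queue))
    # The clock lands on the deadline even if no event was scheduled for it.
    return deadline, processed


queue = [(1, "a"), (3, "b"), (7, "c")]
heapq.heapify(queue)
final_time, processed = toy_step_until(queue, 5)
print(final_time)  # 5
print(processed)   # [(1, 'a'), (3, 'b')]
print(queue)       # [(7, 'c')]
```

The event at time 7 stays queued, and the returned time is 5 although nothing was scheduled at that instant.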

schedule_event(deadline, source_name, event=None, period=None, with_key=False) async

schedule_event(deadline: MonotonicTime | Duration, source_name: str, event: object = None, period: None | Duration = None, with_key: typing.Literal[False] = False) -> None
schedule_event(deadline: MonotonicTime | Duration, source_name: str, event: object, period: None | Duration, with_key: typing.Literal[True]) -> EventKey

Schedules an event at a future time.

Events scheduled for the same time and targeting the same model are guaranteed to be processed according to the scheduling order.

Parameters:

Name Type Description Default
deadline MonotonicTime | Duration

The target time, specified either as an absolute time later than the current simulation time or as a strictly positive duration relative to the current simulation time.

required
source_name str

The name of the event source.

required
event object

An object that can be serialized/deserialized to the expected event type. The None default may be used if the Rust event type is () or Option.

None
period None | Duration

An optional, strictly positive duration expressing the repetition period of the event. If left to None, the event is scheduled only once. Otherwise, the first event is scheduled at the specified deadline and repeated periodically from then on until it is cancelled.

None
with_key bool

Optionally requests an event key to be returned, which may be used to cancel the event with Simulation.cancel_event.

False

Returns:

Type Description
EventKey | None

A key for the scheduled event if with_key is True, otherwise None.

Raises:

Type Description
SimulationError
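
To make the effect of the period parameter concrete, the following toy calculation lists the times at which an event would occur up to some horizon. It is only a sketch of the rule described above (first occurrence at the deadline, then one repetition per period), using plain numbers instead of MonotonicTime and Duration; occurrence_times and horizon are hypothetical names.

```python
def occurrence_times(first, period, horizon):
    """List occurrence times up to `horizon` for a one-shot or periodic event."""
    if period is not None and period <= 0:
        raise ValueError("period must be strictly positive")
    times = []
    t = first
    while t <= horizon:
        times.append(t)
        if period is None:
            break  # one-shot event: scheduled only once
        t += period
    return times


print(occurrence_times(2, None, 10))  # [2]
print(occurrence_times(2, 3, 10))     # [2, 5, 8]
```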

cancel_event(key) async

Cancels a previously scheduled event.

Parameters:

Name Type Description Default
key EventKey

The key for an event that is currently scheduled.

required

Raises:

Type Description
SimulationError

One of the exceptions derived from SimulationError may be raised.
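
A possible way to combine schedule_event and cancel_event is sketched below. It assumes sim is a started Simulation handle and relies only on the signatures listed on this page; the helper name schedule_then_cancel is made up for the example.

```python
async def schedule_then_cancel(sim, deadline, source_name, event=None):
    """Schedule a one-shot event with a key, then cancel it before it fires."""
    # with_key=True asks for an EventKey instead of None.
    key = await sim.schedule_event(
        deadline, source_name, event, period=None, with_key=True
    )
    # The key identifies the pending event for cancellation.
    await sim.cancel_event(key)
    return key
```

In practice the cancellation would typically happen later, in response to some condition, rather than right after scheduling.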

process_event(source_name, event=None) async

Broadcasts an event from an event source immediately, blocking other step* and process* requests until completion.

Simulation time remains unchanged.

Parameters:

Name Type Description Default
source_name str

The name of the event source.

required
event Any

An object that can be serialized/deserialized to the expected event type. The None default may be used if the Rust event type is () or Option.

None

Raises:

Type Description
SimulationError
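
As a sketch of how process_event might be paired with read_events, the helper below injects one event and then drains a sink. It assumes sim is a started Simulation handle and that the bench routes the named source to the named sink, as in the "input"/"output" example at the top of this page; inject_and_collect is a hypothetical name.

```python
async def inject_and_collect(sim, source_name, event, sink_name):
    """Broadcast one event immediately, then read what reached the sink."""
    # Blocks other step* and process* requests until the event is processed;
    # simulation time is left unchanged.
    await sim.process_event(source_name, event)
    # Returns the events buffered by the sink, in order.
    return await sim.read_events(sink_name)
```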

process_query(source_name, request=None, reply_type=object) async

Broadcasts a query from a query source immediately, blocking other step* and process* requests until completion.

Simulation time remains unchanged.

Parameters:

Name Type Description Default
source_name str

The name of the query source.

required
request Any

An object that can be serialized/deserialized to the expected request type. The None default may be used if the Rust request type is () or Option.

None
reply_type Type[T]

The Python type to which replies to the query should be mapped. If left unspecified, replies are mapped to their canonical representation in terms of built-in Python types such as bool, int, float, str, bytes, dict and list.

object

Returns:

Type Description
list[T]

An ordered list of replies to the query.

Raises:

Type Description
SimulationError

read_events(sink_name, event_type=object) async

Reads all events from an event sink.

Parameters:

Name Type Description Default
sink_name str

The name of the event sink.

required
event_type Type[T]

The Python type to which events should be mapped. If left unspecified, events are mapped to their canonical representation in terms of built-in Python types such as bool, int, float, str, bytes, dict and list.

object

Returns:

Type Description
list[T]

An ordered list of events.

Raises:

Type Description
SimulationError

One of the exceptions derived from SimulationError may be raised.

await_event(sink_name, timeout, event_type=object) async

Waits for the next event from an event sink.

The call is blocking.

Parameters:

Name Type Description Default
sink_name str

The name of the event sink.

required
event_type Type[T]

The Python type to which events should be mapped. If left unspecified, events are mapped to their canonical representation in terms of built-in Python types such as bool, int, float, str, bytes, dict and list.

object

Returns:

Type Description
T

An event.

Raises:

Type Description
SimulationError

One of the exceptions derived from SimulationError may be raised.

open_sink(sink_name) async

Enables the reception of new events by the specified sink.

Note that the initial state of a sink may be either open or closed depending on the bench initializer.

Parameters:

Name Type Description Default
sink_name str

The name of the event sink.

required

Raises:

Type Description
SimulationError

One of the exceptions derived from SimulationError may be raised.

close_sink(sink_name) async

Disables the reception of new events by the specified sink.

Note that the initial state of a sink may be either open or closed depending on the bench initializer.

Parameters:

Name Type Description Default
sink_name str

The name of the event sink.

required

Raises:

Type Description
SimulationError

One of the exceptions derived from SimulationError may be raised.
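
One way open_sink and close_sink could be used together is to record events only during a chosen time window. The sketch below assumes sim is a started Simulation handle and that events already buffered remain readable after the sink is closed, which this page does not state explicitly; record_window is a hypothetical helper.

```python
async def record_window(sim, sink_name, deadline):
    """Collect the events a sink receives while stepping up to `deadline`."""
    await sim.open_sink(sink_name)
    try:
        await sim.step_until(deadline)
    finally:
        # Stop the reception of new events even if stepping raised.
        await sim.close_sink(sink_name)
    # Assumption: closing the sink does not discard buffered events.
    return await sim.read_events(sink_name)
```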