iblutil.io.net.app

Examples

# Connect to remote server rig, send initialization message and wait for response
>>> server = await EchoProtocol.server('udp://192.168.0.4', name='main')
>>> await server.init('2022-01-01_1_subject')  # Send init message and await confirmation of receipt
>>> response = await server.on_event('INIT')  # Await response

# Send initialization message and wait max 10 seconds for response
>>> try:
...     response = await asyncio.wait_for(server.on_event('INIT'), 10.)
... except asyncio.TimeoutError:
...     server.close()

Functions

main

An example of an entry point for creating an individual communicator.

Classes

EchoProtocol

An echo server implementing TCP/IP and UDP.

Services

Handler for multiple remote rig services.

class EchoProtocol(server_uri, role, name=None, logger=None)[source]

Bases: Communicator

An echo server implementing TCP/IP and UDP.

This should be instantiated using either EchoProtocol.server or EchoProtocol.client. In the client role, the remote address is specified; in the server role, the local address is specified.

Server

A network server instance if using TCP/IP.

Type:

asyncio.Server

role

The communicator role. A server may communicate with multiple clients. The server URI specifies its local address. A client only communicates with a single host, specified by the server URI.

Type:

{‘client’, ‘server’}

default_echo_timeout

The default maximum time in seconds to await a message echo.

Type:

float

_last_sent

A map of addresses to the last sent bytes string and the future being waited on. In client mode there should be only one entry: the server URI.

Type:

dict[(str, int), (bytes, asyncio.Future)]
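The bookkeeping described for _last_sent can be pictured with a minimal sketch. This is not the library's implementation: the class name EchoTracker and its methods record and on_echo are hypothetical, and the port number is arbitrary. It only shows how a map keyed by (host, port) can pair the last sent bytes with a future that is resolved when a matching echo arrives.

```python
import asyncio


class EchoTracker:
    """Hypothetical helper: tracks the last message sent to each address."""

    def __init__(self):
        # Maps (host, port) -> (last sent bytes, future awaiting the echo)
        self.last_sent = {}

    def record(self, data: bytes, addr: tuple) -> asyncio.Future:
        """Remember the bytes sent to addr and return a future to await."""
        future = asyncio.get_running_loop().create_future()
        self.last_sent[addr] = (data, future)
        return future

    def on_echo(self, data: bytes, addr: tuple) -> bool:
        """Resolve the pending future if data matches what was last sent."""
        sent, future = self.last_sent.get(addr, (None, None))
        if future is None or future.done() or data != sent:
            return False  # not an echo we were waiting for
        future.set_result(True)
        return True


async def demo():
    tracker = EchoTracker()
    addr = ('192.168.0.4', 9999)  # port chosen arbitrarily for illustration
    pending = tracker.record(b'hello', addr)
    ignored = tracker.on_echo(b'other', addr)  # mismatch: future stays pending
    matched = tracker.on_echo(b'hello', addr)  # match: future is resolved
    return ignored, matched, await pending


result = asyncio.run(demo())
```

In client mode such a map would hold a single key, the server address, which matches the description above.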

Server = None
default_echo_timeout = 1.0
property role: str

The remote computer’s role

Type:

{‘client’, ‘server’}

property is_connected: bool

True if transport layer set and open.

Type:

bool

awaiting_response(addr=None) bool[source]

True if awaiting confirmation of receipt from the remote host.

async cleanup(data=None, addr=None)[source]

Cleanup experiment.

Send a cleanup message to the remote host.

Parameters:
  • data (any) – Optional extra data to send to the remote host.

  • addr ((str, int)) – The remote host address and port. Only required in server role.

Raises:

TimeoutError – Remote host failed to echo the message within the timeout period.

async start(exp_ref, data=None, addr=None)[source]

Start an experiment.

Send a start message to the remote host.

Parameters:
  • exp_ref (str) – An experiment reference string in the form yyyy-mm-dd_n_subject.

  • data (any) – Optional extra data to send to the remote host.

  • addr ((str, int)) – The remote host address and port. Only required in server role.

Raises:

TimeoutError – Remote host failed to echo the message within the timeout period.
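The exp_ref format yyyy-mm-dd_n_subject can be checked before a message is sent. The following is a sketch only: parse_exp_ref and the EXP_REF pattern are hypothetical names introduced for illustration and are not part of this module.

```python
import re
from datetime import date, datetime

# Pattern for yyyy-mm-dd_n_subject, e.g. '2022-01-01_1_subject'
EXP_REF = re.compile(r'^(\d{4}-\d{2}-\d{2})_(\d+)_(.+)$')


def parse_exp_ref(exp_ref: str) -> dict:
    """Hypothetical helper: split an experiment reference into its parts."""
    match = EXP_REF.match(exp_ref)
    if not match:
        raise ValueError(f'invalid experiment reference: {exp_ref!r}')
    day, number, subject = match.groups()
    return {
        # strptime raises ValueError for impossible dates such as 2022-13-40
        'date': datetime.strptime(day, '%Y-%m-%d').date(),
        'sequence': int(number),
        'subject': subject,
    }


parts = parse_exp_ref('2022-01-01_1_subject')
```

Validating the string locally gives an immediate ValueError for a malformed reference instead of a round trip to the remote host.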

async stop(data=None, immediately=False, addr=None)[source]

End an experiment.

Send a stop message to the remote host.

Parameters:
  • data (any) – Optional extra data to send to the remote host.

  • immediately (bool) – If True, an EXPINTERRUPT signal is used.

  • addr ((str, int)) – The remote host address and port. Only required in server role.

Raises:

TimeoutError – Remote host failed to echo the message within the timeout period.

async init(data=None, addr=None)[source]

Initialize an experiment.

Send an initialization message to the remote host.

Parameters:
  • data (any) – Optional extra data to send to the remote host.

  • addr ((str, int)) – The remote host address and port. Only required in server role.

Raises:

TimeoutError – Remote host failed to echo the message within the timeout period.

async alyx(alyx=None, addr=None)[source]

Send/request Alyx token to/from remote host.

Parameters:
  • alyx (one.webclient.AlyxClient) – An instance of Alyx to extract and send token from.

  • addr ((str, int)) – The remote host address and port. Only required in server role.

Returns:

  • (str, dict) – (If alyx arg was None) the received Alyx token in the form (base_url, {user: token}).

  • (str, int) – The hostname and port of the remote host.

static encode(data)[source]

Serialize data for transmission.

send(data, addr=None)[source]

Send data to clients.

Serialize data and pass to transport layer.

async confirmed_send(data, addr=None, timeout=None)[source]

Send a message to the client and await echo.

NB: Methods such as start, stop, init, cleanup and alyx should be used instead of calling this directly.

Parameters:
  • data (any) – The data to serialize and send to remote host.

  • addr ((str, int)) – The remote host address and port. Only required in server role.

  • timeout (float, optional) – The time in seconds to wait for an echo before raising an exception.

Raises:
  • TimeoutError – Remote host failed to echo the message within the timeout period.

  • RuntimeError – The response from the client did not match the original message.

  • ValueError – Timeout must be a non-zero number, or an unexpected remote address was given (in client mode the address must match server_uri).

  • TypeError – In server mode a remote address must be provided.
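The argument checks implied by the TypeError and ValueError entries above can be sketched as follows. The function names resolve_addr and resolve_timeout are hypothetical, and the fallback to a default of 1.0 seconds when timeout is None is an assumption based on default_echo_timeout; the actual method may order or word its checks differently.

```python
def resolve_addr(role, server_addr, addr=None):
    """Hypothetical check mirroring the documented TypeError/ValueError cases."""
    if role == 'server':
        if addr is None:
            raise TypeError('in server mode a remote address must be provided')
        return addr
    # Client role: only one remote host exists, so addr is optional
    if addr is not None and addr != server_addr:
        raise ValueError(f'unexpected remote address {addr}')
    return server_addr


def resolve_timeout(timeout, default=1.0):
    """Fall back to a default when timeout is None (assumed); reject zero."""
    timeout = default if timeout is None else timeout
    if not timeout:
        raise ValueError('timeout must be a non-zero number')
    return float(timeout)
```

A client can therefore omit addr entirely, whereas a server, which may talk to several clients, has to say which one the message is for.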

close()[source]

Close the connection, de-register callbacks and cancel outstanding futures.

The EchoProtocol.on_connection_lost future is resolved at this time, all others are cancelled. NB: Closing the socket should be handled by transport base class later on.

connection_made(transport)[source]

Called by the event loop.

datagram_received(data, addr)[source]

Called by the UDP transport layer.

data_received(data)[source]

Called by the TCP/IP transport layer.

error_received(exc)[source]
eof_received()[source]
connection_lost(exc)[source]
async static server(server_uri, name=None, log=None, **kwargs) EchoProtocol[source]

Create a remote server instance.

Parameters:
  • server_uri (str, ipaddress.IPv4Address, ipaddress.IPv6Address) – The address of the remote computer, may be an IP or hostname with or without a port. To use TCP/IP instead of the default UDP, add a ‘ws://’ scheme to the URI.

  • name (str) – An optional, arbitrary label.

  • **kwargs – Optional parameters to pass to create_datagram_endpoint for UDP or create_server for TCP/IP.

Returns:

A Communicator instance.

Return type:

EchoProtocol
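How a server_uri with an optional scheme and port might be interpreted can be sketched with the standard library. This is an illustration under stated assumptions, not the library's parser: parse_uri is a hypothetical name, DEFAULT_PORT is a placeholder rather than the real default port, and the sketch handles hostnames and IPv4 addresses only.

```python
from urllib.parse import urlsplit

DEFAULT_PORT = 9999  # placeholder only; not the library's actual default


def parse_uri(uri, default_port=DEFAULT_PORT):
    """Hypothetical helper: return (scheme, host, port) for a server URI."""
    uri = str(uri)  # also accepts ipaddress.IPv4Address objects
    if '://' not in uri:
        uri = 'udp://' + uri  # UDP is the documented default protocol
    parts = urlsplit(uri)
    if parts.scheme not in ('udp', 'ws'):
        raise ValueError(f'unsupported scheme: {parts.scheme!r}')
    if not parts.hostname:
        raise ValueError(f'no host in URI: {uri!r}')
    return parts.scheme, parts.hostname, parts.port or default_port
```

With this sketch, a 'ws://' scheme selects TCP/IP and anything without a scheme is treated as UDP, matching the server_uri description above.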

async static client(server_uri, name=None, log=None, **kwargs) EchoProtocol[source]

Create a remote client instance.

Parameters:
  • server_uri (str) – The address of the remote computer, may be an IP or hostname with or without a port. To use TCP/IP instead of the default UDP, add a ‘ws://’ scheme to the URI.

  • name (str) – An optional, arbitrary label.

  • **kwargs – Optional parameters to pass to create_datagram_endpoint for UDP or create_server for TCP/IP.

Returns:

A Communicator instance.

Return type:

EchoProtocol

server_uri
logger
class Services(remote_rigs, alyx=None, timeout=10.0)[source]

Bases: Service, UserDict

Handler for multiple remote rig services.

timeout
assign_callback(event, callback, return_service=False)[source]

Assign a callback to all services for a given event.

Parameters:
  • event (str, int, iblutil.io.net.base.ExpMessage) – An event to listen for.

  • callback (function, asyncio.Future) – A callable or future to notify when the event occurs.

  • return_service (bool) – When True an instance of the Communicator is additionally passed to the callback.
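The difference between a callable and a future as a callback, and the effect of return_service, can be sketched with a stand-in class. CallbackRegistry and its notify method are hypothetical and exist only for this illustration. In particular, the way a future's result is shaped here is an assumption.

```python
import asyncio


class CallbackRegistry:
    """Hypothetical stand-in for one service's event callbacks."""

    def __init__(self, name):
        self.name = name
        self.callbacks = {}  # event -> list of (callback, return_service)

    def assign_callback(self, event, callback, return_service=False):
        self.callbacks.setdefault(event, []).append((callback, return_service))

    def notify(self, event, data):
        """Notify callables and futures registered for the event."""
        for callback, return_service in self.callbacks.get(event, []):
            args = (data, self) if return_service else (data,)
            if isinstance(callback, asyncio.Future):
                if not callback.done():
                    # Assumed result shape: data alone, or (data, service)
                    callback.set_result(args if return_service else data)
            else:
                callback(*args)


async def demo():
    rig = CallbackRegistry('main')
    seen = []
    future = asyncio.get_running_loop().create_future()
    # The callable also receives the service because return_service=True
    rig.assign_callback(
        'INIT', lambda data, service: seen.append((data, service.name)),
        return_service=True)
    rig.assign_callback('INIT', future)
    rig.notify('INIT', 'ready')
    return seen, await future


seen, value = asyncio.run(demo())
```

A callable is invoked every time the event occurs, whereas a future can only be resolved once, which makes futures a natural fit for one-off waits.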

clear_callbacks(event, callback=None)[source]

Clear all callbacks for a given event.

Parameters:
  • event (str, int, iblutil.io.net.base.ExpMessage) – The event to clear listeners from.

  • callback (function, asyncio.Future) – A specific callback or future to remove.

async await_all(event)[source]

Wait for all services to report a given event.

Parameters:

event (str, int, iblutil.io.net.base.ExpMessage) – The event to wait on.

Returns:

A map of rig name and the data that was received.

Return type:

dict
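Waiting for every rig to report an event and collecting the results by rig name can be sketched with asyncio.gather. The function below is an illustrative stand-in, not the method's actual body: the rig names and the futures_by_rig argument are hypothetical, and the rigs are simulated with timers.

```python
import asyncio


async def await_all(futures_by_rig):
    """Wait for every rig's future; return a map of rig name to data."""
    names = list(futures_by_rig)
    results = await asyncio.gather(*(futures_by_rig[name] for name in names))
    return dict(zip(names, results))


async def demo():
    loop = asyncio.get_running_loop()
    futures = {'rig_a': loop.create_future(), 'rig_b': loop.create_future()}
    # Simulate the rigs reporting the event at different times
    loop.call_later(0.02, futures['rig_a'].set_result, 'a ready')
    loop.call_later(0.01, futures['rig_b'].set_result, 'b ready')
    return await await_all(futures)


responses = asyncio.run(demo())
```

asyncio.gather returns results in the order the awaitables were passed, so the mapping stays correct even when the rigs respond out of order.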

close()[source]

Close all communication.

async init(data=None, concurrent=True)[source]

Initialize an experiment.

Send an initialization signal to the remote services and await the responses.

Parameters:
  • data (any) – Optional extra data to send to the remote host.

  • concurrent (bool) – If false, wait for response from each service before communicating with the next.

Returns:

A dictionary of service names and the response data received.

Return type:

dict of str

Raises:

TimeoutError – Remote host failed to echo the message within the timeout period, or failed to respond within the response period.
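The effect of the concurrent flag can be pictured with plain asyncio. This is a sketch under assumptions, not the library's code: fake_init and init_all are hypothetical stand-ins, and the network round trip is simulated with asyncio.sleep.

```python
import asyncio


async def fake_init(name, delay, log):
    """Stand-in for one service's init round trip (hypothetical)."""
    log.append(f'{name} sent')
    await asyncio.sleep(delay)
    log.append(f'{name} replied')
    return f'{name} ok'


async def init_all(delays, concurrent=True):
    log = []
    if concurrent:
        # Start every exchange before awaiting any reply
        replies = await asyncio.gather(
            *(fake_init(name, delay, log) for name, delay in delays.items()))
    else:
        # Finish each exchange before starting the next
        replies = [await fake_init(name, delay, log)
                   for name, delay in delays.items()]
    return dict(zip(delays, replies)), log


delays = {'rig_a': 0.05, 'rig_b': 0.01}
concurrent_result, concurrent_log = asyncio.run(init_all(delays, concurrent=True))
sequential_result, sequential_log = asyncio.run(init_all(delays, concurrent=False))
```

Both modes return the same dictionary of responses. The concurrent mode takes roughly as long as the slowest rig, while the sequential mode takes the sum of all round trips but guarantees a fixed order of communication.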

async cleanup(data=None, concurrent=True)[source]

Cleanup an experiment.

Send a cleanup signal to the remote services and await responses.

Parameters:
  • data (any) – Optional extra data to send to the remote host.

  • concurrent (bool) – If false, wait for response from each service before communicating with the next.

Returns:

A dictionary of service names and the response data received.

Return type:

dict of str

Raises:

TimeoutError – Remote host failed to echo the message within the timeout period, or failed to respond within the response period.

async start(exp_ref, data=None, concurrent=True)[source]

Start an experiment.

Send a start signal to the remote services and await responses.

Parameters:
  • exp_ref (str) – An experiment reference string in the form yyyy-mm-dd_n_subject.

  • data (any) – Optional extra data to send to the remote host.

  • concurrent (bool) – If false, wait for response from each service before communicating with the next.

Returns:

A dictionary of service names and the response data received.

Return type:

dict of str

Raises:

TimeoutError – Remote host failed to echo the message within the timeout period, or failed to respond within the response period.

async stop(data=None, immediately=False, **kwargs)[source]

End an experiment.

Send a stop signal to the remote services and await responses.

Parameters:
  • data (any) – Optional extra data to send to the remote host.

  • immediately (bool) – If true, send an EXPINTERRUPT signal.

  • concurrent (bool) – If false, wait for response from each service before communicating with the next.

Returns:

A dictionary of service names and the response data received.

Return type:

dict of str

Raises:

TimeoutError – Remote host failed to echo the message within the timeout period, or failed to respond within the response period.

async alyx(alyx)[source]

Send Alyx token to remote services.

Parameters:

alyx (one.webclient.AlyxClient) – An instance of Alyx to extract and send token from.

server
async main(role, server_uri, name=None, **kwargs)[source]

An example of an entry point for creating an individual communicator.