importasyncioimportreimportjsonimportsocketimportwarningsimportloggingfromasyncioimportisfuturefromabcimportABC,abstractmethodfromurllib.parseimporturlparseimporturllib.requestimportipaddressfromoperatorimportor_fromfunctoolsimportreducefromenumimportIntFlag,IntEnum,auto# py3.11 STRICTLISTEN_PORT=11001# listen for commands on this portPROTOCOL_VERSION='1.0.0'# Versioning for ExpMessage, ExpStatus enumerations, and Service base class
def is_success(future: asyncio.Future) -> bool:
    """
    Report whether a future finished normally.

    Parameters
    ----------
    future : asyncio.Future
        The future to inspect.

    Returns
    -------
    bool
        True only if the future is done, was not cancelled and holds no exception.
    """
    if not future.done():
        return False
    if future.cancelled():
        return False
    # Only safe to query the exception once the future is done and not cancelled
    return future.exception() is None
def external_ip():
    """
    Fetch WAN IP address.

    NB: Requires internet. The address is requested from https://ident.me.

    Returns
    -------
    ipaddress.IPv4Address, ipaddress.IPv6Address
        The computer's default WAN IP address.

    Raises
    ------
    ValueError
        The response body could not be parsed as an IP address.
    """
    # Use a context manager so the HTTP response (and its socket) is closed deterministically
    with urllib.request.urlopen('https://ident.me') as response:
        address = response.read().decode('utf8')
    # Tolerate surrounding whitespace (e.g. a trailing newline) in the response body
    return ipaddress.ip_address(address.strip())
def is_valid_ip(ip_address) -> bool:
    """
    Test whether IP address is valid.

    Parameters
    ----------
    ip_address : str
        An IP address to validate.

    Returns
    -------
    bool
        True if the IP address is valid (i.e. accepted by `ipaddress.ip_address`).
    """
    try:
        ipaddress.ip_address(ip_address)
    except ValueError:
        return False
    else:
        return True
def hostname2ip(hostname=None):
    """
    Resolve hostname to IP address.

    Resolution is performed with `socket.gethostbyname`, which does not support IPv6 name
    resolution.

    Parameters
    ----------
    hostname : str, optional
        The hostname to resolve. If None, resolves this computer's hostname.

    Returns
    -------
    ipaddress.IPv4Address, ipaddress.IPv6Address
        The resolved IP address.

    Raises
    ------
    ValueError
        Failed to resolve IP for hostname. The underlying OSError is attached as the cause.
    """
    hostname = hostname or socket.gethostname()
    try:
        ip_address = socket.gethostbyname(hostname)
    except OSError as ex:  # socket.error and socket.gaierror are both OSError
        raise ValueError(f'Failed to resolve IP for hostname "{hostname}"') from ex
    return ipaddress.ip_address(ip_address)
def validate_uri(uri, resolve_host=True, default_port=LISTEN_PORT, default_proc='udp'):
    """
    Ensure URI is complete and correct.

    Parameters
    ----------
    uri : str, ipaddress.IPv4Address, ipaddress.IPv6Address
        A full URI, hostname or hostname and port.
    resolve_host : bool
        If the URI is not an IP address, attempt to resolve hostname to IP.
    default_port : int, str
        If the port is absent from the URI, append this one.
    default_proc : str
        If the URI scheme is missing, prepend this one.

    Returns
    -------
    str
        The complete URI in the form scheme://host:port.

    Raises
    ------
    TypeError
        URI type not supported.
    ValueError
        Failed to resolve host name to IP address.
        `resolve_host` is False and the URI host contains invalid characters (expects only
        lower-case alphanumeric + hyphen).
        Port is not an integer or not within range (must be >= 1, <= 65535).
    """
    # Validate URI scheme
    if not isinstance(uri, (str, ipaddress.IPv4Address, ipaddress.IPv6Address)):
        raise TypeError(f'Unsupported URI "{uri}" of type {type(uri)}')
    if isinstance(uri, str) and (proc := re.match(r'(?P<proc>^[a-zA-Z]+(?=://))', uri)):
        proc = proc.group()
        uri = uri[len(proc) + 3:]  # strip the scheme and the '://' separator
    else:
        proc = default_proc

    # Validate hostname
    if isinstance(uri, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
        host = str(uri)
        port = default_port
    elif ':' in uri:
        host, port = uri.split(':', 1)
    else:
        host = uri
        port = None
    if isinstance(uri, str) and not is_valid_ip(host):
        if resolve_host:
            host = hostname2ip(host)
        elif not re.match(r'^[a-z0-9-]+$', host):
            raise ValueError(f'Invalid hostname "{host}"')

    # Validate port. An explicit range check is used rather than `assert`, which is stripped
    # when Python runs with -O and would silently let out-of-range ports through.
    requested_port = port or default_port
    try:
        port = int(requested_port)
    except ValueError:
        port = None
    if port is None or not 1 <= port <= 65535:
        raise ValueError(f'Invalid port number: {requested_port}')
    return f'{proc or default_proc}://{host}:{port}'
# py3.11: replace the class statement below with `class ExpMessage(IntFlag, boundary=STRICT):`
class ExpMessage(IntFlag):
    """A set of standard experiment messages for communicating between rigs."""

    #: Experiment is initializing.
    EXPINIT = auto()
    #: Experiment has begun.
    EXPSTART = auto()
    #: Experiment has stopped.
    EXPEND = auto()
    #: Experiment cleanup begun.
    EXPCLEANUP = auto()
    #: Experiment interrupted.
    EXPINTERRUPT = auto()
    #: Experiment status.
    EXPSTATUS = auto()
    #: Experiment info, including task protocol start and end.
    EXPINFO = auto()
    #: Alyx token.
    ALYX = auto()

    __version__ = PROTOCOL_VERSION

    @classmethod
    def any(cls) -> 'ExpMessage':
        """Return enumeration comprising all possible messages.

        NB: This doesn't include the null ExpMessage (0), used to indicate API errors.
        """
        return reduce(or_, cls)

    @staticmethod
    def validate(event, allow_bitwise=True):
        """
        Validate an event message, returning a corresponding enumeration if valid and raising an
        exception if not.

        Parameters
        ----------
        event : str, int, ExpMessage
            An event message to validate. Strings are matched against member names after
            stripping whitespace and converting to upper case.
        allow_bitwise : bool
            If false, raise if event is the result of a bitwise operation.

        Returns
        -------
        ExpMessage
            The corresponding event enumeration.

        Raises
        ------
        TypeError
            event is neither a string, integer nor enumeration.
        ValueError
            event is a string that does not match any ExpMessage name, or `allow_bitwise` is
            false and the value is not exactly one of the defined events.

        Examples
        --------
        >>> ExpMessage.validate('expstart') is ExpMessage.EXPSTART
        True
        >>> ExpMessage.validate(1) is ExpMessage.EXPINIT
        True
        >>> ExpMessage.validate(ExpMessage.EXPEND) is ExpMessage.EXPEND
        True
        >>> ExpMessage.validate(10) == ExpMessage.EXPSTART | ExpMessage.EXPCLEANUP
        True
        """
        if not isinstance(event, ExpMessage):
            if isinstance(event, str):
                try:
                    event = ExpMessage[event.strip().upper()]
                except KeyError:
                    # The KeyError adds nothing beyond the message below, so suppress it
                    raise ValueError(
                        f'Unrecognized event "{event}". '
                        f'Choices: {tuple(ExpMessage.__members__.keys())}') from None
            elif isinstance(event, int):
                event = ExpMessage(event)
            else:
                raise TypeError(f'Unknown event type {type(event)}')
        if not allow_bitwise and event not in list(ExpMessage):
            raise ValueError(
                'Compound (bitwise) events not permitted. '
                f'Choices: {tuple(ExpMessage)}')
        return event

    def __iter__(self):  # py3.11 remove this method
        """Iterate over the individual members set in the enumeration.

        NB: This mirrors the iteration support that Python 3.11 added to Flag objects: only
        bits belonging to defined members are visited and each is yielded as an ExpMessage.
        Masking with the defined bits also guarantees termination for negative values, which
        inverting a flag (e.g. ``~ExpMessage.ALYX``) produces on Python < 3.11.
        """
        known_bits = reduce(or_, (member.value for member in type(self)))
        num = self.value & known_bits
        while num:
            bit = num & (~num + 1)  # isolate the lowest set bit
            yield type(self)(bit)
            num ^= bit
class ExpStatus(IntEnum):
    """A set of standard statuses for communicating between rigs."""

    #: Service is connected.
    CONNECTED = 0
    #: Service is initialized.
    INITIALIZED = 10
    #: Service is running.
    RUNNING = 20
    #: Experiment has stopped.
    STOPPED = 30

    # Protocol version shared with ExpMessage and Service
    __version__ = PROTOCOL_VERSION
class Service(ABC):
    """An abstract base class for auxiliary experiment services.

    Each abstract method defines the message signature for one event: it returns the event
    enumeration followed by the (validated) payload. Subclasses are expected to serialize the
    returned values and pass them to the transport layer.
    """

    __version__ = PROTOCOL_VERSION
    __slots__ = 'name'  # a single slot called 'name'

    @abstractmethod
    def init(self, data=None):
        """
        Initialize an experiment.

        This is intended to specify the expected message signature. The subclassed method should
        serialize the returned values and pass them to the transport layer.

        Parameters
        ----------
        data : any
            Optional extra data to send to the remote server.

        Returns
        -------
        ExpMessage.EXPINIT
            The EXPINIT event.
        any, None
            Optional extra data.
        """
        event = ExpMessage.EXPINIT
        return event, data

    @abstractmethod
    def start(self, exp_ref, data=None):
        """
        Start an experiment.

        This is intended to specify the expected message signature. The subclassed method should
        serialize the returned values and pass them to the transport layer.

        Parameters
        ----------
        exp_ref : str, dict
            An experiment reference string in the form yyyy-mm-dd_n_subject, or a dict with the
            keys 'date', 'sequence' and 'subject' from which such a string is built.
        data : any
            Optional extra data to send to the remote server.

        Returns
        -------
        ExpMessage.EXPSTART
            The EXPSTART event.
        str, None
            The experiment reference string, or None if a falsy reference was given.
        any, None
            Optional extra data.
        """
        if not exp_ref:
            exp_ref = None
        elif isinstance(exp_ref, dict):
            parts = (exp_ref['date'], int(exp_ref['sequence']), exp_ref['subject'])
            exp_ref = '_'.join(str(part) for part in parts)
        return ExpMessage.EXPSTART, exp_ref, data

    @abstractmethod
    def stop(self, data=None, immediately=False):
        """
        Stop an experiment.

        This is intended to specify the expected message signature. The subclassed method should
        serialize the returned values and pass them to the transport layer.

        Parameters
        ----------
        data : any
            Optional extra data to send to the remote server.
        immediately : bool
            If True, an EXPINTERRUPT message is returned.

        Returns
        -------
        ExpMessage.EXPINTERRUPT, ExpMessage.EXPEND
            The EXPEND event, or EXPINTERRUPT if immediately is True.
        any, None
            Optional extra data.
        """
        if immediately:
            return ExpMessage.EXPINTERRUPT, data
        return ExpMessage.EXPEND, data

    @abstractmethod
    def status(self, status):
        """
        Report experiment status.

        NB: This is intended to be lightweight. For more detail and custom data use the info
        method.

        This is intended to specify the expected message signature. The subclassed method should
        serialize the returned values and pass them to the transport layer.

        Parameters
        ----------
        status : ExpStatus, int, str
            The experiment status enumeration, its integer value, or its (exact) member name.

        Returns
        -------
        ExpMessage.EXPSTATUS
            The EXPSTATUS event.
        ExpStatus
            The validated experiment status.
        """
        if isinstance(status, ExpStatus):
            return ExpMessage.EXPSTATUS, status
        if isinstance(status, int):
            status = ExpStatus(status)  # look up by value
        else:
            status = ExpStatus[status]  # look up by name
        return ExpMessage.EXPSTATUS, status

    @abstractmethod
    def info(self, status, data=None):
        """
        Report experiment information.

        This is intended to specify the expected message signature. The subclassed method should
        serialize the returned values and pass them to the transport layer.

        Parameters
        ----------
        status : ExpStatus
            The experiment status enumeration.
        data : any
            Optional extra data to send to the remote server.

        Returns
        -------
        ExpMessage.EXPINFO
            The EXPINFO event.
        ExpStatus
            The validated experiment status.
        any, None
            Optional extra data.
        """
        # Call the base implementation explicitly so a subclass override is not triggered
        _, validated = Service.status(self, status)
        return ExpMessage.EXPINFO, validated, data

    @abstractmethod
    def cleanup(self, data=None):
        """
        Clean up an experiment.

        This is intended to specify the expected message signature. The subclassed method should
        serialize the returned values and pass them to the transport layer.

        Parameters
        ----------
        data : any
            Optional extra data to send to the remote server.

        Returns
        -------
        ExpMessage.EXPCLEANUP
            The EXPCLEANUP event.
        any, None
            Optional extra data.
        """
        event = ExpMessage.EXPCLEANUP
        return event, data

    @abstractmethod
    def alyx(self, alyx):
        """
        Request/send Alyx token.

        This is intended to specify the expected message signature. The subclassed method should
        serialize the returned values and pass them to the transport layer.

        Parameters
        ----------
        alyx : one.webclient.AlyxClient
            Optional instance of Alyx to send.

        Returns
        -------
        ExpMessage.ALYX
            The ALYX event.
        str, None
            The Alyx database URL, or None if no Alyx instance was given.
        dict
            The Alyx token in the form {user: token}; empty if not logged in.
        """
        if alyx:
            base_url = alyx.base_url
            token = {alyx.user: alyx._token} if alyx.is_logged_in else {}
        else:
            base_url, token = None, {}
        return ExpMessage.ALYX, base_url, token
class Communicator(Service):
    """A base class for communicating between experimental rigs.

    Attributes
    ----------
    name : str
        An arbitrary label for the remote host
    server_uri : str
        The full URI of the remote device, e.g. udp://192.168.0.1:1001
    logger : logging.Logger
        The logger used for reporting; defaults to one named after `name`
    """

    # NB: 'name' is already a slot of the Service base class and must not be declared again
    __slots__ = ('server_uri', '_callbacks', 'logger')

    def __init__(self, server_uri, name=None, logger=None):
        self.server_uri = validate_uri(server_uri)
        # server_uri may be an ipaddress object; logger names must be strings
        self.name = name or str(server_uri)
        self.logger = logger or logging.getLogger(self.name)
        # Init callbacks map of ExpMessage -> list, including null ExpMessage for processing
        # callback errors
        self._callbacks = {event: [] for event in (ExpMessage(0), *ExpMessage.__members__.values())}

    def assign_callback(self, event, callback):
        """
        Assign a callback to be called when an event occurs.

        NB: Unlike with futures, an assigned callback may be triggered multiple times, whereas
        futures may only be set once, after which they are cleared.

        Parameters
        ----------
        event : str, int, iblutil.io.net.base.ExpMessage
            The event for which the callback is registered. If this is a combination of events,
            the callback is registered for each one and additionally receives the event that
            triggered it.
        callback : function, asyncio.Future
            A function or Future to trigger when an event occurs.

        Raises
        ------
        TypeError
            The callback is neither callable nor a Future.

        See Also
        --------
        EchoProtocol.receive
            The method that processes the callbacks upon receiving a message.
        """
        event = ExpMessage.validate(event)
        if not (callable(callback) or isfuture(callback)):
            raise TypeError('Callback must be callable or a Future')
        if event == ExpMessage(0):
            # The null event has no bits to iterate over, so register it directly
            self._callbacks.setdefault(event, []).append((callback, False))
            return
        # Listeners on compound events are told which event fired. A list membership test is
        # used because `x in ExpMessage` is True for any ExpMessage instance on Python >= 3.12.
        return_event = event not in list(ExpMessage)
        for e in event:  # iterate over enum as bitwise ops permitted
            self._callbacks.setdefault(e, []).append((callback, return_event))

    def clear_callbacks(self, event, callback=None, cancel_futures=True):
        """
        For a given event, remove the provided callback, or all callbacks if none were provided.

        Parameters
        ----------
        event : str, int, iblutil.io.net.base.ExpMessage
            The event for which the callback was registered.
        callback : function, asyncio.Future
            The callback or future to remove.
        cancel_futures : bool
            If True and callback is a Future, cancel before removing.

        Returns
        -------
        int
            The number of callbacks removed.
        """
        event = ExpMessage.validate(event)
        # Iterate as bitwise enums permitted, e.g. ~ExpMessage.ALYX. Iterating the null event
        # yields nothing, so fall back to the event itself to allow its callbacks to be cleared.
        events = tuple(event) or (event,)
        n_removed = 0
        if callback:  # clear specific callback
            # Wrapped callables have an id attribute containing the hash of the inner function
            cb_id = getattr(callback, 'id', None) or hash(callback)
            for evt in events:
                registered = self._callbacks.get(evt, [])
                remaining = []
                for entry in registered:
                    cb = entry[0]
                    if (getattr(cb, 'id', None) or hash(cb)) != cb_id:
                        remaining.append(entry)
                        continue
                    if cancel_futures and isfuture(cb):
                        cb.cancel()
                    n_removed += 1
                registered[:] = remaining  # update in place
        else:  # clear all callbacks for event
            for evt in events:
                registered = self._callbacks.get(evt, [])
                if cancel_futures:
                    for cb, _ in registered:
                        if isfuture(cb):
                            cb.cancel()
                n_removed += len(registered)
                del registered[:]
        self.logger.debug('[%s] %i callbacks cleared', self.name, n_removed)
        return n_removed

    async def on_event(self, event):
        """
        Await an event from the remote host.

        Parameters
        ----------
        event : str, int, iblutil.io.net.base.ExpMessage
            The event to wait on.

        Returns
        -------
        any
            The response data returned by the remote host.

        Examples
        --------
        >>> data, addr = await com.on_event('EXPSTART')

        >>> event = await asyncio.create_task(com.on_event('EXPSTART'))
        >>> ...
        >>> data = await event

        Await more than one event

        >>> data, addr, event = await com.on_event(ExpMessage.EXPEND | ExpMessage.EXPINTERRUPT)
        """
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        self.assign_callback(event, fut)
        return await fut

    @property
    def port(self) -> int:
        """int: the remote port"""
        return int(urlparse(self.server_uri).port)

    @property
    def hostname(self) -> str:
        """str: the remote hostname or IP address"""
        return urlparse(self.server_uri).hostname

    @property
    def protocol(self) -> str:
        """str: the protocol scheme, e.g. udp, ws"""
        return urlparse(self.server_uri).scheme

    @property
    @abstractmethod
    def is_connected(self) -> bool:
        """bool: True if the remote device is connected"""
        pass

    @abstractmethod
    def send(self, data, addr=None):
        """Serialize and pass data to the transport layer"""
        pass

    def _discard_future(self, fut):
        """Remove a finished future from the callback list of every event."""
        for registered in self._callbacks.values():
            registered[:] = [entry for entry in registered if entry[0] is not fut]

    def _receive(self, data, addr):
        """
        Process data received from remote host and notify event listeners.

        This is called by the transport layer when a message is received and should not be
        called by the user.

        Futures are resolved at most once: after being set (or if found already finished) a
        future is removed from every event it was registered for. If a callable raises, the
        error is logged, reported back to the sender as a null-event message and the remaining
        callbacks for this message are skipped.

        Parameters
        ----------
        data : bytes
            The serialized data received by the transport layer.
        addr : (str, int)
            The source address as (hostname, port)

        Warns
        -----
        RuntimeWarning
            Expects the deserialized data to be a list or tuple where the first element is an
            ExpMessage. A warning is issued if the data is not a list or tuple, or has fewer
            than two elements.

        TODO Perhaps for every event only the first future should be set.
        """
        data = self.decode(data)
        if not (isinstance(data, (list, tuple)) and len(data) > 1):
            warnings.warn(f'Expected list, got {data}', RuntimeWarning)
            return
        event, *data = data
        event = ExpMessage.validate(event, allow_bitwise=False) if event else ExpMessage(0)
        if event == ExpMessage(0):
            # An error in the remote callback function occurred
            err, evt = data
            self.logger.error('Callback for %s on %s://%s:%i failed with %s',
                              ExpMessage(evt).name, self.protocol, *addr, err)
        # Iterate over a copy as the list is modified when futures are removed
        for f, return_event in self._callbacks[event].copy():
            if isfuture(f):
                if f.done():  # includes futures cancelled by the awaiting code
                    self.logger.warning('Future %s already resolved', f)
                else:
                    f.set_result((data, addr, event) if return_event else (data, addr))
                # A finished future can never be set again; drop it from all events so that
                # compound listeners are not left behind in the other events' lists
                self._discard_future(f)
            else:
                try:
                    if return_event:
                        f(data, addr, event)
                    else:
                        f(data, addr)
                except Exception as ex:
                    self.logger.error('Callback "%s" failed with error "%s"', f, ex)
                    # Notify the remote host of the failure using the null event
                    message = self.encode([0, f'{type(ex).__name__}: {ex}', event])
                    self.send(message, addr)
                    break

    @staticmethod
    def encode(data) -> bytes:
        """
        Serialize data for transmission.

        Non-string and non-bytes objects are encoded as JSON before converting to bytes.

        Parameters
        ----------
        data : any
            The data to serialize.

        Returns
        -------
        bytes
            The encoded data.
        """
        if isinstance(data, bytes):
            return data
        if not isinstance(data, str):
            data = json.dumps(data)
        return data.encode()

    @staticmethod
    def decode(data: bytes):
        """
        De-serialize and parse bytes data.

        This function attempts to decode the data as a JSON object. On failing that, a warning
        is issued and the data are returned as a string.

        Parameters
        ----------
        data : bytes, str
            The data to de-serialize.

        Returns
        -------
        any
            Deserialized data.
        """
        try:
            return json.loads(data)
        except json.JSONDecodeError:
            warnings.warn('Failed to decode as JSON')
        # json.loads also accepts str, which has no decode method
        if isinstance(data, (bytes, bytearray)):
            return data.decode()
        return data

    def close(self):
        """De-register all callbacks and cancel futures"""
        for registered in self._callbacks.values():
            for cb, _ in registered:
                if isfuture(cb):
                    cb.cancel('Close called on communicator')
            del registered[:]