"""Provide an interface using asyncio to the CAM server."""
import asyncio
from collections import OrderedDict
from async_timeout import timeout as async_timeout
from leicacam.cam import BaseCAM, _parse_receive, check_messages
class AsyncCAM(BaseCAM):
    """Driver for LASAF Computer Assisted Microscopy using asyncio.

    The connection is not opened on construction; await :meth:`connect`
    before calling :meth:`send`, :meth:`receive` or :meth:`wait_for`.

    NOTE(review): ``host``, ``port``, ``buffer_size`` and ``_prepare_send``
    are not defined here and are presumably provided by ``BaseCAM``.
    """

    def __init__(self, *args, loop=None, **kwargs):
        """Set up instance.

        Parameters
        ----------
        *args, **kwargs
            Forwarded unchanged to ``BaseCAM.__init__``.
        loop : asyncio.AbstractEventLoop, optional
            Stored as ``self.loop`` for backward compatibility only. The
            stream connection always uses the loop running ``connect``.
        """
        super().__init__(*args, **kwargs)
        if loop is None:
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                # Newer Python versions raise when no current loop exists
                # (e.g. construction outside a coroutine). The loop is not
                # needed to connect, so do not fail here.
                loop = None
        self.loop = loop
        self.reader = None
        self.writer = None
        self.welcome_msg = None

    async def connect(self):
        """Connect to LASAF through a CAM-socket.

        Opens the stream pair and stores the first chunk read from the
        server in ``self.welcome_msg``.
        """
        # The ``loop`` keyword of asyncio.open_connection was removed in
        # Python 3.10; the running loop is used implicitly.
        self.reader, self.writer = await asyncio.open_connection(
            self.host, self.port
        )
        self.welcome_msg = await self.reader.read(self.buffer_size)

    async def send(self, commands):
        """Send commands to LASAF through CAM-socket.

        Parameters
        ----------
        commands : list of tuples or bytes string
            Commands as a list of tuples or a bytes string. cam.prefix is
            always prepended before sending.

        Returns
        -------
        int
            Bytes sent.

        Example
        -------
        ::

            >>> # send list of tuples
            >>> await cam.send([('cmd', 'enableall'), ('value', 'true')])

            >>> # send bytes string
            >>> await cam.send(b'/cmd:enableall /value:true')

        """
        msg = self._prepare_send(commands)
        self.writer.write(msg)
        # Wait until the write buffer has been flushed to the transport.
        await self.writer.drain()
        return len(msg)

    async def receive(self):
        """Receive message from socket interface as list of OrderedDict.

        Returns an empty list if reading from the socket raises ``OSError``.
        """
        try:
            incoming = await self.reader.read(self.buffer_size)
        except OSError:
            return []

        return _parse_receive(incoming)

    async def wait_for(self, cmd, value=None, timeout=60):
        """Hang until command is received.

        If value is supplied, it will hang until ``cmd:value`` is received.

        Parameters
        ----------
        cmd : string
            Command to wait for in bytestring from microscope CAM interface. If
            ``value`` is falsy, value of received command does not matter.
        value : string
            Wait until ``cmd:value`` is received.
        timeout : int
            Minutes to wait for command. If timeout is reached, an empty
            OrderedDict will be returned.

        Returns
        -------
        collections.OrderedDict
            Last received message, or an empty message if the timeout is
            reached or the server closes the connection first.

        """
        try:
            # ``timeout`` is given in minutes; async_timeout expects seconds.
            async with async_timeout(timeout * 60):
                while True:
                    msgs = await self.receive()
                    msg = check_messages(msgs, cmd, value=value)
                    if msg:
                        return msg
                    if self.reader.at_eof():
                        # The peer closed the stream: nothing more can
                        # arrive, so stop instead of polling until timeout.
                        return OrderedDict()
                    # A read that fails or completes immediately never
                    # suspends this task; yield explicitly so the timeout
                    # can be delivered and other tasks can run.
                    await asyncio.sleep(0)
        except asyncio.TimeoutError:
            return OrderedDict()

    def close(self):
        """Close stream.

        Does nothing if the connection was never opened.
        """
        if self.writer is None:
            return
        if self.writer.can_write_eof():
            self.writer.write_eof()
        self.writer.close()