"""Provide an interface to the CAM server."""
import os
import functools
import logging
import platform
import socket
from collections import OrderedDict
from time import sleep, time
import pydebug
_LOGGER = logging.getLogger(__name__)
[docs]def logger(function):
"""Decorate passed in function and log message to module logger."""
@functools.wraps(function)
def wrapper(*args, **kwargs):
"""Wrap function."""
sep = kwargs.get("sep", " ")
end = kwargs.get("end", "") # do not add newline by default
out = sep.join([repr(x) for x in args])
out = out + end
_LOGGER.debug(out)
return function(*args, **kwargs)
return wrapper
# debug with `DEBUG=leicacam python script.py`
if platform.system() == "Windows":
# monkeypatch
@logger
def debug(msg):
"""Debug on Windows."""
try:
dbg = os.environ["DEBUG"]
if dbg in ("leicacam", "*"):
print("leicacam " + str(msg))
except KeyError:
pass
else:
debug = logger(pydebug.debug("leicacam")) # pylint: disable=invalid-name
[docs]class BaseCAM:
"""Base driver for LASAF Computer Assisted Microscopy."""
# pylint: disable=too-many-instance-attributes, too-few-public-methods
def __init__(self, host="127.0.0.1", port=8895):
"""Set up instance."""
self.host = host
self.port = port
# prefix for all commands
self.prefix = [("cli", "python-leicacam"), ("app", "matrix")]
self.prefix_bytes = b"/cli:python-leicacam /app:matrix "
self.buffer_size = 1024
self.delay = 0.1 # poll every 100ms when waiting for incoming
def _prepare_send(self, commands):
"""Prepare message to be sent.
Parameters
----------
commands : list of tuples or bytes string
Commands as a list of tuples or a bytes string. cam.prefix is
allways prepended before sending.
Returns
-------
string
Message to be sent.
"""
if isinstance(commands, bytes):
msg = self.prefix_bytes + commands
else:
msg = tuples_as_bytes(self.prefix + commands)
debug(b"> " + msg)
return msg
def _parse_receive(incoming):
"""Parse received response.
Parameters
----------
incoming : bytes string
incoming bytes from socket server.
Returns
-------
list of OrderedDict
Received message as a list of OrderedDict.
"""
debug(b"< " + incoming)
# first split on terminating null byte
incoming = incoming.split(b"\x00")
msgs = []
for msg in incoming:
# then split on line ending
split_msg = msg.splitlines()
msgs.extend(split_msg)
# return as list of several messages received
return [bytes_as_dict(msg) for msg in msgs]
[docs]class CAM(BaseCAM):
"""Driver for LASAF Computer Assisted Microscopy."""
# pylint: disable=too-many-instance-attributes
def __init__(self, *args, **kwargs):
"""Set up instance."""
super().__init__(*args, **kwargs)
self.connect()
[docs] def connect(self):
"""Connect to LASAF through a CAM-socket."""
self.socket = socket.socket()
self.socket.connect((self.host, self.port))
self.socket.settimeout(False) # non-blocking
sleep(self.delay) # wait for response
self.welcome_msg = self.socket.recv(self.buffer_size) # receive welcome message
[docs] def flush(self):
"""Flush incoming socket messages."""
debug("flushing incoming socket messages")
try:
while True:
msg = self.socket.recv(self.buffer_size)
debug(b"< " + msg)
except socket.error:
pass
[docs] def send(self, commands):
"""Send commands to LASAF through CAM-socket.
Parameters
----------
commands : list of tuples or bytes string
Commands as a list of tuples or a bytes string. cam.prefix is
allways prepended before sending.
Returns
-------
int
Bytes sent.
Example
-------
::
>>> # send list of tuples
>>> cam.send([('cmd', 'enableall'), ('value', 'true')])
>>> # send bytes string
>>> cam.send(b'/cmd:enableall /value:true')
"""
self.flush() # discard any waiting messages
msg = self._prepare_send(commands)
return self.socket.send(msg)
[docs] def receive(self):
"""Receive message from socket interface as list of OrderedDict."""
try:
incoming = self.socket.recv(self.buffer_size)
except socket.error:
return []
return _parse_receive(incoming)
[docs] def wait_for(self, cmd, value=None, timeout=60):
"""Hang until command is received.
If value is supplied, it will hang until ``cmd:value`` is received.
Parameters
----------
cmd : string
Command to wait for in bytestring from microscope CAM interface. If
``value`` is falsy, value of received command does not matter.
value : string
Wait until ``cmd:value`` is received.
timeout : int
Minutes to wait for command. If timeout is reached, an empty
OrderedDict will be returned.
Returns
-------
collections.OrderedDict
Last received messsage or empty message if timeout is reached.
"""
wait = time() + timeout * 60
while True:
if time() > wait:
return OrderedDict()
msgs = self.receive()
msg = check_messages(msgs, cmd, value=value)
if msg:
return msg
sleep(self.delay)
[docs] def close(self):
"""Close the socket."""
self.socket.close()
# convenience methods for commands
[docs] def start_scan(self):
"""Start the matrix scan."""
cmd = [("cmd", "startscan")]
self.send(cmd)
return self.wait_for(*cmd[0])
[docs] def stop_scan(self):
"""Stop the matrix scan."""
cmd = [("cmd", "stopscan")]
self.send(cmd)
return self.wait_for(*cmd[0])
[docs] def autofocus_scan(self):
"""Start the autofocus job."""
cmd = [("cmd", "autofocusscan")]
self.send(cmd)
return self.wait_for(*cmd[0])
[docs] def pause_scan(self):
"""Pause the matrix scan."""
cmd = [("cmd", "pausescan")]
self.send(cmd)
return self.wait_for(*cmd[0])
[docs] def enable(self, slide=0, wellx=1, welly=1, fieldx=1, fieldy=1):
"""Enable a given scan field."""
# pylint: disable=too-many-arguments
cmd = [
("cmd", "enable"),
("slide", str(slide)),
("wellx", str(wellx)),
("welly", str(welly)),
("fieldx", str(fieldx)),
("fieldy", str(fieldy)),
("value", "true"),
]
self.send(cmd)
return self.wait_for(*cmd[0])
[docs] def disable(self, slide=0, wellx=1, welly=1, fieldx=1, fieldy=1):
"""Disable a given scan field."""
# pylint: disable=too-many-arguments
cmd = [
("cmd", "enable"),
("slide", str(slide)),
("wellx", str(wellx)),
("welly", str(welly)),
("fieldx", str(fieldx)),
("fieldy", str(fieldy)),
("value", "false"),
]
self.send(cmd)
return self.wait_for(*cmd[0])
[docs] def enable_all(self):
"""Enable all scan fields."""
cmd = [("cmd", "enableall"), ("value", "true")]
self.send(cmd)
return self.wait_for(*cmd[0])
[docs] def disable_all(self):
"""Disable all scan fields."""
cmd = [("cmd", "enableall"), ("value", "false")]
self.send(cmd)
return self.wait_for(*cmd[0])
[docs] def save_template(self, filename="{ScanningTemplate}leicacam.xml"):
"""Save scanning template to filename."""
cmd = [("sys", "0"), ("cmd", "save"), ("fil", str(filename))]
self.send(cmd)
return self.wait_for(*cmd[0])
[docs] def load_template(self, filename="{ScanningTemplate}leicacam.xml"):
"""Load scanning template from filename.
Template needs to exist in database, otherwise it will not load.
Parameters
----------
filename : str
Filename to template to load. Filename may contain path also, in
such case, the basename will be used. '.xml' will be stripped
from the filename if it exists because of a bug; LASAF implicit
add '.xml'. If '{ScanningTemplate}' is omitted, it will be added.
Returns
-------
collections.OrderedDict
Response from LASAF in an ordered dict.
Example
-------
::
>>> # load {ScanningTemplate}leicacam.xml
>>> cam.load_template('leicacam')
>>> # load {ScanningTemplate}leicacam.xml
>>> cam.load_template('{ScanningTemplate}leicacam')
>>> # load {ScanningTemplate}leicacam.xml
>>> cam.load_template('/path/to/{ScanningTemplate}leicacam.xml')
"""
basename = os.path.basename(filename)
if basename[-4:] == ".xml":
basename = basename[:-4]
if basename[:18] != "{ScanningTemplate}":
basename = "{ScanningTemplate}" + basename
cmd = [("sys", "0"), ("cmd", "load"), ("fil", str(basename))]
self.send(cmd)
return self.wait_for(*cmd[1])
##
# Helper methods
##
[docs]def tuples_as_bytes(cmds):
"""Format list of tuples to CAM message with format /key:val.
Parameters
----------
cmds : list of tuples
List of commands as tuples.
Returns
-------
bytes
Sequence of /key:val.
Example
-------
::
>>> tuples_as_bytes([('cmd', 'val'), ('cmd2', 'val2')])
b'/cmd:val /cmd2:val2'
"""
cmds = OrderedDict(cmds) # override equal keys
tmp = []
for key, val in cmds.items():
key = str(key)
val = str(val)
tmp.append("/" + key + ":" + val)
return " ".join(tmp).encode()
[docs]def tuples_as_dict(_list):
"""Translate a list of tuples to OrderedDict with key and val as strings.
Parameters
----------
_list : list of tuples
Returns
-------
collections.OrderedDict
Example
-------
::
>>> tuples_as_dict([('cmd', 'val'), ('cmd2', 'val2')])
OrderedDict([('cmd', 'val'), ('cmd2', 'val2')])
"""
_dict = OrderedDict()
for key, val in _list:
key = str(key)
val = str(val)
_dict[key] = val
return _dict
[docs]def bytes_as_dict(msg):
"""Parse CAM message to OrderedDict based on format /key:val.
Parameters
----------
msg : bytes
Sequence of /key:val.
Returns
-------
collections.OrderedDict
With /key:val => dict[key] = val.
"""
# decode bytes, assume '/' in start
cmd_strings = msg.decode()[1:].split(r" /")
cmds = OrderedDict()
for cmd in cmd_strings:
unpacked = cmd.split(":")
# handle string not well formated (ex filenames with c:\)
if len(unpacked) > 2:
key = unpacked[0]
val = ":".join(unpacked[1:])
elif len(unpacked) < 2:
continue
else:
key, val = unpacked
cmds[key] = val
return cmds
[docs]def check_messages(msgs, cmd, value=None):
"""Check if specific message is present.
Parameters
----------
cmd : string
Command to check for in bytestring from microscope CAM interface. If
``value`` is falsy, value of received command does not matter.
value : string
Check if ``cmd:value`` is received.
Returns
-------
collections.OrderedDict
Correct messsage or None if no correct message if found.
"""
for msg in msgs:
if value and msg.get(cmd) == value:
return msg
if not value and msg.get(cmd):
return msg
return None