import logging
import requests
import traceback
import json
import abc
import numpy as np
from one.api import ONE
from ibllib.pipes import tasks
from one.alf.exceptions import ALFObjectNotFound
from neuropixel import trace_header, TIP_SIZE_UM
from ibllib import __version__ as ibllib_version
from ibllib.pipes.ephys_alignment import EphysAlignment
from ibllib.pipes.histology import interpolate_along_track
from iblatlas.atlas import AllenAtlas
_logger = logging.getLogger(__name__)
[docs]
class ReportSnapshot(tasks.Task):
def __init__(self, session_path, object_id, content_type='session', **kwargs):
self.object_id = object_id
self.content_type = content_type
self.images = []
super(ReportSnapshot, self).__init__(session_path, **kwargs)
def _run(self, overwrite=False):
# Can be used to generate the image if desired
pass
[docs]
def register_images(self, widths=None, function=None, extra_dict=None):
report_tag = '## report ##'
snapshot = Snapshot(one=self.one, object_id=self.object_id, content_type=self.content_type)
jsons = []
texts = []
for f in self.outputs:
json_dict = dict(tag=report_tag, version=ibllib_version,
function=(function or str(self.__class__).split("'")[1]), name=f.stem)
if extra_dict is not None:
assert isinstance(extra_dict, dict)
json_dict.update(extra_dict)
jsons.append(json_dict)
texts.append(f"{f.stem}")
return snapshot.register_images(self.outputs, jsons=jsons, texts=texts, widths=widths)
[docs]
class ReportSnapshotProbe(ReportSnapshot):
signature = {
'input_files': [], # see setUp method for declaration of inputs
'output_files': [] # see setUp method for declaration of inputs
}
def __init__(self, pid, session_path=None, one=None, brain_regions=None, brain_atlas=None, **kwargs):
"""
:param pid: probe insertion UUID from Alyx
:param one: one instance
:param brain_regions: (optional) iblatlas.regions.BrainRegion object
:param brain_atlas: (optional) iblatlas.atlas.AllenAtlas object
:param kwargs:
"""
assert one
self.one = one
self.brain_atlas = brain_atlas
self.brain_regions = brain_regions
if self.brain_atlas and not self.brain_regions:
self.brain_regions = self.brain_atlas.regions
self.content_type = 'probeinsertion'
self.pid = pid
self.eid, self.pname = self.one.pid2eid(self.pid)
self.session_path = session_path or self.one.eid2path(self.eid)
self.output_directory = self.session_path.joinpath('snapshot', self.pname)
self.output_directory.mkdir(exist_ok=True, parents=True)
self.histology_status = None
self.get_probe_signature()
super(ReportSnapshotProbe, self).__init__(self.session_path, object_id=pid, content_type=self.content_type, one=self.one,
**kwargs)
@property
def pid_label(self):
"""returns a probe insertion stub to label titles, for example: 'SWC_054_2020-10-05_001_probe01'"""
return '_'.join(list(self.session_path.parts[-3:]) + [self.pname])
[docs]
@abc.abstractmethod
def get_probe_signature(self):
# method that gets input and output signatures from the probe name. The format is a dictionary as follows:
# return {'input_files': input_signature, 'output_files': output_signature}
pass
[docs]
def get_histology_status(self):
"""
Finds at which point in histology pipeline the probe insertion is
:return:
"""
self.hist_lookup = {'Resolved': 3,
'Aligned': 2,
'Traced': 1,
None: 0} # is this bad practice?
self.ins = self.one.alyx.rest('insertions', 'list', id=self.pid)[0]
traced = self.ins.get('json', {}).get('extended_qc', {}).get('tracing_exists', False)
aligned = self.ins.get('json', {}).get('extended_qc', {}).get('alignment_count', 0)
resolved = self.ins.get('json', {}).get('extended_qc', {}).get('alignment_resolved', False)
if resolved:
return 'Resolved'
elif aligned > 0:
return 'Aligned'
elif traced:
return 'Traced'
else:
return None
[docs]
def get_channels(self, alf_object, collection):
electrodes = {}
try:
electrodes = self.one.load_object(self.eid, alf_object, collection=collection)
electrodes['axial_um'] = electrodes['localCoordinates'][:, 1]
except ALFObjectNotFound:
_logger.warning(f'{alf_object} does not yet exist')
if self.hist_lookup[self.histology_status] == 3:
try:
electrodes['atlas_id'] = electrodes['brainLocationIds_ccf_2017']
electrodes['mlapdv'] = electrodes['mlapdv'] / 1e6
except KeyError:
_logger.warning('Insertion resolved but brainLocationIds_ccf_2017 attribute do not exist')
if self.hist_lookup[self.histology_status] > 0 and 'atlas_id' not in electrodes.keys():
if not self.brain_atlas:
self.brain_atlas = AllenAtlas()
self.brain_regions = self.brain_regions or self.brain_atlas.regions
if 'localCoordinates' not in electrodes.keys():
geometry = trace_header(version=1)
electrodes['localCoordinates'] = np.c_[geometry['x'], geometry['y']]
electrodes['axial_um'] = electrodes['localCoordinates'][:, 1]
depths = electrodes['localCoordinates'][:, 1]
xyz = np.array(self.ins['json']['xyz_picks']) / 1e6
if self.hist_lookup[self.histology_status] >= 2:
traj = self.one.alyx.rest('trajectories', 'list', provenance='Ephys aligned histology track',
probe_insertion=self.pid)[0]
align_key = self.ins['json']['extended_qc']['alignment_stored']
feature = traj['json'][align_key][0]
track = traj['json'][align_key][1]
ephysalign = EphysAlignment(xyz, depths, track_prev=track,
feature_prev=feature,
brain_atlas=self.brain_atlas, speedy=True)
electrodes['mlapdv'] = ephysalign.get_channel_locations(feature, track)
electrodes['atlas_id'] = self.brain_atlas.regions.get(self.brain_atlas.get_labels(electrodes['mlapdv']))['id']
if self.hist_lookup[self.histology_status] == 1:
xyz = xyz[np.argsort(xyz[:, 2]), :]
electrodes['mlapdv'] = interpolate_along_track(xyz, (depths + TIP_SIZE_UM) / 1e6)
electrodes['atlas_id'] = self.brain_atlas.regions.get(self.brain_atlas.get_labels(electrodes['mlapdv']))['id']
return electrodes
[docs]
def register_images(self, widths=None, function=None):
super(ReportSnapshotProbe, self).register_images(widths=widths, function=function,
extra_dict={'channels': self.histology_status})
[docs]
class Snapshot:
"""
A class to register images in form of Notes, linked to an object on Alyx.
:param object_id: The id of the object the image should be linked to
:param content_type: Which type of object to link to, e.g. 'session', 'probeinsertion', 'subject',
default is 'session'
:param one: An ONE instance, if None is given it will be instantiated.
"""
def __init__(self, object_id, content_type='session', one=None):
self.one = one or ONE()
self.object_id = object_id
self.content_type = content_type
self.images = []
[docs]
def plot(self):
"""
Placeholder method to be overriden by child object
:return:
"""
pass
[docs]
def generate_image(self, plt_func, plt_kwargs):
"""
Takes a plotting function and adds the output to the Snapshot.images list for registration
:param plt_func: A plotting function that returns the path to an image.
:param plt_kwargs: Dictionary with keyword arguments for the plotting function
"""
img_path = plt_func(**plt_kwargs)
if isinstance(img_path, list):
self.images.extend(img_path)
else:
self.images.append(img_path)
return img_path
[docs]
def register_image(self, image_file, text='', json_field=None, width=None):
"""
Registers an image as a Note, attached to the object specified by Snapshot.object_id
:param image_file: Path to the image to to registered
:param text: str, text to describe the image, defaults ot empty string
:param json_field: dict, to be added to the json field of the Note
:param width: width to scale the image to, defaults to None (scale to UPLOADED_IMAGE_WIDTH in alyx.settings.py),
other options are 'orig' (don't change size) or any integer (scale to width=int, aspect ratios won't be changed)
:returns: dict, note as registered in database
"""
# the protocol is not compatible with byte streaming and json, so serialize the json object here
# Make sure that user is logged in, if not, try to log in
assert self.one.alyx.is_logged_in, 'No Alyx user is logged in, try running one.alyx.authenticate() first'
note = {
'user': self.one.alyx.user, 'content_type': self.content_type, 'object_id': self.object_id,
'text': text, 'width': width, 'json': json.dumps(json_field)}
_logger.info(f'Registering image to {self.content_type} with id {self.object_id}')
# to make sure an eventual note gets deleted with the image call the delete REST endpoint first
current_note = self.one.alyx.rest('notes', 'list',
django=f"object_id,{self.object_id},text,{text},json__name,{text}",
no_cache=True)
if len(current_note) == 1:
self.one.alyx.rest('notes', 'delete', id=current_note[0]['id'])
# Open image for upload
fig_open = open(image_file, 'rb')
# Catch error that results from object_id - content_type mismatch
try:
note_db = self.one.alyx.rest('notes', 'create', data=note, files={'image': fig_open})
return note_db
except requests.HTTPError as e:
if 'matching query does not exist' in str(e):
_logger.error(f'The object_id {self.object_id} does not match an object of type {self.content_type}')
_logger.debug(traceback.format_exc())
else:
raise e
finally:
fig_open.close()
[docs]
def register_images(self, image_list=None, texts=None, widths=None, jsons=None):
"""
Registers a list of images as Notes, attached to the object specified by Snapshot.object_id.
The images can be passed as image_list. If None are passed, will try to register the images in Snapshot.images.
:param image_list: List of paths to the images to to registered. If None, will try to register any images in
Snapshot.images
:param texts: List of text to describe the images. If len(texts)==1, the same text will be used for all images
:param widths: List of width to scale the figure to (see Snapshot.register_image). If len(widths)==1,
the same width will be used for all images
:param jsons: List of dictionaries to populate the json field of the note in Alyx. If len(jsons)==1,
the same dict will be used for all images
:returns: list of dicts, notes as registered in database
"""
if not image_list or len(image_list) == 0:
if len(self.images) == 0:
_logger.warning(
"No figures were passed to register_figures, and self.figures is empty. No figures to register")
return
else:
image_list = self.images
widths = widths or [None]
texts = texts or ['']
jsons = jsons or [None]
if len(texts) == 1:
texts = len(image_list) * texts
if len(widths) == 1:
widths = len(image_list) * widths
if len(jsons) == 1:
jsons = len(image_list) * jsons
note_dbs = []
for figure, text, width, json_field in zip(image_list, texts, widths, jsons):
note_dbs.append(self.register_image(figure, text=text, width=width, json_field=json_field))
return note_dbs