import logging
import numpy as np
from pathlib import Path
from one.alf.spec import QC
from datetime import date
from neuropixel import trace_header
import spikeglx
from iblatlas.atlas import AllenAtlas
from ibllib.pipes import histology
from ibllib.pipes.ephys_alignment import EphysAlignment
from ibllib.qc import base
from ibllib.oneibl.patcher import FTPPatcher
_log = logging.getLogger(__name__)
CRITERIA = {"PASS": 0.8}
[docs]
class AlignmentQC(base.QC):
"""
Class that is used to update the extended_qc of the probe insertion fields with the results
from the ephys alignment procedure
"""
def __init__(self, probe_id, one=None, brain_atlas=None, channels=True, collection=None):
super().__init__(probe_id, one=one, log=_log, endpoint='insertions')
# Data
self.alignments = None
self.xyz_picks = None
self.depths = None
self.cluster_chns = None
self.chn_coords = None
self.align_keys_sorted = None
# Metrics and passed trials
self.sim_matrix = None
self.criteria = CRITERIA
# Get the brain atlas
self.brain_atlas = brain_atlas or AllenAtlas(25)
# Flag for uploading channels to alyx. For testing purposes
self.channels_flag = channels
self.insertion = self.one.alyx.get(f'/insertions/{self.eid}', clobber=True)
self.resolved = (self.insertion.get('json', {'temp': 0}).get('extended_qc').
get('alignment_resolved', False))
self.probe_collection = collection
[docs]
def load_data(self, prev_alignments=None, xyz_picks=None, depths=None, cluster_chns=None,
chn_coords=None):
""""
Load data required to assess alignment qc and compute similarity matrix. If no arguments
are given load_data will fetch all the relevant data required
"""
if not np.any(prev_alignments):
aligned_traj = self.one.alyx.get(f'/trajectories?&probe_insertion={self.eid}'
'&provenance=Ephys aligned histology track',
clobber=True)
if len(aligned_traj) > 0:
self.alignments = aligned_traj[0].get('json', {})
else:
self.alignments = {}
return
else:
self.alignments = prev_alignments
align_keys = [*self.alignments.keys()]
self.align_keys_sorted = sorted(align_keys, reverse=True)
if not np.any(xyz_picks):
self.xyz_picks = np.array(self.insertion['json']['xyz_picks']) / 1e6
else:
self.xyz_picks = xyz_picks
if len(self.alignments) < 2:
return
if not self.probe_collection:
all_collections = self.one.list_collections(self.insertion['session'])
if f'alf/{self.insertion["name"]}/pykilosort' in all_collections:
self.probe_collection = f'alf/{self.insertion["name"]}/pykilosort'
else:
self.probe_collection = f'alf/{self.insertion["name"]}'
if not np.any(chn_coords):
self.chn_coords = self.one.load_dataset(self.insertion['session'],
'channels.localCoordinates.npy',
collection=self.probe_collection)
else:
self.chn_coords = chn_coords
if not np.any(depths):
self.depths = self.chn_coords[:, 1]
else:
self.depths = depths
if not np.any(cluster_chns):
self.cluster_chns = self.one.load_dataset(self.insertion['session'],
'clusters.channels.npy',
collection=self.probe_collection)
else:
self.cluster_chns = cluster_chns
[docs]
def compute(self):
"""
Computes the similarity matrix if > 2 alignments. If no data loaded, wraps around load_data
to get all relevant data needed
"""
if self.alignments is None:
self.load_data()
if len(self.alignments) < 2:
self.log.info(f"Insertion {self.eid}: One or less alignment found...")
self.sim_matrix = np.array([len(self.alignments)])
else:
self.log.info(f"Insertion {self.eid}: Running QC on alignment data...")
self.sim_matrix = self.compute_similarity_matrix()
return self.sim_matrix
[docs]
def run(self, update=True, upload_alyx=True, upload_flatiron=True):
"""
Compute alignment_qc for a specified probe insertion and updates extended qc field in alyx.
If alignment is resolved and upload flags set to True channels from resolved
alignment will be updated to alyx and datasets sent to ibl-ftp-patcher to be uploaded to
flatiron
"""
if self.sim_matrix is None:
self.compute()
# Case where the alignment has already been resolved
if self.resolved:
self.log.info(f"Alignment for insertion {self.eid} already resolved, channels won't be"
f" updated. To force update of channels use "
f"resolve_manual method with force=True")
results = {'alignment_count': len(self.alignments)}
if update:
self.update_extended_qc(results)
results.update({'alignment_resolved': True})
# Case where no alignments have been made
elif np.all(self.sim_matrix == 0) and self.sim_matrix.shape[0] == 1:
# We don't update database
results = {'alignment_resolved': False}
# Case where only one alignment
elif np.all(self.sim_matrix == 1) and self.sim_matrix.shape[0] == 1:
results = {'alignment_count': len(self.alignments),
'alignment_stored': self.align_keys_sorted[0],
'alignment_resolved': False}
if update:
self.update_extended_qc(results)
# Case where 2 or more alignments and alignments haven't been resolved
else:
results = self.compute_alignment_status()
if update:
self.update_extended_qc(results)
if results['alignment_resolved'] and np.bitwise_or(upload_alyx, upload_flatiron):
self.upload_channels(results['alignment_stored'], upload_alyx, upload_flatiron)
return results
[docs]
def resolve_manual(self, align_key, update=True, upload_alyx=True, upload_flatiron=False,
force=False):
"""
Method to manually resolve the alignment of a probe insertion with a given alignment
regardless of the number of alignments or the alignment qc value. Channels from specified
alignment will be uploaded to alyx and datasets sent to ibl-ftp-patcher to be uploaded to
flatiron. If alignment already resolved will only upload if force flag set to True
"""
if self.sim_matrix is None:
self.compute()
assert align_key in self.align_keys_sorted, 'align key not recognised'
if self.resolved == 1 and not force:
self.log.info(f"Alignment for insertion {self.eid} already resolved, channels won't be"
f"updated. To overwrite stored channels with alignment {align_key} "
f"set 'force=True'")
file_paths = []
else:
if self.sim_matrix.shape[0] > 1:
results = self.compute_alignment_status()
else:
results = {'alignment_count': self.sim_matrix.shape[0]}
results['alignment_resolved'] = True
results['alignment_stored'] = align_key
results['alignment_resolved_by'] = 'experimenter'
results['alignment_resolved_date'] = date.today().isoformat()
if update:
self.update_extended_qc(results)
file_paths = []
if upload_alyx or upload_flatiron:
file_paths = self.upload_channels(align_key, upload_alyx, upload_flatiron)
return file_paths
[docs]
def compute_similarity_matrix(self):
"""
Computes the similarity matrix between each alignment stored in the ephys aligned
trajectory. Similarity matrix based on number of clusters that share brain region and
parent brain region
"""
r = self.brain_atlas.regions
clusters = dict()
for iK, key in enumerate(self.align_keys_sorted):
# Location of reference lines used for alignment
feature = np.array(self.alignments[key][0])
track = np.array(self.alignments[key][1])
# Instantiate EphysAlignment object
ephysalign = EphysAlignment(self.xyz_picks, self.depths, track_prev=track,
feature_prev=feature,
brain_atlas=self.brain_atlas)
# Find xyz location of all channels
xyz_channels = ephysalign.get_channel_locations(feature, track)
brain_regions = ephysalign.get_brain_locations(xyz_channels)
# Find the location of clusters along the alignment
cluster_info = dict()
cluster_info['brain_id'] = brain_regions['id'][self.cluster_chns]
cluster_info['parent_id'] = r.get(ids=cluster_info['brain_id']).parent.astype(int)
clusters.update({key: cluster_info})
sim_matrix = np.zeros((len(self.align_keys_sorted), len(self.align_keys_sorted)))
for ik, key in enumerate(self.align_keys_sorted):
for ikk, key2 in enumerate(self.align_keys_sorted):
same_id = np.where(clusters[key]['brain_id'] == clusters[key2]['brain_id'])[0]
not_same_id = \
np.where(clusters[key]['brain_id'] != clusters[key2]['brain_id'])[0]
same_parent = np.where(clusters[key]['parent_id'][not_same_id] ==
clusters[key2]['parent_id'][not_same_id])[0]
sim_matrix[ik, ikk] = len(same_id) + (len(same_parent) * 0.5)
# Normalise
sim_matrix_norm = sim_matrix / np.max(sim_matrix)
return sim_matrix_norm
[docs]
def compute_alignment_status(self):
"""
Determine whether alignments agree based on value in similarity matrix. If any alignments
have similarity of 0.8 set the alignment to be resolved
"""
# Set diagonals to zero so we don't use those to find max
np.fill_diagonal(self.sim_matrix, 0)
max_sim = np.max(self.sim_matrix)
results = {'alignment_qc': max_sim,
'alignment_count': self.sim_matrix.shape[0]}
if max_sim > CRITERIA['PASS']:
location = np.where(self.sim_matrix == max_sim)
results.update({'alignment_stored': self.align_keys_sorted[np.min(location)]})
results.update({'alignment_resolved': True})
results.update({'alignment_resolved_by': 'qc'})
else:
results.update({'alignment_stored': self.align_keys_sorted[0]})
results.update({'alignment_resolved': False})
return results
[docs]
def upload_channels(self, alignment_key, upload_alyx, upload_flatiron):
"""
Upload channels to alyx and flatiron based on the alignment specified by the alignment key
"""
feature = np.array(self.alignments[alignment_key][0])
track = np.array(self.alignments[alignment_key][1])
try:
meta_dset = self.one.list_datasets(self.insertion['session'], '*ap.meta',
collection=f'raw_ephys_data/{self.insertion["name"]}')
meta_file = self.one.load_dataset(self.insertion['session'], meta_dset[0].split('/')[-1],
collection=f'raw_ephys_data/{self.insertion["name"]}',
download_only=True)
geometry = spikeglx.read_geometry(meta_file)
chns = np.c_[geometry['x'], geometry['y']]
except Exception as err:
self.log.warning(f"Could not compute channel locations from meta file, errored with message: {err}. "
f"Will use default Neuropixel 1 channels")
geometry = trace_header(version=1)
chns = np.c_[geometry['x'], geometry['y']]
ephysalign = EphysAlignment(self.xyz_picks, chns[:, 1],
track_prev=track,
feature_prev=feature,
brain_atlas=self.brain_atlas)
channels_mlapdv = np.int32(ephysalign.get_channel_locations(feature, track) * 1e6)
channels_atlas_id = ephysalign.get_brain_locations(channels_mlapdv / 1e6)['id']
# Need to change channels stored on alyx as well as the stored key is not the same as the latest key
if upload_alyx:
if alignment_key != self.align_keys_sorted[0]:
histology.register_aligned_track(self.eid, channels_mlapdv / 1e6,
chn_coords=chns, one=self.one,
overwrite=True, channels=self.channels_flag,
brain_atlas=self.brain_atlas)
ephys_traj = self.one.alyx.get(f'/trajectories?&probe_insertion={self.eid}'
'&provenance=Ephys aligned histology track',
clobber=True)
patch_dict = {'probe_insertion': self.eid, 'json': self.alignments}
self.one.alyx.rest('trajectories', 'partial_update', id=ephys_traj[0]['id'],
data=patch_dict)
files_to_register = []
if upload_flatiron:
ftp_patcher = FTPPatcher(one=self.one)
alf_path = self.one.eid2path(self.insertion['session']).joinpath('alf', self.insertion["name"])
alf_path.mkdir(exist_ok=True, parents=True)
f_name = alf_path.joinpath('electrodeSites.mlapdv.npy')
np.save(f_name, channels_mlapdv)
files_to_register.append(f_name)
f_name = alf_path.joinpath('electrodeSites.brainLocationIds_ccf_2017.npy')
np.save(f_name, channels_atlas_id)
files_to_register.append(f_name)
f_name = alf_path.joinpath('electrodeSites.localCoordinates.npy')
np.save(f_name, chns)
files_to_register.append(f_name)
probe_collections = self.one.list_collections(self.insertion['session'], filename='channels*',
collection=f'alf/{self.insertion["name"]}*')
for collection in probe_collections:
chns = self.one.load_dataset(self.insertion['session'], 'channels.localCoordinates', collection=collection)
ephysalign = EphysAlignment(self.xyz_picks, chns[:, 1],
track_prev=track,
feature_prev=feature,
brain_atlas=self.brain_atlas)
channels_mlapdv = np.int32(ephysalign.get_channel_locations(feature, track) * 1e6)
channels_atlas_id = ephysalign.get_brain_locations(channels_mlapdv / 1e6)['id']
alf_path = self.one.eid2path(self.insertion['session']).joinpath(collection)
alf_path.mkdir(exist_ok=True, parents=True)
f_name = alf_path.joinpath('channels.mlapdv.npy')
np.save(f_name, channels_mlapdv)
files_to_register.append(f_name)
f_name = alf_path.joinpath('channels.brainLocationIds_ccf_2017.npy')
np.save(f_name, channels_atlas_id)
files_to_register.append(f_name)
self.log.info("Writing datasets to FlatIron")
ftp_patcher.create_dataset(path=files_to_register,
created_by=self.one.alyx.user)
return files_to_register
[docs]
def update_experimenter_evaluation(self, prev_alignments=None, override=False):
if not np.any(prev_alignments) and not np.any(self.alignments):
aligned_traj = self.one.alyx.get(f'/trajectories?&probe_insertion={self.eid}'
'&provenance=Ephys aligned histology track',
clobber=True)
if len(aligned_traj) > 0:
self.alignments = aligned_traj[0].get('json', {})
else:
self.alignments = {}
return
else:
self.alignments = prev_alignments
outcomes = [align[2].split(':')[0] for key, align in self.alignments.items()
if len(align) == 3]
if len(outcomes) > 0:
vals = list(map(QC.validate, outcomes))
max_qc = np.argmax(vals)
outcome = outcomes[max_qc]
self.update(outcome, namespace='experimenter', override=override)
else:
self.log.warning(f'No experimenter qc found, qc field of probe insertion {self.eid} '
f'will not be updated')
[docs]
def get_aligned_channels(ins, chn_coords, one, ba=None, save_dir=None):
ba = ba or AllenAtlas(25)
depths = chn_coords[:, 1]
xyz = np.array(ins['json']['xyz_picks']) / 1e6
traj = one.alyx.rest('trajectories', 'list', probe_insertion=ins['id'],
provenance='Ephys aligned histology track')[0]
align_key = ins['json']['extended_qc']['alignment_stored']
feature = traj['json'][align_key][0]
track = traj['json'][align_key][1]
ephysalign = EphysAlignment(xyz, depths, track_prev=track,
feature_prev=feature,
brain_atlas=ba, speedy=True)
channels_mlapdv = np.int32(ephysalign.get_channel_locations(feature, track) * 1e6)
channels_atlas_ids = ephysalign.get_brain_locations(channels_mlapdv / 1e6)['id']
out_files = []
if save_dir is not None:
f_name = Path(save_dir).joinpath('channels.mlapdv.npy')
np.save(f_name, channels_mlapdv)
out_files.append(f_name)
f_name = Path(save_dir).joinpath('channels.brainLocationIds_ccf_2017.npy')
np.save(f_name, channels_atlas_ids)
out_files.append(f_name)
return out_files