"""Miscellaneous pipeline utility functions."""
import ctypes
import hashlib
import json
import os
import re
import shutil
import subprocess
import sys
import time
import logging
import warnings
from functools import wraps
from pathlib import Path
from typing import Union, List, Callable, Any
from inspect import signature
import uuid
import socket
import traceback
import spikeglx
from iblutil.io import hashfile, params
from iblutil.util import range_str
from one.alf.files import get_session_path
from one.alf.spec import is_uuid_string, is_session_path, describe
from one.api import ONE
import ibllib.io.flags as flags
import ibllib.io.raw_data_loaders as raw
from ibllib.io.misc import delete_empty_folders
import ibllib.io.session_params as sess_params
log = logging.getLogger(__name__)
DEVICE_FLAG_MAP = {'neuropixel': 'ephys',
'cameras': 'video',
'widefield': 'widefield',
'sync': 'sync'}
def subjects_data_folder(folder: Path, rglob: bool = False) -> Path:
"""Given a root_data_folder will try to find a 'Subjects' data folder.
If Subjects folder is passed will return it directly."""
if not isinstance(folder, Path):
folder = Path(folder)
if rglob:
func = folder.rglob
else:
func = folder.glob
    # Try to find a Subjects folder among the children of the given folder
    if folder.name.lower() != 'subjects':
        spath = [x for x in func('*') if x.name.lower() == 'subjects']
        if not spath:
            raise ValueError('No "Subjects" folder in children folders')
        elif len(spath) > 1:
            raise ValueError(f'Multiple "Subjects" folders in children folders: {spath}')
        else:
            folder = spath[0]  # glob yields paths already anchored at `folder`
return folder
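# A minimal usage sketch (the path below is hypothetical): locate the 'Subjects' folder
# anywhere under a rig's data root by searching recursively.
#
#   >>> subjects_data_folder('D:/iblrig_data', rglob=True)
#   WindowsPath('D:/iblrig_data/Subjects')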
def cli_ask_default(prompt: str, default: str):
"""
Prompt the user for input, display the default option and return user input or default
:param prompt: String to display to user
:param default: The default value to return if user doesn't enter anything
:return: User input or default
"""
return input(f'{prompt} [default: {default}]: ') or default
def cli_ask_options(prompt: str, options: list, default_idx: int = 0) -> str:
    """Prompt the user to choose one of several options, displaying the default in brackets.
    Re-prompts on invalid input and returns the chosen option as a string."""
    parsed_options = [str(x) for x in options]
if default_idx is not None:
parsed_options[default_idx] = f"[{parsed_options[default_idx]}]"
options_str = " (" + " | ".join(parsed_options) + ")> "
ans = input(prompt + options_str) or str(options[default_idx])
if ans not in [str(x) for x in options]:
return cli_ask_options(prompt, options, default_idx=default_idx)
return ans
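# A minimal usage sketch: the call below renders the prompt
# "What's the type of PROBE 00? ([3A] | 3B)> " and returns '3A' when the user just hits enter.
#
#   >>> cli_ask_options("What's the type of PROBE 00?", ["3A", "3B"])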
def behavior_exists(session_path: str, include_devices=False) -> bool:
"""
Returns True if the session has a task behaviour folder
:param session_path:
:return:
"""
session_path = Path(session_path)
if include_devices and session_path.joinpath("_devices").exists():
return True
if session_path.joinpath("raw_behavior_data").exists():
return True
return any(session_path.glob('raw_task_data_*'))
def check_transfer(src_session_path, dst_session_path):
"""
    Check that all the files in the source directory match those in the destination directory.
    Raises an AssertionError if the number of files, the file names or the file sizes do not
    match.
:param src_session_path: The source directory that was copied
:param dst_session_path: The copy target directory
"""
src_files = sorted([x for x in Path(src_session_path).rglob('*') if x.is_file()])
dst_files = sorted([x for x in Path(dst_session_path).rglob('*') if x.is_file()])
assert len(src_files) == len(dst_files), 'Not all files transferred'
for s, d in zip(src_files, dst_files):
assert s.name == d.name, 'file name mismatch'
assert s.stat().st_size == d.stat().st_size, 'file size mismatch'
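# A minimal usage sketch (hypothetical paths): treat an AssertionError as an incomplete copy.
#
#   >>> try:
#   ...     check_transfer('D:/data/KS005/2019-04-01/001', '//server/KS005/2019-04-01/001')
#   ... except AssertionError:
#   ...     print('transfer incomplete, re-copy required')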
def rename_session(session_path: str, new_subject=None, new_date=None, new_number=None,
ask: bool = False) -> Path:
"""Rename a session. Prompts the user for the new subject name, data and number and then moves
session path to new session path.
:param session_path: A session path to rename
:type session_path: str
:param new_subject: A new subject name, if none provided, the user is prompted for one
:param new_date: A new session date, if none provided, the user is prompted for one
:param new_number: A new session number, if none provided, the user is prompted for one
    :param ask: If True, prompt the user for each value even when one is provided, defaults to False
:type ask: bool
:return: The renamed session path
:rtype: Path
"""
session_path = get_session_path(session_path)
if session_path is None:
raise ValueError('Session path not valid ALF session folder')
mouse = session_path.parts[-3]
date = session_path.parts[-2]
sess = session_path.parts[-1]
new_mouse = new_subject or mouse
new_date = new_date or date
new_sess = new_number or sess
if ask:
new_mouse = input(f"Please insert subject NAME [current value: {mouse}]> ")
new_date = input(f"Please insert new session DATE [current value: {date}]> ")
new_sess = input(f"Please insert new session NUMBER [current value: {sess}]> ")
new_session_path = Path(*session_path.parts[:-3]).joinpath(new_mouse, new_date,
new_sess.zfill(3))
assert is_session_path(new_session_path), 'invalid subject, date or number'
if new_session_path.exists():
ans = input(f'Warning: session path {new_session_path} already exists.\nWould you like to '
f'move {new_session_path} to a backup directory? [y/N] ')
if (ans or 'n').lower() in ['n', 'no']:
print(f'Manual intervention required, data exists in the following directory: '
f'{session_path}')
return
if backup_session(new_session_path):
print(f'Backup was successful, removing directory {new_session_path}...')
shutil.rmtree(str(new_session_path), ignore_errors=True)
shutil.move(str(session_path), str(new_session_path))
print(session_path, "--> renamed to:")
print(new_session_path)
return new_session_path
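# A minimal usage sketch (the path below is hypothetical): renumber a session without prompts.
#
#   >>> rename_session('/data/Subjects/KS005/2019-04-01/001', new_number='002')
#   PosixPath('/data/Subjects/KS005/2019-04-01/002')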
def backup_session(session_path):
"""Used to move the contents of a session to a backup folder, likely before the folder is
removed.
:param session_path: A session path to be backed up
:return: True if directory was backed up or exits if something went wrong
:rtype: Bool
"""
bk_session_path = Path()
if Path(session_path).exists():
try:
bk_session_path = Path(*session_path.parts[:-4]).joinpath(
"Subjects_backup_renamed_sessions", Path(*session_path.parts[-3:]))
Path(bk_session_path.parent).mkdir(parents=True)
print(f"Created path: {bk_session_path.parent}")
# shutil.copytree(session_path, bk_session_path, dirs_exist_ok=True)
shutil.copytree(session_path, bk_session_path) # python 3.7 compatibility
print(f"Copied contents from {session_path} to {bk_session_path}")
return True
except FileExistsError:
log.error(f"A backup session for the given path already exists: {bk_session_path}, "
f"manual intervention is necessary.")
raise
        except shutil.Error as e:
            log.error(f'A copy error occurred when moving files from {session_path} to '
                      f'{bk_session_path}')
            log.error(e)
            return False
else:
log.error(f"The given session path does not exist: {session_path}")
return False
def copy_with_check(src, dst, **kwargs):
    """Copy a file with shutil.copy2, skipping files whose destination already exists with the
    same size; a destination with a different size is removed and re-copied."""
    dst = Path(dst)
if dst.exists() and Path(src).stat().st_size == dst.stat().st_size:
return dst
elif dst.exists():
dst.unlink()
return shutil.copy2(src, dst, **kwargs)
def transfer_session_folders(local_sessions: list, remote_subject_folder, subfolder_to_transfer):
"""
    Determine which local session folders should be transferred to which remote session folders,
    prompting the user when necessary.
Parameters
----------
local_sessions : list
Required list of local session folder paths to sync to local server.
remote_subject_folder : str, pathlib.Path
The remote location of the subject folder (typically pulled from the params).
subfolder_to_transfer : str
Which subfolder to sync
Returns
-------
list of tuples
For each session, a tuple of (source, destination) of attempted file transfers.
list of bool
A boolean True/False for success/failure of the transfer.
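    Examples
    --------
    A minimal usage sketch (the paths below are hypothetical): sync the video folders of
    two local sessions to the lab server.
    >>> sessions = [Path('D:/iblrig_data/Subjects/KS005/2019-04-01/001'),
    ...             Path('D:/iblrig_data/Subjects/KS005/2019-04-02/001')]
    >>> transfers, ok = transfer_session_folders(
    ...     sessions, Path('//lab_server/Subjects'), 'raw_video_data')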
"""
transfer_list = [] # list of sessions to transfer
skip_list = "" # "list" of sessions to skip and the reason for the skip
# Iterate through all local sessions in the given list
for local_session in local_sessions:
# Set expected remote_session location and perform simple error state checks
remote_session = remote_subject_folder.joinpath(*local_session.parts[-3:])
# Skip session if ...
if subfolder_to_transfer:
if not local_session.joinpath(subfolder_to_transfer).exists():
msg = f"{local_session} - skipping session, no '{subfolder_to_transfer}' folder found locally"
log.warning(msg)
skip_list += msg + "\n"
continue
if not remote_session.parent.exists():
msg = f"{local_session} - no matching remote session date folder found for the given local session"
log.info(msg)
skip_list += msg + "\n"
continue
if not behavior_exists(remote_session):
msg = f"{local_session} - skipping session, no behavior data found in remote folder {remote_session}"
log.warning(msg)
skip_list += msg + "\n"
continue
# Determine if there are multiple session numbers from the date path
local_sessions_for_date = get_session_numbers_from_date_path(local_session.parent)
remote_sessions_for_date = get_session_numbers_from_date_path(remote_session.parent)
remote_session_pick = None
if len(local_sessions_for_date) > 1 or len(remote_sessions_for_date) > 1:
# Format folder size output for end user to review
local_session_numbers_with_size = remote_session_numbers_with_size = ""
for lsfd in local_sessions_for_date:
size_in_gb = round(get_directory_size(local_session.parent / lsfd, in_gb=True), 2)
local_session_numbers_with_size += lsfd + " (" + str(size_in_gb) + " GB)\n"
for rsfd in remote_sessions_for_date:
size_in_gb = round(get_directory_size(remote_session.parent / rsfd, in_gb=True), 2)
remote_session_numbers_with_size += rsfd + " (" + str(size_in_gb) + " GB)\n"
log.info(f"\n\nThe following local session folder(s) were found on this acquisition PC:\n\n"
f"{''.join(local_session_numbers_with_size)}\nThe following remote session folder(s) were found on the "
f"server:\n\n{''.join(remote_session_numbers_with_size)}\n")
def _remote_session_picker(sessions_for_date):
resp = "s"
resp_invalid = True
while resp_invalid: # loop until valid user input
resp = input(f"\n\n--- USER INPUT NEEDED ---\nWhich REMOTE session number would you like to transfer your "
f"local session to? Options {range_str(map(int, sessions_for_date))} or "
f"[s]kip/[h]elp/[e]xit> ").strip().lower()
if resp == "h":
print("An example session filepath:\n")
describe("number") # Explain what a session number is
input("Press enter to continue")
elif resp == "s" or resp == "e": # exit loop
resp_invalid = False
                    elif resp.isdigit() and len(resp) <= 3:
                        # valid only if the response matches one of the available session numbers
                        resp_invalid = not any(int(resp) == int(i) for i in sessions_for_date)
                        if resp_invalid:
                            print("Invalid response. Please try again.")
                    else:
                        print("Invalid response. Please try again.")
return resp
log.info(f"Evaluation for local session "
f"{local_session.parts[-3]}/{local_session.parts[-2]}/{local_session.parts[-1]}...")
user_response = _remote_session_picker(remote_sessions_for_date)
if user_response == "s":
msg = f"{local_session} - Local session skipped due to user input"
log.info(msg)
skip_list += msg + "\n"
continue
elif user_response == "e":
log.info("Exiting, no files transferred.")
return
else:
remote_session_pick = remote_session.parent / user_response.zfill(3)
# Append to the transfer_list
transfer_tuple = (local_session, remote_session_pick) if remote_session_pick else (local_session, remote_session)
transfer_list.append(transfer_tuple)
log.info(f"{transfer_tuple[0]}, {transfer_tuple[1]} - Added to the transfer list")
# Verify that the number of local transfer_list entries match the number of remote transfer_list entries
if len(transfer_list) != len(set(dst for _, dst in transfer_list)):
raise RuntimeError(
"An invalid combination of sessions were picked; the most likely cause of this error is multiple local "
"sessions being selected for a single remote session. Please rerun the script."
)
# Call rsync/rdiff function for every entry in the transfer list
success = []
for src, dst in transfer_list:
if subfolder_to_transfer:
success.append(rsync_paths(src / subfolder_to_transfer, dst / subfolder_to_transfer))
else:
success.append(rsync_paths(src, dst))
if not success[-1]:
log.error("File transfer failed, check log for reason.")
    # Notify the user of any transfers that were skipped
    if skip_list:
        log.warning(f"Transfers that were not completed:\n\n{skip_list}")
    else:
        log.info("No transfers skipped.")
return transfer_list, success
def transfer_folder(src: Path, dst: Path, force: bool = False) -> None:
"""functionality has been replaced by transfer_session_folders function"""
print(f"Attempting to copy:\n{src}\n--> {dst}")
if force:
print(f"Removing {dst}")
shutil.rmtree(dst, ignore_errors=True)
else:
try:
check_transfer(src, dst)
print("All files already copied, use force=True to re-copy")
return
except AssertionError:
pass
print(f"Copying all files:\n{src}\n--> {dst}")
# rsync_folder(src, dst, '**transfer_me.flag')
if sys.version_info.minor < 8:
# dirs_exist_ok kwarg not supported in < 3.8
shutil.rmtree(dst, ignore_errors=True)
shutil.copytree(src, dst, copy_function=copy_with_check)
else:
shutil.copytree(src, dst, dirs_exist_ok=True, copy_function=copy_with_check)
# If folder was created delete the src_flag_file
if check_transfer(src, dst) is None:
print("All files copied")
# rdiff-backup --compare /tmp/tmpw9o1zgn0 /tmp/tmp82gg36rm
# No changes found. Directory matches archive data.
def load_params_dict(params_fname: str) -> Union[dict, None]:
params_fpath = Path(params.getfile(params_fname))
if not params_fpath.exists():
return None
with open(params_fpath, "r") as f:
out = json.load(f)
return out
def load_videopc_params():
"""(DEPRECATED) This will be removed in favour of iblrigv8 functions."""
warnings.warn('load_videopc_params will be removed in favour of iblrigv8', FutureWarning)
if not load_params_dict("videopc_params"):
create_videopc_params()
return load_params_dict("videopc_params")
def load_ephyspc_params():
    """Load the ephys PC parameters, prompting the user to create them if they do not exist."""
    if not load_params_dict("ephyspc_params"):
create_ephyspc_params()
return load_params_dict("ephyspc_params")
def create_basic_transfer_params(param_str='transfer_params', local_data_path=None,
remote_data_path=None, clobber=False, **kwargs):
"""Create some basic parameters common to all acquisition rigs.
Namely prompt user for the local root data path and the remote (lab server) data path.
NB: All params stored in uppercase by convention.
Parameters
----------
param_str : str
The name of the parameters to load/save.
local_data_path : str, pathlib.Path
The local root data path, stored with the DATA_FOLDER_PATH key. If None, user is prompted.
    remote_data_path : str, pathlib.Path, bool
        The remote root data path, stored with the REMOTE_DATA_FOLDER_PATH key. If None, user is prompted.
        If False, the REMOTE_DATA_FOLDER_PATH key is not updated, or is set to False if clobber = True.
clobber : bool
If True, any parameters in existing parameter file not found as keyword args will be removed,
otherwise the user is prompted for these also.
**kwargs
Extra parameters to set. If value is None, the user is prompted.
Returns
-------
dict
The parameters written to disc.
Examples
--------
Set up basic transfer parameters for modality acquisition PC
>>> par = create_basic_transfer_params()
    Set up basic transfer parameters without prompting the user
>>> par = create_basic_transfer_params(
... local_data_path='/iblrig_data/Subjects',
... remote_data_path='/mnt/iblserver.champalimaud.pt/ibldata/Subjects')
Prompt user for extra parameter using custom prompt (will call function with current default)
>>> from functools import partial
>>> par = create_basic_transfer_params(
... custom_arg=partial(cli_ask_default, 'Please enter custom arg value'))
Set up with no remote path (NB: if not the first time, use clobber=True to save param key)
>>> par = create_basic_transfer_params(remote_data_path=False)
"""
parameters = params.as_dict(params.read(param_str, {})) or {}
if local_data_path is None:
local_data_path = parameters.get('DATA_FOLDER_PATH')
if not local_data_path or clobber:
local_data_path = cli_ask_default("Where's your LOCAL 'Subjects' data folder?", local_data_path)
parameters['DATA_FOLDER_PATH'] = local_data_path
if remote_data_path is None:
remote_data_path = parameters.get('REMOTE_DATA_FOLDER_PATH')
if remote_data_path in (None, '') or clobber:
remote_data_path = cli_ask_default("Where's your REMOTE 'Subjects' data folder?", remote_data_path)
if remote_data_path is not False:
parameters['REMOTE_DATA_FOLDER_PATH'] = remote_data_path
elif 'REMOTE_DATA_FOLDER_PATH' not in parameters or clobber:
parameters['REMOTE_DATA_FOLDER_PATH'] = False # Always assume no remote path
# Deal with extraneous parameters
for k, v in kwargs.items():
if callable(v): # expect function handle with default value as input
n_pars = len(signature(v).parameters)
parameters[k.upper()] = v(parameters.get(k.upper())) if n_pars > 0 else v()
elif v is None: # generic prompt for key
parameters[k.upper()] = cli_ask_default(
f'Enter a value for parameter {k.upper()}', parameters.get(k.upper())
)
else: # assign value to parameter
parameters[k.upper()] = str(v)
defined = list(map(str.upper, ('DATA_FOLDER_PATH', 'REMOTE_DATA_FOLDER_PATH', 'TRANSFER_LABEL', *kwargs.keys())))
if clobber:
# Delete any parameters in parameter dict that were not passed as keyword args into function
parameters = {k: v for k, v in parameters.items() if k in defined}
else:
# Prompt for any other parameters that weren't passed into function
for k in filter(lambda x: x not in defined, map(str.upper, parameters.keys())):
parameters[k] = cli_ask_default(f'Enter a value for parameter {k}', parameters.get(k))
if 'TRANSFER_LABEL' not in parameters:
parameters['TRANSFER_LABEL'] = f'{socket.gethostname()}_{uuid.getnode()}'
# Write parameters
params.write(param_str, parameters)
return parameters
def create_videopc_params(force=False, silent=False):
"""(DEPRECATED) This will be removed in favour of iblrigv8 functions."""
url = 'https://github.com/int-brain-lab/iblrig/blob/videopc/docs/source/video.rst'
warnings.warn(f'create_videopc_params is deprecated, see {url}', DeprecationWarning)
if Path(params.getfile("videopc_params")).exists() and not force:
print(f"{params.getfile('videopc_params')} exists already, exiting...")
print(Path(params.getfile("videopc_params")).exists())
return
if silent:
data_folder_path = r"D:\iblrig_data\Subjects"
remote_data_folder_path = r"\\iblserver.champalimaud.pt\ibldata\Subjects"
body_cam_idx = 0
left_cam_idx = 1
right_cam_idx = 2
else:
data_folder_path = cli_ask_default(
r"Where's your LOCAL 'Subjects' data folder?", r"D:\iblrig_data\Subjects"
)
remote_data_folder_path = cli_ask_default(
r"Where's your REMOTE 'Subjects' data folder?",
r"\\iblserver.champalimaud.pt\ibldata\Subjects",
)
body_cam_idx = cli_ask_default("Please select the index of the BODY camera", "0")
left_cam_idx = cli_ask_default("Please select the index of the LEFT camera", "1")
right_cam_idx = cli_ask_default("Please select the index of the RIGHT camera", "2")
param_dict = {
"DATA_FOLDER_PATH": data_folder_path,
"REMOTE_DATA_FOLDER_PATH": remote_data_folder_path,
"BODY_CAM_IDX": body_cam_idx,
"LEFT_CAM_IDX": left_cam_idx,
"RIGHT_CAM_IDX": right_cam_idx,
}
params.write("videopc_params", param_dict)
print(f"Created {params.getfile('videopc_params')}")
print(param_dict)
return param_dict
def create_ephyspc_params(force=False, silent=False):
if Path(params.getfile("ephyspc_params")).exists() and not force:
print(f"{params.getfile('ephyspc_params')} exists already, exiting...")
print(Path(params.getfile("ephyspc_params")).exists())
return
if silent:
data_folder_path = r"D:\iblrig_data\Subjects"
remote_data_folder_path = r"\\iblserver.champalimaud.pt\ibldata\Subjects"
probe_types = {"PROBE_TYPE_00": "3A", "PROBE_TYPE_01": "3B"}
else:
data_folder_path = cli_ask_default(
r"Where's your LOCAL 'Subjects' data folder?", r"D:\iblrig_data\Subjects"
)
remote_data_folder_path = cli_ask_default(
r"Where's your REMOTE 'Subjects' data folder?",
r"\\iblserver.champalimaud.pt\ibldata\Subjects",
)
n_probes = int(cli_ask_default("How many probes are you using?", '2'))
        assert 100 > n_probes > 0, 'Please enter a number between 1 and 99 inclusive'
probe_types = {}
for i in range(n_probes):
probe_types[f'PROBE_TYPE_{i:02}'] = cli_ask_options(
f"What's the type of PROBE {i:02}?", ["3A", "3B"])
param_dict = {
"DATA_FOLDER_PATH": data_folder_path,
"REMOTE_DATA_FOLDER_PATH": remote_data_folder_path,
**probe_types
}
params.write("ephyspc_params", param_dict)
print(f"Created {params.getfile('ephyspc_params')}")
print(param_dict)
return param_dict
def rdiff_install() -> bool:
"""
For windows:
* if the rdiff-backup executable does not already exist on the system
* downloads rdiff-backup zip file
* copies the executable to the C:\tools folder
For linux/mac:
* runs a pip install rdiff-backup
Returns:
True when install is successful, False when an error is encountered
"""
if os.name == "nt":
        # ensure the tools folder exists
        tools_folder = "C:\\tools\\"
        Path(tools_folder).mkdir(exist_ok=True)
rdiff_cmd_loc = tools_folder + "rdiff-backup.exe"
if not Path(rdiff_cmd_loc).exists():
import requests
import zipfile
from io import BytesIO
url = "https://github.com/rdiff-backup/rdiff-backup/releases/download/v2.0.5/rdiff-backup-2.0.5.win32exe.zip"
log.info("Downloading zip file for rdiff-backup.")
# Download the file by sending the request to the URL, ensure success by status code
if requests.get(url).status_code == 200:
log.info("Download complete for rdiff-backup zip file.")
# extracting the zip file contents
zipfile = zipfile.ZipFile(BytesIO(requests.get(url).content))
zipfile.extractall("C:\\Temp")
rdiff_folder_name = zipfile.namelist()[0] # attempting a bit of future-proofing
# move the executable to the C:\tools folder
shutil.copy("C:\\Temp\\" + rdiff_folder_name + "rdiff-backup.exe", rdiff_cmd_loc)
shutil.rmtree("C:\\Temp\\" + rdiff_folder_name) # cleanup temp folder
try: # attempt to call the rdiff command
subprocess.run([rdiff_cmd_loc, "--version"], check=True)
except (FileNotFoundError, subprocess.CalledProcessError) as e:
log.error("rdiff-backup installation did not complete.\n", e)
return False
return True
else:
log.error("Download request status code not 200, something did not go as expected.")
return False
else: # anything not Windows
try: # package should not be installed via the requirements.txt to accommodate windows
subprocess.run(["pip", "install", "rdiff-backup"], check=True)
except subprocess.CalledProcessError as e:
log.error("rdiff-backup pip install did not complete.\n", e)
return False
return True
def get_directory_size(dir_path: Path, in_gb=False) -> float:
"""
Used to determine total size of all files in a given session_path, including all child directories
Args:
dir_path (Path): path we want to get the total size of
in_gb (bool): set to True for returned value to be in gigabytes
Returns:
float: sum of all files in the given directory path (in bytes by default, in GB if specified)
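    Example:
        A minimal usage sketch (the path below is hypothetical):
        >>> get_directory_size(Path('D:/iblrig_data/Subjects/KS005/2019-04-01/001'), in_gb=True)
        2.34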
"""
total = 0
    with os.scandir(dir_path) as it:
for entry in it:
if entry.is_file():
total += entry.stat().st_size
elif entry.is_dir():
total += get_directory_size(entry.path)
if in_gb:
return total / 1024 / 1024 / 1024 # in GB
return total # in bytes
def get_session_numbers_from_date_path(date_path: Path) -> list:
"""
Retrieves session numbers when given a date path
Args:
date_path (Path): path to date, i.e. \\\\server\\some_lab\\Subjects\\Date"
Returns:
(list): Found sessions as a sorted list
"""
contents = Path(date_path).glob('*')
folders = filter(lambda x: x.is_dir() and re.match(r'^\d{3}$', x.name), contents)
sessions_as_set = set(map(lambda x: x.name, folders))
sessions_as_sorted_list = sorted(sessions_as_set)
return sessions_as_sorted_list
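# A minimal usage sketch (the path below is hypothetical): list the zero-padded session
# numbers for a given date folder.
#
#   >>> get_session_numbers_from_date_path(Path('//server/some_lab/Subjects/KS005/2019-04-01'))
#   ['001', '002']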
def rsync_paths(src: Path, dst: Path) -> bool:
"""
    Run the rsync algorithm via an rdiff-backup command on the provided source and destination paths.
    This function relies on the rdiff-backup package and is run from the command line, i.e. via
    subprocess.run(). Full documentation can be found here - https://rdiff-backup.net/docs/rdiff-backup.1.html
Parameters
----------
src : Path
source path that contains data to be transferred
dst : Path
destination path that will receive the transferred data
Returns
-------
bool
True for success, False for failure
Raises
------
FileNotFoundError, subprocess.CalledProcessError
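    Examples
    --------
    A minimal usage sketch (the paths below are hypothetical):
    >>> ok = rsync_paths(Path('D:/iblrig_data/Subjects/KS005/2019-04-01/001/raw_video_data'),
    ...                  Path('//lab_server/Subjects/KS005/2019-04-01/001/raw_video_data'))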
"""
# Set rdiff_cmd_loc based on OS type (assuming C:\tools is not in Windows PATH environ)
rdiff_cmd_loc = "C:\\tools\\rdiff-backup.exe" if os.name == "nt" else "rdiff-backup"
try: # Check if rdiff-backup command is available
subprocess.run([rdiff_cmd_loc, "--version"], check=True)
except (FileNotFoundError, subprocess.CalledProcessError) as e:
        if not rdiff_install():  # Attempt to install rdiff-backup
            log.error("rdiff-backup command is unavailable, transfers cannot continue: %s", e)
raise
log.info("Attempting to transfer data: " + str(src) + " -> " + str(dst))
    if os.name == "nt":  # prevent Windows from going to sleep during the transfer
        WindowsInhibitor().inhibit()
try:
rsync_command = [rdiff_cmd_loc, "--verbosity", str(0),
"--create-full-path", "--backup-mode", "--no-acls", "--no-eas",
"--no-file-statistics", "--exclude", "**transfer_me.flag",
str(src), str(dst)]
subprocess.run(rsync_command, check=True)
time.sleep(1) # give rdiff-backup a second to complete all logging operations
    except (FileNotFoundError, subprocess.CalledProcessError) as e:
        log.error("Transfer failed: %s", e)
        if getattr(e, 'stderr', None):  # FileNotFoundError has no stderr attribute
            log.error(e.stderr)
        return False
log.info("Validating transfer completed...")
try: # Validate the transfers succeeded
rsync_validate = [rdiff_cmd_loc, "--verify", str(dst)]
subprocess.run(rsync_validate, check=True)
except (FileNotFoundError, subprocess.CalledProcessError) as e:
log.error(f"Validation for destination {dst} failed.\n", e)
return False
log.info("Cleaning up rdiff files...")
shutil.rmtree(dst / "rdiff-backup-data")
    if os.name == 'nt':  # allow Windows to go to sleep again
        WindowsInhibitor().uninhibit()
return True
def confirm_ephys_remote_folder(local_folder=False, remote_folder=False, force=False, iblscripts_folder=False,
session_path=None):
"""
:param local_folder: The full path to the local Subjects folder
    :param remote_folder: The full path to the remote Subjects folder
    :param force: If True, remove the remote copy and re-transfer
    :param iblscripts_folder: The path to the iblscripts repository folder
    :param session_path: An optional session path (or list of session paths) to transfer
:return:
"""
# FIXME: session_path can be relative
pars = load_ephyspc_params()
if not iblscripts_folder:
import deploy
iblscripts_folder = Path(deploy.__file__).parent.parent
if not local_folder:
local_folder = pars["DATA_FOLDER_PATH"]
if not remote_folder:
remote_folder = pars["REMOTE_DATA_FOLDER_PATH"]
local_folder = Path(local_folder)
remote_folder = Path(remote_folder)
# Check for Subjects folder
local_folder = subjects_data_folder(local_folder, rglob=True)
remote_folder = subjects_data_folder(remote_folder, rglob=True)
log.info(f"local folder: {local_folder}")
log.info(f"remote folder: {remote_folder}")
if session_path is None:
src_session_paths = [x.parent for x in local_folder.rglob("transfer_me.flag")]
else:
src_session_paths = session_path if isinstance(session_path, list) else [session_path]
if not src_session_paths:
log.info("Nothing to transfer, exiting...")
return
for session_path in src_session_paths:
log.info(f"Found : {session_path}")
log.info(f"Found: {len(src_session_paths)} sessions to transfer, starting transferring now")
for session_path in src_session_paths:
log.info(f"Transferring session: {session_path}")
# Rename ephys files
# FIXME: if transfer has failed and wiring file is there renaming will fail!
rename_ephys_files(str(session_path))
# Move ephys files
move_ephys_files(str(session_path))
# Copy wiring files
copy_wiring_files(str(session_path), iblscripts_folder)
try:
create_alyx_probe_insertions(str(session_path))
except BaseException:
            log.error(traceback.format_exc())
log.info("Probe creation failed, please create the probe insertions manually. Continuing transfer...")
msg = f"Transfer {session_path }to {remote_folder} with the same name?"
resp = input(msg + "\n[y]es/[r]ename/[s]kip/[e]xit\n ^\n> ") or "y"
resp = resp.lower()
log.info(resp)
if resp not in ["y", "r", "s", "e", "yes", "rename", "skip", "exit"]:
return confirm_ephys_remote_folder(
local_folder=local_folder,
remote_folder=remote_folder,
force=force,
iblscripts_folder=iblscripts_folder,
)
elif resp == "y" or resp == "yes":
pass
elif resp == "r" or resp == "rename":
session_path = rename_session(session_path)
if not session_path:
continue
elif resp == "s" or resp == "skip":
continue
elif resp == "e" or resp == "exit":
return
remote_session_path = remote_folder / Path(*session_path.parts[-3:])
        if not behavior_exists(remote_session_path, include_devices=True):
            log.error(f"No behavior folder found in {remote_session_path}: skipping session...")
            continue
        # TODO: Check flag files on src and dst + alf dir in session folder, then remove
        # Try/catch? where the catch condition is force transfer, maybe
transfer_folder(session_path / "raw_ephys_data", remote_session_path / "raw_ephys_data", force=force)
# if behavior extract_me.flag exists remove it, because of ephys flag
flag_file = session_path / "transfer_me.flag"
if flag_file.exists(): # this file only exists for the iblrig v7 and lower
flag_file.unlink()
if (remote_session_path / "extract_me.flag").exists():
(remote_session_path / "extract_me.flag").unlink()
# Create remote flags
create_ephys_transfer_done_flag(remote_session_path)
check_create_raw_session_flag(remote_session_path)
def probe_labels_from_session_path(session_path: Union[str, Path]) -> List[str]:
"""
    Finds ephys probes according to the metadata spikeglx files. Only returns the first subfolder
    name under the raw_ephys_data folder, i.e. raw_ephys_data/probe00/copy_of_probe00 won't be returned.
    If there is an NP2.4 probe with several shanks, a probe label is created per shank.
:param session_path:
:return: list of strings
"""
plabels = []
raw_ephys_folder = Path(session_path).joinpath('raw_ephys_data')
for meta_file in raw_ephys_folder.rglob('*.ap.meta'):
if meta_file.parents[1] != raw_ephys_folder:
continue
meta = spikeglx.read_meta_data(meta_file)
nshanks = spikeglx._get_nshanks_from_meta(meta)
if nshanks > 1:
for i in range(nshanks):
plabels.append(meta_file.parts[-2] + 'abcdefghij'[i])
else:
plabels.append(meta_file.parts[-2])
plabels.sort()
return plabels
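# A minimal usage sketch (hypothetical session): two single-shank recordings under
# raw_ephys_data/probe00 and raw_ephys_data/probe01 yield
#
#   >>> probe_labels_from_session_path('/data/Subjects/KS005/2019-04-01/001')
#   ['probe00', 'probe01']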
def create_alyx_probe_insertions(
session_path: str,
force: bool = False,
one: object = None,
model: str = None,
labels: list = None,
):
if one is None:
one = ONE(cache_rest=None, mode='local')
    eid = session_path if is_uuid_string(session_path) else one.path2eid(session_path)
    if eid is None:
        log.warning("Session not found on Alyx: please create session before creating insertions")
        return
if model is None:
probe_model = spikeglx.get_neuropixel_version_from_folder(session_path)
pmodel = "3B2" if probe_model == "3B" else probe_model
else:
pmodel = model
labels = labels or probe_labels_from_session_path(session_path)
# create the qc fields in the json field
qc_dict = {}
qc_dict.update({"qc": "NOT_SET"})
qc_dict.update({"extended_qc": {}})
# create the dictionary
insertions = []
for plabel in labels:
insdict = {"session": eid, "name": plabel, "model": pmodel, "json": qc_dict}
# search for the corresponding insertion in Alyx
alyx_insertion = one.alyx.get(f'/insertions?&session={eid}&name={plabel}', clobber=True)
# if it doesn't exist, create it
if len(alyx_insertion) == 0:
alyx_insertion = one.alyx.rest("insertions", "create", data=insdict)
else:
iid = alyx_insertion[0]["id"]
if force:
alyx_insertion = one.alyx.rest("insertions", "update", id=iid, data=insdict)
else:
alyx_insertion = alyx_insertion[0]
insertions.append(alyx_insertion)
return insertions
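# A minimal usage sketch (hypothetical session): register insertions for two probes,
# letting the function infer the probe model from the spikeglx metadata.
#
#   >>> insertions = create_alyx_probe_insertions(
#   ...     '/data/Subjects/KS005/2019-04-01/001', labels=['probe00', 'probe01'])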
def create_ephys_flags(session_folder: str):
"""
Create flags for processing an ephys session. Should be called after move_ephys_files
:param session_folder: A path to an ephys session
:return:
"""
session_path = Path(session_folder)
flags.write_flag_file(session_path.joinpath("extract_ephys.flag"))
flags.write_flag_file(session_path.joinpath("raw_ephys_qc.flag"))
for probe_path in session_path.joinpath('raw_ephys_data').glob('probe*'):
flags.write_flag_file(probe_path.joinpath("spike_sorting.flag"))
def create_ephys_transfer_done_flag(session_folder: str) -> None:
session_path = Path(session_folder)
flags.write_flag_file(session_path.joinpath("ephys_data_transferred.flag"))
def create_video_transfer_done_flag(session_folder: str) -> None:
session_path = Path(session_folder)
flags.write_flag_file(session_path.joinpath("video_data_transferred.flag"))
def create_transfer_done_flag(session_folder: str, flag_name: str) -> None:
session_path = Path(session_folder)
flags.write_flag_file(session_path.joinpath(f"{flag_name}_data_transferred.flag"))
def check_create_raw_session_flag(session_folder: str) -> None:
session_path = Path(session_folder)
    # If we have an experiment description file, read in whether we expect video, ephys, widefield, etc.,
    # rather than deciding based on the task protocol alone
experiment_description = sess_params.read_params(session_path)
    def check_status(expected, flag):
        # A device copy is complete if data are expected and the flag file exists,
        # or if no data are expected and the flag file is absent
        if expected is not False:
            return flag.exists()
        return not flag.exists()
if experiment_description is not None:
if any(session_path.joinpath('_devices').glob('*')):
return
# Find the devices in the experiment description file
devices = list()
for key in DEVICE_FLAG_MAP.keys():
if experiment_description.get('devices', {}).get(key, None) is not None:
devices.append(key)
        # In the case of widefield, the sync also needs to be in its own folder
if 'widefield' in devices:
devices.append('sync')
expected_flags = [session_path.joinpath(f'{DEVICE_FLAG_MAP[dev]}_data_transferred.flag') for dev in devices]
expected = []
flag_files = []
for dev, fl in zip(devices, expected_flags):
status = check_status(dev, fl)
if status:
flag_files.append(fl)
expected.append(status)
# In this case all the copying has completed
if all(expected):
# make raw session flag
flags.write_flag_file(session_path.joinpath("raw_session.flag"))
# and unlink individual copy flags
for fl in flag_files:
fl.unlink()
return
ephys = session_path.joinpath("ephys_data_transferred.flag")
video = session_path.joinpath("video_data_transferred.flag")
sett = raw.load_settings(session_path)
if sett is None:
log.error(f"No flag created for {session_path}")
return
    is_biased = "biased" in sett["PYBPOD_PROTOCOL"]
    is_training = "training" in sett["PYBPOD_PROTOCOL"]
    is_habituation = "habituation" in sett["PYBPOD_PROTOCOL"]
if video.exists() and (is_biased or is_training or is_habituation):
flags.write_flag_file(session_path.joinpath("raw_session.flag"))
video.unlink()
if video.exists() and ephys.exists():
flags.write_flag_file(session_path.joinpath("raw_session.flag"))
ephys.unlink()
video.unlink()
def rename_ephys_files(session_folder: str) -> None:
"""rename_ephys_files is system agnostic (3A, 3B1, 3B2).
Renames all ephys files to Alyx compatible filenames. Uses get_new_filename.
:param session_folder: Session folder path
:type session_folder: str
:return: None - Changes names of files on filesystem
:rtype: None
"""
session_path = Path(session_folder)
ap_files = session_path.rglob("*.ap.*")
lf_files = session_path.rglob("*.lf.*")
nidq_files = session_path.rglob("*.nidq.*")
for apf in ap_files:
new_filename = get_new_filename(apf.name)
shutil.move(str(apf), str(apf.parent / new_filename))
for lff in lf_files:
new_filename = get_new_filename(lff.name)
shutil.move(str(lff), str(lff.parent / new_filename))
for nidqf in nidq_files:
# Ignore wiring files: these are usually created after the file renaming however this
# function may be called a second time upon failed transfer.
if 'wiring' in nidqf.name:
continue
new_filename = get_new_filename(nidqf.name)
shutil.move(str(nidqf), str(nidqf.parent / new_filename))
def get_new_filename(filename: str) -> str:
"""get_new_filename is system agnostic (3A, 3B1, 3B2).
Gets an alyx compatible filename from any spikeglx ephys file.
:param filename: Name of an ephys file
:return: New name for ephys file
"""
root = "_spikeglx_ephysData"
parts = filename.split('.')
if len(parts) < 3:
raise ValueError(fr'unrecognized filename "{filename}"')
pattern = r'.*(?P<gt>_g\d+_t\d+)'
if not (match := re.match(pattern, parts[0])):
raise ValueError(fr'unrecognized filename "{filename}"')
return '.'.join([root + match.group(1), *parts[1:]])
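# A deterministic example of the renaming: the gate/trigger suffix is kept while the
# recording name is replaced by the standard '_spikeglx_ephysData' prefix.
#
#   >>> get_new_filename('ephysData_g0_t0.imec0.ap.bin')
#   '_spikeglx_ephysData_g0_t0.imec0.ap.bin'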
def move_ephys_files(session_folder: str) -> None:
"""move_ephys_files is system agnostic (3A, 3B1, 3B2).
Moves all properly named ephys files to appropriate locations for transfer.
Use rename_ephys_files function before this one.
:param session_folder: Session folder path
:type session_folder: str
:return: None - Moves files on filesystem
:rtype: None
"""
session_path = Path(session_folder)
raw_ephys_data_path = session_path / "raw_ephys_data"
imec_files = session_path.rglob("*.imec*")
for imf in imec_files:
        # For the 3B system, probe0x == imecx
        probe_number = re.match(r'_spikeglx_ephysData_g\d+_t\d+\.imec(\d+).*', imf.name)
if not probe_number:
# For 3A system imec files must be in a 'probexx' folder
probe_label = re.search(r'probe\d+', str(imf))
assert probe_label, f'Cannot assign probe number to file {imf}'
probe_label = probe_label.group()
else:
probe_number, = probe_number.groups()
probe_label = f'probe{probe_number.zfill(2)}'
raw_ephys_data_path.joinpath(probe_label).mkdir(exist_ok=True)
shutil.move(imf, raw_ephys_data_path.joinpath(probe_label, imf.name))
# NIDAq files (3B system only)
nidq_files = session_path.rglob("*.nidq.*")
for nidqf in nidq_files:
shutil.move(str(nidqf), str(raw_ephys_data_path / nidqf.name))
# Delete all empty folders recursively
delete_empty_folders(raw_ephys_data_path, dry=False, recursive=True)
def create_custom_ephys_wirings(iblscripts_folder: str):
iblscripts_path = Path(iblscripts_folder)
PARAMS = load_ephyspc_params()
probe_set = set(v for k, v in PARAMS.items() if k.startswith('PROBE_TYPE'))
params_path = iblscripts_path.parent / "iblscripts_params"
params_path.mkdir(parents=True, exist_ok=True)
wirings_path = iblscripts_path / "deploy" / "ephyspc" / "wirings"
for k, v in PARAMS.items():
if not k.startswith('PROBE_TYPE_'):
continue
probe_label = f'probe{k[-2:]}'
if v not in ('3A', '3B'):
raise ValueError(f'Unsupported probe type "{v}"')
shutil.copy(
wirings_path / f"{v}.wiring.json", params_path / f"{v}_{probe_label}.wiring.json"
)
print(f"Created {v}.wiring.json in {params_path} for {probe_label}")
if "3B" in probe_set:
shutil.copy(wirings_path / "nidq.wiring.json", params_path / "nidq.wiring.json")
print(f"Created nidq.wiring.json in {params_path}")
print(f"\nYou can now modify your wiring files from folder {params_path}")
def get_iblscripts_folder():
    # assumes the current working directory is two levels below the iblscripts repository root
    return str(Path.cwd().parent.parent)
def copy_wiring_files(session_folder, iblscripts_folder):
"""Run after moving files to probe folders"""
    PARAMS = load_ephyspc_params()
    if PARAMS["PROBE_TYPE_00"] != PARAMS["PROBE_TYPE_01"]:
        raise NotImplementedError("Having different probe types is not supported")
session_path = Path(session_folder)
iblscripts_path = Path(iblscripts_folder)
iblscripts_params_path = iblscripts_path.parent / "iblscripts_params"
wirings_path = iblscripts_path / "deploy" / "ephyspc" / "wirings"
termination = '.wiring.json'
# Determine system
ephys_system = PARAMS["PROBE_TYPE_00"]
# Define where to get the files from (determine if custom wiring applies)
src_wiring_path = iblscripts_params_path if iblscripts_params_path.exists() else wirings_path
probe_wiring_file_path = src_wiring_path / f"{ephys_system}{termination}"
if ephys_system == "3B":
# Copy nidq file
nidq_files = session_path.rglob("*.nidq.bin")
for nidqf in nidq_files:
nidq_wiring_name = ".".join(str(nidqf.name).split(".")[:-1]) + termination
shutil.copy(
str(src_wiring_path / f"nidq{termination}"),
str(session_path / "raw_ephys_data" / nidq_wiring_name),
)
# If system is either (3A OR 3B) copy a wiring file for each ap.bin file
for binf in session_path.rglob("*.ap.bin"):
probe_label = re.search(r'probe\d+', str(binf))
if probe_label:
wiring_name = ".".join(str(binf.name).split(".")[:-2]) + termination
dst_path = session_path / "raw_ephys_data" / probe_label.group() / wiring_name
shutil.copy(probe_wiring_file_path, dst_path)
def multi_parts_flags_creation(root_paths: Union[list, str, Path]) -> List[Path]:
"""
Creates the sequence files to run spike sorting in batches
A sequence file is a json file with the following fields:
sha1: a unique hash of the metafiles involved
probe: a string with the probe name
index: the index within the sequence
nrecs: the length of the sequence
files: a list of files
:param root_paths:
:return:
"""
from one.alf import io as alfio
# "001/raw_ephys_data/probe00/_spikeglx_ephysData_g0_t0.imec0.ap.meta",
    if isinstance(root_paths, (str, Path)):
        root_paths = [root_paths]
    root_paths = [Path(p) for p in root_paths]  # ensure rglob is available even for str inputs
recordings = {}
for root_path in root_paths:
for meta_file in root_path.rglob("*.ap.meta"):
# we want to make sure that the file is just under session_path/raw_ephys_data/{probe_label}
session_path = alfio.files.get_session_path(meta_file)
raw_ephys_path = session_path.joinpath('raw_ephys_data')
if meta_file.parents[1] != raw_ephys_path:
log.warning(f"{meta_file} is not in a probe directory and will be skipped")
continue
# stack the meta-file in the probe label key of the recordings dictionary
plabel = meta_file.parts[-2]
recordings[plabel] = recordings.get(plabel, []) + [meta_file]
# once we have all of the files
for k in recordings:
nrecs = len(recordings[k])
recordings[k].sort()
# the identifier of the overarching recording sequence is the hash of hashes of the files
m = hashlib.sha1()
        for i, meta_file in enumerate(recordings[k]):
            file_hash = hashfile.sha1(meta_file)  # avoid shadowing the built-in `hash`
            m.update(file_hash.encode())
# writes the sequence files
for i, meta_file in enumerate(recordings[k]):
sequence_file = meta_file.parent.joinpath(meta_file.name.replace('ap.meta', 'sequence.json'))
with open(sequence_file, 'w+') as fid:
json.dump(dict(sha1=m.hexdigest(), probe=k, index=i, nrecs=len(recordings[k]),
files=list(map(str, recordings[k]))), fid)
log.info(f"{k}: {i}/{nrecs} written sequence file {recordings}")
return recordings
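# A minimal usage sketch (hypothetical paths): create sequence files for a two-part
# recording of probe00. Each *.sequence.json written next to its ap.meta file contains
# the keys sha1, probe, index, nrecs and files.
#
#   >>> multi_parts_flags_creation([Path('/data/Subjects/KS005/2019-04-01/001'),
#   ...                             Path('/data/Subjects/KS005/2019-04-02/001')])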
class WindowsInhibitor:
"""Prevent OS sleep/hibernate in windows; code from:
https://github.com/h3llrais3r/Deluge-PreventSuspendPlus/blob/master/preventsuspendplus/core.py
API documentation:
https://msdn.microsoft.com/en-us/library/windows/desktop/aa373208(v=vs.85).aspx"""
ES_CONTINUOUS = 0x80000000
ES_SYSTEM_REQUIRED = 0x00000001
@staticmethod
def _set_thread_execution_state(state: int) -> None:
result = ctypes.windll.kernel32.SetThreadExecutionState(state)
if result == 0:
log.error("Failed to set thread execution state.")
@staticmethod
def inhibit(quiet: bool = False):
if quiet:
log.debug("Preventing Windows from going to sleep")
else:
print("Preventing Windows from going to sleep")
WindowsInhibitor._set_thread_execution_state(WindowsInhibitor.ES_CONTINUOUS | WindowsInhibitor.ES_SYSTEM_REQUIRED)
@staticmethod
def uninhibit(quiet: bool = False):
if quiet:
log.debug("Allowing Windows to go to sleep")
else:
print("Allowing Windows to go to sleep")
WindowsInhibitor._set_thread_execution_state(WindowsInhibitor.ES_CONTINUOUS)
def sleepless(func: Callable[..., Any]) -> Callable[..., Any]:
"""
Decorator to ensure that the system doesn't enter sleep or idle mode during a long-running task.
This decorator wraps a function and sets the thread execution state to prevent
the system from entering sleep or idle mode while the decorated function is
running.
Parameters
----------
func : callable
The function to decorate.
Returns
-------
callable
The decorated function.
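    Examples
    --------
    >>> @sleepless
    ... def compress_video():
    ...     pass  # the system will not sleep while this runs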
"""
    @wraps(func)
    def inner(*args, **kwargs) -> Any:
        if os.name == 'nt':
            WindowsInhibitor().inhibit(quiet=True)
        try:
            return func(*args, **kwargs)
        finally:
            # re-allow sleep even if the wrapped function raises
            if os.name == 'nt':
                WindowsInhibitor().uninhibit(quiet=True)
    return inner