Source code for psychopy.hardware.emulator

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Part of the PsychoPy library
# Copyright (C) 2002-2018 Jonathan Peirce (C) 2019-2021 Open Science Tools Ltd.
# Distributed under the terms of the GNU General Public License (GPL).

"""Idea: Run or debug an experiment script using exactly the same code,
i.e., for both testing and online data acquisition. To debug timing,
you can emulate sync pulses and user responses.
Limitations: pyglet only; keyboard events only.
"""

from __future__ import absolute_import, print_function

from builtins import range
import threading

from psychopy import visual, event, core, logging
from psychopy.sound import Sound  # for SyncGenerator tone

__author__ = 'Jeremy Gray'

class ResponseEmulator(threading.Thread):
    """Thread that simulates a user's keyboard responses during a scan.

    Given a list of response tuples (time, key), the thread will simulate a
    user pressing a key at a specific time (relative to the start of the
    run, i.e. to the moment ``run()`` begins executing).

    Only keyboard events are supported; any key that is neither a string
    nor an int is reported through ``logging.error`` and skipped.

    Author: Jeremy Gray; Idea: Mike MacAskill
    """

    def __init__(self, simResponses=None):
        """Store the responses to emit and prepare the thread.

        Parameters:

            simResponses: optional list of ``(time, key)`` tuples; ``time``
                is number-like seconds from the start of the run, ``key`` is
                a key name (str) or an int. The list is sorted so that
                events are emitted in onset order. ``None`` or an empty
                list means no responses are emitted.
        """
        if not simResponses:
            self.responses = []
        else:
            self.responses = sorted(simResponses)  # sort by onset times
        self.clock = core.Clock()
        self.stopflag = False
        # Pass the name by keyword: Thread's positional order is
        # (group, target, name), so a positional string lands in `target`
        # and the thread would be left with an auto-generated name.
        threading.Thread.__init__(self, name='ResponseEmulator')
        self.running = False

    def run(self):
        """Emit each simulated key press at its requested onset.

        Waits (via ``core.wait``) for the interval between consecutive
        onsets, then places the key in the event buffer. Returns ``self``.
        ``self.running`` is True only while this method is executing.
        """
        self.running = True
        try:
            self.clock.reset()
            last_onset = 0.000
            # wait until next event requested, and simulate a key press
            for onset, key in self.responses:
                if self.stopflag:
                    break
                # coerce once so that number-like strings work for every
                # event, not only the first one
                onset = float(onset)
                core.wait(onset - last_onset)
                last_onset = onset
                if self.stopflag:
                    # stop() was requested while waiting: do not emit
                    break
                if isinstance(key, int) and not isinstance(key, bool):
                    # avoid cryptic error if int; only the first character
                    # of its text form is used as the key name
                    key = "{}".format(key)[0]
                if isinstance(key, str):
                    event._onPygletKey(symbol=key, modifiers=0,
                                       emulated=True)
                else:
                    logging.error('ResponseEmulator: only keyboard events '
                                  'are supported')
        finally:
            # never leave the flag set if something above raised
            self.running = False
        return self

    def stop(self):
        """Ask the thread to stop before emitting any further key press."""
        self.stopflag = True
class SyncGenerator(threading.Thread):
    """Character-emitting metronome thread (emulates an MR sync pulse).

    Aim: Allow testing of temporal robustness of fMRI scripts by emulating
    a hardware sync pulse. Adds an arbitrary 'sync' character to the key
    buffer, with sub-millisecond precision (less precise if CPU is maxed).
    Recommend: TR=1.000 or higher and less than 100% CPU. Shorter TR
    --> higher CPU load.
    """

    def __init__(self, TR=1.0, TA=1.0, volumes=10, sync='5', skip=0,
                 sound=False, **kwargs):
        """Configure the emulated scanner run.

        Parameters:

            TR: seconds between volume acquisitions (minimum 0.1)
            TA: seconds to acquire one volume (used as the duration of the
                simulated scanner sounds)
            volumes: number of 3D volumes to obtain in a given scanning run
            sync: character used as flag for sync timing, default='5'
            skip: how many frames to silently omit initially during T1
                stabilization, no sync pulse. Not needed to test script
                timing, but will give more accurate feel to start of run.
                aka "discdacqs".
            sound: simulate scanner noise
            **kwargs: accepted and ignored, so that a settings dict with
                extra keys can be passed as ``SyncGenerator(**settings)``

        Raises:

            ValueError: if ``TR`` is less than 0.1
        """
        if TR < 0.1:
            msg = 'SyncGenerator: whole-brain TR < 0.1 not supported'
            raise ValueError(msg)
        self.TR = TR
        self.TA = TA
        # seconds of busy-waiting at the end of each TR for tighter sync
        self.hogCPU = 0.035
        self.timesleep = self.TR
        self.volumes = int(volumes)
        self.sync = sync
        self.skip = skip
        self.playSound = sound
        if self.playSound:  # pragma: no cover
            self.sound1 = Sound(800, secs=self.TA, volume=0.15, autoLog=False)
            self.sound2 = Sound(813, secs=self.TA, volume=0.15, autoLog=False)

        self.clock = core.Clock()
        self.stopflag = False
        # Pass the name by keyword: Thread's positional order is
        # (group, target, name), so a positional string lands in `target`
        # and the thread would be left with an auto-generated name.
        threading.Thread.__init__(self, name='SyncGenerator')
        self.running = False

    def _playScannerNoise(self):  # pragma: no cover
        """Play both simulated scanner tones for one volume."""
        self.sound1.play()
        self.sound2.play()

    def run(self):
        """Emit one sync character per TR for the configured volumes.

        First waits through ``skip`` silent volumes (no sync pulse), then
        resets the clock and emits ``volumes`` pulses, each aligned to a
        multiple of TR on that clock. Returns ``self``. ``self.running``
        is True only while this method is executing.
        """
        self.running = True
        try:
            if self.skip:
                for _ in range(int(self.skip)):
                    if self.stopflag:
                        break
                    if self.playSound:  # pragma: no cover
                        self._playScannerNoise()
                    # emulate T1 stabilization without data collection
                    core.wait(self.TR, hogCPUperiod=0)
            self.clock.reset()
            for vol in range(1, self.volumes + 1):
                if self.stopflag:
                    break
                if self.playSound:  # pragma: no cover
                    self._playScannerNoise()
                # "emit" a sync pulse by placing a key in the buffer:
                event._onPygletKey(symbol=self.sync, modifiers=0,
                                   emulated=True)
                # wait for start of next volume, doing our own hogCPU for
                # tighter sync:
                core.wait(self.timesleep - self.hogCPU, hogCPUperiod=0)
                while self.clock.getTime() < vol * self.TR:
                    pass  # hogs the CPU for tighter sync
        finally:
            # never leave the flag set if something above raised
            self.running = False
        return self

    def stop(self):
        """Ask the thread to stop before emitting any further sync pulse."""
        self.stopflag = True
def launchScan(win, settings, globalClock=None, simResponses=None,
               mode=None, esc_key='escape',
               instr='select Scan or Test, press enter',
               wait_msg="waiting for scanner...",
               wait_timeout=300, log=True):
    """Accepts up to five fMRI scan parameters (TR, volumes, sync-key, skip,
    sound), and launches an experiment in one of two modes: Scan, or Test.

    :Usage:

    See Coder Demo -> experiment control.

    In brief: 1) from psychopy.hardware.emulator import launchScan;
    2) Define your args; and 3) add 'vol = launchScan(args)'
    at the top of your experiment script.

    launchScan() waits for the first sync pulse and then returns, allowing
    your experiment script to proceed. The key feature is that, in test
    mode, it first starts an autonomous thread that emulates sync pulses
    (i.e., emulated by your CPU rather than generated by an MRI machine).
    The thread places a character in the key buffer, exactly like a
    keyboard event does. launchScan will wait for the first such sync pulse
    (i.e., character in the key buffer). launchScan returns the number of
    sync pulses detected so far (i.e., 1), so that a script can account for
    them explicitly.

    If a globalClock is given (highly recommended), it is reset to 0.0 when
    the first sync pulse is detected. If a mode was not specified when
    calling launchScan, the operator is prompted to select Scan or Test.

    If **scan mode** is selected, the script will wait until the first scan
    pulse is detected. Typically this would be coming from the scanner, but
    note that it could also be a person manually pressing that key.

    If **test mode** is selected, launchScan() starts a separate thread to
    emit sync pulses / key presses. Note that this thread is effectively
    nothing more than a key-pressing metronome, emitting a key at the start
    of every TR, doing so with high temporal precision.

    If your MR hardware interface does not deliver a key character as a
    sync flag, you can still use launchScan() to test script timing. You
    have to code your experiment to trigger on either a sync character (to
    test timing) or your usual sync flag (for actual scanning).

    :Parameters:

        win: a :class:`~psychopy.visual.Window` object (required)

        settings : a dict containing up to 5 parameters
            (2 required: TR, volumes). The dict is updated in place:
            missing 'sync' / 'skip' entries are filled with their defaults
            and the four core values are converted to str / float / int.

            TR :
                seconds per whole-brain volume (minimum value = 0.1s)
            volumes :
                number of whole-brain (3D) volumes to obtain in a given
                scanning run.
            sync :
                (optional) key for sync timing, default = '5'.
            skip :
                (optional) how many volumes to silently omit initially
                (during T1 stabilization, no sync pulse). default = 0.
            sound :
                (optional) whether to play a sound when simulating scanner
                sync pulses

        globalClock :
            optional but highly recommended :class:`~psychopy.core.Clock`
            to be used during the scan; if one is given, it is reset to
            0.000 when the first sync pulse is received.

        simResponses :
            optional list of tuples [(time, key), (time, key), ...]. time
            values are seconds after the first scan pulse is received.

        esc_key :
            key to be used for user-interrupt during launch.
            default = 'escape'

        mode :
            if mode is 'Test' or 'Scan', launchScan() will start in that
            mode.

        instr :
            instructions to be displayed to the scan operator during mode
            selection.

        wait_msg :
            message to be displayed to the subject while waiting for the
            scan to start (i.e., after operator indicates start but before
            the first scan pulse is received).

        wait_timeout :
            time in seconds that launchScan will wait before assuming
            something went wrong and exiting. Defaults to 300sec (5 min).
            Values below 0.01 are raised to 0.01. Raises a RuntimeError if
            no sync pulse is received in the allowable time.

        log :
            if True (default), write the run parameters and the start of
            the scan to the experiment log.

    :Returns:

        the number of sync pulses caught so far, which is always 1.

    :Raises:

        ValueError: if ``wait_timeout`` cannot be converted to a float.

        RuntimeError: if no sync pulse arrives within ``wait_timeout``.
    """
    # fill in optional settings, then normalise types (mutates `settings`)
    if 'sync' not in settings:
        settings.update({'sync': '5'})
    if 'skip' not in settings:
        settings.update({'skip': 0})
    try:
        wait_timeout = max(0.01, float(wait_timeout))
    except (TypeError, ValueError):
        # float() raises TypeError for None / containers and ValueError for
        # non-numeric strings; report both with the same clear message
        msg = "wait_timeout must be number-like, but instead it was {}."
        raise ValueError(msg.format(wait_timeout))
    settings['sync'] = "{}".format(settings['sync'])  # convert to str/unicode
    settings['TR'] = float(settings['TR'])
    settings['volumes'] = int(settings['volumes'])
    settings['skip'] = int(settings['skip'])
    msg = "vol: %(volumes)d TR: %(TR).3fs skip: %(skip)d sync: '%(sync)s'"
    runInfo = msg % settings
    if log:  # pragma: no cover
        logging.exp('launchScan: ' + runInfo)
    instructions = visual.TextStim(
        win, text=instr, height=.05, pos=(0, 0), color=.4, autoLog=False)
    parameters = visual.TextStim(
        win, text=runInfo, height=.05, pos=(0, -0.5), color=.4,
        autoLog=False)

    # if a valid mode was specified, use it; otherwise query via RatingScale:
    mode = "{}".format(mode).capitalize()
    if mode not in ['Scan', 'Test']:
        run_type = visual.RatingScale(win, choices=['Scan', 'Test'],
                                      marker='circle',
                                      markerColor='DarkBlue', size=.8,
                                      stretch=.3, pos=(0.8, -0.9),
                                      markerStart='Test',
                                      lineColor='DarkGray', autoLog=False)
        while run_type.noResponse:
            instructions.draw()
            parameters.draw()
            run_type.draw()
            win.flip()
            if event.getKeys([esc_key]):
                break
        mode = run_type.getRating()
    doSimulation = bool(mode == 'Test')

    win.mouseVisible = False
    if doSimulation:
        wait_msg += ' (simulation)'
    msg = visual.TextStim(win, color='DarkGray', text=wait_msg,
                          autoLog=False)
    msg.draw()
    win.flip()

    event.clearEvents()  # do before starting the threads
    if doSimulation:
        syncPulse = SyncGenerator(**settings)
        syncPulse.start()  # start emitting sync pulses
        core.runningThreads.append(syncPulse)
    if simResponses:
        roboResponses = ResponseEmulator(simResponses)
        # start emitting simulated user responses
        roboResponses.start()
        core.runningThreads.append(roboResponses)

    # wait for first sync pulse:
    timeoutClock = core.Clock()  # zeroed now
    allKeys = []
    while settings['sync'] not in allKeys:
        allKeys = event.getKeys()
        if esc_key and esc_key in allKeys:  # pragma: no cover
            core.quit()
        if timeoutClock.getTime() > wait_timeout:
            msg = 'Waiting for scanner has timed out in %.3f seconds.'
            raise RuntimeError(msg % wait_timeout)
    if globalClock:
        globalClock.reset()
    if log:  # pragma: no cover
        logging.exp('launchScan: start of scan')
    # blank the screen on first sync pulse received:
    win.flip()
    # one sync pulse has been caught so far:
    elapsed = 1

    return elapsed

Back to top