Source code for psychopy_eyetracker_pupil_labs.pupil_labs.pupil_core.eyetracker

# -*- coding: utf-8 -*-
# Part of the PsychoPy library
# Copyright (C) 2012-2020 iSolver Software Solutions (C) 2021 Open Science Tools Ltd.
# Distributed under the terms of the GNU General Public License (GPL).
import logging
from collections import defaultdict
from typing import Optional, Dict, Tuple, Union

from psychopy.iohub.constants import EyeTrackerConstants
from psychopy.iohub.devices import Computer, Device
from psychopy.iohub.devices.eyetracker import EyeTrackerDevice
from . import (
    data_parse,
    pupil_remote,
)
from .bisector import (
    DatumNotFoundError,
    MutableBisector,
)
from psychopy.iohub.errors import print2err, printExceptionDetailsToStdErr

logger = logging.getLogger(__name__)


class EyeTracker(EyeTrackerDevice):
    """
    Implementation of the :py:class:`Common Eye Tracker Interface <.EyeTrackerDevice>`
    for the Pupil Core headset.

    Uses ioHub's polling method to process data from `Pupil Capture's Network API
    <https://docs.pupil-labs.com/developer/core/network-api/>`_.

    To synchronize time between Pupil Capture and PsychoPy, the integration
    estimates the offset between their clocks and applies it to the incoming
    data. This step effectively transforms time between the two applications
    while taking the transmission delay into account. For details, see this
    `real-time time-sync tutorial
    <https://github.com/pupil-labs/pupil-helpers/blob/master/python/simple_realtime_time_sync.py>`_.

    This class operates in two modes, depending on the ``pupillometry_only``
    runtime setting:

    #. Pupillometry-only mode

       If the ``pupillometry_only`` setting is set to ``True``, the integration
       will only receive eye-camera based metrics, e.g. pupil size, its location
       in eye camera coordinates, etc. The advantage of this mode is that it
       does not require calibrating the eye tracker or setting up AprilTag
       markers for the AoI tracking. To receive gaze data in PsychoPy screen
       coordinates, see the Pupillometry+Gaze mode below. Internally, this mode
       is implemented by subscribing to the ``pupil.`` data topic.

    #. Pupillometry+Gaze mode

       If the ``pupillometry_only`` setting is set to ``False``, the integration
       will receive positional data in addition to the pupillometry data
       mentioned above. For this to work, one has to set up Pupil Capture's
       built-in AoI tracking system and perform a calibration for each subject.
       The integration takes care of translating the spatial coordinates to
       PsychoPy display coordinates. Internally, this mode is implemented by
       subscribing to the ``gaze.3d.`` and the corresponding surface name data
       topics.

    .. note::

        Only **one** instance of EyeTracker can be created within an experiment.
        Attempting to create > 1 instance will raise an exception.
    """

    # EyeTrackerDevice Interface
    #
    #: The multiplier needed to convert a device's native time base to
    #: sec.msec-usec times.
    # DEVICE_TIMEBASE_TO_SEC = 1.0

    # Used by pyEyeTrackerDevice implementations to store relationships between
    # an eye tracker's command names supported for the EyeTrackerDevice
    # sendCommand method and a private python function to call for that command.
    # This allows an implementation of the interface to expose functions that
    # are not in the core EyeTrackerDevice spec without having to use the EXT
    # extension class.
    _COMMAND_TO_FUNCTION = {}

    EVENT_CLASS_NAMES = [
        "MonocularEyeSampleEvent",
        "BinocularEyeSampleEvent",
    ]

    def __init__(self, *args, **kwargs) -> None:
        EyeTrackerDevice.__init__(self, *args, **kwargs)

        self._latest_sample = None
        self._latest_gaze_position = None
        self._actively_recording = False

        self._surface_name = self._runtime_settings["surface_name"]
        self._confidence_threshold = self._runtime_settings["confidence_threshold"]

        pupil_remote_settings = self._runtime_settings["pupil_remote"]
        self._pupil_remote_ip_address = pupil_remote_settings["ip_address"]
        self._pupil_remote_port = pupil_remote_settings["port"]
        self._pupil_remote_timeout_ms = pupil_remote_settings["timeout_ms"]

        if self._runtime_settings["pupillometry_only"]:
            self._pupil_remote_subscriptions = ["pupil"]
        else:
            self._pupil_remote_subscriptions = ["gaze.3d.", self.surface_topic]

        # Calibration notifications are only being handled during
        # runSetupProcedure() and are otherwise ignored.
        self._pupil_remote_subscriptions.append("notify.calibration")

        capture_recording_settings = self._runtime_settings["pupil_capture_recording"]
        self._capture_recording_enabled = capture_recording_settings["enabled"]
        self._capture_recording_location = capture_recording_settings["location"]

        self._gaze_bisectors_by_topic = defaultdict(MutableBisector)

        self._pupil_remote = None
        self.setConnectionState(True)

    @property
    def surface_topic(self) -> str:
        """Read-only Pupil Capture subscription topic to receive data from the
        configured surface"""
        return f"surfaces.{self._surface_name}"
    def trackerTime(self) -> float:
        """Returns the current time reported by the eye tracker device.

        Implementation measures the current time in PsychoPy time and applies
        the estimated clock offset to transform the measurement into tracker
        time.

        :return: The eye tracker hardware's reported current time.
        """
        return self._psychopyTimeInTrackerTime(Computer.getTime())
    def trackerSec(self) -> float:
        """
        Returns :py:func:`.EyeTracker.trackerTime`

        :return: The eye tracker hardware's reported current time in
            sec.msec-usec format.
        """
        return self.trackerTime()
    def setConnectionState(self, enable: bool) -> None:
        """setConnectionState either connects (``setConnectionState(True)``) or
        disconnects (``setConnectionState(False)``) active communication between
        the ioHub and Pupil Capture.

        .. note::

            A connection to the Eye Tracker is automatically established when
            the ioHub Process is initialized (based on the device settings in
            the iohub_config.yaml), so there is no need to explicitly call this
            method in the experiment script.

        .. note::

            Connecting an Eye Tracker to the ioHub does **not** necessarily
            collect and send eye sample data to the ioHub Process. To start
            actual data collection, use the Eye Tracker method
            ``setRecordingState(bool)`` or the ioHub Device method (device type
            independent) ``enableEventRecording(bool)``.

        Args:
            enable (bool): True = enable the connection, False = disable the
                connection.

        :return: None
        """
        if enable and self._pupil_remote is None:
            self._pupil_remote = pupil_remote.PupilRemote(
                subscription_topics=self._pupil_remote_subscriptions,
                ip_address=self._pupil_remote_ip_address,
                port=self._pupil_remote_port,
                timeout_ms=self._pupil_remote_timeout_ms,
            )
        elif not enable and self._pupil_remote is not None:
            self._pupil_remote.cleanup()
            self._pupil_remote = None
    def isConnected(self) -> bool:
        """isConnected returns whether the ioHub EyeTracker Device is connected
        to Pupil Capture or not.

        A Pupil Core headset must be connected and working properly for any of
        the Common Eye Tracker Interface functionality to work.

        Args:
            None

        :return:
            bool: True = the eye tracking hardware is connected. False otherwise.
        """
        return self._pupil_remote is not None
    def runSetupProcedure(self, calibration_args: Optional[Dict] = None) -> int:
        """
        The runSetupProcedure method starts the Pupil Capture calibration
        choreography.

        .. note::

            This is a blocking call for the PsychoPy Process and will not return
            to the experiment script until the calibration procedure was either
            successful, aborted, or failed.

        :param calibration_args: This argument will be ignored and has only been
            added for the purpose of compatibility with the Common Eye Tracker
            Interface

        :return:
            - :py:attr:`.EyeTrackerConstants.EYETRACKER_OK`
              if the calibration was successful
            - :py:attr:`.EyeTrackerConstants.EYETRACKER_SETUP_ABORTED`
              if the choreography was aborted by the user
            - :py:attr:`.EyeTrackerConstants.EYETRACKER_CALIBRATION_ERROR`
              if the calibration failed, check logs for details
            - :py:attr:`.EyeTrackerConstants.EYETRACKER_ERROR`
              if any other error occurred, check logs for details
        """
        self._pupil_remote.start_calibration()
        logger.info("Waiting for calibration to complete")
        try:
            for topic, payload in self._pupil_remote.fetch(endless=True):
                if topic.startswith("notify.calibration"):
                    if topic.endswith("successful"):
                        return EyeTrackerConstants.EYETRACKER_OK
                    elif topic.endswith("stopped"):
                        return EyeTrackerConstants.EYETRACKER_SETUP_ABORTED
                    elif topic.endswith("failed"):
                        print2err(f"Calibration failed: {payload}")
                        return EyeTrackerConstants.EYETRACKER_CALIBRATION_ERROR
                    elif "setup" in topic or "should" in topic or "start" in topic:
                        # Ignore setup data notifications (they include raw
                        # reference and pupil data that can be used to reproduce
                        # the calibration), and calibration should_start/stop +
                        # started notifications.
                        pass
                    else:
                        print2err(f"Handling for {topic} not implemented ({payload})")
        except Exception:
            print2err("Error during runSetupProcedure")
            printExceptionDetailsToStdErr()
            return EyeTrackerConstants.EYETRACKER_ERROR
    def setRecordingState(self, should_be_recording: bool) -> bool:
        """The setRecordingState method is used to start or stop the recording
        and transmission of eye data from the eye tracking device to the ioHub
        Process.

        If the ``pupil_capture_recording.enabled`` runtime setting is set to
        ``True``, a corresponding raw recording within Pupil Capture will be
        started or stopped.

        ``should_be_recording`` will also be passed to
        :py:func:`.EyeTrackerDevice.enableEventReporting`.

        Args:
            should_be_recording (bool): True = start recording data;
                False = stop recording data.

        :return:
            bool: the current recording state of the eye tracking device
        """
        if not self.isConnected():
            return False

        if self._capture_recording_enabled:
            if should_be_recording:
                self._pupil_remote.start_recording(
                    rec_name=self._capture_recording_location
                )
            else:
                self._pupil_remote.stop_recording()
            self._actively_recording = self._pupil_remote.is_recording
        else:
            self._actively_recording = should_be_recording

        is_recording_enabled = self.isRecordingEnabled()

        if not is_recording_enabled:
            self._latest_sample = None
            self._latest_gaze_position = None

        return EyeTrackerDevice.enableEventReporting(self, self._actively_recording)
    def isRecordingEnabled(self) -> bool:
        """The isRecordingEnabled method indicates if the eye tracker device is
        currently recording data.

        :return: ``True`` == the device is recording data; ``False`` ==
            Recording is not occurring
        """
        if not self.isConnected():
            return False
        return self._actively_recording
    def getLastSample(self) -> Union[
        None,
        "psychopy.iohub.devices.eyetracker.MonocularEyeSampleEvent",
        "psychopy.iohub.devices.eyetracker.BinocularEyeSampleEvent",
    ]:
        """The getLastSample method returns the most recent eye sample received
        from the Eye Tracker. The Eye Tracker must be in a recording state for a
        sample event to be returned, otherwise None is returned.

        :return:
            - MonocularEyeSampleEvent: Gaze mapping result from a single pupil
              detection. Only emitted if a second eye camera is not being
              operated or the confidence of the pupil detection was insufficient
              for a binocular pair. See also this high-level overview of the
              `Pupil Capture Data Matching algorithm
              <https://github.com/N-M-T/pupil-docs/commit/1dafe298565720a4bb7500a245abab7a6a2cd92f>`_
            - BinocularEyeSampleEvent: Gaze mapping result from two combined
              pupil detections
            - None: If the eye tracker is not currently recording data.
        """
        return self._latest_sample
    def getLastGazePosition(self) -> Optional[Tuple[float, float]]:
        """The getLastGazePosition method returns the most recent eye gaze
        position received from the Eye Tracker. This is the position on the
        calibrated 2D surface that the eye tracker is reporting as the current
        eye position. The units are in the units in use by the ioHub Display
        device.

        If binocular recording is being performed, the average position of both
        eyes is returned.

        If no samples have been received from the eye tracker, or the eye
        tracker is not currently recording data, None is returned.

        :return:
            - None: If the eye tracker is not currently recording data or no eye
              samples have been received.
            - tuple: Latest (gaze_x, gaze_y) position of the eye(s)
        """
        return self._latest_gaze_position
    def _poll(self):
        if not self.isConnected():
            return

        logged_time = Computer.getTime()

        for topic, payload in self._pupil_remote.fetch():
            if topic.startswith("gaze.3d."):
                self._cache_gaze_datum(topic, payload)

            if topic == self.surface_topic:
                for gaze_on_surface in payload["gaze_on_surfaces"]:
                    if gaze_on_surface["on_surf"] is not True:
                        continue
                    if gaze_on_surface["confidence"] < self._confidence_threshold:
                        continue
                    gaze_datum = self._lookup_corresponding_gaze_datum(gaze_on_surface)
                    if gaze_datum is None:
                        continue  # Skip surface-mapped gaze if no gaze is available
                    self._add_gaze_sample(
                        gaze_on_surface_datum=gaze_on_surface,
                        gaze_datum=gaze_datum,
                        logged_time=logged_time,
                    )

            if (
                topic.startswith("pupil")
                and payload["confidence"] >= self._confidence_threshold
            ):
                self._add_pupil_sample(pupil_datum=payload, logged_time=logged_time)

    def _lookup_corresponding_gaze_datum(self, gaze_on_surface):
        gaze_topic, gaze_timestamp = gaze_on_surface["base_data"]
        gaze_bisector = self._gaze_bisectors_by_topic[gaze_topic]
        try:
            return gaze_bisector.find_datum_by_timestamp(timestamp=gaze_timestamp)
        except DatumNotFoundError:
            pass

    def _cache_gaze_datum(self, topic, payload):
        gaze_bisector = self._gaze_bisectors_by_topic[topic]
        gaze_bisector.insert(datum=payload, timestamp=payload["timestamp"])
        # Remove gaze datums older than 3 sec (at 200 Hz)
        if len(gaze_bisector) >= 200 * 3:
            gaze_bisector.delete(index=0)

    def _add_gaze_sample(self, gaze_on_surface_datum, gaze_datum, logged_time):
        native_time = gaze_datum["timestamp"]
        iohub_time = self._trackerTimeInPsychopyTime(native_time)

        metadata = {
            "experiment_id": 0,  # experiment_id, iohub fills in automatically
            "session_id": 0,  # session_id, iohub fills in automatically
            "device_id": 0,  # device_id, keep at 0
            "event_id": Device._getNextEventID(),  # iohub event unique ID
            "device_time": native_time,
            "logged_time": logged_time,
            "time": iohub_time,
            "confidence_interval": -1.0,
            "delay": (logged_time - iohub_time),
            "filter_id": False,
        }

        position = self._gaze_in_display_coords(gaze_on_surface_datum)

        sample = data_parse.eye_sample_from_gaze_3d(position, gaze_datum, metadata)
        self._addNativeEventToBuffer(sample)

        self._latest_sample = sample
        self._latest_gaze_position = position

    def _add_pupil_sample(self, pupil_datum, logged_time):
        native_time = pupil_datum["timestamp"]
        iohub_time = self._trackerTimeInPsychopyTime(native_time)

        metadata = {
            "experiment_id": 0,  # experiment_id, iohub fills in automatically
            "session_id": 0,  # session_id, iohub fills in automatically
            "device_id": 0,  # device_id, keep at 0
            "event_id": Device._getNextEventID(),  # iohub event unique ID
            "device_time": native_time,
            "logged_time": logged_time,
            "time": iohub_time,
            "confidence_interval": -1.0,
            "delay": (logged_time - iohub_time),
            "filter_id": False,
        }

        sample = data_parse.eye_sample_from_pupil(pupil_datum, metadata)
        self._addNativeEventToBuffer(sample)

        self._latest_sample = sample

    def _gaze_in_display_coords(self, gaze_on_surface_datum):
        gaze_x, gaze_y = gaze_on_surface_datum["norm_pos"]
        width, height = self._display_device.getPixelResolution()
        # Convert normalized surface coordinates to pixel coordinates
        # (flipping the y axis):
        gaze_in_display_coords = int(gaze_x * width), int((1.0 - gaze_y) * height)
        return self._eyeTrackerToDisplayCoords(gaze_in_display_coords)

    def _psychopyTimeInTrackerTime(self, psychopy_time):
        return psychopy_time + self._pupil_remote.psychopy_pupil_clock_offset

    def _trackerTimeInPsychopyTime(self, tracker_time):
        return tracker_time - self._pupil_remote.psychopy_pupil_clock_offset

    def _close(self):
        """Do any final cleanup of the eye tracker before the object is
        destroyed."""
        self.setRecordingState(False)
        self.setConnectionState(False)
        self.__class__._INSTANCE = None
        super()._close()
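
A minimal usage sketch (not part of the module): the snippet below shows how an experiment script might request this device through ioHub and poll the latest gaze position. The device key string 'eyetracker.hw.pupil_labs.pupil_core.EyeTracker' and the example setting values are assumptions; only the runtime setting names ('pupillometry_only', 'surface_name', 'confidence_threshold', 'pupil_remote', 'pupil_capture_recording') come from the __init__ above.

# Usage sketch; device key and setting values are assumptions, not part of this module.
from psychopy import visual, core
from psychopy.iohub import launchHubServer

win = visual.Window((1920, 1080), units='pix', fullscr=True)

iohub_config = {
    'eyetracker.hw.pupil_labs.pupil_core.EyeTracker': {
        'name': 'tracker',
        'runtime_settings': {
            'pupillometry_only': False,
            'surface_name': 'psychopy_iohub_surface',
            'confidence_threshold': 0.6,
            'pupil_remote': {
                'ip_address': '127.0.0.1',
                'port': 50020,
                'timeout_ms': 1000,
            },
            'pupil_capture_recording': {'enabled': False, 'location': ''},
        },
    },
}

io = launchHubServer(window=win, **iohub_config)
tracker = io.getDevice('tracker')

tracker.setRecordingState(True)
clock = core.Clock()
while clock.getTime() < 5.0:
    gaze = tracker.getLastGazePosition()  # None until surface-mapped gaze arrives
    if gaze is not None:
        pass  # e.g. move a gaze-contingent stimulus to `gaze`
    win.flip()

tracker.setRecordingState(False)
io.quit()
win.close()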

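For reference, here is a standalone sketch of the coordinate arithmetic performed in _gaze_in_display_coords above. Pupil Capture reports surface-mapped gaze as "norm_pos", a normalized (x, y) pair; as the y-flip in the method suggests, its y axis increases upwards, whereas the pixel coordinates handed to ioHub's _eyeTrackerToDisplayCoords have their origin at the top-left. The function name and the example numbers below are illustrative only.

# Illustrative only: mirrors the arithmetic in _gaze_in_display_coords,
# without the ioHub display machinery.
def norm_pos_to_pixels(norm_pos, resolution):
    """Map surface-normalized gaze (y up) to pixel coordinates (y down)."""
    gaze_x, gaze_y = norm_pos
    width, height = resolution
    return int(gaze_x * width), int((1.0 - gaze_y) * height)

# A gaze point in the middle of the surface on a 1920x1080 display:
assert norm_pos_to_pixels((0.5, 0.5), (1920, 1080)) == (960, 540)
# The top edge of the surface (norm y = 1.0) maps to pixel row 0:
assert norm_pos_to_pixels((0.0, 1.0), (1920, 1080)) == (0, 0)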
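The class docstring describes estimating the clock offset between PsychoPy and Pupil Capture while accounting for transmission delay, in the spirit of the linked simple_realtime_time_sync tutorial. Below is a minimal sketch of that kind of round-trip estimate; request_pupil_time is a hypothetical stand-in for querying Pupil Capture's clock, and the plugin itself exposes the measured offset as pupil_remote.PupilRemote.psychopy_pupil_clock_offset (used by the two private time helpers above).

# Sketch of a round-trip clock-offset estimate; request_pupil_time is hypothetical.
from psychopy.core import getTime as psychopy_time


def request_pupil_time():
    """Hypothetical stand-in: ask Pupil Capture for its current clock value
    (e.g. via Pupil Remote) and return it as a float in Pupil time."""
    raise NotImplementedError


def estimate_clock_offset():
    """Estimate (pupil_time - psychopy_time), compensating for transmission
    delay by timestamping the request on both sides of the round trip."""
    local_before = psychopy_time()
    pupil_time = request_pupil_time()
    local_after = psychopy_time()
    local_midpoint = (local_before + local_after) / 2.0
    return pupil_time - local_midpoint

# With such an offset, the conversions mirror the two helpers above:
#   psychopy_time + offset -> tracker (Pupil) time
#   tracker_time  - offset -> PsychoPy time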