#!/usr/bin/python3
# -*- coding: utf-8 -*-
import os
import sys
import time
import types
import itertools
import numpy as np
import matplotlib as mp
import matplotlib.pyplot as plt
from PIL import Image
from pandas import DataFrame, read_table
from pandas import __version__ as pandas_version
from distutils.version import LooseVersion
from scipy.io import whosmat, loadmat
[docs]class ImageSet(object):
""" Set of images of equal size for masking and Feature creation.
Attributes:
info (DataFrame): table of image metadata (filenames, size, type...)
imageids (list): All unique image IDs in the set
label (str): optional label to distinguish between ImageSets
mat_var (str): name of MATLAB variable name if imported from .mat
mat_contents (list): list of all variable names in .mat if applicable
normalize (boolean): True if image data was normalized to 0..1 range
preload (boolean): True if images were preloaded into memory
size (tuple): image dimensions, specified as (width, height)
"""
def __init__(self, images, mat_var=None, size=None, imageids=None,
sep='\t', label=None, normalize=None, norm_range=None, preload=False):
""" Create a new ImageSet and add specified images
Args:
images: one of the following:
- path to a single image or .mat file
- path to a folder containing image or .mat files
- list of image or .mat file names
- simple text file, one filename per line
- text / CSV file containing columns 'filename' and 'imageid'
mat_var (str): variable to use if _images_ is a MATLAB file
size (tuple): image dimensions, specified as (width, height) in pixels
imageids (list): string ID labels, one for each image. If not specified,
file names without extension or a numerical label 0..n will be used
sep (str): if _images_ is a text file, use this separator
label (string): optional descriptive label
normalize (boolean): normalize image color / luminance values to 0...1
(defaults to True for images, False for MATLAB data)
norm_range (tuple): normalization range. Defaults to (0, 255) for image
files and per-image (min, max) for data read from MATLAB files
preload (boolean): if True, load all images at creation time
(faster, but uses a lot of memory)
"""
self.imageids = []
self._images = {}
# DF to hold image metadata
df_col = ['imageid', 'filename', 'width', 'height', 'channels', 'type', 'mat_var']
self.info = DataFrame(columns=df_col)
self.size = None
self.label = label
self.normalize = normalize
self.norm_range = norm_range
self.preload = preload
self._last_image = None
self._last_imageid = None
self.mat_var = None
self.mat_contents = None
if size is not None:
self.size = tuple(size)
self._add_images(images, mat_var, imageids, sep)
def __repr__(self):
""" Short string representation for printing """
# Image size
if self.size is None:
size = 'undefined'
else:
size = str(self.size)
# Number of images
s = ''
if len(self.imageids) != 1:
s = 's'
desc = ''
if self.label is not None:
desc = ' "{:s}"'.format(str(self.label))
mat = ''
if self.mat_var is not None:
mat = ', mat_var={:s}'.format(self.mat_var)
norm = ''
if self.normalize:
norm = ', normalized'
r = '<gridfix.ImageSet{:s}, {:d} image{:s}, size={:s}{:s}{:s}>'
return r.format(desc, len(self.imageids), s, size, mat, norm)
def __len__(self):
""" Overload len(ImageSet) to report number of images. """
return len(self.imageids)
def __iter__(self):
""" Overload iteration to step through the ndarray representations of images. """
return iter([np.array(i) for i in self.images.keys()])
def __getitem__(self, imageid):
""" Allow to retrieve image by bracket indexing """
return self.image(imageid)
def _add_images(self, images, mat_var=None, imageids=None, sep='\t'):
""" Add one or more image(s) to set.
Args:
images: one of the following:
- path to a single image file
- path to a folder containing image files
- list of image file names
- simple text file, one image filename per line
- text / CSV file containing columns 'filename' and 'imageid'
mat_var (str): variable to use if _images_ is a MATLAB file
imageids (list): string ID labels, one for each image. If not specified,
file names without extension or a numerical label 0..n will be used
sep (str): if _images_ is a text file, use this separator
"""
filelist = []
if imageids is None:
imageids = []
img_root = os.getcwd()
# Build file list
if type(images) == list:
filelist = images
elif type(images) == str:
if os.path.isdir(images):
# Directory
filelist = [os.path.join(images, b) for b in sorted(os.listdir(images))]
elif os.path.isfile(images):
# Single file - check if it is a text list of images!
(ifname, ifext) = os.path.splitext(images)
if ifext.lower() in ['.txt', '.tsv', '.csv', '.dat']:
# assume image list
try:
imgfiles = read_table(images, header=None, index_col=False, sep=sep)
if imgfiles.shape[1] == 1:
# Only one column: assume no headers and load as list of filenames
filelist = list(imgfiles.ix[:, 0])
elif imgfiles.shape[1] > 1:
# More than one column: look for 'imageid' and 'filename' columns
if 'imageid' in list(imgfiles.ix[0, :]) and 'filename' in list(imgfiles.ix[0, :]):
imgfiles.columns = imgfiles.ix[0, :]
imgfiles = imgfiles.ix[1:, :]
filelist = list(imgfiles['filename'])
imageids = list(imgfiles['imageid'])
lfolder, lname = os.path.split(images)
if len(lfolder) > 0:
img_root = os.path.abspath(lfolder)
except:
raise ValueError('could not read image list file, check format!')
else:
# assume single image file
filelist = [images]
else:
raise ValueError('first argument must be list or a string containing a file or directory!')
# Verify image files
filetable = []
for (idx, ifile) in enumerate(filelist):
try:
imeta = self._verify_image(ifile, mat_var, img_root)
# assign imageid
(ifdir, iffile) = os.path.split(ifile)
(ifbase, ifext) = os.path.splitext(iffile)
if imageids is not None and len(imageids) > 0:
imageid = imageids.pop(0)
else:
imageid = iffile
imeta['imageid'] = imageid
if imageid in self.imageids:
print('Warning: replacing existing image with ID <{:s}>!'.format(imageid))
else:
self.imageids.append(imageid)
filetable.append(imeta)
except ValueError as err:
print('Warning: file {:s} could not be added, error: {:s}!'.format(ifile, str(err)))
df_col = ['imageid', 'filename', 'width', 'height', 'channels', 'type', 'mat_var']
self.info = DataFrame(filetable, columns=df_col)
self.imageids = list(self.info.imageid)
# Preload files if requested
for imid in self.imageids:
if self.preload:
self._images[imid] = self._load_image(imid)
else:
self._images[imid] = None
[docs] def image(self, imageid):
""" Get image by imageid, loading it if not preloaded
Args:
imageid (str): valid imageid from set
Returns:
ndarray of raw image data
"""
if imageid not in self.imageids:
raise ValueError('Specified _imageid_ does not exist!')
if self._images[imageid] is not None:
return self._images[imageid]
else:
if self._last_imageid == imageid:
return self._last_image
else:
return self._load_image(imageid)
def _load_image(self, imageid):
""" Load and return image data for imageid.
Args:
imageid (str): valid imageid from set
Returns:
ndarray of raw image data
"""
if imageid not in self.imageids:
raise ValueError('Specified _imageid_ does not exist!')
if self._last_imageid is not None and self._last_imageid == imageid:
return self._last_image
imdata = None
imeta = self.info[self.info.imageid == imageid]
if len(imeta) > 0:
if imeta.type.iloc[0] == 'MAT':
mat = loadmat(imeta.filename.iloc[0], variable_names=imeta.mat_var.iloc[0])
imdata = np.asarray(mat[imeta.mat_var.iloc[0]], dtype=float)
else:
i = Image.open(imeta.filename.iloc[0])
if i.mode == 'RGBA':
i = i.convert('RGB') # Drop alpha channel
imdata = np.asarray(i, dtype=float)
if self.normalize is None and imeta.type.iloc[0] != 'MAT':
if self.norm_range is None:
imdata = imdata / 255.0
else:
imdata = (imdata - self.norm_range[0]) / (self.norm_range[1] - self.norm_range[0])
self.normalize = True
elif self.normalize:
if self.norm_range is None:
imdata = (imdata - imdata.min()) / (imdata.max() - imdata.min())
else:
imdata = (imdata - self.norm_range[0]) / (self.norm_range[1] - self.norm_range[0])
self._last_imageid = imageid
self._last_image = imdata
return imdata
def _verify_image(self, image_file, mat_var=None, img_root=None):
""" Verify type and size of image without actually loading data
Args:
image_file (str): path to image file to verify
mat_var (str): optional variable name for MATLAB files
img_root (str): folder containing image list in case file paths are relative
Returns:
dict of metadata, column names as in imageset.info DataFrame
"""
if not os.path.isfile(image_file):
if img_root is not None and os.path.isfile(os.path.join(img_root, image_file)):
image_file = os.path.join(os.path.join(img_root, image_file))
else:
raise ValueError('file not found')
(ifbase, ifext) = os.path.splitext(image_file)
imeta = {'imageid': None,
'filename': '',
'width': -1,
'height': -1,
'channels': -1,
'type': None,
'mat_var': None}
# Matlab files
if ifext.lower() == '.mat':
try:
# Load .mat header and identify variables
tmp = whosmat(image_file)
tmpvars = [m[0] for m in tmp]
if self.mat_contents is None:
self.mat_contents = tmpvars
if mat_var is None:
mat_var = tmpvars[0]
if mat_var in tmpvars:
if self.mat_var is None:
self.mat_var = mat_var
# check image size
imshape = [m[1] for m in tmp if m[0] == mat_var][0]
if self.size is None:
self.size = (imshape[1], imshape[0])
if (imshape[1], imshape[0]) == self.size:
imeta['filename'] = image_file
imeta['width'] = imshape[1]
imeta['height'] = imshape[0]
if len(imshape) > 2:
imeta['channels'] = imshape[2]
else:
imeta['channels'] = 1
imeta['type'] = 'MAT'
imeta['mat_var'] = mat_var
else:
w = 'Warning: skipping {:s} due to image size ({:d}x{:d} instead of {:d}x{:d}).'
print(w.format(iffile, imsize[0], imsize[1], self.size[0], self.size[1]))
else:
raise ValueError('specified MATLAB variable not in file')
except:
raise ValueError('error loading MATLAB data')
# Image files
else:
try:
i = Image.open(image_file)
imsize = i.size
if self.size is None:
self.size = imsize
# check image size
if imsize == self.size:
imeta['filename'] = image_file
imeta['width'] = imsize[0]
imeta['height'] = imsize[1]
if i.mode in ['RGB', 'RGBA']:
imeta['channels'] = 3
else:
imeta['channels'] = 1
imeta['type'] = i.format
imeta['mat_var'] = ''
else:
w = 'Warning: skipping {:s} due to image size ({:d}x{:d} instead of {:d}x{:d}).'
print(w.format(image_file, imsize[0], imsize[1], self.size[0], self.size[1]))
except OSError:
raise ValueError('not an image or file could not be opened.')
return imeta
[docs] def plot(self, imageid, cmap='gray', image_only=False, ax=None):
""" Display one of the contained images by imageid using matplotlib
Args:
imageid (str): valid imageid of image to show
cmap (str): name of a matplotlib colormap to use
image_only (boolean): if True, return only image content without labels
ax (Axes): axes object to draw on, to include result in other figure
Returns:
matplotlib figure object, or None if passed an axis to draw on
"""
try:
if ax is not None:
ax1 = ax
else:
fig = plt.figure()
ax1 = fig.add_subplot(1,1,1)
ax1.imshow(self.image(imageid), cmap=plt.get_cmap(cmap))
if image_only:
ax1.axis('off')
else:
ax1.set_title(imageid)
if ax is None and not plt.isinteractive():
# Only return figure object in non-interactive mode, otherwise
# IPython/Jupyter will display the figure twice (once while plotting
# and once as the cell result)!
return fig
except KeyError:
raise ValueError('No image with ID "{:s}" in set'.format(imageid))
[docs]class Fixations(object):
""" Dataset of fixation locations.
Fixation locations are assumed to be one-indexed in input, e.g. 1..800 if
the image is 800 pixels wide, and converted internally to Pythons zero-indexed
array convention.
Attributes:
data (DataFrame): DataFrame of raw fixation data
has_times (boolean): True if fixation times have been loaded
imageids (list): all unique image IDs represented in the dataset
imageset (ImageSet): if present, the associated ImageSet
input_file (str): file name of fixation data file
num_samples (int): number of fixation samples
num_vars (int): number of columns / variables in dataset
offset (tuple): optional offset from raw (x,y) positions
shape (tuple): dimensions of .data, i.e. same as Fixations.data.shape
variables (list): list of all variables loaded from input file
"""
def __init__(self, data, imageset=None, offset=(0, 0), imageid='imageid',
fixid='fixid', x='x', y='y', fixstart=None, fixend=None,
numericid=False):
""" Create new Fixations dataset and calculate defaults.
Args:
data: a predefined DataFrame or name of a file containing fixation report
imageset (ImageSet): if present, verify imageids against this ImageSet
offset (tuple): an (x, y) tuple of pixel values to offset fixations. E.g.,
if fixations have their origin at image center, use (-width/2, -height/2)
imageid (str): name of data file column containing imageids
fixid (str): name of data file column with unique fixation ID / index
x (str): name of data file column for horizontal fixation locations
y (str): name of data file column for vertical fixation locations
fixstart (str): name of data file column containing fixation start time
fixend (str): name of data file column containing fixation end time
numericid (boolean): if True, try to force parsing imageid as numeric
"""
self.data = DataFrame()
if isinstance(data, DataFrame):
# data is already a DataFrame
self.data = data
self.input_file = None
else:
try:
self.data = read_table(data, index_col=False)
self.input_file = data
except IOError:
raise IOError('Could not load fixation data, check file name and type!')
# Internal column names
self._imageid = imageid
self._fixid = fixid
self._x = x
self._y = y
self._xpx = self._x + '_PX'
self._ypx = self._y + '_PX'
self._fixstart = fixstart
self._fixend = fixend
self._fixdur = '__FIXDUR'
# Verify that all required columns are present
cols = [imageid, fixid, x, y]
miss_cols = []
for c in cols:
if c not in self.data.columns.values:
miss_cols.append(c)
if len(miss_cols) > 0:
raise ValueError('Missing columns ({:s}), please specify column names!'.format(str(miss_cols)))
# Image ID column should always be converted to strings
if numericid:
self.data[self._imageid] = self.data[self._imageid].astype(int).astype(str)
else:
self.data[self._imageid] = self.data[self._imageid].astype(str)
self.imageids = list(self.data[self._imageid].unique())
self.shape = self.data.shape
self.num_samples = self.shape[0]
self.num_vars = self.shape[1]
self.variables = list(self.data.columns.values)
# Fixation timing columns (optional, default: not specified)
self.has_times = False
if fixstart is not None and fixend is not None:
if fixstart not in self.data.columns.values:
raise ValueError('Unknown column specified for fixation start time: "{:s}"'.format(fixstart))
if fixend not in self.data.columns.values:
raise ValueError('Unknown column specified for fixation end time: "{:s}"'.format(fixend))
self.has_times = True
self.data[self._fixdur] = self.data[self._fixend] - self.data[self._fixstart]
else:
if (fixstart is None) - (fixend is None) != 0:
raise ValueError('Optional timing columns (fixstart, fixend) must be specified together!')
# If ImageSet provided, check if all images are present
self.imageset = None
if imageset is not None:
self.imageset = imageset
missing_imgs = []
for im in self.imageids:
if im not in imageset.imageids:
missing_imgs.append(im)
if len(missing_imgs) > 0:
print('Warning: the following images appear in the fixation data but not the speficied ImageSet: {:s}'.format(', '.join(missing_imgs)))
# Set offset and calculate pixel indices (_xpx, _ypx)
self.offset = (0, 0)
self.set_offset(offset)
def __repr__(self):
""" String representation """
r = '<gridfix.Fixations data set, {:d} samples, {:d} images>'.format(self.num_samples, len(self.imageids))
if self.imageset is not None:
r += '\nImages:\n\t{:s}'.format(str(self.imageset))
return r
def __len__(self):
""" Overload len() to report the number of samples """
return self.data.shape[0]
def __getitem__(self, imageid):
""" Bracket indexing returns all fixations for a specified image """
return self.select_fix(select={'imageid': imageid})
[docs] def set_offset(self, offset):
""" Set a constant offset for eye x/y coordinates.
If image coordinates are relative to image center, use (-width/2, -height/2)
(GridFix uses a coordinate origin at the top left).
Args:
offset (tuple): 2-tuple of (hor, ver) offset values in pixels
"""
# Reset previous offset
prevoffset = self.offset
if prevoffset[0] != 0.0 or prevoffset[1] != 0.0:
self.data[self._x] = self.data[self._x] - prevoffset[0]
self.data[self._y] = self.data[self._y] - prevoffset[1]
self.data[self._x] = self.data[self._x] + offset[0]
self.data[self._y] = self.data[self._y] + offset[1]
self.offset = (offset[0], offset[1])
# Round x/y fixation positions to integers (pixels) while keeping original data
# Note: we're going to use these as indices and Python is 0-indexed, so subtract 1!
self.data[self._xpx] = np.asarray(np.round(self.data[self._x]), dtype=int) - 1
self.data[self._ypx] = np.asarray(np.round(self.data[self._y]), dtype=int) - 1
[docs] def select_fix(self, select={}):
""" Return a subset of fixation data for specified imageid.
Args:.
select (dict): dict of filter variables, as {column: value}
Returns:
New Fixations object containing selected fixations only
"""
if self._imageid not in select.keys():
print('Warning: no image ID in filter variables, selection will yield fixations from multiple images! Proceed at own risk.')
if select[self._imageid] not in self.data[self._imageid].values:
print('Warning: zero fixations selected for specified imageid ({:s})'.format(select[self._imageid]))
selection = self.data
if len(select) > 0:
for col, target in select.items():
if col not in selection.columns.values:
print('Warning: filter variable {:s} not found in Fixations dataset!'.format(col))
else:
# Make sure dict value is list-like
if type(target) not in (tuple, list):
target = [target]
selection = selection[selection[col].isin(target)]
result = Fixations(selection.copy(), imageid=self._imageid, fixid=self._fixid,
x=self._x, y=self._y, imageset=self.imageset,
fixstart=self._fixstart, fixend=self._fixend)
return result
[docs] def plot(self, imageid=None, select={}, on_image=True, oob=False,
plotformat='wo', durations=False, image_only=False, ax=None):
""" Plot fixations for selected imageid, either alone or on image
Args:
imageid (str): optional image ID to plot fixations for
select (dict): dict of additional filter variables (see select_fix())
image (bool): if True, superimpose fixations onto image (if ImageSet present)
oob (bool): if True, include out-of-bounds fixations when plotting
plotformat (str): format string for plt.pyplot.plot(), default: white circles
durations (bool): if True, plot duration of each fixation next to marker
image_only (boolean): if True, return only image content without labels
ax (Axes): axes object to draw to, to include result in other figure
Returns:
matplotlib figure object, or None if passed an axis to draw on
"""
if imageid is not None:
if imageid not in select.keys():
select[self._imageid] = imageid
plotfix = self.select_fix(select)
else:
plotfix = self
if ax is not None:
ax1 = ax
else:
fig = plt.figure()
ax1 = fig.add_subplot(1,1,1)
try:
if on_image:
ax1.imshow(self.imageset.image(imageid), origin='upper')
except AttributeError:
print('Warning: cannot view fixations on image due to missing ImageSet!')
if oob:
ax1.plot(plotfix.data[self._xpx], plotfix.data[self._ypx], plotformat)
if durations:
for r in plotfix.data.iterrows():
x = r[1][self._xpx]
if r[1][self._ypx] > 15:
y = r[1][self._ypx] - 15
else:
y = r[1][self._ypx] + 5
d = r[1][self._fixdur]
ax1.text(x, y, str(d), horizontalalignment='center')
else:
try:
if self.imageset is not None:
size = self.imageset.size
else:
size = (max(plotfix.data[self._xpx]), max(plotfix.data[self._ypx]))
fix = plotfix.data[(plotfix.data[self._xpx] >= 0) &
(plotfix.data[self._xpx] < size[0]) &
(plotfix.data[self._ypx] >= 0) &
(plotfix.data[self._ypx] < size[1])]
ax1.plot(fix[self._xpx], fix[self._ypx], plotformat)
if durations:
for r in fix.iterrows():
x = r[1][self._xpx]
if r[1][self._ypx] > 15:
y = r[1][self._ypx] - 15
else:
y = r[1][self._ypx] + 5
d = r[1][self._fixdur]
ax1.text(x, y, str(d), horizontalalignment='center')
ax1.set_xlim((0, size[0]))
ax1.set_ylim((0,size[1]))
ax1.invert_yaxis()
if image_only:
ax1.axis('off')
else:
ax1.set_title(imageid)
except AttributeError:
print('Warning: cannot filter fixations for image boundaries due to missing ImageSet!')
ax1.plot(plotfix[self._xpx], plotfix[self._ypx], plotformat)
ax1.invert_yaxis()
if ax is None and not plt.isinteractive(): # see ImageSet.plot()
return fig
[docs] def location_map(self, imageid=None, size=None):
""" Binary ndarray of fixated and non-fixated pixels within image area
Args:
imageid (str): optional image ID to create map for one image only
size (tuple): image dimensions, specified as (width, height).
Returns:
2d boolean ndarray, True where fixated, otherwise False
"""
if imageid is not None:
mapfix = self.select_fix({self._imageid: imageid})
else:
mapfix = self
if size is None:
if self.imageset is not None:
size = self.imageset.size
else:
raise ValueError('Image size or attached ImageSet are necessary for location mapping!')
fixloc = np.zeros((size[1], size[0]), dtype=bool)
fix = mapfix.data[(self.data[self._xpx] >= 0) & (self.data[self._xpx] < size[0]) &
(self.data[self._ypx] >= 0) & (self.data[self._ypx] < size[1])]
fixloc[fix[self._ypx], fix[self._xpx]] = True
return fixloc
[docs] def count_map(self, imageid=None, size=None):
""" Map of fixation counts for each image pixel
Args:
imageid (str): optional image ID to create map for one image only
size (tuple): image dimensions, specified as (width, height).
Returns:
2d ndarray of pixel fixation counts
"""
if imageid is not None:
mapfix = self.select_fix({self._imageid: imageid})
else:
mapfix = self
if size is None:
if self.imageset is not None:
size = self.imageset.size
else:
raise ValueError('Image size or attached ImageSet are necessary for location mapping!')
fixcount = np.zeros((size[1], size[0]), dtype=int)
fix = mapfix.data[(self.data[self._xpx] >= 0) & (self.data[self._xpx] < size[0]) &
(self.data[self._ypx] >= 0) & (self.data[self._ypx] < size[1])]
fixloc = fix[[self._ypx, self._xpx]].as_matrix()
for pos in fixloc:
fixcount[pos[0], pos[1]] += 1
return fixcount
[docs] def dur_map(self, imageid=None, size=None):
""" Map of total fixation durations for each image pixel
Args:
imageid (str): optional image ID to create map for one image only
size (tuple): image dimensions, specified as (width, height).
Returns:
2d ndarray of fixation durations at each pixel
"""
if imageid is not None:
mapfix = self.select_fix({self._imageid: imageid})
else:
mapfix = self
if size is None:
if self.imageset is not None:
size = self.imageset.size
else:
raise ValueError('Image size or attached ImageSet are necessary for location mapping!')
durmap = np.zeros((size[1], size[0]), dtype=float)
fix = mapfix.data[(self.data[self._xpx] >= 0) & (self.data[self._xpx] < size[0]) &
(self.data[self._ypx] >= 0) & (self.data[self._ypx] < size[1])]
for r in fix.iterrows():
durmap[r[1][self._ypx], r[1][self._xpx]] += r[1][self._fixdur]
return durmap
[docs]class FixationModel(object):
""" Combines Features and Fixations to create predictors and R source for GLMM
Attributes:
chunks (list): list of data columns that define chunks (e.g., subjects or sessions)
comp_features (dict): dict of labelled feature comparisons. Predictors will be replicated
for each feature in a comparison so that features can serve as fixed or random factor
exclude_first_fix (boolean): if True, first fixation index was set NaN (e.g.,, fixation cross)
features (dict): dictionary of feature objects and feature groups
predictors (DataFrame): model predictors for GLMM
regionset (RegionSet): attached RegionSet
runtime (float): time in seconds for most recent update of predictor matrix
normalize_features (bool): if True, feature values are normalized to 0..1 range
"""
def __init__(self, fixations, regionset, dv_type='fixated', features=None, feature_labels=None,
chunks=[], progress=False, exclude_first_fix=False, normalize_features=False):
""" Create a new FixationModel.
Args:
fixations (Fixations): fixation data to use as model DV (column 'fixation')
regionset (RegionSet): a RegionSet object defining length of all features
dv_type (str): type of DV to generate, or list of multiple options:
'fixated': binary coding of fixated (1) and unfixated (0) regions
'count': absolute fixation count for each region
features (list): list of Feature objects to add (use add_comparison for feature groups)
feature_labels (list): string labels to apply to features defined using features= attribute
chunks (list): list of fixation data columns that define chunks (e.g., subjects or sessions)
progress (bool): print current image and group variables to indicate model build progress
exclude_first_fix (bool): if True, set first fixated region per image to NaN for GLMM
normalize_features (bool): if True, normalize feature values to 0..1 range
"""
self.regionset = regionset
self._fix = fixations
self._pred = DataFrame()
self._consistent = False # Flags whether we need to rebuild predictor matrix
self.features = {}
self.comp_features = {}
self.normalize_features = normalize_features
self.exclude_first_fix = exclude_first_fix
if type(dv_type) != list:
dv_type = [dv_type,]
for dvt in dv_type:
if dvt not in ['fixated', 'count']:
raise ValueError('Error: unknown DV type specified: "{:s}"!'.format(dvt))
self.dv_type = dv_type
# Make sure imageid is always a chunking variable and all chunk vars exist
if self._fix._imageid not in chunks:
chunks.append(self._fix._imageid)
for var in chunks:
if var not in self._fix.variables:
raise ValueError('Error: chunking variable "{:s}" does not exist in dataset!'.format(var))
self.chunks = chunks
self.progress = progress
# Add any specified features to model
if features:
if type(features) != list and type(features) != tuple:
features = [features,] # force list of features
for f in features:
if feature_labels is not None and len(feature_labels) > 0:
label = feature_labels.pop(0)
elif f.label is not None:
label = f.label
else:
label = self._feat_label(f)
self.add_feature(f, label=label)
self.update(progress=self.progress)
def _feat_label(self, feature):
""" Create label for unlabeled Feature object."""
cls = feature.__class__.__name__
label = 'f' + cls[0:5]
f_label = label
suffix = 1
while f_label in self.features.keys():
f_label = label + str(suffix)
suffix += 1
return f_label
@property
def predictors(self):
""" Model predictor matrix, updated if necessary """
if not self._consistent:
self.update()
return self._pred
def __repr__(self):
""" String representation for print summary. """
if not self._consistent:
self.update()
r = '<gridfix.FixationModel, {:d} samples, DV={:s}, chunked by: {:s}>\n'.format(self.predictors.shape[0], self.dv_type, str(self.chunks))
r += 'Fixations:\n\t{:s}\n'.format(str(self._fix))
r += 'Regions:\n\t{:s}\n'.format(str(self.regionset))
if len(self.features) > 0:
r += '\nFeatures:\n'
for l,f in self.features.items():
r += '\t{:s}\t{:s}\n'.format(l, f.__class__.__name__)
if len(self.comp_features) > 0:
r += '\nFeature Comparisons:\n'
for l,f in self.comp_features.items():
r += '{:s}:\n'.format(l)
for code, feat in f.items():
r += '\t{:s}\t{:s}\n'.format(str(code), feat.__class__.__name__)
return(r)
[docs] def r_source(self, datafile='gridfix.csv', comments=True, scale=True, center=True,
optimizer=None, fixed=None, random=None, random_slopes=False):
""" Generate R source code from current feature settings.
Args:
datafile (str): predictor matrix file name (for R import via read.table())
comments (boolean): if True, add explanatory comments and headers to source
scale (boolean): if True, add code to scale (normalize) feature predictors
center (boolean): if True, add code to center (demean) feature predictors
optimizer (str): optional optimizer to pass to R glmer()
fixed (list): list of column names (strings) to add as fixed factors
random (list): list of column names (strings) to add as random factors
random_slopes (boolean): also add random slopes to generated R code
Returns:
R source code as string
"""
r_libs = ['lme4']
src = ''
if comments:
d = time.strftime('%d.%m.%y, %H:%M:%S', time.localtime())
src = '# GridFix GLMM R source, generated on {:s}\n# \n'.format(d)
src += '# Predictor file:\t{:s}\n'.format(datafile)
src += '# Fixations file:\t{:s}\n'.format(str(self._fix.input_file))
src += '# RegionSet:\t\t{:s}\n'.format(str(self.regionset))
src += '# DV type(s):\t\t{:s}\n'.format(str(self.dv_type))
src += '\n'
# Libraries
for lib in r_libs:
src += 'library({:s})\n'.format(lib)
src += '\n'
# Predictor file
src += 'gridfixdata <- read.table("{:s}", header=T, sep="\\t", row.names=NULL)\n\n'.format(datafile)
# Factors
if comments:
src += '# Define R factors for all chunking variables and group dummy codes\n'
for chunk in self.chunks:
src += 'gridfixdata${:s} <- as.factor(gridfixdata${:s})\n'.format(chunk, chunk)
if len(self.comp_features) > 0:
for cf in self.comp_features.keys():
src += 'gridfixdata${:s} <- as.factor(gridfixdata${:s})\n'.format(cf, cf)
src += '\n'
# Center and scale
if scale or center:
if len(self.features) > 0:
r_cent = 'FALSE'
r_scal = 'FALSE'
if center:
r_cent = 'TRUE'
if scale:
r_scal = 'TRUE'
if comments:
src += '# Center and scale predictors\n'
for f in self.features.keys():
src += 'gridfixdata${:s}_C <- scale(gridfixdata${:s}, center={:s}, scale={:s})\n'.format(f, f, r_cent, r_scal)
src += '\n'
# GLMM model formula (DV is set later)
formula = '{:s} ~ 1'
if fixed is None:
# Best guess: all simple features should be fixed factors!
if len(self.features) > 0:
fixed = self.features.keys()
else:
fixed = []
fixed_vars = ''
for f in fixed:
try:
# likely a feature object
fl = f.label
except AttributeError:
# text label specified
fl = f
if scale or center:
fixed_vars += ' + {:s}_C '.format(fl)
else:
fixed_vars += ' + {:s} '.format(fl)
formula += fixed_vars
if random is None:
# imageid should be a random factor by default
random = [self._fix._imageid]
if len(fixed) > 0:
for r in random:
try:
# likely a feature object
rl = r.label
except AttributeError:
rl = r
if random_slopes:
formula += ' + (1{:s} | {:s})'.format(fixed_vars, rl)
else:
formula += ' + (1 | {:s})'.format(rl)
# Optimizer parameter
opt_call = ''
if optimizer is not None:
opt_call = ', control=glmerControl(optimizer="{:s}")'.format(optimizer)
# GLMM model call(s) - one per requested DV
if comments:
src += '# NOTE: this source code can only serve as a scaffolding for your own analysis!\n'
src += '# You MUST adapt the GLMM model formula below to your model, then uncomment the corresponding line!\n'
models = []
for current_dv in self.dv_type:
if current_dv == 'fixated':
model_fam = 'binomial'
model_dv = 'dvFix'
elif current_dv == 'count':
model_fam = 'poisson'
model_dv = 'dvCount'
models.append('model.{:s}'.format(current_dv))
if comments:
src += '# DV: {:s}\n#'.format(current_dv)
src += 'model.{:s} <- glmer({:s}, data=gridfixdata{:s}, family={:s})\n\n'.format(current_dv, formula.format(model_dv), opt_call, model_fam)
out_f, ext = os.path.splitext(datafile)
r_objlist = ','.join(['"{:s}"'.format(a) for a in models])
src += 'save(file="{}_GLMM.Rdata", list = c({:s}))\n\n'.format(out_f, r_objlist)
src += 'print(summary(model))\n'
return src
def _process_chunk(self, chunk_vals, data, pred_columns, group_levels):
""" Process a single data chunk. """
if data is not None:
sel_labels = dict(zip(self.chunks, chunk_vals))
imageid = str(sel_labels[self._fix._imageid])
tmpdf = DataFrame(columns=pred_columns, index=range(len(self.regionset[imageid])))
# groupby returns a single string if only one chunk columns is selected
if type(chunk_vals) != tuple:
chunk_vals = (chunk_vals,)
# Fixations
subset = self._fix.select_fix(sel_labels)
# Chunk and region values
for col in self.chunks:
tmpdf[col] = data[col].iloc[0]
# Region ID and numbering
if self.regionset.is_global:
tmpdf.regionid = np.array(self.regionset.info[self.regionset.info.imageid == '*'].regionid, dtype=str)
tmpdf.regionno = np.array(self.regionset.info[self.regionset.info.imageid == '*'].regionno, dtype=int)
else:
tmpdf.regionid = np.array(self.regionset.info[self.regionset.info.imageid == imageid].regionid, dtype=str)
tmpdf.regionno = np.array(self.regionset.info[self.regionset.info.imageid == imageid].regionno, dtype=int)
# Fixated and non-fixated regions
if 'fixated' in self.dv_type:
tmpdf['dvFix'] = self.regionset.fixated(subset, imageid=imageid, exclude_first=self.exclude_first_fix)
if 'count' in self.dv_type:
tmpdf['dvCount'] = self.regionset.fixated(subset, imageid=imageid, exclude_first=self.exclude_first_fix, count=True)
# Simple per-image features
for feat_col, feat in self.features.items():
tmpdf[feat_col] = feat.apply(imageid, normalize=self.normalize_features)
# Feature group comparisons
if len(self.comp_features) > 0:
for levels in group_levels:
for idx, gc in enumerate(self.comp_features.keys()):
tmpdf[gc] = levels[idx]
tmpdf['{:s}_val'.format(gc)] = self.comp_features[gc][levels[idx]].apply(imageid, normalize=self.normalize_features)
return tmpdf
[docs] def update(self, progress=False):
""" Update predictor matrix from features (this may take a while).
Args:
progress (boolean): if True, print model creation progress
"""
ts = time.time()
# Output DF columns
pred_columns = self.chunks + ['regionid', 'regionno', 'dvFix']
pred_columns += list(self.features.keys())
for cf in self.comp_features.keys():
pred_columns += [cf, '{:s}_val'.format(cf)]
pred_new = DataFrame(columns=pred_columns)
splitdata = self._fix.data.groupby(self.chunks)
group_levels = []
if len(self.comp_features) > 0:
groups = [list(f.keys()) for i,f in self.comp_features.items()]
group_levels = list(itertools.product(*groups))
# Process individual chunks
for chunk_vals, data in splitdata:
if progress:
print(chunk_vals)
results = self._process_chunk(chunk_vals, data, pred_columns, group_levels)
pred_new = pred_new.append(results)
self._pred = pred_new
self._consistent = True
self.runtime = time.time() - ts # rebuild duration
[docs] def add_feature(self, feature, label=None):
""" Add a feature to the model.
Args:
feature (Feature): Feature object to add
label (str): label and output column name for this feature
"""
# Generate unique feature label
if label is None:
label = self._feat_label(feature)
# Check feature length
if len(feature) != len(self.regionset):
w = 'Could not add feature "{:s}": invalid length ({:d} instead of {:d})!'
raise ValueError(label, len(feature), len(self.regionset))
self.features[label] = feature
self._consistent = False
[docs] def add_comparison(self, features, codes=None, label=None):
""" Add a feature comparison group to the model.
This generates a long-style predictor matrix for the specified features,
needed to compare e.g. different saliency maps in their relative effects.
Args:
features (list): list of Feature objects to combine into a group
codes (list): numeric codes to use in "dummy coding", e.g. [0, 1, 2]
label (str): label and output column name for this feature group
"""
# Generate unique group label
if label is None:
suffix = 1
g_label = 'fC' + str(suffix)
while g_label in self.comp_features.keys():
suffix += 1
g_label = 'fC' + str(suffix)
# Generate dummy codes if necessary
if codes is None:
codes = range(0, len(features))
else:
codes = [int(c) for c in codes]
comp = {codes[c]: f for c,f in enumerate(features)}
self.comp_features[g_label] = comp
self._consistent = False
[docs] def save(self, basename, sep='\t', pred=True, pred_pickle=False, src=True, src_comments=True, precision=10,
optimizer=None, fixed=None, random=None, random_slopes=False):
""" Saves the predictor matrix to a CSV text file.
Args:
basename (str): base filename to save to, without extension
sep (str): item separator, default TAB
pred (boolean): if True, output predictor matrix as CSV
pred_pickle (boolean): if True, also save predictors to Pickle object
src (boolean): if True, output r source code file for lme4
src_comments (boolean): if True, add comments to source code
precision (int): number of decimal places for CSV (default: 10)
optimizer (str): optional optimizer to pass to R glmer()
fixed (list): list of column names (strings) to add as fixed factors
random (list): list of column names (strings) to add as random factors
random_slopes (boolean): also add random slopes to generated R code
"""
if not self._consistent:
self.update()
if pred:
if LooseVersion(pandas_version) >= LooseVersion('0.17.1'):
# compression supported from 0.17.1
f_pred = '{:s}.csv.gz'.format(basename)
self.predictors.to_csv(f_pred, sep, index=False, float_format='%.{:d}f'.format(precision), compression='gzip')
else:
f_pred = '{:s}.csv'.format(basename)
self.predictors.to_csv(f_pred, sep, index=False, float_format='%.{:d}f'.format(precision))
if pred_pickle:
f_pred = '{:s}.pkl'.format(basename)
self.predictors.to_pickle(f_pred)
if src:
f_src = '{:s}.R'.format(basename)
src = self.r_source(comments=src_comments, datafile=f_pred, optimizer=optimizer,
fixed=fixed, random=random, random_slopes=random_slopes)
with open(f_src, 'w') as sf:
sf.write(src)
if __name__ == '__main__':
print('The gridfix modules cannot be called directly. Please use one of the included tools, e.g. gridmap.')