Writing your custom file wrapper¶
Since 0.5, SpyKING CIRCUS can natively read/write several file formats, in order to ease your sorting workflow. By default, some generic file formats are already implemented (see the documentation on the file formats), but you can also write your own wrapper in order to read/write your own custom datafile.
Note that we did not use neo, and we recommend not doing so, because your wrapper should provide some functionalities that are not yet available in neo:
- it should allow memory mapping, i.e. reading only chunks of your data at a time, slicing either by time or by channels
- it should read data in their native format, as they will internally be converted to float32
- it could allow streaming, if data are internally stored in several chunks
To do so, you simply need to create an object that inherits from the DataFile object described in circus/files/datafile.py. The easiest way to understand the structure is to have a look at circus/files/raw_binary.py as an example of such a datafile object. If you have questions while writing your wrapper, do not hesitate to get in touch with us.
The speed of the algorithm may decrease a little, depending on your wrapper. For example, we currently provide an example of a wrapper based on neuroshare (for mcd files). This wrapper works, but it is slow and inefficient, because the neuroshare API is itself slow.
Mandatory attributes¶
Here are the class attributes that you must define:
description = "mydatafile" # Description of the file format
extension = [".myextension"] # extensions allowed
parallel_write = False # can be written in parallel (using the comm object)
is_writable = False # can be written
is_streamable = ['multi-files'] # If the file format can support streams of data ['multi-files' is a default, but can be something else]
_shape = None # The total shape of the data (nb time steps, nb channels) across streams if any
_t_start = None # The global t_start of the data (0 by default)
_t_stop = None # The final t_stop of the data, across all streams if any
_params = {} # The dictionary where all attributes will be saved
Note that the datafile object has an internal dictionary _params that contains all the values provided by the Configuration Parser, i.e. read from the data section of the parameter file. For a given file format, you can specify:
# This is a dictionary of values that need to be provided to the constructor, with the corresponding type
_required_fields = {}
This is the list of mandatory parameters, along with their types, that have to be specified in the parameter file because they cannot be inferred from the header of your data file. For example:
_required_fields = {'sampling_rate' : float, 'nb_channels' : int}
Then you can also specify some additional parameters that may have a default value. If they are not provided in the parameter file, this default value is used. For example:
# This is a dictionary of values that may have a default value, if not provided to the constructor
_default_values = {'gain' : 1.}
In the end, there are 5 mandatory attributes that the code will require for any given file format. They should be stored in the _params dictionary:
- nb_channels
- sampling_rate
- data_dtype
- dtype_offset
- gain
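Putting these pieces together, a minimal class skeleton could look like the following sketch. The class name MyDataFile, the extension and the exact choice of required/default fields are illustrative assumptions, not part of the API:
from circus.files.datafile import DataFile

class MyDataFile(DataFile):

    # Illustrative values only: adapt them to your own file format.
    description = "mydatafile"
    extension = [".myextension"]
    parallel_write = False
    is_writable = False

    # These values cannot be read from the header of this hypothetical
    # format, so the user must provide them in the [data] section.
    _required_fields = {'sampling_rate': float,
                        'nb_channels': int,
                        'data_dtype': str}

    # Parameters falling back to a default when absent from the parameter file.
    _default_values = {'dtype_offset': 0,
                       'gain': 1.}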
Custom methods¶
Here is the list of the functions that you should implement in order to have a valid wrapper.
Basic I/O¶
You must provide functions to open/close the datafile:
def _open(self, mode=''):
    '''
    This function should open the file
    - mode can be 'r' (read only) or 'w' (write)
    '''
    raise NotImplementedError('The open method needs to be implemented for file format %s' %self.description)
def _close(self):
    '''
    This function closes the file
    '''
    raise NotImplementedError('The close method needs to be implemented for file format %s' %self.description)
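As an illustration, for a flat binary file these two methods can rely on numpy.memmap, in the spirit of circus/files/raw_binary.py. This is only a sketch, and it assumes that data_dtype is available as an attribute, as it is for the built-in wrappers:
import numpy

def _open(self, mode='r'):
    # Memory-map the file so that only the requested slices are read from disk.
    self.data = numpy.memmap(self.file_name, dtype=self.data_dtype, mode=mode)

def _close(self):
    # Dropping the reference releases the memory map.
    self.data = None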
Reading values from the header¶
You need to provide a function that will read data from the header of your datafile:
def _read_from_header(self):
    '''
    This function is called only if the file is not empty, and should fill the values in the constructor,
    such as _shape. It returns a dictionary that will be added to self._params, based on the constraints
    given by _required_fields and _default_values
    '''
    raise NotImplementedError('The _read_from_header method needs to be implemented for file format %s' %self.description)
Such a function must:
- set _shape to (duration, nb_channels)
- set _t_start if not 0
- return a dictionary of parameters that will be used, given the constraints obtained from the values in _required_fields and _default_values, to create the DataFile
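For example, a sketch for a hypothetical format made of a fixed-size binary header followed by interleaved int16 samples could look like this. The header layout is an assumption; nb_channels comes from _required_fields and is therefore already available at this point:
import os
import numpy

def _read_from_header(self):
    # Hypothetical layout: a 1024-byte header followed by interleaved int16 samples.
    header = {'data_dtype': 'int16', 'dtype_offset': 0}
    header_size = 1024

    # Deduce the duration from the file size and fill _shape.
    nb_samples = (os.path.getsize(self.file_name) - header_size) // numpy.dtype(header['data_dtype']).itemsize
    self._shape = (nb_samples // self.nb_channels, int(self.nb_channels))

    # The returned values are merged into self._params.
    return header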
Reading chunks of data¶
Then you need to provide a function to load a block of data, with a given size:
def read_chunk(self, idx, chunk_size, padding=(0, 0), nodes=None):
    '''
    Assuming the analyze function has been called before, this is the main function
    used by the code, in all steps, to get data chunks. More precisely, assuming your
    dataset can be divided into nb_chunks (see analyze) of temporal size chunk_size,
    - idx is the index of the chunk you want to load
    - chunk_size is the temporal size of those chunks, in time steps
    - if chunk idx normally spans data[t_start:t_stop], padding adds offsets, in time
      steps, such that data[t_start+padding[0]:t_stop+padding[1]] is loaded instead
    - nodes is a list of channel indices, between 0 and nb_channels
    '''
    raise NotImplementedError('The read_chunk method needs to be implemented for file format %s' %self.description)
Note that, for convenience, in such a function you can obtain the local t_start and t_stop by using the method t_start, t_stop = _get_t_start_t_stop(idx, chunk_size, padding) (see circus/files/raw_binary.py for an example). This may make it easier to slice your datafile. In the end, data must be returned as float32, and to do so you can also use the internal method _scale_data_to_float32(local_chunk).
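To make this concrete, here is a sketch of read_chunk for the memory-mapped, interleaved layout used in the sketches above; it closely follows the logic of circus/files/raw_binary.py:
import numpy

def read_chunk(self, idx, chunk_size, padding=(0, 0), nodes=None):
    # Translate the chunk index into local time steps, including the padding.
    t_start, t_stop = self._get_t_start_t_stop(idx, chunk_size, padding)
    local_shape = t_stop - t_start

    # The memory map is flat and interleaved, so slice by samples and
    # reshape to (time steps, channels).
    local_chunk = self.data[t_start*self.nb_channels:t_stop*self.nb_channels]
    local_chunk = local_chunk.reshape(local_shape, self.nb_channels)

    # Keep only the requested channels, if a subset was asked for.
    if nodes is not None:
        if not numpy.all(nodes == numpy.arange(self.nb_channels)):
            local_chunk = numpy.take(local_chunk, nodes, axis=1)

    # Convert to float32, applying dtype_offset and gain.
    return self._scale_data_to_float32(local_chunk)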
Writing chunks of data¶
This method is required only if your file format allows write access:
def write_chunk(self, time, data):
    '''
    This function writes data at a given time.
    - time is expressed in time steps
    - data must be a 2D matrix of size time_length x nb_channels
    '''
    raise NotImplementedError('The write_chunk method needs to be implemented for file format %s' %self.description)
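For a writable, memory-mapped format like the one sketched above, a possible implementation is shown below. The conversion back to the native dtype simply reverses the float32 scaling; the exact convention used for gain and dtype_offset should be checked against _scale_data_to_float32 in circus/files/datafile.py, so treat this as an assumption:
def write_chunk(self, time, data):
    # Undo the float32 scaling and go back to the native dtype (assumed convention).
    data = data / self.gain + self.dtype_offset
    data = data.astype(self.data_dtype).ravel()

    # Write the interleaved samples starting at the requested time step.
    self.data[self.nb_channels*time:self.nb_channels*time + len(data)] = data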
Streams¶
Depending on the complexity of your file format, you can allow several ways of streaming your data. The way to define streams is rather simple and, by default, all file formats can be streamed with a mode called multi-files. This is the former multi-files mode that we used to have in the 0.4 versions (see multi files):
def set_streams(self, stream_mode):
    '''
    This function is only used for file formats supporting streams, and needs to return a list of datafiles,
    with appropriate t_start for each of them. Note that the results will be using the times defined by the streams.
    You can do anything regarding the keyword used for the stream mode, but multi-files is implemented by default.
    This will allow every file format to be streamed from multiple sources, and processed as a single file.
    '''

    if stream_mode == 'multi-files':

        dirname = os.path.abspath(os.path.dirname(self.file_name))
        all_files = os.listdir(dirname)
        fname = os.path.basename(self.file_name)
        fn, ext = os.path.splitext(fname)
        head, sep, tail = fn.rpartition('_')
        mindigits = len(tail)
        basefn, fnum = head, int(tail)
        fmtstring = '_%%0%dd%%s' % mindigits
        sources = []
        to_write = []
        global_time = 0
        params = self.get_description()

        while fname in all_files:
            new_data = type(self)(os.path.join(os.path.abspath(dirname), fname), params)
            new_data._t_start = global_time
            global_time += new_data.duration
            sources += [new_data]
            fnum += 1
            fmtstring = '_%%0%dd%%s' % mindigits
            fname = basefn + fmtstring % (fnum, ext)
            to_write += ['We found the datafile %s with t_start %s and duration %s' %(new_data.file_name, new_data.t_start, new_data.duration)]

        print_and_log(to_write, 'debug', logger)
        return sources
Note
When working with streams, you must always define attributes (such as t_start, duration, …) that are local, i.e. defined only for each stream.
As you can see, set_streams is a function that, given a stream_mode, will read the parameters and return a list of DataFiles created by slightly changing those parameters. In the case of multi-files, this is just a change in the file names, but for some file formats, streams are embedded within the same data structure rather than spread over several files. For example, if you have a look at the file circus/files/kwd.py, you can see that there is also a stream mode called single-file. If this mode is enabled, the code will process all chunks of data in the HDF5 file, sorted by their keys, as a single giant data file. This is a common situation in experiments: chunks of data are recorded at several times, but in the same data file. Because they originate from the same experiment, they are better processed as a whole.
Once those functions are implemented, you simply need to add your wrapper to the list defined in circus/files/__init__.py, or get in touch with us to make it available in the default trunk.
Parallelism¶
In all your wrappers, if you want to deal with parallelism and perform read/write accesses that depend on MPI, you have access to an object comm, which is the MPI communicator. Simply add at the top of your Python wrapper:
from circus.shared.mpi import comm
Then have a look, for example, at circus/files/hdf5.py to understand how this is used.
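For instance, comm behaves like a standard mpi4py communicator (rank, size, Barrier, …), so a minimal sketch restricting an action to the master process would be:
from circus.shared.mpi import comm

# Only the master process performs the action; all processes then
# synchronize before continuing.
if comm.rank == 0:
    print('Preparing the file with %d MPI processes' % comm.size)
comm.Barrier()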
Logs¶
In all your wrappers, if you want to log some information to the log files (in addition to what is logged by default in the DataFile class), you can use the print_and_log function. Simply add at the top of your wrapper:
from circus.shared.messages import print_and_log
import logging
logger = logging.getLogger(__name__)
Then, if you want to log something, the syntax of such a function is:
>> print_and_log(list_of_lines, 'debug', logger)