eegdash.api module#
High-level interface to the EEGDash metadata database.
This module provides the main EEGDash class which serves as the primary entry point for interacting with the EEGDash ecosystem. It offers methods to query, insert, and update metadata records stored in the EEGDash MongoDB database, and includes utilities to load EEG data from S3 for matched records.
- class eegdash.api.EEGDash(*, is_public: bool = True, is_staging: bool = False)[source]
Bases: object
High-level interface to the EEGDash metadata database.
Provides methods to query, insert, and update metadata records stored in the EEGDash MongoDB database (public or private). Also includes utilities to load EEG data from S3 for matched records.
For working with collections of recordings as PyTorch datasets, prefer EEGDashDataset.
Create a new EEGDash client.
- Parameters:
is_public (bool, default True) – Connect to the public MongoDB database. If False, connect to a private database instance using the DB_CONNECTION_STRING environment variable (or a value from a .env file).
is_staging (bool, default False) – If True, use the staging database (eegdashstaging); otherwise use the production database (eegdash).
Examples
>>> eegdash = EEGDash()
- add(record: dict) None [source]
Add a single record to the MongoDB collection.
- Parameters:
record (dict) – The record to add.
- add_bids_dataset(dataset: str, data_dir: str, overwrite: bool = True) None [source]
Scan a local BIDS dataset and upsert records into MongoDB.
- Parameters:
dataset (str) – Dataset identifier (e.g., "ds002718").
data_dir (str) – Path to the local BIDS dataset directory.
overwrite (bool, default True) – If True, update existing records when encountered; otherwise, skip records that already exist.
- Raises:
ValueError – If called on a public client (is_public=True).
- close() None [source]
Close the MongoDB connection.
Deprecated since version 0.1: Connections are now managed globally by MongoConnectionManager. This method is a no-op and will be removed in a future version. Use EEGDash.close_all_connections() to close all clients.
- classmethod close_all_connections() None [source]
Close all MongoDB client connections managed by the singleton manager.
- property collection
The underlying PyMongo Collection object.
- Returns:
The collection object used for database interactions.
- Return type:
pymongo.collection.Collection
- exist(query: dict[str, Any]) bool [source]
Return True if at least one record matches the query, else False.
This is a lightweight existence check that uses MongoDB’s find_one instead of fetching all matching documents (which would be wasteful in both time and memory for broad queries). Only a restricted set of fields is accepted to avoid accidental full scans caused by malformed or unsupported keys.
- Parameters:
query (dict) – Mapping of allowed field(s) to value(s). Allowed keys: data_name and dataset. The query must not be empty.
- Returns:
True if at least one matching record exists; False otherwise.
- Return type:
bool
- Raises:
TypeError – If query is not a dict.
ValueError – If query is empty or contains unsupported field names.
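The input checks described above can be sketched as a small standalone helper (a minimal illustration of the documented rules, not the library's actual implementation; the helper name is hypothetical):

```python
ALLOWED_EXIST_FIELDS = {"data_name", "dataset"}  # the two accepted keys

def validate_exist_query(query):
    """Reject queries that exist() would refuse, per the rules above."""
    if not isinstance(query, dict):
        raise TypeError("query must be a dict")
    if not query:
        raise ValueError("query must not be empty")
    unsupported = set(query) - ALLOWED_EXIST_FIELDS
    if unsupported:
        raise ValueError(f"unsupported field(s): {sorted(unsupported)}")

validate_exist_query({"dataset": "ds002718"})  # passes silently
```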
- exists(query: dict[str, Any]) bool [source]
Check if at least one record matches the query.
This is an alias for exist().
- Parameters:
query (dict) – MongoDB query to check for existence.
- Returns:
True if a matching record exists, False otherwise.
- Return type:
bool
- find(query: dict[str, Any] = None, /, **kwargs) list[Mapping[str, Any]] [source]
Find records in the MongoDB collection.
Examples
>>> eegdash.find({"dataset": "ds002718", "subject": {"$in": ["012", "013"]}})  # pre-built query
>>> eegdash.find(dataset="ds002718", subject="012")  # keyword filters
>>> eegdash.find(dataset="ds002718", subject=["012", "013"])  # sequence -> $in
>>> eegdash.find({})  # fetch all (use with care)
>>> eegdash.find({"dataset": "ds002718"}, subject=["012", "013"])  # combine query + kwargs (AND)
- Parameters:
query (dict, optional) – Complete MongoDB query dictionary. This is a positional-only argument.
**kwargs – User-friendly field filters that are converted to a MongoDB query. Values can be scalars (e.g., "sub-01") or sequences (translated to $in queries).
- Returns:
DB records that match the query.
- Return type:
list of dict
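The keyword-filter translation described above can be approximated with a short sketch (a hypothetical helper, assuming only the documented behavior: scalars become equality matches, sequences become $in clauses):

```python
def filters_to_query(**filters):
    """Translate keyword filters into a MongoDB-style query dict.

    Scalars map to equality; lists/tuples/sets map to $in clauses,
    mirroring the **kwargs behavior documented for find().
    """
    query = {}
    for field, value in filters.items():
        if isinstance(value, (list, tuple, set)):
            query[field] = {"$in": list(value)}
        else:
            query[field] = value
    return query

print(filters_to_query(dataset="ds002718", subject=["012", "013"]))
# → {'dataset': 'ds002718', 'subject': {'$in': ['012', '013']}}
```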
- remove_field(record: dict, field: str) None [source]
Remove a field from a specific record in the MongoDB collection.
- Parameters:
record (dict) – Record-identifying object with a data_name key.
field (str) – The name of the field to remove.
- remove_field_from_db(field: str) None [source]
Remove a field from all records in the database.
Warning
This is a destructive operation and cannot be undone.
- Parameters:
field (str) – The name of the field to remove from all documents.
- update(record: dict) None [source]
Update a single record in the MongoDB collection.
- Parameters:
record (dict) – Record content to set at the matching data_name.
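The matching semantics can be sketched as follows. This is an assumption about the internals, not the library's actual code: it supposes a pymongo-style update_one with a $set on the document whose data_name matches; the helper and stub names are hypothetical.

```python
def update_record(collection, record):
    """Set record's fields on the document whose data_name matches.

    `collection` is any object exposing a pymongo-style update_one method.
    """
    collection.update_one(
        {"data_name": record["data_name"]},  # match on the identifying key
        {"$set": record},                    # set the provided fields
    )

class FakeCollection:
    """Minimal stand-in that records update_one calls for inspection."""
    def __init__(self):
        self.calls = []
    def update_one(self, filt, update):
        self.calls.append((filt, update))

coll = FakeCollection()
update_record(coll, {"data_name": "rec-1", "task": "RestingState"})
```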
- class eegdash.api.EEGDashDataset(cache_dir: str | Path, query: dict[str, Any] = None, description_fields: list[str] = ['subject', 'session', 'run', 'task', 'age', 'gender', 'sex'], s3_bucket: str | None = None, records: list[dict] | None = None, download: bool = True, n_jobs: int = -1, eeg_dash_instance: EEGDash | None = None, **kwargs)[source]
Bases: BaseConcatDataset
Create a new EEGDashDataset from a given query, or from a local BIDS dataset directory and dataset name. An EEGDashDataset is a pooled collection of EEGDashBaseDataset instances (individual recordings) and is a subclass of braindecode’s BaseConcatDataset.
- Parameters:
cache_dir (str | Path) – Directory where data are cached locally.
query (dict | None) – Raw MongoDB query to filter records. If provided, it is merged with keyword filtering arguments (see **kwargs) using logical AND. You must provide at least a dataset (either in query or as a keyword argument). Only fields in ALLOWED_QUERY_FIELDS are considered for filtering.
description_fields (list[str]) – Fields to extract from each record and include in dataset descriptions (e.g., “subject”, “session”, “run”, “task”).
s3_bucket (str | None) – Optional S3 bucket URI (e.g., “s3://mybucket”) to use instead of the default OpenNeuro bucket when downloading data files.
records (list[dict] | None) – Pre-fetched metadata records. If provided, the dataset is constructed directly from these records and no MongoDB query is performed.
download (bool, default True) – If False, load from local BIDS files only. Local data are expected under cache_dir / dataset; no DB or S3 access is attempted.
n_jobs (int) – Number of parallel jobs to use where applicable (-1 uses all cores).
eeg_dash_instance (EEGDash | None) – Optional existing EEGDash client to reuse for DB queries. If None, a new client is created on demand; no client is used when download=False.
**kwargs (dict) –
Additional keyword arguments serving two purposes:
Filtering: any keys present in ALLOWED_QUERY_FIELDS are treated as query filters (e.g., dataset, subject, task, …).
Dataset options: remaining keys are forwarded to EEGDashBaseDataset.
Examples
Basic usage with dataset and subject filtering:
>>> from eegdash import EEGDashDataset
>>> dataset = EEGDashDataset(
...     cache_dir="./data",
...     dataset="ds002718",
...     subject="012"
... )
>>> print(f"Number of recordings: {len(dataset)}")
Filter by multiple subjects and specific task:
>>> subjects = ["012", "013", "014"]
>>> dataset = EEGDashDataset(
...     cache_dir="./data",
...     dataset="ds002718",
...     subject=subjects,
...     task="RestingState"
... )
Load and inspect EEG data from recordings:
>>> if len(dataset) > 0:
...     recording = dataset[0]
...     raw = recording.load()
...     print(f"Sampling rate: {raw.info['sfreq']} Hz")
...     print(f"Number of channels: {len(raw.ch_names)}")
...     print(f"Duration: {raw.times[-1]:.1f} seconds")
Advanced filtering with raw MongoDB queries:
>>> from eegdash import EEGDashDataset
>>> query = {
...     "dataset": "ds002718",
...     "subject": {"$in": ["012", "013"]},
...     "task": "RestingState"
... }
>>> dataset = EEGDashDataset(cache_dir="./data", query=query)
Working with dataset collections and braindecode integration:
>>> # EEGDashDataset is a braindecode BaseConcatDataset
>>> for i, recording in enumerate(dataset):
...     if i >= 2:  # limit output
...         break
...     print(f"Recording {i}: {recording.description}")
...     raw = recording.load()
...     print(f"  Channels: {len(raw.ch_names)}, Duration: {raw.times[-1]:.1f}s")