from typing import Iterable
from pandas import read_csv, read_parquet, to_numeric
from RIAssigner.utils import define_separator, get_first_common_element
from RIAssigner.utils import clean_column_names
from .Data import Data
[docs]
class PandasData(Data):
"""Class to handle data from filetypes which can be imported into a pandas dataframe."""
[docs]
_carbon_number_column_names = set(["carbon_number"])
def __init__(self, filename: str, filetype: str, rt_unit: str):
    """Initialise the base class state and load the file content right away.

    Args:
        filename (str): Forwarded unchanged to the base class initialiser.
        filetype (str): Forwarded unchanged to the base class initialiser.
        rt_unit (str): Forwarded unchanged to the base class initialiser.
    """
    super().__init__(filename, filetype, rt_unit)
    # Loading happens eagerly so the object is usable as soon as it exists.
    self._read()
[docs]
def _read(self):
    """Load the file and run every preparation step on the resulting dataframe."""
    # The order matters: column lookups need the dataframe, the RI column
    # setup needs the column lookups, and the clean-up steps run last.
    pipeline = (
        self._read_into_dataframe,
        self._init_carbon_number_index,
        self._init_rt_column_info,
        self._init_ri_column_info,
        self._init_ri_indices,
        self._sort_by_rt,
        self._replace_nans_with_0s,
    )
    for step in pipeline:
        step()
[docs]
def _read_into_dataframe(self) -> None:
"""Read the data from file into dataframe."""
if self._filetype in ["csv", "tsv", "tabular"]:
self._data = read_csv(self._filename, sep=None, engine="python")
elif self._filetype == "parquet":
self._data = read_parquet(self._filename)
else:
raise NotImplementedError("File formats different from ['csv', 'tsv', 'tabular', 'parquet'] are not implemented yet.")
self._data.columns = clean_column_names(self._data.columns)
[docs]
def write(self, filename: str) -> None:
    """Store the dataframe on disk; the format is chosen from the file extension.

    Args:
        filename (str): Target path ending in '.csv', '.tsv', '.tabular' or '.parquet'.

    Raises:
        ValueError: If the extension is none of the supported ones.
    """
    if filename.endswith(".parquet"):
        self._data.to_parquet(filename, index=False)
        return

    if not filename.endswith((".csv", ".tsv", ".tabular")):
        raise ValueError("File extension must be 'csv', 'tsv', 'tabular', or 'parquet'.")

    # The delimiter for text formats is derived from the file name.
    delimiter = define_separator(filename)
    self._data.to_csv(filename, index=False, sep=delimiter)
[docs]
def _init_carbon_number_index(self) -> None:
    """Look up the carbon number column among the dataframe columns and remember its key."""
    available = self._data.columns
    candidates = self._carbon_number_column_names
    self._carbon_number_index = get_first_common_element(available, candidates)
[docs]
def _init_rt_column_info(self) -> None:
    """Store the key of the retention time column and its position among the columns.

    The position is None when no retention time key was found.
    """
    rt_key = get_first_common_element(self._data.columns, Data.get_possible_rt_keys())
    self._rt_index = rt_key
    self._rt_position = None if rt_key is None else self._data.columns.tolist().index(rt_key)
[docs]
def _init_ri_column_info(self) -> None:
    """Store the key and position of the retention index column.

    When the dataframe has no such column, the key falls back to
    "retention_index" and the position is left as None.
    """
    columns = self._data.columns
    self._ri_index = get_first_common_element(columns, Data.get_possible_ri_keys())
    if self._ri_index in columns:
        self._ri_position = columns.get_loc(self._ri_index)
        return

    # No existing column: use the default name, position is not known yet.
    self._ri_index = "retention_index"
    self._ri_position = None
[docs]
def _init_ri_indices(self) -> None:
"""Initialize retention indices to a factor of 100 of carbon numbers or None if carbon numbers are not present."""
if self._carbon_number_index is not None:
self._data[self._ri_index] = self._data[self._carbon_number_index] * 100
elif self._ri_position is None:
self._ri_position = self._rt_position + 1
self._data.insert(loc=self._ri_position, column=self._ri_index, value=None)
[docs]
def _sort_by_rt(self) -> None:
"""Sort peaks by their retention times."""
if self._rt_index is not None:
self._data.sort_values(by=self._rt_index, axis=0, inplace=True)
[docs]
def _replace_nans_with_0s(self) -> None:
"""Replace NaN values (including blank strings and invalid values) with 0s."""
if self._rt_index is not None:
self._data[self._rt_index] = to_numeric(self._data[self._rt_index], errors="coerce").fillna(0)
if self._ri_index is not None:
self._data[self._ri_index] = to_numeric(self._data[self._ri_index], errors="coerce").fillna(0)
[docs]
def __eq__(self, o: object) -> bool:
    """Comparison operator `==`.

    Two objects are equal when their dataframes are equal, their retention
    times (converted to seconds) match element-wise and their retention
    indices match element-wise.

    Args:
        o (object): Object to compare with.

    Returns:
        bool: State of equality.
    """
    if not isinstance(o, PandasData):
        return False
    other: PandasData = o

    # Check the frames first: `DataFrame.equals` copes with different shapes
    # and labels, whereas the element-wise comparisons below can raise on
    # them. Once the frames are equal, shapes and labels are known to match.
    if not self._data.equals(other._data):
        return False

    if not (self.retention_times == other.retention_times).all():
        return False

    try:
        indices_equal = (self.retention_indices == other.retention_indices).all()
    except KeyError:
        # Best effort, kept from the original: a failed retention index
        # lookup does not make the objects unequal.
        indices_equal = True

    # Return a plain bool rather than a numpy boolean.
    return bool(indices_equal)
@property
[docs]
def retention_times(self) -> Iterable[Data.RetentionTimeType]:
    """Return the retention time column as a quantity converted to seconds."""
    raw = self._data[self._rt_index].to_numpy()
    # Attach the unit stored on the object before converting.
    quantity = raw * self._unit
    return quantity.to("seconds")
@property
[docs]
def retention_indices(self) -> Iterable[Data.RetentionIndexType]:
    """Return retention indices: the stored column, or values derived from carbon numbers when such a column exists."""
    if self._carbon_number_index is None:
        return self._data[self._ri_index]
    return self._ri_from_carbon_numbers()
[docs]
def _ri_from_carbon_numbers(self) -> Iterable[int]:
"""Returns the RI of compound based on carbon number."""
return self._data[self._carbon_number_index] * 100
@retention_indices.setter
def retention_indices(self, values: Iterable[int]):
    """Store new retention indices in the retention index column.

    Args:
        values (Iterable[int]): Values to assign.
    """
    column = self._ri_index
    self._data[column] = values
@property