Source code for sklearn_nominal.backend.pandas

from __future__ import annotations

import threading
from typing import Generator, Iterable

import numpy as np
import pandas as pd

from . import ColumnID
from .conditions import (
    AndCondition,
    Condition,
    NotCondition,
    RangeCondition,
    TrueCondition,
    ValueCondition,
)
from .core import ColumnType, Dataset

...


[docs] class PandasDataset(Dataset): def __init__(self, x: pd.DataFrame, y: np.ndarray, idx=None): super().__init__() self._x: pd.DataFrame = x self._y: np.ndarray = y self.idx = idx # already filtered if idx is None self._x_subset = idx is None self._y_subset = self._x_subset self._lock = threading.Lock() @property def x(self) -> pd.DataFrame: if not self._x_subset: with self._lock: if not self._x_subset: self._x = self._x.loc[self.idx] self._x_subset = True if self._y_subset: # if filtered both, free idx self.idx = None return self._x @property def y(self) -> pd.ndarray: if not self._y_subset: with self._lock: if not self._y_subset: self._y = self._y[self.idx] self._y_subset = True if self._x_subset: # if filtered both, free idx self.idx = None return self._y def split(self, conditions: list[Condition]): return [self.filter(c) for c in conditions] def values(self, column: ColumnID): result: pd.Series = self.x[column].dropna() return result def unique_values(self, column: ColumnID, sorted=False) -> np.ndarray: result = self.values(column).unique() if sorted: result.sort() return result def indices(self, condition: Condition): if isinstance(condition, RangeCondition): rc: RangeCondition = condition if rc.less: idx = self.x[rc.column] <= rc.value else: idx = self.x[rc.column] > rc.value idx.fillna(False, inplace=True) return idx elif isinstance(condition, ValueCondition): vc: ValueCondition = condition idx = self.x[vc.column] == vc.value idx.fillna(False, inplace=True) return idx elif isinstance(condition, TrueCondition): return None elif isinstance(condition, NotCondition): return ~self.indices(condition.condition) elif isinstance(condition, AndCondition): idx = None for c in condition.conditions: c_idx = self.indices(c) if idx is None: idx = c_idx else: idx = idx & c_idx return idx else: raise ValueError(f"Invalid condition: {condition}") def filter(self, condition: Condition): idx = self.indices(condition) return PandasDataset(self.x, self.y, idx=idx) @property def n(self): return self.y.shape[0] @property def types_dict(self) -> dict[ColumnID, ColumnType]: return dict(zip(self.columns, self.types)) @property def types(self) -> list[ColumnType]: numeric = self.x.select_dtypes(include="number").columns def to_type(column): if column in numeric: return ColumnType.Numeric else: return ColumnType.Nominal return list(map(to_type, self.columns)) @property def columns(self) -> list[ColumnID]: return self.x.columns def drop(self, columns: list[ColumnID]) -> PandasDataset: x = self.x.drop(columns=columns) return PandasDataset(x, self.y) def classes(self): values = np.unique(self.y) values.sort() return values def filter_by_class(self, c) -> Dataset: idx = self.y == c idx = np.nan_to_num(idx, nan=False) # idx.fillna(False, inplace=True) return PandasDataset(self.x, self.y, idx) def class_distribution(self, class_weight: np.ndarray) -> np.ndarray: classes = len(class_weight) p: np.ndarray = np.bincount(self.y, minlength=classes) result = p result = result * class_weight result /= result.sum() return result def mean_y( self, ) -> np.ndarray: return self.y.mean(axis=0) def std_y( self, ) -> float: if self.y.shape[0] == 0: return np.inf return np.sum(np.std(self.y, axis=0)) def mean_x(self, col: ColumnID) -> float: return self.x[col].mean() def std_x(self, col: ColumnID, ddof=1) -> float: return self.x[col].std(ddof=ddof) def count_class(self, klass: int) -> int: return np.sum(self.y == klass)