Skip to content

Processing Module Reference

This section provides a detailed API reference for all modules related to processing datasets.

Binarize

Bases: Processor

A class for binarizing rating values in a dataset based on a given threshold.

This class processes a dataset wrapped in a DataRec object and modifies the rating column based on the specified threshold. If implicit is set to True, rows with ratings below the threshold are removed, and the rating column is dropped. Otherwise, ratings are binarized to either over_threshold or under_threshold values.

Source code in datarec/processing/binarizer.py
class Binarize(Processor):

    """
    Processor that turns the rating column of a DataRec dataset into a binary signal.

    Every rating is compared against a threshold. In implicit mode only the rows whose
    rating reaches the threshold are kept and the rating column is removed. Otherwise each
    rating is replaced by `over_threshold` (rating >= threshold) or `under_threshold`
    (any other rating).
    """

    def __init__(self, threshold: float, implicit: bool = False,
                 over_threshold: float = 1, under_threshold: float = 0):
        """
        Stores the binarization settings.

        Args:
            threshold (float): Rating value that separates the two classes.
            implicit (bool): If True, rows below the threshold are removed and the
                rating column is dropped instead of being rewritten.
            over_threshold (int, float): Replacement for ratings equal to or above the threshold.
            under_threshold (int, float): Replacement for ratings below the threshold.
        """
        # Constructor arguments, kept so that `run` can report them in the step info.
        self.params = {
            'threshold': threshold,
            'implicit': implicit,
            'over_threshold': over_threshold,
            'under_threshold': under_threshold,
        }

        self._implicit = implicit
        self._threshold = threshold
        self._under_threshold = under_threshold
        self._over_threshold = over_threshold

    def run(self, datarec: DataRec) -> DataRec:

        """
        Applies the binarization to a copy of the data held by `datarec`.

        Implicit mode keeps only the rows with rating >= threshold and drops the
        rating column. Explicit mode keeps every row and overwrites the rating with
        `over_threshold` or `under_threshold`.

        Args:
            datarec (DataRec): The input dataset wrapped in a DataRec object.

        Returns:
            (DataRec): The value produced by `self.output` for the processed data.
        """

        frame = datarec.data.copy()
        rating_col = datarec.rating_col

        # Computed before the column is touched: it is needed after the overwrite below.
        reaches_threshold = frame[rating_col] >= self._threshold

        if not self._implicit:
            frame[rating_col] = self._over_threshold
            frame.loc[~reaches_threshold, rating_col] = self._under_threshold
        else:
            kept_rows = frame[reaches_threshold].copy()
            frame = kept_rows.drop(columns=[rating_col])

        step_info = {'operation': self.__class__.__name__, 'params': self.params}
        return self.output(datarec, frame, step_info=step_info)

    @property
    def binary_threshold(self) -> float:
        """
        The rating threshold that separates positive interactions from the rest.
        """
        return self._threshold

    @property
    def over_threshold(self) -> float:
        """
        The value written for ratings at or above the threshold.
        """
        return self._over_threshold

    @property
    def under_threshold(self) -> float:
        """
        The value written for ratings below the threshold.
        """
        return self._under_threshold

binary_threshold property

Returns the rating threshold used to distinguish positive interactions.

over_threshold property

Returns the value assigned to ratings at or above the threshold.

under_threshold property

Returns the value assigned to ratings below the threshold.

__init__(threshold, implicit=False, over_threshold=1, under_threshold=0)

Initializes the Binarize object.

Parameters:

Name Type Description Default
threshold float

The threshold for binarization.

required
implicit bool

If True, removes rows below the threshold and drops the rating column.

False
over_threshold (int, float)

The value assigned to ratings equal to or above the threshold.

1
under_threshold (int, float)

The value assigned to ratings below the threshold.

0
Source code in datarec/processing/binarizer.py
def __init__(self, threshold: float, implicit: bool = False,
             over_threshold: float = 1, under_threshold: float = 0):
    """
    Stores the binarization settings.

    Args:
        threshold (float): Rating value that separates the two classes.
        implicit (bool): If True, rows below the threshold are removed and the
            rating column is dropped instead of being rewritten.
        over_threshold (int, float): Replacement for ratings equal to or above the threshold.
        under_threshold (int, float): Replacement for ratings below the threshold.
    """
    # Constructor arguments, kept under their parameter names.
    self.params = {
        'threshold': threshold,
        'implicit': implicit,
        'over_threshold': over_threshold,
        'under_threshold': under_threshold,
    }

    self._implicit = implicit
    self._threshold = threshold
    self._under_threshold = under_threshold
    self._over_threshold = over_threshold

run(datarec)

Binarizes the rating values in the given dataset based on a threshold.

If implicit is True, removes rows where the rating is below the threshold and drops the rating column. If implicit is False, replaces the rating values with binary values (over_threshold if >= threshold, under_threshold otherwise).

Parameters:

Name Type Description Default
datarec DataRec

The input dataset wrapped in a DataRec object.

required

Returns:

Type Description
DataRec

A new DataRec object with the processed dataset.

Source code in datarec/processing/binarizer.py
def run(self, datarec: DataRec) -> DataRec:

    """
    Applies the binarization to a copy of the data held by `datarec`.

    Implicit mode keeps only the rows with rating >= threshold and drops the
    rating column. Explicit mode keeps every row and overwrites the rating with
    `over_threshold` or `under_threshold`.

    Args:
        datarec (DataRec): The input dataset wrapped in a DataRec object.

    Returns:
        (DataRec): The value produced by `self.output` for the processed data.
    """

    frame = datarec.data.copy()
    rating_col = datarec.rating_col

    # Computed before the column is touched: it is needed after the overwrite below.
    reaches_threshold = frame[rating_col] >= self._threshold

    if not self._implicit:
        frame[rating_col] = self._over_threshold
        frame.loc[~reaches_threshold, rating_col] = self._under_threshold
    else:
        kept_rows = frame[reaches_threshold].copy()
        frame = kept_rows.drop(columns=[rating_col])

    step_info = {'operation': self.__class__.__name__, 'params': self.params}
    return self.output(datarec, frame, step_info=step_info)

ColdFilter

Bases: Processor

A filtering class that retains only cold users or cold items, i.e., those with at most `interactions` interactions in the original DataRec dataset.

Source code in datarec/processing/cold.py
class ColdFilter(Processor):
    """
    Processor that keeps only the "cold" side of a dataset: the users (or items) whose
    number of interactions in the input DataRec does not exceed `interactions`.
    """

    def __init__(self, interactions: int, mode: str = "user"):
        """
        Validates and stores the filter configuration.

        Args:
            interactions (int): Upper bound (inclusive) on the number of interactions
                a user or item may have in order to be retained.
            mode (str): "user" to select cold users, "item" to select cold items.

        Raises:
            TypeError: If `interactions` is not an integer.
            ValueError: If `mode` is not "user" or "item".
        """
        type_ok = isinstance(interactions, int)
        if not type_ok:
            raise TypeError('Interactions must be an integer.')

        if mode not in {"user", "item"}:
            raise ValueError('Mode must be "user" or "item".')

        # Constructor arguments, kept so that `run` can report them in the step info.
        self.params = {'interactions': interactions, 'mode': mode}

        self.mode = mode
        self.interactions = interactions

    def run(self, datarec: DataRec) -> DataRec:
        """
        Keeps the rows belonging to users (or items, depending on `self.mode`) that
        appear in at most `self.interactions` rows, and renumbers the index from zero.

        Args:
            datarec (DataRec): The input dataset wrapped in a DataRec object.

        Returns:
            (DataRec): The value produced by `self.output` for the filtered data.
        """

        frame = datarec.data.copy()

        if self.mode == "user":
            key = datarec.user_col
        else:
            key = datarec.item_col

        def is_cold(group):
            # A group is one user's (or item's) rows; its length is the interaction count.
            return len(group) <= self.interactions

        cold_rows = frame.groupby(key).filter(is_cold)
        cold_rows = cold_rows.reset_index(drop=True)

        step_info = {'operation': self.__class__.__name__, 'params': self.params}
        return self.output(datarec, cold_rows, step_info)

__init__(interactions, mode='user')

Initializes the ColdFilter object.

Parameters:

Name Type Description Default
interactions int

The maximum number of interactions a user or item can have to be retained.

required
mode str

Filtering mode, either "user" for cold users or "item" for cold items.

'user'

Raises:

Type Description
TypeError

If interactions is not an integer.

ValueError

If mode is not "user" or "item".

Source code in datarec/processing/cold.py
def __init__(self, interactions: int, mode: str = "user"):
    """
    Validates and stores the filter configuration.

    Args:
        interactions (int): Upper bound (inclusive) on the number of interactions
            a user or item may have in order to be retained.
        mode (str): "user" to select cold users, "item" to select cold items.

    Raises:
        TypeError: If `interactions` is not an integer.
        ValueError: If `mode` is not "user" or "item".
    """
    type_ok = isinstance(interactions, int)
    if not type_ok:
        raise TypeError('Interactions must be an integer.')

    if mode not in {"user", "item"}:
        raise ValueError('Mode must be "user" or "item".')

    # Constructor arguments, kept under their parameter names.
    self.params = {'interactions': interactions, 'mode': mode}

    self.mode = mode
    self.interactions = interactions

run(datarec)

Filters the dataset to keep only cold users or cold items with at most self.interactions interactions.

Parameters:

Name Type Description Default
datarec DataRec

The input dataset wrapped in a DataRec object.

required

Returns:

Type Description
DataRec

A new DataRec object containing only the filtered users or items.

Source code in datarec/processing/cold.py
def run(self, datarec: DataRec) -> DataRec:
    """
    Keeps the rows belonging to users (or items, depending on `self.mode`) that
    appear in at most `self.interactions` rows, and renumbers the index from zero.

    Args:
        datarec (DataRec): The input dataset wrapped in a DataRec object.

    Returns:
        (DataRec): The value produced by `self.output` for the filtered data.
    """

    frame = datarec.data.copy()

    if self.mode == "user":
        key = datarec.user_col
    else:
        key = datarec.item_col

    def is_cold(group):
        # A group is one user's (or item's) rows; its length is the interaction count.
        return len(group) <= self.interactions

    cold_rows = frame.groupby(key).filter(is_cold)
    cold_rows = cold_rows.reset_index(drop=True)

    step_info = {'operation': self.__class__.__name__, 'params': self.params}
    return self.output(datarec, cold_rows, step_info)

KCore

This class filters a dataset based on a minimum number of records (core) for each group defined by a specific column.

Source code in datarec/processing/kcore.py
class KCore:

    """
    Single-column k-core filter: groups a dataframe by one column and keeps only the
    groups that contain at least `core` rows.
    """

    def __init__(self, column: str, core: int):
        """
        Stores the grouping column and the minimum group size.

        Args:
            column (str): Name of the column whose values define the groups (e.g., user or item).
            core (int): Minimum number of rows a group needs in order to be kept.

        Raises:
            TypeError: If 'core' is not an integer.
        """

        if isinstance(core, int):
            self._column = column
            self._core = core
        else:
            raise TypeError('Core must be an integer.')

    def run(self, dataset: pd.DataFrame) -> pd.DataFrame:
        """
        Returns the rows of `dataset` whose group (by the configured column) has at
        least the configured number of rows. The input dataframe is not modified.

        Args:
            dataset (pd.DataFrame): The dataset to be filtered.

        Returns:
            (pd.DataFrame): A new dataframe containing only the surviving groups.

        Raises:
            ValueError: If the configured column is not in the dataset.

        """

        key = self._column
        if key not in dataset.columns:
            raise ValueError(f'Column "{self._column}" not in the dataset.')

        def reaches_core(group):
            # The group length is the number of records sharing one value of `key`.
            return len(group) >= self._core

        return dataset.copy().groupby([key]).filter(reaches_core)

__init__(column, core)

Initializes the KCore object.

Parameters:

Name Type Description Default
column str

The column name used to group the data (e.g., user or item).

required
core int

The minimum number of records required for each group to be kept.

required

Raises:

Type Description
TypeError

If 'core' is not an integer.

Source code in datarec/processing/kcore.py
def __init__(self, column: str, core: int):
    """
    Stores the grouping column and the minimum group size.

    Args:
        column (str): Name of the column whose values define the groups (e.g., user or item).
        core (int): Minimum number of rows a group needs in order to be kept.

    Raises:
        TypeError: If 'core' is not an integer.
    """

    if isinstance(core, int):
        self._column = column
        self._core = core
    else:
        raise TypeError('Core must be an integer.')

run(dataset)

Filters the dataset by keeping only groups with at least the specified number of records.

Parameters:

Name Type Description Default
dataset DataFrame

The dataset to be filtered.

required

Returns:

Type Description
DataFrame

A new dataframe with groups filtered by the core condition.

Raises:

Type Description
ValueError

If 'self._column' is not in the dataset.

Source code in datarec/processing/kcore.py
def run(self, dataset: pd.DataFrame) -> pd.DataFrame:
    """
    Returns the rows of `dataset` whose group (by the configured column) has at
    least the configured number of rows. The input dataframe is not modified.

    Args:
        dataset (pd.DataFrame): The dataset to be filtered.

    Returns:
        (pd.DataFrame): A new dataframe containing only the surviving groups.

    Raises:
        ValueError: If the configured column is not in the dataset.

    """

    key = self._column
    if key not in dataset.columns:
        raise ValueError(f'Column "{self._column}" not in the dataset.')

    def reaches_core(group):
        # The group length is the number of records sharing one value of `key`.
        return len(group) >= self._core

    return dataset.copy().groupby([key]).filter(reaches_core)

UserKCore

Bases: Processor

Filters a dataset based on a minimum number of records (core) for each user.

This class applies a KCore filter on the user column of the dataset.

Source code in datarec/processing/kcore.py
class UserKCore(Processor):
    """
    Processor that keeps only the users with at least `core` records.

    The work is delegated to a KCore filter configured on the user column of the dataset.
    """
    def __init__(self, core: int):
        """
        Validates and stores the minimum number of records per user.

        Args:
            core (int): Minimum number of records a user needs in order to be kept.

        Raises:
            TypeError: If 'core' is not an integer.
        """
        core_is_int = isinstance(core, int)
        if not core_is_int:
            raise TypeError('Core must be an integer.')

        # Constructor argument, kept so that `run` can report it in the step info.
        self.params = {'core': core}

        self.core = core

    def run(self, datarec: DataRec) -> DataRec:
        """
        Runs a KCore filter on the user column of `datarec.data` and hands the
        filtered dataframe to `self.output`.

        Args:
            datarec (DataRec): The DataRec object containing the dataset to be filtered.

        Returns:
            (DataRec): The value produced by `self.output` for the filtered data.

        """

        user_filter = KCore(column=datarec.user_col, core=self.core)
        filtered = user_filter.run(datarec.data)

        step_info = {'operation': self.__class__.__name__, 'params': self.params}
        return self.output(datarec, filtered, step_info)

__init__(core)

Initializes the UserKCore object.

Parameters:

Name Type Description Default
core int

The minimum number of records required for each user to be kept.

required

Raises:

Type Description
TypeError

If 'core' is not an integer.

Source code in datarec/processing/kcore.py
def __init__(self, core: int):
    """
    Validates and stores the minimum number of records per user.

    Args:
        core (int): Minimum number of records a user needs in order to be kept.

    Raises:
        TypeError: If 'core' is not an integer.
    """
    core_is_int = isinstance(core, int)
    if not core_is_int:
        raise TypeError('Core must be an integer.')

    # Constructor argument, kept under its parameter name.
    self.params = {'core': core}

    self.core = core

run(datarec)

Filters the dataset by user, applying the KCore filter, and returns a new DataRec object containing the filtered data.

Parameters:

Name Type Description Default
datarec DataRec

The DataRec object containing the dataset to be filtered.

required

Returns:

Type Description
DataRec

A new DataRec object with the filtered data.

Source code in datarec/processing/kcore.py
def run(self, datarec: DataRec) -> DataRec:
    """
    Runs a KCore filter on the user column of `datarec.data` and hands the
    filtered dataframe to `self.output`.

    Args:
        datarec (DataRec): The DataRec object containing the dataset to be filtered.

    Returns:
        (DataRec): The value produced by `self.output` for the filtered data.

    """

    user_filter = KCore(column=datarec.user_col, core=self.core)
    filtered = user_filter.run(datarec.data)

    step_info = {'operation': self.__class__.__name__, 'params': self.params}
    return self.output(datarec, filtered, step_info)

ItemKCore

Bases: Processor

Filters a dataset based on a minimum number of records (core) for each item.

This class applies a KCore filter on the item column of the dataset.

Source code in datarec/processing/kcore.py
class ItemKCore(Processor):
    """
    Processor that keeps only the items with at least `core` records.

    The work is delegated to a KCore filter configured on the item column of the dataset.
    """
    def __init__(self, core: int):
        """
        Validates and stores the minimum number of records per item.

        Args:
            core (int): Minimum number of records an item needs in order to be kept.

        Raises:
            TypeError: If "core" is not an integer.
        """

        core_is_int = isinstance(core, int)
        if not core_is_int:
            raise TypeError('Core must be an integer.')

        # Constructor argument, kept so that `run` can report it in the step info.
        self.params = {'core': core}

        self.core = core

    def run(self, datarec: DataRec) -> DataRec:
        """
        Runs a KCore filter on the item column of `datarec.data` and hands the
        filtered dataframe to `self.output`.

        Args:
            datarec (DataRec): The DataRec object containing the dataset to be filtered.

        Returns:
            (DataRec): The value produced by `self.output` for the filtered data.
        """

        item_filter = KCore(column=datarec.item_col, core=self.core)
        filtered = item_filter.run(datarec.data)

        step_info = {'operation': self.__class__.__name__, 'params': self.params}
        return self.output(datarec, filtered, step_info)

__init__(core)

Initializes the ItemKCore object.

Parameters:

Name Type Description Default
core int

The minimum number of records required for each item to be kept.

required

Raises:

Type Description
TypeError

If "core" is not an integer.

Source code in datarec/processing/kcore.py
def __init__(self, core: int):
    """
    Validates and stores the minimum number of records per item.

    Args:
        core (int): Minimum number of records an item needs in order to be kept.

    Raises:
        TypeError: If "core" is not an integer.
    """

    core_is_int = isinstance(core, int)
    if not core_is_int:
        raise TypeError('Core must be an integer.')

    # Constructor argument, kept under its parameter name.
    self.params = {'core': core}

    self.core = core

run(datarec)

Filters the dataset by item, applying the KCore filter, and returns a new DataRec object containing the filtered data.

Parameters:

Name Type Description Default
datarec DataRec

The DataRec object containing the dataset to be filtered.

required

Returns:

Type Description
DataRec

A new DataRec object with the filtered data.

Source code in datarec/processing/kcore.py
def run(self, datarec: DataRec) -> DataRec:
    """
    Runs a KCore filter on the item column of `datarec.data` and hands the
    filtered dataframe to `self.output`.

    Args:
        datarec (DataRec): The DataRec object containing the dataset to be filtered.

    Returns:
        (DataRec): The value produced by `self.output` for the filtered data.
    """

    item_filter = KCore(column=datarec.item_col, core=self.core)
    filtered = item_filter.run(datarec.data)

    step_info = {'operation': self.__class__.__name__, 'params': self.params}
    return self.output(datarec, filtered, step_info)

IterativeKCore

Iteratively filters a dataset based on a set of columns and minimum core values.

This class applies KCore filters to multiple columns and iteratively removes groups that do not meet the core requirement until no further changes occur.

Source code in datarec/processing/kcore.py
class IterativeKCore:
    """
    Iteratively filters a dataset based on a set of columns and minimum core values.

    One KCore filter is built per column. The filters are applied one after another,
    pass after pass, until a complete pass removes no rows.
    """
    def __init__(self, columns: list, cores: Union[int, list]):
        """
        Initializes the IterativeKCore object.

        Args:
            columns (list): A list of column names to apply the KCore filter on.
            cores (list of int or int): The minimum number of records required for each
                column. A single integer is applied to every column; a list is paired
                with `columns` position by position.

        Raises:
            TypeError: If 'cores' is not a list or an integer.
            ValueError: If 'cores' is a list whose length differs from 'columns'.
        """

        self._columns = columns

        if isinstance(cores, list):
            # zip() stops at the shorter input, so a length mismatch would silently
            # leave some columns unfiltered (or ignore some cores). Reject it instead.
            if len(cores) != len(columns):
                raise ValueError(
                    f'Cores must have one value per column: got {len(cores)} cores '
                    f'for {len(columns)} columns.')
            self._cores = list(zip(columns, cores))
        elif isinstance(cores, int):
            self._cores = [(c, cores) for c in columns]
        else:
            raise TypeError('Cores must be a list or an integer.')

    def run(self, dataset: pd.DataFrame) -> pd.DataFrame:
        """
        Iteratively applies the KCore filters on the dataset until no changes occur, then returns the filtered dataset.

        Args:
            dataset (pd.DataFrame): The dataset to be iteratively filtered.

        Returns:
            (pd.DataFrame): The filtered dataset after all iterations. The input
                dataframe is copied first and is not modified.
        """

        data = dataset.copy()

        filters = {c: KCore(column=c, core=k) for c, k in self._cores}

        converged = False
        while not converged:
            # A pass counts as converged only if none of the filters removed a row.
            converged = True
            for kcore in filters.values():
                size_before = len(data)
                data = kcore.run(data)
                if len(data) != size_before:
                    converged = False

        return data

__init__(columns, cores)

Initializes the IterativeKCore object.

Parameters:

Name Type Description Default
columns list

A list of column names to apply the KCore filter on.

required
cores list of int or int

The minimum number of records required for each column to be kept.

required

Raises:

Type Description
TypeError

If 'cores' is not a list or an integer.

Source code in datarec/processing/kcore.py
def __init__(self, columns: list, cores: Union[int, list]):
    """
    Initializes the IterativeKCore object.

    Args:
        columns (list): A list of column names to apply the KCore filter on.
        cores (list of int or int): The minimum number of records required for each
            column. A single integer is applied to every column; a list is paired
            with `columns` position by position.

    Raises:
        TypeError: If 'cores' is not a list or an integer.
        ValueError: If 'cores' is a list whose length differs from 'columns'.
    """

    self._columns = columns

    if isinstance(cores, list):
        # zip() stops at the shorter input, so a length mismatch would silently
        # leave some columns unfiltered (or ignore some cores). Reject it instead.
        if len(cores) != len(columns):
            raise ValueError(
                f'Cores must have one value per column: got {len(cores)} cores '
                f'for {len(columns)} columns.')
        self._cores = list(zip(columns, cores))
    elif isinstance(cores, int):
        self._cores = [(c, cores) for c in columns]
    else:
        raise TypeError('Cores must be a list or an integer.')

run(dataset)

Iteratively applies the KCore filters on the dataset until no changes occur, then returns the filtered dataset.

Parameters:

Name Type Description Default
dataset DataFrame

The dataset to be iteratively filtered.

required

Returns:

Type Description
DataFrame

The filtered dataset after all iterations.

Source code in datarec/processing/kcore.py
def run(self, dataset: pd.DataFrame) -> pd.DataFrame:
    """
    Applies the per-column KCore filters in passes, repeating until a complete pass
    removes no rows, and returns the resulting dataframe.

    Args:
        dataset (pd.DataFrame): The dataset to be iteratively filtered.

    Returns:
        (pd.DataFrame): The filtered dataset after all iterations. The input
            dataframe is copied first and is not modified.
    """

    data = dataset.copy()

    filters = {c: KCore(column=c, core=k) for c, k in self._cores}

    converged = False
    while not converged:
        # A pass counts as converged only if none of the filters removed a row.
        converged = True
        for kcore in filters.values():
            size_before = len(data)
            data = kcore.run(data)
            if len(data) != size_before:
                converged = False

    return data

UserItemIterativeKCore

Bases: Processor

Iteratively filters a dataset based on both user and item columns with specified core values.

This class applies the IterativeKCore filter to both the user and item columns of the dataset.

Source code in datarec/processing/kcore.py
class UserItemIterativeKCore(Processor):

    """
    Processor that runs an iterative k-core filter over the user and item columns.

    The work is delegated to IterativeKCore, configured with the dataset's user and
    item columns (in that order) and the stored core values.
    """
    def __init__(self, cores: Union[int, list]):
        """
        Validates and stores the core values.

        Args:
            cores (list or int): Core value(s) for the user and item columns, passed
                unchanged to IterativeKCore when `run` is called.

        Raises:
            TypeError: If "cores" is not a list or an integer.
        """

        accepted_types = (list, int)
        if not isinstance(cores, accepted_types):
            raise TypeError('Cores must be a list or an integer.')

        # Constructor argument, kept so that `run` can report it in the step info.
        self.params = {'cores': cores}

        self._cores = cores

    def run(self, datarec: DataRec) -> DataRec:
        """
        Runs IterativeKCore on `datarec.data` over the user and item columns and hands
        the filtered dataframe to `self.output`.

        Args:
            datarec (DataRec): The DataRec object containing the dataset to be filtered.

        Returns:
            (DataRec): The value produced by `self.output` for the filtered data.
        """

        target_columns = [datarec.user_col, datarec.item_col]
        iterative_filter = IterativeKCore(columns=target_columns, cores=self._cores)
        filtered = iterative_filter.run(datarec.data)

        step_info = {'operation': self.__class__.__name__, 'params': self.params}
        return self.output(datarec, filtered, step_info)

__init__(cores)

Initializes the UserItemIterativeKCore object.

Parameters:

Name Type Description Default
cores list or int

A list of core values for the user and item columns.

required

Raises:

Type Description
TypeError

If "cores" is not a list or an integer.

Source code in datarec/processing/kcore.py
def __init__(self, cores: Union[int, list]):
    """
    Validates and stores the core values.

    Args:
        cores (list or int): Core value(s) for the user and item columns.

    Raises:
        TypeError: If "cores" is not a list or an integer.
    """

    accepted_types = (list, int)
    if not isinstance(cores, accepted_types):
        raise TypeError('Cores must be a list or an integer.')

    # Constructor argument, kept under its parameter name.
    self.params = {'cores': cores}

    self._cores = cores

run(datarec)

Applies the iterative KCore filter to both user and item columns, and returns a new DataRec object containing the filtered data.

Parameters:

Name Type Description Default
datarec DataRec

The DataRec object containing the dataset to be filtered.

required

Returns:

Type Description
DataRec

A new DataRec object with the filtered data.

Source code in datarec/processing/kcore.py
def run(self, datarec: DataRec) -> DataRec:
    """
    Runs IterativeKCore on `datarec.data` over the user and item columns and hands
    the filtered dataframe to `self.output`.

    Args:
        datarec (DataRec): The DataRec object containing the dataset to be filtered.

    Returns:
        (DataRec): The value produced by `self.output` for the filtered data.
    """

    target_columns = [datarec.user_col, datarec.item_col]
    iterative_filter = IterativeKCore(columns=target_columns, cores=self._cores)
    filtered = iterative_filter.run(datarec.data)

    step_info = {'operation': self.__class__.__name__, 'params': self.params}
    return self.output(datarec, filtered, step_info)

NRoundsKCore

Filters a dataset based on a minimum number of records (core) for each column over multiple rounds.

This class applies KCore filters iteratively over a specified number of rounds.

Source code in datarec/processing/kcore.py
class NRoundsKCore:
    """
    Filters a dataset based on a minimum number of records (core) for each column over multiple rounds.

    One KCore filter is built per column; each round applies all of them in sequence.
    At most `rounds` rounds are performed, and filtering stops earlier once a round
    removes no rows.
    """

    def __init__(self, columns: list, cores: Union[int, list], rounds: int):
        """
        Initializes the NRoundsKCore object.

        Args:
            columns (list): A list of column names to apply the KCore filter on.
            cores (list of int or int): The minimum number of records required for each
                column. A single integer is applied to every column; a list is paired
                with `columns` position by position.
            rounds (int): The maximum number of rounds to apply the filtering process.
                Zero or a negative value means no filtering is performed.

        Raises:
            TypeError: If 'cores' is not a list or an integer.
            ValueError: If 'cores' is a list whose length differs from 'columns'.
            TypeError: If 'rounds' is not an integer.
        """

        self._columns = columns

        if isinstance(cores, list):
            # zip() stops at the shorter input, so a length mismatch would silently
            # leave some columns unfiltered (or ignore some cores). Reject it instead.
            if len(cores) != len(columns):
                raise ValueError(
                    f'Cores must have one value per column: got {len(cores)} cores '
                    f'for {len(columns)} columns.')
            self._cores = list(zip(columns, cores))
        elif isinstance(cores, int):
            self._cores = [(c, cores) for c in columns]
        else:
            raise TypeError('Cores must be a list or an integer.')

        if not isinstance(rounds, int):
            raise TypeError('Rounds must be an integer.')

        self._rounds = rounds

    def run(self, dataset: pd.DataFrame) -> pd.DataFrame:
        """
        Applies the KCore filters for up to the configured number of rounds and returns the filtered dataset.

        Args:
            dataset (pd.DataFrame): The dataset to be filtered.

        Returns:
            (pd.DataFrame): The dataset after filtering. The input dataframe is copied
                first and is not modified.
        """

        data = dataset.copy()

        filters = {c: KCore(column=c, core=k) for c, k in self._cores}

        # The previous `range(rounds) or all(checks)` iterated over a bool when
        # rounds <= 0 (TypeError) and never stopped early. Iterate the range only,
        # and break explicitly once a round leaves the data unchanged; further
        # rounds would be no-ops, so the result is the same.
        for _ in range(self._rounds):
            changed = False
            for kcore in filters.values():
                size_before = len(data)
                data = kcore.run(data)
                if len(data) != size_before:
                    changed = True
            if not changed:
                break
        return data

__init__(columns, cores, rounds)

Initializes the NRoundsKCore object.

Parameters:

Name Type Description Default
columns list

A list of column names to apply the KCore filter on.

required
cores list of int or int

The minimum number of records required for each column to be kept.

required
rounds int

The number of rounds to apply the filtering process.

required

Raises:

Type Description
TypeError

If 'cores' is not a list or an integer.

TypeError

If 'rounds' is not an integer.

Source code in datarec/processing/kcore.py
def __init__(self, columns: list, cores: Union[int, list], rounds: int):
    """
    Initializes the NRoundsKCore object.

    Args:
        columns (list): A list of column names to apply the KCore filter on.
        cores (list of int or int): The minimum number of records required for each column to be kept.
            An integer is applied to every column; a list is paired with `columns` position by position.
        rounds (int): The number of rounds to apply the filtering process.

    Raises:
        TypeError: If 'cores' is not a list or an integer.
        TypeError: If 'rounds' is not an integer.
        ValueError: If 'cores' is a list whose length differs from the number of columns.
    """

    self._columns = columns

    if isinstance(cores, list):
        # zip() would silently drop the unmatched columns (or core values),
        # leaving part of the requested filtering undone.
        if len(cores) != len(columns):
            raise ValueError('Cores must contain exactly one value per column.')
        self._cores = list(zip(columns, cores))
    elif isinstance(cores, int):
        self._cores = [(c, cores) for c in columns]
    else:
        raise TypeError('Cores must be a list or an integer.')

    if not isinstance(rounds, int):
        raise TypeError('Rounds must be an integer.')

    self._rounds = rounds

run(dataset)

Applies the KCore filters over the specified number of rounds and returns the filtered dataset.

Parameters:

Name Type Description Default
dataset DataFrame

The dataset to be filtered.

required

Returns:

Type Description
DataFrame

The dataset after filtering over the specified number of rounds.

Source code in datarec/processing/kcore.py
def run(self, dataset: pd.DataFrame) -> pd.DataFrame:
    """
    Applies the KCore filters for at most the configured number of rounds and returns the filtered dataset.

    In every round each per-column KCore filter is applied once, in the order
    the columns were configured. The loop stops early as soon as a complete
    round leaves the number of rows unchanged.

    Args:
        dataset (pd.DataFrame): The dataset to be filtered. A copy is filtered; the input is not reassigned.

    Returns:
        (pd.DataFrame): The dataset after filtering. If the number of rounds is zero or negative,
            an unfiltered copy of the input is returned.
    """

    data = dataset.copy()

    # Keyed by column name, so a column listed twice keeps only its last core value.
    filters = {c: KCore(column=c, core=k) for c, k in self._cores}

    # The previous loop header, `range(rounds) or all(checks)`, never stopped early
    # and tried to iterate a bool when the range was empty.
    for _ in range(self._rounds):
        rows_before = len(data)
        for f in filters.values():
            data = f.run(data)
        # NOTE(review): assumes KCore.run only removes rows and is deterministic,
        # so a round that removed nothing cannot be followed by one that does.
        if len(data) == rows_before:
            break
    return data

UserItemNRoundsKCore

Bases: Processor

Filters a dataset based on both user and item columns with specified core values over multiple rounds.

This class applies the NRoundsKCore filter to both the user and item columns of the dataset.

Source code in datarec/processing/kcore.py
class UserItemNRoundsKCore(Processor):

    """
    Multi-round k-core filter over the user and item columns of a DataRec.

    The work is delegated to NRoundsKCore, configured with the DataRec's user
    and item columns.
    """

    def __init__(self, cores: Union[int, list], rounds: int):
        """
        Stores the filter configuration after validating its types.

        Args:
            cores (int, list): Core value(s) forwarded to NRoundsKCore for the user and item columns.
            rounds (int): The number of rounds to apply the filtering process.

        Raises:
            TypeError: If 'cores' is not a list or an integer.
            TypeError: If 'rounds' is not an integer.
        """

        cores_ok = isinstance(cores, (list, int))
        if not cores_ok:
            raise TypeError('Cores must be a list or an integer.')

        rounds_ok = isinstance(rounds, int)
        if not rounds_ok:
            raise TypeError('Rounds must be an integer.')

        self._cores = cores
        self._rounds = rounds

        # Constructor arguments, recorded for the pipeline step emitted by run().
        self.params = {'cores': cores, 'rounds': rounds}

    def run(self, datarec: DataRec) -> DataRec:
        """
        Runs NRoundsKCore on the user and item columns and wraps the result in a new DataRec.

        Args:
            datarec (DataRec): The DataRec object containing the dataset to be filtered.

        Returns:
            (DataRec): A new DataRec object with the filtered data.
        """

        target_columns = [datarec.user_col, datarec.item_col]
        kcore = NRoundsKCore(columns=target_columns, cores=self._cores, rounds=self._rounds)
        filtered = kcore.run(datarec.data)

        step = {'operation': type(self).__name__, 'params': self.params}
        return self.output(datarec, filtered, step)

__init__(cores, rounds)

Initializes the UserItemNRoundsKCore object.

Parameters:

Name Type Description Default
cores (int, list)

A list of core values for the user and item columns.

required
rounds int

The number of rounds to apply the filtering process.

required

Raises:

Type Description
TypeError

If 'cores' is not a list or an integer.

TypeError

If 'rounds' is not an integer.

Source code in datarec/processing/kcore.py
def __init__(self, cores: Union[int, list], rounds: int):
    """
    Stores the filter configuration after validating its types.

    Args:
        cores (int, list): Core value(s) for the user and item columns.
        rounds (int): The number of rounds to apply the filtering process.

    Raises:
        TypeError: If 'cores' is not a list or an integer.
        TypeError: If 'rounds' is not an integer.
    """

    cores_ok = isinstance(cores, (list, int))
    if not cores_ok:
        raise TypeError('Cores must be a list or an integer.')

    rounds_ok = isinstance(rounds, int)
    if not rounds_ok:
        raise TypeError('Rounds must be an integer.')

    self._cores = cores
    self._rounds = rounds

    # Constructor arguments, keyed by parameter name.
    self.params = {'cores': cores, 'rounds': rounds}

run(datarec)

Applies the NRoundsKCore filter to both user and item columns over multiple rounds, and returns a new DataRec object containing the filtered data.

Parameters:

Name Type Description Default
datarec DataRec

The DataRec object containing the dataset to be filtered.

required

Returns:

Type Description
DataRec

A new DataRec object with the filtered data.

Source code in datarec/processing/kcore.py
def run(self, datarec: DataRec) -> DataRec:
    """
    Runs NRoundsKCore on the user and item columns and wraps the result in a new DataRec.

    Args:
        datarec (DataRec): The DataRec object containing the dataset to be filtered.

    Returns:
        (DataRec): A new DataRec object with the filtered data.
    """

    target_columns = [datarec.user_col, datarec.item_col]
    kcore = NRoundsKCore(columns=target_columns, cores=self._cores, rounds=self._rounds)
    filtered = kcore.run(datarec.data)

    step = {'operation': type(self).__name__, 'params': self.params}
    return self.output(datarec, filtered, step)

Processor

Utility class for handling the output of preprocessing steps on DataRec objects.

This class provides functionality to build a new DataRec from transformation results while updating the processing pipeline accordingly.

Source code in datarec/processing/processor.py
class Processor:
    """
    Base helper that turns the result of a processing step into a new `DataRec`.

    The new object wraps the transformed data and carries a copy of the
    original pipeline extended with the step that produced it.
    """

    @staticmethod
    def output(datarec: DataRec, result: pd.DataFrame, step_info: dict) -> DataRec:
        """
        Wrap a transformation result in a new `DataRec` and record the step
        in a copy of the source pipeline.

        Args:
            datarec (DataRec): The `DataRec` the transformation was applied to.
            result (pd.DataFrame): The result of the transformation.
            step_info (dict): Step metadata; the 'operation' and 'params'
                entries are read.

        Returns:
            (DataRec): A new `DataRec` wrapping `result`, linked to `datarec`
                and carrying the extended pipeline.
        """
        # Extend a copy of the pipeline rather than the one held by `datarec`.
        extended = datarec.pipeline.copy()
        extended.add_step(name='process',
                          operation=step_info['operation'],
                          params=step_info['params'])

        # The rating column is only forwarded if it is still present in the result.
        rating_col = None
        if datarec.rating_col in result.columns:
            rating_col = datarec.rating_col

        raw = RawData(result,
                      user=datarec.user_col,
                      item=datarec.item_col,
                      rating=rating_col,
                      timestamp=datarec.timestamp_col)

        return DataRec(raw,
                       derives_from=datarec,
                       dataset_name=datarec.dataset_name,
                       pipeline=extended)

output(datarec, result, step_info) staticmethod

Create a new DataRec object from a transformation result and update the processing pipeline with a new step.

Parameters:

Name Type Description Default
datarec DataRec

The original DataRec object from which the transformation is derived.

required
result DataFrame

The result of the transformation.

required
step_info dict

Metadata of the transformation.

required

Returns:

Type Description
DataRec

A new DataRec object wrapping the transformation result with an updated pipeline.

Source code in datarec/processing/processor.py
@staticmethod
def output(datarec: DataRec, result: pd.DataFrame, step_info: dict) -> DataRec:
    """
    Wrap a transformation result in a new `DataRec` and record the step
    in a copy of the source pipeline.

    Args:
        datarec (DataRec): The `DataRec` the transformation was applied to.
        result (pd.DataFrame): The result of the transformation.
        step_info (dict): Step metadata; the 'operation' and 'params'
            entries are read.

    Returns:
        (DataRec): A new `DataRec` wrapping `result`, linked to `datarec`
            and carrying the extended pipeline.
    """
    # Extend a copy of the pipeline rather than the one held by `datarec`.
    extended = datarec.pipeline.copy()
    extended.add_step(name='process',
                      operation=step_info['operation'],
                      params=step_info['params'])

    # The rating column is only forwarded if it is still present in the result.
    rating_col = None
    if datarec.rating_col in result.columns:
        rating_col = datarec.rating_col

    raw = RawData(result,
                  user=datarec.user_col,
                  item=datarec.item_col,
                  rating=rating_col,
                  timestamp=datarec.timestamp_col)

    return DataRec(raw,
                   derives_from=datarec,
                   dataset_name=datarec.dataset_name,
                   pipeline=extended)

FilterByRatingThreshold

Bases: Processor

Filters the dataset by removing interactions with a rating below a given threshold.

Source code in datarec/processing/rating.py
class FilterByRatingThreshold(Processor):
    """
    Keeps only the interactions whose rating is at least a given threshold.
    """

    def __init__(self, rating_threshold: float):
        """
        Validates and stores the rating threshold.

        Args:
            rating_threshold (float): The minimum rating required for an interaction to be kept.

        Raises:
            ValueError: If `rating_threshold` is not an int or float, or is negative.
        """
        is_number = isinstance(rating_threshold, (int, float))
        if not is_number:
            raise ValueError("rating_threshold must be a number.")
        if rating_threshold < 0:
            raise ValueError("rating_threshold must be non-negative.")

        self.rating_threshold = rating_threshold

        # Constructor arguments, recorded for the pipeline step emitted by run().
        self.params = {'rating_threshold': rating_threshold}

    def run(self, datarec: DataRec) -> DataRec:
        """
        Drops the interactions rated below the threshold.

        Args:
            datarec (DataRec): The input dataset wrapped in a DataRec object.

        Returns:
            (DataRec): A new DataRec object with the processed dataset.
        """

        frame = datarec.data
        keep_mask = frame[datarec.rating_col] >= self.rating_threshold
        kept = frame[keep_mask]

        step = {'operation': type(self).__name__, 'params': self.params}
        return self.output(datarec, kept, step)

__init__(rating_threshold)

Initializes the FilterByRatingThreshold object.

Parameters:

Name Type Description Default
rating_threshold float

The minimum rating required for an interaction to be kept.

required

Raises:

Type Description
ValueError

If rating_threshold is a negative number.

Source code in datarec/processing/rating.py
def __init__(self, rating_threshold: float):
    """
    Validates and stores the rating threshold.

    Args:
        rating_threshold (float): The minimum rating required for an interaction to be kept.

    Raises:
        ValueError: If `rating_threshold` is not an int or float, or is negative.
    """
    is_number = isinstance(rating_threshold, (int, float))
    if not is_number:
        raise ValueError("rating_threshold must be a number.")
    if rating_threshold < 0:
        raise ValueError("rating_threshold must be non-negative.")

    self.rating_threshold = rating_threshold

    # Constructor arguments, keyed by parameter name.
    self.params = {'rating_threshold': rating_threshold}

run(datarec)

Filters interactions with a rating below the threshold.

Parameters:

Name Type Description Default
datarec DataRec

The input dataset wrapped in a DataRec object.

required

Returns:

Type Description
DataRec

A new DataRec object with the processed dataset.

Source code in datarec/processing/rating.py
def run(self, datarec: DataRec) -> DataRec:
    """
    Drops the interactions rated below the threshold.

    Args:
        datarec (DataRec): The input dataset wrapped in a DataRec object.

    Returns:
        (DataRec): A new DataRec object with the processed dataset.
    """

    frame = datarec.data
    keep_mask = frame[datarec.rating_col] >= self.rating_threshold
    kept = frame[keep_mask]

    step = {'operation': type(self).__name__, 'params': self.params}
    return self.output(datarec, kept, step)

FilterByUserMeanRating

Bases: Processor

Filters the dataset by removing interactions with a rating below the user's average rating.

This filter calculates the average rating given by each user and removes interactions where the rating is below that average.

Source code in datarec/processing/rating.py
class FilterByUserMeanRating(Processor):
    """
    Filters the dataset by removing interactions with a rating below the user's average rating.

    This filter calculates the average rating given by each user and removes
    interactions where the rating is below that average.
    """

    def run(self, datarec: DataRec) -> DataRec:
        """
        Filters interactions with a rating below the user's mean rating.

        The per-user mean is computed with a vectorized group-wise transform,
        avoiding a Python-level callback for every row. Rows whose user id is
        missing get no group mean and are therefore dropped.

        Args:
            datarec (DataRec): The input dataset wrapped in a DataRec object.

        Returns:
            (DataRec): A new DataRec object with the processed dataset.
        """

        dataset = datarec.data

        # transform('mean') returns one value per row, aligned with the dataset index,
        # so it can be compared with the rating column directly.
        user_means = dataset.groupby(datarec.user_col)[datarec.rating_col].transform('mean')

        filtered_data = dataset[dataset[datarec.rating_col] >= user_means]

        return self.output(datarec, filtered_data, {'operation': self.__class__.__name__, 'params': ''})

run(datarec)

Filters interactions with a rating below the user's mean rating.

Parameters:

Name Type Description Default
datarec DataRec

The input dataset wrapped in a DataRec object.

required

Returns:

Type Description
DataRec

A new DataRec object with the processed dataset.

Source code in datarec/processing/rating.py
def run(self, datarec: DataRec) -> DataRec:
    """
    Filters interactions with a rating below the user's mean rating.

    The per-user mean is computed with a vectorized group-wise transform,
    avoiding a Python-level callback for every row. Rows whose user id is
    missing get no group mean and are therefore dropped.

    Args:
        datarec (DataRec): The input dataset wrapped in a DataRec object.

    Returns:
        (DataRec): A new DataRec object with the processed dataset.
    """

    dataset = datarec.data

    # transform('mean') returns one value per row, aligned with the dataset index,
    # so it can be compared with the rating column directly.
    user_means = dataset.groupby(datarec.user_col)[datarec.rating_col].transform('mean')

    filtered_data = dataset[dataset[datarec.rating_col] >= user_means]

    return self.output(datarec, filtered_data, {'operation': self.__class__.__name__, 'params': ''})

FilterOutDuplicatedInteractions

Bases: Processor

Filters a dataset by removing duplicated (user, item) interactions based on a specified strategy.

Source code in datarec/processing/rating.py
class FilterOutDuplicatedInteractions(Processor):
    """
    Removes duplicated (user, item) interactions, keeping one row per pair according to a strategy.
    """

    # Supported values for `keep`.
    STRATEGIES = ['first', 'last', 'earliest', 'latest', 'random']

    def __init__(self, keep='first', random_seed=42):
        """
        Validates and stores the de-duplication strategy.

        Args:
            keep (str): Strategy to determine which interaction to keep when duplicates are found.
                Must be one of ['first', 'last', 'earliest', 'latest', 'random'].
            random_seed (int): Random seed used for reproducibility when using the 'random' strategy.

        Raises:
            ValueError: If the provided strategy (`keep`) is not among the supported options.
        """

        if keep not in self.STRATEGIES:
            raise ValueError(f"Invalid strategy '{keep}'. Choose from {self.STRATEGIES}.")

        self.keep = keep
        self.random_seed = random_seed

        # Constructor arguments, recorded for the pipeline step emitted by run().
        self.params = {'keep': keep, 'random_seed': random_seed}

    def run(self, datarec: DataRec, verbose=True) -> DataRec:
        """
        Removes duplicated (user, item) interactions using the configured strategy.

        The result is sorted by user and then item column before being wrapped.

        Args:
            datarec (DataRec): An object containing the dataset and metadata (user, item, timestamp columns, etc.)
            verbose (bool): Whether to print logging information during execution.

        Returns:
            (DataRec): A new DataRec object with duplicated (user, item) interactions removed
                according to the selected strategy.

        Raises:
            ValueError: If the timestamp column is not set for the 'earliest' and 'latest' strategies.
            ValueError: If the stored strategy (`keep`) is not among the supported options.
        """

        if verbose:
            print(f'Running filter-out duplicated interactions with strategy {self.keep}')
            print(f'Filtering DataRec: {datarec.dataset_name}')

        frame = datarec.data
        pair = [datarec.user_col, datarec.item_col]
        strategy = self.keep

        if strategy == 'random':
            # Shuffle all rows with the fixed seed, then keep the first row of each pair.
            shuffled = frame.sample(frac=1, random_state=self.random_seed)
            frame = shuffled.drop_duplicates(subset=pair, keep='first')

        elif strategy in ('first', 'last'):
            # Positional strategies map directly onto drop_duplicates.
            frame = frame.drop_duplicates(subset=pair, keep=strategy)

        elif strategy in ('earliest', 'latest'):
            if datarec.timestamp_col is None:
                raise ValueError(f"Date column is required for '{self.keep}' strategy.")
            # After an ascending sort by timestamp the earliest row of a pair is first, the latest is last.
            by_time = frame.sort_values(by=datarec.timestamp_col, ascending=True)
            side = 'first' if strategy == 'earliest' else 'last'
            frame = by_time.drop_duplicates(subset=pair, keep=side)

        else:
            raise ValueError(f"Invalid strategy '{self.keep}'. Choose from {self.STRATEGIES}.")

        frame = frame.sort_values(by=[datarec.user_col, datarec.item_col], ascending=True)

        step = {'operation': type(self).__name__, 'params': self.params}
        return self.output(datarec, frame, step)

__init__(keep='first', random_seed=42)

Initializes the FilterOutDuplicatedInteractions object.

Parameters:

Name Type Description Default
keep str

Strategy to determine which interaction to keep when duplicates are found. Must be one of ['first', 'last', 'earliest', 'latest', 'random'].

'first'
random_seed int

Random seed used for reproducibility when using the 'random' strategy.

42

Raises:

Type Description
ValueError

If the provided strategy (keep) is not among the supported options.

Source code in datarec/processing/rating.py
def __init__(self, keep='first', random_seed=42):
    """
    Validates and stores the de-duplication strategy.

    Args:
        keep (str): Strategy to determine which interaction to keep when duplicates are found.
            Must be one of ['first', 'last', 'earliest', 'latest', 'random'].
        random_seed (int): Random seed used for reproducibility when using the 'random' strategy.

    Raises:
        ValueError: If the provided strategy (`keep`) is not among the supported options.
    """

    if keep not in self.STRATEGIES:
        raise ValueError(f"Invalid strategy '{keep}'. Choose from {self.STRATEGIES}.")

    self.keep = keep
    self.random_seed = random_seed

    # Constructor arguments, keyed by parameter name.
    self.params = {'keep': keep, 'random_seed': random_seed}

run(datarec, verbose=True)

Filter out duplicated (user, item) interactions in the dataset using the specified strategy.

Parameters:

Name Type Description Default
datarec DataRec

An object containing the dataset and metadata (user, item, timestamp columns, etc.)

required
verbose bool

Whether to print logging information during execution.

True

Returns:

Type Description
DataRec

A new DataRec object with duplicated (user, item) interactions removed according to the selected strategy.

Raises:

Type Description
ValueError

If the timestamp column is not provided for the 'earliest' and 'latest' strategies.

ValueError

If the provided strategy (keep) is not among the supported options.

Source code in datarec/processing/rating.py
def run(self, datarec: DataRec, verbose=True) -> DataRec:
    """
    Removes duplicated (user, item) interactions using the configured strategy.

    The result is sorted by user and then item column before being wrapped.

    Args:
        datarec (DataRec): An object containing the dataset and metadata (user, item, timestamp columns, etc.)
        verbose (bool): Whether to print logging information during execution.

    Returns:
        (DataRec): A new DataRec object with duplicated (user, item) interactions removed
            according to the selected strategy.

    Raises:
        ValueError: If the timestamp column is not set for the 'earliest' and 'latest' strategies.
        ValueError: If the stored strategy (`keep`) is not among the supported options.
    """

    if verbose:
        print(f'Running filter-out duplicated interactions with strategy {self.keep}')
        print(f'Filtering DataRec: {datarec.dataset_name}')

    frame = datarec.data
    pair = [datarec.user_col, datarec.item_col]
    strategy = self.keep

    if strategy == 'random':
        # Shuffle all rows with the fixed seed, then keep the first row of each pair.
        shuffled = frame.sample(frac=1, random_state=self.random_seed)
        frame = shuffled.drop_duplicates(subset=pair, keep='first')

    elif strategy in ('first', 'last'):
        # Positional strategies map directly onto drop_duplicates.
        frame = frame.drop_duplicates(subset=pair, keep=strategy)

    elif strategy in ('earliest', 'latest'):
        if datarec.timestamp_col is None:
            raise ValueError(f"Date column is required for '{self.keep}' strategy.")
        # After an ascending sort by timestamp the earliest row of a pair is first, the latest is last.
        by_time = frame.sort_values(by=datarec.timestamp_col, ascending=True)
        side = 'first' if strategy == 'earliest' else 'last'
        frame = by_time.drop_duplicates(subset=pair, keep=side)

    else:
        raise ValueError(f"Invalid strategy '{self.keep}'. Choose from {self.STRATEGIES}.")

    frame = frame.sort_values(by=[datarec.user_col, datarec.item_col], ascending=True)

    step = {'operation': type(self).__name__, 'params': self.params}
    return self.output(datarec, frame, step)

FilterByTime

Bases: Processor

Filters the dataset based on a time threshold and specified drop condition.

This class allows filtering a dataset by a time threshold, either dropping records before or after the specified time.

Source code in datarec/processing/temporal.py
class FilterByTime(Processor):
    """
    Filters the dataset based on a time threshold and specified drop condition.

    This class splits a dataset at a time threshold and keeps the records on
    one side of it, selected through the `drop` option.
    """

    def __init__(self, time_threshold: float = 0, drop: str = 'after'):
        """
        Initializes the FilterByTime object.

        Args:
            time_threshold (float): The time threshold used for filtering. The dataset
                                    will be filtered based on this value. Must be a
                                    non-negative int or float.
            drop (str, optional): Either 'before' or 'after'; selects which side of the
                                  threshold is kept (see `run`).

        Raises:
            ValueError: If `time_threshold` is negative or not a number, or if drop is
                        neither 'after' nor 'before'.
        """
        if not isinstance(time_threshold, (int, float)):
            raise ValueError('time_threshold must be positive number.')
        # The sign check applies to ints as well as floats; it used to be skipped for ints.
        if time_threshold < 0:
            raise ValueError('time_threshold must be positive number.')

        if drop not in ['after', 'before']:
            raise ValueError('Drop must be "after" or "before".')

        self.params = {'time_threshold': time_threshold, 'drop': drop}
        self.time_threshold = time_threshold
        self.drop = drop

    def run(self, datarec: DataRec) -> DataRec:
        """
        Filters the dataset of the given DataRec based on the specified time threshold
        and drop condition, returning a new DataRec object with the filtered data.

        With `drop == 'before'` the rows whose timestamp is strictly lower than the
        threshold are kept; otherwise the rows whose timestamp is greater than or
        equal to the threshold are kept.

        Args:
            datarec (DataRec): The input dataset wrapped in a DataRec object.

        Returns:
            (DataRec): A new DataRec object with the processed dataset.

        Raises:
            TypeError: If the DataRec does not contain temporal information.
        """

        if datarec.timestamp_col is None:
            raise TypeError('This DataRec does not contain temporal information')

        dataset = datarec.data

        # NOTE(review): the option is named `drop`, yet 'before' KEEPS the rows before
        # the threshold and 'after' KEEPS the rows from the threshold on. Left unchanged
        # because callers and the defaults (0, 'after') may rely on it; confirm intent.
        if self.drop == 'before':
            data = dataset[dataset[datarec.timestamp_col] < self.time_threshold]
        else:
            data = dataset[dataset[datarec.timestamp_col] >= self.time_threshold]

        return self.output(datarec, data, {'operation': self.__class__.__name__, 'params': self.params})

__init__(time_threshold=0, drop='after')

Initializes the FilterByTime object.

Parameters:

Name Type Description Default
time_threshold float

The time threshold used for filtering. The dataset will be filtered based on this value.

0
drop str

Specifies whether to drop records 'before' or 'after' the time threshold.

'after'

Raises:

Type Description
ValueError

If time_threshold is negative or not a number (int or float), or if drop is neither 'after' nor 'before'.

Source code in datarec/processing/temporal.py
def __init__(self, time_threshold: float = 0, drop: str = 'after'):
    """
    Initializes the FilterByTime object.

    Args:
        time_threshold (float): The time threshold used for filtering. The dataset
                                will be filtered based on this value. Must be a
                                non-negative int or float.
        drop (str, optional): Either 'before' or 'after'; selects the side of the
                              threshold the filter acts on.

    Raises:
        ValueError: If `time_threshold` is negative or not a number, or if drop is
                    neither 'after' nor 'before'.
    """
    if not isinstance(time_threshold, (int, float)):
        raise ValueError('time_threshold must be positive number.')
    # The sign check applies to ints as well as floats; it used to be skipped for ints.
    if time_threshold < 0:
        raise ValueError('time_threshold must be positive number.')

    if drop not in ['after', 'before']:
        raise ValueError('Drop must be "after" or "before".')

    self.params = {'time_threshold': time_threshold, 'drop': drop}
    self.time_threshold = time_threshold
    self.drop = drop

run(datarec)

Filters the dataset of the given DataRec based on the specified time threshold and drop condition, returning a new DataRec object with the filtered data.

Parameters:

Name Type Description Default
datarec DataRec

The input dataset wrapped in a DataRec object.

required

Returns:

Type Description
DataRec

A new DataRec object with the processed dataset.

Raises:

Type Description
TypeError

If the DataRec does not contain temporal information.

Source code in datarec/processing/temporal.py
def run(self, datarec: DataRec) -> DataRec:
    """
    Splits the DataRec's dataset at the time threshold and returns a new DataRec
    holding the rows on the selected side.

    With `drop == 'before'` the rows whose timestamp is strictly lower than the
    threshold are kept; otherwise the rows whose timestamp is greater than or
    equal to the threshold are kept.

    Args:
        datarec (DataRec): The input dataset wrapped in a DataRec object.

    Returns:
        (DataRec): A new DataRec object with the processed dataset.

    Raises:
        TypeError: If the DataRec does not contain temporal information.
    """

    if datarec.timestamp_col is None:
        raise TypeError('This DataRec does not contain temporal information')

    frame = datarec.data
    timestamps = frame[datarec.timestamp_col]

    # NOTE(review): the option is named `drop`, yet 'before' KEEPS the rows before
    # the threshold and any other value KEEPS the rows from the threshold on; confirm intent.
    if self.drop == 'before':
        keep_mask = timestamps < self.time_threshold
    else:
        keep_mask = timestamps >= self.time_threshold

    step = {'operation': type(self).__name__, 'params': self.params}
    return self.output(datarec, frame[keep_mask], step)