Source code for ml_logger.parser.metric

"""Implementation of Parser to parse metrics from logs."""

from typing import Callable, Dict, List, Optional

import pandas as pd

from ml_logger.parser import log as log_parser
from ml_logger.types import LogType, MetricType, ParseLineFunctionType


[docs]def parse_json_and_match_value(line: str) -> Optional[LogType]: """Parse a line as JSON log and check if it a valid metric log.""" return log_parser.parse_json_and_match_value(line=line, value="metric")
[docs]def group_metrics(metrics: List[MetricType]) -> Dict[str, List[MetricType]]: """Group a list of metrics. Group a list of metrics into a dictionary of (key, list of grouped metrics) Args: metrics (List[MetricType]): List of metrics to group Returns: Dict[str, List[MetricType]]: Dictionary of (key, list of grouped metrics) """ return {"all": metrics}
[docs]def aggregate_metrics(metrics: List[MetricType]) -> List[MetricType]: """Aggregate a list of metrics. Args: metrics (List[MetricType]): List of metrics to aggregate Returns: List[MetricType]: List of aggregated metrics """ return metrics
[docs]class Parser(log_parser.Parser): """Class to parse the metrics from the logs.""" def __init__(self, parse_line: ParseLineFunctionType = parse_json_and_match_value): """Class to parse the metrics from the logs. Args: parse_line (ParseLineFunctionType): Function to parse a line in the log file. The function should return None if the line is not a valid log statement (eg error messages). Defaults to parse_json_and_match_value. """ super().__init__(parse_line) self.log_type = "metric"
[docs] def parse_as_df( self, filepath_pattern: str, group_metrics: Callable[ [List[LogType]], Dict[str, List[LogType]] ] = group_metrics, aggregate_metrics: Callable[[List[LogType]], List[LogType]] = aggregate_metrics, ) -> Dict[str, pd.DataFrame]: """Create a dict of (metric_name, dataframe). Method that: (i) reads metrics from the filesystem (ii) groups metrics (iii) aggregates all the metrics within a group, (iv) converts the aggregate metrics into dataframes and returns a \ dictionary of dataframes Args: filepath_pattern (str): filepath pattern to glob group_metrics (Callable[[List[LogType]], Dict[str, List[LogType]]], optional): Function to group a list of metrics into a dictionary of (key, list of grouped metrics). Defaults to group_metrics. aggregate_metrics (Callable[[List[LogType]], List[LogType]], optional): Function to aggregate a list of metrics. Defaults to aggregate_metrics. """ metric_logs = list(self.parse(filepath_pattern)) return metrics_to_df( metric_logs=metric_logs, group_metrics=group_metrics, aggregate_metrics=aggregate_metrics, )
[docs]def metrics_to_df( metric_logs: List[LogType], group_metrics: Callable[[List[LogType]], Dict[str, List[LogType]]] = group_metrics, aggregate_metrics: Callable[[List[LogType]], List[LogType]] = aggregate_metrics, ) -> Dict[str, pd.DataFrame]: """Create a dict of (metric_name, dataframe). Method that: (i) groups metrics (ii) aggregates all the metrics within a group, (iii) converts the aggregate metrics into dataframes and returns a \ dictionary of dataframes Args: metric_logs (List[LogType]): List of metrics group_metrics (Callable[[List[LogType]], Dict[str, List[LogType]]], optional): Function to group a list of metrics into a dictionary of (key, list of grouped metrics). Defaults to group_metrics. aggregate_metrics (Callable[[List[LogType]], List[LogType]], optional): Function to aggregate a list of metrics. Defaults to aggregate_metrics. Returns: Dict[str, pd.DataFrame]: [description] """ grouped_metrics: Dict[str, List[LogType]] = group_metrics(metric_logs) aggregated_metrics = { key: aggregate_metrics(metrics) for key, metrics in grouped_metrics.items() } metric_dfs = { key: pd.json_normalize(data=metrics) for key, metrics in aggregated_metrics.items() } return metric_dfs