
import json
import logging
import os
import string
from collections import OrderedDict
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple, Union

import ray.data
from ray.data import Dataset

import fmeval.util as util
from fmeval.constants import (
    DATASET_COLUMNS,
    DatasetColumns,
    MEAN,
    NUM_ROWS_DETERMINISTIC,
)
from fmeval.data_loaders.data_config import DataConfig
from fmeval.eval_algorithms import (
    DATASET_CONFIGS,
    EVAL_DATASETS,
    CategoryScore,
    EvalOutput,
    EvalScore,
    get_default_prompt_template,
)
from fmeval.exceptions import EvalAlgorithmInternalError
from fmeval.model_runners.composers.composers import PromptComposer
from fmeval.model_runners.model_runner import ModelRunner
from fmeval.perf_util import timed_block
from fmeval.transforms.common import GeneratePrompt, GetModelOutputs
from fmeval.transforms.transform_pipeline import TransformPipeline
from fmeval.util import get_num_actors
 37# punctuation and articles for the normalize function
 38ENGLISH_ARTICLES = ["a", "an", "the"]
 39ENGLISH_PUNCTUATIONS = string.punctuation
 41logger = logging.getLogger(__name__)
 44def get_dataset_configs(data_config: Optional[Union[DataConfig, List[DataConfig]]], eval_name: str) -> List[DataConfig]:
 45    if not data_config:
 46        return [DATASET_CONFIGS[dataset_name] for dataset_name in EVAL_DATASETS[eval_name]]
 47    elif isinstance(data_config, list):
 48        return data_config
 49    elif isinstance(data_config, tuple):
 50        return [cfg for cfg in data_config]
 51    else:
 52        return [data_config]
 55def generate_model_predict_response_for_dataset(
 56    model: ModelRunner,
 57    data: Dataset,
 58    model_input_column_name: str,
 59    model_output_column_name: Optional[str] = None,
 60    model_log_probability_column_name: Optional[str] = None,
 61) -> Dataset:
 62    """
 63    Runs the model on the given data. Output will be written to the
 64    `model_output_column_name` column, and log_probability will be
 65    written to the `model_log_probability_column_name` column.
 67    :param model: ModelRunner to get predictions from.
 68    :param data: The dataset containing model inputs to feed to `model`.
 69    :param model_input_column_name: The name of the column containing the model input.
 70    :param model_output_column_name: The name of the column to write the model output to.
 71    :param model_log_probability_column_name: The name of the column to write the model log probability to.
 72    :return: The dataset with a model output column and model log probability column added.
 73        Note that both columns are optional, i.e. it is possible that a model output
 74        column is added, but a log probability column is not added (and vice versa).
 75    """
 76    with timed_block(f"Performing inference on dataset on {model}", logger):
 78        class ModelRunnerWrapper:  # pragma: no cover
 79            """
 80            This class represents the Ray Actor that gets model predictions
 81            by feeding model inputs from the dataset to the model runner.
 83            We use Ray Actors instead of Tasks because the Actor approach minimizes
 84            the number of times that the ModelRunner `model` gets deserialized.
 85            With Tasks, Ray will serialize and deserialize `model` for every single
 86            prediction. With Actors, `model` gets deserialized once per Actor when
 87            the Actor gets initialized.
 88            """
 90            def __init__(self):
 91                self.model_runner = model
 92                logger.setLevel(logging.DEBUG)
 94            def __call__(self, row: Dict[str, Any]) -> Dict[str, Any]:
 95                predict_output = self.model_runner.predict(row[model_input_column_name])
 96                if model_output_column_name:
 97                    row[model_output_column_name] = predict_output[0]
 98                if model_log_probability_column_name:
 99                    row[model_log_probability_column_name] = predict_output[1]
100                return row
102        data =
103            ModelRunnerWrapper,  # type: ignore[arg-type]
104        ).materialize()
105    return data
def generate_prompt_column_for_dataset(
    prompt_template: str, data: Dataset, model_input_column_name: str, prompt_column_name: str
) -> Dataset:
    """
    Generates prompts column for a given input dataset and prompt_template

    :param prompt_template: Prompt template
    :param data: the dataset where each instance is a row in the dataset.
    :param model_input_column_name: the name of the column containing the model input.
    :param prompt_column_name: Output column name to which composed prompts are added
    :return: the dataset with the composed prompts added.
    """
    with timed_block("Generating prompt column", logger):
        prompt_composer = PromptComposer(prompt_template)

        def _generate_prompt_column(row: Dict[str, Any]) -> Dict[str, Any]:  # pragma: no cover
            """
            Map function for generating the prompt column value given a dataset row.
            """
            row[prompt_column_name] = prompt_composer.compose(row[model_input_column_name])
            return row

        # NOTE(review): the right-hand side of this assignment was missing from the
        # source text; a row-wise map followed by `materialize()` (as in
        # `generate_model_predict_response_for_dataset`) is assumed. Confirm upstream.
        data = data.map(_generate_prompt_column).materialize()
    return data
def validate_dataset(dataset: Dataset, column_names: List[str]):
    """
    Util function to validate that dataset contains the required column names.

    :param dataset: Input ray dataset
    :param column_names: names of the columns that must be present in the dataset
    :raises: EvalAlgorithmClientError for an invalid dataset
    """
    # The dataset's columns do not change while we validate, so fetch them once
    # instead of once per required column.
    present_columns = dataset.columns()
    for column_name in column_names:
        util.require(
            column_name in present_columns,
            f"Missing required column: {column_name}, for evaluate() method",
        )
def validate_prompt_template(prompt_template: str, placeholders: List[str]):
    """
    Check that every expected placeholder occurs in `prompt_template`.

    :param prompt_template: A template used to compose prompts. Ex: '{"Question":$question, "Answer": $answer}'
    :param placeholders: Placeholder keywords, without the leading $ sign. For the
            example above the placeholders are ["question", "answer"].
    :raises: EvalAlgorithmClientError for an invalid prompt_template
    """
    for keyword in placeholders:
        # A placeholder counts as present when "$<keyword>" is a substring of the template.
        token = f"${keyword}"
        util.require(token in prompt_template, f"Unable to find placeholder {token} in prompt_template.")
def aggregate_evaluation_scores(
    dataset: Dataset, score_column_names: List[str], agg_method: str
) -> Tuple[List[EvalScore], Optional[List[CategoryScore]]]:
    """
    The method aggregates scores at the dataset level and optionally at the category level if
     categories are available in the dataset.

    :param dataset: ray dataset with eval scores
    :param score_column_names: a list of column names which contain the scores to aggregate
    :param agg_method: the name of the aggregation to perform
    :return: a tuple containing 1) dataset-level scores and
                                2) a list of category-level scores if categories are available, `None` otherwise
    """
    dataset_scores = [
        EvalScore(name=score_column_name, value=dataset_aggregation(dataset, score_column_name, agg_method))
        for score_column_name in score_column_names
    ]

    # NOTE(review): the category column key was missing from the source text in
    # three places; `DatasetColumns.CATEGORY.value.name` is assumed. Confirm
    # against fmeval.constants.
    category_column = DatasetColumns.CATEGORY.value.name
    if category_column not in dataset.columns():
        return dataset_scores, None

    # One (initially empty) CategoryScore per distinct category value.
    category_scores: Dict[str, CategoryScore] = {
        name: CategoryScore(name=name, scores=[]) for name in dataset.unique(category_column)
    }
    for score_column_name in score_column_names:
        category_aggregate: Dataset = category_wise_aggregation(dataset, score_column_name, agg_method)
        for row in category_aggregate.iter_rows():
            # The aggregated column is read back under the name "mean(<score column>)".
            category_scores[row[category_column]].scores.append(
                EvalScore(name=score_column_name, value=row[f"mean({score_column_name})"])
            )

    return dataset_scores, list(category_scores.values()) if category_scores else None
def dataset_aggregation(dataset: Dataset, score_column_name: str, agg_method: str) -> float:
    """
    Aggregate one score column over the whole dataset.

    :param dataset: ray dataset with eval scores
    :param score_column_name: name of the column holding the scores to aggregate
    :param agg_method: the name of the aggregation to perform; only MEAN is handled
    :return: the aggregated score (nulls are ignored)
    :raises: EvalAlgorithmInternalError if `agg_method` is not MEAN
    """
    if agg_method != MEAN:
        raise EvalAlgorithmInternalError(f"Aggregation method {agg_method} is not supported")
    mean_value = dataset.mean(on=score_column_name, ignore_nulls=True)
    # Guard the declared return type.
    assert isinstance(mean_value, float)
    return mean_value
def category_wise_aggregation(dataset: Dataset, score_column_name: str, agg_method: str) -> Dataset:
    """
    Aggregate one score column separately for each category in the dataset.

    :param dataset: ray dataset with eval scores and a category column
    :param score_column_name: name of the column holding the scores to aggregate
    :param agg_method: the name of the aggregation to perform; only MEAN is handled
    :return: the grouped-and-aggregated dataset (nulls are ignored)
    :raises: EvalAlgorithmInternalError if `agg_method` is not MEAN
    """
    # Reject unsupported methods before building the groupby.
    if agg_method != MEAN:
        raise EvalAlgorithmInternalError(f"Aggregation method {agg_method} is not supported")
    # NOTE(review): the groupby key was missing from the source text;
    # `DatasetColumns.CATEGORY.value.name` is assumed. Confirm against fmeval.constants.
    grouped = dataset.groupby(DatasetColumns.CATEGORY.value.name)  # type: ignore
    return grouped.mean(on=score_column_name, ignore_nulls=True)
# This function lives in this shared module because it is used by both
# factual knowledge and QA accuracy.
def normalize_text_quac_protocol(text: str) -> str:
    """
    Inspired by HELM:
    Given a text, normalize it using the SQuAD / QuAC protocol. That is, lowercase it,
    remove punctuation, remove articles, and collapse excess whitespace.
    The SQuAD and QuAC benchmarks use this protocol to normalize text before evaluating it.
    HELM and HuggingFace evaluate also use this normalization procedure.

    Tokens are split on any whitespace (spaces, tabs, newlines), so an article
    followed by a newline is still recognised and removed.

    :param text: The text that needs to be normalized.
    :returns: The normalized text: lowercased tokens joined by single spaces.
    """
    # Drop every punctuation character in a single pass.
    without_punctuation = text.lower().translate(str.maketrans("", "", ENGLISH_PUNCTUATIONS))
    # `split()` with no argument never yields empty tokens, so no extra filtering is needed.
    return " ".join(word for word in without_punctuation.split() if word not in ENGLISH_ARTICLES)
@dataclass
class EvalOutputRecord:
    """
    This class represents a single record that gets written by the `save_dataset` method.
    In other words, it represents a single row from the Ray Dataset that is being saved.

    :param scores: A list of EvalScores, where each EvalScore corresponds
        to one of the score columns in the Ray Dataset being saved.
    :param dataset_columns: Maps a column name to its contents in the current row
        (recall that an EvalOutputRecord corresponds to a single Ray Dataset row).

        Note: the keys in `dataset_columns` must belong to constants.DATASET_COLUMNS,
        because constants.DATASET_COLUMNS defines which (non-score) columns are allowed
        to appear in the saved output, i.e. it defines the schema for an output record.
    """

    scores: List[EvalScore]
    dataset_columns: Dict[str, Union[str, float, int]]

    def __post_init__(self):
        # Reject any non-score column that is not part of the output schema.
        for col in self.dataset_columns:
            util.assert_condition(
                col in DATASET_COLUMNS,
                f"Attempting to initialize an EvalOutputRecord with invalid non-score column {col}.",
            )

    def __str__(self):
        return json.dumps(self.to_dict())

    def to_dict(self) -> OrderedDict[str, Union[str, float, int, List]]:
        """
        Returns a dictionary representation of this instance,
        to be used when writing this object to JSON Lines.

        Note that we use an OrderedDict to maintain consistency
        in the ordering of columns. The score columns always come
        at the end, and the non-score columns are ordered according
        to constants.DATASET_COLUMNS.
        """
        json_obj = OrderedDict(
            (col_name, self.dataset_columns[col_name])
            for col_name in DATASET_COLUMNS
            if col_name in self.dataset_columns
        )
        json_obj["scores"] = [
            # filter out None "value" and None "error"
            {k: v for k, v in eval_score.__dict__.items() if v is not None}
            for eval_score in self.scores
        ]
        return json_obj

    @staticmethod
    def from_row(row: Dict[str, Union[str, float, int]], score_names: List[str]) -> "EvalOutputRecord":
        """
        Returns an instance of EvalOutputRecord, created from a Ray Dataset row (represented as a dict).

        Example input:
            row = {
                "model_input": "input",
                "model_output": "output",
                "column_that_wont_be_included": "hello",
                "rouge": 0.42,
                "bert": 0.162
            }

        Corresponding output:
            EvalOutputRecord(
                scores=[
                    EvalScore(name="rouge", value=0.42),
                    EvalScore(name="bert", value=0.162)
                ],
                dataset_columns={
                    "model_input": "input",
                    "model_output": "output"
                }
            )

        Note how "column_that_wont_be_included" is not included in the produced EvalOutputRecord.
        This is because only columns in constants.DATASET_COLUMNS are considered to be valid columns
        in the saved output file generated by `save_dataset`. The reason why it's even possible
        for a column name that doesn't belong to constants.DATASET_COLUMNS to appear in `row` is that
        the Ray Dataset that `row` belongs to can contain columns used to store intermediate computations.
        For example, ClassificationAccuracy generates a column named CLASSIFIED_MODEL_OUTPUT_COLUMN_NAME
        that is used to compute CLASSIFICATION_ACCURACY_SCORE, which is one of the score columns.

        A score column whose value is None is recorded as an errored EvalScore,
        carrying the row's error message instead of a value.

        :param row: a Ray Dataset row represented as a dict
        :param score_names: column names included in the Ray Dataset that `row`
            is a sample of that correspond to evaluation algorithm scores
        :returns: an instance of EvalOutputRecord corresponding to `row`
        """
        # NOTE(review): the error column key was missing from the source text;
        # `DatasetColumns.ERROR.value.name` is assumed. Confirm against fmeval.constants.
        error_column = DatasetColumns.ERROR.value.name
        dataset_columns = {}
        scores = []
        for column_name, value in row.items():
            if column_name not in score_names:  # pragma: no branch
                # Non-score columns are kept only when they belong to the output schema.
                if column_name in DATASET_COLUMNS:  # pragma: no branch
                    dataset_columns[column_name] = value
            else:
                assert isinstance(value, (float, int)) or value is None  # to satisfy Mypy
                if value is None:
                    # A missing score must be explained by an error message on the row.
                    assert row.get(error_column, None)
                    scores.append(EvalScore(name=column_name, error=row.get(error_column)))
                else:
                    scores.append(EvalScore(name=column_name, value=value))

        return EvalOutputRecord(
            scores=scores,
            dataset_columns=dataset_columns,
        )
def generate_output_dataset_path(path_to_parent_dir: str, eval_name: str, dataset_name: str) -> str:
    """
    Returns the path to be used by an EvalAlgorithm when calling `save_dataset`.

    :param path_to_parent_dir: The path to the parent directory of the file to be saved.
    :param eval_name: The evaluation name provided by the EvalAlgorithm.
    :param dataset_name: The name of the dataset.
    :returns: `<path_to_parent_dir>/<eval_name>_<dataset_name>.jsonl`, i.e. a path that
        is unique to an evaluation/dataset pair for a given job.
    """
    file_name = f"{eval_name}_{dataset_name}.jsonl"
    return os.path.join(path_to_parent_dir, file_name)
def generate_mean_delta_score(original_score: EvalScore, perturbed_input_scores: List[EvalScore]) -> float:
    """
    Util method to generate mean of difference between original and perturbed input scores

    :param original_score: Original score
    :param perturbed_input_scores: List of scores for model inference outputs on perturbed inputs
    :returns: mean of the absolute deltas between the original score and each perturbed score
    :raises ZeroDivisionError: if `perturbed_input_scores` is empty
    :raises TypeError: if any of the scores has a `None` value
    """
    baseline = original_score.value
    # A generator avoids building a throwaway list just to sum it.
    total_delta = sum(abs(baseline - perturbed.value) for perturbed in perturbed_input_scores)
    return total_delta / len(perturbed_input_scores)
def verify_model_determinism(
    model: ModelRunner,
    dataset: Dataset,
    prompt_template: str,
    # NOTE(review): the default value was missing from the source text;
    # `DatasetColumns.MODEL_INPUT.value.name` is assumed. Confirm against fmeval.constants.
    model_input_column_name: str = DatasetColumns.MODEL_INPUT.value.name,
) -> bool:
    """Heuristic for whether model is deterministic.

    This function invokes the provided model twice on each of the first
    NUM_ROWS_DETERMINISTIC rows in the dataset. If the two model outputs
    for each input are the same for all rows, the model is considered deterministic.

    :param model: A ModelRunner instance representing the model under investigation.
    :param dataset: A Ray Dataset that includes a model input column.
    :param prompt_template: The template used to compose the prompt from the model input.
    :param model_input_column_name: Model input column name.
    :returns: Whether the model is deterministic.
    """
    prompt_composer = PromptComposer(prompt_template)
    for row in dataset.limit(NUM_ROWS_DETERMINISTIC).iter_rows():
        prompt = prompt_composer.compose(row[model_input_column_name])
        # Only the first element of each prediction is compared.
        first_output = model.predict(prompt)[0]
        second_output = model.predict(prompt)[0]
        if second_output != first_output:
            # A single mismatch is enough to call the model non-deterministic.
            return False
    return True
def create_model_invocation_pipeline(model: ModelRunner, prompt_template: str) -> TransformPipeline:
    """Create a transform pipeline for performing the standard action of invoking a model on a prompt.

    :param model: The model to be invoked.
    :param prompt_template: The template used for constructing prompts (out of raw inputs)
        that will be fed to the model.
    :returns: A TransformPipeline instance containing a GeneratePrompt transform that uses `prompt_template`
        and a GetModelOutputs transform for invoking the model on the generated prompts.
    """
    # NOTE(review): the column keys below were missing from the source text. The
    # standard model-input -> prompt -> model-output chain is assumed; confirm the
    # exact DatasetColumns members against fmeval.constants.
    model_input_key = DatasetColumns.MODEL_INPUT.value.name
    prompt_key = DatasetColumns.PROMPT.value.name
    model_output_key = DatasetColumns.MODEL_OUTPUT.value.name

    gen_prompt = GeneratePrompt(
        input_keys=[model_input_key],
        output_keys=[prompt_key],
        prompt_template=prompt_template,
    )
    get_model_outputs = GetModelOutputs(
        # Each generated prompt is fed to the model; its output lands in the model output column.
        input_to_output_keys={prompt_key: [model_output_key]},
        model_runner=model,
    )
    return TransformPipeline([gen_prompt, get_model_outputs])
