fmeval.eval_algorithms.save_strategy

  1import json
  2import logging
  3import os
  4from abc import ABC, abstractmethod
  5from typing import List, Dict, Optional
  6
  7import boto3
  8from sagemaker.s3_utils import parse_s3_url
  9
 10from fmeval import util
 11from fmeval.constants import PARTS, UPLOAD_ID, PART_NUMBER, E_TAG
 12from fmeval.eval_algorithms.util import EvalOutputRecord
 13
 14logger = logging.getLogger(__name__)
 15
 16
 17class SaveStrategy(ABC):
 18    """Interface that defines how to save the eval outputs.
 19
 20    The save function of this interface may be called multiple times based on the size of the dataset. This is due to
 21    the distributed nature of the computations. If the dataset is large, and all of the data is pulled to the head node,
 22    it might lead to OOM errors. In order to avoid that, the data is pulled in batches, and `save` function is called on
 23    each batch at a time. In order to allow this mechanism, while allowing more flexbility in the way outputs are saved,
 24    this class works as a ContextManager.
 25    """
 26
 27    def __enter__(self) -> "SaveStrategy":
 28        """Sets up the strategy to start saving the evaluation outputs."""
 29        self.start()
 30        return self
 31
 32    @abstractmethod
 33    def start(self):
 34        """Sets up the strategy to write the evaluation output records."""
 35
 36    @abstractmethod
 37    def save(self, records: List[EvalOutputRecord]):
 38        """Saves the given list of EvalOutputRecord based on the strategy.
 39
 40        Each invocation of this function would be for one block of the data. There could be multiple invocations of save
 41        per invocation of the `evaluate` function (especially if the data is large).
 42
 43        :param records: list of EvalOutputRecords to be saved
 44        """
 45
 46    @abstractmethod
 47    def clean_up(self):
 48        """
 49        Clean up any leftover resources after saving the outputs.
 50        """
 51
 52    def __exit__(self, exc_type, exc_val, exc_tb):
 53        self.clean_up()
 54        return False
 55
 56
 57class FileSaveStrategy(SaveStrategy):
 58    """Strategy to write evaluation outputs to local file system.
 59
 60    The strategy appends multiple invocations of save to the same file. If the file already exists, it will be
 61    overwritten.
 62    """
 63
 64    def __init__(self, file_path: str):
 65        self._file_path = file_path
 66        self._call_count = 0
 67        if os.path.exists(self._file_path):
 68            logger.warning(f"File {self._file_path} exists. Overwriting existing file")
 69
 70    def start(self):
 71        """Sets up the strategy to write the evaluation output records"""
 72        with open(self._file_path, mode="w") as file:
 73            file.write("")
 74
 75    def save(self, records: List[EvalOutputRecord]):
 76        """Append the list of evalution output records to the file at provided file path
 77
 78        :param records: list of EvalOutputRecords to be saved
 79        """
 80        with open(self._file_path, mode="a") as file:
 81            if self._call_count > 0:
 82                file.write("\n")
 83            file.writelines("\n".join([json.dumps(record.to_dict()) for record in records]))
 84            self._call_count += 1
 85
 86    def clean_up(self):
 87        self._call_count = 0
 88
 89
 90class S3SaveStrategy(SaveStrategy):
 91    """Strategy to write evaluation outputs to AWS S3.
 92
 93    The strategy appends multiple invocations of save to the same file. If the file already exists, it will be
 94    overwritten.
 95    """
 96
 97    def __init__(self, s3_uri: str, s3_boto_client, Optional=None, kms_key_id: Optional[str] = None):
 98        """Creates an instance of S3SaveStrategy
 99
100        :param s3_uri: The S3 uri where the outputs should be written to
101        :param s3_boto_client: The boto3 client for S3. If not provided, the class will try to use the default S3 client
102        :param kms_key_id: If provided, this KMS Key will be used for server side encryption
103        """
104        self._bucket, self._key_prefix = parse_s3_url(url=s3_uri)
105        self._s3_client = s3_boto_client if s3_boto_client else boto3.client("s3")
106        self._multi_part_upload = None
107        self._part_info: Optional[Dict] = None
108        self._kms_key_id = kms_key_id
109
110    def start(self):
111        """Sets up the strategy to write the evaluation output records to S3 using multi-part uploads."""
112        self._multi_part_upload = self._s3_client.create_multipart_upload(
113            Bucket=self._bucket, Key=self._key_prefix, SSEKMSKeyId=self._kms_key_id
114        )
115        self._part_info = {PARTS: []}
116        return self
117
118    def save(self, records: List[EvalOutputRecord]):
119        """Creates and uploads a part using the list of evaluation output records so that they can be completed during
120        the clean up stage.
121
122        :param records: list of EvalOutputRecords to be saved
123        """
124        util.require(
125            self._part_info and self._multi_part_upload, "S3SaveStrategy is meant to be used as a context manager"
126        )
127        assert self._part_info and self._multi_part_upload  # to satisfy mypy
128        part_number = len(self._part_info[PARTS]) + 1
129        part = self._s3_client.upload_part(
130            Bucket=self._bucket,
131            Key=self._key_prefix,
132            PartNumber=part_number,
133            UploadId=self._multi_part_upload[UPLOAD_ID],
134            Body="\n".join([str(record) for record in records]),
135        )
136        self._part_info[PARTS].append({PART_NUMBER: part_number, E_TAG: part[E_TAG]})
137
138    def clean_up(self):
139        """Completes the multi-part upload to S3, which then collects and combines the parts together into one object"""
140        self._s3_client.complete_multipart_upload(
141            Bucket=self._bucket,
142            Key=self._key_prefix,
143            UploadId=self._multi_part_upload[UPLOAD_ID],
144            MultipartUpload=self._part_info,
145        )
146        self._multi_part_upload = None
147        self._part_info = {PARTS: []}
logger = <Logger fmeval.eval_algorithms.save_strategy (WARNING)>
class SaveStrategy(abc.ABC):
18class SaveStrategy(ABC):
19    """Interface that defines how to save the eval outputs.
20
21    The save function of this interface may be called multiple times based on the size of the dataset. This is due to
22    the distributed nature of the computations. If the dataset is large, and all of the data is pulled to the head node,
23    it might lead to OOM errors. In order to avoid that, the data is pulled in batches, and `save` function is called on
24    each batch at a time. In order to allow this mechanism, while allowing more flexbility in the way outputs are saved,
25    this class works as a ContextManager.
26    """
27
28    def __enter__(self) -> "SaveStrategy":
29        """Sets up the strategy to start saving the evaluation outputs."""
30        self.start()
31        return self
32
33    @abstractmethod
34    def start(self):
35        """Sets up the strategy to write the evaluation output records."""
36
37    @abstractmethod
38    def save(self, records: List[EvalOutputRecord]):
39        """Saves the given list of EvalOutputRecord based on the strategy.
40
41        Each invocation of this function would be for one block of the data. There could be multiple invocations of save
42        per invocation of the `evaluate` function (especially if the data is large).
43
44        :param records: list of EvalOutputRecords to be saved
45        """
46
47    @abstractmethod
48    def clean_up(self):
49        """
50        Clean up any leftover resources after saving the outputs.
51        """
52
53    def __exit__(self, exc_type, exc_val, exc_tb):
54        self.clean_up()
55        return False

Interface that defines how to save the eval outputs.

The save function of this interface may be called multiple times based on the size of the dataset. This is due to the distributed nature of the computations. If the dataset is large, and all of the data is pulled to the head node, it might lead to OOM errors. In order to avoid that, the data is pulled in batches, and the save function is called on each batch at a time. In order to allow this mechanism, while allowing more flexibility in the way outputs are saved, this class works as a ContextManager.

@abstractmethod
def start(self):
33    @abstractmethod
34    def start(self):
35        """Sets up the strategy to write the evaluation output records."""

Sets up the strategy to write the evaluation output records.

@abstractmethod
def save(self, records: List[fmeval.eval_algorithms.util.EvalOutputRecord]):
37    @abstractmethod
38    def save(self, records: List[EvalOutputRecord]):
39        """Saves the given list of EvalOutputRecord based on the strategy.
40
41        Each invocation of this function would be for one block of the data. There could be multiple invocations of save
42        per invocation of the `evaluate` function (especially if the data is large).
43
44        :param records: list of EvalOutputRecords to be saved
45        """

Saves the given list of EvalOutputRecord based on the strategy.

Each invocation of this function would be for one block of the data. There could be multiple invocations of save per invocation of the evaluate function (especially if the data is large).

Parameters
  • records: list of EvalOutputRecords to be saved
@abstractmethod
def clean_up(self):
47    @abstractmethod
48    def clean_up(self):
49        """
50        Clean up any leftover resources after saving the outputs.
51        """

Clean up any leftover resources after saving the outputs.

class FileSaveStrategy(SaveStrategy):
58class FileSaveStrategy(SaveStrategy):
59    """Strategy to write evaluation outputs to local file system.
60
61    The strategy appends multiple invocations of save to the same file. If the file already exists, it will be
62    overwritten.
63    """
64
65    def __init__(self, file_path: str):
66        self._file_path = file_path
67        self._call_count = 0
68        if os.path.exists(self._file_path):
69            logger.warning(f"File {self._file_path} exists. Overwriting existing file")
70
71    def start(self):
72        """Sets up the strategy to write the evaluation output records"""
73        with open(self._file_path, mode="w") as file:
74            file.write("")
75
76    def save(self, records: List[EvalOutputRecord]):
77        """Append the list of evalution output records to the file at provided file path
78
79        :param records: list of EvalOutputRecords to be saved
80        """
81        with open(self._file_path, mode="a") as file:
82            if self._call_count > 0:
83                file.write("\n")
84            file.writelines("\n".join([json.dumps(record.to_dict()) for record in records]))
85            self._call_count += 1
86
87    def clean_up(self):
88        self._call_count = 0

Strategy to write evaluation outputs to local file system.

The strategy appends multiple invocations of save to the same file. If the file already exists, it will be overwritten.

FileSaveStrategy(file_path: str)
65    def __init__(self, file_path: str):
66        self._file_path = file_path
67        self._call_count = 0
68        if os.path.exists(self._file_path):
69            logger.warning(f"File {self._file_path} exists. Overwriting existing file")
def start(self):
71    def start(self):
72        """Sets up the strategy to write the evaluation output records"""
73        with open(self._file_path, mode="w") as file:
74            file.write("")

Sets up the strategy to write the evaluation output records

def save(self, records: List[fmeval.eval_algorithms.util.EvalOutputRecord]):
76    def save(self, records: List[EvalOutputRecord]):
77        """Append the list of evalution output records to the file at provided file path
78
79        :param records: list of EvalOutputRecords to be saved
80        """
81        with open(self._file_path, mode="a") as file:
82            if self._call_count > 0:
83                file.write("\n")
84            file.writelines("\n".join([json.dumps(record.to_dict()) for record in records]))
85            self._call_count += 1

Append the list of evaluation output records to the file at the provided file path

Parameters
  • records: list of EvalOutputRecords to be saved
def clean_up(self):
87    def clean_up(self):
88        self._call_count = 0

Clean up any leftover resources after saving the outputs.

class S3SaveStrategy(SaveStrategy):
 91class S3SaveStrategy(SaveStrategy):
 92    """Strategy to write evaluation outputs to AWS S3.
 93
 94    The strategy appends multiple invocations of save to the same file. If the file already exists, it will be
 95    overwritten.
 96    """
 97
 98    def __init__(self, s3_uri: str, s3_boto_client, Optional=None, kms_key_id: Optional[str] = None):
 99        """Creates an instance of S3SaveStrategy
100
101        :param s3_uri: The S3 uri where the outputs should be written to
102        :param s3_boto_client: The boto3 client for S3. If not provided, the class will try to use the default S3 client
103        :param kms_key_id: If provided, this KMS Key will be used for server side encryption
104        """
105        self._bucket, self._key_prefix = parse_s3_url(url=s3_uri)
106        self._s3_client = s3_boto_client if s3_boto_client else boto3.client("s3")
107        self._multi_part_upload = None
108        self._part_info: Optional[Dict] = None
109        self._kms_key_id = kms_key_id
110
111    def start(self):
112        """Sets up the strategy to write the evaluation output records to S3 using multi-part uploads."""
113        self._multi_part_upload = self._s3_client.create_multipart_upload(
114            Bucket=self._bucket, Key=self._key_prefix, SSEKMSKeyId=self._kms_key_id
115        )
116        self._part_info = {PARTS: []}
117        return self
118
119    def save(self, records: List[EvalOutputRecord]):
120        """Creates and uploads a part using the list of evaluation output records so that they can be completed during
121        the clean up stage.
122
123        :param records: list of EvalOutputRecords to be saved
124        """
125        util.require(
126            self._part_info and self._multi_part_upload, "S3SaveStrategy is meant to be used as a context manager"
127        )
128        assert self._part_info and self._multi_part_upload  # to satisfy mypy
129        part_number = len(self._part_info[PARTS]) + 1
130        part = self._s3_client.upload_part(
131            Bucket=self._bucket,
132            Key=self._key_prefix,
133            PartNumber=part_number,
134            UploadId=self._multi_part_upload[UPLOAD_ID],
135            Body="\n".join([str(record) for record in records]),
136        )
137        self._part_info[PARTS].append({PART_NUMBER: part_number, E_TAG: part[E_TAG]})
138
139    def clean_up(self):
140        """Completes the multi-part upload to S3, which then collects and combines the parts together into one object"""
141        self._s3_client.complete_multipart_upload(
142            Bucket=self._bucket,
143            Key=self._key_prefix,
144            UploadId=self._multi_part_upload[UPLOAD_ID],
145            MultipartUpload=self._part_info,
146        )
147        self._multi_part_upload = None
148        self._part_info = {PARTS: []}

Strategy to write evaluation outputs to AWS S3.

The strategy appends multiple invocations of save to the same file. If the file already exists, it will be overwritten.

S3SaveStrategy( s3_uri: str, s3_boto_client, Optional=None, kms_key_id: Optional[str] = None)
 98    def __init__(self, s3_uri: str, s3_boto_client, Optional=None, kms_key_id: Optional[str] = None):
 99        """Creates an instance of S3SaveStrategy
100
101        :param s3_uri: The S3 uri where the outputs should be written to
102        :param s3_boto_client: The boto3 client for S3. If not provided, the class will try to use the default S3 client
103        :param kms_key_id: If provided, this KMS Key will be used for server side encryption
104        """
105        self._bucket, self._key_prefix = parse_s3_url(url=s3_uri)
106        self._s3_client = s3_boto_client if s3_boto_client else boto3.client("s3")
107        self._multi_part_upload = None
108        self._part_info: Optional[Dict] = None
109        self._kms_key_id = kms_key_id

Creates an instance of S3SaveStrategy

Parameters
  • s3_uri: The S3 uri where the outputs should be written to
  • s3_boto_client: The boto3 client for S3. If not provided, the class will try to use the default S3 client
  • kms_key_id: If provided, this KMS Key will be used for server side encryption
def start(self):
111    def start(self):
112        """Sets up the strategy to write the evaluation output records to S3 using multi-part uploads."""
113        self._multi_part_upload = self._s3_client.create_multipart_upload(
114            Bucket=self._bucket, Key=self._key_prefix, SSEKMSKeyId=self._kms_key_id
115        )
116        self._part_info = {PARTS: []}
117        return self

Sets up the strategy to write the evaluation output records to S3 using multi-part uploads.

def save(self, records: List[fmeval.eval_algorithms.util.EvalOutputRecord]):
119    def save(self, records: List[EvalOutputRecord]):
120        """Creates and uploads a part using the list of evaluation output records so that they can be completed during
121        the clean up stage.
122
123        :param records: list of EvalOutputRecords to be saved
124        """
125        util.require(
126            self._part_info and self._multi_part_upload, "S3SaveStrategy is meant to be used as a context manager"
127        )
128        assert self._part_info and self._multi_part_upload  # to satisfy mypy
129        part_number = len(self._part_info[PARTS]) + 1
130        part = self._s3_client.upload_part(
131            Bucket=self._bucket,
132            Key=self._key_prefix,
133            PartNumber=part_number,
134            UploadId=self._multi_part_upload[UPLOAD_ID],
135            Body="\n".join([str(record) for record in records]),
136        )
137        self._part_info[PARTS].append({PART_NUMBER: part_number, E_TAG: part[E_TAG]})

Creates and uploads a part using the list of evaluation output records so that they can be completed during the clean up stage.

Parameters
  • records: list of EvalOutputRecords to be saved
def clean_up(self):
139    def clean_up(self):
140        """Completes the multi-part upload to S3, which then collects and combines the parts together into one object"""
141        self._s3_client.complete_multipart_upload(
142            Bucket=self._bucket,
143            Key=self._key_prefix,
144            UploadId=self._multi_part_upload[UPLOAD_ID],
145            MultipartUpload=self._part_info,
146        )
147        self._multi_part_upload = None
148        self._part_info = {PARTS: []}

Completes the multi-part upload to S3, which then collects and combines the parts together into one object