fmeval.eval_algorithms.save_strategy
import json
import logging
import os
from abc import ABC, abstractmethod
from typing import List, Dict, Optional

import boto3
from sagemaker.s3_utils import parse_s3_url

from fmeval import util
from fmeval.constants import PARTS, UPLOAD_ID, PART_NUMBER, E_TAG
from fmeval.eval_algorithms.util import EvalOutputRecord

logger = logging.getLogger(__name__)


class SaveStrategy(ABC):
    """Interface that defines how to save the eval outputs.

    The save function of this interface may be called multiple times based on the size of the dataset. This is due to
    the distributed nature of the computations. If the dataset is large, and all of the data is pulled to the head node,
    it might lead to OOM errors. In order to avoid that, the data is pulled in batches, and the `save` function is
    called on each batch at a time. In order to allow this mechanism, while allowing more flexibility in the way
    outputs are saved, this class works as a context manager: `start` is called on entry, `save` once per batch, and
    `clean_up` on exit.
    """

    def __enter__(self) -> "SaveStrategy":
        """Sets up the strategy to start saving the evaluation outputs."""
        self.start()
        return self

    @abstractmethod
    def start(self):
        """Sets up the strategy to write the evaluation output records."""

    @abstractmethod
    def save(self, records: List[EvalOutputRecord]):
        """Saves the given list of EvalOutputRecord based on the strategy.

        Each invocation of this function would be for one block of the data. There could be multiple invocations of save
        per invocation of the `evaluate` function (especially if the data is large).

        :param records: list of EvalOutputRecords to be saved
        """

    @abstractmethod
    def clean_up(self):
        """
        Clean up any leftover resources after saving the outputs.
        """

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Runs `clean_up` (also when the `with` body raised) and never suppresses the exception."""
        self.clean_up()
        return False


class FileSaveStrategy(SaveStrategy):
    """Strategy to write evaluation outputs to the local file system as JSON Lines.

    Every record is serialized with `json.dumps(record.to_dict())` and written on its own line. Multiple invocations of
    `save` append to the same file. If the file already exists, it is overwritten when the strategy is started.
    """

    def __init__(self, file_path: str):
        """Creates an instance of FileSaveStrategy.

        :param file_path: Path of the file the evaluation outputs are written to.
        """
        self._file_path = file_path
        # Number of non-empty batches written so far; decides whether a separating newline is needed.
        self._call_count = 0
        if os.path.exists(self._file_path):
            logger.warning(f"File {self._file_path} exists. Overwriting existing file")

    def start(self):
        """Sets up the strategy to write the evaluation output records by creating (or truncating) the file."""
        with open(self._file_path, mode="w"):
            pass  # opening in "w" mode truncates the file; nothing needs to be written yet

    def save(self, records: List[EvalOutputRecord]):
        """Appends the list of evaluation output records to the file at the provided file path, one JSON per line.

        Empty batches are skipped so they do not introduce blank lines into the output.

        :param records: list of EvalOutputRecords to be saved
        """
        if not records:
            return
        with open(self._file_path, mode="a", encoding="utf-8") as file:
            if self._call_count > 0:
                # Separate this batch from the last record written by the previous batch.
                file.write("\n")
            file.write("\n".join(json.dumps(record.to_dict()) for record in records))
        self._call_count += 1

    def clean_up(self):
        """Resets the batch counter."""
        self._call_count = 0


class S3SaveStrategy(SaveStrategy):
    """Strategy to write evaluation outputs to AWS S3 as JSON Lines.

    Each non-empty invocation of `save` uploads one part of an S3 multi-part upload, and the parts are combined into a
    single object when the context manager exits. If the object already exists, it will be overwritten. If the `with`
    block raises, the multi-part upload is aborted instead, so no partial object is written.
    """

    def __init__(self, s3_uri: str, s3_boto_client=None, Optional=None, kms_key_id: Optional[str] = None):
        """Creates an instance of S3SaveStrategy

        :param s3_uri: The S3 uri where the outputs should be written to
        :param s3_boto_client: The boto3 client for S3. If not provided, `boto3.client("s3")` is used
        :param Optional: Ignored. It only exists because of a typo in an earlier version of this signature and is kept
            so that existing positional callers keep working.
        :param kms_key_id: If provided, this KMS Key will be used for server side encryption
        """
        self._bucket, self._key_prefix = parse_s3_url(url=s3_uri)
        self._s3_client = s3_boto_client if s3_boto_client is not None else boto3.client("s3")
        # Response of `create_multipart_upload`; None while no upload is in progress.
        self._multi_part_upload = None
        # `MultipartUpload` argument of `complete_multipart_upload`: {PARTS: [{PART_NUMBER: ..., E_TAG: ...}, ...]}.
        self._part_info: Optional[Dict] = None
        self._kms_key_id = kms_key_id

    def start(self):
        """Sets up the strategy to write the evaluation output records to S3 using multi-part uploads.

        KMS server side encryption is only requested when a KMS key id was provided; botocore rejects `None` as the
        value of `SSEKMSKeyId`.
        """
        upload_args = {"Bucket": self._bucket, "Key": self._key_prefix}
        if self._kms_key_id is not None:
            upload_args["ServerSideEncryption"] = "aws:kms"
            upload_args["SSEKMSKeyId"] = self._kms_key_id
        self._multi_part_upload = self._s3_client.create_multipart_upload(**upload_args)
        self._part_info = {PARTS: []}
        return self

    def save(self, records: List[EvalOutputRecord]):
        """Creates and uploads a part using the list of evaluation output records so that they can be completed during
        the clean up stage.

        Records are serialized with `json.dumps(record.to_dict())`, one per line (same layout as FileSaveStrategy).
        Every part after the first starts with a newline so that, once S3 concatenates the parts, records from adjacent
        parts stay on separate lines. Empty batches are skipped.

        NOTE(review): S3 requires every part except the last to be at least 5 MiB; `complete_multipart_upload` fails
        if an intermediate batch is smaller. TODO confirm callers batch enough data, or buffer across calls.

        :param records: list of EvalOutputRecords to be saved
        """
        util.require(
            self._part_info and self._multi_part_upload, "S3SaveStrategy is meant to be used as a context manager"
        )
        assert self._part_info and self._multi_part_upload  # to satisfy mypy
        if not records:
            return
        part_number = len(self._part_info[PARTS]) + 1
        separator = "\n" if part_number > 1 else ""
        body = separator + "\n".join(json.dumps(record.to_dict()) for record in records)
        part = self._s3_client.upload_part(
            Bucket=self._bucket,
            Key=self._key_prefix,
            PartNumber=part_number,
            UploadId=self._multi_part_upload[UPLOAD_ID],
            Body=body,
        )
        self._part_info[PARTS].append({PART_NUMBER: part_number, E_TAG: part[E_TAG]})

    def clean_up(self):
        """Completes the multi-part upload to S3, which then collects and combines the parts together into one object.

        S3 cannot complete a multi-part upload without parts, so if nothing was saved the upload is aborted and a
        warning is logged instead. Calling this method while no upload is in progress does nothing.
        """
        if self._multi_part_upload is None:
            return
        upload_id = self._multi_part_upload[UPLOAD_ID]
        part_info = self._part_info
        # Reset the state first so the strategy is left clean even if the S3 call below raises.
        self._multi_part_upload = None
        self._part_info = {PARTS: []}
        if not part_info or not part_info[PARTS]:
            logger.warning(
                "No evaluation output records were saved; aborting multi-part upload to s3://%s/%s",
                self._bucket,
                self._key_prefix,
            )
            self._s3_client.abort_multipart_upload(Bucket=self._bucket, Key=self._key_prefix, UploadId=upload_id)
            return
        self._s3_client.complete_multipart_upload(
            Bucket=self._bucket,
            Key=self._key_prefix,
            UploadId=upload_id,
            MultipartUpload=part_info,
        )

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Completes the upload on success, aborts it if the `with` body raised; never suppresses the exception."""
        if exc_type is None:
            return super().__exit__(exc_type, exc_val, exc_tb)
        if self._multi_part_upload is not None:
            upload_id = self._multi_part_upload[UPLOAD_ID]
            self._multi_part_upload = None
            self._part_info = {PARTS: []}
            try:
                self._s3_client.abort_multipart_upload(Bucket=self._bucket, Key=self._key_prefix, UploadId=upload_id)
            except Exception:  # best effort: do not mask the exception raised inside the `with` block
                logger.exception(
                    "Failed to abort multi-part upload %s to s3://%s/%s", upload_id, self._bucket, self._key_prefix
                )
        return False
class SaveStrategy(ABC):
    """Contract for persisting evaluation outputs, driven as a context manager.

    Outputs can be too large to collect on the head node at once (the computation is distributed, and pulling
    everything could cause OOM errors), so they are fetched in batches. A strategy therefore runs in three phases:
    `start` when the `with` block is entered, `save` once per batch, and `clean_up` when the block is left.
    Implementations are free to decide how and where the batches are stored.
    """

    # --- context-manager protocol ---------------------------------------------------------------------------------

    def __enter__(self) -> "SaveStrategy":
        """Prepare the strategy via `start` and hand it to the `with` block."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release resources via `clean_up`; returning False lets any in-flight exception propagate."""
        self.clean_up()
        return False

    # --- hooks implemented by concrete strategies -----------------------------------------------------------------

    @abstractmethod
    def start(self):
        """Prepare whatever is needed before the first batch of evaluation output records is written."""

    @abstractmethod
    def save(self, records: List[EvalOutputRecord]):
        """Persist one batch of evaluation output records.

        A single `evaluate` call may invoke this several times, once per block of data (most notably for large data).

        :param records: list of EvalOutputRecords to be saved
        """

    @abstractmethod
    def clean_up(self):
        """Release anything left over once all batches have been saved."""
Interface that defines how to save the eval outputs.
The save function of this interface may be called multiple times based on the size of the dataset. This is due to
the distributed nature of the computations. If the dataset is large, and all of the data is pulled to the head node,
it might lead to OOM errors. In order to avoid that, the data is pulled in batches, and the save function is called on
each batch at a time. In order to allow this mechanism, while allowing more flexibility in the way outputs are saved,
this class works as a ContextManager.
@abstractmethod
def start(self):
    """Prepare the strategy before the first batch of evaluation output records is written."""
Sets up the strategy to write the evaluation output records.
@abstractmethod
def save(self, records: List[EvalOutputRecord]):
    """Persist one batch of evaluation output records according to the strategy.

    A single `evaluate` call may invoke this several times, once per block of data, which matters most when the
    dataset is large.

    :param records: list of EvalOutputRecords to be saved
    """
Saves the given list of EvalOutputRecord based on the strategy.
Each invocation of this function would be for one block of the data. There could be multiple invocations of save
per invocation of the evaluate function (especially if the data is large).
Parameters
- records: list of EvalOutputRecords to be saved
class FileSaveStrategy(SaveStrategy):
    """Strategy to write evaluation outputs to the local file system as JSON Lines.

    Every record is serialized with `json.dumps(record.to_dict())` and written on its own line. Multiple invocations of
    `save` append to the same file. If the file already exists, it is overwritten when the strategy is started.
    """

    def __init__(self, file_path: str):
        """Creates an instance of FileSaveStrategy.

        :param file_path: Path of the file the evaluation outputs are written to.
        """
        self._file_path = file_path
        # Number of non-empty batches written so far; decides whether a separating newline is needed.
        self._call_count = 0
        if os.path.exists(self._file_path):
            logger.warning(f"File {self._file_path} exists. Overwriting existing file")

    def start(self):
        """Sets up the strategy to write the evaluation output records by creating (or truncating) the file."""
        with open(self._file_path, mode="w"):
            pass  # opening in "w" mode truncates the file; nothing needs to be written yet

    def save(self, records: List[EvalOutputRecord]):
        """Appends the list of evaluation output records to the file at the provided file path, one JSON per line.

        Empty batches are skipped so they do not introduce blank lines into the output.

        :param records: list of EvalOutputRecords to be saved
        """
        if not records:
            return
        with open(self._file_path, mode="a", encoding="utf-8") as file:
            if self._call_count > 0:
                # Separate this batch from the last record written by the previous batch.
                file.write("\n")
            file.write("\n".join(json.dumps(record.to_dict()) for record in records))
        self._call_count += 1

    def clean_up(self):
        """Resets the batch counter."""
        self._call_count = 0
Strategy to write evaluation outputs to local file system.
The strategy appends multiple invocations of save to the same file. If the file already exists, it will be overwritten.
def start(self):
    """Create the output file, or empty it if it already exists, before any records are appended."""
    with open(self._file_path, mode="w"):
        pass  # opening in "w" mode truncates the file; there is nothing to write yet
Sets up the strategy to write the evaluation output records
def save(self, records: List[EvalOutputRecord]):
    """Appends the list of evaluation output records to the file at the provided file path, one JSON per line.

    Empty batches are skipped so they do not introduce blank lines into the output.

    :param records: list of EvalOutputRecords to be saved
    """
    if not records:
        return
    with open(self._file_path, mode="a", encoding="utf-8") as file:
        if self._call_count > 0:
            # Separate this batch from the last record written by the previous batch.
            file.write("\n")
        file.write("\n".join(json.dumps(record.to_dict()) for record in records))
    self._call_count += 1
Appends the list of evaluation output records to the file at the provided file path
Parameters
- records: list of EvalOutputRecords to be saved
class S3SaveStrategy(SaveStrategy):
    """Strategy to write evaluation outputs to AWS S3 as JSON Lines.

    Each non-empty invocation of `save` uploads one part of an S3 multi-part upload, and the parts are combined into a
    single object when the context manager exits. If the object already exists, it will be overwritten. If the `with`
    block raises, the multi-part upload is aborted instead, so no partial object is written.
    """

    def __init__(self, s3_uri: str, s3_boto_client=None, Optional=None, kms_key_id: Optional[str] = None):
        """Creates an instance of S3SaveStrategy

        :param s3_uri: The S3 uri where the outputs should be written to
        :param s3_boto_client: The boto3 client for S3. If not provided, `boto3.client("s3")` is used
        :param Optional: Ignored. It only exists because of a typo in an earlier version of this signature and is kept
            so that existing positional callers keep working.
        :param kms_key_id: If provided, this KMS Key will be used for server side encryption
        """
        self._bucket, self._key_prefix = parse_s3_url(url=s3_uri)
        self._s3_client = s3_boto_client if s3_boto_client is not None else boto3.client("s3")
        # Response of `create_multipart_upload`; None while no upload is in progress.
        self._multi_part_upload = None
        # `MultipartUpload` argument of `complete_multipart_upload`: {PARTS: [{PART_NUMBER: ..., E_TAG: ...}, ...]}.
        self._part_info: Optional[Dict] = None
        self._kms_key_id = kms_key_id

    def start(self):
        """Sets up the strategy to write the evaluation output records to S3 using multi-part uploads.

        KMS server side encryption is only requested when a KMS key id was provided; botocore rejects `None` as the
        value of `SSEKMSKeyId`.
        """
        upload_args = {"Bucket": self._bucket, "Key": self._key_prefix}
        if self._kms_key_id is not None:
            upload_args["ServerSideEncryption"] = "aws:kms"
            upload_args["SSEKMSKeyId"] = self._kms_key_id
        self._multi_part_upload = self._s3_client.create_multipart_upload(**upload_args)
        self._part_info = {PARTS: []}
        return self

    def save(self, records: List[EvalOutputRecord]):
        """Creates and uploads a part using the list of evaluation output records so that they can be completed during
        the clean up stage.

        Records are serialized with `json.dumps(record.to_dict())`, one per line (same layout as FileSaveStrategy).
        Every part after the first starts with a newline so that, once S3 concatenates the parts, records from adjacent
        parts stay on separate lines. Empty batches are skipped.

        NOTE(review): S3 requires every part except the last to be at least 5 MiB; `complete_multipart_upload` fails
        if an intermediate batch is smaller. TODO confirm callers batch enough data, or buffer across calls.

        :param records: list of EvalOutputRecords to be saved
        """
        util.require(
            self._part_info and self._multi_part_upload, "S3SaveStrategy is meant to be used as a context manager"
        )
        assert self._part_info and self._multi_part_upload  # to satisfy mypy
        if not records:
            return
        part_number = len(self._part_info[PARTS]) + 1
        separator = "\n" if part_number > 1 else ""
        body = separator + "\n".join(json.dumps(record.to_dict()) for record in records)
        part = self._s3_client.upload_part(
            Bucket=self._bucket,
            Key=self._key_prefix,
            PartNumber=part_number,
            UploadId=self._multi_part_upload[UPLOAD_ID],
            Body=body,
        )
        self._part_info[PARTS].append({PART_NUMBER: part_number, E_TAG: part[E_TAG]})

    def clean_up(self):
        """Completes the multi-part upload to S3, which then collects and combines the parts together into one object.

        S3 cannot complete a multi-part upload without parts, so if nothing was saved the upload is aborted and a
        warning is logged instead. Calling this method while no upload is in progress does nothing.
        """
        if self._multi_part_upload is None:
            return
        upload_id = self._multi_part_upload[UPLOAD_ID]
        part_info = self._part_info
        # Reset the state first so the strategy is left clean even if the S3 call below raises.
        self._multi_part_upload = None
        self._part_info = {PARTS: []}
        if not part_info or not part_info[PARTS]:
            logger.warning(
                "No evaluation output records were saved; aborting multi-part upload to s3://%s/%s",
                self._bucket,
                self._key_prefix,
            )
            self._s3_client.abort_multipart_upload(Bucket=self._bucket, Key=self._key_prefix, UploadId=upload_id)
            return
        self._s3_client.complete_multipart_upload(
            Bucket=self._bucket,
            Key=self._key_prefix,
            UploadId=upload_id,
            MultipartUpload=part_info,
        )

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Completes the upload on success, aborts it if the `with` body raised; never suppresses the exception."""
        if exc_type is None:
            return super().__exit__(exc_type, exc_val, exc_tb)
        if self._multi_part_upload is not None:
            upload_id = self._multi_part_upload[UPLOAD_ID]
            self._multi_part_upload = None
            self._part_info = {PARTS: []}
            try:
                self._s3_client.abort_multipart_upload(Bucket=self._bucket, Key=self._key_prefix, UploadId=upload_id)
            except Exception:  # best effort: do not mask the exception raised inside the `with` block
                logger.exception(
                    "Failed to abort multi-part upload %s to s3://%s/%s", upload_id, self._bucket, self._key_prefix
                )
        return False
Strategy to write evaluation outputs to AWS S3.
The strategy appends multiple invocations of save to the same S3 object. If the object already exists, it will be overwritten.
def __init__(self, s3_uri: str, s3_boto_client=None, Optional=None, kms_key_id: Optional[str] = None):
    """Creates an instance of S3SaveStrategy

    :param s3_uri: The S3 uri where the outputs should be written to
    :param s3_boto_client: The boto3 client for S3. If not provided, `boto3.client("s3")` is used
    :param Optional: Ignored. It only exists because of a typo in an earlier version of this signature and is kept
        so that existing positional callers keep working.
    :param kms_key_id: If provided, this KMS Key will be used for server side encryption
    """
    self._bucket, self._key_prefix = parse_s3_url(url=s3_uri)
    self._s3_client = s3_boto_client if s3_boto_client is not None else boto3.client("s3")
    # Set by `start` to the `create_multipart_upload` response; None while no upload is in progress.
    self._multi_part_upload = None
    self._part_info: Optional[Dict] = None
    self._kms_key_id = kms_key_id
Creates an instance of S3SaveStrategy
Parameters
- s3_uri: The S3 uri where the outputs should be written to
- s3_boto_client: The boto3 client for S3. If not provided, the class will try to use the default S3 client
- kms_key_id: If provided, this KMS Key will be used for server side encryption
def start(self):
    """Sets up the strategy to write the evaluation output records to S3 using multi-part uploads.

    KMS server side encryption is only requested when a KMS key id was provided; botocore rejects `None` as the value
    of `SSEKMSKeyId`.
    """
    upload_args = {"Bucket": self._bucket, "Key": self._key_prefix}
    if self._kms_key_id is not None:
        upload_args["ServerSideEncryption"] = "aws:kms"
        upload_args["SSEKMSKeyId"] = self._kms_key_id
    self._multi_part_upload = self._s3_client.create_multipart_upload(**upload_args)
    self._part_info = {PARTS: []}
    return self
Sets up the strategy to write the evaluation output records to S3 using multi-part uploads.
def save(self, records: List[EvalOutputRecord]):
    """Creates and uploads a part using the list of evaluation output records so that they can be completed during
    the clean up stage.

    Records are serialized with `json.dumps(record.to_dict())`, one per line (same layout as FileSaveStrategy). Every
    part after the first starts with a newline so that, once S3 concatenates the parts, records from adjacent parts
    stay on separate lines. Empty batches are skipped.

    NOTE(review): S3 requires every part except the last to be at least 5 MiB; `complete_multipart_upload` fails if an
    intermediate batch is smaller. TODO confirm callers batch enough data, or buffer across calls.

    :param records: list of EvalOutputRecords to be saved
    """
    util.require(
        self._part_info and self._multi_part_upload, "S3SaveStrategy is meant to be used as a context manager"
    )
    assert self._part_info and self._multi_part_upload  # to satisfy mypy
    if not records:
        return
    part_number = len(self._part_info[PARTS]) + 1
    separator = "\n" if part_number > 1 else ""
    body = separator + "\n".join(json.dumps(record.to_dict()) for record in records)
    part = self._s3_client.upload_part(
        Bucket=self._bucket,
        Key=self._key_prefix,
        PartNumber=part_number,
        UploadId=self._multi_part_upload[UPLOAD_ID],
        Body=body,
    )
    self._part_info[PARTS].append({PART_NUMBER: part_number, E_TAG: part[E_TAG]})
Creates and uploads a part using the list of evaluation output records so that they can be completed during the clean up stage.
Parameters
- records: list of EvalOutputRecords to be saved
def clean_up(self):
    """Completes the multi-part upload to S3, which then collects and combines the parts together into one object.

    S3 cannot complete a multi-part upload without parts, so if nothing was saved the upload is aborted and a warning
    is logged instead. Calling this method while no upload is in progress does nothing.
    """
    if self._multi_part_upload is None:
        return
    upload_id = self._multi_part_upload[UPLOAD_ID]
    part_info = self._part_info
    # Reset the state first so the strategy is left clean even if the S3 call below raises.
    self._multi_part_upload = None
    self._part_info = {PARTS: []}
    if not part_info or not part_info[PARTS]:
        logger.warning(
            "No evaluation output records were saved; aborting multi-part upload to s3://%s/%s",
            self._bucket,
            self._key_prefix,
        )
        self._s3_client.abort_multipart_upload(Bucket=self._bucket, Key=self._key_prefix, UploadId=upload_id)
        return
    self._s3_client.complete_multipart_upload(
        Bucket=self._bucket,
        Key=self._key_prefix,
        UploadId=upload_id,
        MultipartUpload=part_info,
    )
Completes the multi-part upload to S3, which then collects and combines the parts together into one object