-
Notifications
You must be signed in to change notification settings - Fork 17
/
writer.py
256 lines (221 loc) · 9.23 KB
/
writer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
"""
Class for writing DataContainer frames to disk.
:author: Jeremy Biggs ([email protected])
:author: Anastassia Loukina ([email protected])
:author: Nitin Madnani ([email protected])
:organization: ETS
"""
from os import makedirs
from os.path import join
from typing import Dict, List, Optional, Union
import pandas as pd
from wandb.sdk.lib import RunDisabled
from wandb.wandb_run import Run
from rsmtool.container import DataContainer
from .utils.wandb import log_dataframe_to_wandb
class DataWriter:
    """
    Write data frames (from a DataContainer or a plain dict) to disk.

    Each frame is written to its own file in one of the supported
    formats and, if a wandb run was supplied, is also passed to
    ``log_dataframe_to_wandb`` together with the writer's context.
    """

    def __init__(
        self,
        experiment_id: Optional[str] = None,
        context: Optional[str] = None,
        wandb_run: "Union[Run, RunDisabled, None]" = None,
    ):
        """
        Initialize the DataWriter object.

        Parameters
        ----------
        experiment_id : Optional[str]
            The experiment name to be used in the output file names.
            Defaults to ``None``.
        context : Optional[str]
            The context in which this writer is used.
            Defaults to ``None``.
        wandb_run : Union[wandb.wandb_run.Run, wandb.sdk.lib.RunDisabled, None]
            The wandb run object if wandb is enabled, None otherwise.
            If enabled, all the output data frames will be logged to
            this run as tables.
            Defaults to ``None``.
        """
        self._id = experiment_id
        self.context = context
        self.wandb_run = wandb_run

    @staticmethod
    def write_frame_to_file(
        df: pd.DataFrame, name_prefix: str, file_format: str = "csv", index: bool = False, **kwargs
    ) -> None:
        """
        Write given data frame to disk with given name and file format.

        Parameters
        ----------
        df : pandas.DataFrame
            Data frame to write to disk
        name_prefix : str
            The complete prefix for the file to be written to disk.
            This includes everything except the extension.
        file_format : str
            The file format (extension) for the file to be written to disk.
            One of {``"csv"``, ``"xlsx"``, ``"tsv"``}. The value is
            lower-cased before it is compared.
            Defaults to ``"csv"``.
        index : bool
            Whether to include the index in the output file.
            Defaults to ``False``.
        **kwargs
            Additional keyword arguments passed through unchanged to the
            underlying pandas writer (``to_csv``, ``to_json`` or
            ``to_excel``).

        Raises
        ------
        KeyError
            If ``file_format`` is not valid.
        """
        file_format = file_format.lower()

        if file_format == "csv":
            df.to_csv(f"{name_prefix}.csv", index=index, **kwargs)

        elif file_format == "tsv":
            df.to_csv(f"{name_prefix}.tsv", index=index, sep="\t", **kwargs)

        # Added jsonlines for experimental purposes, but leaving
        # this out of the documentation at this stage. Note that
        # `index` is not forwarded for this format.
        elif file_format == "jsonlines":
            df.to_json(f"{name_prefix}.jsonlines", orient="records", lines=True, **kwargs)

        elif file_format == "xlsx":
            df.to_excel(f"{name_prefix}.xlsx", index=index, **kwargs)

        else:
            raise KeyError(
                "Please make sure that the `file_format` specified "
                "is one of the following:\n{`csv`, `tsv`, `xlsx`}.\n"
                f"You specified {file_format}."
            )

    def write_experiment_output(
        self,
        csvdir: str,
        container_or_dict: "Union[DataContainer, Dict[str, pd.DataFrame]]",
        dataframe_names: Optional[List[str]] = None,
        new_names_dict: Optional[Dict[str, str]] = None,
        include_experiment_id: bool = True,
        reset_index: bool = False,
        file_format: str = "csv",
        index: bool = False,
        **kwargs,
    ) -> None:
        """
        Write out each of the named frames to disk.

        This function writes out each of the given list of data frames as a
        ".csv", ".tsv", or ``.xlsx`` file in the given directory. Each data
        frame was generated as part of running an RSMTool experiment. All files
        are prefixed with the given experiment ID and suffixed with either the
        name of the data frame in the DataContainer (or dict) object, or a new
        name if ``new_names_dict`` is specified. Additionally, the indexes in
        the data frames are reset if so specified. Empty data frames are
        skipped. The data frames passed in by the caller are never modified.

        Parameters
        ----------
        csvdir : str
            Path to the output experiment sub-directory that will
            contain the CSV files corresponding to each of the data frames.
        container_or_dict : Union[container.DataContainer, Dict[str, pd.DataFrame]]
            A DataContainer object or dict, where keys are data frame
            names and values are pandas.DataFrame objects.
        dataframe_names : Optional[List[str]]
            List of data frame names, one for each of the data frames.
            If ``None``, all frames in ``container_or_dict`` are written.
            Defaults to ``None``.
        new_names_dict : Optional[Dict[str, str]]
            New dictionary with new names for the data frames, if desired.
            Defaults to ``None``.
        include_experiment_id : bool
            Whether to include the experiment ID in the file name.
            Defaults to ``True``.
        reset_index : bool
            Whether to reset the index of each data frame
            before writing to disk.
            Defaults to ``False``.
        file_format : str
            The file format in which to output the data.
            One of {``"csv"``, ``"xlsx"``, ``"tsv"``}.
            Defaults to ``"csv"``.
        index : bool
            Whether to include the index in the output file.
            Defaults to ``False``.
        **kwargs
            Additional keyword arguments passed through to
            ``write_frame_to_file``.

        Raises
        ------
        KeyError
            If ``file_format`` is not valid, or a named data frame
            is not present in ``container_or_dict``.
        """
        container_or_dict = container_or_dict.copy()

        # If no `dataframe_names` specified, use all names
        if dataframe_names is None:
            dataframe_names = list(container_or_dict.keys())

        # Otherwise, check to make sure all specified names
        # are actually in the DataContainer
        else:
            for name in dataframe_names:
                if name not in container_or_dict:
                    raise KeyError(f"The name `{name}` is not in the container or dictionary.")

        # Loop through DataFrames, and save
        # output in specified format
        for dataframe_name in dataframe_names:
            df = container_or_dict[dataframe_name]
            if df is None:
                raise KeyError(f"The DataFrame `{dataframe_name}` does not exist.")

            # If the DataFrame is empty, skip it
            if df.empty:
                continue

            # If there is a desire to rename the DataFrame,
            # get the new name
            output_name = dataframe_name
            if new_names_dict is not None and dataframe_name in new_names_dict:
                output_name = new_names_dict[dataframe_name]

            # Reset the index, if desired. The copy of `container_or_dict`
            # made above is only shallow when a plain dict is passed, so
            # `df` may still be the caller's own object; work on a private
            # copy so that neither its index name nor its columns change.
            if reset_index:
                df = df.copy()
                df.index.name = ""
                df = df.reset_index()

            # If include_experiment_id is True, and the experiment_id exists
            # include it in the file name; otherwise, do not include it.
            if include_experiment_id and self._id is not None:
                outfile = join(csvdir, f"{self._id}_{output_name}")
            else:
                outfile = join(csvdir, output_name)

            # write out the frame to disk in the given file
            self.write_frame_to_file(df, outfile, file_format=file_format, index=index, **kwargs)

            # log the frame that was written (under its output name)
            log_dataframe_to_wandb(self.wandb_run, df, output_name, self.context)

    def write_feature_csv(
        self,
        featuredir: str,
        data_container: "DataContainer",
        selected_features: List[str],
        include_experiment_id: bool = True,
        file_format: str = "csv",
    ) -> None:
        """
        Write out the selected features to disk.

        The ``feature_specs`` frame is filtered down to the rows whose
        ``feature`` value is in ``selected_features`` and written out
        under the name ``selected``. Note that the ``feature_specs``
        frame in ``data_container`` itself is replaced with the filtered
        frame as part of this call.

        Parameters
        ----------
        featuredir : str
            Path to the experiment output directory where the
            selected feature specifications file will be saved.
            The directory is created if it does not exist.
        data_container : DataContainer
            A data container object. It must contain a frame
            named ``feature_specs`` with a ``feature`` column.
        selected_features : List[str]
            List of features that were selected for model building.
        include_experiment_id : bool
            Whether to include the experiment ID in the file name.
            Defaults to ``True``.
        file_format : str
            The file format in which to output the data. One of {``"csv"``, ``"tsv"``,
            ``"xlsx"``}.
            Defaults to ``"csv"``.
        """
        df_feature_specs = data_container["feature_specs"]

        # Select specific features used in training
        is_selected = df_feature_specs["feature"].isin(selected_features)
        df_selected = df_feature_specs[is_selected]

        # Replace existing `feature_specs` with selected features specs
        data_container.add_dataset({"frame": df_selected, "name": "feature_specs"}, update=True)

        makedirs(featuredir, exist_ok=True)

        self.write_experiment_output(
            featuredir,
            data_container,
            ["feature_specs"],
            {"feature_specs": "selected"},
            include_experiment_id=include_experiment_id,
            file_format=file_format,
        )