1# Copyright (C) 2020 Gradient Boosted Investments, Inc. - All Rights Reserved
2
3import base64
4import csv
5import datetime
6import io
7import itertools
8import json
9import logging
10import math
11import mimetypes
12import os
13import sys
14import tempfile
15import time
16from datetime import timedelta
17from typing import Any, Dict, List, Literal, Optional, Tuple, Union
18from urllib import parse
19
20import boosted.api.graphql_queries as graphql_queries
21import numpy as np
22import pandas as pd
23import requests
24from boosted.api.api_type import (
25 BoostedDate,
26 ChunkStatus,
27 ColumnSubRole,
28 DataAddType,
29 DataSetConfig,
30 DataSetType,
31 DateIdentCountryCurrency,
32 GbiIdSecurity,
33 GbiIdTickerISIN,
34 HedgeExperiment,
35 HedgeExperimentDetails,
36 HedgeExperimentScenario,
37 PortfolioSettings,
38 Status,
39 hedge_experiment_type,
40)
41from boosted.api.api_util import (
42 infer_dataset_schema,
43 protoCubeJsonDataToDataFrame,
44 validate_start_and_end_dates,
45)
46from dateutil import parser
47
48logger = logging.getLogger("boosted.api.client")
49logging.basicConfig()
50
51g_boosted_api_url = "https://insights.boosted.ai"
52g_boosted_api_url_dev = "https://insights-dev.boosted.ai"
53WATCHLIST_ROUTE_PREFIX = "/api/dal/watchlist"
54ROUTE_PREFIX = WATCHLIST_ROUTE_PREFIX
55DAL_WATCHLIST_ROUTE = "/api/v0/watchlist"
56DAL_SECURITIES_ROUTE = "/api/v0/securities"
57DAL_PA_ROUTE = "/api/v0/portfolio-analysis"
58PORTFOLIO_GROUP_ROUTE = "/api/v0/portfolio-group"
59
60RISK_FACTOR = "risk-factor"
61RISK_FACTOR_V2 = "risk-factor-v2"
62RISK_FACTOR_COLUMNS = [
63 "depth",
64 "identifier",
65 "stock_count",
66 "volatility",
67 "exposure",
68 "rating",
69 "rating_delta",
70]
71
72
73class BoostedAPIException(Exception):
74 def __init__(self, value, data=None):
75 self.value = value
76 self.data = data
77
78 def __str__(self):
79 return repr(self.value)
80
81
82def convert_date(date: BoostedDate) -> datetime.date:
83 if isinstance(date, str):
84 try:
85 return parser.parse(date).date()
86 except Exception as e:
87 raise BoostedAPIException(f"Unable to parse date: {str(e)}")
88 return date
89
90
91class BoostedClient:
92 def __init__(
93 self, api_key, override_uri=None, debug=False, proxy=None, disable_verify_ssl=False
94 ):
95 """
96 Parameters
97 ----------
        api_key: str
            Your API key provided by the Boosted application. See your profile
            to generate a new key.
        override_uri: str
            Override the default Boosted API endpoint; defaults to the
            production endpoint when omitted.
        debug: bool
            If True, enable debug-level logging for this client.
        proxy: str
102 Your organization may require the use of a proxy for access.
103 The address of a HTTPS proxy in the format of <address>:<port>.
            Examples are "192.0.2.10:8080" or "my.proxy.com:123".
105 Do not prepend with "https://".
106 disable_verify_ssl: bool
107 Your networking setup may be behind a firewall which performs SSL
108 inspection. Either set the REQUESTS_CA_BUNDLE environment variable
109 to point to the location of a custom certificate bundle, or set this
110 parameter to True to disable SSL verification as a workaround.
111 """
112 if override_uri is None:
113 self.base_uri = g_boosted_api_url
114 else:
115 self.base_uri = override_uri
116 self.api_key = api_key
117 self.debug = debug
118 self._request_params: Dict = {}
119 if debug:
120 logger.setLevel(logging.DEBUG)
121 else:
122 logger.setLevel(logging.INFO)
123 if proxy is not None:
124 self._request_params["proxies"] = {"https": proxy}
125 if disable_verify_ssl:
126 self._request_params["verify"] = False
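
    # Usage sketch (illustrative only; not executed by this module). The API key
    # and proxy values below are placeholders:
    #
    #     from boosted.api.client import BoostedClient
    #
    #     client = BoostedClient(
    #         api_key="YOUR_API_KEY",        # generate one from your Boosted profile
    #         proxy="my.proxy.com:123",      # optional; only if your network requires it
    #         disable_verify_ssl=False,
    #     )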
127
128 def __print_json_info(self, json_data, isInference=False):
129 if "warnings" in json_data.keys():
130 for warning in json_data["warnings"]:
131 logger.warning(" {0}".format(warning))
132 if "errors" in json_data.keys():
133 for error in json_data["errors"]:
134 logger.error(" {0}".format(error))
135 return Status.FAIL
136
137 if "result" in json_data.keys():
138 results_data = json_data["result"]
139 if isInference:
140 if "inferenceResultsUrl" in results_data.keys():
141 res_url = parse.urlparse(results_data["inferenceResultsUrl"])
142 logger.debug(res_url)
143 logger.info("Inference started.")
144 if "updateCount" in results_data.keys():
145 logger.info("Updated {0} rows.".format(results_data["updateCount"]))
146 if "createCount" in results_data.keys():
147 logger.info("Created {0} rows.".format(results_data["createCount"]))
148 return Status.SUCCESS
149
150 def __to_date_obj(self, dt):
151 if isinstance(dt, datetime.datetime):
152 dt = dt.date()
153 elif isinstance(dt, datetime.date):
154 return dt
155 elif isinstance(dt, str):
156 try:
157 dt = parser.parse(dt).date()
158 except ValueError:
159 raise ValueError('dt: "' + dt + '" is not a valid date.')
160 return dt
161
162 def __iso_format(self, dt):
163 date = self.__to_date_obj(dt)
164 if date is not None:
165 date = date.isoformat()
166 return date
167
168 def _check_status_code(self, response, isInference=False):
169 has_json = False
170 try:
171 logger.debug(response.headers)
172 if "Content-Type" in response.headers:
173 if response.headers["Content-Type"].startswith("application/json"):
174 json_data = response.json()
175 has_json = True
176 else:
177 has_json = False
178 except json.JSONDecodeError:
179 logger.error("ERROR: response has no JSON payload.")
180 if response.status_code == 200 or response.status_code == 202:
181 if has_json:
182 self.__print_json_info(json_data, isInference)
183 else:
184 pass
185 return Status.SUCCESS
186 if response.status_code == 404:
187 if has_json:
188 self.__print_json_info(json_data, isInference)
189 raise BoostedAPIException(
190 'Server "{0}" not reachable. Code {1}.'.format(
191 self.base_uri, response.status_code
192 ),
193 data=response,
194 )
195 if response.status_code == 400:
196 if has_json:
197 self.__print_json_info(json_data, isInference)
198 if isInference:
199 return Status.FAIL
200 else:
201 raise BoostedAPIException("Error, bad request. Check the dataset ID.", response)
202 if response.status_code == 401:
203 if has_json:
204 self.__print_json_info(json_data, isInference)
205 raise BoostedAPIException("Authorization error.", response)
206 else:
207 if has_json:
208 self.__print_json_info(json_data, isInference)
209 raise BoostedAPIException(
210 "Error in API response. Status code={0} {1}\n{2}".format(
211 response.status_code, response.reason, response.headers
212 ),
213 response,
214 )
215
216 def _try_extract_error_code(self, result):
217 logger.info(result.headers)
218 if "Content-Type" in result.headers:
219 if result.headers["Content-Type"].startswith("application/json"):
220 if "errors" in result.json():
221 return result.json()["errors"]
222 if result.headers["Content-Type"].startswith("text/plain"):
223 return result.text
224 return str(result.reason)
225
226 def _check_ok_or_err_with_msg(self, res, potential_error_msg: str):
227 if not res.ok:
228 error = self._try_extract_error_code(res)
229 logger.error(error)
230 raise BoostedAPIException(f"{potential_error_msg}: {error}")
231
232 def _get_portfolio_rebalance_from_periods(
233 self, portfolio_id: str, rel_periods: List[str]
234 ) -> List[datetime.date]:
235 """
236 Returns a list of rebalance dates for a portfolio given a list of
237 relative periods of format '1D', '1W', '3M', etc.
238 """
239 resp = self._get_graphql(
240 query=graphql_queries.GET_PORTFOLIO_RELATIVE_DATES_QUERY,
241 variables={"portfolioId": portfolio_id, "relativePeriods": rel_periods},
242 )
243 dates = resp["data"]["portfolio"]["relativeDates"]
244 return [datetime.datetime.strptime(d["date"], "%Y-%m-%d").date() for d in dates]
245
246 def query_dataset(self, dataset_id):
247 url = self.base_uri + "/api/datasets/{0}".format(dataset_id)
248 headers = {"Authorization": "ApiKey " + self.api_key}
249 res = requests.get(url, headers=headers, **self._request_params)
250 if res.ok:
251 return res.json()
252 else:
253 error_msg = self._try_extract_error_code(res)
254 logger.error(error_msg)
255 raise BoostedAPIException("Failed to query dataset: {0}.".format(error_msg))
256
257 def export_global_data(
258 self,
259 dataset_id,
260 start=(datetime.date.today() - timedelta(days=365 * 25)),
261 end=datetime.date.today(),
262 timeout=600,
263 ):
264 query_info = self.query_dataset(dataset_id)
265 if DataSetType[query_info["type"]] != DataSetType.GLOBAL:
266 raise BoostedAPIException(
267 f"Incorrect dataset type: {query_info['type']}" f" - Expected {DataSetType.GLOBAL}"
268 )
269 return self.export_data(dataset_id, start, end, timeout)
270
271 def export_independent_data(
272 self,
273 dataset_id,
274 start=(datetime.date.today() - timedelta(days=365 * 25)),
275 end=datetime.date.today(),
276 timeout=600,
277 ):
278 query_info = self.query_dataset(dataset_id)
279 if DataSetType[query_info["type"]] != DataSetType.STRATEGY:
280 raise BoostedAPIException(
281 f"Incorrect dataset type: {query_info['type']}"
282 f" - Expected {DataSetType.STRATEGY}"
283 )
284 return self.export_data(dataset_id, start, end, timeout)
285
286 def export_dependent_data(
287 self,
288 dataset_id,
289 start=(datetime.date.today() - timedelta(days=365 * 25)),
290 end=datetime.date.today(),
291 timeout=600,
292 ):
293 query_info = self.query_dataset(dataset_id)
294 if DataSetType[query_info["type"]] != DataSetType.STOCK:
295 raise BoostedAPIException(
296 f"Incorrect dataset type: {query_info['type']}" f" - Expected {DataSetType.STOCK}"
297 )
298 return self.export_data(dataset_id, start, end, timeout)
299
300 def export_data(
301 self,
302 dataset_id,
303 start=(datetime.date.today() - timedelta(days=365 * 25)),
304 end=datetime.date.today(),
305 timeout=600,
306 ):
307 logger.info("Requesting start={0} end={1}.".format(start, end))
308 request_url = "/api/datasets/" + dataset_id + "/export-data"
309 headers = {"Authorization": "ApiKey " + self.api_key}
310 start = self.__iso_format(start)
311 end = self.__iso_format(end)
312 params = {"start": start, "end": end}
313 logger.debug("URL={0}, headers={1}, params={2}".format(request_url, headers, params))
314 res = requests.get(
315 self.base_uri + request_url,
316 headers=headers,
317 params=params,
318 timeout=timeout,
319 **self._request_params,
320 )
321 if res.ok or self._check_status_code(res):
322 buf = io.StringIO(res.text)
323 df = pd.read_csv(buf, index_col=0, parse_dates=True)
324 if "price" in df.columns:
325 df = df.drop("price", axis=1)
326 return df
327 else:
328 error_msg = self._try_extract_error_code(res)
329 logger.error(error_msg)
            raise BoostedAPIException("Failed to export dataset: {0}.".format(error_msg))
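
    # Usage sketch (assumes an existing client and a hypothetical dataset ID;
    # illustrative only, not executed by this module):
    #
    #     import datetime
    #
    #     df = client.export_dependent_data(
    #         "my-dataset-id",
    #         start=datetime.date(2020, 1, 1),
    #         end=datetime.date(2020, 12, 31),
    #     )
    #     # df is a pandas DataFrame indexed by date; any "price" column is dropped.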
331
332 def _get_inference(self, model_id, inference_date=datetime.date.today()):
333 request_url = "/api/models/" + model_id + "/inference-results"
334 headers = {"Authorization": "ApiKey " + self.api_key}
335 params = {}
336 params["date"] = self.__iso_format(inference_date)
337 logger.debug(request_url + ", " + str(headers) + ", " + str(params))
338 res = requests.get(
339 self.base_uri + request_url, headers=headers, params=params, **self._request_params
340 )
341 status = self._check_status_code(res, isInference=True)
342 if status == Status.SUCCESS:
343 return res, status
344 else:
345 return None, status
346
347 def get_inference(
348 self, model_id, inference_date=datetime.date.today(), block=False, timeout_minutes=30
349 ):
350 start_time = datetime.datetime.now()
351 while True:
            for numRetries in range(3):
                res, status = self._get_inference(model_id, inference_date)
                if res is not None:
                    # Got a response; stop retrying.
                    break
                if status == Status.FAIL:
                    return Status.FAIL
                logger.info("Retrying...")
360 if res is None:
361 logger.error("Max retries reached. Request failed.")
362 return None
363
364 json_data = res.json()
365 if "result" in json_data.keys():
366 if json_data["result"]["status"] == "RUNNING":
367 still_running = True
368 if not block:
                        logger.warning("Inference job is still running.")
370 return None
371 else:
372 logger.info(
373 "Inference job is still running. Time elapsed={0}.".format(
374 datetime.datetime.now() - start_time
375 )
376 )
377 time.sleep(10)
378 else:
379 still_running = False
380
381 if not still_running and json_data["result"]["status"] == "COMPLETE":
382 csv = json_data["result"]["signals"]
383 logger.info(json_data["result"])
384 if self._check_status_code(res, isInference=True):
385 logger.info(
386 "Total run time = {0}.".format(datetime.datetime.now() - start_time)
387 )
388 return csv
389 else:
390 if "errors" in json_data.keys():
391 logger.error(json_data["errors"])
392 else:
393 logger.error("Error getting inference for date {0}.".format(inference_date))
394 return None
395 if (datetime.datetime.now() - start_time).total_seconds() / 60.0 > timeout_minutes:
396 logger.error("Timeout waiting for job completion.")
397 return None
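
    # Usage sketch (assumes an existing client and a hypothetical model ID;
    # illustrative only, not executed by this module):
    #
    #     import datetime
    #
    #     signals_csv = client.get_inference(
    #         "my-model-id",
    #         inference_date=datetime.date.today(),
    #         block=True,            # poll until the inference job completes
    #         timeout_minutes=30,
    #     )
    #     if signals_csv is not None:
    #         print(signals_csv[:200])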
398
399 def createDataset(self, schema):
400 request_url = "/api/datasets"
401 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
402 s = json.dumps(schema)
403 logger.info("Creating dataset with schema " + s)
404 res = requests.post(
405 self.base_uri + request_url, data=s, headers=headers, **self._request_params
406 )
407 if res.ok:
408 return res.json()["result"]
409 else:
410 raise BoostedAPIException("Dataset creation failed.")
411
412 def getUniverse(self, modelId, date=None):
413 if date is not None:
414 url = "/api/models/{0}/universe/{1}".format(modelId, self.__iso_format(date))
415 logger.info("Getting universe for date: {0}.".format(date))
416 else:
417 url = "/api/models/{0}/universe/".format(modelId)
418 headers = {"Authorization": "ApiKey " + self.api_key}
419 res = requests.get(self.base_uri + url, headers=headers, **self._request_params)
420 if res.ok:
421 buf = io.StringIO(res.text)
422 df = pd.read_csv(buf, index_col=0, parse_dates=True)
423 return df
424 else:
425 error = self._try_extract_error_code(res)
426 logger.error(
427 "There was a problem getting this universe or model ID: {0}.".format(error)
428 )
429 raise BoostedAPIException("Failed to get universe: {0}".format(error))
430
431 def updateUniverse(self, modelId, universe_df, date=datetime.date.today() + timedelta(1)):
432 date = self.__iso_format(date)
433 url = self.base_uri + "/api/models/{0}/universe/{1}".format(modelId, date)
434 headers = {"Authorization": "ApiKey " + self.api_key}
435 logger.info("Updating universe for date {0}.".format(date))
436 if isinstance(universe_df, pd.core.frame.DataFrame):
437 buf = io.StringIO()
438 universe_df.to_csv(buf)
439 target = ("uploaded_universe.csv", buf.getvalue(), "text/csv")
440 files_req = {}
441 files_req["universe"] = target
442 res = requests.post(url, files=files_req, headers=headers, **self._request_params)
443 elif isinstance(universe_df, str):
444 target = ("uploaded_universe.csv", universe_df, "text/csv")
445 files_req = {}
446 files_req["universe"] = target
447 res = requests.post(url, files=files_req, headers=headers, **self._request_params)
448 else:
449 raise BoostedAPIException("Expected CSV as str or Pandas DataFrame.")
450 if res.ok:
451 logger.info("Universe update successful.")
452 if "warnings" in res.json():
453 logger.info("Warnings: {0}.".format(res.json()["warnings"]))
454 return res.json()["warnings"]
455 else:
456 return "No warnings."
457 else:
458 error_msg = self._try_extract_error_code(res)
            raise BoostedAPIException("Failed to update universe: {0}.".format(error_msg))
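
    # Usage sketch (assumes an existing client and a hypothetical model ID;
    # illustrative only, not executed by this module):
    #
    #     universe_df = client.getUniverse("my-model-id")
    #     # ... modify universe_df as needed ...
    #     warnings = client.updateUniverse("my-model-id", universe_df)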
460
461 def create_universe(
462 self, universe: Union[pd.DataFrame, str], name: str, description: str
463 ) -> List[str]:
464 PRESENT = "PRESENT"
465 ANY = "ANY"
        EARLIEST_DATE = "1900-01-01"
467 LATEST_DATE = "4000-01-01"
468
469 if isinstance(universe, (str, bytes, os.PathLike)):
470 universe = pd.read_csv(universe)
471
472 universe.columns = universe.columns.str.lower()
473
474 # Clients are free to leave out data. Fill in some defaults here.
475 if "from" not in universe.columns:
476 universe["from"] = EARLIST_DATE
477 if "to" not in universe.columns:
478 universe["to"] = LATEST_DATE
479 if "currency" not in universe.columns:
480 universe["currency"] = ANY
481 if "country" not in universe.columns:
482 universe["country"] = ANY
483 if "isin" not in universe.columns:
484 universe["isin"] = None
485 if "symbol" not in universe.columns:
486 universe["symbol"] = None
487
488 # to prevent conflicts with python keywords
489 universe.rename(columns={"from": "from_date", "to": "to_date"}, inplace=True)
490
491 universe = universe.replace({np.nan: None})
492 security_country_currency_date_list = []
493 for i, r in enumerate(universe.itertuples()):
494 id_type = ColumnSubRole.ISIN
495 identifier = r.isin
496
            if identifier is None:
                id_type = ColumnSubRole.SYMBOL
                identifier = str(r.symbol) if r.symbol is not None else None

            # if identifier is still None, there is no ISIN or SYMBOL for this
            # row, in which case we raise an error
            if identifier is None:
                raise BoostedAPIException(
                    f"Missing identifier in universe row {i + 1}:"
                    " each row must contain an ISIN or Symbol"
                )
510
511 security_country_currency_date_list.append(
512 DateIdentCountryCurrency(
                    date=r.from_date or EARLIEST_DATE,
514 identifier=identifier,
515 country=r.country or ANY,
516 currency=r.currency or ANY,
517 id_type=id_type,
518 )
519 )
520
521 gbi_id_objs = self.getGbiIdFromIdentCountryCurrencyDate(security_country_currency_date_list)
522
523 security_list = []
524 for i, r in enumerate(universe.itertuples()):
525 # if we have a None here, we failed to map to a gbi id
526 if gbi_id_objs[i] is None:
527 raise BoostedAPIException(f"Unable to map row: {tuple(r)}")
528
529 security_list.append(
530 {
531 "stockId": gbi_id_objs[i].gbi_id,
532 "fromZ": r.from_date or EARLIST_DATE,
533 "toZ": LATEST_DATE if r.to_date in (PRESENT, None) else r.to_date,
534 "removal": False,
535 "source": "UPLOAD",
536 }
537 )
538
539 url = self.base_uri + "/api/template-universe/save"
540 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
541 req = {"name": name, "description": description, "modificationDaos": security_list}
542
543 res = requests.post(url, json=req, headers=headers, **self._request_params)
544 self._check_ok_or_err_with_msg(res, "Failed to create universe")
545
546 if "warnings" in res.json():
547 logger.info("Warnings: {0}.".format(res.json()["warnings"]))
548 return res.json()["warnings"].splitlines()
549 else:
550 return []
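
    # Usage sketch of the expected universe layout (assumes an existing client;
    # all identifier, country, and currency values are illustrative only):
    #
    #     import pandas as pd
    #
    #     universe = pd.DataFrame(
    #         {
    #             "isin": ["US0378331005", None],
    #             "symbol": [None, "MSFT"],
    #             "from": ["2010-01-01", "2015-06-01"],
    #             "to": ["PRESENT", "PRESENT"],
    #             "country": ["USA", "USA"],
    #             "currency": ["USD", "USD"],
    #         }
    #     )
    #     warnings = client.create_universe(universe, "My Universe", "Example upload")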
551
    def validate_dataframe(self, df):
        if not isinstance(df, pd.core.frame.DataFrame):
            logger.error("Dataset must be of type DataFrame.")
            return False
        if not isinstance(df.index, pd.core.indexes.datetimes.DatetimeIndex):
            logger.error("Index must be DatetimeIndex.")
            return False
        if len(df.columns) == 0:
            logger.error("No feature columns exist.")
            return False
        if len(df) == 0:
            logger.error("No rows exist.")
            return False
        return True
565
566 def get_dataset_schema(self, dataset_id):
567 url = self.base_uri + "/api/datasets/{0}/schema".format(dataset_id)
568 headers = {"Authorization": "ApiKey " + self.api_key}
569 res = requests.get(url, headers=headers, **self._request_params)
570 if res.ok:
571 json_schema = res.json()
572 else:
573 error_msg = self._try_extract_error_code(res)
574 logger.error(error_msg)
575 raise BoostedAPIException("Failed to query dataset: {0}.".format(error_msg))
576 return DataSetConfig.fromDict(json_schema["result"])
577
578 def add_dependent_dataset(
579 self, dataset, datasetName="DependentDataset", schema=None, timeout=600, block=True
580 ):
581 result = self.add_dependent_dataset_with_warnings(
582 dataset, datasetName, schema, timeout, block
583 )
584 return result["dataset_id"]
585
586 def add_dependent_dataset_with_warnings(
587 self,
588 dataset,
589 datasetName="DependentDataset",
590 schema=None,
591 timeout=600,
592 block=True,
593 no_exception_on_chunk_error=False,
594 ):
595 if not self.validate_dataframe(dataset):
596 logger.error("dataset failed validation.")
597 return None
598 if schema is None:
599 schema = infer_dataset_schema(datasetName, dataset, DataSetType.STOCK)
600 dsid = self.createDataset(schema.toDict())
601 logger.info("Creating dataset with ID = {0}.".format(dsid))
602 result = self.add_dependent_data(
603 dsid,
604 dataset,
605 timeout,
606 block,
607 data_type=DataAddType.CREATION,
608 no_exception_on_chunk_error=no_exception_on_chunk_error,
609 )
610 return {"dataset_id": dsid, "warnings": result["warnings"], "errors": result["errors"]}
611
612 def add_independent_dataset(
613 self, dataset, datasetName="IndependentDataset", schema=None, timeout=600, block=True
614 ):
615 result = self.add_independent_dataset_with_warnings(
616 dataset, datasetName, schema, timeout, block
617 )
618 return result["dataset_id"]
619
620 def add_independent_dataset_with_warnings(
621 self,
622 dataset,
623 datasetName="IndependentDataset",
624 schema=None,
625 timeout=600,
626 block=True,
627 no_exception_on_chunk_error=False,
628 ):
629 if not self.validate_dataframe(dataset):
630 logger.error("dataset failed validation.")
631 return None
632 if schema is None:
633 schema = infer_dataset_schema(datasetName, dataset, DataSetType.STRATEGY)
634 schemaDict = schema.toDict()
635 if "configurationDataJson" not in schemaDict:
636 schemaDict["configurationDataJson"] = "{}"
637 dsid = self.createDataset(schemaDict)
638 logger.info("Creating dataset with ID = {0}.".format(dsid))
639 result = self.add_independent_data(
640 dsid,
641 dataset,
642 timeout,
643 block,
644 data_type=DataAddType.CREATION,
645 no_exception_on_chunk_error=no_exception_on_chunk_error,
646 )
647 return {"dataset_id": dsid, "warnings": result["warnings"], "errors": result["errors"]}
648
649 def add_global_dataset(
650 self, dataset, datasetName="GlobalDataset", schema=None, timeout=600, block=True
651 ):
652 result = self.add_global_dataset_with_warnings(dataset, datasetName, schema, timeout, block)
653 return result["dataset_id"]
654
655 def add_global_dataset_with_warnings(
656 self,
657 dataset,
658 datasetName="GlobalDataset",
659 schema=None,
660 timeout=600,
661 block=True,
662 no_exception_on_chunk_error=False,
663 ):
664 if not self.validate_dataframe(dataset):
665 logger.error("dataset failed validation.")
666 return None
667 if schema is None:
668 schema = infer_dataset_schema(datasetName, dataset, DataSetType.GLOBAL)
669 dsid = self.createDataset(schema.toDict())
670 logger.info("Creating dataset with ID = {0}.".format(dsid))
671 result = self.add_global_data(
672 dsid,
673 dataset,
674 timeout,
675 block,
676 data_type=DataAddType.CREATION,
677 no_exception_on_chunk_error=no_exception_on_chunk_error,
678 )
679 return {"dataset_id": dsid, "warnings": result["warnings"], "errors": result["errors"]}
680
681 def add_independent_data(
682 self,
683 dataset_id,
684 csv_data,
685 timeout=600,
686 block=True,
687 data_type=DataAddType.HISTORICAL,
688 no_exception_on_chunk_error=False,
689 ):
690 query_info = self.query_dataset(dataset_id)
691 if DataSetType[query_info["type"]] != DataSetType.STRATEGY:
692 raise BoostedAPIException(
693 f"Incorrect dataset type: {query_info['type']}"
694 f" - Expected {DataSetType.STRATEGY}"
695 )
696 warnings, errors = self.setup_chunk_and_upload_data(
697 dataset_id, csv_data, data_type, timeout, block, no_exception_on_chunk_error
698 )
699 if len(warnings) > 0:
700 logger.warning(
701 "Encountered {0} total warnings while uploading dataset.".format(len(warnings))
702 )
703 if len(errors) > 0:
704 raise BoostedAPIException(
705 "Encountered {0} total ERRORS while uploading dataset".format(len(errors))
706 + "\n".join(errors)
707 )
708 return {"warnings": warnings, "errors": errors}
709
710 def add_dependent_data(
711 self,
712 dataset_id,
713 csv_data,
714 timeout=600,
715 block=True,
716 data_type=DataAddType.HISTORICAL,
717 no_exception_on_chunk_error=False,
718 ):
719 warnings = []
720 query_info = self.query_dataset(dataset_id)
721 if DataSetType[query_info["type"]] != DataSetType.STOCK:
722 raise BoostedAPIException(
723 f"Incorrect dataset type: {query_info['type']}" f" - Expected {DataSetType.STOCK}"
724 )
725 warnings, errors = self.setup_chunk_and_upload_data(
726 dataset_id, csv_data, data_type, timeout, block, no_exception_on_chunk_error
727 )
728 if len(warnings) > 0:
729 logger.warning(
730 "Encountered {0} total warnings while uploading dataset.".format(len(warnings))
731 )
732 if len(errors) > 0:
733 raise BoostedAPIException(
734 "Encountered {0} total ERRORS while uploading dataset".format(len(errors))
735 + "\n".join(errors)
736 )
737 return {"warnings": warnings, "errors": errors}
738
739 def add_global_data(
740 self,
741 dataset_id,
742 csv_data,
743 timeout=600,
744 block=True,
745 data_type=DataAddType.HISTORICAL,
746 no_exception_on_chunk_error=False,
747 ):
748 query_info = self.query_dataset(dataset_id)
749 if DataSetType[query_info["type"]] != DataSetType.GLOBAL:
750 raise BoostedAPIException(
751 f"Incorrect dataset type: {query_info['type']}" f" - Expected {DataSetType.GLOBAL}"
752 )
753 warnings, errors = self.setup_chunk_and_upload_data(
754 dataset_id, csv_data, data_type, timeout, block, no_exception_on_chunk_error
755 )
756 if len(warnings) > 0:
757 logger.warning(
758 "Encountered {0} total warnings while uploading dataset.".format(len(warnings))
759 )
760 if len(errors) > 0:
761 raise BoostedAPIException(
762 "Encountered {0} total ERRORS while uploading dataset".format(len(errors))
763 + "\n".join(errors)
764 )
765 return {"warnings": warnings, "errors": errors}
766
767 def get_csv_buffer(self):
768 return io.StringIO()
769
770 def start_chunked_upload(self, dataset_id):
771 url = self.base_uri + "/api/datasets/{0}/start-chunked-upload".format(dataset_id)
772 headers = {"Authorization": "ApiKey " + self.api_key}
773 res = requests.post(url, headers=headers, **self._request_params)
774 if res.ok:
775 return res.json()["result"]
776 else:
777 error_msg = self._try_extract_error_code(res)
778 logger.error(error_msg)
779 raise BoostedAPIException(
780 "Failed to obtain dataset lock for upload: {0}.".format(error_msg)
781 )
782
783 def abort_chunked_upload(self, dataset_id, chunk_id):
784 url = self.base_uri + "/api/datasets/{0}/abort-chunked-upload".format(dataset_id)
785 headers = {"Authorization": "ApiKey " + self.api_key}
786 params = {"uploadGroupId": chunk_id}
787 res = requests.post(url, headers=headers, **self._request_params, params=params)
788 if not res.ok:
789 error_msg = self._try_extract_error_code(res)
790 logger.error(error_msg)
791 raise BoostedAPIException(
792 "Failed to abort dataset lock during error: {0}.".format(error_msg)
793 )
794
795 def check_dataset_ingestion_completion(self, dataset_id, chunk_id, start_time):
796 url = self.base_uri + "/api/datasets/{0}/upload-chunk-status".format(dataset_id)
797 headers = {"Authorization": "ApiKey " + self.api_key}
798 params = {"uploadGroupId": chunk_id}
799 res = requests.get(url, headers=headers, **self._request_params, params=params)
800 res = res.json()
801
802 finished = False
803 warnings = []
804 errors = []
805
        if isinstance(res, dict):
807 dataset_status = res["datasetStatus"]
808 chunk_status = res["chunkStatus"]
809 if chunk_status != ChunkStatus.PROCESSING.value:
810 finished = True
811 errors = res["errors"]
812 warnings = res["warnings"]
813 successful_rows = res["successfulRows"]
814 total_rows = res["totalRows"]
815 logger.info(
816 f"Successfully ingested {successful_rows} out of {total_rows} uploaded rows."
817 )
818 if chunk_status in [
819 ChunkStatus.SUCCESS.value,
820 ChunkStatus.WARNING.value,
821 ChunkStatus.ERROR.value,
822 ]:
823 if dataset_status != "AVAILABLE":
824 raise BoostedAPIException(
825 "Dataset was unexpectedly unavailable after chunk upload finished."
826 )
827 else:
828 logger.info("Ingestion complete. Uploaded data is ready for use.")
829 elif chunk_status == ChunkStatus.ABORTED.value:
830 errors.append(
831 "Dataset chunk upload was aborted by server! Upload did not succeed."
832 )
833 else:
834 errors.append("Unexpected data ingestion status: {0}.".format(chunk_status))
835 logger.info(
836 "Data ingestion still running. Time elapsed={0}.".format(
837 datetime.datetime.now() - start_time
838 )
839 )
840 else:
841 raise BoostedAPIException("Unable to get status of dataset ingestion.")
842 return {"finished": finished, "warnings": warnings, "errors": errors}
843
844 def _commit_chunked_upload(self, dataset_id, chunk_id, data_type, block=True, timeout=600):
845 url = self.base_uri + "/api/datasets/{0}/commit-chunked-upload".format(dataset_id)
846 headers = {"Authorization": "ApiKey " + self.api_key}
847 params = {
848 "uploadGroupId": chunk_id,
849 "dataAddType": data_type,
850 "sendCompletionEmail": not block,
851 }
852 res = requests.post(url, headers=headers, **self._request_params, params=params)
853 if not res.ok:
854 error_msg = self._try_extract_error_code(res)
855 logger.error(error_msg)
856 raise BoostedAPIException("Failed to commit dataset files: {0}.".format(error_msg))
857
858 if block:
859 start_time = datetime.datetime.now()
860 # Keep waiting until upload is no longer in UPDATING state...
861 while True:
862 result = self.check_dataset_ingestion_completion(dataset_id, chunk_id, start_time)
863 if result["finished"]:
864 break
865
866 if (datetime.datetime.now() - start_time).total_seconds() > timeout:
867 err_str = (
868 f"Timeout waiting for commit of dataset: {dataset_id} | chunk: {chunk_id}"
869 )
870 logger.error(err_str)
871 return [], [err_str]
872
873 time.sleep(10)
874 return result["warnings"], result["errors"]
875 else:
876 return [], []
877
878 def setup_chunk_and_upload_data(
879 self,
880 dataset_id,
881 csv_data,
882 data_type,
883 timeout=600,
884 block=True,
885 no_exception_on_chunk_error=False,
886 ):
887 chunk_id = self.start_chunked_upload(dataset_id)
888 logger.info("Obtained lock on dataset for upload: " + chunk_id)
889 try:
890 warnings, errors = self.chunk_and_upload_data(
891 dataset_id, chunk_id, csv_data, timeout, no_exception_on_chunk_error
892 )
893 commit_warnings, commit_errors = self._commit_chunked_upload(
894 dataset_id, chunk_id, data_type, block, timeout
895 )
896 return warnings + commit_warnings, errors + commit_errors
897 except Exception:
898 self.abort_chunked_upload(dataset_id, chunk_id)
899 raise
900
901 def chunk_and_upload_data(
902 self, dataset_id, chunk_id, csv_data, timeout=600, no_exception_on_chunk_error=False
903 ):
904 if isinstance(csv_data, pd.core.frame.DataFrame):
905 if not isinstance(csv_data.index, pd.core.indexes.datetimes.DatetimeIndex):
906 raise BoostedAPIException("DataFrame must have DatetimeIndex as index type.")
907
908 warnings = []
909 errors = []
910 logger.info("Uploading yearly.")
911 for t in csv_data.index.to_period("Y").unique():
912 if t is pd.NaT:
913 continue
914
            # serialize this year's slice back to a CSV string
916 buf = self.get_csv_buffer()
917 yearly_csv = csv_data.loc[str(t)]
918 yearly_csv.to_csv(buf, header=True)
919 raw_csv = buf.getvalue()
920
            # We already chunk by year, but if a yearly CSV still exceeds a healthy
            # limit of 50 MB, the final line of defense is to ignore date boundaries
            # and chunk by rows. This mostly guards against the Cloudflare upload limit.
924 size_lim = 50 * 1000 * 1000
925 est_csv_size = sys.getsizeof(raw_csv)
926 if est_csv_size > size_lim:
927 del raw_csv, buf
928 logger.info("Yearly data too large for single upload, chunking further...")
929 chunks = []
930 nchunks = math.ceil(est_csv_size / size_lim)
931 rows_per_chunk = math.ceil(len(yearly_csv) / nchunks)
932 for i in range(0, len(yearly_csv), rows_per_chunk):
933 buf = self.get_csv_buffer()
934 split_csv = yearly_csv.iloc[i : i + rows_per_chunk]
935 split_csv.to_csv(buf, header=True)
936 split_csv = buf.getvalue()
937 chunks.append(
938 (
939 "{0}-{1}".format(i + 1, min(len(yearly_csv), i + rows_per_chunk)),
940 split_csv,
941 )
942 )
943 else:
944 chunks = [("all", raw_csv)]
945
946 for i, (rows_descriptor, chunk_csv) in enumerate(chunks):
947 chunk_descriptor = "{0} in yearly chunk {1}".format(rows_descriptor, t)
948 logger.info(
949 "Uploading rows:"
950 + chunk_descriptor
951 + " (chunk {0} of {1}):".format(i + 1, len(chunks))
952 )
953 _, new_warnings, new_errors = self.upload_dataset_chunk(
954 chunk_descriptor,
955 dataset_id,
956 chunk_id,
957 chunk_csv,
958 timeout,
959 no_exception_on_chunk_error,
960 )
961 warnings.extend(new_warnings)
962 errors.extend(new_errors)
963 return warnings, errors
964
965 elif isinstance(csv_data, str):
966 _, warnings, errors = self.upload_dataset_chunk(
967 "all data", dataset_id, chunk_id, csv_data, timeout, no_exception_on_chunk_error
968 )
969 return warnings, errors
970 else:
971 raise BoostedAPIException("Expected CSV as str or Pandas DataFrame.")
972
973 def upload_dataset_chunk(
974 self,
975 chunk_descriptor,
976 dataset_id,
977 chunk_id,
978 csv_data,
979 timeout=600,
980 no_exception_on_chunk_error=False,
981 ):
982 logger.info("Starting upload: " + chunk_descriptor)
983 url = self.base_uri + "/api/datasets/{0}/upload-dataset-chunk".format(dataset_id)
984 headers = {"Authorization": "ApiKey " + self.api_key}
985 files_req = {}
986 warnings = []
987 errors = []
988
989 # make the network request
990 target = ("uploaded_data.csv", csv_data, "text/csv")
991 files_req["dataFile"] = target
992 params = {"uploadGroupId": chunk_id}
993 res = requests.post(
994 url,
995 params=params,
996 files=files_req,
997 headers=headers,
998 timeout=timeout,
999 **self._request_params,
1000 )
1001
1002 if res.ok:
1003 logger.info(
1004 (
1005 "Chunk upload completed. "
1006 "Ingestion started. "
1007 "Please wait until the data is in AVAILABLE state."
1008 )
1009 )
1010 if "warnings" in res.json():
1011 warnings = res.json()["warnings"]
1012 if len(warnings) > 0:
1013 logger.warning("Uploaded chunk encountered data warnings: ")
1014 for w in warnings:
1015 logger.warning(w)
1016 else:
1017 reason = "Upload failed: {0}, {1}".format(res.text, res.reason)
1018 logger.error(reason)
1019 if no_exception_on_chunk_error:
1020 errors.append(
1021 "Chunk {0} failed: {1}. ".format(chunk_descriptor, reason)
1022 + "Your data was only PARTIALLY uploaded. "
1023 + "Please reattempt the upload of this chunk."
1024 )
1025 else:
1026 raise BoostedAPIException(reason)
1027
1028 return res, warnings, errors
1029
1030 def getAllocationsForDate(self, portfolio_id, date, rollback_to_last_available_date):
1031 date = self.__iso_format(date)
1032 endpoint = "latest-allocations" if rollback_to_last_available_date else "allocations"
1033 url = self.base_uri + "/api/portfolios/{0}/{1}".format(portfolio_id, endpoint)
1034 headers = {"Authorization": "ApiKey " + self.api_key}
1035 params = {"date": date}
1036 logger.info("Retrieving allocations information for date {0}.".format(date))
1037 res = requests.get(url, params=params, headers=headers, **self._request_params)
1038 if res.ok:
1039 logger.info("Allocations retrieval successful.")
1040 return res.json()
1041 else:
1042 error_msg = self._try_extract_error_code(res)
1043 raise BoostedAPIException("Failed to retrieve allocations: {0}.".format(error_msg))
1044
1045 # New API method for fetching data from portfolio_holdings.pb2 file.
1046 def getAllocationsForDateV2(self, portfolio_id, date, rollback_to_last_available_date):
1047 date = self.__iso_format(date)
1048 endpoint = "latest-allocations-v2" if rollback_to_last_available_date else "allocations-v2"
1049 url = self.base_uri + "/api/portfolios/{0}/{1}".format(portfolio_id, endpoint)
1050 headers = {"Authorization": "ApiKey " + self.api_key}
1051 params = {"date": date}
1052 logger.info("Retrieving allocations information for date {0}.".format(date))
1053 res = requests.get(url, params=params, headers=headers, **self._request_params)
1054 if res.ok:
1055 logger.info("Allocations retrieval successful.")
1056 return res.json()
1057 else:
1058 error_msg = self._try_extract_error_code(res)
1059 raise BoostedAPIException("Failed to retrieve allocations: {0}.".format(error_msg))
1060
1061 def getAllocationsByDates(self, portfolio_id, dates=None):
1062 url = self.base_uri + "/api/portfolios/{0}/allocationsByDate".format(portfolio_id)
1063 headers = {"Authorization": "ApiKey " + self.api_key}
1064 if dates is not None:
1065 fmt_dates = []
1066 for d in dates:
1067 fmt_dates.append(self.__iso_format(d))
1068 fmt_dates_str = ",".join(fmt_dates)
1069 params: Dict = {"dates": fmt_dates_str}
1070 logger.info("Retrieving allocations information for dates {0}.".format(fmt_dates))
1071 else:
1072 params = {"dates": None}
1073 logger.info("Retrieving allocations information for all dates")
1074 res = requests.get(url, params=params, headers=headers, **self._request_params)
1075 if res.ok:
1076 logger.info("Allocations retrieval successful.")
1077 return res.json()
1078 else:
1079 error_msg = self._try_extract_error_code(res)
1080 raise BoostedAPIException("Failed to retrieve allocations: {0}.".format(error_msg))
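
    # Usage sketch (assumes an existing client and a hypothetical portfolio ID;
    # illustrative only, not executed by this module):
    #
    #     allocations = client.getAllocationsByDates(
    #         "my-portfolio-id",
    #         dates=["2021-01-04", "2021-01-05"],  # omit to fetch every available date
    #     )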
1081
1082 def getSignalsForDate(self, portfolio_id, date, rollback_to_last_available_date):
1083 date = self.__iso_format(date)
1084 endpoint = "latest-signals" if rollback_to_last_available_date else "signals"
1085 url = self.base_uri + "/api/portfolios/{0}/{1}".format(portfolio_id, endpoint)
1086 headers = {"Authorization": "ApiKey " + self.api_key}
1087 params = {"date": date}
1088 logger.info("Retrieving signals information for date {0}.".format(date))
1089 res = requests.get(url, params=params, headers=headers, **self._request_params)
1090 if res.ok:
1091 logger.info("Signals retrieval successful.")
1092 return res.json()
1093 else:
1094 error_msg = self._try_extract_error_code(res)
1095 raise BoostedAPIException("Failed to retrieve signals: {0}.".format(error_msg))
1096
1097 def getSignalsForAllDates(self, portfolio_id, dates=None):
1098 url = self.base_uri + "/api/portfolios/{0}/signalsByDate".format(portfolio_id)
1099 headers = {"Authorization": "ApiKey " + self.api_key}
1100 params = {}
1101 if dates is not None:
1102 fmt_dates = []
1103 for d in dates:
1104 fmt_dates.append(self.__iso_format(d))
1105 fmt_dates_str = ",".join(fmt_dates)
1106 params = {"dates": fmt_dates_str}
1107 logger.info("Retrieving signals information for dates {0}.".format(fmt_dates))
1108 else:
1109 params = {"dates": None}
1110 logger.info("Retrieving signals information for all dates")
1111 res = requests.get(url, params=params, headers=headers, **self._request_params)
1112 if res.ok:
1113 logger.info("Signals retrieval successful.")
1114 return res.json()
1115 else:
1116 error_msg = self._try_extract_error_code(res)
1117 raise BoostedAPIException("Failed to retrieve signals: {0}.".format(error_msg))
1118
1119 def getEquityAccuracy(
1120 self,
1121 model_id: str,
1122 portfolio_id: str,
1123 tickers: List[str],
1124 start_date: Optional[BoostedDate] = None,
1125 end_date: Optional[BoostedDate] = None,
1126 ) -> Dict[str, Dict[str, Any]]:
1127 data: Dict[str, Any] = {}
1128 if start_date is not None:
1129 start_date = convert_date(start_date)
1130 data["startDate"] = start_date.isoformat()
1131 if end_date is not None:
1132 end_date = convert_date(end_date)
1133 data["endDate"] = end_date.isoformat()
1134
1135 if start_date and end_date:
1136 validate_start_and_end_dates(start_date, end_date)
1137
1138 tickers_stream = ",".join(tickers)
1139 data["tickers"] = tickers_stream
1140 data["timestamp"] = time.strftime("%H:%M:%S")
1141 data["shouldRecalc"] = True
1142 url = self.base_uri + f"/api/analysis/equity-accuracy/{model_id}/{portfolio_id}"
1143 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
1144
1145 logger.info(
1146 f"Retrieving equity accuracy data for date range {start_date} to {end_date} "
1147 f"for tickers: {tickers}."
1148 )
1149
        # Metric tables in the JSON response that will be converted to DataFrames.
1151 metrics = [
1152 "hit_rate_mean",
1153 "hit_rate_median",
1154 "excess_return_mean",
1155 "excess_return_median",
1156 "return",
1157 "excess_return",
1158 ]
1159
1160 # send the request, retry if failed
1161 MAX_RETRIES = 10 # max of number of retries until timeout
1162 SLEEP_TIME = 3 # waiting time between requests
1163
1164 num_retries = 0
1165 success = False
1166 while not success and num_retries < MAX_RETRIES:
1167 res = requests.post(url, data=json.dumps(data), headers=headers, **self._request_params)
1168 if res.ok:
1169 logger.info("Equity Accuracy Data retrieval successful.")
1170 info = res.json()
1171 success = True
1172 else:
1173 data["shouldRecalc"] = False
1174 num_retries += 1
1175 time.sleep(SLEEP_TIME)
1176
1177 if not success:
1178 raise BoostedAPIException("Failed to retrieve equity accuracy: Request timeout.")
1179
1180 for ticker, accuracy_data in info.items():
1181 for metric in metrics:
1182 metric_matrix = accuracy_data[metric]
1183 if not isinstance(metric_matrix, str):
1184 # Set the index to the quintile label, and remove it from the data
1185 index = []
1186 for row in metric_matrix[1:]:
1187 index.append(row.pop(0))
1188
1189 # columns are "1D", "5D", etc.
1190 df = pd.DataFrame(metric_matrix[1:], columns=metric_matrix[0][1:], index=index)
1191 accuracy_data[metric] = df
1192 return info
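
    # Usage sketch (assumes an existing client plus hypothetical model and
    # portfolio IDs; illustrative only, not executed by this module):
    #
    #     accuracy = client.getEquityAccuracy(
    #         model_id="my-model-id",
    #         portfolio_id="my-portfolio-id",
    #         tickers=["AAPL", "MSFT"],
    #         start_date="2021-01-01",
    #         end_date="2021-12-31",
    #     )
    #     # Non-string metrics (e.g. "hit_rate_mean") come back as DataFrames indexed
    #     # by quintile, with horizon columns such as "1D" and "5D".
    #     print(accuracy["AAPL"]["hit_rate_mean"])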
1193
1194 def getHistoricalTradeDates(self, portfolio_id, start_date=None, end_date=None):
1195 end_date = self.__to_date_obj(end_date or datetime.date.today())
1196 start_date = self.__iso_format(start_date or (end_date - timedelta(days=365)))
1197 end_date = self.__iso_format(end_date)
1198
1199 url = self.base_uri + "/api/portfolios/{0}/tradingDates".format(portfolio_id)
1200 headers = {"Authorization": "ApiKey " + self.api_key}
1201 params = {"startDate": start_date, "endDate": end_date}
1202
1203 logger.info(
1204 "Retrieving historical trade dates data for date range {0} to {1}.".format(
1205 start_date, end_date
1206 )
1207 )
1208 res = requests.get(url, params=params, headers=headers, **self._request_params)
1209 if res.ok:
1210 logger.info("Trading dates retrieval successful.")
1211 return res.json()["dates"]
1212 else:
1213 error_msg = self._try_extract_error_code(res)
1214 raise BoostedAPIException("Failed to retrieve trading dates: {0}.".format(error_msg))
1215
1216 def getRankingsForAllDates(self, portfolio_id, dates=None):
1217 url = self.base_uri + "/api/portfolios/{0}/rankingsByDate".format(portfolio_id)
1218 headers = {"Authorization": "ApiKey " + self.api_key}
1219 params = {}
1220 if dates is not None:
1221 fmt_dates = []
1222 for d in dates:
1223 fmt_dates.append(self.__iso_format(d))
1224 fmt_dates_str = ",".join(fmt_dates)
1225 params = {"dates": fmt_dates_str}
1226 logger.info("Retrieving rankings information for date {0}.".format(fmt_dates_str))
1227 else:
1228 params = {"dates": None}
1229 logger.info("Retrieving rankings information for all dates")
1230 res = requests.get(url, params=params, headers=headers, **self._request_params)
1231 if res.ok:
1232 logger.info("Rankings retrieval successful.")
1233 return res.json()
1234 else:
1235 error_msg = self._try_extract_error_code(res)
1236 raise BoostedAPIException("Failed to retrieve rankings: {0}.".format(error_msg))
1237
1238 def getRankingsForDate(self, portfolio_id, date, rollback_to_last_available_date):
1239 date = self.__iso_format(date)
1240 endpoint = "latest-rankings" if rollback_to_last_available_date else "rankings"
1241 url = self.base_uri + "/api/{0}/{1}/{2}".format(endpoint, portfolio_id, date)
1242 headers = {"Authorization": "ApiKey " + self.api_key}
1243 logger.info("Retrieving rankings information for date {0}.".format(date))
1244 res = requests.get(url, headers=headers, **self._request_params)
1245 if res.ok:
1246 logger.info("Rankings retrieval successful.")
1247 return res.json()
1248 else:
1249 error_msg = self._try_extract_error_code(res)
1250 raise BoostedAPIException("Failed to retrieve rankings: {0}.".format(error_msg))
1251
1252 def sendModelRecalc(self, model_id):
1253 url = self.base_uri + "/api/models/{0}/recalc".format(model_id)
1254 logger.info("Sending model recalc request for model {0}".format(model_id))
1255 headers = {"Authorization": "ApiKey " + self.api_key}
1256 res = requests.put(url, headers=headers, **self._request_params)
1257 if not res.ok:
1258 error_msg = self._try_extract_error_code(res)
1259 logger.error(error_msg)
1260 raise BoostedAPIException(
1261 "Failed to send model recalc request - "
1262 + "the model in UI may be out of date: {0}.".format(error_msg)
1263 )
1264
1265 def sendRecalcAllModelPortfolios(self, model_id: str):
1266 """Recalculates all portfolios under a given model ID.
1267
1268 Args:
1269 model_id: the model ID
1270 Raises:
1271 BoostedAPIException: if the Boosted API request fails
1272 """
1273 url = self.base_uri + f"/api/models/{model_id}/recalc-all-portfolios"
1274 logger.info(f"Sending portfolio recalc requests for all portfolios under {model_id=}.")
1275 headers = {"Authorization": "ApiKey " + self.api_key}
1276 res = requests.put(url, headers=headers, **self._request_params)
1277 if not res.ok:
1278 error_msg = self._try_extract_error_code(res)
1279 logger.error(error_msg)
1280 raise BoostedAPIException(
1281 f"Failed to send recalc request for all portfolios under {model_id=} - {error_msg}."
1282 )
1283
1284 def sendPortfolioRecalc(self, portfolio_id: str):
1285 """Recalculates a single portfolio by its portfolio ID.
1286
1287 Args:
1288 portfolio_id: the portfolio ID to recalculate
1289 Raises:
1290 BoostedAPIException: if the Boosted API request fails
1291 """
1292 url = self.base_uri + "/api/graphql"
1293 logger.info(f"Sending portfolio recalc request for {portfolio_id=}.")
1294 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
1295 qry = """
1296 mutation recalcPortfolio($input: RecalculatePortfolioInput!) {
1297 recalculatePortfolio(input: $input) {
1298 success
1299 errors
1300 }
1301 }
1302 """
1303 req_json = {
1304 "query": qry,
1305 "variables": {"input": {"portfolioId": f"{portfolio_id}", "allowForceRecalc": "true"}},
1306 }
1307 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
1308 if not res.ok or res.json().get("errors"):
1309 error_msg = self._try_extract_error_code(res)
1310 logger.error(error_msg)
1311 raise BoostedAPIException(
1312 f"Failed to send portfolio recalc request for {portfolio_id=} - {error_msg}."
1313 )
1314
1315 def add_uploaded_model_data(self, url, csv_data, request_data, timeout=600):
1316 logger.info("Starting upload.")
1317 headers = {"Authorization": "ApiKey " + self.api_key}
1318 files_req: Dict = {}
1319 target: Tuple[str, Any, str] = ("data.csv", None, "text/csv")
1320 warnings = []
        if isinstance(csv_data, pd.core.frame.DataFrame):
            if not isinstance(csv_data.index, pd.core.indexes.datetimes.DatetimeIndex):
                raise BoostedAPIException("DataFrame must have DatetimeIndex as index type.")
            buf = io.StringIO()
            csv_data.to_csv(buf, header=False)
            target = ("uploaded_data.csv", buf.getvalue(), "text/csv")
1327 files_req["dataFile"] = target
1328 res = requests.post(
1329 url,
1330 files=files_req,
1331 data=request_data,
1332 headers=headers,
1333 timeout=timeout,
1334 **self._request_params,
1335 )
1336 elif isinstance(csv_data, str):
1337 target = ("uploaded_data.csv", csv_data, "text/csv")
1338 files_req["dataFile"] = target
1339 res = requests.post(
1340 url,
1341 files=files_req,
1342 data=request_data,
1343 headers=headers,
1344 timeout=timeout,
1345 **self._request_params,
1346 )
1347 else:
1348 raise BoostedAPIException("Expected CSV as str or Pandas DataFrame.")
1349 if res.ok:
1350 logger.info("Signals upload completed.")
1351 result = res.json()["result"]
1352 if "warningMessages" in result:
1353 warnings = result["warningMessages"]
1354 else:
1355 error_str = "Signals upload failed: {0}, {1}".format(res.text, res.reason)
1356 logger.error(error_str)
1357 raise BoostedAPIException(error_str)
1358
1359 return res, warnings
1360
1361 def createSignalsModel(self, csv_data, model_name, timeout=600):
1362 warnings = []
1363 url = self.base_uri + "/api/models/upload/signals/create"
1364 request_data = {"modelName": model_name, "uploadName": model_name}
1365 res, warnings = self.add_uploaded_model_data(url, csv_data, request_data, timeout)
1366 result = res.json()["result"]
1367 model_id = result["modelId"]
1368 self.sendModelRecalc(model_id)
1369 return model_id, warnings
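
    # Usage sketch (assumes an existing client; the column layout mirrors what
    # getSignalsFromUploadedModel returns and is illustrative only):
    #
    #     import pandas as pd
    #
    #     signals = pd.DataFrame(
    #         {
    #             "isin": ["US0378331005"],
    #             "country": ["USA"],
    #             "currency": ["USD"],
    #             "weight": [0.05],
    #         },
    #         index=pd.to_datetime(["2021-01-04"]),
    #     )
    #     model_id, warnings = client.createSignalsModel(signals, "My Signals Model")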
1370
1371 def addToUploadedModel(self, model_id, csv_data, timeout=600, recalc_model=True):
1372 warnings = []
1373 url = self.base_uri + "/api/models/{0}/upload/add-data".format(model_id)
1374 request_data: Dict = {}
1375 _, warnings = self.add_uploaded_model_data(url, csv_data, request_data, timeout)
1376 if recalc_model:
1377 self.sendModelRecalc(model_id)
1378 return warnings
1379
1380 def addSignalsToUploadedModel(
1381 self,
1382 model_id: str,
1383 csv_data: Union[pd.core.frame.DataFrame, str],
1384 timeout: int = 600,
1385 recalc_all: bool = False,
1386 recalc_portfolio_ids: Optional[List[str]] = None,
1387 ) -> List[str]:
1388 """
        Add signals to an uploaded model and then trigger a recalculation of the
        model, all of its portfolios, or a specified subset of its portfolios.
1390
1391 Args:
1392 model_id: model ID
1393 csv_data: pandas DataFrame, or a string with signals to upload.
1394 timeout (optional): Timeout for initial upload request in seconds.
1395 recalc_all (optional): if True, recalculates all portfolios in the model.
1396 recalc_portfolio_ids (optional): List of portfolio IDs under the model to re-calculate.
1397 """
1398 warnings = self.addToUploadedModel(model_id, csv_data, timeout, recalc_model=False)
1399
1400 if recalc_all:
1401 self.sendRecalcAllModelPortfolios(model_id)
1402 elif recalc_portfolio_ids:
1403 for portfolio_id in recalc_portfolio_ids:
1404 self.sendPortfolioRecalc(portfolio_id)
1405 else:
1406 self.sendModelRecalc(model_id)
1407 return warnings
1408
1409 def getSignalsFromUploadedModel(self, model_id, date=None):
1410 date = self.__iso_format(date)
1411 url = self.base_uri + "/api/models/{0}/upload/signals".format(model_id)
1412 headers = {"Authorization": "ApiKey " + self.api_key}
1413 params = {"date": date}
1414 logger.info("Retrieving uploaded signals information")
1415 res = requests.get(url, params=params, headers=headers, **self._request_params)
1416 if res.ok:
1417 result = pd.DataFrame.from_dict(res.json()["result"])
1418 # ensure column order
1419 result = result[["date", "isin", "country", "currency", "weight"]]
1420 result["date"] = pd.to_datetime(result["date"], format="%Y-%m-%d")
1421 result = result.set_index("date")
1422 logger.info("Signals retrieval successful.")
1423 return result
1424 else:
1425 error_msg = self._try_extract_error_code(res)
1426 raise BoostedAPIException("Failed to retrieve signals: {0}.".format(error_msg))
1427
1428 def getPortfolioSettings(self, portfolio_id, timeout=600):
1429 url = self.base_uri + "/api/portfolio-settings/{0}".format(portfolio_id)
1430 headers = {"Authorization": "ApiKey " + self.api_key}
1431 res = requests.get(url, headers=headers, **self._request_params)
1432 if res.ok:
1433 return PortfolioSettings(res.json())
1434 else:
1435 error_msg = self._try_extract_error_code(res)
1436 logger.error(error_msg)
1437 raise BoostedAPIException(
1438 "Failed to retrieve portfolio settings: {0}.".format(error_msg)
1439 )
1440
1441 def createPortfolioWithPortfolioSettings(
1442 self, model_id, portfolio_name, portfolio_description, portfolio_settings, timeout=600
1443 ):
1444 url = self.base_uri + "/api/models/{0}/constraints/add".format(model_id)
1445 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
1446 setting_string = json.dumps(portfolio_settings.settings)
1447 logger.info("Creating new portfolio with specified setting: {}".format(setting_string))
1448 params = {
1449 "name": portfolio_name,
1450 "description": portfolio_description,
1451 "constraints": setting_string,
1452 "validate": "true",
1453 }
1454 res = requests.put(url, json=params, headers=headers, **self._request_params)
1455 response = res.json()
1456 if res.ok:
1457 return response
1458 else:
1459 error_msg = self._try_extract_error_code(res)
1460 logger.error(error_msg)
1461 raise BoostedAPIException(
1462 "Failed to create portfolio with the specified settings: {0}.".format(error_msg)
1463 )
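
    # Usage sketch (assumes an existing client plus hypothetical model and
    # portfolio IDs; illustrative only, not executed by this module):
    #
    #     settings = client.getPortfolioSettings("existing-portfolio-id")
    #     new_portfolio = client.createPortfolioWithPortfolioSettings(
    #         model_id="my-model-id",
    #         portfolio_name="Cloned Portfolio",
    #         portfolio_description="Copy of an existing portfolio's settings",
    #         portfolio_settings=settings,
    #     )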
1464
1465 def getGbiIdFromIdentCountryCurrencyDate(
1466 self, ident_country_currency_dates: List[DateIdentCountryCurrency], timeout: int = 600
1467 ) -> List[Optional[GbiIdSecurity]]:
1468 url = self.base_uri + "/api/custom-stock-data/map-identifiers-simple"
1469 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
1470 identifiers = [
1471 {
1472 "row": idx,
1473 "date": identifier.date,
1474 "isin": identifier.identifier if identifier.id_type == ColumnSubRole.ISIN else None,
1475 "symbol": identifier.identifier
1476 if identifier.id_type == ColumnSubRole.SYMBOL
1477 else None,
1478 "countryPreference": identifier.country,
1479 "currencyPreference": identifier.currency,
1480 }
1481 for idx, identifier in enumerate(ident_country_currency_dates)
1482 ]
1483 params = json.dumps({"identifiers": identifiers})
1484 logger.info(
1485 "Retrieving GBI-ID mapping for {} identifier tuples...".format(
1486 len(ident_country_currency_dates)
1487 )
1488 )
1489 res = requests.post(url, data=params, headers=headers, **self._request_params)
1490
1491 if res.ok:
1492 result = res.json()
1493 warnings = result["warnings"]
1494 if warnings:
1495 for warning in warnings:
                    logger.warning(f"Mapping warning: {warning}")
1497 gbiSecurities = []
1498 for idx, ident in enumerate(result["mappedIdentifiers"]):
1499 if ident is None:
1500 security = None
1501 else:
1502 security = GbiIdSecurity(
1503 ident["gbiId"],
1504 ident_country_currency_dates[idx],
1505 ident["symbol"],
1506 ident["companyName"],
1507 )
1508 gbiSecurities.append(security)
1509
1510 return gbiSecurities
1511 else:
1512 error_msg = self._try_extract_error_code(res)
1513 raise BoostedAPIException(
1514 "Failed to retrieve identifier mappings: {0}.".format(error_msg)
1515 )
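
    # Usage sketch (assumes an existing client; identifier, country, and currency
    # values are illustrative only):
    #
    #     from boosted.api.api_type import ColumnSubRole, DateIdentCountryCurrency
    #
    #     mappings = client.getGbiIdFromIdentCountryCurrencyDate(
    #         [
    #             DateIdentCountryCurrency(
    #                 date="2021-01-04",
    #                 identifier="US0378331005",
    #                 country="USA",
    #                 currency="USD",
    #                 id_type=ColumnSubRole.ISIN,
    #             )
    #         ]
    #     )
    #     # Each element is a GbiIdSecurity, or None if the identifier could not be mapped.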
1516
1517 # exists for backwards compatibility purposes.
1518 def getGbiIdFromIsinCountryCurrencyDate(self, isin_country_currency_dates, timeout=600):
1519 return self.getGbiIdFromIdentCountryCurrencyDate(
1520 ident_country_currency_dates=isin_country_currency_dates, timeout=timeout
1521 )
1522
1523 # model_id: str
1524 # returns: Dict[str, str] representing the translation from the rankings ID (feature refs)
1525 # to human readable names
1526 def __get_rankings_ref_translation(self, model_id: str) -> Dict[str, str]:
1527 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
1528 feature_name_url = f"/api/models/{model_id}/advanced-explain/translate-feature-ref/"
1529 feature_name_res = requests.post(
1530 self.base_uri + feature_name_url,
1531 data=json.dumps({}),
1532 headers=headers,
1533 **self._request_params,
1534 )
1535
1536 if feature_name_res.ok:
1537 feature_name_dict = feature_name_res.json()
1538 return {
1539 id: "-".join(
1540 [names["variable_name"], names["transform_name"], names["normalization_name"]]
1541 )
1542 for id, names in feature_name_dict.items()
1543 }
1544 else:
1545 raise Exception(
1546 """Failed to get feature names for model,
1547 this model doesn't fully support rankings 2.0"""
1548 )
1549
1550 def getDatasetDates(self, dataset_id):
1551 url = self.base_uri + f"/api/datasets/{dataset_id}"
1552 headers = {"Authorization": "ApiKey " + self.api_key}
1553 res = requests.get(url, headers=headers, **self._request_params)
1554 if res.ok:
1555 dataset = res.json()
1556 valid_to_array = dataset.get("validTo")
1557 valid_to_date = None
1558 valid_from_array = dataset.get("validFrom")
1559 valid_from_date = None
1560 if valid_to_array:
1561 valid_to_date = datetime.date(
1562 valid_to_array[0], valid_to_array[1], valid_to_array[2]
1563 )
1564 if valid_from_array:
1565 valid_from_date = datetime.date(
1566 valid_from_array[0], valid_from_array[1], valid_from_array[2]
1567 )
1568 return {"validTo": valid_to_date, "validFrom": valid_from_date}
1569 else:
1570 error_msg = self._try_extract_error_code(res)
1571 logger.error(error_msg)
1572 raise BoostedAPIException("Failed to query dataset: {0}.".format(error_msg))
1573
1574 def getRankingAnalysis(self, model_id, date):
1575 url = (
1576 self.base_uri
1577 + f"/api/explain-trades/analysis/{model_id}/{self.__iso_format(date)}/json"
1578 )
1579 headers = {"Authorization": "ApiKey " + self.api_key}
1580 analysis_res = requests.get(url, headers=headers, **self._request_params)
1581 if analysis_res.ok:
1582 ranking_dict = analysis_res.json()
1583 feature_name_dict = self.__get_rankings_ref_translation(model_id)
1584 columns = [feature_name_dict[col] for col in ranking_dict["columns"]]
1585
1586 df = protoCubeJsonDataToDataFrame(
1587 ranking_dict["data"],
1588 "Data Buckets",
1589 ranking_dict["rows"],
1590 "Feature Names",
1591 columns,
1592 ranking_dict["fields"],
1593 )
1594 return df
1595 else:
1596 error_msg = self._try_extract_error_code(analysis_res)
1597 logger.error(error_msg)
1598 raise BoostedAPIException("Failed to get ranking analysis: {0}.".format(error_msg))
1599
1600 def getRankingExplain(self, model_id, date, index_by_symbol: bool = False):
1601 url = (
1602 self.base_uri + f"/api/explain-trades/explain/{model_id}/{self.__iso_format(date)}/json"
1603 )
1604 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
1605 explain_res = requests.get(url, headers=headers, **self._request_params)
1606 if explain_res.ok:
1607 ranking_dict = explain_res.json()
1608 rows = ranking_dict["rows"]
1609 stock_summary_url = f"/api/stock-summaries/{model_id}"
1610 stock_summary_body = {"gbiIds": ranking_dict["rows"]}
1611 summary_res = requests.post(
1612 self.base_uri + stock_summary_url,
1613 data=json.dumps(stock_summary_body),
1614 headers=headers,
1615 **self._request_params,
1616 )
1617 if summary_res.ok:
1618 stock_summary = summary_res.json()
1619 if index_by_symbol:
1620 rows = [stock_summary[row]["symbol"] for row in ranking_dict["rows"]]
1621 else:
1622 rows = [stock_summary[row]["isin"] for row in ranking_dict["rows"]]
1623 else:
1624 error_msg = self._try_extract_error_code(summary_res)
1625 logger.error(error_msg)
1626 raise BoostedAPIException(
1627 "Failed to get isin information ranking explain: {0}.".format(error_msg)
1628 )
1629
1630 feature_name_dict = self.__get_rankings_ref_translation(model_id)
1631 columns = [feature_name_dict[col] for col in ranking_dict["columns"]]
1632
1633 id_col_name = "Symbols" if index_by_symbol else "ISINs"
1634 df = protoCubeJsonDataToDataFrame(
1635 ranking_dict["data"],
1636 id_col_name,
1637 rows,
1638 "Feature Names",
1639 columns,
1640 ranking_dict["fields"],
1641 )
1642 return df
1643 else:
1644 error_msg = self._try_extract_error_code(explain_res)
1645 logger.error(error_msg)
1646 raise BoostedAPIException("Failed to get ranking explain: {0}.".format(error_msg))
1647
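    # Illustrative usage of getDenseSignals (a sketch; the ids and path are
    # hypothetical). If file_name is omitted, the CSV is written to
    # "<model_id>-<portfolio_id>_dense_signals.csv" under `location`:
    #
    #   client = BoostedClient("YOUR_API_KEY")
    #   client.getDenseSignals(
    #       "my-model-id", "my-portfolio-id", file_name="signals.csv", location="/tmp"
    #   )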
1648 def getDenseSignals(self, model_id, portfolio_id, file_name=None, location="./"):
1649 url = self.base_uri + f"/api/models/{model_id}/{portfolio_id}/dense-signals"
1650 headers = {"Authorization": "ApiKey " + self.api_key}
1651 res = requests.get(url, headers=headers, **self._request_params)
1652 if file_name is None:
1653 file_name = f"{model_id}-{portfolio_id}_dense_signals.csv"
1654 download_location = os.path.join(location, file_name)
1655 if res.ok:
1656 with open(download_location, "wb") as file:
1657 file.write(res.content)
1658 print("Download Complete")
1659 elif res.status_code == 404:
1660 raise BoostedAPIException(
                f"""Dense Signals file does not exist for model:
1662 {model_id} - portfolio: {portfolio_id}"""
1663 )
1664 else:
1665 error_msg = self._try_extract_error_code(res)
1666 logger.error(error_msg)
1667 raise BoostedAPIException(
                f"""Failed to download dense signals file for model:
1669 {model_id} - portfolio: {portfolio_id}"""
1670 )
1671
1672 def _getIsPortfolioReadyForProcessing(self, model_id, portfolio_id, formatted_date):
1673 headers = {"Authorization": "ApiKey " + self.api_key}
1674 url = (
1675 self.base_uri
1676 + f"/api/explain-trades/{model_id}/{portfolio_id}"
1677 + f"/is-ready-for-processing/{formatted_date}"
1678 )
1679 res = requests.get(url, headers=headers, **self._request_params)
1680
1681 try:
1682 if res.ok:
1683 body = res.json()
1684 if "ready" in body:
1685 if body["ready"]:
1686 return True, ""
1687 else:
1688 reason_from_api = (
1689 body["notReadyReason"] if "notReadyReason" in body else "Unavailable"
1690 )
1691
1692 returned_reason = reason_from_api
1693
1694 if returned_reason == "SKIP":
                            returned_reason = "holiday - market closed"
1696
1697 if returned_reason == "WAITING":
1698 returned_reason = "calculations pending"
1699
1700 return False, returned_reason
1701 else:
1702 return False, "Unavailable"
1703 else:
1704 error_msg = self._try_extract_error_code(res)
1705 logger.error(error_msg)
1706 raise BoostedAPIException(
1707 f"""Failed to generate file for model:
1708 {model_id} - portfolio: {portfolio_id} on date: {formatted_date}"""
1709 )
1710 except Exception as e:
1711 raise BoostedAPIException(
1712 f"""Failed to generate file for model:
1713 {model_id} - portfolio: {portfolio_id} on date: {formatted_date} {e}"""
1714 )
1715
1716 def getRanking2DateAnalysisFile(
1717 self, model_id, portfolio_id, date, file_name=None, location="./"
1718 ):
1719 formatted_date = self.__iso_format(date)
1720 s3_file_name = f"{formatted_date}_analysis.xlsx"
1721 download_url = (
1722 self.base_uri + f"/api/models/{model_id}/{portfolio_id}/ranking-file/{s3_file_name}"
1723 )
1724 headers = {"Authorization": "ApiKey " + self.api_key}
1725 if file_name is None:
1726 file_name = f"{model_id}-{portfolio_id}_statistical_analysis_{formatted_date}.xlsx"
1727 download_location = os.path.join(location, file_name)
1728
1729 res = requests.get(download_url, headers=headers, **self._request_params)
1730 if res.ok:
1731 with open(download_location, "wb") as file:
1732 file.write(res.content)
1733 print("Download Complete")
1734 elif res.status_code == 404:
1735 (
1736 is_portfolio_ready_for_processing,
1737 portfolio_ready_status,
1738 ) = self._getIsPortfolioReadyForProcessing(model_id, portfolio_id, formatted_date)
1739
1740 if not is_portfolio_ready_for_processing:
1741 logger.info(
1742 f"""\nPortfolio {portfolio_id} for model {model_id}
1743 on date {date} unavailable for Ranking2Date Analysis file.
1744 Status: {portfolio_ready_status}\n"""
1745 )
1746 return
1747
1748 generate_url = (
1749 self.base_uri
1750 + f"/api/explain-trades/{model_id}/{portfolio_id}"
1751 + f"/generate/date-data/{formatted_date}"
1752 )
1753
1754 generate_res = requests.get(generate_url, headers=headers, **self._request_params)
1755 if generate_res.ok:
1756 download_res = requests.get(download_url, headers=headers, **self._request_params)
1757 while download_res.status_code == 404 or (
1758 download_res.ok and len(download_res.content) == 0
1759 ):
1760 print("waiting for file to be generated")
1761 time.sleep(5)
1762 download_res = requests.get(
1763 download_url, headers=headers, **self._request_params
1764 )
1765 if download_res.ok:
1766 with open(download_location, "wb") as file:
1767 file.write(download_res.content)
1768 print("Download Complete")
1769 else:
                error_msg = self._try_extract_error_code(generate_res)
1771 logger.error(error_msg)
1772 raise BoostedAPIException(
1773 f"""Failed to generate ranking analysis file for model:
1774 {model_id} - portfolio: {portfolio_id} on date: {formatted_date}"""
1775 )
1776 else:
1777 error_msg = self._try_extract_error_code(res)
1778 logger.error(error_msg)
1779 raise BoostedAPIException(
1780 f"""Failed to download ranking analysis file for model:
1781 {model_id} - portfolio: {portfolio_id} on date: {formatted_date}"""
1782 )
1783
1784 def getRanking2DateExplainFile(
1785 self, model_id, portfolio_id, date, file_name=None, location="./", overwrite: bool = False
1786 ):
1787 formatted_date = self.__iso_format(date)
1788 s3_file_name = f"{formatted_date}_explaindata.xlsx"
1789 download_url = (
1790 self.base_uri + f"/api/models/{model_id}/{portfolio_id}/ranking-file/{s3_file_name}"
1791 )
1792 headers = {"Authorization": "ApiKey " + self.api_key}
1793 if file_name is None:
1794 file_name = f"{model_id}-{portfolio_id}_explain_data_{formatted_date}.xlsx"
1795 download_location = os.path.join(location, file_name)
1796
1797 if not overwrite:
1798 res = requests.get(download_url, headers=headers, **self._request_params)
1799 if not overwrite and res.ok:
1800 with open(download_location, "wb") as file:
1801 file.write(res.content)
1802 print("Download Complete")
1803 elif overwrite or res.status_code == 404:
1804 (
1805 is_portfolio_ready_for_processing,
1806 portfolio_ready_status,
1807 ) = self._getIsPortfolioReadyForProcessing(model_id, portfolio_id, formatted_date)
1808
1809 if not is_portfolio_ready_for_processing:
1810 logger.info(
1811 f"""\nPortfolio {portfolio_id} for model {model_id}
1812 on date {date} unavailable for Ranking2Date Explain file.
1813 Status: {portfolio_ready_status}\n"""
1814 )
1815 return
1816
1817 generate_url = (
1818 self.base_uri
1819 + f"/api/explain-trades/{model_id}/{portfolio_id}"
1820 + f"/generate/date-data/{formatted_date}"
1821 )
1822
1823 generate_res = requests.get(generate_url, headers=headers, **self._request_params)
1824 if generate_res.ok:
1825 download_res = requests.get(download_url, headers=headers, **self._request_params)
1826 while download_res.status_code == 404 or (
1827 download_res.ok and len(download_res.content) == 0
1828 ):
1829 print("waiting for file to be generated")
1830 time.sleep(5)
1831 download_res = requests.get(
1832 download_url, headers=headers, **self._request_params
1833 )
1834 if download_res.ok:
1835 with open(download_location, "wb") as file:
1836 file.write(download_res.content)
1837 print("Download Complete")
1838 else:
                error_msg = self._try_extract_error_code(generate_res)
1840 logger.error(error_msg)
1841 raise BoostedAPIException(
1842 f"""Failed to generate ranking explain file for model:
1843 {model_id} - portfolio: {portfolio_id} on date: {formatted_date}"""
1844 )
1845 else:
1846 error_msg = self._try_extract_error_code(res)
1847 logger.error(error_msg)
1848 raise BoostedAPIException(
1849 f"""Failed to download ranking explain file for model:
1850 {model_id} - portfolio: {portfolio_id} on date: {formatted_date}"""
1851 )
1852
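    # Illustrative usage of getRanking2DateExplain (a sketch; the ids and date
    # below are hypothetical). Each sheet of the generated workbook becomes one
    # DataFrame, keyed by sheet name and indexed by symbol:
    #
    #   import datetime
    #   client = BoostedClient("YOUR_API_KEY")
    #   sheets = client.getRanking2DateExplain(
    #       model_id="my-model-id",
    #       portfolio_id="my-portfolio-id",
    #       date=datetime.date(2021, 6, 30),
    #   )
    #   for sheet_name, df in sheets.items():
    #       print(sheet_name, df.columns.tolist())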
1853 def getRanking2DateExplain(
1854 self,
1855 model_id: str,
1856 portfolio_id: str,
1857 date: Optional[datetime.date],
1858 overwrite: bool = False,
1859 ) -> Dict[str, pd.DataFrame]:
1860 """
1861 Wrapper around getRanking2DateExplainFile, but returns a pandas
1862 dataframe instead of downloading to a path. Dataframe is indexed by
        symbol and should always have 'rating' and 'rating_delta' columns. Other
        columns are determined by the model's features.
1865 """
1866 file_name = "explaindata.xlsx"
1867 with tempfile.TemporaryDirectory() as tmpdirname:
1868 self.getRanking2DateExplainFile(
1869 model_id=model_id,
1870 portfolio_id=portfolio_id,
1871 date=date,
1872 file_name=file_name,
1873 location=tmpdirname,
1874 overwrite=overwrite,
1875 )
1876 full_path = os.path.join(tmpdirname, file_name)
1877 excel_file = pd.ExcelFile(full_path)
1878 df_map = pd.read_excel(excel_file, sheet_name=None)
1879 df_map_final = {str(sheet): df.set_index("Symbol") for (sheet, df) in df_map.items()}
1880
1881 return df_map_final
1882
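    # Illustrative usage of getTearSheet (a sketch; the ids and dates are
    # hypothetical). With no dates it returns the current tear sheet; with both
    # dates it recalculates key facts for that window, and with block=True it
    # polls up to ten times while the recalculation is still pending:
    #
    #   client = BoostedClient("YOUR_API_KEY")
    #   current = client.getTearSheet("my-model-id", "my-portfolio-id")
    #   windowed = client.getTearSheet(
    #       "my-model-id", "my-portfolio-id",
    #       start_date="2021-01-01", end_date="2021-06-30", block=True,
    #   )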
1883 def getTearSheet(self, model_id, portfolio_id, start_date=None, end_date=None, block=False):
1884 if start_date is None or end_date is None:
1885 if start_date is not None or end_date is not None:
1886 raise ValueError("start_date and end_date must both be None or both be defined")
1887 return self._getCurrentTearSheet(model_id, portfolio_id)
1888
1889 start_date_obj = self.__to_date_obj(start_date)
1890 end_date_obj = self.__to_date_obj(end_date)
1891 if start_date_obj >= end_date_obj:
1892 raise ValueError("end_date must be later than the start_date")
1893
1894 # get for the given date
1895 url = self.base_uri + f"/api/analysis/keyfacts/{model_id}/{portfolio_id}"
1896 data = {
1897 "startDate": self.__iso_format(start_date),
1898 "endDate": self.__iso_format(end_date),
1899 "shouldRecalc": True,
1900 }
1901 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
1902 res = requests.post(url, data=json.dumps(data), headers=headers, **self._request_params)
1903 if res.status_code == 404 and block:
1904 retries = 0
1905 data["shouldRecalc"] = False
1906 while retries < 10:
1907 time.sleep(10)
1908 retries += 1
1909 res = requests.post(
1910 url, data=json.dumps(data), headers=headers, **self._request_params
1911 )
1912 if res.status_code != 404:
1913 break
1914 if res.ok:
1915 return res.json()
1916 else:
1917 error_msg = self._try_extract_error_code(res)
1918 logger.error(error_msg)
1919 raise BoostedAPIException(
1920 "Failed to get tear sheet data: {0} {1}.".format(error_msg, str(res.status_code))
1921 )
1922
1923 def _getCurrentTearSheet(self, model_id, portfolio_id):
1924 url = self.base_uri + f"/api/model-summaries/{model_id}/{portfolio_id}"
1925 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
1926 res = requests.get(url, headers=headers, **self._request_params)
1927 if res.ok:
            data = res.json()
            return data.get("tearSheet", {})
1930 else:
1931 error_msg = self._try_extract_error_code(res)
1932 logger.error(error_msg)
1933 raise BoostedAPIException("Failed to get tear sheet data: {0}.".format(error_msg))
1934
1935 def getPortfolioStatus(self, model_id, portfolio_id, job_date):
1936 url = (
1937 self.base_uri
1938 + f"/api/analysis/portfolioStatus/{model_id}/{portfolio_id}?jobDate={job_date}"
1939 )
1940 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
1941 res = requests.get(url, headers=headers, **self._request_params)
1942 if res.ok:
1943 result = res.json()
1944 return {
1945 "is_complete": result["status"],
1946 "last_update": None if result["lastUpdate"] is None else result["lastUpdate"][:10],
1947 "next_update": None if result["nextUpdate"] is None else result["nextUpdate"][:10],
1948 }
1949 else:
1950 error_msg = self._try_extract_error_code(res)
1951 logger.error(error_msg)
1952 raise BoostedAPIException("Failed to get portfolio status: {0}".format(error_msg))
1953
1954 def _query_portfolio_factor_attribution(
1955 self,
1956 portfolio_id: str,
1957 start_date: Optional[BoostedDate] = None,
1958 end_date: Optional[BoostedDate] = None,
1959 ):
1960 response = self._get_graphql(
1961 query=graphql_queries.GET_PORTFOLIO_FACTOR_ATTRIBUTION_QUERY,
1962 variables={
1963 "portfolioId": portfolio_id,
1964 "startDate": str(start_date) if start_date else None,
                "endDate": str(end_date) if end_date else None,
1966 },
1967 error_msg_prefix="Failed to get factor attribution: ",
1968 )
1969 return response
1970
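    # Illustrative usage of get_portfolio_factor_attribution (a sketch; the
    # portfolio id is hypothetical). The frame is indexed by date, with
    # interleaved "<factor>_return" (percent) and "<factor>_beta" columns plus a
    # "total_return" column summing the factor returns:
    #
    #   client = BoostedClient("YOUR_API_KEY")
    #   attribution = client.get_portfolio_factor_attribution(
    #       "my-portfolio-id", start_date="2021-01-01", end_date="2021-06-30"
    #   )
    #   print(attribution["total_return"].tail())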
1971 def get_portfolio_factor_attribution(
1972 self,
1973 portfolio_id: str,
1974 start_date: Optional[BoostedDate] = None,
1975 end_date: Optional[BoostedDate] = None,
1976 ):
1977 """Get portfolio factor attribution for a portfolio
1978
1979 Args:
1980 portfolio_id (str): a valid UUID string
1981 start_date (BoostedDate, optional): The start date. Defaults to None.
1982 end_date (BoostedDate, optional): The end date. Defaults to None.
1983 """
1984 response = self._query_portfolio_factor_attribution(portfolio_id, start_date, end_date)
1985 factor_attribution = response["data"]["portfolio"]["factorAttribution"]
1986 dates = pd.DatetimeIndex(data=factor_attribution["dates"])
1987 beta = factor_attribution["factorBetas"]
1988 beta_df = pd.DataFrame(index=dates, data={x["name"]: x["data"] for x in beta})
1989 beta_df = beta_df.add_suffix("_beta")
1990 returns = factor_attribution["portfolioFactorPerformance"]
1991 returns_df = pd.DataFrame(index=dates, data={x["name"]: x["data"] for x in returns})
1992 returns_df = returns_df.add_suffix("_return")
1993 returns_df = (returns_df - 1) * 100
1994
1995 final_df = pd.concat([returns_df, beta_df], axis=1)
1996 ordered_columns = list(itertools.chain(*zip(returns_df.columns, beta_df.columns)))
1997 ordered_final_df = final_df.reindex(columns=ordered_columns)
1998
        # Add the column `total_return`, which is the sum of the factor return columns
2000 ordered_final_df["total_return"] = returns_df.sum(axis=1)
2001 return ordered_final_df
2002
2003 def getBlacklist(self, blacklist_id):
2004 url = self.base_uri + f"/api/blacklist/{blacklist_id}"
2005 headers = {"Authorization": "ApiKey " + self.api_key}
2006 res = requests.get(url, headers=headers, **self._request_params)
2007 if res.ok:
2008 result = res.json()
2009 return result
2010 error_msg = self._try_extract_error_code(res)
2011 logger.error(error_msg)
2012 raise BoostedAPIException(f"Failed to get blacklist with id {blacklist_id}: {error_msg}")
2013
2014 def getBlacklists(self, model_id=None, company_id=None, last_N=None):
2015 params = {}
2016 if last_N:
2017 params["lastN"] = last_N
2018 if model_id:
2019 params["modelId"] = model_id
2020 if company_id:
2021 params["companyId"] = company_id
        url = self.base_uri + "/api/blacklist"
2023 headers = {"Authorization": "ApiKey " + self.api_key}
2024 res = requests.get(url, headers=headers, params=params, **self._request_params)
2025 if res.ok:
2026 result = res.json()
2027 return result
2028 error_msg = self._try_extract_error_code(res)
2029 logger.error(error_msg)
2030 raise BoostedAPIException(
2031 f"""Failed to get blacklists with \
2032 model_id {model_id} company_id {company_id} last_N {last_N}: {error_msg}"""
2033 )
2034
2035 def createBlacklist(
2036 self,
2037 isin,
2038 long_short=2,
2039 start_date=datetime.date.today(),
2040 end_date="4000-01-01",
2041 model_id=None,
2042 ):
        url = self.base_uri + "/api/blacklist"
2044 data = {
2045 "modelId": model_id,
2046 "isin": isin,
2047 "longShort": long_short,
2048 "startDate": self.__iso_format(start_date),
2049 "endDate": self.__iso_format(end_date),
2050 }
2051 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
2052 res = requests.post(url, data=json.dumps(data), headers=headers, **self._request_params)
2053 if res.ok:
2054 return res.json()
2055 else:
2056 error_msg = self._try_extract_error_code(res)
2057 logger.error(error_msg)
2058 raise BoostedAPIException(
2059 f"""Failed to create the blacklist with \
2060 isin {isin} long_short {long_short} start_date {start_date} end_date {end_date} \
2061 model_id {model_id}: {error_msg}."""
2062 )
2063
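    # Illustrative input for createBlacklistsFromCSV (a sketch; the values are
    # hypothetical). The CSV must contain the columns read below; empty
    # LongShort/StartDate/EndDate cells fall back to the same defaults used by
    # createBlacklist (2, today, and "4000-01-01" respectively):
    #
    #   ModelID,ISIN,LongShort,StartDate,EndDate
    #   my-model-id,US0378331005,2,2021-01-01,4000-01-01
    #   my-model-id,US5949181045,,,
    #
    #   client = BoostedClient("YOUR_API_KEY")
    #   client.createBlacklistsFromCSV("blacklists.csv")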
2064 def createBlacklistsFromCSV(self, csv_name):
        url = self.base_uri + "/api/blacklists"
2066 data = []
2067 with open(csv_name, mode="r") as f:
2068 csv_reader = csv.DictReader(f)
2069 for row in csv_reader:
2070 blacklist = {"modelId": row["ModelID"], "isin": row["ISIN"]}
2071 if row["LongShort"] == "":
2072 blacklist["longShort"] = 2
2073 else:
2074 blacklist["longShort"] = row["LongShort"]
2075
2076 if row["StartDate"] == "":
2077 blacklist["startDate"] = self.__iso_format(datetime.date.today())
2078 else:
2079 blacklist["startDate"] = self.__iso_format(row["StartDate"])
2080
2081 if row["EndDate"] == "":
2082 blacklist["endDate"] = self.__iso_format("4000-01-01")
2083 else:
2084 blacklist["endDate"] = self.__iso_format(row["EndDate"])
2085 data.append(blacklist)
2086 print(f"Processed {len(data)} blacklists.")
2087 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
2088 res = requests.post(url, data=json.dumps(data), headers=headers, **self._request_params)
2089 if res.ok:
2090 return res.json()
2091 else:
2092 error_msg = self._try_extract_error_code(res)
2093 logger.error(error_msg)
2094 raise BoostedAPIException("failed to create blacklists")
2095
2096 def updateBlacklist(self, blacklist_id, long_short=None, start_date=None, end_date=None):
2097 params = {}
2098 if long_short:
2099 params["longShort"] = long_short
2100 if start_date:
2101 params["startDate"] = start_date
2102 if end_date:
2103 params["endDate"] = end_date
2104 url = self.base_uri + f"/api/blacklist/{blacklist_id}"
2105 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
2106 res = requests.patch(url, json=params, headers=headers, **self._request_params)
2107 if res.ok:
2108 return res.json()
2109 else:
2110 error_msg = self._try_extract_error_code(res)
2111 logger.error(error_msg)
2112 raise BoostedAPIException(
2113 f"Failed to update blacklist with id {blacklist_id}: {error_msg}"
2114 )
2115
2116 def deleteBlacklist(self, blacklist_id):
2117 url = self.base_uri + f"/api/blacklist/{blacklist_id}"
2118 headers = {"Authorization": "ApiKey " + self.api_key}
2119 res = requests.delete(url, headers=headers, **self._request_params)
2120 if res.ok:
2121 result = res.json()
2122 return result
2123 else:
2124 error_msg = self._try_extract_error_code(res)
2125 logger.error(error_msg)
2126 raise BoostedAPIException(
2127 f"Failed to delete blacklist with id {blacklist_id}: {error_msg}"
2128 )
2129
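    # Illustrative usage of getFeatureImportance (a sketch; the model id and
    # date are hypothetical). The result is a two-column DataFrame of feature
    # names and importance values for the period containing the requested date,
    # sorted by importance and optionally truncated to the top N:
    #
    #   client = BoostedClient("YOUR_API_KEY")
    #   top10 = client.getFeatureImportance("my-model-id", "2021-06-30", N=10)
    #   print(top10)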
2130 def getFeatureImportance(self, model_id, date, N=None):
2131 url = self.base_uri + f"/api/analysis/explainability/{model_id}"
2132 headers = {"Authorization": "ApiKey " + self.api_key}
2133 logger.info("Retrieving rankings information for date {0}.".format(date))
2134 res = requests.get(url, headers=headers, **self._request_params)
2135 if not res.ok:
2136 error_msg = self._try_extract_error_code(res)
2137 logger.error(error_msg)
2138 raise BoostedAPIException(
2139 f"Failed to fetch feature importance for model/portfolio {model_id}: {error_msg}"
2140 )
2141
2142 json_data = res.json()
2143 if "all" not in json_data.keys() or not json_data["all"]:
            raise BoostedAPIException("Unexpected formatting of feature importance response")
2145
2146 feature_data = json_data["all"]
2147 # find the right period (assuming returned json has dates in descending order)
2148 date_obj = self.__to_date_obj(date)
2149 start_date_for_return_data = self.__to_date_obj(feature_data[0]["date"])
2150 features_for_requested_period = None
2151
2152 if date_obj > start_date_for_return_data:
2153 features_for_requested_period = feature_data[0]["variable"]
2154 else:
2155 i = 0
2156 while i < len(feature_data) - 1:
2157 current_date = self.__to_date_obj(feature_data[i]["date"])
2158 next_date = self.__to_date_obj(feature_data[i + 1]["date"])
2159 if next_date <= date_obj <= current_date:
2160 features_for_requested_period = feature_data[i + 1]["variable"]
2161 start_date_for_return_data = next_date
2162 break
2163 i += 1
2164
2165 if features_for_requested_period is None:
2166 raise BoostedAPIException(f"No feature data was found for requested date: {date_obj}")
2167
2168 features_for_requested_period.sort(key=lambda x: x["value"], reverse=True)
2169
2170 if type(N) is int and N > 0:
2171 df = pd.DataFrame.from_dict(features_for_requested_period[0:N])
2172 else:
2173 df = pd.DataFrame.from_dict(features_for_requested_period)
2174 result = df[["feature", "value"]]
2175
2176 return result.rename(columns={"feature": f"feature ({start_date_for_return_data})"})
2177
2178 def getAllModelNames(self) -> Dict[str, str]:
2179 url = f"{self.base_uri}/api/graphql"
2180 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
2181 req_json = {"query": "query listOfModels {\n models { id name }}", "variables": {}}
2182 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
2183 if not res.ok:
2184 error_msg = self._try_extract_error_code(res)
2185 logger.error(error_msg)
2186 raise BoostedAPIException(f"Failed to get user models: {error_msg}")
2187 data = res.json()
2188 if data["data"]["models"] is None:
2189 return {}
2190 return {rec["id"]: rec["name"] for rec in data["data"]["models"]}
2191
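    # Illustrative usage of getAllModelDetails (a sketch). The result maps each
    # model id to its name, last-updated timestamp, and portfolios:
    #
    #   client = BoostedClient("YOUR_API_KEY")
    #   for model_id, details in client.getAllModelDetails().items():
    #       print(model_id, details["name"], len(details["portfolios"]))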
2192 def getAllModelDetails(self) -> Dict[str, Dict[str, Any]]:
2193 url = f"{self.base_uri}/api/graphql"
2194 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
2195 req_json = {
2196 "query": "query listOfModels {\n models { id name lastUpdated portfolios { id name }}}",
2197 "variables": {},
2198 }
2199 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
2200 if not res.ok:
2201 error_msg = self._try_extract_error_code(res)
2202 logger.error(error_msg)
2203 raise BoostedAPIException(f"Failed to get user models: {error_msg}")
2204 data = res.json()
2205 if data["data"]["models"] is None:
2206 return {}
2207
2208 output_data = {}
2209 for rec in data["data"]["models"]:
2210 model_id = rec["id"]
2211 output_data[model_id] = {
2212 "name": rec["name"],
2213 "last_updated": parser.parse(rec["lastUpdated"]),
2214 "portfolios": rec["portfolios"],
2215 }
2216
2217 return output_data
2218
2219 def get_hedge_experiments(self):
2220 url = self.base_uri + "/api/graphql"
2221 qry = """
2222 query getHedgeExperiments {
2223 hedgeExperiments {
2224 hedgeExperimentId
2225 experimentName
2226 userId
2227 config
2228 description
2229 experimentType
2230 lastCalculated
2231 lastModified
2232 status
2233 portfolioCalcStatus
2234 targetSecurities {
2235 gbiId
2236 security {
2237 gbiId
2238 symbol
2239 name
2240 }
2241 weight
2242 }
2243 targetPortfolios {
2244 portfolioId
2245 }
2246 baselineModel {
2247 id
2248 name
2249
2250 }
2251 baselineScenario {
2252 hedgeExperimentScenarioId
2253 scenarioName
2254 description
2255 portfolioSettingsJson
2256 hedgeExperimentPortfolios {
2257 portfolio {
2258 id
2259 name
2260 modelId
2261 performanceGridHeader
2262 performanceGrid
2263 status
2264 tearSheet {
2265 groupName
2266 members {
2267 name
2268 value
2269 }
2270 }
2271 }
2272 }
2273 status
2274 }
2275 baselineStockUniverseId
2276 }
2277 }
2278 """
2279
2280 headers = {"Authorization": "ApiKey " + self.api_key}
2281 resp = requests.post(url, json={"query": qry}, headers=headers, params=self._request_params)
2282
2283 json_resp = resp.json()
2284 # graphql endpoints typically return 200 or 400 status codes, so we must
2285 # check if we have any errors, even with a 200
2286 if (resp.ok and "errors" in json_resp) or not resp.ok:
2287 error_msg = self._try_extract_error_code(resp)
2288 logger.error(error_msg)
2289 raise BoostedAPIException(
2290 (f"Failed to get hedge experiments: {resp.status_code=}; {error_msg=}")
2291 )
2292
2293 json_experiments = resp.json()["data"]["hedgeExperiments"]
2294 experiments = [HedgeExperiment.from_json_dict(exp_json) for exp_json in json_experiments]
2295 return experiments
2296
2297 def get_hedge_experiment_details(self, experiment_id: str):
2298 url = self.base_uri + "/api/graphql"
2299 qry = """
2300 query getHedgeExperimentDetails($hedgeExperimentId: ID!) {
2301 hedgeExperiment(hedgeExperimentId: $hedgeExperimentId) {
2302 ...HedgeExperimentDetailsSummaryListFragment
2303 }
2304 }
2305
2306 fragment HedgeExperimentDetailsSummaryListFragment on HedgeExperiment {
2307 hedgeExperimentId
2308 experimentName
2309 userId
2310 config
2311 description
2312 experimentType
2313 lastCalculated
2314 lastModified
2315 status
2316 portfolioCalcStatus
2317 targetSecurities {
2318 gbiId
2319 security {
2320 gbiId
2321 symbol
2322 name
2323 }
2324 weight
2325 }
2326 selectedModels {
2327 id
2328 name
2329 stockUniverse {
2330 name
2331 }
2332 }
2333 hedgeExperimentScenarios {
2334 ...experimentScenarioFragment
2335 }
2336 selectedDummyHedgeExperimentModels {
2337 id
2338 name
2339 stockUniverse {
2340 name
2341 }
2342 }
2343 targetPortfolios {
2344 portfolioId
2345 }
2346 baselineModel {
2347 id
2348 name
2349
2350 }
2351 baselineScenario {
2352 hedgeExperimentScenarioId
2353 scenarioName
2354 description
2355 portfolioSettingsJson
2356 hedgeExperimentPortfolios {
2357 portfolio {
2358 id
2359 name
2360 modelId
2361 performanceGridHeader
2362 performanceGrid
2363 status
2364 tearSheet {
2365 groupName
2366 members {
2367 name
2368 value
2369 }
2370 }
2371 }
2372 }
2373 status
2374 }
2375 baselineStockUniverseId
2376 }
2377
2378 fragment experimentScenarioFragment on HedgeExperimentScenario {
2379 hedgeExperimentScenarioId
2380 scenarioName
2381 status
2382 description
2383 portfolioSettingsJson
2384 hedgeExperimentPortfolios {
2385 portfolio {
2386 id
2387 name
2388 modelId
2389 performanceGridHeader
2390 performanceGrid
2391 status
2392 tearSheet {
2393 groupName
2394 members {
2395 name
2396 value
2397 }
2398 }
2399 }
2400 }
2401 }
2402 """
2403 headers = {"Authorization": "ApiKey " + self.api_key}
2404 resp = requests.post(
2405 url,
2406 json={"query": qry, "variables": {"hedgeExperimentId": experiment_id}},
2407 headers=headers,
2408 params=self._request_params,
2409 )
2410
2411 json_resp = resp.json()
2412 # graphql endpoints typically return 200 or 400 status codes, so we must
2413 # check if we have any errors, even with a 200
2414 if (resp.ok and "errors" in json_resp) or not resp.ok:
2415 error_msg = self._try_extract_error_code(resp)
2416 logger.error(error_msg)
2417 raise BoostedAPIException(
2418 (
2419 f"Failed to get hedge experiment results for {experiment_id=}: "
2420 f"{resp.status_code=}; {error_msg=}"
2421 )
2422 )
2423
2424 json_exp_results = json_resp["data"]["hedgeExperiment"]
2425 if json_exp_results is None:
2426 return None # issued a request with a non-existent experiment_id
2427 exp_results = HedgeExperimentDetails.from_json_dict(json_exp_results)
2428 return exp_results
2429
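    # Illustrative usage of get_portfolio_performance (a sketch; the portfolio
    # id is hypothetical). A portfolio that is not yet READY yields an empty
    # DataFrame; otherwise the frame is indexed by date with float-valued
    # "benchmark", "turnover", and "portfolio" columns:
    #
    #   client = BoostedClient("YOUR_API_KEY")
    #   perf = client.get_portfolio_performance("my-portfolio-id")
    #   if not perf.empty:
    #       print(perf[["portfolio", "benchmark"]].tail())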
2430 def get_portfolio_performance(self, portfolio_id: str) -> pd.DataFrame:
2431 url = f"{self.base_uri}/api/graphql"
2432 qry = """
2433 query getPortfolioPerformance($portfolioId: ID!) {
2434 portfolio(id: $portfolioId) {
2435 id
2436 modelId
2437 name
2438 status
2439 performance {
2440 benchmark
2441 date
2442 turnover
2443 value
2444 }
2445 }
2446 }
2447 """
2448
2449 headers = {"Authorization": "ApiKey " + self.api_key}
2450 resp = requests.post(
2451 url,
2452 json={"query": qry, "variables": {"portfolioId": portfolio_id}},
2453 headers=headers,
2454 params=self._request_params,
2455 )
2456
2457 json_resp = resp.json()
2458 # the webserver returns an error for non-ready portfolios, so we have to check
2459 # for this prior to the error check below
2460 pf = json_resp["data"].get("portfolio")
2461 if pf is not None and pf["status"] != "READY":
2462 return pd.DataFrame()
2463
2464 # graphql endpoints typically return 200 or 400 status codes, so we must
2465 # check if we have any errors, even with a 200
2466 if (resp.ok and "errors" in json_resp) or not resp.ok:
2467 error_msg = self._try_extract_error_code(resp)
2468 logger.error(error_msg)
2469 raise BoostedAPIException(
2470 (
2471 f"Failed to get portfolio performance for {portfolio_id=}: "
2472 f"{resp.status_code=}; {error_msg=}"
2473 )
2474 )
2475
2476 perf = json_resp["data"]["portfolio"]["performance"]
2477 df = pd.DataFrame(perf).set_index("date").rename(columns={"value": "portfolio"})
2478 df.index = pd.to_datetime(df.index)
2479 return df.astype(float)
2480
2481 def _is_portfolio_still_running(self, error_msg: str) -> bool:
2482 # this is jank af. a proper fix of this is either at the webserver
2483 # returning a better response for a portfolio in draft HT2-226, OR
2484 # a bigger refactor of the API that moves to more OOP, which would allow us
2485 # to have this data all in one place
2486 return "Could not find a model with this ID" in error_msg
2487
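    # Illustrative usage of get_portfolio_factors (a sketch; the ids are
    # hypothetical). Column headers are lower_snake_cased and the frame is
    # indexed by date; a portfolio that is still running yields an empty frame:
    #
    #   client = BoostedClient("YOUR_API_KEY")
    #   factors = client.get_portfolio_factors("my-model-id", "my-portfolio-id")
    #   print(factors.columns.tolist())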
2488 def get_portfolio_factors(self, model_id: str, portfolio_id: str) -> pd.DataFrame:
2489 url = f"{self.base_uri}/api/analysis/factors/{model_id}/{portfolio_id}"
2490 headers = {"Authorization": "ApiKey " + self.api_key}
2491 resp = requests.get(url, headers=headers, params=self._request_params)
2492
2493 json_resp = resp.json()
2494 if (resp.ok and "errors" in json_resp) or not resp.ok:
2495 error_msg = json_resp["errors"][0]
2496 if self._is_portfolio_still_running(error_msg):
2497 return pd.DataFrame()
2498 logger.error(error_msg)
2499 raise BoostedAPIException(
2500 (
2501 f"Failed to get portfolio factors for {portfolio_id=}: "
2502 f"{resp.status_code=}; {error_msg=}"
2503 )
2504 )
2505
2506 df = pd.DataFrame(json_resp["data"], columns=json_resp["header_row"])
2507
2508 def to_lower_snake_case(s): # why are we linting lambdas? :(
2509 return "_".join(w.lower() for w in s.split(" "))
2510
2511 df = df.rename(columns={old: to_lower_snake_case(old) for old in df.columns}).set_index(
2512 "date"
2513 )
2514 df.index = pd.to_datetime(df.index)
2515 return df
2516
2517 def get_portfolio_volatility(self, model_id: str, portfolio_id: str) -> pd.DataFrame:
2518 url = f"{self.base_uri}/api/analysis/volatility_rolling/{model_id}/{portfolio_id}"
2519 headers = {"Authorization": "ApiKey " + self.api_key}
2520 resp = requests.get(url, headers=headers, params=self._request_params)
2521
2522 json_resp = resp.json()
2523 if (resp.ok and "errors" in json_resp) or not resp.ok:
2524 error_msg = json_resp["errors"][0]
2525 if self._is_portfolio_still_running(error_msg):
2526 return pd.DataFrame()
2527 logger.error(error_msg)
2528 raise BoostedAPIException(
2529 (
2530 f"Failed to get portfolio volatility for {portfolio_id=}: "
2531 f"{resp.status_code=}; {error_msg=}"
2532 )
2533 )
2534
2535 df = pd.DataFrame(json_resp["data"], columns=json_resp["headerRow"])
2536 df = df.rename(
2537 columns={old: old.lower().replace("avg", "avg_") for old in df.columns} # type: ignore
2538 ).set_index("date")
2539 df.index = pd.to_datetime(df.index)
2540 return df
2541
2542 def get_portfolio_holdings(self, model_id: str, portfolio_id: str) -> pd.DataFrame:
2543 url = f"{self.base_uri}/api/models/{model_id}/{portfolio_id}/basket-data"
2544 headers = {"Authorization": "ApiKey " + self.api_key}
2545 resp = requests.get(url, headers=headers, params=self._request_params)
2546
2547 # this is a classic abuse of try/except as control flow: we try to get json body
2548 # from the response so that we can error-check. if this fails, we assume we have
2549 # a legit text response (corresponding to the csv data we care about)
2550 try:
2551 json_resp = resp.json()
2552 except json.decoder.JSONDecodeError:
2553 df = pd.read_csv(io.StringIO(resp.text), header=[0])
2554 else:
2555 error_msg = json_resp["errors"][0]
2556 if self._is_portfolio_still_running(error_msg):
2557 return pd.DataFrame()
2558 else:
2559 logger.error(error_msg)
2560 raise BoostedAPIException(
2561 (
2562 f"Failed to get portfolio holdings for {portfolio_id=}: "
2563 f"{resp.status_code=}; {error_msg=}"
2564 )
2565 )
2566
2567 df = df.rename(columns={old: old.lower() for old in df.columns}).set_index("date")
2568 df.index = pd.to_datetime(df.index)
2569 return df
2570
2571 def getStockDataTableForDate(
2572 self, model_id: str, portfolio_id: str, date: datetime.date
2573 ) -> pd.DataFrame:
2574 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
2575
2576 url_base = f"{self.base_uri}/api/analysis"
2577 url_params = f"{model_id}/{portfolio_id}"
2578 formatted_date = date.strftime("%Y-%m-%d")
2579
2580 stock_prices_url = f"{url_base}/stock-prices/{url_params}/{formatted_date}"
2581 stock_factors_url = f"{url_base}/stock-factors/{url_params}/date/{formatted_date}"
2582
2583 prices_params = {"useTicker": "false", "useCurrentSignals": "true"}
2584 factors_param = {"useTicker": "false", "useCurrentSignals": "true"}
2585
2586 prices_resp = requests.get(
2587 stock_prices_url, headers=headers, params=prices_params, **self._request_params
2588 )
2589 factors_resp = requests.get(
2590 stock_factors_url, headers=headers, params=factors_param, **self._request_params
2591 )
2592
2593 frames = []
2594 gbi_ids = set()
2595 for res in (prices_resp, factors_resp):
2596 if not res.ok:
2597 error_msg = self._try_extract_error_code(res)
2598 logger.error(error_msg)
2599 raise BoostedAPIException(
2600 (
2601 f"Failed to fetch stock data table for model {model_id}"
2602 f" (it's possible no data is present for the given date: {date})."
2603 f" Error message: {error_msg}"
2604 )
2605 )
            result = res.json()
            df = pd.DataFrame(result)
            gbi_ids.update(df.columns.to_list())
            frames.append(df)
2610
2611 all_gbiid_df = pd.concat(frames)
2612
2613 # Get the metadata of all GBI IDs
2614 gbiid_metadata_res = self._get_graphql(
2615 query=graphql_queries.GET_SEC_INFO_QRY, variables={"ids": [int(x) for x in gbi_ids]}
2616 )
2617 # Build a DF of metadata x GBI IDs
2618 gbiid_metadata_df = pd.DataFrame(
2619 {str(x["gbiId"]): x for x in gbiid_metadata_res["data"]["securities"]}
2620 )
        # Slice the metadata we care about. We'll drop "symbol" at the end.
2622 isin_country_currency_df = gbiid_metadata_df.loc[["isin", "country", "currency", "symbol"]]
2623 # Concatenate metadata to the existing stock data DF
2624 all_gbiid_with_metadata_df = pd.concat([all_gbiid_df, isin_country_currency_df])
2625 gbiid_with_symbol_df = all_gbiid_with_metadata_df.loc[
2626 :, all_gbiid_with_metadata_df.loc["symbol"].notna()
2627 ]
2628 renamed_df = gbiid_with_symbol_df.rename(
2629 index={"isin": "ISIN"}, columns=gbiid_with_symbol_df.loc["symbol"].to_dict()
2630 )
2631 output_df = renamed_df.drop(index=["symbol"])
2632 return output_df
2633
2634 def add_hedge_experiment_scenario(
2635 self,
2636 experiment_id: str,
2637 scenario_name: str,
2638 scenario_settings: PortfolioSettings,
2639 run_scenario_immediately: bool,
2640 ) -> HedgeExperimentScenario:
2641 add_scenario_input = {
2642 "hedgeExperimentId": experiment_id,
2643 "scenarioName": scenario_name,
2644 "portfolioSettingsJson": str(scenario_settings),
2645 "runExperimentOnScenario": run_scenario_immediately,
2646 "createDefaultPortfolio": "false",
2647 }
2648 qry = """
2649 mutation addHedgeExperimentScenario(
2650 $input: AddHedgeExperimentScenarioInput!
2651 ) {
2652 addHedgeExperimentScenario(input: $input) {
2653 hedgeExperimentScenario {
2654 hedgeExperimentScenarioId
2655 scenarioName
2656 description
2657 portfolioSettingsJson
2658 }
2659 }
2660 }
2661
2662 """
2663
2664 url = f"{self.base_uri}/api/graphql"
2665
2666 resp = requests.post(
2667 url,
2668 headers={"Authorization": "ApiKey " + self.api_key},
2669 json={"query": qry, "variables": {"input": add_scenario_input}},
2670 )
2671
2672 json_resp = resp.json()
2673 if (resp.ok and "errors" in json_resp) or not resp.ok:
2674 error_msg = self._try_extract_error_code(resp)
2675 logger.error(error_msg)
2676 raise BoostedAPIException(
2677 (f"Failed to add scenario: {resp.status_code=}; {error_msg=}")
2678 )
2679
2680 scenario_dict = json_resp["data"]["addHedgeExperimentScenario"]["hedgeExperimentScenario"]
2681 if scenario_dict is None:
2682 raise BoostedAPIException(
2683 "Failed to add scenario, likely due to bad experiment id or api key"
2684 )
2685 s = HedgeExperimentScenario.from_json_dict(scenario_dict)
2686 return s
2687
2688 # experiment life cycle has 4 steps:
2689 # 1. creation - essentially a very simple registration of a new instance, returning an id
2690 # 2. modify - populate with settings
2691 # 3. start - run the experiment
2692 # 4. delete - drop the experiment
2693 # while i would prefer to just have 2 funcs for (1,2,3) and (4) for a simpler api,
    # we need to expose finer-grained control because of how scenarios work.
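    # A minimal sketch of that life cycle (the ids, names, and values below are
    # hypothetical; the experiment_type value and the attribute used to read
    # the experiment id back are assumptions, not confirmed API constants):
    #
    #   client = BoostedClient("YOUR_API_KEY")
    #   exp = client.create_hedge_experiment(
    #       name="my experiment",
    #       description="an illustrative experiment",
    #       experiment_type="HEDGE",  # assumed value of hedge_experiment_type
    #       target_securities=None,
    #   )
    #   exp_id = exp.hedge_experiment_id  # attribute name assumed
    #   client.modify_hedge_experiment(exp_id, target_securities="my-portfolio-id")
    #   client.start_hedge_experiment(exp_id)
    #   ...
    #   client.delete_hedge_experiment(exp_id)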
2695 def create_hedge_experiment(
2696 self,
2697 name: str,
2698 description: str,
2699 experiment_type: hedge_experiment_type,
2700 target_securities: Union[Dict[GbiIdSecurity, float], str, None],
2701 ) -> HedgeExperiment:
        # we don't pass target_securities here (as much as I'd like to) because the
2703 # graphql input doesn't support it at this point
2704
2705 # note that this query returns a lot of null fields at this point, but
2706 # they are necessary for building a HE.
2707 create_qry = """
2708 mutation createDraftMutation($input: CreateHedgeExperimentDraftInput!) {
2709 createHedgeExperimentDraft(input: $input) {
2710 hedgeExperiment {
2711 hedgeExperimentId
2712 experimentName
2713 userId
2714 config
2715 description
2716 experimentType
2717 lastCalculated
2718 lastModified
2719 status
2720 portfolioCalcStatus
2721 targetSecurities {
2722 gbiId
2723 security {
2724 gbiId
2725 name
2726 symbol
2727 }
2728 weight
2729 }
2730 baselineModel {
2731 id
2732 name
2733 }
2734 baselineScenario {
2735 hedgeExperimentScenarioId
2736 scenarioName
2737 description
2738 portfolioSettingsJson
2739 hedgeExperimentPortfolios {
2740 portfolio {
2741 id
2742 name
2743 modelId
2744 performanceGridHeader
2745 performanceGrid
2746 status
2747 tearSheet {
2748 groupName
2749 members {
2750 name
2751 value
2752 }
2753 }
2754 }
2755 }
2756 status
2757 }
2758 baselineStockUniverseId
2759 }
2760 }
2761 }
2762 """
2763
2764 create_input: Dict[str, Any] = {
2765 "name": name,
2766 "experimentType": experiment_type,
2767 "description": description,
2768 }
2769 if isinstance(target_securities, dict):
2770 create_input["setTargetSecurities"] = [
2771 {"gbiId": sec.gbi_id, "weight": weight}
2772 for (sec, weight) in target_securities.items()
2773 ]
2774 elif isinstance(target_securities, str):
2775 create_input["setTargetPortfolios"] = [{"portfolioId": target_securities}]
2776 elif target_securities is None:
2777 pass
2778 else:
2779 raise TypeError(
2780 "Expected value of type Union[Dict[GbiIdSecurity, str], str] for "
2781 f"argument 'target_securities'; got {type(target_securities)}"
2782 )
2783 resp = requests.post(
2784 f"{self.base_uri}/api/graphql",
2785 json={"query": create_qry, "variables": {"input": create_input}},
2786 headers={"Authorization": "ApiKey " + self.api_key},
2787 params=self._request_params,
2788 )
2789
2790 json_resp = resp.json()
2791 if (resp.ok and "errors" in json_resp) or not resp.ok:
2792 error_msg = self._try_extract_error_code(resp)
2793 logger.error(error_msg)
2794 raise BoostedAPIException(
2795 (f"Failed to create hedge experiment: {resp.status_code=}; {error_msg=}")
2796 )
2797
2798 exp_dict = json_resp["data"]["createHedgeExperimentDraft"]["hedgeExperiment"]
2799 experiment = HedgeExperiment.from_json_dict(exp_dict)
2800 return experiment
2801
2802 def modify_hedge_experiment(
2803 self,
2804 experiment_id: str,
2805 name: Optional[str] = None,
2806 description: Optional[str] = None,
2807 experiment_type: Optional[hedge_experiment_type] = None,
2808 target_securities: Union[Dict[GbiIdSecurity, float], str, None] = None,
2809 model_ids: Optional[List[str]] = None,
2810 stock_universe_ids: Optional[List[str]] = None,
2811 create_default_scenario: bool = True,
2812 baseline_model_id: Optional[str] = None,
2813 baseline_stock_universe_id: Optional[str] = None,
2814 baseline_portfolio_settings: Optional[str] = None,
2815 ) -> HedgeExperiment:
2816 mod_qry = """
2817 mutation modifyHedgeExperimentDraft(
2818 $input: ModifyHedgeExperimentDraftInput!
2819 ) {
2820 modifyHedgeExperimentDraft(input: $input) {
2821 hedgeExperiment {
2822 ...HedgeExperimentSelectedSecuritiesPageFragment
2823 }
2824 }
2825 }
2826
2827 fragment HedgeExperimentSelectedSecuritiesPageFragment on HedgeExperiment {
2828 hedgeExperimentId
2829 experimentName
2830 userId
2831 config
2832 description
2833 experimentType
2834 lastCalculated
2835 lastModified
2836 status
2837 portfolioCalcStatus
2838 targetSecurities {
2839 gbiId
2840 security {
2841 gbiId
2842 name
2843 symbol
2844 }
2845 weight
2846 }
2847 targetPortfolios {
2848 portfolioId
2849 }
2850 baselineModel {
2851 id
2852 name
2853 }
2854 baselineScenario {
2855 hedgeExperimentScenarioId
2856 scenarioName
2857 description
2858 portfolioSettingsJson
2859 hedgeExperimentPortfolios {
2860 portfolio {
2861 id
2862 name
2863 modelId
2864 performanceGridHeader
2865 performanceGrid
2866 status
2867 tearSheet {
2868 groupName
2869 members {
2870 name
2871 value
2872 }
2873 }
2874 }
2875 }
2876 status
2877 }
2878 baselineStockUniverseId
2879 }
2880 """
2881 mod_input = {
2882 "hedgeExperimentId": experiment_id,
2883 "createDefaultScenario": create_default_scenario,
2884 }
2885 if name is not None:
2886 mod_input["newExperimentName"] = name
2887 if description is not None:
2888 mod_input["newExperimentDescription"] = description
2889 if experiment_type is not None:
2890 mod_input["newExperimentType"] = experiment_type
2891 if model_ids is not None:
2892 mod_input["setSelectdModels"] = model_ids
2893 if stock_universe_ids is not None:
2894 mod_input["selectedStockUniverseIds"] = stock_universe_ids
2895 if baseline_model_id is not None:
2896 mod_input["setBaselineModel"] = baseline_model_id
2897 if baseline_stock_universe_id is not None:
2898 mod_input["setBaselineStockUniverse"] = baseline_stock_universe_id
2899 if baseline_portfolio_settings is not None:
2900 mod_input["setBaselinePortfolioSettings"] = baseline_portfolio_settings
2901 # note that the behaviors bound to these data are mutually exclusive,
        # and it's possible the opposite was set earlier in the DRAFT phase
2903 # of experiment creation, so when setting one, we must unset the other
2904 if isinstance(target_securities, dict):
2905 mod_input["setTargetSecurities"] = [
2906 {"gbiId": sec.gbi_id, "weight": weight}
2907 for (sec, weight) in target_securities.items()
2908 ]
2909 mod_input["setTargetPortfolios"] = None
2910 elif isinstance(target_securities, str):
2911 mod_input["setTargetPortfolios"] = [{"portfolioId": target_securities}]
2912 mod_input["setTargetSecurities"] = None
2913 elif target_securities is None:
2914 pass
2915 else:
2916 raise TypeError(
2917 "Expected value of type Union[Dict[GbiIdSecurity, str], str] "
2918 f"for argument 'target_securities'; got {type(target_securities)}"
2919 )
2920
2921 resp = requests.post(
2922 f"{self.base_uri}/api/graphql",
2923 json={"query": mod_qry, "variables": {"input": mod_input}},
2924 headers={"Authorization": "ApiKey " + self.api_key},
2925 params=self._request_params,
2926 )
2927
2928 json_resp = resp.json()
2929 if (resp.ok and "errors" in json_resp) or not resp.ok:
2930 error_msg = self._try_extract_error_code(resp)
2931 logger.error(error_msg)
2932 raise BoostedAPIException(
2933 (
2934 f"Failed to modify hedge experiment in preparation for start {experiment_id=}: "
2935 f"{resp.status_code=}; {error_msg=}"
2936 )
2937 )
2938
2939 exp_dict = json_resp["data"]["modifyHedgeExperimentDraft"]["hedgeExperiment"]
2940 experiment = HedgeExperiment.from_json_dict(exp_dict)
2941 return experiment
2942
2943 def start_hedge_experiment(self, experiment_id: str, *scenario_ids: str) -> HedgeExperiment:
2944 start_qry = """
2945 mutation startHedgeExperiment($input: StartHedgeExperimentInput!) {
2946 startHedgeExperiment(input: $input) {
2947 hedgeExperiment {
2948 hedgeExperimentId
2949 experimentName
2950 userId
2951 config
2952 description
2953 experimentType
2954 lastCalculated
2955 lastModified
2956 status
2957 portfolioCalcStatus
2958 targetSecurities {
2959 gbiId
2960 security {
2961 gbiId
2962 name
2963 symbol
2964 }
2965 weight
2966 }
2967 targetPortfolios {
2968 portfolioId
2969 }
2970 baselineModel {
2971 id
2972 name
2973 }
2974 baselineScenario {
2975 hedgeExperimentScenarioId
2976 scenarioName
2977 description
2978 portfolioSettingsJson
2979 hedgeExperimentPortfolios {
2980 portfolio {
2981 id
2982 name
2983 modelId
2984 performanceGridHeader
2985 performanceGrid
2986 status
2987 tearSheet {
2988 groupName
2989 members {
2990 name
2991 value
2992 }
2993 }
2994 }
2995 }
2996 status
2997 }
2998 baselineStockUniverseId
2999 }
3000 }
3001 }
3002 """
3003 start_input: Dict[str, Any] = {"hedgeExperimentId": experiment_id}
3004 if len(scenario_ids) > 0:
3005 start_input["hedgeExperimentScenarioIds"] = list(scenario_ids)
3006
3007 resp = requests.post(
3008 f"{self.base_uri}/api/graphql",
3009 json={"query": start_qry, "variables": {"input": start_input}},
3010 headers={"Authorization": "ApiKey " + self.api_key},
3011 params=self._request_params,
3012 )
3013
3014 json_resp = resp.json()
3015 if (resp.ok and "errors" in json_resp) or not resp.ok:
3016 error_msg = self._try_extract_error_code(resp)
3017 logger.error(error_msg)
3018 raise BoostedAPIException(
3019 (
3020 f"Failed to start hedge experiment {experiment_id=}: "
3021 f"{resp.status_code=}; {error_msg=}"
3022 )
3023 )
3024
3025 exp_dict = json_resp["data"]["startHedgeExperiment"]["hedgeExperiment"]
3026 experiment = HedgeExperiment.from_json_dict(exp_dict)
3027 return experiment
3028
3029 def delete_hedge_experiment(self, experiment_id: str) -> bool:
3030 delete_qry = """
3031 mutation($input: DeleteHedgeExperimentsInput!) {
3032 deleteHedgeExperiments(input: $input) {
3033 success
3034 }
3035 }
3036 """
3037 delete_input = {"hedgeExperimentIds": [experiment_id]}
3038 resp = requests.post(
3039 f"{self.base_uri}/api/graphql",
3040 json={"query": delete_qry, "variables": {"input": delete_input}},
3041 headers={"Authorization": "ApiKey " + self.api_key},
3042 params=self._request_params,
3043 )
3044
3045 json_resp = resp.json()
3046 if (resp.ok and "errors" in json_resp) or not resp.ok:
3047 error_msg = self._try_extract_error_code(resp)
3048 logger.error(error_msg)
3049 raise BoostedAPIException(
3050 (
3051 f"Failed to delete hedge experiment {experiment_id=}: "
3052 + f"status_code={resp.status_code}; error_msg={error_msg}"
3053 )
3054 )
3055
3056 return json_resp["data"]["deleteHedgeExperiments"]["success"]
3057
3058 def get_portfolio_accuracy(
3059 self,
3060 model_id: str,
3061 portfolio_id: str,
3062 start_date: Optional[BoostedDate] = None,
3063 end_date: Optional[BoostedDate] = None,
3064 ) -> dict:
3065 if start_date and end_date:
3066 validate_start_and_end_dates(start_date=start_date, end_date=end_date)
3067 start_date = convert_date(start_date)
3068 end_date = convert_date(end_date)
3069
3070 # TODO: Later change this URI to not use the watchlist prefix. It is misnamed.
3071 url = f"{self.base_uri}{WATCHLIST_ROUTE_PREFIX}{DAL_PA_ROUTE}/get-hit-rate/"
3072 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3073 req_json = {"model_id": model_id, "portfolio_id": portfolio_id}
3074 if start_date and end_date:
3075 req_json["start_date"] = start_date.isoformat()
3076 req_json["end_date"] = end_date.isoformat()
3077 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
3078
3079 if not res.ok:
3080 error_msg = self._try_extract_error_code(res)
3081 logger.error(error_msg)
3082 raise BoostedAPIException(f"Failed to get Hit Rate: {error_msg}")
3083
3084 data = res.json()
3085 return data
3086
3087 def create_watchlist(self, name: str) -> str:
3088 url = f"{self.base_uri}{WATCHLIST_ROUTE_PREFIX}{DAL_WATCHLIST_ROUTE}/create/"
3089 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3090 req_json = {"name": name}
3091 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
3092
3093 if not res.ok:
3094 error_msg = self._try_extract_error_code(res)
3095 logger.error(error_msg)
3096 raise BoostedAPIException(f"Failed to get user models: {error_msg}")
3097
3098 data = res.json()
3099 return data["watchlist_id"]
3100
3101 def _get_graphql(
3102 self,
3103 query: str,
3104 variables: Dict,
3105 error_msg_prefix: str = "Failed to get graphql result: ",
3106 log_error: bool = True,
3107 ) -> Dict:
3108 headers = {"Authorization": "ApiKey " + self.api_key}
3109 json_req = {"query": query, "variables": variables}
3110
3111 url = self.base_uri + "/api/graphql"
3112 resp = requests.post(
3113 url,
3114 json=json_req,
3115 headers=headers,
3116 params=self._request_params,
3117 )
3118
3119 # graphql endpoints typically return 200 or 400 status codes, so we must
3120 # check if we have any errors, even with a 200
3121 if not resp.ok or (resp.ok and "errors" in resp.json()):
3122 error_msg = self._try_extract_error_code(resp)
3123 error_str = str(error_msg_prefix) + f" {resp.status_code=}; {error_msg=}"
3124 if log_error:
3125 logger.error(error_str)
3126 raise BoostedAPIException(error_str)
3127
3128 json_resp = resp.json()
3129 return json_resp
3130
3131 def _get_security_info(self, gbi_ids: List[int]) -> Dict:
3132 query = graphql_queries.GET_SEC_INFO_QRY
3133 variables = {
3134 "ids": [] if not gbi_ids else gbi_ids,
3135 }
3136
3137 error_msg_prefix = "Failed to get Security Details:"
3138 return self._get_graphql(
3139 query=query, variables=variables, error_msg_prefix=error_msg_prefix
3140 )
3141
3142 def _get_sector_info(self) -> Dict:
3143 """
3144 Returns a list of sector objects, e.g.
3145 {
3146 "id": 1010,
3147 "parentId": 10,
3148 "name": "Energy",
3149 "topParentName": null,
3150 "spiqSectorId": -1,
3151 "legacy": false
3152 }
3153 """
3154 url = f"{self.base_uri}/api/sectors"
3155 headers = {"Authorization": "ApiKey " + self.api_key}
3156 res = requests.get(url, headers=headers, **self._request_params)
3157 self._check_ok_or_err_with_msg(res, "Failed to get sectors data")
3158 return res.json()["sectors"]
3159
3160 def _get_watchlist_analysis(
3161 self,
3162 gbi_ids: List[int],
3163 model_ids: List[str],
3164 portfolio_ids: List[str],
3165 asof_date=datetime.date.today(),
3166 ) -> Dict:
3167 query = graphql_queries.WATCHLIST_ANALYSIS_QRY
3168 variables = {
3169 "gbiIds": gbi_ids,
3170 "modelIds": model_ids,
3171 "portfolioIds": portfolio_ids,
3172 "date": self.__iso_format(asof_date),
3173 }
3174 error_msg_prefix = "Failed to get Coverage Analysis:"
3175 return self._get_graphql(
3176 query=query, variables=variables, error_msg_prefix=error_msg_prefix
3177 )
3178
3179 def _get_models_for_portfolio(self, portfolio_ids: List[str]) -> Dict:
3180 query = graphql_queries.GET_MODELS_FOR_PORTFOLIOS_QRY
3181 variables = {"ids": portfolio_ids}
3182 error_msg_prefix = "Failed to get Models for Portfolios: "
3183 return self._get_graphql(
3184 query=query, variables=variables, error_msg_prefix=error_msg_prefix
3185 )
3186
3187 def _get_excess_return(
3188 self, model_ids: List[str], gbi_ids: List[int], asof_date=datetime.date.today()
3189 ) -> Dict:
3190 query = graphql_queries.GET_EXCESS_RETURN_QRY
3191
3192 variables = {
3193 "modelIds": model_ids,
3194 "gbiIds": gbi_ids,
3195 "date": self.__iso_format(asof_date),
3196 }
3197 error_msg_prefix = "Failed to get Excess Return Slugging Pct: "
3198 return self._get_graphql(
3199 query=query, variables=variables, error_msg_prefix=error_msg_prefix
3200 )
3201
3202 def _coverage_column_name_format(self, in_str) -> str:
3203 if in_str.upper() == "ISIN":
3204 return "ISIN"
3205
3206 return in_str.title()
3207
3208 def _get_model_stocks(self, model_id: str) -> List[GbiIdTickerISIN]:
3209 # first, get the universe id
3210 resp = self._get_graphql(
3211 graphql_queries.GET_MODEL_STOCK_UNIVERSE_ID_QUERY,
3212 variables={"modelId": model_id},
3213 error_msg_prefix="Failed to get model stock universe ID",
3214 )
3215 universe_id = resp["data"]["model"]["stockUniverseId"]
3216
3217 # now, query for universe stocks
3218 url = self.base_uri + f"/api/stocks/model-universe/{universe_id}"
3219 headers = {"Authorization": "ApiKey " + self.api_key}
3220 universe_resp = requests.get(url, headers=headers, **self._request_params)
3221 universe = universe_resp.json()["stockUniverse"]
3222 securities = [
3223 GbiIdTickerISIN(gbi_id=security["id"], ticker=security["symbol"], isin=security["isin"])
3224 for security in universe
3225 ]
3226 return securities
3227
3228 def get_coverage_info(self, watchlist_id: str, portfolio_group_id: str) -> pd.DataFrame:
3229 # get securities list in watchlist
3230 watchlist_details = self.get_watchlist_details(watchlist_id)
3231 security_list = watchlist_details["targets"]
3232
3233 gbi_ids = [x["gbi_id"] for x in security_list]
3234
3235 gbi_data: Dict[Any, Dict] = {x: {} for x in gbi_ids}
3236
3237 # get security info ticker, name, industry etc
3238 sec_info = self._get_security_info(gbi_ids)
3239
3240 for sec in sec_info["data"]["securities"]:
3241 gbi_id = sec["gbiId"]
3242 for k in ["symbol", "name", "isin", "country", "currency"]:
3243 gbi_data[gbi_id][self._coverage_column_name_format(k)] = sec[k]
3244
3245 gbi_data[gbi_id][self._coverage_column_name_format("Sector")] = sec["sector"][
3246 "topParentName"
3247 ]
3248
3249 # get portfolios list in portfolio_Group
3250 portfolio_group = self.get_portfolio_group(portfolio_group_id)
3251 portfolio_ids = [x["portfolio_id"] for x in portfolio_group["portfolios"]]
3252 portfolio_info = {x["portfolio_id"]: x for x in portfolio_group["portfolios"]}
3253
3254 model_resp = self._get_models_for_portfolio(portfolio_ids=portfolio_ids)
3255 for portfolio in model_resp["data"]["portfolios"]:
3256 portfolio_info[portfolio["id"]].update(portfolio)
3257
3258 model_info = {
3259 x["modelId"]: portfolio_info[x["id"]] for x in model_resp["data"]["portfolios"]
3260 }
3261
3262 # model_ids and portfolio_ids are parallel arrays
3263 model_ids = [portfolio_info[x]["modelId"] for x in portfolio_ids]
3264
3265 # graphql: get watchlist analysis
3266 wl_analysis = self._get_watchlist_analysis(
3267 gbi_ids=gbi_ids,
3268 model_ids=model_ids,
3269 portfolio_ids=portfolio_ids,
3270 asof_date=datetime.date.today(),
3271 )
3272
3273 portfolio_gbi_data: Dict[Any, Dict] = {k: {} for k in portfolio_ids}
3274 for pi, v in portfolio_gbi_data.items():
3275 v.update({k: {} for k in gbi_data.keys()})
3276
3277 equity_explorer_date = wl_analysis["data"]["watchlistAnalysis"][0]["analysisDates"][0][
3278 "date"
3279 ]
3280 for wla in wl_analysis["data"]["watchlistAnalysis"]:
3281 gbi_id = wla["gbiId"]
3282 gbi_data[gbi_id]["Composite Rating"] = wla["analysisDates"][0]["aggregateSignal"][
3283 "rating"
3284 ]
3285 gbi_data[gbi_id]["Composite Rating Delta"] = wla["analysisDates"][0]["aggregateSignal"][
3286 "ratingDelta"
3287 ]
3288
3289 for p in wla["analysisDates"][0]["portfoliosSignals"]:
3290 model_name = portfolio_info[p["portfolioId"]]["modelName"]
3291
3292 portfolio_gbi_data[p["portfolioId"]][gbi_id][
3293 model_name + self._coverage_column_name_format(": rank")
3294 ] = (p["rank"] + 1)
3295 portfolio_gbi_data[p["portfolioId"]][gbi_id][
3296 model_name + self._coverage_column_name_format(": rank delta")
3297 ] = (-1 * p["signalDelta"])
3298 portfolio_gbi_data[p["portfolioId"]][gbi_id][
3299 model_name + self._coverage_column_name_format(": rating")
3300 ] = p["rating"]
3301 portfolio_gbi_data[p["portfolioId"]][gbi_id][
3302 model_name + self._coverage_column_name_format(": rating delta")
3303 ] = p["ratingDelta"]
3304
3305 neg_rec: Dict[Any, Dict] = {k: {} for k in gbi_data.keys()}
3306 pos_rec: Dict[Any, Dict] = {k: {} for k in gbi_data.keys()}
3307 for wla in wl_analysis["data"]["watchlistAnalysis"]:
3308 gbi_id = wla["gbiId"]
3309
3310 for pid, signals in zip(portfolio_ids, wla["analysisDates"][0]["portfoliosSignals"]):
3311 model_name = portfolio_info[pid]["modelName"]
3312 neg_rec[gbi_id][
3313 model_name + self._coverage_column_name_format(": negative recommendation")
3314 ] = signals["explainWeightNeg"]
3315 pos_rec[gbi_id][
3316 model_name + self._coverage_column_name_format(": positive recommendation")
3317 ] = signals["explainWeightPos"]
3318
3319 # graphql: GetExcessReturn - slugging pct
3320 er_sp = self._get_excess_return(
3321 model_ids=model_ids, gbi_ids=gbi_ids, asof_date=equity_explorer_date
3322 )
3323
3324 for model in er_sp["data"]["models"]:
3325 model_name = model_info[model["id"]]["modelName"]
3326 for stat in model["equityExplorerData"]["equityExplorerSummaryStatistics"]:
3327 portfolioId = model_info[model["id"]]["id"]
3328 portfolio_gbi_data[portfolioId][int(stat["gbiId"])][
3329 model_name + self._coverage_column_name_format(": slugging %")
3330 ] = (stat["ER"]["SP"]["sixMonthWindowOneMonthHorizon"] * 100)
3331
3332 # add rank, rating, slugging
3333 for pid, v in portfolio_gbi_data.items():
3334 for gbi_id, vv in v.items():
3335 gbi_data[gbi_id].update(vv)
3336
3337 # add neg/pos rec scores
3338 for rec in [neg_rec, pos_rec]:
3339 for k, v in rec.items():
3340 gbi_data[k].update(v)
3341
3342 df = pd.DataFrame.from_records([v for _, v in gbi_data.items()])
3343
3344 return df
3345
3346 def get_coverage_csv(
3347 self, watchlist_id: str, portfolio_group_id: str, filepath: Optional[str] = None
3348 ) -> Optional[str]:
3349 """
3350 Converts the coverage contents to CSV format
3351 Parameters
3352 ----------
3353 watchlist_id: str
3354 UUID str identifying the coverage watchlist
3355 portfolio_group_id: str
            UUID str identifying the group of portfolios to use for analysis
        filepath: Optional[str]
            Optional file path. If provided, the CSV is written to this path
            instead of being returned as a string.
3359
3360 Returns:
3361 ----------
            None if filepath is provided; otherwise a string containing the CSV contents
3363 """
3364
3365 df = self.get_coverage_info(watchlist_id, portfolio_group_id)
3366
3367 return df.to_csv(filepath, index=False, float_format="%.4f")
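
    # Usage sketch (illustrative only; the UUIDs below are hypothetical placeholders):
    #
    #     client = BoostedClient(api_key="...")
    #     coverage_df = client.get_coverage_info("<watchlist-uuid>", "<portfolio-group-uuid>")
    #     # or write the same data to disk instead of returning a string
    #     client.get_coverage_csv(
    #         "<watchlist-uuid>", "<portfolio-group-uuid>", filepath="coverage.csv"
    #     )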
3368
3369 def get_watchlist_details(self, watchlist_id: str) -> Dict:
3370 url = f"{self.base_uri}{ROUTE_PREFIX}{DAL_WATCHLIST_ROUTE}/details/"
3371 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3372 req_json = {"watchlist_id": watchlist_id}
3373 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
3374
3375 if not res.ok:
3376 error_msg = self._try_extract_error_code(res)
3377 logger.error(error_msg)
            raise BoostedAPIException(f"Failed to get watchlist details: {error_msg}")
3379
3380 data = res.json()
3381 return data
3382
3383 def create_watchlist_from_file(self, name: str, filepath: str) -> str:
3384 url = f"{self.base_uri}{ROUTE_PREFIX}{DAL_WATCHLIST_ROUTE}/create_watchlist_from_file/"
3385 headers = {"Authorization": "ApiKey " + self.api_key}
3386
3387 with open(filepath, "rb") as fp:
3388 file_bytes = fp.read()
3389
3390 file_bytes_base64 = base64.b64encode(file_bytes).decode("ascii")
3391 json_req = {
3392 "content_type": mimetypes.guess_type(filepath)[0],
3393 "file_bytes_base64": file_bytes_base64,
3394 "name": name,
3395 }
3396
        res = requests.post(url, json=json_req, headers=headers, **self._request_params)
3398
3399 if not res.ok:
3400 error_msg = self._try_extract_error_code(res)
3401 logger.error(error_msg)
3402 raise BoostedAPIException(f"Failed to create watchlist from file: {error_msg}")
3403
3404 data = res.json()
3405 return data["watchlist_id"]
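
    # Usage sketch (illustrative; the name and file path are hypothetical):
    #
    #     watchlist_id = client.create_watchlist_from_file(
    #         name="My Coverage List", filepath="/path/to/tickers.csv"
    #     )
    #     details = client.get_watchlist_details(watchlist_id)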
3406
3407 def get_watchlists(self) -> List[Dict]:
3408 url = f"{self.base_uri}{ROUTE_PREFIX}{DAL_WATCHLIST_ROUTE}/get_user_watchlists/"
3409 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3410 req_json: Dict = {}
3411 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
3412
3413 if not res.ok:
3414 error_msg = self._try_extract_error_code(res)
3415 logger.error(error_msg)
3416 raise BoostedAPIException(f"Failed to get user watchlists: {error_msg}")
3417
3418 data = res.json()
3419 return data["watchlists"]
3420
3421 def get_watchlist_contents(self, watchlist_id) -> Dict:
3422 url = f"{self.base_uri}{ROUTE_PREFIX}{DAL_WATCHLIST_ROUTE}/contents/"
3423 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3424 req_json = {"watchlist_id": watchlist_id}
3425 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
3426
3427 if not res.ok:
3428 error_msg = self._try_extract_error_code(res)
3429 logger.error(error_msg)
3430 raise BoostedAPIException(f"Failed to get watchlist contents: {error_msg}")
3431
3432 data = res.json()
3433 return data
3434
3435 def get_watchlist_contents_as_csv(self, watchlist_id, filepath) -> None:
3436 data = self.get_watchlist_contents(watchlist_id)
3437 df = pd.DataFrame(data["contents"])
3438 df.to_csv(filepath, index=False)
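
    # Usage sketch (illustrative; the watchlist UUID is a placeholder). The raw
    # contents come back as a dict with a "contents" list, which is what the CSV
    # helper flattens into a DataFrame before writing it out:
    #
    #     contents = client.get_watchlist_contents("<watchlist-uuid>")
    #     client.get_watchlist_contents_as_csv("<watchlist-uuid>", "watchlist.csv")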
3439
3440 # TODO this will need to be enhanced to accept country/currency overrides
3441 def add_securities_to_watchlist(
3442 self, watchlist_id: str, identifiers: List[str], identifier_type: Literal["TICKER", "ISIN"]
3443 ) -> Dict:
3444 # should we just make the arg lower? all caps has a flag-like feel to it
3445 id_type = identifier_type.lower()
3446 url = f"{self.base_uri}{WATCHLIST_ROUTE_PREFIX}{DAL_WATCHLIST_ROUTE}/add_{id_type}s/"
3447 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3448 req_json = {"watchlist_id": watchlist_id, id_type: identifiers}
3449 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
3450
3451 if not res.ok:
3452 error_msg = self._try_extract_error_code(res)
3453 logger.error(error_msg)
            raise BoostedAPIException(f"Failed to add securities to watchlist: {error_msg}")
3455
3456 data = res.json()
3457 return data
3458
3459 def remove_securities_from_watchlist(
3460 self, watchlist_id: str, identifiers: List[str], identifier_type: Literal["TICKER", "ISIN"]
3461 ) -> Dict:
3462 # should we just make the arg lower? all caps has a flag-like feel to it
3463 id_type = identifier_type.lower()
3464 url = f"{self.base_uri}{WATCHLIST_ROUTE_PREFIX}{DAL_WATCHLIST_ROUTE}/remove_{id_type}s/"
3465 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3466 req_json = {"watchlist_id": watchlist_id, id_type: identifiers}
3467 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
3468
3469 if not res.ok:
3470 error_msg = self._try_extract_error_code(res)
3471 logger.error(error_msg)
            raise BoostedAPIException(f"Failed to remove securities from watchlist: {error_msg}")
3473
3474 data = res.json()
3475 return data
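
    # Usage sketch (illustrative; identifiers are placeholders). identifier_type
    # selects which add_/remove_ endpoint is hit ("TICKER" or "ISIN"):
    #
    #     client.add_securities_to_watchlist(
    #         "<watchlist-uuid>", ["AAPL", "MSFT"], identifier_type="TICKER"
    #     )
    #     client.remove_securities_from_watchlist(
    #         "<watchlist-uuid>", ["US0378331005"], identifier_type="ISIN"
    #     )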
3476
3477 def get_portfolio_groups(
3478 self,
3479 ) -> Dict:
3480 """
3481 Parameters: None
3482
3483
3484 Returns:
3485 ----------
3486
3487 Dict: {
3488 user_id: str
3489 portfolio_groups: List[PortfolioGroup]
3490 }
3491 where PortfolioGroup is defined as = Dict {
3492 group_id: str
3493 group_name: str
3494 portfolios: List[PortfolioInGroup]
3495 }
3496 where PortfolioInGroup is defined as = Dict {
3497 portfolio_id: str
3498 rank_in_group: Optional[int]
3499 }
3500 """
3501 url = f"{self.base_uri}{WATCHLIST_ROUTE_PREFIX}{PORTFOLIO_GROUP_ROUTE}/get"
3502 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3503 req_json: Dict = {}
3504 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
3505
3506 if not res.ok:
3507 error_msg = self._try_extract_error_code(res)
3508 logger.error(error_msg)
3509 raise BoostedAPIException(f"Failed to get user portfolio groups: {error_msg}")
3510
3511 data = res.json()
3512 return data
3513
3514 def get_portfolio_group(self, portfolio_group_id: str) -> Dict:
3515 """
3516 Parameters:
3517 portfolio_group_id: str
3518 UUID identifier for the portfolio group
3519
3520
3521 Returns:
3522 ----------
3523
3524 PortfolioGroup: Dict: {
3525 group_id: str
3526 group_name: str
3527 portfolios: List[PortfolioInGroup]
3528 }
3529 where PortfolioInGroup is defined as = Dict {
3530 portfolio_id: str
3531 portfolio_name: str
3532 rank_in_group: Optional[int]
3533 }
3534 """
3535 url = f"{self.base_uri}{WATCHLIST_ROUTE_PREFIX}{PORTFOLIO_GROUP_ROUTE}/get-one"
3536 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3537 req_json = {"portfolio_group_id": portfolio_group_id}
3538 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
3539
3540 if not res.ok:
3541 error_msg = self._try_extract_error_code(res)
3542 logger.error(error_msg)
            raise BoostedAPIException(f"Failed to get portfolio group: {error_msg}")
3544
3545 data = res.json()
3546 return data
3547
3548 def set_sticky_portfolio_group(
3549 self,
3550 portfolio_group_id: str,
3551 ) -> Dict:
3552 """
3553 Set sticky portfolio group
3554
3555 Parameters
3556 ----------
3557
        portfolio_group_id: str,
3559 UUID str identifying a portfolio group
3560
3561 Returns:
3562 -------
3563 Dict {
3564 changed: int - 1 == success
3565 }
3566 """
3567 url = f"{self.base_uri}{WATCHLIST_ROUTE_PREFIX}{PORTFOLIO_GROUP_ROUTE}/set-sticky"
3568 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3569 req_json = {"portfolio_group_id": portfolio_group_id}
3570 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
3571
3572 if not res.ok:
3573 error_msg = self._try_extract_error_code(res)
3574 logger.error(error_msg)
3575 raise BoostedAPIException(f"Failed to set sticky portfolio group: {error_msg}")
3576
3577 data = res.json()
3578 return data
3579
3580 def get_sticky_portfolio_group(
3581 self,
3582 ) -> Dict:
3583 """
3584 Get sticky portfolio group for the user
3585
3586 Parameters
3587 ----------
3588
3589 Returns:
3590 -------
3591 Dict {
3592 group_id: str
3593 group_name: str
3594 portfolios: List[PortfolioInGroup(Dict)]
3595 PortfolioInGroup(Dict):
3596 portfolio_id: str
3597 rank_in_group: Optional[int] = None
3598 portfolio_name: Optional[str] = None
3599 }
3600 """
3601 url = f"{self.base_uri}{WATCHLIST_ROUTE_PREFIX}{PORTFOLIO_GROUP_ROUTE}/get-sticky"
3602 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3603 req_json: Dict = {}
3604 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
3605
3606 if not res.ok:
3607 error_msg = self._try_extract_error_code(res)
3608 logger.error(error_msg)
3609 raise BoostedAPIException(f"Failed to get sticky portfolio group: {error_msg}")
3610
3611 data = res.json()
3612 return data
3613
3614 def create_portfolio_group(
3615 self,
3616 group_name: str,
3617 portfolios: Optional[List[Dict]] = None,
3618 ) -> Dict:
3619 """
3620 Create a new portfolio group
3621
3622 Parameters
3623 ----------
3624
3625 group_name: str
3626 name of the new group
3627
3628 portfolios: List of Dict [:
3629
3630 portfolio_id: str
3631 rank_in_group: Optional[int] = None
3632 ]
3633
3634 Returns:
3635 ----------
3636
3637 Dict: {
3638 group_id: str
3639 UUID identifier for the portfolio group
3640
3641 created: int
3642 num groups created, 1 == success
3643
3644 added: int
3645 num portfolios added to the group, should match the length of 'portfolios' argument
3646 }
3647 """
3648 url = f"{self.base_uri}{WATCHLIST_ROUTE_PREFIX}{PORTFOLIO_GROUP_ROUTE}/create"
3649 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3650 req_json = {"group_name": group_name, "portfolios": portfolios}
3651
3652 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
3653
3654 if not res.ok:
3655 error_msg = self._try_extract_error_code(res)
3656 logger.error(error_msg)
3657 raise BoostedAPIException(f"Failed to create portfolio group: {error_msg}")
3658
3659 data = res.json()
3660 return data
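
    # Usage sketch (illustrative; the portfolio UUIDs are placeholders). Each entry
    # in `portfolios` follows the PortfolioInGroup shape documented above:
    #
    #     group = client.create_portfolio_group(
    #         group_name="US Large Cap Models",
    #         portfolios=[
    #             {"portfolio_id": "<portfolio-uuid-1>", "rank_in_group": 1},
    #             {"portfolio_id": "<portfolio-uuid-2>", "rank_in_group": 2},
    #         ],
    #     )
    #     group_id = group["group_id"]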
3661
3662 def rename_portfolio_group(
3663 self,
3664 group_id: str,
3665 group_name: str,
3666 ) -> Dict:
3667 """
3668 Rename a portfolio group
3669
3670 Parameters
3671 ----------
3672
3673 group_id: str,
3674 UUID str identifying a portfolio group
3675
3676 group_name: str,
            The new name for the portfolio group
3678
3679 Returns:
3680 -------
3681 Dict {
3682 changed: int - 1 == success
3683 }
3684 """
3685 url = f"{self.base_uri}{WATCHLIST_ROUTE_PREFIX}{PORTFOLIO_GROUP_ROUTE}/rename"
3686 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3687 req_json = {"group_id": group_id, "group_name": group_name}
3688 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
3689
3690 if not res.ok:
3691 error_msg = self._try_extract_error_code(res)
3692 logger.error(error_msg)
3693 raise BoostedAPIException(f"Failed to rename portfolio group: {error_msg}")
3694
3695 data = res.json()
3696 return data
3697
3698 def add_to_portfolio_group(
3699 self,
3700 group_id: str,
3701 portfolios: List[Dict],
3702 ) -> Dict:
3703 """
3704 Add portfolios to a group
3705
3706 Parameters
3707 ----------
3708
3709 group_id: str,
3710 UUID str identifying a portfolio group
3711
3712 portfolios: List of Dict [:
3713 portfolio_id: str
3714 rank_in_group: Optional[int] = None
3715 ]
3716
3717
3718 Returns:
3719 -------
3720 Dict {
3721 added: int
3722 number of successful changes
3723 }
3724 """
3725 url = f"{self.base_uri}{WATCHLIST_ROUTE_PREFIX}{PORTFOLIO_GROUP_ROUTE}/add-to-group"
3726 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3727 req_json = {"group_id": group_id, "portfolios": portfolios}
3728
3729 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
3730
3731 if not res.ok:
3732 error_msg = self._try_extract_error_code(res)
3733 logger.error(error_msg)
3734 raise BoostedAPIException(f"Failed to add portfolios to portfolio group: {error_msg}")
3735
3736 data = res.json()
3737 return data
3738
3739 def remove_from_portfolio_group(
3740 self,
3741 group_id: str,
3742 portfolios: List[str],
3743 ) -> Dict:
3744 """
3745 Remove portfolios from a group
3746
3747 Parameters
3748 ----------
3749
3750 group_id: str,
3751 UUID str identifying a portfolio group
3752
3753 portfolios: List of str
3754
3755
3756 Returns:
3757 -------
3758 Dict {
3759 removed: int
3760 number of successful changes
3761 }
3762 """
3763 url = f"{self.base_uri}{WATCHLIST_ROUTE_PREFIX}{PORTFOLIO_GROUP_ROUTE}/remove-from-group"
3764 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3765 req_json = {"group_id": group_id, "portfolios": portfolios}
3766 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
3767
3768 if not res.ok:
3769 error_msg = self._try_extract_error_code(res)
3770 logger.error(error_msg)
3771 raise BoostedAPIException(
3772 f"Failed to remove portfolios from portfolio group: {error_msg}"
3773 )
3774
3775 data = res.json()
3776 return data
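
    # Usage sketch (illustrative; ids are placeholders). Note the asymmetry between
    # the two calls: add_to_portfolio_group takes PortfolioInGroup dicts, while
    # remove_from_portfolio_group takes plain portfolio id strings:
    #
    #     client.add_to_portfolio_group(
    #         "<group-uuid>", portfolios=[{"portfolio_id": "<portfolio-uuid>"}]
    #     )
    #     client.remove_from_portfolio_group("<group-uuid>", portfolios=["<portfolio-uuid>"])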
3777
3778 def delete_portfolio_group(
3779 self,
3780 group_id: str,
3781 ) -> Dict:
3782 """
3783 Delete a portfolio group
3784
3785 Parameters
3786 ----------
3787
3788 group_id: str,
3789 UUID str identifying a portfolio group
3790
3791
3792 Returns:
3793 -------
3794 Dict {
3795 removed_groups: int
3796 number of successful changes
3797
3798 removed_portfolios: int
3799 number of successful changes
3800 }
3801 """
3802 url = f"{self.base_uri}{WATCHLIST_ROUTE_PREFIX}{PORTFOLIO_GROUP_ROUTE}/remove"
3803 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3804 req_json = {"group_id": group_id}
3805 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
3806
3807 if not res.ok:
3808 error_msg = self._try_extract_error_code(res)
3809 logger.error(error_msg)
3810 raise BoostedAPIException(f"Failed to delete portfolio group: {error_msg}")
3811
3812 data = res.json()
3813 return data
3814
3815 def set_portfolio_group_for_watchlist(
3816 self,
3817 portfolio_group_id: str,
3818 watchlist_id: str,
3819 ) -> Dict:
3820 """
3821 Set portfolio group for watchlist.
3822
3823 Parameters
3824 ----------
3825
3826 portfolio_group_id: str,
3827 UUID str identifying a portfolio group
3828
3829 watchlist_id: str,
3830 UUID str identifying a watchlist
3831
3832
3833 Returns:
3834 -------
3835 Dict {
3836 success: bool
3837 errors:
3838 data: Dict
3839 changed: int
3840 }
3841 """
3842 url = f"{self.base_uri}{WATCHLIST_ROUTE_PREFIX}{DAL_WATCHLIST_ROUTE}/set-portfolio-groups/"
3843 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3844 req_json = {"portfolio_group_id": portfolio_group_id, "watchlist_id": watchlist_id}
3845 res = requests.post(url, json=req_json, headers=headers, **self._request_params)
3846
3847 if not res.ok:
3848 error_msg = self._try_extract_error_code(res)
3849 logger.error(error_msg)
3850 raise BoostedAPIException(f"Failed to set portfolio group for watchlist: {error_msg}")
3851
3852 return res.json()
3853
3854 def get_ranking_dates(self, model_id: str, portfolio_id: str) -> List[datetime.date]:
3855 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3856 url = self.base_uri + f"/api/analysis/ranking-dates/{model_id}/{portfolio_id}"
3857 res = requests.get(url, headers=headers, **self._request_params)
3858 self._check_ok_or_err_with_msg(res, "Failed to get ranking dates")
3859 data = res.json().get("ranking_dates", [])
3860
3861 return [parser.parse(d).date() for d in data]
3862
3863 def get_prior_ranking_date(
3864 self, ranking_dates: List[datetime.date], starting_date: datetime.date
3865 ) -> datetime.date:
3866 """
3867 Given a starting date and a list of ranking dates, return the most
3868 recent previous ranking date.
3869 """
3870 # order from most recent to least
3871 ranking_dates.sort(reverse=True)
3872
3873 for d in ranking_dates:
3874 if d <= starting_date:
3875 return d
3876
3877 # if we get here, the starting date is before the earliest ranking date
        raise BoostedAPIException(f"No rankings exist on or before {starting_date}")
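
    # Usage sketch (illustrative; ids are placeholders). This is the same
    # date-resolution pattern the risk-factor helpers below use internally:
    #
    #     ranking_dates = client.get_ranking_dates("<model-uuid>", "<portfolio-uuid>")
    #     asof = client.get_prior_ranking_date(ranking_dates, datetime.date(2021, 6, 30))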
3879
3880 def _get_risk_factors_descriptors(
3881 self, model_id: str, portfolio_id: str, use_v2: bool = False
3882 ) -> Dict[int, str]:
3883 """Returns a map from descriptor id to descriptor name."""
3884 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3885
3886 risk_factor = RISK_FACTOR_V2 if use_v2 else RISK_FACTOR
3887 url = self.base_uri + f"/api/{risk_factor}/{model_id}/{portfolio_id}/descriptors"
3888 res = requests.get(url, headers=headers, **self._request_params)
3889
3890 self._check_ok_or_err_with_msg(res, "Failed to get risk factor descriptors")
3891
3892 descriptors = {int(i): name for i, name in res.json().items() if i.isnumeric()}
3893 return descriptors
3894
3895 def get_risk_groups(
3896 self, model_id: str, portfolio_id: str, date: datetime.date, use_v2: bool = False
3897 ) -> List[Dict[str, Any]]:
3898 # first get the group descriptors
3899 descriptors = self._get_risk_factors_descriptors(model_id, portfolio_id, use_v2)
3900
3901 # calculate the most recent prior rankings date. This is the date
3902 # we need to use to query for risk group data.
3903 ranking_dates = self.get_ranking_dates(model_id, portfolio_id)
3904 ranking_date = self.get_prior_ranking_date(ranking_dates, date)
3905 date_str = ranking_date.strftime("%Y-%m-%d")
3906
3907 risk_factor = RISK_FACTOR_V2 if use_v2 else RISK_FACTOR
3908
3909 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3910 url = self.base_uri + f"/api/{risk_factor}/{model_id}/{portfolio_id}/risk-groups/{date_str}"
3911 res = requests.get(url, headers=headers, **self._request_params)
3912
3913 self._check_ok_or_err_with_msg(
3914 res, f"Failed to get risk factors for {model_id=}, {portfolio_id=}, {date=}"
3915 )
3916
        # Response is a list of entries of the form:
        #   [[0, 14, 1], [25, 12, 13], 0.67013]
        # i.e. [risk_group_a_descriptor_ids, risk_group_b_descriptor_ids, volatility_explained],
        # where each integer in the two lists is a descriptor id.
3933
3934 groups = []
3935 for i, row in enumerate(res.json()):
3936 row_map: Dict[str, Any] = {}
3937 # map descriptor id to name
3938 row_map["machine"] = i + 1 # start at 1 not 0
3939 row_map["risk_group_a"] = [descriptors[i] for i in row[0]]
3940 row_map["risk_group_b"] = [descriptors[i] for i in row[1]]
3941 row_map["volatility_explained"] = row[2]
3942 groups.append(row_map)
3943
3944 return groups
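
    # Usage sketch (illustrative; ids are placeholders). Each returned entry pairs
    # two descriptor groups with the share of volatility they explain:
    #
    #     groups = client.get_risk_groups(
    #         "<model-uuid>", "<portfolio-uuid>", datetime.date.today()
    #     )
    #     for g in groups:
    #         print(g["machine"], g["risk_group_a"], g["risk_group_b"], g["volatility_explained"])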
3945
3946 def get_risk_factors_discovered_descriptors(
3947 self, model_id: str, portfolio_id: str, date: datetime.date, use_v2: bool = False
3948 ) -> pd.DataFrame:
3949 # first get the group descriptors
        descriptors = self._get_risk_factors_descriptors(model_id, portfolio_id, use_v2)
3951
3952 # calculate the most recent prior rankings date. This is the date
3953 # we need to use to query for risk group data.
3954 ranking_dates = self.get_ranking_dates(model_id, portfolio_id)
3955 ranking_date = self.get_prior_ranking_date(ranking_dates, date)
3956 date_str = ranking_date.strftime("%Y-%m-%d")
3957
3958 risk_factor = RISK_FACTOR_V2 if use_v2 else RISK_FACTOR
3959
3960 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
3961 url = (
3962 self.base_uri
3963 + f"/api/{risk_factor}/{model_id}/{portfolio_id}/risk-descriptors/json/{date_str}"
3964 )
3965 res = requests.get(url, headers=headers, **self._request_params)
3966
3967 self._check_ok_or_err_with_msg(
3968 res, f"Failed to get risk factors for {model_id=}, {portfolio_id=}, {date=}"
3969 )
3970
3971 # Endpoint returns a nested list of floats
3972 df = pd.DataFrame(res.json(), columns=RISK_FACTOR_COLUMNS)
3973
3974 # This flat dataframe represents a potentially doubly nested structure
3975 # of Sector -> (high/low volatility) -> security. We don't care about
        # the high/low volatility rows (which have negative identifiers),
        # so we can filter them out.
3978 df = df[df["identifier"] >= 0]
3979
3980 # now, any values that had a depth of 2 should be set to a depth of 1,
3981 # since we removed the double nesting.
        df["depth"] = df["depth"].replace(to_replace=2, value=1)
3983
3984 # This dataframe represents data that is nested on the UI, so the
3985 # "depth" field indicates which level of nesting each row is at. At this
3986 # point, a depth of 0 indicates a sector, and following depth 1 rows are
3987 # securities within the sector.
3988
3989 # Identifiers in rows with depth 1 will be gbi ids, need to convert to
3990 # symbols.
3991 gbi_ids = df[df["depth"] == 1]["identifier"].tolist()
3992 sec_info = self._get_security_info(gbi_ids)["data"]["securities"]
3993 sec_map = {s["gbiId"]: s["symbol"] for s in sec_info}
3994
3995 def convert_ids(row: pd.Series) -> pd.Series:
3996 # convert each row's "identifier" to the appropriate id type. If the
3997 # depth is 0, the identifier should be a sector, otherwise it should
3998 # be a ticker.
3999 ident = int(row["identifier"])
4000 row["identifier"] = (
4001 descriptors.get(ident).title() if row["depth"] == 0 else sec_map.get(ident)
4002 )
4003 return row
4004
4005 df["depth"] = df["depth"].astype(int)
4006 df["stock_count"] = df["stock_count"].astype(int)
4007 df = df.apply(convert_ids, axis=1)
4008 df = df.reset_index(drop=True)
4009 return df
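
    # Usage sketch (illustrative; ids are placeholders). The resulting frame uses
    # RISK_FACTOR_COLUMNS, where depth 0 rows are group-level entries and depth 1
    # rows are the tickers beneath them:
    #
    #     df = client.get_risk_factors_discovered_descriptors(
    #         "<model-uuid>", "<portfolio-uuid>", datetime.date.today(), use_v2=True
    #     )
    #     group_rows = df[df["depth"] == 0]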
4010
4011 def get_risk_factors_sectors(
4012 self, model_id: str, portfolio_id: str, date: datetime.date, use_v2: bool = False
4013 ) -> pd.DataFrame:
4014 # first get the group descriptors
4015 sectors = {s["id"]: s["name"] for s in self._get_sector_info()}
4016
4017 # calculate the most recent prior rankings date. This is the date
4018 # we need to use to query for risk group data.
4019 ranking_dates = self.get_ranking_dates(model_id, portfolio_id)
4020 ranking_date = self.get_prior_ranking_date(ranking_dates, date)
4021 date_str = ranking_date.strftime("%Y-%m-%d")
4022
4023 risk_factor = RISK_FACTOR_V2 if use_v2 else RISK_FACTOR
4024
4025 headers = {"Authorization": "ApiKey " + self.api_key, "Content-Type": "application/json"}
4026 url = (
4027 self.base_uri
4028 + f"/api/{risk_factor}/{model_id}/{portfolio_id}/risk-sectors/json/{date_str}"
4029 )
4030 res = requests.get(url, headers=headers, **self._request_params)
4031
4032 self._check_ok_or_err_with_msg(
4033 res, f"Failed to get risk factors for {model_id=}, {portfolio_id=}, {date=}"
4034 )
4035
4036 # Endpoint returns a nested list of floats
4037 df = pd.DataFrame(res.json(), columns=RISK_FACTOR_COLUMNS)
4038
4039 # identifier is a gics sector identifier
4040 df["identifier"] = df["identifier"].apply(lambda i: sectors.get(int(i), None))
4041
4042 # This dataframe represents data that is nested on the UI, so the
4043 # "depth" field indicates which level of nesting each row is at. For
4044 # risk factors sectors, each "depth" represents a level of specificity
4045 # for the sector. E.g. Energy -> Energy Equipment -> Oil & Gas Equipment
4046 df["depth"] = df["depth"].astype(int)
4047 df["stock_count"] = df["stock_count"].astype(int)
4048 df = df.reset_index(drop=True)
4049 return df
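
    # Usage sketch (illustrative; ids are placeholders). Here the identifier column
    # holds GICS sector names and depth encodes sector specificity:
    #
    #     df = client.get_risk_factors_sectors(
    #         "<model-uuid>", "<portfolio-uuid>", datetime.date.today()
    #     )
    #     broadest = df[df["depth"] == df["depth"].min()]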
4050
4051 def download_complete_portfolio_data(
4052 self, model_id: str, portfolio_id: str