| from huggingface_hub import HfFileSystem |
| import pandas as pd |
| from utils import logger |
| from datetime import datetime |
| import threading |
| import traceback |
| import json |
| import re |
|
|
| |
| fs = HfFileSystem() |
|
|
| IMPORTANT_MODELS = [ |
| "auto", |
| "bert", |
| "gpt2", |
| "t5", |
| "modernbert", |
| "vit", |
| "clip", |
| "detr", |
| "table-transformer", |
| "got_ocr2", |
| "whisper", |
| "wav2vec2", |
| "llama", |
| "gemma3", |
| "qwen2", |
| "mistral3", |
| "qwen2_5_vl", |
| "llava", |
| "smolvlm", |
| "internvl", |
| "gemma3n", |
| "qwen2_5_omni", |
| ] |
|
|
| KEYS_TO_KEEP = [ |
| "success_amd", |
| "success_nvidia", |
| "skipped_amd", |
| "skipped_nvidia", |
| "failed_multi_no_amd", |
| "failed_multi_no_nvidia", |
| "failed_single_no_amd", |
| "failed_single_no_nvidia", |
| "failures_amd", |
| "failures_nvidia", |
| "job_link_amd", |
| "job_link_nvidia", |
| ] |
|
|
|
|
| def log_dataframe_link(link: str) -> str: |
| """ |
| Adds the link to the dataset in the logs, modifies it to get a clockable link and then returns the date of the |
| report. |
| """ |
| logger.info(f"Reading df located at {link}") |
| |
| if link.startswith("hf://"): |
| link = "https://huggingface.co/" + link.removeprefix("hf://") |
| |
| pattern = r'transformers_daily_ci(.*?)/(\d{4}-\d{2}-\d{2})' |
| match = re.search(pattern, link) |
| |
| if not match: |
| logger.error("Could not find transformers_daily_ci and.or date in the link") |
| return "9999-99-99" |
| |
| path_between = match.group(1) |
| link = link.replace("transformers_daily_ci" + path_between, "transformers_daily_ci/blob/main") |
| logger.info(f"Link to data source: {link}") |
| |
| return match.group(2) |
|
|
| def infer_latest_update_msg(date_df_amd: str, date_df_nvidia: str) -> str: |
| |
| if date_df_amd.startswith("9999") and date_df_nvidia.startswith("9999"): |
| return "could not find last update time" |
| |
| if date_df_amd != date_df_nvidia: |
| logger.warning(f"Different dates found: {date_df_amd} (AMD) vs {date_df_nvidia} (NVIDIA)") |
| |
| try: |
| latest_date = max(date_df_amd, date_df_nvidia) |
| yyyy, mm, dd = latest_date.split("-") |
| return f"last updated {mm}/{dd}/{yyyy}" |
| except Exception as e: |
| logger.error(f"When trying to infer latest date, got error {e}") |
| return "could not find last update time" |
|
|
| def read_one_dataframe(json_path: str, device_label: str) -> tuple[pd.DataFrame, str]: |
| df_upload_date = log_dataframe_link(json_path) |
| df = pd.read_json(json_path, orient="index") |
| df.index.name = "model_name" |
| df[f"failed_multi_no_{device_label}"] = df["failures"].apply(lambda x: len(x["multi"]) if "multi" in x else 0) |
| df[f"failed_single_no_{device_label}"] = df["failures"].apply(lambda x: len(x["single"]) if "single" in x else 0) |
| return df, df_upload_date |
|
|
| def get_distant_data() -> tuple[pd.DataFrame, str]: |
| |
| amd_src = "hf://datasets/optimum-amd/transformers_daily_ci/**/runs/**/ci_results_run_models_gpu/model_results.json" |
| files_amd = sorted(fs.glob(amd_src, refresh=True), reverse=True) |
| df_amd, date_df_amd = read_one_dataframe(f"hf://{files_amd[0]}", "amd") |
| |
| |
| nvidia_src = "hf://datasets/hf-internal-testing/transformers_daily_ci/*/ci_results_run_models_gpu/model_results.json" |
| files_nvidia = sorted(fs.glob(nvidia_src, refresh=True), reverse=True) |
| |
| nvidia_path = files_nvidia[0].lstrip('datasets/hf-internal-testing/transformers_daily_ci/') |
| nvidia_path = "https://huggingface.co/datasets/hf-internal-testing/transformers_daily_ci/raw/main/" + nvidia_path |
| df_nvidia, date_df_nvidia = read_one_dataframe(nvidia_path, "nvidia") |
| |
| latest_update_msg = infer_latest_update_msg(date_df_amd, date_df_nvidia) |
| |
| joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer") |
| joined = joined[KEYS_TO_KEEP] |
| joined.index = joined.index.str.replace("^models_", "", regex=True) |
| |
| important_models_lower = [model.lower() for model in IMPORTANT_MODELS] |
| filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)] |
| |
| for model in IMPORTANT_MODELS: |
| if model not in filtered_joined.index: |
| print(f"[WARNING] Model {model} was missing from index.") |
| return filtered_joined, latest_update_msg |
|
|
|
|
| def get_sample_data() -> tuple[pd.DataFrame, str]: |
| |
| df_amd, _ = read_one_dataframe("sample_amd.json", "amd") |
| df_nvidia, _ = read_one_dataframe("sample_nvidia.json", "nvidia") |
| |
| joined = df_amd.join(df_nvidia, rsuffix="_nvidia", lsuffix="_amd", how="outer") |
| joined = joined[KEYS_TO_KEEP] |
| joined.index = joined.index.str.replace("^models_", "", regex=True) |
| |
| important_models_lower = [model.lower() for model in IMPORTANT_MODELS] |
| filtered_joined = joined[joined.index.str.lower().isin(important_models_lower)] |
| |
| filtered_joined.index = "sample_" + filtered_joined.index |
| return filtered_joined, "sample data was loaded" |
|
|
| def safe_extract(row: pd.DataFrame, key: str) -> int: |
| return int(row.get(key, 0)) if pd.notna(row.get(key, 0)) else 0 |
|
|
| def extract_model_data(row: pd.Series) -> tuple[dict[str, int], dict[str, int], int, int, int, int]: |
| """Extract and process model data from DataFrame row.""" |
| |
| success_nvidia = safe_extract(row, "success_nvidia") |
| success_amd = safe_extract(row, "success_amd") |
|
|
| skipped_nvidia = safe_extract(row, "skipped_nvidia") |
| skipped_amd = safe_extract(row, "skipped_amd") |
| |
| failed_multi_amd = safe_extract(row, 'failed_multi_no_amd') |
| failed_multi_nvidia = safe_extract(row, 'failed_multi_no_nvidia') |
| failed_single_amd = safe_extract(row, 'failed_single_no_amd') |
| failed_single_nvidia = safe_extract(row, 'failed_single_no_nvidia') |
| |
| total_failed_amd = failed_multi_amd + failed_single_amd |
| total_failed_nvidia = failed_multi_nvidia + failed_single_nvidia |
| |
| amd_stats = { |
| 'passed': success_amd, |
| 'failed': total_failed_amd, |
| 'skipped': skipped_amd, |
| 'error': 0 |
| } |
| nvidia_stats = { |
| 'passed': success_nvidia, |
| 'failed': total_failed_nvidia, |
| 'skipped': skipped_nvidia, |
| 'error': 0 |
| } |
| return amd_stats, nvidia_stats, failed_multi_amd, failed_single_amd, failed_multi_nvidia, failed_single_nvidia |
|
|
|
|
|
|
| class CIResults: |
|
|
| def __init__(self): |
| self.df = pd.DataFrame() |
| self.available_models = [] |
| self.latest_update_msg = "" |
|
|
| def load_data(self) -> None: |
| """Load data from the data source.""" |
| |
| try: |
| logger.info("Loading distant data...") |
| new_df, latest_update_msg = get_distant_data() |
| self.latest_update_msg = latest_update_msg |
| except Exception as e: |
| error_msg = [ |
| "Loading data failed:", |
| "-" * 120, |
| traceback.format_exc(), |
| "-" * 120, |
| "Falling back on sample data." |
| ] |
| logger.error("\n".join(error_msg)) |
| new_df, latest_update_msg = get_sample_data() |
| self.latest_update_msg = latest_update_msg |
| |
| self.df = new_df |
| self.available_models = new_df.index.tolist() |
| |
| logger.info(f"Data loaded successfully: {len(self.available_models)} models") |
| logger.info(f"Models: {self.available_models[:5]}{'...' if len(self.available_models) > 5 else ''}") |
| logger.info(f"Latest update message: {self.latest_update_msg}") |
| |
| msg = {} |
| for model in self.available_models[:3]: |
| msg[model] = {} |
| for col in self.df.columns: |
| value = self.df.loc[model, col] |
| if not isinstance(value, int): |
| value = str(value) |
| if len(value) > 10: |
| value = value[:10] + "..." |
| msg[model][col] = value |
| logger.info(json.dumps(msg, indent=4)) |
|
|
| def schedule_data_reload(self): |
| """Schedule the next data reload.""" |
| def reload_data(): |
| self.load_data() |
| |
| timer = threading.Timer(900.0, reload_data) |
| timer.daemon = True |
| timer.start() |
| logger.info("Next data reload scheduled in 15 minutes") |
|
|
| |
| timer = threading.Timer(900.0, reload_data) |
| timer.daemon = True |
| timer.start() |
| logger.info("Data auto-reload scheduled every 15 minutes") |
|
|