Source code for gliner.evaluation.evaluate_ner

import os
import glob
import json
import random

import torch
from tqdm import tqdm


def open_content(path):
    """Load train, dev, test, and label files from a dataset directory.

    Searches for JSON files in the specified directory and loads them based on
    filename patterns (train, dev, test, labels).

    Args:
        path: Path to the directory containing dataset JSON files.

    Returns:
        A tuple of (train, dev, test, labels) where:
            - train: List of training examples loaded from *train*.json, or None if not found
            - dev: List of development examples loaded from *dev*.json, or None if not found
            - test: List of test examples loaded from *test*.json, or None if not found
            - labels: List of entity type labels loaded from *labels*.json, or None if not found

    Note:
        Files are identified by checking if their filename contains 'train',
        'dev', 'test', or 'labels'. All files are expected to be in JSON format
        with UTF-8 encoding.
    """
    paths = glob.glob(os.path.join(path, "*.json"))
    train, dev, test, labels = None, None, None, None
    for p in paths:
        if "train" in p:
            with open(p, encoding="utf-8") as f:
                train = json.load(f)
        elif "dev" in p:
            with open(p, encoding="utf-8") as f:
                dev = json.load(f)
        elif "test" in p:
            with open(p, encoding="utf-8") as f:
                test = json.load(f)
        elif "labels" in p:
            with open(p, encoding="utf-8") as f:
                labels = json.load(f)
    return train, dev, test, labels
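For illustration, a dataset directory might look like the hypothetical layout sketched below; only the substrings 'train', 'dev', 'test', and 'labels' in the file names matter.

# Hypothetical layout (directory and file names are placeholders):
#   data/my_dataset/
#       train.json   -> list of {"sentence": ..., "entities": [...]} samples
#       dev.json
#       test.json
#       labels.json  -> list of entity type strings
train, dev, test, labels = open_content("data/my_dataset")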
def process(data):
    """Convert character-level entity annotations to word-level annotations.

    Takes a data sample with character-level entity positions and converts them
    to word-level positions by tokenizing the sentence on whitespace.

    Args:
        data: Dictionary containing:
            - 'sentence': String of the full sentence
            - 'entities': List of entity dictionaries, each with:
                - 'pos': Tuple of (start_char, end_char) character positions
                - 'type': String entity type label

    Returns:
        Dictionary containing:
            - 'tokenized_text': List of words from the sentence
            - 'ner': List of tuples (start_word, end_word, entity_type) where
              start_word and end_word are word-level indices and entity_type is
              the lowercased entity type

    Note:
        This function assumes whitespace-separated words and that character
        positions align exactly with word boundaries (including spaces).
    """
    words = data["sentence"].split()
    entities = []  # list of entities (start, end, type)

    for entity in data["entities"]:
        start_char, end_char = entity["pos"]

        # Initialize variables to keep track of word positions
        start_word = None
        end_word = None

        # Iterate through words and find the word positions
        char_count = 0
        for i, word in enumerate(words):
            word_length = len(word)

            if char_count == start_char:
                start_word = i

            if char_count + word_length == end_char:
                end_word = i
                break

            char_count += word_length + 1  # add 1 for the space

        # Append the word positions to the list
        entities.append((start_word, end_word, entity["type"].lower()))

    # Create a list of word positions for each entity
    sample = {"tokenized_text": words, "ner": entities}

    return sample
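A minimal worked example of the character-to-word conversion (the input sample is hypothetical): "Barack Obama" covers characters 0-12, which maps to words 0-1, and "Paris" covers characters 21-26, which maps to word 3.

sample = process({
    "sentence": "Barack Obama visited Paris",
    "entities": [
        {"pos": (0, 12), "type": "Person"},
        {"pos": (21, 26), "type": "Location"},
    ],
})
# sample == {"tokenized_text": ["Barack", "Obama", "visited", "Paris"],
#            "ner": [(0, 1, "person"), (3, 3, "location")]}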
def create_dataset(path):
    """Create train, dev, and test datasets from a directory of JSON files.

    Loads all dataset splits and processes them to convert character-level
    annotations to word-level annotations. Also normalizes entity type labels
    to lowercase.

    Args:
        path: Path to the directory containing dataset JSON files.

    Returns:
        A tuple of (train_dataset, dev_dataset, test_dataset, labels) where:
            - train_dataset: List of processed training samples
            - dev_dataset: List of processed development samples
            - test_dataset: List of processed test samples
            - labels: List of entity type labels (lowercased)

    Note:
        Each sample in the datasets is a dictionary with 'tokenized_text' and
        'ner' keys as returned by the process() function.
    """
    train, dev, test, labels = open_content(path)
    train_dataset = []
    dev_dataset = []
    test_dataset = []
    for data in train:
        train_dataset.append(process(data))
    for data in dev:
        dev_dataset.append(process(data))
    for data in test:
        test_dataset.append(process(data))
    labels = [label.lower() for label in labels]
    return train_dataset, dev_dataset, test_dataset, labels
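Typical usage, assuming a directory in the hypothetical layout sketched earlier:

train_dataset, dev_dataset, test_dataset, labels = create_dataset("data/my_dataset")
print(len(train_dataset), len(test_dataset), labels[:5])
print(train_dataset[0]["tokenized_text"], train_dataset[0]["ner"])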
@torch.no_grad()
def get_for_one_path(path, model):
    """Evaluate a model on a single dataset.

    Loads the test set from the specified path and evaluates the model's
    performance. Automatically determines whether to use flat NER evaluation
    based on the dataset name.

    Args:
        path: Path to the dataset directory.
        model: NER model instance with an evaluate() method.

    Returns:
        A tuple of (data_name, results, f1) where:
            - data_name: String name of the dataset (extracted from path)
            - results: Detailed evaluation results dictionary from model.evaluate()
            - f1: F1 score (float) for the dataset

    Note:
        Datasets with 'ACE', 'GENIA', or 'Corpus' in their name are evaluated
        with flat_ner=False, all others use flat_ner=True. Evaluation uses a
        threshold of 0.5 and batch size of 12.
    """
    # load the dataset
    _, _, test_dataset, entity_types = create_dataset(path)

    data_name = path.split("/")[-1]  # get the name of the dataset

    # check if the dataset is flat_ner
    flat_ner = True
    if any(i in data_name for i in ["ACE", "GENIA", "Corpus"]):
        flat_ner = False

    # evaluate the model
    results, f1 = model.evaluate(
        test_dataset, flat_ner=flat_ner, threshold=0.5, batch_size=12, entity_types=entity_types
    )
    return data_name, results, f1
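A sketch of evaluating a single dataset. The checkpoint name and data path are placeholders, and the only requirement on the model is the evaluate() interface used above; loading via GLiNER.from_pretrained is one way to obtain such a model.

from gliner import GLiNER

model = GLiNER.from_pretrained("urchade/gliner_small")  # placeholder checkpoint
data_name, results, f1 = get_for_one_path("data/CrossNER_AI", model)  # placeholder path
print(f"{data_name}: F1 = {f1:.1%}")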
def get_for_all_path(model, steps, log_dir, data_paths):
    """Evaluate a model across multiple datasets and log results.

    Evaluates the model on all datasets in the specified directory, separating
    results into standard benchmarks and zero-shot benchmarks. Writes detailed
    results to log files and computes average scores.

    Args:
        model: NER model instance with an evaluate() method and PyTorch parameters.
        steps: Integer representing the current training step (for logging).
        log_dir: Directory path where result files will be saved.
        data_paths: Path to directory containing multiple dataset subdirectories.

    Note:
        Creates two log files in log_dir:
            - 'results.txt': Detailed results for each dataset
            - 'tables.txt': Formatted tables with averages for benchmarks

        Zero-shot benchmark datasets (not included in main average):
            - mit-movie, mit-restaurant
            - CrossNER_AI, CrossNER_literature, CrossNER_music,
              CrossNER_politics, CrossNER_science

        Datasets with 'sample_' in their path are skipped.
    """
    all_paths = glob.glob(f"{data_paths}/*")
    all_paths = sorted(all_paths)

    # move the model to the device
    device = next(model.parameters()).device
    model.to(device)
    # set the model to eval mode
    model.eval()
    # log the results
    save_path = os.path.join(log_dir, "results.txt")

    with open(save_path, "a") as f:
        f.write("##############################################\n")
        # write step
        f.write("step: " + str(steps) + "\n")

    zero_shot_benc = [
        "mit-movie",
        "mit-restaurant",
        "CrossNER_AI",
        "CrossNER_literature",
        "CrossNER_music",
        "CrossNER_politics",
        "CrossNER_science",
    ]

    zero_shot_benc_results = {}
    all_results = {}  # without crossNER

    for p in tqdm(all_paths):
        if "sample_" not in p:
            data_name, results, f1 = get_for_one_path(p, model)
            # write to file
            with open(save_path, "a") as f:
                f.write(data_name + "\n")
                f.write(str(results) + "\n")

            if data_name in zero_shot_benc:
                zero_shot_benc_results[data_name] = f1
            else:
                all_results[data_name] = f1

    avg_all = sum(all_results.values()) / len(all_results)
    avg_zs = sum(zero_shot_benc_results.values()) / len(zero_shot_benc_results)

    save_path_table = os.path.join(log_dir, "tables.txt")

    # results for all datasets except crossNER
    table_bench_all = ""
    for k, v in all_results.items():
        table_bench_all += f"{k:20}: {v:.1%}\n"
    # (width of 20 for the average as well, i.e. :20)
    table_bench_all += f"{'Average':20}: {avg_all:.1%}"

    # results for zero-shot benchmark
    table_bench_zeroshot = ""
    for k, v in zero_shot_benc_results.items():
        table_bench_zeroshot += f"{k:20}: {v:.1%}\n"
    table_bench_zeroshot += f"{'Average':20}: {avg_zs:.1%}"

    # write to file
    with open(save_path_table, "a") as f:
        f.write("##############################################\n")
        f.write("step: " + str(steps) + "\n")
        f.write("Table for all datasets except crossNER\n")
        f.write(table_bench_all + "\n\n")
        f.write("Table for zero-shot benchmark\n")
        f.write(table_bench_zeroshot + "\n")
        f.write("##############################################\n\n")
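During training this is typically called periodically, roughly as sketched below (the paths and step count are placeholders); it appends to results.txt and tables.txt inside log_dir.

os.makedirs("logs", exist_ok=True)
get_for_all_path(model, steps=10000, log_dir="logs", data_paths="data/ner_benchmarks")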
def sample_train_data(data_paths, sample_size=10000):
    """Sample training data from multiple datasets for combined training.

    Creates a combined training set by sampling a fixed number of examples from
    each dataset (excluding zero-shot benchmark datasets). Shuffles each dataset
    before sampling to ensure diversity.

    Args:
        data_paths: Path to directory containing multiple dataset subdirectories.
        sample_size: Maximum number of samples to take from each dataset.
            Defaults to 10000.

    Returns:
        List of training samples, where each sample is a dictionary with:
            - 'tokenized_text': List of words
            - 'ner': List of entity tuples (start, end, type)
            - 'label': List of all entity type labels for this dataset

    Note:
        Excludes zero-shot benchmark datasets:
            - CrossNER_AI, CrossNER_literature, CrossNER_music,
              CrossNER_politics, CrossNER_science, ACE 2004

        Each dataset is shuffled before sampling to ensure random selection.
        If a dataset has fewer than sample_size examples, all examples are used.
    """
    all_paths = glob.glob(f"{data_paths}/*")
    all_paths = sorted(all_paths)

    # to exclude the zero-shot benchmark datasets
    zero_shot_benc = [
        "CrossNER_AI",
        "CrossNER_literature",
        "CrossNER_music",
        "CrossNER_politics",
        "CrossNER_science",
        "ACE 2004",
    ]

    new_train = []
    # take up to sample_size samples from each dataset
    for p in tqdm(all_paths):
        if any(i in p for i in zero_shot_benc):
            continue
        train, _, _, labels = create_dataset(p)

        # add label key to the train data
        for i in range(len(train)):
            train[i]["label"] = labels

        random.shuffle(train)
        train = train[:sample_size]
        new_train.extend(train)

    return new_train
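A sketch of building a mixed training set from several benchmark directories (the path and sample size are placeholders):

mixed_train = sample_train_data("data/ner_benchmarks", sample_size=5000)
print(len(mixed_train))
print(mixed_train[0].keys())  # dict_keys(['tokenized_text', 'ner', 'label'])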