"""
A collection of WEFE utility functions.
This file contains functions for to process to massively execute queries, aggregate them
through rankings and graph these results.
"""
import copy
import logging
from typing import Callable, List, Type, Union
import numpy as np
import pandas as pd
import pkg_resources
import plotly.express as px
import plotly.graph_objects as go
from gensim.models.keyedvectors import KeyedVectors
from sklearn.utils.validation import check_is_fitted as _check_is_fitted
from wefe.metrics.base_metric import BaseMetric
from wefe.query import Query
from wefe.word_embedding_model import WordEmbeddingModel
def check_is_fitted(estimator, attributes) -> None:
    """Check that the estimator is fitted, with a WEFE-specific error message.

    Thin wrapper around sklearn's ``check_is_fitted`` that replaces the
    default message with one pointing to the debias method's ``fit``.
    """
    not_fitted_msg = (
        "This %(name)s instance is not fitted yet. Call 'fit' with "
        "appropriate arguments before using this debias method."
    )
    _check_is_fitted(estimator, attributes, msg=not_fitted_msg)
# -----------------------------------------------------------------------------
# ---------------------------------- Runners ----------------------------------
# -----------------------------------------------------------------------------
# Pre-implemented row-wise aggregation functions, keyed by the names accepted
# by run_queries' aggregation_function parameter. Each takes the pivoted
# results dataframe and returns one aggregated value per row (per model).
# Note: axis is passed by keyword — positional axis arguments are being
# phased out of the pandas API.
AGGREGATION_FUNCTIONS = {
    "sum": lambda df: df.sum(axis=1),
    "avg": lambda df: df.mean(axis=1),
    "abs_sum": lambda df: df.abs().sum(axis=1),
    "abs_avg": lambda df: df.abs().mean(axis=1),
}

# Human-readable names used to label the aggregation column in the results.
AGGREGATION_FUNCTION_NAMES = {
    "sum": "sum",
    "avg": "average",
    "abs_sum": "sum of abs values",
    "abs_avg": "average of abs values",
}
def generate_subqueries_from_queries_list(
    metric: BaseMetric, queries: List[Query]
) -> List[Query]:
    """Generate a list of subqueries from queries.

    Queries that cannot be split with the metric's template are skipped with
    a warning. Subqueries with duplicated names are dropped, keeping the
    first occurrence.

    Parameters
    ----------
    metric : BaseMetric
        Some metric.
    queries : List[Query]
        A list with queries.

    Returns
    -------
    List[Query]
        A list with all the generated subqueries.
    """
    # instantiate the metric to read its template and names.
    metric_ = metric()

    subqueries: List[Query] = []
    for query_idx, query in enumerate(queries):
        try:
            subqueries += query.get_subqueries(metric_.metric_template)
        except Exception as e:
            # a query incompatible with the template is skipped, not fatal.
            logging.warning(
                "Query in index {} ({}) can not be splitted in subqueries "
                "with the {} metric template = {}. Exception: \n{}".format(
                    query_idx,
                    query.query_name,
                    metric_.metric_name,
                    metric_.metric_template,
                    e,
                )
            )

    # remove duplicates by name in O(n) while preserving order
    # (the original implementation was O(n^2)).
    seen_names = set()
    filtered_subqueries: List[Query] = []
    for subquery in subqueries:
        if subquery.query_name not in seen_names:
            seen_names.add(subquery.query_name)
            filtered_subqueries.append(subquery)
    return filtered_subqueries
def _validate_run_queries_inputs(
    queries,
    models,
    queries_set_name,
    metric_params,
    aggregate_results,
    aggregation_function,
    return_only_aggregation,
) -> None:
    """Validate run_queries' parameters.

    Raises TypeError or Exception (with run_queries' historical messages)
    on the first invalid parameter found.
    """
    # queries handling
    if not isinstance(queries, (list, np.ndarray)):
        raise TypeError(
            "queries parameter must be a list or a numpy array. given: {}".format(
                queries
            )
        )
    if len(queries) == 0:
        raise Exception(
            "queries list must have at least one query instance. given: {}".format(
                queries
            )
        )
    for idx, query in enumerate(queries):
        if query is None or not isinstance(query, Query):
            raise TypeError(
                "item on index {} must be a Query instance. given: {}".format(
                    idx, query
                )
            )

    # word embedding models handling
    if not isinstance(models, (list, np.ndarray)):
        raise TypeError(
            "word_embeddings_models parameter must be a list or a numpy array."
            " given: {}".format(models)
        )
    if len(models) == 0:
        raise Exception(
            "word_embeddings_models parameter must be a non empty list or "
            "numpy array. given: {}".format(models)
        )
    for idx, model in enumerate(models):
        if model is None or not isinstance(model, WordEmbeddingModel):
            raise TypeError(
                "item on index {} must be a WordEmbeddingModel instance. "
                "given: {}".format(idx, model)
            )

    # experiment name handling
    if not isinstance(queries_set_name, str) or queries_set_name == "":
        raise TypeError(
            "When queries_set_name parameter is provided, it must be a "
            "non-empty string. given: {}".format(queries_set_name)
        )

    # metric_params handling
    if not isinstance(metric_params, dict):
        raise TypeError(
            "run_experiment_params must be a dict with a params for the metric"
        )

    # aggregate_results handling
    if not isinstance(aggregate_results, bool):
        raise TypeError(
            "aggregate_results parameter must be a bool value. Given:"
            "{}".format(aggregate_results)
        )

    # aggregation function: either a known name or any callable.
    AGG_FUNCTION_MSG = (
        "aggregation_function must be one of 'sum',"
        "abs_sum', 'avg', 'abs_avg' or a callable. given: {}"
    )
    if isinstance(aggregation_function, str):
        if aggregation_function not in AGGREGATION_FUNCTIONS:
            raise Exception(AGG_FUNCTION_MSG.format(aggregation_function))
    elif not callable(aggregation_function):
        raise Exception(AGG_FUNCTION_MSG.format(aggregation_function))

    # return_only_aggregation handling
    if not isinstance(return_only_aggregation, bool):
        raise TypeError(
            "return_only_aggregation param must be boolean. Given: {}".format(
                return_only_aggregation
            )
        )


def _aggregate_pivoted_results(
    pivoted_results: pd.DataFrame,
    aggregation_function: Union[str, Callable],
    metric_short_name: str,
    queries_set_name: str,
) -> pd.DataFrame:
    """Apply the aggregation function row-wise and name the resulting column."""
    if (
        isinstance(aggregation_function, str)
        and aggregation_function in AGGREGATION_FUNCTIONS
    ):
        # one of the pre-implemented aggregation functions.
        aggregated_results = AGGREGATION_FUNCTIONS[aggregation_function](
            pivoted_results
        )
        aggregated_results_name = AGGREGATION_FUNCTION_NAMES[aggregation_function]
    else:
        # a custom callable provided by the user.
        aggregated_results = aggregation_function(pivoted_results)
        aggregated_results_name = "custom aggregation"

    # generate the new aggregation column name.
    aggregation_column_name = "{}: {} {} score".format(
        metric_short_name, queries_set_name, aggregated_results_name
    )
    return pd.DataFrame(aggregated_results, columns=[aggregation_column_name])


def run_queries(
    metric: Type[BaseMetric],
    queries: List[Query],
    models: List[WordEmbeddingModel],
    queries_set_name: str = "Unnamed queries set",
    lost_vocabulary_threshold: float = 0.2,
    metric_params: Union[dict, None] = None,
    generate_subqueries: bool = False,
    aggregate_results: bool = False,
    aggregation_function: Union[str, Callable] = "abs_avg",
    return_only_aggregation: bool = False,
    warn_not_found_words: bool = False,
) -> pd.DataFrame:
    """Run several queries over several word embedding models using a specific metric.

    Parameters
    ----------
    metric : Type[BaseMetric]
        A metric class.
    queries : list
        An iterable with a set of queries.
    models : list
        An iterable with a set of word embedding pretrained models.
    queries_set_name : str, optional
        The name of the set of queries or the criteria that will be tested,
        by default 'Unnamed queries set'
    lost_vocabulary_threshold : float, optional
        The lost vocabulary threshold that will be passed to each run_query
        call, by default 0.2
    metric_params : dict, optional
        A dict with custom params that will be passed to the run_query method
        of the respective metric, by default None (treated as an empty dict).
    generate_subqueries: bool, optional
        It indicates if the program, when detecting queries with a bigger
        template than the metric, should try to generate subqueries compatible
        with it.
        If any query is compatible with the metric template, then it appends
        the same query.
        DANGER: This may cause some comparisons to become meaningless when
        comparing biases that are not compatible with each other.
        By default, False.
    aggregate_results : bool, optional
        A boolean that indicates if the results must be aggregated with some
        function.
    aggregation_function : Union[str, Callable], optional
        The function that will be applied row by row to add the results.
        It must be pandas row compatible operation.
        Implemented functions: 'sum', 'abs_sum', 'avg' and 'abs_avg',
        by default 'abs_avg'.
    return_only_aggregation : bool, optional
        If return_only_aggregation is True, only the column with the added
        queries is returned, by default False.
    warn_not_found_words : bool, optional
        Passed to each run_query call; if True, the metric warns about words
        missing from the model vocabulary, by default False.

    Returns
    -------
    pd.DataFrame
        A dataframe with the results. The index contains the word embedding
        model name and the columns the experiment name.
        Each cell represents the result of run a metric using a specific word
        embedding model and query.

    Raises
    ------
    TypeError
        If any parameter has an invalid type.
    Exception
        If queries or models are empty, if the aggregation function is
        unknown, or if a query execution fails.
    """
    # avoid the mutable-default-argument pitfall: resolve None to a fresh dict.
    if metric_params is None:
        metric_params = {}

    # check inputs.
    # metric handling (TODO: issubclass not working...)
    # if not issubclass(metric, BaseMetric):
    #     raise Exception('metric parameter must be instance of BaseMetric')
    _validate_run_queries_inputs(
        queries,
        models,
        queries_set_name,
        metric_params,
        aggregate_results,
        aggregation_function,
        return_only_aggregation,
    )

    if generate_subqueries:
        queries = generate_subqueries_from_queries_list(metric, queries)

    metric_instance = metric()
    results = []
    query_names: List[str] = []

    for query in queries:
        for model in models:
            try:
                result = metric_instance.run_query(
                    query,
                    model,
                    lost_vocabulary_threshold=lost_vocabulary_threshold,
                    warn_not_found_words=warn_not_found_words,
                    **metric_params,
                )
            except Exception as e:
                # chain the original exception so the traceback is preserved.
                raise Exception(
                    "Error during executing the query {} on the model {}: {}".format(
                        query.query_name, model.name, str(e)
                    )
                ) from e
            result["model_name"] = model.name
            results.append(result)
            # keep the original query order for the final column layout.
            if result["query_name"] not in query_names:
                query_names.append(result["query_name"])

    # reorder the results in a legible (models x queries) table,
    # preserving the original model and query order.
    pivoted_results = pd.DataFrame(results).pivot(
        index="model_name", columns="query_name", values="result"
    )
    pivoted_results = pivoted_results.reindex(
        index=[model.name for model in models],
        columns=query_names,
    )

    if not aggregate_results:
        return pivoted_results

    aggregated_results = _aggregate_pivoted_results(
        pivoted_results,
        aggregation_function,
        metric_instance.metric_short_name,
        queries_set_name,
    )
    if return_only_aggregation:
        return aggregated_results
    return pd.concat([pivoted_results, aggregated_results], axis=1)
# -----------------------------------------------------------------------------
# ----------------------------------- Plots -----------------------------------
# -----------------------------------------------------------------------------
def plot_queries_results(results: pd.DataFrame, by: str = "query") -> go.Figure:
    """Plot the results obtained by a run_queries execution.

    Parameters
    ----------
    results : pd.DataFrame
        A dataframe that contains the result of having executed run_queries
        with a set of queries and word embeddings.
    by : {'query', 'model'}, optional
        Whether the x axis groups the bars by query or by model,
        by default 'query'.

    Returns
    -------
    go.Figure
        A Figure that contains the generated graphic.

    Raises
    ------
    TypeError
        If results is not an instance of pandas DataFrame.
    """
    if not isinstance(results, pd.DataFrame):
        raise TypeError(
            "results must be a pandas DataFrame, result of having executed "
            "running_queries. Given: {}".format(results)
        )

    results_copy = results.copy(deep=True)
    # by='model' keeps the models as rows; otherwise transpose so that the
    # queries become the rows plotted on the x axis.
    # (the original code had a no-op `results_copy = results_copy` branch.)
    if by != "model":
        results_copy = results_copy.T

    results_copy["query_name"] = results_copy.index
    id_vars = ["query_name"]
    values_vars = [
        col_name for col_name in results_copy.columns if col_name not in id_vars
    ]

    # melt the dataframe into long form for plotly express.
    melted_results = pd.melt(
        results_copy,
        id_vars=id_vars,
        value_vars=values_vars,
        var_name="Word Embedding Model",
    )

    # configure the plot
    xaxis_title = "Model" if by == "model" else "Query"
    fig = px.bar(
        melted_results,
        x="query_name",
        y="value",
        color="Word Embedding Model",
        barmode="group",
    )
    fig.update_layout(
        xaxis_title=xaxis_title,
        yaxis_title="Bias measure",
    )
    # break long query names at 'wrt' so the x-axis labels fit.
    fig.for_each_trace(
        lambda t: t.update(x=["wrt<br>".join(label.split("wrt")) for label in t.x])
    )
    return fig
# -----------------------------------------------------------------------------
# --------------------------------- Rankings ----------------------------------
# -----------------------------------------------------------------------------
def create_ranking(
    results_dataframes: List[pd.DataFrame],
    method: str = "first",
    ascending: bool = True,
) -> pd.DataFrame:
    """Create a ranking from the aggregated scores of the provided dataframes.

    The function will assume that the aggregated scores are in the last column
    of each result dataframe.
    It uses pandas.DataFrame.rank to generate the ranks.

    Parameters
    ----------
    results_dataframes : List[pd.DataFrame]
        A list or array of dataframes returned by the run_queries function.
    method : str, optional
        How to rank the group of records that have the same value, by default
        'first'. The options are:

        - average: average rank of the group
        - min: lowest rank in the group
        - max: highest rank in the group
        - first: ranks assigned in order they appear in the array
        - dense: like 'min', but rank always increases by 1 between groups.
    ascending : bool, optional
        Whether or not the elements should be ranked in ascending order,
        by default True.

    Returns
    -------
    pd.DataFrame
        A dataframe with the ranked scores.

    Raises
    ------
    TypeError
        If some element of results_dataframes is not a pandas DataFrame.
    """
    # check the input.
    for idx, results_df in enumerate(results_dataframes):
        if not isinstance(results_df, pd.DataFrame):
            raise TypeError(
                "All elements of results_dataframes must be a pandas "
                "Dataframe instance. Got {} at position {}".format(
                    type(results_df), idx
                )
            )

    # take the aggregation (last) column of each result dataframe.
    aggregation_columns = [
        result[result.columns[-1]] for result in results_dataframes
    ]

    # disambiguate duplicated column names: every occurrence of a duplicated
    # name (including the first) is suffixed with its 1-based position.
    column_names = pd.Series([series.name for series in aggregation_columns])
    duplicated_names = column_names[column_names.duplicated(keep="first")]
    renamed_columns = column_names.copy()
    for duplicated_name in duplicated_names:
        for idx, name in enumerate(renamed_columns):
            if name == duplicated_name:
                renamed_columns[idx] = "{} ({})".format(name, idx + 1)

    avg_scores = pd.concat(aggregation_columns, axis=1)
    avg_scores.columns = renamed_columns
    return avg_scores.rank(method=method, ascending=ascending)
def _melt_df(results: pd.DataFrame) -> pd.DataFrame:
results = results.copy()
results["exp_name"] = results.index
id_vars = ["exp_name"]
cols = results.columns
values_vars = [col_name for col_name in cols if col_name not in id_vars]
melted_results = pd.melt(
results, id_vars=id_vars, value_vars=values_vars, var_name="Metric"
)
melted_results.columns = ["Embedding model", "Metric", "Ranking"]
return melted_results
def plot_ranking(
    ranking: pd.DataFrame,
    use_metric_as_facet: bool = False,
) -> go.Figure:
    """Plot a ranking dataframe as horizontal stacked bars.

    Parameters
    ----------
    ranking : pd.DataFrame
        A dataframe returned by create_ranking (models as index, one column
        of ranks per result set).
    use_metric_as_facet : bool, optional
        If True, additionally split the bars into one facet column per
        metric, by default False.

    Returns
    -------
    go.Figure
        The generated figure.
    """
    melted_ranking = _melt_df(ranking.copy(deep=True))

    # both variants share the same bar configuration; the facet option only
    # adds the facet_col argument (the original duplicated the whole call).
    bar_kwargs = {
        "x": "Ranking",
        "y": "Embedding model",
        "barmode": "stack",
        "color": "Metric",
        "orientation": "h",
    }
    if use_metric_as_facet:
        bar_kwargs["facet_col"] = "Metric"
    fig = px.bar(melted_ranking, **bar_kwargs)

    fig.update_layout(yaxis={"categoryorder": "total ascending"})
    fig.update_layout(showlegend=False)
    fig.update_yaxes(title_text="")
    fig.update_yaxes(tickfont={"size": 10})
    # strip the "Metric=" prefix plotly puts on facet annotations.
    fig.for_each_annotation(lambda a: a.update(text=a.text.split("=")[1]))
    return fig
# -----------------------------------------------------------------------------
# ------------------------------- Correlations --------------------------------
# -----------------------------------------------------------------------------
def calculate_ranking_correlations(
    rankings: pd.DataFrame,
    method: str = "spearman",
) -> pd.DataFrame:
    """Calculate the correlation between the calculated rankings.

    It uses pandas corr() method to calculate the correlations.
    The method parameter documentation was copied from the documentation of
    the pandas DataFrame.corr() method. To see the updated documentation,
    visit:
    https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.corr.html

    Parameters
    ----------
    rankings : pd.DataFrame
        DataFrame that contains the calculated rankings.
    method : {'pearson', 'kendall', 'spearman'} or callable
        Correlation type:

        - pearson : standard correlation coefficient
        - kendall : Kendall Tau correlation coefficient
        - spearman : Spearman rank correlation
        - callable: callable with input two 1d ndarrays and returning a float.

    Returns
    -------
    pd.DataFrame
        A dataframe with the calculated correlations.

    Raises
    ------
    TypeError
        If rankings is not a pandas DataFrame.
    """
    if not isinstance(rankings, pd.DataFrame):
        raise TypeError(
            "rankings parameter must be a pandas DataFrame result of having "
            "executed create_rankings. Given: {}".format(rankings)
        )
    return rankings.corr(method=method)
def plot_ranking_correlations(
    correlation_matrix: pd.DataFrame,
    title: str = "",
) -> go.Figure:
    """Plot a ranking correlation matrix as a plotly heatmap.

    The color scale is fixed to [0, 1], so negative correlations are
    clipped to the bottom of the scale.
    """
    heatmap = go.Heatmap(
        z=correlation_matrix,
        x=correlation_matrix.columns,
        y=correlation_matrix.index,
        hoverongaps=False,
        zmin=0.0,
        zmax=1,
        colorscale="Darkmint",
    )
    fig = go.Figure(data=heatmap)
    fig.update_layout(title=title, font={"color": "#000000"})
    return fig
def load_test_model() -> WordEmbeddingModel:
    """Load a Word2vec subset to test metrics and debias methods.

    Returns
    -------
    WordEmbeddingModel
        The loaded model
    """
    from gensim.models import KeyedVectors

    # resolve the packaged dummy WEAT word vectors relative to this package.
    resource_path = "/".join(("datasets", "data", "test_model.kv"))
    model_path = pkg_resources.resource_filename(__name__, resource_path)

    # load the keyed vectors and wrap them in WEFE's model abstraction.
    word_vectors = KeyedVectors.load(model_path)
    return WordEmbeddingModel(word_vectors, "test_w2v")
def print_doc_table(df: pd.DataFrame) -> None:
    """Print a dataframe as an RST grid table (used when building the docs)."""
    from tabulate import tabulate

    table_df = df.reset_index()
    rst_table = tabulate(
        table_df, headers=table_df.columns, tablefmt="rst", showindex=False
    )
    print(rst_table)
def save_doc_image(fig: go.Figure, name: str) -> None:
    """Export a plotly figure as a high-resolution PNG for the documentation."""
    output_path = "./doc/images/{}.png".format(name)
    fig.write_image(output_path, width=1200, height=600, scale=3)
def flair_to_gensim(flair_embedding) -> KeyedVectors:
    """Convert a flair static embedding into gensim ``KeyedVectors``.

    Hack: copies the pytorch embedding weight matrix into a freshly created
    KeyedVectors instance keyed by the flair vocabulary.
    """
    vocabulary = list(flair_embedding.vocab.keys())
    weights = flair_embedding.embedding.weight.numpy()

    keyed_vectors = KeyedVectors(vector_size=flair_embedding.embedding_length)
    # NOTE(review): the last weight row is dropped here — presumably an extra
    # row (e.g. OOV/padding) not present in `vocab`; confirm against flair's
    # embedding layout.
    keyed_vectors.add_vectors(keys=vocabulary, weights=weights[:-1, :])
    return keyed_vectors