[Updated] Evaluation Metric Code
Hello, this is DACON.
Below is the evaluation code used in this competition.
Thank you.
DACON
import pandas as pd

from rouge_metric import Rouge


class RougeScorer:
    def __init__(self):
        self.rouge_evaluator = Rouge(
            metrics=["rouge-n", "rouge-l"],
            max_n=2,
            limit_length=True,
            length_limit=1000,
            length_limit_type="words",
            use_tokenizer=True,
            apply_avg=True,
            apply_best=False,
            alpha=0.5,  # default: balanced F1 score
            weight_factor=1.2,
        )

    def compute_rouge(self, ref_df, hyp_df):
        # ref_df = pd.read_csv(ref_path)
        # hyp_df = pd.read_csv(hyp_path)
        hyp_df.iloc[:, 1] = hyp_df.iloc[:, 1].fillna(" ")
        ids = ref_df["id"]
        hyp_df = hyp_df[hyp_df["id"].isin(ids)]
        hyp_df.index = ref_df.index  # align indices so sorting keeps rows paired

        ref_df = ref_df.sort_values(by=["id"])
        hyp_df = hyp_df.sort_values(by=["id"])
        ref_df["id"] = ref_df["id"].astype(int)
        hyp_df["id"] = hyp_df["id"].astype(int)

        hyps = [tuple(row) for row in hyp_df.values]
        refs = [tuple(row) for row in ref_df.values]

        reference_summaries = []
        generated_summaries = []
        for ref_tp, hyp_tp in zip(refs, hyps):
            ref_id, ref = ref_tp
            hyp_id, hyp = hyp_tp
            assert ref_id == hyp_id
            reference_summaries.append(ref)
            generated_summaries.append(hyp)

        scores = self.rouge_evaluator.get_scores(generated_summaries, reference_summaries)
        str_scores = self.format_rouge_scores(scores)
        # self.save_rouge_scores(str_scores)
        return str_scores

    def save_rouge_scores(self, str_scores):
        with open("rouge_scores.txt", "w") as output:
            output.write(str_scores)

    def format_rouge_scores(self, scores):
        # Report the ROUGE-1, ROUGE-2, and ROUGE-L F1 scores as a comma-separated string.
        return "{:.3f},{:.3f},{:.3f}".format(
            scores["rouge-1"]["f"],
            scores["rouge-2"]["f"],
            scores["rouge-l"]["f"],
        )
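
For reference, here is a minimal usage sketch. The file names answer.csv and submission.csv are hypothetical placeholders, and both files are assumed to contain an id column followed by a summary column:

# Usage sketch -- "answer.csv" and "submission.csv" are hypothetical file names.
import pandas as pd

ref_df = pd.read_csv("answer.csv")      # reference summaries: columns [id, summary]
hyp_df = pd.read_csv("submission.csv")  # generated summaries: columns [id, summary]

scorer = RougeScorer()
# Prints the ROUGE-1, ROUGE-2, and ROUGE-L F1 scores as "r1,r2,rl".
print(scorer.compute_rouge(ref_df, hyp_df))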
rouge_metric.py
# Korean-aware ROUGE implementation (adapted from the py-rouge package),
# tokenizing with Mecab morphemes.
import re
import platform
import itertools
import collections

# On Windows, Mecab is provided by the eunjeon package; on Linux, by konlpy.
if platform.system() == "Windows":
    try:
        from eunjeon import Mecab
    except ImportError:
        print("please install eunjeon module")
else:  # Linux (e.g., Ubuntu)
    from konlpy.tag import Mecab


class Rouge:
    DEFAULT_METRICS = {"rouge-n"}
    DEFAULT_N = 1
    STATS = ["f", "p", "r"]
    AVAILABLE_METRICS = {"rouge-n", "rouge-l", "rouge-w"}
    AVAILABLE_LENGTH_LIMIT_TYPES = {"words", "bytes"}
    # Keep only Latin letters, digits, and Hangul syllables.
    REMOVE_CHAR_PATTERN = re.compile("[^A-Za-z0-9가-힣]")
    def __init__(
        self,
        metrics=None,
        max_n=None,
        limit_length=True,
        length_limit=1000,
        length_limit_type="words",
        apply_avg=True,
        apply_best=False,
        use_tokenizer=True,
        alpha=0.5,
        weight_factor=1.0,
    ):
        self.metrics = metrics[:] if metrics is not None else Rouge.DEFAULT_METRICS
        for m in self.metrics:
            if m not in Rouge.AVAILABLE_METRICS:
                raise ValueError("Unknown metric '{}'".format(m))

        self.max_n = max_n if "rouge-n" in self.metrics else None
        # Expand "rouge-n" into the individual rouge-1 .. rouge-{max_n} metrics
        if self.max_n is not None:
            index_rouge_n = self.metrics.index("rouge-n")
            del self.metrics[index_rouge_n]
            self.metrics += ["rouge-{}".format(n) for n in range(1, self.max_n + 1)]
        self.metrics = set(self.metrics)

        self.limit_length = limit_length
        if self.limit_length:
            if length_limit_type not in Rouge.AVAILABLE_LENGTH_LIMIT_TYPES:
                raise ValueError("Unknown length_limit_type '{}'".format(length_limit_type))
            self.length_limit = length_limit
            if self.length_limit == 0:
                self.limit_length = False
            self.length_limit_type = length_limit_type

        self.use_tokenizer = use_tokenizer
        if use_tokenizer:
            self.tokenizer = Mecab()

        self.apply_avg = apply_avg
        self.apply_best = apply_best
        self.alpha = alpha
        self.weight_factor = weight_factor
        if self.weight_factor <= 0:
            raise ValueError("ROUGE-W weight factor must be greater than 0.")

    def tokenize_text(self, text):
        # Tokenize into Mecab morphemes when enabled; otherwise return the text as-is.
        if self.use_tokenizer:
            return self.tokenizer.morphs(text)
        else:
            return text
    @staticmethod
    def split_into_sentences(text):
        # Sentences are expected to be newline-separated.
        return text.split("\n")

    @staticmethod
    def _get_ngrams(n, text):
        ngram_set = collections.defaultdict(int)
        max_index_ngram_start = len(text) - n
        for i in range(max_index_ngram_start + 1):
            ngram_set[tuple(text[i : i + n])] += 1
        return ngram_set

    @staticmethod
    def _split_into_words(sentences):
        return list(itertools.chain(*[_.split() for _ in sentences]))

    @staticmethod
    def _get_word_ngrams_and_length(n, sentences):
        assert len(sentences) > 0
        assert n > 0
        tokens = Rouge._split_into_words(sentences)
        return Rouge._get_ngrams(n, tokens), tokens, len(tokens) - (n - 1)

    @staticmethod
    def _get_unigrams(sentences):
        assert len(sentences) > 0
        tokens = Rouge._split_into_words(sentences)
        unigram_set = collections.defaultdict(int)
        for token in tokens:
            unigram_set[token] += 1
        return unigram_set, len(tokens)
    @staticmethod
    def _compute_p_r_f_score(
        evaluated_count,
        reference_count,
        overlapping_count,
        alpha=0.5,
        weight_factor=1.0,
    ):
        precision = 0.0 if evaluated_count == 0 else overlapping_count / float(evaluated_count)
        if weight_factor != 1.0:
            precision = precision ** (1.0 / weight_factor)
        recall = 0.0 if reference_count == 0 else overlapping_count / float(reference_count)
        if weight_factor != 1.0:
            recall = recall ** (1.0 / weight_factor)
        f1_score = Rouge._compute_f_score(precision, recall, alpha)
        return {"f": f1_score, "p": precision, "r": recall}

    @staticmethod
    def _compute_f_score(precision, recall, alpha=0.5):
        return (
            0.0
            if (recall == 0.0 or precision == 0.0)
            else precision * recall / ((1 - alpha) * precision + alpha * recall)
        )
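
    # Note: with alpha = 0.5 the expression above reduces to the standard F1
    # score 2 * P * R / (P + R); alpha = 0 returns recall alone and alpha = 1
    # returns precision alone, which follows directly from the formula.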
    @staticmethod
    def _compute_ngrams(evaluated_sentences, reference_sentences, n):
        if len(evaluated_sentences) <= 0 or len(reference_sentences) <= 0:
            raise ValueError("Collections must contain at least 1 sentence.")

        evaluated_ngrams, _, evaluated_count = Rouge._get_word_ngrams_and_length(
            n, evaluated_sentences
        )
        reference_ngrams, _, reference_count = Rouge._get_word_ngrams_and_length(
            n, reference_sentences
        )

        # Count the clipped overlap of n-grams between evaluated and reference
        overlapping_ngrams = set(evaluated_ngrams.keys()).intersection(set(reference_ngrams.keys()))
        overlapping_count = 0
        for ngram in overlapping_ngrams:
            overlapping_count += min(evaluated_ngrams[ngram], reference_ngrams[ngram])
        return evaluated_count, reference_count, overlapping_count
    @staticmethod
    def _compute_ngrams_lcs(evaluated_sentences, reference_sentences, weight_factor=1.0):
        def _lcs(x, y):
            m = len(x)
            n = len(y)
            vals = collections.defaultdict(int)
            dirs = collections.defaultdict(int)
            for i in range(1, m + 1):
                for j in range(1, n + 1):
                    if x[i - 1] == y[j - 1]:
                        vals[i, j] = vals[i - 1, j - 1] + 1
                        dirs[i, j] = "|"
                    elif vals[i - 1, j] >= vals[i, j - 1]:
                        vals[i, j] = vals[i - 1, j]
                        dirs[i, j] = "^"
                    else:
                        vals[i, j] = vals[i, j - 1]
                        dirs[i, j] = "<"
            return vals, dirs

        def _wlcs(x, y, weight_factor):
            m = len(x)
            n = len(y)
            vals = collections.defaultdict(float)
            dirs = collections.defaultdict(int)
            lengths = collections.defaultdict(int)
            for i in range(1, m + 1):
                for j in range(1, n + 1):
                    if x[i - 1] == y[j - 1]:
                        length_tmp = lengths[i - 1, j - 1]
                        vals[i, j] = (
                            vals[i - 1, j - 1]
                            + (length_tmp + 1) ** weight_factor
                            - length_tmp ** weight_factor
                        )
                        dirs[i, j] = "|"
                        lengths[i, j] = length_tmp + 1
                    elif vals[i - 1, j] >= vals[i, j - 1]:
                        vals[i, j] = vals[i - 1, j]
                        dirs[i, j] = "^"
                        lengths[i, j] = 0
                    else:
                        vals[i, j] = vals[i, j - 1]
                        dirs[i, j] = "<"
                        lengths[i, j] = 0
            return vals, dirs

        def _mark_lcs(mask, dirs, m, n):
            while m != 0 and n != 0:
                if dirs[m, n] == "|":
                    m -= 1
                    n -= 1
                    mask[m] = 1
                elif dirs[m, n] == "^":
                    m -= 1
                elif dirs[m, n] == "<":
                    n -= 1
                else:
                    raise UnboundLocalError("Illegal move")
            return mask

        if len(evaluated_sentences) <= 0 or len(reference_sentences) <= 0:
            raise ValueError("Collections must contain at least 1 sentence.")

        evaluated_unigrams_dict, evaluated_count = Rouge._get_unigrams(evaluated_sentences)
        reference_unigrams_dict, reference_count = Rouge._get_unigrams(reference_sentences)

        # The weighted LCS (ROUGE-W) is used whenever the weight factor differs from 1.0
        use_WLCS = weight_factor != 1.0
        if use_WLCS:
            evaluated_count = evaluated_count ** weight_factor
            reference_count = 0

        overlapping_count = 0.0
        for reference_sentence in reference_sentences:
            reference_sentence_tokens = reference_sentence.split()
            if use_WLCS:
                reference_count += len(reference_sentence_tokens) ** weight_factor
            hit_mask = [0 for _ in range(len(reference_sentence_tokens))]

            for evaluated_sentence in evaluated_sentences:
                evaluated_sentence_tokens = evaluated_sentence.split()
                if use_WLCS:
                    _, lcs_dirs = _wlcs(
                        reference_sentence_tokens,
                        evaluated_sentence_tokens,
                        weight_factor,
                    )
                else:
                    _, lcs_dirs = _lcs(reference_sentence_tokens, evaluated_sentence_tokens)
                _mark_lcs(
                    hit_mask,
                    lcs_dirs,
                    len(reference_sentence_tokens),
                    len(evaluated_sentence_tokens),
                )

            overlapping_count_length = 0
            for ref_token_id, val in enumerate(hit_mask):
                if val == 1:
                    token = reference_sentence_tokens[ref_token_id]
                    if evaluated_unigrams_dict[token] > 0 and reference_unigrams_dict[token] > 0:
                        evaluated_unigrams_dict[token] -= 1
                        # Decrement by token, not by index (the original indexed
                        # the dict with ref_token_id, which never matches a key).
                        reference_unigrams_dict[token] -= 1

                        if use_WLCS:
                            overlapping_count_length += 1
                            # Close out a run of consecutive hits when it ends
                            # (next position is a miss, or end of the mask).
                            if (
                                ref_token_id + 1 < len(hit_mask) and hit_mask[ref_token_id + 1] == 0
                            ) or ref_token_id + 1 == len(hit_mask):
                                overlapping_count += overlapping_count_length ** weight_factor
                                overlapping_count_length = 0
                        else:
                            overlapping_count += 1

        if use_WLCS:
            reference_count = reference_count ** weight_factor

        return evaluated_count, reference_count, overlapping_count
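
    # ROUGE-W background: the weighted LCS above credits a consecutive run of
    # k matches with k ** weight_factor instead of k, so longer runs score more
    # than the same number of scattered matches; _compute_p_r_f_score later
    # applies the inverse x ** (1 / weight_factor) to precision and recall,
    # following Lin (2004), "ROUGE: A Package for Automatic Evaluation of
    # Summaries".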
    def get_scores(self, hypothesis, references):
        if isinstance(hypothesis, str):
            hypothesis, references = [hypothesis], [references]

        if type(hypothesis) != type(references):
            raise ValueError("'hyps' and 'refs' are not of the same type")
        if len(hypothesis) != len(references):
            raise ValueError("'hyps' and 'refs' do not have the same length")

        scores = {}
        has_rouge_n_metric = (
            len([metric for metric in self.metrics if metric.split("-")[-1].isdigit()]) > 0
        )
        if has_rouge_n_metric:
            scores.update(self._get_scores_rouge_n(hypothesis, references))

        has_rouge_l_metric = (
            len([metric for metric in self.metrics if metric.split("-")[-1].lower() == "l"]) > 0
        )
        if has_rouge_l_metric:
            scores.update(self._get_scores_rouge_l_or_w(hypothesis, references, False))

        has_rouge_w_metric = (
            len([metric for metric in self.metrics if metric.split("-")[-1].lower() == "w"]) > 0
        )
        if has_rouge_w_metric:
            scores.update(self._get_scores_rouge_l_or_w(hypothesis, references, True))

        return scores
    def _get_scores_rouge_n(self, all_hypothesis, all_references):
        metrics = [metric for metric in self.metrics if metric.split("-")[-1].isdigit()]

        if self.apply_avg or self.apply_best:
            scores = {metric: {stat: 0.0 for stat in Rouge.STATS} for metric in metrics}
        else:
            scores = {
                metric: [{stat: [] for stat in Rouge.STATS} for _ in range(len(all_hypothesis))]
                for metric in metrics
            }

        for sample_id, (hypothesis, references) in enumerate(zip(all_hypothesis, all_references)):
            assert isinstance(hypothesis, str)
            has_multiple_references = False
            if isinstance(references, list):
                has_multiple_references = len(references) > 1
                if not has_multiple_references:
                    references = references[0]

            # Prepare hypothesis and reference(s)
            hypothesis = self._preprocess_summary_as_a_whole(hypothesis)
            references = (
                [self._preprocess_summary_as_a_whole(reference) for reference in references]
                if has_multiple_references
                else [self._preprocess_summary_as_a_whole(references)]
            )

            # Compute scores
            for metric in metrics:
                suffix = metric.split("-")[-1]
                n = int(suffix)

                # Aggregate
                if self.apply_avg:
                    # average model
                    total_hypothesis_ngrams_count = 0
                    total_reference_ngrams_count = 0
                    total_ngrams_overlapping_count = 0

                    for reference in references:
                        (
                            hypothesis_count,
                            reference_count,
                            overlapping_ngrams,
                        ) = Rouge._compute_ngrams(hypothesis, reference, n)
                        total_hypothesis_ngrams_count += hypothesis_count
                        total_reference_ngrams_count += reference_count
                        total_ngrams_overlapping_count += overlapping_ngrams

                    score = Rouge._compute_p_r_f_score(
                        total_hypothesis_ngrams_count,
                        total_reference_ngrams_count,
                        total_ngrams_overlapping_count,
                        self.alpha,
                    )

                    for stat in Rouge.STATS:
                        scores[metric][stat] += score[stat]
                else:
                    # Best model
                    if self.apply_best:
                        best_current_score = None
                        for reference in references:
                            (
                                hypothesis_count,
                                reference_count,
                                overlapping_ngrams,
                            ) = Rouge._compute_ngrams(hypothesis, reference, n)
                            score = Rouge._compute_p_r_f_score(
                                hypothesis_count,
                                reference_count,
                                overlapping_ngrams,
                                self.alpha,
                            )
                            if best_current_score is None or score["r"] > best_current_score["r"]:
                                best_current_score = score

                        for stat in Rouge.STATS:
                            scores[metric][stat] += best_current_score[stat]
                    # Keep all
                    else:
                        for reference in references:
                            (
                                hypothesis_count,
                                reference_count,
                                overlapping_ngrams,
                            ) = Rouge._compute_ngrams(hypothesis, reference, n)
                            score = Rouge._compute_p_r_f_score(
                                hypothesis_count,
                                reference_count,
                                overlapping_ngrams,
                                self.alpha,
                            )
                            for stat in Rouge.STATS:
                                scores[metric][sample_id][stat].append(score[stat])

        # Compute the final score as the average (or the max) over samples
        if (self.apply_avg or self.apply_best) and len(all_hypothesis) > 1:
            for metric in metrics:
                for stat in Rouge.STATS:
                    scores[metric][stat] /= len(all_hypothesis)

        return scores
    def _get_scores_rouge_l_or_w(self, all_hypothesis, all_references, use_w=False):
        metric = "rouge-w" if use_w else "rouge-l"
        if self.apply_avg or self.apply_best:
            scores = {metric: {stat: 0.0 for stat in Rouge.STATS}}
        else:
            scores = {
                metric: [{stat: [] for stat in Rouge.STATS} for _ in range(len(all_hypothesis))]
            }

        for sample_id, (hypothesis_sentences, references_sentences) in enumerate(
            zip(all_hypothesis, all_references)
        ):
            assert isinstance(hypothesis_sentences, str)
            has_multiple_references = False
            if isinstance(references_sentences, list):
                has_multiple_references = len(references_sentences) > 1
                if not has_multiple_references:
                    references_sentences = references_sentences[0]

            # Prepare hypothesis and reference(s)
            hypothesis_sentences = self._preprocess_summary_per_sentence(hypothesis_sentences)
            references_sentences = (
                [
                    self._preprocess_summary_per_sentence(reference)
                    for reference in references_sentences
                ]
                if has_multiple_references
                else [self._preprocess_summary_per_sentence(references_sentences)]
            )

            # Compute scores
            # Aggregate
            if self.apply_avg:
                # average model
                total_hypothesis_ngrams_count = 0
                total_reference_ngrams_count = 0
                total_ngrams_overlapping_count = 0

                for reference_sentences in references_sentences:
                    (
                        hypothesis_count,
                        reference_count,
                        overlapping_ngrams,
                    ) = Rouge._compute_ngrams_lcs(
                        hypothesis_sentences,
                        reference_sentences,
                        self.weight_factor if use_w else 1.0,
                    )
                    total_hypothesis_ngrams_count += hypothesis_count
                    total_reference_ngrams_count += reference_count
                    total_ngrams_overlapping_count += overlapping_ngrams

                score = Rouge._compute_p_r_f_score(
                    total_hypothesis_ngrams_count,
                    total_reference_ngrams_count,
                    total_ngrams_overlapping_count,
                    self.alpha,
                    self.weight_factor if use_w else 1.0,
                )

                for stat in Rouge.STATS:
                    scores[metric][stat] += score[stat]
            else:
                # Best model
                if self.apply_best:
                    best_current_score = None
                    best_current_score_wlcs = None
                    for reference_sentences in references_sentences:
                        (
                            hypothesis_count,
                            reference_count,
                            overlapping_ngrams,
                        ) = Rouge._compute_ngrams_lcs(
                            hypothesis_sentences,
                            reference_sentences,
                            self.weight_factor if use_w else 1.0,
                        )
                        # Score this reference on its own counts (the original
                        # mistakenly referenced the total_* accumulators here,
                        # which are undefined in this branch).
                        score = Rouge._compute_p_r_f_score(
                            hypothesis_count,
                            reference_count,
                            overlapping_ngrams,
                            self.alpha,
                            self.weight_factor if use_w else 1.0,
                        )

                        if use_w:
                            reference_count_for_score = reference_count ** (
                                1.0 / self.weight_factor
                            )
                            overlapping_ngrams_for_score = overlapping_ngrams
                            score_wlcs = (
                                overlapping_ngrams_for_score / reference_count_for_score
                            ) ** (1.0 / self.weight_factor)

                            if (
                                best_current_score_wlcs is None
                                or score_wlcs > best_current_score_wlcs
                            ):
                                best_current_score = score
                                best_current_score_wlcs = score_wlcs
                        else:
                            if best_current_score is None or score["r"] > best_current_score["r"]:
                                best_current_score = score

                    for stat in Rouge.STATS:
                        scores[metric][stat] += best_current_score[stat]
                # Keep all
                else:
                    for reference_sentences in references_sentences:
                        (
                            hypothesis_count,
                            reference_count,
                            overlapping_ngrams,
                        ) = Rouge._compute_ngrams_lcs(
                            hypothesis_sentences,
                            reference_sentences,
                            self.weight_factor if use_w else 1.0,
                        )
                        score = Rouge._compute_p_r_f_score(
                            hypothesis_count,
                            reference_count,
                            overlapping_ngrams,
                            self.alpha,
                            # apply the weight factor only for ROUGE-W,
                            # matching the branches above
                            self.weight_factor if use_w else 1.0,
                        )

                        for stat in Rouge.STATS:
                            scores[metric][sample_id][stat].append(score[stat])

        # Compute the final score as the average (or the max) over samples
        if (self.apply_avg or self.apply_best) and len(all_hypothesis) > 1:
            for stat in Rouge.STATS:
                scores[metric][stat] /= len(all_hypothesis)

        return scores
    def _preprocess_summary_as_a_whole(self, summary):
        sentences = Rouge.split_into_sentences(summary)

        # Truncate
        if self.limit_length:
            # By words
            if self.length_limit_type == "words":
                summary = " ".join(sentences)
                all_tokens = summary.split()  # counting as in the official Perl script
                summary = " ".join(all_tokens[: self.length_limit])
            # By bytes
            elif self.length_limit_type == "bytes":
                summary = ""
                current_len = 0
                for sentence in sentences:
                    sentence = sentence.strip()
                    sentence_len = len(sentence)

                    if current_len + sentence_len < self.length_limit:
                        if current_len != 0:
                            summary += " "
                        summary += sentence
                        current_len += sentence_len
                    else:
                        if current_len > 0:
                            summary += " "
                        summary += sentence[: self.length_limit - current_len]
                        break
        else:
            summary = " ".join(sentences)

        # Strip everything except letters, digits, and Hangul, then tokenize
        summary = Rouge.REMOVE_CHAR_PATTERN.sub(" ", summary.lower()).strip()
        tokens = self.tokenize_text(Rouge.REMOVE_CHAR_PATTERN.sub(" ", summary))
        preprocessed_summary = [" ".join(tokens)]

        return preprocessed_summary

    def _preprocess_summary_per_sentence(self, summary):
        sentences = Rouge.split_into_sentences(summary)

        # Truncate
        if self.limit_length:
            final_sentences = []
            current_len = 0
            # By words
            if self.length_limit_type == "words":
                for sentence in sentences:
                    tokens = sentence.strip().split()
                    tokens_len = len(tokens)
                    if current_len + tokens_len < self.length_limit:
                        sentence = " ".join(tokens)
                        final_sentences.append(sentence)
                        current_len += tokens_len
                    else:
                        sentence = " ".join(tokens[: self.length_limit - current_len])
                        final_sentences.append(sentence)
                        break
            # By bytes
            elif self.length_limit_type == "bytes":
                for sentence in sentences:
                    sentence = sentence.strip()
                    sentence_len = len(sentence)
                    if current_len + sentence_len < self.length_limit:
                        final_sentences.append(sentence)
                        current_len += sentence_len
                    else:
                        sentence = sentence[: self.length_limit - current_len]
                        final_sentences.append(sentence)
                        break
            sentences = final_sentences

        final_sentences = []
        for sentence in sentences:
            sentence = Rouge.REMOVE_CHAR_PATTERN.sub(" ", sentence.lower()).strip()
            tokens = self.tokenize_text(Rouge.REMOVE_CHAR_PATTERN.sub(" ", sentence))
            sentence = " ".join(tokens)
            final_sentences.append(sentence)

        return final_sentences
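
For a quick sanity check, the Rouge class can also be called directly. A minimal sketch, assuming a working Mecab installation for the tokenizer (the example sentences are made up):

rouge = Rouge(
    metrics=["rouge-n", "rouge-l"],
    max_n=2,
    use_tokenizer=True,  # requires Mecab (eunjeon on Windows, konlpy on Linux)
    apply_avg=True,
)
hyps = ["정부가 새로운 정책을 발표했다"]      # generated summary
refs = ["정부는 오늘 새로운 정책을 발표했다"]  # reference summary
scores = rouge.get_scores(hyps, refs)
print(scores["rouge-1"]["f"], scores["rouge-2"]["f"], scores["rouge-l"]["f"])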