diff --git a/bin/build_binary b/bin/build_binary
new file mode 100755
index 0000000..c5e9431
Binary files /dev/null and b/bin/build_binary differ
diff --git a/bin/lmplz b/bin/lmplz
new file mode 100755
index 0000000..a7072aa
Binary files /dev/null and b/bin/lmplz differ
diff --git a/swh/langdetect/ngramdist.py b/swh/langdetect/ngramdist.py
index ff13c89..46ed397 100644
--- a/swh/langdetect/ngramdist.py
+++ b/swh/langdetect/ngramdist.py
@@ -1,244 +1,243 @@
 """
 Baseline approach
 """
 
 import os, sys, operator, nltk, random, time
 
 from pickle import dump, load
 from nltk.util import ngrams
 from utils.common import tokenizer, file_to_string, find_file, count_files
 from utils.training import build_training_set
 
 class NGramDist:
 
     def __init__(self, root):
         # Root of dataset
         self._root = root
 
         # Root of training set
         self._root_training_set = os.path.join(self._root, '..', 'training_set')
 
         # Root of model folder
         self._root_model = os.path.join(self._root, '..', 'model_ngram_dist')
 
         # Root of arranged dataset
         self._root_language_dataset = os.path.join(self._root, '..', 'code_by_language')
 
         # Path of result
         self._path_result = os.path.join(self._root, '..', 'result')
 
     def train(self):
         '''
         train () generates and stores counted n-grams in '_root_model' folder
         '''
         try:
             if len(os.listdir(self._root_training_set)) == 0:
                 build_training_set(self._root)
             try:
                 os.mkdir(self._root_model)
             except FileExistsError:
                 pass
         except FileNotFoundError:
             os.mkdir(self._root_training_set)
             build_training_set(self._root)
 
         '''
         Calculate frequencies of generated n-grams then store them into a sorted list of (ngram, count)
         '''
         for language in os.listdir(self._root_training_set):
             if not language.startswith('.'):
                 root_training_set_language = os.path.join(self._root_training_set, language)
                 root_stat_language = os.path.join(self._root_model, language)
                 if os.path.isfile(root_stat_language):
                     continue
-                else:
-                    statistics = {}
-                    for f in os.listdir(root_training_set_language):
-                        print(f)
-                        if not f.startswith('.'):
-                            filename = os.path.join(root_training_set_language, f)
-                            tokens = tokenizer(file_to_string(filename))
-                            generated_ngrams = self._generate_ngrams(tokens, 3)
-                            self._count_ngrams(statistics, generated_ngrams)
-                    with open(root_stat_language, 'wb') as f:
-                        dump(self._sort_by_value(statistics), f)
+                statistics = {}
+                for f in os.listdir(root_training_set_language):
+                    print(f)
+                    if not f.startswith('.'):
+                        filename = os.path.join(root_training_set_language, f)
+                        tokens = tokenizer(file_to_string(filename), 'letter')
+                        generated_ngrams = self._generate_ngrams(tokens, 3)
+                        self._count_ngrams(statistics, generated_ngrams)
+                with open(root_stat_language, 'wb') as f:
+                    dump(self._sort_by_value(statistics), f)
 
     def _generate_ngrams(self, tokens, n):
         '''
         :param tokens: generated tokens from a string.
         :param n: maximum n of n-grams
         :type tokens: list
         :type n: int
         :return: generated 1-grams, ..., n-grams
         :rtype: list
         '''
         generated_ngrams = []
 
         for i in range(1, n+1):
             igrams = ngrams(tokens, i, pad_left=True, pad_right=True, left_pad_symbol = '$BOF$', right_pad_symbol = '$EOF$')
             for igram in igrams:
                 generated_ngrams.append(''.join(igram))
 
         return generated_ngrams
 
     def _count_ngrams(self, statistics, ngrams):
         '''
         :param statistics: shared dictionary for statistics
         :param ngrams: n-grams to be accumulated into statistics
         '''
         for ngram in ngrams:
             statistics[ngram] = statistics.get(ngram, 0) + 1
 
     def test(self):
         test_result = {}
         models = self._load_models()
 
         for language in [x for x in os.listdir(self._root_language_dataset) if not x.startswith('.')]:
             test_result[language] = self.test_class(models, language)
         with open(self._path_result, 'wb') as f:
             dump(test_result, f)
 
     def speed_benchmark(self):
         language = random.choice([x for x in os.listdir(self._root_language_dataset) if not x.startswith('.')])
         models = self._load_models()
 
         test_set = self._get_test_set(language)
         total_size = self._count_size(test_set)
         print('{} kB in total'.format(total_size / 1024))
 
         t_start = time.perf_counter()
         self.test_class(models, language)
         t_end = time.perf_counter()
 
         print('{} seconds.'.format(t_end - t_start))
         print('{} seconds per kB'.format(((t_end - t_start) / total_size) * 1024))
 
     def _load_models(self):
         models = {}
 
         for model in [model for model in os.listdir(self._root_model) if not model.startswith('.')]:
             root_model = os.path.join(self._root_model, model)
             with open(root_model, 'rb') as sorted_file:
                 models[model] = self._list_to_dict(load(sorted_file))
 
         return models
 
     def _list_to_dict(self, model):
         model_ngrams = [x[0] for x in model]
         model_dict = {}
         index = 0
         for ngram in model_ngrams:
             index += 1
             model_dict[ngram] = index
         return model_dict
 
     def _get_test_set(self, language):
         root_training_language = os.path.join(self._root_training_set, language)
         root_language = os.path.join(self._root_language_dataset, language)
         total = count_files(root_language)
         training_set = [int(os.path.splitext(x)[0]) for x in os.listdir(root_training_language) if not x.startswith('.')]
         test_set = [find_file(root_language, x) for x in range(1, total + 1) if x not in training_set][:1000]
         return test_set
 
     def _count_size(self, files):
         size = 0
         for f in files:
             size += os.path.getsize(f)
         return size
 
     def test_class(self, models, language):
         test_set = self._get_test_set(language)
         ok = 0
         results = []
         for test in test_set:
             result = self._guess_file_language(models, test)
             print('{} '.format(result[0]),end='\r')
             results.append(result[0])
             if result[0][1] == language:
                 ok += 1
         total_test = len(test_set)
         accuracy = ok / len(test_set)
         print('Tests for {} '.format(language))
         print('Total test files : {}'.format(total_test))
         print('Correctly classified files : {}'.format(ok))
         print('Accuracy : {}%'.format(accuracy * 100))
         return (ok, len(test_set), accuracy, results)
 
     def test_single(self, models, filename):
         self._guess_file_language(models, filename)
 
     def _guess_file_language(self, models, filename):
-        tokens = tokenizer(file_to_string(filename))
+        tokens = tokenizer(file_to_string(filename), 'letter')
         generated_ngrams = self._generate_ngrams(tokens, 3)
         statistics = {}
         self._count_ngrams(statistics, generated_ngrams)
         test_profile = self._list_to_dict(self._sort_by_value(statistics))
 
         result = []
         for model in models.keys():
             root_model = os.path.join(self._root_model, model)
             model_profile = models[model]
             distance = self._distance(model_profile, test_profile)
             result.append((distance, model))
 
         return sorted(result)
 
     def _sort_by_value(self, statistics):
         statistics_sorted = sorted(statistics.items(), key = operator.itemgetter(1), reverse = True)[:500]
         return statistics_sorted
 
     def _distance(self, model_profile, test_profile):
         distance = 0
         maximum = len(test_profile)
 
         for test_ngram in test_profile.keys():
             test_rank = test_profile.get(test_ngram)
             model_rank = model_profile.get(test_ngram, maximum)
             d = abs(test_rank - model_rank)
             distance += d
 
         return distance
 
 '''
 def _prob(model, trigrams):
     print('Checking {} model ...'.format(model))
     with open(model, 'rb') as f:
         kneser_ney = load(f)
     result = 1
     for trigram in trigrams:
         prob = kneser_ney.prob(trigram)
         result = result * prob
     return result
 '''
 
 if __name__ == '__main__':
     if len(sys.argv) == 3 and sys.argv[1] == '--train':
         n = NGramDist(sys.argv[2])
         n.train()
     elif len(sys.argv) == 3 and sys.argv[1] == '--test':
         n = NGramDist(sys.argv[2])
         n.test()
     elif len(sys.argv) == 3 and sys.argv[1] == '--benchmark':
         n = NGramDist(sys.argv[2])
         n.speed_benchmark()
     elif len(sys.argv) == 4 and sys.argv[1] == '--test':
         n = NGramDist(sys.argv[2])
         n.test_class(n.load_models(), sys.argv[3])
     else:
         print('Wrong arguments, please check your input.')
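Note: ngramdist.py above is essentially the Cavnar-Trenkle rank-profile approach: each language model is the 500 most frequent character 1- to 3-grams (with $BOF$/$EOF$ padding), and a file is assigned to the language whose profile minimises the out-of-place rank distance. A minimal usage sketch, assuming the script is run from swh/langdetect/ (its own `from utils.common import ...` imports require that working directory) and that training_set, model_ngram_dist and code_by_language sit next to a hypothetical dataset root:

    from ngramdist import NGramDist

    detector = NGramDist('/path/to/dataset')    # hypothetical dataset root
    detector.train()                            # builds one ranked n-gram profile per language
    models = detector._load_models()
    # (distance, language) pairs, best match first
    print(detector._guess_file_language(models, '/path/to/sample.py')[0])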
diff --git a/swh/langdetect/ngramprob.py b/swh/langdetect/ngramprob.py
new file mode 100644
index 0000000..ef64e30
--- /dev/null
+++ b/swh/langdetect/ngramprob.py
@@ -0,0 +1,135 @@
+
+import os, sys, subprocess
+import kenlm
+
+from pickle import dump
+from utils.common import tokenizer, file_to_string, find_file, count_files
+from utils.training import build_training_set
+
+class NGramProb:
+
+    def __init__(self, root):
+        # Root of dataset
+        self._root = root
+
+        # Root of training set
+        self._root_training_set = os.path.join(self._root, '..', 'training_set')
+
+        # Root of model folder
+        self._root_model = os.path.join(self._root, '..', 'model_ngram_prob')
+
+        # Root of arranged dataset
+        self._root_language_dataset = os.path.join(self._root, '..', 'code_by_language')
+
+        # Path of result
+        self._path_result = os.path.join(self._root, '..', 'result_prob')
+
+    def train(self):
+        try:
+            if len(os.listdir(self._root_training_set)) == 0:
+                build_training_set(self._root)
+            try:
+                os.mkdir(self._root_model)
+            except FileExistsError:
+                pass
+        except FileNotFoundError:
+            os.mkdir(self._root_training_set)
+            build_training_set(self._root)
+
+        for language in [x for x in os.listdir(self._root_training_set) if not x.startswith('.')]:
+            root_training_set_language = os.path.join(self._root_training_set, language)
+            texts = []
+            root_stat_language = os.path.join(self._root_model, language)
+            if os.path.isfile(root_stat_language):
+                continue
+
+            for f in [x for x in os.listdir(root_training_set_language) if not x.startswith('.')]:
+                filename = os.path.join(root_training_set_language, f)
+                tokens = tokenizer(file_to_string(filename), 'letter')
+                texts.append(' '.join(tokens))
+
+            train_text = ' '.join(texts)
+            command = ['../../bin/lmplz', '-o', '3', '--discount_fallback']
+
+            with open(root_stat_language, 'wb') as f:
+                proc = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=f)
+                proc.communicate(train_text.encode())
+            if os.path.getsize(root_stat_language) == 0:
+                os.remove(root_stat_language)
+
+            # st = os.stat(root_stat_language)
+            # os.chmod(root_stat_language, st.st_mode | stat.S_IEXEC)
+
+    def test(self):
+        test_result = {}
+        models = self._load_models()
+
+        for language in [x for x in os.listdir(self._root_language_dataset) if not x.startswith('.')]:
+            test_result[language] = self.test_class(models, language)
+        with open(self._path_result, 'wb') as f:
+            dump(test_result, f)
+
+    def _load_models(self):
+        models = {}
+
+        for model in [model
+                      for model in os.listdir(self._root_model)
+                      if not model.startswith('.')]:
+            root_model = os.path.join(self._root_model, model)
+            models[model] = kenlm.LanguageModel(root_model)
+
+        return models
+
+    def _get_test_set(self, language):
+        root_training_language = os.path.join(self._root_training_set, language)
+        root_language = os.path.join(self._root_language_dataset, language)
+        total = count_files(root_language)
+        training_set = [int(os.path.splitext(x)[0]) for x in os.listdir(root_training_language) if not x.startswith('.')]
+        test_set = [find_file(root_language, x) for x in range(1, total + 1) if x not in training_set][:1000]
+        return test_set
+
+    def test_class(self, models, language):
+        test_set = self._get_test_set(language)
+
+        ok = 0
+        results = []
+        for test in test_set:
+            result = self._guess_file_language(models, test)
+            print('{} '.format(result[0]),end='\r')
+            results.append(result[0])
+            if result[0][1] == language:
+                ok += 1
+
+        total_test = len(test_set)
+        accuracy = ok / len(test_set)
+        print('Tests for {} '.format(language))
+        print('Total test files : {}'.format(total_test))
+        print('Correctly classified files : {}'.format(ok))
+        print('Accuracy : {}%'.format(accuracy * 100))
+        return (ok, len(test_set), accuracy, results)
+
+    def _guess_file_language(self, models, filename):
+        tokens = tokenizer(file_to_string(filename), 'letter')
+        text = ' '.join(tokens)
+
+        result = []
+
+        for model_key in models.keys():
+            root_model = os.path.join(self._root_model, model_key)
+            model = models[model_key]
+            score = model.score(text)
+            result.append((score, model_key))
+
+        return sorted(result, reverse=True)
+
+if __name__ == '__main__':
+    if len(sys.argv) == 3 and sys.argv[1] == '--train':
+        n = NGramProb(sys.argv[2])
+        n.train()
+    elif len(sys.argv) == 3 and sys.argv[1] == '--test':
+        n = NGramProb(sys.argv[2])
+        n.test()
+    elif len(sys.argv) == 3 and sys.argv[1] == '--benchmark':
+        n = NGramProb(sys.argv[2])
+        n.speed_benchmark()
+    elif len(sys.argv) == 4 and sys.argv[1] == '--test':
+        n = NGramProb(sys.argv[2])
+        n.test_class(n._load_models(), sys.argv[3])
+    else:
+        print('Wrong arguments, please check your input.')
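Note: ngramprob.py above delegates modelling to KenLM: train() pipes each language's space-separated character stream into bin/lmplz (order 3, --discount_fallback), which writes one ARPA model per language, and classification ranks languages by the total log-probability KenLM assigns to the test file. A minimal scoring sketch, assuming a model file already produced by train() exists at the hypothetical path below:

    import kenlm

    model = kenlm.LanguageModel('/path/to/model_ngram_prob/Python')   # hypothetical ARPA file
    # characters are space-separated, matching tokenizer(text, 'letter')
    text = ' '.join(list("def f(x): return x".lower()))
    print(model.score(text))   # log10 probability; higher (less negative) wins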
""" with open(filename, 'r', errors='ignore') as f: data = f.read().lower() return replace_string_and_number(data) def count_files(root_language): all_folders = natural_sort(filter (_not_start_with_point, os.listdir(root_language))) files = natural_sort(filter (_not_start_with_point, os.listdir(root_language + '/' + all_folders[-1]))) (max,_) = os.path.splitext(files[-1]) return int(max) def find_file(root_language, n): '''Find the n-th file in language folder''' if n > count_files(root_language): return '' else: start = (n - 1) // 1000 * 1000 + 1 end = start + 999 root_count = root_language + '/' + str(start) + '-' + str(end) files = natural_sort(filter (_not_start_with_point, os.listdir(root_count))) return root_count + '/' + files[n - start] def replace_string_and_number(text): """ Replace strings and numbers in a file by special tokens """ # str_replaced = re.sub(_re_string, '__str__', text) # str_num_replaced = re.sub(_re_number, '__num__', str_replaced) str_num_replaced = text return str_num_replaced def natural_sort(l): convert = lambda text: int(text) if text.isdigit() else text.lower() alphanum_key = lambda key: [ convert(c) for c in re.split('([0-9]+)', key) ] return sorted(l, key = alphanum_key) def remove_comment(text): # TODO: remove only inline comments and block comments # TODO: maybe build a list of comment markers pass def purify(text, lang): # TODO: for some language like HTML, remove code other than principal language pass