diff --git a/swh/langdetect/naivebayesian.py b/swh/langdetect/naivebayesian.py
index b8f54cd..5fe1e9c 100644
--- a/swh/langdetect/naivebayesian.py
+++ b/swh/langdetect/naivebayesian.py
@@ -1,211 +1,241 @@
 """
 Naive Bayesian
 """
-import os, sys, operator, nltk, random, time
+import os
+import sys
+import operator
+import nltk
+import random
+import time
 import numpy as np
+import csv
+import argparse
+import json
+from ast import literal_eval
 from itertools import islice
 from pickle import dump, load
 from .utils.common import tokenizer, file_to_string, find_file, count_files
 from nltk.util import ngrams
 from collections import Counter
 from sklearn.naive_bayes import MultinomialNB
 from sklearn.feature_extraction.text import HashingVectorizer, TfidfTransformer
 from sklearn.externals import joblib
 
-class NaiveBayesian:
+csv.field_size_limit(sys.maxsize)
+
+def main():
+    parser = argparse.ArgumentParser(description='Training and test tool for the multinomial naive Bayesian classifier.')
+
+    subparsers = parser.add_subparsers(dest='sub_command')
+
+    parser_train = subparsers.add_parser('train', help='Training on the dataset, dataset must be a *.csv file. A model will be created in the same directory.')
+    parser_train.add_argument('train_path', metavar='PATH', type=str, help='Path of the training dataset.')
+    # parser_train.add_argument('-n', '--ngrams', metavar='N', dest='train_maxsize', type=int, help='Set maximum input size of ConvNet, default 5.')
+    parser_test = subparsers.add_parser('test', help='Test on the dataset, dataset must be a directory with *.csv dataset named by corresponding language.')
+    parser_test.add_argument('test_root', metavar='ROOT', type=str, help='Root of the test dataset.')
 
-    def __init__(self, root):
-        # Root of dataset
-        self._root = root
+    if len(sys.argv[1:]) == 0:
+        parser.print_help()
+        parser.exit()
+    args = parser.parse_args()
+
+    if args.sub_command == 'train':
+        n = NaiveBayesian(args.train_path)
+        n.train()
+    elif args.sub_command == 'test':
+        n = NaiveBayesian(args.test_root)
+        n.test()
+    else:
+        parser.parse_args(['-h'])
 
-        # Root of training set
-        self._root_training_set = os.path.join(self._root, '..', 'training_set')
+class NaiveBayesian:
+
+    def __init__(self, path):
+
+        self._path = path
 
         # Root of model folder
-        self._root_model = os.path.join(self._root, '..', 'model_bayesian')
-
-        # Root of arranged dataset
-        self._root_language_dataset = os.path.join(self._root, '..', 'code_by_language')
+        self._root_model = os.path.join(os.path.dirname(path), 'model_bayesian')
+        try:
+            os.mkdir(self._root_model)
+        except FileExistsError:
+            pass
 
         # Path of result
-        self._path_result = os.path.join(self._root, '..', 'result_bayesian')
+        self._path_result = os.path.join(os.path.dirname(path), 'result_bayesian')
+
+        dir_path = os.path.dirname(os.path.abspath(__file__))
+        with open(os.path.join(dir_path, 'static_data', 'languages.json'), 'r') as f:
+            self._languages = json.load(f)
 
-        self._languages = [x for x in os.listdir(self._root_training_set) if not x.startswith('.')]
+        self._path_test_csv = path
+
+        self._num_of_classes = len(self._languages)
+
     def train(self):
         '''
         train () generates and stores counted n-grams in '_root_model' folder
         '''
-
-        try:
-            os.mkdir(self._root_model)
-        except FileExistsError:
-            pass
-
         '''
         Calculate frequencies of generated n-grams then store them into a sorted list of (ngram, count)
         '''
         clf = MultinomialNB(alpha=0.001)
         cv = HashingVectorizer(analyzer='char', ngram_range=(1, 4), n_features=2**16, alternate_sign=False)
         indices = list(range(len(self._languages)))
-
-        for language in self._languages:
-            root_training_set_language = os.path.join(self._root_training_set, language)
-            root_stat_language = os.path.join(self._root_model, 'classifier')
-            index_lang = self._languages.index(language)
-            texts = []
-            for f in os.listdir(root_training_set_language):
-                if not f.startswith('.'):
-                    print(f)
-                    filename = os.path.join(root_training_set_language, f)
-                    tokens = tokenizer(file_to_string(filename), 'letter')[-2048:]
-                    text = ''.join([chr(token) for token in tokens])
-                    texts.append(text)
-            counts = cv.fit_transform(texts)
-            tf = TfidfTransformer().fit(counts)
-            normalised = tf.transform(counts)
-            clf.partial_fit(normalised, np.array([index_lang for _ in texts]), indices)
+
+        with open(self._path, newline='') as csvfile:
+            r = csv.reader(csvfile, delimiter=' ', quotechar='|')
+            for pair in r:
+                label, string = pair
+                label = int(label)
+                print(label, end='\r')
+
+                string = literal_eval(string)
+                print(string)
+                tokens = tokenizer(string, 'letter')
+                text = ''.join([chr(token) for token in tokens])
+
+                counts = cv.fit_transform([text])
+                tf = TfidfTransformer().fit(counts)
+                normalised = tf.transform(counts)
+                clf.partial_fit(normalised, np.array([label]), indices)
 
-        with open(root_stat_language + '.clf', 'wb') as f:
+        with open(os.path.join(self._root_model, 'classifier.clf'), 'wb') as f:
             joblib.dump(clf, f)
-        with open(root_stat_language + '.hv', 'wb') as f:
+        with open(os.path.join(self._root_model, 'classifier.hv'), 'wb') as f:
             joblib.dump(cv, f)
 
     def test(self):
         try:
             r = open(self._path_result, 'rb')
             test_result = load(r)
             r.close()
         except FileNotFoundError:
             test_result = {}
 
         with open(os.path.join(self._root_model, 'classifier.clf'), 'rb') as f:
             clf = joblib.load(f)
         with open(os.path.join(self._root_model, 'classifier.hv'), 'rb') as f:
             cv = joblib.load(f)
 
-        for language in [x for x in os.listdir(self._root_training_set) if not x.startswith('.') and x not in test_result.keys()]:
+        for language in [x for x in self._languages if x not in test_result.keys()]:
             test_result[language] = self.test_class((clf, cv), language)
             with open(self._path_result, 'wb') as f:
                 dump(test_result, f)
 
     def speed_benchmark(self):
         language = [x for x in os.listdir(self._root_training_set) if not x.startswith('.')][10]
         models = self._load_models()
 
         test_set = self._get_test_set(language)
         total_size = self._count_size(test_set)
         print('{} kB in total'.format(total_size / 1024))
 
         t_start = time.perf_counter()
         self.test_class(models, language)
         t_end = time.perf_counter()
 
         print('{} seconds.'.format(t_end - t_start))
         print('{} seconds per kB'.format(((t_end - t_start) / total_size) * 1024))
 
     def _get_test_set(self, language):
         root_training_language = os.path.join(self._root_training_set, language)
         root_language = os.path.join(self._root_language_dataset, language)
         total = count_files(root_language)
         training_set = [int(os.path.splitext(x)[0]) for x in os.listdir(root_training_language) if not x.startswith('.')]
         it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set and os.path.getsize(find_file(root_language, x)) <= 1048576)
         test_set = list(islice(it, 1000))
         if len(test_set) == 0:
             it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set)
             test_set = list(islice(it, 1000))
         return test_set
 
     def _count_size(self, files):
         size = 0
         for f in files:
             size += os.path.getsize(f)
         return size
 
     def test_class(self, clf, language):
-        test_set = self._get_test_set(language)
-        index_lang = self._languages.index(language)
-
         ok = 0
         results = []
         count = 0
-        length = len(test_set)
-        for test in test_set:
-            result = self._guess_file_language(clf, test)
-            count += 1
-            print('[{0:4d}/{1:4d}] {2}:{3} '.format(count, length, result[0][1], result[0][0]), end='\r')
-            results.append(result[0])
-            if result[0][1] == language:
-                ok += 1
-
-        total_test = len(test_set)
-        accuracy = ok / len(test_set)
+        total_test = self.file_len(os.path.join(self._path_test_csv, language + '.csv'))
+        with open(os.path.join(self._path_test_csv, language + '.csv'), newline='') as csvfile:
+            r = csv.reader(csvfile, delimiter=' ', quotechar='|')
+            for pair in r:
+                label, string = pair
+                label = int(label)
+                string = literal_eval(string)
+                result = self._guess_file_language(clf, string)
+                count += 1
+                print('[{0:4d}/{1:4d}] {2}:{3} '.format(count, total_test, result[0][1], result[0][0]), end='\r')
+                results.append(result[0])
+                if result[0][1] == language:
+                    ok += 1
+
+        accuracy = ok / total_test
         print('Tests for {} '.format(language))
         print('Total test files : {}'.format(total_test))
         print('Correctly classified files : {}'.format(ok))
         print('Accuracy : {}%'.format(accuracy * 100))
-        return (ok, len(test_set), accuracy, results)
+        return (ok, total_test, accuracy, results)
 
     def test_single(self, filename):
         self._guess_file_language(clf, filename)
+
+    def file_len(self, fname):
+        with open(fname) as f:
+            count = 0
+            for l in f:
+                count += 1
+        return count
 
-    def _guess_file_language(self, cc, filename):
+    def _guess_file_language(self, cc, string):
         clf = cc[0]
         cv = cc[1]
-        tokens = tokenizer(file_to_string(filename), 'letter')
+        tokens = tokenizer(string, 'letter')
         text = ''.join([chr(token) for token in tokens])
 
         counts = cv.fit_transform([text])
         tf = TfidfTransformer().fit(counts)
         normalised = tf.transform(counts)
         result = clf.predict_log_proba(normalised)
         result = [(val, self._languages[idx]) for idx, val in enumerate(result[0])]
         return sorted(result, reverse=True)
 
     def _distance(self, model_profile, test_profile):
         distance = 0
         maximum = len(test_profile)
 
         for test_ngram in test_profile.keys():
             test_rank = test_profile.get(test_ngram)
             model_rank = model_profile.get(test_ngram, maximum)
             d = abs(test_rank - model_rank)
             distance += d
 
         return distance
 
     '''
     def _prob(model, trigrams):
         print('Checking {} model ...'.format(model))
         with open(model, 'rb') as f:
             kneser_ney = load(f)
         result = 1
         for trigram in trigrams:
             prob = kneser_ney.prob(trigram)
             result = result * prob
         return result
     '''
 
 if __name__ == '__main__':
-    if len(sys.argv) == 3 and sys.argv[1] == '--train':
-        n = NaiveBayesian(sys.argv[2])
-        n.train()
-    elif len(sys.argv) == 3 and sys.argv[1] == '--test':
-        n = NaiveBayesian(sys.argv[2])
-        n.test()
-    elif len(sys.argv) == 3 and sys.argv[1] == '--benchmark':
-        n = NaiveBayesian(sys.argv[2])
-        n.speed_benchmark()
-    elif len(sys.argv) == 4 and sys.argv[1] == '--test':
-        n = NaiveBayesian(sys.argv[2])
-        n.test_class(n.load_models(), sys.argv[3])
-    else:
-        print('Wrong arguments, please check your input.')
+    main()
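
Reviewer note on the rewritten train() above: TfidfTransformer().fit(counts) is refit on every single-document batch, so the IDF statistics are computed from one sample at a time and carry no corpus-level information. Since HashingVectorizer is stateless, the same streaming loop works with transform() alone; a minimal runnable sketch under that assumption follows (the toy `rows` list is a hypothetical stand-in for the patch's CSV reader, not part of the patch):

    # Sketch only: toy data stands in for the patch's CSV stream.
    import numpy as np
    from sklearn.naive_bayes import MultinomialNB
    from sklearn.feature_extraction.text import HashingVectorizer

    rows = [(0, 'def f(x): return x'), (1, 'int main(void) { return 0; }')]

    cv = HashingVectorizer(analyzer='char', ngram_range=(1, 4),
                           n_features=2**16, alternate_sign=False)
    clf = MultinomialNB(alpha=0.001)
    classes = list(range(2))             # all class indices, known up front

    for label, text in rows:             # stream one (label, text) pair at a time
        counts = cv.transform([text])    # stateless hashing: transform, no fit
        clf.partial_fit(counts, np.array([label]), classes)

    print(clf.predict(cv.transform(['float g(float y) { return y; }'])))
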
diff --git a/swh/langdetect/naivebayesian.py b/swh/langdetect/unsupervised.py
similarity index 50%
copy from swh/langdetect/naivebayesian.py
copy to swh/langdetect/unsupervised.py
index b8f54cd..fc322c2 100644
--- a/swh/langdetect/naivebayesian.py
+++ b/swh/langdetect/unsupervised.py
@@ -1,211 +1,255 @@
 """
 Naive Bayesian
 """
-import os, sys, operator, nltk, random, time
+import os
+import sys
+import operator
+import nltk
+import random
+import time
 import numpy as np
+import csv
+import argparse
+import json
+from ast import literal_eval
 from itertools import islice
 from pickle import dump, load
 from .utils.common import tokenizer, file_to_string, find_file, count_files
 from nltk.util import ngrams
 from collections import Counter
 from sklearn.naive_bayes import MultinomialNB
 from sklearn.feature_extraction.text import HashingVectorizer, TfidfTransformer
+from sklearn.metrics.pairwise import cosine_similarity
 from sklearn.externals import joblib
+from sklearn.cluster import KMeans
 
-class NaiveBayesian:
+csv.field_size_limit(sys.maxsize)
+
+def main():
+    parser = argparse.ArgumentParser(description='Training and test tool for the unsupervised (clustering) model.')
+
+    subparsers = parser.add_subparsers(dest='sub_command')
+
+    parser_train = subparsers.add_parser('train', help='Training on the dataset, dataset must be a *.csv file. A model will be created in the same directory.')
+    parser_train.add_argument('train_path', metavar='PATH', type=str, help='Path of the training dataset.')
+    # parser_train.add_argument('-n', '--ngrams', metavar='N', dest='train_maxsize', type=int, help='Set maximum input size of ConvNet, default 5.')
+    parser_test = subparsers.add_parser('test', help='Test on the dataset, dataset must be a directory with *.csv dataset named by corresponding language.')
+    parser_test.add_argument('test_root', metavar='ROOT', type=str, help='Root of the test dataset.')
+
+    if len(sys.argv[1:]) == 0:
+        parser.print_help()
+        parser.exit()
+    args = parser.parse_args()
 
-    def __init__(self, root):
-        # Root of dataset
-        self._root = root
+    if args.sub_command == 'train':
+        n = Unsupervised(args.train_path)
+        # n.train()
+        n.clustering()
+    elif args.sub_command == 'test':
+        n = Unsupervised(args.test_root)
+        n.test()
+    else:
+        parser.parse_args(['-h'])
 
-        # Root of training set
-        self._root_training_set = os.path.join(self._root, '..', 'training_set')
+class Unsupervised:
+
+    def __init__(self, path):
+
+        self._path = path
 
         # Root of model folder
-        self._root_model = os.path.join(self._root, '..', 'model_bayesian')
-
-        # Root of arranged dataset
-        self._root_language_dataset = os.path.join(self._root, '..', 'code_by_language')
+        self._root_model = os.path.join(os.path.dirname(path), 'model_unsupervised')
+        try:
+            os.mkdir(self._root_model)
+        except FileExistsError:
+            pass
 
         # Path of result
-        self._path_result = os.path.join(self._root, '..', 'result_bayesian')
+        self._path_result = os.path.join(os.path.dirname(path), 'result_unsupervised')
 
-        self._languages = [x for x in os.listdir(self._root_training_set) if not x.startswith('.')]
+        dir_path = os.path.dirname(os.path.abspath(__file__))
+        with open(os.path.join(dir_path, 'static_data', 'languages.json'), 'r') as f:
+            self._languages = json.load(f)
+
+        self._path_test_csv = path
+
+        self._num_of_classes = len(self._languages)
+
     def train(self):
         '''
        train () generates and stores counted n-grams in '_root_model' folder
         '''
-
-        try:
-            os.mkdir(self._root_model)
-        except FileExistsError:
-            pass
-
         '''
         Calculate frequencies of generated n-grams then store them into a sorted list of (ngram, count)
         '''
-        clf = MultinomialNB(alpha=0.001)
         cv = HashingVectorizer(analyzer='char', ngram_range=(1, 4), n_features=2**16, alternate_sign=False)
         indices = list(range(len(self._languages)))
-
-        for language in self._languages:
-            root_training_set_language = os.path.join(self._root_training_set, language)
-            root_stat_language = os.path.join(self._root_model, 'classifier')
-            index_lang = self._languages.index(language)
-            texts = []
-            for f in os.listdir(root_training_set_language):
-                if not f.startswith('.'):
-                    print(f)
-                    filename = os.path.join(root_training_set_language, f)
-                    tokens = tokenizer(file_to_string(filename), 'letter')[-2048:]
-                    text = ''.join([chr(token) for token in tokens])
-                    texts.append(text)
-            counts = cv.fit_transform(texts)
-            tf = TfidfTransformer().fit(counts)
-            normalised = tf.transform(counts)
-            clf.partial_fit(normalised, np.array([index_lang for _ in texts]), indices)
-
-        with open(root_stat_language + '.clf', 'wb') as f:
-            joblib.dump(clf, f)
-        with open(root_stat_language + '.hv', 'wb') as f:
+        texts = []
+
+        with open(self._path, newline='') as csvfile:
+            r = csv.reader(csvfile, delimiter=' ', quotechar='|')
+            for pair in r:
+                label, string = pair
+                label = int(label)
+                print(label, end='\r')
+
+                string = literal_eval(string)
+                tokens = tokenizer(string, 'letter')[-10248:]
+                text = ''.join([chr(token) for token in tokens])
+                texts.append(text)
+
+        counts = cv.fit_transform(texts)
+
+        with open(os.path.join(self._root_model, 'classifier.cv'), 'wb') as f:
             joblib.dump(cv, f)
+        with open(os.path.join(self._root_model, 'classifier.counts'), 'wb') as f:
+            joblib.dump(counts, f)
+
+    def clustering(self):
+        with open(os.path.join(self._root_model, 'classifier.cv'), 'rb') as f:
+            cv = joblib.load(f)
+        with open(os.path.join(self._root_model, 'classifier.counts'), 'rb') as f:
+            counts = joblib.load(f)
+
+        num_clusters = 323
+
+        km = KMeans(n_clusters=num_clusters)
+        km.fit(counts)
+
+        with open(os.path.join(self._root_model, 'cluster.pkl'), 'wb') as f:
+            joblib.dump(km, f)
 
     def test(self):
         try:
             r = open(self._path_result, 'rb')
             test_result = load(r)
             r.close()
         except FileNotFoundError:
             test_result = {}
 
         with open(os.path.join(self._root_model, 'classifier.clf'), 'rb') as f:
             clf = joblib.load(f)
         with open(os.path.join(self._root_model, 'classifier.hv'), 'rb') as f:
             cv = joblib.load(f)
 
-        for language in [x for x in os.listdir(self._root_training_set) if not x.startswith('.') and x not in test_result.keys()]:
+        for language in [x for x in self._languages if x not in test_result.keys()]:
             test_result[language] = self.test_class((clf, cv), language)
             with open(self._path_result, 'wb') as f:
                 dump(test_result, f)
 
     def speed_benchmark(self):
         language = [x for x in os.listdir(self._root_training_set) if not x.startswith('.')][10]
         models = self._load_models()
 
         test_set = self._get_test_set(language)
         total_size = self._count_size(test_set)
         print('{} kB in total'.format(total_size / 1024))
 
         t_start = time.perf_counter()
         self.test_class(models, language)
         t_end = time.perf_counter()
 
         print('{} seconds.'.format(t_end - t_start))
         print('{} seconds per kB'.format(((t_end - t_start) / total_size) * 1024))
 
     def _get_test_set(self, language):
         root_training_language = os.path.join(self._root_training_set, language)
         root_language = os.path.join(self._root_language_dataset, language)
         total = count_files(root_language)
         training_set = [int(os.path.splitext(x)[0]) for x in os.listdir(root_training_language) if not x.startswith('.')]
         it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set and os.path.getsize(find_file(root_language, x)) <= 1048576)
         test_set = list(islice(it, 1000))
         if len(test_set) == 0:
             it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set)
             test_set = list(islice(it, 1000))
         return test_set
 
     def _count_size(self, files):
         size = 0
         for f in files:
             size += os.path.getsize(f)
         return size
 
     def test_class(self, clf, language):
-        test_set = self._get_test_set(language)
-        index_lang = self._languages.index(language)
-
         ok = 0
         results = []
         count = 0
-        length = len(test_set)
-        for test in test_set:
-            result = self._guess_file_language(clf, test)
-            count += 1
-            print('[{0:4d}/{1:4d}] {2}:{3} '.format(count, length, result[0][1], result[0][0]), end='\r')
-            results.append(result[0])
-            if result[0][1] == language:
-                ok += 1
-
-        total_test = len(test_set)
-        accuracy = ok / len(test_set)
+        total_test = self.file_len(os.path.join(self._path_test_csv, language + '.csv'))
+        with open(os.path.join(self._path_test_csv, language + '.csv'), newline='') as csvfile:
+            r = csv.reader(csvfile, delimiter=' ', quotechar='|')
+            for pair in r:
+                label, string = pair
+                label = int(label)
+                string = literal_eval(string)
+                result = self._guess_file_language(clf, string)
+                count += 1
+                print('[{0:4d}/{1:4d}] {2}:{3} '.format(count, total_test, result[0][1], result[0][0]), end='\r')
+                results.append(result[0])
+                if result[0][1] == language:
+                    ok += 1
+
+        accuracy = ok / total_test
         print('Tests for {} '.format(language))
         print('Total test files : {}'.format(total_test))
         print('Correctly classified files : {}'.format(ok))
         print('Accuracy : {}%'.format(accuracy * 100))
-        return (ok, len(test_set), accuracy, results)
+        return (ok, total_test, accuracy, results)
 
     def test_single(self, filename):
         self._guess_file_language(clf, filename)
+
+    def file_len(self, fname):
+        with open(fname) as f:
+            count = 0
+            for l in f:
+                count += 1
+        return count
 
-    def _guess_file_language(self, cc, filename):
+    def _guess_file_language(self, cc, string):
         clf = cc[0]
         cv = cc[1]
-        tokens = tokenizer(file_to_string(filename), 'letter')
+        tokens = tokenizer(string, 'letter')
         text = ''.join([chr(token) for token in tokens])
 
         counts = cv.fit_transform([text])
         tf = TfidfTransformer().fit(counts)
         normalised = tf.transform(counts)
         result = clf.predict_log_proba(normalised)
         result = [(val, self._languages[idx]) for idx, val in enumerate(result[0])]
         return sorted(result, reverse=True)
 
     def _distance(self, model_profile, test_profile):
         distance = 0
         maximum = len(test_profile)
 
         for test_ngram in test_profile.keys():
             test_rank = test_profile.get(test_ngram)
             model_rank = model_profile.get(test_ngram, maximum)
             d = abs(test_rank - model_rank)
             distance += d
 
         return distance
 
     '''
     def _prob(model, trigrams):
         print('Checking {} model ...'.format(model))
         with open(model, 'rb') as f:
             kneser_ney = load(f)
         result = 1
         for trigram in trigrams:
             prob = kneser_ney.prob(trigram)
             result = result * prob
         return result
     '''
 
 if __name__ == '__main__':
-    if len(sys.argv) == 3 and sys.argv[1] == '--train':
-        n = NaiveBayesian(sys.argv[2])
-        n.train()
-    elif len(sys.argv) == 3 and sys.argv[1] == '--test':
-        n = NaiveBayesian(sys.argv[2])
-        n.test()
-    elif len(sys.argv) == 3 and sys.argv[1] == '--benchmark':
-        n = NaiveBayesian(sys.argv[2])
-        n.speed_benchmark()
-    elif len(sys.argv) == 4 and sys.argv[1] == '--test':
-        n = NaiveBayesian(sys.argv[2])
-        n.test_class(n.load_models(), sys.argv[3])
-    else:
-        print('Wrong arguments, please check your input.')
+    main()
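
Reviewer note on the unsupervised copy above: train() only caches the hashed character-n-gram matrix, and clustering() fits a KMeans with a hardcoded num_clusters = 323. A minimal runnable sketch of that pipeline (the toy `texts` list is a hypothetical stand-in for the CSV-derived corpus, not part of the patch):

    # Sketch only: toy data stands in for the patch's decoded source files.
    from sklearn.feature_extraction.text import HashingVectorizer
    from sklearn.cluster import KMeans

    texts = ['def f(x): return x', 'int main(void) { return 0; }',
             'SELECT name FROM users;', 'println!("hello");']

    cv = HashingVectorizer(analyzer='char', ngram_range=(1, 4),
                           n_features=2**16, alternate_sign=False)
    counts = cv.transform(texts)    # sparse (n_docs, 2**16) count matrix

    km = KMeans(n_clusters=2)       # the patch hardcodes num_clusters = 323
    km.fit(counts)                  # KMeans accepts sparse CSR input
    print(km.labels_)               # cluster id assigned to each document
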