diff --git a/scripts/comparison.pdf b/scripts/comparison.pdf index 0e69977..1817b22 100644 Binary files a/scripts/comparison.pdf and b/scripts/comparison.pdf differ diff --git a/swh/langdetect/cnn.py b/swh/langdetect/cnn.py index a03f60e..dea9bfc 100644 --- a/swh/langdetect/cnn.py +++ b/swh/langdetect/cnn.py @@ -1,262 +1,262 @@ import os import sys import subprocess import time import random import csv import numpy as np import warnings with warnings.catch_warnings(): warnings.simplefilter("ignore") import tensorflow as tf import json import argparse from ast import literal_eval from pickle import dump from pickle import load from numpy import array -from utils.common import tokenizer -from utils.common import file_to_string +from .utils.common import tokenizer +from .utils.common import file_to_string from keras.preprocessing.sequence import pad_sequences from keras.models import Model from keras.models import Sequential from keras.models import load_model from keras.layers import Input from keras.layers import Dense from keras.layers import Flatten from keras.layers import Dropout from keras.layers import ThresholdedReLU from keras.layers import Activation from keras.layers import Lambda from keras.layers import Embedding from keras.layers.convolutional import Convolution1D from keras.layers.convolutional import MaxPooling1D from keras.layers.normalization import BatchNormalization from keras.layers.merge import concatenate from keras.utils import np_utils from keras.optimizers import SGD def main(): parser = argparse.ArgumentParser(description='Training and test tool of charactor-level ConvNet text categorisation.') subparsers = parser.add_subparsers() parser_train = subparsers.add_parser('train', help='Training on the dataset, dataset must be a *.csv file. 
A model will be created in the same directory.') parser_train.add_argument('train_path', metavar='PATH', type=str, help='Path of the training dataset.') parser_train.add_argument('-ms', '--maxsize', metavar='SIZE', dest='train_maxsize', type=int, help='Set maximum input size of ConvNet, default 1024.') parser_train.add_argument('-e', '--epochs', metavar='N', dest='train_epochs', type=int, help='Number of training epochs (iterations), default 50.') parser_test = subparsers.add_parser('test', help='Test on the dataset, dataset must be a directory with *.csv dataset named by corresponding language.') parser_test.add_argument('test_root', metavar='ROOT', type=str, help='Root of the test dataset.') if len(sys.argv[1:]) == 0: parser.print_help() parser.exit() args = parser.parse_args() print(args) if args.train_path: if args.train_maxsize: if args.train_epochs: n = CNN(args.train_path, maxsize=args.train_maxsize, epochs=args.train_epochs) n.train() else: n = CNN(args.train_path, maxsize=args.train_maxsize) n.train() else: if args.train_epochs: n = CNN(args.train_path, epochs=args.train_epochs) n.train() else: n = CNN(args.train_path) n.train() elif args.test_root: n = CNN(args.test_root) n.test() else: parser.parse_args('-h') class CNN: def __init__(self, path, maxsize=1024, epochs=50): self._path = path # Root of model folder self._root_model = os.path.join(os.path.dirname(path), 'model_cnn') try: os.mkdir(self._root_model) except: pass # Path of result self._path_result = os.path.join(os.path.dirname(path), 'result_cnn') with open('static_data/languages.json', 'r') as f: self._languages = json.load(f) self._path_test_csv = path self._input_size = maxsize self._vocab_size = 256 self._num_of_classes = len(self._languages) self._batch_size = 128 self._epochs = epochs def file_len(self, fname): with open(fname) as f: count = 0 for l in f: count += 1 return count def train(self): model = self._get_model() model.fit_generator( self._generator(self._input_size, self._num_of_classes, self._batch_size), steps_per_epoch=self.file_len(self._path) / self._batch_size, epochs=self._epochs) model.save(os.path.join(self._root_model, 'model.h5')) def _generator(self, length, total_class, batch_size=128): counter = 0 while True: with open(self._path, newline='') as csvfile: r = csv.reader(csvfile, delimiter=' ', quotechar='|') for pair in r: if counter == 0: X = np.empty((0, length)) Y = np.empty((0, total_class)) label, string = pair label = int(label) string = literal_eval(string) tokens = [x + 1 for x in tokenizer(string, 'letter')] X = np.append(X, pad_sequences([tokens], maxlen=length), axis=0) label = array(np_utils.to_categorical([label], total_class)) Y = np.append(Y, label, axis=0) counter += 1 if counter == batch_size: counter = 0 yield(X,Y) def _get_model(self): input_size = self._input_size alphabet_size = self._vocab_size embedding_size = 256 conv_layers = [(256,7,3), (256,7,3), (256,3,-1), (256,3,-1), (256,3,-1), (256,3,3)] threshold = 1e-6 fully_connected_layers = [1024, 1024] dropout_p = 0.2 optimizer = 'adam' loss = 'categorical_crossentropy' num_of_classes = self._num_of_classes # Input layer inputs = Input(shape=(input_size,), name='sent_input', dtype='int64') # Embedding layers x = Embedding(alphabet_size + 1, embedding_size, input_length=input_size)(inputs) # Convolution layers for cl in conv_layers: x = Convolution1D(cl[0], cl[1])(x) x = ThresholdedReLU(threshold)(x) if cl[2] != -1: x = MaxPooling1D(cl[2])(x) x = Flatten()(x) # Fully connected layers for fl in fully_connected_layers: x = 
Dense(fl)(x) x = ThresholdedReLU(threshold)(x) x = Dropout(dropout_p)(x) # Output layer predictions = Dense(num_of_classes, activation='softmax')(x) # Build and compile model model = Model(inputs=inputs, outputs=predictions) model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy']) print(model.summary()) return model def _max_len(self, texts): return max([len(text) for text in texts]) def test(self): csv.field_size_limit(sys.maxsize) try: r = open(self._path_result, 'rb') test_result = load(r) r.close() except FileNotFoundError: test_result = {} model = self._load_model() for language in [x for x in self._languages if x not in test_result.keys()]: test_result[language] = self.test_class(model, language) with open(self._path_result, 'wb') as f: dump(test_result, f) def _load_model(self): model = load_model(os.path.join(self._root_model, 'model.h5')) return model def _count_size(self, files): size = 0 for f in files: size += os.path.getsize(f) return size def test_class(self, model, language): ok = 0 results = [] count = 0 total_test = self.file_len(os.path.join(self._path_test_csv, language + '.csv')) with open(os.path.join(self._path_test_csv, language + '.csv'), newline='') as csvfile: r = csv.reader(csvfile, delimiter=' ', quotechar='|') for pair in r: label, string = pair label = int(label) string = literal_eval(string) tokens = [x + 1 for x in tokenizer(string, 'letter')] result = self._guess_file_language(model, tokens) count += 1 print('[{0:4d}/{1:4d}] {2}:{3} '.format(count, total_test, result[0][1], result[0][0]),end='\r') results.append(result[0]) if result[0][1] == language: ok += 1 accuracy = ok / total_test print('Tests for {} '.format(language)) print('Total test files : {}'.format(total_test)) print('Correctly classified files : {}'.format(ok)) print('Accuracy : {}%'.format(accuracy * 100)) return (ok, total_test, accuracy, results) def speed_benchmark(self): language = self._languages[10] model = self._load_model() test_set = self._get_test_set(language) total_size = self._count_size(test_set) print('{} kB in total'.format(total_size / 1024)) t_start = time.perf_counter() self.test_class(model, language) t_end = time.perf_counter() print('{} seconds.'.format(t_end - t_start)) print('{} seconds per KiB'.format(((t_end - t_start) / total_size) * 1024)) def _guess_file_language(self, model, tokens): X = pad_sequences([tokens], maxlen=self._input_size) result = list(model.predict(X))[0] result = [(s, self._languages[i]) for i, s in enumerate(result)] return sorted(result, reverse=True) if __name__ == '__main__': main() diff --git a/swh/langdetect/guesslang.py b/swh/langdetect/guesslang.py index 1c98909..593af8a 100644 --- a/swh/langdetect/guesslang.py +++ b/swh/langdetect/guesslang.py @@ -1,202 +1,202 @@ import os, sys, subprocess, time import numpy as np import tensorflow as tf from itertools import islice from pickle import dump, load from collections import Counter from numpy import array -from utils.common import tokenizer, file_to_string, find_file, count_files +from .utils.common import tokenizer, file_to_string, find_file, count_files from keras.preprocessing.sequence import pad_sequences from keras.utils import np_utils class Guesslang: def __init__(self, root): # Root of dataset self._root = root # Root of training set self._root_training_set = os.path.join(self._root, '..', 'training_set') # Root of model folder self._root_model = os.path.join(self._root, '..', 'model_guesslang') # Root of arranged dataset self._root_language_dataset = 
os.path.join(self._root, '..', 'code_by_language') # Path of result self._path_result = os.path.join(self._root, '..', 'result_guesslang') self.languages = [x for x in os.listdir(self._root_training_set) if not x.startswith('.')] self.LENGTH = 1000 self.TOTAL_CLASS = len(self.languages) feature_columns = [tf.contrib.layers.real_valued_column('', dimension=self.LENGTH)] self._classifer = tf.contrib.learn.DNNLinearCombinedClassifier( linear_feature_columns=feature_columns, dnn_feature_columns=feature_columns, dnn_hidden_units=[256, 64, 16], n_classes=self.TOTAL_CLASS, linear_optimizer=tf.train.RMSPropOptimizer(0.05), dnn_optimizer=tf.train.RMSPropOptimizer(0.05), model_dir=self._root_model, fix_global_step_increment_bug=True ) def train(self): try: if len(os.listdir(self._root_training_set)) == 0: build_training_set(self._root) try: os.mkdir(self._root_model) except FileExistsError: pass except FileNotFoundError: os.mkdir(self._root_training_set) build_training_set(self._root) try: f = open(os.path.join(self._root, '..', 'model_guesslang', 'texts+labels'), 'rb') train_file_with_label = load(f) except FileNotFoundError: train_file_with_label = self._train_file_with_label() with open(os.path.join(self._root, '..', 'model_guesslang', 'texts+labels'), 'wb') as f: dump(train_file_with_label, f) for index in range(self.TOTAL_CLASS): self._classifer.partial_fit(input_fn=lambda:self._generator(self.LENGTH, self.TOTAL_CLASS, index),steps=500) def _generator(self, length, total_class, index): print("Language: {}".format(index)) with open(os.path.join(self._root, '..', 'model_guesslang', 'texts+labels'), 'rb') as f: train_file_with_label = load(f) X = np.empty((0, length)) Y = np.empty((0, 1), dtype=int) for path, label in train_file_with_label: if label == index: X = np.append(X, self._file_to_x(path), axis=0) l = array([label], dtype=int) Y = np.append(Y, l) return tf.convert_to_tensor(X), tf.convert_to_tensor(Y) def _file_to_x(self, filename): wrapper = (lambda x: x + 1) tokens = [wrapper(x) for x in tokenizer(file_to_string(filename), 'letter')] return pad_sequences([tokens], maxlen=self.LENGTH) def _train_file_with_label(self): l = [] for language in self.languages: root_training_set_language = os.path.join(self._root_training_set, language) root_stat_language = os.path.join(self._root_model, language) index_lang = self.languages.index(language) if os.path.isfile(root_stat_language): continue print(language) for f in [x for x in os.listdir(root_training_set_language) if not x.startswith('.')]: filename = os.path.join(root_training_set_language, f) l.append((filename, index_lang)) return l def _max_len(self, texts): return max([len(text) for text in texts]) def _vocabulary_size(self, texts): vocabulary = dict(Counter([token for text in texts for token in text])) return len(vocabulary.keys()) def test(self): try: r = open(self._path_result, 'rb') test_result = load(r) r.close() except FileNotFoundError: test_result = {} for language in [x for x in os.listdir(self._root_training_set) if not x.startswith('.') and x not in test_result.keys()]: test_result[language] = self.test_class(language) with open(self._path_result, 'wb') as f: dump(test_result, f) def _get_test_set(self, language): root_training_language = os.path.join(self._root_training_set, language) root_language = os.path.join(self._root_language_dataset, language) total = count_files(root_language) training_set = [int(os.path.splitext(x)[0]) for x in os.listdir(root_training_language) if not x.startswith('.')] it = (find_file(root_language, 
x) for x in range(1, total + 1) if x not in training_set and os.path.getsize(find_file(root_language, x)) <= 1048576) test_set = list(islice(it, 1000)) if len(test_set) == 0: it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set) test_set = list(islice(it, 1000)) return test_set def _count_size(self, files): size = 0 for f in files: size += os.path.getsize(f) return size def test_class(self, language): test_set = self._get_test_set(language) ok = 0 results = [] count = 0 length = len(test_set) for test in test_set: result = self._guess_file_language(test) count += 1 print('[{0:4d}/{1:4d}] {2} '.format(count, length, result),end='\r') results.append(result) if result == language: ok += 1 total_test = len(test_set) accuracy = ok / len(test_set) print('Tests for {} '.format(language)) print('Total test files : {}'.format(total_test)) print('Correctly classified files : {}'.format(ok)) print('Accuracy : {}%'.format(accuracy * 100)) return (ok, len(test_set), accuracy, results) def speed_benchmark(self): language = [x for x in os.listdir(self._root_model) if not x.startswith('.')][10] models = self._load_models() test_set = self._get_test_set(language) total_size = self._count_size(test_set) print('{} kB in total'.format(total_size / 1024)) t_start = time.perf_counter() self.test_class(models, language) t_end = time.perf_counter() print('{} seconds.'.format(t_end - t_start)) print('{} seconds per kB'.format(((t_end - t_start) / total_size) * 1024)) def _guess_file_language(self, filename): x = self._file_to_x(filename) result = list(self._classifer.predict(x=x))[0] return self.languages[result] if __name__ == '__main__': if len(sys.argv) == 3 and sys.argv[1] == '--train': n = Guesslang(sys.argv[2]) n.train() elif len(sys.argv) == 3 and sys.argv[1] == '--test': n = Guesslang(sys.argv[2]) n.test() elif len(sys.argv) == 3 and sys.argv[1] == '--benchmark': n = NGramProb(sys.argv[2]) n.speed_benchmark() elif len(sys.argv) == 4 and sys.argv[1] == '--test': n = NGramProb(sys.argv[2]) n.test_class(n.load_models(), sys.argv[3]) else: print('Wrong arguments, please check your input.') diff --git a/swh/langdetect/naivebayesian.py b/swh/langdetect/naivebayesian.py index 86f1114..b8f54cd 100644 --- a/swh/langdetect/naivebayesian.py +++ b/swh/langdetect/naivebayesian.py @@ -1,223 +1,211 @@ """ Naive Bayesian """ import os, sys, operator, nltk, random, time import numpy as np from itertools import islice from pickle import dump, load -from utils.common import tokenizer, file_to_string, find_file, count_files -from utils.training import build_training_set +from .utils.common import tokenizer, file_to_string, find_file, count_files from nltk.util import ngrams from collections import Counter from sklearn.naive_bayes import MultinomialNB from sklearn.feature_extraction.text import HashingVectorizer, TfidfTransformer from sklearn.externals import joblib class NaiveBayesian: def __init__(self, root): # Root of dataset self._root = root # Root of training set self._root_training_set = os.path.join(self._root, '..', 'training_set') # Root of model folder self._root_model = os.path.join(self._root, '..', 'model_bayesian') # Root of arranged dataset self._root_language_dataset = os.path.join(self._root, '..', 'code_by_language') # Path of result self._path_result = os.path.join(self._root, '..', 'result_bayesian') self._languages = [x for x in os.listdir(self._root_training_set) if not x.startswith('.')] def train(self): ''' train () generates and stores counted n-grams in 
'_root_model' folder ''' + try: - if len(os.listdir(self._root_training_set)) == 0: - build_training_set(self._root) - try: - os.mkdir(self._root_model) - except FileExistsError: - pass - except FileNotFoundError: - os.mkdir(self._root_training_set) - build_training_set(self._root) + os.mkdir(self._root_model) + except FileExistsError: + pass ''' Calculate frequencies of generated n-grams then store them into a sorted list of (ngram, count) ''' - clf = MultinomialNB() - cv = HashingVectorizer(analyzer='char', ngram_range=(1, 3), n_features=2**17, alternate_sign=False) + clf = MultinomialNB(alpha=0.001) + cv = HashingVectorizer(analyzer='char', ngram_range=(1, 4), n_features=2**16, alternate_sign=False) indices = list(range(len(self._languages))) for language in self._languages: root_training_set_language = os.path.join(self._root_training_set, language) root_stat_language = os.path.join(self._root_model, 'classifier') index_lang = self._languages.index(language) texts = [] for f in os.listdir(root_training_set_language): if not f.startswith('.'): print(f) filename = os.path.join(root_training_set_language, f) - tokens = tokenizer(file_to_string(filename), 'letter') + tokens = tokenizer(file_to_string(filename), 'letter')[-2048:] text = ''.join([chr(token) for token in tokens]) texts.append(text) counts = cv.fit_transform(texts) - tf = TfidfTransformer(use_idf=False).fit(counts) + tf = TfidfTransformer().fit(counts) normalised = tf.transform(counts) clf.partial_fit(normalised, np.array([index_lang for _ in texts]), indices) with open(root_stat_language + '.clf', 'wb') as f: joblib.dump(clf, f) with open(root_stat_language + '.hv', 'wb') as f: joblib.dump(cv, f) def test(self): try: r = open(self._path_result, 'rb') test_result = load(r) r.close() except FileNotFoundError: test_result = {} with open(os.path.join(self._root_model, 'classifier.clf'), 'rb') as f: clf = joblib.load(f) with open(os.path.join(self._root_model, 'classifier.hv'), 'rb') as f: cv = joblib.load(f) for language in [x for x in os.listdir(self._root_training_set) if not x.startswith('.') and x not in test_result.keys()]: test_result[language] = self.test_class((clf, cv), language) with open(self._path_result, 'wb') as f: dump(test_result, f) def speed_benchmark(self): language = [x for x in os.listdir(self._root_training_set) if not x.startswith('.')][10] models = self._load_models() test_set = self._get_test_set(language) total_size = self._count_size(test_set) print('{} kB in total'.format(total_size / 1024)) t_start = time.perf_counter() self.test_class(models, language) t_end = time.perf_counter() print('{} seconds.'.format(t_end - t_start)) print('{} seconds per kB'.format(((t_end - t_start) / total_size) * 1024)) def _get_test_set(self, language): root_training_language = os.path.join(self._root_training_set, language) root_language = os.path.join(self._root_language_dataset, language) total = count_files(root_language) training_set = [int(os.path.splitext(x)[0]) for x in os.listdir(root_training_language) if not x.startswith('.')] it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set and os.path.getsize(find_file(root_language, x)) <= 1048576) test_set = list(islice(it, 1000)) if len(test_set) == 0: it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set) test_set = list(islice(it, 1000)) return test_set def _count_size(self, files): size = 0 for f in files: size += os.path.getsize(f) return size def test_class(self, clf, language): test_set = 
self._get_test_set(language) index_lang = self._languages.index(language) ok = 0 results = [] count = 0 length = len(test_set) for test in test_set: result = self._guess_file_language(clf, test) count += 1 print('[{0:4d}/{1:4d}] {2}:{3} '.format(count, length, result[0][1], result[0][0]),end='\r') results.append(result[0]) if result[0][1] == language: ok += 1 total_test = len(test_set) accuracy = ok / len(test_set) print('Tests for {} '.format(language)) print('Total test files : {}'.format(total_test)) print('Correctly classified files : {}'.format(ok)) print('Accuracy : {}%'.format(accuracy * 100)) return (ok, len(test_set), accuracy, results) def test_single(self, filename): self._guess_file_language(clf, filename) def _guess_file_language(self, cc, filename): clf = cc[0] cv = cc[1] tokens = tokenizer(file_to_string(filename), 'letter') text = ''.join([chr(token) for token in tokens]) counts = cv.fit_transform([text]) - tf = TfidfTransformer(use_idf=False).fit(counts) + tf = TfidfTransformer().fit(counts) normalised = tf.transform(counts) result = clf.predict_log_proba(normalised) result = [(val, self._languages[idx]) for idx, val in enumerate(result[0])] return sorted(result, reverse=True) - - def _sort_by_value(self, statistics): - statistics_sorted = sorted(statistics.items(), - key = operator.itemgetter(1), - reverse = True)[:500] - return statistics_sorted def _distance(self, model_profile, test_profile): distance = 0 maximum = len(test_profile) for test_ngram in test_profile.keys(): test_rank = test_profile.get(test_ngram) model_rank = model_profile.get(test_ngram, maximum) d = abs(test_rank - model_rank) distance += d return distance ''' def _prob(model, trigrams): print('Checking {} model ...'.format(model)) with open(model, 'rb') as f: kneser_ney = load(f) result = 1 for trigram in trigrams: prob = kneser_ney.prob(trigram) result = result * prob return result ''' if __name__ == '__main__': if len(sys.argv) == 3 and sys.argv[1] == '--train': n = NaiveBayesian(sys.argv[2]) n.train() elif len(sys.argv) == 3 and sys.argv[1] == '--test': n = NaiveBayesian(sys.argv[2]) n.test() elif len(sys.argv) == 3 and sys.argv[1] == '--benchmark': n = NaiveBayesian(sys.argv[2]) n.speed_benchmark() elif len(sys.argv) == 4 and sys.argv[1] == '--test': n = NaiveBayesian(sys.argv[2]) n.test_class(n.load_models(), sys.argv[3]) else: print('Wrong arguments, please check your input.') diff --git a/swh/langdetect/ngramdist.py b/swh/langdetect/ngramdist.py index 06449dd..e67df63 100644 --- a/swh/langdetect/ngramdist.py +++ b/swh/langdetect/ngramdist.py @@ -1,256 +1,249 @@ """ Baseline approach """ import os, sys, operator, nltk, random, time from itertools import islice from pickle import dump, load from nltk.util import ngrams -from utils.common import tokenizer, file_to_string, find_file, count_files -from utils.training import build_training_set +from .utils.common import tokenizer, file_to_string, find_file, count_files class NGramDist: def __init__(self, root): # Root of dataset self._root = root # Root of training set self._root_training_set = os.path.join(self._root, '..', 'training_set') # Root of model folder self._root_model = os.path.join(self._root, '..', 'model_ngram_dist') # Root of arranged dataset self._root_language_dataset = os.path.join(self._root, '..', 'code_by_language') # Path of result self._path_result = os.path.join(self._root, '..', 'result_freq') def train(self): ''' train () generates and stores counted n-grams in '_root_model' folder ''' - + try: - if 
len(os.listdir(self._root_training_set)) == 0: - build_training_set(self._root) - try: - os.mkdir(self._root_model) - except FileExistsError: - pass - except FileNotFoundError: - os.mkdir(self._root_training_set) - build_training_set(self._root) + os.mkdir(self._root_model) + except FileExistsError: + pass ''' Calculate frequencies of generated n-grams then store them into a sorted list of (ngram, count) ''' for language in os.listdir(self._root_training_set): if not language.startswith('.'): root_training_set_language = os.path.join(self._root_training_set, language) root_stat_language = os.path.join(self._root_model, language) if os.path.isfile(root_stat_language): continue statistics = {} for f in os.listdir(root_training_set_language): print(f) if not f.startswith('.'): filename = os.path.join(root_training_set_language, f) tokens = tokenizer(file_to_string(filename), 'letter') generated_ngrams = self._generate_ngrams([chr(token) for token in tokens], 3) self._count_ngrams(statistics, generated_ngrams) with open(root_stat_language, 'wb') as f: dump(self._sort_by_value(statistics), f) def _generate_ngrams(self, tokens, n): ''' :param tokens: generated tokens from a string. :param n: maximum n of n-grams :type tokens: list :type n: int :return: generated 1-grams, ... , n-grams :rtype: list ''' generated_ngrams = [] for i in range(1, n+1): igrams = ngrams(tokens, i, pad_left=True, pad_right=True, left_pad_symbol = '$BOF$', right_pad_symbol = '$EOF$') for igram in igrams: generated_ngrams.append(''.join(igram)) return generated_ngrams def _count_ngrams(self, statistics, ngrams): ''' :param statistics: shared dictionary for statistics :param ngrams: n-grams to be accumulated into statistics ''' for ngram in ngrams: statistics[ngram] = statistics.get(ngram, 0) + 1 def test(self): try: r = open(self._path_result, 'rb') test_result = load(r) r.close() except FileNotFoundError: test_result = {} models = self._load_models() for language in [x for x in os.listdir(self._root_model) if not x.startswith('.') and x not in test_result.keys()]: test_result[language] = self.test_class(models, language) with open(self._path_result, 'wb') as f: dump(test_result, f) def speed_benchmark(self): language = [x for x in os.listdir(self._root_model) if not x.startswith('.')][10] models = self._load_models() test_set = self._get_test_set(language) total_size = self._count_size(test_set) print('{} kB in total'.format(total_size / 1024)) t_start = time.perf_counter() self.test_class(models, language) t_end = time.perf_counter() print('{} seconds.'.format(t_end - t_start)) print('{} seconds per kB'.format(((t_end - t_start) / total_size) * 1024)) def _load_models(self): models = {} for model in [model for model in os.listdir(self._root_model) if not model.startswith('.')]: root_model = os.path.join(self._root_model, model) with open(root_model, 'rb') as sorted_file: models[model] = self._list_to_dict(load(sorted_file)) return models def _list_to_dict(self, model): model_ngrams = [x[0] for x in model] model_dict = {} index = 0 for ngram in model_ngrams: index += 1 model_dict[ngram] = index return model_dict def _get_test_set(self, language): root_training_language = os.path.join(self._root_training_set, language) root_language = os.path.join(self._root_language_dataset, language) total = count_files(root_language) training_set = [int(os.path.splitext(x)[0]) for x in os.listdir(root_training_language) if not x.startswith('.')] it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set and 
os.path.getsize(find_file(root_language, x)) <= 1048576) test_set = list(islice(it, 1000)) if len(test_set) == 0: it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set) test_set = list(islice(it, 1000)) return test_set def _count_size(self, files): size = 0 for f in files: size += os.path.getsize(f) return size def test_class(self, models, language): test_set = self._get_test_set(language) ok = 0 results = [] count = 0 length = len(test_set) for test in test_set: result = self._guess_file_language(models, test) count += 1 print('[{0:4d}/{1:4d}] {2}:{3} '.format(count, length, result[0][1], result[0][0]),end='\r') results.append(result[0]) if result[0][1] == language: ok += 1 total_test = len(test_set) accuracy = ok / len(test_set) print('Tests for {} '.format(language)) print('Total test files : {}'.format(total_test)) print('Correctly classified files : {}'.format(ok)) print('Accuracy : {}%'.format(accuracy * 100)) return (ok, len(test_set), accuracy, results) def test_single(self, models, filename): self._guess_file_language(models, filename) def _guess_file_language(self, models, filename): tokens = tokenizer(file_to_string(filename), 'letter') generated_ngrams = self._generate_ngrams([chr(token) for token in tokens], 3) statistics = {} self._count_ngrams(statistics, generated_ngrams) test_profile = self._list_to_dict(self._sort_by_value(statistics)) result = [] for model in models.keys(): root_model = os.path.join(self._root_model, model) model_profile = models[model] distance = self._distance(model_profile, test_profile) result.append((distance, model)) return sorted(result) def _sort_by_value(self, statistics): statistics_sorted = sorted(statistics.items(), key = operator.itemgetter(1), reverse = True)[:500] return statistics_sorted def _distance(self, model_profile, test_profile): distance = 0 maximum = len(test_profile) for test_ngram in test_profile.keys(): test_rank = test_profile.get(test_ngram) model_rank = model_profile.get(test_ngram, maximum) d = abs(test_rank - model_rank) distance += d return distance ''' def _prob(model, trigrams): print('Checking {} model ...'.format(model)) with open(model, 'rb') as f: kneser_ney = load(f) result = 1 for trigram in trigrams: prob = kneser_ney.prob(trigram) result = result * prob return result ''' if __name__ == '__main__': if len(sys.argv) == 3 and sys.argv[1] == '--train': n = NGramDist(sys.argv[2]) n.train() elif len(sys.argv) == 3 and sys.argv[1] == '--test': n = NGramDist(sys.argv[2]) n.test() elif len(sys.argv) == 3 and sys.argv[1] == '--benchmark': n = NGramDist(sys.argv[2]) n.speed_benchmark() elif len(sys.argv) == 4 and sys.argv[1] == '--test': n = NGramDist(sys.argv[2]) n.test_class(n.load_models(), sys.argv[3]) else: print('Wrong arguments, please check your input.') diff --git a/swh/langdetect/ngramprob.py b/swh/langdetect/ngramprob.py index 7cf5e47..ec9e372 100644 --- a/swh/langdetect/ngramprob.py +++ b/swh/langdetect/ngramprob.py @@ -1,169 +1,170 @@ import os, sys, subprocess, time import kenlm from itertools import islice from pickle import dump, load -from utils.common import tokenizer, file_to_string, find_file, count_files +from .utils.common import tokenizer, file_to_string, find_file, count_files, remove_comment class NGramProb: def __init__(self, root): # Root of dataset self._root = root # Root of training set self._root_training_set = os.path.join(self._root, '..', 'training_set') # Root of model folder self._root_model = os.path.join(self._root, '..', 'model_ngram_prob') # Root of 
arranged dataset self._root_language_dataset = os.path.join(self._root, '..', 'code_by_language') # Path of result self._path_result = os.path.join(self._root, '..', 'result_prob') def train(self): - try: - if len(os.listdir(self._root_training_set)) == 0: - build_training_set(self._root) - try: - os.mkdir(self._root_model) - except FileExistsError: - pass - except FileNotFoundError: - os.mkdir(self._root_training_set) - build_training_set(self._root) for language in [x for x in os.listdir(self._root_training_set) if not x.startswith('.')]: root_training_set_language = os.path.join(self._root_training_set, language) texts = [] root_stat_language = os.path.join(self._root_model, language) if os.path.isfile(root_stat_language): continue for f in [x for x in os.listdir(root_training_set_language) if not x.startswith('.')]: + print(f) filename = os.path.join(root_training_set_language, f) - tokens = tokenizer(file_to_string(filename), 'letter') - texts.append((' '.join(chr(token) for token in tokens))) + text = file_to_string(filename) + text = remove_comment(text, language) + tokens = tokenizer(text, 'letter') + texts.append(' '.join(chr(token) for token in tokens)) + #tokens = tokenizer(text, 'word')[-1024:] + #tokens = b' '.join(tokens) + #texts.append((''.join(chr(token) for token in list(tokens)))) train_text = ' '.join(texts) command = ['../../bin/lmplz', '-o', '3', '-T', '/tmp', '--discount_fallback'] with open(root_stat_language, 'wb') as f: proc = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=f) proc.communicate(train_text.encode()) if os.path.getsize(root_stat_language) == 0: os.remove(root_stat_language) # st = os.stat(root_stat_language) # os.chmod(root_stat_language, st.st_mode | stat.S_IEXEC) def test(self): try: r = open(self._path_result, 'rb') test_result = load(r) r.close() except FileNotFoundError: test_result = {} models = self._load_models() for language in [x for x in os.listdir(self._root_model) if not x.startswith('.') and x not in test_result.keys()]: test_result[language] = self.test_class(models, language) with open(self._path_result, 'wb') as f: dump(test_result, f) def _load_models(self): models = {} for model in [model for model in os.listdir(self._root_model) if not model.startswith('.')]: root_model = os.path.join(self._root_model, model) models[model] = kenlm.LanguageModel(root_model) return models def _get_test_set(self, language): root_training_language = os.path.join(self._root_training_set, language) root_language = os.path.join(self._root_language_dataset, language) total = count_files(root_language) training_set = [int(os.path.splitext(x)[0]) for x in os.listdir(root_training_language) if not x.startswith('.')] it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set and os.path.getsize(find_file(root_language, x)) <= 1048576) test_set = list(islice(it, 1000)) if len(test_set) == 0: it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set) test_set = list(islice(it, 1000)) return test_set def _count_size(self, files): size = 0 for f in files: size += os.path.getsize(f) return size def test_class(self, models, language): test_set = self._get_test_set(language) ok = 0 results = [] count = 0 length = len(test_set) for test in test_set: result = self._guess_file_language(models, test) count += 1 print('[{0:4d}/{1:4d}] {2}:{3} '.format(count, length, result[0][1], result[0][0]),end='\r') results.append(result[0]) if result[0][1] == language: ok += 1 total_test = len(test_set) accuracy = ok 
/ len(test_set) print('Tests for {} '.format(language)) print('Total test files : {}'.format(total_test)) print('Correctly classified files : {}'.format(ok)) print('Accuracy : {}%'.format(accuracy * 100)) return (ok, len(test_set), accuracy, results) def speed_benchmark(self): language = [x for x in os.listdir(self._root_model) if not x.startswith('.')][10] models = self._load_models() test_set = self._get_test_set(language) total_size = self._count_size(test_set) print('{} kB in total'.format(total_size / 1024)) t_start = time.perf_counter() self.test_class(models, language) t_end = time.perf_counter() print('{} seconds.'.format(t_end - t_start)) print('{} seconds per kB'.format(((t_end - t_start) / total_size) * 1024)) def _guess_file_language(self, models, filename): tokens = tokenizer(file_to_string(filename), 'letter') + tokens = tokens[-1024:] text = ' '.join(chr(token) for token in tokens) + #text = file_to_string(filename) + #tokens = tokenizer(text, 'word') + #tokens = b' '.join(tokens) + #text = ''.join(chr(token) for token in list(tokens)) result = [] for model_key in models.keys(): root_model = os.path.join(self._root_model, model_key) model = models[model_key] score = model.score(text) result.append((score, model_key)) return sorted(result, reverse=True) if __name__ == '__main__': if len(sys.argv) == 3 and sys.argv[1] == '--train': n = NGramProb(sys.argv[2]) n.train() elif len(sys.argv) == 3 and sys.argv[1] == '--test': n = NGramProb(sys.argv[2]) n.test() elif len(sys.argv) == 3 and sys.argv[1] == '--benchmark': n = NGramProb(sys.argv[2]) n.speed_benchmark() elif len(sys.argv) == 4 and sys.argv[1] == '--test': n = NGramProb(sys.argv[2]) n.test_class(n.load_models(), sys.argv[3]) else: print('Wrong arguments, please check your input.') diff --git a/swh/langdetect/utils/common.py b/swh/langdetect/utils/common.py index 65864f1..dc7bdc9 100644 --- a/swh/langdetect/utils/common.py +++ b/swh/langdetect/utils/common.py @@ -1,79 +1,155 @@ """ Here regroup basic preprocessing methods used in learning stage for different approaches. """ import re, os -_re_string = re.compile(r"""("(\\.|[^"\\])*"|'(\\.|[^'\\])*')""") -_re_number = re.compile(r'([\d]+)|([\d]+.[\d]+)[^A-Za-z]') -_re_separator = re.compile(r'(\W)') +#_re_string = re.compile(b"""("(\\.|[^"\\])*"|'(\\.|[^'\\])*')""") +_re_number = re.compile(b'([\d]+)|([\d]+.[\d]+)[^A-Za-z]') +_re_separator = re.compile(b'(\W)') _not_start_with_point = lambda x: not x.startswith('.') def tokenizer(text, re_name): ''' Splits text into tokens ''' if re_name == 'letter': return list(text) elif re_name == 'word': - return [word for word in _re_separator.split(text) if word.strip(' \t')] + return [word for word in _re_separator.split(text) if word.strip(b' \t')] def file_to_string(filename): """ Read a file to a string. 
""" with open(filename, 'rb') as f: data = f.read() return replace_string_and_number(data) def count_files(root_language): all_folders = natural_sort(filter (_not_start_with_point, os.listdir(root_language))) files = natural_sort(filter (_not_start_with_point, os.listdir(root_language + '/' + all_folders[-1]))) (max,_) = os.path.splitext(files[-1]) return int(max) def find_file(root_language, n): '''Find the n-th file in language folder''' if n > count_files(root_language): return '' else: start = (n - 1) // 1000 * 1000 + 1 end = start + 999 root_count = root_language + '/' + str(start) + '-' + str(end) files = natural_sort(filter (_not_start_with_point, os.listdir(root_count))) return root_count + '/' + files[n - start] def replace_string_and_number(text): """ Replace strings and numbers in a file by special tokens """ # str_replaced = re.sub(_re_string, '__str__', text) # str_num_replaced = re.sub(_re_number, '__num__', str_replaced) str_num_replaced = text return str_num_replaced def natural_sort(l): convert = lambda text: int(text) if text.isdigit() else text.lower() alphanum_key = lambda key: [ convert(c) for c in re.split('([0-9]+)', key) ] return sorted(l, key = alphanum_key) -def remove_comment(text): - # TODO: remove only inline comments and block comments - # TODO: maybe build a list of comment markers - pass +def remove_comment(binary_text, language): + splited_text = binary_text.splitlines() + text = b'\n'.join(splited_text) + regexp = get_regexp(language) + if not regexp: + return binary_text + return regexp.sub(b'\n', text) + +def get_regexp(language): + re_inline = get_inline(language) + re_block = get_block(language) + rs = [] + if re_inline: + rs.append(re_inline) + if re_block: + rs.append(re_block) + if rs == []: + return None + return re.compile(b'|'.join(rs), re.DOTALL) + + +def get_inline(language): + r_base = b'[^\\n]*(?:\\n|$)' + if language in ['Ada', + 'Eiffel', + 'VHDL', + 'AppleScript', + 'Haskell', + 'Lua', + 'PLSQL']: + r = b'(--)' + r_base + elif language in ['C', + 'C++', + 'C#', + 'D', + 'JavaScript', + 'ActionScript', + 'Java', + 'Rust']: + r = b'(//)' + r_base + elif language == 'Xojo': + r = b'(' + b'|'.join([b'//', b"\'"]) + b')' + r_base + elif language in ['R', + 'Tcl', + 'Awk', + 'Perl', + 'Perl 6', + 'Ruby', + 'Python']: + r = b'(#)' + r_base + elif language in ['COBOL']: + r = b'(\\*>)' + r_base + elif language in ['Matlab']: + r = b'(%)' + r_base + else: + return None + return b'(' + r + b')' +def get_block(language): + r_base = b'.*?' 
+ if language in ['C', + 'C++', + 'C#', + 'JavaScript', + 'ActionScript', + 'PLSQL', + 'PHP', + 'Rust']: + r = b'(/\\*)' + r_base + b'(\\*/)' + elif language in ['OCaml', + 'Pascal', + 'Modula-2', + 'Smarty']: + r = b'(\\(\\*)' + r_base + b'(\\*\\))' + elif language == 'Python': + r = b'(\'\'\')' + r_base + b'(\'\'\')' + else: + return None + return b'(' + r + b')' + + def purify(text, lang): # TODO: for some language like HTML, remove code other than principal language pass diff --git a/swh/langdetect/utils/training.py b/swh/langdetect/utils/training.py index d0ca69b..77f7022 100644 --- a/swh/langdetect/utils/training.py +++ b/swh/langdetect/utils/training.py @@ -1,105 +1,108 @@ import os import random import csv -from utils.common import count_files, find_file, file_to_string +from .common import count_files, find_file, file_to_string from itertools import islice class Dataset: def __init__(self, root): self.root_code = os.path.join(root, '..', 'code_by_language') self.root_training = os.path.join(root, '..', 'training_set') self.root_training_csv = os.path.join(root, '..', 'training_set_csv') self.root_test = os.path.join(root, '..', 'test_set') self.root_test_csv = os.path.join(root, '..', 'test_set_csv') try: os.mkdir(self.root_training) except FileExistsError: pass try: os.mkdir(self.root_training_csv) except FileExistsError: pass try: os.mkdir(self.root_test) except FileExistsError: pass try: os.mkdir(self.root_test_csv) except FileExistsError: pass def build_training_set(self, languages): for language in languages: # limit defines the size of training set # upper defines the maximum size root_code_language = os.path.join(self.root_code, language) root_training_language = os.path.join(self.root_training, language) total = count_files(root_code_language) try: os.mkdir(root_training_language) except FileExistsError: pass upper = 1000 if total >= upper: limit = upper // 2 else: limit = total // 2 indices = random.sample(range(1, total + 1), limit) files = map(lambda x : find_file(root_code_language, x), indices) for src in files: basename = os.path.basename(src) des = os.path.join(root_training_language, basename) os.symlink(src, des) - def build_test_set(self, languages): + def build_test_set(self, languages, extension=True): for language in languages: root_language = os.path.join(self.root_code, language) root_test_language = os.path.join(self.root_test, language) try: os.mkdir(root_test_language) except FileExistsError: pass files = self.get_test_set(language) for src in files: - des = os.path.join(root_test_language, os.path.basename(src)) + if extension: + des = os.path.join(root_test_language, os.path.basename(src)) + else: + des = os.path.join(root_test_language, os.path.splitext(os.path.basename(src))[0]) os.symlink(src, des) def train_files_with_label(self, languages, maxsize): with open(os.path.join(self.root_training_csv, 'training_set.csv'), 'w', newline='') as csvfile: setwriter = csv.writer(csvfile, delimiter=' ', quotechar='|', quoting=csv.QUOTE_MINIMAL) for language in languages: root_training_language = os.path.join(self.root_training, language) index_lang = languages.index(language) for f in [x for x in os.listdir(root_training_language) if not x.startswith('.')]: filename = os.path.join(root_training_language, f) tokens = file_to_string(filename)[-maxsize:] # 10240 setwriter.writerow([index_lang, tokens]) def get_test_set(self, language): root_training_language = os.path.join(self.root_training, language) root_language = os.path.join(self.root_code, language) total = 
count_files(root_language) training_set = [int(os.path.splitext(x)[0]) for x in os.listdir(root_training_language) if not x.startswith('.')] it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set and os.path.getsize(find_file(root_language, x)) <= 1048576) test_set = list(islice(it, 1000)) if len(test_set) == 0: it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set) test_set = list(islice(it, 1000)) return test_set def test_files_with_label(self, languages): for language in languages: root_test_language = os.path.join(self.root_test, language) index_lang = languages.index(language) with open(os.path.join(self.root_test_csv, language + '.csv'), 'w', newline='') as csvfile: setwriter = csv.writer(csvfile, delimiter=' ', quotechar='|', quoting=csv.QUOTE_MINIMAL) for f in [x for x in os.listdir(root_test_language) if not x.startswith('.')]: filename = os.path.join(root_test_language, f) tokens = file_to_string(filename) setwriter.writerow([index_lang, tokens])
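
For reference, the new comment-stripping helpers in swh/langdetect/utils/common.py build one bytes regex per language (inline and/or block markers) and replace every match with a newline. Below is a minimal usage sketch, not part of the patch, assuming the package is importable as swh.langdetect and using an invented C snippet:

# Hypothetical usage of the new comment-removal helpers
# (the C snippet is illustrative, not taken from the patch).
from swh.langdetect.utils.common import remove_comment, get_regexp

c_source = b'int add(int a, int b) { // inline comment\n    return a + b; /* block\n    comment */\n}\n'

# get_regexp('C') combines the inline (//...) and block (/*...*/) patterns
# into a single DOTALL bytes regex; languages without patterns yield None.
assert get_regexp('C') is not None
assert get_regexp('Brainfuck') is None

# remove_comment() substitutes every match with b'\n', so the surrounding
# code survives while the comment text disappears.
stripped = remove_comment(c_source, 'C')
assert b'inline comment' not in stripped
assert b'block' not in stripped
assert b'return a + b;' in stripped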
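The ConvNet in cnn.py consumes fixed-length integer sequences: each byte of a file is shifted by one (leaving 0 free as the padding value) and the sequence is padded or truncated to maxsize with Keras' pad_sequences. A standalone sketch of that preprocessing, not part of the patch, assuming standalone Keras is installed and using an invented sample:

# Sketch of the byte-level preprocessing used in CNN._generator and
# CNN._guess_file_language; the sample bytes below are invented.
from keras.preprocessing.sequence import pad_sequences

maxsize = 16                     # the real model defaults to 1024
data = b'print("hi")\n'          # stand-in for file_to_string(...)

# tokenizer(data, 'letter') is list(data): raw byte values 0..255.
# Shifting by +1 reserves 0 for padding (hence a vocabulary of 256 + 1).
tokens = [b + 1 for b in list(data)]

# Left-pad (or truncate) to the fixed input length the model expects.
X = pad_sequences([tokens], maxlen=maxsize)
print(X.shape)      # (1, 16)
print(X[0][:3])     # leading zeros are padding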
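naivebayesian.py now trains MultinomialNB(alpha=0.001) on character 1-to-4-gram counts from a HashingVectorizer (2**16 features, alternate_sign=False) passed through a TfidfTransformer, with one partial_fit call per language. A toy reproduction of that pipeline, not part of the patch, with two invented "languages", assuming scikit-learn is available:

# Toy reproduction of the NaiveBayesian training/prediction pipeline
# (the two tiny corpora are invented).
import numpy as np
from sklearn.naive_bayes import MultinomialNB
from sklearn.feature_extraction.text import HashingVectorizer, TfidfTransformer

languages = ['C', 'Python']
corpora = {
    'C': ['int main(void) { return 0; }', 'printf("%d\\n", x);'],
    'Python': ['def main():\n    return 0', 'print(f"{x}")'],
}

clf = MultinomialNB(alpha=0.001)
cv = HashingVectorizer(analyzer='char', ngram_range=(1, 4),
                       n_features=2**16, alternate_sign=False)
classes = list(range(len(languages)))

# One partial_fit per language, mirroring the loop in NaiveBayesian.train().
for index, language in enumerate(languages):
    counts = cv.fit_transform(corpora[language])
    normalised = TfidfTransformer().fit(counts).transform(counts)
    clf.partial_fit(normalised, np.array([index] * len(corpora[language])), classes)

# Prediction follows _guess_file_language(): vectorise, tf-idf, then rank
# the per-class log-probabilities.
test = cv.fit_transform(['while (i < n) { i++; }'])
test = TfidfTransformer().fit(test).transform(test)
scores = clf.predict_log_proba(test)[0]
print(sorted(zip(scores, languages), reverse=True))

As in the patch, the tf-idf transformer is refit on whatever batch is at hand (including a single test document) rather than reusing the training statistics; the sketch mirrors that behaviour rather than endorsing it.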