diff --git a/scripts/comparison.pdf b/scripts/comparison.pdf index 82e40f1..0e69977 100644 Binary files a/scripts/comparison.pdf and b/scripts/comparison.pdf differ diff --git a/scripts/comparison.pdf b/scripts/comparison_2.pdf similarity index 100% copy from scripts/comparison.pdf copy to scripts/comparison_2.pdf diff --git a/scripts/draw_accuracy.py b/scripts/draw_accuracy.py index 979645f..ad3a51a 100644 --- a/scripts/draw_accuracy.py +++ b/scripts/draw_accuracy.py @@ -1,138 +1,144 @@ #!/bin/bash/python3 import sys from pickle import load from collections import namedtuple, Counter try: import numpy as np import matplotlib.pyplot as plt from matplotlib.ticker import MaxNLocator except ImportError: raise ImportError('Please install matplotlib') def heatmap(path): with open(path, 'rb') as f: data = load(f) mat = process(data) labels = sorted(data) fig, ax = plt.subplots() fig.set_size_inches(100,100) heatmap = ax.matshow(mat, cmap='Blues') fig = plt.gcf() ax.set_frame_on(False) ax.set_yticks(np.arange(len(labels)), minor=False) ax.set_xticks(np.arange(len(labels)), minor=False) ax.set_xlabel('Classification of test files') ax.set_ylabel('Ground truth class of test files') ax.set_xticklabels(labels, minor=False) ax.set_yticklabels(labels, minor=False) ax.xaxis.tick_top() ax.xaxis.set_label_position('top') plt.xticks(rotation=90) ax.grid(False) ''' for i in np.arange(len(mat)): for j in np.arange(len(mat[i])): ax.text(i, j, "%.1f" % (mat[i][j] * 100), color='white') ''' ax = plt.gca() for t in ax.xaxis.get_major_ticks(): t.tick1On = False t.tick2On = False for t in ax.yaxis.get_major_ticks(): t.tick1On = False t.tick2On = False fig.savefig("results.pdf", bbox_inches='tight') def process(data): ''' ''' ldata = sorted(data) length = len(ldata) out = [[0 for x in range(length)] for y in range(length)] for lang in ldata: index_lan = ldata.index(lang) ok = data[lang][0] if data[lang][1] > 1000 : test_size = 1000 else: test_size = data[lang][1] result = [x[1] for x in data[lang][3]] counter = dict(Counter(result)) for res_lan in counter.keys(): index_res = ldata.index(res_lan) out[index_lan][index_res] = counter.get(res_lan, 0) / test_size return out def get_accuracy(data): ldata = sorted(data) out = {} for lang in ldata: ok = data[lang][0] if data[lang][1] > 1000: test_size = 1000 else: test_size = data[lang][1] result = [x[1] for x in data[lang][3]] counter = dict(Counter(result)) out[lang] = counter.get(lang, 0) / test_size return out -def compare(baseline, target): +def compare(baseline, target1, target2): with open(baseline, 'rb') as f: data = load(f) dict_base = get_accuracy(data) - with open(target, 'rb') as f: + with open(target1, 'rb') as f: data = load(f) - dict_targ = get_accuracy(data) + dict_targ1 = get_accuracy(data) - all_lang = sorted(list(set().union(dict_base.keys(),dict_targ.keys())))[::-1] + with open(target2, 'rb') as f: + data = load(f) + dict_targ2 = get_accuracy(data) + + all_lang = sorted(list(set().union(dict_base.keys(),dict_targ1.keys())))[::-1] n = len(all_lang) acc_base = [dict_base.get(lang, 0) for lang in all_lang] - acc_targ = [dict_targ.get(lang, 0) for lang in all_lang] + acc_targ1 = [dict_targ1.get(lang, 0) for lang in all_lang] + acc_targ2 = [dict_targ2.get(lang, 0) for lang in all_lang] fig, ax = plt.subplots() - fig.set_size_inches(10,200) + fig.set_size_inches(10,250) ind = np.arange(n) - width = 0.35 + width = 0.25 opacity = 0.4 - rects1 = ax.barh(ind + width, acc_base, width, alpha=opacity, color='b', label='N-grams with frequency distance') - rects2 = 
ax.barh(ind, acc_targ, width, alpha=opacity, color='r', label='N-grams with probability') - + rects1 = ax.barh(ind + 1.5 * width, acc_base, width, alpha=opacity, color='b', label='N-grams with frequency distance') + rects2 = ax.barh(ind + 0.5 * width, acc_targ1, width, alpha=opacity, color='r', label='N-grams with probability') + rects3 = ax.barh(ind - 0.5 * width, acc_targ2, width, alpha=opacity, color='y', label='Multinomial Naive Bayes') ax.set_xlabel('Accuracy / %') ax.set_yticks(ind + width / 2) ax.set_yticklabels(all_lang) vals = ax.get_xticks() ax.set_xticklabels(['{:3.0f}%'.format(x * 100) for x in vals]) ax.xaxis.tick_top() ax.legend() def autolabel(rects): for rect in rects: width = rect.get_width() ax.text(width + 0.01, rect.get_y() + rect.get_height() / 2., '{0:.1f}%'.format(width * 100), ha='left', va='center') autolabel(rects1) autolabel(rects2) + autolabel(rects3) plt.ylim([-1,n+1]) fig.tight_layout() fig.savefig("comparison.pdf", bbox_inches='tight') if __name__ == '__main__': if len(sys.argv) == 2: heatmap(sys.argv[1]) - elif len(sys.argv) == 3: - compare(sys.argv[1],sys.argv[2]) + elif len(sys.argv) == 4: + compare(sys.argv[1],sys.argv[2],sys.argv[3]) else: print('Please check arguments.') diff --git a/swh/langdetect/cnn.py b/swh/langdetect/cnn.py index 81788c6..8dbafb4 100644 --- a/swh/langdetect/cnn.py +++ b/swh/langdetect/cnn.py @@ -1,247 +1,328 @@ -import os, sys, subprocess, time -import kenlm +import os, sys, subprocess, time, random +import numpy as np +import tensorflow as tf from itertools import islice from pickle import dump, load from collections import Counter from numpy import array from utils.common import tokenizer, file_to_string, find_file, count_files from keras.utils.vis_utils import plot_model from keras.preprocessing.sequence import pad_sequences -from keras.models import Model -from keras.layers import Input, Dense, Flatten, Dropout, Embedding, ThresholdedReLU +from keras.models import Model, Sequential +from keras.layers import Input, Dense, Flatten, Dropout, Embedding, ThresholdedReLU, Activation, Lambda from keras.layers.convolutional import Convolution1D, MaxPooling1D +from keras.layers.normalization import BatchNormalization from keras.layers.merge import concatenate +from keras.utils import np_utils +from keras.optimizers import SGD class CNN: def __init__(self, root): # Root of dataset self._root = root # Root of training set self._root_training_set = os.path.join(self._root, '..', 'training_set') # Root of model folder self._root_model = os.path.join(self._root, '..', 'model_cnn') # Root of arranged dataset self._root_language_dataset = os.path.join(self._root, '..', 'code_by_language') # Path of result self._path_result = os.path.join(self._root, '..', 'result_cnn') def train(self): try: if len(os.listdir(self._root_training_set)) == 0: build_training_set(self._root) try: os.mkdir(self._root_model) except FileExistsError: pass except FileNotFoundError: os.mkdir(self._root_training_set) build_training_set(self._root) languages = [x for x in os.listdir(self._root_training_set) if not x.startswith('.')] try: f = open(os.path.join(self._root, '..', 'model_cnn', 'texts+labels'), 'rb') train_file_with_label = load(f) except FileNotFoundError: train_file_with_label = self._train_file_with_label(languages) with open(os.path.join(self._root, '..', 'model_cnn', 'texts+labels'), 'wb') as f: dump(train_file_with_label, f) - length = 1000 + length = 1024 vocab_size = 256 total_class = len(languages) + batch_size = 128 model = self._get_model(length,
vocab_size, total_class) - model.fit_generator(self._generator(length, total_class), steps_per_epoch=len(train_file_with_label), epochs=10) + model.fit_generator(self._generator(length, total_class, batch_size), steps_per_epoch=len(train_file_with_label)/batch_size, epochs=100) model.save(os.path.join(self._root, '..', 'model_cnn', 'model.h5')) - def _generator(self, length, total_class): + def _generator(self, length, total_class, batch_size=128): + counter = 0 while True: with open(os.path.join(self._root, '..', 'model_cnn', 'texts+labels'), 'rb') as f: train_file_with_label = load(f) + train_file_with_label = [(a,b) for (a,b) in train_file_with_label if b < total_class ] + random.shuffle(train_file_with_label) for pair in train_file_with_label: + if counter == 0: + X = np.empty((0, length)) + Y = np.empty((0, total_class)) path, label = pair tokens = [x + 1 for x in tokenizer(file_to_string(path), 'letter')] - tokens = pad_sequences([tokens], maxlen=length, padding='post') - truth = array([[0 for _ in range(total_class)]]) - truth[0][label] = 1 - yield ([tokens], truth) + X = np.append(X, pad_sequences([tokens], maxlen=length), axis=0) + label = array(np_utils.to_categorical([label], total_class)) + Y = np.append(Y, label, axis=0) + + counter += 1 + if counter == batch_size: + counter = 0 + yield(X,Y) def _train_file_with_label(self, languages): l = [] for language in languages: root_training_set_language = os.path.join(self._root_training_set, language) root_stat_language = os.path.join(self._root_model, language) index_lang = languages.index(language) if os.path.isfile(root_stat_language): continue print(language) for f in [x for x in os.listdir(root_training_set_language) if not x.startswith('.')]: filename = os.path.join(root_training_set_language, f) l.append((filename, index_lang)) return l + ''' + def _get_model(self, length, vocab_size, total_class): + num_filters = [64, 128, 256, 512] + num_classes = total_class + sequence_max_length = length + num_quantized_chars = vocab_size + 1 + embedding_size = 16 + learning_rate = 0.001 + top_k = 3 + + class ConvBlockLayer(object): + """ + two layer ConvNet. 
Apply batch_norm and relu after each layer + """ + + def __init__(self, input_shape, num_filters): + self.model = Sequential() + # first conv layer + self.model.add(Convolution1D(filters=num_filters, kernel_size=3, strides=1, padding="same", input_shape=input_shape)) + self.model.add(BatchNormalization()) + self.model.add(Activation('relu')) + + # second conv layer + self.model.add(Convolution1D(filters=num_filters, kernel_size=3, strides=1, padding="same")) + self.model.add(BatchNormalization()) + self.model.add(Activation('relu')) + + def __call__(self, inputs): + return self.model(inputs) + + def get_conv_shape(conv): + return conv.get_shape().as_list()[1:] + + inputs = Input(shape=(sequence_max_length, ), dtype='int32', name='inputs') + + embedded_sent = Embedding(num_quantized_chars, embedding_size, input_length=sequence_max_length)(inputs) + + # First conv layer + conv = Convolution1D(filters=64, kernel_size=3, strides=2, padding="same")(embedded_sent) + + # Each ConvBlock with one MaxPooling Layer + for i in range(len(num_filters)): + conv = ConvBlockLayer(get_conv_shape(conv), num_filters[i])(conv) + conv = MaxPooling1D(pool_size=3, strides=2, padding="same")(conv) + + # k-max pooling (Finds values and indices of the k largest entries for the last dimension) + def _top_k(x): + x = tf.transpose(x, [0, 2, 1]) + k_max = tf.nn.top_k(x, k=top_k) + return tf.reshape(k_max[0], (-1, num_filters[-1] * top_k)) + k_max = Lambda(_top_k, output_shape=(num_filters[-1] * top_k,))(conv) + + # 3 fully-connected layer with dropout regularization + fc1 = Dropout(0.2)(Dense(128, activation='relu', kernel_initializer='he_normal')(k_max)) + fc2 = Dropout(0.2)(Dense(128, activation='relu', kernel_initializer='he_normal')(fc1)) + fc3 = Dense(num_classes, activation='softmax')(fc2) + + # define optimizer + sgd = SGD(lr=learning_rate, decay=1e-6, momentum=0.9, nesterov=False) + model = Model(inputs=inputs, outputs=fc3) + model.compile(optimizer=sgd, loss='mean_squared_error', metrics=['accuracy']) + + print(model.summary()) + return model +''' + def _get_model(self, length, vocab_size, total_class): input_size = length alphabet_size = vocab_size - embedding_size = 128 + embedding_size = 256 conv_layers = [(256,7,3), (256,7,3), (256,3,-1), (256,3,-1), (256,3,-1), (256,3,3)] threshold = 1e-6 fully_connected_layers = [1024, 1024] - dropout_p = 0.5 + dropout_p = 0.2 optimizer = 'adam' loss = 'categorical_crossentropy' num_of_classes = total_class # Input layer inputs = Input(shape=(input_size,), name='sent_input', dtype='int64') # Embedding layers x = Embedding(alphabet_size + 1, embedding_size, input_length=input_size)(inputs) # Convolution layers for cl in conv_layers: x = Convolution1D(cl[0], cl[1])(x) x = ThresholdedReLU(threshold)(x) if cl[2] != -1: x = MaxPooling1D(cl[2])(x) x = Flatten()(x) # Fully connected layers for fl in fully_connected_layers: x = Dense(fl)(x) x = ThresholdedReLU(threshold)(x) x = Dropout(dropout_p)(x) # Output layer predictions = Dense(num_of_classes, activation='softmax')(x) # Build and compile model model = Model(inputs=inputs, outputs=predictions) model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy']) print(model.summary()) return model def _max_len(self, texts): return max([len(text) for text in texts]) def _vocabulary_size(self, texts): vocabulary = dict(Counter([token for text in texts for token in text])) return len(vocabulary.keys()) def test(self): try: r = open(self._path_result, 'rb') test_result = load(r) r.close() except FileNotFoundError: test_result = {} 
models = self._load_models() for language in [x for x in os.listdir(self._root_model) if not x.startswith('.') and x not in test_result.keys()]: test_result[language] = self.test_class(models, language) with open(self._path_result, 'wb') as f: dump(test_result, f) def _load_models(self): models = {} for model in [model for model in os.listdir(self._root_model) if not model.startswith('.')]: root_model = os.path.join(self._root_model, model) models[model] = kenlm.LanguageModel(root_model) return models def _get_test_set(self, language): root_training_language = os.path.join(self._root_training_set, language) root_language = os.path.join(self._root_language_dataset, language) total = count_files(root_language) training_set = [int(os.path.splitext(x)[0]) for x in os.listdir(root_training_language) if not x.startswith('.')] it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set and os.path.getsize(find_file(root_language, x)) <= 1048576) test_set = list(islice(it, 1000)) if len(test_set) == 0: it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set) test_set = list(islice(it, 1000)) return test_set def _count_size(self, files): size = 0 for f in files: size += os.path.getsize(f) return size def test_class(self, models, language): test_set = self._get_test_set(language) ok = 0 results = [] count = 0 length = len(test_set) for test in test_set: result = self._guess_file_language(models, test) count += 1 print('[{0:4d}/{1:4d}] {2}:{3} '.format(count, length, result[0][1], result[0][0]),end='\r') results.append(result[0]) if result[0][1] == language: ok += 1 total_test = len(test_set) accuracy = ok / len(test_set) print('Tests for {} '.format(language)) print('Total test files : {}'.format(total_test)) print('Correctly classified files : {}'.format(ok)) print('Accuracy : {}%'.format(accuracy * 100)) return (ok, len(test_set), accuracy, results) def speed_benchmark(self): language = [x for x in os.listdir(self._root_model) if not x.startswith('.')][10] models = self._load_models() test_set = self._get_test_set(language) total_size = self._count_size(test_set) print('{} kB in total'.format(total_size / 1024)) t_start = time.perf_counter() self.test_class(models, language) t_end = time.perf_counter() print('{} seconds.'.format(t_end - t_start)) print('{} seconds per kB'.format(((t_end - t_start) / total_size) * 1024)) def _guess_file_language(self, models, filename): tokens = tokenizer(file_to_string(filename), 'letter') text = ' '.join(chr(token) for token in tokens) result = [] for model_key in models.keys(): root_model = os.path.join(self._root_model, model_key) model = models[model_key] score = model.score(text) result.append((score, model_key)) return sorted(result, reverse=True) if __name__ == '__main__': if len(sys.argv) == 3 and sys.argv[1] == '--train': n = CNN(sys.argv[2]) n.train() elif len(sys.argv) == 3 and sys.argv[1] == '--test': n = NGramProb(sys.argv[2]) n.test() elif len(sys.argv) == 3 and sys.argv[1] == '--benchmark': n = NGramProb(sys.argv[2]) n.speed_benchmark() elif len(sys.argv) == 4 and sys.argv[1] == '--test': n = NGramProb(sys.argv[2]) n.test_class(n.load_models(), sys.argv[3]) else: print('Wrong arguments, please check your input.') diff --git a/swh/langdetect/cnn.py b/swh/langdetect/guesslang.py similarity index 52% copy from swh/langdetect/cnn.py copy to swh/langdetect/guesslang.py index 81788c6..1c98909 100644 --- a/swh/langdetect/cnn.py +++ b/swh/langdetect/guesslang.py @@ -1,247 +1,202 @@ import os, sys, 
subprocess, time -import kenlm +import numpy as np +import tensorflow as tf from itertools import islice from pickle import dump, load from collections import Counter from numpy import array from utils.common import tokenizer, file_to_string, find_file, count_files -from keras.utils.vis_utils import plot_model from keras.preprocessing.sequence import pad_sequences -from keras.models import Model -from keras.layers import Input, Dense, Flatten, Dropout, Embedding, ThresholdedReLU -from keras.layers.convolutional import Convolution1D, MaxPooling1D -from keras.layers.merge import concatenate +from keras.utils import np_utils - -class CNN: +class Guesslang: def __init__(self, root): # Root of dataset self._root = root # Root of training set self._root_training_set = os.path.join(self._root, '..', 'training_set') # Root of model folder - self._root_model = os.path.join(self._root, '..', 'model_cnn') + self._root_model = os.path.join(self._root, '..', 'model_guesslang') # Root of arranged dataset self._root_language_dataset = os.path.join(self._root, '..', 'code_by_language') # Path of result - self._path_result = os.path.join(self._root, '..', 'result_cnn') + self._path_result = os.path.join(self._root, '..', 'result_guesslang') + + self.languages = [x for x in os.listdir(self._root_training_set) if not x.startswith('.')] + self.LENGTH = 1000 + self.TOTAL_CLASS = len(self.languages) + + feature_columns = [tf.contrib.layers.real_valued_column('', dimension=self.LENGTH)] + + self._classifer = tf.contrib.learn.DNNLinearCombinedClassifier( + linear_feature_columns=feature_columns, + dnn_feature_columns=feature_columns, + dnn_hidden_units=[256, 64, 16], + n_classes=self.TOTAL_CLASS, + linear_optimizer=tf.train.RMSPropOptimizer(0.05), + dnn_optimizer=tf.train.RMSPropOptimizer(0.05), + model_dir=self._root_model, + fix_global_step_increment_bug=True + ) def train(self): try: if len(os.listdir(self._root_training_set)) == 0: build_training_set(self._root) try: os.mkdir(self._root_model) except FileExistsError: pass except FileNotFoundError: os.mkdir(self._root_training_set) build_training_set(self._root) - languages = [x for x in os.listdir(self._root_training_set) if not x.startswith('.')] try: - f = open(os.path.join(self._root, '..', 'model_cnn', 'texts+labels'), 'rb') + f = open(os.path.join(self._root, '..', 'model_guesslang', 'texts+labels'), 'rb') train_file_with_label = load(f) except FileNotFoundError: - train_file_with_label = self._train_file_with_label(languages) - with open(os.path.join(self._root, '..', 'model_cnn', 'texts+labels'), 'wb') as f: + train_file_with_label = self._train_file_with_label() + with open(os.path.join(self._root, '..', 'model_guesslang', 'texts+labels'), 'wb') as f: dump(train_file_with_label, f) - length = 1000 - vocab_size = 256 - total_class = len(languages) - - model = self._get_model(length, vocab_size, total_class) - model.fit_generator(self._generator(length, total_class), steps_per_epoch=len(train_file_with_label), epochs=10) - model.save(os.path.join(self._root, '..', 'model_cnn', 'model.h5')) - - def _generator(self, length, total_class): - while True: - with open(os.path.join(self._root, '..', 'model_cnn', 'texts+labels'), 'rb') as f: - train_file_with_label = load(f) - for pair in train_file_with_label: - path, label = pair - tokens = [x + 1 for x in tokenizer(file_to_string(path), 'letter')] - tokens = pad_sequences([tokens], maxlen=length, padding='post') - truth = array([[0 for _ in range(total_class)]]) - truth[0][label] = 1 - yield ([tokens], truth) 
+ for index in range(self.TOTAL_CLASS): + self._classifer.partial_fit(input_fn=lambda:self._generator(self.LENGTH, self.TOTAL_CLASS, index),steps=500) + + def _generator(self, length, total_class, index): + print("Language: {}".format(index)) + with open(os.path.join(self._root, '..', 'model_guesslang', 'texts+labels'), 'rb') as f: + train_file_with_label = load(f) + X = np.empty((0, length)) + Y = np.empty((0, 1), dtype=int) + for path, label in train_file_with_label: + if label == index: + X = np.append(X, self._file_to_x(path), axis=0) + l = array([label], dtype=int) + Y = np.append(Y, l) + return tf.convert_to_tensor(X), tf.convert_to_tensor(Y) + + def _file_to_x(self, filename): + wrapper = (lambda x: x + 1) + tokens = [wrapper(x) for x in tokenizer(file_to_string(filename), 'letter')] + return pad_sequences([tokens], maxlen=self.LENGTH) - def _train_file_with_label(self, languages): + def _train_file_with_label(self): l = [] - for language in languages: + for language in self.languages: root_training_set_language = os.path.join(self._root_training_set, language) root_stat_language = os.path.join(self._root_model, language) - index_lang = languages.index(language) + index_lang = self.languages.index(language) if os.path.isfile(root_stat_language): continue print(language) for f in [x for x in os.listdir(root_training_set_language) if not x.startswith('.')]: filename = os.path.join(root_training_set_language, f) l.append((filename, index_lang)) return l - - def _get_model(self, length, vocab_size, total_class): - - input_size = length - alphabet_size = vocab_size - embedding_size = 128 - conv_layers = [(256,7,3), (256,7,3), (256,3,-1), (256,3,-1), (256,3,-1), (256,3,3)] - threshold = 1e-6 - fully_connected_layers = [1024, 1024] - dropout_p = 0.5 - optimizer = 'adam' - loss = 'categorical_crossentropy' - num_of_classes = total_class - - # Input layer - inputs = Input(shape=(input_size,), name='sent_input', dtype='int64') - # Embedding layers - x = Embedding(alphabet_size + 1, embedding_size, input_length=input_size)(inputs) - # Convolution layers - for cl in conv_layers: - x = Convolution1D(cl[0], cl[1])(x) - x = ThresholdedReLU(threshold)(x) - if cl[2] != -1: - x = MaxPooling1D(cl[2])(x) - x = Flatten()(x) - # Fully connected layers - for fl in fully_connected_layers: - x = Dense(fl)(x) - x = ThresholdedReLU(threshold)(x) - x = Dropout(dropout_p)(x) - # Output layer - predictions = Dense(num_of_classes, activation='softmax')(x) - # Build and compile model - model = Model(inputs=inputs, outputs=predictions) - model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy']) - - print(model.summary()) - - return model - def _max_len(self, texts): return max([len(text) for text in texts]) def _vocabulary_size(self, texts): vocabulary = dict(Counter([token for text in texts for token in text])) return len(vocabulary.keys()) def test(self): try: r = open(self._path_result, 'rb') test_result = load(r) r.close() except FileNotFoundError: test_result = {} - models = self._load_models() - for language in [x for x in os.listdir(self._root_model) if not x.startswith('.') and x not in test_result.keys()]: - test_result[language] = self.test_class(models, language) + for language in [x for x in os.listdir(self._root_training_set) if not x.startswith('.') and x not in test_result.keys()]: + test_result[language] = self.test_class(language) with open(self._path_result, 'wb') as f: dump(test_result, f) - - def _load_models(self): - models = {} - - for model in [model - for model in 
os.listdir(self._root_model) - if not model.startswith('.')]: - root_model = os.path.join(self._root_model, model) - models[model] = kenlm.LanguageModel(root_model) - return models def _get_test_set(self, language): root_training_language = os.path.join(self._root_training_set, language) root_language = os.path.join(self._root_language_dataset, language) total = count_files(root_language) training_set = [int(os.path.splitext(x)[0]) for x in os.listdir(root_training_language) if not x.startswith('.')] it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set and os.path.getsize(find_file(root_language, x)) <= 1048576) test_set = list(islice(it, 1000)) if len(test_set) == 0: it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set) test_set = list(islice(it, 1000)) return test_set def _count_size(self, files): size = 0 for f in files: size += os.path.getsize(f) return size - def test_class(self, models, language): + def test_class(self, language): test_set = self._get_test_set(language) ok = 0 results = [] count = 0 length = len(test_set) for test in test_set: - result = self._guess_file_language(models, test) + result = self._guess_file_language(test) count += 1 - print('[{0:4d}/{1:4d}] {2}:{3} '.format(count, length, result[0][1], result[0][0]),end='\r') - results.append(result[0]) - if result[0][1] == language: + print('[{0:4d}/{1:4d}] {2} '.format(count, length, result),end='\r') + results.append(result) + if result == language: ok += 1 total_test = len(test_set) accuracy = ok / len(test_set) print('Tests for {} '.format(language)) print('Total test files : {}'.format(total_test)) print('Correctly classified files : {}'.format(ok)) print('Accuracy : {}%'.format(accuracy * 100)) return (ok, len(test_set), accuracy, results) def speed_benchmark(self): language = [x for x in os.listdir(self._root_model) if not x.startswith('.')][10] models = self._load_models() test_set = self._get_test_set(language) total_size = self._count_size(test_set) print('{} kB in total'.format(total_size / 1024)) t_start = time.perf_counter() self.test_class(models, language) t_end = time.perf_counter() print('{} seconds.'.format(t_end - t_start)) print('{} seconds per kB'.format(((t_end - t_start) / total_size) * 1024)) - def _guess_file_language(self, models, filename): - tokens = tokenizer(file_to_string(filename), 'letter') - text = ' '.join(chr(token) for token in tokens) + def _guess_file_language(self, filename): + x = self._file_to_x(filename) - result = [] - - for model_key in models.keys(): - root_model = os.path.join(self._root_model, model_key) - model = models[model_key] - score = model.score(text) - result.append((score, model_key)) - return sorted(result, reverse=True) + result = list(self._classifer.predict(x=x))[0] + return self.languages[result] if __name__ == '__main__': if len(sys.argv) == 3 and sys.argv[1] == '--train': - n = CNN(sys.argv[2]) + n = Guesslang(sys.argv[2]) n.train() elif len(sys.argv) == 3 and sys.argv[1] == '--test': - n = NGramProb(sys.argv[2]) + n = Guesslang(sys.argv[2]) n.test() elif len(sys.argv) == 3 and sys.argv[1] == '--benchmark': n = NGramProb(sys.argv[2]) n.speed_benchmark() elif len(sys.argv) == 4 and sys.argv[1] == '--test': n = NGramProb(sys.argv[2]) n.test_class(n.load_models(), sys.argv[3]) else: print('Wrong arguments, please check your input.') diff --git a/swh/langdetect/naivebayesian.py b/swh/langdetect/naivebayesian.py index 495dd86..86f1114 100644 --- a/swh/langdetect/naivebayesian.py +++ 
b/swh/langdetect/naivebayesian.py @@ -1,221 +1,223 @@ """ Naive Bayesian """ import os, sys, operator, nltk, random, time import numpy as np from itertools import islice from pickle import dump, load from utils.common import tokenizer, file_to_string, find_file, count_files from utils.training import build_training_set from nltk.util import ngrams from collections import Counter from sklearn.naive_bayes import MultinomialNB from sklearn.feature_extraction.text import HashingVectorizer, TfidfTransformer from sklearn.externals import joblib class NaiveBayesian: def __init__(self, root): # Root of dataset self._root = root # Root of training set self._root_training_set = os.path.join(self._root, '..', 'training_set') # Root of model folder self._root_model = os.path.join(self._root, '..', 'model_bayesian') # Root of arranged dataset self._root_language_dataset = os.path.join(self._root, '..', 'code_by_language') # Path of result self._path_result = os.path.join(self._root, '..', 'result_bayesian') self._languages = [x for x in os.listdir(self._root_training_set) if not x.startswith('.')] def train(self): ''' train () generates and stores counted n-grams in '_root_model' folder ''' try: if len(os.listdir(self._root_training_set)) == 0: build_training_set(self._root) try: os.mkdir(self._root_model) except FileExistsError: pass except FileNotFoundError: os.mkdir(self._root_training_set) build_training_set(self._root) ''' Calculate frequencies of generated n-grams then store them into a sorted list of (ngram, count) ''' clf = MultinomialNB() cv = HashingVectorizer(analyzer='char', ngram_range=(1, 3), n_features=2**17, alternate_sign=False) indices = list(range(len(self._languages))) for language in self._languages: root_training_set_language = os.path.join(self._root_training_set, language) root_stat_language = os.path.join(self._root_model, 'classifier') index_lang = self._languages.index(language) texts = [] for f in os.listdir(root_training_set_language): if not f.startswith('.'): print(f) filename = os.path.join(root_training_set_language, f) tokens = tokenizer(file_to_string(filename), 'letter') text = ''.join([chr(token) for token in tokens]) texts.append(text) counts = cv.fit_transform(texts) tf = TfidfTransformer(use_idf=False).fit(counts) normalised = tf.transform(counts) clf.partial_fit(normalised, np.array([index_lang for _ in texts]), indices) with open(root_stat_language + '.clf', 'wb') as f: joblib.dump(clf, f) with open(root_stat_language + '.hv', 'wb') as f: joblib.dump(cv, f) def test(self): try: r = open(self._path_result, 'rb') test_result = load(r) r.close() except FileNotFoundError: test_result = {} - with open(os.path.join(self._root_model, 'classifier'), 'rb') as f: - clf, cv = load(f) + with open(os.path.join(self._root_model, 'classifier.clf'), 'rb') as f: + clf = joblib.load(f) + with open(os.path.join(self._root_model, 'classifier.hv'), 'rb') as f: + cv = joblib.load(f) for language in [x for x in os.listdir(self._root_training_set) if not x.startswith('.') and x not in test_result.keys()]: test_result[language] = self.test_class((clf, cv), language) with open(self._path_result, 'wb') as f: dump(test_result, f) def speed_benchmark(self): language = [x for x in os.listdir(self._root_training_set) if not x.startswith('.')][10] models = self._load_models() test_set = self._get_test_set(language) total_size = self._count_size(test_set) print('{} kB in total'.format(total_size / 1024)) t_start = time.perf_counter() self.test_class(models, language) t_end = 
time.perf_counter() print('{} seconds.'.format(t_end - t_start)) print('{} seconds per kB'.format(((t_end - t_start) / total_size) * 1024)) def _get_test_set(self, language): root_training_language = os.path.join(self._root_training_set, language) root_language = os.path.join(self._root_language_dataset, language) total = count_files(root_language) training_set = [int(os.path.splitext(x)[0]) for x in os.listdir(root_training_language) if not x.startswith('.')] it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set and os.path.getsize(find_file(root_language, x)) <= 1048576) test_set = list(islice(it, 1000)) if len(test_set) == 0: it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set) test_set = list(islice(it, 1000)) return test_set def _count_size(self, files): size = 0 for f in files: size += os.path.getsize(f) return size def test_class(self, clf, language): test_set = self._get_test_set(language) index_lang = self._languages.index(language) ok = 0 results = [] count = 0 length = len(test_set) for test in test_set: result = self._guess_file_language(clf, test) count += 1 print('[{0:4d}/{1:4d}] {2}:{3} '.format(count, length, result[0][1], result[0][0]),end='\r') results.append(result[0]) if result[0][1] == language: ok += 1 total_test = len(test_set) accuracy = ok / len(test_set) print('Tests for {} '.format(language)) print('Total test files : {}'.format(total_test)) print('Correctly classified files : {}'.format(ok)) print('Accuracy : {}%'.format(accuracy * 100)) return (ok, len(test_set), accuracy, results) def test_single(self, filename): self._guess_file_language(clf, filename) def _guess_file_language(self, cc, filename): clf = cc[0] cv = cc[1] tokens = tokenizer(file_to_string(filename), 'letter') text = ''.join([chr(token) for token in tokens]) counts = cv.fit_transform([text]) tf = TfidfTransformer(use_idf=False).fit(counts) normalised = tf.transform(counts) result = clf.predict_log_proba(normalised) result = [(val, self._languages[idx]) for idx, val in enumerate(result[0])] return sorted(result, reverse=True) def _sort_by_value(self, statistics): statistics_sorted = sorted(statistics.items(), key = operator.itemgetter(1), reverse = True)[:500] return statistics_sorted def _distance(self, model_profile, test_profile): distance = 0 maximum = len(test_profile) for test_ngram in test_profile.keys(): test_rank = test_profile.get(test_ngram) model_rank = model_profile.get(test_ngram, maximum) d = abs(test_rank - model_rank) distance += d return distance ''' def _prob(model, trigrams): print('Checking {} model ...'.format(model)) with open(model, 'rb') as f: kneser_ney = load(f) result = 1 for trigram in trigrams: prob = kneser_ney.prob(trigram) result = result * prob return result ''' if __name__ == '__main__': if len(sys.argv) == 3 and sys.argv[1] == '--train': n = NaiveBayesian(sys.argv[2]) n.train() elif len(sys.argv) == 3 and sys.argv[1] == '--test': n = NaiveBayesian(sys.argv[2]) n.test() elif len(sys.argv) == 3 and sys.argv[1] == '--benchmark': n = NaiveBayesian(sys.argv[2]) n.speed_benchmark() elif len(sys.argv) == 4 and sys.argv[1] == '--test': n = NaiveBayesian(sys.argv[2]) n.test_class(n.load_models(), sys.argv[3]) else: print('Wrong arguments, please check your input.')
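Reviewer note on the batching change in `CNN._generator`: the diff replaces the old one-sample-per-step generator with one that shuffles the `(path, label)` pairs each epoch and accumulates fixed-size batches before yielding. Below is a minimal standalone sketch of that batching pattern, numpy only; the `batch_generator` name, the toy `samples` list, and the hand-written padding/one-hot code are illustrative stand-ins for the repository's `tokenizer`/`file_to_string` helpers and Keras' `pad_sequences`/`np_utils.to_categorical`, so treat it as a sketch of the idea rather than the exact implementation.

```python
# Sketch of the fixed-size batch generator added in CNN._generator (assumptions:
# `samples` is a list of (token_list, label) pairs; in the real code the tokens come
# from tokenizer(file_to_string(path), 'letter') shifted by +1 so that 0 is reserved
# for padding, and pad_sequences / np_utils.to_categorical do what is written out
# by hand here).
import random
import numpy as np

def batch_generator(samples, length=1024, total_class=10, batch_size=128):
    """Yield (X, Y): X is (batch_size, length) int sequences padded on the left,
    like pad_sequences' default; Y is (batch_size, total_class) one-hot labels."""
    while True:
        random.shuffle(samples)                    # reshuffle on every pass
        X, Y = [], []
        for tokens, label in samples:
            seq = np.zeros(length, dtype=np.int64) # 0 == padding value
            kept = tokens[-length:]                # keep the trailing `length` tokens
            if kept:
                seq[-len(kept):] = kept
            one_hot = np.zeros(total_class)
            one_hot[label] = 1.0
            X.append(seq)
            Y.append(one_hot)
            if len(X) == batch_size:               # emit only full batches
                yield np.stack(X), np.stack(Y)
                X, Y = [], []

if __name__ == '__main__':
    # Toy usage: fake "files" of byte tokens with labels 0 and 1.
    toy = [([104, 101, 108, 108, 111], 0), ([119, 111, 114, 108, 100], 1)] * 64
    X, Y = next(batch_generator(toy, length=16, total_class=2, batch_size=4))
    print(X.shape, Y.shape)  # (4, 16) (4, 2)
```

One behavioural difference worth noting: the sketch drops any trailing partial batch at the end of a pass, whereas the generator in the diff keeps its `counter` across epochs, so a partial batch simply carries over into the next shuffle.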