
diff --git a/swh/langdetect/__init__.py b/swh/langdetect/__init__.py
index 5f8841b..947d176 100644
--- a/swh/langdetect/__init__.py
+++ b/swh/langdetect/__init__.py
@@ -1,5 +1,12 @@
"""
-Detectlang detects the programming language of source code file.
+Langdetect detects the programming language of a source code file.
"""
+from .cnn import CNN
+
+__cnn_classifier = CNN(None, 4096, None)
+
+def classify(path):
+ return __cnn_classifier.classify(path)
+
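
For context, this new package-level entry point wraps CNN.classify from cnn.py below: it loads the pretrained model shipped in static_data/model.h5 at import time and expects a path to a gzip-compressed source file. A minimal usage sketch (the file name is a placeholder):

from swh.langdetect import classify

# classify() reads a gzip-compressed source file, prints the detected MIME
# type and the best guess, and returns a (language, score) pair.
language, score = classify('example.py.gz')   # placeholder path
print(language, score)
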
diff --git a/swh/langdetect/cnn.py b/swh/langdetect/cnn.py
index ad6298a..42faba4 100644
--- a/swh/langdetect/cnn.py
+++ b/swh/langdetect/cnn.py
@@ -1,346 +1,346 @@
import os
import sys
import subprocess
import time
import random
import csv
import numpy as np
import warnings
import gzip
with warnings.catch_warnings():
warnings.simplefilter("ignore")
import tensorflow as tf
import json
import argparse
import magic
from ast import literal_eval
from pickle import dump
from pickle import load
from numpy import array
from .utils.common import Tokenizer
from .utils.common import file_to_string
from keras.preprocessing.sequence import pad_sequences
from keras.callbacks import EarlyStopping
from keras.models import Model
from keras.models import Sequential
from keras.models import load_model
from keras.layers import Input
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Dropout, AlphaDropout
from keras.layers import ThresholdedReLU
from keras.layers import Activation
from keras.layers import Lambda
from keras.layers import Embedding
from keras.layers import Concatenate, GlobalMaxPooling1D
from keras.layers.convolutional import Convolution1D, MaxPooling1D
from keras.layers.normalization import BatchNormalization
from keras.utils import np_utils
from keras.optimizers import SGD
#from pyspark import SparkContext, SparkConf
#from elephas.spark_model import SparkModel # pip install flask
#from elephas import optimizers as elephas_optimizers
#from elephas.utils.rdd_utils import to_labeled_point
csv.field_size_limit(sys.maxsize)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
from keras import backend as K
#K.set_session(K.tf.Session(config=K.tf.ConfigProto(intra_op_parallelism_threads=1, inter_op_parallelism_threads=1)))
def main():
parser = argparse.ArgumentParser(description='Training and test tool of character-level ConvNet text categorisation.')
subparsers = parser.add_subparsers(dest='sub_command')
parser_train = subparsers.add_parser('train', help='Training on the dataset, dataset must be a *.csv file. A model will be created in the same directory.')
parser_train.add_argument('-s', '--spark', type=bool, help='Training on cluster.', dest='train_spark')
parser_train.add_argument('train_path', metavar='PATH', type=str, help='Path of the training dataset.')
parser_train.add_argument('-ms', '--maxsize', metavar='SIZE', dest='train_maxsize', type=int, help='Set maximum input size of ConvNet, default 1024.')
parser_train.add_argument('-e', '--epochs', metavar='N', dest='train_epochs', type=int, help='Number of training epochs (iterations), default 50.')
parser_test = subparsers.add_parser('test', help='Test on the dataset, dataset must be a directory with *.csv dataset named by corresponding language.')
parser_test.add_argument('test_root', metavar='ROOT', type=str, help='Root of the test dataset.')
parser_clf = subparsers.add_parser('clf', help='Test a file.')
parser_clf.add_argument('clf_path', metavar='PATH', type=str, help='Path of test file.')
if len(sys.argv[1:]) == 0:
parser.print_help()
parser.exit()
args = parser.parse_args()
- maxsize = 2048
+ maxsize = 4096
epochs = 15
if args.sub_command == 'train' :
if args.train_maxsize:
maxsize = args.train_maxsize
if args.train_epochs:
epochs = args.train_epochs
n = CNN(args.train_path, maxsize=maxsize, epochs=epochs)
if args.train_spark:
n.train_on_cluster()
else:
n.train()
elif args.sub_command == 'test':
n = CNN(args.test_root, maxsize=maxsize, epochs=epochs)
n.test()
elif args.sub_command == 'clf':
n = CNN(None, maxsize, None)
n.classify(args.clf_path)
else:
parser.parse_args('-h')
class CNN:
def __init__(self, path, maxsize, epochs):
if path != None:
self._path = path
# Root of model folder
self._root_model = os.path.join(os.path.dirname(path), 'model_cnn')
try:
os.mkdir(self._root_model)
except:
pass
# Path of result
self._path_result = os.path.join(os.path.dirname(path), 'result_cnn')
self._path_test_csv = path
dir_path = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(dir_path, 'static_data', 'languages.json'), 'r') as f:
self._languages = json.load(f)
self._input_size = maxsize
self._vocab_size = 256
self._num_of_classes = len(self._languages)
self._batch_size = 64
self._epochs = epochs
self._model = None
if path == None and epochs == None:
self._model = load_model(os.path.join(dir_path, 'static_data', 'model.h5'))
def file_len(self, fname):
with open(fname) as f:
count = 0
for l in f:
count += 1
return count
def train(self):
self._get_model()
earlystop = EarlyStopping(monitor='loss', min_delta=0, patience=3, verbose=0, mode='auto')
callbacks = [earlystop]
self._model.fit_generator(
self._generator(self._input_size,
self._num_of_classes,
self._batch_size),
steps_per_epoch=self.file_len(self._path) / self._batch_size,
epochs=self._epochs,
callbacks=callbacks)
self._model.save(os.path.join(self._root_model, 'model.h5'))
def _generator(self, length, total_class, batch_size=128):
counter = 0
while True:
with open(self._path, newline='') as csvfile:
r = csv.reader(csvfile, delimiter=' ', quotechar='|')
for pair in r:
if counter == 0:
X = np.empty((0, length))
Y = np.empty((0, total_class))
label, string = pair
label = int(label)
string = literal_eval(string)
if len(string) > self._input_size:
len_s = len(string)
stop_1 = int(len_s / 3)
stop_2 = int(len_s * 2 / 3)
part = int(self._input_size / 4)
half_part = int(part / 2)
string = string[:part] + string[stop_1 - half_part:stop_1 + half_part] + string[stop_2 - half_part:stop_2 + half_part] + string[-part:]
tokens = [x + 1 for x in Tokenizer.tokenize(string, 'letter')]
X = np.append(X, pad_sequences([tokens], maxlen=length), axis=0)
label = array(np_utils.to_categorical([label], total_class))
Y = np.append(Y, label, axis=0)
counter += 1
if counter == batch_size:
counter = 0
yield(X,Y)
def _get_model_zhang(self):
input_size = self._input_size
alphabet_size = self._vocab_size
embedding_size = 128
conv_layers = [(256,7,3), (256,7,3), (256,3,-1), (256,3,-1), (256,3,-1), (256,3,3)]
threshold = 1e-6
fully_connected_layers = [1024, 1024]
dropout_p = 0.2
optimizer = 'adam'
loss = 'categorical_crossentropy'
num_of_classes = self._num_of_classes
# Input layer
inputs = Input(shape=(input_size,), name='sent_input', dtype='int64')
# Embedding layers
x = Embedding(alphabet_size + 1, embedding_size, input_length=input_size)(inputs)
# Convolution layers
for cl in conv_layers:
x = Convolution1D(cl[0], cl[1])(x)
x = ThresholdedReLU(threshold)(x)
if cl[2] != -1:
x = MaxPooling1D(cl[2])(x)
x = Flatten()(x)
# Fully connected layers
for fl in fully_connected_layers:
x = Dense(fl)(x)
x = ThresholdedReLU(threshold)(x)
x = Dropout(dropout_p)(x)
# Output layer
predictions = Dense(num_of_classes, activation='softmax')(x)
# Build and compile model
model = Model(inputs=inputs, outputs=predictions)
model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
print(model.summary())
self._model = model
def _get_model(self):
input_size = self._input_size
alphabet_size = self._vocab_size
embedding_size = 64
conv_layers = [(256,10), (256,7), (256,5), (256,3)]
threshold = 1e-6
fully_connected_layers = [1024, 1024]
dropout_p = 0.1
optimizer = 'adam'
loss = 'categorical_crossentropy'
num_of_classes = self._num_of_classes
# Input layer
inputs = Input(shape=(input_size,), name='sent_input', dtype='int64')
# Embedding layers
x = Embedding(alphabet_size + 1, embedding_size, input_length=input_size)(inputs)
convolution_output = []
# Convolution layers
for num_filters, filter_width in conv_layers:
conv = Convolution1D(filters=num_filters,
kernel_size=filter_width,
activation='tanh',
name='Conv1D_{}_{}'.format(num_filters, filter_width))(x)
pool = GlobalMaxPooling1D(name='MaxPoolingOverTime_{}_{}'.format(num_filters, filter_width))(conv)
convolution_output.append(pool)
x = Concatenate()(convolution_output)
# Fully connected layers
for fl in fully_connected_layers:
x = Dense(fl, activation='selu', kernel_initializer='lecun_normal')(x)
x = Dropout(dropout_p)(x)
# Output layer
predictions = Dense(num_of_classes, activation='softmax')(x)
# Build and compile model
model = Model(inputs=inputs, outputs=predictions)
model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
print(model.summary())
self._model = model
def _max_len(self, texts):
return max([len(text) for text in texts])
def _load_model(self):
self._model = load_model(os.path.join(self._root_model, 'model.h5'))
def test(self):
csv.field_size_limit(sys.maxsize)
try:
r = open(self._path_result, 'rb')
test_result = load(r)
r.close()
except FileNotFoundError:
test_result = {}
self._load_model()
for language in [x for x in self._languages if x not in test_result.keys()]:
test_result[language] = self.test_class(language)
with open(self._path_result, 'wb') as f:
dump(test_result, f)
def _count_size(self, files):
size = 0
for f in files:
size += os.path.getsize(f)
return size
def test_class(self, language):
ok = 0
results = []
count = 0
total_test = self.file_len(os.path.join(self._path_test_csv, language + '.csv'))
with open(os.path.join(self._path_test_csv, language + '.csv'), newline='') as csvfile:
r = csv.reader(csvfile, delimiter=' ', quotechar='|')
for pair in r:
label, string = pair
label = int(label)
string = literal_eval(string)
if len(string) > self._input_size:
length = len(string)
stop_1 = int(length / 3)
stop_2 = int(length * 2 / 3)
part = int(self._input_size / 4)
half_part = int(part / 2)
string = string[:part] + string[stop_1 - half_part:stop_1 + half_part] + string[stop_2 - half_part:stop_2 + half_part] + string[-part:]
tokens = [x + 1 for x in Tokenizer.tokenize(string, 'letter')]
result = self._guess_file_language(tokens)
count += 1
- print('[{0:4d}/{1:4d}] {2}:{3} '.format(count, total_test, result[0][1], result[0][0]),end='\r')
+ print('[{0:4d}/{1:4d}] {2}:\t{3:.3f} '.format(count, total_test, result[0][1], result[0][0]),end='\r')
results.append(result[0])
if result[0][1] == language:
ok += 1
accuracy = ok / total_test
print('Tests for {} '.format(language))
print('Total test files : {}'.format(total_test))
print('Correctly classified files : {}'.format(ok))
print('Accuracy : {}%'.format(accuracy * 100))
return (ok, total_test, accuracy, results)
def speed_benchmark(self):
language = self._languages[10]
self._model = load_model(os.path.join(self._root_model, 'model.h5'))
test_set = self._get_test_set(language)
total_size = self._count_size(test_set)
print('{} kB in total'.format(total_size / 1024))
t_start = time.perf_counter()
self.test_class(language)
t_end = time.perf_counter()
print('{} seconds.'.format(t_end - t_start))
print('{} seconds per KiB'.format(((t_end - t_start) / total_size) * 1024))
def _guess_file_language(self, tokens):
X = pad_sequences([tokens], maxlen=self._input_size)
result = list(self._model.predict(X))[0]
result = [(s, self._languages[i]) for i, s in enumerate(result)]
return sorted(result, reverse=True)
def classify(self, path):
with gzip.open(path, 'rb') as f:
string = f.read()
a = magic.from_buffer(string, mime=True)
print(a)
tokens = [x + 1 for x in Tokenizer.tokenize(string, 'letter')]
res = self._guess_file_language(tokens)
print('Filename :\t{}\nLanguage :\t{}\nValue :\t{}'.format(path, res[0][1],res[0][0]))
return (res[0][1], res[0][0])
if __name__ == '__main__':
main()
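
The character-level model above caps its input at maxsize bytes. Over-long files are not simply truncated: _generator and test_class keep the first quarter, two windows centred on the one-third and two-thirds points, and the last quarter, then tokenise byte by byte with every value shifted by +1 so that 0 stays reserved for padding. A small self-contained sketch of that sampling step (function name and sample file are illustrative only):

def sample_bytes(data, input_size=4096):
    """Keep head, two middle windows and tail of an over-long byte string,
    mirroring the slicing done in CNN._generator and CNN.test_class."""
    if len(data) <= input_size:
        return data
    part = input_size // 4
    half_part = part // 2
    stop_1 = len(data) // 3
    stop_2 = len(data) * 2 // 3
    return (data[:part]
            + data[stop_1 - half_part:stop_1 + half_part]
            + data[stop_2 - half_part:stop_2 + half_part]
            + data[-part:])

with open('example.c', 'rb') as f:            # placeholder input file
    tokens = [b + 1 for b in sample_bytes(f.read())]
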
diff --git a/swh/langdetect/cnn_w.py b/swh/langdetect/cnn_w.py
index b622abb..9f44fe7 100644
--- a/swh/langdetect/cnn_w.py
+++ b/swh/langdetect/cnn_w.py
@@ -1,300 +1,294 @@
import os
import sys
import subprocess
import time
import random
import csv
import numpy as np
import warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore")
import tensorflow as tf
import json
import argparse
from ast import literal_eval
from pickle import dump
from pickle import load
from numpy import array
from .utils.common import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.callbacks import EarlyStopping
from keras.models import Model
from keras.models import Sequential
from keras.models import load_model
from keras.layers import Input
from keras.layers import Dense
from keras.layers import Flatten
-from keras.layers import Merge
from keras.layers import Dropout
from keras.layers import ThresholdedReLU
from keras.layers import Activation
from keras.layers import Lambda
from keras.layers import Embedding
+from keras.layers import GlobalMaxPooling1D
from keras.layers.convolutional import Convolution1D
from keras.layers.convolutional import MaxPooling1D
from keras.layers.normalization import BatchNormalization
from keras.layers import Concatenate
from keras.utils import np_utils
from keras.optimizers import SGD
from collections import Counter
csv.field_size_limit(sys.maxsize)
from keras import backend as K
-K.set_session(K.tf.Session(config=K.tf.ConfigProto(intra_op_parallelism_threads=1, inter_op_parallelism_threads=1)))
+# K.set_session(K.tf.Session(config=K.tf.ConfigProto(intra_op_parallelism_threads=1, inter_op_parallelism_threads=1)))
def main():
parser = argparse.ArgumentParser(description='Training and test tool of character-level ConvNet text categorisation.')
subparsers = parser.add_subparsers(dest='sub_command')
parser_train = subparsers.add_parser('train', help='Training on the dataset, dataset must be a *.csv file. A model will be created in the same directory.')
parser_train.add_argument('train_path', metavar='PATH', type=str, help='Path of the training dataset.')
parser_train.add_argument('-ms', '--maxsize', metavar='SIZE', dest='train_maxsize', type=int, help='Set maximum input size of ConvNet, default 1024.')
parser_train.add_argument('-e', '--epochs', metavar='N', dest='train_epochs', type=int, help='Number of training epochs (iterations), default 50.')
parser_test = subparsers.add_parser('test', help='Test on the dataset, dataset must be a directory with *.csv dataset named by corresponding language.')
parser_test.add_argument('test_root', metavar='ROOT', type=str, help='Root of the test dataset.')
if len(sys.argv[1:]) == 0:
parser.print_help()
parser.exit()
args = parser.parse_args()
if args.sub_command == "train":
if args.train_maxsize:
if args.train_epochs:
n = CNNword(args.train_path, maxsize=args.train_maxsize, epochs=args.train_epochs)
n.train()
else:
n = CNNword(args.train_path, maxsize=args.train_maxsize)
n.train()
else:
if args.train_epochs:
n = CNNword(args.train_path, epochs=args.train_epochs)
n.train()
else:
n = CNNword(args.train_path)
n.train()
elif args.sub_command == "test":
n = CNNword(args.test_root)
print(args.test_root)
n.test()
else:
parser.parse_args('-h')
class CNNword:
- def __init__(self, path, maxsize=1024, epochs=30):
+ def __init__(self, path, maxsize=400, epochs=30):
self._path = path
# Root of model folder
self._root_model = os.path.join(os.path.dirname(path), 'model_cnn_word')
try:
os.mkdir(self._root_model)
except:
pass
# Path of result
self._path_result = os.path.join(os.path.dirname(path), 'result_cnn_word')
dir_path = os.path.dirname(os.path.abspath(__file__))
- with open(os.path.join(dir_path, 'static_data', 'languages_less.json'), 'r') as f:
+ with open(os.path.join(dir_path, 'static_data', 'languages.json'), 'r') as f:
self._languages = json.load(f)
self._path_test_csv = path
self._path_vocab = os.path.join(self._root_model, 'vocab')
self._input_size = maxsize
self._vocab_size = 15001
self._num_of_classes = len(self._languages)
self._batch_size = 64
self._epochs = epochs
if not os.path.isfile(self._path_vocab):
self._learn_vocab(self._input_size, self._num_of_classes)
with open(self._path_vocab, 'rb') as f:
c = load(f)
l = c.most_common(15000)
- print(l)
self._indexer = dict((v[0], i + 1) for i, v in enumerate(l))
self._oov_index = len(self._indexer) + 1
def file_len(self, fname):
with open(fname) as f:
count = 0
for l in f:
count += 1
return count
def train(self):
model = self._get_model()
earlystop = EarlyStopping(monitor='loss', min_delta=0, patience=3, verbose=0, mode='auto')
callbacks = [earlystop]
model.fit_generator(
self._generator(self._input_size, self._num_of_classes, self._batch_size),
steps_per_epoch=self.file_len(self._path) / self._batch_size, epochs=self._epochs, callbacks=callbacks)
model.save(os.path.join(self._root_model, 'model.h5'))
def _learn_vocab(self, length, total_class):
c = Counter()
with open(self._path, newline='') as csvfile:
r = csv.reader(csvfile, delimiter=' ', quotechar='|')
for pair in r:
label, string = pair
label = int(label)
print(label, end='\r')
string = literal_eval(string)
tokens = Tokenizer.tokenize(string, 'word')
c.update(tokens)
with open(self._path_vocab, 'wb') as f:
dump(c, f)
def _generator(self, length, total_class, batch_size=64):
counter = 0
while True:
with open(self._path, newline='') as csvfile:
r = csv.reader(csvfile, delimiter=' ', quotechar='|')
for pair in r:
if counter == 0:
X = np.empty((0, length))
Y = np.empty((0, total_class))
label, string = pair
label = int(label)
string = literal_eval(string)
tokens = [self._indexer.get(x, self._oov_index) for x in Tokenizer.tokenize(string, 'word')]
X = np.append(X, pad_sequences([tokens], maxlen=length), axis=0)
label = array(np_utils.to_categorical([label], total_class))
Y = np.append(Y, label, axis=0)
counter += 1
if counter == batch_size:
counter = 0
yield(X,Y)
def _get_model(self):
input_size = self._input_size
vocab_size = self._vocab_size
- embedding_size = 128
+ embedding_size = 50
optimizer = 'adam'
loss = 'categorical_crossentropy'
num_of_classes = self._num_of_classes
embedding_layer = Embedding(vocab_size + 1,
embedding_size,
input_length=input_size,
)
-
- # applying a more complex convolutional approach
+
convs = []
- filter_sizes = [3,4,5]
+ filter_sizes = [3,4,5,6,7]
sequence_input = Input(shape=(input_size,), dtype='int64')
embedded_sequences = embedding_layer(sequence_input)
+ z = Dropout(0.5)(embedded_sequences)
for fsz in filter_sizes:
- l_conv = Convolution1D(filters=10, kernel_size=fsz, activation='relu')(embedded_sequences)
- l_pool = MaxPooling1D(3)(l_conv)
- convs.append(l_pool)
-
- l_merge = Concatenate(axis=1)(convs)
- l_conv1= Convolution1D(128, 3, activation='relu')(l_merge)
- l_pool1 = MaxPooling1D(5)(l_conv1)
- l_conv2 = Convolution1D(128, 3, activation='relu')(l_pool1)
- l_pool2 = MaxPooling1D(5)(l_conv2)
- l_flat = Flatten()(l_pool2)
- l_dense = Dense(512, activation='relu')(l_flat)
- preds = Dense(num_of_classes, activation='softmax')(l_dense)
+ x = Convolution1D(filters=10, kernel_size=fsz, activation='relu')(z)
+ x = GlobalMaxPooling1D()(x)
+ convs.append(x)
+
+ x = Concatenate(axis=1)(convs)
+ x = Dense(1024, activation="relu")(x)
+ preds = Dense(num_of_classes, activation='softmax')(x)
model = Model(sequence_input, preds)
model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
print(model.summary())
return model
def _max_len(self, texts):
return max([len(text) for text in texts])
def test(self):
csv.field_size_limit(sys.maxsize)
try:
r = open(self._path_result, 'rb')
test_result = load(r)
r.close()
except FileNotFoundError:
test_result = {}
model = self._load_model()
for language in [x for x in self._languages if x not in test_result.keys()]:
test_result[language] = self.test_class(model, language)
with open(self._path_result, 'wb') as f:
dump(test_result, f)
def _load_model(self):
model = load_model(os.path.join(self._root_model, 'model.h5'))
return model
def _count_size(self, files):
size = 0
for f in files:
size += os.path.getsize(f)
return size
def test_class(self, model, language):
ok = 0
results = []
count = 0
total_test = self.file_len(os.path.join(self._path_test_csv, language + '.csv'))
with open(os.path.join(self._path_test_csv, language + '.csv'), newline='') as csvfile:
r = csv.reader(csvfile, delimiter=' ', quotechar='|')
for pair in r:
label, string = pair
label = int(label)
string = literal_eval(string)
tokens = [self._indexer.get(x, self._oov_index) for x in Tokenizer.tokenize(string, 'word')]
result = self._guess_file_language(model, tokens)
count += 1
print('[{0:4d}/{1:4d}] {2}:{3} '.format(count, total_test, result[0][1], result[0][0]),end='\r')
results.append(result[0])
if result[0][1] == language:
ok += 1
accuracy = ok / total_test
print('Tests for {} '.format(language))
print('Total test files : {}'.format(total_test))
print('Correctly classified files : {}'.format(ok))
print('Accuracy : {}%'.format(accuracy * 100))
return (ok, total_test, accuracy, results)
def speed_benchmark(self):
language = self._languages[10]
model = self._load_model()
test_set = self._get_test_set(language)
total_size = self._count_size(test_set)
print('{} kB in total'.format(total_size / 1024))
t_start = time.perf_counter()
self.test_class(model, language)
t_end = time.perf_counter()
print('{} seconds.'.format(t_end - t_start))
print('{} seconds per KiB'.format(((t_end - t_start) / total_size) * 1024))
def _guess_file_language(self, model, tokens):
X = pad_sequences([tokens], maxlen=self._input_size)
result = list(model.predict(X))[0]
result = [(s, self._languages[i]) for i, s in enumerate(result)]
return sorted(result, reverse=True)
if __name__ == '__main__':
main()
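
The word-level variant replaces raw bytes with a fixed vocabulary: _learn_vocab counts word tokens over the whole training CSV, the 15 000 most common tokens are mapped to indices 1..15000, and every unseen token shares a single out-of-vocabulary index. A compact sketch of that mapping on toy data:

from collections import Counter

counts = Counter()
for tokens in [[b'def', b'main', b'('], [b'int', b'main', b'(']]:   # toy corpus
    counts.update(tokens)

most_common = counts.most_common(15000)
indexer = {token: i + 1 for i, (token, _) in enumerate(most_common)}
oov_index = len(indexer) + 1                   # one shared index for unseen tokens

encoded = [indexer.get(t, oov_index) for t in [b'def', b'unseen_token']]
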
diff --git a/swh/langdetect/hierarchical.py b/swh/langdetect/hierarchical.py
index 684e412..86d31fa 100644
--- a/swh/langdetect/hierarchical.py
+++ b/swh/langdetect/hierarchical.py
@@ -1,238 +1,237 @@
import os
import sys
import operator
import nltk
import random
import time
import numpy as np
import csv
import argparse
import json
import matplotlib.pyplot as plt
import matplotlib as mpl
from ast import literal_eval
from itertools import islice
from pickle import dump, load
from .utils.common import Tokenizer
from nltk.util import ngrams
from collections import Counter
from sklearn.feature_extraction.text import HashingVectorizer, TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.externals import joblib
from sklearn.cluster import KMeans, MiniBatchKMeans
from sklearn.metrics.pairwise import cosine_similarity, cosine_distances, euclidean_distances
from scipy.sparse import vstack
from scipy.sparse import csr_matrix
from scipy.cluster.hierarchy import ward, dendrogram, centroid, complete, average, weighted, median
from sklearn.manifold import MDS
csv.field_size_limit(sys.maxsize)
def main():
parser = argparse.ArgumentParser(description='Training and test tool of unsupervised hierarchical clustering.')
subparsers = parser.add_subparsers(dest='sub_command')
parser_train = subparsers.add_parser('train', help='Training on the dataset, dataset must be a *.csv file. A model will be created in the same directory.')
parser_train.add_argument('train_path', metavar='PATH', type=str, help='Path of the training dataset.')
# parser_train.add_argument('-n', '--ngrams', metavar='N', dest='train_maxsize', type=int, help='Set maximum input size of ConvNet, default 5.')
parser_test = subparsers.add_parser('test', help='Test on the dataset, dataset must be a directory with *.csv dataset named by corresponding language.')
parser_test.add_argument('test_root', metavar='ROOT', type=str, help='Root of the test dataset.')
if len(sys.argv[1:]) == 0:
parser.print_help()
parser.exit()
args = parser.parse_args()
if args.sub_command == 'train' :
n = Unsupervised(args.train_path)
n.train()
n.graph_top_20()
elif args.sub_command == 'test':
n = Unsupervised(args.test_root)
n.test()
else:
parser.parse_args('-h')
class Unsupervised:
def __init__(self, path):
self._path = path
# Root of model folder
self._root_model = os.path.join(os.path.dirname(path), 'model_unsupervised')
try:
os.mkdir(self._root_model)
except:
pass
# Path of result
self._path_result = os.path.join(os.path.dirname(path), 'result_unsupervised')
dir_path = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(dir_path, 'static_data', 'languages.json'), 'r') as f:
self._languages = json.load(f)
self._path_test_csv = path
self._num_of_classes = len(self._languages)
def train(self):
cv = HashingVectorizer(analyzer='char', ngram_range=(1, 5), n_features=2**24, alternate_sign=False)
texts = []
label = 0
string = ''
- top_20 = ['Python', 'Java', 'JavaScript', 'PHP', 'C#', 'C', 'C++',
- 'R', 'Objective-C', 'Swift', 'Matlab', 'Ruby', 'TypeScript',
- 'Visual Basic', 'Scala', 'Kotlin', 'Go', 'Perl', 'Lua',
- 'Rust', 'Haskell']
+ top_20 = ["C", "C#", "C++", "Clojure", "CSS", "Go",
+ "Haskell", "HTML", "Java", "JavaScript", "Lua",
+ "Objective-C", "Perl", "PHP", "Python", "R", "Ruby",
+ "Scala", "Scheme", "XML"]
top_20 = [self._languages.index(x) for x in top_20]
print(top_20)
with open(self._path, newline='') as csvfile:
r = csv.reader(csvfile, delimiter=' ', quotechar='|')
for pair in r:
label_new, string_new = pair
print(label_new, end=' \r')
if not int(label_new) == label:
if not os.path.isfile(os.path.join(self._root_model, 'counts{}.pkl'.format(label))):
if label in top_20:
counts = cv.fit_transform(texts)
self.clustering(counts, 1, label)
self.graph(label)
texts = []
label = int(label_new)
if label in top_20:
string = literal_eval(string_new)
#tokens = Tokenizer.tokenize(string, 'word')
#text = ' '.join([''.join([chr(x) for x in token]) for token in tokens])
tokens = Tokenizer.tokenize(string, 'letter')
text = ''.join([chr(token) for token in tokens])
texts.append(text)
with open(os.path.join(self._root_model, 'classifier.cv'), 'wb') as f:
joblib.dump(cv, f)
def clustering(self, counts, num_clusters, label):
# km = KMeans(n_clusters=num_clusters)
# km.fit(counts)
with open(os.path.join(self._root_model, 'counts{}.pkl'.format(label)), 'wb') as f:
joblib.dump(counts, f)
#with open(os.path.join(self._root_model, 'cluster{}.pkl'.format(label)), 'wb') as f:
# joblib.dump(km, f)
def graph_top_20(self):
- top_20 = ['Python', 'Java', 'JavaScript', 'PHP', 'C#', 'C', 'C++',
- 'R', 'Objective-C', 'Swift', 'Matlab', 'Ruby', 'TypeScript',
- 'Visual Basic', 'Scala', 'Kotlin', 'Go', 'Perl', 'Lua',
- 'Rust', 'Haskell']
+ top_20 = ["C", "C#", "C++", "Clojure", "CSS", "Go", "Haskell",
+ "HTML", "Java", "JavaScript", "Lua", "Objective-C",
+ "Perl", "PHP", "Python", "R", "Ruby", "Scala", "Scheme", "XML"]
top_20 = [self._languages.index(x) for x in top_20]
counts = csr_matrix((0, 2 ** 24))
for label in top_20:
with open(os.path.join(self._root_model, 'counts{}.pkl'.format(label)), 'rb') as f:
counts = vstack((counts, joblib.load(f)))
print(counts.shape)
if not os.path.isfile(os.path.join(self._root_model, 'linkage_matrix')):
dist = euclidean_distances(counts)
- linkage_matrix = ward(dist)
+ linkage_matrix = weighted(dist)
with open(os.path.join(self._root_model, 'linkage_matrix'), 'wb') as f:
joblib.dump(linkage_matrix, f)
else:
with open(os.path.join(self._root_model, 'linkage_matrix'), 'rb') as f:
linkage_matrix = joblib.load(f)
print(linkage_matrix)
fig, ax = plt.subplots(figsize=(15, 150))
titles = [self._languages[top_20[x // 500]] for x in list(range(0,counts.shape[0]))]
ax = dendrogram(linkage_matrix, orientation="right", labels=titles)
plt.tick_params(axis= 'x',
which='both',
bottom=False,
top=False,
labelbottom=False)
plt.tight_layout()
plt.savefig(os.path.join(self._root_model, 'top_20_cluster.pdf'))
def graph(self, label):
with open(os.path.join(self._root_model, 'counts{}.pkl'.format(label)), 'rb') as f:
counts = joblib.load(f)
dist = euclidean_distances(counts)
linkage_matrix = ward(dist)
fig, ax = plt.subplots(figsize=(15, 40))
titles = list(range(1,counts.shape[0]+1))
ax = dendrogram(linkage_matrix, orientation="right", labels=titles)
plt.tick_params(axis= 'x',
which='both',
bottom=False,
top=False,
labelbottom=False)
plt.tight_layout()
plt.savefig(os.path.join(self._root_model, '{}_cluster.pdf'.format(self._languages[label])))
def speed_benchmark(self):
language = [x for x in os.listdir(self._root_training_set) if not x.startswith('.')][10]
models = self._load_models()
test_set = self._get_test_set(language)
total_size = self._count_size(test_set)
print('{} kB in total'.format(total_size / 1024))
t_start = time.perf_counter()
self.test_class(models, language)
t_end = time.perf_counter()
print('{} seconds.'.format(t_end - t_start))
print('{} seconds per kB'.format(((t_end - t_start) / total_size) * 1024))
def _count_size(self, files):
size = 0
for f in files:
size += os.path.getsize(f)
return size
def file_len(self, fname):
with open(fname) as f:
count = 0
for l in f:
count += 1
return count
def _distance(self, model_profile, test_profile):
distance = 0
maximum = len(test_profile)
for test_ngram in test_profile.keys():
test_rank = test_profile.get(test_ngram)
model_rank = model_profile.get(test_ngram, maximum)
d = abs(test_rank - model_rank)
distance += d
return distance
'''
def _prob(model, trigrams):
print('Checking {} model ...'.format(model))
with open(model, 'rb') as f:
kneser_ney = load(f)
result = 1
for trigram in trigrams:
prob = kneser_ney.prob(trigram)
result = result * prob
return result
'''
if __name__ == '__main__':
main()
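
The unsupervised module hashes character 1-5-grams into a 2**24-dimensional space, stacks the per-language matrices and renders a dendrogram of the hierarchical clustering. A condensed sketch of the same pipeline at toy scale, feeding the pairwise distances to scipy through squareform (documents and labels are placeholders):

import matplotlib
matplotlib.use('Agg')                           # render to file, no display needed
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.metrics.pairwise import euclidean_distances
from scipy.spatial.distance import squareform
from scipy.cluster.hierarchy import linkage, dendrogram

docs = ['def main(): pass', 'int main() { return 0; }', '(define (f x) x)']
cv = HashingVectorizer(analyzer='char', ngram_range=(1, 5),
                       n_features=2 ** 12, alternate_sign=False)
counts = cv.fit_transform(docs)

dist = euclidean_distances(counts)              # square distance matrix
link = linkage(squareform(dist, checks=False), method='weighted')

fig, ax = plt.subplots(figsize=(6, 4))
dendrogram(link, orientation='right', labels=['py', 'c', 'scheme'], ax=ax)
plt.tight_layout()
plt.savefig('cluster_sketch.pdf')
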
diff --git a/swh/langdetect/naivebayesian.py b/swh/langdetect/naivebayesian.py
index d1691e2..94702e2 100644
--- a/swh/langdetect/naivebayesian.py
+++ b/swh/langdetect/naivebayesian.py
@@ -1,240 +1,256 @@
"""
Naive Bayesian
"""
import os
import sys
import operator
import nltk
import random
import time
import numpy as np
import csv
import argparse
import json
from ast import literal_eval
from itertools import islice
from pickle import dump, load
from .utils.common import Tokenizer, file_to_string, find_file, count_files
from nltk.util import ngrams
from collections import Counter
from sklearn.naive_bayes import MultinomialNB
from sklearn.feature_extraction.text import HashingVectorizer, TfidfTransformer
from sklearn.externals import joblib
csv.field_size_limit(sys.maxsize)
def main():
parser = argparse.ArgumentParser(description='Training and test tool of the multinomial naive Bayes classifier.')
subparsers = parser.add_subparsers(dest='sub_command')
parser_train = subparsers.add_parser('train', help='Training on the dataset, dataset must be a *.csv file. A model will be created in the same directory.')
parser_train.add_argument('train_path', metavar='PATH', type=str, help='Path of the training dataset.')
- # parser_train.add_argument('-n', '--ngrams', metavar='N', dest='train_maxsize', type=int, help='Set maximum input size of ConvNet, default 5.')
parser_test = subparsers.add_parser('test', help='Test on the dataset, dataset must be a directory with *.csv dataset named by corresponding language.')
parser_test.add_argument('test_root', metavar='ROOT', type=str, help='Root of the test dataset.')
if len(sys.argv[1:]) == 0:
parser.print_help()
parser.exit()
args = parser.parse_args()
if args.sub_command == 'train' :
n = NaiveBayesian(args.train_path)
n.train()
elif args.sub_command == 'test':
n = NaiveBayesian(args.test_root)
n.test()
else:
parser.parse_args('-h')
class NaiveBayesian:
- def __init__(self, path):
+ def __init__(self, path, token=None):
self._path = path
# Root of model folder
self._root_model = os.path.join(os.path.dirname(path), 'model_bayesian')
try:
os.mkdir(self._root_model)
except:
pass
# Path of result
self._path_result = os.path.join(os.path.dirname(path), 'result_bayesian')
dir_path = os.path.dirname(os.path.abspath(__file__))
- with open(os.path.join(dir_path, 'static_data', 'languages_less.json'), 'r') as f:
+ with open(os.path.join(dir_path, 'static_data', 'languages.json'), 'r') as f:
self._languages = json.load(f)
self._path_test_csv = path
self._num_of_classes = len(self._languages)
def train(self):
'''
train () generates and stores counted n-grams in '_root_model' folder
'''
'''
Calculate frequencies of generated n-grams then store
them into a sorted list of (ngram, count)
'''
clf = MultinomialNB(alpha=0.001)
cv = HashingVectorizer(analyzer='char', ngram_range=(1, 4), n_features=2**16, alternate_sign=False)
+
+ #cv = HashingVectorizer(analyzer='word', ngram_range=(1, 3), n_features=2**18, alternate_sign=False)
indices = list(range(len(self._languages)))
with open(self._path, newline='') as csvfile:
r = csv.reader(csvfile, delimiter=' ', quotechar='|')
+ labels = []
+ texts = []
+ label = 0
for pair in r:
+ label_new, _ = pair
+ if label != int(label_new):
+ counts = cv.fit_transform(texts)
+ tf = TfidfTransformer().fit(counts)
+ normalised = tf.transform(counts)
+ clf.partial_fit(normalised, np.array(labels), indices)
+ texts = []
+ labels = []
+
label, string = pair
label = int(label)
print(label, end='\r')
string = literal_eval(string)
+
tokens = Tokenizer.tokenize(string, 'letter')
text = ''.join([chr(token) for token in tokens])
- counts = cv.fit_transform([text])
- tf = TfidfTransformer().fit(counts)
- normalised = tf.transform(counts)
- clf.partial_fit(normalised, np.array([label]), indices)
+ #tokens = Tokenizer.tokenize(string, 'word')
+ #textb = b' '.join(tokens)
+ #text = ''.join([chr(x) for x in list(textb)])
+
+ texts.append(text)
+ labels.append(label)
+
+ counts = cv.fit_transform(texts)
+ tf = TfidfTransformer().fit(counts)
+ normalised = tf.transform(counts)
+ clf.partial_fit(normalised, np.array(labels), indices)
with open(os.path.join(self._root_model, 'classifier.clf'), 'wb') as f:
joblib.dump(clf, f)
with open(os.path.join(self._root_model, 'classifier.hv'), 'wb') as f:
joblib.dump(cv, f)
def test(self):
try:
r = open(self._path_result, 'rb')
test_result = load(r)
r.close()
except FileNotFoundError:
test_result = {}
with open(os.path.join(self._root_model, 'classifier.clf'), 'rb') as f:
clf = joblib.load(f)
with open(os.path.join(self._root_model, 'classifier.hv'), 'rb') as f:
cv = joblib.load(f)
for language in [x for x in self._languages if x not in test_result.keys()]:
test_result[language] = self.test_class((clf, cv), language)
with open(self._path_result, 'wb') as f:
dump(test_result, f)
def speed_benchmark(self):
language = [x for x in os.listdir(self._root_training_set) if not x.startswith('.')][10]
models = self._load_models()
test_set = self._get_test_set(language)
total_size = self._count_size(test_set)
print('{} kB in total'.format(total_size / 1024))
t_start = time.perf_counter()
self.test_class(models, language)
t_end = time.perf_counter()
print('{} seconds.'.format(t_end - t_start))
print('{} seconds per kB'.format(((t_end - t_start) / total_size) * 1024))
def _get_test_set(self, language):
root_training_language = os.path.join(self._root_training_set, language)
root_language = os.path.join(self._root_language_dataset, language)
total = count_files(root_language)
training_set = [int(os.path.splitext(x)[0]) for x in os.listdir(root_training_language) if not x.startswith('.')]
it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set and os.path.getsize(find_file(root_language, x)) <= 1048576)
test_set = list(islice(it, 1000))
if len(test_set) == 0:
it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set)
test_set = list(islice(it, 1000))
return test_set
def _count_size(self, files):
size = 0
for f in files:
size += os.path.getsize(f)
return size
def test_class(self, clf, language):
ok = 0
results = []
count = 0
total_test = self.file_len(os.path.join(self._path_test_csv, language + '.csv'))
with open(os.path.join(self._path_test_csv, language + '.csv'), newline='') as csvfile:
r = csv.reader(csvfile, delimiter=' ', quotechar='|')
for pair in r:
label, string = pair
label = int(label)
string = literal_eval(string)
result = self._guess_file_language(clf, string)
count += 1
print('[{0:4d}/{1:4d}] {2}:{3} '.format(count, total_test, result[0][1], result[0][0]),end='\r')
results.append(result[0])
if result[0][1] == language:
ok += 1
accuracy = ok / total_test
print('Tests for {} '.format(language))
print('Total test files : {}'.format(total_test))
print('Correctly classified files : {}'.format(ok))
print('Accuracy : {}%'.format(accuracy * 100))
return (ok, total_test, accuracy, results)
def test_single(self, filename):
self._guess_file_language(clf, filename)
def file_len(self, fname):
with open(fname) as f:
count = 0
for l in f:
count += 1
return count
def _guess_file_language(self, cc, string):
clf = cc[0]
cv = cc[1]
+
tokens = Tokenizer.tokenize(string, 'letter')
text = ''.join([chr(token) for token in tokens])
+
+ #tokens = Tokenizer.tokenize(string, 'word')
+ #textb = b' '.join(tokens)
+ #text = ''.join([chr(x) for x in list(textb)])
+
counts = cv.fit_transform([text])
tf = TfidfTransformer().fit(counts)
normalised = tf.transform(counts)
result = clf.predict_log_proba(normalised)
result = [(val, self._languages[idx]) for idx, val in enumerate(result[0])]
return sorted(result, reverse=True)
def _distance(self, model_profile, test_profile):
distance = 0
maximum = len(test_profile)
for test_ngram in test_profile.keys():
test_rank = test_profile.get(test_ngram)
model_rank = model_profile.get(test_ngram, maximum)
d = abs(test_rank - model_rank)
distance += d
return distance
- '''
- def _prob(model, trigrams):
- print('Checking {} model ...'.format(model))
- with open(model, 'rb') as f:
- kneser_ney = load(f)
- result = 1
- for trigram in trigrams:
- prob = kneser_ney.prob(trigram)
- result = result * prob
- return result
- '''
if __name__ == '__main__':
main()
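
The Bayesian baseline streams the training CSV one language at a time: each batch is hashed into character 1-4-gram counts, TF-IDF weighted, and fed to MultinomialNB.partial_fit with the full list of class indices. A minimal sketch of that incremental pipeline on placeholder batches:

import numpy as np
from sklearn.naive_bayes import MultinomialNB
from sklearn.feature_extraction.text import HashingVectorizer, TfidfTransformer

classes = [0, 1]                                # all label indices, known up front
clf = MultinomialNB(alpha=0.001)
cv = HashingVectorizer(analyzer='char', ngram_range=(1, 4),
                       n_features=2 ** 16, alternate_sign=False)

batches = [                                     # placeholder per-language batches
    (['def main(): pass', 'import os'], [0, 0]),
    (['int main() { return 0; }', '#include <stdio.h>'], [1, 1]),
]
for texts, labels in batches:
    counts = cv.transform(texts)                # hashing is stateless, transform suffices
    normalised = TfidfTransformer().fit_transform(counts)
    clf.partial_fit(normalised, np.array(labels), classes=classes)

print(clf.predict(TfidfTransformer().fit_transform(cv.transform(['import sys']))))
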
diff --git a/swh/langdetect/ngramdist.py b/swh/langdetect/ngramdist.py
index 004fdd8..962e62f 100644
--- a/swh/langdetect/ngramdist.py
+++ b/swh/langdetect/ngramdist.py
@@ -1,235 +1,248 @@
import os
import sys
import time
import random
import csv
import json
import argparse
import nltk
import operator
from ast import literal_eval
from itertools import islice
from pickle import dump, load
from nltk.util import ngrams
from .utils.common import Tokenizer, file_to_string, find_file, count_files
csv.field_size_limit(sys.maxsize)
def main():
parser = argparse.ArgumentParser(description='Training and test tool of frequency distance of n-grams.')
subparsers = parser.add_subparsers(dest='sub_command')
parser_train = subparsers.add_parser('train', help='Training on the dataset, dataset must be a *.csv file. A model will be created in the same directory.')
parser_train.add_argument('train_path', metavar='PATH', type=str, help='Path of the training dataset.')
# parser_train.add_argument('-n', '--ngrams', metavar='N', dest='train_maxsize', type=int, help='Set maximum input size of ConvNet, default 5.')
parser_test = subparsers.add_parser('test', help='Test on the dataset, dataset must be a directory with *.csv dataset named by corresponding language.')
parser_test.add_argument('test_root', metavar='ROOT', type=str, help='Root of the test dataset.')
if len(sys.argv[1:]) == 0:
parser.print_help()
parser.exit()
args = parser.parse_args()
if args.sub_command == 'train' :
n = NGramDist(args.train_path)
n.train()
elif args.sub_command == 'test':
n = NGramDist(args.test_root)
n.test()
else:
parser.parse_args('-h')
class NGramDist:
def __init__(self, path):
self._path = path
# Root of model folder
self._root_model = os.path.join(os.path.dirname(path), 'model_ngram_dist')
try:
os.mkdir(self._root_model)
except:
pass
# Path of result
self._path_result = os.path.join(os.path.dirname(path), 'result_ngram_dist')
dir_path = os.path.dirname(os.path.abspath(__file__))
- with open(os.path.join(dir_path, 'static_data', 'languages_less.json'), 'r') as f:
+ with open(os.path.join(dir_path, 'static_data', 'languages.json'), 'r') as f:
self._languages = json.load(f)
self._path_test_csv = path
self._num_of_classes = len(self._languages)
def file_len(self, fname):
with open(fname) as f:
count = 0
for l in f:
count += 1
return count
def train(self):
statistics = {}
+ t_start = time.perf_counter()
with open(self._path, newline='') as csvfile:
r = csv.reader(csvfile, delimiter=' ', quotechar='|')
for pair in r:
label, string = pair
label = int(label)
language = self._languages[label]
- print(language, end='\r')
statistics_lang = statistics.get(language, {})
string = literal_eval(string)
tokens = Tokenizer.tokenize(string, 'letter')
generated_ngrams = self._generate_ngrams([chr(token) for token in tokens], 3)
+
+ #tokens = Tokenizer.tokenize(string, 'word')
+ #tokens = [''.join([chr(x) for x in token]) for token in tokens]
+ #generated_ngrams = self._generate_ngrams(tokens, 3)
+
self._count_ngrams(statistics_lang, generated_ngrams)
statistics[language] = statistics_lang
+
+ t_end = time.perf_counter()
+ print(str(t_end - t_start) + ' ' + str(label), end='\r')
for language in self._languages:
with open(os.path.join(self._root_model, language), 'wb') as f:
dump(self._sort_by_value(statistics[language]), f)
def _generate_ngrams(self, tokens, n):
generated_ngrams = []
for i in range(1, n+1):
igrams = ngrams(tokens, i,
pad_left=True,
pad_right=True,
left_pad_symbol = '$BOF$',
right_pad_symbol = '$EOF$')
for igram in igrams:
generated_ngrams.append(''.join(igram))
return generated_ngrams
def _count_ngrams(self, statistics, ngrams):
for ngram in ngrams:
statistics[ngram] = statistics.get(ngram, 0) + 1
def test(self):
try:
r = open(self._path_result, 'rb')
test_result = load(r)
r.close()
except FileNotFoundError:
test_result = {}
model = self._load_models()
for language in [x for x in self._languages if x not in test_result.keys()]:
test_result[language] = self.test_class(model, language)
with open(self._path_result, 'wb') as f:
dump(test_result, f)
def _load_models(self):
models = {}
for model in [model
- for model in os.listdir(self._root_model)
- if not model.startswith('.')]:
+ for model in self._languages]:
root_model = os.path.join(self._root_model, model)
with open(root_model, 'rb') as sorted_file:
models[model] = self._list_to_dict(load(sorted_file))
return models
def _list_to_dict(self, model):
model_ngrams = [x[0] for x in model]
model_dict = {}
index = 0
for ngram in model_ngrams:
index += 1
model_dict[ngram] = index
return model_dict
def _count_size(self, files):
size = 0
for f in files:
size += os.path.getsize(f)
return size
def test_class(self, model, language):
ok = 0
results = []
count = 0
total_test = self.file_len(os.path.join(self._path_test_csv, language + '.csv'))
-
+
+ t_start = time.perf_counter()
with open(os.path.join(self._path_test_csv, language + '.csv'), newline='') as csvfile:
r = csv.reader(csvfile, delimiter=' ', quotechar='|')
for pair in r:
label, string = pair
label = int(label)
string = literal_eval(string)
result = self._guess_file_language(model, string)
count += 1
- print('[{0:4d}/{1:4d}] {2}:{3} '.format(count, total_test, result[0][1], result[0][0]),end='\r')
results.append(result[0])
if result[0][1] == language:
ok += 1
+ t_end = time.perf_counter()
+ print('[{0:4d}/{1:4d}] {2}:{3} {4} '.format(count, total_test, result[0][1], result[0][0], t_end - t_start), end='\r')
accuracy = ok / total_test
print('Tests for {} '.format(language))
print('Total test files : {}'.format(total_test))
print('Correctly classified files : {}'.format(ok))
print('Accuracy : {}%'.format(accuracy * 100))
return (ok, total_test, accuracy, results)
def speed_benchmark(self):
language = self._languages[10]
model = self._load_model()
test_set = self._get_test_set(language)
total_size = self._count_size(test_set)
print('{} kB in total'.format(total_size / 1024))
t_start = time.perf_counter()
self.test_class(model, language)
t_end = time.perf_counter()
print('{} seconds.'.format(t_end - t_start))
print('{} seconds per KiB'.format(((t_end - t_start) / total_size) * 1024))
def _guess_file_language(self, models, string):
tokens = Tokenizer.tokenize(string, 'letter')
generated_ngrams = self._generate_ngrams([chr(token) for token in tokens], 3)
+
+ #tokens = Tokenizer.tokenize(string, 'word')
+ #tokens = [''.join([chr(x) for x in token]) for token in tokens]
+ #generated_ngrams = self._generate_ngrams(tokens, 3)
statistics = {}
self._count_ngrams(statistics, generated_ngrams)
test_profile = self._list_to_dict(self._sort_by_value(statistics))
result = []
for model in models.keys():
root_model = os.path.join(self._root_model, model)
model_profile = models[model]
distance = self._distance(model_profile, test_profile)
result.append((distance, model))
return sorted(result)
def _sort_by_value(self, statistics):
statistics_sorted = sorted(statistics.items(),
key = operator.itemgetter(1),
reverse = True)[:500]
return statistics_sorted
def _distance(self, model_profile, test_profile):
distance = 0
maximum = len(test_profile)
for test_ngram in test_profile.keys():
test_rank = test_profile.get(test_ngram)
model_rank = model_profile.get(test_ngram, maximum)
d = abs(test_rank - model_rank)
distance += d
return distance
if __name__ == '__main__':
main()
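
ngramdist.py is a frequency-rank ('out of place') classifier: each language model keeps its 500 most frequent character 1-3-grams ranked by frequency, and a test profile is scored by summing, over its n-grams, the absolute difference between test rank and model rank, with missing n-grams charged the full profile length. A simplified self-contained sketch of that distance, without the BOF/EOF padding used above and with toy profiles:

from collections import Counter

def profile(text, n_max=3, top=500):
    """Map each of the most frequent character n-grams (n = 1..n_max) to its rank."""
    counts = Counter()
    for n in range(1, n_max + 1):
        counts.update(text[i:i + n] for i in range(len(text) - n + 1))
    ranked = [g for g, _ in counts.most_common(top)]
    return {g: rank for rank, g in enumerate(ranked, start=1)}

def rank_distance(model_profile, test_profile):
    """Sum of rank displacements; unseen n-grams get the maximum penalty."""
    maximum = len(test_profile)
    return sum(abs(rank - model_profile.get(g, maximum))
               for g, rank in test_profile.items())

models = {'python-ish': profile('def main(): pass'),          # toy models
          'c-ish': profile('int main() { return 0; }')}
test = profile('import sys')
print(sorted((rank_distance(m, test), name) for name, m in models.items()))
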
diff --git a/swh/langdetect/ngramprob.py b/swh/langdetect/ngramprob.py
index ff2b0ee..e104285 100644
--- a/swh/langdetect/ngramprob.py
+++ b/swh/langdetect/ngramprob.py
@@ -1,191 +1,211 @@
import os, sys, subprocess, time, csv, argparse, json
import kenlm
from ast import literal_eval
from itertools import islice
from pickle import dump, load
from .utils.common import Tokenizer, file_to_string, find_file, count_files, remove_comment
csv.field_size_limit(sys.maxsize)
def main():
parser = argparse.ArgumentParser(description='Training and test tool of n-grams model.')
subparsers = parser.add_subparsers(dest='sub_command')
parser_train = subparsers.add_parser('train', help='Training on the dataset, dataset must be a *.csv file. A model will be created in the same directory.')
parser_train.add_argument('train_path', metavar='PATH', type=str, help='Path of the training dataset.')
# parser_train.add_argument('-n', '--ngrams', metavar='N', dest='train_maxsize', type=int, help='Set maximum input size of ConvNet, default 5.')
parser_test = subparsers.add_parser('test', help='Test on the dataset, dataset must be a directory with *.csv dataset named by corresponding language.')
parser_test.add_argument('test_root', metavar='ROOT', type=str, help='Root of the test dataset.')
if len(sys.argv[1:]) == 0:
parser.print_help()
parser.exit()
args = parser.parse_args()
if args.sub_command == 'train' :
n = NGramProb(args.train_path)
n.train()
elif args.sub_command == 'test':
n = NGramProb(args.test_root)
n.test()
else:
parser.parse_args('-h')
class NGramProb:
def __init__(self, path):
self._path = path
# Root of model folder
self._root_model = os.path.join(os.path.dirname(path), 'model_ngram_prob')
try:
os.mkdir(self._root_model)
except:
pass
+ try:
+ os.mkdir(os.path.join(self._root_model, 'arpa'))
+ except:
+ pass
+ try:
+ os.mkdir(os.path.join(self._root_model, 'text'))
+ except:
+ pass
# Path of result
self._path_result = os.path.join(os.path.dirname(path), 'result_ngram_prob')
dir_path = os.path.dirname(os.path.abspath(__file__))
- with open(os.path.join(dir_path, 'static_data', 'languages_less.json'), 'r') as f:
+ with open(os.path.join(dir_path, 'static_data', 'languages.json'), 'r') as f:
self._languages = json.load(f)
self._path_test_csv = path
self._num_of_classes = len(self._languages)
def file_len(self, fname):
with open(fname) as f:
count = 0
for l in f:
count += 1
return count
def train(self):
command = [os.path.join(os.path.dirname(os.path.abspath(__file__)),
'..' , '..', 'bin', 'lmplz'),
- '-o', '3', '-T', '/tmp', '--discount_fallback']
+ '-o', '3', '--discount_fallback']
with open(self._path, newline='') as csvfile:
r = csv.reader(csvfile, delimiter=' ', quotechar='|')
label = 0
language = self._languages[label]
texts = []
for pair in r:
label_new, _ = pair
if label != int(label_new):
- with open(os.path.join(self._root_model, language), 'wb') as f:
+ with open(os.path.join(self._root_model, 'arpa', language), 'wb') as f:
train_text = ' '.join(texts)
- proc = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=f)
- proc.communicate(train_text.encode())
+ with open(os.path.join(self._root_model, 'text', language), 'w') as t:
+ t.write(train_text)
+ with open(os.path.join(self._root_model, 'text', language), 'r') as t:
+ proc = subprocess.Popen(command, stdin=t, stdout=f)
+ proc.communicate()
texts = []
label, string = pair
label = int(label)
language = self._languages[label]
print(language, end='\r')
- text = literal_eval(string)
- tokens = Tokenizer.tokenize(text, 'letter')
-
+ string = literal_eval(string)
+ tokens = Tokenizer.tokenize(string, 'letter')
texts.append(' '.join(chr(token) for token in tokens))
+
+ #tokens = Tokenizer.tokenize(string, 'word')
+ #textb = b' '.join(tokens)
+ #text = ''.join([chr(x) for x in list(textb)])
+ #text = ' '.join([x for x in text.split(' ') if x.strip('')])
+ #texts.append(text)
- with open(os.path.join(self._root_model, language), 'wb') as f:
+ with open(os.path.join(self._root_model, 'arpa', language), 'wb') as f:
train_text = ' '.join(texts)
- proc = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=f)
- proc.communicate(train_text.encode())
+ with open(os.path.join(self._root_model, 'text', language), 'w') as t:
+ t.write(train_text)
+ with open(os.path.join(self._root_model, 'text', language), 'r') as t:
+ proc = subprocess.Popen(command, stdin=t, stdout=f)
+ proc.communicate()
+
+
def test(self):
try:
r = open(self._path_result, 'rb')
test_result = load(r)
r.close()
except FileNotFoundError:
test_result = {}
models = self._load_models()
for language in [x for x in self._languages if x not in test_result.keys()]:
test_result[language] = self.test_class(models, language)
with open(self._path_result, 'wb') as f:
dump(test_result, f)
def _load_models(self):
models = {}
for model in [model
- for model in os.listdir(self._root_model)
- if not model.startswith('.')]:
- root_model = os.path.join(self._root_model, model)
+ for model in self._languages]:
+ root_model = os.path.join(self._root_model, 'arpa', model)
models[model] = kenlm.LanguageModel(root_model)
return models
def _count_size(self, files):
size = 0
for f in files:
size += os.path.getsize(f)
return size
def test_class(self, model, language):
ok = 0
results = []
count = 0
total_test = self.file_len(os.path.join(self._path_test_csv, language + '.csv'))
with open(os.path.join(self._path_test_csv, language + '.csv'), newline='') as csvfile:
r = csv.reader(csvfile, delimiter=' ', quotechar='|')
for pair in r:
label, string = pair
label = int(label)
string = literal_eval(string)
result = self._guess_file_language(model, string)
count += 1
print('[{0:4d}/{1:4d}] {2}:{3} '.format(count, total_test, result[0][1], result[0][0]),end='\r')
results.append(result[0])
if result[0][1] == language:
ok += 1
accuracy = ok / total_test
print('Tests for {} '.format(language))
print('Total test files : {}'.format(total_test))
print('Correctly classified files : {}'.format(ok))
print('Accuracy : {}%'.format(accuracy * 100))
return (ok, total_test, accuracy, results)
def speed_benchmark(self):
language = self._languages[10]
model = self._load_model()
test_set = self._get_test_set(language)
total_size = self._count_size(test_set)
print('{} kB in total'.format(total_size / 1024))
t_start = time.perf_counter()
self.test_class(model, language)
t_end = time.perf_counter()
print('{} seconds.'.format(t_end - t_start))
print('{} seconds per KiB'.format(((t_end - t_start) / total_size) * 1024))
def _guess_file_language(self, models, string):
tokens = Tokenizer.tokenize(string, 'letter')
text = ' '.join(chr(token) for token in tokens)
- #text = file_to_string(filename)
- #tokens = tokenizer(text, 'word')
- #tokens = b' '.join(tokens)
- #text = ''.join(chr(token) for token in list(tokens))
+
+ #tokens = Tokenizer.tokenize(string, 'word')
+ #textb = b' '.join(tokens)
+ #text = ''.join([chr(x) for x in list(textb)])
result = []
for model_key in models.keys():
root_model = os.path.join(self._root_model, model_key)
model = models[model_key]
score = model.score(text)
result.append((score, model_key))
return sorted(result, reverse=True)
if __name__ == '__main__':
main()
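
ngramprob.py trains one KenLM 3-gram model per language by piping space-separated characters through lmplz, and classifies by scoring the test text with every model and keeping the highest log10 probability. Assuming the ARPA files written under model_ngram_prob/arpa above are in place, the scoring side reduces to:

import kenlm

def guess(models, source_bytes):
    # KenLM expects whitespace-separated tokens; one character per token here,
    # mirroring the 'letter' tokenisation used during training.
    text = ' '.join(chr(b) for b in source_bytes)
    return max((models[name].score(text), name) for name in models)

models = {'Python': kenlm.LanguageModel('model_ngram_prob/arpa/Python')}   # path per the diff layout
print(guess(models, b'import sys\n'))
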
diff --git a/swh/langdetect/utils/common.py b/swh/langdetect/utils/common.py
index 652009e..e586d21 100644
--- a/swh/langdetect/utils/common.py
+++ b/swh/langdetect/utils/common.py
@@ -1,175 +1,175 @@
"""
Here regroup basic preprocessing methods
used in learning stage for different
approaches.
"""
import re, os, time
_not_start_with_point = lambda x: not x.startswith('.')
class Tokenizer():
separator = re.compile(
b'([\x20-\x2f\x3a-\x40\x5b-\x5e\x60\x7b-\x7e\s]|\d+\.\d+|\d+|\d+\.\d+[eE][+-]?\d+)')
def is_number(n):
try:
float(n)
except ValueError:
return False
return True
def tokenize(text, re_name):
''' Splits text into tokens '''
if re_name == 'letter':
return list(text)
elif re_name == 'word':
- pretokens = [x for x in Tokenizer.separator.split(text) if x and x.strip(b'\n')]
+ pretokens = [x for x in Tokenizer.separator.split(text) if x ]
tokens = []
for x in pretokens :
if Tokenizer.is_number(x):
tokens.append(b'<number>')
- elif x.isspace():
+ elif x.isspace() and x != b'\n':
tokens.append(b' ')
else:
tokens.append(x)
return tokens
def file_to_string(filename):
""" Read a file to a string. """
with open(filename, 'rb') as f:
data = f.read()
return data
def count_files(root_language):
all_folders = natural_sort(filter
(_not_start_with_point,
os.listdir(root_language)))
files = natural_sort(filter
(_not_start_with_point,
os.listdir(root_language + '/' + all_folders[-1])))
(max,_) = os.path.splitext(files[-1])
return int(max)
def find_file(root_language, n):
'''Find the n-th file in language folder'''
if n > count_files(root_language):
return ''
else:
start = (n - 1) // 1000 * 1000 + 1
end = start + 999
root_count = root_language + '/' + str(start) + '-' + str(end)
files = natural_sort(filter
(_not_start_with_point,
os.listdir(root_count)))
return root_count + '/' + files[n - start]
'''def replace_string_and_number(text):
""" Replace strings and numbers in a file by special tokens
"""
str_replaced = _re_string.sub(b'"__str__"', text)
str_num_replaced = _re_number.sub(b'__num__', str_replaced)
#str_num_replaced = text
return str_num_replaced
'''
def natural_sort(l):
convert = lambda text: int(text) if text.isdigit() else text.lower()
alphanum_key = lambda key: [ convert(c) for c in re.split('([0-9]+)', key) ]
return sorted(l, key = alphanum_key)
def remove_comment(binary_text, language):
splited_text = binary_text.splitlines()
text = b'\n'.join(splited_text)
regexp = get_regexp(language)
if not regexp:
return binary_text
return regexp.sub(b'\n', text)
def get_regexp(language):
re_inline = get_inline(language)
re_block = get_block(language)
rs = []
if re_inline:
rs.append(re_inline)
if re_block:
rs.append(re_block)
if rs == []:
return None
return re.compile(b'|'.join(rs), re.DOTALL)
def get_inline(language):
r_base = b'[^\\n]*(?:\\n|$)'
if language in ['Ada',
'Eiffel',
'VHDL',
'AppleScript',
'Haskell',
'Lua',
'PLSQL']:
r = b'(--)' + r_base
elif language in ['C',
'C++',
'C#',
'D',
'JavaScript',
'ActionScript',
'Java',
'Rust']:
r = b'(//)' + r_base
elif language == 'Xojo':
r = b'(' + b'|'.join([b'//', b"\'"]) + b')' + r_base
elif language in ['R',
'Tcl',
'Awk',
'Perl',
'Perl 6',
'Ruby',
'Python']:
r = b'(#)' + r_base
elif language in ['COBOL']:
r = b'(\\*>)' + r_base
elif language in ['Matlab']:
r = b'(%)' + r_base
else:
return None
return b'(' + r + b')'
def get_block(language):
r_base = b'.*?'
if language in ['C',
'C++',
'C#',
'JavaScript',
'ActionScript',
'PLSQL',
'PHP',
'Rust']:
r = b'(/\\*)' + r_base + b'(\\*/)'
elif language in ['OCaml',
'Pascal',
'Modula-2',
'Smarty']:
r = b'(\\(\\*)' + r_base + b'(\\*\\))'
elif language == 'Python':
r = b'(\'\'\')' + r_base + b'(\'\'\')'
else:
return None
return b'(' + r + b')'
def purify(text, lang):
# TODO: for some language like HTML, remove code other than principal language
pass
diff --git a/swh/langdetect/utils/training.py b/swh/langdetect/utils/training.py
index 09c90c9..ffdf47a 100644
--- a/swh/langdetect/utils/training.py
+++ b/swh/langdetect/utils/training.py
@@ -1,115 +1,125 @@
import os
import random
import csv
import json
from .common import count_files, find_file, file_to_string
-from itertools import islice
from shutil import copyfile
class Dataset:
def __init__(self, root):
self.root_code = os.path.join(root, '..', 'code_by_language')
self.root_training = os.path.join(root, '..', 'training_set')
self.root_training_csv = os.path.join(root, '..', 'training_set_csv')
self.root_test = os.path.join(root, '..', 'test_set')
self.root_test_csv = os.path.join(root, '..', 'test_set_csv')
try:
os.mkdir(self.root_training)
except FileExistsError:
pass
try:
os.mkdir(self.root_training_csv)
except FileExistsError:
pass
try:
os.mkdir(self.root_test)
except FileExistsError:
pass
try:
os.mkdir(self.root_test_csv)
except FileExistsError:
pass
dir_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
print(dir_path)
- with open(os.path.join(dir_path, 'static_data', 'languages_less.json'), 'r') as f:
+ with open(os.path.join(dir_path, 'static_data', 'languages_mini.json'), 'r') as f:
self._languages = json.load(f)
def build_training_set(self):
for language in self._languages:
# limit defines the size of training set
# upper defines the maximum size
root_code_language = os.path.join(self.root_code, language)
root_training_language = os.path.join(self.root_training, language)
total = count_files(root_code_language)
try:
os.mkdir(root_training_language)
except FileExistsError:
pass
upper = 1000
if total >= upper:
limit = upper // 2
else:
limit = total // 2
indices = random.sample(range(1, total + 1), limit)
files = map(lambda x : find_file(root_code_language, x), indices)
for src in files:
basename = os.path.basename(src)
des = os.path.join(root_training_language, basename)
os.symlink(src, des)
def build_test_set(self, extension=True):
for language in self._languages:
root_language = os.path.join(self.root_code, language)
root_test_language = os.path.join(self.root_test, language)
try:
os.mkdir(root_test_language)
except FileExistsError:
pass
files = self.get_test_set(language)
for src in files:
if extension:
des = os.path.join(root_test_language, os.path.basename(src))
else:
des = os.path.join(root_test_language, os.path.splitext(os.path.basename(src))[0])
- copyfile(src, des)
+ os.symlink(src, des)
def train_files_with_label(self):
with open(os.path.join(self.root_training_csv, 'training_set.csv'), 'w', newline='') as csvfile:
setwriter = csv.writer(csvfile, delimiter=' ', quotechar='|', quoting=csv.QUOTE_MINIMAL)
+ lang_index = {k : v for v, k in enumerate(self._languages)}
for language in self._languages:
print(language)
root_training_language = os.path.join(self.root_training, language)
- index_lang = self._languages.index(language)
+ index_lang = lang_index[language]
for f in [x for x in os.listdir(root_training_language) if not x.startswith('.')]:
filename = os.path.join(root_training_language, f)
- tokens = file_to_string(filename) # 10240
- setwriter.writerow([index_lang, tokens])
+ _, extension = os.path.splitext(f)
+ text = extension.encode() + b' ' + file_to_string(filename) # 10240
+ setwriter.writerow([index_lang, text])
def get_test_set(self, language):
root_training_language = os.path.join(self.root_training, language)
root_language = os.path.join(self.root_code, language)
total = count_files(root_language)
training_set = [int(os.path.splitext(x)[0]) for x in os.listdir(root_training_language) if not x.startswith('.')]
- it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set and os.path.getsize(find_file(root_language, x)) <= 1048576)
- test_set = list(islice(it, 1000))
+
+ it = [find_file(root_language, x) for x in range(1, total + 1) if x not in training_set]
+ try:
+ test_set = random.sample(it, 1000)
+ except ValueError:
+ test_set = it
+
if len(test_set) == 0:
- it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set)
- test_set = list(islice(it, 1000))
+ it = [find_file(root_language, x) for x in range(1, total + 1) if x not in training_set]
+ try:
+ test_set = random.sample(it, 1000)
+ except ValueError:
+ test_set = it
return test_set
def test_files_with_label(self):
for language in self._languages:
root_test_language = os.path.join(self.root_test, language)
index_lang = self._languages.index(language)
with open(os.path.join(self.root_test_csv, language + '.csv'), 'w', newline='') as csvfile:
setwriter = csv.writer(csvfile, delimiter=' ', quotechar='|', quoting=csv.QUOTE_MINIMAL)
for f in [x for x in os.listdir(root_test_language) if not x.startswith('.')]:
filename = os.path.join(root_test_language, f)
- tokens = file_to_string(filename)
- setwriter.writerow([index_lang, tokens])
+ _, extension = os.path.splitext(f)
+ text = extension.encode() + b' ' + file_to_string(filename)
+ setwriter.writerow([index_lang, text])
