
diff --git a/swh/langdetect/cnn.py b/swh/langdetect/cnn.py
index c526567..b7bc9e5 100644
--- a/swh/langdetect/cnn.py
+++ b/swh/langdetect/cnn.py
@@ -1,264 +1,267 @@
import os
import sys
import subprocess
import time
import random
import csv
import numpy as np
import warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore")
import tensorflow as tf
import json
import argparse
from ast import literal_eval
from pickle import dump
from pickle import load
from numpy import array
from .utils.common import tokenizer
from .utils.common import file_to_string
from keras.preprocessing.sequence import pad_sequences
+from keras.callbacks import EarlyStopping
from keras.models import Model
from keras.models import Sequential
from keras.models import load_model
from keras.layers import Input
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Dropout
from keras.layers import ThresholdedReLU
from keras.layers import Activation
from keras.layers import Lambda
from keras.layers import Embedding
from keras.layers.convolutional import Convolution1D
from keras.layers.convolutional import MaxPooling1D
from keras.layers.normalization import BatchNormalization
from keras.layers.merge import concatenate
from keras.utils import np_utils
from keras.optimizers import SGD
def main():
parser = argparse.ArgumentParser(description='Training and test tool for character-level ConvNet text categorisation.')
subparsers = parser.add_subparsers()
parser_train = subparsers.add_parser('train', help='Train on the dataset; the dataset must be a *.csv file. A model will be created in the same directory.')
parser_train.add_argument('train_path', metavar='PATH', type=str, help='Path of the training dataset.')
parser_train.add_argument('-ms', '--maxsize', metavar='SIZE', dest='train_maxsize', type=int, help='Set maximum input size of ConvNet, default 1024.')
parser_train.add_argument('-e', '--epochs', metavar='N', dest='train_epochs', type=int, help='Number of training epochs (iterations), default 50.')
parser_test = subparsers.add_parser('test', help='Test on the dataset; the dataset must be a directory of *.csv files, each named after the corresponding language.')
parser_test.add_argument('test_root', metavar='ROOT', type=str, help='Root of the test dataset.')
if len(sys.argv[1:]) == 0:
parser.print_help()
parser.exit()
args = parser.parse_args()
print(args)
if args.train_path:
if args.train_maxsize:
if args.train_epochs:
n = CNN(args.train_path, maxsize=args.train_maxsize, epochs=args.train_epochs)
n.train()
else:
n = CNN(args.train_path, maxsize=args.train_maxsize)
n.train()
else:
if args.train_epochs:
n = CNN(args.train_path, epochs=args.train_epochs)
n.train()
else:
n = CNN(args.train_path)
n.train()
elif args.test_root:
n = CNN(args.test_root)
n.test()
else:
parser.parse_args('-h')
class CNN:
def __init__(self, path, maxsize=1024, epochs=50):
self._path = path
# Root of model folder
self._root_model = os.path.join(os.path.dirname(path), 'model_cnn')
try:
os.mkdir(self._root_model)
except:
pass
# Path of result
self._path_result = os.path.join(os.path.dirname(path), 'result_cnn')
path = os.path.abspath(__file__)
dir_path = os.path.dirname(path)
with open(os.path.join(dir_path, 'static_data', 'languages.json'), 'r') as f:
self._languages = json.load(f)
self._path_test_csv = path
self._input_size = maxsize
self._vocab_size = 256
self._num_of_classes = len(self._languages)
self._batch_size = 128
self._epochs = epochs
def file_len(self, fname):
with open(fname) as f:
count = 0
for l in f:
count += 1
return count
def train(self):
model = self._get_model()
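+ # Stop training as soon as the training loss stops improving from one epoch to the next.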
+ earlystop = EarlyStopping(monitor='loss', min_delta=0, patience=1, verbose=0, mode='auto')
+ callbacks = [earlystop]
model.fit_generator(
self._generator(self._input_size, self._num_of_classes, self._batch_size),
- steps_per_epoch=self.file_len(self._path) / self._batch_size, epochs=self._epochs)
+ steps_per_epoch=self.file_len(self._path) / self._batch_size, epochs=self._epochs, callbacks=callbacks)
model.save(os.path.join(self._root_model, 'model.h5'))
def _generator(self, length, total_class, batch_size=128):
counter = 0
while True:
with open(self._path, newline='') as csvfile:
r = csv.reader(csvfile, delimiter=' ', quotechar='|')
for pair in r:
if counter == 0:
X = np.empty((0, length))
Y = np.empty((0, total_class))
label, string = pair
label = int(label)
string = literal_eval(string)
tokens = [x + 1 for x in tokenizer(string, 'letter')]
X = np.append(X, pad_sequences([tokens], maxlen=length), axis=0)
label = array(np_utils.to_categorical([label], total_class))
Y = np.append(Y, label, axis=0)
counter += 1
if counter == batch_size:
counter = 0
yield(X,Y)
def _get_model(self):
input_size = self._input_size
alphabet_size = self._vocab_size
embedding_size = 256
conv_layers = [(256,7,3), (256,7,3), (256,3,-1), (256,3,-1), (256,3,-1), (256,3,3)]
threshold = 1e-6
fully_connected_layers = [1024, 1024]
dropout_p = 0.2
optimizer = 'adam'
loss = 'categorical_crossentropy'
num_of_classes = self._num_of_classes
# Input layer
inputs = Input(shape=(input_size,), name='sent_input', dtype='int64')
# Embedding layers
x = Embedding(alphabet_size + 1, embedding_size, input_length=input_size)(inputs)
# Convolution layers
for cl in conv_layers:
x = Convolution1D(cl[0], cl[1])(x)
x = ThresholdedReLU(threshold)(x)
if cl[2] != -1:
x = MaxPooling1D(cl[2])(x)
x = Flatten()(x)
# Fully connected layers
for fl in fully_connected_layers:
x = Dense(fl)(x)
x = ThresholdedReLU(threshold)(x)
x = Dropout(dropout_p)(x)
# Output layer
predictions = Dense(num_of_classes, activation='softmax')(x)
# Build and compile model
model = Model(inputs=inputs, outputs=predictions)
model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
print(model.summary())
return model
def _max_len(self, texts):
return max([len(text) for text in texts])
def test(self):
csv.field_size_limit(sys.maxsize)
try:
r = open(self._path_result, 'rb')
test_result = load(r)
r.close()
except FileNotFoundError:
test_result = {}
model = self._load_model()
for language in [x for x in self._languages if x not in test_result.keys()]:
test_result[language] = self.test_class(model, language)
with open(self._path_result, 'wb') as f:
dump(test_result, f)
def _load_model(self):
model = load_model(os.path.join(self._root_model, 'model.h5'))
return model
def _count_size(self, files):
size = 0
for f in files:
size += os.path.getsize(f)
return size
def test_class(self, model, language):
ok = 0
results = []
count = 0
total_test = self.file_len(os.path.join(self._path_test_csv, language + '.csv'))
with open(os.path.join(self._path_test_csv, language + '.csv'), newline='') as csvfile:
r = csv.reader(csvfile, delimiter=' ', quotechar='|')
for pair in r:
label, string = pair
label = int(label)
string = literal_eval(string)
tokens = [x + 1 for x in tokenizer(string, 'letter')]
result = self._guess_file_language(model, tokens)
count += 1
print('[{0:4d}/{1:4d}] {2}:{3} '.format(count, total_test, result[0][1], result[0][0]),end='\r')
results.append(result[0])
if result[0][1] == language:
ok += 1
accuracy = ok / total_test
print('Tests for {} '.format(language))
print('Total test files : {}'.format(total_test))
print('Correctly classified files : {}'.format(ok))
print('Accuracy : {}%'.format(accuracy * 100))
return (ok, total_test, accuracy, results)
def speed_benchmark(self):
language = self._languages[10]
model = self._load_model()
test_set = self._get_test_set(language)
total_size = self._count_size(test_set)
print('{} kB in total'.format(total_size / 1024))
t_start = time.perf_counter()
self.test_class(model, language)
t_end = time.perf_counter()
print('{} seconds.'.format(t_end - t_start))
print('{} seconds per KiB'.format(((t_end - t_start) / total_size) * 1024))
def _guess_file_language(self, model, tokens):
X = pad_sequences([tokens], maxlen=self._input_size)
result = list(model.predict(X))[0]
result = [(s, self._languages[i]) for i, s in enumerate(result)]
return sorted(result, reverse=True)
if __name__ == '__main__':
main()
diff --git a/swh/langdetect/cnn.py b/swh/langdetect/cnn_w.py
similarity index 70%
copy from swh/langdetect/cnn.py
copy to swh/langdetect/cnn_w.py
index c526567..e5f3673 100644
--- a/swh/langdetect/cnn.py
+++ b/swh/langdetect/cnn_w.py
@@ -1,264 +1,299 @@
import os
import sys
import subprocess
import time
import random
import csv
import numpy as np
import warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore")
import tensorflow as tf
import json
import argparse
from ast import literal_eval
from pickle import dump
from pickle import load
from numpy import array
from .utils.common import tokenizer
from .utils.common import file_to_string
from keras.preprocessing.sequence import pad_sequences
+from keras.callbacks import EarlyStopping
from keras.models import Model
from keras.models import Sequential
from keras.models import load_model
from keras.layers import Input
from keras.layers import Dense
from keras.layers import Flatten
+from keras.layers import Merge
from keras.layers import Dropout
from keras.layers import ThresholdedReLU
from keras.layers import Activation
from keras.layers import Lambda
from keras.layers import Embedding
from keras.layers.convolutional import Convolution1D
from keras.layers.convolutional import MaxPooling1D
from keras.layers.normalization import BatchNormalization
-from keras.layers.merge import concatenate
+from keras.layers import Concatenate
from keras.utils import np_utils
from keras.optimizers import SGD
+from collections import Counter
+
+from keras import backend as K
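+# Restrict TensorFlow to a single intra-op and inter-op thread.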
+K.set_session(K.tf.Session(config=K.tf.ConfigProto(intra_op_parallelism_threads=1, inter_op_parallelism_threads=1)))
def main():
parser = argparse.ArgumentParser(description='Training and test tool for character-level ConvNet text categorisation.')
subparsers = parser.add_subparsers()
parser_train = subparsers.add_parser('train', help='Train on the dataset; the dataset must be a *.csv file. A model will be created in the same directory.')
parser_train.add_argument('train_path', metavar='PATH', type=str, help='Path of the training dataset.')
parser_train.add_argument('-ms', '--maxsize', metavar='SIZE', dest='train_maxsize', type=int, help='Set maximum input size of ConvNet, default 1024.')
parser_train.add_argument('-e', '--epochs', metavar='N', dest='train_epochs', type=int, help='Number of training epochs (iterations), default 50.')
parser_test = subparsers.add_parser('test', help='Test on the dataset; the dataset must be a directory of *.csv files, each named after the corresponding language.')
parser_test.add_argument('test_root', metavar='ROOT', type=str, help='Root of the test dataset.')
if len(sys.argv[1:]) == 0:
parser.print_help()
parser.exit()
args = parser.parse_args()
print(args)
if args.train_path:
if args.train_maxsize:
if args.train_epochs:
- n = CNN(args.train_path, maxsize=args.train_maxsize, epochs=args.train_epochs)
+ n = CNNword(args.train_path, maxsize=args.train_maxsize, epochs=args.train_epochs)
n.train()
else:
- n = CNN(args.train_path, maxsize=args.train_maxsize)
+ n = CNNword(args.train_path, maxsize=args.train_maxsize)
n.train()
else:
if args.train_epochs:
- n = CNN(args.train_path, epochs=args.train_epochs)
+ n = CNNword(args.train_path, epochs=args.train_epochs)
n.train()
else:
- n = CNN(args.train_path)
+ n = CNNword(args.train_path)
n.train()
elif args.test_root:
- n = CNN(args.test_root)
+ n = CNNword(args.test_root)
n.test()
else:
parser.parse_args('-h')
-class CNN:
+class CNNword:
- def __init__(self, path, maxsize=1024, epochs=50):
+ def __init__(self, path, maxsize=300, epochs=30):
self._path = path
# Root of model folder
- self._root_model = os.path.join(os.path.dirname(path), 'model_cnn')
+ self._root_model = os.path.join(os.path.dirname(path), 'model_cnn_word')
try:
os.mkdir(self._root_model)
except:
pass
# Path of result
- self._path_result = os.path.join(os.path.dirname(path), 'result_cnn')
-
+ self._path_result = os.path.join(os.path.dirname(path), 'result_cnn_word')
+
path = os.path.abspath(__file__)
dir_path = os.path.dirname(path)
with open(os.path.join(dir_path, 'static_data', 'languages.json'), 'r') as f:
self._languages = json.load(f)
self._path_test_csv = path
+ self._path_vocab = os.path.join(self._root_model, 'vocab')
self._input_size = maxsize
- self._vocab_size = 256
+ self._vocab_size = 15001
self._num_of_classes = len(self._languages)
- self._batch_size = 128
+ self._batch_size = 64
self._epochs = epochs
def file_len(self, fname):
with open(fname) as f:
count = 0
for l in f:
count += 1
return count
def train(self):
-
model = self._get_model()
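+ # Build and cache the word-frequency vocabulary on the first run; later runs reuse the saved file.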
+ if not os.path.isfile(self._path_vocab):
+ self._learn_vocab(self._input_size, self._num_of_classes)
+ with open(self._path_vocab, 'rb') as f:
+ c = load(f)
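+ # Keep the 15000 most frequent tokens; indices start at 1, leaving 0 for the padding value used by pad_sequences.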
+ l = c.most_common(15000)
+ indexer = dict((v[0], i + 1) for i, v in enumerate(l))
+ earlystop = EarlyStopping(monitor='loss', min_delta=0, patience=1, verbose=0, mode='auto')
+ callbacks = [earlystop]
model.fit_generator(
- self._generator(self._input_size, self._num_of_classes, self._batch_size),
- steps_per_epoch=self.file_len(self._path) / self._batch_size, epochs=self._epochs)
+ self._generator(self._input_size, self._num_of_classes, indexer, self._batch_size),
+ steps_per_epoch=self.file_len(self._path) / self._batch_size, epochs=self._epochs, callbacks=callbacks)
model.save(os.path.join(self._root_model, 'model.h5'))
- def _generator(self, length, total_class, batch_size=128):
+ def _learn_vocab(self, length, total_class):
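+ # Count word-token frequencies over the whole training CSV and cache the Counter on disk.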
+ c = Counter()
+ with open(self._path, newline='') as csvfile:
+ r = csv.reader(csvfile, delimiter=' ', quotechar='|')
+ for pair in r:
+ label, string = pair
+ label = int(label)
+ print(label)
+ string = literal_eval(string)
+ tokens = tokenizer(string, 'word')
+ c.update(tokens)
+
+ with open(self._path_vocab, 'wb') as f:
+ dump(c, f)
+
+ def _generator(self, length, total_class, indexer, batch_size=64):
counter = 0
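+ # Map out-of-vocabulary words to one index past the known vocabulary.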
+ oov_index = len(indexer) + 1
while True:
with open(self._path, newline='') as csvfile:
r = csv.reader(csvfile, delimiter=' ', quotechar='|')
for pair in r:
if counter == 0:
X = np.empty((0, length))
Y = np.empty((0, total_class))
label, string = pair
label = int(label)
string = literal_eval(string)
- tokens = [x + 1 for x in tokenizer(string, 'letter')]
+ tokens = [indexer.get(x, oov_index) for x in tokenizer(string, 'word')]
X = np.append(X, pad_sequences([tokens], maxlen=length), axis=0)
label = array(np_utils.to_categorical([label], total_class))
Y = np.append(Y, label, axis=0)
counter += 1
if counter == batch_size:
counter = 0
yield(X,Y)
+
def _get_model(self):
input_size = self._input_size
- alphabet_size = self._vocab_size
+ vocab_size = self._vocab_size
embedding_size = 256
- conv_layers = [(256,7,3), (256,7,3), (256,3,-1), (256,3,-1), (256,3,-1), (256,3,3)]
- threshold = 1e-6
- fully_connected_layers = [1024, 1024]
- dropout_p = 0.2
optimizer = 'adam'
loss = 'categorical_crossentropy'
num_of_classes = self._num_of_classes
+
+ embedding_layer = Embedding(vocab_size + 1,
+ embedding_size,
+ input_length=input_size,
+# trainable=False,
+ )
+
+ # applying a more complex convolutional approach
+ convs = []
+ filter_sizes = [3,4,5]
+
+ sequence_input = Input(shape=(input_size,), dtype='int64')
+ embedded_sequences = embedding_layer(sequence_input)
- # Input layer
- inputs = Input(shape=(input_size,), name='sent_input', dtype='int64')
- # Embedding layers
- x = Embedding(alphabet_size + 1, embedding_size, input_length=input_size)(inputs)
- # Convolution layers
- for cl in conv_layers:
- x = Convolution1D(cl[0], cl[1])(x)
- x = ThresholdedReLU(threshold)(x)
- if cl[2] != -1:
- x = MaxPooling1D(cl[2])(x)
- x = Flatten()(x)
- # Fully connected layers
- for fl in fully_connected_layers:
- x = Dense(fl)(x)
- x = ThresholdedReLU(threshold)(x)
- x = Dropout(dropout_p)(x)
- # Output layer
- predictions = Dense(num_of_classes, activation='softmax')(x)
- # Build and compile model
- model = Model(inputs=inputs, outputs=predictions)
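+ # Run three parallel 1-D convolutions with window sizes 3, 4 and 5, max-pool each branch, then concatenate the pooled outputs.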
+ for fsz in filter_sizes:
+ l_conv = Convolution1D(filters=32, kernel_size=fsz, activation='relu')(embedded_sequences)
+ l_pool = MaxPooling1D(5)(l_conv)
+ convs.append(l_pool)
+
+ l_merge = Concatenate(axis=1)(convs)
+ l_cov1= Convolution1D(128, 5, activation='relu')(l_merge)
+ l_pool1 = MaxPooling1D(5)(l_cov1)
+ l_cov2 = Convolution1D(128, 5, activation='relu')(l_pool1)
+ l_pool2 = MaxPooling1D(5)(l_cov2)
+ l_flat = Flatten()(l_pool2)
+ l_dense = Dense(512, activation='relu')(l_flat)
+ preds = Dense(num_of_classes, activation='softmax')(l_dense)
+
+ model = Model(sequence_input, preds)
model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
print(model.summary())
return model
def _max_len(self, texts):
return max([len(text) for text in texts])
def test(self):
csv.field_size_limit(sys.maxsize)
try:
r = open(self._path_result, 'rb')
test_result = load(r)
r.close()
except FileNotFoundError:
test_result = {}
model = self._load_model()
for language in [x for x in self._languages if x not in test_result.keys()]:
test_result[language] = self.test_class(model, language)
with open(self._path_result, 'wb') as f:
dump(test_result, f)
def _load_model(self):
model = load_model(os.path.join(self._root_model, 'model.h5'))
return model
def _count_size(self, files):
size = 0
for f in files:
size += os.path.getsize(f)
return size
def test_class(self, model, language):
ok = 0
results = []
count = 0
total_test = self.file_len(os.path.join(self._path_test_csv, language + '.csv'))
with open(os.path.join(self._path_test_csv, language + '.csv'), newline='') as csvfile:
r = csv.reader(csvfile, delimiter=' ', quotechar='|')
for pair in r:
label, string = pair
label = int(label)
string = literal_eval(string)
tokens = [x + 1 for x in tokenizer(string, 'letter')]
result = self._guess_file_language(model, tokens)
count += 1
print('[{0:4d}/{1:4d}] {2}:{3} '.format(count, total_test, result[0][1], result[0][0]),end='\r')
results.append(result[0])
if result[0][1] == language:
ok += 1
accuracy = ok / total_test
print('Tests for {} '.format(language))
print('Total test files : {}'.format(total_test))
print('Correctly classified files : {}'.format(ok))
print('Accuracy : {}%'.format(accuracy * 100))
return (ok, total_test, accuracy, results)
def speed_benchmark(self):
language = self._languages[10]
model = self._load_model()
test_set = self._get_test_set(language)
total_size = self._count_size(test_set)
print('{} kB in total'.format(total_size / 1024))
t_start = time.perf_counter()
self.test_class(model, language)
t_end = time.perf_counter()
print('{} seconds.'.format(t_end - t_start))
print('{} seconds per KiB'.format(((t_end - t_start) / total_size) * 1024))
def _guess_file_language(self, model, tokens):
X = pad_sequences([tokens], maxlen=self._input_size)
result = list(model.predict(X))[0]
result = [(s, self._languages[i]) for i, s in enumerate(result)]
return sorted(result, reverse=True)
if __name__ == '__main__':
main()
diff --git a/swh/langdetect/utils/common.py b/swh/langdetect/utils/common.py
index dc7bdc9..462788a 100644
--- a/swh/langdetect/utils/common.py
+++ b/swh/langdetect/utils/common.py
@@ -1,155 +1,155 @@
"""
This module groups the basic preprocessing methods
used in the learning stage of the different
approaches.
"""
import re, os
#_re_string = re.compile(b"""("(\\.|[^"\\])*"|'(\\.|[^'\\])*')""")
_re_number = re.compile(b'([\d]+)|([\d]+.[\d]+)[^A-Za-z]')
-_re_separator = re.compile(b'(\W)')
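+# Split on explicit byte ranges covering ASCII whitespace and punctuation; the capturing group keeps the separators as tokens.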
+_re_separator = re.compile(b'([\x20-\x30\x3a-\x40\x5b-\x60\x7b-\x7e\t\n])')
_not_start_with_point = lambda x: not x.startswith('.')
def tokenizer(text, re_name):
''' Splits text into tokens '''
if re_name == 'letter':
return list(text)
elif re_name == 'word':
- return [word for word in _re_separator.split(text) if word.strip(b' \t')]
+ return [word for word in _re_separator.split(text) if word.strip(b'')]
def file_to_string(filename):
""" Read a file to a string. """
with open(filename, 'rb') as f:
data = f.read()
return replace_string_and_number(data)
def count_files(root_language):
all_folders = natural_sort(filter
(_not_start_with_point,
os.listdir(root_language)))
files = natural_sort(filter
(_not_start_with_point,
os.listdir(root_language + '/' + all_folders[-1])))
(max,_) = os.path.splitext(files[-1])
return int(max)
def find_file(root_language, n):
'''Find the n-th file in language folder'''
if n > count_files(root_language):
return ''
else:
start = (n - 1) // 1000 * 1000 + 1
end = start + 999
root_count = root_language + '/' + str(start) + '-' + str(end)
files = natural_sort(filter
(_not_start_with_point,
os.listdir(root_count)))
return root_count + '/' + files[n - start]
def replace_string_and_number(text):
""" Replace strings and numbers in a file by special tokens
"""
# str_replaced = re.sub(_re_string, '__str__', text)
# str_num_replaced = re.sub(_re_number, '__num__', str_replaced)
str_num_replaced = text
return str_num_replaced
def natural_sort(l):
convert = lambda text: int(text) if text.isdigit() else text.lower()
alphanum_key = lambda key: [ convert(c) for c in re.split('([0-9]+)', key) ]
return sorted(l, key = alphanum_key)
def remove_comment(binary_text, language):
splited_text = binary_text.splitlines()
text = b'\n'.join(splited_text)
regexp = get_regexp(language)
if not regexp:
return binary_text
return regexp.sub(b'\n', text)
def get_regexp(language):
re_inline = get_inline(language)
re_block = get_block(language)
rs = []
if re_inline:
rs.append(re_inline)
if re_block:
rs.append(re_block)
if rs == []:
return None
return re.compile(b'|'.join(rs), re.DOTALL)
def get_inline(language):
r_base = b'[^\\n]*(?:\\n|$)'
if language in ['Ada',
'Eiffel',
'VHDL',
'AppleScript',
'Haskell',
'Lua',
'PLSQL']:
r = b'(--)' + r_base
elif language in ['C',
'C++',
'C#',
'D',
'JavaScript',
'ActionScript',
'Java',
'Rust']:
r = b'(//)' + r_base
elif language == 'Xojo':
r = b'(' + b'|'.join([b'//', b"\'"]) + b')' + r_base
elif language in ['R',
'Tcl',
'Awk',
'Perl',
'Perl 6',
'Ruby',
'Python']:
r = b'(#)' + r_base
elif language in ['COBOL']:
r = b'(\\*>)' + r_base
elif language in ['Matlab']:
r = b'(%)' + r_base
else:
return None
return b'(' + r + b')'
def get_block(language):
r_base = b'.*?'
if language in ['C',
'C++',
'C#',
'JavaScript',
'ActionScript',
'PLSQL',
'PHP',
'Rust']:
r = b'(/\\*)' + r_base + b'(\\*/)'
elif language in ['OCaml',
'Pascal',
'Modula-2',
'Smarty']:
r = b'(\\(\\*)' + r_base + b'(\\*\\))'
elif language == 'Python':
r = b'(\'\'\')' + r_base + b'(\'\'\')'
else:
return None
return b'(' + r + b')'
def purify(text, lang):
# TODO: for some language like HTML, remove code other than principal language
pass
diff --git a/swh/langdetect/utils/training.py b/swh/langdetect/utils/training.py
index 77f7022..1f44ca5 100644
--- a/swh/langdetect/utils/training.py
+++ b/swh/langdetect/utils/training.py
@@ -1,108 +1,109 @@
import os
import random
import csv
from .common import count_files, find_file, file_to_string
from itertools import islice
+from shutil import copyfile
class Dataset:
def __init__(self, root):
self.root_code = os.path.join(root, '..', 'code_by_language')
self.root_training = os.path.join(root, '..', 'training_set')
self.root_training_csv = os.path.join(root, '..', 'training_set_csv')
self.root_test = os.path.join(root, '..', 'test_set')
self.root_test_csv = os.path.join(root, '..', 'test_set_csv')
try:
os.mkdir(self.root_training)
except FileExistsError:
pass
try:
os.mkdir(self.root_training_csv)
except FileExistsError:
pass
try:
os.mkdir(self.root_test)
except FileExistsError:
pass
try:
os.mkdir(self.root_test_csv)
except FileExistsError:
pass
def build_training_set(self, languages):
for language in languages:
# limit defines the size of training set
# upper defines the maximum size
root_code_language = os.path.join(self.root_code, language)
root_training_language = os.path.join(self.root_training, language)
total = count_files(root_code_language)
try:
os.mkdir(root_training_language)
except FileExistsError:
pass
upper = 1000
if total >= upper:
limit = upper // 2
else:
limit = total // 2
indices = random.sample(range(1, total + 1), limit)
files = map(lambda x : find_file(root_code_language, x), indices)
for src in files:
basename = os.path.basename(src)
des = os.path.join(root_training_language, basename)
os.symlink(src, des)
def build_test_set(self, languages, extension=True):
for language in languages:
root_language = os.path.join(self.root_code, language)
root_test_language = os.path.join(self.root_test, language)
try:
os.mkdir(root_test_language)
except FileExistsError:
pass
files = self.get_test_set(language)
for src in files:
if extension:
des = os.path.join(root_test_language, os.path.basename(src))
else:
des = os.path.join(root_test_language, os.path.splitext(os.path.basename(src))[0])
- os.symlink(src, des)
+ copyfile(src, des)
def train_files_with_label(self, languages, maxsize):
with open(os.path.join(self.root_training_csv, 'training_set.csv'), 'w', newline='') as csvfile:
setwriter = csv.writer(csvfile, delimiter=' ', quotechar='|', quoting=csv.QUOTE_MINIMAL)
for language in languages:
root_training_language = os.path.join(self.root_training, language)
index_lang = languages.index(language)
for f in [x for x in os.listdir(root_training_language) if not x.startswith('.')]:
filename = os.path.join(root_training_language, f)
tokens = file_to_string(filename)[-maxsize:] # 10240
setwriter.writerow([index_lang, tokens])
def get_test_set(self, language):
root_training_language = os.path.join(self.root_training, language)
root_language = os.path.join(self.root_code, language)
total = count_files(root_language)
training_set = [int(os.path.splitext(x)[0]) for x in os.listdir(root_training_language) if not x.startswith('.')]
it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set and os.path.getsize(find_file(root_language, x)) <= 1048576)
test_set = list(islice(it, 1000))
if len(test_set) == 0:
it = (find_file(root_language, x) for x in range(1, total + 1) if x not in training_set)
test_set = list(islice(it, 1000))
return test_set
def test_files_with_label(self, languages):
for language in languages:
root_test_language = os.path.join(self.root_test, language)
index_lang = languages.index(language)
with open(os.path.join(self.root_test_csv, language + '.csv'), 'w', newline='') as csvfile:
setwriter = csv.writer(csvfile, delimiter=' ', quotechar='|', quoting=csv.QUOTE_MINIMAL)
for f in [x for x in os.listdir(root_test_language) if not x.startswith('.')]:
filename = os.path.join(root_test_language, f)
tokens = file_to_string(filename)
setwriter.writerow([index_lang, tokens])
