Commit 4658f92c by Fabian Rump Committed by Timm Behner

Initial commit

parents
# general things to ignore
.project
.pydevproject
build/
dist/
*.egg-info/
*.egg
*.py[cod]
__pycache__/
*.so
*~
from .file_functions import read_in_chunks, get_directory_for_filename, create_dir_for_file, human_readable_file_size
from .git_functions import get_version_string_from_git
from .hash_functions import md5sum
from .fail_safe_file_operations import get_binary_from_file, write_binary_to_file, get_safe_name, delete_file, get_files_in_dir
__all__ = [
'get_directory_for_filename',
'create_dir_for_file',
'human_readable_file_size',
'read_in_chunks',
'get_version_string_from_git',
'md5sum',
'get_binary_from_file',
'write_binary_to_file',
'get_safe_name',
'delete_file',
'get_files_in_dir'
]
import os
import sys
import logging
import re
from .file_functions import create_dir_for_file
def get_binary_from_file(file_path):
"""
Fail-safe file read operation. Symbolic links are converted to text files including the link.
Errors are logged. No exception raised.
:param file_path: Path of the file. Can be absolute or relative to the current directory.
:type file_path: str
:return: file's binary as bytes; returns empty byte string on error
"""
try:
if os.path.islink(file_path):
binary = "symbolic link -> {}".format(os.readlink(file_path))
else:
with open(file_path, 'rb') as f:
binary = f.read()
except Exception as e:
logging.error("Could not read file: {} {}".format(sys.exc_info()[0].__name__, e))
binary = b''
return binary
def write_binary_to_file(file_binary, file_path, overwrite=False, file_copy=False):
"""
Fail-safe file write operation. Creates directories if needed.
Errors are logged. No exception raised.
:param file_binary: binary to write into the file
:type file_binary: bytes or str
:param file_path: Path of the file. Can be absolute or relative to the current directory.
:type file_path: str
:param overwrite: overwrite file if it exists
:type overwrite: bool
:default overwrite: False
:param file_copy: If overwrite is false and file already exists, write into new file and add a counter to the file name.
:type file_copy: bool
:default file_copy: False
:return: None
"""
try:
create_dir_for_file(file_path)
if not os.path.exists(file_path) or overwrite:
_write_file(file_path, file_binary)
elif file_copy and not overwrite:
new_path = _get_counted_file_path(file_path)
_write_file(new_path, file_binary)
except Exception as e:
logging.error("Could not write file: {} {}".format(sys.exc_info()[0].__name__, e))
def _write_file(file_path, binary):
with open(file_path, 'wb') as f:
f.write(binary)
def _get_counted_file_path(original_path):
tmp = re.search(r"-([0-9]+)\Z", original_path)
if tmp is not None:
current_count = int(tmp.group(1))
new_file_path = re.sub(r"-[0-9]+\Z", "-{}".format(current_count+1), original_path)
else:
new_file_path = "{}-1".format(original_path)
return new_file_path
def delete_file(file_path):
"""
Fail-safe delete file operation. Deletes a file if it exists.
Errors are logged. No exception raised.
:param file_path: Path of the file. Can be absolute or relative to the current directory.
:type file_path: str
:return: None
"""
try:
os.unlink(file_path)
except Exception as e:
logging.error("Could not delete file: {} {}".format(sys.exc_info()[0].__name__, e))
def get_safe_name(file_name, max_size=200, valid_characters='abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_+. '):
"""
removes all problematic characters from a file name
cuts file names if they are too long
:param file_name: Original file name
:type file_name: str
:param max_size: maximum allowed file name length
:type max_size: int
:default max_size: 200
:param valid_characters: characters that shall be allowed in a file name
:type valid_characters: str
:default valid_characters: 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_+. '
:return: str
"""
allowed_charachters = set(valid_characters)
safe_name = filter(lambda x: x in allowed_charachters, file_name)
safe_name = "".join(safe_name)
safe_name = safe_name.replace(" ", "_")
if len(safe_name) > max_size:
safe_name = safe_name[0:max_size]
return safe_name
def get_files_in_dir(directory_path):
"""
Returns a list with the absolute paths of all files in the directory directory_path
:param directory_path: directory including files
:type directory_path: str
:return: list
"""
result = []
try:
for file_path, _, files in os.walk(directory_path):
for file_ in files:
result.append(os.path.abspath(os.path.join(file_path, file_)))
except Exception as e:
logging.error("Could not get files: {} {}".format(sys.exc_info()[0].__name__, e))
return result
import os
from hurry.filesize import size, alternative
def read_in_chunks(file_object, chunk_size=1024):
"""
Helper function to read large file objects iteratively in smaller chunks. Can be used like this::
file_object = open('somelargefile.xyz', 'rb')
for chunk in read_in_chunks(file_object):
# Do something with chunk
:param file_object: The file object from which the chunk data is read. Must be a subclass of ``io.BufferedReader``.
:param chunk_size: Number of bytes to read per chunk.
:type chunk_size: int
:return: Returns a generator to iterate over all chunks, see above for usage.
"""
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data
def get_directory_for_filename(filename):
"""
Convenience function which returns the absolute path to the directory that contains the given file name.
:param filename: Path of the file. Can be absolute or relative to the current directory.
:type filename: str
:return: Absolute path of the directory
"""
return os.path.dirname(os.path.abspath(filename))
def create_dir_for_file(file_path):
"""
Creates all directories of file path. File path may include the file as well.
:param file_path: Path of the file. Can be absolute or relative to the current directory.
:type file_path: str
:return: None
"""
directory = os.path.dirname(os.path.abspath(file_path))
os.makedirs(directory, exist_ok=True)
def human_readable_file_size(size_in_bytes):
"""
Returns a nicly human readable file size
:param size_in_bytes: Size in Bytes
:type size_in_bytes: int
:return: str
"""
return size(size_in_bytes, system=alternative)
import subprocess
def get_version_string_from_git(directory_name):
return subprocess.check_output(['git', 'describe', '--always'], cwd=directory_name).strip().decode('utf-8')
import hashlib
from .file_functions import read_in_chunks
def md5sum(file_object):
m = hashlib.md5()
for data in read_in_chunks(file_object):
m.update(data)
return m.hexdigest().lower()
hurry.filesize >= 0.9
\ No newline at end of file
import os
import subprocess
from setuptools import setup, find_packages
setup(
name="common_helper_files",
version=subprocess.check_output(['git', 'describe', '--always'], cwd=os.path.dirname(os.path.abspath(__file__))).strip().decode('utf-8'),
packages=find_packages(),
install_requires=[
'hurry.filesize >= 0.9'
],
description="file operation helper functions"
)
this is a test
\ No newline at end of file
test
\ No newline at end of file
import unittest
import os
from tempfile import TemporaryDirectory
from common_helper_files.fail_safe_file_operations import get_binary_from_file,\
write_binary_to_file, delete_file, get_safe_name, _get_counted_file_path,\
get_files_in_dir
from common_helper_files.file_functions import get_directory_for_filename
class Test_FailSafeFileOperations(unittest.TestCase):
def setUp(self):
self.tmp_dir = TemporaryDirectory(prefix="test_common_helper_file")
def tearDown(self):
self.tmp_dir.cleanup()
@staticmethod
def get_directory_of_current_file():
return get_directory_for_filename(__file__)
def test_fail_safe_read_file(self):
test_file_path = os.path.join(self.get_directory_of_current_file(), "data", "read_test")
file_binary = get_binary_from_file(test_file_path)
self.assertEqual(file_binary, b'this is a test', "content not correct")
# Test none existing file
none_existing_file_path = os.path.join(self.get_directory_of_current_file(), "data", "none_existing_file")
file_binary = get_binary_from_file(none_existing_file_path)
self.assertEqual(file_binary, b'', "content not correct")
def test_fail_safe_write_file(self):
file_path = os.path.join(self.tmp_dir.name, "test_folder", "test_file")
write_binary_to_file(b'this is a test', file_path)
self.assertTrue(os.path.exists(file_path), "file not created")
read_binary = get_binary_from_file(file_path)
self.assertEqual(read_binary, b'this is a test', "written data not correct")
# Test not overwrite flag
write_binary_to_file(b'do not overwirte', file_path, overwrite=False)
read_binary = get_binary_from_file(file_path)
self.assertEqual(read_binary, b'this is a test', "written data not correct")
# Test overwrite flag
write_binary_to_file(b'overwrite', file_path, overwrite=True)
read_binary = get_binary_from_file(file_path)
self.assertEqual(read_binary, b'overwrite', "written data not correct")
# Test copy_file_flag
write_binary_to_file(b'second_overwrite', file_path, file_copy=True)
self.assertTrue(os.path.exists("{}-1".format(file_path)), "new file copy does not exist")
read_binary_original = get_binary_from_file(file_path)
self.assertEqual(read_binary_original, b'overwrite', "original file no longer correct")
read_binary_new = get_binary_from_file("{}-1".format(file_path))
self.assertEqual(read_binary_new, b'second_overwrite', "binary of new file not correct")
def test_get_counted_file_path(self):
self.assertEqual(_get_counted_file_path("/foo/bar"), "/foo/bar-1", "simple case")
self.assertEqual(_get_counted_file_path("/foo/bar-11"), "/foo/bar-12", "simple count two digits")
self.assertEqual(_get_counted_file_path("foo-34/bar"), "foo-34/bar-1", "complex case")
def test_delete_file(self):
file_path = os.path.join(self.tmp_dir.name, "test_folder", "test_file")
write_binary_to_file(b'this is a test', file_path)
self.assertTrue(os.path.exists(file_path), "file not created")
delete_file(file_path)
self.assertFalse(os.path.exists(file_path))
# Test delete none existing file
delete_file(file_path)
def test_get_safe_name(self):
a = "/()=Hello%&World!? Foo"
self.assertEqual(get_safe_name(a), "HelloWorld_Foo", "result not correct")
b = 250 * 'a'
self.assertEqual(len(get_safe_name(b)), 200, "lenght not cutted correctly")
def test_get_files_in_dir(self):
test_dir_path = os.path.join(self.get_directory_of_current_file(), "data")
result = get_files_in_dir(test_dir_path)
self.assertIn(os.path.join(test_dir_path, "read_test"), result, "file in root folder not found")
self.assertIn(os.path.join(test_dir_path, "test_folder/generic_test_file"), result, "file in sub folder not found")
self.assertEqual(len(result), 2, "number of found files not correct")
def test_get_files_in_dir_error(self):
result = get_files_in_dir("/none_existing/dir")
self.assertEqual(result, [], "error result should be an empty list")
if __name__ == "__main__":
unittest.main()
'''
Created on Dec 5, 2016
@author: weidenba
'''
import unittest
from common_helper_files.file_functions import human_readable_file_size
class Test_file_functions(unittest.TestCase):
def test_human_readable_file_size(self):
self.assertEqual(human_readable_file_size(1024), "1 KB")
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment