# -*- coding: utf-8 -*-
"""
The h5writer submodule contains the hdf5_output function, which accepts various datatypes (binary, image, tabular),
and adds them to a (possibly pre-existing) HDF5 file.
"""
from pandas import HDFStore, DataFrame
from tables.nodes import filenode
from tables import NodeError, Filters
from os.path import isfile
from time import sleep
from datetime import datetime
from os import remove, fdopen, open as low_level_open, O_CREAT, O_EXCL, O_WRONLY, getpid
from fnmatch import fnmatch
import numpy as np
import zlib
import bz2
try:
import lzma
except ImportError:
lzma = None
import pickle
import time
# When True, every lock acquisition/release is logged with the owning PID
# (see acquire_lock / release_lock).
lock_debugging = False
def wait_for_lock_and_prepare_filename(base_filename, local_timeout):
    """Choose a writable HDF5 filename, waiting on any existing lock file.

    Polls once per second while ``<filename>.lock`` exists. Once more than
    ``local_timeout`` seconds have elapsed (with ``local_timeout == 0`` the
    wait is entered unconditionally, so a fallback name is produced after a
    single one-second step), a fresh timestamped filename derived from
    ``base_filename`` is tried instead.

    :param base_filename: desired output path (fallbacks append a timestamp)
    :param local_timeout: seconds to wait before giving up on the lock
    :return: ``(filename, lock_file)`` -- the lock file is only named here,
        never created
    """
    filename = base_filename
    while True:
        lock_file = '%s.lock' % (filename,)
        begin_time = datetime.now()
        while isfile(lock_file) or local_timeout == 0:
            sleep(1.0)
            current_time = datetime.now()
            if (current_time - begin_time).total_seconds() > local_timeout:
                fallback = '%s_%s.h5' % (base_filename, current_time.strftime("%Y%m%d%H%M%S_%f"),)
                print(
                    "WARNING: Waited %d seconds to acquire a lock for \"%s\", "
                    "but failed. Will now try to write data to new file \"%s\"." %
                    (local_timeout, filename, fallback,)
                )
                # Switch to the fallback name and re-check its lock below.
                filename = fallback
                lock_file = '%s.lock' % (filename,)
                break
        if not isfile(lock_file):
            return filename, lock_file
def acquire_lock(lock_file):
    """Atomically create *lock_file* and return it as a writable file object.

    ``O_CREAT | O_EXCL`` makes creation atomic: if another process already
    holds the lock, the low-level open raises ``FileExistsError``.

    :param lock_file: path of the lock file to create
    :return: writable file object wrapping the freshly created lock file
    :raises FileExistsError: if the lock file already exists

    Note: the original printed the debug message *before* attempting the
    open, so a failed acquisition was still logged as acquired; the message
    is now emitted only after the lock really exists.
    """
    lock = fdopen(low_level_open(lock_file, O_CREAT | O_EXCL | O_WRONLY), 'w')
    if lock_debugging:
        print("Process %d acquired lock %s." % (getpid(), lock_file))
    return lock
def release_lock(lock_file):
    """Release the lock by deleting *lock_file* from disk.

    A missing lock file is tolerated silently, so releasing twice (or
    after an error path that already cleaned up) is harmless.
    """
    if lock_debugging:
        print("Process %d released lock %s." % (getpid(), lock_file))
    try:
        remove(lock_file)
    except FileNotFoundError:
        # Lock already gone -- nothing to do.
        pass
# Default seconds to wait for a competing writer's lock before falling back
# to a new timestamped output file (see wait_for_lock_and_prepare_filename).
timeout = 5 * 60.0
def hdf5_node_name(s):
    """Sanitize *s* into a usable HDF5 node name.

    Characters that are problematic in HDF5 paths
    (``/ : \\ . - ? = & ;``) are each replaced by an underscore in a
    single ``str.translate`` pass (instead of nine chained ``replace``
    calls), and one leading underscore is stripped.

    :param s: raw candidate name (e.g. a filename or URL fragment)
    :return: sanitized node name
    """
    s = s.translate(str.maketrans({c: '_' for c in '/:\\.-?=&;'}))
    if s.startswith('_'):
        s = s[1:]
    return s
class CompressedObject(object):
    """Pickled-and-compressed wrapper around an arbitrary picklable object.

    On construction the payload is serialized with pickle and compressed
    with the chosen codec; :meth:`get` reverses both steps. Useful to keep
    large intermediate results (e.g. images) small in memory until written.

    NOTE(review): :meth:`get` uses ``pickle.loads`` -- only safe on
    trusted, self-produced data, never on external input.
    """

    __slots__ = 'data', 'compression'

    # When True, log size and timing statistics for every compression.
    debug = False

    # Codec used when the constructor receives compression=None.
    default_compression = 'bz2'

    # codec name -> (compress, decompress) callables over bytes.
    # 'lzma' is only usable if the optional module-level lzma import succeeded.
    compressions = {
        'zlib': (lambda _: zlib.compress(_, 6), lambda _: zlib.decompress(_)),
        'bz2': (lambda _: bz2.compress(_, 9), lambda _: bz2.decompress(_)),
        'lzma': (lambda _: lzma.compress(_, check=lzma.CHECK_NONE), lambda _: lzma.decompress(_)),
    }

    def __init__(self, data, compression=None):
        """Pickle and compress *data*.

        :param data: any picklable object
        :param compression: key into :attr:`compressions`;
            defaults to :attr:`default_compression`
        """
        before = time.time()
        if compression is None:
            compression = self.default_compression
        serialized = pickle.dumps(data, pickle.HIGHEST_PROTOCOL)
        before_len = len(serialized)
        assert compression in self.compressions
        # Only the compressor is needed here; get() looks up the
        # decompressor itself (the tuple was previously unpacked into an
        # unused variable on both sides).
        compress = self.compressions[compression][0]
        self.compression = compression
        self.data = compress(serialized)
        # Drop references early so large payloads can be freed now.
        del serialized
        del data
        after_len = len(self.data)
        after = time.time()
        if self.debug:
            print(
                "... serialized and compressed object using %s from %d to %d (%.3f) took %.3fs" % (
                    compression, before_len, after_len, after_len/before_len, (after-before)
                )
            )

    def get(self):
        """Decompress and unpickle, returning an equal copy of the original object."""
        uncompress = self.compressions[self.compression][1]
        return pickle.loads(uncompress(self.data))
def return_or_uncompress(something):
    """Transparently unwrap a :class:`CompressedObject`.

    :param something: any value
    :return: ``something.get()`` if it is a CompressedObject,
        otherwise *something* unchanged
    """
    return something.get() if isinstance(something, CompressedObject) else something
def hdf5_output(_filename, immediate_prefix='', tabular_name='result_table'):
    """Build a sink callable that writes pipeline results into an HDF5 file.

    :param _filename: base path of the HDF5 file; a ``.lock`` file and, on
        contention, timestamped fallback files are derived from it
    :param immediate_prefix: optional extra path component inserted directly
        under ``/results/`` in the HDF5 hierarchy
    :param tabular_name: key under which a result dict carries its tabular
        row specification (default ``'result_table'``)
    :return: ``_inner_hdf5_output(meta, result)`` which performs the write
        and returns *result* on success (``None`` on failure)
    """
    def _inner_hdf5_output(meta, result):
        # noinspection PyProtectedMember
        # Build a stable group-name suffix from the meta namedtuple: keys are
        # sorted, ints zero-padded; non-int values are assumed to expose
        # __name__ (presumably classes/functions -- TODO confirm with callers).
        meta_str = '_'.join(
            k + '_' + ('%09d' % v if type(v) == int else v.__name__)
            for k, v in sorted(meta._asdict().items(), key=lambda x: x[0])
        )
        prefix = '/results/'
        if immediate_prefix != '':
            prefix += immediate_prefix
        prefix += '/' + meta_str + '/'
        success = False
        local_timeout = timeout
        base_filename = _filename
        lock_file = None
        # Retry loop: on NodeError (group already populated) the timeout is
        # zeroed so the next iteration immediately picks a fallback filename.
        while not success:
            filename, lock_file = wait_for_lock_and_prepare_filename(base_filename, local_timeout)
            compression_type = 'zlib'
            compression_level = 6
            compression_filter = Filters(complib=compression_type, complevel=compression_level)
            try:
                # race conditions
                # open(lock_file, 'w+')
                # noinspection PyUnusedLocal
                with acquire_lock(lock_file) as lock:
                    store = HDFStore(filename, complevel=compression_level, complib=compression_type)
                    # noinspection PyProtectedMember
                    # Raw pytables handle underneath the pandas HDFStore.
                    h5 = store._handle
                    # cache for palettes
                    # currently unused
                    # palette_written = {}

                    def store_image(h5path, name, data, upsample_binary=True):
                        # Write *data* as a compressed HDF5 IMAGE carray under h5path.
                        h5path = h5path.replace('//', '/')
                        # hdf5 stores bitfields as well, but default 0,1 will be invisible on a fixed 0-255 palette ...
                        if data.dtype == bool and upsample_binary:
                            data = (data * 255).astype(np.uint8)
                        arr = h5.create_carray(h5path, name, obj=data, createparents=True, filters=compression_filter)
                        arr.attrs.CLASS = 'IMAGE'
                        arr.attrs.IMAGE_SUBCLASS = 'IMAGE_GRAYSCALE'
                        arr.attrs.IMAGE_VERSION = '1.2'

                    def store_data(h5path, name, data):
                        # Write raw bytes/str *data* as a pytables filenode under h5path.
                        h5path = h5path.replace('//', '/')
                        h5path_splits = [x for x in h5path.split('/') if x != '']
                        # Create every intermediate group; NodeError means it already exists.
                        for i in range(len(h5path_splits)):
                            try:
                                h5.create_group('/' + '/'.join(h5path_splits[:i]), h5path_splits[i])
                            except NodeError:
                                pass
                        f = filenode.new_node(h5, where=h5path, name=name, filters=compression_filter)
                        if type(data) == str:
                            data = data.encode('utf-8')
                        f.write(data)
                        f.close()

                    def store_table(name, data):
                        # Write tabular *data* as a pandas DataFrame via the HDFStore.
                        _frame = DataFrame(data)
                        store[name] = _frame  # .append(name, _frame, data_columns=_frame.columns)

                    # Per-key counters used to number stored tables/images/blobs;
                    # shared (closed over) by all process_row invocations.
                    image_counter = {}
                    data_counter = {}
                    table_counter = {}

                    def process_row(result_table_rows, m, row):
                        # Flatten one result *row* into dicts suitable for a DataFrame.
                        # Large members (tables/images/raw data) are written to
                        # dedicated HDF5 nodes and referenced by counter value.
                        cresults = []
                        # noinspection PyProtectedMember
                        # Meta values: ints pass through, anything else becomes -1.
                        tmp = {('meta_' + mk): (mv if type(mv) == int else -1) for mk, mv in m._asdict().items()}
                        if type(result_table_rows) == list:
                            # A plain list of keys means "copy these values verbatim".
                            result_table_rows = {key: True for key in result_table_rows}
                        if '_plain' in result_table_rows:
                            # '_plain' names additional keys to copy verbatim.
                            for v in result_table_rows['_plain']:
                                result_table_rows[v] = True
                            del result_table_rows['_plain']

                        def is_wildcarded(s):
                            return '*' in s

                        # Expand wildcard keys (fnmatch patterns) against actual row keys.
                        for k, v in list(result_table_rows.items()):
                            if is_wildcarded(k):
                                del result_table_rows[k]
                                for row_key in row.keys():
                                    if fnmatch(row_key, k):
                                        result_table_rows[row_key] = v
                        for k, v in result_table_rows.items():
                            if v == 'table':
                                if k not in table_counter:
                                    table_counter[k] = 0
                                if k in row and len(row[k]) > 0:
                                    if type(row[k][0]) == list:
                                        # it's a list of lists
                                        # create a mapping table
                                        # point to the mapping table
                                        the_counter = table_counter[k]
                                        new_path = '/tables/_mapping_%s' % (k,)
                                        new_name = '%s_%09d' % (k, the_counter)
                                        tmp[k] = -1
                                        tmp['_mapping_%s' % k] = the_counter
                                        table_counter[k] += 1
                                        i_mapping = []
                                        # One individual table per inner list, plus a
                                        # mapping table pointing at each of them.
                                        for n, i_table in enumerate(row[k]):
                                            i_new_path = '/tables/_individual_%s' % (k,)
                                            i_new_name = '%s_%09d' % (k, table_counter[k])
                                            store_table(prefix + i_new_path + '/' + i_new_name, i_table)
                                            i_mapping.append({
                                                '_index': n,
                                                'individual_table': table_counter[k]
                                            })
                                            table_counter[k] += 1
                                        store_table(prefix + new_path + '/' + new_name, i_mapping)
                                        tmp[k] = table_counter[k]
                                        table_counter[k] += 1
                                    else:
                                        new_path = '/tables/%s' % (k,)
                                        new_name = '%s_%09d' % (k, table_counter[k])
                                        store_table(prefix + new_path + '/' + new_name, row[k])
                                        tmp[k] = table_counter[k]
                                        table_counter[k] += 1
                                else:
                                    # Key absent or empty: still record and advance the
                                    # counter so numbering stays aligned across rows.
                                    tmp[k] = table_counter[k]
                                    table_counter[k] += 1
                            elif v == 'image':
                                if k not in image_counter:
                                    image_counter[k] = 0
                                if k in row:
                                    new_path = '/images/%s' % (k,)
                                    new_name = '%s_%09d' % (k, image_counter[k])
                                    store_image(prefix + new_path, new_name, return_or_uncompress(row[k]))
                                    tmp[k] = image_counter[k]
                                    image_counter[k] += 1
                            elif v == 'data':
                                if k not in data_counter:
                                    data_counter[k] = 0
                                if k in row:
                                    new_path = '/data/%s' % (k,)
                                    new_name = '%s_%09d' % (k, data_counter[k])
                                    store_data(prefix + new_path, new_name, return_or_uncompress(row[k]))
                                    tmp[k] = data_counter[k]
                                    data_counter[k] += 1
                            else:
                                # Plain value: copy through, NaN when missing.
                                if k in row:
                                    tmp[k] = row[k]
                                else:
                                    tmp[k] = float('nan')
                        cresults.append(tmp)
                        return cresults

                    if 'collected' in result:
                        # 'collected' maps per-item meta -> row dict; all rows are
                        # flattened into one combined table.
                        collected = []
                        for m, row in result['collected'].items():
                            if tabular_name in row:
                                result_table_rows = row[tabular_name]
                                collected += process_row(result_table_rows, m, row)
                        store_table(prefix + 'result_table_collected', collected)
                    if tabular_name in result:
                        store_table(prefix + 'result_table', process_row(result[tabular_name], meta, result))
                    store.close()
                    success = True
            except NodeError:
                print("NodeError Exception occurred while writing, " +
                      "apparently the file has already been used to store similar results.")
                # print("Leaving it LOCKED (remove manually!) and trying to write to another file!")
                # Force the retry loop to pick a fresh timestamped filename.
                local_timeout = 0
                release_lock(lock_file)
            except Exception as e:
                # Best-effort sink: report, release the lock and give up.
                print("Exception occurred while writing results: " + repr(e))
                release_lock(lock_file)
                return
        release_lock(lock_file)
        return result
    return _inner_hdf5_output
# Storing other image data or paletted images
# from tables import Atom
# arr = store._handle.create_carray(h5path, name, Atom.from_dtype(data.dtype), data.shape, createparents=True)
# arr[:] = data[:]
# if data.dtype == bool:
# arr.attrs.PALETTE = pal.object_id
# if False and True not in palette_written:
#
# palette = np.zeros((256, 3,), dtype=np.uint8)
#
# s = 256//3
#
# for i in range(0, s):
# palette[i+0*s, :] = [i, 0, 0]
# palette[i+1*s, :] = [s+i, i, 0]
# palette[i+2*s, :] = [s+i, s+i, i]
#
# palette[0, :] = 0
# palette[1, :] = 255
#
# # pytables does not seem to support object references
# # so that nice palette code is actually useless ...
# pal = store._handle.create_array('/', 'palette', palette, createparents=True)
# pal.attrs.CLASS = 'PALETTE'
# pal.attrs.PAL_COLORMODEL = 'RGB'
# pal.attrs.PAL_TYPE = 'STANDARD8'
# pal.attrs.PAL_VERSION = '1.2'
#
# palette_written[True] = True
# #else:
# # pal = store._handle.get_node('/palette')