12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- import csv
- import os
- from datetime import datetime
- import dataclasses
- import zipfile
- import logging
- from structures.common import BaseMeasurement
- logger = logging.getLogger(__name__)
- class CSVStorage:
- files = {}
- def __init__(self, path) -> None:
- self.path = path
- self.zipname = os.path.join(self.path, F"logs_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.zip")
-
- def write(self, values: list):
- try:
- for meas in values:
- if not meas.series in self.files:
- self.files[meas.series] = CSVFile(self.path, meas.series, self.zipname)
- self.files[meas.series].write(meas)
- except Exception as ex:
- logger.exception("CSV write failed")
- class CSVFile:
- file = None
- filename = None
- row_count = 0
- def __init__(self, path, series, zipname) -> None:
- self.path = path
- self.series = series
- if not os.path.exists(self.path):
- os.mkdir(self.path)
- self.zipname = zipname
- self.new_file()
- def new_file(self):
- if self.file:
- self.file.close()
- with zipfile.ZipFile(self.zipname, 'a', compression=zipfile.ZIP_BZIP2, compresslevel=9) as zf:
- zf.write(self.filename, os.path.basename(self.filename))
- os.remove(self.filename)
-
- self.filename = os.path.join(self.path, F"{self.series}_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.csv")
- self.file = open(self.filename, "w", newline='')
- self.writer = csv.writer(self.file, delimiter=',')
- def write(self, meas):
- row = dataclass_to_dict(meas)
- if self.row_count == 0:
- self.writer.writerow(row)
- self.writer.writerow(row.values())
- self.row_count += 1
- if self.row_count % 1000 == 0:
- self.file.flush()
- if self.row_count > 50000:
- self.new_file()
- self.row_count = 0
- def dataclass_to_dict(dc, prefix=""):
- ret = {}
- for field in dataclasses.fields(dc):
- value = getattr(dc, field.name)
- if type(value) is tuple:
- for i, v in enumerate(value):
- ret[F"{prefix}{field.name}_{i}"] = v
- elif isinstance(value, BaseMeasurement):
- ret.update(dataclass_to_dict(value, F"{prefix}{value.series}_"))
- else:
- ret[F"{prefix}{field.name}"] = value
- return ret
|