csv_file.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. import csv
  2. import os
  3. from datetime import datetime
  4. import dataclasses
  5. import zipfile
  6. import logging
  7. from structures.common import BaseMeasurement
  8. logger = logging.getLogger(__name__)
  9. class CSVStorage:
  10. files = {}
  11. def __init__(self, path) -> None:
  12. self.path = path
  13. self.zipname = os.path.join(self.path, F"logs_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.zip")
  14. def write(self, values: list):
  15. try:
  16. for meas in values:
  17. if not meas.series in self.files:
  18. self.files[meas.series] = CSVFile(self.path, meas.series, self.zipname)
  19. self.files[meas.series].write(meas)
  20. except Exception as ex:
  21. logger.exception("CSV write failed")
  22. class CSVFile:
  23. file = None
  24. filename = None
  25. row_count = 0
  26. def __init__(self, path, series, zipname) -> None:
  27. self.path = path
  28. self.series = series
  29. if not os.path.exists(self.path):
  30. os.mkdir(self.path)
  31. self.zipname = zipname
  32. self.new_file()
  33. def new_file(self):
  34. if self.file:
  35. self.file.close()
  36. with zipfile.ZipFile(self.zipname, 'a', compression=zipfile.ZIP_BZIP2, compresslevel=9) as zf:
  37. zf.write(self.filename, os.path.basename(self.filename))
  38. os.remove(self.filename)
  39. self.filename = os.path.join(self.path, F"{self.series}_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.csv")
  40. self.file = open(self.filename, "w", newline='')
  41. self.writer = csv.writer(self.file, delimiter=',')
  42. def write(self, meas):
  43. row = dataclass_to_dict(meas)
  44. if self.row_count == 0:
  45. self.writer.writerow(row)
  46. self.writer.writerow(row.values())
  47. self.row_count += 1
  48. if self.row_count % 1000 == 0:
  49. self.file.flush()
  50. if self.row_count > 50000:
  51. self.new_file()
  52. self.row_count = 0
  53. def dataclass_to_dict(dc, prefix=""):
  54. ret = {}
  55. for field in dataclasses.fields(dc):
  56. value = getattr(dc, field.name)
  57. if type(value) is tuple:
  58. for i, v in enumerate(value):
  59. ret[F"{prefix}{field.name}_{i}"] = v
  60. elif isinstance(value, BaseMeasurement):
  61. ret.update(dataclass_to_dict(value, F"{prefix}{value.series}_"))
  62. else:
  63. ret[F"{prefix}{field.name}"] = value
  64. return ret