# -*- coding: utf-8 -*-
"""The profiler classes."""
import codecs
import gzip
import os
import time
class CPUTimeMeasurement:
    """The CPU time measurement.

    Note that the measurement is taken with time.perf_counter(), which
    includes time elapsed during sleep; it is therefore an elapsed-time
    measurement rather than the CPU time consumed by the process.

    Attributes:
        start_sample_time (float): start sample time, as returned by
            time.time(), or None if not set.
        total_cpu_time (float): time in seconds measured between the last
            SampleStart() and SampleStop(), or None if not set.
    """

    def __init__(self):
        """Initializes the CPU time measurement."""
        super().__init__()
        self._start_cpu_time = None
        self.start_sample_time = None
        self.total_cpu_time = None

    def SampleStart(self):
        """Starts measuring the CPU time.

        Any previously measured total is reset to 0.
        """
        self._start_cpu_time = time.perf_counter()
        self.start_sample_time = time.time()
        self.total_cpu_time = 0

    def SampleStop(self):
        """Stops measuring the CPU time.

        Does nothing if no measurement is in progress, meaning SampleStart()
        was never called or SampleStop() was already called for the current
        measurement.
        """
        if self._start_cpu_time is None:
            return

        self.total_cpu_time += time.perf_counter() - self._start_cpu_time
        # Clear the start marker so that a repeated SampleStop() without a
        # new SampleStart() does not add the same interval a second time.
        self._start_cpu_time = None
class StorageProfiler:
    """The storage profiler.

    Samples are written as tab-separated rows to a gzip compressed, UTF-8
    encoded sample file.
    """

    _FILENAME_PREFIX = 'storage'

    _FILE_HEADER = (
        'Time\tName\tOperation\tDescription\tProcessing time\tData size\t'
        'Compressed data size\n')

    def __init__(self, identifier, path):
        """Initializes a storage profiler.

        Sample files are gzip compressed UTF-8 encoded CSV files.

        Args:
            identifier (str): identifier of the profiling session used to
                create the sample filename.
            path (str): path of the directory in which the sample file is
                created. If empty or None the filename is used as is, hence
                relative to the current working directory.
        """
        super().__init__()
        self._identifier = identifier
        self._path = path
        self._profile_measurements = {}
        self._sample_file = None
        self._start_time = None

    def _WritesString(self, content):
        """Writes a string to the sample file.

        Args:
            content (str): content to write to the sample file.
        """
        content_bytes = codecs.encode(content, 'utf-8')
        self._sample_file.write(content_bytes)

    @classmethod
    def IsSupported(cls):
        """Determines if the profiler is supported.

        Returns:
            bool: True if the profiler is supported.
        """
        return True

    def Sample(
            self, profile_name, operation, description, data_size,
            compressed_data_size):
        """Takes a sample of data read or written for profiling.

        If timing was started for the profile, the sample uses the start time
        and measured time of that timing. Otherwise the current time and a
        processing time of 0.0 are used. The profiler must have been started
        with Start() since the sample is written to the open sample file.

        Args:
            profile_name (str): name of the profile to sample.
            operation (str): operation, either 'read' or 'write'.
            description (str): description of the data read.
            data_size (int): size of the data read in bytes.
            compressed_data_size (int): size of the compressed data read in
                bytes.
        """
        measurements = self._profile_measurements.get(profile_name)
        if measurements is not None:
            sample_time = measurements.start_sample_time
            processing_time = measurements.total_cpu_time
        else:
            sample_time = time.time()
            processing_time = 0.0

        self._WritesString((
            f'{sample_time:f}\t{profile_name:s}\t{operation:s}\t{description:s}\t'
            f'{processing_time:f}\t{data_size:d}\t{compressed_data_size:d}\n'))

    def Start(self):
        """Starts the profiler.

        Opens the sample file for writing and writes the file header.
        """
        filename = f'{self._FILENAME_PREFIX:s}-{self._identifier:s}.csv.gz'
        if self._path:
            filename = os.path.join(self._path, filename)

        self._sample_file = gzip.open(filename, 'wb')
        try:
            self._WritesString(self._FILE_HEADER)
        except Exception:
            # Do not leak the open file handle when the header cannot be
            # written; leave the profiler in its not-started state.
            self._sample_file.close()
            self._sample_file = None
            raise

        self._start_time = time.time()

    def StartTiming(self, profile_name):
        """Starts timing CPU time.

        Args:
            profile_name (str): name of the profile to sample.
        """
        if profile_name not in self._profile_measurements:
            self._profile_measurements[profile_name] = CPUTimeMeasurement()

        self._profile_measurements[profile_name].SampleStart()

    def Stop(self):
        """Stops the profiler.

        Closes the sample file. Does nothing if the profiler was not started
        or was already stopped.
        """
        # Detach the file first so the profiler is marked as stopped even if
        # closing the file raises.
        sample_file, self._sample_file = self._sample_file, None
        if sample_file is not None:
            sample_file.close()

    def StopTiming(self, profile_name):
        """Stops timing CPU time.

        Does nothing if timing was never started for the profile.

        Args:
            profile_name (str): name of the profile to sample.
        """
        measurements = self._profile_measurements.get(profile_name)
        if measurements is not None:
            measurements.SampleStop()