aegis_sim.recording.ticker

 1from multiprocessing import Process
 2import time
 3import pathlib
 4from datetime import datetime
 5from .recorder import Recorder
 6import logging
 7
 8
 9class Ticker(Recorder):
10    def __init__(self, TICKER_RATE, odir: pathlib.Path):
11        self.TICKER_RATE = TICKER_RATE
12        self.ticker_path = odir / "ticker.txt"
13        self.process = None
14        self.pid = None
15
16    def start_process(self):
17        # daemon=True ensures this subprocess is automatically killed when the
18        # parent process exits, preventing orphaned ticker processes (e.g. if
19        # the simulation crashes or tests finish without calling stop_process).
20        # This does not affect normal operation: the parent stays alive for the
21        # entire simulation, so the ticker keeps running until stop_process()
22        # is called at the end of sim().
23        self.process = Process(target=self.tick, daemon=True)
24        self.process.start()
25        self.pid = self.process.pid
26
27    def stop_process(self):
28        self.process.terminate()
29        self.process.join()
30
31    def tick(self):
32        while True:
33            self.write()
34            time.sleep(self.TICKER_RATE)
35
36    def write(self):
37        """
38        # OUTPUT SPECIFICATION
39        path: /ticker.txt
40        filetype: txt
41        category: log
42        description: A live file useful for determining whether the simulation is still running. It gets updated every TICKER_RATE seconds; if it is not updated, the simulation is not running.
43        trait granularity: N/A
44        time granularity: N/A
45        frequency parameter: TICKER_RATE
46        structure: A txt file with datetime stamp (%Y-%m-%d %H:%M:%S) in one line)
47        """
48        timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
49        with open(self.ticker_path, "w") as file:
50            file.write(timestamp)
51
52    def read(self):
53        if not self.ticker_path.exists():
54            logging.error(f"{self.ticker_path} does not exist.")
55            return
56        with open(self.ticker_path, "r") as file:
57            return file.read()
58
59    def has_stopped(self):
60        since_last = self.since_last()
61        return since_last > self.TICKER_RATE
62
63    def since_last(self):
64        timestamp_recorded = self.read()
65        if timestamp_recorded is None or timestamp_recorded == "":
66            logging.info(f"timestamp_recorded is '{timestamp_recorded}'")
67            return
68        timestamp_now = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
69        dt_recorded = datetime.strptime(timestamp_recorded, "%Y-%m-%d %H:%M:%S")
70        dt_now = datetime.strptime(timestamp_now, "%Y-%m-%d %H:%M:%S")
71        time_difference = (dt_now - dt_recorded).total_seconds()
72        return time_difference
class Ticker(aegis_sim.recording.recorder.Recorder):
10class Ticker(Recorder):
11    def __init__(self, TICKER_RATE, odir: pathlib.Path):
12        self.TICKER_RATE = TICKER_RATE
13        self.ticker_path = odir / "ticker.txt"
14        self.process = None
15        self.pid = None
16
17    def start_process(self):
18        # daemon=True ensures this subprocess is automatically killed when the
19        # parent process exits, preventing orphaned ticker processes (e.g. if
20        # the simulation crashes or tests finish without calling stop_process).
21        # This does not affect normal operation: the parent stays alive for the
22        # entire simulation, so the ticker keeps running until stop_process()
23        # is called at the end of sim().
24        self.process = Process(target=self.tick, daemon=True)
25        self.process.start()
26        self.pid = self.process.pid
27
28    def stop_process(self):
29        self.process.terminate()
30        self.process.join()
31
32    def tick(self):
33        while True:
34            self.write()
35            time.sleep(self.TICKER_RATE)
36
37    def write(self):
38        """
39        # OUTPUT SPECIFICATION
40        path: /ticker.txt
41        filetype: txt
42        category: log
43        description: A live file useful for determining whether the simulation is still running. It gets updated every TICKER_RATE seconds; if it is not updated, the simulation is not running.
44        trait granularity: N/A
45        time granularity: N/A
46        frequency parameter: TICKER_RATE
47        structure: A txt file with datetime stamp (%Y-%m-%d %H:%M:%S) in one line)
48        """
49        timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
50        with open(self.ticker_path, "w") as file:
51            file.write(timestamp)
52
53    def read(self):
54        if not self.ticker_path.exists():
55            logging.error(f"{self.ticker_path} does not exist.")
56            return
57        with open(self.ticker_path, "r") as file:
58            return file.read()
59
60    def has_stopped(self):
61        since_last = self.since_last()
62        return since_last > self.TICKER_RATE
63
64    def since_last(self):
65        timestamp_recorded = self.read()
66        if timestamp_recorded is None or timestamp_recorded == "":
67            logging.info(f"timestamp_recorded is '{timestamp_recorded}'")
68            return
69        timestamp_now = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
70        dt_recorded = datetime.strptime(timestamp_recorded, "%Y-%m-%d %H:%M:%S")
71        dt_now = datetime.strptime(timestamp_now, "%Y-%m-%d %H:%M:%S")
72        time_difference = (dt_now - dt_recorded).total_seconds()
73        return time_difference
Ticker(TICKER_RATE, odir: pathlib.Path)
11    def __init__(self, TICKER_RATE, odir: pathlib.Path):
12        self.TICKER_RATE = TICKER_RATE
13        self.ticker_path = odir / "ticker.txt"
14        self.process = None
15        self.pid = None
TICKER_RATE
ticker_path
process
pid
def start_process(self):
17    def start_process(self):
18        # daemon=True ensures this subprocess is automatically killed when the
19        # parent process exits, preventing orphaned ticker processes (e.g. if
20        # the simulation crashes or tests finish without calling stop_process).
21        # This does not affect normal operation: the parent stays alive for the
22        # entire simulation, so the ticker keeps running until stop_process()
23        # is called at the end of sim().
24        self.process = Process(target=self.tick, daemon=True)
25        self.process.start()
26        self.pid = self.process.pid
def stop_process(self):
28    def stop_process(self):
29        self.process.terminate()
30        self.process.join()
def tick(self):
32    def tick(self):
33        while True:
34            self.write()
35            time.sleep(self.TICKER_RATE)
def write(self):
37    def write(self):
38        """
39        # OUTPUT SPECIFICATION
40        path: /ticker.txt
41        filetype: txt
42        category: log
43        description: A live file useful for determining whether the simulation is still running. It gets updated every TICKER_RATE seconds; if it is not updated, the simulation is not running.
44        trait granularity: N/A
45        time granularity: N/A
46        frequency parameter: TICKER_RATE
47        structure: A txt file with datetime stamp (%Y-%m-%d %H:%M:%S) in one line)
48        """
49        timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
50        with open(self.ticker_path, "w") as file:
51            file.write(timestamp)

OUTPUT SPECIFICATION

path: /ticker.txt filetype: txt category: log description: A live file useful for determining whether the simulation is still running. It gets updated every TICKER_RATE seconds; if it is not updated, the simulation is not running. trait granularity: N/A time granularity: N/A frequency parameter: TICKER_RATE structure: A txt file with datetime stamp (%Y-%m-%d %H:%M:%S) in one line)

def read(self):
53    def read(self):
54        if not self.ticker_path.exists():
55            logging.error(f"{self.ticker_path} does not exist.")
56            return
57        with open(self.ticker_path, "r") as file:
58            return file.read()
def has_stopped(self):
60    def has_stopped(self):
61        since_last = self.since_last()
62        return since_last > self.TICKER_RATE
def since_last(self):
64    def since_last(self):
65        timestamp_recorded = self.read()
66        if timestamp_recorded is None or timestamp_recorded == "":
67            logging.info(f"timestamp_recorded is '{timestamp_recorded}'")
68            return
69        timestamp_now = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
70        dt_recorded = datetime.strptime(timestamp_recorded, "%Y-%m-%d %H:%M:%S")
71        dt_now = datetime.strptime(timestamp_now, "%Y-%m-%d %H:%M:%S")
72        time_difference = (dt_now - dt_recorded).total_seconds()
73        return time_difference