aegis_sim.recording.ticker
from multiprocessing import Process
import time
import pathlib
from datetime import datetime
from .recorder import Recorder
import logging


class Ticker(Recorder):
    """Heartbeat recorder that periodically stamps the current UTC time into a file.

    A background process rewrites ``ticker.txt`` every ``TICKER_RATE`` seconds.
    Readers compare the recorded stamp with the current time to judge whether
    the writer is still alive.
    """

    def __init__(self, TICKER_RATE, odir: pathlib.Path):
        """Store the tick interval and derive the ticker file path.

        Args:
            TICKER_RATE: Seconds to sleep between two consecutive writes.
            odir: Directory in which ``ticker.txt`` is placed.
        """
        self.TICKER_RATE = TICKER_RATE
        self.ticker_path = odir / "ticker.txt"
        # Both stay None until start_process() is called.
        self.process = None
        self.pid = None

    def start_process(self):
        """Launch the ticking loop in a daemon subprocess and remember its pid."""
        # daemon=True ensures this subprocess is automatically killed when the
        # parent process exits, preventing orphaned ticker processes (e.g. if
        # the simulation crashes or tests finish without calling stop_process).
        # This does not affect normal operation: the parent stays alive for the
        # entire simulation, so the ticker keeps running until stop_process()
        # is called at the end of sim().
        self.process = Process(target=self.tick, daemon=True)
        self.process.start()
        self.pid = self.process.pid

    def stop_process(self):
        """Terminate the ticker subprocess and wait for it to exit.

        Does nothing when no subprocess has been started, instead of failing
        with ``AttributeError`` on ``None``.
        """
        if self.process is None:
            return
        self.process.terminate()
        self.process.join()

    def tick(self):
        """Write a stamp, sleep ``TICKER_RATE`` seconds, and repeat forever."""
        while True:
            self.write()
            time.sleep(self.TICKER_RATE)

    def write(self):
        """
        # OUTPUT SPECIFICATION
        path: /ticker.txt
        filetype: txt
        category: log
        description: A live file useful for determining whether the simulation is still running. It gets updated every TICKER_RATE seconds; if it is not updated, the simulation is not running.
        trait granularity: N/A
        time granularity: N/A
        frequency parameter: TICKER_RATE
        structure: A txt file with datetime stamp (%Y-%m-%d %H:%M:%S) in one line
        """
        # The stamp is taken in UTC (time.gmtime) and overwrites the old one.
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
        with open(self.ticker_path, "w") as file:
            file.write(timestamp)

    def read(self):
        """Return the raw content of the ticker file.

        Returns:
            The file content as a string, or None (after logging an error)
            when the file does not exist.
        """
        if not self.ticker_path.exists():
            logging.error(f"{self.ticker_path} does not exist.")
            return
        with open(self.ticker_path, "r") as file:
            return file.read()

    def has_stopped(self):
        """Return True when the last recorded stamp is older than ``TICKER_RATE``.

        When no stamp can be read (missing or empty file), ``since_last``
        returns None. With no heartbeat evidence available that case is
        reported as stopped, rather than raising ``TypeError`` on the
        ``None > number`` comparison.
        """
        since_last = self.since_last()
        if since_last is None:
            return True
        return since_last > self.TICKER_RATE

    def since_last(self):
        """Return the seconds elapsed since the stamp stored in the ticker file.

        Both the stored stamp and the current UTC time are handled at
        one-second resolution.

        Returns:
            The elapsed time in seconds as a float, or None (after logging)
            when the file is missing or empty.
        """
        timestamp_recorded = self.read()
        if timestamp_recorded is None or timestamp_recorded == "":
            logging.info(f"timestamp_recorded is '{timestamp_recorded}'")
            return
        timestamp_now = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
        dt_recorded = datetime.strptime(timestamp_recorded, "%Y-%m-%d %H:%M:%S")
        dt_now = datetime.strptime(timestamp_now, "%Y-%m-%d %H:%M:%S")
        time_difference = (dt_now - dt_recorded).total_seconds()
        return time_difference
class Ticker(Recorder):
    """Heartbeat recorder that periodically stamps the current UTC time into a file.

    A background process rewrites ``ticker.txt`` every ``TICKER_RATE`` seconds.
    Readers compare the recorded stamp with the current time to judge whether
    the writer is still alive.
    """

    def __init__(self, TICKER_RATE, odir: pathlib.Path):
        """Store the tick interval and derive the ticker file path.

        Args:
            TICKER_RATE: Seconds to sleep between two consecutive writes.
            odir: Directory in which ``ticker.txt`` is placed.
        """
        self.TICKER_RATE = TICKER_RATE
        self.ticker_path = odir / "ticker.txt"
        # Both stay None until start_process() is called.
        self.process = None
        self.pid = None

    def start_process(self):
        """Launch the ticking loop in a daemon subprocess and remember its pid."""
        # daemon=True ensures this subprocess is automatically killed when the
        # parent process exits, preventing orphaned ticker processes (e.g. if
        # the simulation crashes or tests finish without calling stop_process).
        # This does not affect normal operation: the parent stays alive for the
        # entire simulation, so the ticker keeps running until stop_process()
        # is called at the end of sim().
        self.process = Process(target=self.tick, daemon=True)
        self.process.start()
        self.pid = self.process.pid

    def stop_process(self):
        """Terminate the ticker subprocess and wait for it to exit.

        Does nothing when no subprocess has been started, instead of failing
        with ``AttributeError`` on ``None``.
        """
        if self.process is None:
            return
        self.process.terminate()
        self.process.join()

    def tick(self):
        """Write a stamp, sleep ``TICKER_RATE`` seconds, and repeat forever."""
        while True:
            self.write()
            time.sleep(self.TICKER_RATE)

    def write(self):
        """
        # OUTPUT SPECIFICATION
        path: /ticker.txt
        filetype: txt
        category: log
        description: A live file useful for determining whether the simulation is still running. It gets updated every TICKER_RATE seconds; if it is not updated, the simulation is not running.
        trait granularity: N/A
        time granularity: N/A
        frequency parameter: TICKER_RATE
        structure: A txt file with datetime stamp (%Y-%m-%d %H:%M:%S) in one line
        """
        # The stamp is taken in UTC (time.gmtime) and overwrites the old one.
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
        with open(self.ticker_path, "w") as file:
            file.write(timestamp)

    def read(self):
        """Return the raw content of the ticker file.

        Returns:
            The file content as a string, or None (after logging an error)
            when the file does not exist.
        """
        if not self.ticker_path.exists():
            logging.error(f"{self.ticker_path} does not exist.")
            return
        with open(self.ticker_path, "r") as file:
            return file.read()

    def has_stopped(self):
        """Return True when the last recorded stamp is older than ``TICKER_RATE``.

        When no stamp can be read (missing or empty file), ``since_last``
        returns None. With no heartbeat evidence available that case is
        reported as stopped, rather than raising ``TypeError`` on the
        ``None > number`` comparison.
        """
        since_last = self.since_last()
        if since_last is None:
            return True
        return since_last > self.TICKER_RATE

    def since_last(self):
        """Return the seconds elapsed since the stamp stored in the ticker file.

        Both the stored stamp and the current UTC time are handled at
        one-second resolution.

        Returns:
            The elapsed time in seconds as a float, or None (after logging)
            when the file is missing or empty.
        """
        timestamp_recorded = self.read()
        if timestamp_recorded is None or timestamp_recorded == "":
            logging.info(f"timestamp_recorded is '{timestamp_recorded}'")
            return
        timestamp_now = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
        dt_recorded = datetime.strptime(timestamp_recorded, "%Y-%m-%d %H:%M:%S")
        dt_now = datetime.strptime(timestamp_now, "%Y-%m-%d %H:%M:%S")
        time_difference = (dt_now - dt_recorded).total_seconds()
        return time_difference
def
start_process(self):
def start_process(self):
    """Spawn the ticking loop in a daemon subprocess and record its pid.

    The subprocess runs ``self.tick``. The handle is stored on
    ``self.process`` and its process id on ``self.pid``.
    """
    # A daemon subprocess is killed automatically when the parent exits, so
    # no ticker is orphaned if the simulation crashes or a test finishes
    # without calling stop_process. Normal operation is unaffected: while the
    # parent is alive the ticker keeps running until stop_process() is called.
    worker = Process(target=self.tick, daemon=True)
    self.process = worker
    worker.start()
    self.pid = worker.pid
def
write(self):
def write(self):
    """
    # OUTPUT SPECIFICATION
    path: /ticker.txt
    filetype: txt
    category: log
    description: A live file useful for determining whether the simulation is still running. It gets updated every TICKER_RATE seconds; if it is not updated, the simulation is not running.
    trait granularity: N/A
    time granularity: N/A
    frequency parameter: TICKER_RATE
    structure: A txt file with datetime stamp (%Y-%m-%d %H:%M:%S) in one line
    """
    # Take the stamp in UTC so readers can compare it against time.gmtime().
    utc_now = time.gmtime()
    stamp = time.strftime("%Y-%m-%d %H:%M:%S", utc_now)
    # Mode "w" truncates, so the file only ever holds the latest stamp.
    with open(self.ticker_path, "w") as handle:
        handle.write(stamp)
OUTPUT SPECIFICATION
path: /ticker.txt filetype: txt category: log description: A live file useful for determining whether the simulation is still running. It gets updated every TICKER_RATE seconds; if it is not updated, the simulation is not running. trait granularity: N/A time granularity: N/A frequency parameter: TICKER_RATE structure: A txt file with datetime stamp (%Y-%m-%d %H:%M:%S) in one line
def
since_last(self):
def since_last(self):
    """Return the seconds elapsed since the stamp stored in the ticker file.

    The stored stamp and the current UTC time are both parsed at one-second
    resolution, so the result is a whole number of seconds expressed as a
    float. Returns None (after logging) when ``read`` yields None or an
    empty string.
    """
    stamp_format = "%Y-%m-%d %H:%M:%S"

    recorded_text = self.read()
    if recorded_text in (None, ""):
        logging.info(f"timestamp_recorded is '{recorded_text}'")
        return None

    # Round-trip "now" through the same text format so both sides are
    # truncated to whole seconds in UTC before subtracting.
    now_text = time.strftime(stamp_format, time.gmtime())
    recorded_at = datetime.strptime(recorded_text, stamp_format)
    current_at = datetime.strptime(now_text, stamp_format)
    elapsed = current_at - recorded_at
    return elapsed.total_seconds()