#!/usr/bin/env python3
"""Command-line front end for driving a Robovac device.

Credentials (device id, IP address and local code) come either from the
command line or from a simple ``key = value`` config file.  Depending on the
flags, the script starts a timed cleaning run, sends the device home, or
pauses it.
"""
import argparse
import asyncio
import logging
import os
import pprint
import signal
import sys
from enum import Enum

from robovac.robovac import Robovac

# Default cleaning duration, in minutes.
DEFAULT_TIME = 20

# Keys that must be supplied by the config file or the command line.
_REQUIRED_KEYS = ('device_id', 'ip', 'local_code')

# Set by the signal handler (and at the end of each action) to stop waiting.
stop_event = asyncio.Event()


def signal_handler():
    """Announce shutdown and set ``stop_event`` so pending waits end."""
    print('\nExiting...')
    stop_event.set()


def parse_robovac_config(path="/home/freyja/Desktop/robovac.conf"):
    """Read ``key = value`` pairs from *path* and return the credentials.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored.  Keys and values are stripped of surrounding whitespace; only
    the first ``=`` on a line separates key from value.

    Args:
        path: Location of the config file.

    Returns:
        An ``Enum`` class with members ``device_id``, ``ip`` and
        ``local_code``; each member's ``.value`` is the configured string.

    Raises:
        FileNotFoundError: If *path* does not exist.
        ValueError: If any of the required keys is missing.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Config file not found at {path}")

    config = {}
    with open(path, 'r') as f:
        for raw_line in f:
            line = raw_line.strip()
            # Skip empty lines and comments.
            if not line or line.startswith("#"):
                continue
            key, sep, value = line.partition('=')
            if sep:
                config[key.strip()] = value.strip()

    missing = [key for key in _REQUIRED_KEYS if key not in config]
    if missing:
        raise ValueError(f"Missing keys in config: {', '.join(missing)}")

    class credentials(Enum):
        device_id = config['device_id']
        ip = config['ip']
        local_code = config['local_code']

    return credentials


async def callback(message, device):
    """Print *message* and pretty-print ``device.state``."""
    print(message)
    pprint.pprint(device.state)


async def stepper(time):
    """Wait up to *time* minutes, waking every second to check ``stop_event``.

    Returns:
        ``True`` once the time has elapsed or ``stop_event`` has been set.
    """
    total_seconds = time * 60
    step = 1
    elapsed = 0
    while elapsed < total_seconds and not stop_event.is_set():
        await asyncio.sleep(step)
        elapsed += step
    return True


async def async_auto_clean(r: Robovac, time):
    """Start cleaning, wait *time* minutes (or until stopped), then pause."""
    await r.async_start_cleaning(callback)
    await stepper(time)
    await r.async_pause()


async def async_go_home(r: Robovac):
    """Call ``r.async_go_home`` with the module's ``callback``."""
    await r.async_go_home(callback)


async def async_pause(r: Robovac):
    """Call ``r.async_pause`` with the module's ``callback``."""
    await r.async_pause(callback)


async def async_main(
    device_id,
    ip,
    local_code,
    time=DEFAULT_TIME,
    go_home=False,
    pause=False
):
    """Connect to the device and perform the requested action.

    Args:
        device_id: Device identifier passed to ``Robovac``.
        ip: Device IP address passed to ``Robovac``.
        local_code: Local code passed to ``Robovac``.
        time: Cleaning time in minutes (used when neither flag is set).
        go_home: Send the device home instead of cleaning.
        pause: Pause the device instead of cleaning.

    With neither flag set the device cleans for *time* minutes and is then
    sent home.  With both flags set no action is sent.
    """
    loop = asyncio.get_running_loop()
    for signum in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(signum, signal_handler)

    r = Robovac(device_id, ip, local_code)
    await r.async_connect(callback)
    await asyncio.sleep(2)

    # The three actions are mutually exclusive and each runs at most once,
    # and only if no stop has been requested before it begins.
    if not stop_event.is_set():
        if not go_home and not pause:
            await async_auto_clean(r, time=int(time))
            await async_go_home(r)
            stop_event.set()
        elif go_home and not pause:
            await async_go_home(r)
            stop_event.set()
        elif pause and not go_home:
            await async_pause(r)
            stop_event.set()

    # NOTE(review): presumably r.go_home reflects the device's reported
    # "going home" state -- confirm against the robovac package.
    if not r.go_home and not pause:
        await async_go_home(r)

    # NOTE(review): _connected is a private attribute of Robovac; confirm
    # there is no public equivalent.
    if r._connected:
        await r.async_disconnect()


def main(*args, **kwargs):
    """Parse command-line arguments and run the requested device action.

    Arguments are read from ``sys.argv`` by ``argparse``; the positional
    ``*args``/``**kwargs`` are accepted but not used.  Credentials given on
    the command line take precedence; the config file is consulted only when
    none of ``--device_id``, ``--ip`` or ``--local_code`` is supplied.
    """
    # First pass: find out whether credentials were given explicitly and
    # which config file to use, without rejecting the remaining options.
    early_parser = argparse.ArgumentParser(add_help=False)
    early_parser.add_argument(
        '-c', '--config', help="Path to config file",
        default="/etc/robovac.conf")
    early_parser.add_argument('--device_id')
    early_parser.add_argument('--ip')
    early_parser.add_argument('--local_code')
    early_args, remaining_args = early_parser.parse_known_args()

    use_config = not (
        early_args.device_id or early_args.ip or early_args.local_code
    )

    defaults = {}
    if use_config:
        try:
            creds = parse_robovac_config(early_args.config)
            defaults = {
                "device_id": creds.device_id.value,
                "ip": creds.ip.value,
                "local_code": creds.local_code.value
            }
        except Exception:
            # Best effort: an unreadable or incomplete config is reported
            # below as missing required arguments.
            defaults = {}

    parser = argparse.ArgumentParser(description="Control a Robovac device.")
    parser.add_argument(
        '-c', '--config', help="Path to config file",
        default=early_args.config)
    parser.add_argument(
        '--device_id', help="Device ID",
        default=None if early_args.device_id else defaults.get('device_id'))
    parser.add_argument(
        '--ip', help="Device IP address",
        default=None if early_args.ip else defaults.get('ip'))
    parser.add_argument(
        '--local_code', help="Secret key obtained from eufy",
        default=None if early_args.local_code else defaults.get('local_code'))
    parser.add_argument(
        '--time', '-t', type=int, default=DEFAULT_TIME,
        help="Cleaning time in minutes")
    parser.add_argument(
        '--home', '-b', action='store_true', dest="go_home", default=False,
        help="Go home")
    parser.add_argument(
        '--pause', '-p', action='store_true', dest="pause", default=False,
        help="Pause vacuum")
    # The --debug option is currently disabled:
    # parser.add_argument('--debug','-d', action='store_true', dest="debug", default=False, help="Enter debugging mode (won't send commands to vacuum)")
    parser.add_argument(
        '--verbose', '-v', action='store_true', dest="verbose", default=False,
        help="Enable verbose logs")
    parser.add_argument(
        '--quiet', '-q', action='store_true', dest="quiet", default=False,
        help="Quiet logs")
    args = parser.parse_args()

    if args.quiet and args.verbose:
        parser.error("Cannot set quiet and verbose mode simultaneously.")
    elif args.verbose:
        logging.basicConfig(level=logging.DEBUG)
    elif args.quiet:
        logging.basicConfig(level=logging.CRITICAL)
        # Silence print() output as well for the rest of the process.
        sys.stdout = open(os.devnull, 'w')
    else:
        logging.basicConfig(level=logging.INFO)

    if not use_config:
        print("Configuration skipped")

    missing = [key for key in _REQUIRED_KEYS if getattr(args, key) is None]
    if missing:
        parser.error(f"Missing required argument(s): {', '.join(missing)}")

    try:
        asyncio.run(
            async_main(
                args.device_id, args.ip, args.local_code,
                args.time, args.go_home, args.pause,
            )
        )
    except Exception as e:
        # --debug is not registered above, so the namespace has no ``debug``
        # attribute; read it defensively so the original error is reported
        # instead of an AttributeError raised from this handler.
        if getattr(args, 'debug', False) or args.verbose:
            print(e)
        else:
            print("An error occured.")


if __name__ == "__main__":
    main(*sys.argv[1:])