Files
eufy_robovac/src/vac.py

284 lines
7.1 KiB
Python
Executable File

#!/usr/bin/env python3
import asyncio
import signal
import logging
import pprint
import sys
import argparse
from enum import Enum
import os
from robovac.robovac import Robovac, CleanSpeed, WorkMode
from robovac.property import StringEnum
DEFAULT_TIME = 20
stop_event = asyncio.Event()
class Modes(StringEnum):
DEFAULT = CleanSpeed.BOOST_IQ, WorkMode.AUTO
MAX = CleanSpeed.MAX, WorkMode.AUTO
EDGE = CleanSpeed.MAX, WorkMode.EDGE
SPOT = CleanSpeed.MAX, WorkMode.SPOT
def signal_handler():
print(f'\nExiting...')
stop_event.set()
def parse_robovac_config(path="/home/freyja/Desktop/robovac.conf"):
config = {}
if not os.path.exists(path):
raise FileNotFoundError(f"Config file not found at {path}")
with open(path, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"): # Skip empty lines and comments
if '=' in line:
key, value = map(str.strip, line.split('=', 1))
config[key] = value
required_keys = ['device_id', 'ip', 'local_code']
if not all(key in config for key in required_keys):
missing = [key for key in required_keys if key not in config]
raise ValueError(f"Missing keys in config: {', '.join(missing)}")
class credentials(Enum):
device_id = config['device_id']
ip = config['ip']
local_code = config['local_code']
return credentials
async def callback(message,device):
print(message)
pprint.pprint(device.state)
async def stepper(time):
total_seconds = time * 60
step = 1
elapsed = 0
while elapsed < total_seconds and not stop_event.is_set():
await asyncio.sleep(step)
elapsed += step
return True
async def async_clean(r: Robovac, time: int, mode: Modes):
if mode:
await r.async_set_clean_speed(mode[0])
await r.async_set_work_mode(mode[1])
await r.async_start_cleaning(callback)
await stepper(time)
await r.async_pause()
async def async_go_home(r: Robovac):
await r.async_go_home(callback)
async def async_pause(r: Robovac):
await r.async_pause(callback)
async def async_main(
device_id,
ip,
local_code,
time=DEFAULT_TIME,
go_home=False,
pause=False,
mode=Modes.DEFAULT
):
asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, signal_handler)
asyncio.get_event_loop().add_signal_handler(signal.SIGINT, signal_handler)
r = Robovac(device_id,ip,local_code)
await r.async_connect(callback)
await asyncio.sleep(2)
if mode and not go_home and not pause:
while not stop_event.is_set():
await async_clean(r, time, mode)
await async_go_home(r)
stop_event.set()
if go_home and not pause:
while not stop_event.is_set():
await async_go_home(r)
stop_event.set()
if pause and not go_home:
while not stop_event.is_set():
await async_pause(r)
stop_event.set()
if not r.go_home and not pause:
await async_go_home(r)
if r._connected:
await r.async_disconnect()
def main(*args, **kwargs):
early_parser = argparse.ArgumentParser(add_help=False)
early_parser.add_argument('-c', '--config', help="Path to config file", default="/etc/robovac.conf")
early_parser.add_argument('--device_id')
early_parser.add_argument('--ip')
early_parser.add_argument('--local_code')
early_args, remaining_args = early_parser.parse_known_args()
use_config = not (early_args.device_id or early_args.ip or early_args.local_code)
defaults = {}
if use_config:
try:
creds = parse_robovac_config(early_args.config)
defaults = {
"device_id": creds.device_id.value,
"ip": creds.ip.value,
"local_code": creds.local_code.value
}
except Exception:
defaults = {}
parser = argparse.ArgumentParser(description="Control a Robovac device.")
parser.add_argument(
'-c', '--config',
help="Path to config file",
default=early_args.config
)
parser.add_argument(
'--device_id',
help="Device ID",
default=None if early_args.device_id else defaults.get('device_id')
)
parser.add_argument(
'--ip',
help="Device IP address",
default=None if early_args.ip else defaults.get('ip')
)
parser.add_argument(
'--local_code',
help="Secret key obtained from eufy",
default=None if early_args.local_code else defaults.get('local_code')
)
parser.add_argument(
'--time', '-t',
type=int,
help="Cleaning time in minutes",
default=DEFAULT_TIME
)
parser.add_argument(
'--home', '-b',
action='store_true',
help="Go home"
default=False,
dest="go_home",
)
parser.add_argument(
'--pause','-p',
action='store_true',
help="Pause vacuum",
default=False,
dest="pause",
)
parser.add_argument(
'--mode', '-m',
dest="mode",
default="default",
help = '''Options: default, max, edge, spot \n
default: Automatic work mode, medium suction \n
max: Automatic work mode, max suction \n
edge: Clean edges of room, max suction \n
spot: Spot clean, max suction \n
'''
)
parser.add_argument(
'--verbose','-v',
action='store_true',
dest="verbose",
default=False,
help="Enable verbose logs"
)
parser.add_argument(
'--quiet','-q',
action='store_true',
dest="quiet",
default=False,
help="Quiet logs"
)
args = parser.parse_args()
if args.quiet and args.verbose:
parser.error("Cannot set quiet and verbose mode simultaneously.")
elif args.verbose:
logging.basicConfig(level=logging.DEBUG)
elif args.quiet:
logging.basicConfig(level=logging.CRITICAL)
sys.stdout = open(os.devnull, 'w')
else:
logging.basicConfig(level=logging.INFO)
if args.mode.lower() and (args.pause or args.home):
print("Mode will be overridden.")
args.mode = None
elif args.mode.lower() == "default":
args.mode = Modes.DEFAULT
elif args.mode.lower() == "max":
args.mode = Modes.MAX
elif args.mode.lower() == "spot":
args.mode = Modes.SPOT
elif args.mode.lower() == "edge":
args.mode = Modes.EDGE
if not use_config:
print("Configuration skipped")
missing = [key for key in ['device_id', 'ip', 'local_code'] if getattr(args, key) is None]
if missing:
parser.error(f"Missing required argument(s): {', '.join(missing)}")
try:
asyncio.run(async_main(args.device_id, args.ip, args.local_code, args.time, args.go_home,args.pause, args.mode))
except Exception as e:
if args.debug or args.verbose:
print(e)
else:
print("An error occured.")
if __name__ == "__main__":
main(*sys.argv[1:])