forked from github.com/eufy_robovac
Adapt this repo to my needs
- remove homeassistant stuff - flesh out a featureful cli script
This commit is contained in:
187
src/vac.py
Executable file
187
src/vac.py
Executable file
@@ -0,0 +1,187 @@
|
||||
#!/usr/bin/env python3
|
||||
import asyncio
|
||||
import signal
|
||||
import logging
|
||||
import pprint
|
||||
import sys
|
||||
import argparse
|
||||
from enum import Enum
|
||||
import os
|
||||
|
||||
|
||||
from robovac.robovac import Robovac
|
||||
|
||||
|
||||
stop_event = asyncio.Event()
|
||||
|
||||
|
||||
def signal_handler():
|
||||
print(f'\nExiting...')
|
||||
stop_event.set()
|
||||
|
||||
|
||||
def parse_robovac_config(path="/home/freyja/Desktop/robovac.conf"):
|
||||
config = {}
|
||||
|
||||
|
||||
if not os.path.exists(path):
|
||||
raise FileNotFoundError(f"Config file not found at {path}")
|
||||
|
||||
|
||||
with open(path, 'r') as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line and not line.startswith("#"): # Skip empty lines and comments
|
||||
if '=' in line:
|
||||
key, value = map(str.strip, line.split('=', 1))
|
||||
config[key] = value
|
||||
|
||||
|
||||
required_keys = ['device_id', 'ip', 'local_code']
|
||||
if not all(key in config for key in required_keys):
|
||||
missing = [key for key in required_keys if key not in config]
|
||||
raise ValueError(f"Missing keys in config: {', '.join(missing)}")
|
||||
|
||||
|
||||
class credentials(Enum):
|
||||
device_id = config['device_id']
|
||||
ip = config['ip']
|
||||
local_code = config['local_code']
|
||||
|
||||
|
||||
return credentials
|
||||
|
||||
|
||||
async def callback(message,device):
|
||||
print(message)
|
||||
pprint.pprint(device.state)
|
||||
|
||||
|
||||
async def stepper(time):
|
||||
total_seconds = time * 60
|
||||
step = 1
|
||||
elapsed = 0
|
||||
while elapsed < total_seconds and not stop_event.is_set():
|
||||
await asyncio.sleep(step)
|
||||
elapsed += step
|
||||
return True
|
||||
|
||||
|
||||
async def async_auto_clean(r: Robovac,time):
|
||||
await r.async_start_cleaning(callback)
|
||||
await stepper(time)
|
||||
await r.async_pause()
|
||||
|
||||
|
||||
async def async_go_home(r: Robovac):
|
||||
await r.async_go_home(callback)
|
||||
|
||||
|
||||
async def async_pause(r: Robovac):
|
||||
await r.async_pause(callback)
|
||||
|
||||
|
||||
async def async_main(device_id,ip,local_code,time,go_home,debug,pause):
|
||||
asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, signal_handler)
|
||||
asyncio.get_event_loop().add_signal_handler(signal.SIGINT, signal_handler)
|
||||
r = Robovac(device_id,ip,local_code)
|
||||
await r.async_connect(callback)
|
||||
await asyncio.sleep(2)
|
||||
|
||||
|
||||
if debug:
|
||||
while not stop_event.is_set():
|
||||
if not go_home:
|
||||
print("Auto cleaning")
|
||||
await stepper(time)
|
||||
if go_home:
|
||||
print("Go home")
|
||||
if pause:
|
||||
print("Pause")
|
||||
else:
|
||||
while not stop_event.is_set():
|
||||
if not go_home:
|
||||
await async_auto_clean(r, time = int(time))
|
||||
await async_go_home(r)
|
||||
if go_home:
|
||||
await async_go_home(r)
|
||||
if pause:
|
||||
await async_pause(r)
|
||||
|
||||
|
||||
if not debug:
|
||||
if stop_event.is_set() and not r.go_home and not pause: await async_go_home(r)
|
||||
if r._connected: await r.async_disconnect()
|
||||
|
||||
|
||||
def main(*args, **kwargs):
|
||||
early_parser = argparse.ArgumentParser(add_help=False)
|
||||
early_parser.add_argument('-c', '--config', help="Path to config file", default="/etc/robovac.conf")
|
||||
early_parser.add_argument('--device_id')
|
||||
early_parser.add_argument('--ip')
|
||||
early_parser.add_argument('--local_code')
|
||||
|
||||
|
||||
early_args, remaining_args = early_parser.parse_known_args()
|
||||
|
||||
|
||||
use_config = not (early_args.device_id or early_args.ip or early_args.local_code)
|
||||
|
||||
|
||||
defaults = {}
|
||||
if use_config:
|
||||
try:
|
||||
creds = parse_robovac_config(early_args.config)
|
||||
defaults = {
|
||||
"device_id": creds.device_id.value,
|
||||
"ip": creds.ip.value,
|
||||
"local_code": creds.local_code.value
|
||||
}
|
||||
except Exception:
|
||||
defaults = {}
|
||||
|
||||
|
||||
if not use_config:
|
||||
print("Configuration skipped")
|
||||
|
||||
|
||||
parser = argparse.ArgumentParser(description="Control a Robovac device.")
|
||||
parser.add_argument('-c', '--config', help="Path to config file", default=early_args.config)
|
||||
parser.add_argument('--device_id', help="Device ID", default=None if early_args.device_id else defaults.get('device_id'))
|
||||
parser.add_argument('--ip', help="Device IP address", default=None if early_args.ip else defaults.get('ip'))
|
||||
parser.add_argument('--local_code', help="Secret key obtained from eufy", default=None if early_args.local_code else defaults.get('local_code'))
|
||||
parser.add_argument('--time', '-t', type=int, default=20, help="Cleaning time in minutes")
|
||||
parser.add_argument('--pause','-p', action='store_true', dest="pause", default=False, help="Pause vacuum")
|
||||
parser.add_argument('--home', '-b', action='store_true', dest="go_home", default=False, help="Go home")
|
||||
parser.add_argument('--debug','-d', action='store_true', dest="debug", default=False, help="Enter debugging mode (won't send commands to vacuum)")
|
||||
parser.add_argument('--verbose','-v', action='store_true', dest="verbose", default=False, help="Enable verbose logs")
|
||||
parser.add_argument('--quiet','-q', action='store_true', dest="quiet", default=False, help="Quiet logs")
|
||||
args = parser.parse_args()
|
||||
|
||||
|
||||
if args.quiet and args.verbose:
|
||||
parser.error("Cannot set quiet and verbose mode simultaneously.")
|
||||
elif args.verbose:
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
elif args.quiet:
|
||||
logging.basicConfig(level=logging.CRITICAL)
|
||||
else:
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
|
||||
missing = [key for key in ['device_id', 'ip', 'local_code'] if getattr(args, key) is None]
|
||||
if missing:
|
||||
parser.error(f"Missing required argument(s): {', '.join(missing)}")
|
||||
|
||||
|
||||
try:
|
||||
asyncio.run(async_main(args.device_id, args.ip, args.local_code, args.time, args.go_home,args.debug,args.pause))
|
||||
except Exception as e:
|
||||
if args.debug or args.verbose:
|
||||
print(e)
|
||||
else:
|
||||
print("An error occured.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main(*sys.argv[1:])
|
||||
Reference in New Issue
Block a user