13 Commits

18 changed files with 543 additions and 383 deletions

5
.gitignore vendored Normal file
View File

@@ -0,0 +1,5 @@
build/
dist
__pycache__
*.lock
*.spec

29
CHANGELOG.md Normal file
View File

@@ -0,0 +1,29 @@
# v1.2
- Major changes to default behavior:
- If time is not specified, whatever mode is selected will run until the vacuum gets low on battery. Then it will go home.
- If a time is specified, vacuum will run until the end of elapsed time then it will stop in place.
- Addition of a mode flag:
- `-m quiet`: Automatic direction, low suction
- `-m default`: Automatic direction, medium suction
- `-m max`: Automatic direction, maximum suction
- `-m edge`: Stick to the edges of the room
- `-m spot`: "Spot clean" - goes in goes in a spiral from one point
- Addition of a status flag:
- `-s or --status` will connect to robovac and print status.
- Improve tuya logs and callbacks by associating status names with their values.
- whereas before, status indicators such as "power" and "work mode" were reported as numerical keys, the keys are now just called "power" and "work mode".
- add a return to base flag
- `--return or -r`: when combined with `--time or -t`, will instruct robovac to return to base after time has elapsed. (default is to pause in place when time elapses).
- This is semantically different from `-b or --home` because it only impacts a timed job.
# v1.1
- Fix bugs and improve code efficiency
- remove `--debug` flag
- `--pause` now works just fine.
- script won't hang on erroneous go-home commands.
# v1.0 - Initial release
Initial release

5
Dockerfile Normal file
View File

@@ -0,0 +1,5 @@
FROM ubuntu:20.04
ARG DEBIAN_FRONTEND=noninteractive
RUN apt update && apt upgrade -y
RUN apt-get -qq install python3.9 python3.9-dev pipenv -y

13
LICENSE
View File

@@ -1,13 +0,0 @@
Copyright 2019 Richard Mitchell
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

13
Pipfile Normal file
View File

@@ -0,0 +1,13 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
cryptograpy = "*"
pyinstaller = "*"
[dev-packages]
[requires]
python_version = "3.9"

201
README.md
View File

@@ -1,44 +1,185 @@
# Eufy Robovac control for Python # Table of Contents <!-- omit in toc -->
Work in progress! - [Compiling](#compiling)
- [Requirements](#requirements)
- [Instructions](#instructions)
- [Configuration](#configuration)
- [Examples](#examples)
- [Command-line](#command-line)
- [Config file](#config-file)
- [Usage](#usage)
- [Examples](#examples-1)
- [Obtaining device credentials](#obtaining-device-credentials)
- [tl;dr](#tldr)
- [Instructions:](#instructions-1)
- [With docker](#with-docker)
- [1. Enter a shell within an ubuntu docker container](#1-enter-a-shell-within-an-ubuntu-docker-container)
- [2. Set up your environment](#2-set-up-your-environment)
- [3. Clone the repo](#3-clone-the-repo)
- [4. Install requirements](#4-install-requirements)
- [5. Get your credentials](#5-get-your-credentials)
## Installation # eufy_robovac <!-- omit in toc -->
Pre-requisites:
* openssl (used for encryption) This is a cli script adapted from [Richard Mitchell's work](https://github.com/mitchellrj/eufy_robovac_). It abandons all the code for integrating with homeassistant in lieu of creating a portable binary to control the thing - either manually or with cron job.
# Compiling
This python script can be compiled to a single executable binary.
## Requirements
- linux machine
- git
- docker
## Instructions
``` ```
git clone https://github.com/mitchellrj/eufy_robovac.git git clone https://gitea.raer.me/freyjagp/eufy_robovac.git
cd eufy_robovac cd eufy_robovac
python3 -m venv . chmod +x compile
bin/pip install -e . ./compile
``` ```
## Demo usage This will create a docker image then use it to compile an executable binary to `eufy_robovac/dist/vac`.
```
bin/demo DEVICE_ID IP LOCAL_KEY
```
The demo: # Configuration
* connects to your device,
* prints its state out,
* starts cleaning,
* waits 30 seconds,
* sends the device home,
* waits 10 seconds,
* disconnects & exits
## Home Assistant integration The device ID, ip address, and local code are required for this to work. They may be passed as arguments, or through a config file. The defalt path for the config file is `/etc/robovac.conf`. This can be altered with the `-c` or `--config` flags.
**EXPERIMENTAL!** ## Examples
Copy the contents of the `eufy_robovac` folder to `custom_components/eufy_vacuum` in your home assistant configuration directory. Then add the following to your configuration file: ### Command-line
Set config file to any arbitrary location:
``` ```
eufy_vacuum: robovac --config=/home/user/robovac.conf
devices: ```
- name: Robovac
address: 192.168.1.80 Provide credentials inline:
access_token: YOUR LOCAL KEY HERE
id: YOUR DEVICE ID HERE ```
type: T2118 robovac --device_id=DEVICE_ID --ip=192.168.1.1 --local_code=LOCAL_CODE
```
### Config file
```
[robovac]
device_id=DEVICE_ID
ip=192.168.1.1
local_code=LOCAL_CODE
```
[see here](config/robovac.conf.example)
# Usage
```
usage: vac.py [-h] [-c CONFIG] [--device_id DEVICE_ID] [--ip IP] [--local_code LOCAL_CODE] [--time TIME] [--home] [--pause] [--mode MODE] [--verbose] [--quiet] [--status] [--return]
Control a Robovac device.
options:
-h, --help show this help message and exit
-c CONFIG, --config CONFIG
Path to config file
--device_id DEVICE_ID
Device ID
--ip IP Device IP address
--local_code LOCAL_CODE
Secret key obtained from eufy
--time TIME, -t TIME Cleaning time in minutes
--home, -b Go home
--pause, -p Pause vacuum
--mode MODE, -m MODE Options: default, max, edge, spot, quiet
--verbose, -v Enable verbose logs
--quiet, -q Quiet logs
--status, -s Print status
--return, -r Return to base upon completion (requires -t)
```
## Examples
__Clean for 5 minutes, then go home__
```
robovac -t 5 -r
```
__Clean the edge of the room until the battery runs out, then go home__
```
robovac -m edge
```
__Clean for 20 minutes then go home. Quiet all logs and run process in the background__
```
robovac -t 20 -r -q > /dev/null 2>&1 &
```
# Obtaining device credentials
## tl;dr
use [this](https://github.com/markbajaj/eufy-device-id-python) repo and follow the instructions.
## Instructions:
You'll need to install git, python3, build-essential, pipenv, libffi-dev, python3-dev, and libssl-dev with your package manager. Then you can do
```
git clone https://github.com/markbajaj/eufy-device-id-python
cd eufy-device-id-python
pipenv install
pipenv shell
python -m eufy_local_id_grabber "YOUR_EUFY_EMAIL" "YOUR_EUFY_PASSWORD"
```
This might not work and requires installing packages you may or may not want/need on your base os. We can do better.
### With docker
#### 1. Enter a shell within an ubuntu docker container
```
docker run -it ubuntu:latest bash
```
#### 2. Set up your environment
```
export DEBIAN_FRONTEND=noninteractive
apt update
apt install -y git python3 python3-venv python3-pip build-essential pipenv libffi-dev python3-dev libssl-dev
```
#### 3. Clone the repo
```
git clone https://github.com/markbajaj/eufy-device-id-python.git
cd eufy-device-id-python
```
#### 4. Install requirements
```
pipenv install
```
#### 5. Get your credentials
```
pipenv shell
python -m eufy_local_id_grabber "YOUR_EUFY_EMAIL" "YOUR_EUFY_PASSWORD"
```
Output:
```
Home: <home ID>
Device: RoboVac, device ID <DEVICE_ID>, local key <LOCAL_KEY>
``` ```

1
VERSION Normal file
View File

@@ -0,0 +1 @@
version=1.2

8
compile Executable file
View File

@@ -0,0 +1,8 @@
#!/bin/bash
NAME="robovac-build"
docker buildx build . -t $NAME
docker run -v $PWD:/opt/build --rm -w "/opt/build" $NAME bash -c "pipenv install;pipenv run pyinstaller -F src/vac.py"

View File

@@ -0,0 +1,4 @@
[robovac]
device_id=DEVICE_ID
ip=192.168.1.1
local_code=LOCAL_CODE

View File

@@ -1,58 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2019 Richard Mitchell
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import logging
import pprint
import sys
from eufy_robovac.robovac import Robovac
logging.basicConfig(level=logging.DEBUG)
async def connected_callback(message, device):
print("Connected. Current device state:")
pprint.pprint(device.state)
async def cleaning_started_callback(message, device):
print("Cleaning started.")
async def async_main(device_id, ip, local_key=None, *args, **kwargs):
r = Robovac(device_id, ip, local_key, *args, **kwargs)
await r.async_connect(connected_callback)
await asyncio.sleep(1)
print("Starting cleaning...")
await r.async_start_cleaning(cleaning_started_callback)
await asyncio.sleep(5)
print("Pausing...")
r.play_pause = False
await asyncio.sleep(1)
print("Disconnecting...")
await r.async_disconnect()
def main(*args, **kwargs):
if not args:
args = sys.argv[1:]
asyncio.run(async_main(*args, **kwargs))
if __name__ == '__main__':
main(*sys.argv[1:])

View File

@@ -1,45 +0,0 @@
### HA support
"""Support for Eufy devices."""
import logging
import voluptuous as vol
from homeassistant.const import (
CONF_ACCESS_TOKEN, CONF_ADDRESS, CONF_DEVICES, CONF_NAME,
CONF_ID, CONF_TYPE)
from homeassistant.helpers import discovery
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
DOMAIN = 'eufy_vacuum'
DEVICE_SCHEMA = vol.Schema({
vol.Required(CONF_ADDRESS): cv.string,
vol.Optional(CONF_ACCESS_TOKEN): cv.string,
vol.Required(CONF_ID): cv.string,
vol.Required(CONF_TYPE): cv.string,
vol.Optional(CONF_NAME): cv.string
})
CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.Schema({
vol.Optional(CONF_DEVICES, default=[]):
vol.All(cv.ensure_list, [DEVICE_SCHEMA]),
}),
}, extra=vol.ALLOW_EXTRA)
def setup(hass, config):
"""Set up Eufy devices."""
for device_info in config.get(DOMAIN, {}).get(CONF_DEVICES, []):
device = {}
device['address'] = device_info[CONF_ADDRESS]
device['local_key'] = device_info.get(CONF_ACCESS_TOKEN)
device['device_id'] = device_info[CONF_ID]
device['name'] = device_info.get(CONF_NAME)
device['model'] = device_info[CONF_TYPE]
discovery.load_platform(hass, 'vacuum', DOMAIN, device, config)
return True

View File

@@ -1,181 +0,0 @@
"""Support for Eufy vacuum cleaners."""
import logging
from homeassistant.components.vacuum import (
PLATFORM_SCHEMA,
STATE_CLEANING, STATE_DOCKED, STATE_IDLE, STATE_PAUSED, STATE_RETURNING,
STATE_ERROR,
SUPPORT_BATTERY, SUPPORT_CLEAN_SPOT, SUPPORT_FAN_SPEED, SUPPORT_LOCATE,
SUPPORT_PAUSE, SUPPORT_RETURN_HOME, SUPPORT_STATUS, SUPPORT_START,
SUPPORT_TURN_ON, SUPPORT_TURN_OFF,
VacuumEntity)
from . import robovac
_LOGGER = logging.getLogger(__name__)
FAN_SPEED_OFF = 'Off'
FAN_SPEED_STANDARD = 'Standard'
FAN_SPEED_BOOST_IQ = 'Boost IQ'
FAN_SPEED_MAX = 'Max'
FAN_SPEEDS = {
robovac.CleanSpeed.NO_SUCTION: FAN_SPEED_OFF,
robovac.CleanSpeed.STANDARD: FAN_SPEED_STANDARD,
robovac.CleanSpeed.BOOST_IQ: FAN_SPEED_BOOST_IQ,
robovac.CleanSpeed.MAX: FAN_SPEED_MAX,
}
SUPPORT_ROBOVAC_T2118 = (
SUPPORT_BATTERY | SUPPORT_CLEAN_SPOT | SUPPORT_FAN_SPEED | SUPPORT_LOCATE |
SUPPORT_PAUSE | SUPPORT_RETURN_HOME | SUPPORT_START | SUPPORT_STATUS |
SUPPORT_TURN_OFF | SUPPORT_TURN_ON
)
MODEL_CONFIG = {
'T2118': {
'fan_speeds': FAN_SPEEDS,
'support': SUPPORT_ROBOVAC_T2118
}
}
def setup_platform(hass, config, add_entities, device_config=None):
"""Set up Eufy vacuum cleaners."""
if device_config is None:
return
add_entities([EufyVacuum(device_config)], True)
class EufyVacuum(VacuumEntity):
"""Representation of a Eufy vacuum cleaner."""
def __init__(self, device_config):
"""Initialize the light."""
try:
self._config = MODEL_CONFIG[device_config['model'].upper()]
except KeyError:
raise RuntimeError("Unsupported model {}".format(
device_config['model']))
self._fan_speed_reverse_mapping = {
v: k for k, v in self._config['fan_speeds'].items()}
self._device_id = device_config['device_id']
self.robovac = robovac.Robovac(
device_config['device_id'], device_config['address'],
device_config['local_key'])
self._name = device_config['name']
async def async_update(self):
"""Synchronise state from the vacuum."""
await self.robovac.async_get()
@property
def unique_id(self):
"""Return the ID of this vacuum."""
return self._device_id
@property
def name(self):
"""Return the name of the device if any."""
return self._name
@property
def is_on(self):
"""Return true if device is on."""
return self.robovac.work_status == robovac.WorkStatus.RUNNING
@property
def supported_features(self):
"""Flag vacuum cleaner robot features that are supported."""
return self._config['support']
@property
def fan_speed(self):
"""Return the fan speed of the vacuum cleaner."""
return self._config['fan_speeds'].get(
self.robovac.clean_speed, FAN_SPEED_OFF)
@property
def fan_speed_list(self):
"""Get the list of available fan speed steps of the vacuum cleaner."""
return list(self._config['fan_speeds'].values())
@property
def battery_level(self):
"""Return the battery level of the vacuum cleaner."""
return self.robovac.battery_level
@property
def status(self):
"""Return the status of the vacuum cleaner."""
if self.robovac.error_code != robovac.ErrorCode.NO_ERROR:
return STATE_ERROR
elif self.robovac.go_home:
return STATE_RETURNING
elif self.robovac.work_status == robovac.WorkStatus.RUNNING:
return STATE_CLEANING
elif self.robovac.work_status == robovac.WorkStatus.CHARGING:
return STATE_DOCKED
elif self.robovac.work_status == robovac.WorkStatus.RECHARGE_NEEDED:
# Should be captured by `go_home` above, but just in case
return STATE_RETURNING
elif self.robovac.work_status == robovac.WorkStatus.SLEEPING:
return STATE_IDLE
elif self.robovac.work_status == robovac.WorkStatus.STAND_BY:
return STATE_IDLE
elif self.robovac.work_status == robovac.WorkStatus.COMPLETED:
return STATE_DOCKED
@property
def available(self) -> bool:
"""Return True if entity is available."""
return True
async def async_return_to_base(self, **kwargs):
"""Set the vacuum cleaner to return to the dock."""
await self.robovac.async_go_home()
async def async_clean_spot(self, **kwargs):
"""Perform a spot clean-up."""
await self.robovac.async_set_work_mode(robovac.WorkMode.SPOT)
async def async_locate(self, **kwargs):
"""Locate the vacuum cleaner."""
await self.robovac.async_find_robot()
async def async_set_fan_speed(self, fan_speed, **kwargs):
"""Set fan speed."""
clean_speed = self._fan_speed_reverse_mapping[fan_speed]
await self.robovac.async_set_clean_speed(clean_speed)
async def async_turn_on(self, **kwargs):
"""Turn the vacuum on."""
await self.robovac.async_set_work_mode(robovac.WorkMode.AUTO)
async def async_turn_off(self, **kwargs):
"""Turn the vacuum off and return to home."""
await self.async_return_to_base()
async def async_start(self, **kwargs):
"""Resume the cleaning cycle."""
await self.async_turn_on()
async def async_resume(self, **kwargs):
"""Resume the cleaning cycle."""
await self.robovac.async_play()
async def async_pause(self, **kwargs):
"""Pause the cleaning cycle."""
await self.robovac.async_pause()
async def async_start_pause(self, **kwargs):
"""Pause the cleaning task or resume it."""
if self.robovac.play_pause:
await self.async_pause()
else:
await self.async_play()

View File

@@ -1,48 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2019 Richard Mitchell
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from setuptools import setup, find_packages
import sys
import warnings
dynamic_requires = []
setup(
name='eufy_robovac',
version="0.0",
author='Richard Mitchell',
author_email='eufy-robovac@mitch.org.uk',
url='http://github.com/mitchellrj/eufy_robovac',
packages=find_packages(),
scripts=[],
description='Python API for controlling Eufy robotic vacuum cleaners',
classifiers=[
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'License :: OSI Approved :: Apache Software License',
'Operating System :: OS Independent',
'Programming Language :: Python',
],
install_requires=[
'cryptography'
],
entry_points = {
'console_scripts': ['demo=eufy_robovac.demo:main'],
}
)

View File

@@ -15,8 +15,3 @@
# limitations under the License. # limitations under the License.
from .robovac import Robovac from .robovac import Robovac
try:
from .platform import *
except ImportError:
pass

View File

@@ -15,6 +15,7 @@
# limitations under the License. # limitations under the License.
import logging import logging
import asyncio
from .property import DeviceProperty, StringEnum from .property import DeviceProperty, StringEnum
from .tuya import TuyaDevice from .tuya import TuyaDevice
@@ -86,6 +87,7 @@ class Robovac(TuyaDevice):
BATTERY_LEVEL = '104' BATTERY_LEVEL = '104'
ERROR_CODE = '106' ERROR_CODE = '106'
power = DeviceProperty(POWER) power = DeviceProperty(POWER)
play_pause = DeviceProperty(PLAY_PAUSE) play_pause = DeviceProperty(PLAY_PAUSE)
direction = DeviceProperty(DIRECTION) direction = DeviceProperty(DIRECTION)
@@ -117,3 +119,6 @@ class Robovac(TuyaDevice):
async def async_set_clean_speed(self, clean_speed, callback=None): async def async_set_clean_speed(self, clean_speed, callback=None):
await self.async_set({self.CLEAN_SPEED: str(clean_speed)}, callback) await self.async_set({self.CLEAN_SPEED: str(clean_speed)}, callback)
async def async_move(self, direction, callback=None):
await self.async_set({self.DIRECTION: str(direction)}, callback)

View File

@@ -536,7 +536,22 @@ class TuyaDevice:
async def async_update_state(self, state_message, _): async def async_update_state(self, state_message, _):
self._dps.update(state_message.payload["dps"]) self._dps.update(state_message.payload["dps"])
_LOGGER.info("Received updated state {}: {}".format(self, self._dps)) dict1 = {
'POWER': '1',
'PLAY_PAUSE': '2',
'DIRECTION': '3',
'WORK_MODE': '5',
'WORK_STATUS': '15',
'GO_HOME': '101',
'CLEAN_SPEED': '102',
'FIND_ROBOT': '103',
'BATTERY_LEVEL': '104',
'ERROR_CODE': '106'
}
dict2 = self._dps
combined_dict = {key: dict2[value] for key, value in dict1.items() if value in dict2}
_LOGGER.info("Received updated state {}: {}".format(self, combined_dict))
@property @property
def state(self): def state(self):

284
src/vac.py Executable file
View File

@@ -0,0 +1,284 @@
#!/usr/bin/env python3
import asyncio
import signal
import logging
import sys
import argparse
from enum import Enum
import os
from pprint import pprint
from robovac.robovac import Robovac, CleanSpeed, WorkMode, WorkStatus
DEFAULT_TIME = False
stop_event = asyncio.Event()
device_status = {
'POWER': '1',
'PLAY_PAUSE': '2',
'DIRECTION': '3',
'WORK_MODE': '5',
'WORK_STATUS': '15',
'GO_HOME': '101',
'CLEAN_SPEED': '102',
'FIND_ROBOT': '103',
'BATTERY_LEVEL': '104',
'ERROR_CODE': '106'
}
class Modes:
DEFAULT = CleanSpeed.BOOST_IQ, WorkMode.AUTO
QUIET = CleanSpeed.STANDARD, WorkMode.AUTO
MAX = CleanSpeed.MAX, WorkMode.AUTO
EDGE = CleanSpeed.MAX, WorkMode.EDGE
SPOT = CleanSpeed.MAX, WorkMode.SPOT
def signal_handler():
print(f'\nExiting...')
stop_event.set()
def parse_robovac_config(path="/home/freyja/Desktop/robovac.conf"):
config = {}
if not os.path.exists(path):
raise FileNotFoundError(f"Config file not found at {path}")
with open(path, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith("#"): # Skip empty lines and comments
if '=' in line:
key, value = map(str.strip, line.split('=', 1))
config[key] = value
required_keys = ['device_id', 'ip', 'local_code']
if not all(key in config for key in required_keys):
missing = [key for key in required_keys if key not in config]
raise ValueError(f"Missing keys in config: {', '.join(missing)}")
class credentials(Enum):
device_id = config['device_id']
ip = config['ip']
local_code = config['local_code']
return credentials
async def ordered_callback(message,device):
# print(message)
dict1 = device_status
dict2 = device.state
combined_dict = {key: dict2[value] for key, value in dict1.items() if value in dict2}
pprint(combined_dict)
async def callback(message,device):
# print(message)
dict1 = device_status
dict2 = device.state
combined_dict = {key: dict2[value] for key, value in dict1.items() if value in dict2}
print(combined_dict)
async def stepper(time) -> bool:
total_seconds = time * 60
step = 1
elapsed = 0
while elapsed < total_seconds and not stop_event.is_set():
await asyncio.sleep(step)
elapsed += step
return True
async def async_clean(r: Robovac, mode: Modes):
if r.work_mode != mode[1]:
await r.async_set_work_mode(mode[1],callback)
await asyncio.sleep(1)
await r.async_set_clean_speed(mode[0],callback)
await asyncio.sleep(1)
async def async_go_home(r: Robovac):
if r.work_status != WorkStatus.CHARGING:
await r.async_go_home(callback)
await asyncio.sleep(1)
async def async_pause(r: Robovac):
if r.work_status != WorkStatus.CHARGING:
await r.async_pause(callback)
await asyncio.sleep(1)
async def async_main(
device_id,
ip,
local_code,
time=DEFAULT_TIME,
go_home=False,
pause=False,
mode=None,
status=False,
rtb=False
):
asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, signal_handler)
asyncio.get_event_loop().add_signal_handler(signal.SIGINT, signal_handler)
r = Robovac(device_id,ip,local_code)
await r.async_connect(ordered_callback if status else callback)
await asyncio.sleep(2)
if status:
stop_event.set()
if mode and not go_home and not pause:
while not stop_event.is_set():
await async_clean(r, mode)
if time:
await stepper(time)
stop_event.set()
if go_home and not pause:
while not stop_event.is_set():
await async_go_home(r)
stop_event.set()
if pause and not go_home:
while not stop_event.is_set():
await async_pause(r)
stop_event.set()
if time and not pause and not go_home and not status:
if rtb:
await async_go_home(r)
elif not rtb:
await async_pause(r)
if r._connected:
await r.async_disconnect()
return
def main(*args, **kwargs):
early_parser = argparse.ArgumentParser(add_help=False)
early_parser.add_argument('-c', '--config', help="Path to config file", default="/etc/robovac.conf")
early_parser.add_argument('--device_id')
early_parser.add_argument('--ip')
early_parser.add_argument('--local_code')
early_args, remaining_args = early_parser.parse_known_args()
use_config = not (early_args.device_id or early_args.ip or early_args.local_code)
defaults = {}
if use_config:
try:
creds = parse_robovac_config(early_args.config)
defaults = {
"device_id": creds.device_id.value,
"ip": creds.ip.value,
"local_code": creds.local_code.value
}
except Exception:
defaults = {}
parser = argparse.ArgumentParser(description="Control a Robovac device.")
parser.add_argument('-c', '--config',help="Path to config file",default=early_args.config)
parser.add_argument('--device_id',help="Device ID",default=None if early_args.device_id else defaults.get('device_id'))
parser.add_argument('--ip',help="Device IP address",default=None if early_args.ip else defaults.get('ip'))
parser.add_argument('--local_code',help="Secret key obtained from eufy",default=None if early_args.local_code else defaults.get('local_code'))
parser.add_argument('--time', '-t',type=int,help="Cleaning time in minutes",default=DEFAULT_TIME)
parser.add_argument('--home', '-b',action='store_true',help="Go home",default=False,dest="go_home")
parser.add_argument('--pause','-p',action='store_true',help="Pause vacuum",default=False,dest="pause")
parser.add_argument('--mode', '-m',dest="mode",default="",help = "Options: default, max, edge, spot, quiet")
parser.add_argument('--verbose','-v',action='store_true',dest="verbose",default=False,help="Enable verbose logs")
parser.add_argument('--quiet','-q',action='store_true',dest="quiet",default=False,help="Quiet logs")
parser.add_argument('--status','-s',action='store_true',dest="status",default=False,help="Print status")
parser.add_argument('--return','-r',action='store_true',dest="rtb",default=False,help="Return to base upon completion (requires -t)")
args = parser.parse_args()
if args.rtb and not args.time:
parser.error("Return to base may only be set if a duration is specified.")
if args.quiet and args.verbose:
parser.error("Cannot set quiet and verbose mode simultaneously.")
elif args.verbose:
logging.basicConfig(level=logging.DEBUG)
elif args.quiet:
logging.basicConfig(level=logging.CRITICAL)
sys.stdout = open(os.devnull, 'w')
elif args.status:
logging.basicConfig(level=logging.CRITICAL)
else:
logging.basicConfig(level=logging.INFO)
if args.mode != "" and (args.pause or args.go_home):
print("Mode will be overridden.")
args.mode = None
elif args.mode == "" and (args.pause or args.go_home):
args.mode = None
elif args.mode.lower() == "" and not (args.pause or args.go_home):
args.mode = Modes.DEFAULT
elif args.mode.lower() == "default":
args.mode = Modes.DEFAULT
elif args.mode.lower() == "max":
args.mode = Modes.MAX
elif args.mode.lower() == "spot":
args.mode = Modes.SPOT
elif args.mode.lower() == "edge":
args.mode = Modes.EDGE
elif args.mode.lower() == "quiet":
args.mode = Modes.QUIET
else:
parser.error(f'Invalid mode selection')
if not use_config:
print("Configuration skipped")
missing = [key for key in ['device_id', 'ip', 'local_code'] if getattr(args, key) is None]
if missing:
parser.error(f"Missing required argument(s): {', '.join(missing)}")
try:
asyncio.run(async_main(args.device_id, args.ip, args.local_code, args.time, args.go_home,args.pause, args.mode, args.status,args.rtb))
except Exception as e:
if args.verbose:
print(e)
else:
print("An error occured.")
if __name__ == "__main__":
main(*sys.argv[1:])