14 Commits
v1.0 ... main

10 changed files with 388 additions and 55 deletions

2
.gitignore vendored
View File

@@ -3,3 +3,5 @@ dist
__pycache__ __pycache__
*.lock *.lock
*.spec *.spec
config/*.conf
config/*.txt

29
CHANGELOG.md Normal file
View File

@@ -0,0 +1,29 @@
# v1.2
- Major changes to default behavior:
- If time is not specified, whatever mode is selected will run until the vacuum gets low on battery. Then it will go home.
- If a time is specified, vacuum will run until the end of elapsed time then it will stop in place.
- Addition of a mode flag:
- `-m quiet`: Automatic direction, low suction
- `-m default`: Automatic direction, medium suction
- `-m max`: Automatic direction, maximum suction
- `-m edge`: Stick to the edges of the room
- `-m spot`: "Spot clean" - goes in goes in a spiral from one point
- Addition of a status flag:
- `-s or --status` will connect to robovac and print status.
- Improve tuya logs and callbacks by associating status names with their values.
- whereas before, status indicators such as "power" and "work mode" were reported as numerical keys, the keys are now just called "power" and "work mode".
- add a return to base flag
- `--return or -r`: when combined with `--time or -t`, will instruct robovac to return to base after time has elapsed. (default is to pause in place when time elapses).
- This is semantically different from `-b or --home` because it only impacts a timed job.
# v1.1
- Fix bugs and improve code efficiency
- remove `--debug` flag
- `--pause` now works just fine.
- script won't hang on erroneous go-home commands.
# v1.0 - Initial release
Initial release

View File

@@ -1,5 +1,5 @@
FROM ubuntu:latest FROM ubuntu:20.04
ARG DEBIAN_FRONTEND=noninteractive ARG DEBIAN_FRONTEND=noninteractive
RUN apt update && apt upgrade -y RUN apt update && apt upgrade -y
RUN apt-get -qq install python3.12-full pipenv -y RUN apt-get -qq install python3.9 python3.9-dev pipenv -y

View File

@@ -10,4 +10,4 @@ pyinstaller = "*"
[dev-packages] [dev-packages]
[requires] [requires]
python_version = "3.12" python_version = "3.9"

185
README.md Normal file
View File

@@ -0,0 +1,185 @@
# Table of Contents <!-- omit in toc -->
- [Compiling](#compiling)
- [Requirements](#requirements)
- [Instructions](#instructions)
- [Configuration](#configuration)
- [Examples](#examples)
- [Command-line](#command-line)
- [Config file](#config-file)
- [Usage](#usage)
- [Examples](#examples-1)
- [Obtaining device credentials](#obtaining-device-credentials)
- [tl;dr](#tldr)
- [Instructions:](#instructions-1)
- [With docker](#with-docker)
- [1. Enter a shell within an ubuntu docker container](#1-enter-a-shell-within-an-ubuntu-docker-container)
- [2. Set up your environment](#2-set-up-your-environment)
- [3. Clone the repo](#3-clone-the-repo)
- [4. Install requirements](#4-install-requirements)
- [5. Get your credentials](#5-get-your-credentials)
# eufy_robovac <!-- omit in toc -->
This is a cli script adapted from [Richard Mitchell's work](https://github.com/mitchellrj/eufy_robovac_). It abandons all the code for integrating with homeassistant in lieu of creating a portable binary to control the thing - either manually or with cron job.
# Compiling
This python script can be compiled to a single executable binary.
## Requirements
- linux machine
- git
- docker
## Instructions
```
git clone https://gitea.raer.me/freyjagp/eufy_robovac.git
cd eufy_robovac
chmod +x compile
./compile
```
This will create a docker image then use it to compile an executable binary to `eufy_robovac/dist/vac`.
# Configuration
The device ID, ip address, and local code are required for this to work. They may be passed as arguments, or through a config file. The defalt path for the config file is `/etc/robovac.conf`. This can be altered with the `-c` or `--config` flags.
## Examples
### Command-line
Set config file to any arbitrary location:
```
robovac --config=/home/user/robovac.conf
```
Provide credentials inline:
```
robovac --device_id=DEVICE_ID --ip=192.168.1.1 --local_code=LOCAL_CODE
```
### Config file
```
[robovac]
device_id=DEVICE_ID
ip=192.168.1.1
local_code=LOCAL_CODE
```
[see here](config/robovac.conf.example)
# Usage
```
usage: vac.py [-h] [-c CONFIG] [--device_id DEVICE_ID] [--ip IP] [--local_code LOCAL_CODE] [--time TIME] [--home] [--pause] [--mode MODE] [--verbose] [--quiet] [--status] [--return]
Control a Robovac device.
options:
-h, --help show this help message and exit
-c CONFIG, --config CONFIG
Path to config file
--device_id DEVICE_ID
Device ID
--ip IP Device IP address
--local_code LOCAL_CODE
Secret key obtained from eufy
--time TIME, -t TIME Cleaning time in minutes
--home, -b Go home
--pause, -p Pause vacuum
--mode MODE, -m MODE Options: default, max, edge, spot, quiet
--verbose, -v Enable verbose logs
--quiet, -q Quiet logs
--status, -s Print status
--return, -r Return to base upon completion (requires -t)
```
## Examples
__Clean for 5 minutes, then go home__
```
robovac -t 5 -r
```
__Clean the edge of the room until the battery runs out, then go home__
```
robovac -m edge
```
__Clean for 20 minutes then go home. Quiet all logs and run process in the background__
```
robovac -t 20 -r -q > /dev/null 2>&1 &
```
# Obtaining device credentials
## tl;dr
use [this](https://github.com/markbajaj/eufy-device-id-python) repo and follow the instructions.
## Instructions:
You'll need to install git, python3, build-essential, pipenv, libffi-dev, python3-dev, and libssl-dev with your package manager. Then you can do
```
git clone https://github.com/markbajaj/eufy-device-id-python
cd eufy-device-id-python
pipenv install
pipenv shell
python -m eufy_local_id_grabber "YOUR_EUFY_EMAIL" "YOUR_EUFY_PASSWORD"
```
This might not work and requires installing packages you may or may not want/need on your base os. We can do better.
### With docker
#### 1. Enter a shell within an ubuntu docker container
```
docker run -it ubuntu:latest bash
```
#### 2. Set up your environment
```
export DEBIAN_FRONTEND=noninteractive
apt update
apt install -y git python3 python3-venv python3-pip build-essential pipenv libffi-dev python3-dev libssl-dev
```
#### 3. Clone the repo
```
git clone https://github.com/markbajaj/eufy-device-id-python.git
cd eufy-device-id-python
```
#### 4. Install requirements
```
pipenv install
```
#### 5. Get your credentials
```
pipenv shell
python -m eufy_local_id_grabber "YOUR_EUFY_EMAIL" "YOUR_EUFY_PASSWORD"
```
Output:
```
Home: <home ID>
Device: RoboVac, device ID <DEVICE_ID>, local key <LOCAL_KEY>
```

1
VERSION Normal file
View File

@@ -0,0 +1 @@
version=1.2

View File

@@ -1,8 +1,8 @@
#!/bin/bash #!/bin/bash
NAME="compile" NAME="robovac-build"
docker buildx build . -t $NAME docker buildx build . -t $NAME
docker run -v $PWD:/opt/compile --rm -w "/opt/compile" $NAME bash -c "pipenv install;pipenv run pyinstaller -F src/vac.py" docker run -v $PWD:/opt/build --rm -w "/opt/build" $NAME bash -c "pipenv install;pipenv run pyinstaller -F src/vac.py"

View File

@@ -0,0 +1,4 @@
[robovac]
device_id=DEVICE_ID
ip=192.168.1.1
local_code=LOCAL_CODE

View File

@@ -536,7 +536,22 @@ class TuyaDevice:
async def async_update_state(self, state_message, _): async def async_update_state(self, state_message, _):
self._dps.update(state_message.payload["dps"]) self._dps.update(state_message.payload["dps"])
_LOGGER.info("Received updated state {}: {}".format(self, self._dps)) dict1 = {
'POWER': '1',
'PLAY_PAUSE': '2',
'DIRECTION': '3',
'WORK_MODE': '5',
'WORK_STATUS': '15',
'GO_HOME': '101',
'CLEAN_SPEED': '102',
'FIND_ROBOT': '103',
'BATTERY_LEVEL': '104',
'ERROR_CODE': '106'
}
dict2 = self._dps
combined_dict = {key: dict2[value] for key, value in dict1.items() if value in dict2}
_LOGGER.info("Received updated state {}: {}".format(self, combined_dict))
@property @property
def state(self): def state(self):

View File

@@ -2,19 +2,44 @@
import asyncio import asyncio
import signal import signal
import logging import logging
import pprint
import sys import sys
import argparse import argparse
from enum import Enum from enum import Enum
import os import os
from pprint import pprint
from robovac.robovac import Robovac from robovac.robovac import Robovac, CleanSpeed, WorkMode, WorkStatus
DEFAULT_TIME = False
stop_event = asyncio.Event() stop_event = asyncio.Event()
device_status = {
'POWER': '1',
'PLAY_PAUSE': '2',
'DIRECTION': '3',
'WORK_MODE': '5',
'WORK_STATUS': '15',
'GO_HOME': '101',
'CLEAN_SPEED': '102',
'FIND_ROBOT': '103',
'BATTERY_LEVEL': '104',
'ERROR_CODE': '106'
}
class Modes:
DEFAULT = CleanSpeed.BOOST_IQ, WorkMode.AUTO
QUIET = CleanSpeed.STANDARD, WorkMode.AUTO
MAX = CleanSpeed.MAX, WorkMode.AUTO
EDGE = CleanSpeed.MAX, WorkMode.EDGE
SPOT = CleanSpeed.MAX, WorkMode.SPOT
def signal_handler(): def signal_handler():
print(f'\nExiting...') print(f'\nExiting...')
stop_event.set() stop_event.set()
@@ -52,12 +77,23 @@ def parse_robovac_config(path="/home/freyja/Desktop/robovac.conf"):
return credentials return credentials
async def ordered_callback(message,device):
# print(message)
dict1 = device_status
dict2 = device.state
combined_dict = {key: dict2[value] for key, value in dict1.items() if value in dict2}
pprint(combined_dict)
async def callback(message,device): async def callback(message,device):
print(message) # print(message)
pprint.pprint(device.state) dict1 = device_status
dict2 = device.state
combined_dict = {key: dict2[value] for key, value in dict1.items() if value in dict2}
print(combined_dict)
async def stepper(time): async def stepper(time) -> bool:
total_seconds = time * 60 total_seconds = time * 60
step = 1 step = 1
elapsed = 0 elapsed = 0
@@ -67,51 +103,79 @@ async def stepper(time):
return True return True
async def async_auto_clean(r: Robovac,time): async def async_clean(r: Robovac, mode: Modes):
await r.async_start_cleaning(callback) if r.work_mode != mode[1]:
await stepper(time) await r.async_set_work_mode(mode[1],callback)
await r.async_pause() await asyncio.sleep(1)
await r.async_set_clean_speed(mode[0],callback)
await asyncio.sleep(1)
async def async_go_home(r: Robovac): async def async_go_home(r: Robovac):
if r.work_status != WorkStatus.CHARGING:
await r.async_go_home(callback) await r.async_go_home(callback)
await asyncio.sleep(1)
async def async_pause(r: Robovac): async def async_pause(r: Robovac):
if r.work_status != WorkStatus.CHARGING:
await r.async_pause(callback) await r.async_pause(callback)
await asyncio.sleep(1)
async def async_main(device_id,ip,local_code,time,go_home,debug,pause): async def async_main(
device_id,
ip,
local_code,
time=DEFAULT_TIME,
go_home=False,
pause=False,
mode=None,
status=False,
rtb=False
):
asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, signal_handler) asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, signal_handler)
asyncio.get_event_loop().add_signal_handler(signal.SIGINT, signal_handler) asyncio.get_event_loop().add_signal_handler(signal.SIGINT, signal_handler)
r = Robovac(device_id,ip,local_code) r = Robovac(device_id,ip,local_code)
await r.async_connect(callback) await r.async_connect(ordered_callback if status else callback)
await asyncio.sleep(2) await asyncio.sleep(2)
if debug: if status:
stop_event.set()
if mode and not go_home and not pause:
while not stop_event.is_set(): while not stop_event.is_set():
if not go_home: await async_clean(r, mode)
print("Auto cleaning") if time:
await stepper(time) await stepper(time)
if go_home: stop_event.set()
print("Go home")
if pause:
print("Pause") if go_home and not pause:
else:
while not stop_event.is_set(): while not stop_event.is_set():
if not go_home:
await async_auto_clean(r, time = int(time))
await async_go_home(r) await async_go_home(r)
if go_home: stop_event.set()
if pause and not go_home:
while not stop_event.is_set():
await async_pause(r)
stop_event.set()
if time and not pause and not go_home and not status:
if rtb:
await async_go_home(r) await async_go_home(r)
if pause: elif not rtb:
await async_pause(r) await async_pause(r)
if not debug: if r._connected:
if stop_event.is_set() and not r.go_home and not pause: await async_go_home(r) await r.async_disconnect()
if r._connected: await r.async_disconnect() return
def main(*args, **kwargs): def main(*args, **kwargs):
@@ -141,23 +205,26 @@ def main(*args, **kwargs):
defaults = {} defaults = {}
if not use_config:
print("Configuration skipped")
parser = argparse.ArgumentParser(description="Control a Robovac device.") parser = argparse.ArgumentParser(description="Control a Robovac device.")
parser.add_argument('-c', '--config', help="Path to config file", default=early_args.config) parser.add_argument('-c', '--config',help="Path to config file",default=early_args.config)
parser.add_argument('--device_id', help="Device ID", default=None if early_args.device_id else defaults.get('device_id')) parser.add_argument('--device_id',help="Device ID",default=None if early_args.device_id else defaults.get('device_id'))
parser.add_argument('--ip', help="Device IP address", default=None if early_args.ip else defaults.get('ip')) parser.add_argument('--ip',help="Device IP address",default=None if early_args.ip else defaults.get('ip'))
parser.add_argument('--local_code', help="Secret key obtained from eufy", default=None if early_args.local_code else defaults.get('local_code')) parser.add_argument('--local_code',help="Secret key obtained from eufy",default=None if early_args.local_code else defaults.get('local_code'))
parser.add_argument('--time', '-t', type=int, default=20, help="Cleaning time in minutes") parser.add_argument('--time', '-t',type=int,help="Cleaning time in minutes",default=DEFAULT_TIME)
parser.add_argument('--pause','-p', action='store_true', dest="pause", default=False, help="Pause vacuum") parser.add_argument('--home', '-b',action='store_true',help="Go home",default=False,dest="go_home")
parser.add_argument('--home', '-b', action='store_true', dest="go_home", default=False, help="Go home") parser.add_argument('--pause','-p',action='store_true',help="Pause vacuum",default=False,dest="pause")
parser.add_argument('--debug','-d', action='store_true', dest="debug", default=False, help="Enter debugging mode (won't send commands to vacuum)") parser.add_argument('--mode', '-m',dest="mode",default="",help = "Options: default, max, edge, spot, quiet")
parser.add_argument('--verbose','-v', action='store_true', dest="verbose", default=False, help="Enable verbose logs") parser.add_argument('--verbose','-v',action='store_true',dest="verbose",default=False,help="Enable verbose logs")
parser.add_argument('--quiet','-q', action='store_true', dest="quiet", default=False, help="Quiet logs") parser.add_argument('--quiet','-q',action='store_true',dest="quiet",default=False,help="Quiet logs")
parser.add_argument('--status','-s',action='store_true',dest="status",default=False,help="Print status")
parser.add_argument('--return','-r',action='store_true',dest="rtb",default=False,help="Return to base upon completion (requires -t)")
args = parser.parse_args() args = parser.parse_args()
if args.rtb and not args.time:
parser.error("Return to base may only be set if a duration is specified.")
if args.quiet and args.verbose: if args.quiet and args.verbose:
parser.error("Cannot set quiet and verbose mode simultaneously.") parser.error("Cannot set quiet and verbose mode simultaneously.")
@@ -165,19 +232,49 @@ def main(*args, **kwargs):
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
elif args.quiet: elif args.quiet:
logging.basicConfig(level=logging.CRITICAL) logging.basicConfig(level=logging.CRITICAL)
sys.stdout = open(os.devnull, 'w')
elif args.status:
logging.basicConfig(level=logging.CRITICAL)
else: else:
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
if args.mode != "" and (args.pause or args.go_home):
print("Mode will be overridden.")
args.mode = None
elif args.mode == "" and (args.pause or args.go_home):
args.mode = None
elif args.mode.lower() == "" and not (args.pause or args.go_home):
args.mode = Modes.DEFAULT
elif args.mode.lower() == "default":
args.mode = Modes.DEFAULT
elif args.mode.lower() == "max":
args.mode = Modes.MAX
elif args.mode.lower() == "spot":
args.mode = Modes.SPOT
elif args.mode.lower() == "edge":
args.mode = Modes.EDGE
elif args.mode.lower() == "quiet":
args.mode = Modes.QUIET
else:
parser.error(f'Invalid mode selection')
if not use_config:
print("Configuration skipped")
missing = [key for key in ['device_id', 'ip', 'local_code'] if getattr(args, key) is None] missing = [key for key in ['device_id', 'ip', 'local_code'] if getattr(args, key) is None]
if missing: if missing:
parser.error(f"Missing required argument(s): {', '.join(missing)}") parser.error(f"Missing required argument(s): {', '.join(missing)}")
try: try:
asyncio.run(async_main(args.device_id, args.ip, args.local_code, args.time, args.go_home,args.debug,args.pause)) asyncio.run(async_main(args.device_id, args.ip, args.local_code, args.time, args.go_home,args.pause, args.mode, args.status,args.rtb))
except Exception as e: except Exception as e:
if args.debug or args.verbose: if args.verbose:
print(e) print(e)
else: else:
print("An error occured.") print("An error occured.")