Fix logic errors in previous commits, update version readme and changelog, add features, improve things, etc.

This commit is contained in:
2025-09-11 21:49:43 -07:00
parent 720c24ed89
commit 0ea920b628
5 changed files with 151 additions and 98 deletions

View File

@@ -185,7 +185,7 @@ class TuyaCipher:
'>IIIH', encrypted_data, 3)
return 15
return 0
def decrypt(self, command, data):
prefix_size = self.get_prefix_size_and_validate(command, data)
@@ -536,7 +536,22 @@ class TuyaDevice:
async def async_update_state(self, state_message, _):
self._dps.update(state_message.payload["dps"])
_LOGGER.info("Received updated state {}: {}".format(self, self._dps))
dict1 = {
'POWER': '1',
'PLAY_PAUSE': '2',
'DIRECTION': '3',
'WORK_MODE': '5',
'WORK_STATUS': '15',
'GO_HOME': '101',
'CLEAN_SPEED': '102',
'FIND_ROBOT': '103',
'BATTERY_LEVEL': '104',
'ERROR_CODE': '106'
}
dict2 = self._dps
combined_dict = {key: dict2[value] for key, value in dict1.items() if value in dict2}
_LOGGER.info("Received updated state {}: {}".format(self, combined_dict))
@property
def state(self):

View File

@@ -2,25 +2,39 @@
import asyncio
import signal
import logging
import pprint
import sys
import argparse
from enum import Enum
import os
from pprint import pprint
from robovac.robovac import Robovac, CleanSpeed, WorkMode
from robovac.property import StringEnum
from robovac.robovac import Robovac, CleanSpeed, WorkMode, WorkStatus
DEFAULT_TIME = 20
DEFAULT_TIME = False
stop_event = asyncio.Event()
class Modes(StringEnum):
device_status = {
'POWER': '1',
'PLAY_PAUSE': '2',
'DIRECTION': '3',
'WORK_MODE': '5',
'WORK_STATUS': '15',
'GO_HOME': '101',
'CLEAN_SPEED': '102',
'FIND_ROBOT': '103',
'BATTERY_LEVEL': '104',
'ERROR_CODE': '106'
}
class Modes:
DEFAULT = CleanSpeed.BOOST_IQ, WorkMode.AUTO
QUIET = CleanSpeed.STANDARD, WorkMode.AUTO
MAX = CleanSpeed.MAX, WorkMode.AUTO
EDGE = CleanSpeed.MAX, WorkMode.EDGE
SPOT = CleanSpeed.MAX, WorkMode.SPOT
@@ -63,12 +77,23 @@ def parse_robovac_config(path="/home/freyja/Desktop/robovac.conf"):
return credentials
async def ordered_callback(message,device):
# print(message)
dict1 = device_status
dict2 = device.state
combined_dict = {key: dict2[value] for key, value in dict1.items() if value in dict2}
pprint(combined_dict)
async def callback(message,device):
print(message)
pprint.pprint(device.state)
# print(message)
dict1 = device_status
dict2 = device.state
combined_dict = {key: dict2[value] for key, value in dict1.items() if value in dict2}
print(combined_dict)
async def stepper(time):
async def stepper(time) -> bool:
total_seconds = time * 60
step = 1
elapsed = 0
@@ -78,21 +103,25 @@ async def stepper(time):
return True
async def async_clean(r: Robovac, time: int, mode: Modes):
if mode:
await r.async_set_clean_speed(mode[0])
await r.async_set_work_mode(mode[1])
await r.async_start_cleaning(callback)
await stepper(time)
await r.async_pause()
async def async_clean(r: Robovac, mode: Modes):
if r.work_mode != mode[1]:
await r.async_set_work_mode(mode[1],callback)
await asyncio.sleep(1)
await r.async_set_clean_speed(mode[0],callback)
await asyncio.sleep(1)
async def async_go_home(r: Robovac):
await r.async_go_home(callback)
if r.work_status != WorkStatus.CHARGING:
await r.async_go_home(callback)
await asyncio.sleep(1)
async def async_pause(r: Robovac):
await r.async_pause(callback)
if r.work_status != WorkStatus.CHARGING:
await r.async_pause(callback)
await asyncio.sleep(1)
async def async_main(
@@ -102,19 +131,26 @@ async def async_main(
time=DEFAULT_TIME,
go_home=False,
pause=False,
mode=Modes.DEFAULT
mode=None,
status=False,
rtb=False
):
asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, signal_handler)
asyncio.get_event_loop().add_signal_handler(signal.SIGINT, signal_handler)
r = Robovac(device_id,ip,local_code)
await r.async_connect(callback)
await r.async_connect(ordered_callback if status else callback)
await asyncio.sleep(2)
if status:
stop_event.set()
if mode and not go_home and not pause:
while not stop_event.is_set():
await async_clean(r, time, mode)
await async_go_home(r)
await async_clean(r, mode)
if time:
await stepper(time)
stop_event.set()
@@ -130,12 +166,16 @@ async def async_main(
stop_event.set()
if not r.go_home and not pause:
await async_go_home(r)
if time and not pause and not go_home and not status:
if rtb:
await async_go_home(r)
elif not rtb:
await async_pause(r)
if r._connected:
await r.async_disconnect()
return
def main(*args, **kwargs):
@@ -166,75 +206,25 @@ def main(*args, **kwargs):
parser = argparse.ArgumentParser(description="Control a Robovac device.")
parser.add_argument(
'-c', '--config',
help="Path to config file",
default=early_args.config
)
parser.add_argument(
'--device_id',
help="Device ID",
default=None if early_args.device_id else defaults.get('device_id')
)
parser.add_argument(
'--ip',
help="Device IP address",
default=None if early_args.ip else defaults.get('ip')
)
parser.add_argument(
'--local_code',
help="Secret key obtained from eufy",
default=None if early_args.local_code else defaults.get('local_code')
)
parser.add_argument(
'--time', '-t',
type=int,
help="Cleaning time in minutes",
default=DEFAULT_TIME
)
parser.add_argument(
'--home', '-b',
action='store_true',
help="Go home",
default=False,
dest="go_home",
)
parser.add_argument(
'--pause','-p',
action='store_true',
help="Pause vacuum",
default=False,
dest="pause",
)
parser.add_argument(
'--mode', '-m',
dest="mode",
default="default",
help = '''Options: default, max, edge, spot \n
default: Automatic work mode, medium suction \n
max: Automatic work mode, max suction \n
edge: Clean edges of room, max suction \n
spot: Spot clean, max suction \n
'''
)
parser.add_argument(
'--verbose','-v',
action='store_true',
dest="verbose",
default=False,
help="Enable verbose logs"
)
parser.add_argument(
'--quiet','-q',
action='store_true',
dest="quiet",
default=False,
help="Quiet logs"
)
parser.add_argument('-c', '--config',help="Path to config file",default=early_args.config)
parser.add_argument('--device_id',help="Device ID",default=None if early_args.device_id else defaults.get('device_id'))
parser.add_argument('--ip',help="Device IP address",default=None if early_args.ip else defaults.get('ip'))
parser.add_argument('--local_code',help="Secret key obtained from eufy",default=None if early_args.local_code else defaults.get('local_code'))
parser.add_argument('--time', '-t',type=int,help="Cleaning time in minutes",default=DEFAULT_TIME)
parser.add_argument('--home', '-b',action='store_true',help="Go home",default=False,dest="go_home")
parser.add_argument('--pause','-p',action='store_true',help="Pause vacuum",default=False,dest="pause")
parser.add_argument('--mode', '-m',dest="mode",default="",help = "Options: default, max, edge, spot, quiet")
parser.add_argument('--verbose','-v',action='store_true',dest="verbose",default=False,help="Enable verbose logs")
parser.add_argument('--quiet','-q',action='store_true',dest="quiet",default=False,help="Quiet logs")
parser.add_argument('--status','-s',action='store_true',dest="status",default=False,help="Print status")
parser.add_argument('--return','-r',action='store_true',dest="rtb",default=False,help="Return to base upon completion (requires -t)")
args = parser.parse_args()
if args.rtb and not args.time:
parser.error("Return to base may only be set if a duration is specified.")
if args.quiet and args.verbose:
parser.error("Cannot set quiet and verbose mode simultaneously.")
@@ -243,13 +233,19 @@ def main(*args, **kwargs):
elif args.quiet:
logging.basicConfig(level=logging.CRITICAL)
sys.stdout = open(os.devnull, 'w')
elif args.status:
logging.basicConfig(level=logging.CRITICAL)
else:
logging.basicConfig(level=logging.INFO)
if args.mode.lower() and (args.pause or args.home):
if args.mode != "" and (args.pause or args.go_home):
print("Mode will be overridden.")
args.mode = None
elif args.mode == "" and (args.pause or args.go_home):
args.mode = None
elif args.mode.lower() == "" and not (args.pause or args.go_home):
args.mode = Modes.DEFAULT
elif args.mode.lower() == "default":
args.mode = Modes.DEFAULT
elif args.mode.lower() == "max":
@@ -258,6 +254,10 @@ def main(*args, **kwargs):
args.mode = Modes.SPOT
elif args.mode.lower() == "edge":
args.mode = Modes.EDGE
elif args.mode.lower() == "quiet":
args.mode = Modes.QUIET
else:
parser.error(f'Invalid mode selection')
if not use_config:
@@ -272,9 +272,9 @@ def main(*args, **kwargs):
try:
asyncio.run(async_main(args.device_id, args.ip, args.local_code, args.time, args.go_home,args.pause, args.mode))
asyncio.run(async_main(args.device_id, args.ip, args.local_code, args.time, args.go_home,args.pause, args.mode, args.status,args.rtb))
except Exception as e:
if args.debug or args.verbose:
if args.verbose:
print(e)
else:
print("An error occured.")