Добавление уведомлений при смене треков

This commit is contained in:
2025-11-25 18:36:08 +07:00
parent 5b567c46e9
commit 86fc70941b
2 changed files with 138 additions and 3 deletions

View File

@@ -11,7 +11,7 @@
on_song_change: ["/usr/bin/python3", "/home/crud/.config/rmpc/notify.py"],
volume_step: 5, volume_step: 5,
max_fps: 30, max_fps: 30,
scrolloff: 0, scrolloff: 0,
@@ -69,7 +69,7 @@
"<C-j>": PaneDown, "<C-j>": PaneDown,
"<C-h>": PaneLeft, "<C-h>": PaneLeft,
"<C-l>": PaneRight, "<C-l>": PaneRight,
"<C-u>": UpHalf, "<PageUp>": UpHalf,
"N": PreviousResult, "N": PreviousResult,
"a": Add, "a": Add,
"A": AddAll, "A": AddAll,
@@ -82,7 +82,7 @@
"<CR>": Confirm, "<CR>": Confirm,
"i": FocusInput, "i": FocusInput,
"J": MoveDown, "J": MoveDown,
"<C-d>": DownHalf, "<PageDown>": DownHalf,
"/": EnterSearch, "/": EnterSearch,
"<C-c>": Close, "<C-c>": Close,
"<Esc>": Close, "<Esc>": Close,

135
config/rmpc/notify.py Executable file
View File

@@ -0,0 +1,135 @@
#!/usr/bin/env python3
import fcntl
import os
import time
import eyed3
import tempfile
import subprocess
from mpd import MPDClient
from pathlib import Path
from typing import Optional, Union
import hashlib
# lock file to prevent parallel runs / duplicate notifications
LOCK_FN = "/tmp/rmpc-notify.lock"
DUPLICATE_WINDOW = 5 # seconds to consider same file a duplicate
exception_buff = []
# Acquire non-blocking exclusive lock; if fails, another instance is running -> exit
_lock_fd = open(LOCK_FN, "a+")
try:
fcntl.flock(_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
_lock_fd.seek(0)
_lock_fd.truncate()
_lock_fd.write(f"{os.getpid()} {time.time()}\n")
_lock_fd.flush()
except BlockingIOError:
# Another instance is running — exit silently
raise SystemExit(0)
_last_shown = {"id": None, "ts": 0}
def file_id(path: Path) -> str:
try:
st = path.stat()
return f"{path}:{st.st_mtime_ns}"
except Exception:
return hashlib.sha1(str(path).encode()).hexdigest()
def extract(track: Path) -> Optional[Path]:
try:
audio = eyed3.load(track)
except Exception as e:
exception_buff.append(f"eyed3.load error: {e}")
return None
if not audio or not audio.tag:
exception_buff.append("track data unavailable")
return None
# Защита от проблем с парсингом дат (например "0")
try:
rd = getattr(audio.tag, "recording_date", None)
if rd:
raw = getattr(rd, "text", None) or str(rd)
if raw == "0" or raw.strip() == "":
audio.tag.recording_date = None
except Exception:
pass
images = getattr(audio.tag, "images", None)
if not images:
return None
try:
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as temp:
temp.write(images[0].image_data)
return temp.name
except Exception as e:
exception_buff.append(f"image extract error: {e}")
return None
def notify(icon: Union[str, Path], title: str, body: str) -> None:
try:
subprocess.Popen(["notify-send", "-i", str(icon), "-a", "rmpc", "-t", "2500", title, body])
except Exception as e:
exception_buff.append(e)
def main():
global _last_shown
try:
client = MPDClient()
client.connect("localhost", 6600)
song = client.currentsong() or {}
except Exception as e:
exception_buff.append(f"mpd connection error: {e}")
song = {}
music_path = Path("/home/zloy_linux/Музыка/iTunes/")
track_rel = song.get('file')
if not track_rel:
exception_buff.append("no track file in mpd response")
track_path = None
else:
track_path = music_path / track_rel
cover_path = None
if not track_path or not track_path.exists():
exception_buff.append("track file not found")
else:
ident = file_id(track_path)
now = time.time()
# дедупликация по файлу+mtime в пределах окна
if _last_shown["id"] == ident and (now - _last_shown["ts"]) < DUPLICATE_WINDOW:
return # пропускаем повторный показ
cover_path = extract(track_path)
_last_shown["id"] = ident
_last_shown["ts"] = now
artist = song.get('artist', 'Unknown artist')
title = song.get('title', 'Unknown title')
body = f"[E] {', '.join(str(e) for e in exception_buff)}" if exception_buff else artist
notify(cover_path or "audio-x-generic", title, body)
if cover_path:
time.sleep(1)
try:
os.remove(cover_path)
except Exception as e:
# не критично — логим в exception_buff и выводим в stdout
print(f"[E] cleanup error: {e}")
if __name__ == "__main__":
try:
main()
finally:
try:
# release lock by closing descriptor (file remains but unlocked)
_lock_fd.close()
except Exception:
pass