#!/usr/bin/env python3
"""Control-group bot for a chat daemon reached over a WebSocket.

The bot connects to ``WS_URL`` (see the example at the bottom of the file,
which starts ``simplex-chat`` on the same port), watches incoming events and:

* remembers business chats it sees and, when ``ANNOUNCE_NEW`` is set,
  posts a notice about each new one to ``CONTROL_GROUP``;
* learns ``groupMemberId -> alias`` mappings from any member object found
  in group chat items or ``joinedGroupMember`` events;
* reacts to ``join <chat> [as <alias>]`` messages received in the control
  group by sending the invite commands listed in ``INVITE_CMDS``.

Configuration comes from the ``SXC_*`` environment variables read below.
"""
import asyncio
import json
import os
import re
import uuid
from collections import defaultdict
from typing import Any, Dict, Iterable, Tuple

__version__ = "0.5.5"  # fix announce typo, keep robust sender + case-insensitive join

# Defaults match your setup
WS_URL = os.environ.get("SXC_WS", "ws://127.0.0.1:5225")
CONTROL_GROUP = (os.environ.get("SXC_CONTROL_GROUP") or "BotTest").strip()
ANNOUNCE_NEW = os.environ.get("SXC_ANNOUNCE", "1").strip() != "0"
DEBUG = os.environ.get("SXC_DEBUG", "1").strip() == "1"
LOG_RAW = os.environ.get("SXC_LOG_RAW", "0").strip() == "1"

INVITE_CMDS = ["/a", "a"]  # try WS-style first, then CLI-style
# Grammar: [/]join <chat> [as <alias>]; <chat> may contain spaces, <alias> may not.
JOIN_RE = re.compile(r"^\s*/?\s*join\s+(.+?)(?:\s+as\s+(\S+))?\s*$", re.IGNORECASE)

# meta.itemText values starting with any of these (case-insensitive) are
# treated as system noise rather than user text.
_NOISY_PREFIXES = (
    "disappearing messages",
    "direct messages",
    "voice messages",
    "files and media",
    "simplex links",
    "member reports",
    "recent history",
)


def cmd(c: str) -> str:
    """Wrap command string *c* in a compact JSON envelope with a fresh ``corrId``."""
    return json.dumps({"corrId": str(uuid.uuid4()), "cmd": c}, separators=(",", ":"))


async def say(ws, group: str, text: str):
    """Send *text* to *group* as a ``#group text`` command; no-op if *group* is empty."""
    if group:
        await ws.send(cmd(f"#{group} {text}"))


# Caches
# gid -> {"name": str, "customerId": str}
groups: Dict[int, Dict[str, str]] = {}
# gid -> { groupMemberId(int) : alias(str) }
alias_map: Dict[int, Dict[int, str]] = defaultdict(dict)


def _first(*vals):
    """Return the first argument that is a non-empty ``str``, else ``""``."""
    for v in vals:
        if isinstance(v, str) and v:
            return v
    return ""


def _shorten(obj, limit=1600):
    """Render *obj* as compact JSON (or ``str(obj)``), truncated to *limit* chars."""
    try:
        s = json.dumps(obj, ensure_ascii=False, separators=(",", ":"))
    except Exception:
        # Logging helper: never let an unserializable payload break the caller.
        s = str(obj)
    return s if len(s) <= limit else s[:limit] + " …"


def _group_name(ginfo: dict) -> str:
    """Return the group's localDisplayName, falling back to its profile displayName."""
    return _first(
        ginfo.get("localDisplayName"),
        (ginfo.get("groupProfile") or {}).get("displayName"),
    )


def _member_alias(gm: dict) -> str:
    """Return the best alias for a member: localAlias, localDisplayName, displayName."""
    mp = gm.get("memberProfile") or {}
    return _first(mp.get("localAlias"), gm.get("localDisplayName"), mp.get("displayName"))


def extract_business_chat(resp: dict):
    """Return (gid, name, customerId) when a business chat is reported.

    Handles ``acceptingBusinessRequest`` events and group items inside
    ``newChatItems``. Returns ``(None, None, None)`` when nothing matches.
    """
    rtype = resp.get("type")
    if rtype == "acceptingBusinessRequest":
        gi = resp.get("groupInfo", {}) or {}
        cust = (gi.get("businessChat") or {}).get("customerId")
        return gi.get("groupId"), _group_name(gi), cust
    if rtype == "newChatItems":
        # "chatItems" may be present but null; treat that as empty.
        for it in resp.get("chatItems") or []:
            if not isinstance(it, dict):
                continue
            ci = it.get("chatInfo", {}) or {}
            if ci.get("type") != "group":
                continue
            gi = ci.get("groupInfo", {}) or {}
            # The accept event above carries businessChat on groupInfo, so
            # look there as well as on chatInfo itself.
            bc = ci.get("businessChat") or gi.get("businessChat")
            if bc:
                return gi.get("groupId"), _group_name(gi), bc.get("customerId")
    return None, None, None


def _content_and_meta(it: dict):
    """Return (content_dict, meta_dict) from either top-level or chatItem.*."""
    chat_item = it.get("chatItem") or {}
    cont = it.get("content") or chat_item.get("content") or {}
    meta = it.get("meta") or chat_item.get("meta") or {}
    return cont, meta


def extract_text_from_item(it: dict) -> str:
    """Support msgContent, direct text/markdown, and meta.itemText fallback.

    Returns the stripped text, or ``""`` when no usable text is found.
    """
    cont, meta = _content_and_meta(it)

    # 1) Standard nested message content
    if cont.get("type") in ("rcvMsgContent", "sndMsgContent"):
        mc = cont.get("msgContent") or {}
        for key in ("text", "markdown"):
            txt = mc.get(key)
            if isinstance(txt, str) and txt.strip():
                return txt.strip()

    # 2) Some builds may put text directly on content
    for key in ("text", "markdown"):
        txt = cont.get(key)
        if isinstance(txt, str) and txt.strip():
            return txt.strip()

    # 3) Fallback to meta.itemText, skipping known noisy system lines
    mt = meta.get("itemText")
    if isinstance(mt, str):
        t = mt.strip()
        if t and not t.lower().startswith(_NOISY_PREFIXES):
            return t
    return ""


def deep_iter_members(obj: Any) -> Iterable[dict]:
    """Recursively yield any dicts that look like groupMember objects.

    A dict qualifies when it has ``groupMemberId`` plus ``memberProfile`` or
    ``localDisplayName``. Nested dicts and lists are searched as well.
    """
    if isinstance(obj, dict):
        if ("groupMemberId" in obj) and ("memberProfile" in obj or "localDisplayName" in obj):
            yield obj
        for v in obj.values():
            yield from deep_iter_members(v)
    elif isinstance(obj, list):
        for v in obj:
            yield from deep_iter_members(v)


def update_alias_map_from_member(gid: int, gm: dict):
    """Capture alias against groupMemberId whenever we see a member object.

    Silently ignores input that is not a dict, or whose group id / member id
    is not an int, or that has no usable alias.
    """
    if not isinstance(gm, dict):
        return
    gmid = gm.get("groupMemberId")
    alias = _member_alias(gm)
    if isinstance(gid, int) and isinstance(gmid, int) and alias:
        alias_map[gid][gmid] = alias
        if DEBUG:
            print(f"[alias-map] gid={gid} gmid={gmid} alias='{alias}'")


def _direction_from(it: dict) -> str:
    """Return ``"rcv"`` when chatDir.type is ``groupRcv`` (any case), else ``"snd"``."""
    chat_item = it.get("chatItem") or {}
    chatdir = it.get("chatDir") or chat_item.get("chatDir") or {}
    dtyp = (chatdir.get("type") or "").lower()  # 'grouprcv' or 'groupsnd'
    return "rcv" if dtyp == "grouprcv" else "snd"


def _primary_group_member(it: dict) -> dict:
    """Prefer the member embedded under chatItem.chatDir.groupMember for sender."""
    chat_item = it.get("chatItem") or {}
    chat_dir = chat_item.get("chatDir") or {}
    return (
        chat_dir.get("groupMember")
        or chat_item.get("groupMember")
        or it.get("groupMember")
        or {}
    )


def extract_group_items(resp: dict) -> Iterable[Tuple[str, int, str, str, Any, str, str, dict]]:
    """
    Yield tuples:
    (group_name, group_id, direction, sender_alias, sender_gmid, text, ctype, raw_item)

    Only ``newChatItems`` responses produce output, and only for items whose
    chatInfo type is ``group`` with a resolvable name and id. As a side
    effect, every member object found in an item is recorded in ``alias_map``.
    """
    if resp.get("type") != "newChatItems":
        return
    for it in resp.get("chatItems") or []:
        if not isinstance(it, dict):
            continue
        ci = it.get("chatInfo", {}) or {}
        if ci.get("type") != "group":
            continue
        ginfo = ci.get("groupInfo", {}) or {}
        gname = _group_name(ginfo)
        gid = ginfo.get("groupId")
        if not gname or gid is None:
            continue

        direction = _direction_from(it)

        # Deep-scan to populate alias map from any member objects
        for member in deep_iter_members(it):
            update_alias_map_from_member(gid, member)

        # Extract sender from preferred location
        gm = _primary_group_member(it)
        if not isinstance(gm, dict):
            gm = {}
        gmid = gm.get("groupMemberId")
        sender_alias = _member_alias(gm)

        cont, _meta = _content_and_meta(it)
        ctype = cont.get("type")
        text = extract_text_from_item(it)
        yield (gname, gid, direction, sender_alias, gmid, text, ctype, it)


def is_control_group(name: str) -> bool:
    """Return True when *name* equals ``CONTROL_GROUP`` exactly."""
    return name == CONTROL_GROUP


def resolve_chat_name(user_supplied: str) -> str:
    """Resolve to exact business chat name by exact or unambiguous prefix match; else return as-is."""
    if not user_supplied:
        return user_supplied
    names = [info.get("name", "") for info in groups.values()]
    if user_supplied in names:
        return user_supplied
    cand = [n for n in names if n.startswith(user_supplied)]
    return cand[0] if len(cand) == 1 else user_supplied


def resolve_sender_alias(gid: int, gmid, fallback_alias: str) -> str:
    """Use explicit alias if present; else look up by groupMemberId; else return empty."""
    if fallback_alias:
        return fallback_alias
    if isinstance(gid, int) and isinstance(gmid, int):
        alias = alias_map.get(gid, {}).get(gmid) or ""
        if alias and DEBUG:
            print(f"[alias-resolved] gid={gid} gmid={gmid} -> '{alias}'")
        return alias
    return ""


async def handle_control_command(ws, group_name: str, gid: int, sender_alias: str, sender_gmid, text: str):
    """Handle a ``join <chat> [as <alias>]`` message from the control group.

    Text that does not match ``JOIN_RE`` is ignored. Otherwise one command
    per entry of ``INVITE_CMDS`` is sent, followed by a confirmation posted
    to *group_name*. A hint is posted instead when the alias or the chat
    name cannot be determined.
    """
    m = JOIN_RE.match(text or "")
    if not m:
        return

    # Allow 'join <chat> as <alias>' to override when daemon doesn't expose alias
    raw_chat = (m.group(1) or "").strip()
    override_alias = (m.group(2) or "").strip()

    eff_alias = override_alias or resolve_sender_alias(gid, sender_gmid, sender_alias)
    if not eff_alias:
        await say(
            ws,
            group_name,
            "Cannot resolve your contact alias. Use a token alias or type: join <chat> as <alias>",
        )
        return

    chatname = resolve_chat_name(raw_chat)
    if not chatname:
        await say(ws, group_name, "usage: join <chat> [as <alias>]")
        return

    # Every variant is sent; replies are not inspected here.
    for c in INVITE_CMDS:
        invite = f"{c} {chatname} {eff_alias}"  # no quotes
        if DEBUG:
            print(f"[join] trying: {invite}")
        await ws.send(cmd(invite))
    await say(ws, group_name, f"Invited {eff_alias} to {chatname}")


async def announce_new_chat(ws, gname: str):
    """Post a "new business chat" notice to the control group, if announcing is enabled."""
    if ANNOUNCE_NEW and CONTROL_GROUP:
        if DEBUG:
            print(f"[announce] posting new business chat '{gname}' to control group '{CONTROL_GROUP}'")
        await say(ws, CONTROL_GROUP, f"New business chat: {gname} — type: join {gname}")


async def _handle_frame(ws, raw):
    """Decode one WebSocket frame and act on the event it carries."""
    try:
        msg = json.loads(raw)
    except (ValueError, TypeError):
        if DEBUG:
            print(f"[raw-str] {raw[:300]!r} …")
        return
    if not isinstance(msg, dict):
        # Valid JSON but not an object: nothing to dispatch on.
        if DEBUG:
            print(f"[raw-json] {_shorten(msg, 300)}")
        return

    resp = msg.get("resp") or {}
    if not isinstance(resp, dict):
        return
    rtype = resp.get("type")

    if LOG_RAW and rtype:
        print(f"[EVT] {rtype}: {_shorten(resp)}")

    # Learn aliases from join events
    if rtype == "joinedGroupMember":
        ginfo = resp.get("groupInfo")
        gid = ginfo.get("groupId") if isinstance(ginfo, dict) else None
        # update_alias_map_from_member ignores malformed gid/member itself.
        update_alias_map_from_member(gid, resp.get("member"))

    # Detect and announce new business chats
    gid_b, gname_b, cust = extract_business_chat(resp)
    if isinstance(gid_b, int) and gname_b and cust and gid_b not in groups:
        groups[gid_b] = {"name": gname_b, "customerId": cust}  # fixed key
        print(f"[chat] business chat gid={gid_b} name='{gname_b}' customerId={cust}")
        await announce_new_chat(ws, gname_b)

    # Inspect group items and process control commands
    if rtype != "newChatItems":
        return
    for gname2, gid2, direction, sender_alias, sender_gmid, text, ctype, raw_item in extract_group_items(resp):
        if DEBUG:
            print(f"[item] group='{gname2}' gid={gid2} dir={direction} sender='{sender_alias}' gmid={sender_gmid} ctype='{ctype}' text={text!r}")
        if direction == "rcv" and is_control_group(gname2):
            if (not sender_alias) and (sender_gmid is None) and DEBUG:
                print(f"[item-raw] {_shorten(raw_item)}")
            if text:
                await handle_control_command(ws, gname2, gid2, sender_alias, sender_gmid, text)


async def run():
    """Connect to ``WS_URL`` and process events forever, reconnecting after 2 seconds."""
    # Imported here rather than at module level so the parsing helpers above
    # can be imported without the third-party package installed.
    import websockets

    print(f"[cfg] WS={WS_URL} CONTROL_GROUP={CONTROL_GROUP} ANNOUNCE={ANNOUNCE_NEW} DEBUG={DEBUG} LOG_RAW={LOG_RAW}")
    print("[cfg] Auto-invite: DISABLED")
    while True:
        try:
            async with websockets.connect(WS_URL, max_size=None) as ws:
                print(f"[ok] connected {WS_URL}")
                await ws.send(cmd("/help"))
                async for raw in ws:
                    await _handle_frame(ws, raw)
        except Exception as e:
            # Top-level boundary: report and fall through to the retry delay.
            print(f"[ws] {e}; retrying in 2s")
        else:
            # The receive loop ended without an exception; without the delay
            # below this path would reconnect in a tight loop.
            print("[ws] connection closed; retrying in 2s")
        await asyncio.sleep(2)


if __name__ == "__main__":
    # Example:
    #   simplex-chat -d taurix-business -p 5225
    #   python3 simplex_customerservicebot.py
    asyncio.run(run())