Working join command in control channel

This commit is contained in:
Guy Van Sanden 2025-08-31 18:24:20 +02:00
parent bed40c8f50
commit f26e875613

View File

@ -2,122 +2,223 @@
import asyncio
import json
import os
import re
import uuid
import websockets
__version__ = '0.1.0'
__version__ = "0.5.1" # parse chatItem.content + deep debug + 'join ... as <alias>'
# SimpleX business bot, invite a preset SimpleX user to an auto-created group
# AGPL 3.0 or later. Written by Taurix IT https://www.taurix.net
# Defaults match your setup (no env needed)
WS_URL = os.environ.get("SXC_WS", "ws://127.0.0.1:5225")
CONTROL_GROUP = (os.environ.get("SXC_CONTROL_GROUP") or "BotTest").strip()
ANNOUNCE_NEW = os.environ.get("SXC_ANNOUNCE", "1").strip() != "0"
DEBUG = os.environ.get("SXC_DEBUG", "1").strip() == "1"
WS_URL = os.environ.get("SXC_WS", "ws://127.0.0.1:5225")
OPERATOR_ALIAS = os.environ.get("SXC_OPERATOR_ALIAS", "@Operator")
OPERATOR_ADDR = os.environ.get("SXC_OPERATOR_ADDR", "").strip()
INVITE_CMDS = ["/a", "a"] # try WS-style first, then CLI-style
JOIN_RE = re.compile(r"^\s*/?\s*join\s+(.+?)(?:\s+as\s+(\S+))?\s*$", re.IGNORECASE)
def cmd(c: str) -> str:
return json.dumps({"corrId": str(uuid.uuid4()), "cmd": c}, separators=(",", ":"))
def q(s: str) -> str:
return '"' + s.replace('"', r'\"') + '"'
async def say(ws, group: str, text: str):
if group:
await ws.send(cmd(f"#{group} {text}"))
# Cache per-group info
groups = {} # gid -> {"name": str, "customerId": str, "invited": bool}
# Cache: gid -> {"name": str, "customerId": str}
groups = {}
async def ensure_operator_contact(ws):
if OPERATOR_ADDR:
# Idempotent: fine to call on start
await ws.send(cmd(f"/c {OPERATOR_ADDR}"))
def _first(*vals):
for v in vals:
if isinstance(v, str) and v:
return v
return ""
def extract_business_chat(resp: dict):
# Primary event in your logs
"""Return (gid, name, customerId) when a business chat is reported."""
if resp.get("type") == "acceptingBusinessRequest":
gi = resp.get("groupInfo", {})
gname = gi.get("localDisplayName") or gi.get("groupProfile", {}).get("displayName")
gi = resp.get("groupInfo", {}) or {}
gname = _first(gi.get("localDisplayName"), (gi.get("groupProfile") or {}).get("displayName"))
gid = gi.get("groupId")
bchat = gi.get("businessChat") or {}
cust = bchat.get("customerId")
cust = (gi.get("businessChat") or {}).get("customerId")
return gid, gname, cust
# Fallback from the first items burst
if resp.get("type") == "newChatItems":
for it in resp.get("chatItems", []):
ci = it.get("chatInfo", {})
ci = it.get("chatInfo", {}) or {}
if ci.get("type") == "group" and ci.get("businessChat"):
gi = ci.get("groupInfo", {})
gname = gi.get("localDisplayName") or gi.get("groupProfile", {}).get("displayName")
gi = ci.get("groupInfo", {}) or {}
gname = _first(gi.get("localDisplayName"), (gi.get("groupProfile") or {}).get("displayName"))
gid = gi.get("groupId")
cust = ci.get("businessChat", {}).get("customerId")
cust = (ci.get("businessChat") or {}).get("customerId")
return gid, gname, cust
return None, None, None
def member_connected(resp: dict):
def _content_and_meta(it: dict):
"""Return (content_dict, meta_dict) from either top-level or chatItem.*."""
chatItem = it.get("chatItem") or {}
cont = it.get("content") or chatItem.get("content") or {}
meta = it.get("meta") or chatItem.get("meta") or {}
return cont, meta
def extract_text_from_item(it: dict) -> str:
"""Be liberal: support msgContent, direct text/markdown, and meta.itemText fallback."""
cont, meta = _content_and_meta(it)
# 1) Standard nested message content
if cont.get("type") in ("rcvMsgContent", "sndMsgContent"):
mc = cont.get("msgContent") or {}
for key in ("text", "markdown"):
txt = mc.get(key)
if isinstance(txt, str) and txt.strip():
return txt.strip()
# 2) Some builds may put text directly on content
for key in ("text", "markdown"):
txt = cont.get(key)
if isinstance(txt, str) and txt.strip():
return txt.strip()
# 3) Fallback to meta.itemText, skipping known noisy system lines
mt = meta.get("itemText")
if isinstance(mt, str):
t = mt.strip()
noisy = ("disappearing messages", "direct messages", "voice messages",
"files and media", "simplex links", "member reports", "recent history")
if t and not any(t.lower().startswith(n) for n in noisy):
return t
return ""
def extract_group_text_items(resp: dict):
"""
Yield tuples for each candidate message in newChatItems:
(group_name, group_id, direction, sender_alias, text, ctype)
We still yield when text == "" so we can debug items that didn't parse.
"""
if resp.get("type") != "newChatItems":
return None, None
return
for it in resp.get("chatItems", []):
ci = it.get("chatInfo", {})
ci = it.get("chatInfo", {}) or {}
if ci.get("type") != "group":
continue
gid = ci.get("groupInfo", {}).get("groupId")
cont = it.get("content", {})
if cont.get("type") == "rcvGroupEvent" and cont.get("rcvGroupEvent", {}).get("type") == "memberConnected":
gm = it.get("chatItem", {}).get("groupMember") or {}
member_id = gm.get("memberId")
disp = (gm.get("memberProfile", {}) or {}).get("displayName") or gm.get("localDisplayName") or ""
return gid, (member_id, disp)
return None, None
ginfo = ci.get("groupInfo", {}) or {}
gname = _first(ginfo.get("localDisplayName"), (ginfo.get("groupProfile") or {}).get("displayName"))
gid = ginfo.get("groupId")
if not gname or gid is None:
continue
async def invite_operator(ws, gid: int):
info = groups.get(gid)
if not info or info.get("invited"):
# chatDir may appear at item root or under chatItem
chatdir = it.get("chatDir") or (it.get("chatItem") or {}).get("chatDir") or {}
dtyp = (chatdir.get("type") or "").lower() # 'grouprcv' or 'groupsnd'
direction = "rcv" if dtyp == "grouprcv" else "snd"
# Sender alias can appear at item root or under chatItem
gm = it.get("groupMember") or (it.get("chatItem") or {}).get("groupMember") or {}
mp = (gm.get("memberProfile") or {})
sender_alias = _first(mp.get("localAlias"), gm.get("localDisplayName"), mp.get("displayName"))
cont, _meta = _content_and_meta(it)
ctype = cont.get("type")
text = extract_text_from_item(it)
yield (gname, gid, direction, sender_alias, text, ctype, it)
def is_control_group(name: str) -> bool:
return name == CONTROL_GROUP
def resolve_chat_name(user_supplied: str) -> str:
"""Resolve to exact business chat name by exact or unambiguous prefix match; else return as-is."""
if not user_supplied:
return user_supplied
for info in groups.values():
if info.get("name") == user_supplied:
return user_supplied
cand = [info["name"] for info in groups.values() if info.get("name", "").startswith(user_supplied)]
return cand[0] if len(cand) == 1 else user_supplied
async def handle_control_command(ws, group_name: str, sender_alias: str, text: str):
m = JOIN_RE.match(text or "")
if not m:
return
gname = info["name"]
await ws.send(cmd(f"/a {gname} {OPERATOR_ALIAS}"))
await ws.send(cmd(f"#{gname} Added {OPERATOR_ALIAS} to assist."))
info["invited"] = True
print(f"[invited] {OPERATOR_ALIAS} -> group {gname} (id {gid})")
# Allow 'join <chat> as <alias>' to override when the daemon doesn't expose sender alias
raw_chat = (m.group(1) or "").strip()
override_alias = (m.group(2) or "").strip()
target_alias = override_alias or sender_alias
if not target_alias:
await say(ws, group_name, "Cannot resolve your contact alias. Use a token alias or type: join <chat> as <your_alias>")
return
chatname = resolve_chat_name(raw_chat)
if not chatname:
await say(ws, group_name, "usage: join <chatname>")
return
for c in INVITE_CMDS:
invite = f"{c} {chatname} {target_alias}" # no quotes
if DEBUG:
print(f"[join] trying: {invite}")
await ws.send(cmd(invite))
await say(ws, group_name, f"Invited {target_alias} to {chatname}")
async def announce_new_chat(ws, gname: str):
if ANNOUNCE_NEW and CONTROL_GROUP:
await say(ws, CONTROL_GROUP, f"New business chat: {gname} — type: join {gname}")
def _shorten(obj, limit=1400):
try:
s = json.dumps(obj, ensure_ascii=False, separators=(",", ":"))
except Exception:
s = str(obj)
return s if len(s) <= limit else s[:limit] + ""
async def run():
print(f"[cfg] WS={WS_URL} CONTROL_GROUP={CONTROL_GROUP} ANNOUNCE={ANNOUNCE_NEW} DEBUG={DEBUG}")
print("[cfg] Auto-invite: DISABLED")
while True:
try:
async with websockets.connect(WS_URL, max_size=None) as ws:
print(f"[ok] connected {WS_URL}")
await ensure_operator_contact(ws)
await ws.send(cmd("/help"))
async for raw in ws:
# print(f"[frame] {raw}")
try:
msg = json.loads(raw)
except Exception:
if DEBUG:
print(f"[raw-str] {raw[:300]!r}")
continue
resp = msg.get("resp") or {}
rtype = resp.get("type")
# 1) See a business chat -> cache it and invite operator
# 1) Detect and announce new business chats
gid, gname, cust = extract_business_chat(resp)
if isinstance(gid, int) and gname and cust:
if gid not in groups:
groups[gid] = {"name": gname, "customerId": cust, "invited": False}
print(f"[chat] business chat created gid={gid} name='{gname}' customerId={cust}")
await invite_operator(ws, gid)
groups[gid] = {"name": gname, "customerId": cust}
print(f"[chat] business chat gid={gid} name='{gname}' customerId={cust}")
await announce_new_chat(ws, gname)
# 2) Inspect all group items and log them; process control commands
if rtype == "newChatItems":
for gname2, gid2, direction, sender_alias, text, ctype, raw_item in extract_group_text_items(resp):
if DEBUG:
print(f"[item] group='{gname2}' gid={gid2} dir={direction} sender='{sender_alias}' ctype='{ctype}' text={text!r}")
if direction == "rcv" and is_control_group(gname2):
if not text:
# dump raw for this unparsed control message
print(f"[item-raw] {_shorten(raw_item)}")
else:
await handle_control_command(ws, gname2, sender_alias, text)
# 2) Member joins
jgid, member = member_connected(resp)
if isinstance(jgid, int) and member:
mem_id, disp = member
info = groups.get(jgid)
if info:
if mem_id == info["customerId"]:
print(f"[join] customer connected to gid={jgid} as '{disp}'")
else:
print(f"[join] non-customer joined gid={jgid}: '{disp}'")
except Exception as e:
print(f"[ws] {e}; retrying in 2s")
await asyncio.sleep(2)
if __name__ == "__main__":
# env:
# export SXC_WS=ws://127.0.0.1:5225
# export SXC_OPERATOR_ALIAS="Taurix-Operator" # pick a unique alias
# export SXC_OPERATOR_ADDR="simplex://..." # optional
# Example:
# simplex-chat -d taurix-business -p 5225
# python3 simplex_customerservicebot.py
asyncio.run(run())