"""Loiter plugin for Telegram (or any Python) bots. Drop-in monetization for the bot's "working..." status messages. Fail-safe by design: every method swallows network errors and returns None/no-op — your bot's behavior never changes if Loiter is down. Usage: from loiter_tg import Loiter sb = Loiter.load() # after one-time `register` line = sb.line() # sponsored line or None ... show it in your status message ... sb.report(display_seconds) # when the status message goes away One-time setup: python3 loiter_tg.py register [--server URL] [--ref CODE] [--out PATH] """ import json, os, sys, time, urllib.request DEFAULT_SERVER = "http://127.0.0.1:8787" DEFAULT_CONF = os.path.expanduser("~/.loiter/bot.json") SECONDS_PER_IMPRESSION = 5 REFRESH_S = 600 def _api(server, method, path, body=None, params=None, timeout=4): url = server + path if params: from urllib.parse import urlencode url += "?" + urlencode(params) data = json.dumps(body).encode() if body is not None else None req = urllib.request.Request(url, data=data, method=method, headers={"Content-Type": "application/json"}) with urllib.request.urlopen(req, timeout=timeout) as r: return json.loads(r.read()) class Loiter: def __init__(self, server, device_key): self.server = server self.device_key = device_key self._ads = [] self._fetched = 0.0 self._i = 0 self._shown = {} # ad_id -> times displayed since last report @classmethod def load(cls, path=DEFAULT_CONF): with open(path) as f: c = json.load(f) return cls(c["server"], c["device_key"]) def _refresh(self): if time.time() - self._fetched < REFRESH_S and self._ads: return try: r = _api(self.server, "GET", "/api/serve", params={"device_key": self.device_key, "n": 6}) self._ads = r.get("ads", []) self._fetched = time.time() except Exception: pass def line(self): """Next sponsored line to display, or None. Rotates through ads.""" try: self._refresh() if not self._ads: return None ad = self._ads[self._i % len(self._ads)] self._i += 1 self._shown[ad["ad_id"]] = self._shown.get(ad["ad_id"], 0) + 1 txt = f"Sponsored · {ad['text']}" if ad.get("url"): txt += f" → {ad['url']}" return txt except Exception: return None def report(self, display_seconds): """Report impressions for the time a sponsored status was on screen.""" try: display_seconds = max(0, int(display_seconds)) total = display_seconds // SECONDS_PER_IMPRESSION if total <= 0 or not self._shown: self._shown = {} return weights = sum(self._shown.values()) items = [] left = total for j, (ad_id, w) in enumerate(sorted(self._shown.items())): n = total * w // weights if j < len(self._shown) - 1 else left n = min(n, left) if n > 0: items.append({"ad_id": ad_id, "count": n}) left -= n self._shown = {} if items: _api(self.server, "POST", "/api/impressions", {"device_key": self.device_key, "items": items, "active_seconds": display_seconds}) except Exception: self._shown = {} def _register(argv): server = DEFAULT_SERVER; ref = None; out = DEFAULT_CONF if "--server" in argv: server = argv[argv.index("--server") + 1] if "--ref" in argv: ref = argv[argv.index("--ref") + 1] if "--out" in argv: out = argv[argv.index("--out") + 1] r = _api(server, "POST", "/api/register", {"platform": "tg-bot", "ref": ref}, timeout=10) os.makedirs(os.path.dirname(out), exist_ok=True) with open(out, "w") as f: json.dump({"server": server, "device_key": r["device_key"], "ref_code": r["ref_code"]}, f, indent=2) os.chmod(out, 0o600) print(f"Registered. Config: {out}\nReferral code: {r['ref_code']}") if __name__ == "__main__": if len(sys.argv) > 1 and sys.argv[1] == "register": _register(sys.argv[2:]) else: print(__doc__)