Module ro_py.extensions.bots
This extension houses functions that allow generation of Bot objects, which interpret commands.
Expand source code
"""
This extension houses functions that allow generation of Bot objects, which interpret commands.
"""
from ro_py.utilities.errors import ChatError
from ro_py.client import Client
from ro_py.chat import Message
from sys import stderr
from time import sleep
import asyncio
import iso8601
class Context:
    def __init__(self, cso, latest_data, notif_data):
        self.cso = cso
        self.requests = cso.requests
        self.conversation_id = notif_data["conversation_id"]
        self.actor_target_id = notif_data["actor_target_id"]
        self.actor_type = notif_data["actor_type"]
        self.type = notif_data["type"]
        self.sequence_number = notif_data["sequence_number"]
        self.id = latest_data["id"]
        self.content = latest_data["content"]
        self.sender_type = latest_data["senderType"]
        self.sender_id = latest_data["senderTargetId"]
        self.decorators = latest_data["decorators"]
        self.message_type = latest_data["messageType"]
        self.read = latest_data["read"]
        self.sent = iso8601.parse_date(latest_data["sent"])
    async def send(self, content):
        send_message_req = await self.requests.post(
            url="https://chat.roblox.com/v2/send-message",
            data={
                "message": content,
                "conversationId": self.conversation_id
            }
        )
        send_message_json = send_message_req.json()
        if send_message_json["sent"]:
            return Message(self.cso, send_message_json["messageId"], self.id)
        else:
            raise ChatError(send_message_json["statusMessage"])
class Bot(Client):
    def __init__(self, prefix="!"):
        super().__init__()
        self.prefix = prefix
        self.commands = {}
        self.events = {}
        self.evtloop = asyncio.new_event_loop()
        self.keepgoing = False
    def _generate_help(self):
        help_string = f"Prefix: {self.prefix}"
        for command in self.commands:
            help_string = help_string + "\n" + command + ": " + command.help[:24]
        return help_string
    def run(self, token, background=False):
        self.keepgoing = True
        self.token_login(token)
        self.notifications.on_notification = self._on_notification
        self.evtloop = self.cso.evtloop
        self.evtloop.run_until_complete(self._run())
        if not background:
            while self.keepgoing:
                sleep(1/32)
    def stop(self):
        self.keepgoing = False
    async def _process_command(self, data, n_data):
        content = data["content"]
        if content.startswith(self.prefix):
            content = content[len(self.prefix):]
            content_split = content.split(" ")
            command = content_split[0]
            if command in self.commands:
                context = Context(
                    cso=self.cso,
                    latest_data=data,
                    notif_data=n_data
                )
                try:
                    await self.commands[command](context, *content_split[1:])
                except Exception as e:
                    if "on_error" in self.events:
                        await self.events["on_error"](context, e)
                    else:
                        stderr.write("Ignoring error: " + str(e) + "\n")
                        await context.send("Something went wrong when running this command.")
    async def _on_notification(self, notification):
        if notification.type == "NewMessage":
            latest_req = await self.requests.get(
                url="https://chat.roblox.com/v2/get-messages",
                params={
                    "conversationId": notification.data["conversation_id"],
                    "pageSize": 1
                }
            )
            latest_data = latest_req.json()[0]
            await self._process_command(latest_data, notification.data)
    async def _run(self):
        await self.notifications.initialize()
    def command(self, *args, **kwargs):
        def decorator(func):
            if isinstance(func, Command):
                raise TypeError('Callback is already a command.')
            command = Command(func=func, **kwargs)
            self.commands[func.__name__] = command
            return command
        return decorator
    def event(self, *args, **kwargs):
        def decorator(func):
            command = Command(func=func, **kwargs)
            self.events[func.__name__] = command
            return command
        return decorator
class Command:
    def __init__(self, func, **kwargs):
        if not asyncio.iscoroutinefunction(func):
            raise TypeError('Callback must be a coroutine.')
        self._callback = func
        self.help = func.__doc__ or "No help available."
    @property
    def callback(self):
        return self._callback
    async def __call__(self, *args, **kwargs):
        return await self.callback(*args, **kwargs)
Classes
class Bot (prefix='!')- 
Represents an authenticated Roblox client.
Parameters
token:str- Authentication token. You can take this from the .ROBLOSECURITY cookie in your browser.
 
Expand source code
class Bot(Client): def __init__(self, prefix="!"): super().__init__() self.prefix = prefix self.commands = {} self.events = {} self.evtloop = asyncio.new_event_loop() self.keepgoing = False def _generate_help(self): help_string = f"Prefix: {self.prefix}" for command in self.commands: help_string = help_string + "\n" + command + ": " + command.help[:24] return help_string def run(self, token, background=False): self.keepgoing = True self.token_login(token) self.notifications.on_notification = self._on_notification self.evtloop = self.cso.evtloop self.evtloop.run_until_complete(self._run()) if not background: while self.keepgoing: sleep(1/32) def stop(self): self.keepgoing = False async def _process_command(self, data, n_data): content = data["content"] if content.startswith(self.prefix): content = content[len(self.prefix):] content_split = content.split(" ") command = content_split[0] if command in self.commands: context = Context( cso=self.cso, latest_data=data, notif_data=n_data ) try: await self.commands[command](context, *content_split[1:]) except Exception as e: if "on_error" in self.events: await self.events["on_error"](context, e) else: stderr.write("Ignoring error: " + str(e) + "\n") await context.send("Something went wrong when running this command.") async def _on_notification(self, notification): if notification.type == "NewMessage": latest_req = await self.requests.get( url="https://chat.roblox.com/v2/get-messages", params={ "conversationId": notification.data["conversation_id"], "pageSize": 1 } ) latest_data = latest_req.json()[0] await self._process_command(latest_data, notification.data) async def _run(self): await self.notifications.initialize() def command(self, *args, **kwargs): def decorator(func): if isinstance(func, Command): raise TypeError('Callback is already a command.') command = Command(func=func, **kwargs) self.commands[func.__name__] = command return command return decorator def event(self, *args, **kwargs): def decorator(func): command = Command(func=func, **kwargs) self.events[func.__name__] = command return command return decoratorAncestors
Methods
def command(self, *args, **kwargs)- 
Expand source code
def command(self, *args, **kwargs): def decorator(func): if isinstance(func, Command): raise TypeError('Callback is already a command.') command = Command(func=func, **kwargs) self.commands[func.__name__] = command return command return decorator def event(self, *args, **kwargs)- 
Expand source code
def event(self, *args, **kwargs): def decorator(func): command = Command(func=func, **kwargs) self.events[func.__name__] = command return command return decorator def run(self, token, background=False)- 
Expand source code
def run(self, token, background=False): self.keepgoing = True self.token_login(token) self.notifications.on_notification = self._on_notification self.evtloop = self.cso.evtloop self.evtloop.run_until_complete(self._run()) if not background: while self.keepgoing: sleep(1/32) def stop(self)- 
Expand source code
def stop(self): self.keepgoing = False 
Inherited members
 class Command (func, **kwargs)- 
Expand source code
class Command: def __init__(self, func, **kwargs): if not asyncio.iscoroutinefunction(func): raise TypeError('Callback must be a coroutine.') self._callback = func self.help = func.__doc__ or "No help available." @property def callback(self): return self._callback async def __call__(self, *args, **kwargs): return await self.callback(*args, **kwargs)Instance variables
var callback- 
Expand source code
@property def callback(self): return self._callback 
 class Context (cso, latest_data, notif_data)- 
Expand source code
class Context: def __init__(self, cso, latest_data, notif_data): self.cso = cso self.requests = cso.requests self.conversation_id = notif_data["conversation_id"] self.actor_target_id = notif_data["actor_target_id"] self.actor_type = notif_data["actor_type"] self.type = notif_data["type"] self.sequence_number = notif_data["sequence_number"] self.id = latest_data["id"] self.content = latest_data["content"] self.sender_type = latest_data["senderType"] self.sender_id = latest_data["senderTargetId"] self.decorators = latest_data["decorators"] self.message_type = latest_data["messageType"] self.read = latest_data["read"] self.sent = iso8601.parse_date(latest_data["sent"]) async def send(self, content): send_message_req = await self.requests.post( url="https://chat.roblox.com/v2/send-message", data={ "message": content, "conversationId": self.conversation_id } ) send_message_json = send_message_req.json() if send_message_json["sent"]: return Message(self.cso, send_message_json["messageId"], self.id) else: raise ChatError(send_message_json["statusMessage"])Methods
async def send(self, content)- 
Expand source code
async def send(self, content): send_message_req = await self.requests.post( url="https://chat.roblox.com/v2/send-message", data={ "message": content, "conversationId": self.conversation_id } ) send_message_json = send_message_req.json() if send_message_json["sent"]: return Message(self.cso, send_message_json["messageId"], self.id) else: raise ChatError(send_message_json["statusMessage"])