Module ro_py.extensions.bots
This extension houses functions that allow generation of Bot objects, which interpret commands.
Expand source code
"""
This extension houses functions that allow generation of Bot objects, which interpret commands.
"""
from ro_py.utilities.errors import ChatError
from ro_py.client import Client
from ro_py.chat import Message
from sys import stderr
from time import sleep
import asyncio
import iso8601
class Context:
    """Snapshot of a chat notification plus the message that triggered it.

    Instances are handed to command callbacks and offer ``send`` for posting
    a reply into the same conversation.
    """

    def __init__(self, cso, latest_data, notif_data):
        """Copy fields out of the notification and message payloads.

        Args:
            cso: Object exposing a ``requests`` attribute; stored as-is.
            latest_data: Message payload, indexed with camelCase keys such as
                ``senderType``.
            notif_data: Notification payload, indexed with snake_case keys
                such as ``conversation_id``.
        """
        notif, latest = notif_data, latest_data

        self.cso = cso
        self.requests = cso.requests

        # Fields taken from the notification.
        self.conversation_id = notif["conversation_id"]
        self.actor_target_id = notif["actor_target_id"]
        self.actor_type = notif["actor_type"]
        self.type = notif["type"]
        self.sequence_number = notif["sequence_number"]

        # Fields taken from the message itself.
        self.id = latest["id"]
        self.content = latest["content"]
        self.sender_type = latest["senderType"]
        self.sender_id = latest["senderTargetId"]
        self.decorators = latest["decorators"]
        self.message_type = latest["messageType"]
        self.read = latest["read"]
        # The timestamp string is parsed with iso8601.parse_date.
        self.sent = iso8601.parse_date(latest["sent"])

    async def send(self, content):
        """Post ``content`` to the conversation this context belongs to.

        Args:
            content: Text of the message to send.

        Returns:
            A ``Message`` built from the id reported in the response.

        Raises:
            ChatError: If the response's ``sent`` field is falsy; the error
                carries the response's ``statusMessage``.
        """
        payload = {
            "message": content,
            "conversationId": self.conversation_id
        }
        response = await self.requests.post(
            url="https://chat.roblox.com/v2/send-message",
            data=payload
        )
        body = response.json()
        if not body["sent"]:
            raise ChatError(body["statusMessage"])
        # NOTE(review): self.id is the id of the triggering message, not the
        # conversation id — confirm which one Message expects here.
        return Message(self.cso, body["messageId"], self.id)
class Bot(Client):
    """Client that listens for chat messages and dispatches prefixed commands.

    Commands and events are registered with the ``command`` and ``event``
    decorators and are stored under the decorated function's name.
    """

    def __init__(self, prefix="!"):
        """Create a bot.

        Args:
            prefix: Text a message must start with to be treated as a command.
        """
        super().__init__()
        self.prefix = prefix
        self.commands = {}  # function name -> Command
        self.events = {}  # function name -> Command
        self.evtloop = asyncio.new_event_loop()  # replaced by cso.evtloop in run()
        self.keepgoing = False

    def _generate_help(self):
        """Build a help text listing the prefix and every registered command.

        Returns:
            A string with one line for the prefix followed by one line per
            command, each showing the first 24 characters of its help text.
        """
        lines = [f"Prefix: {self.prefix}"]
        # Iterate items(): the keys are plain names, the values carry ``help``.
        for name, command in self.commands.items():
            lines.append(f"{name}: {command.help[:24]}")
        return "\n".join(lines)

    def run(self, token, background=False):
        """Log in with ``token`` and start receiving notifications.

        Args:
            token: Authentication token handed to ``token_login``.
            background: When false (the default), block the calling thread
                until ``keepgoing`` is cleared, e.g. by ``stop``.
        """
        self.keepgoing = True
        self.token_login(token)
        self.notifications.on_notification = self._on_notification
        self.evtloop = self.cso.evtloop
        self.evtloop.run_until_complete(self._run())
        if not background:
            # Poll the flag so that stop() can release the caller.
            while self.keepgoing:
                sleep(1/32)

    def stop(self):
        """Clear ``keepgoing`` so a blocking ``run`` call returns."""
        self.keepgoing = False

    async def _process_command(self, data, n_data):
        """Run the command named in a message, if the message is one.

        Args:
            data: Message payload; its ``content`` is inspected for the prefix.
            n_data: Notification payload, forwarded to ``Context``.
        """
        content = data["content"]
        if not content.startswith(self.prefix):
            return

        content_split = content[len(self.prefix):].split(" ")
        command = content_split[0]
        if command not in self.commands:
            return

        context = Context(
            cso=self.cso,
            latest_data=data,
            notif_data=n_data
        )
        try:
            # Words after the command name become positional arguments.
            await self.commands[command](context, *content_split[1:])
        except Exception as e:
            if "on_error" in self.events:
                await self.events["on_error"](context, e)
            else:
                # NOTE(review): nesting reconstructed from a listing whose
                # indentation was lost; the fallback reply is assumed to
                # belong to this default (no on_error handler) path — confirm.
                stderr.write("Ignoring error: " + str(e) + "\n")
                await context.send("Something went wrong when running this command.")

    async def _on_notification(self, notification):
        """Fetch the newest message for a ``NewMessage`` notification and process it.

        Args:
            notification: Object with ``type`` and ``data`` attributes.
        """
        if notification.type != "NewMessage":
            return

        latest_req = await self.requests.get(
            url="https://chat.roblox.com/v2/get-messages",
            params={
                "conversationId": notification.data["conversation_id"],
                "pageSize": 1
            }
        )
        messages = latest_req.json()
        if not messages:
            # Nothing came back for this conversation; nothing to dispatch.
            return
        await self._process_command(messages[0], notification.data)

    async def _run(self):
        """Initialize the notifications object."""
        await self.notifications.initialize()

    def command(self, *args, **kwargs):
        """Return a decorator that registers a coroutine as a command.

        The command is stored under the function's ``__name__``. Positional
        arguments are ignored; keyword arguments are forwarded to ``Command``.

        Raises:
            TypeError: (from the decorator) if the target is already a
                ``Command``.
        """
        def decorator(func):
            if isinstance(func, Command):
                raise TypeError('Callback is already a command.')
            command = Command(func=func, **kwargs)
            self.commands[func.__name__] = command
            return command
        return decorator

    def event(self, *args, **kwargs):
        """Return a decorator that registers a coroutine as an event handler.

        The handler is stored under the function's ``__name__`` (for example
        ``on_error``). Positional arguments are ignored; keyword arguments
        are forwarded to ``Command``.
        """
        def decorator(func):
            command = Command(func=func, **kwargs)
            self.events[func.__name__] = command
            return command
        return decorator
class Command:
    """Wrap a coroutine function so that a bot can register and invoke it.

    The coroutine's docstring is exposed as ``help``; a placeholder text is
    used when it has none.
    """

    def __init__(self, func, **kwargs):
        """Store ``func`` as the callback.

        Args:
            func: Coroutine function to run when the command is called.
            **kwargs: Accepted but not used by this class.

        Raises:
            TypeError: If ``func`` is not a coroutine function.
        """
        is_coroutine = asyncio.iscoroutinefunction(func)
        if not is_coroutine:
            raise TypeError('Callback must be a coroutine.')

        self._callback = func
        docstring = func.__doc__
        self.help = docstring if docstring else "No help available."

    @property
    def callback(self):
        """The coroutine function wrapped by this command."""
        return self._callback

    async def __call__(self, *args, **kwargs):
        """Await the wrapped callback with the given arguments and return its result."""
        handler = self.callback
        return await handler(*args, **kwargs)
Classes
class Bot (prefix='!')
-
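Represents an authenticated Roblox client that interprets chat commands.
Parameters
prefix
:str
- Text a message must start with to be treated as a command. Defaults to "!".
token
:str
- Authentication token, passed to run() rather than to the constructor. You can take this from the .ROBLOSECURITY cookie in your browser.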
Expand source code
class Bot(Client):
    """Client that listens for chat messages and dispatches prefixed commands.

    Commands and events are registered with the ``command`` and ``event``
    decorators and are stored under the decorated function's name.
    """

    def __init__(self, prefix="!"):
        """Create a bot.

        Args:
            prefix: Text a message must start with to be treated as a command.
        """
        super().__init__()
        self.prefix = prefix
        self.commands = {}  # function name -> Command
        self.events = {}  # function name -> Command
        self.evtloop = asyncio.new_event_loop()  # replaced by cso.evtloop in run()
        self.keepgoing = False

    def _generate_help(self):
        """Build a help text listing the prefix and every registered command.

        Returns:
            A string with one line for the prefix followed by one line per
            command, each showing the first 24 characters of its help text.
        """
        lines = [f"Prefix: {self.prefix}"]
        # Iterate items(): the keys are plain names, the values carry ``help``.
        for name, command in self.commands.items():
            lines.append(f"{name}: {command.help[:24]}")
        return "\n".join(lines)

    def run(self, token, background=False):
        """Log in with ``token`` and start receiving notifications.

        Args:
            token: Authentication token handed to ``token_login``.
            background: When false (the default), block the calling thread
                until ``keepgoing`` is cleared, e.g. by ``stop``.
        """
        self.keepgoing = True
        self.token_login(token)
        self.notifications.on_notification = self._on_notification
        self.evtloop = self.cso.evtloop
        self.evtloop.run_until_complete(self._run())
        if not background:
            # Poll the flag so that stop() can release the caller.
            while self.keepgoing:
                sleep(1/32)

    def stop(self):
        """Clear ``keepgoing`` so a blocking ``run`` call returns."""
        self.keepgoing = False

    async def _process_command(self, data, n_data):
        """Run the command named in a message, if the message is one.

        Args:
            data: Message payload; its ``content`` is inspected for the prefix.
            n_data: Notification payload, forwarded to ``Context``.
        """
        content = data["content"]
        if not content.startswith(self.prefix):
            return

        content_split = content[len(self.prefix):].split(" ")
        command = content_split[0]
        if command not in self.commands:
            return

        context = Context(
            cso=self.cso,
            latest_data=data,
            notif_data=n_data
        )
        try:
            # Words after the command name become positional arguments.
            await self.commands[command](context, *content_split[1:])
        except Exception as e:
            if "on_error" in self.events:
                await self.events["on_error"](context, e)
            else:
                # NOTE(review): nesting reconstructed from a listing whose
                # indentation was lost; the fallback reply is assumed to
                # belong to this default (no on_error handler) path — confirm.
                stderr.write("Ignoring error: " + str(e) + "\n")
                await context.send("Something went wrong when running this command.")

    async def _on_notification(self, notification):
        """Fetch the newest message for a ``NewMessage`` notification and process it.

        Args:
            notification: Object with ``type`` and ``data`` attributes.
        """
        if notification.type != "NewMessage":
            return

        latest_req = await self.requests.get(
            url="https://chat.roblox.com/v2/get-messages",
            params={
                "conversationId": notification.data["conversation_id"],
                "pageSize": 1
            }
        )
        messages = latest_req.json()
        if not messages:
            # Nothing came back for this conversation; nothing to dispatch.
            return
        await self._process_command(messages[0], notification.data)

    async def _run(self):
        """Initialize the notifications object."""
        await self.notifications.initialize()

    def command(self, *args, **kwargs):
        """Return a decorator that registers a coroutine as a command.

        The command is stored under the function's ``__name__``. Positional
        arguments are ignored; keyword arguments are forwarded to ``Command``.

        Raises:
            TypeError: (from the decorator) if the target is already a
                ``Command``.
        """
        def decorator(func):
            if isinstance(func, Command):
                raise TypeError('Callback is already a command.')
            command = Command(func=func, **kwargs)
            self.commands[func.__name__] = command
            return command
        return decorator

    def event(self, *args, **kwargs):
        """Return a decorator that registers a coroutine as an event handler.

        The handler is stored under the function's ``__name__`` (for example
        ``on_error``). Positional arguments are ignored; keyword arguments
        are forwarded to ``Command``.
        """
        def decorator(func):
            command = Command(func=func, **kwargs)
            self.events[func.__name__] = command
            return command
        return decorator
Ancestors
Methods
def command(self, *args, **kwargs)
-
Expand source code
def command(self, *args, **kwargs):
    """Return a decorator that registers a coroutine as a command.

    The command is stored in ``self.commands`` under the function's
    ``__name__``. Positional arguments are ignored; keyword arguments are
    forwarded to ``Command``.
    """
    def decorator(func):
        # Refuse to wrap something that has already been wrapped.
        if isinstance(func, Command):
            raise TypeError('Callback is already a command.')
        wrapped = Command(func=func, **kwargs)
        self.commands[func.__name__] = wrapped
        return wrapped
    return decorator
def event(self, *args, **kwargs)
-
Expand source code
def event(self, *args, **kwargs):
    """Return a decorator that registers a coroutine as an event handler.

    The handler is stored in ``self.events`` under the function's
    ``__name__``. Positional arguments are ignored; keyword arguments are
    forwarded to ``Command``.
    """
    def decorator(func):
        wrapped = Command(func=func, **kwargs)
        self.events[func.__name__] = wrapped
        return wrapped
    return decorator
def run(self, token, background=False)
-
Expand source code
def run(self, token, background=False):
    """Log in with ``token`` and start receiving notifications.

    Args:
        token: Authentication token handed to ``token_login``.
        background: When false (the default), block the calling thread
            for as long as ``keepgoing`` stays true.
    """
    self.keepgoing = True
    self.token_login(token)
    self.notifications.on_notification = self._on_notification

    # Adopt the loop owned by cso and use it to run the start-up coroutine.
    self.evtloop = self.cso.evtloop
    self.evtloop.run_until_complete(self._run())

    if background:
        return
    # Poll the flag until something clears it.
    while self.keepgoing:
        sleep(1/32)
def stop(self)
-
Expand source code
def stop(self):
    """Clear the ``keepgoing`` flag."""
    self.keepgoing = False
Inherited members
class Command (func, **kwargs)
-
Expand source code
class Command:
    """Wrap a coroutine function so that a bot can register and invoke it.

    The coroutine's docstring is exposed as ``help``; a placeholder text is
    used when it has none.
    """

    def __init__(self, func, **kwargs):
        """Store ``func`` as the callback.

        Args:
            func: Coroutine function to run when the command is called.
            **kwargs: Accepted but not used by this class.

        Raises:
            TypeError: If ``func`` is not a coroutine function.
        """
        is_coroutine = asyncio.iscoroutinefunction(func)
        if not is_coroutine:
            raise TypeError('Callback must be a coroutine.')

        self._callback = func
        docstring = func.__doc__
        self.help = docstring if docstring else "No help available."

    @property
    def callback(self):
        """The coroutine function wrapped by this command."""
        return self._callback

    async def __call__(self, *args, **kwargs):
        """Await the wrapped callback with the given arguments and return its result."""
        handler = self.callback
        return await handler(*args, **kwargs)
Instance variables
var callback
-
Expand source code
@property
def callback(self):
    """The coroutine function stored in ``_callback``."""
    stored = self._callback
    return stored
class Context (cso, latest_data, notif_data)
-
Expand source code
class Context:
    """Snapshot of a chat notification plus the message that triggered it.

    Instances are handed to command callbacks and offer ``send`` for posting
    a reply into the same conversation.
    """

    def __init__(self, cso, latest_data, notif_data):
        """Copy fields out of the notification and message payloads.

        Args:
            cso: Object exposing a ``requests`` attribute; stored as-is.
            latest_data: Message payload, indexed with camelCase keys such as
                ``senderType``.
            notif_data: Notification payload, indexed with snake_case keys
                such as ``conversation_id``.
        """
        notif, latest = notif_data, latest_data

        self.cso = cso
        self.requests = cso.requests

        # Fields taken from the notification.
        self.conversation_id = notif["conversation_id"]
        self.actor_target_id = notif["actor_target_id"]
        self.actor_type = notif["actor_type"]
        self.type = notif["type"]
        self.sequence_number = notif["sequence_number"]

        # Fields taken from the message itself.
        self.id = latest["id"]
        self.content = latest["content"]
        self.sender_type = latest["senderType"]
        self.sender_id = latest["senderTargetId"]
        self.decorators = latest["decorators"]
        self.message_type = latest["messageType"]
        self.read = latest["read"]
        # The timestamp string is parsed with iso8601.parse_date.
        self.sent = iso8601.parse_date(latest["sent"])

    async def send(self, content):
        """Post ``content`` to the conversation this context belongs to.

        Args:
            content: Text of the message to send.

        Returns:
            A ``Message`` built from the id reported in the response.

        Raises:
            ChatError: If the response's ``sent`` field is falsy; the error
                carries the response's ``statusMessage``.
        """
        payload = {
            "message": content,
            "conversationId": self.conversation_id
        }
        response = await self.requests.post(
            url="https://chat.roblox.com/v2/send-message",
            data=payload
        )
        body = response.json()
        if not body["sent"]:
            raise ChatError(body["statusMessage"])
        # NOTE(review): self.id is the id of the triggering message, not the
        # conversation id — confirm which one Message expects here.
        return Message(self.cso, body["messageId"], self.id)
Methods
async def send(self, content)
-
Expand source code
async def send(self, content):
    """Post ``content`` to the conversation this context belongs to.

    Args:
        content: Text of the message to send.

    Returns:
        A ``Message`` built from the id reported in the response.

    Raises:
        ChatError: If the response's ``sent`` field is falsy; the error
            carries the response's ``statusMessage``.
    """
    payload = {
        "message": content,
        "conversationId": self.conversation_id
    }
    response = await self.requests.post(
        url="https://chat.roblox.com/v2/send-message",
        data=payload
    )
    body = response.json()
    if not body["sent"]:
        raise ChatError(body["statusMessage"])
    # NOTE(review): self.id is the id of the triggering message, not the
    # conversation id — confirm which one Message expects here.
    return Message(self.cso, body["messageId"], self.id)