Module ro_py.groups

This file houses functions and classes that pertain to Roblox groups.

Expand source code
"""

This file houses functions and classes that pertain to Roblox groups.

"""

import copy
from enum import Enum

import iso8601
import asyncio

from ro_py.wall import Wall
from ro_py.roles import Role
from ro_py.events import Event
from ro_py.users import BaseUser
from typing import Tuple, Callable
from ro_py.events import EventTypes
from ro_py.utilities.errors import NotFound
from ro_py.bases.baseuser import PartialUser
from ro_py.utilities.pages import Pages, SortOrder
from ro_py.utilities.clientobject import ClientObject

from ro_py.utilities.url import url
endpoint = url("groups")


class Shout:
    """
    Represents a group shout.
    """
    def __init__(self, cso, group, shout_data):
        self.cso = cso
        self.requests = cso.requests
        self.group = group
        self.data = shout_data
        self.body = shout_data["body"]
        self.created = iso8601.parse_date(shout_data["created"])
        self.updated = iso8601.parse_date(shout_data["updated"])

        # TODO: Make this a PartialUser
        self.poster = None

    def __str__(self):
        return self.body

    async def __call__(self, message):
        """
        Updates the shout of the group.
        Please note that doing so will completely delete this Shout object and return a new Shout object.
        The parent group's shout parameter will also be updated accordingly.

        Parameters
        ----------
        message : str
            Message that will overwrite the current shout of a group.

        Returns
        -------
        ro_py.groups.Shout

        """
        shout_req = await self.requests.patch(
            url=endpoint + f"/v1/groups/{self.group.id}/status",
            data={
                "message": message
            }
        )
        self.group.shout = Shout(self.cso, self.group, shout_req.json())
        return self.group.shout


class JoinRequest:
    def __init__(self, cso, data, group):
        self.requests = cso.requests
        self.group = group
        self.requester = PartialUser(cso, data['requester'])
        self.created = iso8601.parse_date(data['created'])

    async def accept(self):
        accept_req = await self.requests.post(
            url=endpoint + f"/v1/groups/{self.group.id}/join-requests/users/{self.requester.id}"
        )
        return accept_req.status_code == 200

    async def decline(self):
        accept_req = await self.requests.delete(
            url=endpoint + f"/v1/groups/{self.group.id}/join-requests/users/{self.requester.id}"
        )
        return accept_req.status_code == 200


class Actions(Enum):
    delete_post = "deletePost"
    remove_member = "removeMember"
    accept_join_request = "acceptJoinRequest"
    decline_join_request = "declineJoinRequest"
    post_shout = "postShout"
    change_rank = "changeRank"
    buy_ad = "buyAd"
    send_ally_request = "sendAllyRequest"
    create_enemy = "createEnemy"
    accept_ally_request = "acceptAllyRequest"
    decline_ally_request = "declineAllyRequest"
    delete_ally = "deleteAlly"
    add_group_place = "addGroupPlace"
    delete_group_place = "deleteGroupPlace"
    create_items = "createItems"
    configure_items = "configureItems"
    spend_group_funds = "spendGroupFunds"
    change_owner = "changeOwner"
    delete = "delete"
    adjust_currency_amounts = "adjustCurrencyAmounts"
    abandon = "abandon"
    claim = "claim"
    Rename = "rename"
    change_description = "changeDescription"
    create_group_asset = "createGroupAsset"
    upload_group_asset = "uploadGroupAsset"
    configure_group_asset = "configureGroupAsset"
    revert_group_asset = "revertGroupAsset"
    create_group_developer_product = "createGroupDeveloperProduct"
    configure_group_game = "configureGroupGame"
    lock = "lock"
    unlock = "unlock"
    create_game_pass = "createGamePass"
    create_badge = "createBadge"
    configure_badge = "configureBadge"
    save_place = "savePlace"
    publish_place = "publishPlace"
    invite_to_clan = "inviteToClan"
    kick_from_clan = "kickFromClan"
    cancel_clan_invite = "cancelClanInvite"
    buy_clan = "buyClan"


class Action:
    def __init__(self, cso, data, group):
        self.group = group
        self.actor = Member(cso, data['actor']['user']['userId'], data['actor']['user']['username'], group, Role(cso, group, data['actor']['role']))
        self.action = data['actionType']
        self.created = iso8601.parse_date(data['created'])
        self.data = data['description']


def action_handler(cso, data, args):
    actions = []
    for action in data:
        actions.append(Action(cso, action, args))
    return actions


def join_request_handler(cso, data, args):
    join_requests = []
    for request in data:
        join_requests.append(JoinRequest(cso, request, args))
    return join_requests


def member_handler(cso, data, args):
    members = []
    for member in data:
        members.append(member)
    return members


class Group(ClientObject):
    """
    Represents a group.
    """
    def __init__(self, cso, group_id):
        super().__init__()
        self.cso = cso
        """Client Shared Object"""
        self.requests = cso.requests
        "Requests object."
        self.id = group_id
        "Group ID."
        self.wall = Wall(self.cso, self)
        """Wall object."""
        self.name = None
        """Group name."""
        self.description = None
        """Group description."""
        self.owner = None
        """Group owner."""
        self.member_count = None
        """Group member count."""
        self.is_builders_club_only = None
        """True if the group is Builders Club (Premium) only. This seems to have been removed."""
        self.public_entry_allowed = None
        """Public entry allowed (private/public group)"""
        self.shout = None
        """Current group shout (Shout)"""
        self.events = Events(cso, self)
        """Events object."""
        self.is_locked = False
        """True if this is a locked group."""

    async def update(self):
        """
        Updates the group's information.
        """
        group_info_req = await self.requests.get(endpoint + f"/v1/groups/{self.id}")
        group_info = group_info_req.json()
        self.name = group_info["name"]
        self.description = group_info["description"]
        self.owner = await self.cso.client.get_user(group_info["owner"]["userId"])
        self.member_count = group_info["memberCount"]
        self.is_builders_club_only = group_info["isBuildersClubOnly"]
        self.public_entry_allowed = group_info["publicEntryAllowed"]
        if group_info.get('shout'):
            self.shout = Shout(self.cso, self, group_info['shout'])
        else:
            self.shout = None
        if "isLocked" in group_info:
            self.is_locked = group_info["isLocked"]

    async def update_shout(self, message):
        """
        Updates the shout of the group.
        DEPRECATED: Just call group.shout()

        Parameters
        ----------
        message : str
            Message that will overwrite the current shout of a group.

        Returns
        -------
        int
        """
        return await self.shout(message)

    async def get_roles(self):
        """
        Gets all roles of the group.

        Returns
        -------
        list
        """
        role_req = await self.requests.get(
            url=endpoint + f"/v1/groups/{self.id}/roles"
        )
        roles = []
        for role in role_req.json()['roles']:
            roles.append(Role(self.cso, self, role))
        return roles

    async def get_member_by_id(self, user_id):
        # Get list of group user is in.
        member_req = await self.requests.get(
            url=endpoint + f"/v2/users/{user_id}/groups/roles"
        )
        data = member_req.json()

        # Find group in list.
        group_data = None
        for group in data['data']:
            if group['group']['id'] == self.id:
                group_data = group
                break

        # Check if user is in group.
        if not group_data:
            raise NotFound(f"The user {user_id} was not found in group {self.id}")

        # Create data to return.
        role = Role(self.cso, self, group_data['role'])
        member = Member(self.cso, user_id, "", self, role)
        return member

    async def get_member_by_username(self, name):
        user = await self.cso.client.get_user_by_username(name)
        member_req = await self.requests.get(
            url=endpoint + f"/v2/users/{user.id}/groups/roles"
        )
        data = member_req.json()

        # Find group in list.
        group_data = None
        for group in data['data']:
            if group['group']['id'] == self.id:
                group_data = group
                break

        # Check if user is in group.
        if not group_data:
            raise NotFound(f"The user {name} was not found in group {self.id}")

        # Create data to return.
        role = Role(self.cso, self, group_data['role'])
        member = Member(self.cso, user.id, user.name, self, role)
        return member

    async def get_join_requests(self, sort_order=SortOrder.Ascending, limit=100):
        pages = Pages(
            cso=self.cso,
            url=endpoint + f"/v1/groups/{self.id}/join-requests",
            sort_order=sort_order,
            limit=limit,
            handler=join_request_handler,
            handler_args=self
        )
        await pages.get_page()
        return pages

    async def get_members(self, sort_order=SortOrder.Ascending, limit=100):
        pages = Pages(
            cso=self.cso,
            url=endpoint + f"/v1/groups/{self.id}/users?limit=100&sortOrder=Desc",
            sort_order=sort_order,
            limit=limit,
            handler=member_handler,
            handler_args=self
        )

        await pages.get_page()
        return pages

    async def get_audit_logs(self, action_filter: Actions = None, sort_order=SortOrder.Ascending, limit=100):
        parameters = {}
        if action_filter:
            parameters['actionType'] = action_filter

        pages = Pages(
            cso=self.cso,
            url=endpoint + f"/v1/groups/{self.id}/audit-log",
            handler=action_handler,
            extra_parameters=parameters,
            handler_args=self,
            limit=limit,
            sort_order=sort_order
        )

        await pages.get_page()
        return pages


class PartialGroup:
    """
    Represents a group with less information.

    Different information will be present here in different circumstances.
    If it was generated as a game owner, it might only contain an ID and a name.
    If it was generated from, let's say, groups/v2/users/userid/groups/roles, it'll also contain a member count.
    """
    def __init__(self, cso, data):
        self.cso = cso
        self.requests = cso.requests
        self.id = data["id"]
        self.name = data["name"]
        self.member_count = None
        if "memberCount" in data:
            self.member_count = data["memberCount"]

    async def expand(self):
        return self.cso.client.get_group(self.id)


class Member(BaseUser):
    """
    Represents a user in a group.

    Parameters
    ----------
    cso : ro_py.utilities.requests.Requests
            Requests object to use for API requests.
    user_id : int
            The id of a user.
    name : str
            The name of the user.
    group : ro_py.groups.Group
            The group the user is in.
    role : ro_py.roles.Role
            The role the user has is the group.
    """
    def __init__(self, cso, user_id, name, group, role):
        super().__init__(cso, user_id)
        self.name = name
        self.role = role
        self.group = group

    async def update_role(self):
        """
        Updates the role information of the user.

        Returns
        -------
        ro_py.roles.Role
        """
        member_req = await self.requests.get(
            url=endpoint + f"/v2/users/{self.id}/groups/roles"
        )
        data = member_req.json()
        for role in data['data']:
            if role['group']['id'] == self.group.id:
                self.role = Role(self.cso, self.group, role['role'])
                break
        return self.role

    async def change_rank(self, num) -> Tuple[Role, Role]:
        """
        Changes the users rank specified by a number.
        If num is 1 the users role will go up by 1.
        If num is -1 the users role will go down by 1.

        Parameters
        ----------
        num : int
                How much to change the rank by.
        """
        await self.update_role()
        roles = await self.group.get_roles()
        old_role = copy.copy(self.role)
        role_counter = -1
        for group_role in roles:
            role_counter += 1
            if group_role.rank == self.role.rank:
                break
        if not roles:
            raise NotFound(f"User {self.id} is not in group {self.group.id}")
        await self.setrank(roles[role_counter + num].id)
        self.role = roles[role_counter + num].id
        return old_role, roles[role_counter + num]

    async def promote(self):
        """
        Promotes the user.

        Returns
        -------
        int
        """
        return await self.change_rank(1)

    async def demote(self):
        """
        Demotes the user.

        Returns
        -------
        int
        """
        return await self.change_rank(-1)

    async def setrank(self, rank):
        """
        Sets the users role to specified role using rank id.

        Parameters
        ----------
        rank : int
                Rank id

        Returns
        -------
        bool
        """
        rank_request = await self.requests.patch(
            url=endpoint + f"/v1/groups/{self.group.id}/users/{self.id}",
            data={
                "roleId": rank
            }
        )
        return rank_request.status_code == 200

    async def setrole(self, role_num):
        """
         Sets the users role to specified role using role number (1-255).

         Parameters
         ----------
         role_num : int
                Role number (1-255)

         Returns
         -------
         bool
         """
        roles = await self.group.get_roles()
        rank_role = None
        for role in roles:
            if role.rank == role_num:
                rank_role = role
                break
        if not rank_role:
            raise NotFound(f"Role {role_num} not found")
        return await self.setrank(rank_role.id)

    async def exile(self):
        exile_req = await self.requests.delete(
            url=endpoint + f"/v1/groups/{self.group.id}/users/{self.id}"
        )
        return exile_req.status_code == 200


class Events:
    def __init__(self, cso, group):
        self.cso = cso
        self.group = group

    async def bind(self, func: Callable, event: EventTypes, delay: int = 15):
        """
        Binds a function to an event.

        Parameters
        ----------
        func : function
                Function that will be bound to the event.
        event : ro_py.events.EventTypes
                Event that will be bound to the function.
        delay : int
                How many seconds between each poll.
        """
        if event == EventTypes.on_join_request:
            event = Event(self.on_join_request, EventTypes.on_join_request, (func, None), delay)
            self.cso.event_handler.add_event(event)
        if event == EventTypes.on_wall_post:
            event = Event(self.on_wall_post, EventTypes.on_wall_post, (func, None), delay)
            self.cso.event_handler.add_event(event)
        if event == EventTypes.on_group_change:
            event = Event(self.on_group_change, EventTypes.on_group_change, (func, None), delay)
            self.cso.event_handler.add_event(event)
        await self.cso.event_handler.listen()

    async def on_join_request(self, func: Callable, old_req, event: Event):
        if not old_req:
            current_group_reqs = await self.group.get_join_requests()
            old_arguments = list(event.arguments)
            old_arguments[1] = current_group_reqs.data[0].requester.id
            return event.edit(arguments=tuple(old_arguments))

        current_group_reqs = await self.group.get_join_requests()
        current_group_reqs = current_group_reqs.data

        if current_group_reqs[0].requester.id != old_req:
            new_reqs = []

            for request in current_group_reqs:
                if request.requester.id == old_req:
                    break
                new_reqs.append(request)

            old_arguments = list(event.arguments)
            old_arguments[1] = current_group_reqs[0].requester.id
            event.edit(arguments=tuple(old_arguments))

            for new_req in new_reqs:
                asyncio.create_task(func(new_req))

    async def on_wall_post(self, func: Callable, newest_wall_post, event: Event):
        if not newest_wall_post:
            current_wall_posts = await self.group.wall.get_posts(sort_order=SortOrder.Descending)
            old_arguments = list(event.arguments)
            old_arguments[1] = current_wall_posts.data[0].id
            return event.edit(arguments=tuple(old_arguments))

        current_wall_posts = await self.group.wall.get_posts(sort_order=SortOrder.Descending)
        current_wall_posts = current_wall_posts.data

        post = current_wall_posts[0]
        if post.id != newest_wall_post:
            new_posts = []

            for post in current_wall_posts:
                if post.id == newest_wall_post:
                    break
                new_posts.append(post)

            old_arguments = list(event.arguments)
            old_arguments[1] = current_wall_posts[0].id
            event.edit(arguments=tuple(old_arguments))

            for new_post in new_posts:
                asyncio.create_task(func(new_post))

    async def on_group_change(self, func: Callable, current_group, event: Event):
        if not current_group:
            await self.group.update()
            old_arguments = list(event.arguments)
            old_arguments[1] = copy.copy(self.group)
            return event.edit(arguments=tuple(old_arguments))

        await self.group.update()

        has_changed = False
        for attr, value in current_group.__dict__.items():
            other_value = getattr(self.group, attr)
            if attr == "shout":
                if str(value) != str(other_value):
                    has_changed = True
                else:
                    continue
            if other_value != value:
                has_changed = True

        if has_changed:
            old_arguments = list(event.arguments)
            old_arguments[1] = copy.copy(self.group)
            event.edit(arguments=tuple(old_arguments))
            asyncio.create_task(func(current_group, self.group))

    """
    async def on_audit_log(self, func: Callable, delay: int):
        audit_log = await self.group.get_audit_logs()
        audit_log = audit_log.data[0]
        while True:
            await asyncio.sleep(delay)
            new_audit = await self.group.get_audit_logs()
            new_audits = []
            for audit in new_audit.data:
                if audit.created == audit_log.created:
                    print(audit.created, audit_log.created, audit.created == audit_log.created)
                    break
                else:
                    print(audit.created, audit_log.created)
                    new_audits.append(audit)
            if len(new_audits) > 0:
                audit_log = new_audit.data[0]
                for new in new_audits:
                    asyncio.create_task(func(new))
    """

Functions

def action_handler(cso, data, args)
Expand source code
def action_handler(cso, data, args):
    actions = []
    for action in data:
        actions.append(Action(cso, action, args))
    return actions
def join_request_handler(cso, data, args)
Expand source code
def join_request_handler(cso, data, args):
    join_requests = []
    for request in data:
        join_requests.append(JoinRequest(cso, request, args))
    return join_requests
def member_handler(cso, data, args)
Expand source code
def member_handler(cso, data, args):
    members = []
    for member in data:
        members.append(member)
    return members

Classes

class Action (cso, data, group)
Expand source code
class Action:
    def __init__(self, cso, data, group):
        self.group = group
        self.actor = Member(cso, data['actor']['user']['userId'], data['actor']['user']['username'], group, Role(cso, group, data['actor']['role']))
        self.action = data['actionType']
        self.created = iso8601.parse_date(data['created'])
        self.data = data['description']
class Actions (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code
class Actions(Enum):
    delete_post = "deletePost"
    remove_member = "removeMember"
    accept_join_request = "acceptJoinRequest"
    decline_join_request = "declineJoinRequest"
    post_shout = "postShout"
    change_rank = "changeRank"
    buy_ad = "buyAd"
    send_ally_request = "sendAllyRequest"
    create_enemy = "createEnemy"
    accept_ally_request = "acceptAllyRequest"
    decline_ally_request = "declineAllyRequest"
    delete_ally = "deleteAlly"
    add_group_place = "addGroupPlace"
    delete_group_place = "deleteGroupPlace"
    create_items = "createItems"
    configure_items = "configureItems"
    spend_group_funds = "spendGroupFunds"
    change_owner = "changeOwner"
    delete = "delete"
    adjust_currency_amounts = "adjustCurrencyAmounts"
    abandon = "abandon"
    claim = "claim"
    Rename = "rename"
    change_description = "changeDescription"
    create_group_asset = "createGroupAsset"
    upload_group_asset = "uploadGroupAsset"
    configure_group_asset = "configureGroupAsset"
    revert_group_asset = "revertGroupAsset"
    create_group_developer_product = "createGroupDeveloperProduct"
    configure_group_game = "configureGroupGame"
    lock = "lock"
    unlock = "unlock"
    create_game_pass = "createGamePass"
    create_badge = "createBadge"
    configure_badge = "configureBadge"
    save_place = "savePlace"
    publish_place = "publishPlace"
    invite_to_clan = "inviteToClan"
    kick_from_clan = "kickFromClan"
    cancel_clan_invite = "cancelClanInvite"
    buy_clan = "buyClan"

Ancestors

  • enum.Enum

Class variables

var Rename
var abandon
var accept_ally_request
var accept_join_request
var add_group_place
var adjust_currency_amounts
var buy_ad
var buy_clan
var cancel_clan_invite
var change_description
var change_owner
var change_rank
var claim
var configure_badge
var configure_group_asset
var configure_group_game
var configure_items
var create_badge
var create_enemy
var create_game_pass
var create_group_asset
var create_group_developer_product
var create_items
var decline_ally_request
var decline_join_request
var delete
var delete_ally
var delete_group_place
var delete_post
var invite_to_clan
var kick_from_clan
var lock
var post_shout
var publish_place
var remove_member
var revert_group_asset
var save_place
var send_ally_request
var spend_group_funds
var unlock
var upload_group_asset
class Events (cso, group)
Expand source code
class Events:
    def __init__(self, cso, group):
        self.cso = cso
        self.group = group

    async def bind(self, func: Callable, event: EventTypes, delay: int = 15):
        """
        Binds a function to an event.

        Parameters
        ----------
        func : function
                Function that will be bound to the event.
        event : ro_py.events.EventTypes
                Event that will be bound to the function.
        delay : int
                How many seconds between each poll.
        """
        if event == EventTypes.on_join_request:
            event = Event(self.on_join_request, EventTypes.on_join_request, (func, None), delay)
            self.cso.event_handler.add_event(event)
        if event == EventTypes.on_wall_post:
            event = Event(self.on_wall_post, EventTypes.on_wall_post, (func, None), delay)
            self.cso.event_handler.add_event(event)
        if event == EventTypes.on_group_change:
            event = Event(self.on_group_change, EventTypes.on_group_change, (func, None), delay)
            self.cso.event_handler.add_event(event)
        await self.cso.event_handler.listen()

    async def on_join_request(self, func: Callable, old_req, event: Event):
        if not old_req:
            current_group_reqs = await self.group.get_join_requests()
            old_arguments = list(event.arguments)
            old_arguments[1] = current_group_reqs.data[0].requester.id
            return event.edit(arguments=tuple(old_arguments))

        current_group_reqs = await self.group.get_join_requests()
        current_group_reqs = current_group_reqs.data

        if current_group_reqs[0].requester.id != old_req:
            new_reqs = []

            for request in current_group_reqs:
                if request.requester.id == old_req:
                    break
                new_reqs.append(request)

            old_arguments = list(event.arguments)
            old_arguments[1] = current_group_reqs[0].requester.id
            event.edit(arguments=tuple(old_arguments))

            for new_req in new_reqs:
                asyncio.create_task(func(new_req))

    async def on_wall_post(self, func: Callable, newest_wall_post, event: Event):
        if not newest_wall_post:
            current_wall_posts = await self.group.wall.get_posts(sort_order=SortOrder.Descending)
            old_arguments = list(event.arguments)
            old_arguments[1] = current_wall_posts.data[0].id
            return event.edit(arguments=tuple(old_arguments))

        current_wall_posts = await self.group.wall.get_posts(sort_order=SortOrder.Descending)
        current_wall_posts = current_wall_posts.data

        post = current_wall_posts[0]
        if post.id != newest_wall_post:
            new_posts = []

            for post in current_wall_posts:
                if post.id == newest_wall_post:
                    break
                new_posts.append(post)

            old_arguments = list(event.arguments)
            old_arguments[1] = current_wall_posts[0].id
            event.edit(arguments=tuple(old_arguments))

            for new_post in new_posts:
                asyncio.create_task(func(new_post))

    async def on_group_change(self, func: Callable, current_group, event: Event):
        if not current_group:
            await self.group.update()
            old_arguments = list(event.arguments)
            old_arguments[1] = copy.copy(self.group)
            return event.edit(arguments=tuple(old_arguments))

        await self.group.update()

        has_changed = False
        for attr, value in current_group.__dict__.items():
            other_value = getattr(self.group, attr)
            if attr == "shout":
                if str(value) != str(other_value):
                    has_changed = True
                else:
                    continue
            if other_value != value:
                has_changed = True

        if has_changed:
            old_arguments = list(event.arguments)
            old_arguments[1] = copy.copy(self.group)
            event.edit(arguments=tuple(old_arguments))
            asyncio.create_task(func(current_group, self.group))

    """
    async def on_audit_log(self, func: Callable, delay: int):
        audit_log = await self.group.get_audit_logs()
        audit_log = audit_log.data[0]
        while True:
            await asyncio.sleep(delay)
            new_audit = await self.group.get_audit_logs()
            new_audits = []
            for audit in new_audit.data:
                if audit.created == audit_log.created:
                    print(audit.created, audit_log.created, audit.created == audit_log.created)
                    break
                else:
                    print(audit.created, audit_log.created)
                    new_audits.append(audit)
            if len(new_audits) > 0:
                audit_log = new_audit.data[0]
                for new in new_audits:
                    asyncio.create_task(func(new))
    """

Methods

async def bind(self, func: Callable, event: EventTypes, delay: int = 15)

Binds a function to an event.

Parameters

func : function
Function that will be bound to the event.
event : EventTypes
Event that will be bound to the function.
delay : int
How many seconds between each poll.
Expand source code
async def bind(self, func: Callable, event: EventTypes, delay: int = 15):
    """
    Binds a function to an event.

    Parameters
    ----------
    func : function
            Function that will be bound to the event.
    event : ro_py.events.EventTypes
            Event that will be bound to the function.
    delay : int
            How many seconds between each poll.
    """
    if event == EventTypes.on_join_request:
        event = Event(self.on_join_request, EventTypes.on_join_request, (func, None), delay)
        self.cso.event_handler.add_event(event)
    if event == EventTypes.on_wall_post:
        event = Event(self.on_wall_post, EventTypes.on_wall_post, (func, None), delay)
        self.cso.event_handler.add_event(event)
    if event == EventTypes.on_group_change:
        event = Event(self.on_group_change, EventTypes.on_group_change, (func, None), delay)
        self.cso.event_handler.add_event(event)
    await self.cso.event_handler.listen()
async def on_group_change(self, func: Callable, current_group, event: Event)
Expand source code
async def on_group_change(self, func: Callable, current_group, event: Event):
    if not current_group:
        await self.group.update()
        old_arguments = list(event.arguments)
        old_arguments[1] = copy.copy(self.group)
        return event.edit(arguments=tuple(old_arguments))

    await self.group.update()

    has_changed = False
    for attr, value in current_group.__dict__.items():
        other_value = getattr(self.group, attr)
        if attr == "shout":
            if str(value) != str(other_value):
                has_changed = True
            else:
                continue
        if other_value != value:
            has_changed = True

    if has_changed:
        old_arguments = list(event.arguments)
        old_arguments[1] = copy.copy(self.group)
        event.edit(arguments=tuple(old_arguments))
        asyncio.create_task(func(current_group, self.group))
async def on_join_request(self, func: Callable, old_req, event: Event)
Expand source code
async def on_join_request(self, func: Callable, old_req, event: Event):
    if not old_req:
        current_group_reqs = await self.group.get_join_requests()
        old_arguments = list(event.arguments)
        old_arguments[1] = current_group_reqs.data[0].requester.id
        return event.edit(arguments=tuple(old_arguments))

    current_group_reqs = await self.group.get_join_requests()
    current_group_reqs = current_group_reqs.data

    if current_group_reqs[0].requester.id != old_req:
        new_reqs = []

        for request in current_group_reqs:
            if request.requester.id == old_req:
                break
            new_reqs.append(request)

        old_arguments = list(event.arguments)
        old_arguments[1] = current_group_reqs[0].requester.id
        event.edit(arguments=tuple(old_arguments))

        for new_req in new_reqs:
            asyncio.create_task(func(new_req))
async def on_wall_post(self, func: Callable, newest_wall_post, event: Event)
Expand source code
async def on_wall_post(self, func: Callable, newest_wall_post, event: Event):
    if not newest_wall_post:
        current_wall_posts = await self.group.wall.get_posts(sort_order=SortOrder.Descending)
        old_arguments = list(event.arguments)
        old_arguments[1] = current_wall_posts.data[0].id
        return event.edit(arguments=tuple(old_arguments))

    current_wall_posts = await self.group.wall.get_posts(sort_order=SortOrder.Descending)
    current_wall_posts = current_wall_posts.data

    post = current_wall_posts[0]
    if post.id != newest_wall_post:
        new_posts = []

        for post in current_wall_posts:
            if post.id == newest_wall_post:
                break
            new_posts.append(post)

        old_arguments = list(event.arguments)
        old_arguments[1] = current_wall_posts[0].id
        event.edit(arguments=tuple(old_arguments))

        for new_post in new_posts:
            asyncio.create_task(func(new_post))
class Group (cso, group_id)

Represents a group.

Expand source code
class Group(ClientObject):
    """
    Represents a group.
    """
    def __init__(self, cso, group_id):
        super().__init__()
        self.cso = cso
        """Client Shared Object"""
        self.requests = cso.requests
        "Requests object."
        self.id = group_id
        "Group ID."
        self.wall = Wall(self.cso, self)
        """Wall object."""
        self.name = None
        """Group name."""
        self.description = None
        """Group description."""
        self.owner = None
        """Group owner."""
        self.member_count = None
        """Group member count."""
        self.is_builders_club_only = None
        """True if the group is Builders Club (Premium) only. This seems to have been removed."""
        self.public_entry_allowed = None
        """Public entry allowed (private/public group)"""
        self.shout = None
        """Current group shout (Shout)"""
        self.events = Events(cso, self)
        """Events object."""
        self.is_locked = False
        """True if this is a locked group."""

    async def update(self):
        """
        Updates the group's information.
        """
        group_info_req = await self.requests.get(endpoint + f"/v1/groups/{self.id}")
        group_info = group_info_req.json()
        self.name = group_info["name"]
        self.description = group_info["description"]
        self.owner = await self.cso.client.get_user(group_info["owner"]["userId"])
        self.member_count = group_info["memberCount"]
        self.is_builders_club_only = group_info["isBuildersClubOnly"]
        self.public_entry_allowed = group_info["publicEntryAllowed"]
        if group_info.get('shout'):
            self.shout = Shout(self.cso, self, group_info['shout'])
        else:
            self.shout = None
        if "isLocked" in group_info:
            self.is_locked = group_info["isLocked"]

    async def update_shout(self, message):
        """
        Updates the shout of the group.
        DEPRECATED: Just call group.shout()

        Parameters
        ----------
        message : str
            Message that will overwrite the current shout of a group.

        Returns
        -------
        int
        """
        return await self.shout(message)

    async def get_roles(self):
        """
        Gets all roles of the group.

        Returns
        -------
        list
        """
        role_req = await self.requests.get(
            url=endpoint + f"/v1/groups/{self.id}/roles"
        )
        roles = []
        for role in role_req.json()['roles']:
            roles.append(Role(self.cso, self, role))
        return roles

    async def get_member_by_id(self, user_id):
        # Get list of group user is in.
        member_req = await self.requests.get(
            url=endpoint + f"/v2/users/{user_id}/groups/roles"
        )
        data = member_req.json()

        # Find group in list.
        group_data = None
        for group in data['data']:
            if group['group']['id'] == self.id:
                group_data = group
                break

        # Check if user is in group.
        if not group_data:
            raise NotFound(f"The user {user_id} was not found in group {self.id}")

        # Create data to return.
        role = Role(self.cso, self, group_data['role'])
        member = Member(self.cso, user_id, "", self, role)
        return member

    async def get_member_by_username(self, name):
        user = await self.cso.client.get_user_by_username(name)
        member_req = await self.requests.get(
            url=endpoint + f"/v2/users/{user.id}/groups/roles"
        )
        data = member_req.json()

        # Find group in list.
        group_data = None
        for group in data['data']:
            if group['group']['id'] == self.id:
                group_data = group
                break

        # Check if user is in group.
        if not group_data:
            raise NotFound(f"The user {name} was not found in group {self.id}")

        # Create data to return.
        role = Role(self.cso, self, group_data['role'])
        member = Member(self.cso, user.id, user.name, self, role)
        return member

    async def get_join_requests(self, sort_order=SortOrder.Ascending, limit=100):
        pages = Pages(
            cso=self.cso,
            url=endpoint + f"/v1/groups/{self.id}/join-requests",
            sort_order=sort_order,
            limit=limit,
            handler=join_request_handler,
            handler_args=self
        )
        await pages.get_page()
        return pages

    async def get_members(self, sort_order=SortOrder.Ascending, limit=100):
        pages = Pages(
            cso=self.cso,
            url=endpoint + f"/v1/groups/{self.id}/users?limit=100&sortOrder=Desc",
            sort_order=sort_order,
            limit=limit,
            handler=member_handler,
            handler_args=self
        )

        await pages.get_page()
        return pages

    async def get_audit_logs(self, action_filter: Actions = None, sort_order=SortOrder.Ascending, limit=100):
        parameters = {}
        if action_filter:
            parameters['actionType'] = action_filter

        pages = Pages(
            cso=self.cso,
            url=endpoint + f"/v1/groups/{self.id}/audit-log",
            handler=action_handler,
            extra_parameters=parameters,
            handler_args=self,
            limit=limit,
            sort_order=sort_order
        )

        await pages.get_page()
        return pages

Ancestors

Instance variables

var cso

Client Shared Object

var description

Group description.

var events

Events object.

var id

Group ID.

var is_builders_club_only

True if the group is Builders Club (Premium) only. This seems to have been removed.

var is_locked

True if this is a locked group.

var member_count

Group member count.

var name

Group name.

var owner

Group owner.

var public_entry_allowed

Public entry allowed (private/public group)

var requests

Requests object.

var shout

Current group shout (Shout)

var wall

Wall object.

Methods

async def get_audit_logs(self, action_filter: Actions = None, sort_order=SortOrder.Ascending, limit=100)
Expand source code
async def get_audit_logs(self, action_filter: Actions = None, sort_order=SortOrder.Ascending, limit=100):
    parameters = {}
    if action_filter:
        parameters['actionType'] = action_filter

    pages = Pages(
        cso=self.cso,
        url=endpoint + f"/v1/groups/{self.id}/audit-log",
        handler=action_handler,
        extra_parameters=parameters,
        handler_args=self,
        limit=limit,
        sort_order=sort_order
    )

    await pages.get_page()
    return pages
async def get_join_requests(self, sort_order=SortOrder.Ascending, limit=100)
Expand source code
async def get_join_requests(self, sort_order=SortOrder.Ascending, limit=100):
    pages = Pages(
        cso=self.cso,
        url=endpoint + f"/v1/groups/{self.id}/join-requests",
        sort_order=sort_order,
        limit=limit,
        handler=join_request_handler,
        handler_args=self
    )
    await pages.get_page()
    return pages
async def get_member_by_id(self, user_id)
Expand source code
async def get_member_by_id(self, user_id):
    # Get list of group user is in.
    member_req = await self.requests.get(
        url=endpoint + f"/v2/users/{user_id}/groups/roles"
    )
    data = member_req.json()

    # Find group in list.
    group_data = None
    for group in data['data']:
        if group['group']['id'] == self.id:
            group_data = group
            break

    # Check if user is in group.
    if not group_data:
        raise NotFound(f"The user {user_id} was not found in group {self.id}")

    # Create data to return.
    role = Role(self.cso, self, group_data['role'])
    member = Member(self.cso, user_id, "", self, role)
    return member
async def get_member_by_username(self, name)
Expand source code
async def get_member_by_username(self, name):
    user = await self.cso.client.get_user_by_username(name)
    member_req = await self.requests.get(
        url=endpoint + f"/v2/users/{user.id}/groups/roles"
    )
    data = member_req.json()

    # Find group in list.
    group_data = None
    for group in data['data']:
        if group['group']['id'] == self.id:
            group_data = group
            break

    # Check if user is in group.
    if not group_data:
        raise NotFound(f"The user {name} was not found in group {self.id}")

    # Create data to return.
    role = Role(self.cso, self, group_data['role'])
    member = Member(self.cso, user.id, user.name, self, role)
    return member
async def get_members(self, sort_order=SortOrder.Ascending, limit=100)
Expand source code
async def get_members(self, sort_order=SortOrder.Ascending, limit=100):
    pages = Pages(
        cso=self.cso,
        url=endpoint + f"/v1/groups/{self.id}/users?limit=100&sortOrder=Desc",
        sort_order=sort_order,
        limit=limit,
        handler=member_handler,
        handler_args=self
    )

    await pages.get_page()
    return pages
async def get_roles(self)

Gets all roles of the group.

Returns

list
 
Expand source code
async def get_roles(self):
    """
    Gets all roles of the group.

    Returns
    -------
    list
    """
    role_req = await self.requests.get(
        url=endpoint + f"/v1/groups/{self.id}/roles"
    )
    roles = []
    for role in role_req.json()['roles']:
        roles.append(Role(self.cso, self, role))
    return roles
async def update(self)

Updates the group's information.

Expand source code
async def update(self):
    """
    Updates the group's information.
    """
    group_info_req = await self.requests.get(endpoint + f"/v1/groups/{self.id}")
    group_info = group_info_req.json()
    self.name = group_info["name"]
    self.description = group_info["description"]
    self.owner = await self.cso.client.get_user(group_info["owner"]["userId"])
    self.member_count = group_info["memberCount"]
    self.is_builders_club_only = group_info["isBuildersClubOnly"]
    self.public_entry_allowed = group_info["publicEntryAllowed"]
    if group_info.get('shout'):
        self.shout = Shout(self.cso, self, group_info['shout'])
    else:
        self.shout = None
    if "isLocked" in group_info:
        self.is_locked = group_info["isLocked"]
async def update_shout(self, message)

Updates the shout of the group. DEPRECATED: Just call group.shout()

Parameters

message : str
Message that will overwrite the current shout of a group.

Returns

int
 
Expand source code
async def update_shout(self, message):
    """
    Updates the shout of the group.
    DEPRECATED: Just call group.shout()

    Parameters
    ----------
    message : str
        Message that will overwrite the current shout of a group.

    Returns
    -------
    int
    """
    return await self.shout(message)
class JoinRequest (cso, data, group)
Expand source code
class JoinRequest:
    def __init__(self, cso, data, group):
        self.requests = cso.requests
        self.group = group
        self.requester = PartialUser(cso, data['requester'])
        self.created = iso8601.parse_date(data['created'])

    async def accept(self):
        accept_req = await self.requests.post(
            url=endpoint + f"/v1/groups/{self.group.id}/join-requests/users/{self.requester.id}"
        )
        return accept_req.status_code == 200

    async def decline(self):
        accept_req = await self.requests.delete(
            url=endpoint + f"/v1/groups/{self.group.id}/join-requests/users/{self.requester.id}"
        )
        return accept_req.status_code == 200

Methods

async def accept(self)
Expand source code
async def accept(self):
    accept_req = await self.requests.post(
        url=endpoint + f"/v1/groups/{self.group.id}/join-requests/users/{self.requester.id}"
    )
    return accept_req.status_code == 200
async def decline(self)
Expand source code
async def decline(self):
    accept_req = await self.requests.delete(
        url=endpoint + f"/v1/groups/{self.group.id}/join-requests/users/{self.requester.id}"
    )
    return accept_req.status_code == 200
class Member (cso, user_id, name, group, role)

Represents a user in a group.

Parameters

cso : Requests
Requests object to use for API requests.
user_id : int
The id of a user.
name : str
The name of the user.
group : Group
The group the user is in.
role : Role
The role the user has is the group.
Expand source code
class Member(BaseUser):
    """
    Represents a user in a group.

    Parameters
    ----------
    cso : ro_py.utilities.requests.Requests
            Requests object to use for API requests.
    user_id : int
            The id of a user.
    name : str
            The name of the user.
    group : ro_py.groups.Group
            The group the user is in.
    role : ro_py.roles.Role
            The role the user has is the group.
    """
    def __init__(self, cso, user_id, name, group, role):
        super().__init__(cso, user_id)
        self.name = name
        self.role = role
        self.group = group

    async def update_role(self):
        """
        Updates the role information of the user.

        Returns
        -------
        ro_py.roles.Role
        """
        member_req = await self.requests.get(
            url=endpoint + f"/v2/users/{self.id}/groups/roles"
        )
        data = member_req.json()
        for role in data['data']:
            if role['group']['id'] == self.group.id:
                self.role = Role(self.cso, self.group, role['role'])
                break
        return self.role

    async def change_rank(self, num) -> Tuple[Role, Role]:
        """
        Changes the users rank specified by a number.
        If num is 1 the users role will go up by 1.
        If num is -1 the users role will go down by 1.

        Parameters
        ----------
        num : int
                How much to change the rank by.
        """
        await self.update_role()
        roles = await self.group.get_roles()
        old_role = copy.copy(self.role)
        role_counter = -1
        for group_role in roles:
            role_counter += 1
            if group_role.rank == self.role.rank:
                break
        if not roles:
            raise NotFound(f"User {self.id} is not in group {self.group.id}")
        await self.setrank(roles[role_counter + num].id)
        self.role = roles[role_counter + num].id
        return old_role, roles[role_counter + num]

    async def promote(self):
        """
        Promotes the user.

        Returns
        -------
        int
        """
        return await self.change_rank(1)

    async def demote(self):
        """
        Demotes the user.

        Returns
        -------
        int
        """
        return await self.change_rank(-1)

    async def setrank(self, rank):
        """
        Sets the users role to specified role using rank id.

        Parameters
        ----------
        rank : int
                Rank id

        Returns
        -------
        bool
        """
        rank_request = await self.requests.patch(
            url=endpoint + f"/v1/groups/{self.group.id}/users/{self.id}",
            data={
                "roleId": rank
            }
        )
        return rank_request.status_code == 200

    async def setrole(self, role_num):
        """
         Sets the users role to specified role using role number (1-255).

         Parameters
         ----------
         role_num : int
                Role number (1-255)

         Returns
         -------
         bool
         """
        roles = await self.group.get_roles()
        rank_role = None
        for role in roles:
            if role.rank == role_num:
                rank_role = role
                break
        if not rank_role:
            raise NotFound(f"Role {role_num} not found")
        return await self.setrank(rank_role.id)

    async def exile(self):
        exile_req = await self.requests.delete(
            url=endpoint + f"/v1/groups/{self.group.id}/users/{self.id}"
        )
        return exile_req.status_code == 200

Ancestors

Methods

async def change_rank(self, num) ‑> Tuple[RoleRole]

Changes the users rank specified by a number. If num is 1 the users role will go up by 1. If num is -1 the users role will go down by 1.

Parameters

num : int
How much to change the rank by.
Expand source code
async def change_rank(self, num) -> Tuple[Role, Role]:
    """
    Changes the users rank specified by a number.
    If num is 1 the users role will go up by 1.
    If num is -1 the users role will go down by 1.

    Parameters
    ----------
    num : int
            How much to change the rank by.
    """
    await self.update_role()
    roles = await self.group.get_roles()
    old_role = copy.copy(self.role)
    role_counter = -1
    for group_role in roles:
        role_counter += 1
        if group_role.rank == self.role.rank:
            break
    if not roles:
        raise NotFound(f"User {self.id} is not in group {self.group.id}")
    await self.setrank(roles[role_counter + num].id)
    self.role = roles[role_counter + num].id
    return old_role, roles[role_counter + num]
async def demote(self)

Demotes the user.

Returns

int
 
Expand source code
async def demote(self):
    """
    Demotes the user.

    Returns
    -------
    int
    """
    return await self.change_rank(-1)
async def exile(self)
Expand source code
async def exile(self):
    exile_req = await self.requests.delete(
        url=endpoint + f"/v1/groups/{self.group.id}/users/{self.id}"
    )
    return exile_req.status_code == 200
async def promote(self)

Promotes the user.

Returns

int
 
Expand source code
async def promote(self):
    """
    Promotes the user.

    Returns
    -------
    int
    """
    return await self.change_rank(1)
async def setrank(self, rank)

Sets the users role to specified role using rank id.

Parameters

rank : int
Rank id

Returns

bool
 
Expand source code
async def setrank(self, rank):
    """
    Sets the users role to specified role using rank id.

    Parameters
    ----------
    rank : int
            Rank id

    Returns
    -------
    bool
    """
    rank_request = await self.requests.patch(
        url=endpoint + f"/v1/groups/{self.group.id}/users/{self.id}",
        data={
            "roleId": rank
        }
    )
    return rank_request.status_code == 200
async def setrole(self, role_num)

Sets the users role to specified role using role number (1-255).

Parameters

role_num : int
Role number (1-255)

Returns

bool
 
Expand source code
async def setrole(self, role_num):
    """
     Sets the users role to specified role using role number (1-255).

     Parameters
     ----------
     role_num : int
            Role number (1-255)

     Returns
     -------
     bool
     """
    roles = await self.group.get_roles()
    rank_role = None
    for role in roles:
        if role.rank == role_num:
            rank_role = role
            break
    if not rank_role:
        raise NotFound(f"Role {role_num} not found")
    return await self.setrank(rank_role.id)
async def update_role(self)

Updates the role information of the user.

Returns

Role
 
Expand source code
async def update_role(self):
    """
    Updates the role information of the user.

    Returns
    -------
    ro_py.roles.Role
    """
    member_req = await self.requests.get(
        url=endpoint + f"/v2/users/{self.id}/groups/roles"
    )
    data = member_req.json()
    for role in data['data']:
        if role['group']['id'] == self.group.id:
            self.role = Role(self.cso, self.group, role['role'])
            break
    return self.role

Inherited members

class PartialGroup (cso, data)

Represents a group with less information.

Different information will be present here in different circumstances. If it was generated as a game owner, it might only contain an ID and a name. If it was generated from, let's say, groups/v2/users/userid/groups/roles, it'll also contain a member count.

Expand source code
class PartialGroup:
    """
    Represents a group with less information.

    Different information will be present here in different circumstances.
    If it was generated as a game owner, it might only contain an ID and a name.
    If it was generated from, let's say, groups/v2/users/userid/groups/roles, it'll also contain a member count.
    """
    def __init__(self, cso, data):
        self.cso = cso
        self.requests = cso.requests
        self.id = data["id"]
        self.name = data["name"]
        self.member_count = None
        if "memberCount" in data:
            self.member_count = data["memberCount"]

    async def expand(self):
        return self.cso.client.get_group(self.id)

Methods

async def expand(self)
Expand source code
async def expand(self):
    return self.cso.client.get_group(self.id)
class Shout (cso, group, shout_data)

Represents a group shout.

Expand source code
class Shout:
    """
    Represents a group shout.
    """
    def __init__(self, cso, group, shout_data):
        self.cso = cso
        self.requests = cso.requests
        self.group = group
        self.data = shout_data
        self.body = shout_data["body"]
        self.created = iso8601.parse_date(shout_data["created"])
        self.updated = iso8601.parse_date(shout_data["updated"])

        # TODO: Make this a PartialUser
        self.poster = None

    def __str__(self):
        return self.body

    async def __call__(self, message):
        """
        Updates the shout of the group.
        Please note that doing so will completely delete this Shout object and return a new Shout object.
        The parent group's shout parameter will also be updated accordingly.

        Parameters
        ----------
        message : str
            Message that will overwrite the current shout of a group.

        Returns
        -------
        ro_py.groups.Shout

        """
        shout_req = await self.requests.patch(
            url=endpoint + f"/v1/groups/{self.group.id}/status",
            data={
                "message": message
            }
        )
        self.group.shout = Shout(self.cso, self.group, shout_req.json())
        return self.group.shout