~robin_jadoul/organizers-bot

8c0388959ca51bb9214c520faeb7b2941fafe405 — Matteo Rizzo 1 year, 11 months ago 7aa82e4
Don't recreate the S3 client for every file
1 files changed, 61 insertions(+), 72 deletions(-)

M organizers_bot/transcript.py
M organizers_bot/transcript.py => organizers_bot/transcript.py +61 -72
@@ 72,10 72,15 @@ class TranscriptManager:
        self.existing_assets = set()

    async def create(self, category: discord.CategoryChannel, ctx: discord_slash.SlashContext):
        self.log.info("Creating transcript for %s", category.name)
        trans = Transcript(self, category, ctx)
        await trans.build()
        await trans.sync_to_archive()
        session = aiobotocore.get_session()
        async with session.create_client('s3',
                endpoint_url='https://s3.us-west-002.backblazeb2.com',
                aws_access_key_id = config.s3.keyID,
                aws_secret_access_key = config.s3.key) as s3:
            self.log.info("Creating transcript for %s", category.name)
            trans = Transcript(self, category, ctx)
            await trans.build(s3)
            await trans.sync_to_archive()

    def get_target_path(self, url: str) -> str:
        discord_parsed = parse.urlparse(url)


@@ 83,55 88,39 @@ class TranscriptManager:
        target_path = os.path.join("assets", target_path)
        return target_path

    async def save_contents(self, target_path: str, contents: bytes):
        session = aiobotocore.get_session()
        async with session.create_client('s3',
                endpoint_url='https://s3.us-west-002.backblazeb2.com',
                aws_access_key_id = config.s3.keyID,
                aws_secret_access_key = config.s3.key) as s3:
            sha1 = hashlib.sha1(contents).hexdigest()
            # delete any pre-existing assets.
            existing_ok = False
            self.log.info("Saving asset to %s", target_path)
            response: dict = await s3.list_objects_v2(Bucket=config.s3.bucket_name, Prefix=target_path)
            for existing in response.get("Contents", []):
                # self.log.info("Have existing %s", existing)
                key = existing["Key"]
                obj = await s3.head_object(
    async def save_contents(self, target_path: str, contents: bytes, s3):
        sha1 = hashlib.sha1(contents).hexdigest()
        # delete any pre-existing assets.
        existing_ok = False
        self.log.info("Saving asset to %s", target_path)
        response: dict = await s3.list_objects_v2(Bucket=config.s3.bucket_name, Prefix=target_path)
        for existing in response.get("Contents", []):
            # self.log.info("Have existing %s", existing)
            key = existing["Key"]
            obj = await s3.head_object(
                Bucket=config.s3.bucket_name,
                Key=key
            )
            # self.log.info("Have existing obj: %s", obj)
            existing_sha1 = None
            if "Metadata" in obj:
                if "sha1" in obj["Metadata"]:
                    existing_sha1 = obj["Metadata"]["sha1"]
            if existing_sha1 != sha1:
                self.log.info("Deleting out of date %s", target_path)
                versions: dict = await s3.list_object_versions(
                    Bucket=config.s3.bucket_name,
                    Key=key
                    Prefix=target_path
                )
                # self.log.info("Have existing obj: %s", obj)
                existing_sha1 = None
                if "Metadata" in obj:
                    if "sha1" in obj["Metadata"]:
                        existing_sha1 = obj["Metadata"]["sha1"]
                if existing_sha1 != sha1:
                    self.log.info("Deleting out of date %s", target_path)
                    versions: dict = await s3.list_object_versions(
                        Bucket=config.s3.bucket_name,
                        Prefix=target_path
                    )
                    for version in versions.get("Versions", []) + versions.get("DeleteMarkers", []):
                        await s3.delete_object(Bucket=config.s3.bucket_name, Key=key, VersionId=version["VersionId"])
                else:
                    self.log.info("Found existing one: %s (%s, %s)", target_path, existing_sha1, sha1)
                    return key

            # async for existing in self.s3_bucket.file_names(backblaze.settings.FileSettings(None, prefix=target_path)):
            #     filem, file, _ = existing
            #     if sha1 != filem.content_sha1:
            #         await file.delete()
            #     else:
            #         return filem.file_name
            # filem, file = await self.s3_bucket.upload(backblaze.settings.UploadSettings(target_path), contents)
            filename = os.path.basename(target_path)
            # self.log.info("type: %s, dir: %s", type(self.s3), dir(self.s3))
            await s3.put_object(Bucket=config.s3.bucket_name, Key=target_path, Body=contents, Metadata={"sha1" : sha1})
            # self.log.info("Finished upload to %s", target_path)
            # self.log.info("Uploaded to %s", file.file_id)

    async def save_asset(self, discord_url: discord.Asset) -> str:
                for version in versions.get("Versions", []) + versions.get("DeleteMarkers", []):
                    await s3.delete_object(Bucket=config.s3.bucket_name, Key=key, VersionId=version["VersionId"])
            else:
                self.log.info("Found existing one: %s (%s, %s)", target_path, existing_sha1, sha1)
                return key

        await s3.put_object(Bucket=config.s3.bucket_name, Key=target_path, Body=contents, Metadata={"sha1" : sha1})

    async def save_asset(self, discord_url: discord.Asset, s3) -> str:
        """Save an asset found at discord_url to assets/path_from_discord_url.
        Returns the URL to access the asset at.



@@ 156,10 145,10 @@ class TranscriptManager:
        # self.log.info("Uploading from %s to %s", discord_url, target_path)
        # response = self.session.get(discord_url)
        contents = await discord_url.read()
        await self.save_contents(target_path, contents)
        await self.save_contents(target_path, contents, s3)
        return target_path

    async def save_url(self, url: str, target_path=None) -> str:
    async def save_url(self, url: str, s3, target_path=None) -> str:
        if target_path is None:
            target_path = self.get_target_path(url)
        if target_path in self.existing_assets:


@@ 171,10 160,10 @@ class TranscriptManager:
            resp.raise_for_status()
            contents = await resp.content.read()
            log.info("Downloaded contents %s: %d", url, len(contents))
            await self.save_contents(target_path, contents)
            await self.save_contents(target_path, contents, s3)
            return target_path

    async def save_msg_contents(self, message: discord.message.Message, msg: dict) -> dict:
    async def save_msg_contents(self, message: discord.message.Message, msg: dict, s3) -> dict:
        """Saves any contents found in message and changes the url if need be.

        Parameters


@@ 188,16 177,16 @@ class TranscriptManager:
            [description]
        """
        author: discord.User = message.author
        await self.save_asset(author.avatar_url_as(static_format="png"))
        await self.save_asset(author.avatar_url_as(static_format="png"), s3)
        #todo save emojis!
        if "sticker_items" in msg:
            for idx, sticker in enumerate(msg["sticker_items"]):
                # log.info("Have sticker: %s, %s", sticker, sticker.image_url_as())
                url = f"https://media.discordapp.net/stickers/{sticker['id']}.png?size=256&passthrough=false"
                new_url = await self.save_url(url)
                new_url = await self.save_url(url, s3)
                msg["sticker_items"][idx]["url"] = new_url
        for idx, attachment in enumerate(message.attachments):
            new_url = await self.save_url(attachment.url)
            new_url = await self.save_url(attachment.url, s3)
            msg["attachments"][idx]["proxy_url"] = new_url
            msg["attachments"][idx]["url"] = new_url
        for idx, embed in enumerate(message.embeds):


@@ 214,28 203,28 @@ class TranscriptManager:
            embed_dict = msg["embeds"][idx]
            if embed.video.url is not discord.Embed.Empty:
                video_path = os.path.join(target_path, "video.mp4")
                new_url = await self.save_url(embed.video.url, video_path)
                new_url = await self.save_url(embed.video.url, s3, video_path)
                embed_dict["video"]["url"] = new_url
            if embed.thumbnail.proxy_url is not discord.Embed.Empty:
                new_url = await self.save_url(embed.thumbnail.proxy_url, os.path.join(target_path, "thumbnail.png"))
                new_url = await self.save_url(embed.thumbnail.proxy_url, s3, os.path.join(target_path, "thumbnail.png"))
                embed_dict["thumbnail"]["url"] = new_url
                embed_dict["thumbnail"]["proxy_url"] = new_url
            if embed.image.proxy_url is not discord.Embed.Empty:
                # self.log.info("Embed image: %s", embed.image)
                new_url = await self.save_url(embed.image.proxy_url, os.path.join(target_path, "image.png"))
                new_url = await self.save_url(embed.image.proxy_url, s3, os.path.join(target_path, "image.png"))
                embed_dict["image"]["url"] = new_url
                embed_dict["image"]["proxy_url"] = new_url
            msg["embeds"][idx] = embed_dict
        for idx, reaction in enumerate(message.reactions):
            if reaction.custom_emoji:
                new_url = await self.save_asset(reaction.emoji.url_as())
                new_url = await self.save_asset(reaction.emoji.url_as(), s3)
                msg["reactions"][idx]["emoji"]["url"] = new_url
        return msg

    async def save_json(self, data, filepath):
    async def save_json(self, data, filepath, s3):
        # self.log.info("Saving json to %s", filepath)
        json_data = json.dumps(data).encode("utf8")
        await self.save_contents(filepath, json_data)
        await self.save_contents(filepath, json_data, s3)


class Transcript:


@@ 268,7 257,7 @@ class Transcript:
        else:
            await self.status_msg.edit(content=status_msg)

    async def build_messages(self, channel: discord.TextChannel) -> list:
    async def build_messages(self, channel: discord.TextChannel, s3) -> list:
        """Builds a list of message json objects, that are found inside channel.
        It also downloads any found attachments to s3 and replaces the links to them.



@@ 291,17 280,17 @@ class Transcript:
            changed_msgs = []
            channel_meta = os.path.join(channel_folder, "meta.json")
            channel_json = await self.http.get_channel(channel.id)
            await self.mgr.save_json(channel_json, channel_meta)
            await self.mgr.save_json(channel_json, channel_meta, s3)
            async for item in json_history(channel, oldest_first=True):
                message, data = item
                # self.log.info("Retrieved message: %s (%s)", message, data)
                changed = await self.mgr.save_msg_contents(message, copy.deepcopy(data))
                changed = await self.mgr.save_msg_contents(message, copy.deepcopy(data), s3)
                changed_msgs.append(changed)
                og_msgs.append(data)
            messages_path = os.path.join(channel_folder, "messages.json")
            await self.mgr.save_json(changed_msgs, messages_path)
            await self.mgr.save_json(changed_msgs, messages_path, s3)
            orig_path = os.path.join(channel_folder, "messages.orig.json")
            await self.mgr.save_json(og_msgs, orig_path)
            await self.mgr.save_json(og_msgs, orig_path, s3)
        except Exception as e:
            log.exception("Failed to build transcript for channel %s", channel.name)
            await self.ctx.channel.send(f"Failed to build transcript for channel {channel.name}: {e}")


@@ 312,16 301,16 @@ class Transcript:
        # with open("originals.json", "w") as f:
        #     json.dump(og_msgs, f, indent=4, sort_keys=True)

    async def build(self):
    async def build(self, s3):
        self.log.info("Building Transcript")
        await self.update_status("Building Transcript")
        try:
            category_channel = await self.http.get_channel(self.category.id)
            category_json = os.path.join(self.json_folder, "meta.json")
            await self.mgr.save_json(category_channel, category_json)
            await self.mgr.save_json(category_channel, category_json, s3)
            channel_waits = []
            for channel in self.category.channels:
                channel_waits.append(self.build_messages(channel))
                channel_waits.append(self.build_messages(channel, s3))
            await asyncio.gather(*channel_waits)
        except:
            log.exception("Failed to build transcript")