~ihabunek/toot unlisted

ref: a4c3d03b85081ef575679f20d147bba487a8e7b0 toot/toot/commands.py -rw-r--r-- 9.9 KiB View raw
a4c3d03bIvan Habunek Fix build on sr.ht 3 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# -*- coding: utf-8 -*-

import sys

from toot import api, config
from toot.auth import login_interactive, login_browser_interactive, create_app_interactive
from toot.exceptions import ConsoleError, NotFoundError
from toot.output import (print_out, print_instance, print_account,
                         print_search_results, print_timeline, print_notifications)
from toot.utils import assert_domain_exists, editor_input, multiline_input, EOF_KEY


def get_timeline_generator(app, user, args):
    """Return the timeline generator selected by the command line arguments.

    Picks the public, tag or list timeline depending on which of
    ``args.public``, ``args.tag`` or ``args.list`` is set, and falls back to
    the home timeline when none of them is.

    Raises ConsoleError when more than one of --public, --tag and --list is
    given, or when --local or --instance is used without --public or --tag.
    """
    # At most one timeline source may be selected at a time
    selected = sum(1 for source in (args.tag, args.list, args.public) if source)
    if selected > 1:
        raise ConsoleError("Only one of --public, --tag, or --list can be used at one time.")

    public_or_tag = args.public or args.tag

    if args.local and not public_or_tag:
        raise ConsoleError("The --local option is only valid alongside --public or --tag.")

    if args.instance and not public_or_tag:
        raise ConsoleError("The --instance option is only valid alongside --public or --tag.")

    if args.public:
        target = args.instance or app.instance
        return api.public_timeline_generator(target, local=args.local, limit=args.count)

    if args.tag:
        target = args.instance or app.instance
        return api.tag_timeline_generator(target, args.tag, local=args.local, limit=args.count)

    if args.list:
        return api.timeline_list_generator(app, user, args.list, limit=args.count)

    return api.home_timeline_generator(app, user, limit=args.count)


def timeline(app, user, args):
    """Print the selected timeline one batch at a time.

    Each batch produced by the timeline generator is printed (reversed when
    ``args.reverse`` is set). Printing stops after the first batch when
    ``args.once`` is set, or when the user answers "n" to the prompt. When the
    generator is exhausted a closing message is printed instead.
    """
    for batch in get_timeline_generator(app, user, args):
        print_timeline(reversed(batch) if args.reverse else batch)

        if args.once:
            break

        answer = input("\nContinue? [Y/n] ")
        if answer.lower() == "n":
            break
    else:
        # Only reached when the generator ran out, not when the loop was broken
        print_out("That's all folks.")


def thread(app, user, args):
    """Print the status given by ``args.status_id`` within its context.

    The output consists of the ancestors, the status itself and then the
    descendants, as returned by the API.
    """
    status = api.single_status(app, user, args.status_id)
    context = api.context(app, user, args.status_id)

    # Ancestors first, the requested status in the middle, descendants last
    statuses = [*context['ancestors'], status, *context['descendants']]

    print_timeline(statuses)


def curses(app, user, args):
    """Run TimelineApp on the timeline selected by the arguments."""
    timeline_generator = get_timeline_generator(app, user, args)

    # The UI module is imported only once the command is actually invoked
    from toot.ui.app import TimelineApp

    ui = TimelineApp(app, user, timeline_generator)
    ui.run()


def post(app, user, args):
    """Post a new status, optionally with uploaded media.

    The text is taken, in order of preference, from ``args.text``, from data
    piped to stdin, or from the ``text_url`` of the uploaded media. It can then
    be edited in ``args.editor``; when there is still no text and no editor,
    the user is asked to type it in.

    Raises ConsoleError when an editor is requested without a tty, when more
    than 4 media files are given, or when there is no text to post.
    """
    # TODO: this might be achievable, explore options
    if args.editor and not sys.stdin.isatty():
        raise ConsoleError("Cannot run editor if not in tty.")

    if args.media and len(args.media) > 4:
        raise ConsoleError("Cannot attach more than 4 files.")

    # Pick up piped input when no text was given on the command line
    if not (args.text or sys.stdin.isatty()):
        args.text = sys.stdin.read().rstrip()

    uploads = None
    attachment_ids = None
    if args.media:
        uploads = [_do_upload(app, user, media_file) for media_file in args.media]
        attachment_ids = [uploaded["id"] for uploaded in uploads]

    # Without any text, use the media URLs as the status text
    if uploads and not args.text:
        args.text = "\n".join(uploaded['text_url'] for uploaded in uploads)

    if args.editor:
        args.text = editor_input(args.editor, args.text)
    elif not args.text:
        prompt = "Write or paste your toot. Press <yellow>{}</yellow> to post it."
        print_out(prompt.format(EOF_KEY))
        args.text = multiline_input()

    if not args.text:
        raise ConsoleError("You must specify either text or media to post.")

    status = api.post_status(
        app,
        user,
        args.text,
        visibility=args.visibility,
        media_ids=attachment_ids,
        sensitive=args.sensitive,
        spoiler_text=args.spoiler_text,
        in_reply_to_id=args.reply_to,
        language=args.language,
    )

    print_out("Toot posted: <green>{}</green>".format(status.get('url')))


def delete(app, user, args):
    """Delete the status given by ``args.status_id`` and print a confirmation."""
    status_id = args.status_id
    api.delete_status(app, user, status_id)

    confirmation = "<green>✓ Status deleted</green>"
    print_out(confirmation)


def favourite(app, user, args):
    """Favourite the status given by ``args.status_id`` and print a confirmation."""
    status_id = args.status_id
    api.favourite(app, user, status_id)

    confirmation = "<green>✓ Status favourited</green>"
    print_out(confirmation)


def unfavourite(app, user, args):
    """Unfavourite the status given by ``args.status_id`` and print a confirmation."""
    status_id = args.status_id
    api.unfavourite(app, user, status_id)

    confirmation = "<green>✓ Status unfavourited</green>"
    print_out(confirmation)


def reblog(app, user, args):
    """Reblog the status given by ``args.status_id`` and print a confirmation."""
    status_id = args.status_id
    api.reblog(app, user, status_id)

    confirmation = "<green>✓ Status reblogged</green>"
    print_out(confirmation)


def unreblog(app, user, args):
    """Undo the reblog of the status given by ``args.status_id`` and print a confirmation."""
    status_id = args.status_id
    api.unreblog(app, user, status_id)

    confirmation = "<green>✓ Status unreblogged</green>"
    print_out(confirmation)


def pin(app, user, args):
    """Pin the status given by ``args.status_id`` and print a confirmation."""
    status_id = args.status_id
    api.pin(app, user, status_id)

    confirmation = "<green>✓ Status pinned</green>"
    print_out(confirmation)


def unpin(app, user, args):
    """Unpin the status given by ``args.status_id`` and print a confirmation."""
    status_id = args.status_id
    api.unpin(app, user, status_id)

    confirmation = "<green>✓ Status unpinned</green>"
    print_out(confirmation)


def reblogged_by(app, user, args):
    """Print display name and handle of each account that reblogged a status."""
    rebloggers = api.reblogged_by(app, user, args.status_id)

    for reblogger in rebloggers:
        entry = "{}\n @{}".format(reblogger['display_name'], reblogger['acct'])
        print_out(entry)


def auth(app, user, args):
    """List the accounts stored in the configuration.

    The active account is labelled "ACTIVE". The path of the configuration
    file holding the auth tokens is printed last. When no accounts are stored,
    only a notice is printed.
    """
    config_data = config.load_config()
    users = config_data["users"]

    if not users:
        print_out("You are not logged in to any accounts")
        return

    active_user = config_data["active_user"]

    print_out("Authenticated accounts:")
    # Only the user IDs (the mapping keys) are needed for the listing
    for uid in users:
        marker = "ACTIVE" if uid == active_user else ""
        print_out("* <green>{}</green> <yellow>{}</yellow>".format(uid, marker))

    config_path = config.get_config_file_path()
    print_out("\nAuth tokens are stored in: <blue>{}</blue>".format(config_path))


def login_cli(app, user, args):
    """Log in from the command line.

    An app is created through ``create_app_interactive`` for ``args.instance``
    and ``args.scheme`` and passed, along with ``args.email``, to
    ``login_interactive``. The ``app`` and ``user`` arguments are not used.
    """
    created_app = create_app_interactive(instance=args.instance, scheme=args.scheme)
    login_interactive(created_app, args.email)

    for line in ((), ("<green>✓ Successfully logged in.</green>",)):
        print_out(*line)


def login(app, user, args):
    """Log in using the browser.

    An app is created through ``create_app_interactive`` for ``args.instance``
    and ``args.scheme`` and passed to ``login_browser_interactive``. The
    ``app`` and ``user`` arguments are not used.
    """
    created_app = create_app_interactive(instance=args.instance, scheme=args.scheme)
    login_browser_interactive(created_app)

    for line in ((), ("<green>✓ Successfully logged in.</green>",)):
        print_out(*line)


def logout(app, user, args):
    """Remove the account named by ``args.account`` from the configuration.

    The ``user`` argument is ignored; the account is loaded by name instead.
    """
    account = config.load_user(args.account, throw=True)
    config.delete_user(account)

    account_id = config.user_id(account)
    print_out("<green>✓ User {} logged out</green>".format(account_id))


def activate(app, user, args):
    """Make the account named by ``args.account`` the active one.

    The ``user`` argument is ignored; the account is loaded by name instead.
    """
    account = config.load_user(args.account, throw=True)
    config.activate_user(account)

    account_id = config.user_id(account)
    print_out("<green>✓ User {} active</green>".format(account_id))


def upload(app, user, args):
    """Upload ``args.file`` and print the ID, type and URLs of the result."""
    media = _do_upload(app, user, args.file)

    summary = "Successfully uploaded media ID <yellow>{}</yellow>, type '<yellow>{}</yellow>'"

    print_out()
    print_out(summary.format(media['id'], media['type']))

    # One output line per URL field of the upload response
    url_lines = (
        ("Original URL: <green>{}</green>", 'url'),
        ("Preview URL:  <green>{}</green>", 'preview_url'),
        ("Text URL:     <green>{}</green>", 'text_url'),
    )
    for template, field in url_lines:
        print_out(template.format(media[field]))


def search(app, user, args):
    """Search for ``args.query`` and print the results.

    ``args.resolve`` is passed through to the search API call.
    """
    results = api.search(app, user, args.query, args.resolve)
    print_search_results(results)


def _do_upload(app, user, file):
    """Announce the upload of ``file`` by name and return the API response."""
    announcement = "Uploading media: <green>{}</green>".format(file.name)
    print_out(announcement)

    return api.upload_media(app, user, file)


def _find_account(app, user, account_name):
    """For a given account name, returns the Account object.

    The name may be given with or without a leading "@". The account search
    is run with the name as given; the results are then matched on their
    ``acct`` field. An exact match is preferred. Failing that, the first
    result matching regardless of letter case is returned, so that e.g.
    "@User" finds the account whose ``acct`` is "user".

    Raises ConsoleError if the name is empty or no matching account is found.
    """
    if not account_name:
        raise ConsoleError("Empty account name given")

    accounts = api.search_accounts(app, user, account_name)

    # The acct field carries no leading "@", so drop it before comparing
    wanted = account_name[1:] if account_name[0] == "@" else account_name
    wanted_lower = wanted.lower()

    case_insensitive_match = None
    for account in accounts:
        acct = account['acct']
        if acct == wanted:
            return account

        # Remember the first match which differs only in letter case
        if case_insensitive_match is None and acct.lower() == wanted_lower:
            case_insensitive_match = account

    if case_insensitive_match is not None:
        return case_insensitive_match

    raise ConsoleError("Account not found")


def follow(app, user, args):
    """Follow the account named by ``args.account`` and print a confirmation."""
    target = _find_account(app, user, args.account)
    api.follow(app, user, target['id'])

    confirmation = "<green>✓ You are now following {}</green>"
    print_out(confirmation.format(args.account))


def unfollow(app, user, args):
    """Unfollow the account named by ``args.account`` and print a confirmation."""
    target = _find_account(app, user, args.account)
    api.unfollow(app, user, target['id'])

    confirmation = "<green>✓ You are no longer following {}</green>"
    print_out(confirmation.format(args.account))


def mute(app, user, args):
    """Mute the account named by ``args.account`` and print a confirmation."""
    target = _find_account(app, user, args.account)
    api.mute(app, user, target['id'])

    confirmation = "<green>✓ You have muted {}</green>"
    print_out(confirmation.format(args.account))


def unmute(app, user, args):
    """Unmute the account named by ``args.account`` and print a confirmation."""
    target = _find_account(app, user, args.account)
    api.unmute(app, user, target['id'])

    confirmation = "<green>✓ {} is no longer muted</green>"
    print_out(confirmation.format(args.account))


def block(app, user, args):
    """Block the account named by ``args.account`` and print a confirmation."""
    target = _find_account(app, user, args.account)
    api.block(app, user, target['id'])

    confirmation = "<green>✓ You are now blocking {}</green>"
    print_out(confirmation.format(args.account))


def unblock(app, user, args):
    """Unblock the account named by ``args.account`` and print a confirmation."""
    target = _find_account(app, user, args.account)
    api.unblock(app, user, target['id'])

    confirmation = "<green>✓ {} is no longer blocked</green>"
    print_out(confirmation.format(args.account))


def whoami(app, user, args):
    """Print the account returned by verifying the user's credentials."""
    print_account(api.verify_credentials(app, user))


def whois(app, user, args):
    """Print the account found for the name in ``args.account``."""
    print_account(_find_account(app, user, args.account))


def instance(app, user, args):
    """Print details of an instance.

    The instance is taken from ``args.instance``, falling back to the
    instance of ``app`` when there is one.

    Raises ConsoleError when no instance name is available, or when the API
    lookup raises NotFoundError.
    """
    name = args.instance
    if not name and app:
        name = app.instance

    if not name:
        raise ConsoleError("Please specify instance name.")

    assert_domain_exists(name)

    try:
        details = api.get_instance(name, args.scheme)
        print_instance(details)
    except NotFoundError:
        message = (
            "Instance not found at {}.\n"
            "The given domain probably does not host a Mastodon instance."
        )
        raise ConsoleError(message.format(name))


def notifications(app, user, args):
    """Print the user's notifications, or clear them when ``args.clear`` is set."""
    if args.clear:
        api.clear_notifications(app, user)
        print_out("<green>Cleared notifications</green>")
        return

    fetched = api.get_notifications(app, user)

    if fetched:
        print_notifications(fetched)
    else:
        print_out("<yellow>No notification</yellow>")


def tui(app, user, args):
    """Create the TUI for the given app and user and run it."""
    # The TUI module is imported only once the command is actually invoked
    from .tui.app import TUI

    interface = TUI.create(app, user)
    interface.run()