~fnux/meta.sr.ht

ref: 3bad716474023aa367775f87683c3c2debc38e39 meta.sr.ht/metasrht/auth/ldap.py -rw-r--r-- 6.0 KiB
3bad7164Timothée Floure auth: handle LDAP connection failures 5 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import sys

from srht.config import cfg, cfgb
from srht.database import db
from srht.validation import Validation

from metasrht.audit import audit_log
from metasrht.auth.base import AuthMethod, get_user
from metasrht.auth_validation import *
from metasrht.types import User, UserType

# FIXME: we assume user DN is formatted as follow: uid=$username,$user_base.
# Should we make it configurable?

class LDAPAuthMethod(AuthMethod):
    def __init__(self):
        try:
            import ldap
        except ImportError:
            print(
                "could not import 'ldap', this is necessary for LDAP "
                "authentication; please install python-ldap or change "
                "'auth-method=ldap' in the configuration file.",
                file=sys.stderr)
            sys.exit(1)

        self.ldap = ldap
        self.server = cfg('meta.sr.ht::auth::ldap', 'server')
        self.user_base = cfg('meta.sr.ht::auth::ldap', 'user-base')
        self.create_users = cfgb('meta.sr.ht::auth::ldap', 'create-users')

        self.bind_dn = cfg('meta.sr.ht::auth::ldap', 'bind-dn')
        self.bind_pw = cfg('meta.sr.ht::auth::ldap', 'bind-password')

        # Initialize connection to LDAP server.
        try:
            self.conn = False
            self.conn = self._get_ldap_conn()
        except Exception as e:
            print("Could not query LDAP server {}: {}".format(self.server, e))

    def _get_ldap_conn(self, retry=True):
        if not self.conn:
            self.conn = self.ldap.initialize(self.server)
            self.conn.protocol_version = self.ldap.VERSION3

        # Check if the connection is still alive - it would have been cut if
        # the LDAP server restarted. We try again once, and raise an exception
        # if it does not work.
        try:
            self.conn.simple_bind_s(self.bind_dn, self.bind_pw)
        except self.ldap.SERVER_DOWN as e:
            if retry:
                self.conn = False
                self._get_ldap_conn(False)
            else:
                raise(e)

        return self.conn

    def get_dn_for(self, username: str) -> str:
        return "uid={},{}".format(username, self.user_base)

    def ldap_get_user(self, username: str) -> tuple:
        try:
            ldap_search_filter = "uid={}".format(username)
            import_attributes = ['uid', 'mail']
            ldap_match = self._get_ldap_conn().search_s(
                    self.user_base,
                    self.ldap.SCOPE_SUBTREE,
                    ldap_search_filter,
                    import_attributes)

            if len(ldap_match) != 1:
                # Either no match or multiple ones - we don't want to deal with
                # this result.
                return None
            else:
                (dn, ldap_entry) = ldap_match[0]
                uid = ldap_entry['uid'][0].decode("utf-8").lower()
                mail = ldap_entry['mail'][0].decode("utf-8").strip()
                return (uid, mail)

        except Exception as e:
            print("Something went wrong during user LDAP lookup: {}".format(e))
            return None

    def user_valid(self, valid: Validation, username: str, password: str) \
            -> bool:
        user = get_user(username)

        if user is None:
            ldap_match = self.ldap_get_user(username)

            # Not user matching this username in database.
            if not ldap_match:
                valid.error('Username or password incorrect')
                return False

            # Since users will get auto-created here (in prepare_user), validate
            # the username and emails to ensure valid entries in the database.
            (uid, email) = ldap_match
            valid_dummy = Validation({})

            validate_username(valid_dummy, username)
            validate_email(valid_dummy, email)

            if not valid_dummy.ok:
                valid.error('Username or password incorrect')
                return False

            if not self.create_users:
                valid.error('Username or password incorrect')
                return False
        else:
            # Make sure we're using the actual user name for LDAP authentication,
            # even when the user logs in with the email address
            username = user.username


        # Query LDAP server.
        try:
            self._get_ldap_conn().simple_bind_s(self.get_dn_for(username), password)
        except self.ldap.INVALID_CREDENTIALS:
            valid.error('Username or password incorrect')
            return False
        except Exception as e:
            valid.error('Something went wrong while querying the '
                    'authentication backend. Please try again later or contact '
                    'your administrator.')
            return False

        return True

    def prepare_user(self, username: str) -> User:
        user = get_user(username)

        if user is None:
            assert self.create_users, ("tried to call prepare_user for an user"
            "that doesn't exist, and create_users is false")

            ldap_match = self.ldap_get_user(username)
            (uid, email) = ldap_match
            user = self.create(username, email)

        return user

    def create(self, username: str, email: str) -> User:
        user = User(username)
        user.email = email
        user.password = ''
        user.invites = cfg("meta.sr.ht::settings", "user-invites", default=0)

        user.confirmation_hash = None
        user.user_type = UserType.active_non_paying

        db.session.add(user)
        db.session.commit()

        audit_log("account created", user=user)

        return user

    def set_user_email(self, user: User, email: str) -> bool:
        ldif = [(self.ldap.MOD_REPLACE, 'mail', str.encode(email))]
        self._get_ldap_conn().modify_s(self.get_dn_for(user.username), ldif)

        user.email = email
        db.session.commit()

    def set_user_password(self, user: User, password: str) -> bool:
        audit_log("password reset", user=user)
        self._get_ldap_conn().passwd_s(self.get_dn_for(user.username), None, password)