Verified Commit 266608ac authored by Frederik Enste's avatar Frederik Enste
Browse files

first working version

parent 6337181e
#!/bin/python3
# This script is maintained by https://gitlab.teco.edu/it/debian/teco-pve-autoremove-subscription-message
import pprint
import ldap as ldaplib
from argparse import ArgumentParser
from pprint import PrettyPrinter
from ldap import initialize, SCOPE_SUBTREE
from ldap.ldapobject import LDAPObject
from proxmoxer import ProxmoxAPI
log = pprint.PrettyPrinter(indent=4)
pp = PrettyPrinter(indent=4)
ldap_uri = 'ldaps://ldap.teco.edu:636'
ldap_base = 'dc=teco,dc=edu'
ldap_users = 'ou=People'
ldap_groups = 'ou=Group'
proxmox_user = 'sync@pve'
proxmox_password = 'password'
proxmox_host = 'proxmox.teco.edu'
proxmox_port = '443'
proxmox_ssl_check = True
proxmox_ldap_realm = 'ldap'
proxmox_ldap_managed_groups = ['IT', 'Staff', 'Students']
ldap_group_mapping = {
# 'administrators': 'IGNORE',
'bachelor': 'Students',
# 'bots': 'IGNORE',
'hiwis': 'Students',
'it': 'IT',
'master': 'Students',
'media': 'Students',
'pdf': 'Students',
# 'sekretariat': 'Staff',
# 'ssh': 'IGNORE',
'staff': 'Staff',
# 'teco': 'IGNORE',
}
dry_run = False
be_verbose = False
def log(msg):
if isinstance(msg, str):
print(msg)
else:
pp.pprint(msg)
def verbose(msg):
if be_verbose:
log(msg)
def login():
global dry_run, be_verbose
parser = ArgumentParser(description='Synchronizes the TECO LDAP to the Proxmox User database.')
parser.add_argument('-u', '--user',
dest='user',
required=True,
help='Required argument. Specify the user for Proxmox. '
'The user needs at least PVEUserAdmin role for /access.')
parser.add_argument('-r', '--realm',
dest='realm',
required=True,
help='The Login Realm of the previously specified user')
parser.add_argument('-p', '--password',
dest='password',
metavar='PASS',
required=True,
help='The password for the previously specified user')
parser.add_argument('-n', '--dry-run',
dest='dry_run',
action='store_const',
const=True,
default=False,
help='only list differences instead of applying them')
parser.add_argument('-v', '--verbose',
dest='verbose',
action='store_const',
const=True,
default=False,
help='explain what is being done')
args = parser.parse_args()
dry_run = args.dry_run
be_verbose = args.verbose
verbose('Connecting to LDAP...')
ldap = initialize(ldap_uri,
bytes_mode=False)
verbose('done.')
verbose('Connecting to Proxmox...')
proxmox = ProxmoxAPI(host=proxmox_host,
backend='https',
port=proxmox_port,
user='{}@{}'.format(args.user, args.realm),
password=args.password,
verify_ssl=proxmox_ssl_check)
verbose('done.')
return ldap, proxmox
def get_ldap_groups(ldap: LDAPObject):
verbose('Requesting LDAP group list...')
result = ldap.search_s('{},{}'.format(ldap_groups, ldap_base),
SCOPE_SUBTREE)
verbose('received.')
groups = {}
for (id_str, obj) in result:
if not id_str.startswith('cn='):
continue
name = obj['cn'][0].decode()
if name not in ldap_group_mapping:
continue
users = list(uid.decode() for uid in obj['memberUid'])
groups[name] = users
verbose(groups)
return groups
def get_ldap_users(ldap: LDAPObject, ldap_groups: dict):
verbose('Requesting LDAP user list...')
result = ldap.search_s('{},{}'.format(ldap_users, ldap_base),
SCOPE_SUBTREE)
verbose('received.')
users = {}
for (id_str, obj) in result:
if not id_str.startswith('uid='):
continue
uid = obj['uid'][0].decode()
groups = []
for group in ldap_groups:
if uid in ldap_groups[group]:
groups.append(group)
users[uid] = obj
users[uid]['groups'] = groups
verbose(users)
return users
def get_proxmox_users(proxmox: ProxmoxAPI):
verbose('Requesting Proxmox user list...')
result = proxmox.access.users.get(full=1)
users = {}
for user in result:
if user['userid'].split('@')[1] != proxmox_ldap_realm:
continue
users[user['userid'].split('@')[0]] = {
'userid': user['userid'],
'enable': user.get('enable', 0),
'lastname': user.get('lastname', None),
'firstname': user.get('firstname', None),
'email': user.get('email', None),
'groups': user.get('groups', None)
}
verbose('received.')
verbose(users)
return users
def convert_ldap_users_to_proxmox(users: dict):
proxmox_users = {}
verbose('Converting LDAP user list to proxmox style...')
for user in users:
lastname = users[user].get('sn', [b''])[0].decode()
firstname = users[user].get('givenName', [b''])[0].decode()
groups = []
for group in users[user].get('groups', []):
mapped = ldap_group_mapping[group]
if mapped not in groups:
groups.append(mapped)
mail = users[user].get('mail', [b''])[0].decode()
proxmox_users[user] = {
'userid': '{}@{}'.format(user, proxmox_ldap_realm),
'enable': 1 if users[user].get('groups', False) else 0,
'lastname': lastname.strip() if lastname and firstname else None,
'firstname': firstname.strip() if lastname and firstname else None,
'email': mail.strip() if mail else None,
'groups': ','.join(groups)
}
verbose('done.')
verbose(proxmox_users)
return proxmox_users
ldap = ldaplib.initialize(ldap_uri)
proxmox = ProxmoxAPI(host='192.168.122.11',
backend='https',
user=proxmox_user,
password=proxmox_password,
verify_ssl=False)
def sync_add_user(proxmox: ProxmoxAPI, user: dict):
log('Adding {}...'.format(user['userid']))
log(user)
if not dry_run:
proxmox.access.users.create(userid=user['userid'],
enable=user['enable'],
lastname=user['lastname'],
firstname=user['firstname'],
email=user['email'],
groups=user['groups'])
return None
def get_ldap_users():
result = connection.search_s(ldap_base,
ldap.SCOPE_SUBTREE)
log.pprint(result)
def sync_update_user(proxmox: ProxmoxAPI, user: dict, reference: dict):
user_groups = user['groups'].split(',') if user['groups'] else []
reference_groups = reference['groups'].split(',') if reference['groups'] else []
new_groups = []
for group in user_groups:
if group in reference_groups:
new_groups.insert(reference_groups.index(group), group)
for group in reference_groups:
if group not in proxmox_ldap_managed_groups:
new_groups.insert(reference_groups.index(group), group)
user['groups'] = ','.join(new_groups)
equal = True
for key in user:
if user[key] != reference[key]:
equal = False
break
if not equal:
log('Updating {}...'.format(user['userid']))
log(reference)
log('to')
log(user)
if not dry_run:
proxmox.access.users.set(user['userid'],
enable=user['enable'],
lastname=user['lastname'],
firstname=user['firstname'],
email=user['email'],
groups=user['groups'])
return None
def get_ldap_groups():
connection = ldap.initialize(ldap_uri)
result = connection.search_s(ldap_base,
ldap.SCOPE_SUBTREE)
log.pprint(result)
def sync_delete_user(proxmox: ProxmoxAPI, user: dict):
log('Deleting {}...'.format(user['userid']))
if not dry_run:
proxmox.access.users.delete(user['userid'])
def get_proxmox_users():
log.pprint(proxmox.access.users.get())
log.pprint(proxmox.access.groups.get())
def sync(proxmox: ProxmoxAPI, ldap_user_dict: dict, proxmox_user_dict: dict):
verbose('Syncing LDAP users to Proxmox (dry_run: {})...'.format(dry_run))
for user in ldap_user_dict:
if user not in proxmox_user_dict:
sync_add_user(proxmox, ldap_user_dict[user])
else:
sync_update_user(proxmox, ldap_user_dict[user], proxmox_user_dict[user])
del proxmox_user_dict[user]
for user in proxmox_user_dict:
sync_delete_user(proxmox, proxmox_user_dict[user])
verbose('done.')
def main():
get_proxmox_users()
(ldap, proxmox) = login()
ldap_group_dict: dict = get_ldap_groups(ldap)
ldap_user_dict: dict = get_ldap_users(ldap, ldap_group_dict)
proxmox_user_dict: dict = get_proxmox_users(proxmox)
ldap_user_dict: dict = convert_ldap_users_to_proxmox(ldap_user_dict)
sync(proxmox, ldap_user_dict, proxmox_user_dict)
if __name__ == '__main__':
......
# Make this match the current debian package version
python-ldap>=3.2.0
proxmoxer>=1.1.1
requests #needed for proxmoxer
\ No newline at end of file
python-ldap>=3.2.0 # This should be equal to https://packages.debian.org/buster/python3-ldap
requests>=2.21.0 # This should be equal to https://packages.debian.org/buster/python3-requests
proxmoxer>=1.0.3 # This should be equal to https://packages.debian.org/buster/python3-proxmoxer
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment