邯城往事 邯城往事

>>> 展颜笑夙愿,一笑泯恩仇 <<<

目录
通过 python 获取钉钉后台人员
/    

通过 python 获取钉钉后台人员

通过钉钉官方接口可定时同步得知企业离职人员便于认证管理同步,大大简便了人员账号的管理。

请求带有参数的 API 接口

注意请求接口白名单情况
可参考钉钉开发文档:
创建应用:
image.png

image.png

代码示例:

import requests
import json
import sys
import os
corpId="35c2365db2f1378bfd74f96b7c7a529f"
corpSecret="13dc3517caa3da9bee5e7b6b2ead8938"

headers = {'Content-Type': 'application/json;charset=utf-8'}
api_url = "https://oapi.dingtalk.com/gettoken?appkey=%s≈psecret=%s"%(corpId,corpSecret)

def get_token():
    try:
        res = requests.get(api_url,headers=headers)
        if res.status_code == 200:
            str_res = res.text
            token = (json.loads(str_res)).get('access_token')
            return token
    except:
        print('请求失败')

if __name__ == "__main__":
    get_token()

数据不清洗,get 所有人员:

代码:

import requests
import json
import sys
import os
# import time
# print(time.strftime('%Y-%m-%d %H:%m:%S'))
corpId="35c2365db2f1378bfd74f96b7c7a529f"
corpSecret="13dc3517caa3da9bee5e7b6b2ead8938"

headers = {'Content-Type': 'application/json;charset=utf-8'}
api_url = "https://oapi.dingtalk.com/gettoken?appkey=%s≈psecret=%s"%(corpId,corpSecret)

def get_token():
    try:
        res = requests.get(api_url,headers=headers)
        if res.status_code == 200:
            str_res = res.text
            token = (json.loads(str_res)).get('access_token')
            return token
    except:
        print('请求失败')

def get_bumen(token):
    bumen_url = "https://oapi.dingtalk.com/department/list?access_token=%s&id=1"%(token)
    res_bumen = requests.get(bumen_url,headers=headers)
    str_bumen = res_bumen.text
    json_bumen = json.loads(str_bumen)
    res = json_bumen.get('department')
    bname_list = []
    bid_list = []
    for i in res:
        bumen_name = i.get('name')
        bname_list.append(bumen_name)
        bumen_id = i.get('id')
        bid_list.append(bumen_id)
    bumen_res = list(zip(bname_list,bid_list))
    bumen_data = dict(bumen_res)
## get_member
    mname = []
    mid = []
    for m in bid_list: #循环部门id
        member_url = "https://oapi.dingtalk.com/user/simplelist?access_token=%s&department_id=%s"%(token,m)
        res_member = requests.get(member_url,headers=headers)
        with open('./member_id.txt', 'a', encoding='utf-8') as f:
            f.write(res_member.text)

if __name__ == "__main__":
    key = get_token()
    get_bumen(key)

人名转拼音模块,暂不添加

import pypinyin
str = '崔建哲,中国,狸米,大傻,杨和苏'
pinyin_name = ''.join(pypinyin.lazy_pinyin(str))
print(pinyin_name)

分别请求部门详情部门成员详情角色详情,进行数据清洗,并且同步人员信息发送邮件

代码示例:

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#   power by  cuijianzhe  #
import requests
import json
import sys
import os
import xlsxwriter

#with open('pwd','r',encoding='utf-8') as f:
#    for k in f.readlines():
#        key = k.replace('\n', '')
#        key_list = key.split(',')

#corpId=key_list[0]
#corpSecret=key_list[1]
corpId="dingding"
corpSecret="dingding"
headers = {'Content-Type': 'application/json;charset=utf-8'}
api_url = "https://oapi.dingtalk.com/gettoken?appkey=%s&appsecret=%s"%(corpId,corpSecret)

workbook = xlsxwriter.Workbook('./user_list.xlsx') #创建一个ExceL 文档
worksheet = workbook.add_worksheet()  #创建一个工作对象
worksheet.set_column('A:E',20) #设定第一列Adap E40像素
#bold = workbook.add_format({'bold':True}) #加粗
bold = workbook.add_format({
        'bold':  True,  # 字体加粗
        'border': 1,  # 单元格边框宽度
        'align': 'center',  # 水平对齐方式
        'valign': 'vcenter',  # 垂直对齐方式
        'fg_color': '#F4B084',  # 单元格背景颜色
        'text_wrap': True,  # 是否自动换行
    })
bold1 = workbook.add_format({'align': 'center'})
heading = ['姓名','手机号','工号','职位']  #设置表头
worksheet.write_row('A1',heading,bold)

def get_token():
    try:
        res = requests.get(api_url,headers=headers)
        if res.status_code == 200:
            str_res = res.text
            token = (json.loads(str_res)).get('access_token')
            return token
    except:
        print('请求失败')

def get_bumen(token):

    bumen_url = "https://oapi.dingtalk.com/department/list?access_token=%s&id=1"%(token)
    res_bumen = requests.get(bumen_url,headers=headers)
    str_bumen = res_bumen.text
    json_bumen = json.loads(str_bumen)
    code = json_bumen.get('errcode')
    if code != 0:
        print('接口返回错误,{}'.format(json_bumen.get('errmsg')))
    res = json_bumen.get('department')
    bname_list = []
    bid_list = []
    for i in res:
        bumen_name = i.get('name')
        bname_list.append(bumen_name)
        bumen_id = i.get('id')
        bid_list.append(bumen_id)

    '''
    bumen_res = list(zip(bname_list,bid_list)) #将部门名称和id打包成list
    bumen_data = dict(bumen_res)
    get_member 获取成员组信息
    '''
    mname = []
    mid = []
    for m in bid_list: #循环部门id
        member_url = "https://oapi.dingtalk.com/user/simplelist?access_token=%s&department_id=%s"%(token,m)
        res_member = requests.get(member_url,headers=headers)
        # with open('./member_id.txt', 'a', encoding='utf-8') as f:
        #     f.write(res_member.text)
        json_member = json.loads(res_member.text)
        list_member = json_member.get('userlist')  #取userlist列表出来
        for l in list_member:  #循环遍历userlist,拎出id和name
            member_id = l.get('userid')
            mid.append(member_id)
            member_name = l.get('name')
            mname.append(member_name)
    member_res = list(zip(mname,mid))
    member_data = dict(member_res) #得到所有部门的人员name和id
    '''
    get成员详细信息
    '''
    user_list = []
    name_list = []
    post_list = []
    userid_list = []
    for n in member_data:
        minfo_url = "https://oapi.dingtalk.com/user/get?access_token=%s&userid=%s"%(token,member_data[n])
        res_minfo = requests.get(minfo_url,headers=headers)
        minfo_json = json.loads(res_minfo.text)

        for info in minfo_json:
            if info == 'mobile':
                user_list.append(minfo_json.get(info))
            if info == 'name':
                name_list.append(minfo_json.get(info))
            if info == 'position':
                post_list.append(minfo_json.get(info))
            if info == 'userid':
                userid_list.append(minfo_json.get(info))
    data = [
        name_list,
        user_list,
        userid_list,
        post_list,
    ]

    worksheet.write_column('A2', data[0], bold1)
    worksheet.write_column('B2', data[1], bold1)
    worksheet.write_column('C2', data[2], bold1)
    worksheet.write_column('D2', data[3], bold1)
    workbook.close()
    # user_data = str(list(zip(name_list,user_list,userid_list,post_list)))
    '''
    user_data = json.dumps(user_zip)
     此文件方式本地保存不能中文编码
    '''
    # with open('/scripts/limi/tongxunlu.txt','w',encoding='utf-8') as j:
    #     j.write(user_data)

    #以下进行人员信息同步
    online = []
    offline = []
    list_files = os.listdir('./')
    if "name_old.txt" in list_files:
        '''
        将文本中的内容转换成列表
        '''
        with open('./name_old.txt', 'r',encoding='utf-8') as d:
            s = d.read()
            h = s.split(',')
            for b in h:
                if b in name_list:
                    pass
                else:
                    offline.append(b)
            for on in name_list:
                if on in h:
                    pass
                else:
                    online.append(on)
    else:
        '''
        将name_list转换成str,保存到本地
        '''
        with open('./name_old.txt', 'w', encoding='utf-8') as m:
            for name in name_list:
                m.write(str(name))
                m.write(',')
       # os.rename('name.txt','name_old.txt')
    with open('./name_old.txt', 'r', encoding='utf-8') as o:
        s = o.read()
        old = s.split(',')
    with open('./offline.txt', 'w', encoding='utf-8') as z:
        for name in offline:
            z.write(str(name))
            z.write(',')
    with open('./online.txt', 'w', encoding='utf-8') as q:
        for name in online:
            q.write(str(name))
            q.write(',')
    '''
    更新old人员列表信息
    '''
    with open('./name_old.txt', 'w', encoding='utf-8') as m:
        for name in name_list:
            m.write(str(name))
            m.write(',')

    '''
    发送邮件模块
    '''
    import smtplib
    from email.mime.text import MIMEText
    from email.utils import formataddr
    from email.mime.multipart import MIMEMultipart
    import time
    date = time.strftime('%Y-%m-%d',time.localtime())
    my_sender = 'jianzhecui@163.com'
    my_pass = 'cuijianzhe'
    my_user = ['598941324@qq.com','111111111']
    with open('./offline.txt', 'r',encoding='utf-8') as d:
        s = d.read()
        list_s = s.split(',')
    with open('./online.txt', 'r',encoding='utf-8') as p:
        a = p.read()
        list_a = a.split(',')

    body = """
    日期:%s,昨日离职%s人,名单:(%s),入职%s个,名单:(%s),最近公司总人数%s个,现在公司总人数%s个.
    """%(date,len(list_s[:-2]),s.rstrip(','),len(list_a[:-1]),a.rstrip(','),len(old[:-1]),len(name_list))
    msg = MIMEMultipart()   #构造附件
    fujian = MIMEText(open('./user_list.xlsx','rb').read(),'base64','utf-8')
    fujian['Content-Type'] = 'application/octet-stream'
    fujian["Content-Disposition"] = 'attachment; filename="user_list.xlsx"'
    msg.attach(MIMEText(body, 'plain', 'utf-8'))
    msg.attach(fujian)
    msg['From'] = formataddr(["Cuijianzhe", my_sender])
    msg['To'] = ','.join(my_user)
    msg['Subject'] = '人员同步'
    server = smtplib.SMTP_SSL("smtp.163.com", 465)
    server.login(my_sender, my_pass)
    server.sendmail(my_sender, msg['To'].split(','), msg.as_string())
    server.quit()

if __name__ == "__main__":
    key = get_token()
    get_bumen(key)

同步一些工具可行可改可适配可认证……

查看邮件信息(附件构造为 Excel 或者 txt 文本文件):

image.png
image.png
image.png

搭讪 你就破功了,老弟!