钉钉API文档:https://ding-doc.dingtalk.com/doc#/serverapi2/skn8ld
钉钉有回调事件流程,有哪些回调?比如:通讯录回调、审批回调等等,拿通讯录回调来说,就是当你公司组织架构发生变动时,会自动触发你自己注册的回调地址,然后根据回调信息做一些自定义的处理,不得不说,钉钉真的是解决了协同办公的很多问题,非常nice,但就回调事件来说,每个企业只能注册一个回调地址,即使你要监听的是不同的事件、即使还有其他业务线需要用到回调,也只能不多于一个回调,当然这都是出于人家服务多方面考虑的设计,废话不多说,
好,流程:
一个注册端向钉钉发送注册请求,钉钉callback你自己的服务url,必须是公网访问的地址,并且该地址返回钉钉json数据来告诉钉钉回调成功
注册成功之后这个地址就可以接收钉钉的回调post请求,做一些自定义处理,记得返回json
好,代码
1、注册端
- mport requests, json
-
-
- class DingDingCallBack(object):
- def __init__(self):
- self.appsecret=''
- self.appkey=''
- self.api_url = "https://oapi.dingtalk.com/gettoken?appkey=%s&appsecret=%s"%(self.appkey,self.appsecret)
- self.aes_key = ""
- self.callbackUrl = "回调url"
- self.headers = {
- 'Content-Type': 'application/json',
- 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.96 Safari/537.36',
- 'username':'zhangsan',
- 'password':'123456'
- }
-
- def get_token(self):
- res = requests.get(self.api_url)
- if res.status_code == 200:
- str_res = res.text
- token = (json.loads(str_res)).get('access_token')
- return token
-
- def regist_call_back(self):
- url = 'https://oapi.dingtalk.com/call_back/register_call_back?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
- data = {
- "call_back_tag": ["bpms_task_change", "bpms_instance_change"], # 审批回调
- "token": "qxN3cm",
- "aes_key": self.aes_key,
- "url":self.callbackUrl,
- }
- data1 = json.dumps(data)
- response = requests.post(url, headers=self.headers, data=data1)
- print(response)
- print(response.text)
-
- def query_callback(self):
- url = 'https://oapi.dingtalk.com/call_back/get_call_back?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
- response = requests.get(url, headers=self.headers, )
- print(response)
- print(response.text)
-
-
- def update_call_back(self):
- url = 'https://oapi.dingtalk.com/call_back/update_call_back?access_token={}'.format(self.get_token())
- data = {
- "call_back_tag": ["bpms_task_change", "bpms_instance_change","user_add_org","user_leave_org","org_dept_create","org_dept_modify","org_dept_remove"],
- "token": "自定义字符串",
- "aes_key": self.aes_key,
- "url":self.callbackUrl
- }
- data1 = json.dumps(data)
- response = requests.post(url, headers=self.headers, data=data1)
- print(response)
- print(response.text)
-
- def get_fail(self):
- url = 'https://oapi.dingtalk.com/call_back/get_call_back_failed_result?access_token={}'.format(self.get_token())
- response = requests.get(url, headers=self.headers, )
- print(response.text)# todo 删除回调
- def delete_callback(self):
- url = 'https://oapi.dingtalk.com/call_back/delete_call_back?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
- result = requests.get(url)
- print(result)
- print(result.text)
-
- #
- if __name__ == '__main__':
- dingOBJ = DingDingCallBack()
- # todo 获取钉钉token
- # # dingOBJ.get_token()
- # todo 获取回调失败
- # dingOBJ.get_fail()
- # todo 注册回调
- # dingOBJ.regist_call_back()
- # todo 查询回调事件
- dingOBJ.query_callback()
- # todo 修改回调事件
- # dingOBJ.update_call_back()
- # todo 删除回调
- # dingOBJ.delete_callback()
2、回调端:以下示例代码为python2,django 框架
- # -*- coding:utf-8 -*-
-
- from django.shortcuts import render
- from django.http import JsonResponse,HttpResponse
- from django.views.generic import View
- from django.views.decorators.csrf import csrf_exempt
- from DingCrypto import DingCrypto
- import random,string,time,json,requests,simplejson
- # Create your views here.from ast import literal_eval
-
-
- encodingAesKey = ''
- key = ''
- token = '自定义字符串'
- appsecret = ''
- appkey = ''
-
- api_url = "https://oapi.dingtalk.com/gettoken?appkey=%s&appsecret=%s"%(appkey,appsecret)
-
-
- def randam_string(n):
- ran_str = ''.join(random.sample(string.ascii_letters + string.digits, n))
- return ran_str
-
- # 注册回调
- @csrf_exempt
- def workOrderCallback(request):
- if request.method == 'GET':
- return JsonResponse({'code':200,'msg':'ok'})
-
-
- if request.method == 'POST':
- print request.GET
- dingCrypto = DingCrypto(encodingAesKey,key)
- nonce = randam_string(8)
- timestamp = str(int(round(time.time())))
- encrpyt = dingCrypto.encrypt('success')
- # print nonce,timestamp,token,encrpyt
- signature = dingCrypto.generateSignature(nonce=nonce,timestamp=timestamp,token=token,msg_encrypt=encrpyt)
- new_data = {
- 'data': {
- 'msg_signature': signature,
- 'timeStamp': timestamp,
- 'nonce': nonce,
- 'encrypt': encrpyt
- },
- }
-
-
- encrpyt11 = dingCrypto.decrypt(encrpyt)
-
- return JsonResponse(new_data.get('data'))
-
- # 响应回调
- class CallBack(View):
- def get(self,request,*args,**kwargs):
- return JsonResponse({'code':200,'msg':'ok'})
-
- def get_token(self):
- res = requests.get(api_url)
- if res.status_code == 200:
- str_res = res.text
- token = (json.loads(str_res)).get('access_token')
- return token
-
-
- def post(self,request,*args,**kwargs):
-
- # data = request.GET
-
- dingCrypto = DingCrypto(encodingAesKey, key)
- nonce = randam_string(8)
- timestamp = str(int(round(time.time())))
- encrpyt = dingCrypto.encrypt('success')
- # signature = dingCrypto.generateSignature(nonce=nonce,timestamp=timestamp,token=token,msg_encrypt=encrpyt)
- callback = json.loads(dingCrypto.decrypt(json.loads(request.body).get('encrypt')))
- if callback['EventType'] == 'bpms_instance_change': # 审批实例开始,结束
- url = 'https://oapi.dingtalk.com/topapi/processinstance/get?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
- instace_ = {
- "process_instance_id": callback['processInstanceId']
- }
- data2 = json.dumps(instace_)
- req = requests.post(url,data=data2)
- data = literal_eval(str(req.text)).get('process_instance')
- excute_workorder(callback['processInstanceId'],data)
- elif callback['EventType'] == 'bpms_task_change': # 审批任务开始,结束,转交
- print 'bpms_task_change'
- elif callback['EventType'] == 'user_add_org':
- print '用户增加'elif callback['EventType'] == 'user_leave_org':
- print '用户离职'
- elif callback['EventType'] == 'org_dept_create':
- print '组织架构添加'elif callback['EventType'] == 'org_dept_modify':
- print '组织架构变更'elif callback['EventType'] == 'org_dept_remove':
- print '组织架构删除'return HttpResponse(encrpyt)
到此这篇关于python注册钉钉回调事件的实现的文章就介绍到这了,更多相关python注册钉钉回调事件内容请搜索w3xue以前的文章或继续浏览下面的相关文章希望大家以后多多支持w3xue!