经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Python » 查看文章
python注册钉钉回调事件的实现
来源:jb51  时间:2021/8/9 13:56:52  对本文有异议

钉钉API文档:https://ding-doc.dingtalk.com/doc#/serverapi2/skn8ld

钉钉有回调事件流程,有哪些回调?比如:通讯录回调、审批回调等等,拿通讯录回调来说,就是当你公司组织架构发生变动时,会自动触发你自己注册的回调地址,然后根据回调信息做一些自定义的处理,不得不说,钉钉真的是解决了协同办公的很多问题,非常nice,但就回调事件来说,每个企业只能注册一个回调地址,即使你要监听的是不同的事件、即使还有其他业务线需要用到回调,也只能不多于一个回调,当然这都是出于人家服务多方面考虑的设计,废话不多说,

好,流程:

  一个注册端向钉钉发送注册请求,钉钉callback你自己的服务url,必须是公网访问的地址,并且该地址返回钉钉json数据来告诉钉钉回调成功

  注册成功之后这个地址就可以接收钉钉的回调post请求,做一些自定义处理,记得返回json

好,代码

1、注册端

  1. mport requests, json
  2.  
  3.  
  4. class DingDingCallBack(object):
  5. def __init__(self):
  6. self.appsecret=''
  7. self.appkey=''
  8. self.api_url = "https://oapi.dingtalk.com/gettoken?appkey=%s&appsecret=%s"%(self.appkey,self.appsecret)
  9. self.aes_key = ""
  10. self.callbackUrl = "回调url"
  11. self.headers = {
  12. 'Content-Type': 'application/json',
  13. 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.96 Safari/537.36',
  14. 'username':'zhangsan',
  15. 'password':'123456'
  16. }
  17.  
  18. def get_token(self):
  19. res = requests.get(self.api_url)
  20. if res.status_code == 200:
  21. str_res = res.text
  22. token = (json.loads(str_res)).get('access_token')
  23. return token
  24.  
  25. def regist_call_back(self):
  26. url = 'https://oapi.dingtalk.com/call_back/register_call_back?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
  27. data = {
  28. "call_back_tag": ["bpms_task_change", "bpms_instance_change"], # 审批回调
  29. "token": "qxN3cm",
  30. "aes_key": self.aes_key,
  31. "url":self.callbackUrl,
  32. }
  33. data1 = json.dumps(data)
  34. response = requests.post(url, headers=self.headers, data=data1)
  35. print(response)
  36. print(response.text)
  37.  
  38. def query_callback(self):
  39. url = 'https://oapi.dingtalk.com/call_back/get_call_back?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
  40. response = requests.get(url, headers=self.headers, )
  41. print(response)
  42. print(response.text)
  43.  
  44.  
  45. def update_call_back(self):
  46. url = 'https://oapi.dingtalk.com/call_back/update_call_back?access_token={}'.format(self.get_token())
  47. data = {
  48. "call_back_tag": ["bpms_task_change", "bpms_instance_change","user_add_org","user_leave_org","org_dept_create","org_dept_modify","org_dept_remove"],
  49. "token": "自定义字符串",
  50. "aes_key": self.aes_key,
  51. "url":self.callbackUrl
  52. }
  53. data1 = json.dumps(data)
  54. response = requests.post(url, headers=self.headers, data=data1)
  55. print(response)
  56. print(response.text)
  57.  
  58. def get_fail(self):
  59. url = 'https://oapi.dingtalk.com/call_back/get_call_back_failed_result?access_token={}'.format(self.get_token())
  60. response = requests.get(url, headers=self.headers, )
  61. print(response.text)# todo 删除回调
  62. def delete_callback(self):
  63. url = 'https://oapi.dingtalk.com/call_back/delete_call_back?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
  64. result = requests.get(url)
  65. print(result)
  66. print(result.text)
  67.  
  68. #
  69. if __name__ == '__main__':
  70. dingOBJ = DingDingCallBack()
  71. # todo 获取钉钉token
  72. # # dingOBJ.get_token()
  73. # todo 获取回调失败
  74. # dingOBJ.get_fail()
  75. # todo 注册回调
  76. # dingOBJ.regist_call_back()
  77. # todo 查询回调事件
  78. dingOBJ.query_callback()
  79. # todo 修改回调事件
  80. # dingOBJ.update_call_back()
  81. # todo 删除回调
  82. # dingOBJ.delete_callback()

2、回调端:以下示例代码为python2,django 框架

  1. # -*- coding:utf-8 -*-
  2.  
  3. from django.shortcuts import render
  4. from django.http import JsonResponse,HttpResponse
  5. from django.views.generic import View
  6. from django.views.decorators.csrf import csrf_exempt
  7. from DingCrypto import DingCrypto
  8. import random,string,time,json,requests,simplejson
  9. # Create your views here.from ast import literal_eval
  10.  
  11.  
  12. encodingAesKey = ''
  13. key = ''
  14. token = '自定义字符串'
  15. appsecret = ''
  16. appkey = ''
  17.  
  18. api_url = "https://oapi.dingtalk.com/gettoken?appkey=%s&appsecret=%s"%(appkey,appsecret)
  19.  
  20.  
  21. def randam_string(n):
  22. ran_str = ''.join(random.sample(string.ascii_letters + string.digits, n))
  23. return ran_str
  24.  
  25. # 注册回调
  26. @csrf_exempt
  27. def workOrderCallback(request):
  28. if request.method == 'GET':
  29. return JsonResponse({'code':200,'msg':'ok'})
  30.  
  31.  
  32. if request.method == 'POST':
  33. print request.GET
  34. dingCrypto = DingCrypto(encodingAesKey,key)
  35. nonce = randam_string(8)
  36. timestamp = str(int(round(time.time())))
  37. encrpyt = dingCrypto.encrypt('success')
  38. # print nonce,timestamp,token,encrpyt
  39. signature = dingCrypto.generateSignature(nonce=nonce,timestamp=timestamp,token=token,msg_encrypt=encrpyt)
  40. new_data = {
  41. 'data': {
  42. 'msg_signature': signature,
  43. 'timeStamp': timestamp,
  44. 'nonce': nonce,
  45. 'encrypt': encrpyt
  46. },
  47. }
  48.  
  49.  
  50. encrpyt11 = dingCrypto.decrypt(encrpyt)
  51.  
  52. return JsonResponse(new_data.get('data'))
  53.  
  54. # 响应回调
  55. class CallBack(View):
  56. def get(self,request,*args,**kwargs):
  57. return JsonResponse({'code':200,'msg':'ok'})
  58.  
  59. def get_token(self):
  60. res = requests.get(api_url)
  61. if res.status_code == 200:
  62. str_res = res.text
  63. token = (json.loads(str_res)).get('access_token')
  64. return token
  65.  
  66.  
  67. def post(self,request,*args,**kwargs):
  68.  
  69. # data = request.GET
  70.  
  71. dingCrypto = DingCrypto(encodingAesKey, key)
  72. nonce = randam_string(8)
  73. timestamp = str(int(round(time.time())))
  74. encrpyt = dingCrypto.encrypt('success')
  75. # signature = dingCrypto.generateSignature(nonce=nonce,timestamp=timestamp,token=token,msg_encrypt=encrpyt)
  76. callback = json.loads(dingCrypto.decrypt(json.loads(request.body).get('encrypt')))
  77. if callback['EventType'] == 'bpms_instance_change': # 审批实例开始,结束
  78. url = 'https://oapi.dingtalk.com/topapi/processinstance/get?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
  79. instace_ = {
  80. "process_instance_id": callback['processInstanceId']
  81. }
  82. data2 = json.dumps(instace_)
  83. req = requests.post(url,data=data2)
  84. data = literal_eval(str(req.text)).get('process_instance')
  85. excute_workorder(callback['processInstanceId'],data)
  86. elif callback['EventType'] == 'bpms_task_change': # 审批任务开始,结束,转交
  87. print 'bpms_task_change'
  88. elif callback['EventType'] == 'user_add_org':
  89. print '用户增加'elif callback['EventType'] == 'user_leave_org':
  90. print '用户离职'
  91. elif callback['EventType'] == 'org_dept_create':
  92. print '组织架构添加'elif callback['EventType'] == 'org_dept_modify':
  93. print '组织架构变更'elif callback['EventType'] == 'org_dept_remove':
  94. print '组织架构删除'return HttpResponse(encrpyt)

到此这篇关于python注册钉钉回调事件的实现的文章就介绍到这了,更多相关python注册钉钉回调事件内容请搜索w3xue以前的文章或继续浏览下面的相关文章希望大家以后多多支持w3xue!

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号