123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- # -*- coding: utf-8 -*-
- import json
- import requests
- import hashlib
- import time
- from typing import Union
- from Crypto.Cipher import AES
def get_sign(ts: Union[str, int], app_key: str, app_secret: str):
    """Return the request signature for the given timestamp and credentials.

    The signature is the hexadecimal MD5 digest of the UTF-8 encoded
    concatenation ``app_key + app_secret + str(ts)``.

    Args:
        ts: Timestamp, as a string or an integer; converted with ``str()``.
        app_key: Application key.
        app_secret: Application secret.

    Returns:
        The MD5 digest as a hex string.
    """
    payload = ''.join((app_key, app_secret, str(ts)))
    return hashlib.md5(payload.encode('utf-8')).hexdigest()
def aesEcbEncrypt(key, src):
    """Encrypt ``src`` with AES in ECB mode and return the ciphertext as hex.

    Args:
        key: AES key as text. It is UTF-8 encoded and handed to ``AES.new``,
            so its encoded length must be a valid AES key size.
        src: Plaintext string. It is UTF-8 encoded and padded to a multiple
            of 16 bytes before encryption.

    Returns:
        The ciphertext as a hex string.
    """
    block_size = 16
    plaintext = src.encode('utf-8')
    # PKCS#7-style padding computed on the encoded bytes: always append
    # between 1 and 16 bytes, each holding the pad length, so the decrypt
    # side can strip it by reading the final byte.
    pad_len = block_size - len(plaintext) % block_size
    padded = plaintext + bytes([pad_len]) * pad_len
    # NOTE(review): ECB leaks equal-plaintext-block patterns; it is kept here
    # because the ciphertext format must match what the remote API expects.
    cipher = AES.new(key.encode('utf-8'), AES.MODE_ECB)
    return cipher.encrypt(padded).hex()
def aesEcbDecrypt(key, src):
    """Decrypt a hex-encoded AES-ECB ciphertext and return the plaintext.

    Args:
        key: AES key as text. It is UTF-8 encoded and handed to ``AES.new``.
        src: Ciphertext as a hex string.

    Returns:
        The decrypted text, with the trailing padding removed and the
        remaining bytes decoded as UTF-8.

    Raises:
        ValueError: If ``src`` is not valid hex or decodes to no bytes.
    """
    ciphertext = bytes.fromhex(src)
    if not ciphertext:
        # Without at least one block there is no final byte to read the
        # padding length from.
        raise ValueError('ciphertext is empty, nothing to decrypt')
    cipher = AES.new(key.encode('utf-8'), AES.MODE_ECB)
    padded = cipher.decrypt(ciphertext)
    # The last byte holds the number of padding bytes appended on encryption.
    pad_len = padded[-1]
    return padded[:-pad_len].decode('utf-8')
def apiVisitBySign(apiUrl, param, secret, enc, appkey):
    """Query the API using sign-based authentication and print the result.

    The request is a GET carrying ``timestamp``, ``appkey`` and ``sign``
    headers, where the sign is computed by ``get_sign`` from the current
    Unix time, ``appkey`` and ``secret``.

    Args:
        apiUrl: URL of the API endpoint.
        param: Query parameters as a dict.
        secret: Shared secret, used both for the sign and as the AES key.
        enc: When true, ``param`` is serialized to compact JSON, encrypted
            with ``aesEcbEncrypt`` and sent as the single ``encrypt_data``
            query parameter; the ``data`` field of the response is then
            decrypted with ``aesEcbDecrypt`` before printing.
        appkey: Application key sent in the headers and mixed into the sign.

    Returns:
        None. The status code (on a non-200 response) or the response
        contents are printed to stdout.
    """
    params = param
    if enc:
        plaintext = json.dumps(param, separators=(',', ':'), ensure_ascii=False)
        params = {
            'encrypt_data': aesEcbEncrypt(secret, plaintext),
        }
    ts = str(int(time.time()))
    headers = {
        'Content-Type': 'application/json',
        'timestamp': ts,
        'appkey': appkey,
        'sign': get_sign(ts, appkey, secret),
    }
    # NOTE(review): verify=False turns off TLS certificate verification;
    # kept to preserve existing behaviour, but confirm it is intended.
    # The context manager closes the session's connections once the
    # (fully read) response has been received.
    with requests.session() as session:
        res = session.get(apiUrl, params=params, headers=headers, verify=False)
    if res.status_code != 200:
        print(res.status_code)
        return
    print('origin res is:', res.text)
    jdata = res.json()
    if jdata.get('data'):
        print(jdata['data'])
        if enc:
            print('need decrypt')
            ret = aesEcbDecrypt(secret, jdata['data'])
            print(ret)
def apiVisitByToken(apiUrl, param, secret, enc, token):
    """Query the API using token-based authentication and print the result.

    The request is a GET carrying the token in the ``token`` header.

    Args:
        apiUrl: URL of the API endpoint.
        param: Query parameters as a dict.
        secret: Shared secret used as the AES key when ``enc`` is true.
        enc: When true, ``param`` is serialized to compact JSON, encrypted
            with ``aesEcbEncrypt`` and sent as the single ``encrypt_data``
            query parameter; the ``data`` field of the response is then
            decrypted with ``aesEcbDecrypt`` before printing.
        token: Access token sent in the request headers.

    Returns:
        None. The status code (on a non-200 response) or the response
        contents are printed to stdout.
    """
    params = param
    if enc:
        plaintext = json.dumps(param, separators=(',', ':'), ensure_ascii=False)
        params = {
            'encrypt_data': aesEcbEncrypt(secret, plaintext),
        }
    headers = {
        'Content-Type': 'application/json',
        'token': token,
    }
    # NOTE(review): verify=False turns off TLS certificate verification;
    # kept to preserve existing behaviour, but confirm it is intended.
    # The context manager closes the session's connections once the
    # (fully read) response has been received.
    with requests.session() as session:
        res = session.get(apiUrl, params=params, headers=headers, verify=False)
    if res.status_code != 200:
        print(res.status_code)
        return
    print('origin res is:', res.text)
    jdata = res.json()
    if jdata.get('data'):
        print(jdata['data'])
        if enc:
            print('need decrypt')
            ret = aesEcbDecrypt(secret, jdata['data'])
            print(ret)
def getToken(tokenUrl, user, password):
    """Request an access token and return it, or ``''`` on failure.

    Sends a GET to ``tokenUrl`` with ``user`` and ``password`` as query
    parameters and reads the ``token`` field of the JSON response.

    Args:
        tokenUrl: URL of the token endpoint.
        user: User identifier sent as the ``user`` query parameter.
        password: Password sent as the ``password`` query parameter.

    Returns:
        The token from the response. An empty string is returned when the
        status code is not 200 or when the response carries no token, so
        callers have a single failure value to check.
    """
    params = {
        'user': user,
        'password': password,
    }
    headers = {
        'Content-Type': 'application/json',
    }
    # NOTE(review): verify=False turns off TLS certificate verification;
    # kept to preserve existing behaviour, but confirm it is intended.
    with requests.session() as session:
        res = session.get(tokenUrl, params=params, headers=headers, verify=False)
    if res.status_code != 200:
        print(res.status_code)
        return ''
    print('token origin res is', res.text)
    jdata = res.json()
    # A missing or empty token is normalised to '' to match the non-200 path.
    return jdata.get('token') or ''
if __name__ == '__main__':
    # Fill in the configuration below.
    # NOTE(review): the credentials are hard-coded sample values; confirm
    # they are not real secrets before committing or sharing this file.
    tokenUrl = 'http://127.0.0.1:41002/api/v1/token'
    apiUrl = 'http://127.0.0.1:41002/api/v1/query/2001'
    user = '529db83441acff61a054eba562185515'
    password = 'DMh7lbyv'
    secret = '1749f2a7019db090ca7c9cc69e64033c'
    param = {
        'plate_no': '川Y01M21',
        'plate_type': '02',
        'owner': "吴军"
    }
    # True: fetch a token first and call the API with it.
    # False: call the API with the timestamp/appkey/sign headers instead.
    tokenType = False
    if tokenType:
        token = getToken(tokenUrl, user, password)
        if not token:
            # Do not query the API with a token that was never obtained.
            print('token get failed')
        else:
            apiVisitByToken(apiUrl, param, secret, True, token)
    else:
        apiVisitBySign(apiUrl, param, secret, True, user)
|