# -*- coding: utf-8 -*-
"""Demo client for an HTTP query API with signature or token authentication.

Request parameters can optionally be AES-ECB encrypted (hex encoded) with a
shared secret, and the ``data`` field of the response is decrypted the same
way.
"""
import hashlib
import json
import time
from typing import Union

import requests
from Crypto.Cipher import AES

# AES always operates on 16-byte blocks; used for the padding computation.
_AES_BLOCK_SIZE = 16


def get_sign(ts: Union[str, int], app_key: str, app_secret: str) -> str:
    """Return the request signature.

    The signature is the hexadecimal MD5 digest of the UTF-8 encoding of
    ``app_key + app_secret + str(ts)``.
    """
    payload = app_key + app_secret + str(ts)
    return hashlib.md5(payload.encode(encoding='utf-8')).hexdigest()


def aesEcbEncrypt(key: str, src: str) -> str:
    """Encrypt ``src`` with AES in ECB mode and return the ciphertext as hex.

    ``src`` is UTF-8 encoded and padded to a multiple of 16 bytes, every
    padding byte holding the padding length (a full block of padding is
    appended when the data is already aligned).

    ``key`` is UTF-8 encoded and handed to ``AES.new`` unchanged, so its
    encoded length has to be one that the AES implementation accepts.

    NOTE(review): ECB mode leaks plaintext patterns; it is kept because the
    cipher mode must match what the server expects.
    """
    raw = src.encode('utf-8')
    pad_len = _AES_BLOCK_SIZE - len(raw) % _AES_BLOCK_SIZE
    padded = raw + bytes([pad_len]) * pad_len
    cipher = AES.new(key.encode('utf-8'), AES.MODE_ECB)
    return cipher.encrypt(padded).hex()


def aesEcbDecrypt(key: str, src: str) -> str:
    """Decrypt hex-encoded AES-ECB ciphertext ``src`` and return the text.

    The last decrypted byte is taken as the padding length and that many
    trailing bytes are removed before decoding the rest as UTF-8.  The
    padding bytes themselves are not validated.
    """
    cipher = AES.new(key.encode('utf-8'), AES.MODE_ECB)
    decrypted = cipher.decrypt(bytes.fromhex(src))
    # ord() on the one-byte tail slice yields the padding length.
    pad_len = ord(decrypted[-1:])
    return decrypted[:-pad_len].decode('utf-8')


def _build_query(param, secret, enc):
    """Return the query parameters to send: ``param`` or its encrypted form."""
    if not enc:
        return param
    pdata = json.dumps(param, separators=(',', ':'), ensure_ascii=False)
    return {
        'encrypt_data': aesEcbEncrypt(secret, pdata),
    }


def _send_and_report(apiUrl, params, headers, secret, enc):
    """GET ``apiUrl`` and print the response, decrypting ``data`` if ``enc``."""
    # The session is used as a context manager so its connections are released.
    # NOTE(review): verify=False disables TLS certificate verification; it is
    # kept from the original code but should be reconsidered for HTTPS URLs.
    with requests.session() as session:
        res = session.get(apiUrl, params=params, headers=headers, verify=False)

    if res.status_code != 200:
        # The original printed ``r.status_code``; ``r`` was never defined, so
        # every non-200 response raised NameError instead of being reported.
        print(res.status_code)
        return

    print('origin res is:', res.text)
    jdata = res.json()
    if jdata.get('data'):
        print(jdata['data'])
        if enc:
            print('need decrypt')
            ret = aesEcbDecrypt(secret, jdata['data'])
            print(ret)


def apiVisitBySign(apiUrl, param, secret, enc, appkey):
    """Query the API using signature authentication and print the result.

    Args:
        apiUrl: URL of the query endpoint.
        param: dict of query parameters.
        secret: shared secret, used for the signature and as the AES key.
        enc: when true, send ``param`` encrypted as ``encrypt_data`` and
            decrypt the ``data`` field of the response.
        appkey: application key sent in the ``appkey`` header and mixed
            into the signature.

    Returns:
        None.  The status code (on non-200) or the response is printed.
    """
    params = _build_query(param, secret, enc)

    ts = str(int(time.time()))
    sign = get_sign(ts, appkey, secret)
    headers = {
        'Content-Type': 'application/json',
        'timestamp': ts,
        "appkey": appkey,
        "sign": sign
    }
    _send_and_report(apiUrl, params, headers, secret, enc)


def apiVisitByToken(apiUrl, param, secret, enc, token):
    """Query the API using token authentication and print the result.

    Args:
        apiUrl: URL of the query endpoint.
        param: dict of query parameters.
        secret: shared secret used as the AES key when ``enc`` is true.
        enc: when true, send ``param`` encrypted as ``encrypt_data`` and
            decrypt the ``data`` field of the response.
        token: value sent in the ``token`` header.

    Returns:
        None.  The status code (on non-200) or the response is printed.
    """
    params = _build_query(param, secret, enc)
    headers = {
        'Content-Type': 'application/json',
        'token': token,
    }
    _send_and_report(apiUrl, params, headers, secret, enc)


def getToken(tokenUrl, user, password):
    """Request an access token.

    Returns:
        ``''`` when the HTTP status is not 200; otherwise the value of the
        ``token`` key of the JSON response (``None`` if the key is absent).
    """
    params = {
        'user': user,
        'password': password,
    }
    headers = {
        'Content-Type': 'application/json',
    }
    with requests.session() as session:
        res = session.get(tokenUrl, params=params, headers=headers, verify=False)

    if res.status_code != 200:
        print(res.status_code)
        return ''
    print('token origin res is', res.text)
    jdata = res.json()
    return jdata.get('token')


def main():
    """Run the demo query with the configuration below."""
    # Fill in the configuration
    tokenUrl = 'http://127.0.0.1:41002/api/v1/token'
    apiUrl = 'http://127.0.0.1:41002/api/v1/query/2001'
    user = '529db83441acff61a054eba562185515'
    password = 'DMh7lbyv'
    secret = '1749f2a7019db090ca7c9cc69e64033c'

    param = {
        'plate_no': '川Y01M21',
        'plate_type': '02',
        'owner': "吴军"
    }

    tokenType = False
    if tokenType:
        token = getToken(tokenUrl, user, password)
        # getToken returns '' on HTTP failure and None when the response has
        # no 'token' key; in both cases the query must not be attempted.
        if not token:
            print('token get failed')
            return
        apiVisitByToken(apiUrl, param, secret, True, token)
    else:
        apiVisitBySign(apiUrl, param, secret, True, user)


if __name__ == '__main__':
    main()