|
@@ -1,8 +1,17 @@
|
|
# -*- coding: utf-8 -*-
|
|
# -*- coding: utf-8 -*-
|
|
import json
|
|
import json
|
|
import requests
|
|
import requests
|
|
|
|
+import hashlib
|
|
|
|
+import time
|
|
|
|
+from typing import Union
|
|
from Crypto.Cipher import AES
|
|
from Crypto.Cipher import AES
|
|
|
|
|
|
|
|
+def get_sign(ts: Union[str, int], app_key: str, app_secret: str):
|
|
|
|
+ ts = str(ts)
|
|
|
|
+ hl = hashlib.md5()
|
|
|
|
+ p = app_key + app_secret + ts
|
|
|
|
+ hl.update(p.encode(encoding='utf-8'))
|
|
|
|
+ return hl.hexdigest()
|
|
|
|
|
|
def aesEcbEncrypt(key, src):
|
|
def aesEcbEncrypt(key, src):
|
|
size = 16
|
|
size = 16
|
|
@@ -23,13 +32,45 @@ def aesEcbDecrypt(key, src):
|
|
trimpadData = padtrim(decData)
|
|
trimpadData = padtrim(decData)
|
|
return trimpadData.decode('utf-8')
|
|
return trimpadData.decode('utf-8')
|
|
|
|
|
|
-def apiVisit(apiUrl, param, secret, enc, token):
|
|
|
|
|
|
+def apiVisitBySign(apiUrl, param, secret, enc, appkey):
|
|
pdata = json.dumps(param, separators=(',', ':'), ensure_ascii=False)
|
|
pdata = json.dumps(param, separators=(',', ':'), ensure_ascii=False)
|
|
params = param
|
|
params = param
|
|
if enc:
|
|
if enc:
|
|
pdata = aesEcbEncrypt(secret, pdata)
|
|
pdata = aesEcbEncrypt(secret, pdata)
|
|
- params = {
|
|
|
|
- 'encrypt_data': pdata
|
|
|
|
|
|
+ params = {
|
|
|
|
+ 'encrypt_data': pdata,
|
|
|
|
+ }
|
|
|
|
+ ts = str(int(time.time()))
|
|
|
|
+ sign = get_sign(ts, appkey, secret)
|
|
|
|
+ headers = {
|
|
|
|
+ 'Content-Type': 'application/json',
|
|
|
|
+ 'timestamp': ts,
|
|
|
|
+ "appkey":appkey,
|
|
|
|
+ "sign":sign
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ session = requests.session()
|
|
|
|
+ res = session.get(apiUrl, params=params, headers=headers, verify=False)
|
|
|
|
+ if res.status_code != 200:
|
|
|
|
+ print(r.status_code)
|
|
|
|
+ return
|
|
|
|
+ print('origin res is:', res.text)
|
|
|
|
+ jdata = res.json()
|
|
|
|
+ if jdata.get('data'):
|
|
|
|
+ print(jdata['data'])
|
|
|
|
+ if enc:
|
|
|
|
+ print('need decrypt')
|
|
|
|
+ ret = aesEcbDecrypt(secret, jdata['data'])
|
|
|
|
+ print(ret)
|
|
|
|
+
|
|
|
|
+def apiVisitByToken(apiUrl, param, secret, enc, token):
|
|
|
|
+ pdata = json.dumps(param, separators=(',', ':'), ensure_ascii=False)
|
|
|
|
+ params = param
|
|
|
|
+ if enc:
|
|
|
|
+ pdata = aesEcbEncrypt(secret, pdata)
|
|
|
|
+ params = {
|
|
|
|
+ 'encrypt_data': pdata,
|
|
}
|
|
}
|
|
headers = {
|
|
headers = {
|
|
'Content-Type': 'application/json',
|
|
'Content-Type': 'application/json',
|
|
@@ -73,19 +114,25 @@ def getToken(tokenUrl, user, password):
|
|
return jdata.get('token')
|
|
return jdata.get('token')
|
|
|
|
|
|
|
|
|
|
|
|
+
|
|
if __name__ == '__main__':
|
|
if __name__ == '__main__':
|
|
# 填写配置
|
|
# 填写配置
|
|
- tokenUrl = ''
|
|
|
|
- apiUrl = ''
|
|
|
|
- user = ''
|
|
|
|
- password = ''
|
|
|
|
- secret = ''
|
|
|
|
-
|
|
|
|
- token = getToken(tokenUrl, user, password)
|
|
|
|
- if token == '':
|
|
|
|
- print('token get failed')
|
|
|
|
- # 填写参数
|
|
|
|
|
|
+ tokenUrl = 'http://127.0.0.1:41002/api/v1/token'
|
|
|
|
+ apiUrl = 'http://127.0.0.1:41002/api/v1/query/2001'
|
|
|
|
+ user = '529db83441acff61a054eba562185515'
|
|
|
|
+ password = 'DMh7lbyv'
|
|
|
|
+ secret = '1749f2a7019db090ca7c9cc69e64033c'
|
|
|
|
+
|
|
param = {
|
|
param = {
|
|
- '': '',
|
|
|
|
|
|
+ 'plate_no': '川Y01M21',
|
|
|
|
+ 'plate_type': '02',
|
|
|
|
+ 'owner': "吴军"
|
|
}
|
|
}
|
|
- apiVisit(apiUrl, param, secret, True, token)
|
|
|
|
|
|
+ tokenType = True
|
|
|
|
+ if tokenType:
|
|
|
|
+ token = getToken(tokenUrl, user, password)
|
|
|
|
+ if token == '':
|
|
|
|
+ print('token get failed')
|
|
|
|
+ apiVisitByToken(apiUrl, param, secret, True, token)
|
|
|
|
+ else:
|
|
|
|
+ apiVisitBySign(apiUrl, param, secret, True, user)
|