因为有需要编写了一款虚拟币检测
达到某个价格时发送Wxpusher通知
在linux上
服务器后台挂机启动nohup python3 x.py
import requests
import time
from datetime import datetime
# 配置
API_URL = "https://api.huobi.pro/market/detail?symbol=melaniausdt"
APP_TOKEN = "token"
WXPUSER_URL = "https://wxpusher.zjiecode.com/api/send/message"
USER_ID = "uid" # 设置正确的UID
# 价格阈值
PRICE_THRESHOLDS = [6,7,8,9,10,11,12]
# 通知状态记录
notification_states = {threshold: False for threshold in PRICE_THRESHOLDS}
def send_notification(price, threshold):
"""发送WxPusher通知"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = f"当前价格 {price} 已超过{threshold}\n{current_time}"
# 构建GET请求URL
params = {
"appToken": APP_TOKEN,
"content": message,
"contentType": 1,
"uid": USER_ID,
}
try:
print(f"正在发送通知,参数: {params}") # 调试信息
response = requests.get(WXPUSER_URL, params=params)
print(f"通知接口响应: {response.text}") # 调试信息
response_data = response.json()
if response.status_code == 200 and response_data.get('code') == 1000:
print(f"通知发送成功: {message}")
else:
print(f"通知发送失败: {response.text}")
print(f"失败状态码: {response.status_code}")
print(f"失败响应: {response_data}")
except Exception as e:
print(f"发送通知时出错: {str(e)}")
import traceback
print(f"详细错误信息: {traceback.format_exc()}")
def check_price():
"""检查价格并在需要时发送通知"""
try:
print("正在获取价格数据...") # 调试信息
response = requests.get(API_URL)
pass
data = response.json()
pass
if data["status"] == "ok":
current_price = float(data["tick"]["close"])
print(f"当前价格: {current_price}")
for threshold in PRICE_THRESHOLDS:
if current_price > threshold:
# 如果价格超过阈值且之前没有发送过通知
if not notification_states[threshold]:
send_notification(current_price, threshold)
notification_states[threshold] = True
else:
# 价格低于阈值,重置通知状态
notification_states[threshold] = False
else:
print(f"API返回状态不正常: {data}") # 调试信息
except Exception as e:
print(f"检查价格时出错: {str(e)}")
print(f"错误类型: {type(e)}") # 调试信息
import traceback
print(f"详细错误信息: {traceback.format_exc()}") # 打印完整错误堆栈
def main():
print("价格监控启动...")
print(f"监控阈值: {PRICE_THRESHOLDS}") # 显示监控的价格阈值
print("开始循环检查价格...")
while True:
try:
check_price()
except Exception as e:
print(f"主循环发生错误: {str(e)}")
time.sleep(2)
if __name__ == "__main__":
main()
win版
支持火币、CoinEx、Crypto、芝麻gate.io
api接口
使用c#开发
源码
https://oss.8cdn.cn/d/code/WindowsFormsApp1.zip?sign=Xd5i6WcIHd4nEwtekp3voFN2lU2ZRyLZ_YCbBbSkte0=:0
编译后现成的
https://oss.8cdn.cn/d/tools/tools.exe?sign=c9DQYVVzOLrvz1E3q73GNXtepfbT2bVxc-zWIAG5qAs=:0
2 条评论
这篇文章不错!
?情感共鸣类?