因为有需要编写了一款虚拟币检测

达到某个价格时发送Wxpusher通知

在linux上

服务器后台挂机启动nohup python3 x.py

import requests
import time
from datetime import datetime

# 配置
API_URL = "https://api.huobi.pro/market/detail?symbol=melaniausdt"
APP_TOKEN = "token"
WXPUSER_URL = "https://wxpusher.zjiecode.com/api/send/message"
USER_ID = "uid"  # 设置正确的UID

# 价格阈值
PRICE_THRESHOLDS = [6,7,8,9,10,11,12]

# 通知状态记录
notification_states = {threshold: False for threshold in PRICE_THRESHOLDS}

def send_notification(price, threshold):
    """发送WxPusher通知"""
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    message = f"当前价格 {price}  已超过{threshold}\n{current_time}"
  
    # 构建GET请求URL
    params = {
        "appToken": APP_TOKEN,
        "content": message,
        "contentType": 1,
        "uid": USER_ID,
    }
  
    try:
        print(f"正在发送通知,参数: {params}")  # 调试信息
        response = requests.get(WXPUSER_URL, params=params)
        print(f"通知接口响应: {response.text}")  # 调试信息
  
        response_data = response.json()
        if response.status_code == 200 and response_data.get('code') == 1000:
            print(f"通知发送成功: {message}")
        else:
            print(f"通知发送失败: {response.text}")
            print(f"失败状态码: {response.status_code}")
            print(f"失败响应: {response_data}")
    except Exception as e:
        print(f"发送通知时出错: {str(e)}")
        import traceback
        print(f"详细错误信息: {traceback.format_exc()}")

def check_price():
    """检查价格并在需要时发送通知"""
    try:
        print("正在获取价格数据...")  # 调试信息
        response = requests.get(API_URL)
        pass
        data = response.json()
        pass
  
        if data["status"] == "ok":
            current_price = float(data["tick"]["close"])
            print(f"当前价格: {current_price}")
  
            for threshold in PRICE_THRESHOLDS:
                if current_price > threshold:
                    # 如果价格超过阈值且之前没有发送过通知
                    if not notification_states[threshold]:
                        send_notification(current_price, threshold)
                        notification_states[threshold] = True
                else:
                    # 价格低于阈值,重置通知状态
                    notification_states[threshold] = False
        else:
            print(f"API返回状态不正常: {data}")  # 调试信息
  
    except Exception as e:
        print(f"检查价格时出错: {str(e)}")
        print(f"错误类型: {type(e)}")  # 调试信息
        import traceback
        print(f"详细错误信息: {traceback.format_exc()}")  # 打印完整错误堆栈

def main():
    print("价格监控启动...")
    print(f"监控阈值: {PRICE_THRESHOLDS}")  # 显示监控的价格阈值
    print("开始循环检查价格...")
    while True:
        try:
            check_price()
        except Exception as e:
            print(f"主循环发生错误: {str(e)}")
        time.sleep(2)

if __name__ == "__main__":
    main() 

win版

支持火币、CoinEx、Crypto、芝麻gate.io

api接口

微信图片_20250125143751.png

使用c#开发

源码

https://oss.8cdn.cn/d/code/WindowsFormsApp1.zip?sign=Xd5i6WcIHd4nEwtekp3voFN2lU2ZRyLZ_YCbBbSkte0=:0

编译后现成的

https://oss.8cdn.cn/d/tools/tools.exe?sign=c9DQYVVzOLrvz1E3q73GNXtepfbT2bVxc-zWIAG5qAs=:0
最后修改:2025 年 01 月 25 日
如果觉得我的文章对你有用,请随意赞赏