安装拓展
pip3 install --upgrade setuptools wheel -i https://mirrors.aliyun.com/pypi/simple
pip3 install setuptools-rust -i https://mirrors.aliyun.com/pypi/simple
pip3 install paramiko -i https://mirrors.aliyun.com/pypi/simple
txt
@echo off
echo 正在清除最近打开的文本文件记录...
:: 清除"最近使用的项目"文件夹中的内容
del /q %APPDATA%\Microsoft\Windows\Recent\*.lnk
del /q %APPDATA%\Microsoft\Windows\Recent\AutomaticDestinations\*.automaticDestinations-ms
del /q %APPDATA%\Microsoft\Windows\Recent\CustomDestinations\*.customDestinations-ms
:: 清除记事本(Notepad)特定的最近文件列表
reg delete "HKEY_CURRENT_USER\Software\Microsoft\Windows\CurrentVersion\Applets\Notepad\Recent File List" /f
:: 清除文件资源管理器的历史记录
reg delete "HKEY_CURRENT_USER\Software\Microsoft\Windows\CurrentVersion\Explorer\RecentDocs\.txt" /f
:: 刷新资源管理器
taskkill /f /im explorer.exe
start explorer.exe
echo 清除完成!
echo 已删除最近打开的文本文件记录。
pause
Powershell
# 清除最近的项目
Remove-Item "$env:APPDATA\Microsoft\Windows\Recent\*" -Force -Recurse
# 清除最近打开的文件列表
Remove-Item "HKCU:\Software\Microsoft\Windows\CurrentVersion\Explorer\RecentDocs\*" -Force -Recurse
# 清除记事本历史记录
Remove-Item "HKCU:\Software\Microsoft\Windows\CurrentVersion\Applets\Notepad\Recent File List" -Force -ErrorAction SilentlyContinue
# 重启资源管理器
Stop-Process -Name explorer
Start-Process explorer
1.python多线程查询
a.b.c.1-a.b.c.255是否开放80端口并且进行排序
import threading
import socket
from concurrent.futures import ThreadPoolExecutor
# 定义要扫描的IP地址范围和端口范围
start_ip = "a.b.c.1"
end_ip = "a.b.c.255"
start_port = 22
end_port = 22
# 定义一个函数用于检查端口是否开放
def scan_port(ip, port):
try:
# 创建套接字对象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置超时时间为1秒
s.settimeout(1)
# 尝试连接到目标IP的指定端口
result = s.connect_ex((ip, port))
# 如果端口开放,则输出结果
if result == 0:
print(f"{ip}:{port}")
# 关闭套接字连接
s.close()
except:
pass
# 定义一个函数用于扫描指定IP地址范围的端口
def scan_ip_range(ip_range):
with ThreadPoolExecutor(max_workers=50) as executor:
for ip in ip_range:
for port in range(start_port, end_port + 1):
executor.submit(scan_port, ip, port)
# 将IP地址范围转换为列表
def ip_range(start_ip, end_ip):
start = list(map(int, start_ip.split(".")))
end = list(map(int, end_ip.split(".")))
temp = start
ip_range = []
ip_range.append(start_ip)
while temp != end:
start[3] += 1
for i in (3, 2, 1):
if temp[i] == 256:
temp[i] = 0
temp[i-1] += 1
ip_range.append(".".join(map(str, temp)))
return ip_range
# 执行端口扫描
ip_range_list = ip_range(start_ip, end_ip)
scan_ip_range(ip_range_list)
验证200
import threading
import requests
def check_status(ip):
url = f"http://{ip}/ceshi"
try:
response = requests.get(url, timeout=1)
if response.status_code == 200:
print(f"IP {ip} 请求成功!")
except requests.RequestException:
pass # 不输出连接失败的信息
def main():
file_path = "/home/port.txt"
with open(file_path, 'r') as file:
ip_addresses = [line.strip() for line in file.readlines()]
threads = []
for ip in ip_addresses:
thread = threading.Thread(target=check_status, args=(ip,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
if __name__ == "__main__":
main()
ssh密钥爆破
import paramiko
# 设置 SSH 密钥路径和用户名
private_key_path = '/root/.ssh/id_rsa'
username = 'root'
# 读取 port1.txt 文件中的 IP 和端口
with open('port1.txt', 'r') as f:
lines = f.readlines()
# 初始化 SSH 客户端
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 遍历每一行 IP 和端口
for line in lines:
ip, port = line.strip().split(':')
try:
# 明确加载 RSA 密钥
private_key = paramiko.RSAKey.from_private_key_file(private_key_path)
# 尝试使用 SSH 连接
client.connect(ip, port=int(port), username=username, pkey=private_key)
print(f"Successfully connected to {ip}:{port}")
# 一旦成功连接,立即断开并跳出循环
client.close()
break # 如果连接成功,立刻停止后续操作
except Exception as e:
print(f"Failed to connect to {ip}:{port} - {e}")
ssh密码爆破
import paramiko
# 定义要连接的IP地址列表
ip_addresses = [
"1.1.1.1",
"1.1.1.1"
]
# 设置SSH连接的用户名和密码
username = "root"
password = "root"
# 尝试连接服务器
for ip in ip_addresses:
print(f"尝试连接到 {ip}...")
try:
# 创建 SSH 客户端
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ip, username=username, password=password)
print(f"成功连接到 {ip}!")
break # 如果连接成功,则跳出循环
except Exception as e:
print(f"连接失败:{str(e)}")
finally:
client.close() # 关闭 SSH 客户端连接
匹配
import requests
from urllib3.exceptions import InsecureRequestWarning
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
# 禁用 SSL 警告
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# 创建线程锁用于打印
print_lock = threading.Lock()
def test_url(region, url, ip, port):
"""测试单个URL的可访问性"""
# 从原URL提取路径部分
path = url.split('baidu.com/')[1]
# 构建新的URL
new_url = f'http://{ip}:{port}/{path}'
try:
response = requests.get(new_url, verify=False, timeout=5)
status = response.status_code
with print_lock:
pass
except Exception as e:
status = 0
with print_lock:
pass
return (ip, port, region, new_url, status)
# 读取 URL 列表
with open('url.txt', 'r', encoding='utf-8') as file:
url_lines = [line.strip() for line in file.readlines()]
# 读取 IP 地址列表
with open('1.txt', 'r', encoding='utf-8') as file:
ip_list = [line.strip() for line in file.readlines()]
# 打印总数信息
print(f"URL 总数:{len(url_lines)}")
print(f"IP 总数:{len(ip_list)}")
print(f"总组合数:{len(url_lines) * len(ip_list)}")
# 存储新的 URL 和 IP 信息的列表
url_ip_pairs = []
# 创建任务列表
tasks = []
for line in url_lines:
region, url = line.split(' : ')
# 对每个URL,尝试所有的IP
for ip_port in ip_list:
ip, port = ip_port.split(':')
tasks.append((region, url, ip, port))
# 使用线程池执行任务
with ThreadPoolExecutor(max_workers=100) as executor:
# 提交所有任务
future_to_url = {executor.submit(test_url, region, url, ip, port): (region, url)
for region, url, ip, port in tasks}
# 收集结果
for future in as_completed(future_to_url):
result = future.result()
url_ip_pairs.append(result)
# 按 IP 地址和端口号排序
url_ip_pairs.sort(key=lambda x: (tuple(map(int, x[0].split('.'))), int(x[1])))
# 将排序后的 URL 写入 cg.txt 文件,只写入状态码为 200 的URL
with open('cg.txt', 'w', encoding='utf-8') as file:
success_count = 0
for ip, port, region, new_url, status in url_ip_pairs:
if status == 200:
file.write(f'{region} : {new_url}\n')
success_count += 1
print(f"\n总结:")
print(f"总计测试:{len(url_ip_pairs)} 个")
print(f"成功访问(状态码200):{success_count} 个")
需要修改系统限制
# 查看系统限制
ulimit -a
/etc/security/limits.conf
ulimit -n
* soft nofile 655350
* hard nofile 655350
# 查看当前进程数
ps -eLf | wc -l
# 查看系统允许的最大线程数
cat /proc/sys/kernel/threads-max
# 查看单个进程允许的最大线程数
cat /proc/sys/kernel/pid_max
优化过的
查看开放的端口
import threading
import socket
import queue
import time
import sys
import os
# 重定向错误输出到 /dev/null
sys.stderr = open(os.devnull, 'w')
# 定义要扫描的IP地址范围和端口范围
start_ip = "1.1.1.1"
end_ip = "1.1.1.255"
start_port = 1
end_port = 1
# 自定义线程数
THREAD_COUNT = 2000
# 创建任务队列和结果列表
task_queue = queue.Queue()
successful_connections = 0
connection_lock = threading.Lock()
# 创建结果列表用于存储成功的连接
results = []
results_lock = threading.Lock()
def scan_port():
global successful_connections
while True:
try:
# 从队列获取任务
try:
ip, port = task_queue.get_nowait() # 使用 get_nowait() 替代 get(timeout=1)
except queue.Empty:
return
# 创建套接字对象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(3) # 减少超时时间到0.5秒
# 尝试连接
if s.connect_ex((ip, port)) == 0:
with connection_lock:
successful_connections += 1
with results_lock:
results.append(f"{ip}:{port}")
print(f"{ip}:{port}")
s.close()
task_queue.task_done()
except Exception:
try:
task_queue.task_done()
except:
pass
def ip_range(start_ip, end_ip):
start = list(map(int, start_ip.split(".")))
end = list(map(int, end_ip.split(".")))
temp = start.copy() # 使用 copy() 避免修改原始列表
ip_range = []
ip_range.append(start_ip)
while temp != end:
temp[3] += 1 # 修改 temp 而不是 start
for i in (3, 2, 1):
if temp[i] == 256:
temp[i] = 0
temp[i-1] += 1
ip_range.append(".".join(map(str, temp)))
return ip_range
def print_status(total_tasks):
remaining = task_queue.qsize()
completed = total_tasks - remaining
if completed > 0 and total_tasks > 0:
print(f"\r进度: {completed}/{total_tasks} ({completed/total_tasks*100:.1f}%) 发现开放端口: {successful_connections}", end='', flush=True)
def main():
# 获取IP范围列表
ip_range_list = ip_range(start_ip, end_ip)
total_tasks = len(ip_range_list) * (end_port - start_port + 1)
print(f"总任务数: {total_tasks}")
print(f"使用线程数: {THREAD_COUNT}")
# 将所有任务加入队列
for port in range(start_port, end_port + 1): # 先遍历端口
for ip in ip_range_list: # 再遍历IP
task_queue.put((ip, port))
# 创建并启动线程
threads = []
for _ in range(THREAD_COUNT):
t = threading.Thread(target=scan_port)
t.daemon = True
t.start()
threads.append(t)
# 显示进度
try:
while task_queue.qsize() > 0:
print_status(total_tasks)
time.sleep(0.1) # 减少刷新间隔
except KeyboardInterrupt:
print("\n扫描被用户中断")
return
# 等待所有任务完成
task_queue.join()
print("\n扫描完成")
# 将结果保存到文件
if results: # 只有在有结果时才创建文件
with open('c1.txt', 'w') as f:
for result in sorted(results, key=lambda x: (tuple(map(int, x.split(':')[0].split('.'))), int(x.split(':')[1]))):
f.write(result + '\n')
print(f"发现 {len(results)} 个开放端口,结果已保存到 c1.txt")
else:
print("未发现开放端口")
if __name__ == "__main__":
start_time = time.time()
main()
print(f"总耗时: {time.time() - start_time:.2f}秒")
ssh筛选
import paramiko
import socket
import threading
import queue
import time
import sys
import os
# 重定向标准错误输出到 /dev/null
sys.stderr = open(os.devnull, 'w')
# 禁用 paramiko 的警告信息
import warnings
warnings.filterwarnings(action='ignore',module='.*paramiko.*')
# 从文件读取IP:端口列表
with open('c1.txt', 'r') as f:
targets = [line.strip() for line in f.readlines()]
# 创建任务队列和结果列表
task_queue = queue.Queue()
password_auth = [] # 支持密码登录的服务器
results_lock = threading.Lock()
# 线程数
THREAD_COUNT = 100
def check_ssh_auth(ip, port):
try:
transport = paramiko.Transport((ip, int(port)))
transport.start_client()
# 尝试获取可用的认证方法
try:
transport.auth_none('test')
except paramiko.BadAuthenticationType as err:
# 获取服务器支持的认证方法
allowed_types = err.allowed_types
if 'password' in allowed_types:
with results_lock:
password_auth.append(f"{ip}:{port}")
print(f"发现支持密码登录的服务器: {ip}:{port}")
except:
pass
finally:
try:
transport.close()
except:
pass
def worker():
while True:
try:
target = task_queue.get_nowait()
if target is None:
break
ip, port = target.split(':')
check_ssh_auth(ip, port)
task_queue.task_done()
except queue.Empty:
break
except:
pass
def main():
print(f"开始检测 {len(targets)} 个目标...")
# 将目标添加到队列
for target in targets:
task_queue.put(target)
# 创建并启动线程
threads = []
for _ in range(THREAD_COUNT):
t = threading.Thread(target=worker)
t.daemon = True
t.start()
threads.append(t)
# 显示进度
total = len(targets)
while task_queue.qsize() > 0:
remaining = task_queue.qsize()
completed = total - remaining
print(f"\r进度: {completed}/{total} ({completed/total*100:.1f}%) "
f"发现支持密码登录: {len(password_auth)}",
end='', flush=True)
time.sleep(0.1)
# 等待所有任务完成
task_queue.join()
print("\n检测完成")
# 显示最终结果
if password_auth:
print(f"\n发现 {len(password_auth)} 个支持密码登录的服务器:")
for result in sorted(password_auth, key=lambda x: tuple(map(int, x.split(':')[0].split('.') + [x.split(':')[1]]))):
print(result)
else:
print("未发现支持密码登录的服务器")
if __name__ == "__main__":
start_time = time.time()
main()
print(f"总耗时: {time.time() - start_time:.2f}秒")
4 条评论
文章的叙述风格独特,用词精准,让人回味无穷。
这是一篇佳作,无论是从内容、语言还是结构上,都堪称完美。
若能对分论点进一步细分,结构会更立体。
作者以简洁明了的语言,传达了深刻的思想和情感。