缘由
笔者想在每次远程登录服务器的时候都能有一个提醒。最开始想通过短信的方式,后续发现短信费用有点贵,也不划算。考虑通过邮件服务,一样能达到同样效果,还免费。
以下内容均使用root,只考虑超管用户的登录设置提醒。任意用户登录需要考虑文件执行权限。生产环境谨慎使用,做好容错处理以及测试。后续有注意事项,请查阅。
环境搭建
准备一个邮箱
这个邮箱是作为发件人。笔者建议申请一个新的邮箱。然后开启这个邮箱的 POP3/SMTP服务,拿到收发邮件授权码。这里我默认用的网易的邮箱。
关于申请邮件授权码更详细内容可以参考
https://blog.csdn.net/qq_48896417/article/details/133903185准备python的环境
笔者这个脚本是python运行。需要使用python环境,这里使用conda。
下载安装miniconda,可以在百度搜索 miniconda下载安装。
下载完成后按照官方给出的步骤执行后续安装步骤。
安装成功后我们先创建一个python环境
# 创建python环境
conda create -n email-ssh python=3.10
# 切换到新创建的环境
conda activate email-ssh
下载安装redis单机版
如果有redis单机模式,那么可以使用。没有或者不想用redis可以不下载。用tinydb本地数据库代替。修改项目的配置文件,后续会提。注册ipinfo
申请一个免费 token 。笔者注册ipinfo是为了获取登录IP更详细的信息。
如果不想获取登录地址的详细信息,也可以不弄。
可以在百度搜索 ipinfo.io。
脚本配置
以下内容笔者均使用root操作,方便。
创建项目文件夹
# 创建项目配置文件夹及日志路径以及数据库
cd ~
mkdir -p ~/python_email
mkdir -p ~/python_email/db/
touch ~/python_email/db/db.json
mkdir -p ~/python_email/logs
cd ~/python_email
ls -l
把下列的文件名以及内容都拷贝到服务器上,路径就是刚才创建的文件夹(python_email)下。
总共有 config.py send_email.py requirements.txt encrypt.py 这四个文件,文件名称可以和笔者的保持一致,方便后续操作。
config.py
# -*- coding: utf-8 -*-
EmailList = ["test@qq.com"]
# encrypt data
UserEmail = b''
PasswdEmail = b''
EmailTitle = "VMware测试登录"
EmailHost = "smtp.163.com"
EmailPort = 465
# default use "redis", or "tinydb" instead
DB_CHOOSE = "tinydb"
# redis config
REDIS_HOST = ""
REDIS_PORT = 65535
REDIS_PASSWD = ""
RED_PRE = ""
RED_TTL = 0 # seconds
# tinyDB config
TinyDB_PATH = r"/root/python_email/db/db.json"
TinyDB_SECONDS = 360 # seconds
TinyDB_TABLE_SSH_LOGIN = "ssh_login"
# log config
BACKUP_COUNT = 3
LOG_PATH = r"/root/python_email/logs/ssh-login.log"
# ipinfo config API: https://ipinfo.io
IPINFO_TOKEN = None
EmailList是收件人列表,换成需要的收件人即可。
UserEmail 和 PasswdEmail 两个字段是发件人邮箱和授权码。目前这里是空字符串,后续这里会分别放入密文的内容。稍后笔者会贴上代码。
EmailTitle是邮件标题,这里可以自定义,其余的日志文件路径以及日志保存天数也都可以自定义。
如果不是开通的163邮箱,那么这个EmailHost字段的内容需要调整下。端口应该默认都是一致的。
send_email.py
# -*- coding: utf-8 -*-
# @Time : 2023/12/24 19:46
import math
import sys
import redis
import logging
import datetime
import requests
from config import *
from logging.handlers import TimedRotatingFileHandler
from smtplib import SMTP_SSL
from email.mime.text import MIMEText
from cryptography.fernet import Fernet
from tinydb import TinyDB, Query
logger = logging.getLogger("shh-login")
logger.setLevel(logging.INFO)
ssh_handler = TimedRotatingFileHandler(LOG_PATH, "midnight", interval=1, backupCount=BACKUP_COUNT, encoding="utf-8")
ssh_handler.suffix = "%Y-%m-%d"
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ssh_handler.setFormatter(formatter)
# console_handler = logging.StreamHandler()
# console_handler.setLevel(logging.INFO)
logger.addHandler(ssh_handler)
# logger.addHandler(console_handler)
NOW = datetime.datetime.now()
class SimpleEmail:
"""
SimpleEmail class init
"""
def __init__(self, ip, message, key):
self.red_ip = RED_PRE + ip
self.ip = ip
self.message = message
self._key = key
self._username = None
self._password = None
self.host = EmailHost
self.port = EmailPort
self.smtp = None
self._msg = None
self._db_choose = DB_CHOOSE
self._now = NOW
self._init_config()
def _init_config(self):
self._init_db()
self._decrypt_passwd()
def _init_db(self):
"""
init db => redis or tinydb
:return:
"""
if DB_CHOOSE == "redis":
self._init_redis()
else:
self._init_tinydb()
logger.info("use %s" % self._db_choose)
def _init_redis(self):
"""
redis
:return:
"""
try:
self._redis = redis.Redis(host=REDIS_HOST,
port=REDIS_PORT,
password=REDIS_PASSWD,
db=0, decode_responses=True)
logger.info("redis ping %s" % self._redis.ping())
except Exception as e:
logger.error("redis init failed: %s use tinydb instead redis" % e)
self._db_choose = "tinydb"
self._init_tinydb()
def _init_tinydb(self):
"""
tinydb
:return:
"""
try:
db = TinyDB(TinyDB_PATH)
self._tiny_db = db
except Exception as e:
logger.error("tinydb init failed: %s" % e)
exit(1)
def _decrypt_passwd(self):
"""
decrypt
:return:
"""
try:
f = Fernet(self._key.encode())
self._username = f.decrypt(UserEmail).decode()
self._password = f.decrypt(PasswdEmail).decode()
except Exception as e:
logger.error("解密失败: %s" % e)
exit(1)
def _login(self):
"""
login mail server
:return:
"""
try:
self.smtp = SMTP_SSL(host=self.host, port=self.port)
self.smtp.login(user=self._username, password=self._password)
except Exception as e:
logger.error("login mail server failed: %s" % e)
exit(1)
def _build_msg(self):
"""
mail protocol message
:return:
"""
msg = MIMEText(self.message, "plain", _charset="utf-8")
msg["Subject"] = "{tdate} {title}".format(tdate=self._now.strftime("%Y-%m-%d"), title=EmailTitle)
msg["from"] = self._username
msg["to"] = ','.join(EmailList)
self._msg = msg
def _before_mail(self):
"""
read ip from redis/tinydb
:return:
"""
if self._db_choose == "tinydb":
ssh_query = Query()
time_threshold = math.floor((self._now - datetime.timedelta(seconds=TinyDB_SECONDS)).timestamp())
ssh_login_lst = self._tiny_db.table(TinyDB_TABLE_SSH_LOGIN).search(
(ssh_query.ip == self.ip) & (ssh_query.create_time > time_threshold)
)
if not ssh_login_lst:
logger.info("read from %s, this ip %s not login recently" % (self._db_choose, self.ip))
self._tiny_db.table(TinyDB_TABLE_SSH_LOGIN).insert({
"create_time": math.floor(self._now.timestamp()),
"ip": self.ip
})
return True
else:
logger.info("read from %s, this ip %s is already in cache" % (self._db_choose, self.ip))
return False
store_ip = self._redis.get(self.red_ip)
if not store_ip:
logger.info("read from %s, this ip %s not login recently" % (self._db_choose, self.ip))
self._redis.setex(self.red_ip, RED_TTL, 0)
self._redis.close()
return True
else:
logger.info("read from %s, this ip %s is already in cache" % (self._db_choose, self.ip))
return False
def send(self):
"""
send mail
:return:
"""
try:
if not self._before_mail():
logger.info("program exit")
exit(1)
self._login()
self._build_msg()
self.smtp.sendmail(from_addr=self._username, to_addrs=EmailList, msg=self._msg.as_string())
logger.info("mail send successful %s" % ','.join(EmailList))
except Exception as e:
logger.error("mail send failed: %s" % str(e))
finally:
if self.smtp is not None:
logger.info("logout mail server")
self.smtp.quit()
def search_ip_information(ip):
base_msg = "\ntime: " + NOW.strftime("%Y-%m-%d %H:%M:%S") + "\n"
if not IPINFO_TOKEN:
body_msg = "\n IP Address: " + ip
return base_msg + body_msg
req_url = "https://ipinfo.io/{ip}?token={token}".format(ip=ip, token=IPINFO_TOKEN)
try:
response = requests.get(url=req_url)
if response.status_code == 200:
body_msg = "\n".join([f"{key} : {value}" for key, value in response.json().items()])
else:
body_msg = ip
return base_msg + body_msg
except ValueError as e:
logger.info("value exception: %s, %s" % (ip, e))
except Exception as e:
logger.info("get ip information failed: %s, %s" % (ip, e))
def main():
# check config
login_argv = sys.argv
if len(login_argv) != 6:
logger.error("ssh information get failed: %s" % len(login_argv))
exit(1)
if DB_CHOOSE not in ["redis", "tinydb"]:
logger.error("db config wrong!")
exit(1)
login_user, login_ip, login_time, login_hostname, secret_key = login_argv[1], login_argv[2], login_argv[3], login_argv[4], login_argv[5]
logger.info("login information, user: %s , ip: %s , time: %s" % (login_user, login_ip, login_time))
try:
format_message = search_ip_information(login_ip)
body = f"SSH login {login_hostname} {login_ip} {login_user} login reminder\n{format_message}"
simple_email = SimpleEmail(ip=login_ip, message=body, key=secret_key)
simple_email.send()
except Exception as e:
logger.error(e)
if '__main__' == __name__:
main()
requirements.txt
requests==2.31.0
redis==5.0.1
cryptography==41.0.7
tinydb==4.8.0
安装依赖库
conda activate email-ssh
pip install -r requirements.txt
配置SSH执行python脚本,这里key是空字符串,后续这里会放入生成的key。
vim /etc/ssh/sshrc
#!/bin/bash
user=$USER
ip=${SSH_CLIENT%% *}
time=$(date +%F%t%k:%M)
hostname=$(hostname)
key=""
~/miniconda3/envs/email-ssh/bin/python ~/python_email/send_email.py "$user" "$ip" "$time" "$hostname" "$key"
加密代码
encrypt.py
# -*- coding: utf-8 -*-
from cryptography.fernet import Fernet
u = '发件人邮箱'
p = '邮箱授权码'
def encrypt():
key = Fernet.generate_key()
f = Fernet(key)
username = f.encrypt(u.encode())
passwd = f.encrypt(p.encode())
print(f"username {username} ,\r\n passwd {passwd},\r\n key {key}")
if '__main__' == __name__:
encrypt()
conda activate email-ssh
python encrypt.py
>>> username b"username"
>>> passwd b"passwd"
>>> key b"key"
把最后得到的密文内容 username 和 passwd 替换config.py里面
UserEmail = b''"
PasswdEmail = b''"
key替换掉 /etc/ssh/sshrc 里面的key
关于获取IP地理位置,笔者这里使用的是ipinfo这个网站的接口,将token填入 config.py里的 IPINFO_TOKEN 即可。
登录测试
最后只需要登录即可收到对应邮件。日志就在config.py里面配置的路径。
注意事项
- 本脚本只考虑针对一个用户做提醒。多用户要考虑脚本执行权限以及conda解释器的执行权限,还有 /etc/ssh/sshrc 文件的执行权限。
- 使用tinydb本地数据库的方式需要考虑本地数据库过大的情况。这个可以定期清空下数据库。
- 笔者测试过脚本即使报错也不影响服务器的登录。谨慎考虑在测试登录的时候一定要保留一个或多个超管用户活跃的登录窗口,以防止报错导致无法登录服务器。如果发现问题可以注释掉 sshrc 文件里的脚本调用代码。
- Python环境建议3.8以上。
- 建议可以在虚拟机里调试配置好再拿到真实服务器上。