SSH登录邮件提醒

缘由

笔者想在每次远程登录服务器的时候都能有一个提醒。最开始想通过短信的方式,后续发现短信费用有点贵,也不划算。考虑通过邮件服务,一样能达到同样效果,还免费。

以下内容均使用root,只考虑超管用户的登录设置提醒。任意用户登录需要考虑文件执行权限。生产环境谨慎使用,做好容错处理以及测试。后续有注意事项,请查阅。

环境搭建

  1. 准备一个邮箱
    这个邮箱是作为发件人。笔者建议申请一个新的邮箱。然后开启这个邮箱的 POP3/SMTP服务,拿到收发邮件授权码。这里我默认用的网易的邮箱。
    关于申请邮件授权码更详细内容可以参考
    https://blog.csdn.net/qq_48896417/article/details/133903185

  2. 准备python的环境
    笔者这个脚本是python运行。需要使用python环境,这里使用conda。
    下载安装miniconda,可以在百度搜索 miniconda下载安装。
    下载完成后按照官方给出的步骤执行后续安装步骤。

安装成功后我们先创建一个python环境

# 创建python环境
conda create -n email-ssh python=3.10

# 切换到新创建的环境
conda activate email-ssh
  1. 下载安装redis单机版
    如果有redis单机模式,那么可以使用。没有或者不想用redis可以不下载。用tinydb本地数据库代替。修改项目的配置文件,后续会提。

  2. 注册ipinfo
    申请一个免费 token 。笔者注册ipinfo是为了获取登录IP更详细的信息。
    如果不想获取登录地址的详细信息,也可以不弄。
    可以在百度搜索 ipinfo.io。

脚本配置

以下内容笔者均使用root操作,方便。

创建项目文件夹

# 创建项目配置文件夹及日志路径以及数据库
cd ~
mkdir -p ~/python_email
mkdir -p ~/python_email/db/
touch ~/python_email/db/db.json
mkdir -p ~/python_email/logs

cd ~/python_email
ls -l

把下列的文件名以及内容都拷贝到服务器上,路径就是刚才创建的文件夹(python_email)下。
总共有 config.py send_email.py requirements.txt encrypt.py 这四个文件,文件名称可以和笔者的保持一致,方便后续操作。

config.py

# -*- coding: utf-8 -*-
EmailList = ["test@qq.com"]

# encrypt data
UserEmail = b''
PasswdEmail = b''

EmailTitle = "VMware测试登录"
EmailHost = "smtp.163.com"
EmailPort = 465 

# default use "redis", or "tinydb" instead
DB_CHOOSE = "tinydb"

# redis config
REDIS_HOST = ""
REDIS_PORT = 65535
REDIS_PASSWD = ""
RED_PRE = ""
RED_TTL = 0  # seconds

# tinyDB config
TinyDB_PATH = r"/root/python_email/db/db.json"
TinyDB_SECONDS = 360  # seconds
TinyDB_TABLE_SSH_LOGIN = "ssh_login"

# log config
BACKUP_COUNT = 3
LOG_PATH = r"/root/python_email/logs/ssh-login.log"

# ipinfo config API: https://ipinfo.io
IPINFO_TOKEN = None

EmailList是收件人列表,换成需要的收件人即可。
UserEmail 和 PasswdEmail 两个字段是发件人邮箱和授权码。目前这里是空字符串,后续这里会分别放入密文的内容。稍后笔者会贴上代码。
EmailTitle是邮件标题,这里可以自定义,其余的日志文件路径以及日志保存天数也都可以自定义。
如果不是开通的163邮箱,那么这个EmailHost字段的内容需要调整下。端口应该默认都是一致的。

send_email.py

# -*- coding: utf-8 -*-
# @Time    : 2023/12/24 19:46
import math
import sys
import redis
import logging
import datetime
import requests
from config import *
from logging.handlers import TimedRotatingFileHandler
from smtplib import SMTP_SSL
from email.mime.text import MIMEText
from cryptography.fernet import Fernet
from tinydb import TinyDB, Query

logger = logging.getLogger("shh-login")
logger.setLevel(logging.INFO)
ssh_handler = TimedRotatingFileHandler(LOG_PATH, "midnight", interval=1, backupCount=BACKUP_COUNT, encoding="utf-8")
ssh_handler.suffix = "%Y-%m-%d"
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ssh_handler.setFormatter(formatter)

# console_handler = logging.StreamHandler()
# console_handler.setLevel(logging.INFO)

logger.addHandler(ssh_handler)
# logger.addHandler(console_handler)

NOW = datetime.datetime.now()


class SimpleEmail:
    """
    SimpleEmail class init
    """

    def __init__(self, ip, message, key):
        self.red_ip = RED_PRE + ip
        self.ip = ip
        self.message = message
        self._key = key
        self._username = None
        self._password = None
        self.host = EmailHost
        self.port = EmailPort
        self.smtp = None
        self._msg = None
        self._db_choose = DB_CHOOSE
        self._now = NOW
        self._init_config()

    def _init_config(self):
        self._init_db()
        self._decrypt_passwd()

    def _init_db(self):
        """
        init db => redis or tinydb
        :return: 
        """
        if DB_CHOOSE == "redis":
            self._init_redis()
        else:
            self._init_tinydb()
        logger.info("use %s" % self._db_choose)

    def _init_redis(self):
        """
        redis
        :return:
        """
        try:
            self._redis = redis.Redis(host=REDIS_HOST,
                                      port=REDIS_PORT,
                                      password=REDIS_PASSWD,
                                      db=0, decode_responses=True)
            logger.info("redis ping %s" % self._redis.ping())
        except Exception as e:
            logger.error("redis init failed: %s use tinydb instead redis" % e)
            self._db_choose = "tinydb"
            self._init_tinydb()

    def _init_tinydb(self):
        """
        tinydb
        :return:
        """
        try:
            db = TinyDB(TinyDB_PATH)
            self._tiny_db = db
        except Exception as e:
            logger.error("tinydb init failed: %s" % e)
            exit(1)

    def _decrypt_passwd(self):
        """
        decrypt
        :return:
        """
        try:
            f = Fernet(self._key.encode())
            self._username = f.decrypt(UserEmail).decode()
            self._password = f.decrypt(PasswdEmail).decode()
        except Exception as e:
            logger.error("解密失败: %s" % e)
            exit(1)

    def _login(self):
        """
        login mail server
        :return:
        """
        try:
            self.smtp = SMTP_SSL(host=self.host, port=self.port)
            self.smtp.login(user=self._username, password=self._password)
        except Exception as e:
            logger.error("login mail server failed: %s" % e)
            exit(1)

    def _build_msg(self):
        """
        mail protocol message
        :return: 
        """
        msg = MIMEText(self.message, "plain", _charset="utf-8")
        msg["Subject"] = "{tdate} {title}".format(tdate=self._now.strftime("%Y-%m-%d"), title=EmailTitle)
        msg["from"] = self._username
        msg["to"] = ','.join(EmailList)
        self._msg = msg

    def _before_mail(self):
        """
        read ip from redis/tinydb
        :return:
        """
        if self._db_choose == "tinydb":
            ssh_query = Query()
            time_threshold = math.floor((self._now - datetime.timedelta(seconds=TinyDB_SECONDS)).timestamp())
            ssh_login_lst = self._tiny_db.table(TinyDB_TABLE_SSH_LOGIN).search(
                (ssh_query.ip == self.ip) & (ssh_query.create_time > time_threshold)
            )
            if not ssh_login_lst:
                logger.info("read from %s, this ip %s not login recently" % (self._db_choose, self.ip))
                self._tiny_db.table(TinyDB_TABLE_SSH_LOGIN).insert({
                    "create_time": math.floor(self._now.timestamp()),
                    "ip": self.ip
                })
                return True
            else:
                logger.info("read from %s, this ip %s is already in cache" % (self._db_choose, self.ip))
                return False

        store_ip = self._redis.get(self.red_ip)
        if not store_ip:
            logger.info("read from %s, this ip %s not login recently" % (self._db_choose, self.ip))
            self._redis.setex(self.red_ip, RED_TTL, 0)
            self._redis.close()
            return True
        else:
            logger.info("read from %s, this ip %s is already in cache" % (self._db_choose, self.ip))
            return False

    def send(self):
        """
        send mail
        :return:
        """
        try:
            if not self._before_mail():
                logger.info("program exit")
                exit(1)

            self._login()
            self._build_msg()
            self.smtp.sendmail(from_addr=self._username, to_addrs=EmailList, msg=self._msg.as_string())
            logger.info("mail send successful %s" % ','.join(EmailList))
        except Exception as e:
            logger.error("mail send failed: %s" % str(e))
        finally:
            if self.smtp is not None:
                logger.info("logout mail server")
                self.smtp.quit()


def search_ip_information(ip):
    base_msg = "\ntime: " + NOW.strftime("%Y-%m-%d %H:%M:%S") + "\n"
    if not IPINFO_TOKEN:
        body_msg = "\n IP Address: " + ip
        return base_msg + body_msg
    req_url = "https://ipinfo.io/{ip}?token={token}".format(ip=ip, token=IPINFO_TOKEN)
    try:
        response = requests.get(url=req_url)
        if response.status_code == 200:
            body_msg = "\n".join([f"{key} : {value}" for key, value in response.json().items()])
        else:
            body_msg = ip
        return base_msg + body_msg
    except ValueError as e:
        logger.info("value exception: %s, %s" % (ip, e))
    except Exception as e:
        logger.info("get ip information failed: %s, %s" % (ip, e))


def main():
    # check config
    login_argv = sys.argv
    if len(login_argv) != 6:
        logger.error("ssh information get failed: %s" % len(login_argv))
        exit(1)
    if DB_CHOOSE not in ["redis", "tinydb"]:
        logger.error("db config wrong!")
        exit(1)

    login_user, login_ip, login_time, login_hostname, secret_key = login_argv[1], login_argv[2], login_argv[3], login_argv[4], login_argv[5]
    logger.info("login information, user: %s , ip: %s , time: %s" % (login_user, login_ip, login_time))
    try:
        format_message = search_ip_information(login_ip)
        body = f"SSH login {login_hostname} {login_ip} {login_user} login reminder\n{format_message}"
        simple_email = SimpleEmail(ip=login_ip, message=body, key=secret_key)
        simple_email.send()
    except Exception as e:
        logger.error(e)


if '__main__' == __name__:
    main()

requirements.txt

requests==2.31.0
redis==5.0.1
cryptography==41.0.7
tinydb==4.8.0

安装依赖库

conda activate email-ssh
pip install -r requirements.txt

配置SSH执行python脚本,这里key是空字符串,后续这里会放入生成的key。

vim /etc/ssh/sshrc

#!/bin/bash

user=$USER
ip=${SSH_CLIENT%% *}
time=$(date +%F%t%k:%M)
hostname=$(hostname)
key=""

~/miniconda3/envs/email-ssh/bin/python ~/python_email/send_email.py  "$user" "$ip" "$time" "$hostname" "$key"

加密代码
encrypt.py

# -*- coding: utf-8 -*-
from cryptography.fernet import Fernet

u = '发件人邮箱'
p = '邮箱授权码'

def encrypt():
    key = Fernet.generate_key()
    f = Fernet(key)
    username = f.encrypt(u.encode())
    passwd = f.encrypt(p.encode())

    print(f"username {username} ,\r\n passwd {passwd},\r\n key {key}")

if '__main__' == __name__:
    encrypt()
conda activate email-ssh
python encrypt.py
>>> username  b"username"
>>> passwd b"passwd"
>>> key b"key"

把最后得到的密文内容 username 和 passwd 替换config.py里面
UserEmail = b''"
PasswdEmail = b''"
key替换掉 /etc/ssh/sshrc 里面的key

关于获取IP地理位置,笔者这里使用的是ipinfo这个网站的接口,将token填入 config.py里的 IPINFO_TOKEN 即可。

登录测试

最后只需要登录即可收到对应邮件。日志就在config.py里面配置的路径。

注意事项

  1. 本脚本只考虑针对一个用户做提醒。多用户要考虑脚本执行权限以及conda解释器的执行权限,还有 /etc/ssh/sshrc 文件的执行权限。
  2. 使用tinydb本地数据库的方式需要考虑本地数据库过大的情况。这个可以定期清空下数据库。
  3. 笔者测试过脚本即使报错也不影响服务器的登录。谨慎考虑在测试登录的时候一定要保留一个或多个超管用户活跃的登录窗口,以防止报错导致无法登录服务器。如果发现问题可以注释掉 sshrc 文件里的脚本调用代码。
  4. Python环境建议3.8以上。
  5. 建议可以在虚拟机里调试配置好再拿到真实服务器上。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容