11-27 54 views
代码详见开源项目:https://github.com/itnotebooks/cert_manage
本篇是django+celery实现的,由于篇幅的原因本篇中像list页面没有写在这里,相对比较简单
本篇所实现的证书管理不局限于SSL证书,像推送证书、Apple的发布证书、开发者证书等等都可以
0. 最终效果
1. 定义Model
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
class Certs(models.Model): ''' 证书 ''' CERT_METHOD_CHOICES = ( (0, _('Common Domain')), (1, _('Local Pem File')), ) id = models.UUIDField(default=uuid.uuid4, primary_key=True) name = models.CharField(max_length=45, verbose_name=_("Name"), null=False) # 这里的domain会稍微有歧义哈,因为我的环境中主要就是给域名使用的,如果是APP相关的证书请修改成一个相对好理解点的名字 domain = models.CharField(max_length=128, verbose_name=_("Common Domain"), null=True) orther_domain = models.TextField(max_length=4096, null=True, verbose_name=_("Orther Domain")) # 组织名称 organization_name = models.CharField(max_length=64, null=True, blank=True, verbose_name=_("Organization Name")) serial_number = models.CharField(max_length=64, null=True, blank=True, verbose_name=_("Serial number")) # 签发者 issued_by = models.CharField(max_length=64, null=True, verbose_name=_('Issued By')) # 证书类型:DV、OV、EV... cert_type = models.CharField(max_length=4, null=True, blank=True, verbose_name=_("SSL Cert Type")) # 联网请求还是本地的证书文件 method = models.BooleanField(default=0, choices=CERT_METHOD_CHOICES, verbose_name=_("Cert Detection Approach")) domain_url = models.CharField(max_length=128, null=True, blank=True, verbose_name=_("Domain Url")) pem_file = models.TextField(null=True, blank=True, verbose_name=_("Local Pem File")) # 起至日期 notbefore = models.CharField(max_length=24, null=True, blank=True, verbose_name=_('Not Before')) notafter = models.CharField(max_length=24, null=True, blank=True, verbose_name=_('Not After')) # 剩余天数 remain_days = models.IntegerField(max_length=4, null=True, blank=True, verbose_name=_('Remaining Days')) # 负责人,关联用户表 users = models.ManyToManyField(User, verbose_name=_("Contact")) # 记录创建时间 create_date = models.DateTimeField(auto_now_add=True, null=True, blank=True, verbose_name=_('Date created')) # 描述 comment = models.TextField(max_length=128, null=True, blank=True, verbose_name=_('Comment')) def __str__(self): return self.domain @property def get_method(self): return self.get_method_display class Meta: db_table = "assets_certs" |
2. 定义Form
Create和Update共用了同一个Form
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
class SSLCertCreateUpdateForm(forms.ModelForm): # 这里通过checkbox选择是通过联网请求证书还是使用本地的证书 is_domain = forms.BooleanField(initial=True, required=False, help_text=_('common domain OR local pem file'), label='is_domain') class Meta: model = Certs fields = [ 'name', 'method', 'domain_url', 'pem_file', 'users', 'comment' ] widgets = { 'domain_url': forms.TextInput( attrs={ 'placeholder': _('Common Domain') } ), 'users': forms.SelectMultiple(attrs={ 'class': 'select2', 'data-placeholder': _('Select users') }), } help_texts = { 'name': '* required', 'domain_url': 'eg: www.itnotebooks.com', } # 重写init方法 def __init__(self, initial, *args, **kwargs): super(SSLCertCreateUpdateForm, self).__init__(*args, **kwargs) # 只有update时,会传pk id过来 if initial.get('pk'): cert = Certs.objects.get(id=initial.get('pk')) if int(cert.method) == 1: self.fields['is_domain'].initial = False # 用户选择时跳过admin用户 self.fields['users'].queryset = User.objects.exclude(username__in=['admin']) # 重写save方法 def save(self, commit=True): certs = super().save(commit=commit) # 根据checkbox给method赋相应的值 if not self.cleaned_data.get('is_domain'): certs.method = 1 certs.save() return certs |
3. 定义view和路由
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
class SSLCertCreateView(AdminUserRequiredMixin, SuccessMessageMixin, CreateView): model = Certs form_class = forms.SSLCertCreateUpdateForm template_name = 'ssl_cert/ssl_cert_create_update.html' success_url = reverse_lazy('ssl-cert-assets:ssl-cert-list') def get_form(self, form_class=None): form = super().get_form(form_class=form_class) return form # 重写get_form_kwargs,在initial中传入参数pk def get_form_kwargs(self): kwargs = super(SSLCertCreateView, self).get_form_kwargs() if hasattr(self, 'object'): kwargs.update({'instance': self.object}) # Create时没有pk id,所以给个False kwargs['initial']['pk'] = False return kwargs def get_context_data(self, **kwargs): context = { 'app': _('SSLCert'), 'action': _('Create asset'), } kwargs.update(context) return super().get_context_data(**kwargs) def get_success_message(self, cleaned_data): return create_success_msg % ({"name": cleaned_data["name"]}) |
1 |
url(r'^ssl-cert/create/$', views.SSLCertCreateView.as_view(), name='ssl-cert-create'), |
4. 模板
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
{% block form %} <form action="" method="post" class="form-horizontal"> {% if form.non_field_errors %} <div class="alert alert-danger"> {{ form.non_field_errors }} </div> {% endif %} {% csrf_token %} <h3>{% trans 'Basic' %}</h3> {% bootstrap_field form.name layout="horizontal" %} <div class="id-domain"> <div class="form-group"> <label for="{{ form.is_domain.id_for_label }}" class="col-sm-2 control-label">通过域名验证</label> <div class="col-sm-8"> {{ form.is_domain }} <div class="help-block"> {{ form.is_domain.help_text }} </div> </div> </div> </div> <div class="common-domain"> {% bootstrap_field form.domain_url layout="horizontal" %} </div> <div class="local-pem-file"> {% bootstrap_field form.pem_file layout="horizontal" %} </div> {% bootstrap_field form.users layout="horizontal" %} {% bootstrap_field form.comment layout="horizontal" %} <div class="hr-line-dashed"></div> <div class="form-group"> <div class="col-sm-4 col-sm-offset-2"> <button class="btn btn-default" type="reset"> {% trans 'Reset' %}</button> <button id="submit_button" class="btn btn-primary" type="submit">{% trans 'Submit' %}</button> </div> </div> </form> {% endblock %} {% block custom_foot_js %} <script> var is_domain = '#' + '{{ form.is_domain.id_for_label }}'; function is_domainDisplay() { if ($(is_domain).prop('checked')) { $('.local-pem-file').addClass('hidden'); $('.common-domain').removeClass('hidden'); } else { $('.common-domain').addClass('hidden'); $('.local-pem-file').removeClass('hidden'); } } $(document).ready(function () { $('.select2').select2({ allowClear: true }); is_domainDisplay() }).on('change', is_domain, function () { is_domainDisplay() }) </script> {% endblock %} |
4.1 通过访问在线服务器联网解析
至此这个简单的页面就完成了
4.2 通过上传本地证书文件解析
5. Task
我这里是利用celery做的task任务,会定期去刷新证书的详细信息
5.1 定义一个共通方法
此方法用于解析证书详细信息,返回一个字典
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
from datetime import datetime from urllib3.contrib import pyopenssl as reqs # 从域名或pem文件解析SSL证书,获取签发信息 def load_certificate(method, obj): #这里是判断是在线解析,还是本地pem文件解析 if method == 0: cert = reqs.ssl.get_server_certificate((obj, 443)) elif method == 1: cert = obj try: x509 = reqs.OpenSSL.crypto.load_certificate(reqs.OpenSSL.crypto.FILETYPE_PEM, cert) # 获取有效期 notbefore = datetime.strptime(x509.get_notBefore().decode()[0:-1], '%Y%m%d%H%M%S') notafter = datetime.strptime(x509.get_notAfter().decode()[0:-1], '%Y%m%d%H%M%S') # 获取剩余天数 remain_days = notafter - datetime.now() # 组织名称 organization_name = x509.get_subject().organizationName serial_number = x509.get_subject().serialNumber # 判断证书类型 # 目前只发现了能通过这种方式进行区分,如果有更合理的方式记得在评论中告知哈 if serial_number: cert_type = 'EV' elif not organization_name: cert_type = 'DV' else: cert_type = 'OV' ret = { 'domain': x509.get_subject().CN, 'orther_domain': reqs.get_subj_alt_name(x509), 'organization_name': organization_name, 'serial_number': serial_number, 'issued_by': x509.get_issuer().CN, 'cert_type': cert_type, 'notbefore': notbefore.strftime('%Y-%m-%d %H:%M:%S'), 'notafter': notafter.strftime('%Y-%m-%d %H:%M:%S'), 'remain_days': remain_days.days, } except Exception as e: raise Exception(e) return ret |
5.2 邮件模板
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
from django.utils.translation import ugettext as _ def certs_messages_remaind_email(cert, cert_info): base_message = '还有Days天到期' if int(cert_info.get('remain_days')) == 0: message = '已到期' elif int(cert_info.get('remain_days')) < 0: message = '已过期' elif int(cert_info.get('remain_days')) in [90, 60, 45, 30, 15] or int(cert_info.get('remain_days')) <= 14: message = base_message.replace('Days', str(cert_info.get('remain_days'))) else: message = None if message: if len(cert_info.get('orther_domain')) > 0: orther_domain = [domain[1] for domain in cert_info.get('orther_domain')] else: orther_domain = cert_info.get('orther_domain') subject = _('重要通知:%(domain)s证书到期提醒(%(time)s)') % {'domain': cert_info.get('domain'), 'time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')} recipient_list = [user.email for user in cert.users.all()] # 我是遵照html的写法,实际的邮件模板中还有一些html的语法在里面 message = _(""" 艾瑞巴蒂~</br> </br> %(domain)s证书%(message)s,避免影响业务的正常运行请及时更新。</br> =========================================================</br> 颁发机构: %(issued_by)s</br> 证书类型: %(cert_type)s</br> 域名信息: %(domain)s</br> 备用域名: %(orther_domain)s</br> 有效期至: %(notafter)s</br> =========================================================</br> """) % { 'domain': cert_info.get('domain'), 'message': message, 'issued_by': cert_info.get('issued_by'), 'cert_type': cert_info.get('cert_type'), 'orther_domain': orther_domain, 'notafter': cert_info.get('notafter'), } if settings.DEBUG: logger.debug(message) send_mail_async.delay(subject, message, recipient_list, html_message=message) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
from django.core.mail import send_mail from django.conf import settings from django.utils.translation import ugettext as _ from celery import shared_task @shared_task def send_mail_async(*args, **kwargs): if len(args) == 3: args = list(args) args[0] = settings.EMAIL_SUBJECT_PREFIX + args[0] args.insert(2, settings.EMAIL_HOST_USER) args = tuple(args) try: send_mail(*args, **kwargs) except Exception as e: logger.error("Sending mail error: {}".format(e)) |
5.3 task
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
@shared_task def refresh_certs_information_to_db(): for cert in Certs.objects.all(): if cert.method == 0: cert_info = load_certificate(cert.method, cert.domain_url) elif cert.method == 1: cert_info = load_certificate(cert.method, cert.pem_file) for k, v in cert_info.items(): setattr(cert, k, v) if k.startswith('remain_days'): if int(v) <= 90: certs_messages_remaind_email(cert, cert_info) cert.save() return 0 |
6. 页面效果
7. 邮件效果
如果想赏钱,可以用微信扫描下面的二维码,一来能刺激我写博客的欲望,二来好维护云主机的费用; 另外再次标注博客原地址 itnotebooks.com 感谢!
请问可以开源吗?谢谢
近期会单独整理出来放到github上
你好,请问,你在model中设计表结构的时候,verbose_name 使用 _(“xxx”) 这种类似写法有什么神奇的用法么?在google也没看到类似的写法
我之前都写的中文名称。还请指教下 😛 😛
这是django的ugettext_lazy做了个别名