标签:config query join void upload you mysql基础 b2b open
抽屉官网:http://dig.chouti.com/
settings = {
‘template_path‘: ‘views‘, #模板文件路径
‘static_path‘: ‘statics‘, #静态文件路径
‘static_url_prefix‘: ‘/statics/‘, #静态文件前缀
‘autoreload‘: True,
‘ui_methods‘: mt
}
application = tornado.web.Application([
(r"/index", home.IndexHandler), #主页
(r"/check_code", account.CheckCodeHandler), #验证码
(r"/send_msg", account.SendMsgHandler), #邮箱验证码
(r"/register", account.RegisterHandler), #注册
(r"/login", account.LoginHandler), #登陆
(r"/upload_image", home.UploadImageHandler), #上传图片
(r"/comment", home.CommentHandler), #评论
(r"/favor", home.FavorHandler), #点赞
], **settings)

下面我们将根据上图文件目录由上到下做一一分析:
本项目所有前端反馈均是通过BaseResponse类实现的:
class BaseResponse:
def __init__(self):
self.status = False #状态信息,是否注册成功,是否登陆成功,是否点赞成功、是否评论成功等
self.code = StatusCodeEnum.Success
self.data = None #前端需要展示的数据
self.summary = None #错误信息
self.message = {} #字典类型的错误信息
前端:
后端:
数据库:
缓存:
import tornado.web
from backend.session.session import SessionFactory
class BaseRequestHandler(tornado.web.RequestHandler):
def initialize(self):
self.session = SessionFactory.get_session_obj(self)
fields:包含字符串、邮箱、数字、checkbox、文件类型验证
forms:核心验证处理,返回验证是否成功self._valid_status、成功后的数据提self._value_dict、错误信息self._error_dict
初始化
字符串匹配
ip匹配
邮箱匹配
数字匹配
checkbox匹配
文件匹配核心验证处理:
from backend.form import fields
class BaseForm:
def __init__(self):
self._value_dict = {} #数据字典
self._error_dict = {} #错误信息字典
self._valid_status = True #是否验证成功
def valid(self, handler):
for field_name, field_obj in self.__dict__.items():
if field_name.startswith(‘_‘): #过滤私有字段
continue
if type(field_obj) == fields.CheckBoxField: #checkbox处理
post_value = handler.get_arguments(field_name, None)
elif type(field_obj) == fields.FileField: #文件处理
post_value = []
file_list = handler.request.files.get(field_name, [])
for file_item in file_list:
post_value.append(file_item[‘filename‘])
else:
post_value = handler.get_argument(field_name, None)
field_obj.match(field_name, post_value) #匹配
if field_obj.is_valid: #如果验证成功
self._value_dict[field_name] = field_obj.value #提取数据
else:
self._error_dict[field_name] = field_obj.error #错误信息
self._valid_status = False
return self._valid_status #返回是否验证成功
以注册为例:
前端:
Htmljs:
js后台处理:
首先需要编写RegisterForm:
class RegisterForm(BaseForm): #需要继承上面的form验证核心处理类
def __init__(self): #初始化每一个input标签的name
self.username = StringField() #input标签name=对应类型的类
self.email = EmailField()
self.password = StringField()
self.email_code = StringField()
super(RegisterForm, self).__init__()
后台RegisterHandler:
class RegisterHandler(BaseRequestHandler):
def post(self, *args, **kwargs):
rep = BaseResponse() #总的返回前端的类,包含是否注册成功的状态、错误信息
form = account.RegisterForm() #实例化RegisterForm
if form.valid(self): #调用baseform核心验证处理函数valid,返回是否验证成功
current_date = datetime.datetime.now()
limit_day = current_date - datetime.timedelta(minutes=1)
conn = ORM.session() #获取数据库session对象<br> #查看验证码是否过期
is_valid_code = conn.query(ORM.SendMsg).filter(ORM.SendMsg.email == form._value_dict[‘email‘],
ORM.SendMsg.code == form._value_dict[‘email_code‘],
ORM.SendMsg.ctime > limit_day).count()
if not is_valid_code:
rep.message[‘email_code‘] = ‘邮箱验证码不正确或过期‘
self.write(json.dumps(rep.__dict__))
return
has_exists_email = conn.query(ORM.UserInfo).filter(ORM.UserInfo.email == form._value_dict[‘email‘]).count()#邮箱是否存在
if has_exists_email:
rep.message[‘email‘] = ‘邮箱已经存在‘
self.write(json.dumps(rep.__dict__))
return
has_exists_username = conn.query(ORM.UserInfo).filter(
ORM.UserInfo.username == form._value_dict[‘username‘]).count() #用户名是否存在
if has_exists_username:
rep.message[‘email‘] = ‘用户名已经存在‘
self.write(json.dumps(rep.__dict__))
return<br> #按数据库表的列订制form._value_dict
form._value_dict[‘ctime‘] = current_date form._value_dict.pop(‘email_code‘) obj = ORM.UserInfo(**form._value_dict) conn.add(obj)<br>
conn.flush()
conn.refresh(obj) #将自增id也提取出来
user_info_dict = {‘nid‘: obj.nid, ‘email‘: obj.email, ‘username‘: obj.username}
conn.query(ORM.SendMsg).filter_by(email=form._value_dict[‘email‘]).delete()#删除本次邮箱验证码
conn.commit()
conn.close()
self.session[‘is_login‘] = True #注册成功后定义登陆成功
self.session[‘user_info‘] = user_info_dict 用户信息写入session
rep.status = True
else:
rep.message = form._error_dict #错误信息
self.write(json.dumps(rep.__dict__)) #返回给前端,前端ajax success接收并处理,在前端页面展示
1.应用工厂方法模式定义session保存的位置,用户只需在配置文件修改即可
class SessionFactory:
@staticmethod
def get_session_obj(handler):
obj = None
if config.SESSION_TYPE == "cache": #缓存
obj = CacheSession(handler)
elif config.SESSION_TYPE == "memcached": #memcached
obj = MemcachedSession(handler) <br> elif config.SESSION_TYPE == "redis": #radis<br> obj = RedisSession(handler) <br> return obj
2.缓存session
缓存Sesson3.memcache session
memcache session4.radis session
radis session注:验证码需要依赖session。
check_code.pyclass CheckCodeHandler(BaseRequestHandler):
def get(self, *args, **kwargs):
stream = io.BytesIO()
img, code = check_code.create_validate_code()
img.save(stream, "png")
self.session["CheckCode"] = code #利用session保存验证码
self.write(stream.getvalue())
路由配置:(r"/check_code", account.CheckCodeHandler),
前端:
<img class="check-img" src="/check_code" alt="验证码" onclick="ChangeCode(this);">
js:
<script>
function ChangeCode(ths) {
ths.src += ‘?‘;
}
</script>

前端:
<div class="inp">
<input class="regiter-temp" name="code" class="email-code" type="text" placeholder="请输入邮箱验证码" />
<a onclick="SendCode(this);" class="fetch-code" >获取验证码</a>
</div>
js:
function SendCode(ths) {
// var email = $(ths).prev().val();
var email = $(‘#email‘).val();
$.ajax({
url: ‘/send_code‘,
type: ‘POST‘,
data: {em: email},
success: function (arg) {
console.log(arg);
},
error: function () {
}
});
}
路由配置:
(r"/send_code", account.SendCodeHandler),
后台handler:
class SendCodeHandler(BaseRequestHandler):
def post(self, *args, **kwargs):
ret = {‘status‘: True, "data": "", "error": ""}
email = self.get_argument(‘em‘, None)
if email:
code = commons.random_code() #获取随机验证码
message.email([email,], code) #发送验证码到邮箱
conn = chouti_orm.session() #获取数据库session对象
obj = chouti_orm.SendCode(email=email,code=code, stime=datetime.datetime.now()) #写入数据库
conn.add(obj)
conn.commit()
else:
ret[‘status‘] = False
ret[‘error‘] = "邮箱格式错误"
self.write(json.dumps(ret))
发送邮箱验证码函数:
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
def email(email_list, content, subject="抽屉新热榜-用户注册"): #email_list邮件列表,content邮件内容,subject:发送标题
msg = MIMEText(content, ‘plain‘, ‘utf-8‘)
msg[‘From‘] = formataddr(["抽屉新热榜",‘wptawy@126.com‘])
msg[‘Subject‘] = subject
server = smtplib.SMTP("smtp.126.com", 25) 邮箱引擎
server.login("youxiang@126.com", "mima") #邮箱名,密码
server.sendmail(‘wptawy@126.com‘, email_list, msg.as_string())
server.quit()
案例:
html:
<a id="fetch_code" class="fetch-code" href="javascript:void(0);">获取验证码</a>
js:
function BindSendMsg(){
$("#fetch_code").click(function(){
$(‘#register_error_summary‘).empty(); #清空错误信息
var email = $(‘#email‘).val(); #获取邮箱地址
if(email.trim().length == 0){ #判断是否输入邮箱
$(‘#register_error_summary‘).text(‘请输入注册邮箱‘);
return;
}
if($(this).hasClass(‘sending‘)){ #判断是否已经发送
return;
}
var ths = $(this);
var time = 60; 设置倒计时时间为60s
$.ajax({
url: "/send_msg",
type: ‘POST‘,
data: {email: email},
dataType: ‘json‘,
success: function(arg){
if(!arg.status){ #是否发送成功
$(‘#register_error_summary‘).text(arg.summary); #不成功显示错误信息
}else{
ths.addClass(‘sending‘); #成功后显示已发送状态
var interval = setInterval(function(){
ths.text("已发送(" + time + ")");
time -= 1; #定时器每运行一次,计数器减1
if(time <= 0){
clearInterval(interval); #一分钟过完,清除定时器
ths.removeClass(‘sending‘);# 移除已发送状态
ths.text("获取验证码");# 恢复未发送状态
}
}, 1000);#定时器每隔1s运行一次
}
}
});
});
}
附:一些常见模块:
1.随机验证码获取:
随机验证码2.md5加密
def generate_md5(value):
r = str(time.time())
obj = hashlib.md5(r.encode(‘utf-8‘))
obj.update(value.encode(‘utf-8‘))
return obj.hexdigest()
案例:
前端:
<div class="pagination">
{% raw str_page%} #展示原生html
</div>
url配置:
(r"/index/(?P<page>\d*)", IndexHandler),
分页模块:
分页模块注:Pagination实例化接收两个参数:当前页current_page、新闻总数all_item,其中current_page一般通过url分组元素直接获取
后台handler:
class IndexHandler(BaseRequestHandler):
def get(self, page=1):
conn = ORM.session() #获取数据库session对象
all_count = conn.query(ORM.News).count()#计算新闻总数
obj = Pagination(page, all_count) #实例化pagination对象
current_user_id = self.session[‘user_info‘][‘nid‘] if self.session[‘is_login‘] else 0 #如果登陆获取用户id,否则,用户id=0,下面的查询结果也为空
result = conn.query(ORM.News.nid,
ORM.News.title,
ORM.News.url,
ORM.News.content,
ORM.News.ctime,
ORM.UserInfo.username,
ORM.NewsType.caption,
ORM.News.favor_count,
ORM.News.comment_count,
ORM.Favor.nid.label(‘has_favor‘)).join(ORM.NewsType, isouter=True).join(ORM.UserInfo, isouter=True).join(ORM.Favor, and_(ORM.Favor.user_info_id == current_user_id, ORM.News.nid == ORM.Favor.news_id), isouter=True)[obj.start:10] #从每页开始向下取10条,即每页显示10条新闻
conn.close()
str_page = obj.string_pager(‘/index/‘) 获取页码的字符串格式html
self.render(‘home/index.html‘, str_page=str_page, news_list=result)
1.普通登陆验证
def auth_login_redirect(func):
def inner(self, *args, **kwargs):
if not self.session[‘is_login‘]:
self.redirect(config.LOGIN_URL)
return
func(self, *args, **kwargs)
return inner
2.ajax提交数据的登陆验证
def auth_login_json(func):
def inner(self, *args, **kwargs):
if not self.session[‘is_login‘]:
rep = BaseResponse()
rep.summary = "auth failed"
self.write(json.dumps(rep.__dict__))
return
func(self, *args, **kwargs)
return inner
前端:
<form style="display: inline-block" id="upload_img_form" name="form" action="/upload_image" method="POST" enctype="multipart/form-data" > <a id="fakeFile" class="fake-file"> <span>上传图片</span> <input type="file" name="img" onchange="UploadImage(this);"/> <input type="text" name="url" class="hide" /> </a> <iframe id=‘upload_img_iframe‘ name=‘upload_img_iframe‘ src="" class="hide"></iframe> </form>
js:
js路由配置:
(r"/upload_image", home.UploadImageHandler),
后台handler:
class UploadImageHandler(BaseRequestHandler):
@decrator.auth_login_json #上传前登陆验证
def post(self, *args, **kwargs):
rep = BaseResponse() 前端回应类
try:
file_metas = self.request.files["img"] 获取图片列表
for meta in file_metas:
file_name = meta[‘filename‘] #图片名
file_path = os.path.join(‘statics‘, ‘upload‘, commons.generate_md5(file_name)) #保存地址
with open(file_path, ‘wb‘) as up:
up.write(meta[‘body‘]) #在服务器写入图片
rep.status = True #写入成功
rep.data = file_path
except Exception as ex:
rep.summary = str(ex)#错误信息
self.write(json.dumps(rep.__dict__)) #反馈给前端

1.定义需要验证的form类
class IndexForm(BaseForm):
def __init__(self):
self.title = StringField() #标题
self.content = StringField(required=False) 内容
self.url = StringField(required=False) 图片url
self.news_type_id = IntegerField() 新闻类型
super(IndexForm, self).__init__()
2.前端html:
<div class="f4"> <a class="submit right" id="submit_img">提交</a> <span class="error-msg right"></span> </div>
3.js
View Code后台handler:
@decrator.auth_login_json #发布前登陆验证
def post(self, *args, **kwargs):
rep = BaseResponse()
form = IndexForm()#实例化Indexform
if form.valid(self):
# title,content,href,news_type,user_info_id
#写入数据库
input_dict = copy.deepcopy(form._value_dict)
input_dict[‘ctime‘] = datetime.datetime.now()
input_dict[‘user_info_id‘] = self.session[‘user_info‘][‘nid‘]
conn = ORM.session()
conn.add(ORM.News(**input_dict))
conn.commit()
conn.close()
rep.status = True #写入成功
else:
rep.message = form._error_dict #错误信息
self.write(json.dumps(rep.__dict__))
前端html:
<a href="javascript:void(0);" class="digg-a" title="推荐" onclick="DoFavor(this,{{item[0]}});">
{% if item[9] %} #是否已点过赞
<span class="hand-icon icon-digg active"></span>
{% else %}
<span class="hand-icon icon-digg"></span>
{% end %}
<b id="favor_count_{{item[0]}}">{{item[7]}}</b> #点赞数量<br></a>
js:
点赞后台handler:
View Code点赞+1和-1动态效果js:
点赞+1效果
点赞-1效果案例:
前端html:
<div class="box-r">
<a href="javascript:void(0);" class="pub-icons add-pub-btn add-pub-btn-unvalid" onclick="DoComment({{item[0]}})">评论</a> #携带新闻id
<a href="javascript:void(0);" class="loading-ico loading-ico-top pub-loading-top hide">发布中...</a>
</div>
{% raw tree(comment_tree) %}
创建评论树字典函数:
build_tree递归生成评论树函数:
tree_search前端调用uimethod,生成评论html:
uimethod后台handler:
后台handlerjs:
js标签:config query join void upload you mysql基础 b2b open
原文地址:https://www.cnblogs.com/ExMan/p/10308817.html