一、认证组件

1. 分析源码

通过分析源码了解认证组件的方法调用过程

APIView 的 dispatch 中使用 initial 方法实现初始化并进行三大认证,第一步就是认证组件

rest_framework/views.py

class APIView(View):
# ...
def initial(self, request, *args, **kwargs):
# ...
# 认证组件:校验用户
# 这里调用 perform_authentication 实现认证
self.perform_authentication(request)
# 权限组件:校验用户权限
self.check_permissions(request)
# 频率组件:限制视图接口被访问次数
self.check_throttles(request)

def perform_authentication(self, request):
"""
Perform authentication on the incoming request.

Note that if you override this and simply 'pass', then authentication
will instead be performed lazily, the first time either
`request.user` or `request.auth` is accessed.
"""
class APIView(View):
# ...
def initial(self, request, *args, **kwargs):
# ...
# 认证组件:校验用户
# 这里调用 perform_authentication 实现认证
self.perform_authentication(request)
# 权限组件:校验用户权限
self.check_permissions(request)
# 频率组件:限制视图接口被访问次数
self.check_throttles(request)

def perform_authentication(self, request):
"""
Perform authentication on the incoming request.

Note that if you override this and simply 'pass', then authentication
will instead be performed lazily, the first time either
`request.user` or `request.auth` is accessed.
"""
# 去 request 中调用 user 方法属性
request.user

request.user 去 request 中找 user 方法属性,找到认证方法实现过程

rest_framework/request.py

class Request:
@property
def user(self):
"""
Returns the user associated with the current request, as authenticated
by the authentication classes provided to the request.
"""
# 属性来源是 _属性名
if not hasattr(self, '_user'):
# 回收错误信息
with wrap_attributeerrors():
# 没用户,认证处用户
self._authenticate()
# 有用户,直接返回用户
return self._user

# 认证方法
def _authenticate(self):
"""
Attempt to authenticate the request using each authentication instance
in turn.
"""

# 遍历拿到认证器,进行认证
# self.authenticators,配置的一堆认证类产生的认证类对象组成的 list
for authenticator in self.authenticators:
# 该方法被 try 包裹,代表该方法会抛异常,抛异常代表认证失败
try:
# 认证器(对象)调用认证方法 authenticate(认证类对象self,request 请求对象)
# 返回值:登录的用户与认证的信息组成的 tuple
user_auth_tuple = authenticator.authenticate(self)
except exceptions.APIException:
# 非法用户
self._not_authenticated()
raise

# 处理返回值
if user_auth_tuple is not None:
self._authenticator = authenticator
# 合法用户
# 如果有返回值,就将登录用户与登录认证分别保存到 request.user request.auth
self.user, self.auth = user_auth_tuple
return

# 游客
class Request:
@property
def user(self):
"""
Returns the user associated with the current request, as authenticated
by the authentication classes provided to the request.
"""
# 属性来源是 _属性名
if not hasattr(self, '_user'):
# 回收错误信息
with wrap_attributeerrors():
# 没用户,认证处用户
self._authenticate()
# 有用户,直接返回用户
return self._user

# 认证方法
def _authenticate(self):
"""
Attempt to authenticate the request using each authentication instance
in turn.
"""

# 遍历拿到认证器,进行认证
# self.authenticators,配置的一堆认证类产生的认证类对象组成的 list
for authenticator in self.authenticators:
# 该方法被 try 包裹,代表该方法会抛异常,抛异常代表认证失败
try:
# 认证器(对象)调用认证方法 authenticate(认证类对象self,request 请求对象)
# 返回值:登录的用户与认证的信息组成的 tuple
user_auth_tuple = authenticator.authenticate(self)
except exceptions.APIException:
# 非法用户
self._not_authenticated()
raise

# 处理返回值
if user_auth_tuple is not None:
self._authenticator = authenticator
# 合法用户
# 如果有返回值,就将登录用户与登录认证分别保存到 request.user request.auth
self.user, self.auth = user_auth_tuple
return

# 游客
# 如果返回值 user_auth_tuple,代表认证通过,但是没有登录用户和登录认证信息,代表游客
self._not_authenticated()

寻找 authenticators 如何定义

rest_framework/views.py

class APIView(View):

# The following policies may be set at either globally, or per-view.
# 配置认证类
authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES

def dispatch(self, request, *args, **kwargs):
"""
`.dispatch()` is pretty much the same as Django's regular dispatch,
but with extra hooks for startup, finalize, and exception handling.
"""
self.args = args
self.kwargs = kwargs
# 请求模块(解析模块)
request = self.initialize_request(request, *args, **kwargs)
self.request = request
self.headers = self.default_response_headers # deprecate?

try:
# 进入三大认证模块
self.initial(request, *args, **kwargs)

# Get the appropriate handler method
if request.method.lower() in self.http_method_names:
handler = getattr(self, request.method.lower(),
self.http_method_not_allowed)
else:
handler = self.http_method_not_allowed

response = handler(request, *args, **kwargs)

except Exception as exc:
response = self.handle_exception(exc)

self.response = self.finalize_response(request, response, *args, **kwargs)
return self.response


def initialize_request(self, request, *args, **kwargs):
"""
Returns the initial request object.
"""
parser_context = self.get_parser_context(request)

return Request(
request,
# 获取解析类
parsers=self.get_parsers(),
# 获取认证器
authenticators=self.get_authenticators(),
negotiator=self.get_content_negotiator(),
parser_context=parser_context
)

# 获取认证器
def get_authenticators(self):
"""
Instantiates and returns the list of authenticators that this view can use.
"""
# 实例化一堆认证类对象
return [auth() for auth class APIView(View):

# The following policies may be set at either globally, or per-view.
# 配置认证类
authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES

def dispatch(self, request, *args, **kwargs):
"""
`.dispatch()` is pretty much the same as Django's regular dispatch,
but with extra hooks for startup, finalize, and exception handling.
"""
self.args = args
self.kwargs = kwargs
# 请求模块(解析模块)
request = self.initialize_request(request, *args, **kwargs)
self.request = request
self.headers = self.default_response_headers # deprecate?

try:
# 进入三大认证模块
self.initial(request, *args, **kwargs)

# Get the appropriate handler method
if request.method.lower() in self.http_method_names:
handler = getattr(self, request.method.lower(),
self.http_method_not_allowed)
else:
handler = self.http_method_not_allowed

response = handler(request, *args, **kwargs)

except Exception as exc:
response = self.handle_exception(exc)

self.response = self.finalize_response(request, response, *args, **kwargs)
return self.response


def initialize_request(self, request, *args, **kwargs):
"""
Returns the initial request object.
"""
parser_context = self.get_parser_context(request)

return Request(
request,
# 获取解析类
parsers=self.get_parsers(),
# 获取认证器
authenticators=self.get_authenticators(),
negotiator=self.get_content_negotiator(),
parser_context=parser_context
)

# 获取认证器
def get_authenticators(self):
"""
Instantiates and returns the list of authenticators that this view can use.
"""
# 实例化一堆认证类对象
return [auth() for auth in self.authentication_classes]

了解到认证器是通过一系列人证类对象实例化后定义

我们进去 SessionAuthentication 查看默认配置的认证类的实现

class SessionAuthentication(BaseAuthentication):
"""
Use Django's session framework for authentication.
"""

def authenticate(self, request):
"""
Returns a `User` if the request session currently has a logged in user.
Otherwise returns `None`.
"""

# 得到用户
user = getattr(request._request, 'user', None)

# Unauthenticated, CSRF validation not required
if not user or not user.is_active:
# 没有解析出,代表游客
return None

# 解析出用户后,要重新启用 csrf 认证
# 如果 csrf 认证失败,就出现异常,认证为非法用户
self.enforce_csrf(request)

# CSRF passed with authenticated user
# 认证为合法用户,没有返回认证信息
return (user, None)

class BasicAuthentication(BaseAuthentication):
"""
HTTP Basic authentication against username/password.
"""
www_authenticate_realm = 'api'

def authenticate(self, request):
"""
Returns a `User` if a correct username and password have been supplied
using HTTP Basic authentication. Otherwise returns `None`.
"""
# 获取认证信息:该认证信息是两段式(basic 认证字符串)
auth = get_authorization_header(request).split()

# 没有认证信息,认证为游客
if not auth or auth[0].lower() != b'basic':
return None

# 有认证信息,格式错误,认证为非法用户
if len(auth) == 1:
msg = _('Invalid basic header. No credentials provided.')
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = _('Invalid basic header. Credentials string should not contain spaces.')
raise exceptions.AuthenticationFailed(msg)

try:
auth_parts = base64.b64decode(auth[1]).decode(HTTP_HEADER_ENCODING).partition(':')
except (TypeError, UnicodeDecodeError, binascii.Error):
msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
raise exceptions.AuthenticationFailed(msg)

userid, password = auth_parts[0], auth_parts[2]
# 认证信息处理出用户主键和密码,进一步得到用户对象
return self.authenticate_credentials(userid, password, request)

def authenticate_credentials(self, userid, password, request=None):
"""
Authenticate the userid and password against username and password
with optional request for context.
"""
credentials = {
get_user_model().USERNAME_FIELD: userid,
'password': password
}
user = authenticate(request=request, **credentials)

# 如果没有该用户或非活跃用户,认证为非法用户
if user is None:
raise exceptions.AuthenticationFailed(_('Invalid username/password.'))

if not user.is_active:
raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))

return (user, None)

def authenticate_header(self, request):
return class SessionAuthentication(BaseAuthentication):
"""
Use Django's session framework for authentication.
"""

def authenticate(self, request):
"""
Returns a `User` if the request session currently has a logged in user.
Otherwise returns `None`.
"""

# 得到用户
user = getattr(request._request, 'user', None)

# Unauthenticated, CSRF validation not required
if not user or not user.is_active:
# 没有解析出,代表游客
return None

# 解析出用户后,要重新启用 csrf 认证
# 如果 csrf 认证失败,就出现异常,认证为非法用户
self.enforce_csrf(request)

# CSRF passed with authenticated user
# 认证为合法用户,没有返回认证信息
return (user, None)

class BasicAuthentication(BaseAuthentication):
"""
HTTP Basic authentication against username/password.
"""
www_authenticate_realm = 'api'

def authenticate(self, request):
"""
Returns a `User` if a correct username and password have been supplied
using HTTP Basic authentication. Otherwise returns `None`.
"""
# 获取认证信息:该认证信息是两段式(basic 认证字符串)
auth = get_authorization_header(request).split()

# 没有认证信息,认证为游客
if not auth or auth[0].lower() != b'basic':
return None

# 有认证信息,格式错误,认证为非法用户
if len(auth) == 1:
msg = _('Invalid basic header. No credentials provided.')
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = _('Invalid basic header. Credentials string should not contain spaces.')
raise exceptions.AuthenticationFailed(msg)

try:
auth_parts = base64.b64decode(auth[1]).decode(HTTP_HEADER_ENCODING).partition(':')
except (TypeError, UnicodeDecodeError, binascii.Error):
msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
raise exceptions.AuthenticationFailed(msg)

userid, password = auth_parts[0], auth_parts[2]
# 认证信息处理出用户主键和密码,进一步得到用户对象
return self.authenticate_credentials(userid, password, request)

def authenticate_credentials(self, userid, password, request=None):
"""
Authenticate the userid and password against username and password
with optional request for context.
"""
credentials = {
get_user_model().USERNAME_FIELD: userid,
'password': password
}
user = authenticate(request=request, **credentials)

# 如果没有该用户或非活跃用户,认证为非法用户
if user is None:
raise exceptions.AuthenticationFailed(_('Invalid username/password.'))

if not user.is_active:
raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))

return (user, None)

def authenticate_header(self, request):
return 'Basic realm="%s"' % self.www_authenticate_realm

我们现在查看默认认证类配置定义位置

rest_framework/setting.py

DEFAULTS = {
# ...
# 默认认证类配置
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.SessionAuthentication',
DEFAULTS = {
# ...
# 默认认证类配置
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.SessionAuthentication',
'rest_framework.authentication.BasicAuthentication'
],
}

根据这个,现在就可以自定义项目配置文件

2. 全局配置认证

编写自己项目的settings.py

# 全局局部配置
REST_FRAMEWORK = {
# ...
# 配置默认认证类
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.SessionAuthentication',
# 全局局部配置
REST_FRAMEWORK = {
# ...
# 配置默认认证类
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.SessionAuthentication',
'rest_framework.authentication.BasicAuthentication'
],
}

二、自定义认证类

可以看到以上默认的认证类,所有的规则都是固定的。尤其是做session认证时,会调用csrf,但是对于前后端分离的情况,这种规则并不友好。

1. 代码实现

  • 继承 BasicAuthentication

  • 重写 authenticate 方法

  • 实现根据自定义认证规则,确定是否有权限

  • 认证规则:

    •   游客:无认证信息,返回 None
      
    •   非法用户:有认证信息,认证失败,抛异常
      
    •   合法用户:有认证信息,认证成功,返回元组
      
  • 进行全局或局部配置

    • 全局:配置文件 settings.py
    • 局部:在视图类 import
  • 前台在请求头携带认证信息,且默认规范用 Authorization 字段携带认证信息

authentications.py

from rest_framework.authentication import BasicAuthentication
from rest_framework.exceptions import AuthenticationFailed

from api import models


class MyAuthentication(BasicAuthentication):
# 重新 authenticate 方法,自定义认证规则
def authenticate(self, request):
# 认证规则要基于条件:
# 游客:无认证信息,返回 None
# 非法用户:有认证信息,认证失败,抛异常
# 合法用户:有认证信息,认证成功,返回元组
# 前台在请求头携带认证信息,且默认规范用 Authorization 字段携带认证信息
# 后端固定在请求对象的 META字典中 HTTP_AUTHORIZATION 获取
auth = request.META.get('HTTP_AUTHORIZATION', None
)

# 游客认证
if auth is None:
return None

# 设置认证字段规则(两端式):“auth 认证字符串”
auth_list = auth.split()

# 校验合法还是非法用户
if len(auth_list) != 2 or auth_list[0].lower() != 'auth':
raise AuthenticationFailed('The authentication information is incorrect! Illegal user')

# 合法的用户需要进一步从 auth_list[1] 解析
# 假设一种情况:信息为 abc.123.xyz 就可以解析 admin 用户
# 实际开发时,该逻辑一定需要校验用户
if auth_list[1] != 'abc.123.xyz':
raise AuthenticationFailed('User verification failed! Illegal user')

user = models.User.objects.filter(username='baimoc').first()

if not user:
raise AuthenticationFailed('User data is incorrect! Illegal user')

return (user, from rest_framework.authentication import BasicAuthentication
from rest_framework.exceptions import AuthenticationFailed

from api import models


class MyAuthentication(BasicAuthentication):
# 重新 authenticate 方法,自定义认证规则
def authenticate(self, request):
# 认证规则要基于条件:
# 游客:无认证信息,返回 None
# 非法用户:有认证信息,认证失败,抛异常
# 合法用户:有认证信息,认证成功,返回元组
# 前台在请求头携带认证信息,且默认规范用 Authorization 字段携带认证信息
# 后端固定在请求对象的 META字典中 HTTP_AUTHORIZATION 获取
auth = request.META.get('HTTP_AUTHORIZATION', None
)

# 游客认证
if auth is None:
return None

# 设置认证字段规则(两端式):“auth 认证字符串”
auth_list = auth.split()

# 校验合法还是非法用户
if len(auth_list) != 2 or auth_list[0].lower() != 'auth':
raise AuthenticationFailed('The authentication information is incorrect! Illegal user')

# 合法的用户需要进一步从 auth_list[1] 解析
# 假设一种情况:信息为 abc.123.xyz 就可以解析 admin 用户
# 实际开发时,该逻辑一定需要校验用户
if auth_list[1] != 'abc.123.xyz':
raise AuthenticationFailed('User verification failed! Illegal user')

user = models.User.objects.filter(username='baimoc').first()

if not user:
raise AuthenticationFailed('User data is incorrect! Illegal user')

return (user, None)

settings.py

# 全局局部配置
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
# 全局局部配置
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
'api.authentications.MyAuthentication'
],
}

views.py

from rest_framework.views import APIView
from rest_framework.generics import GenericAPIView
from rest_framework.viewsets import GenericViewSet, ViewSet

from utils.response import APIResponse


class LoginView(APIView):
def get(self, request, *args, **kwargs):
# 如果认证通过,request.user 就一定有值
# 游客:AnonymousUser
# 用户:User
return APIResponse(0, from rest_framework.views import APIView
from rest_framework.generics import GenericAPIView
from rest_framework.viewsets import GenericViewSet, ViewSet

from utils.response import APIResponse


class LoginView(APIView):
def get(self, request, *args, **kwargs):
# 如果认证通过,request.user 就一定有值
# 游客:AnonymousUser
# 用户:User
return APIResponse(0, 'Login successful')

urls.py

from django.conf.urls import url
from api import views

urlpatterns = [
url(from django.conf.urls import url
from api import views

urlpatterns = [
url(r'^login/$', views.LoginView.as_view()),
]

2. 接口测试

mark