认证系统 #
一、认证概述 #
1.1 认证方式 #
| 方式 | 说明 |
|---|---|
| Session | 传统Session认证 |
| Token | API Token认证 |
| JWT | JSON Web Token |
| OAuth | 第三方登录 |
1.2 Phoenix默认认证 #
Phoenix 自 1.6 起提供了内置的认证生成器(1.7 起还可以生成基于 LiveView 的版本):
bash
mix phx.gen.auth Accounts User users
二、用户注册 #
2.1 注册Context #
elixir
defmodule Hello.Accounts do
  @moduledoc """
  Accounts context: entry points for registering users.
  """

  alias Hello.Accounts.User
  # `Repo` was called without being aliased. `Hello.Repo` follows the app's
  # `Hello.*` namespace — adjust if the repo module is named differently.
  alias Hello.Repo

  @doc """
  Builds a registration changeset from `attrs` and inserts the new user.

  Returns whatever `Repo.insert/1` returns for that changeset.
  """
  def register_user(attrs) do
    %User{}
    |> User.registration_changeset(attrs)
    |> Repo.insert()
  end

  @doc """
  Returns the registration changeset for `user` with `attrs` applied,
  without touching the database.
  """
  def change_user_registration(%User{} = user, attrs \\ %{}) do
    User.registration_changeset(user, attrs)
  end
end
2.2 注册Schema #
elixir
defmodule Hello.Accounts.User do
  @moduledoc """
  User schema together with the changeset used for registration.
  """

  use Ecto.Schema
  import Ecto.Changeset

  schema "users" do
    field :email, :string
    # `redact: true` keeps credentials out of `inspect/1` output and logs.
    field :password, :string, virtual: true, redact: true
    field :hashed_password, :string, redact: true
    field :confirmed_at, :naive_datetime
    timestamps()
  end

  @doc """
  Casts `:email` and `:password` from `attrs`, validates both, and replaces
  the plain-text password with its bcrypt hash when the changeset is valid.
  """
  def registration_changeset(user, attrs) do
    user
    |> cast(attrs, [:email, :password])
    |> validate_email()
    |> validate_password()
  end

  # Requires an email that contains "@" and no whitespace, and registers the
  # unique constraint so a duplicate becomes a changeset error on insert.
  defp validate_email(changeset) do
    changeset
    |> validate_required([:email])
    |> validate_format(:email, ~r/^[^\s]+@[^\s]+$/)
    |> unique_constraint(:email)
  end

  # Requires a password of at least 8 characters and at most 72 bytes.
  # The upper bound is counted in bytes because bcrypt's input limit is
  # expressed in bytes, so multi-byte characters must not slip past it.
  defp validate_password(changeset) do
    changeset
    |> validate_required([:password])
    |> validate_length(:password, min: 8)
    |> validate_length(:password, max: 72, count: :bytes)
    |> maybe_hash_password()
  end

  # Hashes the password only when one was submitted and every validation
  # passed; the plain-text change is dropped once the hash is stored.
  defp maybe_hash_password(changeset) do
    password = get_change(changeset, :password)

    if password && changeset.valid? do
      changeset
      |> put_change(:hashed_password, Bcrypt.hash_pwd_salt(password))
      |> delete_change(:password)
    else
      changeset
    end
  end
end
2.3 注册LiveView #
elixir
defmodule HelloWeb.UserRegistrationLive do
  @moduledoc """
  LiveView that handles the user registration form.
  """

  use HelloWeb, :live_view

  # Both modules were referenced without an alias in the original snippet.
  alias Hello.Accounts
  alias Hello.Accounts.User

  # Starts with an empty registration changeset in the `:changeset` assign.
  @impl true
  def mount(_params, _session, socket) do
    changeset = Accounts.change_user_registration(%User{})
    {:ok, assign(socket, :changeset, changeset)}
  end

  # Handles the "register" event: creates the user, sends the confirmation
  # instructions, then redirects to "/". On failure the returned changeset
  # replaces the `:changeset` assign.
  @impl true
  def handle_event("register", %{"user" => user_params}, socket) do
    case Accounts.register_user(user_params) do
      {:ok, user} ->
        # Asserts that delivery succeeded; any other result raises here.
        {:ok, _} =
          Accounts.deliver_user_confirmation_instructions(
            user,
            &url(~p"/users/confirm/#{&1}")
          )

        {:noreply,
         socket
         |> put_flash(:info, "User created successfully")
         |> redirect(to: ~p"/")}

      {:error, changeset} ->
        {:noreply, assign(socket, :changeset, changeset)}
    end
  end
end
三、用户登录 #
3.1 Session认证 #
elixir
defmodule Hello.Accounts do
  @moduledoc """
  Accounts context: credential lookup used by session login.
  """

  alias Hello.Accounts.User
  # `Repo` was called without being aliased. `Hello.Repo` follows the app's
  # `Hello.*` namespace — adjust if the repo module is named differently.
  alias Hello.Repo

  @doc """
  Returns the user whose email is `email` when `password` is valid for that
  user, otherwise `nil`.

  Non-binary arguments (for example nested maps coming from request params)
  also yield `nil` instead of being passed on to the database query.
  """
  def get_user_by_email_and_password(email, password)
      when is_binary(email) and is_binary(password) do
    user = Repo.get_by(User, email: email)
    if User.valid_password?(user, password), do: user
  end

  def get_user_by_email_and_password(_email, _password), do: nil
end
defmodule Hello.Accounts.User do
  @doc """
  Returns `true` when `password` matches the user's stored bcrypt hash.

  Any other input (no user, no stored hash, non-binary password) returns
  `false`.
  """
  # `%__MODULE__{}` is used because a bare `%User{}` does not resolve to
  # this module from inside its own body.
  def valid_password?(%__MODULE__{hashed_password: hashed_password}, password)
      when is_binary(hashed_password) and is_binary(password) do
    Bcrypt.verify_pass(password, hashed_password)
  end

  def valid_password?(_, _) do
    # Run a dummy hash so a failed lookup costs about as much time as a real
    # verification; otherwise response time reveals which emails exist.
    Bcrypt.no_user_verify()
    false
  end
end
3.2 Session控制器 #
elixir
defmodule HelloWeb.UserSessionController do
  @moduledoc """
  Controller actions for logging in and logging out.
  """

  use HelloWeb, :controller

  # Both modules were referenced without an alias in the original snippet.
  alias Hello.Accounts
  alias HelloWeb.UserAuth

  @doc """
  Logs the user in when the submitted email and password are valid;
  otherwise re-renders the `:new` template with an error flash.
  """
  # Destructuring in the head means malformed params fail the action clause
  # match (which Phoenix reports as a bad request) rather than raising a
  # MatchError inside the action body.
  def create(conn, %{"user" => %{"email" => email, "password" => password} = user_params}) do
    case Accounts.get_user_by_email_and_password(email, password) do
      nil ->
        conn
        |> put_flash(:error, "Invalid email or password")
        |> render(:new)

      user ->
        conn
        |> put_flash(:info, "Welcome back!")
        |> UserAuth.log_in_user(user, user_params)
    end
  end

  @doc """
  Sets a confirmation flash and delegates to `UserAuth.log_out_user/1`.
  """
  def delete(conn, _params) do
    conn
    |> put_flash(:info, "Logged out successfully")
    |> UserAuth.log_out_user()
  end
end
3.3 认证Plug #
elixir
defmodule HelloWeb.UserAuth do
  @moduledoc """
  Session-based authentication helpers and plugs: log in, log out, load the
  current user, and require an authenticated user.
  """

  import Plug.Conn
  import Phoenix.Controller

  # `Accounts` was referenced without an alias in the original snippet.
  alias Hello.Accounts
  alias HelloWeb.Router.Helpers, as: Routes

  @doc """
  Logs `user` in: generates a session token, renews the session, stores the
  token and the LiveView socket id, then redirects.

  The redirect target is `params["return_to"]`, else the path stored by
  `require_authenticated_user/2`, else the index page.
  """
  def log_in_user(conn, user, params \\ %{}) do
    token = Accounts.generate_user_session_token(user)

    # Read the stored path BEFORE renewing: renew_session/1 clears the
    # session, and previously this value was written but never used.
    stored_return_to = get_session(conn, :user_return_to)
    destination = params["return_to"] || stored_return_to || Routes.page_path(conn, :index)

    conn
    |> renew_session()
    |> put_session(:user_token, token)
    |> put_session(:live_socket_id, "users_sessions:#{Base.url_encode64(token)}")
    |> redirect(to: destination)
  end

  @doc """
  Logs the user out: deletes the session token if one is present, broadcasts
  a "disconnect" on the stored live socket id, renews the session and
  redirects to "/".
  """
  def log_out_user(conn) do
    user_token = get_session(conn, :user_token)
    user_token && Accounts.delete_user_session_token(user_token)

    if live_socket_id = get_session(conn, :live_socket_id) do
      HelloWeb.Endpoint.broadcast(live_socket_id, "disconnect", %{})
    end

    conn
    |> renew_session()
    |> redirect(to: "/")
  end

  # Asks for a renewed session and drops everything stored in the current one.
  defp renew_session(conn) do
    conn
    |> configure_session(renew: true)
    |> clear_session()
  end

  @doc """
  Plug that assigns `:current_user` from the session (or cookie) token.
  The assign is `nil` when no token is present.
  """
  def fetch_current_user(conn, _opts) do
    {user_token, conn} = ensure_user_token(conn)
    user = user_token && Accounts.get_user_by_session_token(user_token)
    assign(conn, :current_user, user)
  end

  # Returns `{token, conn}`, preferring the session token and falling back to
  # the encrypted "user_token" cookie.
  defp ensure_user_token(conn) do
    if user_token = get_session(conn, :user_token) do
      {user_token, conn}
    else
      conn = fetch_cookies(conn, encrypted: ["user_token"])
      user_token = conn.cookies["user_token"]
      {user_token, conn}
    end
  end

  @doc """
  Plug that lets the request through when `:current_user` is set; otherwise
  flashes an error, remembers the requested path for GET requests, redirects
  to the login page and halts.
  """
  def require_authenticated_user(conn, _opts) do
    if conn.assigns[:current_user] do
      conn
    else
      conn
      |> put_flash(:error, "You must log in to access this page")
      |> maybe_store_return_to()
      |> redirect(to: Routes.user_session_path(conn, :new))
      |> halt()
    end
  end

  # Only GET requests are remembered as a return path.
  defp maybe_store_return_to(%{method: "GET"} = conn) do
    put_session(conn, :user_return_to, current_path(conn))
  end

  defp maybe_store_return_to(conn), do: conn
end
四、API认证 #
4.1 Token认证 #
elixir
defmodule Hello.Accounts do
  @moduledoc """
  Accounts context: issuing and resolving API tokens.
  """

  alias Hello.Accounts.UserToken
  # `Repo` was called without being aliased. `Hello.Repo` follows the app's
  # `Hello.*` namespace — adjust if the repo module is named differently.
  alias Hello.Repo

  @doc """
  Builds an "api" token for `user`, persists its record and returns the
  token value. Raises if the insert fails.
  """
  def generate_api_token(user) do
    {token, user_token} = UserToken.build_token(user, "api")
    Repo.insert!(user_token)
    token
  end

  @doc """
  Returns the result of the "api" token query for `token`, or `nil` when the
  token does not verify.
  """
  def get_user_by_api_token(token) do
    # An unverifiable token is an expected input, so it yields `nil` instead
    # of crashing on an assertive `{:ok, query} =` match.
    case UserToken.verify_and_validate_token_query(token, "api") do
      {:ok, query} -> Repo.one(query)
      _ -> nil
    end
  end
end
4.2 API认证Plug #
elixir
defmodule HelloWeb.ApiAuth do
  @moduledoc """
  Plug that authenticates API requests from an `authorization: Bearer <token>`
  header and assigns `:current_user`.
  """

  @behaviour Plug

  import Plug.Conn
  import Phoenix.Controller

  # `Accounts` was referenced without an alias in the original snippet.
  alias Hello.Accounts

  @impl Plug
  def init(opts), do: opts

  # Assigns the user found for the bearer token. When the header is missing
  # or malformed, or no user is found, responds with a 401 JSON error and halts.
  @impl Plug
  def call(conn, _opts) do
    # `get_user_by_api_token/1` returns the user or `nil`, not `{:ok, user}`;
    # matching on a tagged tuple rejected every request.
    with ["Bearer " <> token] <- get_req_header(conn, "authorization"),
         user when not is_nil(user) <- Accounts.get_user_by_api_token(token) do
      assign(conn, :current_user, user)
    else
      _ ->
        conn
        |> put_status(:unauthorized)
        |> json(%{error: "Unauthorized"})
        |> halt()
    end
  end
end
4.3 JWT认证 #
elixir
defmodule Hello.Accounts do
  @moduledoc """
  Accounts context: issuing and verifying JSON Web Tokens.
  """

  # Signing and verification must agree on the algorithm.
  @jwt_alg "HS256"
  @jwt_ttl_seconds 3600

  @doc """
  Returns a compact, HS256-signed JWT string for `user`, carrying the
  `"user_id"`, `"email"` and `"exp"` (one hour from now) claims.
  """
  def generate_jwt(user) do
    expires_at =
      DateTime.utc_now()
      |> DateTime.add(@jwt_ttl_seconds, :second)
      |> DateTime.to_unix()

    claims = %{
      "user_id" => user.id,
      "email" => user.email,
      "exp" => expires_at
    }

    # Pin the algorithm explicitly so it always matches what verify_jwt/1
    # accepts. `JOSE.JWS.compact/1` returns `{meta, token}`; only the token
    # string is usable as a bearer credential.
    {_meta, token} =
      jwk()
      |> JOSE.JWT.sign(%{"alg" => @jwt_alg}, claims)
      |> JOSE.JWS.compact()

    token
  end

  @doc """
  Verifies the signature and expiry of `token`.

  Returns `{:ok, claims}` for a validly signed, unexpired token and
  `{:error, :invalid_token}` otherwise.
  """
  def verify_jwt(token) do
    now = DateTime.to_unix(DateTime.utc_now())

    case JOSE.JWT.verify_strict(jwk(), [@jwt_alg], token) do
      # Signature verification does not look at claims, so the expiry has to
      # be enforced here.
      {true, %{fields: %{"exp" => exp} = claims}, _jws} when is_integer(exp) and exp > now ->
        {:ok, claims}

      _ ->
        {:error, :invalid_token}
    end
  end

  # Builds the symmetric key from the configured secret; raises when
  # `:jwt_secret` is not configured for `:hello`.
  defp jwk do
    :hello
    |> Application.fetch_env!(:jwt_secret)
    |> JOSE.JWK.from_oct()
  end
end
五、授权 #
5.1 角色系统 #
elixir
defmodule Hello.Accounts.User do
  # `schema/2` comes from Ecto.Schema, which this excerpt did not pull in.
  use Ecto.Schema

  schema "users" do
    # Excerpt: only the field added for the role system is shown here.
    field :role, :string, default: "user"
  end

  @doc """
  Returns `true` when the user's role is `"admin"`, `false` for anything else.
  """
  # `%__MODULE__{}` is used because a bare `%User{}` does not resolve to
  # this module from inside its own body.
  def admin?(%__MODULE__{role: "admin"}), do: true
  def admin?(_), do: false
end
5.2 授权Plug #
elixir
defmodule HelloWeb.RequireAdmin do
  @moduledoc """
  Plug that only lets requests through when `:current_user` has the
  `"admin"` role.
  """

  @behaviour Plug

  import Plug.Conn
  import Phoenix.Controller

  @impl Plug
  def init(opts), do: opts

  # Passes the conn through for admins; otherwise flashes an error,
  # redirects to "/" and halts.
  @impl Plug
  def call(conn, _opts) do
    # `assigns[:current_user]` tolerates a missing assign, so a pipeline that
    # has not loaded the user is rejected instead of raising a KeyError.
    case conn.assigns[:current_user] do
      %{role: "admin"} ->
        conn

      _ ->
        conn
        |> put_flash(:error, "Admin access required")
        |> redirect(to: "/")
        |> halt()
    end
  end
end
5.3 Policy模式 #
elixir
defmodule Hello.Accounts.Policy do
  @moduledoc """
  Authorization rules expressed as `can?(user, action, resource)` clauses.
  """

  alias Hello.Accounts.User
  # NOTE(review): `Post` is not aliased in this snippet; alias the module
  # that defines the post schema before using it — confirm its name.

  @doc """
  Returns `true` when `user` may perform `action` on `resource`.

  Admins may `:manage` anything; a user may `:edit` or `:delete` a post whose
  `author_id` equals their own id. Everything else is denied.
  """
  def can?(%User{role: "admin"}, :manage, _resource), do: true

  # The nil guard stops a user without an id from matching a post without an
  # author: repeating `user_id` in the head only checks that both are equal.
  def can?(%User{id: user_id}, :edit, %Post{author_id: user_id})
      when not is_nil(user_id),
      do: true

  def can?(%User{id: user_id}, :delete, %Post{author_id: user_id})
      when not is_nil(user_id),
      do: true

  def can?(_, _, _), do: false
end
六、密码重置 #
6.1 生成重置Token #
elixir
defmodule Hello.Accounts do
  @moduledoc """
  Accounts context: password reset flow.
  """

  # These modules were referenced without aliases in the original snippet.
  # `Hello.Repo` follows the app's `Hello.*` namespace — adjust if needed.
  alias Hello.Accounts.{User, UserToken}
  alias Hello.Repo
  # NOTE(review): `Mailer` and `reset_password_email_body/1` are not defined
  # or aliased in this snippet; they are assumed to exist elsewhere.

  @doc """
  Builds and stores a "reset_password" email token for `user`, turns it into
  a URL with `reset_password_url_fun`, and emails that URL to the user.

  Returns whatever `Mailer.deliver/3` returns.
  """
  def deliver_user_reset_password_instructions(%User{} = user, reset_password_url_fun) do
    {encoded_token, user_token} = UserToken.build_email_token(user, "reset_password")
    Repo.insert!(user_token)
    url = reset_password_url_fun.(encoded_token)
    Mailer.deliver(user.email, "Reset password", reset_password_email_body(url))
  end

  @doc """
  Updates the user's password and deletes all of the user's tokens in a
  single transaction.

  Returns `{:ok, user}` or `{:error, changeset}` when the password update
  is invalid.
  """
  def reset_user_password(user, attrs) do
    multi =
      Ecto.Multi.new()
      |> Ecto.Multi.update(:user, User.password_changeset(user, attrs))
      |> Ecto.Multi.delete_all(:tokens, UserToken.user_and_contexts_query(user, :all))

    case Repo.transaction(multi) do
      {:ok, %{user: user}} -> {:ok, user}
      {:error, :user, changeset, _} -> {:error, changeset}
    end
  end
end
七、邮箱确认 #
7.1 发送确认邮件 #
elixir
defmodule Hello.Accounts do
  @moduledoc """
  Accounts context: email confirmation flow.
  """

  # These modules were referenced without aliases in the original snippet.
  # `Hello.Repo` follows the app's `Hello.*` namespace — adjust if needed.
  alias Hello.Accounts.{User, UserToken}
  alias Hello.Repo
  # NOTE(review): `Mailer` and `confirmation_email_body/1` are not defined
  # or aliased in this snippet; they are assumed to exist elsewhere.

  @doc """
  Builds and stores a "confirm" email token for `user`, turns it into a URL
  with `confirmation_url_fun`, and emails that URL to the user.

  Returns whatever `Mailer.deliver/3` returns.
  """
  def deliver_user_confirmation_instructions(%User{} = user, confirmation_url_fun) do
    {encoded_token, user_token} = UserToken.build_email_token(user, "confirm")
    Repo.insert!(user_token)
    url = confirmation_url_fun.(encoded_token)
    Mailer.deliver(user.email, "Confirm your email", confirmation_email_body(url))
  end

  @doc """
  Confirms the user that `token` belongs to.

  Returns `{:ok, user}` on success and `:error` when the token does not
  verify, no user is found, or the transaction fails.
  """
  def confirm_user(token) do
    with {:ok, query} <- UserToken.verify_email_token_query(token, "confirm"),
         %User{} = user <- Repo.one(query),
         {:ok, %{user: user}} <- Repo.transaction(confirm_user_multi(user)) do
      {:ok, user}
    else
      _ -> :error
    end
  end

  # Applies the confirm changeset and deletes the user's "confirm" tokens
  # in the same transaction.
  defp confirm_user_multi(user) do
    Ecto.Multi.new()
    |> Ecto.Multi.update(:user, User.confirm_changeset(user))
    |> Ecto.Multi.delete_all(:tokens, UserToken.user_and_contexts_query(user, ["confirm"]))
  end
end
八、总结 #
8.1 核心概念 #
| 概念 | 说明 |
|---|---|
| 注册 | 创建用户 |
| 登录 | 验证身份 |
| Session | 会话管理 |
| Token | API认证 |
| 授权 | 权限控制 |
8.2 安全建议 #
- 使用强密码哈希
- 验证邮箱地址
- 实现密码重置
- 使用CSRF保护
- 设置Token过期时间
8.3 下一步 #
现在你已经了解了认证系统,接下来让我们学习 API开发,深入了解API构建!
最后更新:2026-03-28