认证系统 #

一、认证概述 #

1.1 认证方式 #

| 方式 | 说明 |
| --- | --- |
| Session | 传统Session认证 |
| Token | API Token认证 |
| JWT | JSON Web Token |
| OAuth | 第三方登录 |

1.2 Phoenix默认认证 #

Phoenix 1.7+提供了内置的认证生成器:

bash
# Run Phoenix's authentication generator (arguments here: context Accounts, schema User, table users).
mix phx.gen.auth Accounts User users

二、用户注册 #

2.1 注册Context #

elixir
defmodule Hello.Accounts do
  @moduledoc """
  Accounts context: entry points for user registration.
  """

  alias Hello.Accounts.User
  # NOTE(review): assumes the application's repo module is `Hello.Repo`.
  # Without this alias the `Repo.insert/1` call below does not resolve.
  alias Hello.Repo

  @doc """
  Registers a new user from `attrs`.

  Builds a registration changeset on an empty `%User{}` and inserts it.
  Returns the result of `Repo.insert/1`.
  """
  def register_user(attrs) do
    %User{}
    |> User.registration_changeset(attrs)
    |> Repo.insert()
  end

  @doc """
  Returns the registration changeset for `user` and `attrs` without
  touching the database. `attrs` defaults to an empty map.
  """
  def change_user_registration(%User{} = user, attrs \\ %{}) do
    User.registration_changeset(user, attrs)
  end
end

2.2 注册Schema #

elixir
defmodule Hello.Accounts.User do
  @moduledoc """
  Ecto schema for the `users` table plus the changeset used at registration.
  """

  use Ecto.Schema
  import Ecto.Changeset

  schema "users" do
    field :email, :string
    # Virtual: only lives on the struct/changeset, never written to the table.
    # `redact: true` keeps the value out of inspect output and logs.
    field :password, :string, virtual: true, redact: true
    field :hashed_password, :string, redact: true
    field :confirmed_at, :naive_datetime

    timestamps()
  end

  @doc """
  Builds the registration changeset.

  Casts `:email` and `:password` from `attrs`, validates both and, when the
  changeset is valid, replaces the plain password with its Bcrypt hash in
  `:hashed_password`.
  """
  def registration_changeset(user, attrs) do
    user
    |> cast(attrs, [:email, :password])
    |> validate_email()
    |> validate_password()
  end

  # Requires an e-mail that contains "@" and no whitespace, and registers the
  # uniqueness constraint so a duplicate insert becomes a changeset error.
  defp validate_email(changeset) do
    changeset
    |> validate_required([:email])
    |> validate_format(:email, ~r/^[^\s]+@[^\s]+$/)
    |> unique_constraint(:email)
  end

  # Requires a password of at least 8 characters and at most 72 bytes, then
  # hashes it if everything so far is valid.
  defp validate_password(changeset) do
    changeset
    |> validate_required([:password])
    |> validate_length(:password, min: 8)
    # Bcrypt only takes the first 72 bytes of its input into account, so the
    # upper bound has to be measured in bytes; the default grapheme count
    # would let multi-byte passwords exceed it silently.
    |> validate_length(:password, max: 72, count: :bytes)
    |> maybe_hash_password()
  end

  # Hashes the password only when one was supplied and the changeset is valid,
  # then drops the plain-text change so it is not carried any further.
  defp maybe_hash_password(changeset) do
    password = get_change(changeset, :password)

    if password && changeset.valid? do
      changeset
      |> put_change(:hashed_password, Bcrypt.hash_pwd_salt(password))
      |> delete_change(:password)
    else
      changeset
    end
  end
end

2.3 注册LiveView #

elixir
defmodule HelloWeb.UserRegistrationLive do
  @moduledoc """
  LiveView backing the user registration form.
  """

  use HelloWeb, :live_view

  # The callbacks below call `Accounts.*` and build `%User{}`; both need an
  # alias to resolve inside this module.
  alias Hello.Accounts
  alias Hello.Accounts.User

  # Starts with an empty registration changeset in the `:changeset` assign.
  @impl true
  def mount(_params, _session, socket) do
    changeset = Accounts.change_user_registration(%User{})
    {:ok, assign(socket, :changeset, changeset)}
  end

  # Handles the "register" form event: creates the user, sends the
  # confirmation instructions and redirects to "/" with a flash message.
  # On failure the returned changeset is put back into the assigns.
  @impl true
  def handle_event("register", %{"user" => user_params}, socket) do
    case Accounts.register_user(user_params) do
      {:ok, user} ->
        # Asserts on delivery: anything other than {:ok, _} crashes here.
        {:ok, _} =
          Accounts.deliver_user_confirmation_instructions(
            user,
            &url(~p"/users/confirm/#{&1}")
          )

        {:noreply,
         socket
         |> put_flash(:info, "User created successfully")
         |> redirect(to: ~p"/")}

      {:error, changeset} ->
        {:noreply, assign(socket, :changeset, changeset)}
    end
  end
end

三、用户登录 #

3.1 Session认证 #

elixir
defmodule Hello.Accounts do
  @doc """
  Looks a user up by `email` and returns it only when `password` passes
  `User.valid_password?/2`; returns `nil` otherwise.
  """
  def get_user_by_email_and_password(email, password) do
    candidate = Repo.get_by(User, email: email)

    if User.valid_password?(candidate, password) do
      candidate
    else
      nil
    end
  end
end

defmodule Hello.Accounts.User do
  @doc """
  Checks `password` against the user's stored Bcrypt hash.

  Returns the result of `Bcrypt.verify_pass/2` when given a user struct with
  a binary `hashed_password` and a binary `password`. Any other input
  (including `nil` instead of a user) returns `false`.
  """
  # `%__MODULE__{}` is used because `User` is not aliased inside this module,
  # so a bare `%User{}` would not resolve to this struct.
  def valid_password?(%__MODULE__{hashed_password: hashed_password}, password)
      when is_binary(hashed_password) and is_binary(password) do
    Bcrypt.verify_pass(password, hashed_password)
  end

  def valid_password?(_, _) do
    # Run a dummy hash so a lookup miss costs about as much time as a real
    # verification; otherwise response time reveals which e-mails exist.
    Bcrypt.no_user_verify()
    false
  end
end

3.2 Session控制器 #

elixir
defmodule HelloWeb.UserSessionController do
  @moduledoc """
  Controller for logging users in and out via the session.
  """

  use HelloWeb, :controller

  # Both modules are called below and need an alias to resolve here.
  alias Hello.Accounts
  alias HelloWeb.UserAuth

  @doc """
  Logs the user in when the e-mail/password pair is valid; otherwise
  re-renders the `:new` template with an error flash.
  """
  # The credentials are matched in the function head rather than in the body:
  # a request without "email"/"password" then fails as an unmatched action
  # clause (which Phoenix reports as a bad request) instead of a MatchError.
  def create(conn, %{"user" => %{"email" => email, "password" => password} = user_params}) do
    if user = Accounts.get_user_by_email_and_password(email, password) do
      conn
      |> put_flash(:info, "Welcome back!")
      |> UserAuth.log_in_user(user, user_params)
    else
      # Deliberately the same message for unknown e-mail and wrong password.
      conn
      |> put_flash(:error, "Invalid email or password")
      |> render(:new)
    end
  end

  @doc """
  Logs the current user out with a confirmation flash.
  """
  def delete(conn, _params) do
    conn
    |> put_flash(:info, "Logged out successfully")
    |> UserAuth.log_out_user()
  end
end

3.3 认证Plug #

elixir
defmodule HelloWeb.UserAuth do
  @moduledoc """
  Session-based authentication helpers and plugs: log in, log out, load the
  current user and require an authenticated user.
  """

  import Plug.Conn
  import Phoenix.Controller

  # `Accounts` is called throughout this module and needs an alias to resolve.
  alias Hello.Accounts
  alias HelloWeb.Router.Helpers, as: Routes

  @doc """
  Logs `user` in and redirects.

  Generates a session token, renews the session, stores the token and the
  LiveView socket id in it, then redirects to (in order of preference):

    1. `params["return_to"]`, when it is a local path;
    2. the path stored under `:user_return_to` by
       `require_authenticated_user/2`;
    3. the index page.
  """
  def log_in_user(conn, user, params \\ %{}) do
    token = Accounts.generate_user_session_token(user)
    # Must be read before renew_session/1, which clears the session.
    stored_return_to = get_session(conn, :user_return_to)

    return_to =
      local_path(params["return_to"]) || stored_return_to || Routes.page_path(conn, :index)

    conn
    |> renew_session()
    |> put_session(:user_token, token)
    |> put_session(:live_socket_id, "users_sessions:#{Base.url_encode64(token)}")
    |> redirect(to: return_to)
  end

  @doc """
  Logs the user out.

  Deletes the session token (if any), broadcasts "disconnect" on the stored
  LiveView socket id (if any), renews the session and redirects to "/".
  """
  def log_out_user(conn) do
    user_token = get_session(conn, :user_token)
    user_token && Accounts.delete_user_session_token(user_token)

    if live_socket_id = get_session(conn, :live_socket_id) do
      HelloWeb.Endpoint.broadcast(live_socket_id, "disconnect", %{})
    end

    conn
    |> renew_session()
    |> redirect(to: "/")
  end

  @doc """
  Plug: assigns `:current_user` from the session token, or from the
  "user_token" cookie when the session has none. Assigns `nil` when no
  token is present.
  """
  def fetch_current_user(conn, _opts) do
    {user_token, conn} = ensure_user_token(conn)
    user = user_token && Accounts.get_user_by_session_token(user_token)
    assign(conn, :current_user, user)
  end

  @doc """
  Plug: lets the request through when `:current_user` is assigned; otherwise
  flashes an error, remembers the requested path (GET only), redirects to
  the login page and halts.
  """
  def require_authenticated_user(conn, _opts) do
    if conn.assigns[:current_user] do
      conn
    else
      conn
      |> put_flash(:error, "You must log in to access this page")
      |> maybe_store_return_to()
      |> redirect(to: Routes.user_session_path(conn, :new))
      |> halt()
    end
  end

  # Marks the session for renewal and drops everything stored in it.
  defp renew_session(conn) do
    conn
    |> configure_session(renew: true)
    |> clear_session()
  end

  # Returns {token_or_nil, conn}; falls back to the encrypted "user_token"
  # cookie when the session holds no token.
  defp ensure_user_token(conn) do
    if user_token = get_session(conn, :user_token) do
      {user_token, conn}
    else
      conn = fetch_cookies(conn, encrypted: ["user_token"])
      user_token = conn.cookies["user_token"]
      {user_token, conn}
    end
  end

  # Only GET requests can be replayed after login, so only those are stored.
  defp maybe_store_return_to(%{method: "GET"} = conn) do
    put_session(conn, :user_return_to, current_path(conn))
  end

  defp maybe_store_return_to(conn), do: conn

  # Pre-filters the user-supplied return path: anything that is not a plain
  # local path ("/..." but not "//...", no backslashes) is ignored so a
  # crafted parameter falls back to the defaults instead of reaching
  # redirect/2.
  defp local_path("/" <> rest = path) do
    if String.starts_with?(rest, "/") or String.contains?(path, "\\"), do: nil, else: path
  end

  defp local_path(_), do: nil
end

四、API认证 #

4.1 Token认证 #

elixir
defmodule Hello.Accounts do
  alias Hello.Accounts.UserToken
  # NOTE(review): assumes the application's repo module is `Hello.Repo`.
  # Without this alias the `Repo` calls below do not resolve.
  alias Hello.Repo

  @doc """
  Creates and stores an "api" token for `user` and returns the token value
  to hand to the client.
  """
  def generate_api_token(user) do
    {token, user_token} = UserToken.build_token(user, "api")
    Repo.insert!(user_token)
    token
  end

  @doc """
  Returns the user owning the given API `token`, or `nil`.

  `nil` is returned both when the token fails verification and when the
  resulting query finds no row.
  """
  def get_user_by_api_token(token) do
    # A hard `{:ok, query} =` match here would raise on any malformed token,
    # which turns a bad credential into a server error.
    case UserToken.verify_and_validate_token_query(token, "api") do
      {:ok, query} -> Repo.one(query)
      _ -> nil
    end
  end
end

4.2 API认证Plug #

elixir
defmodule HelloWeb.ApiAuth do
  @moduledoc """
  Plug that authenticates API requests via an `Authorization: Bearer <token>`
  header and assigns `:current_user`.
  """

  @behaviour Plug

  import Plug.Conn
  import Phoenix.Controller

  # `Accounts` is called in call/2 and needs an alias to resolve here.
  alias Hello.Accounts

  @impl true
  def init(opts), do: opts

  # Assigns the token's user on success; otherwise answers 401 with a JSON
  # error body and halts the pipeline.
  @impl true
  def call(conn, _opts) do
    with ["Bearer " <> token] <- get_req_header(conn, "authorization"),
         # get_user_by_api_token/1 returns the user or nil, not {:ok, user};
         # matching on {:ok, user} would reject every request.
         %{} = user <- Accounts.get_user_by_api_token(token) do
      assign(conn, :current_user, user)
    else
      _ ->
        conn
        |> put_status(:unauthorized)
        |> json(%{error: "Unauthorized"})
        |> halt()
    end
  end
end

4.3 JWT认证 #

elixir
defmodule Hello.Accounts do
  # Single source of truth for the signing algorithm so that signing and
  # verification can never disagree.
  @jwt_alg "HS256"

  @doc """
  Returns a compact, signed JWT (a binary) for `user`.

  The claims carry the user's id and e-mail and an `exp` one hour from now
  (Unix seconds).
  """
  def generate_jwt(user) do
    claims = %{
      "user_id" => user.id,
      "email" => user.email,
      "exp" => DateTime.utc_now() |> DateTime.add(3600, :second) |> DateTime.to_unix()
    }

    # The algorithm is pinned explicitly: without a header JOSE chooses one
    # from the key, which may not be the HS256 that verify_jwt/1 allows.
    # compact/1 returns a {metadata, token} tuple; only the token is wanted.
    {_meta, token} =
      jwk()
      |> JOSE.JWT.sign(%{"alg" => @jwt_alg}, claims)
      |> JOSE.JWS.compact()

    token
  end

  @doc """
  Verifies `token` and returns `{:ok, claims}`.

  Returns `{:error, :invalid_token}` when the signature check fails, when
  the token has no integer `exp` claim, or when `exp` is in the past.
  """
  def verify_jwt(token) do
    case JOSE.JWT.verify_strict(jwk(), [@jwt_alg], token) do
      {true, jwt, _} ->
        # Signature verification does not look at the claims, so expiry has
        # to be enforced here.
        if expired?(jwt.fields), do: {:error, :invalid_token}, else: {:ok, jwt.fields}

      _ ->
        {:error, :invalid_token}
    end
  end

  # A token without a usable "exp" is treated as expired: generate_jwt/1
  # always sets one, so its absence means the token is not ours.
  defp expired?(%{"exp" => exp}) when is_integer(exp) do
    exp <= DateTime.to_unix(DateTime.utc_now())
  end

  defp expired?(_), do: true

  # Builds the symmetric key from the configured secret. fetch_env!/2 raises
  # a descriptive error when `:jwt_secret` is not configured.
  defp jwk do
    :hello
    |> Application.fetch_env!(:jwt_secret)
    |> JOSE.JWK.from_oct()
  end
end

五、授权 #

5.1 角色系统 #

elixir
defmodule Hello.Accounts.User do
  # `schema/2` is a macro that comes from Ecto.Schema.
  use Ecto.Schema

  # Only the field relevant to authorization is shown in this snippet.
  schema "users" do
    field :role, :string, default: "user"
  end

  @doc """
  Returns `true` when given a user struct whose role is `"admin"`, and
  `false` for anything else.
  """
  # `%__MODULE__{}` is used because `User` is not aliased inside this module,
  # so a bare `%User{}` would not resolve to this struct.
  def admin?(%__MODULE__{role: "admin"}), do: true
  def admin?(_), do: false
end

5.2 授权Plug #

elixir
defmodule HelloWeb.RequireAdmin do
  @moduledoc """
  Plug that only lets requests through when `:current_user` is assigned and
  has the `"admin"` role.
  """

  @behaviour Plug

  import Plug.Conn
  import Phoenix.Controller

  @impl true
  def init(opts), do: opts

  # Passes the conn through for admins; otherwise flashes an error,
  # redirects to "/" and halts.
  @impl true
  def call(conn, _opts) do
    # `assigns[:current_user]` instead of `assigns.current_user`: the latter
    # raises KeyError when no earlier plug has set the assign, turning a
    # denied request into a server error.
    case conn.assigns[:current_user] do
      %{role: "admin"} ->
        conn

      _ ->
        conn
        |> put_flash(:error, "Admin access required")
        |> redirect(to: "/")
        |> halt()
    end
  end
end

5.3 Policy模式 #

elixir
defmodule Hello.Accounts.Policy do
  @moduledoc """
  Authorization rules expressed as `can?(user, action, resource)` clauses.
  """

  alias Hello.Accounts.User
  # NOTE(review): `Post` is not aliased in this snippet; it needs an alias to
  # the application's post schema for `%Post{}` to resolve.

  @doc """
  Returns whether `user` may perform `action` on `resource`.

    * admins may `:manage` any resource;
    * a user may `:edit` or `:delete` a post they authored;
    * everything else is denied.
  """
  def can?(%User{role: "admin"}, :manage, _resource), do: true

  # Using `user_id` in both patterns requires the two values to be equal.
  # The nil guard matters: an unsaved user (id nil) would otherwise "own"
  # every post whose author_id is nil.
  def can?(%User{id: user_id}, action, %Post{author_id: user_id})
      when action in [:edit, :delete] and not is_nil(user_id),
      do: true

  def can?(_, _, _), do: false
end

六、密码重置 #

6.1 生成重置Token #

elixir
defmodule Hello.Accounts do
  @doc """
  Stores a "reset_password" e-mail token for `user` and mails the reset link.

  `reset_password_url_fun` receives the encoded token and returns the URL
  placed in the message. Returns the result of `Mailer.deliver/3`.
  """
  def deliver_user_reset_password_instructions(%User{} = user, reset_password_url_fun) do
    {encoded_token, token_record} = UserToken.build_email_token(user, "reset_password")
    Repo.insert!(token_record)

    reset_url = reset_password_url_fun.(encoded_token)
    body = reset_password_email_body(reset_url)
    Mailer.deliver(user.email, "Reset password", body)
  end

  @doc """
  Updates the user's password and deletes all of the user's tokens in a
  single transaction.

  Returns `{:ok, user}` on success or `{:error, changeset}` when the
  password update fails.
  """
  def reset_user_password(user, attrs) do
    multi =
      Ecto.Multi.new()
      |> Ecto.Multi.update(:user, User.password_changeset(user, attrs))
      |> Ecto.Multi.delete_all(:tokens, UserToken.user_and_contexts_query(user, :all))

    case Repo.transaction(multi) do
      {:ok, %{user: updated_user}} -> {:ok, updated_user}
      {:error, :user, changeset, _changes} -> {:error, changeset}
    end
  end
end

七、邮箱确认 #

7.1 发送确认邮件 #

elixir
defmodule Hello.Accounts do
  @doc """
  Stores a "confirm" e-mail token for `user` and mails the confirmation link.

  `confirmation_url_fun` receives the encoded token and returns the URL
  placed in the message. Returns the result of `Mailer.deliver/3`.
  """
  def deliver_user_confirmation_instructions(%User{} = user, confirmation_url_fun) do
    {encoded_token, token_record} = UserToken.build_email_token(user, "confirm")
    Repo.insert!(token_record)

    confirmation_url = confirmation_url_fun.(encoded_token)
    body = confirmation_email_body(confirmation_url)
    Mailer.deliver(user.email, "Confirm your email", body)
  end

  @doc """
  Confirms the user that owns the given e-mail `token`.

  Returns `{:ok, user}` when the token verifies, a user is found and the
  confirmation transaction succeeds; returns `:error` in every other case.
  """
  def confirm_user(token) do
    case UserToken.verify_email_token_query(token, "confirm") do
      {:ok, query} -> query |> Repo.one() |> confirm_found_user()
      _ -> :error
    end
  end

  # Runs the confirmation transaction for a user that was actually found;
  # any other lookup result is an error.
  defp confirm_found_user(%User{} = user) do
    case Repo.transaction(confirm_user_multi(user)) do
      {:ok, %{user: confirmed_user}} -> {:ok, confirmed_user}
      _ -> :error
    end
  end

  defp confirm_found_user(_), do: :error

  # One transaction: apply the confirm changeset and delete the user's
  # "confirm" tokens.
  defp confirm_user_multi(user) do
    confirm_tokens = UserToken.user_and_contexts_query(user, ["confirm"])

    Ecto.Multi.new()
    |> Ecto.Multi.update(:user, User.confirm_changeset(user))
    |> Ecto.Multi.delete_all(:tokens, confirm_tokens)
  end
end

八、总结 #

8.1 核心概念 #

| 概念 | 说明 |
| --- | --- |
| 注册 | 创建用户 |
| 登录 | 验证身份 |
| Session | 会话管理 |
| Token | API认证 |
| 授权 | 权限控制 |

8.2 安全建议 #

  • 使用强密码哈希
  • 验证邮箱地址
  • 实现密码重置
  • 使用CSRF保护
  • 设置Token过期时间

8.3 下一步 #

现在你已经了解了认证系统,接下来让我们学习 API开发,深入了解API构建!

最后更新:2026-03-28