并发编程 #

一、Future 基础 #

1.1 创建 Future #

scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Future.apply hands the body to the implicit ExecutionContext and returns
// immediately; the one-second sleep happens on a pool thread, not here.
val future: Future[Int] = Future.apply {
  Thread.sleep(1000)
  42
}

// Runs right away: it does not wait for the future above to finish.
println("Future created")

1.2 Future 回调 #

scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val future: Future[Int] = Future.apply {
  Thread.sleep(1000)
  42
}

// onComplete registers a callback that receives the final Try once the
// future has finished, whether it succeeded or failed.
future.onComplete { outcome =>
  outcome match
    case scala.util.Failure(error) => println(s"Failure: ${error.getMessage}")
    case scala.util.Success(value) => println(s"Success: $value")
}

1.3 获取 Future 结果 #

scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

val future: Future[Int] = Future.apply {
  Thread.sleep(1000)
  42
}

// Await.result blocks the calling thread until the future completes and
// throws a TimeoutException if that takes longer than `atMost`.
val result = Await.result(awaitable = future, atMost = 5.seconds)
println(result)

二、Future 组合 #

2.1 map #

scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// An asynchronous computation producing 42.
val future: Future[Int] = Future(42)

// map builds a new Future holding the doubled value; `future` itself is unchanged.
val mapped: Future[Int] = future.map(value => value * 2)

2.2 flatMap #

scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

/** Asynchronously builds the user with the given id. */
def fetchUser(id: Int): Future[User] =
  Future(User(id, s"User$id"))

/** Asynchronously builds the two sample posts belonging to `user`. */
def fetchPosts(user: User): Future[List[Post]] =
  Future {
    val first = Post(user.id, "Post 1")
    val second = Post(user.id, "Post 2")
    List(first, second)
  }

// flatMap chains the second asynchronous call onto the result of the first,
// yielding Future[List[Post]] rather than a nested Future[Future[...]].
val result = fetchUser(1).flatMap(fetchPosts)

2.3 for 推导式 #

scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Placeholder signatures for the example below: `???` throws
// scala.NotImplementedError when called, so supply real implementations
// before running it.
def fetchUser(id: Int): Future[User] = ???
def fetchPosts(userId: Int): Future[List[Post]] = ???
def fetchComments(postId: Int): Future[List[Comment]] = ???

// Each generator waits for the previous one, so the three calls run in sequence.
val result = for
  user <- fetchUser(1)
  posts <- fetchPosts(user.id)
  // A user may have no posts: `posts.head` would fail the whole chain with
  // NoSuchElementException, so fall back to an empty comment list instead.
  comments <- posts.headOption.fold(Future.successful(List.empty[Comment])) { first =>
    fetchComments(first.id)
  }
yield (user, posts, comments)

2.4 zip #

scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val future1: Future[Int] = Future(42)
val future2: Future[String] = Future("hello")

// zip pairs the two results into a tuple; it fails if either input fails.
val combined: Future[(Int, String)] = future1.zip(future2)

2.5 sequence #

scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Three independent futures, one per number.
val futures: List[Future[Int]] = List(1, 2, 3).map(n => Future(n))

// Turns List[Future[Int]] into Future[List[Int]], keeping the element order.
val sequence: Future[List[Int]] = Future.sequence(futures)

三、Promise #

3.1 创建 Promise #

scala
import scala.concurrent.Promise
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// The Promise is the write side; its `future` is the read side handed to consumers.
val promise: Promise[Int] = Promise[Int]()
val future: Future[Int] = promise.future

// Prints the Try (Success or Failure) once the promise has been completed.
future.onComplete(outcome => println(outcome))

// Completing the promise completes `future` and triggers the callback above.
promise.success(42)

3.2 完成 Promise #

scala
import scala.concurrent.Promise

// A Promise can be completed only once: a second success/failure/complete
// call on the same instance throws IllegalStateException. Each completion
// style is therefore shown on its own Promise.
val promise = Promise[Int]()
promise.success(42)

// Complete with a failure.
Promise[Int]().failure(new Exception("error"))

// Complete with an explicit Try value.
Promise[Int]().complete(scala.util.Success(42))
Promise[Int]().complete(scala.util.Failure(new Exception("error")))

// When several parties may race to complete the same Promise, use the try*
// variants: they return false instead of throwing if it is already completed.
promise.trySuccess(0)

3.3 Promise 实战 #

scala
import scala.concurrent.{Promise, Future}
import scala.concurrent.ExecutionContext.Implicits.global

/** Races `future` against a timer and returns whichever finishes first.
  *
  * The underlying `future` is not cancelled on timeout; it keeps running and
  * its eventual outcome is ignored.
  *
  * @param future  the computation to guard
  * @param timeout maximum time to wait, in milliseconds
  * @return a future completed with the outcome of `future`, or failed with
  *         `java.util.concurrent.TimeoutException` if `timeout` elapses first
  */
def withTimeout[T](future: Future[T], timeout: Long): Future[T] =
  val promise = Promise[T]()

  // Whichever side loses the race must not throw, so both sides use the try*
  // variants (plain failure/complete throw IllegalStateException on an
  // already-completed Promise).
  Future {
    // Tell the pool this thread is parked so it can compensate.
    scala.concurrent.blocking(Thread.sleep(timeout))
    promise.tryFailure(new java.util.concurrent.TimeoutException("Timeout"))
  }

  future.onComplete(promise.tryComplete)

  promise.future

// Takes five seconds to produce its value, far longer than the limit below.
val slowFuture: Future[Int] = Future.apply {
  Thread.sleep(5000)
  42
}

// Guard the slow computation with a one-second (1000 ms) limit.
val result: Future[Int] = withTimeout(future = slowFuture, timeout = 1000)

四、错误处理 #

4.1 recover #

scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// The body throws, so this future completes as a Failure.
val future: Future[Nothing] = Future(throw new Exception("error"))

// recover maps a matching failure to a plain fallback value.
val recovered: Future[Int] = future.recover { case _: Exception => 0 }

4.2 recoverWith #

scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// The body throws, so this future completes as a Failure.
val future: Future[Nothing] = Future(throw new Exception("error"))

// recoverWith is the flatMap counterpart of recover: the handler returns
// another Future whose outcome replaces the failure.
val recovered: Future[Int] = future.recoverWith { case _: Exception =>
  Future.successful(0)
}

4.3 fallbackTo #

scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Primary computation: always fails.
val future1: Future[Nothing] = Future(throw new Exception("error"))

// Backup value, already available.
val future2: Future[Int] = Future.successful(42)

// fallbackTo yields future1's value if it succeeds, otherwise future2's.
val result: Future[Int] = future1.fallbackTo(future2)

4.4 transform #

scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val future: Future[Int] = Future(42)

// Two-function transform: the first function rewrites a successful value,
// the second rewrites the exception of a failed future.
val transformed: Future[Int] = future.transform(
  (value: Int) => value * 2,
  (error: Throwable) => new Exception(s"Wrapped: ${error.getMessage}", error)
)

五、并行集合 #

5.1 创建并行集合 #

scala
// Since Scala 2.13 (and in Scala 3) parallel collections live in the separate
// scala-parallel-collections module; `.par` is only available with this import.
import scala.collection.parallel.CollectionConverters._

val parList = List(1, 2, 3, 4, 5).par

// Operations on a parallel collection may run on several threads at once.
val result = parList.map(_ * 2).sum

5.2 并行操作 #

scala
// `.par` needs the scala-parallel-collections module on Scala 2.13+ / Scala 3.
import scala.collection.parallel.CollectionConverters._

val numbers = (1 to 1000000).toList

// Widen to Long before doubling: the total (500001000000) is far beyond
// Int.MaxValue, so summing Ints would silently overflow.
val result = numbers.par
  .filter(_ % 2 == 0)
  .map(_.toLong * 2)
  .sum

5.3 并行集合配置 #

scala
import scala.collection.parallel.ForkJoinTaskSupport
import java.util.concurrent.ForkJoinPool

// `.par` needs the scala-parallel-collections module on Scala 2.13+ / Scala 3.
import scala.collection.parallel.CollectionConverters._

val parList = List(1, 2, 3, 4, 5).par
// Run this collection's operations on a dedicated pool with parallelism 4
// instead of the default task support.
parList.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4))

六、异步编程模式 #

6.1 异步 HTTP 客户端 #

scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

/** Minimal asynchronous HTTP client built on `scala.io.Source`. */
object HttpClient:
  /** Fetches `url` and returns the whole response body as a string.
    *
    * @param url address to read from
    * @return a future holding the body; it fails if the URL cannot be opened or read
    */
  def get(url: String): Future[String] = Future {
    // Network I/O parks the thread; `blocking` lets the global pool add a
    // compensating thread instead of starving other futures.
    scala.concurrent.blocking {
      // Using.resource closes the source even when reading throws.
      scala.util.Using.resource(scala.io.Source.fromURL(url))(_.mkString)
    }
  }

// Starts the request in the background; `response` completes with the body text.
val response: Future[String] =
  HttpClient.get(url = "https://api.example.com/data")

6.2 批量处理 #

scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

/** Applies `process` to every item, each inside its own Future, and gathers
  * the results in input order.
  *
  * @param items   the inputs to process
  * @param process the function applied to each input
  * @return a future of all results; it fails if any single call fails
  */
def processBatch[T, R](items: List[T])(process: T => R): Future[List[R]] =
  val pending = items.map(item => Future(process(item)))
  Future.sequence(pending)

// Doubles every number in the batch; the results keep the input order.
val results: Future[List[Int]] =
  processBatch(List(1, 2, 3, 4, 5))(n => n * 2)

6.3 重试机制 #

scala
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global

/** Runs `action` and, while it keeps failing, re-runs it up to `maxRetries`
  * more times.
  *
  * @param maxRetries how many additional attempts are allowed after the first
  * @param action     by-name, so every attempt creates a fresh Future
  * @return the first successful outcome, or the failure of the last attempt
  */
def retry[T](maxRetries: Int)(action: => Future[T]): Future[T] =
  val attempt = action
  if maxRetries > 0 then
    attempt.recoverWith { case _ => retry(maxRetries - 1)(action) }
  else attempt

// Up to three retries of a computation that fails on a coin flip.
val result = retry(3) {
  Future {
    val failed = scala.util.Random.nextBoolean()
    if failed then throw new Exception("Random failure")
    else "Success"
  }
}

6.4 限流 #

scala
import java.util.concurrent.Semaphore
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

/** Caps the number of futures in flight at `maxConcurrent`.
  *
  * @param maxConcurrent number of permits, i.e. how many actions may run at once
  */
class RateLimiter(maxConcurrent: Int):
  // Semaphore lives in java.util.concurrent; scala.concurrent has no such class.
  private val semaphore = Semaphore(maxConcurrent)

  /** Starts `action` once a permit is free and releases the permit when the
    * resulting future completes, successfully or not.
    *
    * Note: waiting for a permit blocks the calling thread.
    *
    * @param action by-name, so it is only started after a permit was acquired
    * @return the future produced by `action`
    */
  def execute[T](action: => Future[T]): Future[T] =
    semaphore.acquire()
    val started =
      try action
      catch
        case e: Throwable =>
          // `action` failed before producing a Future: hand the permit back,
          // otherwise it would be lost for good.
          semaphore.release()
          throw e
    started.andThen { case _ => semaphore.release() }

// At most five of the tasks below are in flight at any moment.
val limiter = RateLimiter(5)

// One hundred short tasks, each submitted through the limiter.
val futures =
  for i <- 1 to 100
  yield limiter.execute {
    Future {
      Thread.sleep(100)
      i
    }
  }

七、ExecutionContext #

7.1 全局 ExecutionContext #

scala
import scala.concurrent.ExecutionContext.Implicits.global

7.2 自定义 ExecutionContext #

scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// A dedicated pool of ten threads, exposed as the given ExecutionContext.
val executor = Executors.newFixedThreadPool(10)
given ec: ExecutionContext = ExecutionContext.fromExecutor(executor)

// Runs on the fixed pool above, picked up through the given `ec`.
val future = Future {
  42
}

// The pool's threads are non-daemon and would keep the JVM alive forever.
// Shut the pool down once no further work will be submitted to it.
future.onComplete(_ => executor.shutdown())

7.3 ExecutionContext 选择 #

| 类型 | 适用场景 |
| --- | --- |
| global | CPU 密集型任务 |
| fixedThreadPool | 阻塞 I/O 操作 |
| cachedThreadPool | 短期异步任务 |

八、最佳实践 #

8.1 避免阻塞 #

scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val future: Future[Int] = Future.apply {
  Thread.sleep(1000)
  42
}

// Transform the value with map instead of blocking the caller to read it.
val result: Future[Int] = future.map(value => value * 2)

8.2 使用 for 推导式 #

scala
// Each generator runs only after the previous one has completed, so
// fetchPosts can use the user produced by fetchUser.
val result = for
  user <- fetchUser(1)
  posts <- fetchPosts(user.id)
yield (user, posts)

8.3 正确处理错误 #

scala
// Recover first so an Exception is replaced by `defaultValue`, then render
// whichever value results as a string.
val result = future
  .recover { case _: Exception => defaultValue }
  .map(value => value.toString)

8.4 使用超时 #

scala
import scala.concurrent.{Future, Await}
import scala.concurrent.duration._

// Blocks for at most five seconds; a TimeoutException is thrown if the
// future has not completed by then.
val result = Await.result(awaitable = future, atMost = 5.seconds)

九、总结 #

Future 方法 #

| 方法 | 说明 |
| --- | --- |
| map | 转换结果 |
| flatMap | 链式异步操作 |
| zip | 组合两个 Future |
| sequence | 将 `List[Future[T]]` 转换为 `Future[List[T]]` |
| recover | 错误恢复 |
| recoverWith | 错误恢复(返回 Future) |

并发工具 #

| 工具 | 说明 |
| --- | --- |
| Future | 异步计算 |
| Promise | 手动完成 Future |
| Parallel Collection | 并行处理集合 |
| ExecutionContext | 线程池管理 |
ExecutionContext 线程池管理

恭喜你完成了 Scala 完全指南的学习!

最后更新:2026-03-27