并发编程 #
一、Future 基础 #
1.1 创建 Future #
scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// The body is handed to the implicit global ExecutionContext as soon as the
// Future is constructed; the current thread does not wait for it.
val future: Future[Int] = Future {
  Thread.sleep(1000) // stand-in for slow work
  42
}

// Printed without waiting for the computation above.
println("Future created")
1.2 Future 回调 #
scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
val future: Future[Int] = Future {
  Thread.sleep(1000) // stand-in for slow work
  42
}

// Register a callback instead of blocking: it receives the Try holding
// either the value or the exception once the Future has finished.
future.onComplete { outcome =>
  val message = outcome match
    case scala.util.Success(value) => s"Success: $value"
    case scala.util.Failure(error) => s"Failure: ${error.getMessage}"
  println(message)
}
1.3 获取 Future 结果 #
scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._
val future: Future[Int] = Future {
  Thread.sleep(1000) // stand-in for slow work
  42
}

// Await.result blocks the calling thread until the value is available,
// waiting at most five seconds here.
val result: Int = Await.result(future, atMost = 5.seconds)
println(result)
二、Future 组合 #
2.1 map #
scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
val future: Future[Int] = Future(42)

// map builds a new Future from the eventual value without blocking the caller.
val mapped: Future[Int] = future.map(n => n * 2)
2.2 flatMap #
scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
/** Asynchronously produces the user with the given id (stub: built from the id). */
def fetchUser(id: Int): Future[User] =
  Future(User(id, s"User$id"))

/** Asynchronously produces the posts of `user` (stub: two fixed posts). */
def fetchPosts(user: User): Future[List[Post]] =
  Future {
    val titles = List("Post 1", "Post 2")
    titles.map(title => Post(user.id, title))
  }

// flatMap feeds the user from the first Future into the second asynchronous
// call and flattens the nested Future into a single one.
val result = fetchUser(1).flatMap(fetchPosts)
2.3 for 推导式 #
scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// Stubs: the real implementations are left out of this example.
def fetchUser(id: Int): Future[User] = ???
def fetchPosts(userId: Int): Future[List[Post]] = ???
def fetchComments(postId: Int): Future[List[Comment]] = ???

// Each generator runs only after the previous one has completed, so the
// three calls are sequential and every step can use the earlier results.
val result = for
  user <- fetchUser(1)
  posts <- fetchPosts(user.id)
  // A user may have no posts: `posts.head` would fail the whole Future with
  // NoSuchElementException, so an empty post list yields an empty comment list.
  comments <- posts.headOption.fold(Future.successful(List.empty[Comment]))(first =>
    fetchComments(first.id)
  )
yield (user, posts, comments)
2.4 zip #
scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// Both Futures are started before they are combined.
val future1: Future[Int] = Future(42)
val future2: Future[String] = Future("hello")

// zip pairs the two results into a single Future of a tuple.
val combined: Future[(Int, String)] = future1.zip(future2)
2.5 sequence #
scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// One independent Future per element.
val futures: List[Future[Int]] = List(1, 2, 3).map(n => Future(n))

// Future.sequence turns List[Future[Int]] into Future[List[Int]].
val sequence: Future[List[Int]] = Future.sequence(futures)
三、Promise #
3.1 创建 Promise #
scala
import scala.concurrent.Promise
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// The Promise is the write side; its `future` is the read side.
val promise: Promise[Int] = Promise()
val future: Future[Int] = promise.future

// Register the observer first, then fulfil the promise with a value.
future.onComplete(outcome => println(outcome))
promise.success(42)
3.2 完成 Promise #
scala
import scala.concurrent.Promise
// A Promise can be completed exactly once: a second success/failure/complete
// on the same instance throws IllegalStateException. Each way of completing
// is therefore shown on its own Promise.
val promise = Promise[Int]()
promise.success(42)

val failed = Promise[Int]()
failed.failure(new Exception("error"))

val completedWithValue = Promise[Int]()
completedWithValue.complete(scala.util.Success(42))

val completedWithError = Promise[Int]()
completedWithError.complete(scala.util.Failure(new Exception("error")))

// The try* variants return false instead of throwing when the Promise has
// already been completed.
val accepted: Boolean = promise.trySuccess(0) // false: `promise` already holds 42
3.3 Promise 实战 #
scala
import scala.concurrent.{Promise, Future}
import scala.concurrent.ExecutionContext.Implicits.global
/** Races `future` against a timer and returns whichever finishes first.
  *
  * @param future  the computation to guard
  * @param timeout time limit in milliseconds
  * @return a Future holding the result of `future`, or failed with
  *         `java.util.concurrent.TimeoutException` if the limit elapses first.
  *         The original computation is not cancelled; its late result is ignored.
  */
def withTimeout[T](future: Future[T], timeout: Long): Future[T] =
  val promise = Promise[T]()
  // Timer side. The sleep occupies a pool thread, so mark it as blocking.
  Future {
    scala.concurrent.blocking(Thread.sleep(timeout))
    // tryFailure: a no-op if `future` already won the race, where `failure`
    // would throw IllegalStateException.
    promise.tryFailure(new java.util.concurrent.TimeoutException("Timeout"))
  }
  // tryComplete for the same reason: the timer may already have fired.
  future.onComplete(promise.tryComplete)
  promise.future
// Needs five seconds to finish, far longer than the limit passed below.
val slowFuture: Future[Int] = Future {
  Thread.sleep(5000)
  42
}

// Guard the slow computation with a limit of 1000 milliseconds.
val result: Future[Int] = withTimeout(slowFuture, timeout = 1000)
四、错误处理 #
4.1 recover #
scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// A Future whose body throws completes as a Failure.
val future: Future[Nothing] = Future(throw new Exception("error"))

// recover replaces a matching failure with a plain fallback value.
val recovered: Future[Int] = future.recover { case _: Exception => 0 }
4.2 recoverWith #
scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// A Future whose body throws completes as a Failure.
val future: Future[Nothing] = Future(throw new Exception("error"))

// recoverWith is like recover, but the fallback is itself a Future, so the
// recovery step may be asynchronous.
val recovered: Future[Int] = future.recoverWith { case _: Exception =>
  Future.successful(0)
}
4.3 fallbackTo #
scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// Primary computation: always fails in this example.
val future1: Future[Nothing] = Future(throw new Exception("error"))

// Backup value that is already available.
val future2: Future[Int] = Future.successful(42)

// fallbackTo yields the result of future1 if it succeeds, otherwise future2's.
val result: Future[Int] = future1.fallbackTo(future2)
4.4 transform #
scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
val future: Future[Int] = Future(42)

// transform handles both outcomes in one step: a success is doubled, a
// failure is wrapped in a new exception that keeps the original as its cause.
val transformed: Future[Int] = future.transform {
  case scala.util.Success(value) =>
    scala.util.Success(value * 2)
  case scala.util.Failure(error) =>
    scala.util.Failure(new Exception(s"Wrapped: ${error.getMessage}", error))
}
五、并行集合 #
5.1 创建并行集合 #
scala
// Since Scala 2.13 parallel collections live in the separate
// scala-parallel-collections module, and `.par` is only available once
// these converters are imported.
import scala.collection.parallel.CollectionConverters._

// Parallel view of the list; operations on it may run on several threads.
val parList = List(1, 2, 3, 4, 5).par
val result = parList.map(_ * 2).sum
5.2 并行操作 #
scala
// Since Scala 2.13 `.par` requires the scala-parallel-collections module
// together with this import.
import scala.collection.parallel.CollectionConverters._

val numbers = (1 to 1000000).toList

// The doubled even numbers add up to 500001000000, which does not fit in an
// Int, so widen to Long before summing to avoid a silent overflow.
val result = numbers.par
  .filter(_ % 2 == 0)
  .map(_ * 2L)
  .sum
5.3 并行集合配置 #
scala
import scala.collection.parallel.ForkJoinTaskSupport
import java.util.concurrent.ForkJoinPool
// Since Scala 2.13 `.par` requires the scala-parallel-collections module
// together with this import.
import scala.collection.parallel.CollectionConverters._

val parList = List(1, 2, 3, 4, 5).par

// Run this collection's operations on a dedicated ForkJoinPool with
// parallelism 4 instead of the default task support.
parList.tasksupport = ForkJoinTaskSupport(new ForkJoinPool(4))
六、异步编程模式 #
6.1 异步 HTTP 客户端 #
scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
/** Minimal asynchronous HTTP client built on `scala.io.Source`. */
object HttpClient:
  /** Fetches the whole content of `url` as a string.
    *
    * @param url the address to read from
    * @return a Future holding the body, or failed with the exception raised
    *         while opening or reading the source
    */
  def get(url: String): Future[String] = Future {
    // Source.fromURL does synchronous I/O; `blocking` tells the global pool
    // that this thread is parked so it can compensate with extra threads.
    scala.concurrent.blocking {
      // Using.resource closes the source whether or not reading succeeds.
      scala.util.Using.resource(scala.io.Source.fromURL(url))(_.mkString)
    }
  }
// Starts the request and returns at once; the body arrives through the Future.
val response: Future[String] =
  HttpClient.get("https://api.example.com/data")
6.2 批量处理 #
scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
/** Applies `process` to every item in its own Future and collects the
  * results, in input order, into a single Future.
  *
  * @param items   the inputs to process
  * @param process the function applied to each item
  * @return a Future of all results; it fails if any single item fails
  */
def processBatch[T, R](items: List[T])(process: T => R): Future[List[R]] =
  val pending: List[Future[R]] = items.map(item => Future(process(item)))
  Future.sequence(pending)

val results = processBatch(List(1, 2, 3, 4, 5))(n => n * 2)
6.3 重试机制 #
scala
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
/** Runs `action` and, if the resulting Future fails, runs it again up to
  * `maxRetries` more times.
  *
  * @param maxRetries number of additional attempts allowed after the first
  * @param action     by-name, so every attempt starts a fresh Future
  * @return the first successful result, or the failure of the last attempt
  */
def retry[T](maxRetries: Int)(action: => Future[T]): Future[T] =
  val attempt = action
  if maxRetries <= 0 then attempt // no retries left: pass the outcome through
  else attempt.recoverWith { case _ => retry(maxRetries - 1)(action) }
// An operation that fails at random, given up to three extra attempts.
val result = retry(3) {
  Future {
    val failed = scala.util.Random.nextBoolean()
    if failed then throw new Exception("Random failure")
    "Success"
  }
}
6.4 限流 #
scala
import scala.concurrent.Future
// Semaphore is a Java class; scala.concurrent has no Semaphore.
import java.util.concurrent.Semaphore
import scala.concurrent.ExecutionContext.Implicits.global

/** Caps the number of Futures that may be in flight at the same time.
  *
  * @param maxConcurrent maximum number of actions running concurrently
  */
class RateLimiter(maxConcurrent: Int):
  private val semaphore = Semaphore(maxConcurrent)

  /** Starts `action` once a permit is free and returns its Future.
    *
    * Note: `acquire()` blocks the calling thread while all permits are taken.
    *
    * @param action by-name, so the Future is not created before a permit is held
    * @return the Future produced by `action`; the permit is handed back when it completes
    */
  def execute[T](action: => Future[T]): Future[T] =
    semaphore.acquire()
    val started =
      try action
      catch
        case scala.util.control.NonFatal(e) =>
          // `action` threw before producing a Future: give the permit back,
          // otherwise it would be lost for good.
          semaphore.release()
          throw e
    started.andThen { case _ => semaphore.release() }
// A limiter created with a limit of 5.
val limiter = RateLimiter(5)

// Submit 100 tasks through the limiter; each sleeps 100 ms and yields its index.
val futures = (1 to 100).map(i =>
  limiter.execute(
    Future {
      Thread.sleep(100)
      i
    }
  )
)
七、ExecutionContext #
7.1 全局 ExecutionContext #
scala
import scala.concurrent.ExecutionContext.Implicits.global
7.2 自定义 ExecutionContext #
scala
// Future must be imported as well: the snippet creates one below.
import scala.concurrent.{ExecutionContext, Future}
import java.util.concurrent.Executors

// A dedicated pool of ten threads.
val executor = Executors.newFixedThreadPool(10)

// Given instance picked up implicitly by Future.apply below.
given ec: ExecutionContext = ExecutionContext.fromExecutor(executor)

val future = Future {
  42
}

// The pool's threads are non-daemon: call executor.shutdown() once the work
// is done, otherwise they keep the JVM alive.
7.3 ExecutionContext 选择 #
| 类型 | 适用场景 |
|---|---|
| global | CPU 密集型任务 |
| fixedThreadPool | 阻塞 I/O 操作 |
| cachedThreadPool | 短期异步任务 |
八、最佳实践 #
8.1 避免阻塞 #
scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
val future: Future[Int] = Future {
  Thread.sleep(1000) // stand-in for slow work
  42
}

// Derive the next value with map rather than waiting for the Future:
// the calling thread is never blocked.
val result: Future[Int] = future.map(n => n * 2)
8.2 使用 for 推导式 #
scala
// A for-comprehension reads top to bottom, unlike nested flatMap/map calls:
// the posts are requested only after the user has arrived.
val result =
  for
    user <- fetchUser(1)
    posts <- fetchPosts(user.id)
  yield (user, posts)
8.3 正确处理错误 #
scala
// Handle the failure first, then continue the pipeline: after recover the
// Future holds either the real value or the default.
val recoveredFuture = future.recover { case _: Exception => defaultValue }
val result = recoveredFuture.map(value => value.toString)
8.4 使用超时 #
scala
import scala.concurrent.{Future, Await}
import scala.concurrent.duration._
// Always bound the wait: Await.result gives up after five seconds instead of
// blocking indefinitely.
val result = Await.result(future, atMost = 5.seconds)
九、总结 #
Future 方法 #
| 方法 | 说明 |
|---|---|
| map | 转换结果 |
| flatMap | 链式异步操作 |
| zip | 组合两个 Future |
| sequence | 将 List[Future[T]] 转换为 Future[List[T]] |
| recover | 错误恢复 |
| recoverWith | 错误恢复(返回 Future) |
并发工具 #
| 工具 | 说明 |
|---|---|
| Future | 异步计算 |
| Promise | 手动完成 Future |
| Parallel Collection | 并行处理集合 |
| ExecutionContext | 线程池管理 |
恭喜你完成了 Scala 完全指南的学习!
最后更新:2026-03-27