介绍

我开始分配读取包含100列和10万行的CSV文件并将其写入数据库。

方法1:简单的Java程序

所以我从一个简单的Java程序开始,运行一个while循环直到EOF,然后进行JDBC调用来存储值。这是需要花一个小时才完成了,但后来我意识到程序的运行时比创建程序花费的时间更长。因此,任务并不像看起来那么容易。那可以做些什么呢?当然,我意识到我需要并行完成任务。

方法2:线程Java程序

线程对我来说似乎总是很复杂。“Mutex”,“Semaphores”和“Monitors”的概念让我望而却步。所以我试着理解这个概念。Java使用Monitors来实现同步。Java的Monitors支持两种线程同步:互斥和合作。

通过虚拟机对象锁在Java中支持的互斥操作,使多个线程能够独立地处理共享数据而不会相互干扰。合作,是通过等待和通知来实现的。此方法使线程能够朝着共同的目标一起工作。

Monitor区域

当线程到达监视区域的开头时,它将被放入相关监视器的条目集中。这个集就像银行柜台的队列一样。当一个人到达线路前端时,他们就可以进行交易。

关于线程的每一个讲座或文章总是提到消费者和生产者问题。所以我的问题与此类似,对吧?我有一个读取器(生产者),它读取一行并将其提供给JDBC层(消费者)以将其写入数据库。

Java已经提供了阻塞队列,使实现问题更容易。但我不能产生10万个线程来做到这一点。我需要类似线程池的东西来限制线程数。只需一个简单的循环和线程数就可以了。该解决方案看起来很好,在架构上很📃, 然后我意识到我忘记了错误处理。现在我意识到在线程中处理异常非常困难,因为它们不会返回任何内容。他们还有其他任何办法吗?所以,是的,Java 1.5中有“可调用接口”功能,它作为一个线程运行但返回未来。但这是另一个故事。

方法3:使用Actor的Java程序

执行上述任务使我意识到,随着复杂性的增加,维护此代码将非常困难。此外,Java为每个生成的线程使用系统线程。所以产生线程是有限的。

我需要的是一个为我提供并发处理的框架,我只能专注于它的业务逻辑部分。我找到了这样一个框架:Akka。Akka基于Erlang actor模型。如果您阅读上述问题的实现方式,则使用拉策略实现,消费者线程将在完成当前任务后执行新任务。所以我们需要等到生产者准备好了。如果系统更具反应性,那不是很容易吗?对于每个事件,事件处理程序都应该准备好完成工作。

因此,与银行类似地进行思考,以前我们曾经常常站在队列中,银行很难维持这个队列。有时客户厌倦了排队并离开。因此,银行可以做的是将此问题提交给第三方供应商并寻求解决方案。供应商建议使用令牌系统。让所有顾客坐在椅子上,直到他们的代号出现。对于银行而言,这听起来是一个很好的解决方案,并且为了增加锦上添花,供应商甚至准备好免费维护这个系统。想想银行会感受到的快乐。在Akka之后,我感受到了类似的快乐。Akka基于actors,所以actors是什么?

Actors

actors给你带来:

  • 简单和高级的并发和并行抽象。
  • 异步,非阻塞和高性能的事件驱动编程模型。
  • 非常轻量级的事件驱动进程(每GB堆内存数百万个actor)。

使用Akka非常容易。它可以作为依赖项添加到我们的project.Simple jar文件中。所以,让我们亲自动手,编写一个Hello World程序。示例来自Akka文档。

让我们定义一个actor,Greeter:

Greeter extends UntypedActor {
    String greeting = "";
    public void onReceive(Object message) {
        if (message instanceof WhoToGreet)
        greeting = "hello" + ((WhoToGreet) message).who;
        else if (message instanceof Greet)                                                                                // //Send the current greeting back to the sender
        getSender().tell(new Greeting(greeting), getSelf());
        else unhandled(message)    
    }
}

对,就是那样。它只需要实现onRecieve方法,以便它对tell调用作出反应。

因此,要开始使用此actor,您需要创建一个actor系统:

final ActorSystem system = ActorSystem.create("helloakka");
// Create the greeter actor
final ActorRef greeter = system.actorOf(Props.create(Greeter.class), "greeter");
// Tell the greeter to change its greeting message
greeter.tell(new WhoToGreet("akka"));

因此,通过使用tell方法,您可以调用该方法。所以Akka保证一次只调用一次OnReceive方法。就这么简单,你不需要考虑同步。

Akka是一个非常可扩展的软件,不仅在性能方面,而且在其有用的应用程序大小方面。Akka的核心,akka-actor,非常小,很容易被放入现有的项目中,你需要异步和无锁并发而不会有麻烦。“向外扩展(Remoting)”确实看起来很有意义,对吧?

Akka中的所有内容都设计为在分布式环境中工作:actor的所有交互都使用纯消息传递,一切都是异步的。Actors允许您管理服务故障(Supervisors),负载管理(退避策略,超时和处理隔离),以及水平和垂直可扩展性(添加更多内核或机器)。

Actors往往更适合并行处理单元,这些处理单元对CPU要求不高,也可能更适合分布式并行计算(更高的延迟但更高的吞吐量)。
所以我使用actor的感觉非常好,比传统线程更快。

英文原文:https://dzone.com/articles/concurrency-with-akka