Tuesday, July 12, 2016

KafkaConsumer is not safe for multi-threaded access

I work on project with Play framework Scala, with Guice plugins. It's very nice!

Yesterday, I met the exception:

2016-07-11 08:56:22,936 [ERROR] from akka.actor.OneForOneStrategy in application-akka.actor.default-dispatcher-6 - KafkaConsumer is not safe for multi-threaded access
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324) ~[org.apache.kafka.kafka-clients-0.9.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(KafkaConsumer.java:713) ~[org.apache.kafka.kafka-clients-0.9.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(KafkaConsumer.java:747) ~[org.apache.kafka.kafka-clients-0.9.0.1.jar:na]
    at services.kafka.KafkaServiceImpl.consume(KafkaService.scala:73) ~[bb-api.bb-api-1.0-sans-externalized.jar:na]

Reason: I run 2 consumers (difference threads), but just one KafkaService. And I use singleton for it (with Google Guice module). So, when running consumer, it conflict each other.

How to solve? Use @Named annotation in binding.

For example, this is my module file:

class KafkaModule extends AbstractModule with ScalaModule with AkkaGuiceSupport {

  override def configure(): Unit = {
    bind[services.kafka.KafkaService[String, String]].annotatedWith(Names.named("kafka-Coccoc")).to[services.kafka.KafkaServiceImpl].in[Singleton]
    bind[services.kafka.KafkaService[String, String]].annotatedWith(Names.named("kafka-Report")).to[services.kafka.KafkaServiceImpl].in[Singleton]
  }
}

Then, I rewrite my consumer code. Example:

class ReportConsumerActor @Inject() (@Named("kafka-Report") val kafkaService: KafkaService[String, String])
                                    (implicit ec: ExecutionContext){

  def receive: Receive = {
    kafkaService.consume(__SOME_CODE_HERE__)
  }
}

That's all! Hope this help.

Biểu mẫu liên hệ

Name

Email *

Message *