I work on project with Play framework Scala, with Guice plugins. It's very nice!
Yesterday, I met the exception:
2016-07-11 08:56:22,936 [ERROR] from akka.actor.OneForOneStrategy in application-akka.actor.default-dispatcher-6 - KafkaConsumer is not safe for multi-threaded access java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324) ~[org.apache.kafka.kafka-clients-0.9.0.1.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(KafkaConsumer.java:713) ~[org.apache.kafka.kafka-clients-0.9.0.1.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(KafkaConsumer.java:747) ~[org.apache.kafka.kafka-clients-0.9.0.1.jar:na] at services.kafka.KafkaServiceImpl.consume(KafkaService.scala:73) ~[bb-api.bb-api-1.0-sans-externalized.jar:na]
Reason: I run 2 consumers (difference threads), but just one KafkaService. And I use singleton for it (with Google Guice module). So, when running consumer, it conflict each other.
How to solve? Use @Named
annotation in binding.
For example, this is my module file:
class KafkaModule extends AbstractModule with ScalaModule with AkkaGuiceSupport { override def configure(): Unit = { bind[services.kafka.KafkaService[String, String]].annotatedWith(Names.named("kafka-Coccoc")).to[services.kafka.KafkaServiceImpl].in[Singleton] bind[services.kafka.KafkaService[String, String]].annotatedWith(Names.named("kafka-Report")).to[services.kafka.KafkaServiceImpl].in[Singleton] } }
Then, I rewrite my consumer code. Example:
class ReportConsumerActor @Inject() (@Named("kafka-Report") val kafkaService: KafkaService[String, String]) (implicit ec: ExecutionContext){ def receive: Receive = { kafkaService.consume(__SOME_CODE_HERE__) } }
That's all! Hope this help.
No comments:
Post a Comment