multithreading - Thread Pool per key in Java -


suppose have grid g of n x m cells, n , m huge. further, suppose have numerous tasks, each task belong single cell in g, , should executed in parallel (in thread pool or other resource pool).

however, task belonging same cell must done serially, is, should wait previous task in same cell done.

how can solve issue? i've search , used several thread pools (executors, thread), no luck.

minimum working example

import java.util.random; import java.util.concurrent.executorservice; import java.util.concurrent.executors;  public class mwe {      public static void main(string[] args) {         executorservice threadpool = executors.newfixedthreadpool(16);         random r = new random();          (int = 0; < 10000; i++) {             int nx = r.nextint(10);             int ny = r.nextint(10);              runnable task = new runnable() {                  public void run() {                    try {                     system.out.println("task running");                      thread.sleep(1000);                   } catch (interruptedexception e) {                     e.printstacktrace();                   }                 }              };              threadpool.submit(new thread(task)); // should use nx,ny here somehow         }     }  } 

a callback mechanism synchronized block work efficiently here. have answered similar question here. there limitations (see linked answer), simple enough keep track of going on (good maintainability). have adapted source code , made more efficient case tasks executed in parallel (since n , m huge), on occasion must serial (when task same point in grid g).

import java.util.*; import java.util.concurrent.*; import java.util.concurrent.locks.reentrantlock;  // adapted https://stackoverflow.com/a/33113200/3080094 public class gridtaskexecutor {      public static void main(string[] args) {          final int maxtasks = 10_000;         final countdownlatch tasksdone = new countdownlatch(maxtasks);         threadpoolexecutor executor = (threadpoolexecutor) executors.newfixedthreadpool(16);         try {             gridtaskexecutor gte = new gridtaskexecutor(executor);              random r = new random();              (int = 0; < maxtasks; i++) {                  final int nx = r.nextint(10);                 final int ny = r.nextint(10);                  runnable task = new runnable() {                      public void run() {                          try {                             // system.out.println("task " + nx + " / " + ny + " running");                             thread.sleep(1);                         } catch (exception e) {                             e.printstacktrace();                         } {                             tasksdone.countdown();                         }                     }                  };                 gte.addtask(task, nx, ny);             }             tasksdone.await();             system.out.println("all tasks done, task points remaining: " + gte.size());         } catch (exception e) {             e.printstacktrace();         } {             executor.shutdownnow();         }     }      private final executor executor;     private final map<long, list<callbackpointtask>> taskswaiting = new hashmap<>();     // make lock fair adding , removing tasks balanced.     private final reentrantlock lock = new reentrantlock(true);      public gridtaskexecutor(executor executor) {         this.executor = executor;     }      public void addtask(runnable r, int x, int y) {          long point = topoint(x, y);         callbackpointtask pr = new callbackpointtask(point, r);         boolean runnow = false;         lock.lock();         try {             list<callbackpointtask> pointtasks = taskswaiting.get(point);             if (pointtasks == null) {                 if (taskswaiting.containskey(point)) {                     pointtasks = new linkedlist<callbackpointtask>();                     pointtasks.add(pr);                     taskswaiting.put(point, pointtasks);                 } else {                     taskswaiting.put(point, null);                     runnow = true;                 }             } else {                 pointtasks.add(pr);             }         } {             lock.unlock();         }         if (runnow) {             executor.execute(pr);         }     }      private void taskcompleted(long point) {          lock.lock();         try {             list<callbackpointtask> pointtasks = taskswaiting.get(point);             if (pointtasks == null || pointtasks.isempty()) {                 taskswaiting.remove(point);             } else {                 system.out.println(arrays.tostring(frompoint(point)) + " executing task " + pointtasks.size());                 executor.execute(pointtasks.remove(0));             }         } {             lock.unlock();         }     }      // general callback-task, see https://stackoverflow.com/a/826283/3080094     private class callbackpointtask implements runnable {          final long point;         final runnable original;          callbackpointtask(long point, runnable original) {             this.point = point;             this.original = original;         }          @override         public void run() {              try {                 original.run();             } {                 taskcompleted(point);             }         }     }      /** amount of points tasks. */      public int size() {          int l = 0;         lock.lock();         try {             l = taskswaiting.size();          } {             lock.unlock();         }         return l;     }      // https://stackoverflow.com/a/12772968/3080094     public static long topoint(int x, int y) {         return (((long)x) << 32) | (y & 0xffffffffl);     }      public static int[] frompoint(long p) {         return new int[] {(int)(p >> 32), (int)p };     }  } 

Comments

Popular posts from this blog

sublimetext3 - what keyboard shortcut is to comment/uncomment for this script tag in sublime -

java - No use of nillable="0" in SOAP Webservice -

ubuntu - Laravel 5.2 quickstart guide gives Not Found Error -