`

使用java实现的线程池 消息队列功能

 
阅读更多

ThreadPoolManager类:负责管理线程池,调用轮询的线程来访问字符串缓冲区的内容,维护缓冲区,当线程池溢出时抛出的Runnable任务被加入到字符缓冲区。

  public class ThreadPoolManager

  {

  private static ThreadPoolManager tpm = new ThreadPoolManager();

  // 线程池维护线程的最少数量

  private final static int CORE_POOL_SIZE = 4;

  // 线程池维护线程的最大数量

  private final static int MAX_POOL_SIZE = 10;

  // 线程池维护线程所允许的空闲时间

  private final static int KEEP_ALIVE_TIME = 0;

  // 线程池所使用的缓冲队列大小

  private final static int WORK_QUEUE_SIZE = 10;

  // 消息缓冲队列

  Queue msgQueue = new LinkedList();

  // 访问消息缓存的调度线程

  final Runnable accessBufferThread = new Runnable()

  {

  public void run()

  {

  // 查看是否有待定请求,如果有,则创建一个新的AccessDBThread,并添加到线程池中

  if( hasMoreAcquire() )

  {

  String msg = ( String ) msgQueue.poll();

  Runnable task = new AccessDBThread( msg );

  threadPool.execute( task );

  }

  }

  };

  final RejectedExecutionHandler handler = new RejectedExecutionHandler()

  {

  public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )

  {

  System.out.println(((AccessDBThread )r).getMsg()+"消息放入队列中重新等待执行");

  msgQueue.offer((( AccessDBThread ) r ).getMsg() );

  }

  };

  // 管理数据库访问的线程池

  final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(

  CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,

  new ArrayBlockingQueue( WORK_QUEUE_SIZE ), this.handler );

  // 调度线程池

  final ScheduledExecutorService scheduler = Executors

  .newScheduledThreadPool( 1 );

  final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(

  accessBufferThread, 0, 1, TimeUnit.SECONDS );

  public static ThreadPoolManager newInstance()

  {

  return tpm;

  }

  private ThreadPoolManager(){}

  private boolean hasMoreAcquire()

  {

  return !msgQueue.isEmpty();

  }

  public void addLogMsg( String msg )

  {

  Runnable task = new AccessDBThread( msg );

  threadPool.execute( task );

  }

  }

  public class AccessDBThread implements Runnable

  {

  private String msg;

  public String getMsg()

  {

  return msg;

  }

  public void setMsg( String msg )

  {

  this.msg = msg;

  }

  public AccessDBThread(){

  super();

  }

  public AccessDBThread(String msg){

  this.msg = msg;

  }

  public void run()

  {

  // 向数据库中添加Msg变量值

  System.out.println("Added the message: "+msg+" into the Database");

  }

  }

  public class TestDriver

  {

  ThreadPoolManager tpm = ThreadPoolManager.newInstance();

  public void sendMsg( String msg )

  {

  tpm.addLogMsg( msg + "记录一条日志 " );

  }

  public static void main( String[] args )

  {

  for( int i = 0; i <100; i++ )

  {

  new TestDriver().sendMsg( Integer.toString( i ) );

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics