public class hiSyncQue<T> extends Object
スレッドからスレッドにデータを引き渡すQUEUEです。
生産者側、消費者側とも複数のスレッドであることが許されます。
生産者側が投じたデータ順、消費者側が待ち受けを開始した順はともに守られます。
put(T)でデータをセットしget()でデータを
取得します。
データがセットされていない状態でget()するとデータがput()され
るまでブロックします。
setLimit(int)QUEUEの最大サイズを指定すると
QUEUEサイズがそれを超えないよう
put()側でブロックさせる
ことができます。
put()側でブロックしている状態でget()があると、put()
側のブロックは解けます。
次のように使用できます。
import otsu.hiNote.hiSyncQue;
class data {
   int    id;String text;
   data(int id_,String text_){id=id_;text=text_;}
   }
class Producer extends Thread{ // 生産者側
   hiSyncQue<data> que;
   Producer(hiSyncQue<data> que_){ que= que_; }
   public void run(){
      try{
         for(int n=10;n>=0;--n){
            que.put(new data(n,"data:"+n)); // 送る
            System.out.println("put:"+n);
            }
         que.set_return_null_when_empty(true);// データ終了
         }
      catch(Exception e){e.printStackTrace(hiU.err);}
      }
   }
class Consumer extends Thread{ // 消費者側
   hiSyncQue<data> que;
   Consumer(hiSyncQue<data> que_){ que= que_; }
   public void run(){
      try{
         data d;
         while( (d=que.get())!=null ){ // 受ける
            System.out.println("get:"+d.id+" "+d.text);
            Thread.sleep(150);// 150 milli-sec. sleep
            }
         }
      catch(Exception e){e.printStackTrace(hiU.err);}
      }
   }
public class MyClass {
   final static boolean D=true;
   public static void main(String args[]){
      try{
         hiSyncQue<data> que     = new hiSyncQue<data>(5);// queueのサイズ5
         Producer        producer= new Producer(que);
         Consumer        consumer= new Consumer(que);
         producer.start();
         consumer.start();
         }
      catch(Exception e){
         e.printStackTrace(hiU.err);
         System.exit(1);
         }
      }
   }
これを動かした結果は次のようになります。消費者側の速度が 遅いため、生産者側にqueueサイズによる調整が入ることが分かります。
put:10 get:10 data:10 put:9 put:8 put:7 put:6 put:5 get:9 data:9 put:4 get:8 data:8 put:3 get:7 data:7 put:2 get:6 data:6 put:1 get:5 data:5 put:0 get:4 data:4 get:3 data:3 get:2 data:2 get:1 data:1 get:0 data:0
最大数を超える場合デフォルトではputがブロックしますが、次の設定を行うこともできます。
| 先頭を削除し、追加 | : remove_top_at_limit() | 
| 最後を削除し、追加 | : remove_last_at_limit() | 
| 追加せず廃棄 | : abandon_at_limit() | 
一つのSyncQueに対し複数スレッドからputし、複数スレッドがgetすることも可能です。
複数のスレッドからget()が出ている場合、先に発行されたget()から順に値が渡されます。
set_return_null_when_emptyを発行するとデータが空になるとget()は待ちに入らずnullを
返します。穏やかに消費スレッドを終了させることができます。
例えば次のコードでは予め起動した5個のデータ処理スレッドは、queにデータが
投入されるとそれらの処理をします。生産者側がset_return_null_when_emptyを
発行した後は、データがなくなり次第速やかにデータ処理スレッドは終了します。
// 消費者側スレッド
class Consumer extends Thread {
   hiSyncQue<MyData> que;
   public Consumer(hiSyncQue<MyData> que_){que=que_;}
   public void run(){
      MyData data;
      while( (data=que.get())!=null ){ // nullなら終了
         // data処理
         }
      }
   }
// 生産者側
   hiSyncQue<MyData> que=new hiSyncQue<MyData>();
   // 処理スレッドを起動
   for(int n=0;n<5;++n){
      (new Consumer(que)).start();
      }
   // データセット
   for(MyData data:dataList){
      que.put(data);
      }
   // データがなくなったらgetがnullを返す指定
   que.set_return_null_when_empty(true);
複数のget()あるいはput()がブロックされている状態で
 #clean(true);
を発行すると全てのブロックが解除されます。
LinkedBlockingDequeとArrayBlockingQueue
hiSyncQueはjavaの初期に開発されたものです。前身はジェネリック機構すらない
時代に作成されました。
その後javaの標準ライブラリに同様のクラス
など が作成されました。hiSyncQueを置き換える場合の注意点を説明します。
| コンストラクタと説明 | 
|---|
| hiSyncQue()QUEUEサイズ限定なしの同期QUEUEを作成する. | 
| hiSyncQue(int limit_)予めQUEUEの最大サイズを指定する | 
| hiSyncQue(int limit_,
         String name_)予めQUEUEの最大サイズを指定する | 
| hiSyncQue(String name_)QUEUEサイズ限定なしの名前付き同期QUEUEを作成する. | 
| 修飾子とタイプ | メソッドと説明 | 
|---|---|
| void | abandon_at_limit()最大数を超える場合,追加せず廃棄する | 
| void | block_at_limit()最大数を超える場合,ブロックする(デフォルト) | 
| ArrayList<T> | clean(boolean wakeup_)QUEUEに溜まっているデータを破棄する. | 
| boolean | detachData(ArrayList<T> data_)QUEUEから指定データを取り外す. | 
| boolean | detachData(T data_)QUEUEから指定データを取り外す. | 
| T | get()QUEUEのデータを取り出す. | 
| T | get(long timeout_)QUEUEのデータを取り出す.タイムアウト指定あり(試験中)
QUEUEが空の場合ブロックします。 | 
| T | getFirst() | 
| int | getLimit()QUEUEに溜められる最大数を得る. | 
| ArrayList<T> | getList()QUEUEに溜まっているデータリストを得る. | 
| String | getName()queueの名前を取得する. | 
| ArrayList<Long> | getWaitingThreadIdList()待ち状態のスレッドのidリストを得る
呼び出し時点での待ち状態のスレッドのidのリスト。 | 
| boolean | put(T obj_)QUEUEの最後にデータを追加する. | 
| boolean | put(T obj_,
   int timeout_)QUEUEの最後にデータを追加する;タイムアウトあり(試験中). | 
| boolean | putTop(T obj_)QUEUEの先頭にデータを追加する | 
| boolean | putTop(T obj_,
      int timeout_)QUEUEの先頭にデータを追加する;タイムアウトあり(試験中) | 
| void | remove_last_at_limit()最大数を超える場合,最後を削除する | 
| void | remove_top_at_limit()最大数を超える場合,先頭を削除する | 
| void | restart(boolean return_null_)clean(true)で停止状態にしたものを、再使用可能にします。 | 
| void | set_return_null_when_empty(boolean return_null_)QUEUEが空の時、get()でwaitせずnullを返す指定. | 
| void | setLimit(int limit_)QUEUEに溜められる最大数を設定する. | 
| void | setName(String name_)queueにデバッグ用の名前を設定する. | 
| int | size()QUEUEに溜まっているデータ数を得る. | 
| T | tryGet()QUEUEのデータを取り出す. | 
| T | waitFor()QUEUEのデータを取り出す. | 
| T | waitFor(long timeout_)QUEUEのデータを取り出す. | 
| boolean | wakeupThread(ArrayList<Long> tids_)待ち状態のスレッドの待ちを解く(未). | 
| boolean | wakeupThread(long tid_)待ち状態のスレッドの待ちを解く(未). | 
public hiSyncQue()
QUEUEの最大サイズは後からsetLimit(int)で設定することができます。
public hiSyncQue(String name_)
名前は主にデバッグ用に使います。
QUEUEの最大サイズは後からsetLimit(int)で設定することができます。
name_ - 名前public hiSyncQue(int limit_)
limit_ - QUEUEの最大サイズpublic hiSyncQue(int limit_,
                 String name_)
limit_ - QUEUEの最大サイズname_ - 名前public void setName(String name_)
name_ - 名前public String getName()
public boolean put(T obj_)
obj_ - QUEUEに追加するデータオブジェクトRuntimeException - IllegalMonitorStateExceptionまたは
InterruptedExceptionをcauseとして持つ。public boolean put(T obj_, int timeout_)
obj_ - QUEUEに追加するデータオブジェクトtimeout_ - タイムアウト・ミリ秒RuntimeException - IllegalMonitorStateExceptionまたは
InterruptedExceptionをcauseとして持つ。public boolean putTop(T obj_)
obj_ - QUEUEに追加するデータオブジェクトRuntimeException - IllegalMonitorStateExceptionまたは
InterruptedExceptionをcauseとして持つ。public boolean putTop(T obj_, int timeout_)
obj_ - QUEUEに追加するオブジェクトtimeout_ - タイムアウト・ミリ秒RuntimeException - IllegalMonitorStateExceptionまたは
InterruptedExceptionをcauseとして持つ。public T get()
QUEUEが空の場合ブロックします。clean()が発行された場合はnullが
戻ります。
通常ひとつのQUEUEに複数のスレッドからget()が出ている場合、先に出たget()
から順にブロックが解けます。
RuntimeException - IllegalMonitorStateExceptionまたは
InterruptedExceptionをcauseとして持つ。public T getFirst()
public T tryGet()
QUEUEが空の場合ブロックせずnullを返します。
RuntimeException - IllegalMonitorStateExceptionまたは
InterruptedExceptionをcauseとして持つ。public T get(long timeout_)
QUEUEが空の場合ブロックします。clean()が発行された場合はnullが
戻ります。
ひとつのQUEUEに複数のスレッドからget()が出ている場合、先に出たget()
から順にブロックが解けます。
タイムアウトした場合nullが返ります。複数のスレッドでgetを出す場合
各get毎に異なるタイムアウトが設定できます。
timeout_ - タイムアウト時間をミリ秒で指定RuntimeException - IllegalMonitorStateExceptionまたは
InterruptedExceptionをcauseとして持つ。public T waitFor(long timeout_)
get()と同じです。timeout_ - タイムアウト・ミリ秒public ArrayList<T> clean(boolean wakeup_)
QUEUE内のデータを廃棄します。
引数にtrueを設定すると、get(),put()の待ち状態は全て解除され、このQUEUEは
使用できなくなります。
使用可能とするにはrestartを呼びます。
wakeup_ - get()のブロックを解除する。get()側にはnullが通知される。
                put()のブロックも解除される。public void restart(boolean return_null_)
return_null_ - trueを指定するとQUEUEが空の時nullリターンします。public void set_return_null_when_empty(boolean return_null_)
送り手側(put側)で一連のデータが終わったことをこの指定で示せば、受け側(get側) は溜まっているデータを全て受け取った後はnullを受け取り、終わりを知ることが できます。
set_return_null_when_empty(true)を発行すると、 set_return_null_when_empty(false)を発行するまで、put()は無処理となります。
return_null_ - trueを指定するとQUEUEが空の時nullリターンします。public int size()
public void setLimit(int limit_)
QUEUEに溜まったデータ数最大数に達すると
put()はブロックします。
0は設定無しを表します。
limit_ - QUEUEに溜められるオブジェクトの最大数public int getLimit()
public void block_at_limit()
public void remove_top_at_limit()
public void remove_last_at_limit()
public void abandon_at_limit()
public ArrayList<T> getList()
呼び出し時点でQUEUEに溜まっているデータのリストを得ます。
データオブジェクトは溜まっているオブジェクトそのものですが、
リストは応答ように作られたものです。
リストに変更を加えてもQUEUEには影響はありません。
public boolean detachData(T data_)
指定のデータインスタンスがQUEUEにある場合取り外します。
指定するのはQUEUEに入れたデータそのもので、equalsなどによる
比較は行いません。
data_ - 取り外すデータ。public boolean detachData(ArrayList<T> data_)
指定のデータインスタンスがQUEUEにある場合取り外します。
指定するのはQUEUEに入れたデータそのもので、equalsなどによる
比較は行いません。
data_ - 取り外すデータ。public ArrayList<Long> getWaitingThreadIdList()
呼び出し時点での待ち状態のスレッドのidのリスト。
リストは応答用に作られたものです。
リストに変更を加えてもQUEUEには影響はありません。
public boolean wakeupThread(long tid_)
指定スレッドidのスレッドが待ち状態の場合、待ち状態を解きます。
解かれた側のスレッドのget()はnullが返ります。
tid_ - 待ちを解くスレッドid