public class hiSyncQue<T> extends Object
スレッドからスレッドにデータを引き渡すQUEUEです。
生産者側、消費者側とも複数のスレッドであることが許されます。
生産者側が投じたデータ順、消費者側が待ち受けを開始した順はともに守られます。
put(T)
でデータをセットしget()
でデータを
取得します。
データがセットされていない状態でget()するとデータがput()され
るまでブロックします。
setLimit(int)
QUEUEの最大サイズを指定すると
QUEUEサイズがそれを超えないよう
put()側でブロックさせる
ことができます。
put()側でブロックしている状態でget()があると、put()
側のブロックは解けます。
次のように使用できます。
import otsu.hiNote.hiSyncQue; class data { int id;String text; data(int id_,String text_){id=id_;text=text_;} } class Producer extends Thread{ // 生産者側 hiSyncQue<data> que; Producer(hiSyncQue<data> que_){ que= que_; } public void run(){ try{ for(int n=10;n>=0;--n){ que.put(new data(n,"data:"+n)); // 送る System.out.println("put:"+n); } que.set_return_null_when_empty(true);// データ終了 } catch(Exception e){e.printStackTrace(hiU.err);} } } class Consumer extends Thread{ // 消費者側 hiSyncQue<data> que; Consumer(hiSyncQue<data> que_){ que= que_; } public void run(){ try{ data d; while( (d=que.get())!=null ){ // 受ける System.out.println("get:"+d.id+" "+d.text); Thread.sleep(150);// 150 milli-sec. sleep } } catch(Exception e){e.printStackTrace(hiU.err);} } } public class MyClass { final static boolean D=true; public static void main(String args[]){ try{ hiSyncQue<data> que = new hiSyncQue<data>(5);// queueのサイズ5 Producer producer= new Producer(que); Consumer consumer= new Consumer(que); producer.start(); consumer.start(); } catch(Exception e){ e.printStackTrace(hiU.err); System.exit(1); } } }
これを動かした結果は次のようになります。消費者側の速度が 遅いため、生産者側にqueueサイズによる調整が入ることが分かります。
put:10 get:10 data:10 put:9 put:8 put:7 put:6 put:5 get:9 data:9 put:4 get:8 data:8 put:3 get:7 data:7 put:2 get:6 data:6 put:1 get:5 data:5 put:0 get:4 data:4 get:3 data:3 get:2 data:2 get:1 data:1 get:0 data:0
最大数を超える場合デフォルトではputがブロックしますが、次の設定を行うこともできます。
先頭を削除し、追加 |
:remove_top_at_limit()
|
最後を削除し、追加 |
:remove_last_at_limit()
|
追加せず廃棄 |
:abandon_at_limit()
|
一つのSyncQueに対し複数スレッドからputし、複数スレッドがgetすることも可能です。
複数のスレッドからget()が出ている場合、先に発行されたget()から順に値が渡されます。
set_return_null_when_emptyを発行するとデータが空になるとget()は待ちに入らずnullを
返します。穏やかに消費スレッドを終了させることができます。
例えば次のコードでは予め起動した5個のデータ処理スレッドは、queにデータが
投入されるとそれらの処理をします。生産者側がset_return_null_when_emptyを
発行した後は、データがなくなり次第速やかにデータ処理スレッドは終了します。
// 消費者側スレッド class Consumer extends Thread { hiSyncQue<MyData> que; public Consumer(hiSyncQue<MyData> que_){que=que_;} public void run(){ MyData data; while( (data=que.get())!=null ){ // nullなら終了 // data処理 } } } // 生産者側 hiSyncQue<MyData> que=new hiSyncQue<MyData>(); // 処理スレッドを起動 for(int n=0;n<5;++n){ (new Consumer(que)).start(); } // データセット for(MyData data:dataList){ que.put(data); } // データがなくなったらgetがnullを返す指定 que.set_return_null_when_empty(true);
複数のget()あるいはput()がブロックされている状態で
#clean(true);
を発行すると全てのブロックが解除されます。
LinkedBlockingDequeとArrayBlockingQueue
hiSyncQueはjavaの初期に開発されたものです。前身はジェネリック機構すらない
時代に作成されました。
その後javaの標準ライブラリに同様のクラス
など が作成されました。hiSyncQueを置き換える場合の注意点を説明します。
コンストラクタと説明 |
---|
hiSyncQue()
QUEUEサイズ限定なしの同期QUEUEを作成する.
|
hiSyncQue(int limit_)
予めQUEUEの最大サイズを指定する
|
hiSyncQue(int limit_,
String name_)
予めQUEUEの最大サイズを指定する
|
hiSyncQue(String name_)
QUEUEサイズ限定なしの名前付き同期QUEUEを作成する.
|
修飾子とタイプ | メソッドと説明 |
---|---|
void |
abandon_at_limit()
最大数を超える場合,追加せず廃棄する
|
void |
block_at_limit()
最大数を超える場合,ブロックする(デフォルト)
|
ArrayList<T> |
clean(boolean wakeup_)
QUEUEに溜まっているデータを破棄する.
|
boolean |
detachData(ArrayList<T> data_)
QUEUEから指定データを取り外す.
|
boolean |
detachData(T data_)
QUEUEから指定データを取り外す.
|
T |
get()
QUEUEのデータを取り出す.
|
T |
get(long timeout_)
QUEUEのデータを取り出す.タイムアウト指定あり(試験中)
QUEUEが空の場合ブロックします。
|
T |
getFirst() |
int |
getLimit()
QUEUEに溜められる最大数を得る.
|
ArrayList<T> |
getList()
QUEUEに溜まっているデータリストを得る.
|
String |
getName()
queueの名前を取得する.
|
ArrayList<Long> |
getWaitingThreadIdList()
待ち状態のスレッドのidリストを得る
呼び出し時点での待ち状態のスレッドのidのリスト。
|
boolean |
put(T obj_)
QUEUEの最後にデータを追加する.
|
boolean |
put(T obj_,
int timeout_)
QUEUEの最後にデータを追加する;タイムアウトあり(試験中).
|
boolean |
putTop(T obj_)
QUEUEの先頭にデータを追加する
|
boolean |
putTop(T obj_,
int timeout_)
QUEUEの先頭にデータを追加する;タイムアウトあり(試験中)
|
void |
remove_last_at_limit()
最大数を超える場合,最後を削除する
|
void |
remove_top_at_limit()
最大数を超える場合,先頭を削除する
|
void |
restart(boolean return_null_)
clean(true)で停止状態にしたものを、再使用可能にします。
|
void |
set_return_null_when_empty(boolean return_null_)
QUEUEが空の時、get()でwaitせずnullを返す指定.
|
void |
setLimit(int limit_)
QUEUEに溜められる最大数を設定する.
|
void |
setName(String name_)
queueにデバッグ用の名前を設定する.
|
int |
size()
QUEUEに溜まっているデータ数を得る.
|
T |
tryGet()
QUEUEのデータを取り出す.
|
T |
waitFor()
QUEUEのデータを取り出す.
|
T |
waitFor(long timeout_)
QUEUEのデータを取り出す.
|
boolean |
wakeupThread(ArrayList<Long> tids_)
待ち状態のスレッドの待ちを解く(未).
|
boolean |
wakeupThread(long tid_)
待ち状態のスレッドの待ちを解く(未).
|
public hiSyncQue()
QUEUEの最大サイズは後からsetLimit(int)
で設定することができます。
public hiSyncQue(String name_)
名前は主にデバッグ用に使います。
QUEUEの最大サイズは後からsetLimit(int)
で設定することができます。
name_
- 名前public hiSyncQue(int limit_)
limit_
- QUEUEの最大サイズpublic hiSyncQue(int limit_, String name_)
limit_
- QUEUEの最大サイズname_
- 名前public void setName(String name_)
name_
- 名前public String getName()
public boolean put(T obj_)
obj_
- QUEUEに追加するデータオブジェクトRuntimeException
- IllegalMonitorStateException
または
InterruptedException
をcauseとして持つ。public boolean put(T obj_, int timeout_)
obj_
- QUEUEに追加するデータオブジェクトtimeout_
- タイムアウト・ミリ秒RuntimeException
- IllegalMonitorStateException
または
InterruptedException
をcauseとして持つ。public boolean putTop(T obj_)
obj_
- QUEUEに追加するデータオブジェクトRuntimeException
- IllegalMonitorStateException
または
InterruptedException
をcauseとして持つ。public boolean putTop(T obj_, int timeout_)
obj_
- QUEUEに追加するオブジェクトtimeout_
- タイムアウト・ミリ秒RuntimeException
- IllegalMonitorStateException
または
InterruptedException
をcauseとして持つ。public T get()
QUEUEが空の場合ブロックします。clean()が発行された場合はnullが
戻ります。
通常ひとつのQUEUEに複数のスレッドからget()が出ている場合、先に出たget()
から順にブロックが解けます。
RuntimeException
- IllegalMonitorStateException
または
InterruptedException
をcauseとして持つ。public T getFirst()
public T tryGet()
QUEUEが空の場合ブロックせずnullを返します。
RuntimeException
- IllegalMonitorStateException
または
InterruptedException
をcauseとして持つ。public T get(long timeout_)
QUEUEが空の場合ブロックします。clean()が発行された場合はnullが
戻ります。
ひとつのQUEUEに複数のスレッドからget()が出ている場合、先に出たget()
から順にブロックが解けます。
タイムアウトした場合nullが返ります。複数のスレッドでgetを出す場合
各get毎に異なるタイムアウトが設定できます。
timeout_
- タイムアウト時間をミリ秒で指定RuntimeException
- IllegalMonitorStateException
または
InterruptedException
をcauseとして持つ。public T waitFor(long timeout_)
get()
と同じです。timeout_
- タイムアウト・ミリ秒public ArrayList<T> clean(boolean wakeup_)
QUEUE内のデータを廃棄します。
引数にtrueを設定すると、get(),put()の待ち状態は全て解除され、このQUEUEは
使用できなくなります。
使用可能とするにはrestartを呼びます。
wakeup_
- get()のブロックを解除する。get()側にはnullが通知される。
put()のブロックも解除される。public void restart(boolean return_null_)
return_null_
- trueを指定するとQUEUEが空の時nullリターンします。public void set_return_null_when_empty(boolean return_null_)
送り手側(put側)で一連のデータが終わったことをこの指定で示せば、受け側(get側) は溜まっているデータを全て受け取った後はnullを受け取り、終わりを知ることが できます。
set_return_null_when_empty(true)を発行すると、 set_return_null_when_empty(false)を発行するまで、put()は無処理となります。
return_null_
- trueを指定するとQUEUEが空の時nullリターンします。public int size()
public void setLimit(int limit_)
QUEUEに溜まったデータ数最大数に達すると
put()はブロックします。
0は設定無しを表します。
limit_
- QUEUEに溜められるオブジェクトの最大数public int getLimit()
public void block_at_limit()
public void remove_top_at_limit()
public void remove_last_at_limit()
public void abandon_at_limit()
public ArrayList<T> getList()
呼び出し時点でQUEUEに溜まっているデータのリストを得ます。
データオブジェクトは溜まっているオブジェクトそのものですが、
リストは応答ように作られたものです。
リストに変更を加えてもQUEUEには影響はありません。
public boolean detachData(T data_)
指定のデータインスタンスがQUEUEにある場合取り外します。
指定するのはQUEUEに入れたデータそのもので、equalsなどによる
比較は行いません。
data_
- 取り外すデータ。public boolean detachData(ArrayList<T> data_)
指定のデータインスタンスがQUEUEにある場合取り外します。
指定するのはQUEUEに入れたデータそのもので、equalsなどによる
比較は行いません。
data_
- 取り外すデータ。public ArrayList<Long> getWaitingThreadIdList()
呼び出し時点での待ち状態のスレッドのidのリスト。
リストは応答用に作られたものです。
リストに変更を加えてもQUEUEには影響はありません。
public boolean wakeupThread(long tid_)
指定スレッドidのスレッドが待ち状態の場合、待ち状態を解きます。
解かれた側のスレッドのget()はnullが返ります。
tid_
- 待ちを解くスレッドid