public class hiSyncQue<T> extends Object
スレッドからスレッドにデータを引き渡すQUEUEです。
生産者側、消費者側とも複数のスレッドであることが許されます。
生産者側が投じたデータ順、消費者側が待ち受けを開始した順はともに守られます。
put(T)でデータをセットしget()でデータを
取得します。
データがセットされていない状態でget()するとデータがput()され
るまでブロックします。
setLimit(int)QUEUEの最大サイズを指定すると
QUEUEサイズがそれを超えないよう
put()側でブロックさせる
ことができます。
put()側でブロックしている状態でget()があると、put()
側のブロックは解けます。
次のように使用できます。
import otsu.hiNote.hiSyncQue;
class data {
int id;String text;
data(int id_,String text_){id=id_;text=text_;}
}
class Producer extends Thread{ // 生産者側
hiSyncQue<data> que;
Producer(hiSyncQue<data> que_){ que= que_; }
public void run(){
try{
for(int n=10;n>=0;--n){
que.put(new data(n,"data:"+n)); // 送る
System.out.println("put:"+n);
}
que.set_return_null_when_empty(true);// データ終了
}
catch(Exception e){e.printStackTrace(hiU.err);}
}
}
class Consumer extends Thread{ // 消費者側
hiSyncQue<data> que;
Consumer(hiSyncQue<data> que_){ que= que_; }
public void run(){
try{
data d;
while( (d=que.get())!=null ){ // 受ける
System.out.println("get:"+d.id+" "+d.text);
Thread.sleep(150);// 150 milli-sec. sleep
}
}
catch(Exception e){e.printStackTrace(hiU.err);}
}
}
public class MyClass {
final static boolean D=true;
public static void main(String args[]){
try{
hiSyncQue<data> que = new hiSyncQue<data>(5);// queueのサイズ5
Producer producer= new Producer(que);
Consumer consumer= new Consumer(que);
producer.start();
consumer.start();
}
catch(Exception e){
e.printStackTrace(hiU.err);
System.exit(1);
}
}
}
これを動かした結果は次のようになります。消費者側の速度が 遅いため、生産者側にqueueサイズによる調整が入ることが分かります。
put:10 get:10 data:10 put:9 put:8 put:7 put:6 put:5 get:9 data:9 put:4 get:8 data:8 put:3 get:7 data:7 put:2 get:6 data:6 put:1 get:5 data:5 put:0 get:4 data:4 get:3 data:3 get:2 data:2 get:1 data:1 get:0 data:0
最大数を超える場合デフォルトではputがブロックしますが、次の設定を行うこともできます。
| 先頭を削除し、追加 |
:remove_top_at_limit()
|
| 最後を削除し、追加 |
:remove_last_at_limit()
|
| 追加せず廃棄 |
:abandon_at_limit()
|
一つのSyncQueに対し複数スレッドからputし、複数スレッドがgetすることも可能です。
複数のスレッドからget()が出ている場合、先に発行されたget()から順に値が渡されます。
set_return_null_when_emptyを発行するとデータが空になるとget()は待ちに入らずnullを
返します。穏やかに消費スレッドを終了させることができます。
例えば次のコードでは予め起動した5個のデータ処理スレッドは、queにデータが
投入されるとそれらの処理をします。生産者側がset_return_null_when_emptyを
発行した後は、データがなくなり次第速やかにデータ処理スレッドは終了します。
// 消費者側スレッド
class Consumer extends Thread {
hiSyncQue<MyData> que;
public Consumer(hiSyncQue<MyData> que_){que=que_;}
public void run(){
MyData data;
while( (data=que.get())!=null ){ // nullなら終了
// data処理
}
}
}
// 生産者側
hiSyncQue<MyData> que=new hiSyncQue<MyData>();
// 処理スレッドを起動
for(int n=0;n<5;++n){
(new Consumer(que)).start();
}
// データセット
for(MyData data:dataList){
que.put(data);
}
// データがなくなったらgetがnullを返す指定
que.set_return_null_when_empty(true);
複数のget()あるいはput()がブロックされている状態で
#clean(true);
を発行すると全てのブロックが解除されます。
LinkedBlockingDequeとArrayBlockingQueue
hiSyncQueはjavaの初期に開発されたものです。前身はジェネリック機構すらない
時代に作成されました。
その後javaの標準ライブラリに同様のクラス
など が作成されました。hiSyncQueを置き換える場合の注意点を説明します。
| コンストラクタと説明 |
|---|
hiSyncQue()
QUEUEサイズ限定なしの同期QUEUEを作成する.
|
hiSyncQue(int limit_)
予めQUEUEの最大サイズを指定する
|
hiSyncQue(int limit_,
String name_)
予めQUEUEの最大サイズを指定する
|
hiSyncQue(String name_)
QUEUEサイズ限定なしの名前付き同期QUEUEを作成する.
|
| 修飾子とタイプ | メソッドと説明 |
|---|---|
void |
abandon_at_limit()
最大数を超える場合,追加せず廃棄する
|
void |
block_at_limit()
最大数を超える場合,ブロックする(デフォルト)
|
ArrayList<T> |
clean(boolean wakeup_)
QUEUEに溜まっているデータを破棄する.
|
boolean |
detachData(ArrayList<T> data_)
QUEUEから指定データを取り外す.
|
boolean |
detachData(T data_)
QUEUEから指定データを取り外す.
|
T |
get()
QUEUEのデータを取り出す.
|
T |
get(long timeout_)
QUEUEのデータを取り出す.タイムアウト指定あり(試験中)
QUEUEが空の場合ブロックします。
|
T |
getFirst() |
int |
getLimit()
QUEUEに溜められる最大数を得る.
|
ArrayList<T> |
getList()
QUEUEに溜まっているデータリストを得る.
|
String |
getName()
queueの名前を取得する.
|
ArrayList<Long> |
getWaitingThreadIdList()
待ち状態のスレッドのidリストを得る
呼び出し時点での待ち状態のスレッドのidのリスト。
|
boolean |
put(T obj_)
QUEUEの最後にデータを追加する.
|
boolean |
put(T obj_,
int timeout_)
QUEUEの最後にデータを追加する;タイムアウトあり(試験中).
|
boolean |
putTop(T obj_)
QUEUEの先頭にデータを追加する
|
boolean |
putTop(T obj_,
int timeout_)
QUEUEの先頭にデータを追加する;タイムアウトあり(試験中)
|
void |
remove_last_at_limit()
最大数を超える場合,最後を削除する
|
void |
remove_top_at_limit()
最大数を超える場合,先頭を削除する
|
void |
restart(boolean return_null_)
clean(true)で停止状態にしたものを、再使用可能にします。
|
void |
set_return_null_when_empty(boolean return_null_)
QUEUEが空の時、get()でwaitせずnullを返す指定.
|
void |
setLimit(int limit_)
QUEUEに溜められる最大数を設定する.
|
void |
setName(String name_)
queueにデバッグ用の名前を設定する.
|
int |
size()
QUEUEに溜まっているデータ数を得る.
|
T |
tryGet()
QUEUEのデータを取り出す.
|
T |
waitFor()
QUEUEのデータを取り出す.
|
T |
waitFor(long timeout_)
QUEUEのデータを取り出す.
|
boolean |
wakeupThread(ArrayList<Long> tids_)
待ち状態のスレッドの待ちを解く(未).
|
boolean |
wakeupThread(long tid_)
待ち状態のスレッドの待ちを解く(未).
|
public hiSyncQue()
QUEUEの最大サイズは後からsetLimit(int)で設定することができます。
public hiSyncQue(String name_)
名前は主にデバッグ用に使います。
QUEUEの最大サイズは後からsetLimit(int)で設定することができます。
name_ - 名前public hiSyncQue(int limit_)
limit_ - QUEUEの最大サイズpublic hiSyncQue(int limit_,
String name_)
limit_ - QUEUEの最大サイズname_ - 名前public void setName(String name_)
name_ - 名前public String getName()
public boolean put(T obj_)
obj_ - QUEUEに追加するデータオブジェクトRuntimeException - IllegalMonitorStateExceptionまたは
InterruptedExceptionをcauseとして持つ。public boolean put(T obj_, int timeout_)
obj_ - QUEUEに追加するデータオブジェクトtimeout_ - タイムアウト・ミリ秒RuntimeException - IllegalMonitorStateExceptionまたは
InterruptedExceptionをcauseとして持つ。public boolean putTop(T obj_)
obj_ - QUEUEに追加するデータオブジェクトRuntimeException - IllegalMonitorStateExceptionまたは
InterruptedExceptionをcauseとして持つ。public boolean putTop(T obj_, int timeout_)
obj_ - QUEUEに追加するオブジェクトtimeout_ - タイムアウト・ミリ秒RuntimeException - IllegalMonitorStateExceptionまたは
InterruptedExceptionをcauseとして持つ。public T get()
QUEUEが空の場合ブロックします。clean()が発行された場合はnullが
戻ります。
通常ひとつのQUEUEに複数のスレッドからget()が出ている場合、先に出たget()
から順にブロックが解けます。
RuntimeException - IllegalMonitorStateExceptionまたは
InterruptedExceptionをcauseとして持つ。public T getFirst()
public T tryGet()
QUEUEが空の場合ブロックせずnullを返します。
RuntimeException - IllegalMonitorStateExceptionまたは
InterruptedExceptionをcauseとして持つ。public T get(long timeout_)
QUEUEが空の場合ブロックします。clean()が発行された場合はnullが
戻ります。
ひとつのQUEUEに複数のスレッドからget()が出ている場合、先に出たget()
から順にブロックが解けます。
タイムアウトした場合nullが返ります。複数のスレッドでgetを出す場合
各get毎に異なるタイムアウトが設定できます。
timeout_ - タイムアウト時間をミリ秒で指定RuntimeException - IllegalMonitorStateExceptionまたは
InterruptedExceptionをcauseとして持つ。public T waitFor(long timeout_)
get()と同じです。timeout_ - タイムアウト・ミリ秒public ArrayList<T> clean(boolean wakeup_)
QUEUE内のデータを廃棄します。
引数にtrueを設定すると、get(),put()の待ち状態は全て解除され、このQUEUEは
使用できなくなります。
使用可能とするにはrestartを呼びます。
wakeup_ - get()のブロックを解除する。get()側にはnullが通知される。
put()のブロックも解除される。public void restart(boolean return_null_)
return_null_ - trueを指定するとQUEUEが空の時nullリターンします。public void set_return_null_when_empty(boolean return_null_)
送り手側(put側)で一連のデータが終わったことをこの指定で示せば、受け側(get側) は溜まっているデータを全て受け取った後はnullを受け取り、終わりを知ることが できます。
set_return_null_when_empty(true)を発行すると、 set_return_null_when_empty(false)を発行するまで、put()は無処理となります。
return_null_ - trueを指定するとQUEUEが空の時nullリターンします。public int size()
public void setLimit(int limit_)
QUEUEに溜まったデータ数最大数に達すると
put()はブロックします。
0は設定無しを表します。
limit_ - QUEUEに溜められるオブジェクトの最大数public int getLimit()
public void block_at_limit()
public void remove_top_at_limit()
public void remove_last_at_limit()
public void abandon_at_limit()
public ArrayList<T> getList()
呼び出し時点でQUEUEに溜まっているデータのリストを得ます。
データオブジェクトは溜まっているオブジェクトそのものですが、
リストは応答ように作られたものです。
リストに変更を加えてもQUEUEには影響はありません。
public boolean detachData(T data_)
指定のデータインスタンスがQUEUEにある場合取り外します。
指定するのはQUEUEに入れたデータそのもので、equalsなどによる
比較は行いません。
data_ - 取り外すデータ。public boolean detachData(ArrayList<T> data_)
指定のデータインスタンスがQUEUEにある場合取り外します。
指定するのはQUEUEに入れたデータそのもので、equalsなどによる
比較は行いません。
data_ - 取り外すデータ。public ArrayList<Long> getWaitingThreadIdList()
呼び出し時点での待ち状態のスレッドのidのリスト。
リストは応答用に作られたものです。
リストに変更を加えてもQUEUEには影響はありません。
public boolean wakeupThread(long tid_)
指定スレッドidのスレッドが待ち状態の場合、待ち状態を解きます。
解かれた側のスレッドのget()はnullが返ります。
tid_ - 待ちを解くスレッドid