All Articles

Java New Generation Concurency API

Pasti sering menggunakan Download Accelerator kan? Dibalik layar aplikasi jenis ini melakukan proses download dengan cara memecah tugas download 1file menjadi beberapa bagian (task). Kemudian disiapkan beberapa pekerja (thread) yg menampung tugas-tugas tadi tanpa menunggu satu selesai baru yg lainnya jalan (non blocking) tapi langsung dijalankan secara bersama-sama. Hasil dari masing-masing thread akan digabungkan lagi menjadi 1 file utuh.

Java telah lama memiliki implementasi solusi seperti kasus tadi dengan Java Thread API. Java old Thread API, bukannya ga bagus, tapi "Is There a Reason to Use the Old Threading API?" ketika sejak Java 5 telah diciptakan Pustaka sakti yg dilupakan banyak org: Java New Generation Concurency API.

Lupakan jelimetnya berurusan dengan keyword ini di Thread: wait, notify, synchronize, join. Mari berkenalan dengan Pustaka Sakti generasi terbaru!

Contoh kasus aplikasi Currency Converter, aplikasi ini akan menghitung kurs beberapa mata uang terhadap USD.

Berikut ini fungsi yg digunakan untuk mendapatkan nilai kurs terharap USD melalui Yahoo Finance, sesuai dengan mata uang yg ingin dicari :

private HashMap getRates(String ccy) throws Exception {

	URL url = new URL("http://finance.yahoo.com/d/quotes.csv?e=.csv&f=sl1d1t1&s=USD"
					+ ccy + "=X");
	BufferedReader reader = new BufferedReader(new InputStreamReader(
			url.openStream()));

	String data = reader.readLine();
	String[] dataItems = data.split(",");
	Double rate = Double.valueOf(dataItems[1]);
	HashMap ccyRate = new HashMap();
	ccyRate.put("CCY", ccy);
	ccyRate.put("RATE", rate);

	return ccyRate;
}

Saya akan mulai dengan Strategi yg pertama, dengan Metode Sekuensial (berurutan)

final String[] CURRENCY = new String[] { "IDR", "MYR", "AUD", "SGD", "THB" };

void checkRateSequential() throws Exception {

	System.out.println("checkRateSequential :");
	long start = System.nanoTime();
	for (String ccy : CURRENCY) {
		HashMap ccyRate = getRates(ccy);
		System.out.println("Value of $1 in " + ccy + " is " + ccyRate.get("RATE"));
	}

	long end = System.nanoTime();
	System.out.println("Time (seconds) taken " + (end - start) / 1.0e9);
}
checkRateSequential :
Value of $1 in IDR is 9218.0
Value of $1 in MYR is 3.0717
Value of $1 in AUD is 0.9976
Value of $1 in SGD is 1.2526
Value of $1 in THB is 31.193
Time (seconds) taken 14.404746959

Metode ini paling umum digunakan, aplikasi akan melakukan request berulang-ulang berurutan untuk mendapatkan nilai kurs, satu request selesai baru request yg lain dijalankan (blocking IO), sampai selesai.
Dengan metode ini hanya ada satu proses IO yg berjalan. Sayang banget, CPU udah multicore, internet udah ngebut kok disia-siakan resource yg ada ?
Ok, Strategi berikutnya, Menjalankan beberapa request sekaligus secara bersamaan. Saya akan gunakan pustaka sakti Java Concurent API

void checkRateAllAtEnd() throws Exception {
	System.out.println("checkRateAllAtEnd :");
	long start = System.nanoTime();
	
	ExecutorService executorPool = Executors.newCachedThreadPool();

	List<Callable> tasks = new ArrayList<Callable>();
	for (final String ccy : CURRENCY) {
		tasks.add(new Callable() {
			public HashMap call() throws Exception {
				return getRates(ccy);
			}
		});
	}
	final List<Future> listRates = executorPool.invokeAll(tasks, 3600, TimeUnit.SECONDS);
	
	for (Future rate : listRates) {
		HashMap ccyRate = rate.get();
		System.out.println("Value of $1 in " + ccyRate.get("CCY") + " is " + ccyRate.get("RATE"));
	}
	
	executorPool.shutdown();
	
	long end = System.nanoTime();
	System.out.println("Time (seconds) taken " + (end - start) / 1.0e9);
}
checkRateAllAtEnd :
Value of $1 in IDR is 9218.0
Value of $1 in MYR is 3.0717
Value of $1 in AUD is 0.9976
Value of $1 in SGD is 1.2526
Value of $1 in THB is 31.193
Time (seconds) taken 3.782909569

Wow, ada penambahan kecepatan hingga lebih dari 3 kali lipat! Dengan memanfaatkan Pustaka Concurency, aplikasi dapat melakukan beberapa request sekaligus secara bersama-sama. Tidak perlu harus nganggur menunggu task yg satu selesai.
Penjelasan singkat mengenai penggunaan Java Concurency diatas:

ExecutorService executorPool = Executors.newCachedThreadPool(); 

Membuat Object Executor yg akan menjalankan task request. Executor ini sebagai tempat penampung beberapa thread (pekerja keras nan gigih yg siaga), diatas saya memakai "newCachedThreadPool" dengan ini Java akan secara otomatis membuat beberapa thread sesuai kebutuhan dan kemampuan sistem, jika sudah tidak aktif akan kembali ke pool.
Namun apabila ingin menentukan secara manual berapa jumlah thread yg diinginkan, bisa menggunakan ini "Executors.newFixedThreadPool(10)" tapi pastikan jangan sampai jumlah thread lebih kecil dari jumlah core CPU (ya iya lah!)

List<Callable> tasks = new ArrayList<Callable>();
for (final String ccy : CURRENCY) {
	tasks.add(new Callable() {
		public HashMap call() throws Exception {
			return getRates(ccy);
		}
	});
}

Pada baris ke-1 difungsikan untuk menampung task yg akan dikerjakan oleh executor. Untuk itu perlu di isi, task apa aja yg ingin dijalankan. List ini harus berisi daftar object Callable, yaitu object yg mengimplement interface Callable. Object dengan interface Callable memiliki method call() yg nantinya akan dijalankan oleh pekerja-pekerja keras yg ngantor di Executor!

List listRates = executorPool.invokeAll(tasks, 3600, TimeUnit.SECONDS);

Eksekusi seluruh task yg ada tidak boleh lebih dari 3600 detik, jika lebih maka pekerjaan dihentikan (timeout)

for (Future rate : listRates) {
	HashMap ccyRate = rate.get();
	System.out.println("Value of $1 in " + ccyRate.get("CCY") + " is " + ccyRate.get("RATE"));
}	

Objek Future disini merupakan representasi hasil dari object yg akan di hasilkan oleh thread yg mengeksekusi task. Java bisa memprediksi masa depan, hoho :)
Jadi for looping disini bertugas untuk mengumpulkan hasil dari masing-masing thread yg telah susah payah bekerja tidak boleh lebih dari 3600 detik tadi! Mudah bukan? gampang digunakan dan enak dibaca.

Nah ada lagi satu strategi yg akan saya perkenalkan. Metode diatas mengumpulkan hasil dari masing-masing thread secara berurutan dari thread yg pertama ke yg terakhir, Apa yg terjadi jika thread terakhir ternyata lebih berotot sehingga dia lebih dulu selesai ? Harusnya dia yg dapat medali penghargaan kan ?
Dengan Strategi ini, siapa thread yg duluan selesai dia yg akan diambil langsung hasilnya, jadi ga bisa diprediksi dengan pasti urutannya, tergantung kekuatan otot masing-masing pekerja!! haha.

void checkRateAsItComplete() throws Exception {
	System.out.println("checkRateAsItComplete :");
	long start = System.nanoTime();
	
	ExecutorService executorPool = Executors.newCachedThreadPool();
	CompletionService compService = new ExecutorCompletionService(executorPool);
	
	for (final String ccy : CURRENCY) {
		compService.submit(new Callable() {
			public HashMap call() throws Exception {
				return getRates(ccy);
			}
		});
	}
	
	
	for (int i=0; i<CURRENCY.length; i++)
		future = compService.take();
		HashMap ccyRate = future.get();
		System.out.println("Value of $1 in " + ccyRate.get("CCY") + " is " + ccyRate.get("RATE"));
	}
	
	executorPool.shutdown();

	long end = System.nanoTime();
	System.out.println("Time (seconds) taken " + (end - start) / 1.0e9);
checkRateAsItComplete :
Value of $1 in MYR is 3.0717
Value of $1 in SGD is 1.2526
Value of $1 in AUD is 0.9976
Value of $1 in THB is 31.193
Value of $1 in IDR is 9218.0
Time (seconds) taken 3.933620191

Waktu yg dicapai ga jauh beda dengan strategi sebelumnya, agak lebih lama dikit, lihat juga urutan selesai disini gak beraturan MYR diurutan pertama sedangkan IDR posisi buncit :( Tapi ini karena contoh kasus dengan data yg simple, bukan data ribuan, puluhan ratusan ribu, strategi ini tentu lebih cepat. Dimana thread yg telah selesai bekerja harus langsung balik ke pool (Executor) untuk menjalankan task berikutnya yg udah ngantri di pool.

CompletionService compService = new ExecutorCompletionService(executorPool);

for (int i=0; i<CURRENCY.length; i++) {
	Future future = compService.take();
	HashMap ccyRate = future.get();

Perbedaannya ada disini, gunakan jurus ajaib CompletionService. Lalu perhatikan for looping nya tidak menggunakan ordered list tapi menyerahkan kepada aplikasi, thread mana yg lebih dulu selesai, dialah yg akan diambil. Ini baru Fair!

Baris kode lengkap yg telah digabungkan menjadi satu class dapat di lihat disini   https://gist.github.com/2689054