[RxJava] Observable - (2)

๐Ÿ’๋‹จ์ผ ๋ฐ์ดํ„ฐ๊ฐ€ ์•„๋‹Œ ๊ฒฝ์šฐ๋Š” ์–ด๋–ป๊ฒŒ ํ• ๊นŒ์š”,
์•ž์„œ ๊ณต๋ถ€ํ•œ just()๋‚˜ create()๋Š” ๋‹จ์ผ ๋ฐ์ดํ„ฐ๋ฅผ ์ฃผ๋กœ ๋‹ค๋ฃจ์–ด ๋ณด์•˜๋Š”๋ฐ์š”. ์ด๋ฒˆ์—๋Š” ๋‹จ์ผ ๋ฐ์ดํ„ฐ๊ฐ€ ์•„๋‹ ๊ฒฝ์šฐ์— ์–ด๋–ป๊ฒŒ ์‚ฌ์šฉํ•˜๋Š”์ง€

์ฆ‰ fromXXX() ๊ณ„์—ด ํ•จ์ˆ˜์— ๋Œ€ํ•ด ์•Œ์•„๋ณด๋„๋ก ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค.


fromArray() ํ•จ์ˆ˜

Integer[] arr = {100,200,300};
Observable<Integer> source = Observable.fromArray(arr);
source.subscribe(System.out::println);

๐ŸŽˆint [] ๋ฐฐ์—ด ์‚ฌ์šฉํ•˜๊ธฐ

int[] ๋ฐฐ์—ด์„ ์‹ค์ œ๋กœ ๊ทธ๋Œ€๋กœ ๋„ฃ์œผ๋ฉด ์˜ฌ๋ฐ”๋ฅธ ์‹คํ–‰๊ฒฐ๊ณผ๊ฐ€ ๋‚˜์˜ค์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ⇒ toIntegerArray()๋ฅผ ์‚ฌ์šฉํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

 

int[] intArray = {400,500,600};
Observable<Integer> source = Observable.fromArray(toIntegerArray(intArray));
source.subscribe(System.out::println);

 

 

fromIterable() ํ•จ์ˆ˜

  • Iterable ์ธํ„ฐํŽ˜์ด์Šค๋Š” ๋ฐ˜๋ณต์ž๋ฅผ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค.
  • Iterator ์ธํ„ฐํŽ˜์ด์Šค๋Š” ์ดํ„ฐ๋ ˆ์ดํ„ฐ ํŒจํ„ด์„ ๊ตฌํ˜„ํ•œ ๊ฒƒ์œผ๋กœ ๋‹ค์Œ์— ์–ด๋–ค ๋ฐ์ดํ„ฐ๊ฐ€ ์žˆ๋Š”์ง€์™€ ๊ทธ ๊ฐ’์„ ์–ป์–ด์˜ค๋Š” ๊ฒƒ๋งŒ ๊ด€์—ฌํ•  ๋ฟ ํŠน์ • ๋ฐ์ดํ„ฐ ํƒ€์ž…์— ์˜์กดํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.
  • hasNext()์™€ next() ๋ฉ”์„œ๋“œ๊ฐ€ ์กด์žฌํ•ฉ๋‹ˆ๋‹ค.
  • ๋Œ€ํ‘œ์ ์ธ ํด๋ž˜์Šค : ArrayList, ArrayBlockingQueue, HashSet, LinkedList, Stack, TreeSet, Vector ๋“ฑ์ด ์กด์žฌํ•ฉ๋‹ˆ๋‹ค.
List<String> names = new ArrayList<>();
names.add("Jerry")
names.add("William")
names.add("Bob")

Observable<String> source = Observable.fromIterable(names);
source.subscribe(System.out::println)

 

fromCallable() ํ•จ์ˆ˜ :  ๋น„๋™๊ธฐ ํด๋ž˜์Šค๋‚˜ ์ธํ„ฐํŽ˜์ด์Šค์™€์˜ ์—ฐ๋™

Callable<String> callable = () -> {
	Thread.sleep(1000);
	return "Hello Callable";
};

Observable<String> source = Observable.fromCallable(callable);
source.subscribe(System.out::println);

*Callable์€ ์‹คํ–‰ ๊ฒฐ๊ณผ๋ฅผ ๋ฆฌํ„ดํ•˜๊ณ  Executor ์ธํ„ฐํŽ˜์ด์Šค์˜ ์ธ์ž๋กœ ํ™œ์šฉ๋˜๊ธฐ ๋•Œ๋ฌธ์— ์ž ์žฌ์ ์œผ๋กœ ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ์—์„œ ์‹คํ–‰๋˜๋Š” ๊ฒƒ์„ ์˜๋ฏธํ•ฉ๋‹ˆ๋‹ค.

 

fromFuture() ํ•จ์ˆ˜

  • Future์€ ๋™์‹œ์„ฑ API๋กœ ๋น„๋™๊ธฐ ๊ณ„์‚ฐ์˜ ๊ฒฐ๊ณผ๋ฅผ ๊ตฌํ•  ๋•Œ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.
  • ๋ณดํ†ต Executor ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•œ ํด๋ž˜์Šค์— Callable ๊ฐ์ฒด๋ฅผ ์ธ์ž๋กœ ๋„ฃ์–ด Future ๊ฐ์ฒด ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค.
Future<String> future = Executors.newSingleThreadExecutor().submit(() -> {
	Thread.sleep(1000);
	return "Hello Future";
});
Observable<String> source = Observable.fromFuture(future);
source.subscribe(System.out::println);

 

 

fromPublisher() ํ•จ์ˆ˜

Publisher<String> publisher = (Subscriber<? super String> s) -> {
	s.onNext("hello Observable.fromPublisher()");
	s.onComplete();
};
Observable<String> source = Observable.fromPublisher(publisher)
source.subscribe(System.out::println);

*Publisher ์ธํ„ฐํŽ˜์ด์Šค์˜ ํŒจํ‚ค์ง€๋Š” org.reactivestreams

'RxJava' ์นดํ…Œ๊ณ ๋ฆฌ์˜ ๋‹ค๋ฅธ ๊ธ€

[RxJava] Observable - (1)  (0) 2022.10.21
[RxJava] ๊ธฐ๋ณธ ๊ฐœ๋…  (0) 2022.10.21