EDIT:我最初的答案(见下文)是在readLine()层面上操作的,但仔细想想,在流(Stream)层面上操作会更好.
下面是一种直接对流进行操作的方法.核心函数是takeUntilTime(Stream<T> stream, Instant end),其余的都是小的辅助函数.其思路是通过原始流的Spliterator来遍历原始流,这样就可以为每次读取元素设置超时.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.Optional;
import java.util.Spliterator;
import java.util.stream.Stream;
/**
 * Demo: pushes lines from standard input through a stream pipeline until a wall-clock
 * deadline passes, the input ends, or a line containing "[stop]" is read.
 */
class Main {
    /** Time budget for the streaming demo, in nanoseconds (5 seconds). */
    static long maxDurationNanos = TimeUnit.SECONDS.toNanos(5);

    /**
     * Like {@link Stream#generate}, but the returned stream is ordered.
     *
     * <p>The first element is fetched eagerly: {@code s.get()} is called once before this
     * method returns, because its result is used as the seed of {@link Stream#iterate}.
     *
     * @param s supplier that produces each element
     * @return an infinite ordered stream of the supplier's results
     */
    static <T> Stream<T> generateOrderedStream(Supplier<T> s) {
        return Stream.iterate(s.get(), e -> s.get());
    }

    /**
     * Returns an Optional with the next element of the iterator, or an empty Optional if
     * there are no more elements. (This is much nicer than calling iter.tryAdvance() directly.)
     *
     * <p>Elements must be non-null, since {@link Optional#of} rejects {@code null}.
     */
    static <T> Optional<T> advance(Spliterator<T> iter) {
        // Mutable holder that the tryAdvance callback can write into.
        final var r = new Object() { T elem; };
        return iter.tryAdvance(elem -> r.elem = elem) ? Optional.of(r.elem) : Optional.empty();
    }

    /**
     * Waits for {@code future} for at most the given timeout.
     *
     * @return an Optional with the (non-null) result, or an empty Optional on timeout or
     *         cancellation; on timeout the future is also cancelled
     * @throws RuntimeException wrapping the {@link ExecutionException} or
     *         {@link InterruptedException}; in the latter case the calling thread's
     *         interrupt status is restored before throwing
     */
    static <T> Optional<T> getFuture(Future<T> future, long timeout, TimeUnit unit) {
        try {
            return Optional.of(future.get(timeout, unit));
        } catch (TimeoutException e) {
            future.cancel(/* mayInterruptIfRunning: */ true);
            return Optional.empty();
        } catch (CancellationException e) {
            return Optional.empty();
        } catch (InterruptedException e) {
            // Future.get() cleared the flag when it threw; put it back so that callers
            // further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Convenient wrapper for {@code getFuture(executor.submit(task), ...)} that returns an
     * empty Optional if the executor rejects the task (it has probably been shut down).
     */
    static <T> Optional<T> submitAndGet(ExecutorService executor, Callable<T> task, long timeout, TimeUnit unit) {
        try {
            return getFuture(executor.submit(task), timeout, unit);
        } catch (RejectedExecutionException e) { // the executor has probably been shut down
            return Optional.empty();
        }
    }

    /** Returns a factory whose threads are daemons, so a stuck task cannot keep the JVM alive. */
    static ThreadFactory daemonThreadFactory() {
        return (r) -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            return thread;
        };
    }

    /**
     * Traverses {@code stream} until the specified end time and returns the traversed elements.
     *
     * <p>Each element is pulled from the source's spliterator on a single background daemon
     * thread, and the caller waits for it only until {@code end}. The returned stream ends
     * when the source is exhausted or the deadline passes. Because of
     * {@link #generateOrderedStream}, the first element is fetched before this method returns.
     *
     * <p>The background executor is shut down when the stream ends by itself, when fetching
     * the first element fails, or when the returned stream is closed. Callers that may stop
     * consuming early should close the returned stream (e.g. with try-with-resources).
     *
     * @param stream source stream; it is consumed by this call
     * @param end    instant after which no further elements are waited for
     */
    static <T> Stream<T> takeUntilTime(Stream<T> stream, Instant end) {
        final ExecutorService executor = Executors.newSingleThreadExecutor(daemonThreadFactory());
        final Spliterator<T> iter = stream.spliterator();
        Supplier<Optional<T>> s = () -> {
            long timeoutNanos = ChronoUnit.NANOS.between(Instant.now(), end);
            Optional<T> opt = submitAndGet(executor, () -> advance(iter), timeoutNanos, TimeUnit.NANOSECONDS)
                    .flatMap(o -> o);
            if (!opt.isPresent()) { // this will be the end of the stream, so we should clean up
                executor.shutdownNow();
            }
            return opt;
        };
        final Stream<Optional<T>> elements;
        try {
            elements = generateOrderedStream(s); // fetches the first element right away
        } catch (RuntimeException e) {
            executor.shutdownNow(); // no stream will be returned, so nobody else can clean up
            throw e;
        }
        return elements
                .takeWhile(Optional::isPresent)
                .map(Optional::get)
                .onClose(executor::shutdownNow);
    }

    /**
     * Streaming demo: prints each line read from {@code reader}, prefixed with "[mapped] ",
     * until {@link #maxDurationNanos} has elapsed, the input ends, or a line contains "[stop]".
     */
    static void fillStreamMap(BufferedReader reader) {
        final Instant end = Instant.now().plusNanos(maxDurationNanos);
        // try-with-resources releases the background executor even when "[stop]" ends the
        // pipeline before the timed stream reaches its own end.
        try (Stream<String> lines = takeUntilTime(reader.lines(), end)) {
            lines.takeWhile(line -> !line.contains("[stop]"))
                    .map(line -> "[mapped] " + line)
                    .forEachOrdered(System.out::println);
        }
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        fillStreamMap(reader);
    }
}
Original answer:
暂且不考虑流式处理方面,问题本质上是如何在带超时的情况下从BufferedReader(它很可能包装了System.in)中读取.不幸的是,要正确地做到这一点非常困难(参见"Set timeout for user's input"和"Timeout on Console Input"这两个问题).
这些链接页面中的一个思路是轮询BufferedReader.ready(),直到它返回true,然后再调用readLine().这种做法很难看(因为它使用轮询),而且不可靠:即使ready()返回true,readLine()仍然可能阻塞——例如,当可用的只是不完整的一行时(在类Unix系统上,用户可以先键入一些文本,然后按Ctrl+D而不是Enter来造成这种情况).
另一个思路是创建一个后台线程,反复调用BufferedReader.readLine()并将结果放入一个BlockingQueue(例如ArrayBlockingQueue)中.然后,主线程可以调用队列上的take()或poll(timeout, unit)来获取行.
这种方法的一个限制是:如果您之后想直接从BufferedReader中读取数据(而不是通过队列),则几乎不可能避免丢失(至少)一行输入.这是因为阻塞在readLine()上的线程无法被干净地中断,所以如果主线程决定提前停止(例如由于超时),它无法阻止后台线程读走其当前正在等待的那一行.
您可以尝试使用mark(readAheadLimit)和reset()来"撤销读取"最后一行,但同步会很困难——另一个线程可能会在后台线程调用reset()之前就去读取BufferedReader.您可能必须使用lock字段进行同步,但它的访问级别是protected,所以您只能通过反射或者继承BufferedReader来访问它.此外,如果要撤销读取的行比readAheadLimit长,reset()将会失败.
下面是一个假设您只通过队列读取行的实现.
DISCLAIMER:请当心这些代码片段中可能存在的错误——多线程是很棘手的.我以后可能会再尝试改进代码.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;
/**
 * Reads lines from a {@link BufferedReader} on a background daemon thread and hands them to
 * callers through a small bounded queue, so that callers can wait for a line with a timeout.
 */
class InterruptibleLineReader {
    // End-of-input marker. It is a distinct String instance and is recognised by identity
    // (==), so an input line whose text happens to be "<EOF>" is not mistaken for it.
    private static final String EOF = new String("<EOF>");

    BufferedReader reader;
    ArrayBlockingQueue<String> lines = new ArrayBlockingQueue<>(/* capacity: */ 2);
    Thread backgroundThread;
    IOException exception; // set by the background thread if reader.readLine() fails

    /** Wraps {@code reader} and immediately starts the background daemon thread that reads it. */
    public InterruptibleLineReader(BufferedReader reader) {
        this.reader = reader;
        Thread worker = new Thread(this::backgroundTask);
        worker.setDaemon(true);
        backgroundThread = worker;
        worker.start();
    }

    /**
     * Interrupts the background thread, drops any queued lines and queues the EOF marker so
     * that later {@code readLine} calls return {@code null} instead of waiting.
     */
    public void close() {
        backgroundThread.interrupt();
        lines.clear();
        lines.add(EOF);
    }

    /** Body of the background thread. */
    private void backgroundTask() {
        try {
            pumpLines();
        } catch (InterruptedException e) {
            // nothing to do (close() is responsible for queueing EOF in this case)
        }
    }

    /**
     * Moves lines from the reader into the queue until end of input, an I/O error, or an
     * interrupt. On end of input or I/O error the EOF marker is queued; an I/O error is also
     * stored in {@link #exception}.
     */
    private void pumpLines() throws InterruptedException {
        try {
            for (;;) {
                String next = reader.readLine();
                if (Thread.interrupted()) {
                    // close() was called while we were reading; it queues EOF itself
                    return;
                }
                if (next == null) {
                    lines.put(EOF);
                    return;
                }
                lines.put(next);
            }
        } catch (IOException e) {
            exception = e;
            lines.put(EOF);
        }
    }

    /**
     * Waits up to the given timeout for the next line.
     *
     * @return the next line, or {@code null} on timeout or end of input
     * @throws IOException          the error hit by the background thread, if reading failed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public String readLine(long timeout, TimeUnit unit) throws IOException, InterruptedException {
        String polled = lines.poll(timeout, unit);
        if (polled != EOF) {
            return polled; // a real line, or null because the wait timed out
        }
        // Put the marker back so that concurrent and future calls won't block.
        lines.put(EOF);
        IOException failure = exception;
        if (failure == null) {
            return null;
        }
        throw failure;
    }
}
/**
 * Demo: streams lines from standard input through an {@code InterruptibleLineReader} for a
 * fixed amount of time.
 */
class Main {
    /** Time budget for the streaming demo, in nanoseconds (5 seconds). */
    static long maxDurationNanos = TimeUnit.SECONDS.toNanos(5);

    /**
     * Like {@link Stream#generate}, but the returned stream is ordered.
     *
     * <p>The first element is fetched eagerly: {@code s.get()} is called once before this
     * method returns, because its result is used as the seed of {@link Stream#iterate}.
     */
    static <T> Stream<T> generateOrderedStream(Supplier<T> s) {
        return Stream.iterate(s.get(), e -> s.get());
    }

    /**
     * Reads lines until the specified end time and returns them as a stream.
     *
     * <p>The stream ends at the first {@code null} returned by
     * {@code lineReader.readLine(timeout, unit)}.
     *
     * @throws RuntimeException (from the stream) wrapping an {@link IOException} or
     *         {@link InterruptedException} raised while reading; for the latter the calling
     *         thread's interrupt status is restored before throwing
     */
    static Stream<String> readLinesUntilTime(InterruptibleLineReader lineReader, Instant end) {
        Supplier<String> readLine = () -> {
            long remaining = ChronoUnit.NANOS.between(Instant.now(), end);
            try {
                return lineReader.readLine(remaining, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                // The flag was cleared when the exception was thrown; put it back so code
                // further up the stack can still see that the thread was interrupted.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
        return generateOrderedStream(readLine).takeWhile(x -> x != null);
    }

    /**
     * Streaming demo: prints each line prefixed with "[mapped] " until
     * {@link #maxDurationNanos} has elapsed, the line stream ends, or a line contains "[stop]".
     */
    static void fillStreamMap(InterruptibleLineReader lineReader) {
        final Instant end = Instant.now().plusNanos(maxDurationNanos);
        readLinesUntilTime(lineReader, end)
                .takeWhile(line -> !line.contains("[stop]"))
                .map(line -> "[mapped] " + line)
                .forEachOrdered(System.out::println);
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        // stream lines
        InterruptibleLineReader lineReader = new InterruptibleLineReader(reader);
        System.out.println("--- streaming lines using InterruptibleLineReader for " + maxDurationNanos / 1e9 + " sec ---");
        try {
            fillStreamMap(lineReader);
        } finally {
            // close even if streaming failed
            lineReader.close();
        }
        /*
        // attempt to use the BufferedReader directly
        // NOTE: several lines may be lost (depending on the capacity of the ArrayBlockingQueue and how quickly the lines are consumed)
        System.out.println("--- reading directly from BufferedReader ---");
        while (true) {
            try {
                String line = reader.readLine();
                if (line == null) { break; }
                System.out.println("[raw] " + line);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        */
    }
}
下面是一个更复杂的实现:如果关闭队列后直接从BufferedReader中读取,它只会丢失一行输入.它使用自定义的"0-capacity"队列来保证最多丢失一行.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.NoSuchElementException;
import java.util.stream.Stream;
/**
 * Reads lines from a BufferedReader on a background daemon thread and hands them to callers
 * one at a time through a ZeroCapacityBlockingQueue, so that callers can wait for a line
 * with a timeout.
 *
 * A null item in the queue is the end marker: it is placed there by markAsEOF() after end of
 * input, after an IOException, or after close().
 */
class InterruptibleLineReader {
BufferedReader reader;
ZeroCapacityBlockingQueue<String> lines = new ZeroCapacityBlockingQueue<>(); // a null line indicates EOF or IOException
Thread backgroundThread;
// Set by the background thread when reader.readLine() throws; rethrown by readLine().
IOException exception;
// Set to true by markAsEOF(); lets readLine(timeout, unit) tell the end marker from a timeout.
boolean eof;
/**
 * Wraps the given reader and immediately starts the background daemon thread that reads it.
 */
public InterruptibleLineReader(BufferedReader reader) {
this.reader = reader;
// start a background thread to read lines
backgroundThread = new Thread(this::backgroundTask);
backgroundThread.setDaemon(true);
backgroundThread.start();
}
/**
 * Records end of input and places the null end marker in the queue.
 *
 * @throws IllegalStateException if the queue still holds a non-null (unconsumed) line
 */
private void markAsEOF() {
eof = true;
if (lines.poll() != null) { // markAsEOF() should not be called when there are unconsumed lines
throw new IllegalStateException();
}
lines.offer(null); // unblock threads that are waiting on the queue
}
/**
 * Interrupts the background thread, discards a pending unconsumed line (with a warning on
 * standard error) and marks the reader as being at end of input.
 */
public void close() {
backgroundThread.interrupt();
// warn if there is an unconsumed line, and consume it so we can indicate EOF
String line = lines.poll();
if (line != null) {
System.err.println("InterruptibleLineReader: warning: discarding unconsumed line during close(): '" + line + "'");
}
markAsEOF();
}
/**
 * Body of the background thread: reads lines and hands each one over via lines.put(),
 * until end of input, an IOException, or an interrupt.
 */
private void backgroundTask() {
try {
while (true) {
String line = reader.readLine();
// Thread.interrupted() also clears the flag; a set flag here means close() was called
// while this thread was reading.
if (Thread.interrupted()) {
if (line != null) {
System.err.println("InterruptibleLineReader: warning: discarding line that was read after close(): '" + line + "'");
}
// nothing further to do (close() is responsible for calling markAsEOF() in this case)
break;
} else if (line == null) { // EOF
markAsEOF();
break;
}
lines.put(line); // this blocks until the line has been consumed ("0-capacity" behaviour)
if (Thread.interrupted()) {
// nothing to do (close() is responsible for calling markAsEOF() in this case)
break;
}
}
} catch (IOException e) {
// Remember the failure so readLine() can rethrow it, then publish the end marker.
exception = e;
markAsEOF();
} catch (InterruptedException e) {
// nothing to do (close() is responsible for calling markAsEOF() in this case)
}
}
/**
 * Blocks until a line or the end marker is available.
 *
 * @return the next line, or null at end of input
 * @throws IOException the exception stored by the background thread, if reading failed
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public String readLine() throws IOException, InterruptedException {
String line = lines.take();
if (line == null) { // EOF or IOException
markAsEOF(); // restore the null so that any concurrent (and future) calls to this method won't block
if (exception != null) {
throw exception;
} else {
return null; // EOF
}
} else {
return line;
}
}
/**
 * Waits up to the given timeout for a line or the end marker.
 *
 * @return the next line, or null on timeout or at end of input
 * @throws IOException the exception stored by the background thread, if reading failed
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public String readLine(long timeout, TimeUnit unit) throws IOException, InterruptedException {
String line = lines.poll(timeout, unit);
if (line == null && eof) { // EOF or IOException (not timeout)
markAsEOF(); // restore the null so that any concurrent (and future) calls to this method won't block
if (exception != null) {
throw exception;
} else {
return null; // EOF
}
} else {
// either a real line, or null because the wait timed out before eof was set
return line;
}
}
}
/**
 * A single-slot hand-off queue guarded by the instance monitor.
 *
 * <p>{@link #add} and {@link #offer} never block (as if the capacity were 1), whereas
 * {@link #put} additionally waits until the element it stored has been removed
 * ("0-capacity" behaviour). {@code null} elements are allowed, so a {@code null} result from
 * the {@code poll} methods can mean either "empty" or "a null element was removed".
 */
class ZeroCapacityBlockingQueue<T> {
    int count; // number of stored elements
    T item;    // the stored element

    /** True while the slot is occupied. Call only while holding the monitor. */
    private boolean occupied() {
        return count == 1;
    }

    /** True while the slot is empty. Call only while holding the monitor. */
    private boolean vacant() {
        return count == 0;
    }

    /**
     * Stores {@code x} without blocking.
     *
     * @return {@code true}
     * @throws IllegalStateException if the slot is already occupied
     */
    public synchronized boolean add(T x) {
        if (occupied()) {
            throw new IllegalStateException("Queue full");
        }
        item = x;
        count++;
        notifyAll();
        return true;
    }

    /**
     * Stores {@code x} without blocking.
     *
     * @return {@code false} if the slot is already occupied, otherwise {@code true}
     */
    public synchronized boolean offer(T x) {
        return !occupied() && add(x);
    }

    /**
     * Waits for the slot to become free, stores {@code x}, then keeps waiting for as long as
     * the slot is occupied by that same object.
     *
     * @throws InterruptedException if interrupted during either wait
     */
    public synchronized void put(T x) throws InterruptedException {
        while (occupied()) {
            wait();
        }
        add(x);
        // Identity check: stop waiting once the slot no longer holds x.
        while (occupied() && item == x) {
            wait();
        }
    }

    /**
     * Removes and returns the stored element.
     *
     * @throws NoSuchElementException if the slot is empty
     */
    public synchronized T remove() {
        if (vacant()) {
            throw new NoSuchElementException();
        }
        final T taken = item;
        item = null;
        count--;
        notifyAll();
        return taken;
    }

    /** Removes and returns the stored element, or returns {@code null} if the slot is empty. */
    public synchronized T poll() {
        return vacant() ? null : remove();
    }

    /**
     * Waits until an element is stored, then removes and returns it.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized T take() throws InterruptedException {
        while (vacant()) {
            wait();
        }
        return remove();
    }

    /**
     * Waits up to the given timeout for an element, then removes and returns it.
     *
     * @return the element, or {@code null} if the timeout elapsed while the slot was empty
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized T poll(long timeout, TimeUnit unit) throws InterruptedException {
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        for (long left; vacant(); ) {
            left = deadline - System.nanoTime();
            if (left <= 0) {
                return null;
            }
            TimeUnit.NANOSECONDS.timedWait(this, left);
        }
        return remove();
    }
}
/**
 * Demo: streams lines from standard input through an {@code InterruptibleLineReader} for a
 * fixed amount of time.
 */
class Main {
    /** Time budget for the streaming demo, in nanoseconds (5 seconds). */
    static long maxDurationNanos = TimeUnit.SECONDS.toNanos(5);

    /**
     * Like {@link Stream#generate}, but the returned stream is ordered.
     *
     * <p>The first element is fetched eagerly: {@code s.get()} is called once before this
     * method returns, because its result is used as the seed of {@link Stream#iterate}.
     */
    static <T> Stream<T> generateOrderedStream(Supplier<T> s) {
        return Stream.iterate(s.get(), e -> s.get());
    }

    /**
     * Reads lines until the specified end time and returns them as a stream.
     *
     * <p>The stream ends at the first {@code null} returned by
     * {@code lineReader.readLine(timeout, unit)}.
     *
     * @throws RuntimeException (from the stream) wrapping an {@link IOException} or
     *         {@link InterruptedException} raised while reading; for the latter the calling
     *         thread's interrupt status is restored before throwing
     */
    static Stream<String> readLinesUntilTime(InterruptibleLineReader lineReader, Instant end) {
        Supplier<String> readLine = () -> {
            long remaining = ChronoUnit.NANOS.between(Instant.now(), end);
            try {
                return lineReader.readLine(remaining, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                // The flag was cleared when the exception was thrown; put it back so code
                // further up the stack can still see that the thread was interrupted.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
        return generateOrderedStream(readLine).takeWhile(x -> x != null);
    }

    /**
     * Streaming demo: prints each line prefixed with "[mapped] " until
     * {@link #maxDurationNanos} has elapsed, the line stream ends, or a line contains "[stop]".
     */
    static void fillStreamMap(InterruptibleLineReader lineReader) {
        final Instant end = Instant.now().plusNanos(maxDurationNanos);
        readLinesUntilTime(lineReader, end)
                .takeWhile(line -> !line.contains("[stop]"))
                .map(line -> "[mapped] " + line)
                .forEachOrdered(System.out::println);
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        // stream lines
        InterruptibleLineReader lineReader = new InterruptibleLineReader(reader);
        System.out.println("--- streaming lines using InterruptibleLineReader for " + maxDurationNanos / 1e9 + " sec ---");
        try {
            fillStreamMap(lineReader);
        } finally {
            // close even if streaming failed
            lineReader.close();
        }
        /*
        // attempt to use the BufferedReader directly
        // NOTE: a line will be lost
        System.out.println("--- reading directly from BufferedReader ---");
        while (true) {
            try {
                String line = reader.readLine();
                if (line == null) { break; }
                System.out.println("[raw] " + line);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        */
    }
}
下面是第二个实现的运行示例(已取消main()最后一部分的注释).时间戳以秒为单位,">"表示输入.
0.06 --- streaming lines using InterruptibleLineReader for 5.0 sec ---
0.82 > one
0.83 [mapped] one
1.76 > two
1.76 [mapped] two
2.73 > three
2.73 [mapped] three
5.06 --- reading directly from BufferedReader ---
6.93 > four
6.94 InterruptibleLineReader: warning: discarding line that was read after close(): 'four'
7.76 > five
7.76 [raw] five
8.60 > six
8.60 [raw] six
请注意"four"这一行是如何丢失的.为了避免丢失行,在创建InterruptibleLineReader实例之后不要再使用底层的BufferedReader.
(如果在此之后确实需要一个BufferedReader,可以编写一个BufferedReader的伪子类,让它包装InterruptibleLineReader并把readLine()调用转发给它.其他的BufferedReader方法,如read()和mark(),则很难实现.)