我有InputStreamOutputStream(there is no socket).

我有一个基于流的代码,可以执行大约mapping/filtering/grouping/processing次.

如果超过maxDuration,我的主要目标是终止流:

void fillStreamMap(BufferedReader reader) {
    final Instant end = Instant.now().plusNanos(TimeUnit.NANOSECONDS.convert(maxDuration));

    this.map = reader.lines()
        .takeWhile(e -> checkTimeout(end))
        .map(this::jsonToBuyerEventInput)
        .filter(Objects::nonNull)
        .filter(getFilter()::apply)
        .limit(super.maxEvent)
        .collect(Collectors.groupingBy(BuyerEventInput::getBuyer));
}

boolean checkTimeout(Instant end){
    return Instant.now().getEpochSecond() <= end.getEpochSecond();
}

我使用的是takeWhile,这是一个非常有用的函数,但如果有即将到来的事件,它会判断终止条件.

因此,如果没有发送数据,它不会判断条件,因为此函数是以Predicate作为参数构建的.

有没有办法实现这个目标?

推荐答案

EDIT:我最初的答案(如下)是在readLine()级操作的,但再想想,在流级操作会更好.

这是一种对流进行操作的方法.核心函数是takeUntilTime(Stream<T> stream, Instant end),其余的是小助手.其 idea 是使用原始流Spliterator遍历原始流,这样就可以设置超时.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.Optional;
import java.util.Spliterator;
import java.util.stream.Stream;

class Main {
    static long maxDurationNanos = TimeUnit.SECONDS.toNanos(5);

    static <T> Stream<T> generateOrderedStream(Supplier<T> s) {
        // Like Stream.generate(), but the returned stream is ordered.
        return Stream.iterate(s.get(), e -> s.get());
    }

    static <T> Optional<T> advance(Spliterator<T> iter) {
        // Returns an Optional with the next element of the iterator, or an empty Optional if there are no more elements.
        // (This is much nicer than calling iter.tryAdvance() directly.)
        final var r = new Object() { T elem; };
        return iter.tryAdvance(elem -> r.elem = elem) ? Optional.of(r.elem) : Optional.empty();
    }

    static <T> Optional<T> getFuture(Future<T> future, long timeout, TimeUnit unit) {
        // Returns an Optional with the result of the Future, or an empty Optional on timeout/cancellation (on timeout, also cancels the Future).
        // Checked exceptions (ExecutionException and InterruptedException) are wrapped in an unchecked RuntimeException.
        try {
            return Optional.of(future.get(timeout, unit));
        } catch (TimeoutException e) {
            future.cancel(/* mayInterruptIfRunning: */ true);
            return Optional.empty();
        } catch (CancellationException e) {
            return Optional.empty();
        } catch (ExecutionException|InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    static <T> Optional<T> submitAndGet(ExecutorService executor, Callable<T> task, long timeout, TimeUnit unit) {
        // Convenient wrapper for getFuture(executor.submit(task), ...) that returns an empty Optional if the executor has been shut down.
        try {
            return getFuture(executor.submit(task), timeout, unit);
        } catch (RejectedExecutionException e) { // the executor has probably been shut down
            return Optional.empty();
        }
    }

    static ThreadFactory daemonThreadFactory() {
        return (r) -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            return thread;
        };
    }

    static <T> Stream<T> takeUntilTime(Stream<T> stream, Instant end) {
        // Traverses 'stream' until the specified end time and returns the traversed elements.
        final ExecutorService executor = Executors.newSingleThreadExecutor(daemonThreadFactory());
        final Spliterator<T> iter = stream.spliterator();
        Supplier<Optional<T>> s = () -> {
            long timeoutNanos = ChronoUnit.NANOS.between(Instant.now(), end);
            Optional<T> opt = submitAndGet(executor, () -> advance(iter), timeoutNanos, TimeUnit.NANOSECONDS).flatMap(o -> o);
            if (!opt.isPresent()) { // this will be the end of the stream, so we should clean up
                executor.shutdownNow();
            }
            return opt;
        };
        return generateOrderedStream(s)
            .takeWhile(Optional::isPresent)
            .map(Optional::get);
    }

    static void fillStreamMap(BufferedReader reader) {
        // streaming demo
        final Instant end = Instant.now().plusNanos(maxDurationNanos);
        takeUntilTime(reader.lines(), end)
            .takeWhile(line -> !line.contains("[stop]"))
            .map(line -> "[mapped] " + line)
            .forEachOrdered(System.out::println);
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        fillStreamMap(reader);
    }
}

Original answer:

暂且不考虑流媒体方面,问题基本上是在BufferedReader超时的情况下读取(大概是System.in).不幸的是,很难正确地做到这一点(参见第Set timeout for user's input条和第Timeout on Console Input条).

这些链接页面的一个 idea 是轮询BufferedReader.ready(),直到返回true,然后调用readLine().这很难看(因为它使用轮询)而且不可靠,因为即使ready()返回true,readLine()也会阻塞—例如,因为有一行不完整的可用行(在类Unix系统上,用户可以通过键入一些文本,然后按Ctrl+D而不是Enter来实现这一点).


另一个 idea 是创建一个后台线程,重复调用BufferedReader.readLine()并将结果插入到BlockingQueue(例如ArrayBlockingQueue)中.然后,主线程可以调用队列上的take()poll(timeout, unit)来获取行.

这种方法的一个限制是,如果您以后想直接从BufferedReader中读取数据(而不是通过队列),则几乎不可能避免丢失(至少)一行输入.这是因为线程在readLine()上被阻塞时无法完全中断,因此如果主线程决定提前停止(例如由于超时),则无法阻止后台线程读取其当前正在等待的行.

您可以try 使用mark(readAheadLimit)reset()"未读"最后一行,但同步将很困难–另一个线程可以try 在后台线程调用reset()之前读取BufferedReader.您可能必须使用lock field进行同步,但是它的访问级别是protected,所以您只能使用反射或子类BufferedReader来访问它.此外,如果要读取的行长于readAheadLimit,则reset()将失败.

这里是一个假设您只通过队列读取行的实现.

DISCLAIMER:谨防这些代码片段中的错误–多线程是很棘手的.我可能下次再try 改进代码.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;

class InterruptibleLineReader {
    private static final String EOF = new String("<EOF>");
    BufferedReader reader;
    ArrayBlockingQueue<String> lines = new ArrayBlockingQueue<>(/* capacity: */ 2);
    Thread backgroundThread;
    IOException exception;

    public InterruptibleLineReader(BufferedReader reader) {
        this.reader = reader;
        // start a background thread to read lines
        backgroundThread = new Thread(this::backgroundTask);
        backgroundThread.setDaemon(true);
        backgroundThread.start();
    }

    public void close() {
        backgroundThread.interrupt();
        lines.clear();
        lines.add(EOF);
    }

    private void backgroundTask() {
        try {
            try {
                while (true) {
                    String line = reader.readLine();
                    if (Thread.interrupted()) {
                        // nothing to do (close() is responsible for lines.put(EOF) etc. in this case)
                        break;
                    } else if (line == null) {
                        lines.put(EOF);
                        break;
                    }
                    lines.put(line);
                }
            } catch (IOException e) {
                exception = e;
                lines.put(EOF);
            }
        } catch (InterruptedException e) {
            // nothing to do (close() is responsible for lines.put(EOF) etc. in this case)
        }
    }

    public String readLine(long timeout, TimeUnit unit) throws IOException, InterruptedException {
        String line = lines.poll(timeout, unit);
        if (line == EOF) { // EOF or IOException
            lines.put(EOF); // restore the EOF so that any concurrent (and future) calls to this method won't block
            if (exception != null) {
                throw exception;
            } else {
                return null;
            }
        }
        return line;
    }
}

class Main {
    static long maxDurationNanos = TimeUnit.SECONDS.toNanos(5);

    static <T> Stream<T> generateOrderedStream(Supplier<T> s) {
        // like Stream.generate(), but the returned stream is ordered
        return Stream.iterate(s.get(), e -> s.get());
    }

    static Stream<String> readLinesUntilTime(InterruptibleLineReader lineReader, Instant end) {
        // reads lines until the specified end time and returns them as a stream
        Supplier<String> readLine = () -> {
            long remaining = ChronoUnit.NANOS.between(Instant.now(), end);
            try {
                return lineReader.readLine(remaining, TimeUnit.NANOSECONDS);
            } catch (IOException|InterruptedException e) {
                throw new RuntimeException(e);
            }
        };
        return generateOrderedStream(readLine).takeWhile(x -> x != null);
    }

    static void fillStreamMap(InterruptibleLineReader lineReader) {
        // streaming demo
        final Instant end = Instant.now().plusNanos(maxDurationNanos);
        readLinesUntilTime(lineReader, end)
            .takeWhile(line -> !line.contains("[stop]"))
            .map(line -> "[mapped] " + line)
            .forEachOrdered(System.out::println);
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

        // stream lines
        InterruptibleLineReader lineReader = new InterruptibleLineReader(reader);
        System.out.println("--- streaming lines using InterruptibleLineReader for " + maxDurationNanos / 1e9 + " sec  ---");
        fillStreamMap(lineReader);
        lineReader.close();

        /*
        // attempt to use the BufferedReader directly
        // NOTE: several lines may be lost (depending on the capacity of the ArrayBlockingQueue and how quickly the lines are consumed)
        System.out.println("--- reading directly from BufferedReader ---");
        while (true) {
            try {
                String line = reader.readLine();
                if (line == null) { break; }
                System.out.println("[raw] " + line);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        */
    }
}

这里有一个更复杂的实现,如果关闭队列并直接从BufferedReader中读取,只会丢失一行输入.它使用自定义的"0-capacity"队列来确保最多丢失一行.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.NoSuchElementException;
import java.util.stream.Stream;

class InterruptibleLineReader {
    BufferedReader reader;
    ZeroCapacityBlockingQueue<String> lines = new ZeroCapacityBlockingQueue<>(); // a null line indicates EOF or IOException
    Thread backgroundThread;
    IOException exception;
    boolean eof;

    public InterruptibleLineReader(BufferedReader reader) {
        this.reader = reader;
        // start a background thread to read lines
        backgroundThread = new Thread(this::backgroundTask);
        backgroundThread.setDaemon(true);
        backgroundThread.start();
    }

    private void markAsEOF() {
        eof = true;
        if (lines.poll() != null) { // markAsEOF() should not be called when there are unconsumed lines
            throw new IllegalStateException();
        }
        lines.offer(null); // unblock threads that are waiting on the queue
    }

    public void close() {
        backgroundThread.interrupt();
        // warn if there is an unconsumed line, and consume it so we can indicate EOF
        String line = lines.poll();
        if (line != null) {
            System.err.println("InterruptibleLineReader: warning: discarding unconsumed line during close(): '" + line + "'");
        }
        markAsEOF();
    }

    private void backgroundTask() {
        try {
            while (true) {
                String line = reader.readLine();
                if (Thread.interrupted()) {
                    if (line != null) {
                        System.err.println("InterruptibleLineReader: warning: discarding line that was read after close(): '" + line + "'");
                    }
                    // nothing further to do (close() is responsible for calling markAsEOF() in this case)
                    break;
                } else if (line == null) { // EOF
                    markAsEOF();
                    break;
                }
                lines.put(line); // this blocks until the line has been consumed ("0-capacity" behaviour)
                if (Thread.interrupted()) {
                    // nothing to do (close() is responsible for calling markAsEOF() in this case)
                    break;
                }
            }
        } catch (IOException e) {
            exception = e;
            markAsEOF();
        } catch (InterruptedException e) {
            // nothing to do (close() is responsible for calling markAsEOF() in this case)
        }
    }

    public String readLine() throws IOException, InterruptedException {
        String line = lines.take();
        if (line == null) { // EOF or IOException
            markAsEOF(); // restore the null so that any concurrent (and future) calls to this method won't block
            if (exception != null) {
                throw exception;
            } else {
                return null; // EOF
            }
        } else {
            return line;
        }
    }

    public String readLine(long timeout, TimeUnit unit) throws IOException, InterruptedException {
        String line = lines.poll(timeout, unit);
        if (line == null && eof) { // EOF or IOException (not timeout)
            markAsEOF(); // restore the null so that any concurrent (and future) calls to this method won't block
            if (exception != null) {
                throw exception;
            } else {
                return null; // EOF
            }
        } else {
            return line;
        }
    }
}

class ZeroCapacityBlockingQueue<T> {
    int count;
    T item;

    public synchronized boolean add(T x) {
        // does not block (i.e. behaves as if the capacity is actually 1)
        if (count == 1) {
            throw new IllegalStateException("Queue full");
        }
        item = x;
        count++;
        notifyAll();
        return true;
    }

    public synchronized boolean offer(T x) {
        // does not block (i.e. behaves as if the capacity is actually 1)
        if (count == 1) {
            return false;
        }
        return add(x);
    }

    public synchronized void put(T x) throws InterruptedException {
        // blocks until the item has been removed ("0-capacity" behaviour)
        while (count == 1) {
            wait();
        }
        add(x);
        while (count == 1 && item == x) {
            wait();
        }
    }

    public synchronized T remove() {
        if (count == 0) {
            throw new NoSuchElementException();
        }
        T x = item;
        item = null;
        count--;
        notifyAll();
        return x;
    }

    public synchronized T poll() {
        if (count == 0) {
            return null;
        }
        return remove();
    }

    public synchronized T take() throws InterruptedException {
        while (count == 0) {
            wait();
        }
        return remove();
    }

    public synchronized T poll(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (count == 0) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return null;
            }
            TimeUnit.NANOSECONDS.timedWait(this, remaining);
        }
        return remove();
    }
}

class Main {
    static long maxDurationNanos = TimeUnit.SECONDS.toNanos(5);

    static <T> Stream<T> generateOrderedStream(Supplier<T> s) {
        // like Stream.generate(), but the returned stream is ordered
        return Stream.iterate(s.get(), e -> s.get());
    }

    static Stream<String> readLinesUntilTime(InterruptibleLineReader lineReader, Instant end) {
        // reads lines until the specified end time and returns them as a stream
        Supplier<String> readLine = () -> {
            long remaining = ChronoUnit.NANOS.between(Instant.now(), end);
            try {
                return lineReader.readLine(remaining, TimeUnit.NANOSECONDS);
            } catch (IOException|InterruptedException e) {
                throw new RuntimeException(e);
            }
        };
        return generateOrderedStream(readLine).takeWhile(x -> x != null);
    }

    static void fillStreamMap(InterruptibleLineReader lineReader) {
        // streaming demo
        final Instant end = Instant.now().plusNanos(maxDurationNanos);
        readLinesUntilTime(lineReader, end)
            .takeWhile(line -> !line.contains("[stop]"))
            .map(line -> "[mapped] " + line)
            .forEachOrdered(System.out::println);
    }

    public static void main(String[] args) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

        // stream lines
        InterruptibleLineReader lineReader = new InterruptibleLineReader(reader);
        System.out.println("--- streaming lines using InterruptibleLineReader for " + maxDurationNanos / 1e9 + " sec  ---");
        fillStreamMap(lineReader);
        lineReader.close();

        /*
        // attempt to use the BufferedReader directly
        // NOTE: a line will be lost
        System.out.println("--- reading directly from BufferedReader ---");
        while (true) {
            try {
                String line = reader.readLine();
                if (line == null) { break; }
                System.out.println("[raw] " + line);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        */
    }
}

下面是第二个实现的运行示例(未注释main()的最后一部分).时间戳以秒为单位,并且">;"表示输入.

0.06 --- streaming lines using InterruptibleLineReader for 5.0 sec  ---
0.82 > one
0.83 [mapped] one
1.76 > two
1.76 [mapped] two
2.73 > three
2.73 [mapped] three
5.06 --- reading directly from BufferedReader ---
6.93 > four
6.94 InterruptibleLineReader: warning: discarding line that was read after close(): 'four'
7.76 > five
7.76 [raw] five
8.60 > six
8.60 [raw] six

请注意"四"行是如何丢失的.为了避免丢失行,在创建InterruptibleLineReader实例后不要使用底层BufferedReader.

(如果在此之后确实需要BufferedReader,可以编写一个BufferedReader的伪子类,将InterruptibleLineReader个调用打包并转发readLine()个调用给它.其他BufferedReader个方法,如read()mark(),很难实现.)

Java相关问答推荐

如何在Inspaut中获取当前路径的 * 模式 *?

Spring安全实现多个SQL表身份验证

在AnyLogic中增加变量计数

SQlite for Android无法使用json_group_array/json_object

Helidon 4和Http API

更新AWS凭据

使用Jolt将字段转换为列表

如何在JavaFX中处理多个按钮

WebSockets和Spring Boot安全性出现错误401

如何使用路径过渡方法使 node 绕圆旋转?

Kotlin Val是否提供了与Java最终版相同的可见性保证?

Spring Boot&;Docker:无法执行目标org.springframework.boot:spring-boot-maven-plugin:3.2.0:build-image

Android Java:已设置但未读取SharedPreferences

Oj算法 MatrixR032从字符串、归一化和余弦相似度计算创建

为什么在下面的Java泛型方法中没有类型限制?

如何生成指定范围内的11位序列号?

Android无法在Java代码中调用Kotlin代码,原因是在Companion中使用Kotlin枚举时

让标签占用JavaFX中HBox的所有可用空间

Spring Mapstruct如何获取Lazy初始化实体字段的信息?

使用StringBuilder和append方法创建字符串时Java字符串内部方法的问题