Discussion thread: https://lists.apache.org/thread/47pdjggh0q0tdkq0cwt6y5o2o8wrl9jl
Vote thread: https://lists.apache.org/thread/9z2oghkhfkgzk9nx8zc33jks3ccoo3dl
JIRA

Release: 1.19

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In many production environments, whether cloud-native or on physical machines, the disk space available to a Flink TaskManager (TM) is limited.

  • If YARN or Kubernetes caps the disk space that a TaskManager can use, the TM can no longer write new data once usage reaches the cap, and it eventually fails.
  • If YARN does not cap the disk space that a TaskManager can use, the TM may use up all disk space on the machine, causing every container on that machine to fail.

In general, Flink users should not call `System.out.println` in production, but it does happen when the number of Flink jobs and job developers is very large. A Flink job may use System.out to write a large amount of data to the taskmanager.out file. This file does not roll; it only grows. Eventually the TM reaches its disk space limit.

Solution:

Support redirecting System.out and System.err to LOG, or discarding them. Unlike taskmanager.out, the LOG can roll, so it does not grow forever.

This feature is useful for SREs who maintain Flink environments: they can redirect System.out to LOG by default. Although the root cause of the problem is user code that does not follow conventions, pushing users to fix their code one by one is usually very time-consuming for SREs. The feature also improves job stability when System.out is used accidentally.


Public Interfaces & Proposed Changes

Introduce an enum:

public enum SystemOutMode {
    // Don't change the System.out and System.err, it's the default value.
    DEFAULT,
    // Ignore all System.out and err directly
    IGNORE,
    // Redirect System.out and err to LOG
    LOG
}


Introduce 3 options:

  • taskmanager.system-out.mode: DEFAULT
  • taskmanager.system-out.log.thread-name.enabled: false
  • taskmanager.system-out.log.cache-upper-size: 100 kb
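
As a rough illustration of how the mode option might be consumed, the sketch below maps a configured mode string to the stream that System.out should point at. The class name SystemOutModeSketch and the methods parse and streamFor are hypothetical and not part of Flink; case-insensitive parsing is an assumption, and the stream used for LOG mode is supplied by the caller.

```java
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.Locale;

/** Hypothetical helper (not part of Flink): chooses the stream for a given mode. */
final class SystemOutModeSketch {

    /** Mirrors the enum proposed above. */
    enum SystemOutMode { DEFAULT, IGNORE, LOG }

    /** Assumption: values are parsed case-insensitively; unset means DEFAULT. */
    static SystemOutMode parse(String value) {
        if (value == null || value.trim().isEmpty()) {
            return SystemOutMode.DEFAULT;
        }
        return SystemOutMode.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    /** Returns the stream that System.out (or System.err) should be set to. */
    static PrintStream streamFor(
            SystemOutMode mode, PrintStream original, PrintStream logging) {
        switch (mode) {
            case IGNORE:
                // Discard every byte that is written.
                return new PrintStream(new OutputStream() {
                    @Override
                    public void write(int b) {
                        // intentionally empty
                    }
                });
            case LOG:
                return logging;
            case DEFAULT:
            default:
                return original;
        }
    }
}
```

A caller would then do something like System.setOut(SystemOutModeSketch.streamFor(mode, System.out, loggingStream)), where loggingStream is a stream that forwards complete lines to the logger.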


Why is log.thread-name.enabled needed?

During the discussion on the mailing list, Hangxiang suggested logging the subtask info as well, since this information may be useful for troubleshooting.

Flink's RichFunction can get the subtaskId, but the code that redirects System.out to LOG is common code that runs outside any RichFunction, so it is difficult to get the subtaskId there. After consideration, the workaround is to log the thread name, because the name of the default task thread contains the task name and subtask id. This approach also covers all threads, since some non-task threads may call println as well.
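
The thread-name workaround can be sketched as follows. This is only an illustration: the class ThreadNamePrefixSketch, the method format, and the bracketed prefix layout are assumptions, not the format Flink actually uses.

```java
/** Hypothetical helper (not part of Flink): optionally prefixes a line with the thread name. */
final class ThreadNamePrefixSketch {

    /**
     * Returns the line unchanged when the option is disabled; otherwise prepends
     * the name of the calling thread. The "[name] line" layout is an assumption.
     */
    static String format(String line, boolean threadNameEnabled) {
        if (!threadNameEnabled) {
            return line;
        }
        return "[" + Thread.currentThread().getName() + "] " + line;
    }
}
```

Because the name is read from the calling thread, the same code works for task threads and for any other thread that calls println.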

Why is system-out.log.cache-upper-size needed?

To simulate the newline semantics of System.out, LOG.info or LOG.error should be called at the end of each line. So until the user calls print(lineSeparator) or println, the data written so far must be cached in memory.

If the user keeps calling System.out.print without a line separator, an OOM may occur. To prevent this, when the amount of cached data reaches a certain threshold, Flink should call LOG.info and clear the cache.
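
A minimal sketch of this threshold behaviour is shown below. It is not the proposed implementation: the class BoundedLineBufferSketch is hypothetical, the sink stands in for LOG.info, only '\n' is treated as the line end (the real code would compare against System.lineSeparator()), and a forced flush may split a multi-byte character.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** Hypothetical sketch: caches one line and flushes early at a size threshold. */
final class BoundedLineBufferSketch {

    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final int maxBytes;
    private final Consumer<String> sink; // stands in for LOG.info

    BoundedLineBufferSketch(int maxBytes, Consumer<String> sink) {
        this.maxBytes = maxBytes;
        this.sink = sink;
    }

    /** Caches one byte; emits the cached data at a line end or at the threshold. */
    synchronized void write(int b) {
        if (b == '\n') {
            flush(); // the line ended: emit it without the separator
            return;
        }
        buffer.write(b);
        if (buffer.size() >= maxBytes) {
            flush(); // threshold reached: emit the partial line to bound memory use
        }
    }

    private void flush() {
        sink.accept(new String(buffer.toByteArray(), StandardCharsets.UTF_8));
        buffer.reset();
    }
}
```

With a threshold of 4 bytes, writing "abcdef\nxy\n" emits "abcd", "ef", and "xy". In this sketch, a line whose length is an exact multiple of the threshold is followed by an extra empty record when its separator arrives.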


The following is the core code for discarding System.out or redirecting it to LOG. This demo does not include the logic related to cache-upper-size.

import org.apache.commons.io.output.NullPrintStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Optional;

/**
 * Demo that discards System.out or redirects System.out to LOG.
 */
public class SystemOutRedirectToLog {

    private static final Logger LOG = LoggerFactory.getLogger(SystemOutRedirectToLog.class);

    public static void main(String[] args) {
        // Normal
        sysout();

        // Discard all output
        System.setOut(new NullPrintStream());
        // Alternative: System.setOut(new PrintStream(NullOutputStream.INSTANCE));
        sysout();

        // Redirect all System.out to LOG.
        System.setOut(new LoggingPrintStream(LOG));
        sysout();
    }

    private static void sysout() {
        System.out.println("aa1bb");
        System.out.println("jsfkjskjl " + System.lineSeparator() + "fjskdfsvnmx");
        System.out.print(1);
        System.out.print('c');
        System.out.println("aa2bb");
        System.out.println();
        System.out.println();
        System.out.println("aa3bb");
        System.out.println("aa4bb");
    }

    /**
     * Caches the content of the current line. tryGenerateContext() returns the line
     * and resets the buffer once the line has ended.
     */
    private static class LoggingOutputStreamHelper extends ByteArrayOutputStream {

        private static final byte[] LINE_SEPARATOR_BYTES = System.lineSeparator().getBytes();
        private static final int LINE_SEPARATOR_LENGTH = LINE_SEPARATOR_BYTES.length;

        public synchronized Optional<String> tryGenerateContext() {
            if (!isLineEnded()) {
                return Optional.empty();
            }
            try {
                return Optional.of(new String(buf, 0, count - LINE_SEPARATOR_LENGTH));
            } finally {
                reset();
            }
        }

        private synchronized boolean isLineEnded() {
            if (count < LINE_SEPARATOR_LENGTH) {
                return false;
            }

            if (LINE_SEPARATOR_LENGTH == 1) {
                return LINE_SEPARATOR_BYTES[0] == buf[count - 1];
            }  

            // Compare the tail of the buffer with the line separator, byte by byte.
            for (int i = 0; i < LINE_SEPARATOR_LENGTH; i++) {
                if (LINE_SEPARATOR_BYTES[i] != buf[count - LINE_SEPARATOR_LENGTH + i]) {
                    return false;
                }
            }
            return true;
        }
    }

    /**
     * Redirect the PrintStream to Logger.
     */
    private static class LoggingPrintStream extends PrintStream {

        private final Logger logger;

        private final LoggingOutputStreamHelper helper;

        private LoggingPrintStream(Logger logger) {
            super(new LoggingOutputStreamHelper());
            helper = (LoggingOutputStreamHelper) super.out;
            this.logger = logger;
        }

        @Override
        public void write(int b) {
            super.write(b);
            tryLogCurrentLine();
        }

        @Override
        public void write(byte[] b, int off, int len) {
            super.write(b, off, len);
            tryLogCurrentLine();
        }

        private void tryLogCurrentLine() {
            synchronized (this) {
                helper.tryGenerateContext().ifPresent(logger::info);
            }
        }
    }

} 


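The demo has been tested; its output follows. The plain lines come from the first, unmodified call to sysout(), the second call prints nothing because the output is discarded, and the lines with a log prefix come from the third call, which is redirected to LOG.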

aa1bb
jsfkjskjl 
fjskdfsvnmx
1caa2bb


aa3bb
aa4bb
551  [main] INFO  com.dream.log.SystemOutRedirectToLog [] - aa1bb
552  [main] INFO  com.dream.log.SystemOutRedirectToLog [] - jsfkjskjl 
fjskdfsvnmx
552  [main] INFO  com.dream.log.SystemOutRedirectToLog [] - 1caa2bb
552  [main] INFO  com.dream.log.SystemOutRedirectToLog [] - 
552  [main] INFO  com.dream.log.SystemOutRedirectToLog [] - 
552  [main] INFO  com.dream.log.SystemOutRedirectToLog [] - aa3bb
553  [main] INFO  com.dream.log.SystemOutRedirectToLog [] - aa4bb


Compatibility, Deprecation, and Migration Plan

None


Test Plan

Unit tests and manual testing.


Rejected Alternatives

Making taskmanager.out splittable and rolling: some concerns about this approach were raised in the discussion on the mailing list.

