[case48] A look at Flink's SocketClientSink
Published: 2019-06-27

This post takes a look at Flink's SocketClientSink.

DataStream.writeToSocket

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

    /**
     * Writes the DataStream to a socket as a byte array. The format of the
     * output is specified by a {@link SerializationSchema}.
     *
     * @param hostName
     *            host of the socket
     * @param port
     *            port of the socket
     * @param schema
     *            schema for serialization
     * @return the closed DataStream
     */
    @PublicEvolving
    public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T> schema) {
        DataStreamSink<T> returnStream = addSink(new SocketClientSink<>(hostName, port, schema, 0));
        returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
        return returnStream;
    }

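  • DataStream's writeToSocket method creates a SocketClientSink internally, passing four constructor arguments: hostName, port, schema, and maxNumRetries (0 here). It also sets the sink's parallelism to 1, because multiple instances connecting to the same port would not work.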
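
Because the sink connects as a client, something has to be listening on the target host and port before the job starts. The following is a minimal sketch of such a listener in plain Java, handy for local experiments. The class name SocketSinkListener, the method receiveOnce, and port 9999 are assumptions made for illustration; none of them come from Flink.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of Flink: accepts one connection and collects the bytes sent to it. */
class SocketSinkListener {

    /** Blocks until one client connects and later closes, then returns everything it sent. */
    static byte[] receiveOnce(ServerSocket server) throws IOException {
        try (Socket client = server.accept();
             InputStream in = client.getInputStream()) {
            ByteArrayOutputStream received = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                received.write(buffer, 0, n);
            }
            return received.toByteArray();
        }
    }

    public static void main(String[] args) throws Exception {
        // Port 9999 is an arbitrary choice; the job would pass the same value to writeToSocket.
        try (ServerSocket server = new ServerSocket(9999)) {
            System.out.println("Listening on port " + server.getLocalPort());
            byte[] data = receiveOnce(server);
            System.out.print(new String(data, StandardCharsets.UTF_8));
        }
    }
}
```

With the listener running, a job that calls writeToSocket with the same host and port should see its serialized records arrive as one continuous byte stream. The sink adds no framing of its own, so any delimiter between records has to come from the SerializationSchema.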

SocketClientSink

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java

    /**
     * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
     *
     * <p>The sink can be set to retry message sends after the sending failed.
     *
     * <p>The sink can be set to 'autoflush', in which case the socket stream is flushed after every
     * message. This significantly reduced throughput, but also decreases message latency.
     *
     * @param <IN> data to be written into the Socket.
     */
    @PublicEvolving
    public class SocketClientSink<IN> extends RichSinkFunction<IN> {

        private static final long serialVersionUID = 1L;

        private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);

        private static final int CONNECTION_RETRY_DELAY = 500;

        private final SerializableObject lock = new SerializableObject();
        private final SerializationSchema<IN> schema;
        private final String hostName;
        private final int port;
        private final int maxNumRetries;
        private final boolean autoFlush;

        private transient Socket client;
        private transient OutputStream outputStream;

        private int retries;

        private volatile boolean isRunning = true;

        /**
         * Creates a new SocketClientSink. The sink will not attempt to retry connections upon failure
         * and will not auto-flush the stream.
         *
         * @param hostName Hostname of the server to connect to.
         * @param port Port of the server.
         * @param schema Schema used to serialize the data into bytes.
         */
        public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema) {
            this(hostName, port, schema, 0);
        }

        /**
         * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
         * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
         * The sink will not auto-flush the stream.
         *
         * @param hostName Hostname of the server to connect to.
         * @param port Port of the server.
         * @param schema Schema used to serialize the data into bytes.
         * @param maxNumRetries The maximum number of retries after a message send failed.
         */
        public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema, int maxNumRetries) {
            this(hostName, port, schema, maxNumRetries, false);
        }

        /**
         * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
         * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
         *
         * @param hostName Hostname of the server to connect to.
         * @param port Port of the server.
         * @param schema Schema used to serialize the data into bytes.
         * @param maxNumRetries The maximum number of retries after a message send failed.
         * @param autoflush Flag to indicate whether the socket stream should be flushed after each message.
         */
        public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema,
                                int maxNumRetries, boolean autoflush) {
            checkArgument(port > 0 && port < 65536, "port is out of range");
            checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");

            this.hostName = checkNotNull(hostName, "hostname must not be null");
            this.port = port;
            this.schema = checkNotNull(schema);
            this.maxNumRetries = maxNumRetries;
            this.autoFlush = autoflush;
        }

        // ------------------------------------------------------------------------
        //  Life cycle
        // ------------------------------------------------------------------------

        /**
         * Initialize the connection with the Socket in the server.
         * @param parameters Configuration.
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            try {
                synchronized (lock) {
                    createConnection();
                }
            }
            catch (IOException e) {
                throw new IOException("Cannot connect to socket server at " + hostName + ":" + port, e);
            }
        }

        /**
         * Called when new data arrives to the sink, and forwards it to Socket.
         *
         * @param value The value to write to the socket.
         */
        @Override
        public void invoke(IN value) throws Exception {
            byte[] msg = schema.serialize(value);

            try {
                outputStream.write(msg);
                if (autoFlush) {
                    outputStream.flush();
                }
            }
            catch (IOException e) {
                // if no re-tries are enable, fail immediately
                if (maxNumRetries == 0) {
                    throw new IOException("Failed to send message '" + value + "' to socket server at "
                            + hostName + ":" + port + ". Connection re-tries are not enabled.", e);
                }

                LOG.error("Failed to send message '" + value + "' to socket server at " + hostName + ":" + port +
                        ". Trying to reconnect..." , e);

                // do the retries in locked scope, to guard against concurrent close() calls
                // note that the first re-try comes immediately, without a wait!

                synchronized (lock) {
                    IOException lastException = null;
                    retries = 0;

                    while (isRunning && (maxNumRetries < 0 || retries < maxNumRetries)) {

                        // first, clean up the old resources
                        try {
                            if (outputStream != null) {
                                outputStream.close();
                            }
                        }
                        catch (IOException ee) {
                            LOG.error("Could not close output stream from failed write attempt", ee);
                        }
                        try {
                            if (client != null) {
                                client.close();
                            }
                        }
                        catch (IOException ee) {
                            LOG.error("Could not close socket from failed write attempt", ee);
                        }

                        // try again
                        retries++;

                        try {
                            // initialize a new connection
                            createConnection();

                            // re-try the write
                            outputStream.write(msg);

                            // success!
                            return;
                        }
                        catch (IOException ee) {
                            lastException = ee;
                            LOG.error("Re-connect to socket server and send message failed. Retry time(s): " + retries, ee);
                        }

                        // wait before re-attempting to connect
                        lock.wait(CONNECTION_RETRY_DELAY);
                    }

                    // throw an exception if the task is still running, otherwise simply leave the method
                    if (isRunning) {
                        throw new IOException("Failed to send message '" + value + "' to socket server at "
                                + hostName + ":" + port + ". Failed after " + retries + " retries.", lastException);
                    }
                }
            }
        }

        /**
         * Closes the connection with the Socket server.
         */
        @Override
        public void close() throws Exception {
            // flag this as not running any more
            isRunning = false;

            // clean up in locked scope, so there is no concurrent change to the stream and client
            synchronized (lock) {
                // we notify first (this statement cannot fail). The notified thread will not continue
                // anyways before it can re-acquire the lock
                lock.notifyAll();

                try {
                    if (outputStream != null) {
                        outputStream.close();
                    }
                }
                finally {
                    if (client != null) {
                        client.close();
                    }
                }
            }
        }

        // ------------------------------------------------------------------------
        //  Utilities
        // ------------------------------------------------------------------------

        private void createConnection() throws IOException {
            client = new Socket(hostName, port);
            client.setKeepAlive(true);
            client.setTcpNoDelay(true);

            outputStream = client.getOutputStream();
        }

        // ------------------------------------------------------------------------
        //  For testing
        // ------------------------------------------------------------------------

        int getCurrentNumberOfRetries() {
            synchronized (lock) {
                return retries;
            }
        }
    }

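  • SocketClientSink extends RichSinkFunction; its autoFlush property defaults to false.
  • The open method calls createConnection to set up the socket connection. If an IOException occurs at this point, it fails fast by rethrowing the exception with the host and port in the message. createConnection sets both keepAlive and tcpNoDelay to true on the socket.
  • The invoke method first calls schema.serialize to serialize value, then calls outputStream.write on the socket's stream; if autoFlush is true, it flushes the outputStream right away. If an IOException occurs and maxNumRetries is 0, the exception is thrown immediately. Otherwise the retry logic runs directly inside the catch block, bounded by maxNumRetries (-1 means retry indefinitely). Each retry closes the old stream and socket, calls createConnection, and calls outputStream.write again. The first retry happens immediately, and after each failed retry the sink waits CONNECTION_RETRY_DELAY (500 ms).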
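
The retry policy described above can be modelled without Flink or a real socket. The following is a simplified sketch, not the sink's actual code: RetryPolicySketch, Attempt, and sendWithRetries are hypothetical names, a single Attempt stands in for createConnection plus outputStream.write, and the isRunning check and resource cleanup are left out.

```java
import java.io.IOException;

/** Stand-alone model of the retry policy in SocketClientSink.invoke; illustrative names, not Flink API. */
class RetryPolicySketch {

    /** One attempt to (re)connect and write the message. */
    interface Attempt {
        void run() throws IOException;
    }

    /**
     * Performs the first write and, if it fails, retries the way the sink does.
     *
     * @return the number of retries that were needed (0 if the first write succeeded)
     */
    static int sendWithRetries(Attempt attempt, int maxNumRetries, long retryDelayMillis)
            throws IOException, InterruptedException {
        try {
            attempt.run();
            return 0;
        } catch (IOException first) {
            // With retries disabled, the first failure is final.
            if (maxNumRetries == 0) {
                throw new IOException("Connection re-tries are not enabled.", first);
            }
        }

        IOException last = null;
        int retries = 0;
        // A negative maxNumRetries (-1 in the sink) means retry without limit.
        while (maxNumRetries < 0 || retries < maxNumRetries) {
            retries++;
            try {
                attempt.run(); // the first retry runs immediately, without a wait
                return retries;
            } catch (IOException e) {
                last = e;
            }
            // The sink uses lock.wait(CONNECTION_RETRY_DELAY) here so that close() can wake it.
            Thread.sleep(retryDelayMillis);
        }
        throw new IOException("Failed after " + retries + " retries.", last);
    }
}
```

Under this model, maxNumRetries = 3 allows up to four write attempts in total: the original one plus three retries.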

Summary

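  • DataStream's writeToSocket method creates a SocketClientSink internally and passes 0 for maxNumRetries. It uses the four-argument constructor, which takes no autoFlush parameter, so autoFlush stays at its default of false.
  • The socket created in open has both keepAlive and tcpNoDelay set to true; if an IOException occurs during open, the exception is thrown and the task stops.
  • The invoke method is fairly simple: it serializes value with the SerializationSchema and writes the bytes to the outputStream. It includes a basic retry on failure with a delay of CONNECTION_RETRY_DELAY (500 ms) between attempts. The retry in this version is simple and synchronous: it blocks the invoke call.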
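
Since the retry blocks inside invoke, the sink needs a way to break out of the wait when the task shuts down. In the source quoted earlier, close() clears isRunning and then calls lock.notifyAll() under the lock, which wakes the lock.wait in the retry loop. The following is a small stand-alone sketch of that handshake, using hypothetical names (CancellableRetryWait, retryUntilClosed) and no real connection attempts.

```java
/** Stand-alone model of how close() ends the retry wait; illustrative names, not Flink API. */
class CancellableRetryWait {

    private final Object lock = new Object();
    private volatile boolean isRunning = true;
    private int rounds;

    /** Waits in rounds of delayMillis until close() is called, like the pause between retries. */
    void retryUntilClosed(long delayMillis) throws InterruptedException {
        synchronized (lock) {
            while (isRunning) {
                rounds++;               // a reconnect attempt would happen here
                lock.wait(delayMillis); // releases the lock while waiting, so close() can enter
            }
        }
    }

    /** Same order as SocketClientSink.close(): clear the flag first, then notify under the lock. */
    void close() {
        isRunning = false;
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    /** Number of wait rounds started so far. */
    int rounds() {
        synchronized (lock) {
            return rounds;
        }
    }
}
```

Because the loop re-checks isRunning under the same lock that close() takes before notifying, the wake-up cannot be lost: the waiting thread is either already in wait and gets notified, or it has not yet checked the flag and will see it cleared.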

Reposted from: http://xjecl.baihongyu.com/
