mirror of
https://github.com/actix/actix-extras.git
synced 2025-08-30 11:08:08 +02:00
Deploying to gh-pages from @ 8741cd32cc
🚀
This commit is contained in:
@@ -143,23 +143,23 @@
|
||||
<span id="140">140</span>
|
||||
<span id="141">141</span>
|
||||
</pre><div class="example-wrap"><pre class="rust ">
|
||||
<span class="kw">use</span> <span class="ident">std</span>::<span class="ident">collections</span>::<span class="ident">VecDeque</span>;
|
||||
<span class="kw">use</span> <span class="ident">std</span>::<span class="ident">io</span>;
|
||||
<span class="kw">use</span> <span class="ident">std::collections::VecDeque</span>;
|
||||
<span class="kw">use</span> <span class="ident">std::io</span>;
|
||||
|
||||
<span class="kw">use</span> <span class="ident">actix</span>::<span class="ident">prelude</span>::<span class="kw-2">*</span>;
|
||||
<span class="kw">use</span> <span class="ident">actix_rt</span>::<span class="ident">net</span>::<span class="ident">TcpStream</span>;
|
||||
<span class="kw">use</span> <span class="ident">actix_service</span>::<span class="ident">boxed</span>::{<span class="ident">service</span>, <span class="ident">BoxService</span>};
|
||||
<span class="kw">use</span> <span class="ident">actix_tls</span>::<span class="ident">connect</span>::{<span class="ident">default_connector</span>, <span class="ident">Connect</span>, <span class="ident">ConnectError</span>, <span class="ident">Connection</span>};
|
||||
<span class="kw">use</span> <span class="ident">backoff</span>::<span class="ident">backoff</span>::<span class="ident">Backoff</span>;
|
||||
<span class="kw">use</span> <span class="ident">backoff</span>::<span class="ident">ExponentialBackoff</span>;
|
||||
<span class="kw">use</span> <span class="ident">actix::prelude</span>::<span class="kw-2">*</span>;
|
||||
<span class="kw">use</span> <span class="ident">actix_rt::net::TcpStream</span>;
|
||||
<span class="kw">use</span> <span class="ident">actix_service::boxed</span>::{<span class="ident">service</span>, <span class="ident">BoxService</span>};
|
||||
<span class="kw">use</span> <span class="ident">actix_tls::connect</span>::{<span class="ident">default_connector</span>, <span class="ident">Connect</span>, <span class="ident">ConnectError</span>, <span class="ident">Connection</span>};
|
||||
<span class="kw">use</span> <span class="ident">backoff::backoff::Backoff</span>;
|
||||
<span class="kw">use</span> <span class="ident">backoff::ExponentialBackoff</span>;
|
||||
<span class="kw">use</span> <span class="ident">log</span>::{<span class="ident">error</span>, <span class="ident">info</span>, <span class="ident">warn</span>};
|
||||
<span class="kw">use</span> <span class="ident">redis_async</span>::<span class="ident">error</span>::<span class="ident">Error</span> <span class="kw">as</span> <span class="ident">RespError</span>;
|
||||
<span class="kw">use</span> <span class="ident">redis_async</span>::<span class="ident">resp</span>::{<span class="ident">RespCodec</span>, <span class="ident">RespValue</span>};
|
||||
<span class="kw">use</span> <span class="ident">tokio</span>::<span class="ident">io</span>::{<span class="ident">split</span>, <span class="ident">WriteHalf</span>};
|
||||
<span class="kw">use</span> <span class="ident">tokio</span>::<span class="ident">sync</span>::<span class="ident">oneshot</span>;
|
||||
<span class="kw">use</span> <span class="ident">tokio_util</span>::<span class="ident">codec</span>::<span class="ident">FramedRead</span>;
|
||||
<span class="kw">use</span> <span class="ident">redis_async::error::Error</span> <span class="kw">as</span> <span class="ident">RespError</span>;
|
||||
<span class="kw">use</span> <span class="ident">redis_async::resp</span>::{<span class="ident">RespCodec</span>, <span class="ident">RespValue</span>};
|
||||
<span class="kw">use</span> <span class="ident">tokio::io</span>::{<span class="ident">split</span>, <span class="ident">WriteHalf</span>};
|
||||
<span class="kw">use</span> <span class="ident">tokio::sync::oneshot</span>;
|
||||
<span class="kw">use</span> <span class="ident">tokio_util::codec::FramedRead</span>;
|
||||
|
||||
<span class="kw">use</span> <span class="kw">crate</span>::<span class="ident">Error</span>;
|
||||
<span class="kw">use</span> <span class="kw">crate</span><span class="ident">::Error</span>;
|
||||
|
||||
<span class="doccomment">/// Command for send data to Redis</span>
|
||||
<span class="attribute">#[<span class="ident">derive</span>(<span class="ident">Debug</span>)]</span>
|
||||
@@ -174,8 +174,8 @@
|
||||
<span class="ident">addr</span>: <span class="ident">String</span>,
|
||||
<span class="ident">connector</span>: <span class="ident">BoxService</span><span class="op"><</span><span class="ident">Connect</span><span class="op"><</span><span class="ident">String</span><span class="op">></span>, <span class="ident">Connection</span><span class="op"><</span><span class="ident">String</span>, <span class="ident">TcpStream</span><span class="op">></span>, <span class="ident">ConnectError</span><span class="op">></span>,
|
||||
<span class="ident">backoff</span>: <span class="ident">ExponentialBackoff</span>,
|
||||
<span class="ident">cell</span>: <span class="prelude-ty">Option</span><span class="op"><</span><span class="ident">actix</span>::<span class="ident">io</span>::<span class="ident">FramedWrite</span><span class="op"><</span><span class="ident">RespValue</span>, <span class="ident">WriteHalf</span><span class="op"><</span><span class="ident">TcpStream</span><span class="op">></span>, <span class="ident">RespCodec</span><span class="op">></span><span class="op">></span>,
|
||||
<span class="ident">queue</span>: <span class="ident">VecDeque</span><span class="op"><</span><span class="ident">oneshot</span>::<span class="ident">Sender</span><span class="op"><</span><span class="prelude-ty">Result</span><span class="op"><</span><span class="ident">RespValue</span>, <span class="ident">Error</span><span class="op">></span><span class="op">></span><span class="op">></span>,
|
||||
<span class="ident">cell</span>: <span class="prelude-ty">Option</span><span class="op"><</span><span class="ident">actix::io::FramedWrite</span><span class="op"><</span><span class="ident">RespValue</span>, <span class="ident">WriteHalf</span><span class="op"><</span><span class="ident">TcpStream</span><span class="op">></span>, <span class="ident">RespCodec</span><span class="op">></span><span class="op">></span>,
|
||||
<span class="ident">queue</span>: <span class="ident">VecDeque</span><span class="op"><</span><span class="ident">oneshot::Sender</span><span class="op"><</span><span class="prelude-ty">Result</span><span class="op"><</span><span class="ident">RespValue</span>, <span class="ident">Error</span><span class="op">></span><span class="op">></span><span class="op">></span>,
|
||||
}
|
||||
|
||||
<span class="kw">impl</span> <span class="ident">RedisActor</span> {
|
||||
@@ -185,15 +185,15 @@
|
||||
|
||||
<span class="kw">let</span> <span class="ident">backoff</span> <span class="op">=</span> <span class="ident">ExponentialBackoff</span> {
|
||||
<span class="ident">max_elapsed_time</span>: <span class="prelude-val">None</span>,
|
||||
..<span class="ident">Default</span>::<span class="ident">default</span>()
|
||||
..<span class="ident">Default::default</span>()
|
||||
};
|
||||
|
||||
<span class="ident">Supervisor</span>::<span class="ident">start</span>(<span class="op">|</span><span class="kw">_</span><span class="op">|</span> <span class="ident">RedisActor</span> {
|
||||
<span class="ident">Supervisor::start</span>(<span class="op">|</span><span class="kw">_</span><span class="op">|</span> <span class="ident">RedisActor</span> {
|
||||
<span class="ident">addr</span>,
|
||||
<span class="ident">connector</span>: <span class="ident">service</span>(<span class="ident">default_connector</span>()),
|
||||
<span class="ident">cell</span>: <span class="prelude-val">None</span>,
|
||||
<span class="ident">backoff</span>,
|
||||
<span class="ident">queue</span>: <span class="ident">VecDeque</span>::<span class="ident">new</span>(),
|
||||
<span class="ident">queue</span>: <span class="ident">VecDeque::new</span>(),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -202,28 +202,28 @@
|
||||
<span class="kw">type</span> <span class="ident">Context</span> <span class="op">=</span> <span class="ident">Context</span><span class="op"><</span><span class="self">Self</span><span class="op">></span>;
|
||||
|
||||
<span class="kw">fn</span> <span class="ident">started</span>(<span class="kw-2">&</span><span class="kw-2">mut</span> <span class="self">self</span>, <span class="ident">ctx</span>: <span class="kw-2">&</span><span class="kw-2">mut</span> <span class="ident">Context</span><span class="op"><</span><span class="self">Self</span><span class="op">></span>) {
|
||||
<span class="kw">let</span> <span class="ident">req</span> <span class="op">=</span> <span class="ident">Connect</span>::<span class="ident">new</span>(<span class="self">self</span>.<span class="ident">addr</span>.<span class="ident">to_owned</span>());
|
||||
<span class="kw">let</span> <span class="ident">req</span> <span class="op">=</span> <span class="ident">Connect::new</span>(<span class="self">self</span>.<span class="ident">addr</span>.<span class="ident">to_owned</span>());
|
||||
<span class="self">self</span>.<span class="ident">connector</span>
|
||||
.<span class="ident">call</span>(<span class="ident">req</span>)
|
||||
.<span class="ident">into_actor</span>(<span class="self">self</span>)
|
||||
.<span class="ident">map</span>(<span class="op">|</span><span class="ident">res</span>, <span class="ident">act</span>, <span class="ident">ctx</span><span class="op">|</span> <span class="kw">match</span> <span class="ident">res</span> {
|
||||
<span class="prelude-val">Ok</span>(<span class="ident">conn</span>) <span class="op">=</span><span class="op">></span> {
|
||||
<span class="kw">let</span> <span class="ident">stream</span> <span class="op">=</span> <span class="ident">conn</span>.<span class="ident">into_parts</span>().<span class="number">0</span>;
|
||||
<span class="macro">info</span><span class="macro">!</span>(<span class="string">"Connected to redis server: {}"</span>, <span class="ident">act</span>.<span class="ident">addr</span>);
|
||||
<span class="macro">info!</span>(<span class="string">"Connected to redis server: {}"</span>, <span class="ident">act</span>.<span class="ident">addr</span>);
|
||||
|
||||
<span class="kw">let</span> (<span class="ident">r</span>, <span class="ident">w</span>) <span class="op">=</span> <span class="ident">split</span>(<span class="ident">stream</span>);
|
||||
|
||||
<span class="comment">// configure write side of the connection</span>
|
||||
<span class="kw">let</span> <span class="ident">framed</span> <span class="op">=</span> <span class="ident">actix</span>::<span class="ident">io</span>::<span class="ident">FramedWrite</span>::<span class="ident">new</span>(<span class="ident">w</span>, <span class="ident">RespCodec</span>, <span class="ident">ctx</span>);
|
||||
<span class="kw">let</span> <span class="ident">framed</span> <span class="op">=</span> <span class="ident">actix::io::FramedWrite::new</span>(<span class="ident">w</span>, <span class="ident">RespCodec</span>, <span class="ident">ctx</span>);
|
||||
<span class="ident">act</span>.<span class="ident">cell</span> <span class="op">=</span> <span class="prelude-val">Some</span>(<span class="ident">framed</span>);
|
||||
|
||||
<span class="comment">// read side of the connection</span>
|
||||
<span class="ident">ctx</span>.<span class="ident">add_stream</span>(<span class="ident">FramedRead</span>::<span class="ident">new</span>(<span class="ident">r</span>, <span class="ident">RespCodec</span>));
|
||||
<span class="ident">ctx</span>.<span class="ident">add_stream</span>(<span class="ident">FramedRead::new</span>(<span class="ident">r</span>, <span class="ident">RespCodec</span>));
|
||||
|
||||
<span class="ident">act</span>.<span class="ident">backoff</span>.<span class="ident">reset</span>();
|
||||
}
|
||||
<span class="prelude-val">Err</span>(<span class="ident">err</span>) <span class="op">=</span><span class="op">></span> {
|
||||
<span class="macro">error</span><span class="macro">!</span>(<span class="string">"Can not connect to redis server: {}"</span>, <span class="ident">err</span>);
|
||||
<span class="macro">error!</span>(<span class="string">"Can not connect to redis server: {}"</span>, <span class="ident">err</span>);
|
||||
<span class="comment">// re-connect with backoff time.</span>
|
||||
<span class="comment">// we stop current context, supervisor will restart it.</span>
|
||||
<span class="kw">if</span> <span class="kw">let</span> <span class="prelude-val">Some</span>(<span class="ident">timeout</span>) <span class="op">=</span> <span class="ident">act</span>.<span class="ident">backoff</span>.<span class="ident">next_backoff</span>() {
|
||||
@@ -236,23 +236,23 @@
|
||||
}
|
||||
|
||||
<span class="kw">impl</span> <span class="ident">Supervised</span> <span class="kw">for</span> <span class="ident">RedisActor</span> {
|
||||
<span class="kw">fn</span> <span class="ident">restarting</span>(<span class="kw-2">&</span><span class="kw-2">mut</span> <span class="self">self</span>, <span class="kw">_</span>: <span class="kw-2">&</span><span class="kw-2">mut</span> <span class="self">Self</span>::<span class="ident">Context</span>) {
|
||||
<span class="kw">fn</span> <span class="ident">restarting</span>(<span class="kw-2">&</span><span class="kw-2">mut</span> <span class="self">self</span>, <span class="kw">_</span>: <span class="kw-2">&</span><span class="kw-2">mut</span> <span class="self">Self</span><span class="ident">::Context</span>) {
|
||||
<span class="self">self</span>.<span class="ident">cell</span>.<span class="ident">take</span>();
|
||||
<span class="kw">for</span> <span class="ident">tx</span> <span class="kw">in</span> <span class="self">self</span>.<span class="ident">queue</span>.<span class="ident">drain</span>(..) {
|
||||
<span class="kw">let</span> <span class="kw">_</span> <span class="op">=</span> <span class="ident">tx</span>.<span class="ident">send</span>(<span class="prelude-val">Err</span>(<span class="ident">Error</span>::<span class="ident">Disconnected</span>));
|
||||
<span class="kw">let</span> <span class="kw">_</span> <span class="op">=</span> <span class="ident">tx</span>.<span class="ident">send</span>(<span class="prelude-val">Err</span>(<span class="ident">Error::Disconnected</span>));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
<span class="kw">impl</span> <span class="ident">actix</span>::<span class="ident">io</span>::<span class="ident">WriteHandler</span><span class="op"><</span><span class="ident">io</span>::<span class="ident">Error</span><span class="op">></span> <span class="kw">for</span> <span class="ident">RedisActor</span> {
|
||||
<span class="kw">fn</span> <span class="ident">error</span>(<span class="kw-2">&</span><span class="kw-2">mut</span> <span class="self">self</span>, <span class="ident">err</span>: <span class="ident">io</span>::<span class="ident">Error</span>, <span class="kw">_</span>: <span class="kw-2">&</span><span class="kw-2">mut</span> <span class="self">Self</span>::<span class="ident">Context</span>) <span class="op">-</span><span class="op">></span> <span class="ident">Running</span> {
|
||||
<span class="macro">warn</span><span class="macro">!</span>(<span class="string">"Redis connection dropped: {} error: {}"</span>, <span class="self">self</span>.<span class="ident">addr</span>, <span class="ident">err</span>);
|
||||
<span class="ident">Running</span>::<span class="ident">Stop</span>
|
||||
<span class="kw">impl</span> <span class="ident">actix::io::WriteHandler</span><span class="op"><</span><span class="ident">io::Error</span><span class="op">></span> <span class="kw">for</span> <span class="ident">RedisActor</span> {
|
||||
<span class="kw">fn</span> <span class="ident">error</span>(<span class="kw-2">&</span><span class="kw-2">mut</span> <span class="self">self</span>, <span class="ident">err</span>: <span class="ident">io::Error</span>, <span class="kw">_</span>: <span class="kw-2">&</span><span class="kw-2">mut</span> <span class="self">Self</span><span class="ident">::Context</span>) <span class="op">-</span><span class="op">></span> <span class="ident">Running</span> {
|
||||
<span class="macro">warn!</span>(<span class="string">"Redis connection dropped: {} error: {}"</span>, <span class="self">self</span>.<span class="ident">addr</span>, <span class="ident">err</span>);
|
||||
<span class="ident">Running::Stop</span>
|
||||
}
|
||||
}
|
||||
|
||||
<span class="kw">impl</span> <span class="ident">StreamHandler</span><span class="op"><</span><span class="prelude-ty">Result</span><span class="op"><</span><span class="ident">RespValue</span>, <span class="ident">RespError</span><span class="op">></span><span class="op">></span> <span class="kw">for</span> <span class="ident">RedisActor</span> {
|
||||
<span class="kw">fn</span> <span class="ident">handle</span>(<span class="kw-2">&</span><span class="kw-2">mut</span> <span class="self">self</span>, <span class="ident">msg</span>: <span class="prelude-ty">Result</span><span class="op"><</span><span class="ident">RespValue</span>, <span class="ident">RespError</span><span class="op">></span>, <span class="ident">ctx</span>: <span class="kw-2">&</span><span class="kw-2">mut</span> <span class="self">Self</span>::<span class="ident">Context</span>) {
|
||||
<span class="kw">fn</span> <span class="ident">handle</span>(<span class="kw-2">&</span><span class="kw-2">mut</span> <span class="self">self</span>, <span class="ident">msg</span>: <span class="prelude-ty">Result</span><span class="op"><</span><span class="ident">RespValue</span>, <span class="ident">RespError</span><span class="op">></span>, <span class="ident">ctx</span>: <span class="kw-2">&</span><span class="kw-2">mut</span> <span class="self">Self</span><span class="ident">::Context</span>) {
|
||||
<span class="kw">match</span> <span class="ident">msg</span> {
|
||||
<span class="prelude-val">Err</span>(<span class="ident">e</span>) <span class="op">=</span><span class="op">></span> {
|
||||
<span class="kw">if</span> <span class="kw">let</span> <span class="prelude-val">Some</span>(<span class="ident">tx</span>) <span class="op">=</span> <span class="self">self</span>.<span class="ident">queue</span>.<span class="ident">pop_front</span>() {
|
||||
@@ -272,16 +272,16 @@
|
||||
<span class="kw">impl</span> <span class="ident">Handler</span><span class="op"><</span><span class="ident">Command</span><span class="op">></span> <span class="kw">for</span> <span class="ident">RedisActor</span> {
|
||||
<span class="kw">type</span> <span class="prelude-ty">Result</span> <span class="op">=</span> <span class="ident">ResponseFuture</span><span class="op"><</span><span class="prelude-ty">Result</span><span class="op"><</span><span class="ident">RespValue</span>, <span class="ident">Error</span><span class="op">></span><span class="op">></span>;
|
||||
|
||||
<span class="kw">fn</span> <span class="ident">handle</span>(<span class="kw-2">&</span><span class="kw-2">mut</span> <span class="self">self</span>, <span class="ident">msg</span>: <span class="ident">Command</span>, <span class="kw">_</span>: <span class="kw-2">&</span><span class="kw-2">mut</span> <span class="self">Self</span>::<span class="ident">Context</span>) <span class="op">-</span><span class="op">></span> <span class="self">Self</span>::<span class="prelude-ty">Result</span> {
|
||||
<span class="kw">let</span> (<span class="ident">tx</span>, <span class="ident">rx</span>) <span class="op">=</span> <span class="ident">oneshot</span>::<span class="ident">channel</span>();
|
||||
<span class="kw">fn</span> <span class="ident">handle</span>(<span class="kw-2">&</span><span class="kw-2">mut</span> <span class="self">self</span>, <span class="ident">msg</span>: <span class="ident">Command</span>, <span class="kw">_</span>: <span class="kw-2">&</span><span class="kw-2">mut</span> <span class="self">Self</span><span class="ident">::Context</span>) <span class="op">-</span><span class="op">></span> <span class="self">Self</span><span class="ident">::Result</span> {
|
||||
<span class="kw">let</span> (<span class="ident">tx</span>, <span class="ident">rx</span>) <span class="op">=</span> <span class="ident">oneshot::channel</span>();
|
||||
<span class="kw">if</span> <span class="kw">let</span> <span class="prelude-val">Some</span>(<span class="kw-2">ref</span> <span class="kw-2">mut</span> <span class="ident">cell</span>) <span class="op">=</span> <span class="self">self</span>.<span class="ident">cell</span> {
|
||||
<span class="self">self</span>.<span class="ident">queue</span>.<span class="ident">push_back</span>(<span class="ident">tx</span>);
|
||||
<span class="ident">cell</span>.<span class="ident">write</span>(<span class="ident">msg</span>.<span class="number">0</span>);
|
||||
} <span class="kw">else</span> {
|
||||
<span class="kw">let</span> <span class="kw">_</span> <span class="op">=</span> <span class="ident">tx</span>.<span class="ident">send</span>(<span class="prelude-val">Err</span>(<span class="ident">Error</span>::<span class="ident">NotConnected</span>));
|
||||
<span class="kw">let</span> <span class="kw">_</span> <span class="op">=</span> <span class="ident">tx</span>.<span class="ident">send</span>(<span class="prelude-val">Err</span>(<span class="ident">Error::NotConnected</span>));
|
||||
}
|
||||
|
||||
<span class="ident">Box</span>::<span class="ident">pin</span>(<span class="kw">async</span> <span class="kw">move</span> { <span class="ident">rx</span>.<span class="kw">await</span>.<span class="ident">map_err</span>(<span class="op">|</span><span class="kw">_</span><span class="op">|</span> <span class="ident">Error</span>::<span class="ident">Disconnected</span>)<span class="question-mark">?</span> })
|
||||
<span class="ident">Box::pin</span>(<span class="kw">async</span> <span class="kw">move</span> { <span class="ident">rx</span>.<span class="kw">await</span>.<span class="ident">map_err</span>(<span class="op">|</span><span class="kw">_</span><span class="op">|</span> <span class="ident">Error::Disconnected</span>)<span class="question-mark">?</span> })
|
||||
}
|
||||
}
|
||||
</pre></div>
|
||||
|
Reference in New Issue
Block a user