{ "cells": [
  { "cell_type": "markdown", "metadata": { "collapsed": false },
    "source": [
      "# Dask in Python 3 (Ubuntu Linux)\n",
      "\n",
      "https://docs.dask.org/en/latest/\n",
      "\n",
      "**[CoCalc Wiki instructions](https://github.com/sagemathinc/cocalc/wiki/Dask)**"
    ] },
  { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": false },
    "outputs": [ { "data": { "text/plain": [ "'1.0.0'" ] }, "execution_count": 1, "metadata": { }, "output_type": "execute_result" } ],
    "source": [
      "import os\n",
      "\n",
      "import dask\n",
      "import dask.distributed\n",
      "\n",
      "dask.__version__"
    ] },
  { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": false },
    "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 2, "metadata": { }, "output_type": "execute_result" } ],
    "source": [
      "# Keys are dotted paths into the nested config. The scheduler options sit\n",
      "# under 'distributed.scheduler'; a bare 'scheduler.*' key only creates an\n",
      "# unrelated top-level 'scheduler' entry.\n",
      "dask.config.set({\n",
      "    'temporary-directory': os.path.expanduser('~/tmp'),\n",
      "    'distributed.scheduler.work-stealing': True\n",
      "})"
    ] },
  { "cell_type": "code", "execution_count": 3, "metadata": { "collapsed": false },
    "outputs": [ { "data": { "text/plain": [
      "{'logging': {'distributed': 'info',\n",
      " 'distributed.client': 'warning',\n",
      " 'bokeh': 'critical',\n",
      " 'tornado': 'critical',\n",
      " 'tornado.application': 'error'},\n",
      " 'require-encryption': False,\n",
      " 'client-heartbeat-interval': 5000,\n",
      " 'distributed': {'version': 2,\n",
      " 'scheduler': {'allowed-failures': 3,\n",
      " 'bandwidth': 100000000,\n",
      " 'default-data-size': 1000,\n",
      " 'transition-log-length': 100000,\n",
      " 'work-stealing': True,\n",
      " 'worker-ttl': None,\n",
      " 'preload': [],\n",
      " 'preload-argv': []},\n",
      " 'worker': {'multiprocessing-method': 'forkserver',\n",
      " 'use-file-locking': True,\n",
      " 'connections': {'outgoing': 50, 'incoming': 10},\n",
      " 'preload': [],\n",
      " 'preload-argv': [],\n",
      " 'profile': {'interval': 10, 'cycle': 1000},\n",
      " 'memory': {'target': 0.6, 'spill': 0.7, 'pause': 0.8, 'terminate': 0.95}},\n",
      " 'client': {'heartbeat': '5s'},\n",
      " 'comm': {'compression': 'auto',\n",
      " 'default-scheme': 'tcp',\n",
      " 'socket-backlog': 2048,\n",
      " 'recent-messages-log-length': 0,\n",
      " 'timeouts': {'connect': 3, 'tcp': 30}},\n",
      " 'dashboard': {'link': 'http://{host}:{port}/status', 'export-tool': False},\n",
      " 'admin': {'tick': {'interval': 20, 'limit': 1000},\n",
      " 'log-length': 10000,\n",
      " 'log-format': '%(name)s - %(levelname)s - %(message)s',\n",
      " 'pdb-on-err': False}},\n",
      " 'temporary-directory': '/home/user/tmp'}"
    ] }, "execution_count": 3, "metadata": { }, "output_type": "execute_result" } ],
    "source": [
      "dask.config.config"
    ] },
  { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": false, "scrolled": true },
    "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "
\n", "

Client

\n", "\n", "
\n", "

Cluster

\n", "
    \n", "
  • Workers: 2
  • \n", "
  • Cores: 2
  • \n", "
  • Memory: 512.00 MB
  • \n", "
\n", "
" ] }, "execution_count": 4, "metadata": { }, "output_type": "execute_result" } ],
    "source": [
      "from dask.distributed import Client\n",
      "client = Client('127.0.0.1:8786')\n",
      "client"
    ] },
  { "cell_type": "markdown", "metadata": { "collapsed": false },
    "source": [
      "On CoCalc, the dashboard is actually served at `https://cocalc.com/{{ THE PROJECT UUID }}/server/8787/status`.\n",
      "\n",
      "**Note:** websocket forwarding does not work through that URL, though.\n",
      "\n",
      "Alternatively, start an X11 desktop in CoCalc and open http://127.0.0.1:8787/status in `google-chrome`."
    ] },
  { "cell_type": "markdown", "metadata": { "collapsed": false },
    "source": [
      "### Dask arrays (similar to NumPy arrays)"
    ] },
  { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": false },
    "outputs": [ ],
    "source": [
      "import dask.array as da"
    ] },
  { "cell_type": "code", "execution_count": 6, "metadata": { "collapsed": false },
    "outputs": [ { "data": { "text/plain": [ "dask.array" ] }, "execution_count": 6, "metadata": { }, "output_type": "execute_result" } ],
    "source": [
      "x = da.random.random((3000, 3000), chunks=(300, 300))\n",
      "x"
    ] },
  { "cell_type": "code", "execution_count": 7, "metadata": { "collapsed": false },
    "outputs": [ { "data": { "text/plain": [ "dask.array" ] }, "execution_count": 7, "metadata": { }, "output_type": "execute_result" } ],
    "source": [
      "y = x + x.T\n",
      "z = y[::50, 1000:].mean(axis=1)\n",
      "z"
    ] },
  { "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": false },
    "outputs": [ { "data": { "text/plain": [ "(60,)" ] }, "execution_count": 8, "metadata": { }, "output_type": "execute_result" } ],
    "source": [
      "z.shape"
    ] },
  { "cell_type": "code", "execution_count": 9, "metadata": { "collapsed": false },
    "outputs": [ { "data": { "text/plain": [
      "array([1.00894334, 0.98686389, 1.00412201, 1.00562101, 0.9984469 ,\n",
      " 1.00773417, 1.0017398 , 0.99675565, 0.99054068, 0.98704365,\n",
      " 0.99406761, 1.01857855, 1.00252324, 1.00853879, 0.98123995,\n",
      " 0.99701221, 0.99059657, 0.99144697, 
1.00628833, 0.9947498 ,\n",
      " 0.99845961, 1.01338604, 1.00216002, 0.99366811, 0.99538552,\n",
      " 1.03424335, 1.00016978, 0.99584787, 0.99663896, 1.0028844 ,\n",
      " 0.99098811, 0.9955956 , 0.99308403, 1.01371317, 1.0229804 ,\n",
      " 0.99350163, 1.00235196, 1.00826049, 0.9949777 , 0.98840276,\n",
      " 1.00248748, 1.01109067, 1.01085646, 1.01437783, 1.00604058,\n",
      " 0.99871721, 1.00370476, 0.99210787, 1.00258028, 0.99557382,\n",
      " 1.0012086 , 0.99908451, 0.98371352, 1.00652402, 0.9849942 ,\n",
      " 1.0135658 , 0.98824092, 0.99474543, 1.00656597, 0.9921391 ])"
    ] }, "execution_count": 9, "metadata": { }, "output_type": "execute_result" } ],
    "source": [
      "# z is still lazy at this point; compute() evaluates it.\n",
      "out = z.compute()\n",
      "out"
    ] },
  { "cell_type": "code", "execution_count": 10, "metadata": { "collapsed": false },
    "outputs": [ { "data": { "text/plain": [ "1.0002978610915068" ] }, "execution_count": 10, "metadata": { }, "output_type": "execute_result" } ],
    "source": [
      "# Mean of (at most) the first 100 entries, built lazily and evaluated once.\n",
      "head = z[:100]\n",
      "(head.sum() / head.shape[0]).compute()"
    ] },
  { "cell_type": "markdown", "metadata": { "collapsed": false },
    "source": [
      "### functions and native lists"
    ] },
  { "cell_type": "code", "execution_count": 11, "metadata": { "collapsed": false },
    "outputs": [ { "data": { "text/plain": [ "-1750" ] }, "execution_count": 11, "metadata": { }, "output_type": "execute_result" } ],
    "source": [
      "def fn(x):\n",
      "    \"\"\"Evaluate the polynomial x**3 - x**2 + 1.\"\"\"\n",
      "    return x**3 - x**2 + 1\n",
      "\n",
      "\n",
      "def neg(a):\n",
      "    \"\"\"Return -a after sleeping for 0.1 seconds.\"\"\"\n",
      "    import time\n",
      "    time.sleep(.1)\n",
      "    return -a\n",
      "\n",
      "\n",
      "A = client.map(fn, range(10))\n",
      "B = client.map(neg, A)\n",
      "# Pairs each element of A with its negation in B, so every sum is 0.\n",
      "C = client.map(lambda a, b: a+b, A, B)\n",
      "total = client.submit(sum, B)\n",
      "total.result()"
    ] },
  { "cell_type": "code", "execution_count": 12, "metadata": { "collapsed": false },
    "outputs": [ { "data": { "text/plain": [ "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]" ] }, "execution_count": 12, "metadata": { }, "output_type": "execute_result" } ],
    "source": [
      "client.gather(C)"
    ] },
  { "cell_type": "markdown", "metadata": { "collapsed": false },
    "source": [
      "### loops?"
] },
  { "cell_type": "code", "execution_count": 13, "metadata": { "collapsed": false },
    "outputs": [ ],
    "source": [
      "def fib(x):\n",
      "    \"\"\"Recursive Fibonacci; returns 1 for any x <= 1 (so fib(0) == fib(1) == 1).\"\"\"\n",
      "    if x <= 1:\n",
      "        return 1\n",
      "    return fib(x - 2) + fib(x - 1)"
    ] },
  { "cell_type": "code", "execution_count": 14, "metadata": { "collapsed": false, "scrolled": true },
    "outputs": [ { "data": { "text/plain": [ "[1, 1, 2, 3, 5, 8, 13, 21, 34, 55]" ] }, "execution_count": 14, "metadata": { }, "output_type": "execute_result" } ],
    "source": [
      "[fib(n) for n in range(10)]"
    ] },
  { "cell_type": "markdown", "metadata": { "collapsed": false },
    "source": [
      "## Dask Bags"
    ] },
  { "cell_type": "code", "execution_count": 15, "metadata": { "collapsed": false },
    "outputs": [ { "data": { "text/plain": [ "dask.bag" ] }, "execution_count": 15, "metadata": { }, "output_type": "execute_result" } ],
    "source": [
      "import dask.bag as db\n",
      "b1 = db.from_sequence(range(-1000, 1000), npartitions=50)\n",
      "b1"
    ] },
  { "cell_type": "code", "execution_count": 16, "metadata": { "collapsed": false },
    "outputs": [ ],
    "source": [
      "import operator"
    ] },
  { "cell_type": "code", "execution_count": 17, "metadata": { "collapsed": false, "scrolled": true },
    "outputs": [ { "data": { "text/plain": [ "[(True, -1000), (False, 0)]" ] }, "execution_count": 17, "metadata": { }, "output_type": "execute_result" } ],
    "source": [
      "def is_even(x):\n",
      "    \"\"\"Grouping key: True when x is divisible by 2.\"\"\"\n",
      "    return x % 2 == 0\n",
      "\n",
      "\n",
      "# In the result, the True key holds the sum of the even numbers and the\n",
      "# False key the sum of the odd numbers.\n",
      "#b1.groupby(is_even).map(lambda k_v : (k_v[0], sum(k_v[1]))).compute()\n",
      "b1.foldby(is_even, operator.add, 0).compute()"
    ] },
  { "cell_type": "markdown", "metadata": { "collapsed": false },
    "source": [
      "## Dask Delayed"
    ] },
  { "cell_type": "code", "execution_count": 18, "metadata": { "collapsed": false },
    "outputs": [ ],
    "source": [
      "from dask import delayed"
    ] },
  { "cell_type": "code", "execution_count": 19, "metadata": { "collapsed": false },
    "outputs": [ ],
    "source": [
      "from operator import add\n",
      "\n",
      "\n",
      "def inc(x):\n",
      "    \"\"\"Return x + 1.\"\"\"\n",
      "    return x+1"
    ] },
  { "cell_type": "code", "execution_count": 20, "metadata": { "collapsed": false
},
    "outputs": [ { "data": { "text/plain": [ "30" ] }, "execution_count": 20, "metadata": { }, "output_type": "execute_result" } ],
    "source": [
      "# Build one lazy task graph across all iterations; nothing runs until compute().\n",
      "z = delayed(0)\n",
      "for i in range(5):\n",
      "    x = delayed(inc)(i)\n",
      "    y = delayed(inc)(delayed(add)(i, x))\n",
      "    z = delayed(add)(z, y)\n",
      "z.compute()"
    ] },
  { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false },
    "outputs": [ ],
    "source": [
      "# The method is spelled `visualize`. A misspelt attribute on a Delayed does\n",
      "# not raise: it just builds another lazy object and draws nothing.\n",
      "z.visualize(filename='dask-delayed-1.svg')"
    ] },
  { "cell_type": "markdown", "metadata": { "collapsed": false },
    "source": [
      "## Dask ML\n",
      "\n",
      "https://dask-ml.readthedocs.io/en/latest/"
    ] },
  { "cell_type": "code", "execution_count": 22, "metadata": { "collapsed": false },
    "outputs": [ ],
    "source": [
      "import dask_ml"
    ] },
  { "cell_type": "code", "execution_count": 23, "metadata": { "collapsed": false },
    "outputs": [ ],
    "source": [
      "from dask_ml.preprocessing import Categorizer, DummyEncoder\n",
      "from dask_ml.linear_model import LogisticRegression"
    ] },
  { "cell_type": "code", "execution_count": 24, "metadata": { "collapsed": false },
    "outputs": [ { "data": { "text/plain": [
      "LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True,\n",
      " intercept_scaling=1.0, max_iter=100, multiclass='ovr', n_jobs=1,\n",
      " penalty='l2', random_state=None, solver='admm',\n",
      " solver_kwargs=None, tol=0.0001, verbose=0, warm_start=False)"
    ] }, "execution_count": 24, "metadata": { }, "output_type": "execute_result" } ],
    "source": [
      "lr = LogisticRegression()\n",
      "lr"
    ] },
  { "cell_type": "code", "execution_count": 25, "metadata": { "collapsed": false },
    "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n" ] } ],
    "source": [
      "# Prefer the standalone joblib package; fall back to the copy that older\n",
      "# scikit-learn releases exposed as sklearn.externals.joblib.\n",
      "try:\n",
      "    from joblib import parallel_backend\n",
      "except ImportError:\n",
      "    from sklearn.externals.joblib import parallel_backend\n",
      "\n",
      "with parallel_backend('dask') as pb:\n",
      "    print(pb[0])"
    ] },
  { "cell_type": "code",
"execution_count": null, "metadata": { "collapsed": false },
    "outputs": [ ],
    "source": [ ] }
 ],
 "metadata": {
  "kernelspec": { "display_name": "Python 3 (Ubuntu Linux)", "language": "python", "name": "python3" },
  "language_info": {
   "codemirror_mode": { "name": "ipython", "version": 3 },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0 }