{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"\n",
"![Dask Icon](images/dask_horizontal_black.gif \"Dask Icon\")\n",
"![Pandas Icon](images/pandas_logo.png \"Pandas Icon\")\n",
"\n",
"# Gotchas from Pandas to Dask\n",
"\n",
"https://github.com/sephib/dask_pyconil2019"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"This notebook highlights some key differences when transferring code from `Pandas` to run in a `Dask` environment. \n",
"Most issues have a link to the [Dask documentation](https://docs.dask.org/en/latest/) for additional information."
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"# Agenda \n",
"1. Intro to `Dask` framework\n",
"2. Basic setup of the `Client`\n",
"3. Dask.dataframe\n",
"4. Data manipulation\n",
"5. Read/Write files\n",
"6. Advanced `groupby`\n",
"7. Debugging\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"![Dask Icon](images/dask_horizontal_black.gif \"Dask Icon\")\n",
"\n",
"Dask is a flexible library for parallel computing in Python.\n",
"\n",
"![Dask Framework](images/dask_graph_outline.jpg)\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"\n",
"Dask is composed of two parts:\n",
"\n",
"1. *Dynamic task scheduling* optimized for computation. This is similar to Airflow, Luigi, Celery, or Make, but optimized for interactive computational workloads.\n",
"2. *“Big Data” collections* like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments. These parallel collections run on top of dynamic task schedulers.\n",
"\n",
"[link to documentation](https://docs.dask.org/en/latest/)\n",
"\n",
"Dask emphasizes the following virtues:\n",
"\n",
"* Familiar: Provides parallelized NumPy array and Pandas DataFrame objects\n",
"* Flexible: Provides a task scheduling interface for more custom workloads and integration with other projects.\n",
"* Native: Enables distributed computing in pure Python with access to the PyData stack.\n",
"* Fast: Operates with low overhead, low latency, and minimal serialization necessary for fast numerical algorithms\n",
"* Scales up: Runs resiliently on clusters with 1000s of cores\n",
"* Scales down: Trivial to set up and run on a laptop in a single process\n",
"* Responsive: Designed with interactive computing in mind, it provides rapid feedback and diagnostics to aid humans\n",
"\n",
"\n",
"See the [dask.distributed documentation (separate website)](https://distributed.dask.org/en/latest/) for more technical information on Dask’s distributed scheduler."
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"# [Why Dask?](https://docs.dask.org/en/latest/why.html)\n",
"* Scales from a single computer out to clusters\n",
"* Familiar API\n",
"* Responsive feedback (live dashboard)\n",
"* ..."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Dask version: 1.2.2\n",
"Pandas version: 0.24.2\n"
]
}
],
"source": [
"# Since Dask is actively being developed, this example was run with the versions below\n",
"import dask\n",
"import dask.dataframe as dd\n",
"import pandas as pd\n",
"print(f'Dask version: {dask.__version__}')\n",
"print(f'Pandas version: {pd.__version__}')"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"## Dask `Distributed` scheduler "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```python \n",
"import dask.dataframe as dd \n",
"from dask.distributed import Client \n",
"client = Client()\n",
"df = dd.read_csv(...) # do something\n",
"``` \n",
"vs \n",
"\# When running code within a script, use a `context manager` \n",
"```python \n",
"if __name__ == '__main__':\n",
" with Client() as client:\n",
" df = dd.read_csv(...) # do something\n",
"```\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"* See this answer on [Stack Overflow](https://stackoverflow.com/a/53520917/5817977) \n",
"* To get the dashboard URL, use an [inner function](https://github.com/dask/distributed/issues/2083#issue-337057906) \n"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"## Start Dask Client for Dashboard\n",
"![Dask Dashboard](images/dask_dashboard.png)\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"slideshow": {
"slide_type": "-"
}
},
"outputs": [
{
"data": {
"text/plain": [
"Client  Cluster: Workers: 4, Cores: 8, Memory: 67.44 GB"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from dask.distributed import Client \n",
"# client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')\n",
"client = Client()\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"Starting the Dask Client is optional. In this example we are running on a `LocalCluster`, which also provides a dashboard that is useful for gaining insight into the computation. \n",
"For additional information on [Dask Client see documentation](https://docs.dask.org/en/latest/setup.html?highlight=client#setup) \n",
"\n",
"The link to the dashboard becomes visible when you create a client (as shown above). \n",
"When running in `Jupyter Lab`, an [extension](https://github.com/dask/dask-labextension) can be installed to view the various dashboard widgets. "
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"See the [documentation for additional cluster configuration](http://distributed.dask.org/en/latest/local-cluster.html)"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"# Create 2 DataFrames for comparison: \n",
"* The `Dask` framework is **lazy** ![lazy python](images/Sleeping-snake.jpg)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Dask DataFrame Structure:\n",
" id name x y\n",
"npartitions=30 \n",
"2000-01-01 int64 object float64 float64\n",
"2000-01-02 ... ... ... ...\n",
"... ... ... ... ...\n",
"2000-01-30 ... ... ... ...\n",
"2000-01-31 ... ... ... ...\n",
"Dask Name: make-timeseries, 30 tasks"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf = dask.datasets.timeseries() # Dask comes with built-in sample datasets; we will use this one for our example. \n",
"ddf"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"In order to see the result we need to run [compute()](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.compute) \n",
" (or `head()`, which runs `compute()` under the hood)"
]
},
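{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"A small self-contained sketch of the difference (using a toy dataframe rather than the `timeseries` sample): \n",
"```python\n",
"import pandas as pd\n",
"import dask.dataframe as dd\n",
"\n",
"small_pdf = pd.DataFrame({'x': range(10)})\n",
"small_ddf = dd.from_pandas(small_pdf, npartitions=2)\n",
"\n",
"print(len(small_ddf.head(3)))    # 3 - head() only needs to read the first partition(s)\n",
"print(len(small_ddf.compute()))  # 10 - compute() materializes the full dataframe\n",
"```"
]
},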
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"slideshow": {
"slide_type": "-"
}
},
"outputs": [
{
"data": {
"text/plain": [
" id name x y\n",
"timestamp \n",
"2000-01-01 00:00:00 960 Oliver -0.745248 -0.198965\n",
"2000-01-01 00:00:01 1000 Dan 0.184641 0.295430\n",
"2000-01-01 00:00:02 1020 Norbert -0.641957 -0.458981\n",
"2000-01-01 00:00:03 1015 Dan -0.631978 0.454573\n",
"2000-01-01 00:00:04 998 Edith 0.508003 0.076637\n",
"2000-01-01 00:00:05 1025 Oliver 0.633132 -0.968848\n",
"2000-01-01 00:00:06 978 Ursula -0.588642 -0.032841\n",
"2000-01-01 00:00:07 982 Charlie 0.119313 -0.518422\n",
"2000-01-01 00:00:08 1043 Victor -0.895856 -0.954497\n",
"2000-01-01 00:00:09 1020 Wendy 0.436936 -0.312972\n",
"2000-01-01 00:00:10 988 Michael 0.286573 -0.615041\n",
"2000-01-01 00:00:11 1036 Sarah -0.225445 0.062726\n",
"2000-01-01 00:00:12 983 Ray -0.087305 0.392608\n",
"2000-01-01 00:00:13 992 Ursula -0.268082 -0.700998\n",
"2000-01-01 00:00:14 1015 Zelda -0.541838 0.364939\n",
"2000-01-01 00:00:15 1017 Michael -0.373376 0.504668\n",
"2000-01-01 00:00:16 1022 Hannah 0.327997 -0.289495\n",
"2000-01-01 00:00:17 1054 Victor -0.583977 -0.654631\n",
"2000-01-01 00:00:18 967 Frank 0.199236 -0.657553\n",
"2000-01-01 00:00:19 1022 Ray -0.509110 0.923490\n",
"2000-01-01 00:00:20 960 Charlie -0.003294 0.038744\n",
"2000-01-01 00:00:21 980 Charlie -0.198883 0.541580\n",
"2000-01-01 00:00:22 1052 Oliver -0.862566 -0.976609\n",
"2000-01-01 00:00:23 936 Tim -0.970641 0.077440\n",
"2000-01-01 00:00:24 997 Edith 0.647717 0.591489\n",
"2000-01-01 00:00:25 1063 Ray -0.650042 0.499804\n",
"2000-01-01 00:00:26 1060 Tim -0.932838 0.016557\n",
"2000-01-01 00:00:27 1082 Victor 0.604015 -0.646004\n",
"2000-01-01 00:00:28 1019 Bob -0.000948 0.933974\n",
"2000-01-01 00:00:29 979 Alice -0.255095 0.991901\n",
"... ... ... ... ...\n",
"2000-01-30 23:59:30 959 Jerry -0.382988 -0.685075\n",
"2000-01-30 23:59:31 1015 Kevin -0.140484 -0.162593\n",
"2000-01-30 23:59:32 992 Tim -0.731128 -0.814783\n",
"2000-01-30 23:59:33 1005 Alice -0.270136 0.132457\n",
"2000-01-30 23:59:34 1002 Quinn 0.537519 -0.086152\n",
"2000-01-30 23:59:35 994 Ray 0.963644 -0.561226\n",
"2000-01-30 23:59:36 1013 Jerry 0.108260 -0.756177\n",
"2000-01-30 23:59:37 1001 Edith 0.201864 -0.345191\n",
"2000-01-30 23:59:38 969 Kevin -0.392416 0.482699\n",
"2000-01-30 23:59:39 1006 Yvonne 0.858664 -0.864165\n",
"2000-01-30 23:59:40 998 Oliver -0.034046 0.942656\n",
"2000-01-30 23:59:41 1019 Charlie -0.189008 -0.265194\n",
"2000-01-30 23:59:42 1005 Oliver -0.297703 0.819902\n",
"2000-01-30 23:59:43 1006 Ray -0.526987 -0.908261\n",
"2000-01-30 23:59:44 1025 Yvonne 0.974756 0.283277\n",
"2000-01-30 23:59:45 978 Jerry -0.880942 0.006286\n",
"2000-01-30 23:59:46 961 Ray 0.719504 0.253418\n",
"2000-01-30 23:59:47 1007 Charlie -0.050834 0.695579\n",
"2000-01-30 23:59:48 1027 Hannah -0.059038 0.104054\n",
"2000-01-30 23:59:49 927 Ingrid -0.362784 0.393166\n",
"2000-01-30 23:59:50 1027 Xavier -0.743345 -0.854612\n",
"2000-01-30 23:59:51 1032 Alice 0.554830 -0.668990\n",
"2000-01-30 23:59:52 1027 Zelda 0.412367 0.055441\n",
"2000-01-30 23:59:53 999 Victor -0.326740 -0.423406\n",
"2000-01-30 23:59:54 1037 Sarah 0.168248 -0.136430\n",
"2000-01-30 23:59:55 961 Xavier 0.232512 0.087668\n",
"2000-01-30 23:59:56 986 Quinn -0.667248 0.504217\n",
"2000-01-30 23:59:57 1005 Oliver 0.077266 -0.008312\n",
"2000-01-30 23:59:58 969 Norbert 0.353910 0.975120\n",
"2000-01-30 23:59:59 1006 Kevin -0.370201 0.251236\n",
"\n",
"[2592000 rows x 4 columns]"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"## Pandas\n",
"To create a `Pandas` dataframe from the `Dask` dataframe, we can use the `compute()` method. "
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"data": {
"text/plain": [
" id name x y\n",
"timestamp \n",
"2000-01-01 00:00:00 960 Oliver -0.745248 -0.198965\n",
"2000-01-01 00:00:01 1000 Dan 0.184641 0.295430\n",
"2000-01-01 00:00:02 1020 Norbert -0.641957 -0.458981\n",
"2000-01-01 00:00:03 1015 Dan -0.631978 0.454573\n",
"2000-01-01 00:00:04 998 Edith 0.508003 0.076637"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"pdf = ddf.compute() \n",
"print(type(pdf))\n",
"pdf.head()"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"## Creating a `Dask dataframe` from `Pandas`"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"To utilize `Dask` capabilities on an existing `Pandas dataframe` (pdf) we need to convert the `Pandas dataframe` into a `Dask dataframe` (ddf) with the [from_pandas](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.from_pandas) method. \n",
"You must supply either the number of partitions (`npartitions`) or a `chunksize` that will be used to generate the dask dataframe."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n"
]
},
{
"data": {
"text/plain": [
" id name x y\n",
"timestamp \n",
"2000-01-01 00:00:00 960 Oliver -0.745248 -0.198965\n",
"2000-01-01 00:00:01 1000 Dan 0.184641 0.295430\n",
"2000-01-01 00:00:02 1020 Norbert -0.641957 -0.458981\n",
"2000-01-01 00:00:03 1015 Dan -0.631978 0.454573\n",
"2000-01-01 00:00:04 998 Edith 0.508003 0.076637"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf2 = dd.from_pandas(pdf, npartitions=10)\n",
"print(type(ddf2))\n",
"\n",
"ddf2.head() "
]
},
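{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"As a small self-contained sketch (toy data, not the sample above), the resulting partitioning can be inspected via the `npartitions` and `divisions` attributes: \n",
"```python\n",
"import pandas as pd\n",
"import dask.dataframe as dd\n",
"\n",
"toy_pdf = pd.DataFrame({'x': range(100)})\n",
"toy_ddf = dd.from_pandas(toy_pdf, npartitions=4)\n",
"\n",
"print(toy_ddf.npartitions)  # 4\n",
"print(toy_ddf.divisions)    # the index boundaries of the partitions\n",
"```"
]
},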
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"## Partitions in Dask Dataframes"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "notes"
}
},
"source": [
"Notice that when we created a `Dask dataframe` we needed to supply an `npartitions` argument. \n",
"The number of partitions tells `Dask` how to parallelize the computation. \n",
"Each partition is a *separate* dataframe. For additional information see the [partition documentation](https://docs.dask.org/en/latest/dataframe-design.html?highlight=meta%20utils#partitions) "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
"Using the `reset_index()` method we can examine the partitions: "
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"timestamp 2000-01-01 00:00:00\n",
"id 960\n",
"name Oliver\n",
"x -0.745248\n",
"y -0.198965\n",
"Name: 0, dtype: object"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"pdf2 = pdf.reset_index()\n",
"pdf2.loc[0] # Only 1 row"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"Now let's look at a `Dask` dataframe"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"slideshow": {
"slide_type": "fragment"
}
},
"outputs": [
{
"data": {
"text/plain": [
" timestamp id name x y\n",
"0 2000-01-01 960 Oliver -0.745248 -0.198965\n",
"0 2000-01-04 1051 Jerry -0.810596 0.703804\n",
"0 2000-01-07 1044 Sarah 0.622235 0.849257\n",
"0 2000-01-10 983 Jerry 0.472132 -0.323176\n",
"0 2000-01-13 1027 Ray 0.770773 0.441961\n",
"0 2000-01-16 1042 Norbert 0.611781 0.139298\n",
"0 2000-01-19 1043 Xavier 0.178683 -0.507595\n",
"0 2000-01-22 1004 Tim -0.080435 0.210997\n",
"0 2000-01-25 965 Wendy 0.724862 0.148501\n",
"0 2000-01-28 1014 Bob -0.929208 0.858530"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf2 = ddf2.reset_index() \n",
"ddf2.loc[0].compute() # each partition has an index=0"
]
},
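{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"A minimal sketch (toy data assumed) of counting the rows in each partition with `map_partitions`, which applies a function once per partition: \n",
"```python\n",
"import pandas as pd\n",
"import dask.dataframe as dd\n",
"\n",
"toy_pdf = pd.DataFrame({'x': range(10)})\n",
"toy_ddf = dd.from_pandas(toy_pdf, npartitions=3)\n",
"\n",
"# len() runs once per partition, returning the row count of each\n",
"sizes = toy_ddf.map_partitions(len).compute()\n",
"print(sizes.tolist())  # the counts sum to 10\n",
"```"
]
},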
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"## dataframe.shape \n",
"Since `Dask` is lazy, we cannot get the full shape without computing it (e.g. by running `len`)."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Pandas shape: (2592000, 4)\n",
"---------------------------\n",
"Dask lazy shape: (Delayed('int-fecc34b2-1c31-40bf-b068-1159d4ab4cbc'), 4)\n"
]
}
],
"source": [
"print(f'Pandas shape: {pdf.shape}')\n",
"print('---------------------------')\n",
"print(f'Dask lazy shape: {ddf.shape}') "
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Dask computed shape: 2,592,000\n"
]
}
],
"source": [
"print(f'Dask computed shape: {len(ddf.index):,}') # expensive"
]
},
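{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"An alternative sketch (toy data for self-containment): the first element of `shape` is a `Delayed` object, so it can also be materialized directly with `compute()`: \n",
"```python\n",
"import pandas as pd\n",
"import dask.dataframe as dd\n",
"\n",
"toy_pdf = pd.DataFrame({'x': range(10)})\n",
"toy_ddf = dd.from_pandas(toy_pdf, npartitions=2)\n",
"\n",
"nrows = toy_ddf.shape[0].compute()  # triggers the (expensive) row count\n",
"print(nrows)  # 10\n",
"```"
]
},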
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"Now that we have a `dask` (ddf) and a `pandas` (pdf) dataframe we can start to compare how we interact with them."
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"# Moving from Update to Insert/Delete\n",
"\n",
"\n",
"![inplaceTrue](images/inplace_true.png \"inplace_true\")\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"Dask does not update in place, so arguments such as `inplace=True` that exist in Pandas are not available. \n",
"For more details see [issue#653 on github](https://github.com/dask/dask/issues/653)"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"### Rename Columns"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Index(['id', 'name', 'x', 'y'], dtype='object')\n"
]
},
{
"data": {
"text/plain": [
"Index(['ID', 'name', 'x', 'y'], dtype='object')"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Pandas \n",
"print(pdf.columns)\n",
"pdf.rename(columns={'id':'ID'}, inplace=True)\n",
"pdf.columns"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"slideshow": {
"slide_type": "fragment"
}
},
"outputs": [],
"source": [
"# Dask - Error\n",
"# ddf.rename(columns={'id':'ID'}, inplace=True)\n",
"# ddf.columns\n",
"\n",
"'''\n",
"--------------------------------------------------------------------------- \n",
"TypeError Traceback (most recent call last) \n",
" in \n",
" 1 # Dask - Error \n",
"----> 2 ddf.rename(columns={'id':'ID'}, inplace=True) \n",
" 3 ddf.columns \n",
"TypeError: rename() got an unexpected keyword argument 'inplace' \n",
"'''"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"* Using `inplace=True` is *not* considered *best practice*. "
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Index(['id', 'name', 'x', 'y'], dtype='object')\n"
]
},
{
"data": {
"text/plain": [
"Index(['ID', 'name', 'x', 'y'], dtype='object')"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Dask or Pandas\n",
"print(ddf.columns)\n",
"ddf = ddf.rename(columns={'id':'ID'})\n",
"ddf.columns"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## Data manipulation "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### loc - Pandas"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
" ID name x y\n",
"timestamp \n",
"2000-01-01 00:00:04 998 Edith 0.508003 7.663685\n",
"2000-01-01 00:00:05 1025 Oliver 0.633132 -96.884843"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"mask_cond = (pdf['x']>0.5) & (pdf['x']<0.8)\n",
"pdf.loc[mask_cond, ['y']] = pdf['y']* 100\n",
"pdf[mask_cond].head(2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Error\n",
"# cond_dask = (ddf['x']>0.5) & (ddf['x']<0.8)\n",
"# ddf.loc[cond_dask, ['y']] = ddf['y']* 100\n",
"\n",
"'''\n",
"> TypeError Traceback (most recent call last) \n",
"> in \n",
"> 2 # Error \n",
"> ----> 3 ddf.loc[cond_dask, ['y']] = ddf['y']* 100 \n",
"> TypeError: '_LocIndexer' object does not support item assignment \n",
"'''"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"### Dask - use mask/where"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```python\n",
"# Pandas\n",
"mask_cond = (pdf['x']>0.5) & (pdf['x']<0.8)\n",
"pdf.loc[mask_cond, ['y']] = pdf['y']* 100\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
" ID name x y\n",
"timestamp \n",
"2000-01-01 00:00:04 998 Edith 0.508003 7.663685\n",
"2000-01-01 00:00:05 1025 Oliver 0.633132 -96.884843"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"mask_cond = (ddf['x']>0.5) & (ddf['x']<0.8)\n",
"\n",
"ddf['y'] = ddf['y'].mask(cond=mask_cond, other=ddf['y']* 100)\n",
"ddf[mask_cond].head(2)"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"[dask mask documentation](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.mask)"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## Meta argument\n",
"\n",
"> `meta` is the prescription of the names/types of the computation output \n",
"[see stack overflow answer](https://stackoverflow.com/questions/44432868/dask-dataframe-apply-meta)\n",
"\n",
"![crystal python](images/crystalBallsnake.png \"crystal snake\")\n",
"Since `Dask` creates a DAG for the computation it requires to understand what are the outputs of each calculation (see [meta documentation](https://docs.dask.org/en/latest/dataframe-design.html?highlight=meta%20utils#metadata))"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" ID | \n",
" name | \n",
" x | \n",
" y | \n",
" initials | \n",
"
\n",
" \n",
" timestamp | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
"
\n",
" \n",
" \n",
" \n",
" 2000-01-01 00:00:00 | \n",
" 960 | \n",
" Oliver | \n",
" -0.745248 | \n",
" -0.198965 | \n",
" Ol | \n",
"
\n",
" \n",
" 2000-01-01 00:00:01 | \n",
" 1000 | \n",
" Dan | \n",
" 0.184641 | \n",
" 0.295430 | \n",
" Da | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" ID name x y initials\n",
"timestamp \n",
"2000-01-01 00:00:00 960 Oliver -0.745248 -0.198965 Ol\n",
"2000-01-01 00:00:01 1000 Dan 0.184641 0.295430 Da"
]
},
"execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"pdf['initials'] = pdf['name'].apply(lambda x: x[0]+x[1])\n",
"pdf.head(2)"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {
"slideshow": {
"slide_type": "-"
}
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/home/ds/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/dask/dataframe/core.py:2345: UserWarning: \n",
"You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.\n",
"To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.\n",
" Before: .apply(func)\n",
" After: .apply(func, meta=('name', 'object'))\n",
"\n",
" warnings.warn(meta_warning(meta))\n"
]
},
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" ID | \n",
" name | \n",
" x | \n",
" y | \n",
" initials | \n",
"
\n",
" \n",
" timestamp | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
"
\n",
" \n",
" \n",
" \n",
" 2000-01-01 00:00:00 | \n",
" 960 | \n",
" Oliver | \n",
" -0.745248 | \n",
" -0.198965 | \n",
" Ol | \n",
"
\n",
" \n",
" 2000-01-01 00:00:01 | \n",
" 1000 | \n",
" Dan | \n",
" 0.184641 | \n",
" 0.295430 | \n",
" Da | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" ID name x y initials\n",
"timestamp \n",
"2000-01-01 00:00:00 960 Oliver -0.745248 -0.198965 Ol\n",
"2000-01-01 00:00:01 1000 Dan 0.184641 0.295430 Da"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1])\n",
"ddf.head(2)"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"#### Introducing meta argument"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" ID | \n",
" name | \n",
" x | \n",
" y | \n",
" initials | \n",
"
\n",
" \n",
" timestamp | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
"
\n",
" \n",
" \n",
" \n",
" 2000-01-01 00:00:00 | \n",
" 960 | \n",
" Oliver | \n",
" -0.745248 | \n",
" -0.198965 | \n",
" Ol | \n",
"
\n",
" \n",
" 2000-01-01 00:00:01 | \n",
" 1000 | \n",
" Dan | \n",
" 0.184641 | \n",
" 0.295430 | \n",
" Da | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" ID name x y initials\n",
"timestamp \n",
"2000-01-01 00:00:00 960 Oliver -0.745248 -0.198965 Ol\n",
"2000-01-01 00:00:01 1000 Dan 0.184641 0.295430 Da"
]
},
"execution_count": 21,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Describe the outcome type of the calculation\n",
"meta_cal = pd.Series(object, name='initials')\n",
"ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1]\n",
" , meta = meta_cal)\n",
"ddf.head(2)"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {
"slideshow": {
"slide_type": "-"
}
},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" ID | \n",
" name | \n",
" x | \n",
" y | \n",
" initials | \n",
" z | \n",
"
\n",
" \n",
" timestamp | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
"
\n",
" \n",
" \n",
" \n",
" 2000-01-01 00:00:00 | \n",
" 960 | \n",
" Oliver | \n",
" -0.745248 | \n",
" -0.198965 | \n",
" Ol | \n",
" 0.198965 | \n",
"
\n",
" \n",
" 2000-01-01 00:00:01 | \n",
" 1000 | \n",
" Dan | \n",
" 0.184641 | \n",
" 0.295430 | \n",
" Da | \n",
" 184.640930 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" ID name x y initials z\n",
"timestamp \n",
"2000-01-01 00:00:00 960 Oliver -0.745248 -0.198965 Ol 0.198965\n",
"2000-01-01 00:00:01 1000 Dan 0.184641 0.295430 Da 184.640930"
]
},
"execution_count": 22,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def func(row, col1, col2):\n",
" if (row[col1]> 0): return row[col1] * 1000 \n",
" else: return row[col2] * -1\n",
"ddf['z'] = ddf.apply(func, args=('x', 'y'), axis=1\n",
" , meta=('z', 'float'))\n",
"ddf.head(2)"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"### Map partitions\n",
"* We can supply an ad-hoc function to run on each partition using the [map_partitions](https://dask.readthedocs.io/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_partitions) method. \n",
"Mainly useful for functions that are not implemented in `Dask` or `Pandas` . \n",
"* Finally we can return a new `dataframe` which needs to be described in the `meta` argument \n",
"The function could also include arguments."
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" ID | \n",
" name | \n",
" x | \n",
" y | \n",
" dist | \n",
"
\n",
" \n",
" timestamp | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
"
\n",
" \n",
" \n",
" \n",
" 2000-01-01 00:00:00 | \n",
" 960 | \n",
" Oliver | \n",
" -0.745248 | \n",
" -0.198965 | \n",
" NaN | \n",
"
\n",
" \n",
" 2000-01-01 00:00:01 | \n",
" 1000 | \n",
" Dan | \n",
" 0.184641 | \n",
" 0.295430 | \n",
" 1.053148 | \n",
"
\n",
" \n",
" 2000-01-01 00:00:02 | \n",
" 1020 | \n",
" Norbert | \n",
" -0.641957 | \n",
" -0.458981 | \n",
" 1.119107 | \n",
"
\n",
" \n",
" 2000-01-01 00:00:03 | \n",
" 1015 | \n",
" Dan | \n",
" -0.631978 | \n",
" 0.454573 | \n",
" 0.913609 | \n",
"
\n",
" \n",
" 2000-01-01 00:00:04 | \n",
" 998 | \n",
" Edith | \n",
" 0.508003 | \n",
" 7.663685 | \n",
" 7.298689 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" ID name x y dist\n",
"timestamp \n",
"2000-01-01 00:00:00 960 Oliver -0.745248 -0.198965 NaN\n",
"2000-01-01 00:00:01 1000 Dan 0.184641 0.295430 1.053148\n",
"2000-01-01 00:00:02 1020 Norbert -0.641957 -0.458981 1.119107\n",
"2000-01-01 00:00:03 1015 Dan -0.631978 0.454573 0.913609\n",
"2000-01-01 00:00:04 998 Edith 0.508003 7.663685 7.298689"
]
},
"execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import numpy as np\n",
"def func2(df, coor_x, coor_y, drop_cols):\n",
" df['dist'] = np.sqrt ( (df[coor_x] - df[coor_x].shift())**2 \n",
" + (df[coor_y] - df[coor_y].shift())**2 )\n",
" df = df.drop(drop_cols, axis=1)\n",
" return df\n",
"\n",
"ddf2 = ddf.map_partitions(func2\n",
" , coor_x='x'\n",
" , coor_y='y'\n",
" , drop_cols=['initials', 'z']\n",
" , meta=pd.DataFrame({'ID':'i8'\n",
" , 'name':str\n",
" , 'x':'f8'\n",
" , 'y':'f8' \n",
" , 'dist':'f8'}, index=[0]))\n",
"ddf2.head()"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"### Convert index into DateTime column"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" ID | \n",
" name | \n",
" x | \n",
" y | \n",
" initials | \n",
" times | \n",
"
\n",
" \n",
" timestamp | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
"
\n",
" \n",
" \n",
" \n",
" 2000-01-01 00:00:00 | \n",
" 960 | \n",
" Oliver | \n",
" -0.745248 | \n",
" -0.198965 | \n",
" Ol | \n",
" 00:00:00 | \n",
"
\n",
" \n",
" 2000-01-01 00:00:01 | \n",
" 1000 | \n",
" Dan | \n",
" 0.184641 | \n",
" 0.295430 | \n",
" Da | \n",
" 00:00:01 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" ID name x y initials times\n",
"timestamp \n",
"2000-01-01 00:00:00 960 Oliver -0.745248 -0.198965 Ol 00:00:00\n",
"2000-01-01 00:00:01 1000 Dan 0.184641 0.295430 Da 00:00:01"
]
},
"execution_count": 24,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Only Pandas\n",
"pdf = pdf.assign(times=pd.to_datetime(pdf.index).time)\n",
"pdf.head(2)"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {
"slideshow": {
"slide_type": "fragment"
}
},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" ID | \n",
" name | \n",
" x | \n",
" y | \n",
" initials | \n",
" z | \n",
" times | \n",
"
\n",
" \n",
" timestamp | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
" | \n",
"
\n",
" \n",
" \n",
" \n",
" 2000-01-01 00:00:00 | \n",
" 960 | \n",
" Oliver | \n",
" -0.745248 | \n",
" -0.198965 | \n",
" Ol | \n",
" 0.198965 | \n",
" 00:00:00 | \n",
"
\n",
" \n",
" 2000-01-01 00:00:01 | \n",
" 1000 | \n",
" Dan | \n",
" 0.184641 | \n",
" 0.295430 | \n",
" Da | \n",
" 184.640930 | \n",
" 00:00:01 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" ID name x y initials z \\\n",
"timestamp \n",
"2000-01-01 00:00:00 960 Oliver -0.745248 -0.198965 Ol 0.198965 \n",
"2000-01-01 00:00:01 1000 Dan 0.184641 0.295430 Da 184.640930 \n",
"\n",
" times \n",
"timestamp \n",
"2000-01-01 00:00:00 00:00:00 \n",
"2000-01-01 00:00:01 00:00:01 "
]
},
"execution_count": 25,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# ddf.assign(times= dd.to_datetime(ddf.index).dt.time)\n",
"# Dask or Pandas\n",
"ddf = ddf.assign(times=ddf.index.astype('M8[ns]'))\n",
"ddf['times'] = ddf['times'].dt.time\n",
"ddf =client.persist(ddf)\n",
"ddf.head(2)"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## Drop NA on column"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [],
"source": [
"pdf = pdf.drop(labels=['initials'],axis=1)\n",
"ddf = ddf.drop(labels=['initials','z'],axis=1)"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"pandas: ID name x y times colna\n",
"timestamp \n",
"2000-01-01 960 Oliver -0.745248 -0.198965 00:00:00 None\n",
"dask: ID name x y times colna\n",
"timestamp \n",
"2000-01-01 960 Oliver -0.745248 -0.198965 00:00:00 None\n"
]
}
],
"source": [
"pdf = pdf.assign(colna = None)\n",
"print(f'pandas: {pdf.head(1)}')\n",
"ddf = ddf.assign(colna = None)\n",
"print(f'dask: {ddf.head(1)}')"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"In odrer for `Dask` to drop a column with all `na` we need to assist the graph"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"pandas: ID name x y times\n",
"timestamp \n",
"2000-01-01 960 Oliver -0.745248 -0.198965 00:00:00\n",
"dask: ID name x y times\n",
"timestamp \n",
"2000-01-01 960 Oliver -0.745248 -0.198965 00:00:00\n"
]
}
],
"source": [
"pdf = pdf.dropna(axis=1, how='all')\n",
"print(f'pandas: {pdf.head(1)}')\n",
"# check if all values in column are Null - expensive\n",
"if ddf.colna.isnull().all() == True: \n",
" ddf = ddf.drop(labels=['colna'],axis=1)\n",
"print(f'dask: {ddf.compute().head(1)}')"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"## Reset Index"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" ID | \n",
" name | \n",
" x | \n",
" y | \n",
" times | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 960 | \n",
" Oliver | \n",
" -0.745248 | \n",
" -0.198965 | \n",
" 00:00:00 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" ID name x y times\n",
"0 960 Oliver -0.745248 -0.198965 00:00:00"
]
},
"execution_count": 29,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Pandas\n",
"pdf = pdf.reset_index(drop=True)\n",
"pdf.head(1)"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {
"scrolled": true
},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" ID | \n",
" name | \n",
" x | \n",
" y | \n",
" times | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 960 | \n",
" Oliver | \n",
" -0.745248 | \n",
" -0.198965 | \n",
" 00:00:00 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" ID name x y times\n",
"0 960 Oliver -0.745248 -0.198965 00:00:00"
]
},
"execution_count": 30,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Dask\n",
"ddf = ddf.reset_index()\n",
"ddf = ddf.drop(labels=['timestamp'], axis=1 )\n",
"ddf.head(1)"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"# Read / Save files"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"When working with `pandas` and `dask` preferable try and work with [parquet](https://docs.dask.org/en/latest/dataframe-best-practices.html?highlight=parquet#store-data-in-apache-parquet-format). \n",
"Even so when working with `Dask` - the files can be read with multiple workers . \n",
"Most `kwargs` are applicable for reading and writing files [see documentaion](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.to_csv) (including the option for output file naming). \n",
"e.g. \n",
"ddf = dd.read_csv('data/pd2dd/ddf*.csv', compression='gzip', header=False). \n",
"\n",
"However some are not available such as `nrows`."
]
},
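{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"As a minimal sketch (assuming a parquet engine such as `pyarrow` is installed; the `data/parquet` path is a hypothetical example), the same dataframe could be written and read back as parquet:\n",
"```python\n",
"# write one parquet file per partition\n",
"ddf.to_parquet('data/parquet')\n",
"# read the directory back lazily as a single Dask dataframe\n",
"ddf = dd.read_parquet('data/parquet')\n",
"```"
]
},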
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"## Save files"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 16 s, sys: 34 ms, total: 16.1 s\n",
"Wall time: 16 s\n"
]
}
],
"source": [
"%%time\n",
"# Pandas\n",
"from pathlib import Path\n",
"output_file = 'pdf_single_file.csv'\n",
"output_dir = Path('data/')\n",
"output_dir.mkdir(parents=True, exist_ok=True)\n",
"pdf.to_csv(output_dir / output_file)"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[PosixPath('data/pdf_single_file.csv')]"
]
},
"execution_count": 32,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"list(Path(output_dir).glob('*.csv'))"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"`Dask`\n",
"Notice the '*' to allow for multiple file renaming. \n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 884 ms, sys: 76.6 ms, total: 961 ms\n",
"Wall time: 6.96 s\n"
]
}
],
"source": [
"%%time\n",
"# Dask\n",
"output_dask_dir = Path('data/pd2dd/')\n",
"output_dir.mkdir(parents=True, exist_ok=True)\n",
"ddf.to_csv(f'{output_dask_dir}/ddf*.csv', index = False)"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"To find the number of partitions which will determine the number of output files use [dask.dataframe.npartitions](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.npartitions) "
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"30"
]
},
"execution_count": 34,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf.npartitions"
]
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {
"slideshow": {
"slide_type": "-"
}
},
"outputs": [
{
"data": {
"text/plain": [
"[PosixPath('data/pd2dd/ddf20.csv'),\n",
" PosixPath('data/pd2dd/ddf27.csv'),\n",
" PosixPath('data/pd2dd/ddf06.csv'),\n",
" PosixPath('data/pd2dd/ddf22.csv'),\n",
" PosixPath('data/pd2dd/ddf18.csv'),\n",
" PosixPath('data/pd2dd/ddf16.csv'),\n",
" PosixPath('data/pd2dd/ddf03.csv'),\n",
" PosixPath('data/pd2dd/ddf07.csv'),\n",
" PosixPath('data/pd2dd/ddf12.csv'),\n",
" PosixPath('data/pd2dd/ddf25.csv'),\n",
" PosixPath('data/pd2dd/ddf10.csv'),\n",
" PosixPath('data/pd2dd/ddf29.csv'),\n",
" PosixPath('data/pd2dd/ddf09.csv'),\n",
" PosixPath('data/pd2dd/ddf28.csv'),\n",
" PosixPath('data/pd2dd/ddf19.csv'),\n",
" PosixPath('data/pd2dd/ddf23.csv'),\n",
" PosixPath('data/pd2dd/ddf02.csv'),\n",
" PosixPath('data/pd2dd/ddf24.csv'),\n",
" PosixPath('data/pd2dd/ddf15.csv'),\n",
" PosixPath('data/pd2dd/ddf21.csv'),\n",
" PosixPath('data/pd2dd/ddf13.csv'),\n",
" PosixPath('data/pd2dd/ddf26.csv'),\n",
" PosixPath('data/pd2dd/ddf17.csv'),\n",
" PosixPath('data/pd2dd/ddf14.csv'),\n",
" PosixPath('data/pd2dd/ddf01.csv'),\n",
" PosixPath('data/pd2dd/ddf04.csv'),\n",
" PosixPath('data/pd2dd/ddf08.csv'),\n",
" PosixPath('data/pd2dd/ddf00.csv'),\n",
" PosixPath('data/pd2dd/ddf11.csv'),\n",
" PosixPath('data/pd2dd/ddf05.csv')]"
]
},
"execution_count": 35,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"list(Path(output_dask_dir).glob('*.csv'))"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"To change the number of output files use [repartition](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.repartition) which is an expensive operation."
]
},
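{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"A minimal sketch of repartitioning before writing (the target of 5 partitions is an arbitrary assumption):\n",
"```python\n",
"ddf5 = ddf.repartition(npartitions=5)\n",
"# writing ddf5 would now produce 5 files instead of 30\n",
"ddf5.npartitions\n",
"```"
]
},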
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"## Read files"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"For `pandas` it is possible to iterate and concat the files [see answer from stack overflow](https://stackoverflow.com/questions/20906474/import-multiple-csv-files-into-pandas-and-concatenate-into-one-dataframe)."
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 2.34 s, sys: 131 ms, total: 2.47 s\n",
"Wall time: 2.37 s\n"
]
}
],
"source": [
"%%time\n",
"# Pandas\n",
"dir_path = Path(r'data/pd2dd')\n",
"concat_df = pd.concat([pd.read_csv(f) for f in list(dir_path.glob('*.csv'))])\n",
"len(concat_df)"
]
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {
"slideshow": {
"slide_type": "fragment"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 361 ms, sys: 4.78 ms, total: 366 ms\n",
"Wall time: 1.06 s\n"
]
}
],
"source": [
"%%time\n",
"# Dask\n",
"_ddf = dd.read_csv('data/pd2dd/ddf*.csv')\n",
"len(_ddf)"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
" ## Consider using Persist\n",
"Since Dask is lazy - it may run the **entire** graph/DAG (again) even if it already run part of the calculation in a previous cell. Thus use [persist](https://docs.dask.org/en/latest/dataframe-best-practices.html?highlight=parquet#persist-intelligently) to keep the results in memory \n",
"```python\n",
"ddf = client.persist(ddf)\n",
"```\n",
"This is different from Pandas which once a variable was created it will keep all data in memory. \n",
"Additional information can be read in this [stackoverflow issue](https://stackoverflow.com/questions/45941528/how-to-efficiently-send-a-large-numpy-array-to-the-cluster-with-dask-array/45941529#45941529) or see an exampel in [this post](http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes) \n",
"This concept should also be used when running a code within a script (rather then a jupyter notebook) which incoperates loops within the code."
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {
"scrolled": true
},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" ID | \n",
" name | \n",
" x | \n",
" y | \n",
" times | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 960 | \n",
" Oliver | \n",
" -0.745248 | \n",
" -0.198965 | \n",
" 00:00:00 | \n",
"
\n",
" \n",
" 1 | \n",
" 1000 | \n",
" Dan | \n",
" 0.184641 | \n",
" 0.295430 | \n",
" 00:00:01 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" ID name x y times\n",
"0 960 Oliver -0.745248 -0.198965 00:00:00\n",
"1 1000 Dan 0.184641 0.295430 00:00:01"
]
},
"execution_count": 39,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"_ddf = dd.read_csv('data/pd2dd/ddf*.csv')\n",
"# do some filter\n",
"_ddf = client.persist(_ddf)\n",
"# do some computations\n",
"_ddf.head(2)"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"# Group By - custom aggregations\n",
"In addition to the [groupby notebook example](https://github.com/dask/dask-examples/blob/master/dataframes/02-groupby.ipynb) - \n",
"this is another example how to try to eliminate the use of `groupby.apply` \n",
"In this example we are grouping by columns into unique list."
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" name | \n",
" ID | \n",
" seconds | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" Oliver | \n",
" 960 | \n",
" 00 | \n",
"
\n",
" \n",
" 1 | \n",
" Dan | \n",
" 1000 | \n",
" 00 | \n",
"
\n",
" \n",
" 2 | \n",
" Norbert | \n",
" 1020 | \n",
" 00 | \n",
"
\n",
" \n",
" 3 | \n",
" Dan | \n",
" 1015 | \n",
" 00 | \n",
"
\n",
" \n",
" 4 | \n",
" Edith | \n",
" 998 | \n",
" 00 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" name ID seconds\n",
"0 Oliver 960 00\n",
"1 Dan 1000 00\n",
"2 Norbert 1020 00\n",
"3 Dan 1015 00\n",
"4 Edith 998 00"
]
},
"execution_count": 41,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# prepare pandas dataframe\n",
"pdf = pdf.assign(time=pd.to_datetime(pdf.index).time)\n",
"pdf['seconds'] = pdf.time.astype(str).str[-2:]\n",
"cols_for_demo =['name', 'ID','seconds']\n",
"pdf[cols_for_demo].head()"
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1.14 s, sys: 14.9 ms, total: 1.16 s\n",
"Wall time: 1.14 s\n"
]
}
],
"source": [
"%%time\n",
"pdf_gb = pdf.groupby(pdf.name)\n",
"gp_col = ['ID', 'seconds']\n",
"list_ser_gb = [pdf_gb[att_col_gr].apply\n",
" (lambda x: list(set(x.to_list()))) \n",
" for att_col_gr in gp_col]\n",
"df_edge_att = pdf_gb.size().to_frame(name=\"Weight\")\n",
"for ser in list_ser_gb:\n",
" df_edge_att = df_edge_att.join(ser.to_frame(), how='left') "
]
},
{
"cell_type": "code",
"execution_count": 43,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" Weight | \n",
" ID | \n",
" seconds | \n",
"
\n",
" \n",
" name | \n",
" | \n",
" | \n",
" | \n",
"
\n",
" \n",
" \n",
" \n",
" Alice | \n",
" 99839 | \n",
" [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... | \n",
" [51, 21, 57, 30, 26, 41, 94, 45, 07, 54, 78, 7... | \n",
"
\n",
" \n",
" Bob | \n",
" 99407 | \n",
" [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... | \n",
" [51, 21, 57, 30, 26, 41, 94, 45, 07, 54, 78, 7... | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" Weight ID \\\n",
"name \n",
"Alice 99839 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... \n",
"Bob 99407 [1024, 1025, 1026, 1027, 1028, 1029, 1030, 103... \n",
"\n",
" seconds \n",
"name \n",
"Alice [51, 21, 57, 30, 26, 41, 94, 45, 07, 54, 78, 7... \n",
"Bob [51, 21, 57, 30, 26, 41, 94, 45, 07, 54, 78, 7... "
]
},
"execution_count": 43,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df_edge_att.head(2)"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"In any case sometimes using Pandas is more efficiante (assuming that you can load all the data into the RAM). \n",
"In this case Pandas is faster"
]
},
{
"cell_type": "code",
"execution_count": 44,
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" name | \n",
" ID | \n",
" seconds | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" Oliver | \n",
" 960 | \n",
" 00 | \n",
"
\n",
" \n",
" 1 | \n",
" Dan | \n",
" 1000 | \n",
" 01 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" name ID seconds\n",
"0 Oliver 960 00\n",
"1 Dan 1000 01"
]
},
"execution_count": 44,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"def set_list_att(x: dd.Series):\n",
" return list(set([item for item in x.values]))\n",
"ddf['seconds'] = ddf.times.astype(str).str[-2:]\n",
"ddf = client.persist(ddf)\n",
"ddf[cols_for_demo].head(2)"
]
},
{
"cell_type": "code",
"execution_count": 48,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 2.22 s, sys: 102 ms, total: 2.32 s\n",
"Wall time: 6.24 s\n"
]
}
],
"source": [
"%%time\n",
"# Dask option1 using apply\n",
"# notice the meta argument in the apply function\n",
"df_gb = ddf.groupby(ddf.name)\n",
"gp_col = ['ID', 'seconds']\n",
"list_ser_gb = [df_gb[att_col_gr].apply(set_list_att\n",
" ,meta=pd.Series(dtype='object', name=f'{att_col_gr}_att')) \n",
" for att_col_gr in gp_col]\n",
"df_edge_att = df_gb.size().to_frame(name=\"Weight\")\n",
"for ser in list_ser_gb:\n",
" df_edge_att = df_edge_att.join(ser.to_frame(), how='left')\n",
"df_edge_att.head(2)"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"Using [dask custom aggregation](https://docs.dask.org/en/latest/dataframe-api.html?highlight=dropna#dask.dataframe.groupby.Aggregation) is consideribly better"
]
},
{
"cell_type": "code",
"execution_count": 46,
"metadata": {},
"outputs": [],
"source": [
"# Dask\n",
"import itertools\n",
"custom_agg = dd.Aggregation(\n",
" 'custom_agg', \n",
" lambda s: s.apply(set), \n",
" lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),)"
]
},
{
"cell_type": "code",
"execution_count": 47,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 385 ms, sys: 14.2 ms, total: 399 ms\n",
"Wall time: 1.3 s\n"
]
}
],
"source": [
"%%time\n",
"# Dask option1 using apply\n",
"df_gb = ddf.groupby(ddf.name)\n",
"gp_col = ['ID', 'seconds']\n",
"list_ser_gb = [df_gb[att_col_gr].agg(custom_agg) for att_col_gr in gp_col]\n",
"df_edge_att = df_gb.size().to_frame(name=\"Weight\")\n",
"for ser in list_ser_gb:\n",
" df_edge_att = df_edge_att.join(ser.to_frame(), how='left')\n",
"df_edge_att.head(2) "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"outputs": [],
"source": [
"df_edge_att.head()"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## [Debugging](https://docs.dask.org/en/latest/debugging.html)\n",
"Debugging may be challenging...\n",
"1. Run code without client \n",
"2. Verify integraty of DAG\n",
"3. Use Dashboard profiler"
]
},
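{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"For step 1, a minimal sketch: run the computation with the single-threaded scheduler (no `Client`), so exceptions are raised locally and a debugger such as `pdb` can be used:\n",
"```python\n",
"import dask\n",
"# all tasks run in the main thread inside this block\n",
"with dask.config.set(scheduler='single-threaded'):\n",
"    result = ddf.head(2)\n",
"```"
]
},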
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"## Corrupted DAG"
]
},
{
"cell_type": "code",
"execution_count": 49,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" id | \n",
" name | \n",
" x | \n",
" y | \n",
"
\n",
" \n",
" timestamp | \n",
" | \n",
" | \n",
" | \n",
" | \n",
"
\n",
" \n",
" \n",
" \n",
" 2000-01-01 | \n",
" 1033 | \n",
" Michael | \n",
" 0.974223 | \n",
" -0.087096 | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" id name x y\n",
"timestamp \n",
"2000-01-01 1033 Michael 0.974223 -0.087096"
]
},
"execution_count": 49,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# reset dataframe\n",
"ddf = dask.datasets.timeseries()\n",
"ddf.head(1)"
]
},
{
"cell_type": "code",
"execution_count": 50,
"metadata": {},
"outputs": [],
"source": [
"def func_dist2(df, coor_x, coor_y):\n",
" dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())^2 \n",
" + (df[coor_y] - df[coor_y].shift())^2 )\n",
" return dist\n",
"ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'\n",
" , meta=('float'))"
]
},
{
"cell_type": "code",
"execution_count": 51,
"metadata": {
"slideshow": {
"slide_type": "fragment"
}
},
"outputs": [
{
"ename": "TypeError",
"evalue": "unsupported operand type(s) for ^: 'float' and 'bool'",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mTypeError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[0;31m# returns an error because of ^2\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 2\u001b[0;31m \u001b[0mddf\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mhead\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/dask/dataframe/core.py\u001b[0m in \u001b[0;36mhead\u001b[0;34m(self, n, npartitions, compute)\u001b[0m\n\u001b[1;32m 898\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 899\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mcompute\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 900\u001b[0;31m \u001b[0mresult\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcompute\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 901\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 902\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/dask/base.py\u001b[0m in \u001b[0;36mcompute\u001b[0;34m(self, **kwargs)\u001b[0m\n\u001b[1;32m 154\u001b[0m \u001b[0mdask\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mbase\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcompute\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 155\u001b[0m \"\"\"\n\u001b[0;32m--> 156\u001b[0;31m \u001b[0;34m(\u001b[0m\u001b[0mresult\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mcompute\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtraverse\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mFalse\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 157\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 158\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/dask/base.py\u001b[0m in \u001b[0;36mcompute\u001b[0;34m(*args, **kwargs)\u001b[0m\n\u001b[1;32m 396\u001b[0m \u001b[0mkeys\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__dask_keys__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mx\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mcollections\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 397\u001b[0m \u001b[0mpostcomputes\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__dask_postcompute__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mx\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mcollections\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 398\u001b[0;31m \u001b[0mresults\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mschedule\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdsk\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mkeys\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 399\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mrepack\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mr\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mr\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m(\u001b[0m\u001b[0mf\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0ma\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mzip\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mresults\u001b[0m\u001b[0;34m,\u001b[0m 
\u001b[0mpostcomputes\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 400\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/distributed/client.py\u001b[0m in \u001b[0;36mget\u001b[0;34m(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)\u001b[0m\n\u001b[1;32m 2566\u001b[0m \u001b[0mshould_rejoin\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mFalse\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 2567\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 2568\u001b[0;31m \u001b[0mresults\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgather\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpacked\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0masynchronous\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0masynchronous\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdirect\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mdirect\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 2569\u001b[0m \u001b[0;32mfinally\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 2570\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mf\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mfutures\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mvalues\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/distributed/client.py\u001b[0m in \u001b[0;36mgather\u001b[0;34m(self, futures, errors, maxsize, direct, asynchronous)\u001b[0m\n\u001b[1;32m 1820\u001b[0m \u001b[0mdirect\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mdirect\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1821\u001b[0m \u001b[0mlocal_worker\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mlocal_worker\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1822\u001b[0;31m \u001b[0masynchronous\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0masynchronous\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1823\u001b[0m )\n\u001b[1;32m 1824\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/distributed/client.py\u001b[0m in \u001b[0;36msync\u001b[0;34m(self, func, *args, **kwargs)\u001b[0m\n\u001b[1;32m 751\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mfuture\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 752\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 753\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0msync\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mloop\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 754\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 755\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__repr__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/distributed/utils.py\u001b[0m in \u001b[0;36msync\u001b[0;34m(loop, func, *args, **kwargs)\u001b[0m\n\u001b[1;32m 329\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwait\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m10\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 330\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0merror\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 331\u001b[0;31m \u001b[0msix\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mreraise\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0merror\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 332\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 333\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/six.py\u001b[0m in \u001b[0;36mreraise\u001b[0;34m(tp, value, tb)\u001b[0m\n\u001b[1;32m 691\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mvalue\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__traceback__\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mtb\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 692\u001b[0m \u001b[0;32mraise\u001b[0m \u001b[0mvalue\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwith_traceback\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtb\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 693\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mvalue\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 694\u001b[0m \u001b[0;32mfinally\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 695\u001b[0m \u001b[0mvalue\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/distributed/utils.py\u001b[0m in \u001b[0;36mf\u001b[0;34m()\u001b[0m\n\u001b[1;32m 314\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mtimeout\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 315\u001b[0m \u001b[0mfuture\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mgen\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwith_timeout\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtimedelta\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mseconds\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mtimeout\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mfuture\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 316\u001b[0;31m \u001b[0mresult\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32myield\u001b[0m \u001b[0mfuture\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 317\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mException\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0mexc\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 318\u001b[0m \u001b[0merror\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0msys\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mexc_info\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/tornado/gen.py\u001b[0m in \u001b[0;36mrun\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 727\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 728\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 729\u001b[0;31m \u001b[0mvalue\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mfuture\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mresult\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 730\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mException\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 731\u001b[0m \u001b[0mexc_info\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0msys\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mexc_info\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/tornado/gen.py\u001b[0m in \u001b[0;36mrun\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 734\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mexc_info\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 735\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 736\u001b[0;31m \u001b[0myielded\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgen\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mthrow\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0mexc_info\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;31m# type: ignore\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 737\u001b[0m \u001b[0;32mfinally\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 738\u001b[0m \u001b[0;31m# Break up a reference to itself\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/distributed/client.py\u001b[0m in \u001b[0;36m_gather\u001b[0;34m(self, futures, errors, direct, local_worker)\u001b[0m\n\u001b[1;32m 1651\u001b[0m \u001b[0msix\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mreraise\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mCancelledError\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mCancelledError\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mkey\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1652\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1653\u001b[0;31m \u001b[0msix\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mreraise\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtype\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mexception\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mexception\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtraceback\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1654\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0merrors\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0;34m\"skip\"\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1655\u001b[0m \u001b[0mbad_keys\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0madd\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mkey\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/six.py\u001b[0m in \u001b[0;36mreraise\u001b[0;34m(tp, value, tb)\u001b[0m\n\u001b[1;32m 690\u001b[0m \u001b[0mvalue\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mtp\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 691\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mvalue\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__traceback__\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mtb\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 692\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mvalue\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwith_traceback\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtb\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 693\u001b[0m \u001b[0;32mraise\u001b[0m \u001b[0mvalue\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 694\u001b[0m \u001b[0;32mfinally\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/dask/optimization.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m()\u001b[0m\n\u001b[1;32m 940\u001b[0m % (len(self.inkeys), len(args)))\n\u001b[1;32m 941\u001b[0m return core.get(self.dsk, self.outkey,\n\u001b[0;32m--> 942\u001b[0;31m dict(zip(self.inkeys, args)))\n\u001b[0m\u001b[1;32m 943\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 944\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__reduce__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/dask/core.py\u001b[0m in \u001b[0;36mget\u001b[0;34m()\u001b[0m\n\u001b[1;32m 147\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mkey\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtoposort\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdsk\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 148\u001b[0m \u001b[0mtask\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mdsk\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mkey\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 149\u001b[0;31m \u001b[0mresult\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0m_execute_task\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtask\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcache\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 150\u001b[0m \u001b[0mcache\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mkey\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 151\u001b[0m \u001b[0mresult\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0m_execute_task\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mout\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcache\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/dask/core.py\u001b[0m in \u001b[0;36m_execute_task\u001b[0;34m()\u001b[0m\n\u001b[1;32m 117\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0margs\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0marg\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0marg\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 118\u001b[0m \u001b[0margs2\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0m_execute_task\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcache\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0ma\u001b[0m \u001b[0;32min\u001b[0m \u001b[0margs\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 119\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0margs2\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 120\u001b[0m \u001b[0;32melif\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mishashable\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0marg\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 121\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0marg\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/dask/compatibility.py\u001b[0m in \u001b[0;36mapply\u001b[0;34m()\u001b[0m\n\u001b[1;32m 91\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mapply\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfunc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mkwargs\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mNone\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 92\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mkwargs\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 93\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 94\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 95\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/dask/dataframe/core.py\u001b[0m in \u001b[0;36mapply_and_enforce\u001b[0;34m()\u001b[0m\n\u001b[1;32m 3877\u001b[0m \u001b[0mfunc\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mkwargs\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mpop\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'_func'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 3878\u001b[0m \u001b[0mmeta\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mkwargs\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mpop\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'_meta'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 3879\u001b[0;31m \u001b[0mdf\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 3880\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mis_dataframe_like\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdf\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mor\u001b[0m \u001b[0mis_series_like\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdf\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mor\u001b[0m \u001b[0mis_index_like\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdf\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 3881\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mlen\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdf\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m\u001b[0m in \u001b[0;36mfunc_dist2\u001b[0;34m()\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mfunc_dist2\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdf\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcoor_x\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcoor_y\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 2\u001b[0m dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())^2 \n\u001b[0;32m----> 3\u001b[0;31m + (df[coor_y] - df[coor_y].shift())^2 )\n\u001b[0m\u001b[1;32m 4\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mdist\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 5\u001b[0m ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/pandas/core/ops.py\u001b[0m in \u001b[0;36mwrapper\u001b[0;34m()\u001b[0m\n\u001b[1;32m 1848\u001b[0m filler = (fill_int if is_self_int_dtype and is_other_int_dtype\n\u001b[1;32m 1849\u001b[0m else fill_bool)\n\u001b[0;32m-> 1850\u001b[0;31m \u001b[0mres_values\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mna_op\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mvalues\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0movalues\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1851\u001b[0m unfilled = self._constructor(res_values,\n\u001b[1;32m 1852\u001b[0m index=self.index, name=res_name)\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/pandas/core/ops.py\u001b[0m in \u001b[0;36mna_op\u001b[0;34m()\u001b[0m\n\u001b[1;32m 1795\u001b[0m \u001b[0mx\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mensure_object\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1796\u001b[0m \u001b[0my\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mensure_object\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0my\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1797\u001b[0;31m \u001b[0mresult\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mlibops\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mvec_binop\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0my\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mop\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1798\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1799\u001b[0m \u001b[0;31m# let null fall thru\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32mpandas/_libs/ops.pyx\u001b[0m in \u001b[0;36mpandas._libs.ops.vec_binop\u001b[0;34m()\u001b[0m\n",
"\u001b[0;32mpandas/_libs/ops.pyx\u001b[0m in \u001b[0;36mpandas._libs.ops.vec_binop\u001b[0;34m()\u001b[0m\n",
"\u001b[0;31mTypeError\u001b[0m: unsupported operand type(s) for ^: 'float' and 'bool'"
]
}
],
"source": [
"# raises a TypeError because ^ is bitwise XOR, not exponentiation (the function should use **2)\n",
"ddf.head()"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"* Even if the function is corrected, the existing DAG is still corrupted"
]
},
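{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"source": [
"A minimal sketch of the fix (assuming `np` and `ddf` from the cells above): replace the bitwise `^` with `**`. Note that `ddf` itself must be rebuilt from the source data before re-assigning the column, because the failing assignment is already baked into its task graph."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def func_dist2(df, coor_x, coor_y):\n",
"    # ** is exponentiation; ^ is bitwise XOR and fails on float operands\n",
"    dist = np.sqrt((df[coor_x] - df[coor_x].shift()) ** 2\n",
"                   + (df[coor_y] - df[coor_y].shift()) ** 2)\n",
"    return dist"
]
},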
{
"cell_type": "code",
"execution_count": 52,
"metadata": {},
"outputs": [
{
"ename": "TypeError",
"evalue": "unsupported operand type(s) for ^: 'float' and 'bool'",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mTypeError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[1;32m 6\u001b[0m ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'\n\u001b[1;32m 7\u001b[0m , meta=('float'))\n\u001b[0;32m----> 8\u001b[0;31m \u001b[0mddf\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mhead\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m2\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/dask/dataframe/core.py\u001b[0m in \u001b[0;36mhead\u001b[0;34m(self, n, npartitions, compute)\u001b[0m\n\u001b[1;32m 898\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 899\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mcompute\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 900\u001b[0;31m \u001b[0mresult\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcompute\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 901\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 902\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/dask/base.py\u001b[0m in \u001b[0;36mcompute\u001b[0;34m(self, **kwargs)\u001b[0m\n\u001b[1;32m 154\u001b[0m \u001b[0mdask\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mbase\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcompute\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 155\u001b[0m \"\"\"\n\u001b[0;32m--> 156\u001b[0;31m \u001b[0;34m(\u001b[0m\u001b[0mresult\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mcompute\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtraverse\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mFalse\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 157\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 158\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/dask/base.py\u001b[0m in \u001b[0;36mcompute\u001b[0;34m(*args, **kwargs)\u001b[0m\n\u001b[1;32m 396\u001b[0m \u001b[0mkeys\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__dask_keys__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mx\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mcollections\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 397\u001b[0m \u001b[0mpostcomputes\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__dask_postcompute__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mx\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mcollections\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 398\u001b[0;31m \u001b[0mresults\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mschedule\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdsk\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mkeys\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 399\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mrepack\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mr\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mr\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m(\u001b[0m\u001b[0mf\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0ma\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mzip\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mresults\u001b[0m\u001b[0;34m,\u001b[0m 
\u001b[0mpostcomputes\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 400\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/distributed/client.py\u001b[0m in \u001b[0;36mget\u001b[0;34m(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)\u001b[0m\n\u001b[1;32m 2566\u001b[0m \u001b[0mshould_rejoin\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mFalse\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 2567\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 2568\u001b[0;31m \u001b[0mresults\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgather\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpacked\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0masynchronous\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0masynchronous\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdirect\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mdirect\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 2569\u001b[0m \u001b[0;32mfinally\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 2570\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mf\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mfutures\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mvalues\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/distributed/client.py\u001b[0m in \u001b[0;36mgather\u001b[0;34m(self, futures, errors, maxsize, direct, asynchronous)\u001b[0m\n\u001b[1;32m 1820\u001b[0m \u001b[0mdirect\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mdirect\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1821\u001b[0m \u001b[0mlocal_worker\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mlocal_worker\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1822\u001b[0;31m \u001b[0masynchronous\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0masynchronous\u001b[0m\u001b[0;34m,\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1823\u001b[0m )\n\u001b[1;32m 1824\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/distributed/client.py\u001b[0m in \u001b[0;36msync\u001b[0;34m(self, func, *args, **kwargs)\u001b[0m\n\u001b[1;32m 751\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mfuture\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 752\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 753\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0msync\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mloop\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 754\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 755\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__repr__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/distributed/utils.py\u001b[0m in \u001b[0;36msync\u001b[0;34m(loop, func, *args, **kwargs)\u001b[0m\n\u001b[1;32m 329\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwait\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m10\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 330\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0merror\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 331\u001b[0;31m \u001b[0msix\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mreraise\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0merror\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 332\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 333\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/six.py\u001b[0m in \u001b[0;36mreraise\u001b[0;34m(tp, value, tb)\u001b[0m\n\u001b[1;32m 691\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mvalue\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__traceback__\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mtb\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 692\u001b[0m \u001b[0;32mraise\u001b[0m \u001b[0mvalue\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwith_traceback\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtb\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 693\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mvalue\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 694\u001b[0m \u001b[0;32mfinally\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 695\u001b[0m \u001b[0mvalue\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/distributed/utils.py\u001b[0m in \u001b[0;36mf\u001b[0;34m()\u001b[0m\n\u001b[1;32m 314\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mtimeout\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 315\u001b[0m \u001b[0mfuture\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mgen\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwith_timeout\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtimedelta\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mseconds\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mtimeout\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mfuture\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 316\u001b[0;31m \u001b[0mresult\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;32myield\u001b[0m \u001b[0mfuture\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 317\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mException\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0mexc\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 318\u001b[0m \u001b[0merror\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0msys\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mexc_info\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/tornado/gen.py\u001b[0m in \u001b[0;36mrun\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 727\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 728\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 729\u001b[0;31m \u001b[0mvalue\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mfuture\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mresult\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 730\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mException\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 731\u001b[0m \u001b[0mexc_info\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0msys\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mexc_info\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/tornado/gen.py\u001b[0m in \u001b[0;36mrun\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 734\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mexc_info\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 735\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 736\u001b[0;31m \u001b[0myielded\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgen\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mthrow\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0mexc_info\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;31m# type: ignore\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 737\u001b[0m \u001b[0;32mfinally\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 738\u001b[0m \u001b[0;31m# Break up a reference to itself\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/distributed/client.py\u001b[0m in \u001b[0;36m_gather\u001b[0;34m(self, futures, errors, direct, local_worker)\u001b[0m\n\u001b[1;32m 1651\u001b[0m \u001b[0msix\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mreraise\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mCancelledError\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mCancelledError\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mkey\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;32mNone\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1652\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1653\u001b[0;31m \u001b[0msix\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mreraise\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtype\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mexception\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mexception\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtraceback\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1654\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0merrors\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0;34m\"skip\"\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1655\u001b[0m \u001b[0mbad_keys\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0madd\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mkey\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/six.py\u001b[0m in \u001b[0;36mreraise\u001b[0;34m(tp, value, tb)\u001b[0m\n\u001b[1;32m 690\u001b[0m \u001b[0mvalue\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mtp\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 691\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mvalue\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m__traceback__\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mtb\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 692\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mvalue\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mwith_traceback\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtb\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 693\u001b[0m \u001b[0;32mraise\u001b[0m \u001b[0mvalue\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 694\u001b[0m \u001b[0;32mfinally\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/dask/optimization.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m()\u001b[0m\n\u001b[1;32m 940\u001b[0m % (len(self.inkeys), len(args)))\n\u001b[1;32m 941\u001b[0m return core.get(self.dsk, self.outkey,\n\u001b[0;32m--> 942\u001b[0;31m dict(zip(self.inkeys, args)))\n\u001b[0m\u001b[1;32m 943\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 944\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0m__reduce__\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/dask/core.py\u001b[0m in \u001b[0;36mget\u001b[0;34m()\u001b[0m\n\u001b[1;32m 147\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mkey\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtoposort\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdsk\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 148\u001b[0m \u001b[0mtask\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mdsk\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mkey\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 149\u001b[0;31m \u001b[0mresult\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0m_execute_task\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtask\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcache\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 150\u001b[0m \u001b[0mcache\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0mkey\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mresult\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 151\u001b[0m \u001b[0mresult\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0m_execute_task\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mout\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcache\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/dask/core.py\u001b[0m in \u001b[0;36m_execute_task\u001b[0;34m()\u001b[0m\n\u001b[1;32m 117\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0margs\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0marg\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0marg\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 118\u001b[0m \u001b[0margs2\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0m_execute_task\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcache\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0ma\u001b[0m \u001b[0;32min\u001b[0m \u001b[0margs\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 119\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0margs2\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 120\u001b[0m \u001b[0;32melif\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mishashable\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0marg\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 121\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0marg\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/dask/compatibility.py\u001b[0m in \u001b[0;36mapply\u001b[0;34m()\u001b[0m\n\u001b[1;32m 91\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mapply\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfunc\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mkwargs\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mNone\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 92\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mkwargs\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 93\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 94\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 95\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/dask/dataframe/core.py\u001b[0m in \u001b[0;36mapply_and_enforce\u001b[0;34m()\u001b[0m\n\u001b[1;32m 3877\u001b[0m \u001b[0mfunc\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mkwargs\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mpop\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'_func'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 3878\u001b[0m \u001b[0mmeta\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mkwargs\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mpop\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'_meta'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 3879\u001b[0;31m \u001b[0mdf\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0margs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 3880\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mis_dataframe_like\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdf\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mor\u001b[0m \u001b[0mis_series_like\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdf\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mor\u001b[0m \u001b[0mis_index_like\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdf\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 3881\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mlen\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdf\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m\u001b[0m in \u001b[0;36mfunc_dist2\u001b[0;34m()\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mfunc_dist2\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdf\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcoor_x\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcoor_y\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 2\u001b[0m dist = np.sqrt ( (df[coor_x] - df[coor_x].shift())^2 \n\u001b[0;32m----> 3\u001b[0;31m + (df[coor_y] - df[coor_y].shift())^2 )\n\u001b[0m\u001b[1;32m 4\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mdist\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 5\u001b[0m ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/pandas/core/ops.py\u001b[0m in \u001b[0;36mwrapper\u001b[0;34m()\u001b[0m\n\u001b[1;32m 1848\u001b[0m filler = (fill_int if is_self_int_dtype and is_other_int_dtype\n\u001b[1;32m 1849\u001b[0m else fill_bool)\n\u001b[0;32m-> 1850\u001b[0;31m \u001b[0mres_values\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mna_op\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mvalues\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0movalues\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1851\u001b[0m unfilled = self._constructor(res_values,\n\u001b[1;32m 1852\u001b[0m index=self.index, name=res_name)\n",
"\u001b[0;32m~/.local/share/virtualenvs/dask_pyconil2019-9doxB0Ra/lib/python3.7/site-packages/pandas/core/ops.py\u001b[0m in \u001b[0;36mna_op\u001b[0;34m()\u001b[0m\n\u001b[1;32m 1795\u001b[0m \u001b[0mx\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mensure_object\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1796\u001b[0m \u001b[0my\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mensure_object\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0my\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m-> 1797\u001b[0;31m \u001b[0mresult\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mlibops\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mvec_binop\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0my\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mop\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 1798\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1799\u001b[0m \u001b[0;31m# let null fall thru\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32mpandas/_libs/ops.pyx\u001b[0m in \u001b[0;36mpandas._libs.ops.vec_binop\u001b[0;34m()\u001b[0m\n",
"\u001b[0;32mpandas/_libs/ops.pyx\u001b[0m in \u001b[0;36mpandas._libs.ops.vec_binop\u001b[0;34m()\u001b[0m\n",
"\u001b[0;31mTypeError\u001b[0m: unsupported operand type(s) for ^: 'float' and 'bool'"
]
}
],
"source": [
"# Even after fixing the operator, the corrupted task graph still raises the old error\n",
"def func_dist2(df, coor_x, coor_y):\n",
"    dist = np.sqrt((df[coor_x] - df[coor_x].shift()) ** 2\n",
"                   + (df[coor_y] - df[coor_y].shift()) ** 2)\n",
"    return dist\n",
"ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y',\n",
"                                meta=('col', 'float64'))\n",
"ddf.head(2)"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"The task graph is corrupted, so we need to recreate the dataframe before reapplying the fixed function"
]
},
{
"cell_type": "code",
"execution_count": 53,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
" id name x y col\n",
"timestamp \n",
"2000-01-01 00:00:00 1026 George -0.813055 -0.263826 NaN\n",
"2000-01-01 00:00:01 990 Wendy -0.137142 -0.495129 0.714395"
]
},
"execution_count": 53,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ddf = dask.datasets.timeseries()  # recreate the dataframe with a fresh task graph\n",
"def func_dist2(df, coor_x, coor_y):\n",
"    dist = np.sqrt((df[coor_x] - df[coor_x].shift()) ** 2\n",
"                   + (df[coor_y] - df[coor_y].shift()) ** 2)\n",
"    return dist\n",
"ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y',\n",
"                                meta=('col', 'float64'))\n",
"ddf.head(2)"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"# Summary\n",
"1. `Dask` is lazy but efficient (parallel computing)\n",
"2. Flexible environments - from a single laptop to thousands of nodes (`Client`)\n",
"3. Useful when coming from `Pandas` (especially in comparison to `pyspark`)\n",
"4. Bonus - dashboard\n",
"5. But beware of:\n",
" * missing functionality from the `Pandas` API\n",
" * corrupted DAGs\n",
"\n",
"js.berry@gmail.com\n",
"\n",
"https://github.com/sephib/dask_pyconil2019"
]
}
],
"metadata": {
"celltoolbar": "Slideshow",
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.2"
},
"rise": {
"enable_chalkboard": true,
"footer": "Gotcha's from Pandas to Dask - pyconil 2019"
}
},
"nbformat": 4,
"nbformat_minor": 2
}